xref: /openbmc/qemu/migration/rdma.c (revision e0c72452)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "exec/target_page.h"
21 #include "rdma.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "qemu-file.h"
25 #include "ram.h"
26 #include "qemu/error-report.h"
27 #include "qemu/main-loop.h"
28 #include "qemu/module.h"
29 #include "qemu/rcu.h"
30 #include "qemu/sockets.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/coroutine.h"
33 #include "exec/memory.h"
34 #include <sys/socket.h>
35 #include <netdb.h>
36 #include <arpa/inet.h>
37 #include <rdma/rdma_cma.h>
38 #include "trace.h"
39 #include "qom/object.h"
40 #include "options.h"
41 #include <poll.h>
42 
43 #define RDMA_RESOLVE_TIMEOUT_MS 10000
44 
45 /* Do not merge data if larger than this. */
46 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
47 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
48 
49 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
50 
51 /*
52  * This is only for non-live state being migrated.
53  * Instead of RDMA_WRITE messages, we use RDMA_SEND
54  * messages for that state, which requires a different
55  * delivery design than main memory.
56  */
57 #define RDMA_SEND_INCREMENT 32768
58 
59 /*
60  * Maximum size infiniband SEND message
61  */
62 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
63 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
64 
65 #define RDMA_CONTROL_VERSION_CURRENT 1
66 /*
67  * Capabilities for negotiation.
68  */
69 #define RDMA_CAPABILITY_PIN_ALL 0x01
70 
71 /*
72  * Add the other flags above to this list of known capabilities
73  * as they are introduced.
74  */
75 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
76 
77 /*
78  * A work request ID is 64-bits and we split up these bits
79  * into 3 parts:
80  *
81  * bits 0-15 : type of control message, 2^16
82  * bits 16-29: ram block index, 2^14
83  * bits 30-63: ram block chunk number, 2^34
84  *
85  * The last two bit ranges are only used for RDMA writes,
86  * in order to track their completion and potentially
87  * also track unregistration status of the message.
88  */
89 #define RDMA_WRID_TYPE_SHIFT  0UL
90 #define RDMA_WRID_BLOCK_SHIFT 16UL
91 #define RDMA_WRID_CHUNK_SHIFT 30UL
92 
93 #define RDMA_WRID_TYPE_MASK \
94     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
95 
96 #define RDMA_WRID_BLOCK_MASK \
97     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
98 
99 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
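
/*
 * Illustrative sketch (not part of the original file): how a work
 * request ID decomposes under the macros above, assuming block index 3
 * and chunk number 5 for an RDMA write:
 *
 *   uint64_t wrid = RDMA_WRID_RDMA_WRITE
 *                   | (3ULL << RDMA_WRID_BLOCK_SHIFT)
 *                   | (5ULL << RDMA_WRID_CHUNK_SHIFT);
 *
 *   (wrid & RDMA_WRID_TYPE_MASK)                            == RDMA_WRID_RDMA_WRITE
 *   (wrid & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT  == 3
 *   (wrid & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT  == 5
 *
 * qemu_rdma_make_wrid() later in this file performs the composition and
 * qemu_rdma_poll() the decomposition.
 */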
100 
101 /*
102  * RDMA migration protocol:
103  * 1. RDMA Writes (data messages, i.e. RAM)
104  * 2. IB Send/Recv (control channel messages)
105  */
106 enum {
107     RDMA_WRID_NONE = 0,
108     RDMA_WRID_RDMA_WRITE = 1,
109     RDMA_WRID_SEND_CONTROL = 2000,
110     RDMA_WRID_RECV_CONTROL = 4000,
111 };
112 
113 /*
114  * Work request IDs for IB SEND messages only (not RDMA writes).
115  * This is used by the migration protocol to transmit
116  * control messages (such as device state and registration commands)
117  *
118  * We could use more WRs, but we have enough for now.
119  */
120 enum {
121     RDMA_WRID_READY = 0,
122     RDMA_WRID_DATA,
123     RDMA_WRID_CONTROL,
124     RDMA_WRID_MAX,
125 };
126 
127 /*
128  * SEND/RECV IB Control Messages.
129  */
130 enum {
131     RDMA_CONTROL_NONE = 0,
132     RDMA_CONTROL_ERROR,
133     RDMA_CONTROL_READY,               /* ready to receive */
134     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
135     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
136     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
137     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
138     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
139     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
140     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
141     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
142     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
143 };
144 
145 
146 /*
147  * Memory and MR structures used to represent an IB Send/Recv work request.
148  * This is *not* used for RDMA writes, only IB Send/Recv.
149  */
150 typedef struct {
151     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
152     struct   ibv_mr *control_mr;               /* registration metadata */
153     size_t   control_len;                      /* length of the message */
154     uint8_t *control_curr;                     /* start of unconsumed bytes */
155 } RDMAWorkRequestData;
156 
157 /*
158  * Negotiate RDMA capabilities during connection-setup time.
159  */
160 typedef struct {
161     uint32_t version;
162     uint32_t flags;
163 } RDMACapabilities;
164 
165 static void caps_to_network(RDMACapabilities *cap)
166 {
167     cap->version = htonl(cap->version);
168     cap->flags = htonl(cap->flags);
169 }
170 
171 static void network_to_caps(RDMACapabilities *cap)
172 {
173     cap->version = ntohl(cap->version);
174     cap->flags = ntohl(cap->flags);
175 }
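
/*
 * Illustrative sketch (not copied from this file): the source advertises
 * the capabilities it wants and the receiver checks them against
 * known_capabilities after undoing the byte-order conversion, e.g.:
 *
 *   RDMACapabilities cap = { .version = RDMA_CONTROL_VERSION_CURRENT,
 *                            .flags = pin_all ? RDMA_CAPABILITY_PIN_ALL : 0 };
 *   caps_to_network(&cap);                  // host -> network order before sending
 *   ...
 *   network_to_caps(&cap);                  // receiver converts back
 *   if (cap.flags & ~known_capabilities) {  // reject flags we don't understand
 *       ...
 *   }
 */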
176 
177 /*
178  * Representation of a RAMBlock from an RDMA perspective.
179  * This is not transmitted, only local.
180  * This and subsequent structures cannot be linked lists
181  * because we're using a single IB message to transmit
182  * the information. It's small anyway, so a list is overkill.
183  */
184 typedef struct RDMALocalBlock {
185     char          *block_name;
186     uint8_t       *local_host_addr; /* local virtual address */
187     uint64_t       remote_host_addr; /* remote virtual address */
188     uint64_t       offset;
189     uint64_t       length;
190     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
191     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
192     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
193     uint32_t       remote_rkey;     /* rkey for non-chunk-level registration */
194     int            index;           /* which block are we */
195     unsigned int   src_index;       /* (Only used on dest) */
196     bool           is_ram_block;
197     int            nb_chunks;
198     unsigned long *transit_bitmap;
199     unsigned long *unregister_bitmap;
200 } RDMALocalBlock;
201 
202 /*
203  * Also represents a RAMBlock, but only on the dest.
204  * This gets transmitted by the dest at connection time
205  * to the source VM and then is used to populate the
206  * corresponding RDMALocalBlock with
207  * the information needed to perform the actual RDMA.
208  */
209 typedef struct QEMU_PACKED RDMADestBlock {
210     uint64_t remote_host_addr;
211     uint64_t offset;
212     uint64_t length;
213     uint32_t remote_rkey;
214     uint32_t padding;
215 } RDMADestBlock;
216 
217 static const char *control_desc(unsigned int rdma_control)
218 {
219     static const char *strs[] = {
220         [RDMA_CONTROL_NONE] = "NONE",
221         [RDMA_CONTROL_ERROR] = "ERROR",
222         [RDMA_CONTROL_READY] = "READY",
223         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
224         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
225         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
226         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
227         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
228         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
229         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
230         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
231         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
232     };
233 
234     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
235         return "??BAD CONTROL VALUE??";
236     }
237 
238     return strs[rdma_control];
239 }
240 
241 static uint64_t htonll(uint64_t v)
242 {
243     union { uint32_t lv[2]; uint64_t llv; } u;
244     u.lv[0] = htonl(v >> 32);
245     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
246     return u.llv;
247 }
248 
249 static uint64_t ntohll(uint64_t v)
250 {
251     union { uint32_t lv[2]; uint64_t llv; } u;
252     u.llv = v;
253     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
254 }
255 
256 static void dest_block_to_network(RDMADestBlock *db)
257 {
258     db->remote_host_addr = htonll(db->remote_host_addr);
259     db->offset = htonll(db->offset);
260     db->length = htonll(db->length);
261     db->remote_rkey = htonl(db->remote_rkey);
262 }
263 
264 static void network_to_dest_block(RDMADestBlock *db)
265 {
266     db->remote_host_addr = ntohll(db->remote_host_addr);
267     db->offset = ntohll(db->offset);
268     db->length = ntohll(db->length);
269     db->remote_rkey = ntohl(db->remote_rkey);
270 }
271 
272 /*
273  * Virtual address of the above structures used for transmitting
274  * the RAMBlock descriptions at connection-time.
275  * This structure is *not* transmitted.
276  */
277 typedef struct RDMALocalBlocks {
278     int nb_blocks;
279     bool     init;             /* main memory init complete */
280     RDMALocalBlock *block;
281 } RDMALocalBlocks;
282 
283 /*
284  * Main data structure for RDMA state.
285  * While there is only one copy of this structure being allocated right now,
286  * this is the place where one would start if one wanted to consider
287  * having more than one RDMA connection open at the same time.
288  */
289 typedef struct RDMAContext {
290     char *host;
291     int port;
292     char *host_port;
293 
294     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
295 
296     /*
297      * This is used by *_exchange_send() to figure out whether
298      * the initial "READY" message has already been received.
299      * This is because other functions may potentially poll() and detect
300      * the READY message before send() does, in which case we need to
301      * know if it completed.
302      */
303     int control_ready_expected;
304 
305     /* number of outstanding writes */
306     int nb_sent;
307 
308     /* store info about current buffer so that we can
309        merge it with future sends */
310     uint64_t current_addr;
311     uint64_t current_length;
312     /* index of ram block the current buffer belongs to */
313     int current_index;
314     /* index of the chunk in the current ram block */
315     int current_chunk;
316 
317     bool pin_all;
318 
319     /*
320      * infiniband-specific variables for opening the device
321      * and maintaining connection state and so forth.
322      *
323      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
324      * cm_id->verbs, cm_id->channel, and cm_id->qp.
325      */
326     struct rdma_cm_id *cm_id;               /* connection manager ID */
327     struct rdma_cm_id *listen_id;
328     bool connected;
329 
330     struct ibv_context          *verbs;
331     struct rdma_event_channel   *channel;
332     struct ibv_qp *qp;                      /* queue pair */
333     struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
334     struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
335     struct ibv_pd *pd;                      /* protection domain */
336     struct ibv_cq *recv_cq;                 /* receive completion queue */
337     struct ibv_cq *send_cq;                 /* send completion queue */
338 
339     /*
340      * If a previous write failed (perhaps because of a failed
341      * memory registration), then do not attempt any future work
342      * and remember the error state.
343      */
344     bool errored;
345     bool error_reported;
346     bool received_error;
347 
348     /*
349      * Description of ram blocks used throughout the code.
350      */
351     RDMALocalBlocks local_ram_blocks;
352     RDMADestBlock  *dest_blocks;
353 
354     /* Index of the next RAMBlock received during block registration */
355     unsigned int    next_src_index;
356 
357     /*
358      * Set when migration has started on the *destination*,
359      * in which case the coroutine yield function is used.
360      * The source runs in a thread, so we don't care.
361      */
362     int migration_started_on_destination;
363 
364     int total_registrations;
365     int total_writes;
366 
367     int unregister_current, unregister_next;
368     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
369 
370     GHashTable *blockmap;
371 
372     /* the RDMAContext for return path */
373     struct RDMAContext *return_path;
374     bool is_return_path;
375 } RDMAContext;
376 
377 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
378 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
379 
380 
381 
382 struct QIOChannelRDMA {
383     QIOChannel parent;
384     RDMAContext *rdmain;
385     RDMAContext *rdmaout;
386     QEMUFile *file;
387     bool blocking; /* XXX we don't actually honour this yet */
388 };
389 
390 /*
391  * Main structure for IB Send/Recv control messages.
392  * This gets prepended at the beginning of every Send/Recv.
393  */
394 typedef struct QEMU_PACKED {
395     uint32_t len;     /* Total length of data portion */
396     uint32_t type;    /* which control command to perform */
397     uint32_t repeat;  /* number of commands in data portion of same type */
398     uint32_t padding;
399 } RDMAControlHeader;
400 
401 static void control_to_network(RDMAControlHeader *control)
402 {
403     control->type = htonl(control->type);
404     control->len = htonl(control->len);
405     control->repeat = htonl(control->repeat);
406 }
407 
408 static void network_to_control(RDMAControlHeader *control)
409 {
410     control->type = ntohl(control->type);
411     control->len = ntohl(control->len);
412     control->repeat = ntohl(control->repeat);
413 }
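
/*
 * Illustrative sketch (assumption, not copied from this file): a header
 * describing `len` bytes of QEMUFile data could be built and converted as
 *
 *   RDMAControlHeader head = { .len = len, .type = RDMA_CONTROL_QEMU_FILE };
 *   control_to_network(&head);   // convert before posting the IB SEND
 *
 * and the receiver applies network_to_control() to the prepended header
 * before interpreting the data portion that follows it.
 */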
414 
415 /*
416  * Register a single Chunk.
417  * Information sent by the source VM to inform the dest
418  * to register a single chunk of memory before we can perform
419  * the actual RDMA operation.
420  */
421 typedef struct QEMU_PACKED {
422     union QEMU_PACKED {
423         uint64_t current_addr;  /* offset into the ram_addr_t space */
424         uint64_t chunk;         /* chunk to lookup if unregistering */
425     } key;
426     uint32_t current_index; /* which ramblock the chunk belongs to */
427     uint32_t padding;
428     uint64_t chunks;            /* how many sequential chunks to register */
429 } RDMARegister;
430 
431 static bool rdma_errored(RDMAContext *rdma)
432 {
433     if (rdma->errored && !rdma->error_reported) {
434         error_report("RDMA is in an error state waiting migration"
435                      " to abort!");
436         rdma->error_reported = true;
437     }
438     return rdma->errored;
439 }
440 
441 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
442 {
443     RDMALocalBlock *local_block;
444     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
445 
446     if (local_block->is_ram_block) {
447         /*
448          * current_addr as passed in is an address in the local ram_addr_t
449          * space, we need to translate this for the destination
450          */
451         reg->key.current_addr -= local_block->offset;
452         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
453     }
454     reg->key.current_addr = htonll(reg->key.current_addr);
455     reg->current_index = htonl(reg->current_index);
456     reg->chunks = htonll(reg->chunks);
457 }
458 
459 static void network_to_register(RDMARegister *reg)
460 {
461     reg->key.current_addr = ntohll(reg->key.current_addr);
462     reg->current_index = ntohl(reg->current_index);
463     reg->chunks = ntohll(reg->chunks);
464 }
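
/*
 * Illustrative sketch (assumption): a dynamic registration request for a
 * single chunk of a RAM block would be assembled roughly as
 *
 *   RDMARegister reg = { .current_index = block_index,
 *                        .key.current_addr = current_addr,
 *                        .chunks = 1 };
 *   register_to_network(rdma, &reg);   // translate the offset + byte order
 *
 * and the destination undoes this with network_to_register() before
 * pinning the chunk and answering with an RDMARegisterResult.
 */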
465 
466 typedef struct QEMU_PACKED {
467     uint32_t value;     /* if zero, we will madvise() */
468     uint32_t block_idx; /* which ram block index */
469     uint64_t offset;    /* Address in remote ram_addr_t space */
470     uint64_t length;    /* length of the chunk */
471 } RDMACompress;
472 
473 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
474 {
475     comp->value = htonl(comp->value);
476     /*
477      * comp->offset as passed in is an address in the local ram_addr_t
478      * space, we need to translate this for the destination
479      */
480     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
481     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
482     comp->block_idx = htonl(comp->block_idx);
483     comp->offset = htonll(comp->offset);
484     comp->length = htonll(comp->length);
485 }
486 
487 static void network_to_compress(RDMACompress *comp)
488 {
489     comp->value = ntohl(comp->value);
490     comp->block_idx = ntohl(comp->block_idx);
491     comp->offset = ntohll(comp->offset);
492     comp->length = ntohll(comp->length);
493 }
494 
495 /*
496  * The result of the dest's memory registration produces an "rkey"
497  * which the source VM must reference in order to perform
498  * the RDMA operation.
499  */
500 typedef struct QEMU_PACKED {
501     uint32_t rkey;
502     uint32_t padding;
503     uint64_t host_addr;
504 } RDMARegisterResult;
505 
506 static void result_to_network(RDMARegisterResult *result)
507 {
508     result->rkey = htonl(result->rkey);
509     result->host_addr = htonll(result->host_addr);
510 };
511 
512 static void network_to_result(RDMARegisterResult *result)
513 {
514     result->rkey = ntohl(result->rkey);
515     result->host_addr = ntohll(result->host_addr);
516 };
517 
518 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
519                                    uint8_t *data, RDMAControlHeader *resp,
520                                    int *resp_idx,
521                                    int (*callback)(RDMAContext *rdma,
522                                                    Error **errp),
523                                    Error **errp);
524 
525 static inline uint64_t ram_chunk_index(const uint8_t *start,
526                                        const uint8_t *host)
527 {
528     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
529 }
530 
531 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
532                                        uint64_t i)
533 {
534     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
535                                   (i << RDMA_REG_CHUNK_SHIFT));
536 }
537 
538 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
539                                      uint64_t i)
540 {
541     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
542                                          (1UL << RDMA_REG_CHUNK_SHIFT);
543 
544     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
545         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
546     }
547 
548     return result;
549 }
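
/*
 * Illustrative sketch (not part of the original file): with
 * RDMA_REG_CHUNK_SHIFT of 20 the chunks above are 1 MB each, so for a
 * block whose local_host_addr is `start`:
 *
 *   ram_chunk_index(start, start + 0x250000) == 2   // ~2.3 MB in -> third chunk
 *   ram_chunk_start(block, 2) == start + 0x200000
 *   ram_chunk_end(block, 2)   == start + 0x300000, clamped to start + block->length
 */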
550 
551 static void rdma_add_block(RDMAContext *rdma, const char *block_name,
552                            void *host_addr,
553                            ram_addr_t block_offset, uint64_t length)
554 {
555     RDMALocalBlocks *local = &rdma->local_ram_blocks;
556     RDMALocalBlock *block;
557     RDMALocalBlock *old = local->block;
558 
559     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
560 
561     if (local->nb_blocks) {
562         if (rdma->blockmap) {
563             for (int x = 0; x < local->nb_blocks; x++) {
564                 g_hash_table_remove(rdma->blockmap,
565                                     (void *)(uintptr_t)old[x].offset);
566                 g_hash_table_insert(rdma->blockmap,
567                                     (void *)(uintptr_t)old[x].offset,
568                                     &local->block[x]);
569             }
570         }
571         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
572         g_free(old);
573     }
574 
575     block = &local->block[local->nb_blocks];
576 
577     block->block_name = g_strdup(block_name);
578     block->local_host_addr = host_addr;
579     block->offset = block_offset;
580     block->length = length;
581     block->index = local->nb_blocks;
582     block->src_index = ~0U; /* Filled in by the receipt of the block list */
583     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
584     block->transit_bitmap = bitmap_new(block->nb_chunks);
585     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
586     block->unregister_bitmap = bitmap_new(block->nb_chunks);
587     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
588     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
589 
590     block->is_ram_block = local->init ? false : true;
591 
592     if (rdma->blockmap) {
593         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
594     }
595 
596     trace_rdma_add_block(block_name, local->nb_blocks,
597                          (uintptr_t) block->local_host_addr,
598                          block->offset, block->length,
599                          (uintptr_t) (block->local_host_addr + block->length),
600                          BITS_TO_LONGS(block->nb_chunks) *
601                              sizeof(unsigned long) * 8,
602                          block->nb_chunks);
603 
604     local->nb_blocks++;
605 }
606 
607 /*
608  * Memory regions need to be registered with the device and queue pairs set up
609  * in advance before the migration starts. This tells us where the RAM blocks
610  * are so that we can register them individually.
611  */
612 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
613 {
614     const char *block_name = qemu_ram_get_idstr(rb);
615     void *host_addr = qemu_ram_get_host_addr(rb);
616     ram_addr_t block_offset = qemu_ram_get_offset(rb);
617     ram_addr_t length = qemu_ram_get_used_length(rb);
618     rdma_add_block(opaque, block_name, host_addr, block_offset, length);
619     return 0;
620 }
621 
622 /*
623  * Identify the RAMBlocks and their quantity. They will be used to
624  * identify chunk boundaries inside each RAMBlock and also be referenced
625  * during dynamic page registration.
626  */
627 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
628 {
629     RDMALocalBlocks *local = &rdma->local_ram_blocks;
630     int ret;
631 
632     assert(rdma->blockmap == NULL);
633     memset(local, 0, sizeof *local);
634     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
635     assert(!ret);
636     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
637     rdma->dest_blocks = g_new0(RDMADestBlock,
638                                rdma->local_ram_blocks.nb_blocks);
639     local->init = true;
640 }
641 
642 /*
643  * Note: If used outside of cleanup, the caller must ensure that the destination
644  * block structures are also updated
645  */
646 static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
647 {
648     RDMALocalBlocks *local = &rdma->local_ram_blocks;
649     RDMALocalBlock *old = local->block;
650 
651     if (rdma->blockmap) {
652         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
653     }
654     if (block->pmr) {
655         for (int j = 0; j < block->nb_chunks; j++) {
656             if (!block->pmr[j]) {
657                 continue;
658             }
659             ibv_dereg_mr(block->pmr[j]);
660             rdma->total_registrations--;
661         }
662         g_free(block->pmr);
663         block->pmr = NULL;
664     }
665 
666     if (block->mr) {
667         ibv_dereg_mr(block->mr);
668         rdma->total_registrations--;
669         block->mr = NULL;
670     }
671 
672     g_free(block->transit_bitmap);
673     block->transit_bitmap = NULL;
674 
675     g_free(block->unregister_bitmap);
676     block->unregister_bitmap = NULL;
677 
678     g_free(block->remote_keys);
679     block->remote_keys = NULL;
680 
681     g_free(block->block_name);
682     block->block_name = NULL;
683 
684     if (rdma->blockmap) {
685         for (int x = 0; x < local->nb_blocks; x++) {
686             g_hash_table_remove(rdma->blockmap,
687                                 (void *)(uintptr_t)old[x].offset);
688         }
689     }
690 
691     if (local->nb_blocks > 1) {
692 
693         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
694 
695         if (block->index) {
696             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
697         }
698 
699         if (block->index < (local->nb_blocks - 1)) {
700             memcpy(local->block + block->index, old + (block->index + 1),
701                 sizeof(RDMALocalBlock) *
702                     (local->nb_blocks - (block->index + 1)));
703             for (int x = block->index; x < local->nb_blocks - 1; x++) {
704                 local->block[x].index--;
705             }
706         }
707     } else {
708         assert(block == local->block);
709         local->block = NULL;
710     }
711 
712     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
713                            block->offset, block->length,
714                             (uintptr_t)(block->local_host_addr + block->length),
715                            BITS_TO_LONGS(block->nb_chunks) *
716                                sizeof(unsigned long) * 8, block->nb_chunks);
717 
718     g_free(old);
719 
720     local->nb_blocks--;
721 
722     if (local->nb_blocks && rdma->blockmap) {
723         for (int x = 0; x < local->nb_blocks; x++) {
724             g_hash_table_insert(rdma->blockmap,
725                                 (void *)(uintptr_t)local->block[x].offset,
726                                 &local->block[x]);
727         }
728     }
729 }
730 
731 /*
732  * Trace RDMA device open, with device details.
733  */
734 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
735 {
736     struct ibv_port_attr port;
737 
738     if (ibv_query_port(verbs, 1, &port)) {
739         trace_qemu_rdma_dump_id_failed(who);
740         return;
741     }
742 
743     trace_qemu_rdma_dump_id(who,
744                 verbs->device->name,
745                 verbs->device->dev_name,
746                 verbs->device->dev_path,
747                 verbs->device->ibdev_path,
748                 port.link_layer,
749                 port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband"
750                 : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet"
751                 : "Unknown");
752 }
753 
754 /*
755  * Trace RDMA gid addressing information.
756  * Useful for understanding the RDMA device hierarchy in the kernel.
757  */
758 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
759 {
760     char sgid[33];
761     char dgid[33];
762     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
763     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
764     trace_qemu_rdma_dump_gid(who, sgid, dgid);
765 }
766 
767 /*
768  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
769  * We will try the next addrinfo struct, and fail if there are
770  * no other valid addresses to bind against.
771  *
772  * If the user is listening on '[::]', then we will not have opened a device
773  * yet and have no way of verifying if the device is RoCE or not.
774  *
775  * In this case, the source VM will throw an error for ALL types of
776  * connections (both IPv4 and IPv6) if the destination machine does not have
777  * a regular infiniband network available for use.
778  *
779  * The only way to guarantee that an error is thrown for broken kernels is
780  * for the management software to choose a *specific* interface at bind time
781  * and validate what type of hardware it is.
782  *
783  * Unfortunately, this puts the user in a fix:
784  *
785  *  If the source VM connects with an IPv4 address without knowing that the
786  *  destination has bound to '[::]' the migration will unconditionally fail
787  *  unless the management software is explicitly listening on the IPv4
788  *  address while using a RoCE-based device.
789  *
790  *  If the source VM connects with an IPv6 address, then we're OK because we can
791  *  throw an error on the source (and similarly on the destination).
792  *
793  *  But in mixed environments, this will be broken for a while until it is fixed
794  *  inside linux.
795  *
796  * We do provide a *tiny* bit of help in this function: We can list all of the
797  * devices in the system and check to see if all the devices are RoCE or
798  * Infiniband.
799  *
800  * If we detect that we have a *pure* RoCE environment, then we can safely
801  * throw an error even if the management software has specified '[::]' as the
802  * bind address.
803  *
804  * However, if there are multiple heterogeneous devices, then we cannot make
805  * this assumption and the user just has to be sure they know what they are
806  * doing.
807  *
808  * Patches are being reviewed on linux-rdma.
809  */
810 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
811 {
812     /* This bug only exists in linux, to our knowledge. */
813 #ifdef CONFIG_LINUX
814     struct ibv_port_attr port_attr;
815 
816     /*
817      * Verbs are only NULL if management has bound to '[::]'.
818      *
819      * Let's iterate through all the devices and see if there are any pure IB
820      * devices (non-ethernet).
821      *
822      * If not, then we can safely proceed with the migration.
823      * Otherwise, there are no guarantees until the bug is fixed in linux.
824      */
825     if (!verbs) {
826         int num_devices;
827         struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
828         bool roce_found = false;
829         bool ib_found = false;
830 
831         for (int x = 0; x < num_devices; x++) {
832             verbs = ibv_open_device(dev_list[x]);
833             /*
834              * ibv_open_device() is not documented to set errno.  If
835              * it does, it's somebody else's doc bug.  If it doesn't,
836              * the use of errno below is wrong.
837              * TODO Find out whether ibv_open_device() sets errno.
838              */
839             if (!verbs) {
840                 if (errno == EPERM) {
841                     continue;
842                 } else {
843                     error_setg_errno(errp, errno,
844                                      "could not open RDMA device context");
845                     return -1;
846                 }
847             }
848 
849             if (ibv_query_port(verbs, 1, &port_attr)) {
850                 ibv_close_device(verbs);
851                 error_setg(errp,
852                            "RDMA ERROR: Could not query initial IB port");
853                 return -1;
854             }
855 
856             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
857                 ib_found = true;
858             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
859                 roce_found = true;
860             }
861 
862             ibv_close_device(verbs);
863 
864         }
865 
866         if (roce_found) {
867             if (ib_found) {
868                 warn_report("migrations may fail:"
869                             " IPv6 over RoCE / iWARP in linux"
870                             " is broken. But since you appear to have a"
871                             " mixed RoCE / IB environment, be sure to only"
872                             " migrate over the IB fabric until the kernel "
873                             " fixes the bug.");
874             } else {
875                 error_setg(errp, "RDMA ERROR: "
876                            "You only have RoCE / iWARP devices in your systems"
877                            " and your management software has specified '[::]'"
878                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
879                 return -1;
880             }
881         }
882 
883         return 0;
884     }
885 
886     /*
887      * If we have a verbs context, that means something other than '[::]' was
888      * used by the management software for binding, in which case we can
889      * actually warn the user about a potentially broken kernel.
890      */
891 
892     /* IB ports start with 1, not 0 */
893     if (ibv_query_port(verbs, 1, &port_attr)) {
894         error_setg(errp, "RDMA ERROR: Could not query initial IB port");
895         return -1;
896     }
897 
898     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
899         error_setg(errp, "RDMA ERROR: "
900                    "Linux kernel's RoCE / iWARP does not support IPv6 "
901                    "(but patches on linux-rdma in progress)");
902         return -1;
903     }
904 
905 #endif
906 
907     return 0;
908 }
909 
910 /*
911  * Figure out which RDMA device corresponds to the requested IP hostname.
912  * Also create the initial connection manager identifiers for opening
913  * the connection.
914  */
915 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
916 {
917     Error *err = NULL;
918     int ret;
919     struct rdma_addrinfo *res;
920     char port_str[16];
921     struct rdma_cm_event *cm_event;
922     char ip[40] = "unknown";
923 
924     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
925         error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
926         return -1;
927     }
928 
929     /* create CM channel */
930     rdma->channel = rdma_create_event_channel();
931     if (!rdma->channel) {
932         error_setg(errp, "RDMA ERROR: could not create CM channel");
933         return -1;
934     }
935 
936     /* create CM id */
937     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
938     if (ret < 0) {
939         error_setg(errp, "RDMA ERROR: could not create channel id");
940         goto err_resolve_create_id;
941     }
942 
943     snprintf(port_str, 16, "%d", rdma->port);
944     port_str[15] = '\0';
945 
946     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
947     if (ret) {
948         error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
949                    rdma->host);
950         goto err_resolve_get_addr;
951     }
952 
953     /* Try all addresses, saving the first error in @err */
954     for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) {
955         Error **local_errp = err ? NULL : &err;
956 
957         inet_ntop(e->ai_family,
958             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
959         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
960 
961         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
962                 RDMA_RESOLVE_TIMEOUT_MS);
963         if (ret >= 0) {
964             if (e->ai_family == AF_INET6) {
965                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs,
966                                                    local_errp);
967                 if (ret < 0) {
968                     continue;
969                 }
970             }
971             error_free(err);
972             goto route;
973         }
974     }
975 
976     rdma_freeaddrinfo(res);
977     if (err) {
978         error_propagate(errp, err);
979     } else {
980         error_setg(errp, "RDMA ERROR: could not resolve address %s",
981                    rdma->host);
982     }
983     goto err_resolve_get_addr;
984 
985 route:
986     rdma_freeaddrinfo(res);
987     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
988 
989     ret = rdma_get_cm_event(rdma->channel, &cm_event);
990     if (ret < 0) {
991         error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
992         goto err_resolve_get_addr;
993     }
994 
995     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
996         error_setg(errp,
997                    "RDMA ERROR: result not equal to event_addr_resolved %s",
998                    rdma_event_str(cm_event->event));
999         rdma_ack_cm_event(cm_event);
1000         goto err_resolve_get_addr;
1001     }
1002     rdma_ack_cm_event(cm_event);
1003 
1004     /* resolve route */
1005     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1006     if (ret < 0) {
1007         error_setg(errp, "RDMA ERROR: could not resolve rdma route");
1008         goto err_resolve_get_addr;
1009     }
1010 
1011     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1012     if (ret < 0) {
1013         error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
1014         goto err_resolve_get_addr;
1015     }
1016     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1017         error_setg(errp, "RDMA ERROR: "
1018                    "result not equal to event_route_resolved: %s",
1019                    rdma_event_str(cm_event->event));
1020         rdma_ack_cm_event(cm_event);
1021         goto err_resolve_get_addr;
1022     }
1023     rdma_ack_cm_event(cm_event);
1024     rdma->verbs = rdma->cm_id->verbs;
1025     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1026     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1027     return 0;
1028 
1029 err_resolve_get_addr:
1030     rdma_destroy_id(rdma->cm_id);
1031     rdma->cm_id = NULL;
1032 err_resolve_create_id:
1033     rdma_destroy_event_channel(rdma->channel);
1034     rdma->channel = NULL;
1035     return -1;
1036 }
1037 
1038 /*
1039  * Create protection domain and completion queues
1040  */
1041 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
1042 {
1043     /* allocate pd */
1044     rdma->pd = ibv_alloc_pd(rdma->verbs);
1045     if (!rdma->pd) {
1046         error_setg(errp, "failed to allocate protection domain");
1047         return -1;
1048     }
1049 
1050     /* create receive completion channel */
1051     rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1052     if (!rdma->recv_comp_channel) {
1053         error_setg(errp, "failed to allocate receive completion channel");
1054         goto err_alloc_pd_cq;
1055     }
1056 
1057     /*
1058      * Completion queue can be filled by read work requests.
1059      */
1060     rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1061                                   NULL, rdma->recv_comp_channel, 0);
1062     if (!rdma->recv_cq) {
1063         error_setg(errp, "failed to allocate receive completion queue");
1064         goto err_alloc_pd_cq;
1065     }
1066 
1067     /* create send completion channel */
1068     rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1069     if (!rdma->send_comp_channel) {
1070         error_setg(errp, "failed to allocate send completion channel");
1071         goto err_alloc_pd_cq;
1072     }
1073 
1074     rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1075                                   NULL, rdma->send_comp_channel, 0);
1076     if (!rdma->send_cq) {
1077         error_setg(errp, "failed to allocate send completion queue");
1078         goto err_alloc_pd_cq;
1079     }
1080 
1081     return 0;
1082 
1083 err_alloc_pd_cq:
1084     if (rdma->pd) {
1085         ibv_dealloc_pd(rdma->pd);
1086     }
1087     if (rdma->recv_comp_channel) {
1088         ibv_destroy_comp_channel(rdma->recv_comp_channel);
1089     }
1090     if (rdma->send_comp_channel) {
1091         ibv_destroy_comp_channel(rdma->send_comp_channel);
1092     }
1093     if (rdma->recv_cq) {
1094         ibv_destroy_cq(rdma->recv_cq);
1095         rdma->recv_cq = NULL;
1096     }
1097     rdma->pd = NULL;
1098     rdma->recv_comp_channel = NULL;
1099     rdma->send_comp_channel = NULL;
1100     return -1;
1101 
1102 }
1103 
1104 /*
1105  * Create queue pairs.
1106  */
1107 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1108 {
1109     struct ibv_qp_init_attr attr = { 0 };
1110 
1111     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1112     attr.cap.max_recv_wr = 3;
1113     attr.cap.max_send_sge = 1;
1114     attr.cap.max_recv_sge = 1;
1115     attr.send_cq = rdma->send_cq;
1116     attr.recv_cq = rdma->recv_cq;
1117     attr.qp_type = IBV_QPT_RC;
1118 
1119     if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) {
1120         return -1;
1121     }
1122 
1123     rdma->qp = rdma->cm_id->qp;
1124     return 0;
1125 }
1126 
1127 /* Check whether On-Demand Paging is supported by the RDMA device */
1128 static bool rdma_support_odp(struct ibv_context *dev)
1129 {
1130     struct ibv_device_attr_ex attr = {0};
1131 
1132     if (ibv_query_device_ex(dev, NULL, &attr)) {
1133         return false;
1134     }
1135 
1136     if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1137         return true;
1138     }
1139 
1140     return false;
1141 }
1142 
1143 /*
1144  * ibv_advise_mr to avoid RNR NAK error as far as possible.
1145  * A responder MR registered with ODP will send an RNR NAK back to
1146  * the requester when a page fault occurs.
1147  */
1148 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1149                                          uint32_t len,  uint32_t lkey,
1150                                          const char *name, bool wr)
1151 {
1152 #ifdef HAVE_IBV_ADVISE_MR
1153     int ret;
1154     int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1155                  IBV_ADVISE_MR_ADVICE_PREFETCH;
1156     struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1157 
1158     ret = ibv_advise_mr(pd, advice,
1159                         IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1160     /* ignore the error */
1161     trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret));
1162 #endif
1163 }
1164 
1165 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
1166 {
1167     int i;
1168     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1169 
1170     for (i = 0; i < local->nb_blocks; i++) {
1171         int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1172 
1173         local->block[i].mr =
1174             ibv_reg_mr(rdma->pd,
1175                     local->block[i].local_host_addr,
1176                     local->block[i].length, access
1177                     );
1178         /*
1179          * ibv_reg_mr() is not documented to set errno.  If it does,
1180          * it's somebody else's doc bug.  If it doesn't, the use of
1181          * errno below is wrong.
1182          * TODO Find out whether ibv_reg_mr() sets errno.
1183          */
1184         if (!local->block[i].mr &&
1185             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1186                 access |= IBV_ACCESS_ON_DEMAND;
1187                 /* register ODP mr */
1188                 local->block[i].mr =
1189                     ibv_reg_mr(rdma->pd,
1190                                local->block[i].local_host_addr,
1191                                local->block[i].length, access);
1192                 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1193 
1194                 if (local->block[i].mr) {
1195                     qemu_rdma_advise_prefetch_mr(rdma->pd,
1196                                     (uintptr_t)local->block[i].local_host_addr,
1197                                     local->block[i].length,
1198                                     local->block[i].mr->lkey,
1199                                     local->block[i].block_name,
1200                                     true);
1201                 }
1202         }
1203 
1204         if (!local->block[i].mr) {
1205             error_setg_errno(errp, errno,
1206                              "Failed to register local dest ram block!");
1207             goto err;
1208         }
1209         rdma->total_registrations++;
1210     }
1211 
1212     return 0;
1213 
1214 err:
1215     for (i--; i >= 0; i--) {
1216         ibv_dereg_mr(local->block[i].mr);
1217         local->block[i].mr = NULL;
1218         rdma->total_registrations--;
1219     }
1220 
1221     return -1;
1222 
1223 }
1224 
1225 /*
1226  * Find the ram block that corresponds to the page requested to be
1227  * transmitted by QEMU.
1228  *
1229  * Once the block is found, also identify which 'chunk' within that
1230  * block the page belongs to.
1231  */
1232 static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1233                                        uintptr_t block_offset,
1234                                        uint64_t offset,
1235                                        uint64_t length,
1236                                        uint64_t *block_index,
1237                                        uint64_t *chunk_index)
1238 {
1239     uint64_t current_addr = block_offset + offset;
1240     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1241                                                 (void *) block_offset);
1242     assert(block);
1243     assert(current_addr >= block->offset);
1244     assert((current_addr + length) <= (block->offset + block->length));
1245 
1246     *block_index = block->index;
1247     *chunk_index = ram_chunk_index(block->local_host_addr,
1248                 block->local_host_addr + (current_addr - block->offset));
1249 }
1250 
1251 /*
1252  * Register a chunk with IB. If the chunk was already registered
1253  * previously, then skip.
1254  *
1255  * Also return the keys associated with the registration needed
1256  * to perform the actual RDMA operation.
1257  */
1258 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1259         RDMALocalBlock *block, uintptr_t host_addr,
1260         uint32_t *lkey, uint32_t *rkey, int chunk,
1261         uint8_t *chunk_start, uint8_t *chunk_end)
1262 {
1263     if (block->mr) {
1264         if (lkey) {
1265             *lkey = block->mr->lkey;
1266         }
1267         if (rkey) {
1268             *rkey = block->mr->rkey;
1269         }
1270         return 0;
1271     }
1272 
1273     /* allocate memory to store chunk MRs */
1274     if (!block->pmr) {
1275         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1276     }
1277 
1278     /*
1279      * If 'rkey', then we're the destination, so grant access to the source.
1280      *
1281      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1282      */
1283     if (!block->pmr[chunk]) {
1284         uint64_t len = chunk_end - chunk_start;
1285         int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1286                      0;
1287 
1288         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1289 
1290         block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1291         /*
1292          * ibv_reg_mr() is not documented to set errno.  If it does,
1293          * it's somebody else's doc bug.  If it doesn't, the use of
1294          * errno below is wrong.
1295          * TODO Find out whether ibv_reg_mr() sets errno.
1296          */
1297         if (!block->pmr[chunk] &&
1298             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1299             access |= IBV_ACCESS_ON_DEMAND;
1300             /* register ODP mr */
1301             block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1302             trace_qemu_rdma_register_odp_mr(block->block_name);
1303 
1304             if (block->pmr[chunk]) {
1305                 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1306                                             len, block->pmr[chunk]->lkey,
1307                                             block->block_name, rkey);
1308 
1309             }
1310         }
1311     }
1312     if (!block->pmr[chunk]) {
1313         return -1;
1314     }
1315     rdma->total_registrations++;
1316 
1317     if (lkey) {
1318         *lkey = block->pmr[chunk]->lkey;
1319     }
1320     if (rkey) {
1321         *rkey = block->pmr[chunk]->rkey;
1322     }
1323     return 0;
1324 }
1325 
1326 /*
1327  * Register (at connection time) the memory used for control
1328  * channel messages.
1329  */
1330 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1331 {
1332     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1333             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1334             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1335     if (rdma->wr_data[idx].control_mr) {
1336         rdma->total_registrations++;
1337         return 0;
1338     }
1339     return -1;
1340 }
1341 
1342 /*
1343  * Perform a non-optimized memory unregistration after every transfer
1344  * for demonstration purposes, only if pin-all is not requested.
1345  *
1346  * Potential optimizations:
1347  * 1. Start a new thread to run this function continuously
1348  *      - for bit clearing
1349  *      - and for receipt of unregister messages
1350  * 2. Use an LRU.
1351  * 3. Use workload hints.
1352  */
1353 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1354 {
1355     Error *err = NULL;
1356 
1357     while (rdma->unregistrations[rdma->unregister_current]) {
1358         int ret;
1359         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1360         uint64_t chunk =
1361             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1362         uint64_t index =
1363             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1364         RDMALocalBlock *block =
1365             &(rdma->local_ram_blocks.block[index]);
1366         RDMARegister reg = { .current_index = index };
1367         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1368                                  };
1369         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1370                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1371                                    .repeat = 1,
1372                                  };
1373 
1374         trace_qemu_rdma_unregister_waiting_proc(chunk,
1375                                                 rdma->unregister_current);
1376 
1377         rdma->unregistrations[rdma->unregister_current] = 0;
1378         rdma->unregister_current++;
1379 
1380         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1381             rdma->unregister_current = 0;
1382         }
1383 
1384 
1385         /*
1386          * Unregistration is speculative (because migration is single-threaded
1387          * and we cannot break the protocol's infiniband message ordering).
1388          * Thus, if the memory is currently being used for transmission,
1389          * then abort the attempt to unregister and try again
1390          * later the next time a completion is received for this memory.
1391          */
1392         clear_bit(chunk, block->unregister_bitmap);
1393 
1394         if (test_bit(chunk, block->transit_bitmap)) {
1395             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1396             continue;
1397         }
1398 
1399         trace_qemu_rdma_unregister_waiting_send(chunk);
1400 
1401         ret = ibv_dereg_mr(block->pmr[chunk]);
1402         block->pmr[chunk] = NULL;
1403         block->remote_keys[chunk] = 0;
1404 
1405         if (ret != 0) {
1406             error_report("unregistration chunk failed: %s",
1407                          strerror(ret));
1408             return -1;
1409         }
1410         rdma->total_registrations--;
1411 
1412         reg.key.chunk = chunk;
1413         register_to_network(rdma, &reg);
1414         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1415                                       &resp, NULL, NULL, &err);
1416         if (ret < 0) {
1417             error_report_err(err);
1418             return -1;
1419         }
1420 
1421         trace_qemu_rdma_unregister_waiting_complete(chunk);
1422     }
1423 
1424     return 0;
1425 }
1426 
1427 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1428                                          uint64_t chunk)
1429 {
1430     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1431 
1432     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1433     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1434 
1435     return result;
1436 }
1437 
1438 /*
1439  * Poll the completion queue to see whether a work request
1440  * (of any kind) has completed.
1441  * Return the work request ID that completed.
1442  */
1443 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1444                           uint64_t *wr_id_out, uint32_t *byte_len)
1445 {
1446     int ret;
1447     struct ibv_wc wc;
1448     uint64_t wr_id;
1449 
1450     ret = ibv_poll_cq(cq, 1, &wc);
1451 
1452     if (!ret) {
1453         *wr_id_out = RDMA_WRID_NONE;
1454         return 0;
1455     }
1456 
1457     if (ret < 0) {
1458         return -1;
1459     }
1460 
1461     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1462 
1463     if (wc.status != IBV_WC_SUCCESS) {
1464         return -1;
1465     }
1466 
1467     if (rdma->control_ready_expected &&
1468         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1469         trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1470                                   rdma->nb_sent);
1471         rdma->control_ready_expected = 0;
1472     }
1473 
1474     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1475         uint64_t chunk =
1476             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1477         uint64_t index =
1478             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1479         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1480 
1481         trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1482                                    index, chunk, block->local_host_addr,
1483                                    (void *)(uintptr_t)block->remote_host_addr);
1484 
1485         clear_bit(chunk, block->transit_bitmap);
1486 
1487         if (rdma->nb_sent > 0) {
1488             rdma->nb_sent--;
1489         }
1490     } else {
1491         trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1492     }
1493 
1494     *wr_id_out = wc.wr_id;
1495     if (byte_len) {
1496         *byte_len = wc.byte_len;
1497     }
1498 
1499     return  0;
1500 }
1501 
1502 /* Wait for activity on the completion channel.
1503  * Returns 0 on success, non-0 on error.
1504  */
1505 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1506                                        struct ibv_comp_channel *comp_channel)
1507 {
1508     struct rdma_cm_event *cm_event;
1509 
1510     /*
1511      * Coroutine doesn't start until migration_fd_process_incoming()
1512      * so don't yield unless we know we're running inside of a coroutine.
1513      */
1514     if (rdma->migration_started_on_destination &&
1515         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1516         yield_until_fd_readable(comp_channel->fd);
1517     } else {
1518         /* This is the source side (we're in a separate thread), or the
1519          * destination prior to migration_fd_process_incoming(); after
1520          * postcopy, the destination is also in a separate thread.
1521          * We can't yield, so we have to poll the fd.
1522          * But we need to be able to handle 'cancel' or an error
1523          * without hanging forever.
1524          */
1525         while (!rdma->errored && !rdma->received_error) {
1526             GPollFD pfds[2];
1527             pfds[0].fd = comp_channel->fd;
1528             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1529             pfds[0].revents = 0;
1530 
1531             pfds[1].fd = rdma->channel->fd;
1532             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1533             pfds[1].revents = 0;
1534 
1535             /* 0.1s timeout, should be fine for a 'cancel' */
1536             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1537             case 2:
1538             case 1: /* fd active */
1539                 if (pfds[0].revents) {
1540                     return 0;
1541                 }
1542 
1543                 if (pfds[1].revents) {
1544                     if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
1545                         return -1;
1546                     }
1547 
1548                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1549                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1550                         rdma_ack_cm_event(cm_event);
1551                         return -1;
1552                     }
1553                     rdma_ack_cm_event(cm_event);
1554                 }
1555                 break;
1556 
1557             case 0: /* Timeout, go around again */
1558                 break;
1559 
1560             default: /* Error of some type -
1561                       * I don't trust errno from qemu_poll_ns
1562                      */
1563                 return -1;
1564             }
1565 
1566             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1567                 /* Bail out and let the cancellation happen */
1568                 return -1;
1569             }
1570         }
1571     }
1572 
1573     if (rdma->received_error) {
1574         return -1;
1575     }
1576     return -rdma->errored;
1577 }
1578 
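/*
 * Map a work request ID to the completion channel / completion queue it
 * completes on: control RECVs use the recv side, everything else (control
 * SENDs and RDMA writes) uses the send side.
 */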
1579 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
1580 {
1581     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1582            rdma->recv_comp_channel;
1583 }
1584 
1585 static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
1586 {
1587     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1588 }
1589 
1590 /*
1591  * Block until the next work request has completed.
1592  *
1593  * First poll to see if a work request has already completed,
1594  * otherwise block.
1595  *
1596  * If we encounter completed work requests for IDs other than
1597  * the one we're interested in, then that's generally an error.
1598  *
1599  * The only exception is actual RDMA Write completions. These
1600  * completions only need to be recorded, but do not actually
1601  * need further processing.
1602  */
1603 static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1604                                     uint64_t wrid_requested,
1605                                     uint32_t *byte_len)
1606 {
1607     int num_cq_events = 0, ret;
1608     struct ibv_cq *cq;
1609     void *cq_ctx;
1610     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1611     struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1612     struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1613 
1614     if (ibv_req_notify_cq(poll_cq, 0)) {
1615         return -1;
1616     }
1617     /* poll cq first */
1618     while (wr_id != wrid_requested) {
1619         ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1620         if (ret < 0) {
1621             return -1;
1622         }
1623 
1624         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1625 
1626         if (wr_id == RDMA_WRID_NONE) {
1627             break;
1628         }
1629         if (wr_id != wrid_requested) {
1630             trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1631         }
1632     }
1633 
1634     if (wr_id == wrid_requested) {
1635         return 0;
1636     }
1637 
1638     while (1) {
1639         ret = qemu_rdma_wait_comp_channel(rdma, ch);
1640         if (ret < 0) {
1641             goto err_block_for_wrid;
1642         }
1643 
1644         ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1645         if (ret < 0) {
1646             goto err_block_for_wrid;
1647         }
1648 
1649         num_cq_events++;
1650 
1651         if (ibv_req_notify_cq(cq, 0)) {
1652             goto err_block_for_wrid;
1653         }
1654 
1655         while (wr_id != wrid_requested) {
1656             ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1657             if (ret < 0) {
1658                 goto err_block_for_wrid;
1659             }
1660 
1661             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1662 
1663             if (wr_id == RDMA_WRID_NONE) {
1664                 break;
1665             }
1666             if (wr_id != wrid_requested) {
1667                 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1668             }
1669         }
1670 
1671         if (wr_id == wrid_requested) {
1672             goto success_block_for_wrid;
1673         }
1674     }
1675 
1676 success_block_for_wrid:
1677     if (num_cq_events) {
1678         ibv_ack_cq_events(cq, num_cq_events);
1679     }
1680     return 0;
1681 
1682 err_block_for_wrid:
1683     if (num_cq_events) {
1684         ibv_ack_cq_events(cq, num_cq_events);
1685     }
1686 
1687     rdma->errored = true;
1688     return -1;
1689 }
1690 
1691 /*
1692  * Post a SEND message work request for the control channel
1693  * containing some data and block until the post completes.
1694  */
1695 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1696                                        RDMAControlHeader *head,
1697                                        Error **errp)
1698 {
1699     int ret;
1700     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1701     struct ibv_send_wr *bad_wr;
1702     struct ibv_sge sge = {
1703                            .addr = (uintptr_t)(wr->control),
1704                            .length = head->len + sizeof(RDMAControlHeader),
1705                            .lkey = wr->control_mr->lkey,
1706                          };
1707     struct ibv_send_wr send_wr = {
1708                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1709                                    .opcode = IBV_WR_SEND,
1710                                    .send_flags = IBV_SEND_SIGNALED,
1711                                    .sg_list = &sge,
1712                                    .num_sge = 1,
1713                                 };
1714 
1715     trace_qemu_rdma_post_send_control(control_desc(head->type));
1716 
1717     /*
1718      * We don't actually need to do a memcpy() in here if we used
1719      * the "sge" properly, but since we're only sending control messages
1720      * (not RAM in a performance-critical path), then its OK for now.
1721      *
1722      * The copy makes the RDMAControlHeader simpler to manipulate
1723      * for the time being.
1724      */
1725     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1726     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1727     control_to_network((void *) wr->control);
1728 
1729     if (buf) {
1730         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1731     }
1732 
1733 
1734     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1735 
1736     if (ret > 0) {
1737         error_setg(errp, "Failed to use post IB SEND for control");
1738         return -1;
1739     }
1740 
1741     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1742     if (ret < 0) {
1743         error_setg(errp, "rdma migration: send polling control error");
1744         return -1;
1745     }
1746 
1747     return 0;
1748 }
1749 
1750 /*
1751  * Post a RECV work request in anticipation of some future receipt
1752  * of data on the control channel.
1753  */
1754 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
1755                                        Error **errp)
1756 {
1757     struct ibv_recv_wr *bad_wr;
1758     struct ibv_sge sge = {
1759                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1760                             .length = RDMA_CONTROL_MAX_BUFFER,
1761                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1762                          };
1763 
1764     struct ibv_recv_wr recv_wr = {
1765                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1766                                     .sg_list = &sge,
1767                                     .num_sge = 1,
1768                                  };
1769 
1770 
1771     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1772         error_setg(errp, "error posting control recv");
1773         return -1;
1774     }
1775 
1776     return 0;
1777 }
1778 
1779 /*
1780  * Block and wait for a RECV control channel message to arrive.
1781  */
1782 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1783                 RDMAControlHeader *head, uint32_t expecting, int idx,
1784                 Error **errp)
1785 {
1786     uint32_t byte_len;
1787     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1788                                        &byte_len);
1789 
1790     if (ret < 0) {
1791         error_setg(errp, "rdma migration: recv polling control error!");
1792         return -1;
1793     }
1794 
1795     network_to_control((void *) rdma->wr_data[idx].control);
1796     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1797 
1798     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1799 
1800     if (expecting == RDMA_CONTROL_NONE) {
1801         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1802                                              head->type);
1803     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1804         error_setg(errp, "Was expecting a %s (%d) control message"
1805                 ", but got: %s (%d), length: %d",
1806                 control_desc(expecting), expecting,
1807                 control_desc(head->type), head->type, head->len);
1808         if (head->type == RDMA_CONTROL_ERROR) {
1809             rdma->received_error = true;
1810         }
1811         return -1;
1812     }
1813     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1814         error_setg(errp, "too long length: %d", head->len);
1815         return -1;
1816     }
1817     if (sizeof(*head) + head->len != byte_len) {
1818         error_setg(errp, "Malformed length: %d byte_len %d",
1819                    head->len, byte_len);
1820         return -1;
1821     }
1822 
1823     return 0;
1824 }
1825 
1826 /*
1827  * When a RECV work request has completed, the work request's
1828  * buffer is pointed at the header.
1829  *
1830  * This will advance the pointer to the data portion
1831  * of the control message of the work request's buffer that
1832  * was populated after the work request finished.
1833  */
1834 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1835                                   RDMAControlHeader *head)
1836 {
1837     rdma->wr_data[idx].control_len = head->len;
1838     rdma->wr_data[idx].control_curr =
1839         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1840 }
1841 
1842 /*
1843  * This is an 'atomic' high-level operation to deliver a single, unified
1844  * control-channel message.
1845  *
1846  * Additionally, if the user is expecting some kind of reply to this message,
1847  * they can request a 'resp' response message be filled in by posting an
1848  * additional work request on behalf of the user and waiting for an additional
1849  * completion.
1850  *
1851  * The extra (optional) response is used during registration to save us from
1852  * having to perform an *additional* exchange of messages just to provide a
1853  * response, by instead piggy-backing on the acknowledgement.
1854  */
1855 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1856                                    uint8_t *data, RDMAControlHeader *resp,
1857                                    int *resp_idx,
1858                                    int (*callback)(RDMAContext *rdma,
1859                                                    Error **errp),
1860                                    Error **errp)
1861 {
1862     int ret;
1863 
1864     /*
1865      * Wait until the dest is ready before attempting to deliver the message
1866      * by waiting for a READY message.
1867      */
1868     if (rdma->control_ready_expected) {
1869         RDMAControlHeader resp_ignored;
1870 
1871         ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1872                                               RDMA_CONTROL_READY,
1873                                               RDMA_WRID_READY, errp);
1874         if (ret < 0) {
1875             return -1;
1876         }
1877     }
1878 
1879     /*
1880      * If the user is expecting a response, post a WR in anticipation of it.
1881      */
1882     if (resp) {
1883         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
1884         if (ret < 0) {
1885             return -1;
1886         }
1887     }
1888 
1889     /*
1890      * Post a WR to replace the one we just consumed for the READY message.
1891      */
1892     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1893     if (ret < 0) {
1894         return -1;
1895     }
1896 
1897     /*
1898      * Deliver the control message that was requested.
1899      */
1900     ret = qemu_rdma_post_send_control(rdma, data, head, errp);
1901 
1902     if (ret < 0) {
1903         return -1;
1904     }
1905 
1906     /*
1907      * If we're expecting a response, block and wait for it.
1908      */
1909     if (resp) {
1910         if (callback) {
1911             trace_qemu_rdma_exchange_send_issue_callback();
1912             ret = callback(rdma, errp);
1913             if (ret < 0) {
1914                 return -1;
1915             }
1916         }
1917 
1918         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1919         ret = qemu_rdma_exchange_get_response(rdma, resp,
1920                                               resp->type, RDMA_WRID_DATA,
1921                                               errp);
1922 
1923         if (ret < 0) {
1924             return -1;
1925         }
1926 
1927         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1928         if (resp_idx) {
1929             *resp_idx = RDMA_WRID_DATA;
1930         }
1931         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1932     }
1933 
1934     rdma->control_ready_expected = 1;
1935 
1936     return 0;
1937 }
1938 
1939 /*
1940  * This is an 'atomic' high-level operation to receive a single, unified
1941  * control-channel message.
1942  */
1943 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1944                                    uint32_t expecting, Error **errp)
1945 {
1946     RDMAControlHeader ready = {
1947                                 .len = 0,
1948                                 .type = RDMA_CONTROL_READY,
1949                                 .repeat = 1,
1950                               };
1951     int ret;
1952 
1953     /*
1954      * Inform the source that we're ready to receive a message.
1955      */
1956     ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
1957 
1958     if (ret < 0) {
1959         return -1;
1960     }
1961 
1962     /*
1963      * Block and wait for the message.
1964      */
1965     ret = qemu_rdma_exchange_get_response(rdma, head,
1966                                           expecting, RDMA_WRID_READY, errp);
1967 
1968     if (ret < 0) {
1969         return -1;
1970     }
1971 
1972     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1973 
1974     /*
1975      * Post a new RECV work request to replace the one we just consumed.
1976      */
1977     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1978     if (ret < 0) {
1979         return -1;
1980     }
1981 
1982     return 0;
1983 }
1984 
1985 /*
1986  * Write an actual chunk of memory using RDMA.
1987  *
1988  * If we're using dynamic registration on the dest-side, we have to
1989  * send a registration command first.
1990  */
1991 static int qemu_rdma_write_one(RDMAContext *rdma,
1992                                int current_index, uint64_t current_addr,
1993                                uint64_t length, Error **errp)
1994 {
1995     struct ibv_sge sge;
1996     struct ibv_send_wr send_wr = { 0 };
1997     struct ibv_send_wr *bad_wr;
1998     int reg_result_idx, ret, count = 0;
1999     uint64_t chunk, chunks;
2000     uint8_t *chunk_start, *chunk_end;
2001     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2002     RDMARegister reg;
2003     RDMARegisterResult *reg_result;
2004     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2005     RDMAControlHeader head = { .len = sizeof(RDMARegister),
2006                                .type = RDMA_CONTROL_REGISTER_REQUEST,
2007                                .repeat = 1,
2008                              };
2009 
2010 retry:
2011     sge.addr = (uintptr_t)(block->local_host_addr +
2012                             (current_addr - block->offset));
2013     sge.length = length;
2014 
2015     chunk = ram_chunk_index(block->local_host_addr,
2016                             (uint8_t *)(uintptr_t)sge.addr);
2017     chunk_start = ram_chunk_start(block, chunk);
2018 
2019     if (block->is_ram_block) {
2020         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2021 
2022         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2023             chunks--;
2024         }
2025     } else {
2026         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2027 
2028         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2029             chunks--;
2030         }
2031     }
2032 
2033     trace_qemu_rdma_write_one_top(chunks + 1,
2034                                   (chunks + 1) *
2035                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2036 
2037     chunk_end = ram_chunk_end(block, chunk + chunks);
2038 
2039 
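    /*
     * If this chunk is still marked in transit, wait for the previous RDMA
     * write touching it to complete before posting another one.
     */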
2040     while (test_bit(chunk, block->transit_bitmap)) {
2041         (void)count;
2042         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2043                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2044 
2045         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2046 
2047         if (ret < 0) {
2048             error_setg(errp, "Failed to Wait for previous write to complete "
2049                     "block %d chunk %" PRIu64
2050                     " current %" PRIu64 " len %" PRIu64 " %d",
2051                     current_index, chunk, sge.addr, length, rdma->nb_sent);
2052             return -1;
2053         }
2054     }
2055 
2056     if (!rdma->pin_all || !block->is_ram_block) {
2057         if (!block->remote_keys[chunk]) {
2058             /*
2059              * This chunk has not yet been registered, so first check to see
2060              * if the entire chunk is zero. If so, tell the other size to
2061              * memset() + madvise() the entire chunk without RDMA.
2062              */
2063 
2064             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2065                 RDMACompress comp = {
2066                                         .offset = current_addr,
2067                                         .value = 0,
2068                                         .block_idx = current_index,
2069                                         .length = length,
2070                                     };
2071 
2072                 head.len = sizeof(comp);
2073                 head.type = RDMA_CONTROL_COMPRESS;
2074 
2075                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2076                                                current_index, current_addr);
2077 
2078                 compress_to_network(rdma, &comp);
2079                 ret = qemu_rdma_exchange_send(rdma, &head,
2080                                 (uint8_t *) &comp, NULL, NULL, NULL, errp);
2081 
2082                 if (ret < 0) {
2083                     return -1;
2084                 }
2085 
2086                 /*
2087                  * TODO: Here we are sending something, but we are not
2088                  * accounting for anything transferred.  The following is wrong:
2089                  *
2090                  * stat64_add(&mig_stats.rdma_bytes, sge.length);
2091                  *
2092                  * because we are using some kind of compression.  I
2093                  * would think that head.len would be closer to a
2094                  * correct value.
2095                  */
2096                 stat64_add(&mig_stats.zero_pages,
2097                            sge.length / qemu_target_page_size());
2098                 return 1;
2099             }
2100 
2101             /*
2102              * Otherwise, tell other side to register.
2103              */
2104             reg.current_index = current_index;
2105             if (block->is_ram_block) {
2106                 reg.key.current_addr = current_addr;
2107             } else {
2108                 reg.key.chunk = chunk;
2109             }
2110             reg.chunks = chunks;
2111 
2112             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2113                                               current_addr);
2114 
2115             register_to_network(rdma, &reg);
2116             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2117                                     &resp, &reg_result_idx, NULL, errp);
2118             if (ret < 0) {
2119                 return -1;
2120             }
2121 
2122             /* try to overlap this single registration with the one we sent. */
2123             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2124                                                 &sge.lkey, NULL, chunk,
2125                                                 chunk_start, chunk_end)) {
2126                 error_setg(errp, "cannot get lkey");
2127                 return -1;
2128             }
2129 
2130             reg_result = (RDMARegisterResult *)
2131                     rdma->wr_data[reg_result_idx].control_curr;
2132 
2133             network_to_result(reg_result);
2134 
2135             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2136                                                  reg_result->rkey, chunk);
2137 
2138             block->remote_keys[chunk] = reg_result->rkey;
2139             block->remote_host_addr = reg_result->host_addr;
2140         } else {
2141             /* already registered before */
2142             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2143                                                 &sge.lkey, NULL, chunk,
2144                                                 chunk_start, chunk_end)) {
2145                 error_setg(errp, "cannot get lkey!");
2146                 return -1;
2147             }
2148         }
2149 
2150         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2151     } else {
2152         send_wr.wr.rdma.rkey = block->remote_rkey;
2153 
2154         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2155                                                      &sge.lkey, NULL, chunk,
2156                                                      chunk_start, chunk_end)) {
2157             error_setg(errp, "cannot get lkey!");
2158             return -1;
2159         }
2160     }
2161 
2162     /*
2163      * Encode the ram block index and chunk within this wrid.
2164      * We will use this information at the time of completion
2165      * to figure out which bitmap to check against and then which
2166      * chunk in the bitmap to look for.
2167      */
2168     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2169                                         current_index, chunk);
2170 
2171     send_wr.opcode = IBV_WR_RDMA_WRITE;
2172     send_wr.send_flags = IBV_SEND_SIGNALED;
2173     send_wr.sg_list = &sge;
2174     send_wr.num_sge = 1;
2175     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2176                                 (current_addr - block->offset);
2177 
2178     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2179                                    sge.length);
2180 
2181     /*
2182      * ibv_post_send() does not return negative error numbers,
2183      * per the specification they are positive - no idea why.
2184      */
2185     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2186 
2187     if (ret == ENOMEM) {
2188         trace_qemu_rdma_write_one_queue_full();
2189         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2190         if (ret < 0) {
2191             error_setg(errp, "rdma migration: failed to make "
2192                          "room in full send queue!");
2193             return -1;
2194         }
2195 
2196         goto retry;
2197 
2198     } else if (ret > 0) {
2199         error_setg_errno(errp, ret,
2200                          "rdma migration: post rdma write failed");
2201         return -1;
2202     }
2203 
2204     set_bit(chunk, block->transit_bitmap);
2205     stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
2206     /*
2207      * We are adding the amount of data written to the transferred total,
2208      * but no overhead at all.  I will assume that RDMA is magical and
2209      * doesn't need to transfer (at least) the addresses where it wants
2210      * to write the pages.  Here it looks like it should be something
2211      * like:
2212      *     sizeof(send_wr) + sge.length
2213      * but this being RDMA, who knows.
2214      */
2215     stat64_add(&mig_stats.rdma_bytes, sge.length);
2216     ram_transferred_add(sge.length);
2217     rdma->total_writes++;
2218 
2219     return 0;
2220 }
2221 
2222 /*
2223  * Push out any unwritten RDMA operations.
2224  *
2225  * We support sending out multiple chunks at the same time.
2226  * Not all of them need to get signaled in the completion queue.
2227  */
2228 static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
2229 {
2230     int ret;
2231 
2232     if (!rdma->current_length) {
2233         return 0;
2234     }
2235 
2236     ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
2237                               rdma->current_length, errp);
2238 
2239     if (ret < 0) {
2240         return -1;
2241     }
2242 
2243     if (ret == 0) {
2244         rdma->nb_sent++;
2245         trace_qemu_rdma_write_flush(rdma->nb_sent);
2246     }
2247 
2248     rdma->current_length = 0;
2249     rdma->current_addr = 0;
2250 
2251     return 0;
2252 }
2253 
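/*
 * Can [offset, offset + len) simply be appended to the write currently
 * being batched?  Only if it extends the current buffer contiguously and
 * stays within both the same RAM block and the same chunk.
 */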
2254 static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
2255                     uint64_t offset, uint64_t len)
2256 {
2257     RDMALocalBlock *block;
2258     uint8_t *host_addr;
2259     uint8_t *chunk_end;
2260 
2261     if (rdma->current_index < 0) {
2262         return false;
2263     }
2264 
2265     if (rdma->current_chunk < 0) {
2266         return false;
2267     }
2268 
2269     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2270     host_addr = block->local_host_addr + (offset - block->offset);
2271     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2272 
2273     if (rdma->current_length == 0) {
2274         return false;
2275     }
2276 
2277     /*
2278      * Only merge into chunk sequentially.
2279      */
2280     if (offset != (rdma->current_addr + rdma->current_length)) {
2281         return false;
2282     }
2283 
2284     if (offset < block->offset) {
2285         return false;
2286     }
2287 
2288     if ((offset + len) > (block->offset + block->length)) {
2289         return false;
2290     }
2291 
2292     if ((host_addr + len) > chunk_end) {
2293         return false;
2294     }
2295 
2296     return true;
2297 }
2298 
2299 /*
2300  * We're not actually writing here, but doing three things:
2301  *
2302  * 1. Identify the chunk the buffer belongs to.
2303  * 2. If the chunk is full or the buffer doesn't belong to the current
2304  *    chunk, then start a new chunk and flush() the old chunk.
2305  * 3. To keep the hardware busy, we also group chunks into batches
2306  *    and only require that a batch gets acknowledged in the completion
2307  *    queue instead of each individual chunk.
2308  */
2309 static int qemu_rdma_write(RDMAContext *rdma,
2310                            uint64_t block_offset, uint64_t offset,
2311                            uint64_t len, Error **errp)
2312 {
2313     uint64_t current_addr = block_offset + offset;
2314     uint64_t index = rdma->current_index;
2315     uint64_t chunk = rdma->current_chunk;
2316 
2317     /* If we cannot merge it, we flush the current buffer first. */
2318     if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2319         if (qemu_rdma_write_flush(rdma, errp) < 0) {
2320             return -1;
2321         }
2322         rdma->current_length = 0;
2323         rdma->current_addr = current_addr;
2324 
2325         qemu_rdma_search_ram_block(rdma, block_offset,
2326                                    offset, len, &index, &chunk);
2327         rdma->current_index = index;
2328         rdma->current_chunk = chunk;
2329     }
2330 
2331     /* merge it */
2332     rdma->current_length += len;
2333 
2334     /* flush it if buffer is too large */
2335     if (rdma->current_length >= RDMA_MERGE_MAX) {
2336         return qemu_rdma_write_flush(rdma, errp);
2337     }
2338 
2339     return 0;
2340 }
2341 
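/*
 * Tear down the connection and release all verbs/CM resources.  If we are
 * bailing out early because of an error or a cancelled migration and have
 * not already received an error from the peer, send one first to let the
 * other side know.
 */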
2342 static void qemu_rdma_cleanup(RDMAContext *rdma)
2343 {
2344     Error *err = NULL;
2345 
2346     if (rdma->cm_id && rdma->connected) {
2347         if ((rdma->errored ||
2348              migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2349             !rdma->received_error) {
2350             RDMAControlHeader head = { .len = 0,
2351                                        .type = RDMA_CONTROL_ERROR,
2352                                        .repeat = 1,
2353                                      };
2354             warn_report("Early error. Sending error.");
2355             if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) {
2356                 warn_report_err(err);
2357             }
2358         }
2359 
2360         rdma_disconnect(rdma->cm_id);
2361         trace_qemu_rdma_cleanup_disconnect();
2362         rdma->connected = false;
2363     }
2364 
2365     if (rdma->channel) {
2366         qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2367     }
2368     g_free(rdma->dest_blocks);
2369     rdma->dest_blocks = NULL;
2370 
2371     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2372         if (rdma->wr_data[i].control_mr) {
2373             rdma->total_registrations--;
2374             ibv_dereg_mr(rdma->wr_data[i].control_mr);
2375         }
2376         rdma->wr_data[i].control_mr = NULL;
2377     }
2378 
2379     if (rdma->local_ram_blocks.block) {
2380         while (rdma->local_ram_blocks.nb_blocks) {
2381             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2382         }
2383     }
2384 
2385     if (rdma->qp) {
2386         rdma_destroy_qp(rdma->cm_id);
2387         rdma->qp = NULL;
2388     }
2389     if (rdma->recv_cq) {
2390         ibv_destroy_cq(rdma->recv_cq);
2391         rdma->recv_cq = NULL;
2392     }
2393     if (rdma->send_cq) {
2394         ibv_destroy_cq(rdma->send_cq);
2395         rdma->send_cq = NULL;
2396     }
2397     if (rdma->recv_comp_channel) {
2398         ibv_destroy_comp_channel(rdma->recv_comp_channel);
2399         rdma->recv_comp_channel = NULL;
2400     }
2401     if (rdma->send_comp_channel) {
2402         ibv_destroy_comp_channel(rdma->send_comp_channel);
2403         rdma->send_comp_channel = NULL;
2404     }
2405     if (rdma->pd) {
2406         ibv_dealloc_pd(rdma->pd);
2407         rdma->pd = NULL;
2408     }
2409     if (rdma->cm_id) {
2410         rdma_destroy_id(rdma->cm_id);
2411         rdma->cm_id = NULL;
2412     }
2413 
2414     /* On the destination side, the listen_id and channel are shared */
2415     if (rdma->listen_id) {
2416         if (!rdma->is_return_path) {
2417             rdma_destroy_id(rdma->listen_id);
2418         }
2419         rdma->listen_id = NULL;
2420 
2421         if (rdma->channel) {
2422             if (!rdma->is_return_path) {
2423                 rdma_destroy_event_channel(rdma->channel);
2424             }
2425             rdma->channel = NULL;
2426         }
2427     }
2428 
2429     if (rdma->channel) {
2430         rdma_destroy_event_channel(rdma->channel);
2431         rdma->channel = NULL;
2432     }
2433     g_free(rdma->host);
2434     g_free(rdma->host_port);
2435     rdma->host = NULL;
2436     rdma->host_port = NULL;
2437 }
2438 
2439 
2440 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2441 {
2442     int ret;
2443 
2444     /*
2445      * Will be validated against destination's actual capabilities
2446      * after the connect() completes.
2447      */
2448     rdma->pin_all = pin_all;
2449 
2450     ret = qemu_rdma_resolve_host(rdma, errp);
2451     if (ret < 0) {
2452         goto err_rdma_source_init;
2453     }
2454 
2455     ret = qemu_rdma_alloc_pd_cq(rdma, errp);
2456     if (ret < 0) {
2457         goto err_rdma_source_init;
2458     }
2459 
2460     ret = qemu_rdma_alloc_qp(rdma);
2461     if (ret < 0) {
2462         error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2463         goto err_rdma_source_init;
2464     }
2465 
2466     qemu_rdma_init_ram_blocks(rdma);
2467 
2468     /* Build the hash that maps from offset to RAMBlock */
2469     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2470     for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
2471         g_hash_table_insert(rdma->blockmap,
2472                 (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset,
2473                 &rdma->local_ram_blocks.block[i]);
2474     }
2475 
2476     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2477         ret = qemu_rdma_reg_control(rdma, i);
2478         if (ret < 0) {
2479             error_setg(errp, "RDMA ERROR: rdma migration: error "
2480                        "registering %d control!", i);
2481             goto err_rdma_source_init;
2482         }
2483     }
2484 
2485     return 0;
2486 
2487 err_rdma_source_init:
2488     qemu_rdma_cleanup(rdma);
2489     return -1;
2490 }
2491 
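/*
 * Wait up to @msec milliseconds for a connection-manager event to become
 * available on the event channel (retrying poll() on EINTR), then fetch it.
 */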
2492 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2493                                      struct rdma_cm_event **cm_event,
2494                                      long msec, Error **errp)
2495 {
2496     int ret;
2497     struct pollfd poll_fd = {
2498                                 .fd = rdma->channel->fd,
2499                                 .events = POLLIN,
2500                                 .revents = 0
2501                             };
2502 
2503     do {
2504         ret = poll(&poll_fd, 1, msec);
2505     } while (ret < 0 && errno == EINTR);
2506 
2507     if (ret == 0) {
2508         error_setg(errp, "RDMA ERROR: poll cm event timeout");
2509         return -1;
2510     } else if (ret < 0) {
2511         error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
2512                    errno);
2513         return -1;
2514     } else if (poll_fd.revents & POLLIN) {
2515         if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
2516             error_setg(errp, "RDMA ERROR: failed to get cm event");
2517             return -1;
2518         }
2519         return 0;
2520     } else {
2521         error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
2522                    poll_fd.revents);
2523         return -1;
2524     }
2525 }
2526 
2527 static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
2528                              Error **errp)
2529 {
2530     RDMACapabilities cap = {
2531                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2532                                 .flags = 0,
2533                            };
2534     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2535                                           .retry_count = 5,
2536                                           .private_data = &cap,
2537                                           .private_data_len = sizeof(cap),
2538                                         };
2539     struct rdma_cm_event *cm_event;
2540     int ret;
2541 
2542     /*
2543      * Only negotiate the capability with destination if the user
2544      * on the source first requested the capability.
2545      */
2546     if (rdma->pin_all) {
2547         trace_qemu_rdma_connect_pin_all_requested();
2548         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2549     }
2550 
2551     caps_to_network(&cap);
2552 
2553     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
2554     if (ret < 0) {
2555         goto err_rdma_source_connect;
2556     }
2557 
2558     ret = rdma_connect(rdma->cm_id, &conn_param);
2559     if (ret < 0) {
2560         error_setg_errno(errp, errno,
2561                          "RDMA ERROR: connecting to destination!");
2562         goto err_rdma_source_connect;
2563     }
2564 
2565     if (return_path) {
2566         ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2567     } else {
2568         ret = rdma_get_cm_event(rdma->channel, &cm_event);
2569         if (ret < 0) {
2570             error_setg_errno(errp, errno,
2571                              "RDMA ERROR: failed to get cm event");
2572         }
2573     }
2574     if (ret < 0) {
2575         goto err_rdma_source_connect;
2576     }
2577 
2578     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2579         error_setg(errp, "RDMA ERROR: connecting to destination!");
2580         rdma_ack_cm_event(cm_event);
2581         goto err_rdma_source_connect;
2582     }
2583     rdma->connected = true;
2584 
2585     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2586     network_to_caps(&cap);
2587 
2588     /*
2589      * Verify that the *requested* capabilities are supported by the destination
2590      * and disable them otherwise.
2591      */
2592     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2593         warn_report("RDMA: Server cannot support pinning all memory. "
2594                     "Will register memory dynamically.");
2595         rdma->pin_all = false;
2596     }
2597 
2598     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2599 
2600     rdma_ack_cm_event(cm_event);
2601 
2602     rdma->control_ready_expected = 1;
2603     rdma->nb_sent = 0;
2604     return 0;
2605 
2606 err_rdma_source_connect:
2607     qemu_rdma_cleanup(rdma);
2608     return -1;
2609 }
2610 
2611 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2612 {
2613     Error *err = NULL;
2614     int ret;
2615     struct rdma_cm_id *listen_id;
2616     char ip[40] = "unknown";
2617     struct rdma_addrinfo *res, *e;
2618     char port_str[16];
2619     int reuse = 1;
2620 
2621     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2622         rdma->wr_data[i].control_len = 0;
2623         rdma->wr_data[i].control_curr = NULL;
2624     }
2625 
2626     if (!rdma->host || !rdma->host[0]) {
2627         error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2628         rdma->errored = true;
2629         return -1;
2630     }
2631     /* create CM channel */
2632     rdma->channel = rdma_create_event_channel();
2633     if (!rdma->channel) {
2634         error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2635         rdma->errored = true;
2636         return -1;
2637     }
2638 
2639     /* create CM id */
2640     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2641     if (ret < 0) {
2642         error_setg(errp, "RDMA ERROR: could not create cm_id!");
2643         goto err_dest_init_create_listen_id;
2644     }
2645 
2646     snprintf(port_str, 16, "%d", rdma->port);
2647     port_str[15] = '\0';
2648 
2649     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2650     if (ret) {
2651         error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2652                    rdma->host);
2653         goto err_dest_init_bind_addr;
2654     }
2655 
2656     ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2657                           &reuse, sizeof reuse);
2658     if (ret < 0) {
2659         error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option");
2660         goto err_dest_init_bind_addr;
2661     }
2662 
2663     /* Try all addresses, saving the first error in @err */
2664     for (e = res; e != NULL; e = e->ai_next) {
2665         Error **local_errp = err ? NULL : &err;
2666 
2667         inet_ntop(e->ai_family,
2668             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2669         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2670         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2671         if (ret < 0) {
2672             continue;
2673         }
2674         if (e->ai_family == AF_INET6) {
2675             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs,
2676                                                local_errp);
2677             if (ret < 0) {
2678                 continue;
2679             }
2680         }
2681         error_free(err);
2682         break;
2683     }
2684 
2685     rdma_freeaddrinfo(res);
2686     if (!e) {
2687         if (err) {
2688             error_propagate(errp, err);
2689         } else {
2690             error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!");
2691         }
2692         goto err_dest_init_bind_addr;
2693     }
2694 
2695     rdma->listen_id = listen_id;
2696     qemu_rdma_dump_gid("dest_init", listen_id);
2697     return 0;
2698 
2699 err_dest_init_bind_addr:
2700     rdma_destroy_id(listen_id);
2701 err_dest_init_create_listen_id:
2702     rdma_destroy_event_channel(rdma->channel);
2703     rdma->channel = NULL;
2704     rdma->errored = true;
2705     return -1;
2706 
2707 }
2708 
2709 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2710                                             RDMAContext *rdma)
2711 {
2712     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2713         rdma_return_path->wr_data[i].control_len = 0;
2714         rdma_return_path->wr_data[i].control_curr = NULL;
2715     }
2716 
2717     /* The CM channel and CM id are shared */
2718     rdma_return_path->channel = rdma->channel;
2719     rdma_return_path->listen_id = rdma->listen_id;
2720 
2721     rdma->return_path = rdma_return_path;
2722     rdma_return_path->return_path = rdma;
2723     rdma_return_path->is_return_path = true;
2724 }
2725 
2726 static RDMAContext *qemu_rdma_data_init(const char *host_port, Error **errp)
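/*
 * Allocate a fresh RDMAContext and fill in the host/port parsed from a
 * "host:port" string.  Returns NULL and sets @errp if the address cannot
 * be parsed.
 */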
2727 {
2728     RDMAContext *rdma = NULL;
2729     InetSocketAddress *addr;
2730 
2731     rdma = g_new0(RDMAContext, 1);
2732     rdma->current_index = -1;
2733     rdma->current_chunk = -1;
2734 
2735     addr = g_new(InetSocketAddress, 1);
2736     if (!inet_parse(addr, host_port, NULL)) {
2737         rdma->port = atoi(addr->port);
2738         rdma->host = g_strdup(addr->host);
2739         rdma->host_port = g_strdup(host_port);
2740     } else {
2741         error_setg(errp, "RDMA ERROR: bad RDMA migration address '%s'",
2742                    host_port);
2743         g_free(rdma);
2744         rdma = NULL;
2745     }
2746 
2747     qapi_free_InetSocketAddress(addr);
2748     return rdma;
2749 }
2750 
2751 /*
2752  * QEMUFile interface to the control channel.
2753  * SEND messages for control only.
2754  * VM's ram is handled with regular RDMA messages.
2755  */
2756 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2757                                        const struct iovec *iov,
2758                                        size_t niov,
2759                                        int *fds,
2760                                        size_t nfds,
2761                                        int flags,
2762                                        Error **errp)
2763 {
2764     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2765     RDMAContext *rdma;
2766     int ret;
2767     ssize_t done = 0;
2768     size_t len;
2769 
2770     RCU_READ_LOCK_GUARD();
2771     rdma = qatomic_rcu_read(&rioc->rdmaout);
2772 
2773     if (!rdma) {
2774         error_setg(errp, "RDMA control channel output is not set");
2775         return -1;
2776     }
2777 
2778     if (rdma->errored) {
2779         error_setg(errp,
2780                    "RDMA is in an error state waiting migration to abort!");
2781         return -1;
2782     }
2783 
2784     /*
2785      * Push out any writes that
2786      * we're queued up for VM's ram.
2787      */
2788     ret = qemu_rdma_write_flush(rdma, errp);
2789     if (ret < 0) {
2790         rdma->errored = true;
2791         return -1;
2792     }
2793 
2794     for (int i = 0; i < niov; i++) {
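    /*
     * Split each iovec into control messages of at most RDMA_SEND_INCREMENT
     * bytes (e.g. a 1 MB buffer becomes 32 messages of 32 KB each), each
     * delivered as RDMA_CONTROL_QEMU_FILE via the blocking
     * qemu_rdma_exchange_send() path.
     */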
2795         size_t remaining = iov[i].iov_len;
2796         uint8_t * data = (void *)iov[i].iov_base;
2797         while (remaining) {
2798             RDMAControlHeader head = {};
2799 
2800             len = MIN(remaining, RDMA_SEND_INCREMENT);
2801             remaining -= len;
2802 
2803             head.len = len;
2804             head.type = RDMA_CONTROL_QEMU_FILE;
2805 
2806             ret = qemu_rdma_exchange_send(rdma, &head,
2807                                           data, NULL, NULL, NULL, errp);
2808 
2809             if (ret < 0) {
2810                 rdma->errored = true;
2811                 return -1;
2812             }
2813 
2814             data += len;
2815             done += len;
2816         }
2817     }
2818 
2819     return done;
2820 }
2821 
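/*
 * Hand out up to @size bytes of any control data still buffered for
 * wr_data[idx], advancing the cursor past whatever was consumed.
 */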
2822 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2823                              size_t size, int idx)
2824 {
2825     size_t len = 0;
2826 
2827     if (rdma->wr_data[idx].control_len) {
2828         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2829 
2830         len = MIN(size, rdma->wr_data[idx].control_len);
2831         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2832         rdma->wr_data[idx].control_curr += len;
2833         rdma->wr_data[idx].control_len -= len;
2834     }
2835 
2836     return len;
2837 }
2838 
2839 /*
2840  * QEMUFile interface to the control channel.
2841  * RDMA links don't use bytestreams, so we have to
2842  * return bytes to QEMUFile opportunistically.
2843  */
2844 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2845                                       const struct iovec *iov,
2846                                       size_t niov,
2847                                       int **fds,
2848                                       size_t *nfds,
2849                                       int flags,
2850                                       Error **errp)
2851 {
2852     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2853     RDMAContext *rdma;
2854     RDMAControlHeader head;
2855     int ret;
2856     ssize_t done = 0;
2857     size_t len;
2858 
2859     RCU_READ_LOCK_GUARD();
2860     rdma = qatomic_rcu_read(&rioc->rdmain);
2861 
2862     if (!rdma) {
2863         error_setg(errp, "RDMA control channel input is not set");
2864         return -1;
2865     }
2866 
2867     if (rdma->errored) {
2868         error_setg(errp,
2869                    "RDMA is in an error state waiting migration to abort!");
2870         return -1;
2871     }
2872 
2873     for (int i = 0; i < niov; i++) {
2874         size_t want = iov[i].iov_len;
2875         uint8_t *data = (void *)iov[i].iov_base;
2876 
2877         /*
2878          * First, we hold on to the last SEND message we
2879          * were given and dish out the bytes until we run
2880          * out of bytes.
2881          */
2882         len = qemu_rdma_fill(rdma, data, want, 0);
2883         done += len;
2884         want -= len;
2885         /* Got what we needed, so go to next iovec */
2886         if (want == 0) {
2887             continue;
2888         }
2889 
2890         /* If we got any data so far, then don't wait
2891          * for more, just return what we have */
2892         if (done > 0) {
2893             break;
2894         }
2895 
2896 
2897         /* We've got nothing at all, so lets wait for
2898          * more to arrive
2899          */
2900         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
2901                                       errp);
2902 
2903         if (ret < 0) {
2904             rdma->errored = true;
2905             return -1;
2906         }
2907 
2908         /*
2909          * SEND was received with new bytes, now try again.
2910          */
2911         len = qemu_rdma_fill(rdma, data, want, 0);
2912         done += len;
2913         want -= len;
2914 
2915         /* Still didn't get enough, so lets just return */
2916         if (want) {
2917             if (done == 0) {
2918                 return QIO_CHANNEL_ERR_BLOCK;
2919             } else {
2920                 break;
2921             }
2922         }
2923     }
2924     return done;
2925 }
2926 
2927 /*
2928  * Block until all the outstanding chunks have been delivered by the hardware.
2929  */
2930 static int qemu_rdma_drain_cq(RDMAContext *rdma)
2931 {
2932     Error *err = NULL;
2933 
2934     if (qemu_rdma_write_flush(rdma, &err) < 0) {
2935         error_report_err(err);
2936         return -1;
2937     }
2938 
2939     while (rdma->nb_sent) {
2940         if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
2941             error_report("rdma migration: complete polling error!");
2942             return -1;
2943         }
2944     }
2945 
2946     qemu_rdma_unregister_waiting(rdma);
2947 
2948     return 0;
2949 }
2950 
2951 
2952 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2953                                          bool blocking,
2954                                          Error **errp)
2955 {
2956     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2957     /* XXX we should make readv/writev actually honour this :-) */
2958     rioc->blocking = blocking;
2959     return 0;
2960 }
2961 
2962 
2963 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2964 struct QIOChannelRDMASource {
2965     GSource parent;
2966     QIOChannelRDMA *rioc;
2967     GIOCondition condition;
2968 };
2969 
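/*
 * The GSource callbacks below all derive readiness the same way: the
 * channel is readable whenever control data is already buffered in
 * wr_data[0], and it is always reported as writable.
 */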
2970 static gboolean
2971 qio_channel_rdma_source_prepare(GSource *source,
2972                                 gint *timeout)
2973 {
2974     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2975     RDMAContext *rdma;
2976     GIOCondition cond = 0;
2977     *timeout = -1;
2978 
2979     RCU_READ_LOCK_GUARD();
2980     if (rsource->condition == G_IO_IN) {
2981         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2982     } else {
2983         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2984     }
2985 
2986     if (!rdma) {
2987         error_report("RDMAContext is NULL when prepare Gsource");
2988         return FALSE;
2989     }
2990 
2991     if (rdma->wr_data[0].control_len) {
2992         cond |= G_IO_IN;
2993     }
2994     cond |= G_IO_OUT;
2995 
2996     return cond & rsource->condition;
2997 }
2998 
2999 static gboolean
3000 qio_channel_rdma_source_check(GSource *source)
3001 {
3002     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3003     RDMAContext *rdma;
3004     GIOCondition cond = 0;
3005 
3006     RCU_READ_LOCK_GUARD();
3007     if (rsource->condition == G_IO_IN) {
3008         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3009     } else {
3010         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3011     }
3012 
3013     if (!rdma) {
3014         error_report("RDMAContext is NULL when check Gsource");
3015         return FALSE;
3016     }
3017 
3018     if (rdma->wr_data[0].control_len) {
3019         cond |= G_IO_IN;
3020     }
3021     cond |= G_IO_OUT;
3022 
3023     return cond & rsource->condition;
3024 }
3025 
3026 static gboolean
3027 qio_channel_rdma_source_dispatch(GSource *source,
3028                                  GSourceFunc callback,
3029                                  gpointer user_data)
3030 {
3031     QIOChannelFunc func = (QIOChannelFunc)callback;
3032     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3033     RDMAContext *rdma;
3034     GIOCondition cond = 0;
3035 
3036     RCU_READ_LOCK_GUARD();
3037     if (rsource->condition == G_IO_IN) {
3038         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3039     } else {
3040         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3041     }
3042 
3043     if (!rdma) {
3044         error_report("RDMAContext is NULL when dispatch Gsource");
3045         return FALSE;
3046     }
3047 
3048     if (rdma->wr_data[0].control_len) {
3049         cond |= G_IO_IN;
3050     }
3051     cond |= G_IO_OUT;
3052 
3053     return (*func)(QIO_CHANNEL(rsource->rioc),
3054                    (cond & rsource->condition),
3055                    user_data);
3056 }
3057 
3058 static void
3059 qio_channel_rdma_source_finalize(GSource *source)
3060 {
3061     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3062 
3063     object_unref(OBJECT(ssource->rioc));
3064 }
3065 
3066 static GSourceFuncs qio_channel_rdma_source_funcs = {
3067     qio_channel_rdma_source_prepare,
3068     qio_channel_rdma_source_check,
3069     qio_channel_rdma_source_dispatch,
3070     qio_channel_rdma_source_finalize
3071 };
3072 
3073 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3074                                               GIOCondition condition)
3075 {
3076     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3077     QIOChannelRDMASource *ssource;
3078     GSource *source;
3079 
3080     source = g_source_new(&qio_channel_rdma_source_funcs,
3081                           sizeof(QIOChannelRDMASource));
3082     ssource = (QIOChannelRDMASource *)source;
3083 
3084     ssource->rioc = rioc;
3085     object_ref(OBJECT(rioc));
3086 
3087     ssource->condition = condition;
3088 
3089     return source;
3090 }
3091 
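/*
 * Register both completion channel fds with an AIO context: when a read
 * handler is given, the incoming (rdmain) channels are attached to the
 * read context; otherwise the outgoing (rdmaout) channels are attached to
 * the write context.
 */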
3092 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3093                                                 AioContext *read_ctx,
3094                                                 IOHandler *io_read,
3095                                                 AioContext *write_ctx,
3096                                                 IOHandler *io_write,
3097                                                 void *opaque)
3098 {
3099     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3100     if (io_read) {
3101         aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
3102                            io_read, io_write, NULL, NULL, opaque);
3103         aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
3104                            io_read, io_write, NULL, NULL, opaque);
3105     } else {
3106         aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
3107                            io_read, io_write, NULL, NULL, opaque);
3108         aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
3109                            io_read, io_write, NULL, NULL, opaque);
3110     }
3111 }
3112 
3113 struct rdma_close_rcu {
3114     struct rcu_head rcu;
3115     RDMAContext *rdmain;
3116     RDMAContext *rdmaout;
3117 };
3118 
3119 /* callback from qio_channel_rdma_close via call_rcu */
3120 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3121 {
3122     if (rcu->rdmain) {
3123         qemu_rdma_cleanup(rcu->rdmain);
3124     }
3125 
3126     if (rcu->rdmaout) {
3127         qemu_rdma_cleanup(rcu->rdmaout);
3128     }
3129 
3130     g_free(rcu->rdmain);
3131     g_free(rcu->rdmaout);
3132     g_free(rcu);
3133 }
3134 
3135 static int qio_channel_rdma_close(QIOChannel *ioc,
3136                                   Error **errp)
3137 {
3138     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3139     RDMAContext *rdmain, *rdmaout;
3140     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3141 
3142     trace_qemu_rdma_close();
3143 
3144     rdmain = rioc->rdmain;
3145     if (rdmain) {
3146         qatomic_rcu_set(&rioc->rdmain, NULL);
3147     }
3148 
3149     rdmaout = rioc->rdmaout;
3150     if (rdmaout) {
3151         qatomic_rcu_set(&rioc->rdmaout, NULL);
3152     }
3153 
3154     rcu->rdmain = rdmain;
3155     rcu->rdmaout = rdmaout;
3156     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3157 
3158     return 0;
3159 }
3160 
3161 static int
3162 qio_channel_rdma_shutdown(QIOChannel *ioc,
3163                             QIOChannelShutdown how,
3164                             Error **errp)
3165 {
3166     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3167     RDMAContext *rdmain, *rdmaout;
3168 
3169     RCU_READ_LOCK_GUARD();
3170 
3171     rdmain = qatomic_rcu_read(&rioc->rdmain);
3172     rdmaout = qatomic_rcu_read(&rioc->rdmaout);
3173 
3174     switch (how) {
3175     case QIO_CHANNEL_SHUTDOWN_READ:
3176         if (rdmain) {
3177             rdmain->errored = true;
3178         }
3179         break;
3180     case QIO_CHANNEL_SHUTDOWN_WRITE:
3181         if (rdmaout) {
3182             rdmaout->errored = true;
3183         }
3184         break;
3185     case QIO_CHANNEL_SHUTDOWN_BOTH:
3186     default:
3187         if (rdmain) {
3188             rdmain->errored = true;
3189         }
3190         if (rdmaout) {
3191             rdmaout->errored = true;
3192         }
3193         break;
3194     }
3195 
3196     return 0;
3197 }
3198 
3199 /*
3200  * Parameters:
3201  *    @offset == 0 :
3202  *        This means that 'block_offset' is a full virtual address that does not
3203  *        belong to a RAMBlock of the virtual machine and instead
3204  *        represents a private malloc'd memory area that the caller wishes to
3205  *        transfer.
3206  *
3207  *    @offset != 0 :
3208  *        Offset is an offset to be added to block_offset and used
3209  *        to also lookup the corresponding RAMBlock.
3210  *
3211  *    @size : Number of bytes to transfer
3212  *
3213  *    Returns RAM_SAVE_CONTROL_DELAYED on success (the page is merged into
3214  *    the current chunk and the actual RDMA write completes asynchronously),
3215  *    or -1 on failure.
3216  */
3217 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3218                                ram_addr_t offset, size_t size)
3219 {
3220     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3221     Error *err = NULL;
3222     RDMAContext *rdma;
3223     int ret;
3224 
3225     RCU_READ_LOCK_GUARD();
3226     rdma = qatomic_rcu_read(&rioc->rdmaout);
3227 
3228     if (!rdma) {
3229         return -1;
3230     }
3231 
3232     if (rdma_errored(rdma)) {
3233         return -1;
3234     }
3235 
3236     qemu_fflush(f);
3237 
3238     /*
3239      * Add this page to the current 'chunk'. If the chunk
3240      * is full, or the page doesn't belong to the current chunk,
3241      * an actual RDMA write will occur and a new chunk will be formed.
3242      */
3243     ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
3244     if (ret < 0) {
3245         error_report_err(err);
3246         goto err;
3247     }
3248 
3249     /*
3250      * Drain the Completion Queue if possible, but do not block,
3251      * just poll.
3252      *
3253      * If nothing to poll, the end of the iteration will do this
3254      * again to make sure we don't overflow the request queue.
3255      */
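         /* Receive completion queue first, then the send completion queue. */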
3256     while (1) {
3257         uint64_t wr_id, wr_id_in;
3258         ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3259 
3260         if (ret < 0) {
3261             error_report("rdma migration: polling error");
3262             goto err;
3263         }
3264 
3265         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3266 
3267         if (wr_id == RDMA_WRID_NONE) {
3268             break;
3269         }
3270     }
3271 
3272     while (1) {
3273         uint64_t wr_id, wr_id_in;
3274         ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3275 
3276         if (ret < 0) {
3277             error_report("rdma migration: polling error");
3278             goto err;
3279         }
3280 
3281         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3282 
3283         if (wr_id == RDMA_WRID_NONE) {
3284             break;
3285         }
3286     }
3287 
3288     return RAM_SAVE_CONTROL_DELAYED;
3289 
3290 err:
3291     rdma->errored = true;
3292     return -1;
3293 }
3294 
3295 int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset,
3296                            ram_addr_t offset, size_t size)
3297 {
3298     if (!migrate_rdma() || migration_in_postcopy()) {
3299         return RAM_SAVE_CONTROL_NOT_SUPP;
3300     }
3301 
3302     int ret = qemu_rdma_save_page(f, block_offset, offset, size);
3303 
3304     if (ret != RAM_SAVE_CONTROL_DELAYED &&
3305         ret != RAM_SAVE_CONTROL_NOT_SUPP) {
3306         if (ret < 0) {
3307             qemu_file_set_error(f, ret);
3308         }
3309     }
3310     return ret;
3311 }
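
     /*
      * Hedged usage sketch (not code from this file; the actual caller is
      * ram.c's control_save_page()).  A RAM save path is expected to treat
      * the return value roughly like this:
      *
      *     int ret = rdma_control_save_page(f, block->offset, offset, size);
      *     if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
      *         ... fall back to the ordinary page-save path ...
      *     } else if (ret == RAM_SAVE_CONTROL_DELAYED) {
      *         ... page accepted; the RDMA write completes asynchronously ...
      *     } else {
      *         ... error; qemu_file_set_error() was already called ...
      *     }
      */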
3312 
3313 static void rdma_accept_incoming_migration(void *opaque);
3314 
3315 static void rdma_cm_poll_handler(void *opaque)
3316 {
3317     RDMAContext *rdma = opaque;
3318     struct rdma_cm_event *cm_event;
3319     MigrationIncomingState *mis = migration_incoming_get_current();
3320 
3321     if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
3322         error_report("get_cm_event failed %d", errno);
3323         return;
3324     }
3325 
3326     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3327         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3328         if (!rdma->errored &&
3329             migration_incoming_get_current()->state !=
3330               MIGRATION_STATUS_COMPLETED) {
3331             error_report("received unexpected cm event %d", cm_event->event);
3332             rdma->errored = true;
3333             if (rdma->return_path) {
3334                 rdma->return_path->errored = true;
3335             }
3336         }
3337         rdma_ack_cm_event(cm_event);
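             /*
              * Kick the load coroutine if it is blocked in an RDMA read so
              * that it can notice the disconnect/error and bail out.
              */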
3338         if (mis->loadvm_co) {
3339             qemu_coroutine_enter(mis->loadvm_co);
3340         }
3341         return;
3342     }
3343     rdma_ack_cm_event(cm_event);
3344 }
3345 
3346 static int qemu_rdma_accept(RDMAContext *rdma)
3347 {
3348     Error *err = NULL;
3349     RDMACapabilities cap;
3350     struct rdma_conn_param conn_param = {
3351                                             .responder_resources = 2,
3352                                             .private_data = &cap,
3353                                             .private_data_len = sizeof(cap),
3354                                          };
3355     RDMAContext *rdma_return_path = NULL;
3356     struct rdma_cm_event *cm_event;
3357     struct ibv_context *verbs;
3358     int ret;
3359 
3360     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3361     if (ret < 0) {
3362         goto err_rdma_dest_wait;
3363     }
3364 
3365     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3366         rdma_ack_cm_event(cm_event);
3367         goto err_rdma_dest_wait;
3368     }
3369 
3370     /*
3371      * Initialize the RDMAContext for the return path (used by postcopy)
3372      * once the first connection request has arrived.
3373      */
3374     if ((migrate_postcopy() || migrate_return_path())
3375         && !rdma->is_return_path) {
3376         rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3377         if (rdma_return_path == NULL) {
3378             rdma_ack_cm_event(cm_event);
3379             goto err_rdma_dest_wait;
3380         }
3381 
3382         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3383     }
3384 
3385     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3386 
3387     network_to_caps(&cap);
3388 
3389     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3390         error_report("Unknown source RDMA version: %d, bailing...",
3391                      cap.version);
3392         rdma_ack_cm_event(cm_event);
3393         goto err_rdma_dest_wait;
3394     }
3395 
3396     /*
3397      * Respond with only the capabilities this version of QEMU knows about.
3398      */
3399     cap.flags &= known_capabilities;
3400 
3401     /*
3402      * Enable the ones that we do know about.
3403      * Add other checks here as new ones are introduced.
3404      */
3405     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3406         rdma->pin_all = true;
3407     }
3408 
3409     rdma->cm_id = cm_event->id;
3410     verbs = cm_event->id->verbs;
3411 
3412     rdma_ack_cm_event(cm_event);
3413 
3414     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3415 
3416     caps_to_network(&cap);
3417 
3418     trace_qemu_rdma_accept_pin_verbsc(verbs);
3419 
3420     if (!rdma->verbs) {
3421         rdma->verbs = verbs;
3422     } else if (rdma->verbs != verbs) {
3423         error_report("ibv context mismatch: %p vs %p!", rdma->verbs,
3424                      verbs);
3425         goto err_rdma_dest_wait;
3426     }
3427 
3428     qemu_rdma_dump_id("dest_init", verbs);
3429 
3430     ret = qemu_rdma_alloc_pd_cq(rdma, &err);
3431     if (ret < 0) {
3432         error_report_err(err);
3433         goto err_rdma_dest_wait;
3434     }
3435 
3436     ret = qemu_rdma_alloc_qp(rdma);
3437     if (ret < 0) {
3438         error_report("rdma migration: error allocating qp!");
3439         goto err_rdma_dest_wait;
3440     }
3441 
3442     qemu_rdma_init_ram_blocks(rdma);
3443 
3444     for (int i = 0; i < RDMA_WRID_MAX; i++) {
3445         ret = qemu_rdma_reg_control(rdma, i);
3446         if (ret < 0) {
3447             error_report("rdma: error registering control buffer %d", i);
3448             goto err_rdma_dest_wait;
3449         }
3450     }
3451 
3452     /* Accept the second connection request for return path */
3453     if ((migrate_postcopy() || migrate_return_path())
3454         && !rdma->is_return_path) {
3455         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3456                             NULL,
3457                             (void *)(intptr_t)rdma->return_path);
3458     } else {
3459         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3460                             NULL, rdma);
3461     }
3462 
3463     ret = rdma_accept(rdma->cm_id, &conn_param);
3464     if (ret < 0) {
3465         error_report("rdma_accept failed");
3466         goto err_rdma_dest_wait;
3467     }
3468 
3469     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3470     if (ret < 0) {
3471         error_report("rdma_accept get_cm_event failed");
3472         goto err_rdma_dest_wait;
3473     }
3474 
3475     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3476         error_report("rdma_accept: RDMA_CM_EVENT_ESTABLISHED not received");
3477         rdma_ack_cm_event(cm_event);
3478         goto err_rdma_dest_wait;
3479     }
3480 
3481     rdma_ack_cm_event(cm_event);
3482     rdma->connected = true;
3483 
3484     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
3485     if (ret < 0) {
3486         error_report_err(err);
3487         goto err_rdma_dest_wait;
3488     }
3489 
3490     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3491 
3492     return 0;
3493 
3494 err_rdma_dest_wait:
3495     rdma->errored = true;
3496     qemu_rdma_cleanup(rdma);
3497     g_free(rdma_return_path);
3498     return -1;
3499 }
3500 
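     /*
      * qsort() comparator: order RDMALocalBlocks by the index the source
      * assigned to them (src_index); returns -1, 0 or 1.
      */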
3501 static int dest_ram_sort_func(const void *a, const void *b)
3502 {
3503     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3504     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3505 
3506     return (a_index < b_index) ? -1 : (a_index != b_index);
3507 }
3508 
3509 /*
3510  * During each iteration of the migration, we listen for instructions
3511  * by the source VM to perform dynamic page registrations before they
3512  * can perform RDMA operations.
3513  *
3514  * We respond with the 'rkey'.
3515  *
3516  * Keep doing this until the source tells us to stop.
3517  */
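     /*
      * Control-message flow handled by the loop below (destination side):
      *
      *   source                               destination
      *   ------                               -----------
      *   RAM_BLOCKS_REQUEST             --->  send sorted local block list
      *                                  <---  RAM_BLOCKS_RESULT
      *   REGISTER_REQUEST(block, chunk) --->  register memory, reply with rkey
      *                                  <---  REGISTER_RESULT
      *   UNREGISTER_REQUEST(chunk)      --->  ibv_dereg_mr()
      *                                  <---  UNREGISTER_FINISHED
      *   COMPRESS(block, offset, value) --->  fill the range locally, no reply
      *   REGISTER_FINISHED              --->  return to the caller
      */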
3518 int rdma_registration_handle(QEMUFile *f)
3519 {
3520     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3521                                .type = RDMA_CONTROL_REGISTER_RESULT,
3522                                .repeat = 0,
3523                              };
3524     RDMAControlHeader unreg_resp = { .len = 0,
3525                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3526                                .repeat = 0,
3527                              };
3528     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3529                                  .repeat = 1 };
3530     QIOChannelRDMA *rioc;
3531     Error *err = NULL;
3532     RDMAContext *rdma;
3533     RDMALocalBlocks *local;
3534     RDMAControlHeader head;
3535     RDMARegister *reg, *registers;
3536     RDMACompress *comp;
3537     RDMARegisterResult *reg_result;
3538     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3539     RDMALocalBlock *block;
3540     void *host_addr;
3541     int ret;
3542     int idx = 0;
3543 
3544     if (!migrate_rdma()) {
3545         return 0;
3546     }
3547 
3548     RCU_READ_LOCK_GUARD();
3549     rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3550     rdma = qatomic_rcu_read(&rioc->rdmain);
3551 
3552     if (!rdma) {
3553         return -1;
3554     }
3555 
3556     if (rdma_errored(rdma)) {
3557         return -1;
3558     }
3559 
3560     local = &rdma->local_ram_blocks;
3561     do {
3562         trace_rdma_registration_handle_wait();
3563 
3564         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
3565 
3566         if (ret < 0) {
3567             error_report_err(err);
3568             break;
3569         }
3570 
3571         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3572             error_report("rdma: Too many requests in this message (%d). "
3573                             "Bailing.", head.repeat);
3574             break;
3575         }
3576 
3577         switch (head.type) {
3578         case RDMA_CONTROL_COMPRESS:
3579             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3580             network_to_compress(comp);
3581 
3582             trace_rdma_registration_handle_compress(comp->length,
3583                                                     comp->block_idx,
3584                                                     comp->offset);
3585             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3586                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3587                              (unsigned int)comp->block_idx,
3588                              rdma->local_ram_blocks.nb_blocks);
3589                 goto err;
3590             }
3591             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3592 
3593             host_addr = block->local_host_addr +
3594                             (comp->offset - block->offset);
3595 
3596             ram_handle_compressed(host_addr, comp->value, comp->length);
3597             break;
3598 
3599         case RDMA_CONTROL_REGISTER_FINISHED:
3600             trace_rdma_registration_handle_finished();
3601             return 0;
3602 
3603         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3604             trace_rdma_registration_handle_ram_blocks();
3605 
3606             /* Sort our local RAM Block list into the same order as the
3607              * source's. We can do this because we filled in src_index for
3608              * each entry as we received the RAMBlock list earlier.
3609              */
3610             qsort(rdma->local_ram_blocks.block,
3611                   rdma->local_ram_blocks.nb_blocks,
3612                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3613             for (int i = 0; i < local->nb_blocks; i++) {
3614                 local->block[i].index = i;
3615             }
3616 
3617             if (rdma->pin_all) {
3618                 ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
3619                 if (ret < 0) {
3620                     error_report_err(err);
3621                     goto err;
3622                 }
3623             }
3624 
3625             /*
3626              * Dest uses this to prepare to transmit the RAMBlock descriptions
3627              * to the source VM after connection setup.
3628              * Both sides use the "remote" structure to communicate and update
3629              * their "local" descriptions with what was sent.
3630              */
3631             for (int i = 0; i < local->nb_blocks; i++) {
3632                 rdma->dest_blocks[i].remote_host_addr =
3633                     (uintptr_t)(local->block[i].local_host_addr);
3634 
3635                 if (rdma->pin_all) {
3636                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3637                 }
3638 
3639                 rdma->dest_blocks[i].offset = local->block[i].offset;
3640                 rdma->dest_blocks[i].length = local->block[i].length;
3641 
3642                 dest_block_to_network(&rdma->dest_blocks[i]);
3643                 trace_rdma_registration_handle_ram_blocks_loop(
3644                     local->block[i].block_name,
3645                     local->block[i].offset,
3646                     local->block[i].length,
3647                     local->block[i].local_host_addr,
3648                     local->block[i].src_index);
3649             }
3650 
3651             blocks.len = rdma->local_ram_blocks.nb_blocks
3652                                                 * sizeof(RDMADestBlock);
3653 
3654 
3655             ret = qemu_rdma_post_send_control(rdma,
3656                                     (uint8_t *) rdma->dest_blocks, &blocks,
3657                                     &err);
3658 
3659             if (ret < 0) {
3660                 error_report_err(err);
3661                 goto err;
3662             }
3663 
3664             break;
3665         case RDMA_CONTROL_REGISTER_REQUEST:
3666             trace_rdma_registration_handle_register(head.repeat);
3667 
3668             reg_resp.repeat = head.repeat;
3669             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3670 
3671             for (int count = 0; count < head.repeat; count++) {
3672                 uint64_t chunk;
3673                 uint8_t *chunk_start, *chunk_end;
3674 
3675                 reg = &registers[count];
3676                 network_to_register(reg);
3677 
3678                 reg_result = &results[count];
3679 
3680                 trace_rdma_registration_handle_register_loop(count,
3681                          reg->current_index, reg->key.current_addr, reg->chunks);
3682 
3683                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3684                     error_report("rdma: 'register' bad block index %u (vs %d)",
3685                                  (unsigned int)reg->current_index,
3686                                  rdma->local_ram_blocks.nb_blocks);
3687                     goto err;
3688                 }
3689                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3690                 if (block->is_ram_block) {
3691                     if (block->offset > reg->key.current_addr) {
3692                         error_report("rdma: bad register address for block %s"
3693                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3694                             block->block_name, block->offset,
3695                             reg->key.current_addr);
3696                         goto err;
3697                     }
3698                     host_addr = (block->local_host_addr +
3699                                 (reg->key.current_addr - block->offset));
3700                     chunk = ram_chunk_index(block->local_host_addr,
3701                                             (uint8_t *) host_addr);
3702                 } else {
3703                     chunk = reg->key.chunk;
3704                     host_addr = block->local_host_addr +
3705                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3706                     /* Check for particularly bad chunk value */
3707                     if (host_addr < (void *)block->local_host_addr) {
3708                         error_report("rdma: bad chunk for block %s"
3709                             " chunk: %" PRIx64,
3710                             block->block_name, reg->key.chunk);
3711                         goto err;
3712                     }
3713                 }
3714                 chunk_start = ram_chunk_start(block, chunk);
3715                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3716                 /* avoid "-Waddress-of-packed-member" warning */
3717                 uint32_t tmp_rkey = 0;
3718                 if (qemu_rdma_register_and_get_keys(rdma, block,
3719                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3720                             chunk, chunk_start, chunk_end)) {
3721                     error_report("cannot get rkey");
3722                     goto err;
3723                 }
3724                 reg_result->rkey = tmp_rkey;
3725 
3726                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3727 
3728                 trace_rdma_registration_handle_register_rkey(reg_result->rkey);
3729 
3730                 result_to_network(reg_result);
3731             }
3732 
3733             ret = qemu_rdma_post_send_control(rdma,
3734                             (uint8_t *) results, &reg_resp, &err);
3735 
3736             if (ret < 0) {
3737                 error_report_err(err);
3738                 goto err;
3739             }
3740             break;
3741         case RDMA_CONTROL_UNREGISTER_REQUEST:
3742             trace_rdma_registration_handle_unregister(head.repeat);
3743             unreg_resp.repeat = head.repeat;
3744             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3745 
3746             for (int count = 0; count < head.repeat; count++) {
3747                 reg = &registers[count];
3748                 network_to_register(reg);
3749 
3750                 trace_rdma_registration_handle_unregister_loop(count,
3751                            reg->current_index, reg->key.chunk);
3752 
3753                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3754 
3755                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3756                 block->pmr[reg->key.chunk] = NULL;
3757 
3758                 if (ret != 0) {
3759                     error_report("rdma unregistration chunk failed: %s",
3760                                  strerror(errno));
3761                     goto err;
3762                 }
3763 
3764                 rdma->total_registrations--;
3765 
3766                 trace_rdma_registration_handle_unregister_success(reg->key.chunk);
3767             }
3768 
3769             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
3770 
3771             if (ret < 0) {
3772                 error_report_err(err);
3773                 goto err;
3774             }
3775             break;
3776         case RDMA_CONTROL_REGISTER_RESULT:
3777             error_report("Invalid RESULT message at dest.");
3778             goto err;
3779         default:
3780             error_report("Unknown control message %s", control_desc(head.type));
3781             goto err;
3782         }
3783     } while (1);
3784 
3785 err:
3786     rdma->errored = true;
3787     return -1;
3788 }
3789 
3790 /* Destination:
3791  * Called during the initial RAM load section which lists the
3792  * RAMBlocks by name.  This lets us know the order of the RAMBlocks on
3793  * the source.  We've already built our local RAMBlock list, but not
3794  * yet sent the list to the source.
3795  */
3796 int rdma_block_notification_handle(QEMUFile *f, const char *name)
3797 {
3798     int curr;
3799     int found = -1;
3800 
3801     if (!migrate_rdma()) {
3802         return 0;
3803     }
3804 
3805     RCU_READ_LOCK_GUARD();
3806     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3807     RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain);
3808 
3809     if (!rdma) {
3810         return -1;
3811     }
3812 
3813     /* Find the matching RAMBlock in our local list */
3814     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3815         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3816             found = curr;
3817             break;
3818         }
3819     }
3820 
3821     if (found == -1) {
3822         error_report("RAMBlock '%s' not found on destination", name);
3823         return -1;
3824     }
3825 
3826     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3827     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3828     rdma->next_src_index++;
3829 
3830     return 0;
3831 }
3832 
3833 int rdma_registration_start(QEMUFile *f, uint64_t flags)
3834 {
3835     if (!migrate_rdma() || migration_in_postcopy()) {
3836         return 0;
3837     }
3838 
3839     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3840     RCU_READ_LOCK_GUARD();
3841     RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout);
3842     if (!rdma) {
3843         return -1;
3844     }
3845 
3846     if (rdma_errored(rdma)) {
3847         return -1;
3848     }
3849 
3850     trace_rdma_registration_start(flags);
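         /*
          * RAM_SAVE_FLAG_HOOK makes the destination drop into
          * rdma_registration_handle() and service registration requests
          * until it sees REGISTER_FINISHED.
          */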
3851     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3852     qemu_fflush(f);
3853 
3854     return 0;
3855 }
3856 
3857 /*
3858  * Inform dest that dynamic registrations are done for now.
3859  * First, flush writes, if any.
3860  */
3861 int rdma_registration_stop(QEMUFile *f, uint64_t flags)
3862 {
3863     QIOChannelRDMA *rioc;
3864     Error *err = NULL;
3865     RDMAContext *rdma;
3866     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3867     int ret;
3868 
3869     if (!migrate_rdma() || migration_in_postcopy()) {
3870         return 0;
3871     }
3872 
3873     RCU_READ_LOCK_GUARD();
3874     rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3875     rdma = qatomic_rcu_read(&rioc->rdmaout);
3876     if (!rdma) {
3877         return -1;
3878     }
3879 
3880     if (rdma_errored(rdma)) {
3881         return -1;
3882     }
3883 
3884     qemu_fflush(f);
3885     ret = qemu_rdma_drain_cq(rdma);
3886 
3887     if (ret < 0) {
3888         goto err;
3889     }
3890 
3891     if (flags == RAM_CONTROL_SETUP) {
3892         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3893         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3894         int reg_result_idx, nb_dest_blocks;
3895 
3896         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3897         trace_rdma_registration_stop_ram();
3898 
3899         /*
3900          * Make sure that we parallelize the pinning on both sides.
3901          * For very large guests, doing this serially takes a really
3902          * long time, so we have to 'interleave' the pinning locally
3903          * with the control messages by performing the pinning on this
3904          * side before we receive the control response from the other
3905          * side that the pinning has completed.
3906          */
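             /*
              * The qemu_rdma_reg_whole_ram_blocks callback (passed below when
              * pin_all is enabled) does the local pinning while the
              * destination is busy handling RAM_BLOCKS_REQUEST.
              */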
3907         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3908                     &reg_result_idx, rdma->pin_all ?
3909                     qemu_rdma_reg_whole_ram_blocks : NULL,
3910                     &err);
3911         if (ret < 0) {
3912             error_report_err(err);
3913             return -1;
3914         }
3915 
3916         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3917 
3918         /*
3919          * The protocol uses two different sets of rkeys (mutually exclusive):
3920          * 1. One key to represent the virtual address of the entire ram block.
3921          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3922          * 2. One to represent individual chunks within a ram block.
3923          *    (dynamic chunk registration enabled - pin individual chunks.)
3924          *
3925          * Once the capability is successfully negotiated, the destination transmits
3926          * the keys to use (or sends them later) including the virtual addresses
3927          * and then propagates the remote ram block descriptions to its local copy.
3928          */
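             /*
              * Illustrative only (not code from this file): in either scheme
              * the key ultimately comes from libibverbs, e.g.
              *
              *     struct ibv_mr *mr = ibv_reg_mr(pd, addr, len,
              *                                    IBV_ACCESS_LOCAL_WRITE |
              *                                    IBV_ACCESS_REMOTE_WRITE);
              *     uint32_t rkey = mr->rkey;
              *
              * and it is that rkey which the destination hands back so the
              * source can target the block or chunk with RDMA writes.
              */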
3929 
3930         if (local->nb_blocks != nb_dest_blocks) {
3931             error_report("ram blocks mismatch (Number of blocks %d vs %d)",
3932                          local->nb_blocks, nb_dest_blocks);
3933             error_printf("Your QEMU command line parameters are probably "
3934                          "not identical on both the source and destination.\n");
3935             rdma->errored = true;
3936             return -1;
3937         }
3938 
3939         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3940         memcpy(rdma->dest_blocks,
3941             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3942         for (int i = 0; i < nb_dest_blocks; i++) {
3943             network_to_dest_block(&rdma->dest_blocks[i]);
3944 
3945             /* We require that the blocks are in the same order */
3946             if (rdma->dest_blocks[i].length != local->block[i].length) {
3947                 error_report("Block %s/%d has a different length %" PRIu64
3948                              " vs %" PRIu64,
3949                              local->block[i].block_name, i,
3950                              local->block[i].length,
3951                              rdma->dest_blocks[i].length);
3952                 rdma->errored = true;
3953                 return -1;
3954             }
3955             local->block[i].remote_host_addr =
3956                     rdma->dest_blocks[i].remote_host_addr;
3957             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3958         }
3959     }
3960 
3961     trace_rdma_registration_stop(flags);
3962 
3963     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3964     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
3965 
3966     if (ret < 0) {
3967         error_report_err(err);
3968         goto err;
3969     }
3970 
3971     return 0;
3972 err:
3973     rdma->errored = true;
3974     return -1;
3975 }
3976 
3977 static void qio_channel_rdma_finalize(Object *obj)
3978 {
3979     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3980     if (rioc->rdmain) {
3981         qemu_rdma_cleanup(rioc->rdmain);
3982         g_free(rioc->rdmain);
3983         rioc->rdmain = NULL;
3984     }
3985     if (rioc->rdmaout) {
3986         qemu_rdma_cleanup(rioc->rdmaout);
3987         g_free(rioc->rdmaout);
3988         rioc->rdmaout = NULL;
3989     }
3990 }
3991 
3992 static void qio_channel_rdma_class_init(ObjectClass *klass,
3993                                         void *class_data G_GNUC_UNUSED)
3994 {
3995     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3996 
3997     ioc_klass->io_writev = qio_channel_rdma_writev;
3998     ioc_klass->io_readv = qio_channel_rdma_readv;
3999     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
4000     ioc_klass->io_close = qio_channel_rdma_close;
4001     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
4002     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
4003     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
4004 }
4005 
4006 static const TypeInfo qio_channel_rdma_info = {
4007     .parent = TYPE_QIO_CHANNEL,
4008     .name = TYPE_QIO_CHANNEL_RDMA,
4009     .instance_size = sizeof(QIOChannelRDMA),
4010     .instance_finalize = qio_channel_rdma_finalize,
4011     .class_init = qio_channel_rdma_class_init,
4012 };
4013 
4014 static void qio_channel_rdma_register_types(void)
4015 {
4016     type_register_static(&qio_channel_rdma_info);
4017 }
4018 
4019 type_init(qio_channel_rdma_register_types);
4020 
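     /*
      * Wrap an RDMAContext in a QEMUFile.  The context passed in carries the
      * primary direction; its return_path (if one was negotiated) is wired up
      * as the opposite direction, so a single QIOChannelRDMA can serve both.
      */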
4021 static QEMUFile *rdma_new_input(RDMAContext *rdma)
4022 {
4023     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4024 
4025     rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4026     rioc->rdmain = rdma;
4027     rioc->rdmaout = rdma->return_path;
4028 
4029     return rioc->file;
4030 }
4031 
4032 static QEMUFile *rdma_new_output(RDMAContext *rdma)
4033 {
4034     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4035 
4036     rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4037     rioc->rdmaout = rdma;
4038     rioc->rdmain = rdma->return_path;
4039 
4040     return rioc->file;
4041 }
4042 
4043 static void rdma_accept_incoming_migration(void *opaque)
4044 {
4045     RDMAContext *rdma = opaque;
4046     QEMUFile *f;
4047     Error *local_err = NULL;
4048 
4049     trace_qemu_rdma_accept_incoming_migration();
4050     if (qemu_rdma_accept(rdma) < 0) {
4051         error_report("RDMA ERROR: Migration initialization failed");
4052         return;
4053     }
4054 
4055     trace_qemu_rdma_accept_incoming_migration_accepted();
4056 
4057     if (rdma->is_return_path) {
4058         return;
4059     }
4060 
4061     f = rdma_new_input(rdma);
4062     if (f == NULL) {
4063         error_report("RDMA ERROR: could not open RDMA for input");
4064         qemu_rdma_cleanup(rdma);
4065         return;
4066     }
4067 
4068     rdma->migration_started_on_destination = 1;
4069     migration_fd_process_incoming(f, &local_err);
4070     if (local_err) {
4071         error_reportf_err(local_err, "RDMA ERROR:");
4072     }
4073 }
4074 
4075 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4076 {
4077     MigrationState *s = migrate_get_current();
4078     int ret;
4079     RDMAContext *rdma;
4080 
4081     trace_rdma_start_incoming_migration();
4082 
4083     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4084     if (ram_block_discard_is_required()) {
4085         error_setg(errp, "RDMA: cannot disable RAM discard");
4086         return;
4087     }
4088 
4089     rdma = qemu_rdma_data_init(host_port, errp);
4090     if (rdma == NULL) {
4091         goto err;
4092     }
4093 
4094     ret = qemu_rdma_dest_init(rdma, errp);
4095     if (ret < 0) {
4096         goto err;
4097     }
4098 
4099     trace_rdma_start_incoming_migration_after_dest_init();
4100 
4101     ret = rdma_listen(rdma->listen_id, 5);
4102 
4103     if (ret < 0) {
4104         error_setg(errp, "RDMA ERROR: failed to listen on socket");
4105         goto cleanup_rdma;
4106     }
4107 
4108     trace_rdma_start_incoming_migration_after_rdma_listen();
4109     s->rdma_migration = true;
4110     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4111                         NULL, (void *)(intptr_t)rdma);
4112     return;
4113 
4114 cleanup_rdma:
4115     qemu_rdma_cleanup(rdma);
4116 err:
4117     if (rdma) {
4118         g_free(rdma->host);
4119         g_free(rdma->host_port);
4120     }
4121     g_free(rdma);
4122 }
4123 
4124 void rdma_start_outgoing_migration(void *opaque,
4125                             const char *host_port, Error **errp)
4126 {
4127     MigrationState *s = opaque;
4128     RDMAContext *rdma_return_path = NULL;
4129     RDMAContext *rdma;
4130     int ret;
4131 
4132     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4133     if (ram_block_discard_is_required()) {
4134         error_setg(errp, "RDMA: cannot disable RAM discard");
4135         return;
4136     }
4137 
4138     rdma = qemu_rdma_data_init(host_port, errp);
4139     if (rdma == NULL) {
4140         goto err;
4141     }
4142 
4143     ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
4144 
4145     if (ret < 0) {
4146         goto err;
4147     }
4148 
4149     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4150     ret = qemu_rdma_connect(rdma, false, errp);
4151 
4152     if (ret < 0) {
4153         goto err;
4154     }
4155 
4156     /* RDMA postcopy needs a separate queue pair for the return path */
4157     if (migrate_postcopy() || migrate_return_path()) {
4158         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4159 
4160         if (rdma_return_path == NULL) {
4161             goto return_path_err;
4162         }
4163 
4164         ret = qemu_rdma_source_init(rdma_return_path,
4165                                     migrate_rdma_pin_all(), errp);
4166 
4167         if (ret < 0) {
4168             goto return_path_err;
4169         }
4170 
4171         ret = qemu_rdma_connect(rdma_return_path, true, errp);
4172 
4173         if (ret < 0) {
4174             goto return_path_err;
4175         }
4176 
4177         rdma->return_path = rdma_return_path;
4178         rdma_return_path->return_path = rdma;
4179         rdma_return_path->is_return_path = true;
4180     }
4181 
4182     trace_rdma_start_outgoing_migration_after_rdma_connect();
4183 
4184     s->to_dst_file = rdma_new_output(rdma);
4185     s->rdma_migration = true;
4186     migrate_fd_connect(s, NULL);
4187     return;
4188 return_path_err:
4189     qemu_rdma_cleanup(rdma);
4190 err:
4191     g_free(rdma);
4192     g_free(rdma_return_path);
4193 }
4194