xref: /openbmc/qemu/migration/rdma.c (revision a7a05f5f6a4085afbede315e749b1c67e78c966b)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "exec/target_page.h"
21 #include "rdma.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "qemu-file.h"
25 #include "ram.h"
26 #include "qemu/error-report.h"
27 #include "qemu/main-loop.h"
28 #include "qemu/module.h"
29 #include "qemu/rcu.h"
30 #include "qemu/sockets.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/coroutine.h"
33 #include "exec/memory.h"
34 #include <sys/socket.h>
35 #include <netdb.h>
36 #include <arpa/inet.h>
37 #include <rdma/rdma_cma.h>
38 #include "trace.h"
39 #include "qom/object.h"
40 #include "options.h"
41 #include <poll.h>
42 
/* Timeout, in milliseconds, named for the address/route resolve steps. */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/* Also sizes the RDMAContext.unregistrations[] queue. */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the chunk size used by the ram_chunk_*() helpers. */
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message.
 * This is also the size of the control buffer embedded in each
 * RDMAWorkRequestData.
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
76 
/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 *
 * NOTE(review): the shifts and masks below are built from unsigned long
 * constants, so the layout above only holds where unsigned long is
 * 64 bits wide -- confirm that no 32-bit-long host builds this file.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

/* Low bits, below RDMA_WRID_BLOCK_SHIFT. */
#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

/* Bits from RDMA_WRID_BLOCK_SHIFT up to, not including, the chunk shift. */
#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

/* Every bit that belongs to neither the type nor the block field. */
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
100 
101 /*
102  * RDMA migration protocol:
103  * 1. RDMA Writes (data messages, i.e. RAM)
104  * 2. IB Send/Recv (control channel messages)
105  */
/* Kinds of work request ID: none, RDMA write, or control send/recv. */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,       /* data message (RAM) */
    RDMA_WRID_SEND_CONTROL = 2000,  /* control channel, send side */
    RDMA_WRID_RECV_CONTROL = 4000,  /* control channel, receive side */
};
112 
113 /*
114  * Work request IDs for IB SEND messages only (not RDMA writes).
115  * This is used by the migration protocol to transmit
116  * control messages (such as device state and registration commands)
117  *
118  * We could use more WRs, but we have enough for now.
119  */
/* RDMA_WRID_MAX is the number of entries in RDMAContext.wr_data[]. */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,      /* count of the values above; keep last */
};
126 
127 /*
128  * SEND/RECV IB Control Messages.
129  */
/*
 * These values index the name table in control_desc(); a new entry here
 * needs a matching string there, and control_desc()'s range check relies
 * on RDMA_CONTROL_UNREGISTER_FINISHED staying the last value.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
144 
145 
146 /*
147  * Memory and MR structures used to represent an IB Send/Recv work request.
148  * This is *not* used for RDMA writes, only IB Send/Recv.
149  */
/* One control-message buffer; RDMAContext holds RDMA_WRID_MAX of these. */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;
156 
157 /*
158  * Negotiate RDMA capabilities during connection-setup time.
159  */
/*
 * Both fields are byte-swapped in place by caps_to_network() and
 * network_to_caps().
 */
typedef struct {
    uint32_t version;   /* presumably RDMA_CONTROL_VERSION_CURRENT; confirm */
    uint32_t flags;     /* presumably RDMA_CAPABILITY_* bits; confirm */
} RDMACapabilities;
164 
165 static void caps_to_network(RDMACapabilities *cap)
166 {
167     cap->version = htonl(cap->version);
168     cap->flags = htonl(cap->flags);
169 }
170 
171 static void network_to_caps(RDMACapabilities *cap)
172 {
173     cap->version = ntohl(cap->version);
174     cap->flags = ntohl(cap->flags);
175 }
176 
177 /*
178  * Representation of a RAMBlock from an RDMA perspective.
179  * This is not transmitted, only local.
180  * This and subsequent structures cannot be linked lists
181  * because we're using a single IB message to transmit
182  * the information. It's small anyway, so a list is overkill.
183  */
/*
 * One entry of RDMALocalBlocks.block[].  rdma_add_block() fills in the
 * name, address, offset, length, index and per-chunk arrays;
 * rdma_delete_block() releases them.
 */
typedef struct RDMALocalBlock {
    char          *block_name;      /* g_strdup()ed copy, owned by the entry */
    uint8_t       *local_host_addr; /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;          /* block offset; also the blockmap key */
    uint64_t       length;          /* length in bytes */
    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    int            index;           /* which block are we */
    unsigned int   src_index;       /* (Only used on dest) */
    bool           is_ram_block;    /* true if added before local->init was set */
    int            nb_chunks;       /* element count of the per-chunk arrays */
    unsigned long *transit_bitmap;    /* one bit per chunk */
    unsigned long *unregister_bitmap; /* one bit per chunk */
} RDMALocalBlock;
201 
202 /*
203  * Also represents a RAMblock, but only on the dest.
204  * This gets transmitted by the dest during connection-time
205  * to the source VM and then is used to populate the
206  * corresponding RDMALocalBlock with
207  * the information needed to perform the actual RDMA.
208  */
/*
 * Byte order is converted by dest_block_to_network() and
 * network_to_dest_block(); neither of them touches the padding field.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;   /* never byte-swapped */
} RDMADestBlock;
216 
217 static const char *control_desc(unsigned int rdma_control)
218 {
219     static const char *strs[] = {
220         [RDMA_CONTROL_NONE] = "NONE",
221         [RDMA_CONTROL_ERROR] = "ERROR",
222         [RDMA_CONTROL_READY] = "READY",
223         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
224         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
225         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
226         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
227         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
228         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
229         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
230         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
231         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
232     };
233 
234     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
235         return "??BAD CONTROL VALUE??";
236     }
237 
238     return strs[rdma_control];
239 }
240 
#if !defined(htonll)
/*
 * Convert a 64-bit value from host to network byte order: the upper
 * 32 bits are placed first in memory, each half converted with htonl().
 */
static uint64_t htonll(uint64_t v)
{
    union {
        uint32_t half[2];
        uint64_t whole;
    } out;

    out.half[0] = htonl(v >> 32);
    out.half[1] = htonl(v & 0xFFFFFFFFULL);

    return out.whole;
}
#endif
250 
#if !defined(ntohll)
/*
 * Convert a 64-bit value from network to host byte order: the first
 * 32 bits in memory become the upper half of the result.
 */
static uint64_t ntohll(uint64_t v)
{
    union {
        uint32_t half[2];
        uint64_t whole;
    } in;
    uint64_t upper;
    uint64_t lower;

    in.whole = v;
    upper = ntohl(in.half[0]);
    lower = ntohl(in.half[1]);

    return (upper << 32) | lower;
}
#endif
259 
260 static void dest_block_to_network(RDMADestBlock *db)
261 {
262     db->remote_host_addr = htonll(db->remote_host_addr);
263     db->offset = htonll(db->offset);
264     db->length = htonll(db->length);
265     db->remote_rkey = htonl(db->remote_rkey);
266 }
267 
268 static void network_to_dest_block(RDMADestBlock *db)
269 {
270     db->remote_host_addr = ntohll(db->remote_host_addr);
271     db->offset = ntohll(db->offset);
272     db->length = ntohll(db->length);
273     db->remote_rkey = ntohl(db->remote_rkey);
274 }
275 
276 /*
277  * Virtual address of the above structures used for transmitting
278  * the RAMBlock descriptions at connection-time.
279  * This structure is *not* transmitted.
280  */
/*
 * The block array is reallocated by every rdma_add_block() and
 * rdma_delete_block() call, so pointers into it do not survive those calls.
 */
typedef struct RDMALocalBlocks {
    int nb_blocks;             /* number of entries in block[] */
    bool     init;             /* main memory init complete */
    RDMALocalBlock *block;     /* heap array of nb_blocks entries */
} RDMALocalBlocks;
286 
287 /*
288  * Main data structure for RDMA state.
289  * While there is only one copy of this structure being allocated right now,
290  * this is the place where one would start if you wanted to consider
291  * having more than one RDMA connection open at the same time.
292  */
typedef struct RDMAContext {
    char *host;
    int port;

    /* One control buffer per RDMA_WRID_* slot. */
    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received or not.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context          *verbs;
    struct rdma_event_channel   *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
    struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *recv_cq;                 /* receive completion queue */
    struct ibv_cq *send_cq;                 /* send completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    bool errored;
    bool error_reported;    /* set by rdma_errored() once it has reported */
    bool received_error;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int    next_src_index;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    /* Maps a block offset to its RDMALocalBlock; may be NULL. */
    GHashTable *blockmap;

    /* the RDMAContext for return path */
    struct RDMAContext *return_path;
    bool is_return_path;
} RDMAContext;
379 
/* QOM type name and declaration for the RDMA-backed QIOChannel. */
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)



/*
 * Channel object holding one RDMAContext per direction.
 * NOTE(review): presumably rdmain serves reads and rdmaout serves writes;
 * confirm against the channel's read/write handlers.
 */
struct QIOChannelRDMA {
    QIOChannel parent;
    RDMAContext *rdmain;
    RDMAContext *rdmaout;
    QEMUFile *file;
    bool blocking; /* XXX we don't actually honour this yet */
};
392 
393 /*
394  * Main structure for IB Send/Recv control messages.
395  * This gets prepended at the beginning of every Send/Recv.
396  */
/*
 * len, type and repeat are byte-swapped by control_to_network() and
 * network_to_control(); padding is never converted.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;
403 
404 static void control_to_network(RDMAControlHeader *control)
405 {
406     control->type = htonl(control->type);
407     control->len = htonl(control->len);
408     control->repeat = htonl(control->repeat);
409 }
410 
411 static void network_to_control(RDMAControlHeader *control)
412 {
413     control->type = ntohl(control->type);
414     control->len = ntohl(control->len);
415     control->repeat = ntohl(control->repeat);
416 }
417 
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 *
 * register_to_network() and network_to_register() convert the key,
 * current_index and chunks fields; padding is never converted.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index; /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;
433 
434 static bool rdma_errored(RDMAContext *rdma)
435 {
436     if (rdma->errored && !rdma->error_reported) {
437         error_report("RDMA is in an error state waiting migration"
438                      " to abort!");
439         rdma->error_reported = true;
440     }
441     return rdma->errored;
442 }
443 
444 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
445 {
446     RDMALocalBlock *local_block;
447     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
448 
449     if (local_block->is_ram_block) {
450         /*
451          * current_addr as passed in is an address in the local ram_addr_t
452          * space, we need to translate this for the destination
453          */
454         reg->key.current_addr -= local_block->offset;
455         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
456     }
457     reg->key.current_addr = htonll(reg->key.current_addr);
458     reg->current_index = htonl(reg->current_index);
459     reg->chunks = htonll(reg->chunks);
460 }
461 
462 static void network_to_register(RDMARegister *reg)
463 {
464     reg->key.current_addr = ntohll(reg->key.current_addr);
465     reg->current_index = ntohl(reg->current_index);
466     reg->chunks = ntohll(reg->chunks);
467 }
468 
/*
 * Converted to and from network byte order by compress_to_network() and
 * network_to_compress(); the sending side also rebases offset onto the
 * destination's block offset.
 */
typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;
475 
476 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
477 {
478     comp->value = htonl(comp->value);
479     /*
480      * comp->offset as passed in is an address in the local ram_addr_t
481      * space, we need to translate this for the destination
482      */
483     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
484     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
485     comp->block_idx = htonl(comp->block_idx);
486     comp->offset = htonll(comp->offset);
487     comp->length = htonll(comp->length);
488 }
489 
490 static void network_to_compress(RDMACompress *comp)
491 {
492     comp->value = ntohl(comp->value);
493     comp->block_idx = ntohl(comp->block_idx);
494     comp->offset = ntohll(comp->offset);
495     comp->length = ntohll(comp->length);
496 }
497 
498 /*
499  * The result of the dest's memory registration produces an "rkey"
500  * which the source VM must reference in order to perform
501  * the RDMA operation.
502  */
/*
 * rkey and host_addr are byte-swapped by result_to_network() and
 * network_to_result(); padding is never converted.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;
508 
509 static void result_to_network(RDMARegisterResult *result)
510 {
511     result->rkey = htonl(result->rkey);
512     result->host_addr = htonll(result->host_addr);
513 };
514 
515 static void network_to_result(RDMARegisterResult *result)
516 {
517     result->rkey = ntohl(result->rkey);
518     result->host_addr = ntohll(result->host_addr);
519 };
520 
/*
 * Forward declaration only; the definition is later in the file.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma,
                                                   Error **errp),
                                   Error **errp);
527 
528 static inline uint64_t ram_chunk_index(const uint8_t *start,
529                                        const uint8_t *host)
530 {
531     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
532 }
533 
534 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
535                                        uint64_t i)
536 {
537     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
538                                   (i << RDMA_REG_CHUNK_SHIFT));
539 }
540 
541 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
542                                      uint64_t i)
543 {
544     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
545                                          (1UL << RDMA_REG_CHUNK_SHIFT);
546 
547     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
548         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
549     }
550 
551     return result;
552 }
553 
554 static void rdma_add_block(RDMAContext *rdma, const char *block_name,
555                            void *host_addr,
556                            ram_addr_t block_offset, uint64_t length)
557 {
558     RDMALocalBlocks *local = &rdma->local_ram_blocks;
559     RDMALocalBlock *block;
560     RDMALocalBlock *old = local->block;
561 
562     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
563 
564     if (local->nb_blocks) {
565         if (rdma->blockmap) {
566             for (int x = 0; x < local->nb_blocks; x++) {
567                 g_hash_table_remove(rdma->blockmap,
568                                     (void *)(uintptr_t)old[x].offset);
569                 g_hash_table_insert(rdma->blockmap,
570                                     (void *)(uintptr_t)old[x].offset,
571                                     &local->block[x]);
572             }
573         }
574         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
575         g_free(old);
576     }
577 
578     block = &local->block[local->nb_blocks];
579 
580     block->block_name = g_strdup(block_name);
581     block->local_host_addr = host_addr;
582     block->offset = block_offset;
583     block->length = length;
584     block->index = local->nb_blocks;
585     block->src_index = ~0U; /* Filled in by the receipt of the block list */
586     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
587     block->transit_bitmap = bitmap_new(block->nb_chunks);
588     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
589     block->unregister_bitmap = bitmap_new(block->nb_chunks);
590     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
591     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
592 
593     block->is_ram_block = local->init ? false : true;
594 
595     if (rdma->blockmap) {
596         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
597     }
598 
599     trace_rdma_add_block(block_name, local->nb_blocks,
600                          (uintptr_t) block->local_host_addr,
601                          block->offset, block->length,
602                          (uintptr_t) (block->local_host_addr + block->length),
603                          BITS_TO_LONGS(block->nb_chunks) *
604                              sizeof(unsigned long) * 8,
605                          block->nb_chunks);
606 
607     local->nb_blocks++;
608 }
609 
610 /*
611  * Memory regions need to be registered with the device and queue pairs setup
612  * in advanced before the migration starts. This tells us where the RAM blocks
613  * are so that we can register them individually.
614  */
615 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
616 {
617     const char *block_name = qemu_ram_get_idstr(rb);
618     void *host_addr = qemu_ram_get_host_addr(rb);
619     ram_addr_t block_offset = qemu_ram_get_offset(rb);
620     ram_addr_t length = qemu_ram_get_used_length(rb);
621     rdma_add_block(opaque, block_name, host_addr, block_offset, length);
622     return 0;
623 }
624 
625 /*
626  * Identify the RAMBlocks and their quantity. They will be references to
627  * identify chunk boundaries inside each RAMBlock and also be referenced
628  * during dynamic page registration.
629  */
630 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
631 {
632     RDMALocalBlocks *local = &rdma->local_ram_blocks;
633     int ret;
634 
635     assert(rdma->blockmap == NULL);
636     memset(local, 0, sizeof *local);
637     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
638     assert(!ret);
639     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
640     rdma->dest_blocks = g_new0(RDMADestBlock,
641                                rdma->local_ram_blocks.nb_blocks);
642     local->init = true;
643 }
644 
645 /*
646  * Note: If used outside of cleanup, the caller must ensure that the destination
647  * block structures are also updated
648  */
649 static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
650 {
651     RDMALocalBlocks *local = &rdma->local_ram_blocks;
652     RDMALocalBlock *old = local->block;
653 
654     if (rdma->blockmap) {
655         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
656     }
657     if (block->pmr) {
658         for (int j = 0; j < block->nb_chunks; j++) {
659             if (!block->pmr[j]) {
660                 continue;
661             }
662             ibv_dereg_mr(block->pmr[j]);
663             rdma->total_registrations--;
664         }
665         g_free(block->pmr);
666         block->pmr = NULL;
667     }
668 
669     if (block->mr) {
670         ibv_dereg_mr(block->mr);
671         rdma->total_registrations--;
672         block->mr = NULL;
673     }
674 
675     g_free(block->transit_bitmap);
676     block->transit_bitmap = NULL;
677 
678     g_free(block->unregister_bitmap);
679     block->unregister_bitmap = NULL;
680 
681     g_free(block->remote_keys);
682     block->remote_keys = NULL;
683 
684     g_free(block->block_name);
685     block->block_name = NULL;
686 
687     if (rdma->blockmap) {
688         for (int x = 0; x < local->nb_blocks; x++) {
689             g_hash_table_remove(rdma->blockmap,
690                                 (void *)(uintptr_t)old[x].offset);
691         }
692     }
693 
694     if (local->nb_blocks > 1) {
695 
696         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
697 
698         if (block->index) {
699             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
700         }
701 
702         if (block->index < (local->nb_blocks - 1)) {
703             memcpy(local->block + block->index, old + (block->index + 1),
704                 sizeof(RDMALocalBlock) *
705                     (local->nb_blocks - (block->index + 1)));
706             for (int x = block->index; x < local->nb_blocks - 1; x++) {
707                 local->block[x].index--;
708             }
709         }
710     } else {
711         assert(block == local->block);
712         local->block = NULL;
713     }
714 
715     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
716                            block->offset, block->length,
717                             (uintptr_t)(block->local_host_addr + block->length),
718                            BITS_TO_LONGS(block->nb_chunks) *
719                                sizeof(unsigned long) * 8, block->nb_chunks);
720 
721     g_free(old);
722 
723     local->nb_blocks--;
724 
725     if (local->nb_blocks && rdma->blockmap) {
726         for (int x = 0; x < local->nb_blocks; x++) {
727             g_hash_table_insert(rdma->blockmap,
728                                 (void *)(uintptr_t)local->block[x].offset,
729                                 &local->block[x]);
730         }
731     }
732 }
733 
734 /*
735  * Trace RDMA device open, with device details.
736  */
737 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
738 {
739     struct ibv_port_attr port;
740 
741     if (ibv_query_port(verbs, 1, &port)) {
742         trace_qemu_rdma_dump_id_failed(who);
743         return;
744     }
745 
746     trace_qemu_rdma_dump_id(who,
747                 verbs->device->name,
748                 verbs->device->dev_name,
749                 verbs->device->dev_path,
750                 verbs->device->ibdev_path,
751                 port.link_layer,
752                 port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband"
753                 : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet"
754                 : "Unknown");
755 }
756 
757 /*
758  * Trace RDMA gid addressing information.
759  * Useful for understanding the RDMA device hierarchy in the kernel.
760  */
761 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
762 {
763     char sgid[33];
764     char dgid[33];
765     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
766     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
767     trace_qemu_rdma_dump_gid(who, sgid, dgid);
768 }
769 
770 /*
771  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
772  * We will try the next addrinfo struct, and fail if there are
773  * no other valid addresses to bind against.
774  *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
777  *
778  * In this case, the source VM will throw an error for ALL types of
779  * connections (both IPv4 and IPv6) if the destination machine does not have
780  * a regular infiniband network available for use.
781  *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
785  *
786  * Unfortunately, this puts the user in a fix:
787  *
788  *  If the source VM connects with an IPv4 address without knowing that the
789  *  destination has bound to '[::]' the migration will unconditionally fail
790  *  unless the management software is explicitly listening on the IPv4
791  *  address while using a RoCE-based device.
792  *
793  *  If the source VM connects with an IPv6 address, then we're OK because we can
794  *  throw an error on the source (and similarly on the destination).
795  *
796  *  But in mixed environments, this will be broken for a while until it is fixed
797  *  inside linux.
798  *
799  * We do provide a *tiny* bit of help in this function: We can list all of the
800  * devices in the system and check to see if all the devices are RoCE or
801  * Infiniband.
802  *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
806  *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
810  *
811  * Patches are being reviewed on linux-rdma.
812  */
813 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
814 {
815     /* This bug only exists in linux, to our knowledge. */
816 #ifdef CONFIG_LINUX
817     struct ibv_port_attr port_attr;
818 
819     /*
820      * Verbs are only NULL if management has bound to '[::]'.
821      *
822      * Let's iterate through all the devices and see if there any pure IB
823      * devices (non-ethernet).
824      *
825      * If not, then we can safely proceed with the migration.
826      * Otherwise, there are no guarantees until the bug is fixed in linux.
827      */
828     if (!verbs) {
829         int num_devices;
830         struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
831         bool roce_found = false;
832         bool ib_found = false;
833 
834         for (int x = 0; x < num_devices; x++) {
835             verbs = ibv_open_device(dev_list[x]);
836             /*
837              * ibv_open_device() is not documented to set errno.  If
838              * it does, it's somebody else's doc bug.  If it doesn't,
839              * the use of errno below is wrong.
840              * TODO Find out whether ibv_open_device() sets errno.
841              */
842             if (!verbs) {
843                 if (errno == EPERM) {
844                     continue;
845                 } else {
846                     error_setg_errno(errp, errno,
847                                      "could not open RDMA device context");
848                     return -1;
849                 }
850             }
851 
852             if (ibv_query_port(verbs, 1, &port_attr)) {
853                 ibv_close_device(verbs);
854                 error_setg(errp,
855                            "RDMA ERROR: Could not query initial IB port");
856                 return -1;
857             }
858 
859             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
860                 ib_found = true;
861             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
862                 roce_found = true;
863             }
864 
865             ibv_close_device(verbs);
866 
867         }
868 
869         if (roce_found) {
870             if (ib_found) {
871                 warn_report("migrations may fail:"
872                             " IPv6 over RoCE / iWARP in linux"
873                             " is broken. But since you appear to have a"
874                             " mixed RoCE / IB environment, be sure to only"
875                             " migrate over the IB fabric until the kernel "
876                             " fixes the bug.");
877             } else {
878                 error_setg(errp, "RDMA ERROR: "
879                            "You only have RoCE / iWARP devices in your systems"
880                            " and your management software has specified '[::]'"
881                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
882                 return -1;
883             }
884         }
885 
886         return 0;
887     }
888 
889     /*
890      * If we have a verbs context, that means that some other than '[::]' was
891      * used by the management software for binding. In which case we can
892      * actually warn the user about a potentially broken kernel.
893      */
894 
895     /* IB ports start with 1, not 0 */
896     if (ibv_query_port(verbs, 1, &port_attr)) {
897         error_setg(errp, "RDMA ERROR: Could not query initial IB port");
898         return -1;
899     }
900 
901     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
902         error_setg(errp, "RDMA ERROR: "
903                    "Linux kernel's RoCE / iWARP does not support IPv6 "
904                    "(but patches on linux-rdma in progress)");
905         return -1;
906     }
907 
908 #endif
909 
910     return 0;
911 }
912 
913 /*
914  * Figure out which RDMA device corresponds to the requested IP hostname
915  * Also create the initial connection manager identifiers for opening
916  * the connection.
917  */
918 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
919 {
920     Error *err = NULL;
921     int ret;
922     struct rdma_addrinfo *res;
923     char port_str[16];
924     struct rdma_cm_event *cm_event;
925     char ip[40] = "unknown";
926 
927     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
928         error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
929         return -1;
930     }
931 
932     /* create CM channel */
933     rdma->channel = rdma_create_event_channel();
934     if (!rdma->channel) {
935         error_setg(errp, "RDMA ERROR: could not create CM channel");
936         return -1;
937     }
938 
939     /* create CM id */
940     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
941     if (ret < 0) {
942         error_setg(errp, "RDMA ERROR: could not create channel id");
943         goto err_resolve_create_id;
944     }
945 
946     snprintf(port_str, 16, "%d", rdma->port);
947     port_str[15] = '\0';
948 
949     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
950     if (ret) {
951         error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
952                    rdma->host);
953         goto err_resolve_get_addr;
954     }
955 
956     /* Try all addresses, saving the first error in @err */
957     for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) {
958         Error **local_errp = err ? NULL : &err;
959 
960         inet_ntop(e->ai_family,
961             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
962         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
963 
964         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
965                 RDMA_RESOLVE_TIMEOUT_MS);
966         if (ret >= 0) {
967             if (e->ai_family == AF_INET6) {
968                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs,
969                                                    local_errp);
970                 if (ret < 0) {
971                     continue;
972                 }
973             }
974             error_free(err);
975             goto route;
976         }
977     }
978 
979     rdma_freeaddrinfo(res);
980     if (err) {
981         error_propagate(errp, err);
982     } else {
983         error_setg(errp, "RDMA ERROR: could not resolve address %s",
984                    rdma->host);
985     }
986     goto err_resolve_get_addr;
987 
988 route:
989     rdma_freeaddrinfo(res);
990     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
991 
992     ret = rdma_get_cm_event(rdma->channel, &cm_event);
993     if (ret < 0) {
994         error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
995         goto err_resolve_get_addr;
996     }
997 
998     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
999         error_setg(errp,
1000                    "RDMA ERROR: result not equal to event_addr_resolved %s",
1001                    rdma_event_str(cm_event->event));
1002         rdma_ack_cm_event(cm_event);
1003         goto err_resolve_get_addr;
1004     }
1005     rdma_ack_cm_event(cm_event);
1006 
1007     /* resolve route */
1008     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1009     if (ret < 0) {
1010         error_setg(errp, "RDMA ERROR: could not resolve rdma route");
1011         goto err_resolve_get_addr;
1012     }
1013 
1014     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1015     if (ret < 0) {
1016         error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
1017         goto err_resolve_get_addr;
1018     }
1019     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1020         error_setg(errp, "RDMA ERROR: "
1021                    "result not equal to event_route_resolved: %s",
1022                    rdma_event_str(cm_event->event));
1023         rdma_ack_cm_event(cm_event);
1024         goto err_resolve_get_addr;
1025     }
1026     rdma_ack_cm_event(cm_event);
1027     rdma->verbs = rdma->cm_id->verbs;
1028     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1029     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1030     return 0;
1031 
1032 err_resolve_get_addr:
1033     rdma_destroy_id(rdma->cm_id);
1034     rdma->cm_id = NULL;
1035 err_resolve_create_id:
1036     rdma_destroy_event_channel(rdma->channel);
1037     rdma->channel = NULL;
1038     return -1;
1039 }
1040 
1041 /*
1042  * Create protection domain and completion queues
1043  */
1044 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
1045 {
1046     /* allocate pd */
1047     rdma->pd = ibv_alloc_pd(rdma->verbs);
1048     if (!rdma->pd) {
1049         error_setg(errp, "failed to allocate protection domain");
1050         return -1;
1051     }
1052 
1053     /* create receive completion channel */
1054     rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1055     if (!rdma->recv_comp_channel) {
1056         error_setg(errp, "failed to allocate receive completion channel");
1057         goto err_alloc_pd_cq;
1058     }
1059 
1060     /*
1061      * Completion queue can be filled by read work requests.
1062      */
1063     rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1064                                   NULL, rdma->recv_comp_channel, 0);
1065     if (!rdma->recv_cq) {
1066         error_setg(errp, "failed to allocate receive completion queue");
1067         goto err_alloc_pd_cq;
1068     }
1069 
1070     /* create send completion channel */
1071     rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1072     if (!rdma->send_comp_channel) {
1073         error_setg(errp, "failed to allocate send completion channel");
1074         goto err_alloc_pd_cq;
1075     }
1076 
1077     rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1078                                   NULL, rdma->send_comp_channel, 0);
1079     if (!rdma->send_cq) {
1080         error_setg(errp, "failed to allocate send completion queue");
1081         goto err_alloc_pd_cq;
1082     }
1083 
1084     return 0;
1085 
1086 err_alloc_pd_cq:
1087     if (rdma->pd) {
1088         ibv_dealloc_pd(rdma->pd);
1089     }
1090     if (rdma->recv_comp_channel) {
1091         ibv_destroy_comp_channel(rdma->recv_comp_channel);
1092     }
1093     if (rdma->send_comp_channel) {
1094         ibv_destroy_comp_channel(rdma->send_comp_channel);
1095     }
1096     if (rdma->recv_cq) {
1097         ibv_destroy_cq(rdma->recv_cq);
1098         rdma->recv_cq = NULL;
1099     }
1100     rdma->pd = NULL;
1101     rdma->recv_comp_channel = NULL;
1102     rdma->send_comp_channel = NULL;
1103     return -1;
1104 
1105 }
1106 
1107 /*
1108  * Create queue pairs.
1109  */
1110 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1111 {
1112     struct ibv_qp_init_attr attr = { 0 };
1113 
1114     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1115     attr.cap.max_recv_wr = 3;
1116     attr.cap.max_send_sge = 1;
1117     attr.cap.max_recv_sge = 1;
1118     attr.send_cq = rdma->send_cq;
1119     attr.recv_cq = rdma->recv_cq;
1120     attr.qp_type = IBV_QPT_RC;
1121 
1122     if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) {
1123         return -1;
1124     }
1125 
1126     rdma->qp = rdma->cm_id->qp;
1127     return 0;
1128 }
1129 
1130 /* Check whether On-Demand Paging is supported by RDAM device */
1131 static bool rdma_support_odp(struct ibv_context *dev)
1132 {
1133     struct ibv_device_attr_ex attr = {0};
1134 
1135     if (ibv_query_device_ex(dev, NULL, &attr)) {
1136         return false;
1137     }
1138 
1139     if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1140         return true;
1141     }
1142 
1143     return false;
1144 }
1145 
/*
 * Use ibv_advise_mr() to avoid RNR NAK errors as far as possible.
 * A responder whose MR is registered with ODP will send an RNR NAK back
 * to the requester when it hits a page fault.
 *
 * @wr selects the write-prefetch advice instead of the plain prefetch
 * advice.  The result of the call is only traced, never acted upon.
 * Without HAVE_IBV_ADVISE_MR this function does nothing.
 */
static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
                                         uint32_t len,  uint32_t lkey,
                                         const char *name, bool wr)
{
#ifdef HAVE_IBV_ADVISE_MR
    struct ibv_sge sge = {
        .addr = addr,
        .length = len,
        .lkey = lkey,
    };
    int advice;
    int rc;

    if (wr) {
        advice = IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE;
    } else {
        advice = IBV_ADVISE_MR_ADVICE_PREFETCH;
    }

    rc = ibv_advise_mr(pd, advice, IBV_ADVISE_MR_FLAG_FLUSH, &sge, 1);
    /* best effort: a failure is traced but otherwise ignored */
    trace_qemu_rdma_advise_mr(name, len, addr, strerror(rc));
#endif
}
1167 
1168 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
1169 {
1170     int i;
1171     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1172 
1173     for (i = 0; i < local->nb_blocks; i++) {
1174         int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1175 
1176         local->block[i].mr =
1177             ibv_reg_mr(rdma->pd,
1178                     local->block[i].local_host_addr,
1179                     local->block[i].length, access
1180                     );
1181         /*
1182          * ibv_reg_mr() is not documented to set errno.  If it does,
1183          * it's somebody else's doc bug.  If it doesn't, the use of
1184          * errno below is wrong.
1185          * TODO Find out whether ibv_reg_mr() sets errno.
1186          */
1187         if (!local->block[i].mr &&
1188             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1189                 access |= IBV_ACCESS_ON_DEMAND;
1190                 /* register ODP mr */
1191                 local->block[i].mr =
1192                     ibv_reg_mr(rdma->pd,
1193                                local->block[i].local_host_addr,
1194                                local->block[i].length, access);
1195                 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1196 
1197                 if (local->block[i].mr) {
1198                     qemu_rdma_advise_prefetch_mr(rdma->pd,
1199                                     (uintptr_t)local->block[i].local_host_addr,
1200                                     local->block[i].length,
1201                                     local->block[i].mr->lkey,
1202                                     local->block[i].block_name,
1203                                     true);
1204                 }
1205         }
1206 
1207         if (!local->block[i].mr) {
1208             error_setg_errno(errp, errno,
1209                              "Failed to register local dest ram block!");
1210             goto err;
1211         }
1212         rdma->total_registrations++;
1213     }
1214 
1215     return 0;
1216 
1217 err:
1218     for (i--; i >= 0; i--) {
1219         ibv_dereg_mr(local->block[i].mr);
1220         local->block[i].mr = NULL;
1221         rdma->total_registrations--;
1222     }
1223 
1224     return -1;
1225 
1226 }
1227 
1228 /*
1229  * Find the ram block that corresponds to the page requested to be
1230  * transmitted by QEMU.
1231  *
1232  * Once the block is found, also identify which 'chunk' within that
1233  * block that the page belongs to.
1234  */
1235 static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1236                                        uintptr_t block_offset,
1237                                        uint64_t offset,
1238                                        uint64_t length,
1239                                        uint64_t *block_index,
1240                                        uint64_t *chunk_index)
1241 {
1242     uint64_t current_addr = block_offset + offset;
1243     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1244                                                 (void *) block_offset);
1245     assert(block);
1246     assert(current_addr >= block->offset);
1247     assert((current_addr + length) <= (block->offset + block->length));
1248 
1249     *block_index = block->index;
1250     *chunk_index = ram_chunk_index(block->local_host_addr,
1251                 block->local_host_addr + (current_addr - block->offset));
1252 }
1253 
1254 /*
1255  * Register a chunk with IB. If the chunk was already registered
1256  * previously, then skip.
1257  *
1258  * Also return the keys associated with the registration needed
1259  * to perform the actual RDMA operation.
1260  */
1261 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1262         RDMALocalBlock *block, uintptr_t host_addr,
1263         uint32_t *lkey, uint32_t *rkey, int chunk,
1264         uint8_t *chunk_start, uint8_t *chunk_end)
1265 {
1266     if (block->mr) {
1267         if (lkey) {
1268             *lkey = block->mr->lkey;
1269         }
1270         if (rkey) {
1271             *rkey = block->mr->rkey;
1272         }
1273         return 0;
1274     }
1275 
1276     /* allocate memory to store chunk MRs */
1277     if (!block->pmr) {
1278         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1279     }
1280 
1281     /*
1282      * If 'rkey', then we're the destination, so grant access to the source.
1283      *
1284      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1285      */
1286     if (!block->pmr[chunk]) {
1287         uint64_t len = chunk_end - chunk_start;
1288         int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1289                      0;
1290 
1291         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1292 
1293         block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1294         /*
1295          * ibv_reg_mr() is not documented to set errno.  If it does,
1296          * it's somebody else's doc bug.  If it doesn't, the use of
1297          * errno below is wrong.
1298          * TODO Find out whether ibv_reg_mr() sets errno.
1299          */
1300         if (!block->pmr[chunk] &&
1301             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1302             access |= IBV_ACCESS_ON_DEMAND;
1303             /* register ODP mr */
1304             block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1305             trace_qemu_rdma_register_odp_mr(block->block_name);
1306 
1307             if (block->pmr[chunk]) {
1308                 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1309                                             len, block->pmr[chunk]->lkey,
1310                                             block->block_name, rkey);
1311 
1312             }
1313         }
1314     }
1315     if (!block->pmr[chunk]) {
1316         return -1;
1317     }
1318     rdma->total_registrations++;
1319 
1320     if (lkey) {
1321         *lkey = block->pmr[chunk]->lkey;
1322     }
1323     if (rkey) {
1324         *rkey = block->pmr[chunk]->rkey;
1325     }
1326     return 0;
1327 }
1328 
1329 /*
1330  * Register (at connection time) the memory used for control
1331  * channel messages.
1332  */
1333 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1334 {
1335     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1336             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1337             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1338     if (rdma->wr_data[idx].control_mr) {
1339         rdma->total_registrations++;
1340         return 0;
1341     }
1342     return -1;
1343 }
1344 
1345 /*
1346  * Perform a non-optimized memory unregistration after every transfer
1347  * for demonstration purposes, only if pin-all is not requested.
1348  *
1349  * Potential optimizations:
1350  * 1. Start a new thread to run this function continuously
1351         - for bit clearing
1352         - and for receipt of unregister messages
1353  * 2. Use an LRU.
1354  * 3. Use workload hints.
1355  */
1356 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1357 {
1358     Error *err = NULL;
1359 
1360     while (rdma->unregistrations[rdma->unregister_current]) {
1361         int ret;
1362         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1363         uint64_t chunk =
1364             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1365         uint64_t index =
1366             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1367         RDMALocalBlock *block =
1368             &(rdma->local_ram_blocks.block[index]);
1369         RDMARegister reg = { .current_index = index };
1370         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1371                                  };
1372         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1373                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1374                                    .repeat = 1,
1375                                  };
1376 
1377         trace_qemu_rdma_unregister_waiting_proc(chunk,
1378                                                 rdma->unregister_current);
1379 
1380         rdma->unregistrations[rdma->unregister_current] = 0;
1381         rdma->unregister_current++;
1382 
1383         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1384             rdma->unregister_current = 0;
1385         }
1386 
1387 
1388         /*
1389          * Unregistration is speculative (because migration is single-threaded
1390          * and we cannot break the protocol's inifinband message ordering).
1391          * Thus, if the memory is currently being used for transmission,
1392          * then abort the attempt to unregister and try again
1393          * later the next time a completion is received for this memory.
1394          */
1395         clear_bit(chunk, block->unregister_bitmap);
1396 
1397         if (test_bit(chunk, block->transit_bitmap)) {
1398             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1399             continue;
1400         }
1401 
1402         trace_qemu_rdma_unregister_waiting_send(chunk);
1403 
1404         ret = ibv_dereg_mr(block->pmr[chunk]);
1405         block->pmr[chunk] = NULL;
1406         block->remote_keys[chunk] = 0;
1407 
1408         if (ret != 0) {
1409             error_report("unregistration chunk failed: %s",
1410                          strerror(ret));
1411             return -1;
1412         }
1413         rdma->total_registrations--;
1414 
1415         reg.key.chunk = chunk;
1416         register_to_network(rdma, &reg);
1417         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1418                                       &resp, NULL, NULL, &err);
1419         if (ret < 0) {
1420             error_report_err(err);
1421             return -1;
1422         }
1423 
1424         trace_qemu_rdma_unregister_waiting_complete(chunk);
1425     }
1426 
1427     return 0;
1428 }
1429 
1430 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1431                                          uint64_t chunk)
1432 {
1433     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1434 
1435     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1436     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1437 
1438     return result;
1439 }
1440 
1441 /*
1442  * Consult the connection manager to see a work request
1443  * (of any kind) has completed.
1444  * Return the work request ID that completed.
1445  */
1446 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1447                           uint64_t *wr_id_out, uint32_t *byte_len)
1448 {
1449     int ret;
1450     struct ibv_wc wc;
1451     uint64_t wr_id;
1452 
1453     ret = ibv_poll_cq(cq, 1, &wc);
1454 
1455     if (!ret) {
1456         *wr_id_out = RDMA_WRID_NONE;
1457         return 0;
1458     }
1459 
1460     if (ret < 0) {
1461         return -1;
1462     }
1463 
1464     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1465 
1466     if (wc.status != IBV_WC_SUCCESS) {
1467         return -1;
1468     }
1469 
1470     if (rdma->control_ready_expected &&
1471         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1472         trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1473                                   rdma->nb_sent);
1474         rdma->control_ready_expected = 0;
1475     }
1476 
1477     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1478         uint64_t chunk =
1479             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1480         uint64_t index =
1481             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1482         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1483 
1484         trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1485                                    index, chunk, block->local_host_addr,
1486                                    (void *)(uintptr_t)block->remote_host_addr);
1487 
1488         clear_bit(chunk, block->transit_bitmap);
1489 
1490         if (rdma->nb_sent > 0) {
1491             rdma->nb_sent--;
1492         }
1493     } else {
1494         trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1495     }
1496 
1497     *wr_id_out = wc.wr_id;
1498     if (byte_len) {
1499         *byte_len = wc.byte_len;
1500     }
1501 
1502     return  0;
1503 }
1504 
1505 /* Wait for activity on the completion channel.
1506  * Returns 0 on success, none-0 on error.
1507  */
1508 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1509                                        struct ibv_comp_channel *comp_channel)
1510 {
1511     struct rdma_cm_event *cm_event;
1512 
1513     /*
1514      * Coroutine doesn't start until migration_fd_process_incoming()
1515      * so don't yield unless we know we're running inside of a coroutine.
1516      */
1517     if (rdma->migration_started_on_destination &&
1518         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1519         yield_until_fd_readable(comp_channel->fd);
1520     } else {
1521         /* This is the source side, we're in a separate thread
1522          * or destination prior to migration_fd_process_incoming()
1523          * after postcopy, the destination also in a separate thread.
1524          * we can't yield; so we have to poll the fd.
1525          * But we need to be able to handle 'cancel' or an error
1526          * without hanging forever.
1527          */
1528         while (!rdma->errored && !rdma->received_error) {
1529             GPollFD pfds[2];
1530             pfds[0].fd = comp_channel->fd;
1531             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1532             pfds[0].revents = 0;
1533 
1534             pfds[1].fd = rdma->channel->fd;
1535             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1536             pfds[1].revents = 0;
1537 
1538             /* 0.1s timeout, should be fine for a 'cancel' */
1539             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1540             case 2:
1541             case 1: /* fd active */
1542                 if (pfds[0].revents) {
1543                     return 0;
1544                 }
1545 
1546                 if (pfds[1].revents) {
1547                     if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
1548                         return -1;
1549                     }
1550 
1551                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1552                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1553                         rdma_ack_cm_event(cm_event);
1554                         return -1;
1555                     }
1556                     rdma_ack_cm_event(cm_event);
1557                 }
1558                 break;
1559 
1560             case 0: /* Timeout, go around again */
1561                 break;
1562 
1563             default: /* Error of some type -
1564                       * I don't trust errno from qemu_poll_ns
1565                      */
1566                 return -1;
1567             }
1568 
1569             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1570                 /* Bail out and let the cancellation happen */
1571                 return -1;
1572             }
1573         }
1574     }
1575 
1576     if (rdma->received_error) {
1577         return -1;
1578     }
1579     return -rdma->errored;
1580 }
1581 
1582 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
1583 {
1584     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1585            rdma->recv_comp_channel;
1586 }
1587 
1588 static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
1589 {
1590     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1591 }
1592 
1593 /*
1594  * Block until the next work request has completed.
1595  *
1596  * First poll to see if a work request has already completed,
1597  * otherwise block.
1598  *
1599  * If we encounter completed work requests for IDs other than
1600  * the one we're interested in, then that's generally an error.
1601  *
1602  * The only exception is actual RDMA Write completions. These
1603  * completions only need to be recorded, but do not actually
1604  * need further processing.
1605  */
1606 static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1607                                     uint64_t wrid_requested,
1608                                     uint32_t *byte_len)
1609 {
1610     int num_cq_events = 0, ret;
1611     struct ibv_cq *cq;
1612     void *cq_ctx;
1613     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1614     struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1615     struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1616 
1617     if (ibv_req_notify_cq(poll_cq, 0)) {
1618         return -1;
1619     }
1620     /* poll cq first */
1621     while (wr_id != wrid_requested) {
1622         ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1623         if (ret < 0) {
1624             return -1;
1625         }
1626 
1627         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1628 
1629         if (wr_id == RDMA_WRID_NONE) {
1630             break;
1631         }
1632         if (wr_id != wrid_requested) {
1633             trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1634         }
1635     }
1636 
1637     if (wr_id == wrid_requested) {
1638         return 0;
1639     }
1640 
1641     while (1) {
1642         ret = qemu_rdma_wait_comp_channel(rdma, ch);
1643         if (ret < 0) {
1644             goto err_block_for_wrid;
1645         }
1646 
1647         ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1648         if (ret < 0) {
1649             goto err_block_for_wrid;
1650         }
1651 
1652         num_cq_events++;
1653 
1654         if (ibv_req_notify_cq(cq, 0)) {
1655             goto err_block_for_wrid;
1656         }
1657 
1658         while (wr_id != wrid_requested) {
1659             ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1660             if (ret < 0) {
1661                 goto err_block_for_wrid;
1662             }
1663 
1664             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1665 
1666             if (wr_id == RDMA_WRID_NONE) {
1667                 break;
1668             }
1669             if (wr_id != wrid_requested) {
1670                 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1671             }
1672         }
1673 
1674         if (wr_id == wrid_requested) {
1675             goto success_block_for_wrid;
1676         }
1677     }
1678 
1679 success_block_for_wrid:
1680     if (num_cq_events) {
1681         ibv_ack_cq_events(cq, num_cq_events);
1682     }
1683     return 0;
1684 
1685 err_block_for_wrid:
1686     if (num_cq_events) {
1687         ibv_ack_cq_events(cq, num_cq_events);
1688     }
1689 
1690     rdma->errored = true;
1691     return -1;
1692 }
1693 
1694 /*
1695  * Post a SEND message work request for the control channel
1696  * containing some data and block until the post completes.
1697  */
1698 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1699                                        RDMAControlHeader *head,
1700                                        Error **errp)
1701 {
1702     int ret;
1703     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1704     struct ibv_send_wr *bad_wr;
1705     struct ibv_sge sge = {
1706                            .addr = (uintptr_t)(wr->control),
1707                            .length = head->len + sizeof(RDMAControlHeader),
1708                            .lkey = wr->control_mr->lkey,
1709                          };
1710     struct ibv_send_wr send_wr = {
1711                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1712                                    .opcode = IBV_WR_SEND,
1713                                    .send_flags = IBV_SEND_SIGNALED,
1714                                    .sg_list = &sge,
1715                                    .num_sge = 1,
1716                                 };
1717 
1718     trace_qemu_rdma_post_send_control(control_desc(head->type));
1719 
1720     /*
1721      * We don't actually need to do a memcpy() in here if we used
1722      * the "sge" properly, but since we're only sending control messages
1723      * (not RAM in a performance-critical path), then its OK for now.
1724      *
1725      * The copy makes the RDMAControlHeader simpler to manipulate
1726      * for the time being.
1727      */
1728     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1729     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1730     control_to_network((void *) wr->control);
1731 
1732     if (buf) {
1733         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1734     }
1735 
1736 
1737     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1738 
1739     if (ret > 0) {
1740         error_setg(errp, "Failed to use post IB SEND for control");
1741         return -1;
1742     }
1743 
1744     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1745     if (ret < 0) {
1746         error_setg(errp, "rdma migration: send polling control error");
1747         return -1;
1748     }
1749 
1750     return 0;
1751 }
1752 
1753 /*
1754  * Post a RECV work request in anticipation of some future receipt
1755  * of data on the control channel.
1756  */
1757 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
1758                                        Error **errp)
1759 {
1760     struct ibv_recv_wr *bad_wr;
1761     struct ibv_sge sge = {
1762                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1763                             .length = RDMA_CONTROL_MAX_BUFFER,
1764                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1765                          };
1766 
1767     struct ibv_recv_wr recv_wr = {
1768                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1769                                     .sg_list = &sge,
1770                                     .num_sge = 1,
1771                                  };
1772 
1773 
1774     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1775         error_setg(errp, "error posting control recv");
1776         return -1;
1777     }
1778 
1779     return 0;
1780 }
1781 
1782 /*
1783  * Block and wait for a RECV control channel message to arrive.
1784  */
1785 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1786                 RDMAControlHeader *head, uint32_t expecting, int idx,
1787                 Error **errp)
1788 {
1789     uint32_t byte_len;
1790     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1791                                        &byte_len);
1792 
1793     if (ret < 0) {
1794         error_setg(errp, "rdma migration: recv polling control error!");
1795         return -1;
1796     }
1797 
1798     network_to_control((void *) rdma->wr_data[idx].control);
1799     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1800 
1801     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1802 
1803     if (expecting == RDMA_CONTROL_NONE) {
1804         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1805                                              head->type);
1806     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1807         error_setg(errp, "Was expecting a %s (%d) control message"
1808                 ", but got: %s (%d), length: %d",
1809                 control_desc(expecting), expecting,
1810                 control_desc(head->type), head->type, head->len);
1811         if (head->type == RDMA_CONTROL_ERROR) {
1812             rdma->received_error = true;
1813         }
1814         return -1;
1815     }
1816     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1817         error_setg(errp, "too long length: %d", head->len);
1818         return -1;
1819     }
1820     if (sizeof(*head) + head->len != byte_len) {
1821         error_setg(errp, "Malformed length: %d byte_len %d",
1822                    head->len, byte_len);
1823         return -1;
1824     }
1825 
1826     return 0;
1827 }
1828 
1829 /*
1830  * When a RECV work request has completed, the work request's
1831  * buffer is pointed at the header.
1832  *
1833  * This will advance the pointer to the data portion
1834  * of the control message of the work request's buffer that
1835  * was populated after the work request finished.
1836  */
1837 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1838                                   RDMAControlHeader *head)
1839 {
1840     rdma->wr_data[idx].control_len = head->len;
1841     rdma->wr_data[idx].control_curr =
1842         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1843 }
1844 
1845 /*
1846  * This is an 'atomic' high-level operation to deliver a single, unified
1847  * control-channel message.
1848  *
1849  * Additionally, if the user is expecting some kind of reply to this message,
1850  * they can request a 'resp' response message be filled in by posting an
1851  * additional work request on behalf of the user and waiting for an additional
1852  * completion.
1853  *
1854  * The extra (optional) response is used during registration to us from having
1855  * to perform an *additional* exchange of message just to provide a response by
1856  * instead piggy-backing on the acknowledgement.
1857  */
1858 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1859                                    uint8_t *data, RDMAControlHeader *resp,
1860                                    int *resp_idx,
1861                                    int (*callback)(RDMAContext *rdma,
1862                                                    Error **errp),
1863                                    Error **errp)
1864 {
1865     int ret;
1866 
1867     /*
1868      * Wait until the dest is ready before attempting to deliver the message
1869      * by waiting for a READY message.
1870      */
1871     if (rdma->control_ready_expected) {
1872         RDMAControlHeader resp_ignored;
1873 
1874         ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1875                                               RDMA_CONTROL_READY,
1876                                               RDMA_WRID_READY, errp);
1877         if (ret < 0) {
1878             return -1;
1879         }
1880     }
1881 
1882     /*
1883      * If the user is expecting a response, post a WR in anticipation of it.
1884      */
1885     if (resp) {
1886         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
1887         if (ret < 0) {
1888             return -1;
1889         }
1890     }
1891 
1892     /*
1893      * Post a WR to replace the one we just consumed for the READY message.
1894      */
1895     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1896     if (ret < 0) {
1897         return -1;
1898     }
1899 
1900     /*
1901      * Deliver the control message that was requested.
1902      */
1903     ret = qemu_rdma_post_send_control(rdma, data, head, errp);
1904 
1905     if (ret < 0) {
1906         return -1;
1907     }
1908 
1909     /*
1910      * If we're expecting a response, block and wait for it.
1911      */
1912     if (resp) {
1913         if (callback) {
1914             trace_qemu_rdma_exchange_send_issue_callback();
1915             ret = callback(rdma, errp);
1916             if (ret < 0) {
1917                 return -1;
1918             }
1919         }
1920 
1921         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1922         ret = qemu_rdma_exchange_get_response(rdma, resp,
1923                                               resp->type, RDMA_WRID_DATA,
1924                                               errp);
1925 
1926         if (ret < 0) {
1927             return -1;
1928         }
1929 
1930         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1931         if (resp_idx) {
1932             *resp_idx = RDMA_WRID_DATA;
1933         }
1934         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1935     }
1936 
1937     rdma->control_ready_expected = 1;
1938 
1939     return 0;
1940 }
1941 
1942 /*
1943  * This is an 'atomic' high-level operation to receive a single, unified
1944  * control-channel message.
1945  */
1946 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1947                                    uint32_t expecting, Error **errp)
1948 {
1949     RDMAControlHeader ready = {
1950                                 .len = 0,
1951                                 .type = RDMA_CONTROL_READY,
1952                                 .repeat = 1,
1953                               };
1954     int ret;
1955 
1956     /*
1957      * Inform the source that we're ready to receive a message.
1958      */
1959     ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
1960 
1961     if (ret < 0) {
1962         return -1;
1963     }
1964 
1965     /*
1966      * Block and wait for the message.
1967      */
1968     ret = qemu_rdma_exchange_get_response(rdma, head,
1969                                           expecting, RDMA_WRID_READY, errp);
1970 
1971     if (ret < 0) {
1972         return -1;
1973     }
1974 
1975     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1976 
1977     /*
1978      * Post a new RECV work request to replace the one we just consumed.
1979      */
1980     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1981     if (ret < 0) {
1982         return -1;
1983     }
1984 
1985     return 0;
1986 }
1987 
/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 *
 * @current_index: index of the block in rdma->local_ram_blocks
 * @current_addr:  address of the data; the offset into the block is
 *                 current_addr - block->offset
 * @length:        number of bytes to write
 *
 * Returns:
 *   0  - an RDMA WRITE work request was posted for the data
 *   1  - the data was all zero and not yet registered, so only a
 *        COMPRESS control message was sent (no RDMA WRITE posted)
 *  -1  - failure, with @errp set
 */
static int qemu_rdma_write_one(RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length, Error **errp)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

    /* Re-entered from the bottom when the send queue was full (ENOMEM). */
retry:
    sge.addr = (uintptr_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr,
                            (uint8_t *)(uintptr_t)sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    /*
     * 'chunks' ends up one less than the number of chunk-sized pieces
     * needed to hold the length (for a non-zero length), so that
     * chunk + chunks names the last chunk used for chunk_end below.
     * RAM blocks size this by the write length, other blocks by the
     * whole block length.
     */
    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }

    trace_qemu_rdma_write_one_top(chunks + 1,
                                  (chunks + 1) *
                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);


    /*
     * The transit bit for this chunk is set at the end of this function
     * once a write has been posted.  While it is still set, wait for
     * RDMA WRITE completions before issuing another write to the chunk.
     * NOTE(review): the bit is presumably cleared by the completion
     * handling outside this function - confirm.
     */
    while (test_bit(chunk, block->transit_bitmap)) {
        /*
         * NOTE(review): presumably keeps 'count' referenced when the
         * trace call below compiles to nothing - confirm.
         */
        (void)count;
        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);

        if (ret < 0) {
            error_setg(errp, "Failed to Wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return -1;
        }
    }

    /*
     * Per-chunk remote keys are used unless everything is pinned up front
     * and this is a RAM block; in that case the block-wide remote key is
     * used instead (else branch below).
     */
    if (!rdma->pin_all || !block->is_ram_block) {
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
             */

            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
                RDMACompress comp = {
                                        .offset = current_addr,
                                        .value = 0,
                                        .block_idx = current_index,
                                        .length = length,
                                    };

                head.len = sizeof(comp);
                head.type = RDMA_CONTROL_COMPRESS;

                trace_qemu_rdma_write_one_zero(chunk, sge.length,
                                               current_index, current_addr);

                compress_to_network(rdma, &comp);
                /* No response is requested for the COMPRESS message. */
                ret = qemu_rdma_exchange_send(rdma, &head,
                                (uint8_t *) &comp, NULL, NULL, NULL, errp);

                if (ret < 0) {
                    return -1;
                }

                /*
                 * TODO: Here we are sending something, but we are not
                 * accounting for anything transferred.  The following is wrong:
                 *
                 * stat64_add(&mig_stats.rdma_bytes, sge.length);
                 *
                 * because we are using some kind of compression.  I
                 * would think that head.len would be the more similar
                 * thing to a correct value.
                 */
                stat64_add(&mig_stats.zero_pages,
                           sge.length / qemu_target_page_size());
                /* Tell the caller that no RDMA WRITE was posted. */
                return 1;
            }

            /*
             * Otherwise, tell other side to register.
             */
            reg.current_index = current_index;
            if (block->is_ram_block) {
                reg.key.current_addr = current_addr;
            } else {
                reg.key.chunk = chunk;
            }
            reg.chunks = chunks;

            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
                                              current_addr);

            register_to_network(rdma, &reg);
            /*
             * Send the registration request and wait for the
             * REGISTER_RESULT reply; reg_result_idx receives the control
             * buffer slot that holds the reply payload.
             */
            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                    &resp, &reg_result_idx, NULL, errp);
            if (ret < 0) {
                return -1;
            }

            /* try to overlap this single registration with the one we sent. */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_setg(errp, "cannot get lkey");
                return -1;
            }

            /* The reply payload sits right after the header in the slot. */
            reg_result = (RDMARegisterResult *)
                    rdma->wr_data[reg_result_idx].control_curr;

            network_to_result(reg_result);

            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
                                                 reg_result->rkey, chunk);

            /* Remember the remote key so later writes skip registration. */
            block->remote_keys[chunk] = reg_result->rkey;
            block->remote_host_addr = reg_result->host_addr;
        } else {
            /* already registered before */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_setg(errp, "cannot get lkey!");
                return -1;
            }
        }

        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
    } else {
        send_wr.wr.rdma.rkey = block->remote_rkey;

        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                     &sge.lkey, NULL, chunk,
                                                     chunk_start, chunk_end)) {
            error_setg(errp, "cannot get lkey!");
            return -1;
        }
    }

    /*
     * Encode the ram block index and chunk within this wrid.
     * We will use this information at the time of completion
     * to figure out which bitmap to check against and then which
     * chunk in the bitmap to look for.
     */
    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
                                        current_index, chunk);

    send_wr.opcode = IBV_WR_RDMA_WRITE;
    send_wr.send_flags = IBV_SEND_SIGNALED;
    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
                                (current_addr - block->offset);

    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
                                   sge.length);

    /*
     * ibv_post_send() does not return negative error numbers,
     * per the specification they are positive - no idea why.
     */
    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret == ENOMEM) {
        /*
         * Send queue is full: wait for one outstanding write to complete,
         * then start over from the top.
         */
        trace_qemu_rdma_write_one_queue_full();
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
        if (ret < 0) {
            error_setg(errp, "rdma migration: failed to make "
                         "room in full send queue!");
            return -1;
        }

        goto retry;

    } else if (ret > 0) {
        error_setg_errno(errp, ret,
                         "rdma migration: post rdma write failed");
        return -1;
    }

    /* Mark the chunk as having a write in flight. */
    set_bit(chunk, block->transit_bitmap);
    stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
    /*
     * We are adding to transferred the amount of data written, but no
     * overhead at all.  I will assume that RDMA is magical and doesn't
     * need to transfer (at least) the addresses where it wants to
     * write the pages.  Here it looks like it should be something
     * like:
     *     sizeof(send_wr) + sge.length
     * but this being RDMA, who knows.
     */
    stat64_add(&mig_stats.rdma_bytes, sge.length);
    ram_transferred_add(sge.length);
    rdma->total_writes++;

    return 0;
}
2224 
2225 /*
2226  * Push out any unwritten RDMA operations.
2227  *
2228  * We support sending out multiple chunks at the same time.
2229  * Not all of them need to get signaled in the completion queue.
2230  */
2231 static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
2232 {
2233     int ret;
2234 
2235     if (!rdma->current_length) {
2236         return 0;
2237     }
2238 
2239     ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
2240                               rdma->current_length, errp);
2241 
2242     if (ret < 0) {
2243         return -1;
2244     }
2245 
2246     if (ret == 0) {
2247         rdma->nb_sent++;
2248         trace_qemu_rdma_write_flush(rdma->nb_sent);
2249     }
2250 
2251     rdma->current_length = 0;
2252     rdma->current_addr = 0;
2253 
2254     return 0;
2255 }
2256 
2257 static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
2258                     uint64_t offset, uint64_t len)
2259 {
2260     RDMALocalBlock *block;
2261     uint8_t *host_addr;
2262     uint8_t *chunk_end;
2263 
2264     if (rdma->current_index < 0) {
2265         return false;
2266     }
2267 
2268     if (rdma->current_chunk < 0) {
2269         return false;
2270     }
2271 
2272     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2273     host_addr = block->local_host_addr + (offset - block->offset);
2274     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2275 
2276     if (rdma->current_length == 0) {
2277         return false;
2278     }
2279 
2280     /*
2281      * Only merge into chunk sequentially.
2282      */
2283     if (offset != (rdma->current_addr + rdma->current_length)) {
2284         return false;
2285     }
2286 
2287     if (offset < block->offset) {
2288         return false;
2289     }
2290 
2291     if ((offset + len) > (block->offset + block->length)) {
2292         return false;
2293     }
2294 
2295     if ((host_addr + len) > chunk_end) {
2296         return false;
2297     }
2298 
2299     return true;
2300 }
2301 
2302 /*
2303  * We're not actually writing here, but doing three things:
2304  *
2305  * 1. Identify the chunk the buffer belongs to.
2306  * 2. If the chunk is full or the buffer doesn't belong to the current
2307  *    chunk, then start a new chunk and flush() the old chunk.
2308  * 3. To keep the hardware busy, we also group chunks into batches
2309  *    and only require that a batch gets acknowledged in the completion
2310  *    queue instead of each individual chunk.
2311  */
2312 static int qemu_rdma_write(RDMAContext *rdma,
2313                            uint64_t block_offset, uint64_t offset,
2314                            uint64_t len, Error **errp)
2315 {
2316     uint64_t current_addr = block_offset + offset;
2317     uint64_t index = rdma->current_index;
2318     uint64_t chunk = rdma->current_chunk;
2319 
2320     /* If we cannot merge it, we flush the current buffer first. */
2321     if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2322         if (qemu_rdma_write_flush(rdma, errp) < 0) {
2323             return -1;
2324         }
2325         rdma->current_length = 0;
2326         rdma->current_addr = current_addr;
2327 
2328         qemu_rdma_search_ram_block(rdma, block_offset,
2329                                    offset, len, &index, &chunk);
2330         rdma->current_index = index;
2331         rdma->current_chunk = chunk;
2332     }
2333 
2334     /* merge it */
2335     rdma->current_length += len;
2336 
2337     /* flush it if buffer is too large */
2338     if (rdma->current_length >= RDMA_MERGE_MAX) {
2339         return qemu_rdma_write_flush(rdma, errp);
2340     }
2341 
2342     return 0;
2343 }
2344 
/*
 * Tear down everything hanging off @rdma: the connection, control buffer
 * registrations, local RAM block descriptions, the queue pair, completion
 * queues and channels, the protection domain, the CM ids, the event
 * channel and the host string.
 *
 * Each resource is released only if its pointer is set, and the pointer
 * is cleared afterwards, so calling this on a partially initialised
 * context (as the init/connect error paths do) is expected.
 */
static void qemu_rdma_cleanup(RDMAContext *rdma)
{
    Error *err = NULL;

    if (rdma->cm_id && rdma->connected) {
        /*
         * If we failed locally, or the migration is being cancelled, and
         * the peer has not already sent us an ERROR, tell the peer with an
         * ERROR control message before disconnecting.
         */
        if ((rdma->errored ||
             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
            !rdma->received_error) {
            RDMAControlHeader head = { .len = 0,
                                       .type = RDMA_CONTROL_ERROR,
                                       .repeat = 1,
                                     };
            warn_report("Early error. Sending error.");
            /* Best effort: a failure to send is only reported as a warning. */
            if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) {
                warn_report_err(err);
            }
        }

        rdma_disconnect(rdma->cm_id);
        trace_qemu_rdma_cleanup_disconnect();
        rdma->connected = false;
    }

    /* Remove any handlers installed on the event channel's fd. */
    if (rdma->channel) {
        qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
    }
    g_free(rdma->dest_blocks);
    rdma->dest_blocks = NULL;

    /* Deregister the memory regions backing the control buffers. */
    for (int i = 0; i < RDMA_WRID_MAX; i++) {
        if (rdma->wr_data[i].control_mr) {
            rdma->total_registrations--;
            ibv_dereg_mr(rdma->wr_data[i].control_mr);
        }
        rdma->wr_data[i].control_mr = NULL;
    }

    /*
     * Delete blocks from the front until none are left.
     * NOTE(review): relies on rdma_delete_block() reducing nb_blocks -
     * confirm against its definition.
     */
    if (rdma->local_ram_blocks.block) {
        while (rdma->local_ram_blocks.nb_blocks) {
            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
        }
    }

    /* Release the verbs objects, then the CM id they belong to. */
    if (rdma->qp) {
        rdma_destroy_qp(rdma->cm_id);
        rdma->qp = NULL;
    }
    if (rdma->recv_cq) {
        ibv_destroy_cq(rdma->recv_cq);
        rdma->recv_cq = NULL;
    }
    if (rdma->send_cq) {
        ibv_destroy_cq(rdma->send_cq);
        rdma->send_cq = NULL;
    }
    if (rdma->recv_comp_channel) {
        ibv_destroy_comp_channel(rdma->recv_comp_channel);
        rdma->recv_comp_channel = NULL;
    }
    if (rdma->send_comp_channel) {
        ibv_destroy_comp_channel(rdma->send_comp_channel);
        rdma->send_comp_channel = NULL;
    }
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
        rdma->pd = NULL;
    }
    if (rdma->cm_id) {
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
    }

    /*
     * On the destination side, listen_id and channel are shared; a
     * return-path context only drops its references and leaves the
     * actual destruction to the other context.
     */
    if (rdma->listen_id) {
        if (!rdma->is_return_path) {
            rdma_destroy_id(rdma->listen_id);
        }
        rdma->listen_id = NULL;

        if (rdma->channel) {
            if (!rdma->is_return_path) {
                rdma_destroy_event_channel(rdma->channel);
            }
            rdma->channel = NULL;
        }
    }

    /* Still set only when there was no listen_id: the channel is ours. */
    if (rdma->channel) {
        rdma_destroy_event_channel(rdma->channel);
        rdma->channel = NULL;
    }
    g_free(rdma->host);
    rdma->host = NULL;
}
2439 
2440 
2441 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2442 {
2443     int ret;
2444 
2445     /*
2446      * Will be validated against destination's actual capabilities
2447      * after the connect() completes.
2448      */
2449     rdma->pin_all = pin_all;
2450 
2451     ret = qemu_rdma_resolve_host(rdma, errp);
2452     if (ret < 0) {
2453         goto err_rdma_source_init;
2454     }
2455 
2456     ret = qemu_rdma_alloc_pd_cq(rdma, errp);
2457     if (ret < 0) {
2458         goto err_rdma_source_init;
2459     }
2460 
2461     ret = qemu_rdma_alloc_qp(rdma);
2462     if (ret < 0) {
2463         error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2464         goto err_rdma_source_init;
2465     }
2466 
2467     qemu_rdma_init_ram_blocks(rdma);
2468 
2469     /* Build the hash that maps from offset to RAMBlock */
2470     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2471     for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
2472         g_hash_table_insert(rdma->blockmap,
2473                 (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset,
2474                 &rdma->local_ram_blocks.block[i]);
2475     }
2476 
2477     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2478         ret = qemu_rdma_reg_control(rdma, i);
2479         if (ret < 0) {
2480             error_setg(errp, "RDMA ERROR: rdma migration: error "
2481                        "registering %d control!", i);
2482             goto err_rdma_source_init;
2483         }
2484     }
2485 
2486     return 0;
2487 
2488 err_rdma_source_init:
2489     qemu_rdma_cleanup(rdma);
2490     return -1;
2491 }
2492 
2493 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2494                                      struct rdma_cm_event **cm_event,
2495                                      long msec, Error **errp)
2496 {
2497     int ret;
2498     struct pollfd poll_fd = {
2499                                 .fd = rdma->channel->fd,
2500                                 .events = POLLIN,
2501                                 .revents = 0
2502                             };
2503 
2504     do {
2505         ret = poll(&poll_fd, 1, msec);
2506     } while (ret < 0 && errno == EINTR);
2507 
2508     if (ret == 0) {
2509         error_setg(errp, "RDMA ERROR: poll cm event timeout");
2510         return -1;
2511     } else if (ret < 0) {
2512         error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
2513                    errno);
2514         return -1;
2515     } else if (poll_fd.revents & POLLIN) {
2516         if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
2517             error_setg(errp, "RDMA ERROR: failed to get cm event");
2518             return -1;
2519         }
2520         return 0;
2521     } else {
2522         error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
2523                    poll_fd.revents);
2524         return -1;
2525     }
2526 }
2527 
2528 static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
2529                              Error **errp)
2530 {
2531     RDMACapabilities cap = {
2532                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2533                                 .flags = 0,
2534                            };
2535     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2536                                           .retry_count = 5,
2537                                           .private_data = &cap,
2538                                           .private_data_len = sizeof(cap),
2539                                         };
2540     struct rdma_cm_event *cm_event;
2541     int ret;
2542 
2543     /*
2544      * Only negotiate the capability with destination if the user
2545      * on the source first requested the capability.
2546      */
2547     if (rdma->pin_all) {
2548         trace_qemu_rdma_connect_pin_all_requested();
2549         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2550     }
2551 
2552     caps_to_network(&cap);
2553 
2554     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
2555     if (ret < 0) {
2556         goto err_rdma_source_connect;
2557     }
2558 
2559     ret = rdma_connect(rdma->cm_id, &conn_param);
2560     if (ret < 0) {
2561         error_setg_errno(errp, errno,
2562                          "RDMA ERROR: connecting to destination!");
2563         goto err_rdma_source_connect;
2564     }
2565 
2566     if (return_path) {
2567         ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2568     } else {
2569         ret = rdma_get_cm_event(rdma->channel, &cm_event);
2570         if (ret < 0) {
2571             error_setg_errno(errp, errno,
2572                              "RDMA ERROR: failed to get cm event");
2573         }
2574     }
2575     if (ret < 0) {
2576         goto err_rdma_source_connect;
2577     }
2578 
2579     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2580         error_setg(errp, "RDMA ERROR: connecting to destination!");
2581         rdma_ack_cm_event(cm_event);
2582         goto err_rdma_source_connect;
2583     }
2584     rdma->connected = true;
2585 
2586     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2587     network_to_caps(&cap);
2588 
2589     /*
2590      * Verify that the *requested* capabilities are supported by the destination
2591      * and disable them otherwise.
2592      */
2593     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2594         warn_report("RDMA: Server cannot support pinning all memory. "
2595                     "Will register memory dynamically.");
2596         rdma->pin_all = false;
2597     }
2598 
2599     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2600 
2601     rdma_ack_cm_event(cm_event);
2602 
2603     rdma->control_ready_expected = 1;
2604     rdma->nb_sent = 0;
2605     return 0;
2606 
2607 err_rdma_source_connect:
2608     qemu_rdma_cleanup(rdma);
2609     return -1;
2610 }
2611 
2612 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2613 {
2614     Error *err = NULL;
2615     int ret;
2616     struct rdma_cm_id *listen_id;
2617     char ip[40] = "unknown";
2618     struct rdma_addrinfo *res, *e;
2619     char port_str[16];
2620     int reuse = 1;
2621 
2622     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2623         rdma->wr_data[i].control_len = 0;
2624         rdma->wr_data[i].control_curr = NULL;
2625     }
2626 
2627     if (!rdma->host || !rdma->host[0]) {
2628         error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2629         rdma->errored = true;
2630         return -1;
2631     }
2632     /* create CM channel */
2633     rdma->channel = rdma_create_event_channel();
2634     if (!rdma->channel) {
2635         error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2636         rdma->errored = true;
2637         return -1;
2638     }
2639 
2640     /* create CM id */
2641     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2642     if (ret < 0) {
2643         error_setg(errp, "RDMA ERROR: could not create cm_id!");
2644         goto err_dest_init_create_listen_id;
2645     }
2646 
2647     snprintf(port_str, 16, "%d", rdma->port);
2648     port_str[15] = '\0';
2649 
2650     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2651     if (ret) {
2652         error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2653                    rdma->host);
2654         goto err_dest_init_bind_addr;
2655     }
2656 
2657     ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2658                           &reuse, sizeof reuse);
2659     if (ret < 0) {
2660         error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option");
2661         goto err_dest_init_bind_addr;
2662     }
2663 
2664     /* Try all addresses, saving the first error in @err */
2665     for (e = res; e != NULL; e = e->ai_next) {
2666         Error **local_errp = err ? NULL : &err;
2667 
2668         inet_ntop(e->ai_family,
2669             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2670         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2671         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2672         if (ret < 0) {
2673             continue;
2674         }
2675         if (e->ai_family == AF_INET6) {
2676             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs,
2677                                                local_errp);
2678             if (ret < 0) {
2679                 continue;
2680             }
2681         }
2682         error_free(err);
2683         break;
2684     }
2685 
2686     rdma_freeaddrinfo(res);
2687     if (!e) {
2688         if (err) {
2689             error_propagate(errp, err);
2690         } else {
2691             error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!");
2692         }
2693         goto err_dest_init_bind_addr;
2694     }
2695 
2696     rdma->listen_id = listen_id;
2697     qemu_rdma_dump_gid("dest_init", listen_id);
2698     return 0;
2699 
2700 err_dest_init_bind_addr:
2701     rdma_destroy_id(listen_id);
2702 err_dest_init_create_listen_id:
2703     rdma_destroy_event_channel(rdma->channel);
2704     rdma->channel = NULL;
2705     rdma->errored = true;
2706     return -1;
2707 
2708 }
2709 
2710 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2711                                             RDMAContext *rdma)
2712 {
2713     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2714         rdma_return_path->wr_data[i].control_len = 0;
2715         rdma_return_path->wr_data[i].control_curr = NULL;
2716     }
2717 
2718     /*the CM channel and CM id is shared*/
2719     rdma_return_path->channel = rdma->channel;
2720     rdma_return_path->listen_id = rdma->listen_id;
2721 
2722     rdma->return_path = rdma_return_path;
2723     rdma_return_path->return_path = rdma;
2724     rdma_return_path->is_return_path = true;
2725 }
2726 
2727 static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp)
2728 {
2729     RDMAContext *rdma = NULL;
2730 
2731     rdma = g_new0(RDMAContext, 1);
2732     rdma->current_index = -1;
2733     rdma->current_chunk = -1;
2734 
2735     rdma->host = g_strdup(saddr->host);
2736     rdma->port = atoi(saddr->port);
2737     return rdma;
2738 }
2739 
2740 /*
2741  * QEMUFile interface to the control channel.
2742  * SEND messages for control only.
2743  * VM's ram is handled with regular RDMA messages.
2744  */
2745 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2746                                        const struct iovec *iov,
2747                                        size_t niov,
2748                                        int *fds,
2749                                        size_t nfds,
2750                                        int flags,
2751                                        Error **errp)
2752 {
2753     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2754     RDMAContext *rdma;
2755     int ret;
2756     ssize_t done = 0;
2757     size_t len;
2758 
2759     RCU_READ_LOCK_GUARD();
2760     rdma = qatomic_rcu_read(&rioc->rdmaout);
2761 
2762     if (!rdma) {
2763         error_setg(errp, "RDMA control channel output is not set");
2764         return -1;
2765     }
2766 
2767     if (rdma->errored) {
2768         error_setg(errp,
2769                    "RDMA is in an error state waiting migration to abort!");
2770         return -1;
2771     }
2772 
2773     /*
2774      * Push out any writes that
2775      * we're queued up for VM's ram.
2776      */
2777     ret = qemu_rdma_write_flush(rdma, errp);
2778     if (ret < 0) {
2779         rdma->errored = true;
2780         return -1;
2781     }
2782 
2783     for (int i = 0; i < niov; i++) {
2784         size_t remaining = iov[i].iov_len;
2785         uint8_t * data = (void *)iov[i].iov_base;
2786         while (remaining) {
2787             RDMAControlHeader head = {};
2788 
2789             len = MIN(remaining, RDMA_SEND_INCREMENT);
2790             remaining -= len;
2791 
2792             head.len = len;
2793             head.type = RDMA_CONTROL_QEMU_FILE;
2794 
2795             ret = qemu_rdma_exchange_send(rdma, &head,
2796                                           data, NULL, NULL, NULL, errp);
2797 
2798             if (ret < 0) {
2799                 rdma->errored = true;
2800                 return -1;
2801             }
2802 
2803             data += len;
2804             done += len;
2805         }
2806     }
2807 
2808     return done;
2809 }
2810 
2811 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2812                              size_t size, int idx)
2813 {
2814     size_t len = 0;
2815 
2816     if (rdma->wr_data[idx].control_len) {
2817         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2818 
2819         len = MIN(size, rdma->wr_data[idx].control_len);
2820         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2821         rdma->wr_data[idx].control_curr += len;
2822         rdma->wr_data[idx].control_len -= len;
2823     }
2824 
2825     return len;
2826 }
2827 
2828 /*
2829  * QEMUFile interface to the control channel.
2830  * RDMA links don't use bytestreams, so we have to
2831  * return bytes to QEMUFile opportunistically.
2832  */
2833 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2834                                       const struct iovec *iov,
2835                                       size_t niov,
2836                                       int **fds,
2837                                       size_t *nfds,
2838                                       int flags,
2839                                       Error **errp)
2840 {
2841     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2842     RDMAContext *rdma;
2843     RDMAControlHeader head;
2844     int ret;
2845     ssize_t done = 0;
2846     size_t len;
2847 
2848     RCU_READ_LOCK_GUARD();
2849     rdma = qatomic_rcu_read(&rioc->rdmain);
2850 
2851     if (!rdma) {
2852         error_setg(errp, "RDMA control channel input is not set");
2853         return -1;
2854     }
2855 
2856     if (rdma->errored) {
2857         error_setg(errp,
2858                    "RDMA is in an error state waiting migration to abort!");
2859         return -1;
2860     }
2861 
2862     for (int i = 0; i < niov; i++) {
2863         size_t want = iov[i].iov_len;
2864         uint8_t *data = (void *)iov[i].iov_base;
2865 
2866         /*
2867          * First, we hold on to the last SEND message we
2868          * were given and dish out the bytes until we run
2869          * out of bytes.
2870          */
2871         len = qemu_rdma_fill(rdma, data, want, 0);
2872         done += len;
2873         want -= len;
2874         /* Got what we needed, so go to next iovec */
2875         if (want == 0) {
2876             continue;
2877         }
2878 
2879         /* If we got any data so far, then don't wait
2880          * for more, just return what we have */
2881         if (done > 0) {
2882             break;
2883         }
2884 
2885 
2886         /* We've got nothing at all, so lets wait for
2887          * more to arrive
2888          */
2889         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
2890                                       errp);
2891 
2892         if (ret < 0) {
2893             rdma->errored = true;
2894             return -1;
2895         }
2896 
2897         /*
2898          * SEND was received with new bytes, now try again.
2899          */
2900         len = qemu_rdma_fill(rdma, data, want, 0);
2901         done += len;
2902         want -= len;
2903 
2904         /* Still didn't get enough, so lets just return */
2905         if (want) {
2906             if (done == 0) {
2907                 return QIO_CHANNEL_ERR_BLOCK;
2908             } else {
2909                 break;
2910             }
2911         }
2912     }
2913     return done;
2914 }
2915 
2916 /*
2917  * Block until all the outstanding chunks have been delivered by the hardware.
2918  */
2919 static int qemu_rdma_drain_cq(RDMAContext *rdma)
2920 {
2921     Error *err = NULL;
2922 
2923     if (qemu_rdma_write_flush(rdma, &err) < 0) {
2924         error_report_err(err);
2925         return -1;
2926     }
2927 
2928     while (rdma->nb_sent) {
2929         if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
2930             error_report("rdma migration: complete polling error!");
2931             return -1;
2932         }
2933     }
2934 
2935     qemu_rdma_unregister_waiting(rdma);
2936 
2937     return 0;
2938 }
2939 
2940 
2941 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2942                                          bool blocking,
2943                                          Error **errp)
2944 {
2945     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2946     /* XXX we should make readv/writev actually honour this :-) */
2947     rioc->blocking = blocking;
2948     return 0;
2949 }
2950 
2951 
2952 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2953 struct QIOChannelRDMASource {
2954     GSource parent;
2955     QIOChannelRDMA *rioc;
2956     GIOCondition condition;
2957 };
2958 
2959 static gboolean
2960 qio_channel_rdma_source_prepare(GSource *source,
2961                                 gint *timeout)
2962 {
2963     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2964     RDMAContext *rdma;
2965     GIOCondition cond = 0;
2966     *timeout = -1;
2967 
2968     RCU_READ_LOCK_GUARD();
2969     if (rsource->condition == G_IO_IN) {
2970         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2971     } else {
2972         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2973     }
2974 
2975     if (!rdma) {
2976         error_report("RDMAContext is NULL when prepare Gsource");
2977         return FALSE;
2978     }
2979 
2980     if (rdma->wr_data[0].control_len) {
2981         cond |= G_IO_IN;
2982     }
2983     cond |= G_IO_OUT;
2984 
2985     return cond & rsource->condition;
2986 }
2987 
2988 static gboolean
2989 qio_channel_rdma_source_check(GSource *source)
2990 {
2991     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2992     RDMAContext *rdma;
2993     GIOCondition cond = 0;
2994 
2995     RCU_READ_LOCK_GUARD();
2996     if (rsource->condition == G_IO_IN) {
2997         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2998     } else {
2999         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3000     }
3001 
3002     if (!rdma) {
3003         error_report("RDMAContext is NULL when check Gsource");
3004         return FALSE;
3005     }
3006 
3007     if (rdma->wr_data[0].control_len) {
3008         cond |= G_IO_IN;
3009     }
3010     cond |= G_IO_OUT;
3011 
3012     return cond & rsource->condition;
3013 }
3014 
3015 static gboolean
3016 qio_channel_rdma_source_dispatch(GSource *source,
3017                                  GSourceFunc callback,
3018                                  gpointer user_data)
3019 {
3020     QIOChannelFunc func = (QIOChannelFunc)callback;
3021     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3022     RDMAContext *rdma;
3023     GIOCondition cond = 0;
3024 
3025     RCU_READ_LOCK_GUARD();
3026     if (rsource->condition == G_IO_IN) {
3027         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3028     } else {
3029         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3030     }
3031 
3032     if (!rdma) {
3033         error_report("RDMAContext is NULL when dispatch Gsource");
3034         return FALSE;
3035     }
3036 
3037     if (rdma->wr_data[0].control_len) {
3038         cond |= G_IO_IN;
3039     }
3040     cond |= G_IO_OUT;
3041 
3042     return (*func)(QIO_CHANNEL(rsource->rioc),
3043                    (cond & rsource->condition),
3044                    user_data);
3045 }
3046 
3047 static void
3048 qio_channel_rdma_source_finalize(GSource *source)
3049 {
3050     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3051 
3052     object_unref(OBJECT(ssource->rioc));
3053 }
3054 
3055 static GSourceFuncs qio_channel_rdma_source_funcs = {
3056     qio_channel_rdma_source_prepare,
3057     qio_channel_rdma_source_check,
3058     qio_channel_rdma_source_dispatch,
3059     qio_channel_rdma_source_finalize
3060 };
3061 
3062 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3063                                               GIOCondition condition)
3064 {
3065     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3066     QIOChannelRDMASource *ssource;
3067     GSource *source;
3068 
3069     source = g_source_new(&qio_channel_rdma_source_funcs,
3070                           sizeof(QIOChannelRDMASource));
3071     ssource = (QIOChannelRDMASource *)source;
3072 
3073     ssource->rioc = rioc;
3074     object_ref(OBJECT(rioc));
3075 
3076     ssource->condition = condition;
3077 
3078     return source;
3079 }
3080 
3081 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3082                                                 AioContext *read_ctx,
3083                                                 IOHandler *io_read,
3084                                                 AioContext *write_ctx,
3085                                                 IOHandler *io_write,
3086                                                 void *opaque)
3087 {
3088     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3089     if (io_read) {
3090         aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
3091                            io_read, io_write, NULL, NULL, opaque);
3092         aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
3093                            io_read, io_write, NULL, NULL, opaque);
3094     } else {
3095         aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
3096                            io_read, io_write, NULL, NULL, opaque);
3097         aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
3098                            io_read, io_write, NULL, NULL, opaque);
3099     }
3100 }
3101 
3102 struct rdma_close_rcu {
3103     struct rcu_head rcu;
3104     RDMAContext *rdmain;
3105     RDMAContext *rdmaout;
3106 };
3107 
3108 /* callback from qio_channel_rdma_close via call_rcu */
3109 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3110 {
3111     if (rcu->rdmain) {
3112         qemu_rdma_cleanup(rcu->rdmain);
3113     }
3114 
3115     if (rcu->rdmaout) {
3116         qemu_rdma_cleanup(rcu->rdmaout);
3117     }
3118 
3119     g_free(rcu->rdmain);
3120     g_free(rcu->rdmaout);
3121     g_free(rcu);
3122 }
3123 
3124 static int qio_channel_rdma_close(QIOChannel *ioc,
3125                                   Error **errp)
3126 {
3127     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3128     RDMAContext *rdmain, *rdmaout;
3129     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3130 
3131     trace_qemu_rdma_close();
3132 
3133     rdmain = rioc->rdmain;
3134     if (rdmain) {
3135         qatomic_rcu_set(&rioc->rdmain, NULL);
3136     }
3137 
3138     rdmaout = rioc->rdmaout;
3139     if (rdmaout) {
3140         qatomic_rcu_set(&rioc->rdmaout, NULL);
3141     }
3142 
3143     rcu->rdmain = rdmain;
3144     rcu->rdmaout = rdmaout;
3145     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3146 
3147     return 0;
3148 }
3149 
3150 static int
3151 qio_channel_rdma_shutdown(QIOChannel *ioc,
3152                             QIOChannelShutdown how,
3153                             Error **errp)
3154 {
3155     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3156     RDMAContext *rdmain, *rdmaout;
3157 
3158     RCU_READ_LOCK_GUARD();
3159 
3160     rdmain = qatomic_rcu_read(&rioc->rdmain);
3161     rdmaout = qatomic_rcu_read(&rioc->rdmain);
3162 
3163     switch (how) {
3164     case QIO_CHANNEL_SHUTDOWN_READ:
3165         if (rdmain) {
3166             rdmain->errored = true;
3167         }
3168         break;
3169     case QIO_CHANNEL_SHUTDOWN_WRITE:
3170         if (rdmaout) {
3171             rdmaout->errored = true;
3172         }
3173         break;
3174     case QIO_CHANNEL_SHUTDOWN_BOTH:
3175     default:
3176         if (rdmain) {
3177             rdmain->errored = true;
3178         }
3179         if (rdmaout) {
3180             rdmaout->errored = true;
3181         }
3182         break;
3183     }
3184 
3185     return 0;
3186 }
3187 
3188 /*
3189  * Parameters:
3190  *    @offset == 0 :
3191  *        This means that 'block_offset' is a full virtual address that does not
3192  *        belong to a RAMBlock of the virtual machine and instead
3193  *        represents a private malloc'd memory area that the caller wishes to
3194  *        transfer.
3195  *
3196  *    @offset != 0 :
3197  *        Offset is an offset to be added to block_offset and used
3198  *        to also lookup the corresponding RAMBlock.
3199  *
3200  *    @size : Number of bytes to transfer
3201  *
3202  *    @pages_sent : User-specificed pointer to indicate how many pages were
3203  *                  sent. Usually, this will not be more than a few bytes of
3204  *                  the protocol because most transfers are sent asynchronously.
3205  */
3206 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3207                                ram_addr_t offset, size_t size)
3208 {
3209     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3210     Error *err = NULL;
3211     RDMAContext *rdma;
3212     int ret;
3213 
3214     RCU_READ_LOCK_GUARD();
3215     rdma = qatomic_rcu_read(&rioc->rdmaout);
3216 
3217     if (!rdma) {
3218         return -1;
3219     }
3220 
3221     if (rdma_errored(rdma)) {
3222         return -1;
3223     }
3224 
3225     qemu_fflush(f);
3226 
3227     /*
3228      * Add this page to the current 'chunk'. If the chunk
3229      * is full, or the page doesn't belong to the current chunk,
3230      * an actual RDMA write will occur and a new chunk will be formed.
3231      */
3232     ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
3233     if (ret < 0) {
3234         error_report_err(err);
3235         goto err;
3236     }
3237 
3238     /*
3239      * Drain the Completion Queue if possible, but do not block,
3240      * just poll.
3241      *
3242      * If nothing to poll, the end of the iteration will do this
3243      * again to make sure we don't overflow the request queue.
3244      */
3245     while (1) {
3246         uint64_t wr_id, wr_id_in;
3247         ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3248 
3249         if (ret < 0) {
3250             error_report("rdma migration: polling error");
3251             goto err;
3252         }
3253 
3254         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3255 
3256         if (wr_id == RDMA_WRID_NONE) {
3257             break;
3258         }
3259     }
3260 
3261     while (1) {
3262         uint64_t wr_id, wr_id_in;
3263         ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3264 
3265         if (ret < 0) {
3266             error_report("rdma migration: polling error");
3267             goto err;
3268         }
3269 
3270         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3271 
3272         if (wr_id == RDMA_WRID_NONE) {
3273             break;
3274         }
3275     }
3276 
3277     return RAM_SAVE_CONTROL_DELAYED;
3278 
3279 err:
3280     rdma->errored = true;
3281     return -1;
3282 }
3283 
3284 int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset,
3285                            ram_addr_t offset, size_t size)
3286 {
3287     if (!migrate_rdma() || migration_in_postcopy()) {
3288         return RAM_SAVE_CONTROL_NOT_SUPP;
3289     }
3290 
3291     int ret = qemu_rdma_save_page(f, block_offset, offset, size);
3292 
3293     if (ret != RAM_SAVE_CONTROL_DELAYED &&
3294         ret != RAM_SAVE_CONTROL_NOT_SUPP) {
3295         if (ret < 0) {
3296             qemu_file_set_error(f, ret);
3297         }
3298     }
3299     return ret;
3300 }
3301 
3302 static void rdma_accept_incoming_migration(void *opaque);
3303 
3304 static void rdma_cm_poll_handler(void *opaque)
3305 {
3306     RDMAContext *rdma = opaque;
3307     struct rdma_cm_event *cm_event;
3308     MigrationIncomingState *mis = migration_incoming_get_current();
3309 
3310     if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
3311         error_report("get_cm_event failed %d", errno);
3312         return;
3313     }
3314 
3315     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3316         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3317         if (!rdma->errored &&
3318             migration_incoming_get_current()->state !=
3319               MIGRATION_STATUS_COMPLETED) {
3320             error_report("receive cm event, cm event is %d", cm_event->event);
3321             rdma->errored = true;
3322             if (rdma->return_path) {
3323                 rdma->return_path->errored = true;
3324             }
3325         }
3326         rdma_ack_cm_event(cm_event);
3327         if (mis->loadvm_co) {
3328             qemu_coroutine_enter(mis->loadvm_co);
3329         }
3330         return;
3331     }
3332     rdma_ack_cm_event(cm_event);
3333 }
3334 
3335 static int qemu_rdma_accept(RDMAContext *rdma)
3336 {
3337     Error *err = NULL;
3338     RDMACapabilities cap;
3339     struct rdma_conn_param conn_param = {
3340                                             .responder_resources = 2,
3341                                             .private_data = &cap,
3342                                             .private_data_len = sizeof(cap),
3343                                          };
3344     RDMAContext *rdma_return_path = NULL;
3345     g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1);
3346     struct rdma_cm_event *cm_event;
3347     struct ibv_context *verbs;
3348     int ret;
3349 
3350     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3351     if (ret < 0) {
3352         goto err_rdma_dest_wait;
3353     }
3354 
3355     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3356         rdma_ack_cm_event(cm_event);
3357         goto err_rdma_dest_wait;
3358     }
3359 
3360     isock->host = g_strdup(rdma->host);
3361     isock->port = g_strdup_printf("%d", rdma->port);
3362 
3363     /*
3364      * initialize the RDMAContext for return path for postcopy after first
3365      * connection request reached.
3366      */
3367     if ((migrate_postcopy() || migrate_return_path())
3368         && !rdma->is_return_path) {
3369         rdma_return_path = qemu_rdma_data_init(isock, NULL);
3370         if (rdma_return_path == NULL) {
3371             rdma_ack_cm_event(cm_event);
3372             goto err_rdma_dest_wait;
3373         }
3374 
3375         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3376     }
3377 
3378     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3379 
3380     network_to_caps(&cap);
3381 
3382     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3383         error_report("Unknown source RDMA version: %d, bailing...",
3384                      cap.version);
3385         rdma_ack_cm_event(cm_event);
3386         goto err_rdma_dest_wait;
3387     }
3388 
3389     /*
3390      * Respond with only the capabilities this version of QEMU knows about.
3391      */
3392     cap.flags &= known_capabilities;
3393 
3394     /*
3395      * Enable the ones that we do know about.
3396      * Add other checks here as new ones are introduced.
3397      */
3398     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3399         rdma->pin_all = true;
3400     }
3401 
3402     rdma->cm_id = cm_event->id;
3403     verbs = cm_event->id->verbs;
3404 
3405     rdma_ack_cm_event(cm_event);
3406 
3407     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3408 
3409     caps_to_network(&cap);
3410 
3411     trace_qemu_rdma_accept_pin_verbsc(verbs);
3412 
3413     if (!rdma->verbs) {
3414         rdma->verbs = verbs;
3415     } else if (rdma->verbs != verbs) {
3416         error_report("ibv context not matching %p, %p!", rdma->verbs,
3417                      verbs);
3418         goto err_rdma_dest_wait;
3419     }
3420 
3421     qemu_rdma_dump_id("dest_init", verbs);
3422 
3423     ret = qemu_rdma_alloc_pd_cq(rdma, &err);
3424     if (ret < 0) {
3425         error_report_err(err);
3426         goto err_rdma_dest_wait;
3427     }
3428 
3429     ret = qemu_rdma_alloc_qp(rdma);
3430     if (ret < 0) {
3431         error_report("rdma migration: error allocating qp!");
3432         goto err_rdma_dest_wait;
3433     }
3434 
3435     qemu_rdma_init_ram_blocks(rdma);
3436 
3437     for (int i = 0; i < RDMA_WRID_MAX; i++) {
3438         ret = qemu_rdma_reg_control(rdma, i);
3439         if (ret < 0) {
3440             error_report("rdma: error registering %d control", i);
3441             goto err_rdma_dest_wait;
3442         }
3443     }
3444 
3445     /* Accept the second connection request for return path */
3446     if ((migrate_postcopy() || migrate_return_path())
3447         && !rdma->is_return_path) {
3448         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3449                             NULL,
3450                             (void *)(intptr_t)rdma->return_path);
3451     } else {
3452         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3453                             NULL, rdma);
3454     }
3455 
3456     ret = rdma_accept(rdma->cm_id, &conn_param);
3457     if (ret < 0) {
3458         error_report("rdma_accept failed");
3459         goto err_rdma_dest_wait;
3460     }
3461 
3462     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3463     if (ret < 0) {
3464         error_report("rdma_accept get_cm_event failed");
3465         goto err_rdma_dest_wait;
3466     }
3467 
3468     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3469         error_report("rdma_accept not event established");
3470         rdma_ack_cm_event(cm_event);
3471         goto err_rdma_dest_wait;
3472     }
3473 
3474     rdma_ack_cm_event(cm_event);
3475     rdma->connected = true;
3476 
3477     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
3478     if (ret < 0) {
3479         error_report_err(err);
3480         goto err_rdma_dest_wait;
3481     }
3482 
3483     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3484 
3485     return 0;
3486 
3487 err_rdma_dest_wait:
3488     rdma->errored = true;
3489     qemu_rdma_cleanup(rdma);
3490     g_free(rdma_return_path);
3491     return -1;
3492 }
3493 
3494 static int dest_ram_sort_func(const void *a, const void *b)
3495 {
3496     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3497     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3498 
3499     return (a_index < b_index) ? -1 : (a_index != b_index);
3500 }
3501 
3502 /*
3503  * During each iteration of the migration, we listen for instructions
3504  * by the source VM to perform dynamic page registrations before they
3505  * can perform RDMA operations.
3506  *
3507  * We respond with the 'rkey'.
3508  *
3509  * Keep doing this until the source tells us to stop.
3510  */
3511 int rdma_registration_handle(QEMUFile *f)
3512 {
3513     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3514                                .type = RDMA_CONTROL_REGISTER_RESULT,
3515                                .repeat = 0,
3516                              };
3517     RDMAControlHeader unreg_resp = { .len = 0,
3518                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3519                                .repeat = 0,
3520                              };
3521     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3522                                  .repeat = 1 };
3523     QIOChannelRDMA *rioc;
3524     Error *err = NULL;
3525     RDMAContext *rdma;
3526     RDMALocalBlocks *local;
3527     RDMAControlHeader head;
3528     RDMARegister *reg, *registers;
3529     RDMACompress *comp;
3530     RDMARegisterResult *reg_result;
3531     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3532     RDMALocalBlock *block;
3533     void *host_addr;
3534     int ret;
3535     int idx = 0;
3536 
3537     if (!migrate_rdma()) {
3538         return 0;
3539     }
3540 
3541     RCU_READ_LOCK_GUARD();
3542     rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3543     rdma = qatomic_rcu_read(&rioc->rdmain);
3544 
3545     if (!rdma) {
3546         return -1;
3547     }
3548 
3549     if (rdma_errored(rdma)) {
3550         return -1;
3551     }
3552 
3553     local = &rdma->local_ram_blocks;
3554     do {
3555         trace_rdma_registration_handle_wait();
3556 
3557         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
3558 
3559         if (ret < 0) {
3560             error_report_err(err);
3561             break;
3562         }
3563 
3564         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3565             error_report("rdma: Too many requests in this message (%d)."
3566                             "Bailing.", head.repeat);
3567             break;
3568         }
3569 
3570         switch (head.type) {
3571         case RDMA_CONTROL_COMPRESS:
3572             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3573             network_to_compress(comp);
3574 
3575             trace_rdma_registration_handle_compress(comp->length,
3576                                                     comp->block_idx,
3577                                                     comp->offset);
3578             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3579                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3580                              (unsigned int)comp->block_idx,
3581                              rdma->local_ram_blocks.nb_blocks);
3582                 goto err;
3583             }
3584             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3585 
3586             host_addr = block->local_host_addr +
3587                             (comp->offset - block->offset);
3588             if (comp->value) {
3589                 error_report("rdma: Zero page with non-zero (%d) value",
3590                              comp->value);
3591                 goto err;
3592             }
3593             ram_handle_zero(host_addr, comp->length);
3594             break;
3595 
3596         case RDMA_CONTROL_REGISTER_FINISHED:
3597             trace_rdma_registration_handle_finished();
3598             return 0;
3599 
3600         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3601             trace_rdma_registration_handle_ram_blocks();
3602 
3603             /* Sort our local RAM Block list so it's the same as the source,
3604              * we can do this since we've filled in a src_index in the list
3605              * as we received the RAMBlock list earlier.
3606              */
3607             qsort(rdma->local_ram_blocks.block,
3608                   rdma->local_ram_blocks.nb_blocks,
3609                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3610             for (int i = 0; i < local->nb_blocks; i++) {
3611                 local->block[i].index = i;
3612             }
3613 
3614             if (rdma->pin_all) {
3615                 ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
3616                 if (ret < 0) {
3617                     error_report_err(err);
3618                     goto err;
3619                 }
3620             }
3621 
3622             /*
3623              * Dest uses this to prepare to transmit the RAMBlock descriptions
3624              * to the source VM after connection setup.
3625              * Both sides use the "remote" structure to communicate and update
3626              * their "local" descriptions with what was sent.
3627              */
3628             for (int i = 0; i < local->nb_blocks; i++) {
3629                 rdma->dest_blocks[i].remote_host_addr =
3630                     (uintptr_t)(local->block[i].local_host_addr);
3631 
3632                 if (rdma->pin_all) {
3633                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3634                 }
3635 
3636                 rdma->dest_blocks[i].offset = local->block[i].offset;
3637                 rdma->dest_blocks[i].length = local->block[i].length;
3638 
3639                 dest_block_to_network(&rdma->dest_blocks[i]);
3640                 trace_rdma_registration_handle_ram_blocks_loop(
3641                     local->block[i].block_name,
3642                     local->block[i].offset,
3643                     local->block[i].length,
3644                     local->block[i].local_host_addr,
3645                     local->block[i].src_index);
3646             }
3647 
3648             blocks.len = rdma->local_ram_blocks.nb_blocks
3649                                                 * sizeof(RDMADestBlock);
3650 
3651 
3652             ret = qemu_rdma_post_send_control(rdma,
3653                                     (uint8_t *) rdma->dest_blocks, &blocks,
3654                                     &err);
3655 
3656             if (ret < 0) {
3657                 error_report_err(err);
3658                 goto err;
3659             }
3660 
3661             break;
3662         case RDMA_CONTROL_REGISTER_REQUEST:
3663             trace_rdma_registration_handle_register(head.repeat);
3664 
3665             reg_resp.repeat = head.repeat;
3666             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3667 
3668             for (int count = 0; count < head.repeat; count++) {
3669                 uint64_t chunk;
3670                 uint8_t *chunk_start, *chunk_end;
3671 
3672                 reg = &registers[count];
3673                 network_to_register(reg);
3674 
3675                 reg_result = &results[count];
3676 
3677                 trace_rdma_registration_handle_register_loop(count,
3678                          reg->current_index, reg->key.current_addr, reg->chunks);
3679 
3680                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3681                     error_report("rdma: 'register' bad block index %u (vs %d)",
3682                                  (unsigned int)reg->current_index,
3683                                  rdma->local_ram_blocks.nb_blocks);
3684                     goto err;
3685                 }
3686                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3687                 if (block->is_ram_block) {
3688                     if (block->offset > reg->key.current_addr) {
3689                         error_report("rdma: bad register address for block %s"
3690                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3691                             block->block_name, block->offset,
3692                             reg->key.current_addr);
3693                         goto err;
3694                     }
3695                     host_addr = (block->local_host_addr +
3696                                 (reg->key.current_addr - block->offset));
3697                     chunk = ram_chunk_index(block->local_host_addr,
3698                                             (uint8_t *) host_addr);
3699                 } else {
3700                     chunk = reg->key.chunk;
3701                     host_addr = block->local_host_addr +
3702                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3703                     /* Check for particularly bad chunk value */
3704                     if (host_addr < (void *)block->local_host_addr) {
3705                         error_report("rdma: bad chunk for block %s"
3706                             " chunk: %" PRIx64,
3707                             block->block_name, reg->key.chunk);
3708                         goto err;
3709                     }
3710                 }
3711                 chunk_start = ram_chunk_start(block, chunk);
3712                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3713                 /* avoid "-Waddress-of-packed-member" warning */
3714                 uint32_t tmp_rkey = 0;
3715                 if (qemu_rdma_register_and_get_keys(rdma, block,
3716                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3717                             chunk, chunk_start, chunk_end)) {
3718                     error_report("cannot get rkey");
3719                     goto err;
3720                 }
3721                 reg_result->rkey = tmp_rkey;
3722 
3723                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3724 
3725                 trace_rdma_registration_handle_register_rkey(reg_result->rkey);
3726 
3727                 result_to_network(reg_result);
3728             }
3729 
3730             ret = qemu_rdma_post_send_control(rdma,
3731                             (uint8_t *) results, &reg_resp, &err);
3732 
3733             if (ret < 0) {
3734                 error_report_err(err);
3735                 goto err;
3736             }
3737             break;
3738         case RDMA_CONTROL_UNREGISTER_REQUEST:
3739             trace_rdma_registration_handle_unregister(head.repeat);
3740             unreg_resp.repeat = head.repeat;
3741             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3742 
3743             for (int count = 0; count < head.repeat; count++) {
3744                 reg = &registers[count];
3745                 network_to_register(reg);
3746 
3747                 trace_rdma_registration_handle_unregister_loop(count,
3748                            reg->current_index, reg->key.chunk);
3749 
3750                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3751 
3752                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3753                 block->pmr[reg->key.chunk] = NULL;
3754 
3755                 if (ret != 0) {
3756                     error_report("rdma unregistration chunk failed: %s",
3757                                  strerror(errno));
3758                     goto err;
3759                 }
3760 
3761                 rdma->total_registrations--;
3762 
3763                 trace_rdma_registration_handle_unregister_success(reg->key.chunk);
3764             }
3765 
3766             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
3767 
3768             if (ret < 0) {
3769                 error_report_err(err);
3770                 goto err;
3771             }
3772             break;
3773         case RDMA_CONTROL_REGISTER_RESULT:
3774             error_report("Invalid RESULT message at dest.");
3775             goto err;
3776         default:
3777             error_report("Unknown control message %s", control_desc(head.type));
3778             goto err;
3779         }
3780     } while (1);
3781 
3782 err:
3783     rdma->errored = true;
3784     return -1;
3785 }
3786 
3787 /* Destination:
3788  * Called during the initial RAM load section which lists the
3789  * RAMBlocks by name.  This lets us know the order of the RAMBlocks on
3790  * the source.  We've already built our local RAMBlock list, but not
3791  * yet sent the list to the source.
3792  */
3793 int rdma_block_notification_handle(QEMUFile *f, const char *name)
3794 {
3795     int curr;
3796     int found = -1;
3797 
3798     if (!migrate_rdma()) {
3799         return 0;
3800     }
3801 
3802     RCU_READ_LOCK_GUARD();
3803     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3804     RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain);
3805 
3806     if (!rdma) {
3807         return -1;
3808     }
3809 
3810     /* Find the matching RAMBlock in our local list */
3811     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3812         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3813             found = curr;
3814             break;
3815         }
3816     }
3817 
3818     if (found == -1) {
3819         error_report("RAMBlock '%s' not found on destination", name);
3820         return -1;
3821     }
3822 
3823     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3824     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3825     rdma->next_src_index++;
3826 
3827     return 0;
3828 }
3829 
3830 int rdma_registration_start(QEMUFile *f, uint64_t flags)
3831 {
3832     if (!migrate_rdma() || migration_in_postcopy()) {
3833         return 0;
3834     }
3835 
3836     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3837     RCU_READ_LOCK_GUARD();
3838     RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout);
3839     if (!rdma) {
3840         return -1;
3841     }
3842 
3843     if (rdma_errored(rdma)) {
3844         return -1;
3845     }
3846 
3847     trace_rdma_registration_start(flags);
3848     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3849     return qemu_fflush(f);
3850 }
3851 
3852 /*
3853  * Inform dest that dynamic registrations are done for now.
3854  * First, flush writes, if any.
3855  */
3856 int rdma_registration_stop(QEMUFile *f, uint64_t flags)
3857 {
3858     QIOChannelRDMA *rioc;
3859     Error *err = NULL;
3860     RDMAContext *rdma;
3861     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3862     int ret;
3863 
3864     if (!migrate_rdma() || migration_in_postcopy()) {
3865         return 0;
3866     }
3867 
3868     RCU_READ_LOCK_GUARD();
3869     rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3870     rdma = qatomic_rcu_read(&rioc->rdmaout);
3871     if (!rdma) {
3872         return -1;
3873     }
3874 
3875     if (rdma_errored(rdma)) {
3876         return -1;
3877     }
3878 
3879     qemu_fflush(f);
3880     ret = qemu_rdma_drain_cq(rdma);
3881 
3882     if (ret < 0) {
3883         goto err;
3884     }
3885 
3886     if (flags == RAM_CONTROL_SETUP) {
3887         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3888         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3889         int reg_result_idx, nb_dest_blocks;
3890 
3891         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3892         trace_rdma_registration_stop_ram();
3893 
3894         /*
3895          * Make sure that we parallelize the pinning on both sides.
3896          * For very large guests, doing this serially takes a really
3897          * long time, so we have to 'interleave' the pinning locally
3898          * with the control messages by performing the pinning on this
3899          * side before we receive the control response from the other
3900          * side that the pinning has completed.
3901          */
3902         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3903                     &reg_result_idx, rdma->pin_all ?
3904                     qemu_rdma_reg_whole_ram_blocks : NULL,
3905                     &err);
3906         if (ret < 0) {
3907             error_report_err(err);
3908             return -1;
3909         }
3910 
3911         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3912 
3913         /*
3914          * The protocol uses two different sets of rkeys (mutually exclusive):
3915          * 1. One key to represent the virtual address of the entire ram block.
3916          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3917          * 2. One to represent individual chunks within a ram block.
3918          *    (dynamic chunk registration enabled - pin individual chunks.)
3919          *
3920          * Once the capability is successfully negotiated, the destination transmits
3921          * the keys to use (or sends them later) including the virtual addresses
3922          * and then propagates the remote ram block descriptions to his local copy.
3923          */
3924 
3925         if (local->nb_blocks != nb_dest_blocks) {
3926             error_report("ram blocks mismatch (Number of blocks %d vs %d)",
3927                          local->nb_blocks, nb_dest_blocks);
3928             error_printf("Your QEMU command line parameters are probably "
3929                          "not identical on both the source and destination.");
3930             rdma->errored = true;
3931             return -1;
3932         }
3933 
3934         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3935         memcpy(rdma->dest_blocks,
3936             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3937         for (int i = 0; i < nb_dest_blocks; i++) {
3938             network_to_dest_block(&rdma->dest_blocks[i]);
3939 
3940             /* We require that the blocks are in the same order */
3941             if (rdma->dest_blocks[i].length != local->block[i].length) {
3942                 error_report("Block %s/%d has a different length %" PRIu64
3943                              "vs %" PRIu64,
3944                              local->block[i].block_name, i,
3945                              local->block[i].length,
3946                              rdma->dest_blocks[i].length);
3947                 rdma->errored = true;
3948                 return -1;
3949             }
3950             local->block[i].remote_host_addr =
3951                     rdma->dest_blocks[i].remote_host_addr;
3952             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3953         }
3954     }
3955 
3956     trace_rdma_registration_stop(flags);
3957 
3958     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3959     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
3960 
3961     if (ret < 0) {
3962         error_report_err(err);
3963         goto err;
3964     }
3965 
3966     return 0;
3967 err:
3968     rdma->errored = true;
3969     return -1;
3970 }
3971 
3972 static void qio_channel_rdma_finalize(Object *obj)
3973 {
3974     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3975     if (rioc->rdmain) {
3976         qemu_rdma_cleanup(rioc->rdmain);
3977         g_free(rioc->rdmain);
3978         rioc->rdmain = NULL;
3979     }
3980     if (rioc->rdmaout) {
3981         qemu_rdma_cleanup(rioc->rdmaout);
3982         g_free(rioc->rdmaout);
3983         rioc->rdmaout = NULL;
3984     }
3985 }
3986 
3987 static void qio_channel_rdma_class_init(ObjectClass *klass,
3988                                         void *class_data G_GNUC_UNUSED)
3989 {
3990     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3991 
3992     ioc_klass->io_writev = qio_channel_rdma_writev;
3993     ioc_klass->io_readv = qio_channel_rdma_readv;
3994     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3995     ioc_klass->io_close = qio_channel_rdma_close;
3996     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3997     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3998     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3999 }
4000 
4001 static const TypeInfo qio_channel_rdma_info = {
4002     .parent = TYPE_QIO_CHANNEL,
4003     .name = TYPE_QIO_CHANNEL_RDMA,
4004     .instance_size = sizeof(QIOChannelRDMA),
4005     .instance_finalize = qio_channel_rdma_finalize,
4006     .class_init = qio_channel_rdma_class_init,
4007 };
4008 
4009 static void qio_channel_rdma_register_types(void)
4010 {
4011     type_register_static(&qio_channel_rdma_info);
4012 }
4013 
4014 type_init(qio_channel_rdma_register_types);
4015 
4016 static QEMUFile *rdma_new_input(RDMAContext *rdma)
4017 {
4018     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4019 
4020     rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4021     rioc->rdmain = rdma;
4022     rioc->rdmaout = rdma->return_path;
4023 
4024     return rioc->file;
4025 }
4026 
4027 static QEMUFile *rdma_new_output(RDMAContext *rdma)
4028 {
4029     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4030 
4031     rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4032     rioc->rdmaout = rdma;
4033     rioc->rdmain = rdma->return_path;
4034 
4035     return rioc->file;
4036 }
4037 
4038 static void rdma_accept_incoming_migration(void *opaque)
4039 {
4040     RDMAContext *rdma = opaque;
4041     QEMUFile *f;
4042 
4043     trace_qemu_rdma_accept_incoming_migration();
4044     if (qemu_rdma_accept(rdma) < 0) {
4045         error_report("RDMA ERROR: Migration initialization failed");
4046         return;
4047     }
4048 
4049     trace_qemu_rdma_accept_incoming_migration_accepted();
4050 
4051     if (rdma->is_return_path) {
4052         return;
4053     }
4054 
4055     f = rdma_new_input(rdma);
4056     if (f == NULL) {
4057         error_report("RDMA ERROR: could not open RDMA for input");
4058         qemu_rdma_cleanup(rdma);
4059         return;
4060     }
4061 
4062     rdma->migration_started_on_destination = 1;
4063     migration_fd_process_incoming(f);
4064 }
4065 
4066 void rdma_start_incoming_migration(InetSocketAddress *host_port,
4067                                    Error **errp)
4068 {
4069     MigrationState *s = migrate_get_current();
4070     int ret;
4071     RDMAContext *rdma;
4072 
4073     trace_rdma_start_incoming_migration();
4074 
4075     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4076     if (ram_block_discard_is_required()) {
4077         error_setg(errp, "RDMA: cannot disable RAM discard");
4078         return;
4079     }
4080 
4081     rdma = qemu_rdma_data_init(host_port, errp);
4082     if (rdma == NULL) {
4083         goto err;
4084     }
4085 
4086     ret = qemu_rdma_dest_init(rdma, errp);
4087     if (ret < 0) {
4088         goto err;
4089     }
4090 
4091     trace_rdma_start_incoming_migration_after_dest_init();
4092 
4093     ret = rdma_listen(rdma->listen_id, 5);
4094 
4095     if (ret < 0) {
4096         error_setg(errp, "RDMA ERROR: listening on socket!");
4097         goto cleanup_rdma;
4098     }
4099 
4100     trace_rdma_start_incoming_migration_after_rdma_listen();
4101     s->rdma_migration = true;
4102     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4103                         NULL, (void *)(intptr_t)rdma);
4104     return;
4105 
4106 cleanup_rdma:
4107     qemu_rdma_cleanup(rdma);
4108 err:
4109     if (rdma) {
4110         g_free(rdma->host);
4111     }
4112     g_free(rdma);
4113 }
4114 
4115 void rdma_start_outgoing_migration(void *opaque,
4116                             InetSocketAddress *host_port, Error **errp)
4117 {
4118     MigrationState *s = opaque;
4119     RDMAContext *rdma_return_path = NULL;
4120     RDMAContext *rdma;
4121     int ret;
4122 
4123     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4124     if (ram_block_discard_is_required()) {
4125         error_setg(errp, "RDMA: cannot disable RAM discard");
4126         return;
4127     }
4128 
4129     rdma = qemu_rdma_data_init(host_port, errp);
4130     if (rdma == NULL) {
4131         goto err;
4132     }
4133 
4134     ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
4135 
4136     if (ret < 0) {
4137         goto err;
4138     }
4139 
4140     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4141     ret = qemu_rdma_connect(rdma, false, errp);
4142 
4143     if (ret < 0) {
4144         goto err;
4145     }
4146 
4147     /* RDMA postcopy need a separate queue pair for return path */
4148     if (migrate_postcopy() || migrate_return_path()) {
4149         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4150 
4151         if (rdma_return_path == NULL) {
4152             goto return_path_err;
4153         }
4154 
4155         ret = qemu_rdma_source_init(rdma_return_path,
4156                                     migrate_rdma_pin_all(), errp);
4157 
4158         if (ret < 0) {
4159             goto return_path_err;
4160         }
4161 
4162         ret = qemu_rdma_connect(rdma_return_path, true, errp);
4163 
4164         if (ret < 0) {
4165             goto return_path_err;
4166         }
4167 
4168         rdma->return_path = rdma_return_path;
4169         rdma_return_path->return_path = rdma;
4170         rdma_return_path->is_return_path = true;
4171     }
4172 
4173     trace_rdma_start_outgoing_migration_after_rdma_connect();
4174 
4175     s->to_dst_file = rdma_new_output(rdma);
4176     s->rdma_migration = true;
4177     migration_connect(s, NULL);
4178     return;
4179 return_path_err:
4180     qemu_rdma_cleanup(rdma);
4181 err:
4182     g_free(rdma);
4183     g_free(rdma_return_path);
4184 }
4185