xref: /openbmc/qemu/migration/rdma.c (revision f3635813)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "rdma.h"
21 #include "migration.h"
22 #include "qemu-file.h"
23 #include "ram.h"
24 #include "qemu-file-channel.h"
25 #include "qemu/error-report.h"
26 #include "qemu/main-loop.h"
27 #include "qemu/module.h"
28 #include "qemu/rcu.h"
29 #include "qemu/sockets.h"
30 #include "qemu/bitmap.h"
31 #include "qemu/coroutine.h"
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <arpa/inet.h>
35 #include <rdma/rdma_cma.h>
36 #include "trace.h"
37 
/*
 * Print an error on both the Monitor and the Log file.
 *
 * The formatted message is always written to stderr.  It is additionally
 * stored through @errp with error_setg(), but only when @errp is non-NULL
 * and does not already hold an error, so the first reported failure wins.
 *
 * @errp is expanded more than once; pass an expression without side effects.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if ((errp) && (*(errp) == NULL)) { \
            error_setg((errp), "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)
48 
/*
 * NOTE(review): presumably the timeout (in milliseconds) used while resolving
 * the peer address/route; the users are outside this part of the file.
 */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/*
 * One slot per 4096 bytes of RDMA_MERGE_MAX (i.e. 512).
 * Also the capacity of RDMAContext.unregistrations[].
 */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the chunk size used by the ram_chunk_*() helpers. */
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size of an infiniband SEND message.
 * This is the size of the control[] buffer in RDMAWorkRequestData.
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation (bit flags, see RDMACapabilities.flags).
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Bitwise OR of every capability flag this build understands.
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
82 
/*
 * Early-exit guard for functions operating on an RDMAContext.
 *
 * Requires a variable named `rdma` (pointer to RDMAContext) in the calling
 * scope.  If rdma->error_state is non-zero, the macro reports the condition
 * once (tracked through rdma->error_reported) and then makes the *calling
 * function* return rdma->error_state.  It therefore may only be used in
 * functions whose return type can carry that value.
 */
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                                " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)
94 
/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 *
 * NOTE(review): the constants below are `unsigned long`.  Where that type
 * is 32 bits wide, RDMA_WRID_CHUNK_MASK cannot cover bits 32-63 of a 64-bit
 * work request ID -- confirm whether such hosts are supported.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

/* Low bits, below RDMA_WRID_BLOCK_SHIFT: the message type. */
#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

/* Bits from RDMA_WRID_BLOCK_SHIFT up to (not including) RDMA_WRID_CHUNK_SHIFT. */
#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

/* Everything that is neither type nor block bits: the chunk number. */
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
118 
/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 *
 * These values are also used as indices into wrid_desc[].
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};
130 
/*
 * Human-readable names for the work request ID types.
 *
 * The table is sparse: the designated initializers make it 4001 entries
 * long (largest index is RDMA_WRID_RECV_CONTROL == 4000) and every entry
 * not listed here is NULL.
 */
static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};
137 
/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands).
 *
 * RDMA_WRID_MAX is the number of slots and sizes RDMAContext.wr_data[].
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};
151 
/*
 * SEND/RECV IB Control Messages.
 *
 * control_desc() maps these values to printable names and treats
 * RDMA_CONTROL_UNREGISTER_FINISHED as the largest valid value, so new
 * entries must be reflected there as well.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
169 
170 
/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
 *
 * RDMAContext holds one of these per RDMA_WRID_* control slot.
 */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;
181 
/*
 * Negotiate RDMA capabilities during connection-setup time.
 *
 * Both fields are converted to/from network byte order with
 * caps_to_network() / network_to_caps().
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;
189 
190 static void caps_to_network(RDMACapabilities *cap)
191 {
192     cap->version = htonl(cap->version);
193     cap->flags = htonl(cap->flags);
194 }
195 
196 static void network_to_caps(RDMACapabilities *cap)
197 {
198     cap->version = ntohl(cap->version);
199     cap->flags = ntohl(cap->flags);
200 }
201 
/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
 *
 * Entries are created by rdma_add_block(), which sizes transit_bitmap,
 * unregister_bitmap and remote_keys to nb_chunks elements, and are torn
 * down by rdma_delete_block().
 */
typedef struct RDMALocalBlock {
    char          *block_name;      /* g_strdup()'d copy, owned by this entry */
    uint8_t       *local_host_addr; /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;          /* also the key used in the blockmap */
    uint64_t       length;
    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    int            index;           /* which block are we */
    unsigned int   src_index;       /* (Only used on dest) */
    bool           is_ram_block;    /* true if added before local init completed */
    int            nb_chunks;       /* number of chunks covering the block */
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;
226 
/*
 * Also represents a RAMblock, but only on the dest.
 * This gets transmitted by the dest during connection-time
 * to the source VM and then is used to populate the
 * corresponding RDMALocalBlock with
 * the information needed to perform the actual RDMA.
 *
 * Packed wire format: convert with dest_block_to_network() /
 * network_to_dest_block(), which leave the padding field untouched.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;
241 
242 static const char *control_desc(unsigned int rdma_control)
243 {
244     static const char *strs[] = {
245         [RDMA_CONTROL_NONE] = "NONE",
246         [RDMA_CONTROL_ERROR] = "ERROR",
247         [RDMA_CONTROL_READY] = "READY",
248         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
249         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
250         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
251         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
252         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
253         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
254         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
255         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
256         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
257     };
258 
259     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
260         return "??BAD CONTROL VALUE??";
261     }
262 
263     return strs[rdma_control];
264 }
265 
/*
 * Convert a 64-bit value from host to network (big-endian) byte order.
 *
 * The high 32 bits are converted with htonl() and stored first in memory,
 * followed by the converted low 32 bits.
 */
static uint64_t htonll(uint64_t v)
{
    const uint32_t high = htonl(v >> 32);
    const uint32_t low = htonl(v & 0xFFFFFFFFULL);
    union {
        uint32_t halves[2];
        uint64_t whole;
    } pun = { .halves = { high, low } };

    return pun.whole;
}
273 
/*
 * Convert a 64-bit value from network (big-endian) to host byte order.
 *
 * The first 32-bit word in memory is taken as the high half, the second
 * as the low half; each is converted with ntohl().
 */
static uint64_t ntohll(uint64_t v)
{
    union {
        uint32_t halves[2];
        uint64_t whole;
    } pun = { .whole = v };
    const uint64_t high = ntohl(pun.halves[0]);
    const uint64_t low = ntohl(pun.halves[1]);

    return (high << 32) | low;
}
279 
280 static void dest_block_to_network(RDMADestBlock *db)
281 {
282     db->remote_host_addr = htonll(db->remote_host_addr);
283     db->offset = htonll(db->offset);
284     db->length = htonll(db->length);
285     db->remote_rkey = htonl(db->remote_rkey);
286 }
287 
288 static void network_to_dest_block(RDMADestBlock *db)
289 {
290     db->remote_host_addr = ntohll(db->remote_host_addr);
291     db->offset = ntohll(db->offset);
292     db->length = ntohll(db->length);
293     db->remote_rkey = ntohl(db->remote_rkey);
294 }
295 
/*
 * Virtual address of the above structures used for transmitting
 * the RAMBlock descriptions at connection-time.
 * This structure is *not* transmitted.
 *
 * The block array is reallocated by rdma_add_block() and
 * rdma_delete_block() each time an entry is added or removed.
 */
typedef struct RDMALocalBlocks {
    int nb_blocks;             /* number of valid entries in block[] */
    bool     init;             /* main memory init complete */
    RDMALocalBlock *block;     /* array of nb_blocks entries */
} RDMALocalBlocks;
306 
/*
 * Main data structure for RDMA state.
 * While there is only one copy of this structure being allocated right now,
 * this is the place where one would start if you wanted to consider
 * having more than one RDMA connection open at the same time.
 */
typedef struct RDMAContext {
    char *host;
    int port;

    /* One control buffer per RDMA_WRID_* slot. */
    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received or not.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context          *verbs;
    struct rdma_event_channel   *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *comp_channel;  /* completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *cq;                      /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     *
     * error_state and error_reported are consulted and updated by
     * CHECK_ERROR_STATE().
     */
    int error_state;
    int error_reported;
    int received_error;

    /*
     * Description of ram blocks used throughout the code.
     * dest_blocks is allocated with one entry per local block by
     * qemu_rdma_init_ram_blocks().
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int    next_src_index;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    /* total_registrations is decremented for every MR deregistered. */
    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    /*
     * Optional lookup table: block offset -> RDMALocalBlock *.
     * May be NULL; rdma_add_block() and rdma_delete_block() keep it in sync
     * with local_ram_blocks when it is set.
     */
    GHashTable *blockmap;

    /* the RDMAContext for return path */
    struct RDMAContext *return_path;
    bool is_return_path;
} RDMAContext;
397 
/* QOM type name and checked-cast helper for the RDMA I/O channel. */
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
#define QIO_CHANNEL_RDMA(obj)                                     \
    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)

typedef struct QIOChannelRDMA QIOChannelRDMA;


/*
 * QIOChannel subclass carrying the RDMA state.
 * NOTE(review): rdmain/rdmaout look like the contexts for the incoming and
 * outgoing direction respectively -- confirm against the channel callbacks.
 */
struct QIOChannelRDMA {
    QIOChannel parent;
    RDMAContext *rdmain;
    RDMAContext *rdmaout;
    QEMUFile *file;
    bool blocking; /* XXX we don't actually honour this yet */
};
412 
/*
 * Main structure for IB Send/Recv control messages.
 * This gets prepended at the beginning of every Send/Recv.
 *
 * Packed wire format: convert with control_to_network() /
 * network_to_control(), which leave the padding field untouched.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;
423 
424 static void control_to_network(RDMAControlHeader *control)
425 {
426     control->type = htonl(control->type);
427     control->len = htonl(control->len);
428     control->repeat = htonl(control->repeat);
429 }
430 
431 static void network_to_control(RDMAControlHeader *control)
432 {
433     control->type = ntohl(control->type);
434     control->len = ntohl(control->len);
435     control->repeat = ntohl(control->repeat);
436 }
437 
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 *
 * Packed wire format: convert with register_to_network() /
 * network_to_register().  Both members of `key` share the same 64 bits.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index; /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;
453 
454 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
455 {
456     RDMALocalBlock *local_block;
457     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
458 
459     if (local_block->is_ram_block) {
460         /*
461          * current_addr as passed in is an address in the local ram_addr_t
462          * space, we need to translate this for the destination
463          */
464         reg->key.current_addr -= local_block->offset;
465         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
466     }
467     reg->key.current_addr = htonll(reg->key.current_addr);
468     reg->current_index = htonl(reg->current_index);
469     reg->chunks = htonll(reg->chunks);
470 }
471 
472 static void network_to_register(RDMARegister *reg)
473 {
474     reg->key.current_addr = ntohll(reg->key.current_addr);
475     reg->current_index = ntohl(reg->current_index);
476     reg->chunks = ntohll(reg->chunks);
477 }
478 
/*
 * Payload describing a compressed range (see RDMA_CONTROL_COMPRESS).
 *
 * Packed wire format: convert with compress_to_network() /
 * network_to_compress().
 */
typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;
485 
486 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
487 {
488     comp->value = htonl(comp->value);
489     /*
490      * comp->offset as passed in is an address in the local ram_addr_t
491      * space, we need to translate this for the destination
492      */
493     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
494     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
495     comp->block_idx = htonl(comp->block_idx);
496     comp->offset = htonll(comp->offset);
497     comp->length = htonll(comp->length);
498 }
499 
500 static void network_to_compress(RDMACompress *comp)
501 {
502     comp->value = ntohl(comp->value);
503     comp->block_idx = ntohl(comp->block_idx);
504     comp->offset = ntohll(comp->offset);
505     comp->length = ntohll(comp->length);
506 }
507 
/*
 * The result of the dest's memory registration produces an "rkey"
 * which the source VM must reference in order to perform
 * the RDMA operation.
 *
 * Packed wire format: convert with result_to_network() /
 * network_to_result(), which leave the padding field untouched.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;
518 
519 static void result_to_network(RDMARegisterResult *result)
520 {
521     result->rkey = htonl(result->rkey);
522     result->host_addr = htonll(result->host_addr);
523 };
524 
525 static void network_to_result(RDMARegisterResult *result)
526 {
527     result->rkey = ntohl(result->rkey);
528     result->host_addr = ntohll(result->host_addr);
529 };
530 
/*
 * Forward declarations; the definitions are not in this part of the file.
 * Note that print_wrid() is declared with external linkage, unlike the
 * other helpers declared here.
 */
const char *print_wrid(int wrid);
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma));
536 
537 static inline uint64_t ram_chunk_index(const uint8_t *start,
538                                        const uint8_t *host)
539 {
540     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
541 }
542 
543 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
544                                        uint64_t i)
545 {
546     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
547                                   (i << RDMA_REG_CHUNK_SHIFT));
548 }
549 
550 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
551                                      uint64_t i)
552 {
553     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
554                                          (1UL << RDMA_REG_CHUNK_SHIFT);
555 
556     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
557         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
558     }
559 
560     return result;
561 }
562 
563 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
564                          void *host_addr,
565                          ram_addr_t block_offset, uint64_t length)
566 {
567     RDMALocalBlocks *local = &rdma->local_ram_blocks;
568     RDMALocalBlock *block;
569     RDMALocalBlock *old = local->block;
570 
571     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
572 
573     if (local->nb_blocks) {
574         int x;
575 
576         if (rdma->blockmap) {
577             for (x = 0; x < local->nb_blocks; x++) {
578                 g_hash_table_remove(rdma->blockmap,
579                                     (void *)(uintptr_t)old[x].offset);
580                 g_hash_table_insert(rdma->blockmap,
581                                     (void *)(uintptr_t)old[x].offset,
582                                     &local->block[x]);
583             }
584         }
585         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
586         g_free(old);
587     }
588 
589     block = &local->block[local->nb_blocks];
590 
591     block->block_name = g_strdup(block_name);
592     block->local_host_addr = host_addr;
593     block->offset = block_offset;
594     block->length = length;
595     block->index = local->nb_blocks;
596     block->src_index = ~0U; /* Filled in by the receipt of the block list */
597     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
598     block->transit_bitmap = bitmap_new(block->nb_chunks);
599     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
600     block->unregister_bitmap = bitmap_new(block->nb_chunks);
601     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
602     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
603 
604     block->is_ram_block = local->init ? false : true;
605 
606     if (rdma->blockmap) {
607         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
608     }
609 
610     trace_rdma_add_block(block_name, local->nb_blocks,
611                          (uintptr_t) block->local_host_addr,
612                          block->offset, block->length,
613                          (uintptr_t) (block->local_host_addr + block->length),
614                          BITS_TO_LONGS(block->nb_chunks) *
615                              sizeof(unsigned long) * 8,
616                          block->nb_chunks);
617 
618     local->nb_blocks++;
619 
620     return 0;
621 }
622 
623 /*
624  * Memory regions need to be registered with the device and queue pairs setup
625  * in advanced before the migration starts. This tells us where the RAM blocks
626  * are so that we can register them individually.
627  */
628 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
629 {
630     const char *block_name = qemu_ram_get_idstr(rb);
631     void *host_addr = qemu_ram_get_host_addr(rb);
632     ram_addr_t block_offset = qemu_ram_get_offset(rb);
633     ram_addr_t length = qemu_ram_get_used_length(rb);
634     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
635 }
636 
637 /*
638  * Identify the RAMBlocks and their quantity. They will be references to
639  * identify chunk boundaries inside each RAMBlock and also be referenced
640  * during dynamic page registration.
641  */
642 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
643 {
644     RDMALocalBlocks *local = &rdma->local_ram_blocks;
645     int ret;
646 
647     assert(rdma->blockmap == NULL);
648     memset(local, 0, sizeof *local);
649     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
650     if (ret) {
651         return ret;
652     }
653     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
654     rdma->dest_blocks = g_new0(RDMADestBlock,
655                                rdma->local_ram_blocks.nb_blocks);
656     local->init = true;
657     return 0;
658 }
659 
660 /*
661  * Note: If used outside of cleanup, the caller must ensure that the destination
662  * block structures are also updated
663  */
664 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
665 {
666     RDMALocalBlocks *local = &rdma->local_ram_blocks;
667     RDMALocalBlock *old = local->block;
668     int x;
669 
670     if (rdma->blockmap) {
671         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
672     }
673     if (block->pmr) {
674         int j;
675 
676         for (j = 0; j < block->nb_chunks; j++) {
677             if (!block->pmr[j]) {
678                 continue;
679             }
680             ibv_dereg_mr(block->pmr[j]);
681             rdma->total_registrations--;
682         }
683         g_free(block->pmr);
684         block->pmr = NULL;
685     }
686 
687     if (block->mr) {
688         ibv_dereg_mr(block->mr);
689         rdma->total_registrations--;
690         block->mr = NULL;
691     }
692 
693     g_free(block->transit_bitmap);
694     block->transit_bitmap = NULL;
695 
696     g_free(block->unregister_bitmap);
697     block->unregister_bitmap = NULL;
698 
699     g_free(block->remote_keys);
700     block->remote_keys = NULL;
701 
702     g_free(block->block_name);
703     block->block_name = NULL;
704 
705     if (rdma->blockmap) {
706         for (x = 0; x < local->nb_blocks; x++) {
707             g_hash_table_remove(rdma->blockmap,
708                                 (void *)(uintptr_t)old[x].offset);
709         }
710     }
711 
712     if (local->nb_blocks > 1) {
713 
714         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
715 
716         if (block->index) {
717             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
718         }
719 
720         if (block->index < (local->nb_blocks - 1)) {
721             memcpy(local->block + block->index, old + (block->index + 1),
722                 sizeof(RDMALocalBlock) *
723                     (local->nb_blocks - (block->index + 1)));
724             for (x = block->index; x < local->nb_blocks - 1; x++) {
725                 local->block[x].index--;
726             }
727         }
728     } else {
729         assert(block == local->block);
730         local->block = NULL;
731     }
732 
733     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
734                            block->offset, block->length,
735                             (uintptr_t)(block->local_host_addr + block->length),
736                            BITS_TO_LONGS(block->nb_chunks) *
737                                sizeof(unsigned long) * 8, block->nb_chunks);
738 
739     g_free(old);
740 
741     local->nb_blocks--;
742 
743     if (local->nb_blocks && rdma->blockmap) {
744         for (x = 0; x < local->nb_blocks; x++) {
745             g_hash_table_insert(rdma->blockmap,
746                                 (void *)(uintptr_t)local->block[x].offset,
747                                 &local->block[x]);
748         }
749     }
750 
751     return 0;
752 }
753 
754 /*
755  * Put in the log file which RDMA device was opened and the details
756  * associated with that device.
757  */
758 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
759 {
760     struct ibv_port_attr port;
761 
762     if (ibv_query_port(verbs, 1, &port)) {
763         error_report("Failed to query port information");
764         return;
765     }
766 
767     printf("%s RDMA Device opened: kernel name %s "
768            "uverbs device name %s, "
769            "infiniband_verbs class device path %s, "
770            "infiniband class device path %s, "
771            "transport: (%d) %s\n",
772                 who,
773                 verbs->device->name,
774                 verbs->device->dev_name,
775                 verbs->device->dev_path,
776                 verbs->device->ibdev_path,
777                 port.link_layer,
778                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
779                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
780                     ? "Ethernet" : "Unknown"));
781 }
782 
783 /*
784  * Put in the log file the RDMA gid addressing information,
785  * useful for folks who have trouble understanding the
786  * RDMA device hierarchy in the kernel.
787  */
788 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
789 {
790     char sgid[33];
791     char dgid[33];
792     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
793     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
794     trace_qemu_rdma_dump_gid(who, sgid, dgid);
795 }
796 
797 /*
798  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
799  * We will try the next addrinfo struct, and fail if there are
800  * no other valid addresses to bind against.
801  *
802  * If user is listening on '[::]', then we will not have a opened a device
803  * yet and have no way of verifying if the device is RoCE or not.
804  *
805  * In this case, the source VM will throw an error for ALL types of
806  * connections (both IPv4 and IPv6) if the destination machine does not have
807  * a regular infiniband network available for use.
808  *
809  * The only way to guarantee that an error is thrown for broken kernels is
810  * for the management software to choose a *specific* interface at bind time
811  * and validate what time of hardware it is.
812  *
813  * Unfortunately, this puts the user in a fix:
814  *
815  *  If the source VM connects with an IPv4 address without knowing that the
816  *  destination has bound to '[::]' the migration will unconditionally fail
817  *  unless the management software is explicitly listening on the IPv4
818  *  address while using a RoCE-based device.
819  *
820  *  If the source VM connects with an IPv6 address, then we're OK because we can
821  *  throw an error on the source (and similarly on the destination).
822  *
823  *  But in mixed environments, this will be broken for a while until it is fixed
824  *  inside linux.
825  *
826  * We do provide a *tiny* bit of help in this function: We can list all of the
827  * devices in the system and check to see if all the devices are RoCE or
828  * Infiniband.
829  *
830  * If we detect that we have a *pure* RoCE environment, then we can safely
831  * thrown an error even if the management software has specified '[::]' as the
832  * bind address.
833  *
834  * However, if there is are multiple hetergeneous devices, then we cannot make
835  * this assumption and the user just has to be sure they know what they are
836  * doing.
837  *
838  * Patches are being reviewed on linux-rdma.
839  */
840 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
841 {
842     /* This bug only exists in linux, to our knowledge. */
843 #ifdef CONFIG_LINUX
844     struct ibv_port_attr port_attr;
845 
846     /*
847      * Verbs are only NULL if management has bound to '[::]'.
848      *
849      * Let's iterate through all the devices and see if there any pure IB
850      * devices (non-ethernet).
851      *
852      * If not, then we can safely proceed with the migration.
853      * Otherwise, there are no guarantees until the bug is fixed in linux.
854      */
855     if (!verbs) {
856         int num_devices, x;
857         struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
858         bool roce_found = false;
859         bool ib_found = false;
860 
861         for (x = 0; x < num_devices; x++) {
862             verbs = ibv_open_device(dev_list[x]);
863             if (!verbs) {
864                 if (errno == EPERM) {
865                     continue;
866                 } else {
867                     return -EINVAL;
868                 }
869             }
870 
871             if (ibv_query_port(verbs, 1, &port_attr)) {
872                 ibv_close_device(verbs);
873                 ERROR(errp, "Could not query initial IB port");
874                 return -EINVAL;
875             }
876 
877             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
878                 ib_found = true;
879             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
880                 roce_found = true;
881             }
882 
883             ibv_close_device(verbs);
884 
885         }
886 
887         if (roce_found) {
888             if (ib_found) {
889                 fprintf(stderr, "WARN: migrations may fail:"
890                                 " IPv6 over RoCE / iWARP in linux"
891                                 " is broken. But since you appear to have a"
892                                 " mixed RoCE / IB environment, be sure to only"
893                                 " migrate over the IB fabric until the kernel "
894                                 " fixes the bug.\n");
895             } else {
896                 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
897                             " and your management software has specified '[::]'"
898                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
899                 return -ENONET;
900             }
901         }
902 
903         return 0;
904     }
905 
906     /*
907      * If we have a verbs context, that means that some other than '[::]' was
908      * used by the management software for binding. In which case we can
909      * actually warn the user about a potentially broken kernel.
910      */
911 
912     /* IB ports start with 1, not 0 */
913     if (ibv_query_port(verbs, 1, &port_attr)) {
914         ERROR(errp, "Could not query initial IB port");
915         return -EINVAL;
916     }
917 
918     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
919         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
920                     "(but patches on linux-rdma in progress)");
921         return -ENONET;
922     }
923 
924 #endif
925 
926     return 0;
927 }
928 
929 /*
930  * Figure out which RDMA device corresponds to the requested IP hostname
931  * Also create the initial connection manager identifiers for opening
932  * the connection.
933  */
934 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
935 {
936     int ret;
937     struct rdma_addrinfo *res;
938     char port_str[16];
939     struct rdma_cm_event *cm_event;
940     char ip[40] = "unknown";
941     struct rdma_addrinfo *e;
942 
943     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
944         ERROR(errp, "RDMA hostname has not been set");
945         return -EINVAL;
946     }
947 
948     /* create CM channel */
949     rdma->channel = rdma_create_event_channel();
950     if (!rdma->channel) {
951         ERROR(errp, "could not create CM channel");
952         return -EINVAL;
953     }
954 
955     /* create CM id */
956     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
957     if (ret) {
958         ERROR(errp, "could not create channel id");
959         goto err_resolve_create_id;
960     }
961 
962     snprintf(port_str, 16, "%d", rdma->port);
963     port_str[15] = '\0';
964 
965     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
966     if (ret < 0) {
967         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
968         goto err_resolve_get_addr;
969     }
970 
971     for (e = res; e != NULL; e = e->ai_next) {
972         inet_ntop(e->ai_family,
973             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
974         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
975 
976         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
977                 RDMA_RESOLVE_TIMEOUT_MS);
978         if (!ret) {
979             if (e->ai_family == AF_INET6) {
980                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
981                 if (ret) {
982                     continue;
983                 }
984             }
985             goto route;
986         }
987     }
988 
989     ERROR(errp, "could not resolve address %s", rdma->host);
990     goto err_resolve_get_addr;
991 
992 route:
993     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
994 
995     ret = rdma_get_cm_event(rdma->channel, &cm_event);
996     if (ret) {
997         ERROR(errp, "could not perform event_addr_resolved");
998         goto err_resolve_get_addr;
999     }
1000 
1001     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1002         ERROR(errp, "result not equal to event_addr_resolved %s",
1003                 rdma_event_str(cm_event->event));
1004         perror("rdma_resolve_addr");
1005         rdma_ack_cm_event(cm_event);
1006         ret = -EINVAL;
1007         goto err_resolve_get_addr;
1008     }
1009     rdma_ack_cm_event(cm_event);
1010 
1011     /* resolve route */
1012     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1013     if (ret) {
1014         ERROR(errp, "could not resolve rdma route");
1015         goto err_resolve_get_addr;
1016     }
1017 
1018     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1019     if (ret) {
1020         ERROR(errp, "could not perform event_route_resolved");
1021         goto err_resolve_get_addr;
1022     }
1023     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1024         ERROR(errp, "result not equal to event_route_resolved: %s",
1025                         rdma_event_str(cm_event->event));
1026         rdma_ack_cm_event(cm_event);
1027         ret = -EINVAL;
1028         goto err_resolve_get_addr;
1029     }
1030     rdma_ack_cm_event(cm_event);
1031     rdma->verbs = rdma->cm_id->verbs;
1032     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1033     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1034     return 0;
1035 
1036 err_resolve_get_addr:
1037     rdma_destroy_id(rdma->cm_id);
1038     rdma->cm_id = NULL;
1039 err_resolve_create_id:
1040     rdma_destroy_event_channel(rdma->channel);
1041     rdma->channel = NULL;
1042     return ret;
1043 }
1044 
1045 /*
1046  * Create protection domain and completion queues
1047  */
1048 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1049 {
1050     /* allocate pd */
1051     rdma->pd = ibv_alloc_pd(rdma->verbs);
1052     if (!rdma->pd) {
1053         error_report("failed to allocate protection domain");
1054         return -1;
1055     }
1056 
1057     /* create completion channel */
1058     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1059     if (!rdma->comp_channel) {
1060         error_report("failed to allocate completion channel");
1061         goto err_alloc_pd_cq;
1062     }
1063 
1064     /*
1065      * Completion queue can be filled by both read and write work requests,
1066      * so must reflect the sum of both possible queue sizes.
1067      */
1068     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1069             NULL, rdma->comp_channel, 0);
1070     if (!rdma->cq) {
1071         error_report("failed to allocate completion queue");
1072         goto err_alloc_pd_cq;
1073     }
1074 
1075     return 0;
1076 
1077 err_alloc_pd_cq:
1078     if (rdma->pd) {
1079         ibv_dealloc_pd(rdma->pd);
1080     }
1081     if (rdma->comp_channel) {
1082         ibv_destroy_comp_channel(rdma->comp_channel);
1083     }
1084     rdma->pd = NULL;
1085     rdma->comp_channel = NULL;
1086     return -1;
1087 
1088 }
1089 
1090 /*
1091  * Create queue pairs.
1092  */
1093 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1094 {
1095     struct ibv_qp_init_attr attr = { 0 };
1096     int ret;
1097 
1098     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1099     attr.cap.max_recv_wr = 3;
1100     attr.cap.max_send_sge = 1;
1101     attr.cap.max_recv_sge = 1;
1102     attr.send_cq = rdma->cq;
1103     attr.recv_cq = rdma->cq;
1104     attr.qp_type = IBV_QPT_RC;
1105 
1106     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1107     if (ret) {
1108         return -1;
1109     }
1110 
1111     rdma->qp = rdma->cm_id->qp;
1112     return 0;
1113 }
1114 
1115 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1116 {
1117     int i;
1118     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1119 
1120     for (i = 0; i < local->nb_blocks; i++) {
1121         local->block[i].mr =
1122             ibv_reg_mr(rdma->pd,
1123                     local->block[i].local_host_addr,
1124                     local->block[i].length,
1125                     IBV_ACCESS_LOCAL_WRITE |
1126                     IBV_ACCESS_REMOTE_WRITE
1127                     );
1128         if (!local->block[i].mr) {
1129             perror("Failed to register local dest ram block!\n");
1130             break;
1131         }
1132         rdma->total_registrations++;
1133     }
1134 
1135     if (i >= local->nb_blocks) {
1136         return 0;
1137     }
1138 
1139     for (i--; i >= 0; i--) {
1140         ibv_dereg_mr(local->block[i].mr);
1141         rdma->total_registrations--;
1142     }
1143 
1144     return -1;
1145 
1146 }
1147 
1148 /*
1149  * Find the ram block that corresponds to the page requested to be
1150  * transmitted by QEMU.
1151  *
1152  * Once the block is found, also identify which 'chunk' within that
1153  * block that the page belongs to.
1154  *
1155  * This search cannot fail or the migration will fail.
1156  */
1157 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1158                                       uintptr_t block_offset,
1159                                       uint64_t offset,
1160                                       uint64_t length,
1161                                       uint64_t *block_index,
1162                                       uint64_t *chunk_index)
1163 {
1164     uint64_t current_addr = block_offset + offset;
1165     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1166                                                 (void *) block_offset);
1167     assert(block);
1168     assert(current_addr >= block->offset);
1169     assert((current_addr + length) <= (block->offset + block->length));
1170 
1171     *block_index = block->index;
1172     *chunk_index = ram_chunk_index(block->local_host_addr,
1173                 block->local_host_addr + (current_addr - block->offset));
1174 
1175     return 0;
1176 }
1177 
1178 /*
1179  * Register a chunk with IB. If the chunk was already registered
1180  * previously, then skip.
1181  *
1182  * Also return the keys associated with the registration needed
1183  * to perform the actual RDMA operation.
1184  */
1185 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1186         RDMALocalBlock *block, uintptr_t host_addr,
1187         uint32_t *lkey, uint32_t *rkey, int chunk,
1188         uint8_t *chunk_start, uint8_t *chunk_end)
1189 {
1190     if (block->mr) {
1191         if (lkey) {
1192             *lkey = block->mr->lkey;
1193         }
1194         if (rkey) {
1195             *rkey = block->mr->rkey;
1196         }
1197         return 0;
1198     }
1199 
1200     /* allocate memory to store chunk MRs */
1201     if (!block->pmr) {
1202         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1203     }
1204 
1205     /*
1206      * If 'rkey', then we're the destination, so grant access to the source.
1207      *
1208      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1209      */
1210     if (!block->pmr[chunk]) {
1211         uint64_t len = chunk_end - chunk_start;
1212 
1213         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1214 
1215         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1216                 chunk_start, len,
1217                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1218                         IBV_ACCESS_REMOTE_WRITE) : 0));
1219 
1220         if (!block->pmr[chunk]) {
1221             perror("Failed to register chunk!");
1222             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1223                             " start %" PRIuPTR " end %" PRIuPTR
1224                             " host %" PRIuPTR
1225                             " local %" PRIuPTR " registrations: %d\n",
1226                             block->index, chunk, (uintptr_t)chunk_start,
1227                             (uintptr_t)chunk_end, host_addr,
1228                             (uintptr_t)block->local_host_addr,
1229                             rdma->total_registrations);
1230             return -1;
1231         }
1232         rdma->total_registrations++;
1233     }
1234 
1235     if (lkey) {
1236         *lkey = block->pmr[chunk]->lkey;
1237     }
1238     if (rkey) {
1239         *rkey = block->pmr[chunk]->rkey;
1240     }
1241     return 0;
1242 }
1243 
1244 /*
1245  * Register (at connection time) the memory used for control
1246  * channel messages.
1247  */
1248 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1249 {
1250     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1251             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1252             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1253     if (rdma->wr_data[idx].control_mr) {
1254         rdma->total_registrations++;
1255         return 0;
1256     }
1257     error_report("qemu_rdma_reg_control failed");
1258     return -1;
1259 }
1260 
1261 const char *print_wrid(int wrid)
1262 {
1263     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1264         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1265     }
1266     return wrid_desc[wrid];
1267 }
1268 
1269 /*
1270  * RDMA requires memory registration (mlock/pinning), but this is not good for
1271  * overcommitment.
1272  *
1273  * In preparation for the future where LRU information or workload-specific
1274  * writable working set memory access behavior is available to QEMU
1275  * it would be nice to have in place the ability to UN-register/UN-pin
1276  * particular memory regions from the RDMA hardware when it is determined that
1277  * those regions of memory will likely not be accessed again in the near future.
1278  *
1279  * While we do not yet have such information right now, the following
1280  * compile-time option allows us to perform a non-optimized version of this
1281  * behavior.
1282  *
1283  * By uncommenting this option, you will cause *all* RDMA transfers to be
1284  * unregistered immediately after the transfer completes on both sides of the
1285  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1286  *
1287  * This will have a terrible impact on migration performance, so until future
1288  * workload information or LRU information is available, do not attempt to use
1289  * this feature except for basic testing.
1290  */
1291 //#define RDMA_UNREGISTRATION_EXAMPLE
1292 
1293 /*
1294  * Perform a non-optimized memory unregistration after every transfer
1295  * for demonstration purposes, only if pin-all is not requested.
1296  *
1297  * Potential optimizations:
1298  * 1. Start a new thread to run this function continuously
1299         - for bit clearing
1300         - and for receipt of unregister messages
1301  * 2. Use an LRU.
1302  * 3. Use workload hints.
1303  */
1304 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1305 {
1306     while (rdma->unregistrations[rdma->unregister_current]) {
1307         int ret;
1308         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1309         uint64_t chunk =
1310             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1311         uint64_t index =
1312             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1313         RDMALocalBlock *block =
1314             &(rdma->local_ram_blocks.block[index]);
1315         RDMARegister reg = { .current_index = index };
1316         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1317                                  };
1318         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1319                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1320                                    .repeat = 1,
1321                                  };
1322 
1323         trace_qemu_rdma_unregister_waiting_proc(chunk,
1324                                                 rdma->unregister_current);
1325 
1326         rdma->unregistrations[rdma->unregister_current] = 0;
1327         rdma->unregister_current++;
1328 
1329         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1330             rdma->unregister_current = 0;
1331         }
1332 
1333 
1334         /*
1335          * Unregistration is speculative (because migration is single-threaded
1336          * and we cannot break the protocol's inifinband message ordering).
1337          * Thus, if the memory is currently being used for transmission,
1338          * then abort the attempt to unregister and try again
1339          * later the next time a completion is received for this memory.
1340          */
1341         clear_bit(chunk, block->unregister_bitmap);
1342 
1343         if (test_bit(chunk, block->transit_bitmap)) {
1344             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1345             continue;
1346         }
1347 
1348         trace_qemu_rdma_unregister_waiting_send(chunk);
1349 
1350         ret = ibv_dereg_mr(block->pmr[chunk]);
1351         block->pmr[chunk] = NULL;
1352         block->remote_keys[chunk] = 0;
1353 
1354         if (ret != 0) {
1355             perror("unregistration chunk failed");
1356             return -ret;
1357         }
1358         rdma->total_registrations--;
1359 
1360         reg.key.chunk = chunk;
1361         register_to_network(rdma, &reg);
1362         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1363                                 &resp, NULL, NULL);
1364         if (ret < 0) {
1365             return ret;
1366         }
1367 
1368         trace_qemu_rdma_unregister_waiting_complete(chunk);
1369     }
1370 
1371     return 0;
1372 }
1373 
1374 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1375                                          uint64_t chunk)
1376 {
1377     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1378 
1379     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1380     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1381 
1382     return result;
1383 }
1384 
1385 /*
1386  * Set bit for unregistration in the next iteration.
1387  * We cannot transmit right here, but will unpin later.
1388  */
1389 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1390                                         uint64_t chunk, uint64_t wr_id)
1391 {
1392     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1393         error_report("rdma migration: queue is full");
1394     } else {
1395         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1396 
1397         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1398             trace_qemu_rdma_signal_unregister_append(chunk,
1399                                                      rdma->unregister_next);
1400 
1401             rdma->unregistrations[rdma->unregister_next++] =
1402                     qemu_rdma_make_wrid(wr_id, index, chunk);
1403 
1404             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1405                 rdma->unregister_next = 0;
1406             }
1407         } else {
1408             trace_qemu_rdma_signal_unregister_already(chunk);
1409         }
1410     }
1411 }
1412 
1413 /*
1414  * Consult the connection manager to see a work request
1415  * (of any kind) has completed.
1416  * Return the work request ID that completed.
1417  */
1418 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1419                                uint32_t *byte_len)
1420 {
1421     int ret;
1422     struct ibv_wc wc;
1423     uint64_t wr_id;
1424 
1425     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1426 
1427     if (!ret) {
1428         *wr_id_out = RDMA_WRID_NONE;
1429         return 0;
1430     }
1431 
1432     if (ret < 0) {
1433         error_report("ibv_poll_cq return %d", ret);
1434         return ret;
1435     }
1436 
1437     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1438 
1439     if (wc.status != IBV_WC_SUCCESS) {
1440         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1441                         wc.status, ibv_wc_status_str(wc.status));
1442         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1443 
1444         return -1;
1445     }
1446 
1447     if (rdma->control_ready_expected &&
1448         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1449         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1450                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1451         rdma->control_ready_expected = 0;
1452     }
1453 
1454     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1455         uint64_t chunk =
1456             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1457         uint64_t index =
1458             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1459         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1460 
1461         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1462                                    index, chunk, block->local_host_addr,
1463                                    (void *)(uintptr_t)block->remote_host_addr);
1464 
1465         clear_bit(chunk, block->transit_bitmap);
1466 
1467         if (rdma->nb_sent > 0) {
1468             rdma->nb_sent--;
1469         }
1470 
1471         if (!rdma->pin_all) {
1472             /*
1473              * FYI: If one wanted to signal a specific chunk to be unregistered
1474              * using LRU or workload-specific information, this is the function
1475              * you would call to do so. That chunk would then get asynchronously
1476              * unregistered later.
1477              */
1478 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1479             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1480 #endif
1481         }
1482     } else {
1483         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1484     }
1485 
1486     *wr_id_out = wc.wr_id;
1487     if (byte_len) {
1488         *byte_len = wc.byte_len;
1489     }
1490 
1491     return  0;
1492 }
1493 
1494 /* Wait for activity on the completion channel.
1495  * Returns 0 on success, none-0 on error.
1496  */
1497 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
1498 {
1499     struct rdma_cm_event *cm_event;
1500     int ret = -1;
1501 
1502     /*
1503      * Coroutine doesn't start until migration_fd_process_incoming()
1504      * so don't yield unless we know we're running inside of a coroutine.
1505      */
1506     if (rdma->migration_started_on_destination &&
1507         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1508         yield_until_fd_readable(rdma->comp_channel->fd);
1509     } else {
1510         /* This is the source side, we're in a separate thread
1511          * or destination prior to migration_fd_process_incoming()
1512          * after postcopy, the destination also in a seprate thread.
1513          * we can't yield; so we have to poll the fd.
1514          * But we need to be able to handle 'cancel' or an error
1515          * without hanging forever.
1516          */
1517         while (!rdma->error_state  && !rdma->received_error) {
1518             GPollFD pfds[2];
1519             pfds[0].fd = rdma->comp_channel->fd;
1520             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1521             pfds[0].revents = 0;
1522 
1523             pfds[1].fd = rdma->channel->fd;
1524             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1525             pfds[1].revents = 0;
1526 
1527             /* 0.1s timeout, should be fine for a 'cancel' */
1528             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1529             case 2:
1530             case 1: /* fd active */
1531                 if (pfds[0].revents) {
1532                     return 0;
1533                 }
1534 
1535                 if (pfds[1].revents) {
1536                     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1537                     if (!ret) {
1538                         rdma_ack_cm_event(cm_event);
1539                     }
1540 
1541                     error_report("receive cm event while wait comp channel,"
1542                                  "cm event is %d", cm_event->event);
1543                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1544                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1545                         return -EPIPE;
1546                     }
1547                 }
1548                 break;
1549 
1550             case 0: /* Timeout, go around again */
1551                 break;
1552 
1553             default: /* Error of some type -
1554                       * I don't trust errno from qemu_poll_ns
1555                      */
1556                 error_report("%s: poll failed", __func__);
1557                 return -EPIPE;
1558             }
1559 
1560             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1561                 /* Bail out and let the cancellation happen */
1562                 return -EPIPE;
1563             }
1564         }
1565     }
1566 
1567     if (rdma->received_error) {
1568         return -EPIPE;
1569     }
1570     return rdma->error_state;
1571 }
1572 
1573 /*
1574  * Block until the next work request has completed.
1575  *
1576  * First poll to see if a work request has already completed,
1577  * otherwise block.
1578  *
1579  * If we encounter completed work requests for IDs other than
1580  * the one we're interested in, then that's generally an error.
1581  *
1582  * The only exception is actual RDMA Write completions. These
1583  * completions only need to be recorded, but do not actually
1584  * need further processing.
1585  */
1586 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1587                                     uint32_t *byte_len)
1588 {
1589     int num_cq_events = 0, ret = 0;
1590     struct ibv_cq *cq;
1591     void *cq_ctx;
1592     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1593 
1594     if (ibv_req_notify_cq(rdma->cq, 0)) {
1595         return -1;
1596     }
1597     /* poll cq first */
1598     while (wr_id != wrid_requested) {
1599         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1600         if (ret < 0) {
1601             return ret;
1602         }
1603 
1604         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1605 
1606         if (wr_id == RDMA_WRID_NONE) {
1607             break;
1608         }
1609         if (wr_id != wrid_requested) {
1610             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1611                        wrid_requested, print_wrid(wr_id), wr_id);
1612         }
1613     }
1614 
1615     if (wr_id == wrid_requested) {
1616         return 0;
1617     }
1618 
1619     while (1) {
1620         ret = qemu_rdma_wait_comp_channel(rdma);
1621         if (ret) {
1622             goto err_block_for_wrid;
1623         }
1624 
1625         ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1626         if (ret) {
1627             perror("ibv_get_cq_event");
1628             goto err_block_for_wrid;
1629         }
1630 
1631         num_cq_events++;
1632 
1633         ret = -ibv_req_notify_cq(cq, 0);
1634         if (ret) {
1635             goto err_block_for_wrid;
1636         }
1637 
1638         while (wr_id != wrid_requested) {
1639             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1640             if (ret < 0) {
1641                 goto err_block_for_wrid;
1642             }
1643 
1644             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1645 
1646             if (wr_id == RDMA_WRID_NONE) {
1647                 break;
1648             }
1649             if (wr_id != wrid_requested) {
1650                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1651                                    wrid_requested, print_wrid(wr_id), wr_id);
1652             }
1653         }
1654 
1655         if (wr_id == wrid_requested) {
1656             goto success_block_for_wrid;
1657         }
1658     }
1659 
1660 success_block_for_wrid:
1661     if (num_cq_events) {
1662         ibv_ack_cq_events(cq, num_cq_events);
1663     }
1664     return 0;
1665 
1666 err_block_for_wrid:
1667     if (num_cq_events) {
1668         ibv_ack_cq_events(cq, num_cq_events);
1669     }
1670 
1671     rdma->error_state = ret;
1672     return ret;
1673 }
1674 
1675 /*
1676  * Post a SEND message work request for the control channel
1677  * containing some data and block until the post completes.
1678  */
1679 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1680                                        RDMAControlHeader *head)
1681 {
1682     int ret = 0;
1683     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1684     struct ibv_send_wr *bad_wr;
1685     struct ibv_sge sge = {
1686                            .addr = (uintptr_t)(wr->control),
1687                            .length = head->len + sizeof(RDMAControlHeader),
1688                            .lkey = wr->control_mr->lkey,
1689                          };
1690     struct ibv_send_wr send_wr = {
1691                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1692                                    .opcode = IBV_WR_SEND,
1693                                    .send_flags = IBV_SEND_SIGNALED,
1694                                    .sg_list = &sge,
1695                                    .num_sge = 1,
1696                                 };
1697 
1698     trace_qemu_rdma_post_send_control(control_desc(head->type));
1699 
1700     /*
1701      * We don't actually need to do a memcpy() in here if we used
1702      * the "sge" properly, but since we're only sending control messages
1703      * (not RAM in a performance-critical path), then its OK for now.
1704      *
1705      * The copy makes the RDMAControlHeader simpler to manipulate
1706      * for the time being.
1707      */
1708     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1709     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1710     control_to_network((void *) wr->control);
1711 
1712     if (buf) {
1713         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1714     }
1715 
1716 
1717     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1718 
1719     if (ret > 0) {
1720         error_report("Failed to use post IB SEND for control");
1721         return -ret;
1722     }
1723 
1724     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1725     if (ret < 0) {
1726         error_report("rdma migration: send polling control error");
1727     }
1728 
1729     return ret;
1730 }
1731 
1732 /*
1733  * Post a RECV work request in anticipation of some future receipt
1734  * of data on the control channel.
1735  */
1736 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1737 {
1738     struct ibv_recv_wr *bad_wr;
1739     struct ibv_sge sge = {
1740                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1741                             .length = RDMA_CONTROL_MAX_BUFFER,
1742                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1743                          };
1744 
1745     struct ibv_recv_wr recv_wr = {
1746                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1747                                     .sg_list = &sge,
1748                                     .num_sge = 1,
1749                                  };
1750 
1751 
1752     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1753         return -1;
1754     }
1755 
1756     return 0;
1757 }
1758 
1759 /*
1760  * Block and wait for a RECV control channel message to arrive.
1761  */
1762 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1763                 RDMAControlHeader *head, int expecting, int idx)
1764 {
1765     uint32_t byte_len;
1766     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1767                                        &byte_len);
1768 
1769     if (ret < 0) {
1770         error_report("rdma migration: recv polling control error!");
1771         return ret;
1772     }
1773 
1774     network_to_control((void *) rdma->wr_data[idx].control);
1775     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1776 
1777     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1778 
1779     if (expecting == RDMA_CONTROL_NONE) {
1780         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1781                                              head->type);
1782     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1783         error_report("Was expecting a %s (%d) control message"
1784                 ", but got: %s (%d), length: %d",
1785                 control_desc(expecting), expecting,
1786                 control_desc(head->type), head->type, head->len);
1787         if (head->type == RDMA_CONTROL_ERROR) {
1788             rdma->received_error = true;
1789         }
1790         return -EIO;
1791     }
1792     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1793         error_report("too long length: %d", head->len);
1794         return -EINVAL;
1795     }
1796     if (sizeof(*head) + head->len != byte_len) {
1797         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1798         return -EINVAL;
1799     }
1800 
1801     return 0;
1802 }
1803 
1804 /*
1805  * When a RECV work request has completed, the work request's
1806  * buffer is pointed at the header.
1807  *
1808  * This will advance the pointer to the data portion
1809  * of the control message of the work request's buffer that
1810  * was populated after the work request finished.
1811  */
1812 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1813                                   RDMAControlHeader *head)
1814 {
1815     rdma->wr_data[idx].control_len = head->len;
1816     rdma->wr_data[idx].control_curr =
1817         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1818 }
1819 
1820 /*
1821  * This is an 'atomic' high-level operation to deliver a single, unified
1822  * control-channel message.
1823  *
1824  * Additionally, if the user is expecting some kind of reply to this message,
1825  * they can request a 'resp' response message be filled in by posting an
1826  * additional work request on behalf of the user and waiting for an additional
1827  * completion.
1828  *
1829  * The extra (optional) response is used during registration to us from having
1830  * to perform an *additional* exchange of message just to provide a response by
1831  * instead piggy-backing on the acknowledgement.
1832  */
1833 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1834                                    uint8_t *data, RDMAControlHeader *resp,
1835                                    int *resp_idx,
1836                                    int (*callback)(RDMAContext *rdma))
1837 {
1838     int ret = 0;
1839 
1840     /*
1841      * Wait until the dest is ready before attempting to deliver the message
1842      * by waiting for a READY message.
1843      */
1844     if (rdma->control_ready_expected) {
1845         RDMAControlHeader resp;
1846         ret = qemu_rdma_exchange_get_response(rdma,
1847                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1848         if (ret < 0) {
1849             return ret;
1850         }
1851     }
1852 
1853     /*
1854      * If the user is expecting a response, post a WR in anticipation of it.
1855      */
1856     if (resp) {
1857         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1858         if (ret) {
1859             error_report("rdma migration: error posting"
1860                     " extra control recv for anticipated result!");
1861             return ret;
1862         }
1863     }
1864 
1865     /*
1866      * Post a WR to replace the one we just consumed for the READY message.
1867      */
1868     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1869     if (ret) {
1870         error_report("rdma migration: error posting first control recv!");
1871         return ret;
1872     }
1873 
1874     /*
1875      * Deliver the control message that was requested.
1876      */
1877     ret = qemu_rdma_post_send_control(rdma, data, head);
1878 
1879     if (ret < 0) {
1880         error_report("Failed to send control buffer!");
1881         return ret;
1882     }
1883 
1884     /*
1885      * If we're expecting a response, block and wait for it.
1886      */
1887     if (resp) {
1888         if (callback) {
1889             trace_qemu_rdma_exchange_send_issue_callback();
1890             ret = callback(rdma);
1891             if (ret < 0) {
1892                 return ret;
1893             }
1894         }
1895 
1896         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1897         ret = qemu_rdma_exchange_get_response(rdma, resp,
1898                                               resp->type, RDMA_WRID_DATA);
1899 
1900         if (ret < 0) {
1901             return ret;
1902         }
1903 
1904         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1905         if (resp_idx) {
1906             *resp_idx = RDMA_WRID_DATA;
1907         }
1908         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1909     }
1910 
1911     rdma->control_ready_expected = 1;
1912 
1913     return 0;
1914 }
1915 
1916 /*
1917  * This is an 'atomic' high-level operation to receive a single, unified
1918  * control-channel message.
1919  */
1920 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1921                                 int expecting)
1922 {
1923     RDMAControlHeader ready = {
1924                                 .len = 0,
1925                                 .type = RDMA_CONTROL_READY,
1926                                 .repeat = 1,
1927                               };
1928     int ret;
1929 
1930     /*
1931      * Inform the source that we're ready to receive a message.
1932      */
1933     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1934 
1935     if (ret < 0) {
1936         error_report("Failed to send control buffer!");
1937         return ret;
1938     }
1939 
1940     /*
1941      * Block and wait for the message.
1942      */
1943     ret = qemu_rdma_exchange_get_response(rdma, head,
1944                                           expecting, RDMA_WRID_READY);
1945 
1946     if (ret < 0) {
1947         return ret;
1948     }
1949 
1950     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1951 
1952     /*
1953      * Post a new RECV work request to replace the one we just consumed.
1954      */
1955     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1956     if (ret) {
1957         error_report("rdma migration: error posting second control recv!");
1958         return ret;
1959     }
1960 
1961     return 0;
1962 }
1963 
1964 /*
1965  * Write an actual chunk of memory using RDMA.
1966  *
1967  * If we're using dynamic registration on the dest-side, we have to
1968  * send a registration command first.
1969  */
1970 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1971                                int current_index, uint64_t current_addr,
1972                                uint64_t length)
1973 {
1974     struct ibv_sge sge;
1975     struct ibv_send_wr send_wr = { 0 };
1976     struct ibv_send_wr *bad_wr;
1977     int reg_result_idx, ret, count = 0;
1978     uint64_t chunk, chunks;
1979     uint8_t *chunk_start, *chunk_end;
1980     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1981     RDMARegister reg;
1982     RDMARegisterResult *reg_result;
1983     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1984     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1985                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1986                                .repeat = 1,
1987                              };
1988 
1989 retry:
1990     sge.addr = (uintptr_t)(block->local_host_addr +
1991                             (current_addr - block->offset));
1992     sge.length = length;
1993 
1994     chunk = ram_chunk_index(block->local_host_addr,
1995                             (uint8_t *)(uintptr_t)sge.addr);
1996     chunk_start = ram_chunk_start(block, chunk);
1997 
1998     if (block->is_ram_block) {
1999         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2000 
2001         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2002             chunks--;
2003         }
2004     } else {
2005         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2006 
2007         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2008             chunks--;
2009         }
2010     }
2011 
2012     trace_qemu_rdma_write_one_top(chunks + 1,
2013                                   (chunks + 1) *
2014                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2015 
2016     chunk_end = ram_chunk_end(block, chunk + chunks);
2017 
2018     if (!rdma->pin_all) {
2019 #ifdef RDMA_UNREGISTRATION_EXAMPLE
2020         qemu_rdma_unregister_waiting(rdma);
2021 #endif
2022     }
2023 
2024     while (test_bit(chunk, block->transit_bitmap)) {
2025         (void)count;
2026         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2027                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2028 
2029         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2030 
2031         if (ret < 0) {
2032             error_report("Failed to Wait for previous write to complete "
2033                     "block %d chunk %" PRIu64
2034                     " current %" PRIu64 " len %" PRIu64 " %d",
2035                     current_index, chunk, sge.addr, length, rdma->nb_sent);
2036             return ret;
2037         }
2038     }
2039 
2040     if (!rdma->pin_all || !block->is_ram_block) {
2041         if (!block->remote_keys[chunk]) {
2042             /*
2043              * This chunk has not yet been registered, so first check to see
2044              * if the entire chunk is zero. If so, tell the other size to
2045              * memset() + madvise() the entire chunk without RDMA.
2046              */
2047 
2048             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2049                 RDMACompress comp = {
2050                                         .offset = current_addr,
2051                                         .value = 0,
2052                                         .block_idx = current_index,
2053                                         .length = length,
2054                                     };
2055 
2056                 head.len = sizeof(comp);
2057                 head.type = RDMA_CONTROL_COMPRESS;
2058 
2059                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2060                                                current_index, current_addr);
2061 
2062                 compress_to_network(rdma, &comp);
2063                 ret = qemu_rdma_exchange_send(rdma, &head,
2064                                 (uint8_t *) &comp, NULL, NULL, NULL);
2065 
2066                 if (ret < 0) {
2067                     return -EIO;
2068                 }
2069 
2070                 acct_update_position(f, sge.length, true);
2071 
2072                 return 1;
2073             }
2074 
2075             /*
2076              * Otherwise, tell other side to register.
2077              */
2078             reg.current_index = current_index;
2079             if (block->is_ram_block) {
2080                 reg.key.current_addr = current_addr;
2081             } else {
2082                 reg.key.chunk = chunk;
2083             }
2084             reg.chunks = chunks;
2085 
2086             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2087                                               current_addr);
2088 
2089             register_to_network(rdma, &reg);
2090             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2091                                     &resp, &reg_result_idx, NULL);
2092             if (ret < 0) {
2093                 return ret;
2094             }
2095 
2096             /* try to overlap this single registration with the one we sent. */
2097             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2098                                                 &sge.lkey, NULL, chunk,
2099                                                 chunk_start, chunk_end)) {
2100                 error_report("cannot get lkey");
2101                 return -EINVAL;
2102             }
2103 
2104             reg_result = (RDMARegisterResult *)
2105                     rdma->wr_data[reg_result_idx].control_curr;
2106 
2107             network_to_result(reg_result);
2108 
2109             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2110                                                  reg_result->rkey, chunk);
2111 
2112             block->remote_keys[chunk] = reg_result->rkey;
2113             block->remote_host_addr = reg_result->host_addr;
2114         } else {
2115             /* already registered before */
2116             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2117                                                 &sge.lkey, NULL, chunk,
2118                                                 chunk_start, chunk_end)) {
2119                 error_report("cannot get lkey!");
2120                 return -EINVAL;
2121             }
2122         }
2123 
2124         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2125     } else {
2126         send_wr.wr.rdma.rkey = block->remote_rkey;
2127 
2128         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2129                                                      &sge.lkey, NULL, chunk,
2130                                                      chunk_start, chunk_end)) {
2131             error_report("cannot get lkey!");
2132             return -EINVAL;
2133         }
2134     }
2135 
2136     /*
2137      * Encode the ram block index and chunk within this wrid.
2138      * We will use this information at the time of completion
2139      * to figure out which bitmap to check against and then which
2140      * chunk in the bitmap to look for.
2141      */
2142     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2143                                         current_index, chunk);
2144 
2145     send_wr.opcode = IBV_WR_RDMA_WRITE;
2146     send_wr.send_flags = IBV_SEND_SIGNALED;
2147     send_wr.sg_list = &sge;
2148     send_wr.num_sge = 1;
2149     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2150                                 (current_addr - block->offset);
2151 
2152     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2153                                    sge.length);
2154 
2155     /*
2156      * ibv_post_send() does not return negative error numbers,
2157      * per the specification they are positive - no idea why.
2158      */
2159     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2160 
2161     if (ret == ENOMEM) {
2162         trace_qemu_rdma_write_one_queue_full();
2163         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2164         if (ret < 0) {
2165             error_report("rdma migration: failed to make "
2166                          "room in full send queue! %d", ret);
2167             return ret;
2168         }
2169 
2170         goto retry;
2171 
2172     } else if (ret > 0) {
2173         perror("rdma migration: post rdma write failed");
2174         return -ret;
2175     }
2176 
2177     set_bit(chunk, block->transit_bitmap);
2178     acct_update_position(f, sge.length, false);
2179     rdma->total_writes++;
2180 
2181     return 0;
2182 }
2183 
2184 /*
2185  * Push out any unwritten RDMA operations.
2186  *
2187  * We support sending out multiple chunks at the same time.
2188  * Not all of them need to get signaled in the completion queue.
2189  */
2190 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2191 {
2192     int ret;
2193 
2194     if (!rdma->current_length) {
2195         return 0;
2196     }
2197 
2198     ret = qemu_rdma_write_one(f, rdma,
2199             rdma->current_index, rdma->current_addr, rdma->current_length);
2200 
2201     if (ret < 0) {
2202         return ret;
2203     }
2204 
2205     if (ret == 0) {
2206         rdma->nb_sent++;
2207         trace_qemu_rdma_write_flush(rdma->nb_sent);
2208     }
2209 
2210     rdma->current_length = 0;
2211     rdma->current_addr = 0;
2212 
2213     return 0;
2214 }
2215 
2216 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2217                     uint64_t offset, uint64_t len)
2218 {
2219     RDMALocalBlock *block;
2220     uint8_t *host_addr;
2221     uint8_t *chunk_end;
2222 
2223     if (rdma->current_index < 0) {
2224         return 0;
2225     }
2226 
2227     if (rdma->current_chunk < 0) {
2228         return 0;
2229     }
2230 
2231     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2232     host_addr = block->local_host_addr + (offset - block->offset);
2233     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2234 
2235     if (rdma->current_length == 0) {
2236         return 0;
2237     }
2238 
2239     /*
2240      * Only merge into chunk sequentially.
2241      */
2242     if (offset != (rdma->current_addr + rdma->current_length)) {
2243         return 0;
2244     }
2245 
2246     if (offset < block->offset) {
2247         return 0;
2248     }
2249 
2250     if ((offset + len) > (block->offset + block->length)) {
2251         return 0;
2252     }
2253 
2254     if ((host_addr + len) > chunk_end) {
2255         return 0;
2256     }
2257 
2258     return 1;
2259 }
2260 
2261 /*
2262  * We're not actually writing here, but doing three things:
2263  *
2264  * 1. Identify the chunk the buffer belongs to.
2265  * 2. If the chunk is full or the buffer doesn't belong to the current
2266  *    chunk, then start a new chunk and flush() the old chunk.
2267  * 3. To keep the hardware busy, we also group chunks into batches
2268  *    and only require that a batch gets acknowledged in the completion
2269  *    qeueue instead of each individual chunk.
2270  */
2271 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2272                            uint64_t block_offset, uint64_t offset,
2273                            uint64_t len)
2274 {
2275     uint64_t current_addr = block_offset + offset;
2276     uint64_t index = rdma->current_index;
2277     uint64_t chunk = rdma->current_chunk;
2278     int ret;
2279 
2280     /* If we cannot merge it, we flush the current buffer first. */
2281     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2282         ret = qemu_rdma_write_flush(f, rdma);
2283         if (ret) {
2284             return ret;
2285         }
2286         rdma->current_length = 0;
2287         rdma->current_addr = current_addr;
2288 
2289         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2290                                          offset, len, &index, &chunk);
2291         if (ret) {
2292             error_report("ram block search failed");
2293             return ret;
2294         }
2295         rdma->current_index = index;
2296         rdma->current_chunk = chunk;
2297     }
2298 
2299     /* merge it */
2300     rdma->current_length += len;
2301 
2302     /* flush it if buffer is too large */
2303     if (rdma->current_length >= RDMA_MERGE_MAX) {
2304         return qemu_rdma_write_flush(f, rdma);
2305     }
2306 
2307     return 0;
2308 }
2309 
2310 static void qemu_rdma_cleanup(RDMAContext *rdma)
2311 {
2312     int idx;
2313 
2314     if (rdma->cm_id && rdma->connected) {
2315         if ((rdma->error_state ||
2316              migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2317             !rdma->received_error) {
2318             RDMAControlHeader head = { .len = 0,
2319                                        .type = RDMA_CONTROL_ERROR,
2320                                        .repeat = 1,
2321                                      };
2322             error_report("Early error. Sending error.");
2323             qemu_rdma_post_send_control(rdma, NULL, &head);
2324         }
2325 
2326         rdma_disconnect(rdma->cm_id);
2327         trace_qemu_rdma_cleanup_disconnect();
2328         rdma->connected = false;
2329     }
2330 
2331     if (rdma->channel) {
2332         qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2333     }
2334     g_free(rdma->dest_blocks);
2335     rdma->dest_blocks = NULL;
2336 
2337     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2338         if (rdma->wr_data[idx].control_mr) {
2339             rdma->total_registrations--;
2340             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2341         }
2342         rdma->wr_data[idx].control_mr = NULL;
2343     }
2344 
2345     if (rdma->local_ram_blocks.block) {
2346         while (rdma->local_ram_blocks.nb_blocks) {
2347             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2348         }
2349     }
2350 
2351     if (rdma->qp) {
2352         rdma_destroy_qp(rdma->cm_id);
2353         rdma->qp = NULL;
2354     }
2355     if (rdma->cq) {
2356         ibv_destroy_cq(rdma->cq);
2357         rdma->cq = NULL;
2358     }
2359     if (rdma->comp_channel) {
2360         ibv_destroy_comp_channel(rdma->comp_channel);
2361         rdma->comp_channel = NULL;
2362     }
2363     if (rdma->pd) {
2364         ibv_dealloc_pd(rdma->pd);
2365         rdma->pd = NULL;
2366     }
2367     if (rdma->cm_id) {
2368         rdma_destroy_id(rdma->cm_id);
2369         rdma->cm_id = NULL;
2370     }
2371 
2372     /* the destination side, listen_id and channel is shared */
2373     if (rdma->listen_id) {
2374         if (!rdma->is_return_path) {
2375             rdma_destroy_id(rdma->listen_id);
2376         }
2377         rdma->listen_id = NULL;
2378 
2379         if (rdma->channel) {
2380             if (!rdma->is_return_path) {
2381                 rdma_destroy_event_channel(rdma->channel);
2382             }
2383             rdma->channel = NULL;
2384         }
2385     }
2386 
2387     if (rdma->channel) {
2388         rdma_destroy_event_channel(rdma->channel);
2389         rdma->channel = NULL;
2390     }
2391     g_free(rdma->host);
2392     rdma->host = NULL;
2393 }
2394 
2395 
2396 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2397 {
2398     int ret, idx;
2399     Error *local_err = NULL, **temp = &local_err;
2400 
2401     /*
2402      * Will be validated against destination's actual capabilities
2403      * after the connect() completes.
2404      */
2405     rdma->pin_all = pin_all;
2406 
2407     ret = qemu_rdma_resolve_host(rdma, temp);
2408     if (ret) {
2409         goto err_rdma_source_init;
2410     }
2411 
2412     ret = qemu_rdma_alloc_pd_cq(rdma);
2413     if (ret) {
2414         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2415                     " limits may be too low. Please check $ ulimit -a # and "
2416                     "search for 'ulimit -l' in the output");
2417         goto err_rdma_source_init;
2418     }
2419 
2420     ret = qemu_rdma_alloc_qp(rdma);
2421     if (ret) {
2422         ERROR(temp, "rdma migration: error allocating qp!");
2423         goto err_rdma_source_init;
2424     }
2425 
2426     ret = qemu_rdma_init_ram_blocks(rdma);
2427     if (ret) {
2428         ERROR(temp, "rdma migration: error initializing ram blocks!");
2429         goto err_rdma_source_init;
2430     }
2431 
2432     /* Build the hash that maps from offset to RAMBlock */
2433     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2434     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2435         g_hash_table_insert(rdma->blockmap,
2436                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2437                 &rdma->local_ram_blocks.block[idx]);
2438     }
2439 
2440     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2441         ret = qemu_rdma_reg_control(rdma, idx);
2442         if (ret) {
2443             ERROR(temp, "rdma migration: error registering %d control!",
2444                                                             idx);
2445             goto err_rdma_source_init;
2446         }
2447     }
2448 
2449     return 0;
2450 
2451 err_rdma_source_init:
2452     error_propagate(errp, local_err);
2453     qemu_rdma_cleanup(rdma);
2454     return -1;
2455 }
2456 
2457 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2458 {
2459     RDMACapabilities cap = {
2460                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2461                                 .flags = 0,
2462                            };
2463     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2464                                           .retry_count = 5,
2465                                           .private_data = &cap,
2466                                           .private_data_len = sizeof(cap),
2467                                         };
2468     struct rdma_cm_event *cm_event;
2469     int ret;
2470 
2471     /*
2472      * Only negotiate the capability with destination if the user
2473      * on the source first requested the capability.
2474      */
2475     if (rdma->pin_all) {
2476         trace_qemu_rdma_connect_pin_all_requested();
2477         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2478     }
2479 
2480     caps_to_network(&cap);
2481 
2482     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2483     if (ret) {
2484         ERROR(errp, "posting second control recv");
2485         goto err_rdma_source_connect;
2486     }
2487 
2488     ret = rdma_connect(rdma->cm_id, &conn_param);
2489     if (ret) {
2490         perror("rdma_connect");
2491         ERROR(errp, "connecting to destination!");
2492         goto err_rdma_source_connect;
2493     }
2494 
2495     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2496     if (ret) {
2497         perror("rdma_get_cm_event after rdma_connect");
2498         ERROR(errp, "connecting to destination!");
2499         rdma_ack_cm_event(cm_event);
2500         goto err_rdma_source_connect;
2501     }
2502 
2503     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2504         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2505         ERROR(errp, "connecting to destination!");
2506         rdma_ack_cm_event(cm_event);
2507         goto err_rdma_source_connect;
2508     }
2509     rdma->connected = true;
2510 
2511     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2512     network_to_caps(&cap);
2513 
2514     /*
2515      * Verify that the *requested* capabilities are supported by the destination
2516      * and disable them otherwise.
2517      */
2518     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2519         ERROR(errp, "Server cannot support pinning all memory. "
2520                         "Will register memory dynamically.");
2521         rdma->pin_all = false;
2522     }
2523 
2524     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2525 
2526     rdma_ack_cm_event(cm_event);
2527 
2528     rdma->control_ready_expected = 1;
2529     rdma->nb_sent = 0;
2530     return 0;
2531 
2532 err_rdma_source_connect:
2533     qemu_rdma_cleanup(rdma);
2534     return -1;
2535 }
2536 
2537 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2538 {
2539     int ret, idx;
2540     struct rdma_cm_id *listen_id;
2541     char ip[40] = "unknown";
2542     struct rdma_addrinfo *res, *e;
2543     char port_str[16];
2544 
2545     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2546         rdma->wr_data[idx].control_len = 0;
2547         rdma->wr_data[idx].control_curr = NULL;
2548     }
2549 
2550     if (!rdma->host || !rdma->host[0]) {
2551         ERROR(errp, "RDMA host is not set!");
2552         rdma->error_state = -EINVAL;
2553         return -1;
2554     }
2555     /* create CM channel */
2556     rdma->channel = rdma_create_event_channel();
2557     if (!rdma->channel) {
2558         ERROR(errp, "could not create rdma event channel");
2559         rdma->error_state = -EINVAL;
2560         return -1;
2561     }
2562 
2563     /* create CM id */
2564     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2565     if (ret) {
2566         ERROR(errp, "could not create cm_id!");
2567         goto err_dest_init_create_listen_id;
2568     }
2569 
2570     snprintf(port_str, 16, "%d", rdma->port);
2571     port_str[15] = '\0';
2572 
2573     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2574     if (ret < 0) {
2575         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2576         goto err_dest_init_bind_addr;
2577     }
2578 
2579     for (e = res; e != NULL; e = e->ai_next) {
2580         inet_ntop(e->ai_family,
2581             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2582         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2583         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2584         if (ret) {
2585             continue;
2586         }
2587         if (e->ai_family == AF_INET6) {
2588             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2589             if (ret) {
2590                 continue;
2591             }
2592         }
2593         break;
2594     }
2595 
2596     if (!e) {
2597         ERROR(errp, "Error: could not rdma_bind_addr!");
2598         goto err_dest_init_bind_addr;
2599     }
2600 
2601     rdma->listen_id = listen_id;
2602     qemu_rdma_dump_gid("dest_init", listen_id);
2603     return 0;
2604 
2605 err_dest_init_bind_addr:
2606     rdma_destroy_id(listen_id);
2607 err_dest_init_create_listen_id:
2608     rdma_destroy_event_channel(rdma->channel);
2609     rdma->channel = NULL;
2610     rdma->error_state = ret;
2611     return ret;
2612 
2613 }
2614 
2615 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2616                                             RDMAContext *rdma)
2617 {
2618     int idx;
2619 
2620     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2621         rdma_return_path->wr_data[idx].control_len = 0;
2622         rdma_return_path->wr_data[idx].control_curr = NULL;
2623     }
2624 
2625     /*the CM channel and CM id is shared*/
2626     rdma_return_path->channel = rdma->channel;
2627     rdma_return_path->listen_id = rdma->listen_id;
2628 
2629     rdma->return_path = rdma_return_path;
2630     rdma_return_path->return_path = rdma;
2631     rdma_return_path->is_return_path = true;
2632 }
2633 
2634 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2635 {
2636     RDMAContext *rdma = NULL;
2637     InetSocketAddress *addr;
2638 
2639     if (host_port) {
2640         rdma = g_new0(RDMAContext, 1);
2641         rdma->current_index = -1;
2642         rdma->current_chunk = -1;
2643 
2644         addr = g_new(InetSocketAddress, 1);
2645         if (!inet_parse(addr, host_port, NULL)) {
2646             rdma->port = atoi(addr->port);
2647             rdma->host = g_strdup(addr->host);
2648         } else {
2649             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2650             g_free(rdma);
2651             rdma = NULL;
2652         }
2653 
2654         qapi_free_InetSocketAddress(addr);
2655     }
2656 
2657     return rdma;
2658 }
2659 
2660 /*
2661  * QEMUFile interface to the control channel.
2662  * SEND messages for control only.
2663  * VM's ram is handled with regular RDMA messages.
2664  */
2665 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2666                                        const struct iovec *iov,
2667                                        size_t niov,
2668                                        int *fds,
2669                                        size_t nfds,
2670                                        Error **errp)
2671 {
2672     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2673     QEMUFile *f = rioc->file;
2674     RDMAContext *rdma;
2675     int ret;
2676     ssize_t done = 0;
2677     size_t i;
2678     size_t len = 0;
2679 
2680     RCU_READ_LOCK_GUARD();
2681     rdma = atomic_rcu_read(&rioc->rdmaout);
2682 
2683     if (!rdma) {
2684         return -EIO;
2685     }
2686 
2687     CHECK_ERROR_STATE();
2688 
2689     /*
2690      * Push out any writes that
2691      * we're queued up for VM's ram.
2692      */
2693     ret = qemu_rdma_write_flush(f, rdma);
2694     if (ret < 0) {
2695         rdma->error_state = ret;
2696         return ret;
2697     }
2698 
2699     for (i = 0; i < niov; i++) {
2700         size_t remaining = iov[i].iov_len;
2701         uint8_t * data = (void *)iov[i].iov_base;
2702         while (remaining) {
2703             RDMAControlHeader head;
2704 
2705             len = MIN(remaining, RDMA_SEND_INCREMENT);
2706             remaining -= len;
2707 
2708             head.len = len;
2709             head.type = RDMA_CONTROL_QEMU_FILE;
2710 
2711             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2712 
2713             if (ret < 0) {
2714                 rdma->error_state = ret;
2715                 return ret;
2716             }
2717 
2718             data += len;
2719             done += len;
2720         }
2721     }
2722 
2723     return done;
2724 }
2725 
2726 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2727                              size_t size, int idx)
2728 {
2729     size_t len = 0;
2730 
2731     if (rdma->wr_data[idx].control_len) {
2732         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2733 
2734         len = MIN(size, rdma->wr_data[idx].control_len);
2735         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2736         rdma->wr_data[idx].control_curr += len;
2737         rdma->wr_data[idx].control_len -= len;
2738     }
2739 
2740     return len;
2741 }
2742 
2743 /*
2744  * QEMUFile interface to the control channel.
2745  * RDMA links don't use bytestreams, so we have to
2746  * return bytes to QEMUFile opportunistically.
2747  */
2748 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2749                                       const struct iovec *iov,
2750                                       size_t niov,
2751                                       int **fds,
2752                                       size_t *nfds,
2753                                       Error **errp)
2754 {
2755     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2756     RDMAContext *rdma;
2757     RDMAControlHeader head;
2758     int ret = 0;
2759     ssize_t i;
2760     size_t done = 0;
2761 
2762     RCU_READ_LOCK_GUARD();
2763     rdma = atomic_rcu_read(&rioc->rdmain);
2764 
2765     if (!rdma) {
2766         return -EIO;
2767     }
2768 
2769     CHECK_ERROR_STATE();
2770 
2771     for (i = 0; i < niov; i++) {
2772         size_t want = iov[i].iov_len;
2773         uint8_t *data = (void *)iov[i].iov_base;
2774 
2775         /*
2776          * First, we hold on to the last SEND message we
2777          * were given and dish out the bytes until we run
2778          * out of bytes.
2779          */
2780         ret = qemu_rdma_fill(rdma, data, want, 0);
2781         done += ret;
2782         want -= ret;
2783         /* Got what we needed, so go to next iovec */
2784         if (want == 0) {
2785             continue;
2786         }
2787 
2788         /* If we got any data so far, then don't wait
2789          * for more, just return what we have */
2790         if (done > 0) {
2791             break;
2792         }
2793 
2794 
2795         /* We've got nothing at all, so lets wait for
2796          * more to arrive
2797          */
2798         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2799 
2800         if (ret < 0) {
2801             rdma->error_state = ret;
2802             return ret;
2803         }
2804 
2805         /*
2806          * SEND was received with new bytes, now try again.
2807          */
2808         ret = qemu_rdma_fill(rdma, data, want, 0);
2809         done += ret;
2810         want -= ret;
2811 
2812         /* Still didn't get enough, so lets just return */
2813         if (want) {
2814             if (done == 0) {
2815                 return QIO_CHANNEL_ERR_BLOCK;
2816             } else {
2817                 break;
2818             }
2819         }
2820     }
2821     return done;
2822 }
2823 
2824 /*
2825  * Block until all the outstanding chunks have been delivered by the hardware.
2826  */
2827 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2828 {
2829     int ret;
2830 
2831     if (qemu_rdma_write_flush(f, rdma) < 0) {
2832         return -EIO;
2833     }
2834 
2835     while (rdma->nb_sent) {
2836         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2837         if (ret < 0) {
2838             error_report("rdma migration: complete polling error!");
2839             return -EIO;
2840         }
2841     }
2842 
2843     qemu_rdma_unregister_waiting(rdma);
2844 
2845     return 0;
2846 }
2847 
2848 
2849 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2850                                          bool blocking,
2851                                          Error **errp)
2852 {
2853     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2854     /* XXX we should make readv/writev actually honour this :-) */
2855     rioc->blocking = blocking;
2856     return 0;
2857 }
2858 
2859 
2860 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2861 struct QIOChannelRDMASource {
2862     GSource parent;
2863     QIOChannelRDMA *rioc;
2864     GIOCondition condition;
2865 };
2866 
2867 static gboolean
2868 qio_channel_rdma_source_prepare(GSource *source,
2869                                 gint *timeout)
2870 {
2871     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2872     RDMAContext *rdma;
2873     GIOCondition cond = 0;
2874     *timeout = -1;
2875 
2876     RCU_READ_LOCK_GUARD();
2877     if (rsource->condition == G_IO_IN) {
2878         rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2879     } else {
2880         rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2881     }
2882 
2883     if (!rdma) {
2884         error_report("RDMAContext is NULL when prepare Gsource");
2885         return FALSE;
2886     }
2887 
2888     if (rdma->wr_data[0].control_len) {
2889         cond |= G_IO_IN;
2890     }
2891     cond |= G_IO_OUT;
2892 
2893     return cond & rsource->condition;
2894 }
2895 
2896 static gboolean
2897 qio_channel_rdma_source_check(GSource *source)
2898 {
2899     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2900     RDMAContext *rdma;
2901     GIOCondition cond = 0;
2902 
2903     RCU_READ_LOCK_GUARD();
2904     if (rsource->condition == G_IO_IN) {
2905         rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2906     } else {
2907         rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2908     }
2909 
2910     if (!rdma) {
2911         error_report("RDMAContext is NULL when check Gsource");
2912         return FALSE;
2913     }
2914 
2915     if (rdma->wr_data[0].control_len) {
2916         cond |= G_IO_IN;
2917     }
2918     cond |= G_IO_OUT;
2919 
2920     return cond & rsource->condition;
2921 }
2922 
2923 static gboolean
2924 qio_channel_rdma_source_dispatch(GSource *source,
2925                                  GSourceFunc callback,
2926                                  gpointer user_data)
2927 {
2928     QIOChannelFunc func = (QIOChannelFunc)callback;
2929     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2930     RDMAContext *rdma;
2931     GIOCondition cond = 0;
2932 
2933     RCU_READ_LOCK_GUARD();
2934     if (rsource->condition == G_IO_IN) {
2935         rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2936     } else {
2937         rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2938     }
2939 
2940     if (!rdma) {
2941         error_report("RDMAContext is NULL when dispatch Gsource");
2942         return FALSE;
2943     }
2944 
2945     if (rdma->wr_data[0].control_len) {
2946         cond |= G_IO_IN;
2947     }
2948     cond |= G_IO_OUT;
2949 
2950     return (*func)(QIO_CHANNEL(rsource->rioc),
2951                    (cond & rsource->condition),
2952                    user_data);
2953 }
2954 
2955 static void
2956 qio_channel_rdma_source_finalize(GSource *source)
2957 {
2958     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2959 
2960     object_unref(OBJECT(ssource->rioc));
2961 }
2962 
2963 GSourceFuncs qio_channel_rdma_source_funcs = {
2964     qio_channel_rdma_source_prepare,
2965     qio_channel_rdma_source_check,
2966     qio_channel_rdma_source_dispatch,
2967     qio_channel_rdma_source_finalize
2968 };
2969 
2970 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2971                                               GIOCondition condition)
2972 {
2973     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2974     QIOChannelRDMASource *ssource;
2975     GSource *source;
2976 
2977     source = g_source_new(&qio_channel_rdma_source_funcs,
2978                           sizeof(QIOChannelRDMASource));
2979     ssource = (QIOChannelRDMASource *)source;
2980 
2981     ssource->rioc = rioc;
2982     object_ref(OBJECT(rioc));
2983 
2984     ssource->condition = condition;
2985 
2986     return source;
2987 }
2988 
2989 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
2990                                                   AioContext *ctx,
2991                                                   IOHandler *io_read,
2992                                                   IOHandler *io_write,
2993                                                   void *opaque)
2994 {
2995     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2996     if (io_read) {
2997         aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
2998                            false, io_read, io_write, NULL, opaque);
2999     } else {
3000         aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
3001                            false, io_read, io_write, NULL, opaque);
3002     }
3003 }
3004 
3005 struct rdma_close_rcu {
3006     struct rcu_head rcu;
3007     RDMAContext *rdmain;
3008     RDMAContext *rdmaout;
3009 };
3010 
3011 /* callback from qio_channel_rdma_close via call_rcu */
3012 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3013 {
3014     if (rcu->rdmain) {
3015         qemu_rdma_cleanup(rcu->rdmain);
3016     }
3017 
3018     if (rcu->rdmaout) {
3019         qemu_rdma_cleanup(rcu->rdmaout);
3020     }
3021 
3022     g_free(rcu->rdmain);
3023     g_free(rcu->rdmaout);
3024     g_free(rcu);
3025 }
3026 
3027 static int qio_channel_rdma_close(QIOChannel *ioc,
3028                                   Error **errp)
3029 {
3030     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3031     RDMAContext *rdmain, *rdmaout;
3032     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3033 
3034     trace_qemu_rdma_close();
3035 
3036     rdmain = rioc->rdmain;
3037     if (rdmain) {
3038         atomic_rcu_set(&rioc->rdmain, NULL);
3039     }
3040 
3041     rdmaout = rioc->rdmaout;
3042     if (rdmaout) {
3043         atomic_rcu_set(&rioc->rdmaout, NULL);
3044     }
3045 
3046     rcu->rdmain = rdmain;
3047     rcu->rdmaout = rdmaout;
3048     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3049 
3050     return 0;
3051 }
3052 
3053 static int
3054 qio_channel_rdma_shutdown(QIOChannel *ioc,
3055                             QIOChannelShutdown how,
3056                             Error **errp)
3057 {
3058     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3059     RDMAContext *rdmain, *rdmaout;
3060 
3061     RCU_READ_LOCK_GUARD();
3062 
3063     rdmain = atomic_rcu_read(&rioc->rdmain);
3064     rdmaout = atomic_rcu_read(&rioc->rdmain);
3065 
3066     switch (how) {
3067     case QIO_CHANNEL_SHUTDOWN_READ:
3068         if (rdmain) {
3069             rdmain->error_state = -1;
3070         }
3071         break;
3072     case QIO_CHANNEL_SHUTDOWN_WRITE:
3073         if (rdmaout) {
3074             rdmaout->error_state = -1;
3075         }
3076         break;
3077     case QIO_CHANNEL_SHUTDOWN_BOTH:
3078     default:
3079         if (rdmain) {
3080             rdmain->error_state = -1;
3081         }
3082         if (rdmaout) {
3083             rdmaout->error_state = -1;
3084         }
3085         break;
3086     }
3087 
3088     return 0;
3089 }
3090 
3091 /*
3092  * Parameters:
3093  *    @offset == 0 :
3094  *        This means that 'block_offset' is a full virtual address that does not
3095  *        belong to a RAMBlock of the virtual machine and instead
3096  *        represents a private malloc'd memory area that the caller wishes to
3097  *        transfer.
3098  *
3099  *    @offset != 0 :
3100  *        Offset is an offset to be added to block_offset and used
3101  *        to also lookup the corresponding RAMBlock.
3102  *
3103  *    @size > 0 :
3104  *        Initiate an transfer this size.
3105  *
3106  *    @size == 0 :
3107  *        A 'hint' or 'advice' that means that we wish to speculatively
3108  *        and asynchronously unregister this memory. In this case, there is no
3109  *        guarantee that the unregister will actually happen, for example,
3110  *        if the memory is being actively transmitted. Additionally, the memory
3111  *        may be re-registered at any future time if a write within the same
3112  *        chunk was requested again, even if you attempted to unregister it
3113  *        here.
3114  *
3115  *    @size < 0 : TODO, not yet supported
3116  *        Unregister the memory NOW. This means that the caller does not
3117  *        expect there to be any future RDMA transfers and we just want to clean
3118  *        things up. This is used in case the upper layer owns the memory and
3119  *        cannot wait for qemu_fclose() to occur.
3120  *
3121  *    @bytes_sent : User-specificed pointer to indicate how many bytes were
3122  *                  sent. Usually, this will not be more than a few bytes of
3123  *                  the protocol because most transfers are sent asynchronously.
3124  */
3125 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
3126                                   ram_addr_t block_offset, ram_addr_t offset,
3127                                   size_t size, uint64_t *bytes_sent)
3128 {
3129     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3130     RDMAContext *rdma;
3131     int ret;
3132 
3133     RCU_READ_LOCK_GUARD();
3134     rdma = atomic_rcu_read(&rioc->rdmaout);
3135 
3136     if (!rdma) {
3137         return -EIO;
3138     }
3139 
3140     CHECK_ERROR_STATE();
3141 
3142     if (migration_in_postcopy()) {
3143         return RAM_SAVE_CONTROL_NOT_SUPP;
3144     }
3145 
3146     qemu_fflush(f);
3147 
3148     if (size > 0) {
3149         /*
3150          * Add this page to the current 'chunk'. If the chunk
3151          * is full, or the page doen't belong to the current chunk,
3152          * an actual RDMA write will occur and a new chunk will be formed.
3153          */
3154         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3155         if (ret < 0) {
3156             error_report("rdma migration: write error! %d", ret);
3157             goto err;
3158         }
3159 
3160         /*
3161          * We always return 1 bytes because the RDMA
3162          * protocol is completely asynchronous. We do not yet know
3163          * whether an  identified chunk is zero or not because we're
3164          * waiting for other pages to potentially be merged with
3165          * the current chunk. So, we have to call qemu_update_position()
3166          * later on when the actual write occurs.
3167          */
3168         if (bytes_sent) {
3169             *bytes_sent = 1;
3170         }
3171     } else {
3172         uint64_t index, chunk;
3173 
3174         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
3175         if (size < 0) {
3176             ret = qemu_rdma_drain_cq(f, rdma);
3177             if (ret < 0) {
3178                 fprintf(stderr, "rdma: failed to synchronously drain"
3179                                 " completion queue before unregistration.\n");
3180                 goto err;
3181             }
3182         }
3183         */
3184 
3185         ret = qemu_rdma_search_ram_block(rdma, block_offset,
3186                                          offset, size, &index, &chunk);
3187 
3188         if (ret) {
3189             error_report("ram block search failed");
3190             goto err;
3191         }
3192 
3193         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
3194 
3195         /*
3196          * TODO: Synchronous, guaranteed unregistration (should not occur during
3197          * fast-path). Otherwise, unregisters will process on the next call to
3198          * qemu_rdma_drain_cq()
3199         if (size < 0) {
3200             qemu_rdma_unregister_waiting(rdma);
3201         }
3202         */
3203     }
3204 
3205     /*
3206      * Drain the Completion Queue if possible, but do not block,
3207      * just poll.
3208      *
3209      * If nothing to poll, the end of the iteration will do this
3210      * again to make sure we don't overflow the request queue.
3211      */
3212     while (1) {
3213         uint64_t wr_id, wr_id_in;
3214         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
3215         if (ret < 0) {
3216             error_report("rdma migration: polling error! %d", ret);
3217             goto err;
3218         }
3219 
3220         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3221 
3222         if (wr_id == RDMA_WRID_NONE) {
3223             break;
3224         }
3225     }
3226 
3227     return RAM_SAVE_CONTROL_DELAYED;
3228 err:
3229     rdma->error_state = ret;
3230     return ret;
3231 }
3232 
3233 static void rdma_accept_incoming_migration(void *opaque);
3234 
3235 static void rdma_cm_poll_handler(void *opaque)
3236 {
3237     RDMAContext *rdma = opaque;
3238     int ret;
3239     struct rdma_cm_event *cm_event;
3240     MigrationIncomingState *mis = migration_incoming_get_current();
3241 
3242     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3243     if (ret) {
3244         error_report("get_cm_event failed %d", errno);
3245         return;
3246     }
3247     rdma_ack_cm_event(cm_event);
3248 
3249     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3250         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3251         if (!rdma->error_state &&
3252             migration_incoming_get_current()->state !=
3253               MIGRATION_STATUS_COMPLETED) {
3254             error_report("receive cm event, cm event is %d", cm_event->event);
3255             rdma->error_state = -EPIPE;
3256             if (rdma->return_path) {
3257                 rdma->return_path->error_state = -EPIPE;
3258             }
3259         }
3260 
3261         if (mis->migration_incoming_co) {
3262             qemu_coroutine_enter(mis->migration_incoming_co);
3263         }
3264         return;
3265     }
3266 }
3267 
3268 static int qemu_rdma_accept(RDMAContext *rdma)
3269 {
3270     RDMACapabilities cap;
3271     struct rdma_conn_param conn_param = {
3272                                             .responder_resources = 2,
3273                                             .private_data = &cap,
3274                                             .private_data_len = sizeof(cap),
3275                                          };
3276     struct rdma_cm_event *cm_event;
3277     struct ibv_context *verbs;
3278     int ret = -EINVAL;
3279     int idx;
3280 
3281     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3282     if (ret) {
3283         goto err_rdma_dest_wait;
3284     }
3285 
3286     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3287         rdma_ack_cm_event(cm_event);
3288         goto err_rdma_dest_wait;
3289     }
3290 
3291     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3292 
3293     network_to_caps(&cap);
3294 
3295     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3296             error_report("Unknown source RDMA version: %d, bailing...",
3297                             cap.version);
3298             rdma_ack_cm_event(cm_event);
3299             goto err_rdma_dest_wait;
3300     }
3301 
3302     /*
3303      * Respond with only the capabilities this version of QEMU knows about.
3304      */
3305     cap.flags &= known_capabilities;
3306 
3307     /*
3308      * Enable the ones that we do know about.
3309      * Add other checks here as new ones are introduced.
3310      */
3311     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3312         rdma->pin_all = true;
3313     }
3314 
3315     rdma->cm_id = cm_event->id;
3316     verbs = cm_event->id->verbs;
3317 
3318     rdma_ack_cm_event(cm_event);
3319 
3320     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3321 
3322     caps_to_network(&cap);
3323 
3324     trace_qemu_rdma_accept_pin_verbsc(verbs);
3325 
3326     if (!rdma->verbs) {
3327         rdma->verbs = verbs;
3328     } else if (rdma->verbs != verbs) {
3329             error_report("ibv context not matching %p, %p!", rdma->verbs,
3330                          verbs);
3331             goto err_rdma_dest_wait;
3332     }
3333 
3334     qemu_rdma_dump_id("dest_init", verbs);
3335 
3336     ret = qemu_rdma_alloc_pd_cq(rdma);
3337     if (ret) {
3338         error_report("rdma migration: error allocating pd and cq!");
3339         goto err_rdma_dest_wait;
3340     }
3341 
3342     ret = qemu_rdma_alloc_qp(rdma);
3343     if (ret) {
3344         error_report("rdma migration: error allocating qp!");
3345         goto err_rdma_dest_wait;
3346     }
3347 
3348     ret = qemu_rdma_init_ram_blocks(rdma);
3349     if (ret) {
3350         error_report("rdma migration: error initializing ram blocks!");
3351         goto err_rdma_dest_wait;
3352     }
3353 
3354     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3355         ret = qemu_rdma_reg_control(rdma, idx);
3356         if (ret) {
3357             error_report("rdma: error registering %d control", idx);
3358             goto err_rdma_dest_wait;
3359         }
3360     }
3361 
3362     /* Accept the second connection request for return path */
3363     if (migrate_postcopy() && !rdma->is_return_path) {
3364         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3365                             NULL,
3366                             (void *)(intptr_t)rdma->return_path);
3367     } else {
3368         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3369                             NULL, rdma);
3370     }
3371 
3372     ret = rdma_accept(rdma->cm_id, &conn_param);
3373     if (ret) {
3374         error_report("rdma_accept returns %d", ret);
3375         goto err_rdma_dest_wait;
3376     }
3377 
3378     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3379     if (ret) {
3380         error_report("rdma_accept get_cm_event failed %d", ret);
3381         goto err_rdma_dest_wait;
3382     }
3383 
3384     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3385         error_report("rdma_accept not event established");
3386         rdma_ack_cm_event(cm_event);
3387         goto err_rdma_dest_wait;
3388     }
3389 
3390     rdma_ack_cm_event(cm_event);
3391     rdma->connected = true;
3392 
3393     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3394     if (ret) {
3395         error_report("rdma migration: error posting second control recv");
3396         goto err_rdma_dest_wait;
3397     }
3398 
3399     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3400 
3401     return 0;
3402 
3403 err_rdma_dest_wait:
3404     rdma->error_state = ret;
3405     qemu_rdma_cleanup(rdma);
3406     return ret;
3407 }
3408 
3409 static int dest_ram_sort_func(const void *a, const void *b)
3410 {
3411     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3412     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3413 
3414     return (a_index < b_index) ? -1 : (a_index != b_index);
3415 }
3416 
3417 /*
3418  * During each iteration of the migration, we listen for instructions
3419  * by the source VM to perform dynamic page registrations before they
3420  * can perform RDMA operations.
3421  *
3422  * We respond with the 'rkey'.
3423  *
3424  * Keep doing this until the source tells us to stop.
3425  */
3426 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3427 {
3428     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3429                                .type = RDMA_CONTROL_REGISTER_RESULT,
3430                                .repeat = 0,
3431                              };
3432     RDMAControlHeader unreg_resp = { .len = 0,
3433                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3434                                .repeat = 0,
3435                              };
3436     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3437                                  .repeat = 1 };
3438     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3439     RDMAContext *rdma;
3440     RDMALocalBlocks *local;
3441     RDMAControlHeader head;
3442     RDMARegister *reg, *registers;
3443     RDMACompress *comp;
3444     RDMARegisterResult *reg_result;
3445     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3446     RDMALocalBlock *block;
3447     void *host_addr;
3448     int ret = 0;
3449     int idx = 0;
3450     int count = 0;
3451     int i = 0;
3452 
3453     RCU_READ_LOCK_GUARD();
3454     rdma = atomic_rcu_read(&rioc->rdmain);
3455 
3456     if (!rdma) {
3457         return -EIO;
3458     }
3459 
3460     CHECK_ERROR_STATE();
3461 
3462     local = &rdma->local_ram_blocks;
3463     do {
3464         trace_qemu_rdma_registration_handle_wait();
3465 
3466         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3467 
3468         if (ret < 0) {
3469             break;
3470         }
3471 
3472         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3473             error_report("rdma: Too many requests in this message (%d)."
3474                             "Bailing.", head.repeat);
3475             ret = -EIO;
3476             break;
3477         }
3478 
3479         switch (head.type) {
3480         case RDMA_CONTROL_COMPRESS:
3481             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3482             network_to_compress(comp);
3483 
3484             trace_qemu_rdma_registration_handle_compress(comp->length,
3485                                                          comp->block_idx,
3486                                                          comp->offset);
3487             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3488                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3489                              (unsigned int)comp->block_idx,
3490                              rdma->local_ram_blocks.nb_blocks);
3491                 ret = -EIO;
3492                 goto out;
3493             }
3494             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3495 
3496             host_addr = block->local_host_addr +
3497                             (comp->offset - block->offset);
3498 
3499             ram_handle_compressed(host_addr, comp->value, comp->length);
3500             break;
3501 
3502         case RDMA_CONTROL_REGISTER_FINISHED:
3503             trace_qemu_rdma_registration_handle_finished();
3504             goto out;
3505 
3506         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3507             trace_qemu_rdma_registration_handle_ram_blocks();
3508 
3509             /* Sort our local RAM Block list so it's the same as the source,
3510              * we can do this since we've filled in a src_index in the list
3511              * as we received the RAMBlock list earlier.
3512              */
3513             qsort(rdma->local_ram_blocks.block,
3514                   rdma->local_ram_blocks.nb_blocks,
3515                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3516             for (i = 0; i < local->nb_blocks; i++) {
3517                 local->block[i].index = i;
3518             }
3519 
3520             if (rdma->pin_all) {
3521                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3522                 if (ret) {
3523                     error_report("rdma migration: error dest "
3524                                     "registering ram blocks");
3525                     goto out;
3526                 }
3527             }
3528 
3529             /*
3530              * Dest uses this to prepare to transmit the RAMBlock descriptions
3531              * to the source VM after connection setup.
3532              * Both sides use the "remote" structure to communicate and update
3533              * their "local" descriptions with what was sent.
3534              */
3535             for (i = 0; i < local->nb_blocks; i++) {
3536                 rdma->dest_blocks[i].remote_host_addr =
3537                     (uintptr_t)(local->block[i].local_host_addr);
3538 
3539                 if (rdma->pin_all) {
3540                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3541                 }
3542 
3543                 rdma->dest_blocks[i].offset = local->block[i].offset;
3544                 rdma->dest_blocks[i].length = local->block[i].length;
3545 
3546                 dest_block_to_network(&rdma->dest_blocks[i]);
3547                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3548                     local->block[i].block_name,
3549                     local->block[i].offset,
3550                     local->block[i].length,
3551                     local->block[i].local_host_addr,
3552                     local->block[i].src_index);
3553             }
3554 
3555             blocks.len = rdma->local_ram_blocks.nb_blocks
3556                                                 * sizeof(RDMADestBlock);
3557 
3558 
3559             ret = qemu_rdma_post_send_control(rdma,
3560                                         (uint8_t *) rdma->dest_blocks, &blocks);
3561 
3562             if (ret < 0) {
3563                 error_report("rdma migration: error sending remote info");
3564                 goto out;
3565             }
3566 
3567             break;
3568         case RDMA_CONTROL_REGISTER_REQUEST:
3569             trace_qemu_rdma_registration_handle_register(head.repeat);
3570 
3571             reg_resp.repeat = head.repeat;
3572             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3573 
3574             for (count = 0; count < head.repeat; count++) {
3575                 uint64_t chunk;
3576                 uint8_t *chunk_start, *chunk_end;
3577 
3578                 reg = &registers[count];
3579                 network_to_register(reg);
3580 
3581                 reg_result = &results[count];
3582 
3583                 trace_qemu_rdma_registration_handle_register_loop(count,
3584                          reg->current_index, reg->key.current_addr, reg->chunks);
3585 
3586                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3587                     error_report("rdma: 'register' bad block index %u (vs %d)",
3588                                  (unsigned int)reg->current_index,
3589                                  rdma->local_ram_blocks.nb_blocks);
3590                     ret = -ENOENT;
3591                     goto out;
3592                 }
3593                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3594                 if (block->is_ram_block) {
3595                     if (block->offset > reg->key.current_addr) {
3596                         error_report("rdma: bad register address for block %s"
3597                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3598                             block->block_name, block->offset,
3599                             reg->key.current_addr);
3600                         ret = -ERANGE;
3601                         goto out;
3602                     }
3603                     host_addr = (block->local_host_addr +
3604                                 (reg->key.current_addr - block->offset));
3605                     chunk = ram_chunk_index(block->local_host_addr,
3606                                             (uint8_t *) host_addr);
3607                 } else {
3608                     chunk = reg->key.chunk;
3609                     host_addr = block->local_host_addr +
3610                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3611                     /* Check for particularly bad chunk value */
3612                     if (host_addr < (void *)block->local_host_addr) {
3613                         error_report("rdma: bad chunk for block %s"
3614                             " chunk: %" PRIx64,
3615                             block->block_name, reg->key.chunk);
3616                         ret = -ERANGE;
3617                         goto out;
3618                     }
3619                 }
3620                 chunk_start = ram_chunk_start(block, chunk);
3621                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3622                 /* avoid "-Waddress-of-packed-member" warning */
3623                 uint32_t tmp_rkey = 0;
3624                 if (qemu_rdma_register_and_get_keys(rdma, block,
3625                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3626                             chunk, chunk_start, chunk_end)) {
3627                     error_report("cannot get rkey");
3628                     ret = -EINVAL;
3629                     goto out;
3630                 }
3631                 reg_result->rkey = tmp_rkey;
3632 
3633                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3634 
3635                 trace_qemu_rdma_registration_handle_register_rkey(
3636                                                            reg_result->rkey);
3637 
3638                 result_to_network(reg_result);
3639             }
3640 
3641             ret = qemu_rdma_post_send_control(rdma,
3642                             (uint8_t *) results, &reg_resp);
3643 
3644             if (ret < 0) {
3645                 error_report("Failed to send control buffer");
3646                 goto out;
3647             }
3648             break;
3649         case RDMA_CONTROL_UNREGISTER_REQUEST:
3650             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3651             unreg_resp.repeat = head.repeat;
3652             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3653 
3654             for (count = 0; count < head.repeat; count++) {
3655                 reg = &registers[count];
3656                 network_to_register(reg);
3657 
3658                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3659                            reg->current_index, reg->key.chunk);
3660 
3661                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3662 
3663                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3664                 block->pmr[reg->key.chunk] = NULL;
3665 
3666                 if (ret != 0) {
3667                     perror("rdma unregistration chunk failed");
3668                     ret = -ret;
3669                     goto out;
3670                 }
3671 
3672                 rdma->total_registrations--;
3673 
3674                 trace_qemu_rdma_registration_handle_unregister_success(
3675                                                        reg->key.chunk);
3676             }
3677 
3678             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3679 
3680             if (ret < 0) {
3681                 error_report("Failed to send control buffer");
3682                 goto out;
3683             }
3684             break;
3685         case RDMA_CONTROL_REGISTER_RESULT:
3686             error_report("Invalid RESULT message at dest.");
3687             ret = -EIO;
3688             goto out;
3689         default:
3690             error_report("Unknown control message %s", control_desc(head.type));
3691             ret = -EIO;
3692             goto out;
3693         }
3694     } while (1);
3695 out:
3696     if (ret < 0) {
3697         rdma->error_state = ret;
3698     }
3699     return ret;
3700 }
3701 
3702 /* Destination:
3703  * Called via a ram_control_load_hook during the initial RAM load section which
3704  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3705  * on the source.
3706  * We've already built our local RAMBlock list, but not yet sent the list to
3707  * the source.
3708  */
3709 static int
3710 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3711 {
3712     RDMAContext *rdma;
3713     int curr;
3714     int found = -1;
3715 
3716     RCU_READ_LOCK_GUARD();
3717     rdma = atomic_rcu_read(&rioc->rdmain);
3718 
3719     if (!rdma) {
3720         return -EIO;
3721     }
3722 
3723     /* Find the matching RAMBlock in our local list */
3724     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3725         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3726             found = curr;
3727             break;
3728         }
3729     }
3730 
3731     if (found == -1) {
3732         error_report("RAMBlock '%s' not found on destination", name);
3733         return -ENOENT;
3734     }
3735 
3736     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3737     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3738     rdma->next_src_index++;
3739 
3740     return 0;
3741 }
3742 
3743 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3744 {
3745     switch (flags) {
3746     case RAM_CONTROL_BLOCK_REG:
3747         return rdma_block_notification_handle(opaque, data);
3748 
3749     case RAM_CONTROL_HOOK:
3750         return qemu_rdma_registration_handle(f, opaque);
3751 
3752     default:
3753         /* Shouldn't be called with any other values */
3754         abort();
3755     }
3756 }
3757 
3758 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3759                                         uint64_t flags, void *data)
3760 {
3761     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3762     RDMAContext *rdma;
3763 
3764     RCU_READ_LOCK_GUARD();
3765     rdma = atomic_rcu_read(&rioc->rdmaout);
3766     if (!rdma) {
3767         return -EIO;
3768     }
3769 
3770     CHECK_ERROR_STATE();
3771 
3772     if (migration_in_postcopy()) {
3773         return 0;
3774     }
3775 
3776     trace_qemu_rdma_registration_start(flags);
3777     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3778     qemu_fflush(f);
3779 
3780     return 0;
3781 }
3782 
3783 /*
3784  * Inform dest that dynamic registrations are done for now.
3785  * First, flush writes, if any.
3786  */
3787 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3788                                        uint64_t flags, void *data)
3789 {
3790     Error *local_err = NULL, **errp = &local_err;
3791     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3792     RDMAContext *rdma;
3793     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3794     int ret = 0;
3795 
3796     RCU_READ_LOCK_GUARD();
3797     rdma = atomic_rcu_read(&rioc->rdmaout);
3798     if (!rdma) {
3799         return -EIO;
3800     }
3801 
3802     CHECK_ERROR_STATE();
3803 
3804     if (migration_in_postcopy()) {
3805         return 0;
3806     }
3807 
3808     qemu_fflush(f);
3809     ret = qemu_rdma_drain_cq(f, rdma);
3810 
3811     if (ret < 0) {
3812         goto err;
3813     }
3814 
3815     if (flags == RAM_CONTROL_SETUP) {
3816         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3817         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3818         int reg_result_idx, i, nb_dest_blocks;
3819 
3820         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3821         trace_qemu_rdma_registration_stop_ram();
3822 
3823         /*
3824          * Make sure that we parallelize the pinning on both sides.
3825          * For very large guests, doing this serially takes a really
3826          * long time, so we have to 'interleave' the pinning locally
3827          * with the control messages by performing the pinning on this
3828          * side before we receive the control response from the other
3829          * side that the pinning has completed.
3830          */
3831         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3832                     &reg_result_idx, rdma->pin_all ?
3833                     qemu_rdma_reg_whole_ram_blocks : NULL);
3834         if (ret < 0) {
3835             ERROR(errp, "receiving remote info!");
3836             return ret;
3837         }
3838 
3839         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3840 
3841         /*
3842          * The protocol uses two different sets of rkeys (mutually exclusive):
3843          * 1. One key to represent the virtual address of the entire ram block.
3844          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3845          * 2. One to represent individual chunks within a ram block.
3846          *    (dynamic chunk registration enabled - pin individual chunks.)
3847          *
3848          * Once the capability is successfully negotiated, the destination transmits
3849          * the keys to use (or sends them later) including the virtual addresses
3850          * and then propagates the remote ram block descriptions to his local copy.
3851          */
3852 
3853         if (local->nb_blocks != nb_dest_blocks) {
3854             ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
3855                         "Your QEMU command line parameters are probably "
3856                         "not identical on both the source and destination.",
3857                         local->nb_blocks, nb_dest_blocks);
3858             rdma->error_state = -EINVAL;
3859             return -EINVAL;
3860         }
3861 
3862         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3863         memcpy(rdma->dest_blocks,
3864             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3865         for (i = 0; i < nb_dest_blocks; i++) {
3866             network_to_dest_block(&rdma->dest_blocks[i]);
3867 
3868             /* We require that the blocks are in the same order */
3869             if (rdma->dest_blocks[i].length != local->block[i].length) {
3870                 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3871                             "vs %" PRIu64, local->block[i].block_name, i,
3872                             local->block[i].length,
3873                             rdma->dest_blocks[i].length);
3874                 rdma->error_state = -EINVAL;
3875                 return -EINVAL;
3876             }
3877             local->block[i].remote_host_addr =
3878                     rdma->dest_blocks[i].remote_host_addr;
3879             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3880         }
3881     }
3882 
3883     trace_qemu_rdma_registration_stop(flags);
3884 
3885     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3886     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3887 
3888     if (ret < 0) {
3889         goto err;
3890     }
3891 
3892     return 0;
3893 err:
3894     rdma->error_state = ret;
3895     return ret;
3896 }
3897 
3898 static const QEMUFileHooks rdma_read_hooks = {
3899     .hook_ram_load = rdma_load_hook,
3900 };
3901 
3902 static const QEMUFileHooks rdma_write_hooks = {
3903     .before_ram_iterate = qemu_rdma_registration_start,
3904     .after_ram_iterate  = qemu_rdma_registration_stop,
3905     .save_page          = qemu_rdma_save_page,
3906 };
3907 
3908 
3909 static void qio_channel_rdma_finalize(Object *obj)
3910 {
3911     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3912     if (rioc->rdmain) {
3913         qemu_rdma_cleanup(rioc->rdmain);
3914         g_free(rioc->rdmain);
3915         rioc->rdmain = NULL;
3916     }
3917     if (rioc->rdmaout) {
3918         qemu_rdma_cleanup(rioc->rdmaout);
3919         g_free(rioc->rdmaout);
3920         rioc->rdmaout = NULL;
3921     }
3922 }
3923 
3924 static void qio_channel_rdma_class_init(ObjectClass *klass,
3925                                         void *class_data G_GNUC_UNUSED)
3926 {
3927     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3928 
3929     ioc_klass->io_writev = qio_channel_rdma_writev;
3930     ioc_klass->io_readv = qio_channel_rdma_readv;
3931     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3932     ioc_klass->io_close = qio_channel_rdma_close;
3933     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3934     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3935     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3936 }
3937 
3938 static const TypeInfo qio_channel_rdma_info = {
3939     .parent = TYPE_QIO_CHANNEL,
3940     .name = TYPE_QIO_CHANNEL_RDMA,
3941     .instance_size = sizeof(QIOChannelRDMA),
3942     .instance_finalize = qio_channel_rdma_finalize,
3943     .class_init = qio_channel_rdma_class_init,
3944 };
3945 
3946 static void qio_channel_rdma_register_types(void)
3947 {
3948     type_register_static(&qio_channel_rdma_info);
3949 }
3950 
3951 type_init(qio_channel_rdma_register_types);
3952 
3953 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3954 {
3955     QIOChannelRDMA *rioc;
3956 
3957     if (qemu_file_mode_is_not_valid(mode)) {
3958         return NULL;
3959     }
3960 
3961     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3962 
3963     if (mode[0] == 'w') {
3964         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3965         rioc->rdmaout = rdma;
3966         rioc->rdmain = rdma->return_path;
3967         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3968     } else {
3969         rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3970         rioc->rdmain = rdma;
3971         rioc->rdmaout = rdma->return_path;
3972         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3973     }
3974 
3975     return rioc->file;
3976 }
3977 
3978 static void rdma_accept_incoming_migration(void *opaque)
3979 {
3980     RDMAContext *rdma = opaque;
3981     int ret;
3982     QEMUFile *f;
3983     Error *local_err = NULL, **errp = &local_err;
3984 
3985     trace_qemu_rdma_accept_incoming_migration();
3986     ret = qemu_rdma_accept(rdma);
3987 
3988     if (ret) {
3989         ERROR(errp, "RDMA Migration initialization failed!");
3990         return;
3991     }
3992 
3993     trace_qemu_rdma_accept_incoming_migration_accepted();
3994 
3995     if (rdma->is_return_path) {
3996         return;
3997     }
3998 
3999     f = qemu_fopen_rdma(rdma, "rb");
4000     if (f == NULL) {
4001         ERROR(errp, "could not qemu_fopen_rdma!");
4002         qemu_rdma_cleanup(rdma);
4003         return;
4004     }
4005 
4006     rdma->migration_started_on_destination = 1;
4007     migration_fd_process_incoming(f);
4008 }
4009 
4010 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4011 {
4012     int ret;
4013     RDMAContext *rdma, *rdma_return_path = NULL;
4014     Error *local_err = NULL;
4015 
4016     trace_rdma_start_incoming_migration();
4017     rdma = qemu_rdma_data_init(host_port, &local_err);
4018 
4019     if (rdma == NULL) {
4020         goto err;
4021     }
4022 
4023     ret = qemu_rdma_dest_init(rdma, &local_err);
4024 
4025     if (ret) {
4026         goto err;
4027     }
4028 
4029     trace_rdma_start_incoming_migration_after_dest_init();
4030 
4031     ret = rdma_listen(rdma->listen_id, 5);
4032 
4033     if (ret) {
4034         ERROR(errp, "listening on socket!");
4035         goto err;
4036     }
4037 
4038     trace_rdma_start_incoming_migration_after_rdma_listen();
4039 
4040     /* initialize the RDMAContext for return path */
4041     if (migrate_postcopy()) {
4042         rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
4043 
4044         if (rdma_return_path == NULL) {
4045             goto err;
4046         }
4047 
4048         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
4049     }
4050 
4051     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4052                         NULL, (void *)(intptr_t)rdma);
4053     return;
4054 err:
4055     error_propagate(errp, local_err);
4056     g_free(rdma);
4057     g_free(rdma_return_path);
4058 }
4059 
4060 void rdma_start_outgoing_migration(void *opaque,
4061                             const char *host_port, Error **errp)
4062 {
4063     MigrationState *s = opaque;
4064     RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
4065     RDMAContext *rdma_return_path = NULL;
4066     int ret = 0;
4067 
4068     if (rdma == NULL) {
4069         goto err;
4070     }
4071 
4072     ret = qemu_rdma_source_init(rdma,
4073         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4074 
4075     if (ret) {
4076         goto err;
4077     }
4078 
4079     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4080     ret = qemu_rdma_connect(rdma, errp);
4081 
4082     if (ret) {
4083         goto err;
4084     }
4085 
4086     /* RDMA postcopy need a seprate queue pair for return path */
4087     if (migrate_postcopy()) {
4088         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4089 
4090         if (rdma_return_path == NULL) {
4091             goto err;
4092         }
4093 
4094         ret = qemu_rdma_source_init(rdma_return_path,
4095             s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4096 
4097         if (ret) {
4098             goto err;
4099         }
4100 
4101         ret = qemu_rdma_connect(rdma_return_path, errp);
4102 
4103         if (ret) {
4104             goto err;
4105         }
4106 
4107         rdma->return_path = rdma_return_path;
4108         rdma_return_path->return_path = rdma;
4109         rdma_return_path->is_return_path = true;
4110     }
4111 
4112     trace_rdma_start_outgoing_migration_after_rdma_connect();
4113 
4114     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4115     migrate_fd_connect(s, NULL);
4116     return;
4117 err:
4118     g_free(rdma);
4119     g_free(rdma_return_path);
4120 }
4121