xref: /openbmc/qemu/migration/rdma.c (revision 65d1a2bd)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "rdma.h"
21 #include "migration.h"
22 #include "qemu-file.h"
23 #include "ram.h"
24 #include "qemu-file-channel.h"
25 #include "qemu/error-report.h"
26 #include "qemu/main-loop.h"
27 #include "qemu/module.h"
28 #include "qemu/rcu.h"
29 #include "qemu/sockets.h"
30 #include "qemu/bitmap.h"
31 #include "qemu/coroutine.h"
32 #include "exec/memory.h"
33 #include <sys/socket.h>
34 #include <netdb.h>
35 #include <arpa/inet.h>
36 #include <rdma/rdma_cma.h>
37 #include "trace.h"
38 #include "qom/object.h"
39 #include <poll.h>
40 
41 /*
42  * Print and error on both the Monitor and the Log file.
43  */
44 #define ERROR(errp, fmt, ...) \
45     do { \
46         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
47         if (errp && (*(errp) == NULL)) { \
48             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
49         } \
50     } while (0)
51 
52 #define RDMA_RESOLVE_TIMEOUT_MS 10000
53 
54 /* Do not merge data if larger than this. */
55 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
56 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
57 
58 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
59 
60 /*
61  * This is only for non-live state being migrated.
62  * Instead of RDMA_WRITE messages, we use RDMA_SEND
63  * messages for that state, which requires a different
64  * delivery design than main memory.
65  */
66 #define RDMA_SEND_INCREMENT 32768
67 
68 /*
69  * Maximum size infiniband SEND message
70  */
71 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
72 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
73 
74 #define RDMA_CONTROL_VERSION_CURRENT 1
75 /*
76  * Capabilities for negotiation.
77  */
78 #define RDMA_CAPABILITY_PIN_ALL 0x01
79 
80 /*
81  * Add the other flags above to this list of known capabilities
82  * as they are introduced.
83  */
84 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
85 
86 #define CHECK_ERROR_STATE() \
87     do { \
88         if (rdma->error_state) { \
89             if (!rdma->error_reported) { \
90                 error_report("RDMA is in an error state waiting migration" \
91                                 " to abort!"); \
92                 rdma->error_reported = 1; \
93             } \
94             return rdma->error_state; \
95         } \
96     } while (0)
97 
98 /*
99  * A work request ID is 64-bits and we split up these bits
100  * into 3 parts:
101  *
102  * bits 0-15 : type of control message, 2^16
103  * bits 16-29: ram block index, 2^14
104  * bits 30-63: ram block chunk number, 2^34
105  *
106  * The last two bit ranges are only used for RDMA writes,
107  * in order to track their completion and potentially
108  * also track unregistration status of the message.
109  */
110 #define RDMA_WRID_TYPE_SHIFT  0UL
111 #define RDMA_WRID_BLOCK_SHIFT 16UL
112 #define RDMA_WRID_CHUNK_SHIFT 30UL
113 
114 #define RDMA_WRID_TYPE_MASK \
115     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
116 
117 #define RDMA_WRID_BLOCK_MASK \
118     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
119 
120 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
121 
122 /*
123  * RDMA migration protocol:
124  * 1. RDMA Writes (data messages, i.e. RAM)
125  * 2. IB Send/Recv (control channel messages)
126  */
127 enum {
128     RDMA_WRID_NONE = 0,
129     RDMA_WRID_RDMA_WRITE = 1,
130     RDMA_WRID_SEND_CONTROL = 2000,
131     RDMA_WRID_RECV_CONTROL = 4000,
132 };
133 
134 static const char *wrid_desc[] = {
135     [RDMA_WRID_NONE] = "NONE",
136     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
137     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
138     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
139 };
140 
141 /*
142  * Work request IDs for IB SEND messages only (not RDMA writes).
143  * This is used by the migration protocol to transmit
144  * control messages (such as device state and registration commands)
145  *
146  * We could use more WRs, but we have enough for now.
147  */
148 enum {
149     RDMA_WRID_READY = 0,
150     RDMA_WRID_DATA,
151     RDMA_WRID_CONTROL,
152     RDMA_WRID_MAX,
153 };
154 
155 /*
156  * SEND/RECV IB Control Messages.
157  */
158 enum {
159     RDMA_CONTROL_NONE = 0,
160     RDMA_CONTROL_ERROR,
161     RDMA_CONTROL_READY,               /* ready to receive */
162     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
163     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
164     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
165     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
166     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
167     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
168     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
169     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
170     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
171 };
172 
173 
174 /*
175  * Memory and MR structures used to represent an IB Send/Recv work request.
176  * This is *not* used for RDMA writes, only IB Send/Recv.
177  */
178 typedef struct {
179     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
180     struct   ibv_mr *control_mr;               /* registration metadata */
181     size_t   control_len;                      /* length of the message */
182     uint8_t *control_curr;                     /* start of unconsumed bytes */
183 } RDMAWorkRequestData;
184 
185 /*
186  * Negotiate RDMA capabilities during connection-setup time.
187  */
188 typedef struct {
189     uint32_t version;
190     uint32_t flags;
191 } RDMACapabilities;
192 
193 static void caps_to_network(RDMACapabilities *cap)
194 {
195     cap->version = htonl(cap->version);
196     cap->flags = htonl(cap->flags);
197 }
198 
199 static void network_to_caps(RDMACapabilities *cap)
200 {
201     cap->version = ntohl(cap->version);
202     cap->flags = ntohl(cap->flags);
203 }
204 
205 /*
206  * Representation of a RAMBlock from an RDMA perspective.
207  * This is not transmitted, only local.
208  * This and subsequent structures cannot be linked lists
209  * because we're using a single IB message to transmit
210  * the information. It's small anyway, so a list is overkill.
211  */
212 typedef struct RDMALocalBlock {
213     char          *block_name;
214     uint8_t       *local_host_addr; /* local virtual address */
215     uint64_t       remote_host_addr; /* remote virtual address */
216     uint64_t       offset;
217     uint64_t       length;
218     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
219     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
220     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
221     uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
222     int            index;           /* which block are we */
223     unsigned int   src_index;       /* (Only used on dest) */
224     bool           is_ram_block;
225     int            nb_chunks;
226     unsigned long *transit_bitmap;
227     unsigned long *unregister_bitmap;
228 } RDMALocalBlock;
229 
230 /*
231  * Also represents a RAMblock, but only on the dest.
232  * This gets transmitted by the dest during connection-time
233  * to the source VM and then is used to populate the
234  * corresponding RDMALocalBlock with
235  * the information needed to perform the actual RDMA.
236  */
237 typedef struct QEMU_PACKED RDMADestBlock {
238     uint64_t remote_host_addr;
239     uint64_t offset;
240     uint64_t length;
241     uint32_t remote_rkey;
242     uint32_t padding;
243 } RDMADestBlock;
244 
245 static const char *control_desc(unsigned int rdma_control)
246 {
247     static const char *strs[] = {
248         [RDMA_CONTROL_NONE] = "NONE",
249         [RDMA_CONTROL_ERROR] = "ERROR",
250         [RDMA_CONTROL_READY] = "READY",
251         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
252         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
253         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
254         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
255         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
256         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
257         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
258         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
259         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
260     };
261 
262     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
263         return "??BAD CONTROL VALUE??";
264     }
265 
266     return strs[rdma_control];
267 }
268 
269 static uint64_t htonll(uint64_t v)
270 {
271     union { uint32_t lv[2]; uint64_t llv; } u;
272     u.lv[0] = htonl(v >> 32);
273     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
274     return u.llv;
275 }
276 
277 static uint64_t ntohll(uint64_t v)
278 {
279     union { uint32_t lv[2]; uint64_t llv; } u;
280     u.llv = v;
281     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
282 }
283 
284 static void dest_block_to_network(RDMADestBlock *db)
285 {
286     db->remote_host_addr = htonll(db->remote_host_addr);
287     db->offset = htonll(db->offset);
288     db->length = htonll(db->length);
289     db->remote_rkey = htonl(db->remote_rkey);
290 }
291 
292 static void network_to_dest_block(RDMADestBlock *db)
293 {
294     db->remote_host_addr = ntohll(db->remote_host_addr);
295     db->offset = ntohll(db->offset);
296     db->length = ntohll(db->length);
297     db->remote_rkey = ntohl(db->remote_rkey);
298 }
299 
300 /*
301  * Virtual address of the above structures used for transmitting
302  * the RAMBlock descriptions at connection-time.
303  * This structure is *not* transmitted.
304  */
305 typedef struct RDMALocalBlocks {
306     int nb_blocks;
307     bool     init;             /* main memory init complete */
308     RDMALocalBlock *block;
309 } RDMALocalBlocks;
310 
311 /*
312  * Main data structure for RDMA state.
313  * While there is only one copy of this structure being allocated right now,
314  * this is the place where one would start if you wanted to consider
315  * having more than one RDMA connection open at the same time.
316  */
317 typedef struct RDMAContext {
318     char *host;
319     int port;
320     char *host_port;
321 
322     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
323 
324     /*
325      * This is used by *_exchange_send() to figure out whether or not
326      * the initial "READY" message has already been received or not.
327      * This is because other functions may potentially poll() and detect
328      * the READY message before send() does, in which case we need to
329      * know if it completed.
330      */
331     int control_ready_expected;
332 
333     /* number of outstanding writes */
334     int nb_sent;
335 
336     /* store info about current buffer so that we can
337        merge it with future sends */
338     uint64_t current_addr;
339     uint64_t current_length;
340     /* index of ram block the current buffer belongs to */
341     int current_index;
342     /* index of the chunk in the current ram block */
343     int current_chunk;
344 
345     bool pin_all;
346 
347     /*
348      * infiniband-specific variables for opening the device
349      * and maintaining connection state and so forth.
350      *
351      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
352      * cm_id->verbs, cm_id->channel, and cm_id->qp.
353      */
354     struct rdma_cm_id *cm_id;               /* connection manager ID */
355     struct rdma_cm_id *listen_id;
356     bool connected;
357 
358     struct ibv_context          *verbs;
359     struct rdma_event_channel   *channel;
360     struct ibv_qp *qp;                      /* queue pair */
361     struct ibv_comp_channel *comp_channel;  /* completion channel */
362     struct ibv_pd *pd;                      /* protection domain */
363     struct ibv_cq *cq;                      /* completion queue */
364 
365     /*
366      * If a previous write failed (perhaps because of a failed
367      * memory registration, then do not attempt any future work
368      * and remember the error state.
369      */
370     int error_state;
371     int error_reported;
372     int received_error;
373 
374     /*
375      * Description of ram blocks used throughout the code.
376      */
377     RDMALocalBlocks local_ram_blocks;
378     RDMADestBlock  *dest_blocks;
379 
380     /* Index of the next RAMBlock received during block registration */
381     unsigned int    next_src_index;
382 
383     /*
384      * Migration on *destination* started.
385      * Then use coroutine yield function.
386      * Source runs in a thread, so we don't care.
387      */
388     int migration_started_on_destination;
389 
390     int total_registrations;
391     int total_writes;
392 
393     int unregister_current, unregister_next;
394     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
395 
396     GHashTable *blockmap;
397 
398     /* the RDMAContext for return path */
399     struct RDMAContext *return_path;
400     bool is_return_path;
401 } RDMAContext;
402 
403 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
404 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
405 
406 
407 
408 struct QIOChannelRDMA {
409     QIOChannel parent;
410     RDMAContext *rdmain;
411     RDMAContext *rdmaout;
412     QEMUFile *file;
413     bool blocking; /* XXX we don't actually honour this yet */
414 };
415 
416 /*
417  * Main structure for IB Send/Recv control messages.
418  * This gets prepended at the beginning of every Send/Recv.
419  */
420 typedef struct QEMU_PACKED {
421     uint32_t len;     /* Total length of data portion */
422     uint32_t type;    /* which control command to perform */
423     uint32_t repeat;  /* number of commands in data portion of same type */
424     uint32_t padding;
425 } RDMAControlHeader;
426 
427 static void control_to_network(RDMAControlHeader *control)
428 {
429     control->type = htonl(control->type);
430     control->len = htonl(control->len);
431     control->repeat = htonl(control->repeat);
432 }
433 
434 static void network_to_control(RDMAControlHeader *control)
435 {
436     control->type = ntohl(control->type);
437     control->len = ntohl(control->len);
438     control->repeat = ntohl(control->repeat);
439 }
440 
441 /*
442  * Register a single Chunk.
443  * Information sent by the source VM to inform the dest
444  * to register an single chunk of memory before we can perform
445  * the actual RDMA operation.
446  */
447 typedef struct QEMU_PACKED {
448     union QEMU_PACKED {
449         uint64_t current_addr;  /* offset into the ram_addr_t space */
450         uint64_t chunk;         /* chunk to lookup if unregistering */
451     } key;
452     uint32_t current_index; /* which ramblock the chunk belongs to */
453     uint32_t padding;
454     uint64_t chunks;            /* how many sequential chunks to register */
455 } RDMARegister;
456 
457 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
458 {
459     RDMALocalBlock *local_block;
460     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
461 
462     if (local_block->is_ram_block) {
463         /*
464          * current_addr as passed in is an address in the local ram_addr_t
465          * space, we need to translate this for the destination
466          */
467         reg->key.current_addr -= local_block->offset;
468         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
469     }
470     reg->key.current_addr = htonll(reg->key.current_addr);
471     reg->current_index = htonl(reg->current_index);
472     reg->chunks = htonll(reg->chunks);
473 }
474 
475 static void network_to_register(RDMARegister *reg)
476 {
477     reg->key.current_addr = ntohll(reg->key.current_addr);
478     reg->current_index = ntohl(reg->current_index);
479     reg->chunks = ntohll(reg->chunks);
480 }
481 
482 typedef struct QEMU_PACKED {
483     uint32_t value;     /* if zero, we will madvise() */
484     uint32_t block_idx; /* which ram block index */
485     uint64_t offset;    /* Address in remote ram_addr_t space */
486     uint64_t length;    /* length of the chunk */
487 } RDMACompress;
488 
489 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
490 {
491     comp->value = htonl(comp->value);
492     /*
493      * comp->offset as passed in is an address in the local ram_addr_t
494      * space, we need to translate this for the destination
495      */
496     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
497     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
498     comp->block_idx = htonl(comp->block_idx);
499     comp->offset = htonll(comp->offset);
500     comp->length = htonll(comp->length);
501 }
502 
503 static void network_to_compress(RDMACompress *comp)
504 {
505     comp->value = ntohl(comp->value);
506     comp->block_idx = ntohl(comp->block_idx);
507     comp->offset = ntohll(comp->offset);
508     comp->length = ntohll(comp->length);
509 }
510 
511 /*
512  * The result of the dest's memory registration produces an "rkey"
513  * which the source VM must reference in order to perform
514  * the RDMA operation.
515  */
516 typedef struct QEMU_PACKED {
517     uint32_t rkey;
518     uint32_t padding;
519     uint64_t host_addr;
520 } RDMARegisterResult;
521 
522 static void result_to_network(RDMARegisterResult *result)
523 {
524     result->rkey = htonl(result->rkey);
525     result->host_addr = htonll(result->host_addr);
526 };
527 
528 static void network_to_result(RDMARegisterResult *result)
529 {
530     result->rkey = ntohl(result->rkey);
531     result->host_addr = ntohll(result->host_addr);
532 };
533 
534 const char *print_wrid(int wrid);
535 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
536                                    uint8_t *data, RDMAControlHeader *resp,
537                                    int *resp_idx,
538                                    int (*callback)(RDMAContext *rdma));
539 
540 static inline uint64_t ram_chunk_index(const uint8_t *start,
541                                        const uint8_t *host)
542 {
543     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
544 }
545 
546 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
547                                        uint64_t i)
548 {
549     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
550                                   (i << RDMA_REG_CHUNK_SHIFT));
551 }
552 
553 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
554                                      uint64_t i)
555 {
556     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
557                                          (1UL << RDMA_REG_CHUNK_SHIFT);
558 
559     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
560         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
561     }
562 
563     return result;
564 }
565 
566 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
567                          void *host_addr,
568                          ram_addr_t block_offset, uint64_t length)
569 {
570     RDMALocalBlocks *local = &rdma->local_ram_blocks;
571     RDMALocalBlock *block;
572     RDMALocalBlock *old = local->block;
573 
574     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
575 
576     if (local->nb_blocks) {
577         int x;
578 
579         if (rdma->blockmap) {
580             for (x = 0; x < local->nb_blocks; x++) {
581                 g_hash_table_remove(rdma->blockmap,
582                                     (void *)(uintptr_t)old[x].offset);
583                 g_hash_table_insert(rdma->blockmap,
584                                     (void *)(uintptr_t)old[x].offset,
585                                     &local->block[x]);
586             }
587         }
588         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
589         g_free(old);
590     }
591 
592     block = &local->block[local->nb_blocks];
593 
594     block->block_name = g_strdup(block_name);
595     block->local_host_addr = host_addr;
596     block->offset = block_offset;
597     block->length = length;
598     block->index = local->nb_blocks;
599     block->src_index = ~0U; /* Filled in by the receipt of the block list */
600     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
601     block->transit_bitmap = bitmap_new(block->nb_chunks);
602     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
603     block->unregister_bitmap = bitmap_new(block->nb_chunks);
604     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
605     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
606 
607     block->is_ram_block = local->init ? false : true;
608 
609     if (rdma->blockmap) {
610         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
611     }
612 
613     trace_rdma_add_block(block_name, local->nb_blocks,
614                          (uintptr_t) block->local_host_addr,
615                          block->offset, block->length,
616                          (uintptr_t) (block->local_host_addr + block->length),
617                          BITS_TO_LONGS(block->nb_chunks) *
618                              sizeof(unsigned long) * 8,
619                          block->nb_chunks);
620 
621     local->nb_blocks++;
622 
623     return 0;
624 }
625 
626 /*
627  * Memory regions need to be registered with the device and queue pairs setup
628  * in advanced before the migration starts. This tells us where the RAM blocks
629  * are so that we can register them individually.
630  */
631 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
632 {
633     const char *block_name = qemu_ram_get_idstr(rb);
634     void *host_addr = qemu_ram_get_host_addr(rb);
635     ram_addr_t block_offset = qemu_ram_get_offset(rb);
636     ram_addr_t length = qemu_ram_get_used_length(rb);
637     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
638 }
639 
640 /*
641  * Identify the RAMBlocks and their quantity. They will be references to
642  * identify chunk boundaries inside each RAMBlock and also be referenced
643  * during dynamic page registration.
644  */
645 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
646 {
647     RDMALocalBlocks *local = &rdma->local_ram_blocks;
648     int ret;
649 
650     assert(rdma->blockmap == NULL);
651     memset(local, 0, sizeof *local);
652     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
653     if (ret) {
654         return ret;
655     }
656     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
657     rdma->dest_blocks = g_new0(RDMADestBlock,
658                                rdma->local_ram_blocks.nb_blocks);
659     local->init = true;
660     return 0;
661 }
662 
663 /*
664  * Note: If used outside of cleanup, the caller must ensure that the destination
665  * block structures are also updated
666  */
667 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
668 {
669     RDMALocalBlocks *local = &rdma->local_ram_blocks;
670     RDMALocalBlock *old = local->block;
671     int x;
672 
673     if (rdma->blockmap) {
674         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
675     }
676     if (block->pmr) {
677         int j;
678 
679         for (j = 0; j < block->nb_chunks; j++) {
680             if (!block->pmr[j]) {
681                 continue;
682             }
683             ibv_dereg_mr(block->pmr[j]);
684             rdma->total_registrations--;
685         }
686         g_free(block->pmr);
687         block->pmr = NULL;
688     }
689 
690     if (block->mr) {
691         ibv_dereg_mr(block->mr);
692         rdma->total_registrations--;
693         block->mr = NULL;
694     }
695 
696     g_free(block->transit_bitmap);
697     block->transit_bitmap = NULL;
698 
699     g_free(block->unregister_bitmap);
700     block->unregister_bitmap = NULL;
701 
702     g_free(block->remote_keys);
703     block->remote_keys = NULL;
704 
705     g_free(block->block_name);
706     block->block_name = NULL;
707 
708     if (rdma->blockmap) {
709         for (x = 0; x < local->nb_blocks; x++) {
710             g_hash_table_remove(rdma->blockmap,
711                                 (void *)(uintptr_t)old[x].offset);
712         }
713     }
714 
715     if (local->nb_blocks > 1) {
716 
717         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
718 
719         if (block->index) {
720             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
721         }
722 
723         if (block->index < (local->nb_blocks - 1)) {
724             memcpy(local->block + block->index, old + (block->index + 1),
725                 sizeof(RDMALocalBlock) *
726                     (local->nb_blocks - (block->index + 1)));
727             for (x = block->index; x < local->nb_blocks - 1; x++) {
728                 local->block[x].index--;
729             }
730         }
731     } else {
732         assert(block == local->block);
733         local->block = NULL;
734     }
735 
736     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
737                            block->offset, block->length,
738                             (uintptr_t)(block->local_host_addr + block->length),
739                            BITS_TO_LONGS(block->nb_chunks) *
740                                sizeof(unsigned long) * 8, block->nb_chunks);
741 
742     g_free(old);
743 
744     local->nb_blocks--;
745 
746     if (local->nb_blocks && rdma->blockmap) {
747         for (x = 0; x < local->nb_blocks; x++) {
748             g_hash_table_insert(rdma->blockmap,
749                                 (void *)(uintptr_t)local->block[x].offset,
750                                 &local->block[x]);
751         }
752     }
753 
754     return 0;
755 }
756 
757 /*
758  * Put in the log file which RDMA device was opened and the details
759  * associated with that device.
760  */
761 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
762 {
763     struct ibv_port_attr port;
764 
765     if (ibv_query_port(verbs, 1, &port)) {
766         error_report("Failed to query port information");
767         return;
768     }
769 
770     printf("%s RDMA Device opened: kernel name %s "
771            "uverbs device name %s, "
772            "infiniband_verbs class device path %s, "
773            "infiniband class device path %s, "
774            "transport: (%d) %s\n",
775                 who,
776                 verbs->device->name,
777                 verbs->device->dev_name,
778                 verbs->device->dev_path,
779                 verbs->device->ibdev_path,
780                 port.link_layer,
781                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
782                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
783                     ? "Ethernet" : "Unknown"));
784 }
785 
786 /*
787  * Put in the log file the RDMA gid addressing information,
788  * useful for folks who have trouble understanding the
789  * RDMA device hierarchy in the kernel.
790  */
791 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
792 {
793     char sgid[33];
794     char dgid[33];
795     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
796     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
797     trace_qemu_rdma_dump_gid(who, sgid, dgid);
798 }
799 
800 /*
801  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
802  * We will try the next addrinfo struct, and fail if there are
803  * no other valid addresses to bind against.
804  *
805  * If user is listening on '[::]', then we will not have a opened a device
806  * yet and have no way of verifying if the device is RoCE or not.
807  *
808  * In this case, the source VM will throw an error for ALL types of
809  * connections (both IPv4 and IPv6) if the destination machine does not have
810  * a regular infiniband network available for use.
811  *
812  * The only way to guarantee that an error is thrown for broken kernels is
813  * for the management software to choose a *specific* interface at bind time
814  * and validate what time of hardware it is.
815  *
816  * Unfortunately, this puts the user in a fix:
817  *
818  *  If the source VM connects with an IPv4 address without knowing that the
819  *  destination has bound to '[::]' the migration will unconditionally fail
820  *  unless the management software is explicitly listening on the IPv4
821  *  address while using a RoCE-based device.
822  *
823  *  If the source VM connects with an IPv6 address, then we're OK because we can
824  *  throw an error on the source (and similarly on the destination).
825  *
826  *  But in mixed environments, this will be broken for a while until it is fixed
827  *  inside linux.
828  *
829  * We do provide a *tiny* bit of help in this function: We can list all of the
830  * devices in the system and check to see if all the devices are RoCE or
831  * Infiniband.
832  *
833  * If we detect that we have a *pure* RoCE environment, then we can safely
834  * thrown an error even if the management software has specified '[::]' as the
835  * bind address.
836  *
837  * However, if there is are multiple hetergeneous devices, then we cannot make
838  * this assumption and the user just has to be sure they know what they are
839  * doing.
840  *
841  * Patches are being reviewed on linux-rdma.
842  */
843 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
844 {
845     /* This bug only exists in linux, to our knowledge. */
846 #ifdef CONFIG_LINUX
847     struct ibv_port_attr port_attr;
848 
849     /*
850      * Verbs are only NULL if management has bound to '[::]'.
851      *
852      * Let's iterate through all the devices and see if there any pure IB
853      * devices (non-ethernet).
854      *
855      * If not, then we can safely proceed with the migration.
856      * Otherwise, there are no guarantees until the bug is fixed in linux.
857      */
858     if (!verbs) {
859         int num_devices, x;
860         struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
861         bool roce_found = false;
862         bool ib_found = false;
863 
864         for (x = 0; x < num_devices; x++) {
865             verbs = ibv_open_device(dev_list[x]);
866             if (!verbs) {
867                 if (errno == EPERM) {
868                     continue;
869                 } else {
870                     return -EINVAL;
871                 }
872             }
873 
874             if (ibv_query_port(verbs, 1, &port_attr)) {
875                 ibv_close_device(verbs);
876                 ERROR(errp, "Could not query initial IB port");
877                 return -EINVAL;
878             }
879 
880             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
881                 ib_found = true;
882             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
883                 roce_found = true;
884             }
885 
886             ibv_close_device(verbs);
887 
888         }
889 
890         if (roce_found) {
891             if (ib_found) {
892                 fprintf(stderr, "WARN: migrations may fail:"
893                                 " IPv6 over RoCE / iWARP in linux"
894                                 " is broken. But since you appear to have a"
895                                 " mixed RoCE / IB environment, be sure to only"
896                                 " migrate over the IB fabric until the kernel "
897                                 " fixes the bug.\n");
898             } else {
899                 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
900                             " and your management software has specified '[::]'"
901                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
902                 return -ENONET;
903             }
904         }
905 
906         return 0;
907     }
908 
909     /*
910      * If we have a verbs context, that means that some other than '[::]' was
911      * used by the management software for binding. In which case we can
912      * actually warn the user about a potentially broken kernel.
913      */
914 
915     /* IB ports start with 1, not 0 */
916     if (ibv_query_port(verbs, 1, &port_attr)) {
917         ERROR(errp, "Could not query initial IB port");
918         return -EINVAL;
919     }
920 
921     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
922         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
923                     "(but patches on linux-rdma in progress)");
924         return -ENONET;
925     }
926 
927 #endif
928 
929     return 0;
930 }
931 
932 /*
933  * Figure out which RDMA device corresponds to the requested IP hostname
934  * Also create the initial connection manager identifiers for opening
935  * the connection.
936  */
937 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
938 {
939     int ret;
940     struct rdma_addrinfo *res;
941     char port_str[16];
942     struct rdma_cm_event *cm_event;
943     char ip[40] = "unknown";
944     struct rdma_addrinfo *e;
945 
946     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
947         ERROR(errp, "RDMA hostname has not been set");
948         return -EINVAL;
949     }
950 
951     /* create CM channel */
952     rdma->channel = rdma_create_event_channel();
953     if (!rdma->channel) {
954         ERROR(errp, "could not create CM channel");
955         return -EINVAL;
956     }
957 
958     /* create CM id */
959     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
960     if (ret) {
961         ERROR(errp, "could not create channel id");
962         goto err_resolve_create_id;
963     }
964 
965     snprintf(port_str, 16, "%d", rdma->port);
966     port_str[15] = '\0';
967 
968     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
969     if (ret < 0) {
970         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
971         goto err_resolve_get_addr;
972     }
973 
974     for (e = res; e != NULL; e = e->ai_next) {
975         inet_ntop(e->ai_family,
976             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
977         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
978 
979         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
980                 RDMA_RESOLVE_TIMEOUT_MS);
981         if (!ret) {
982             if (e->ai_family == AF_INET6) {
983                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
984                 if (ret) {
985                     continue;
986                 }
987             }
988             goto route;
989         }
990     }
991 
992     rdma_freeaddrinfo(res);
993     ERROR(errp, "could not resolve address %s", rdma->host);
994     goto err_resolve_get_addr;
995 
996 route:
997     rdma_freeaddrinfo(res);
998     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
999 
1000     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1001     if (ret) {
1002         ERROR(errp, "could not perform event_addr_resolved");
1003         goto err_resolve_get_addr;
1004     }
1005 
1006     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1007         ERROR(errp, "result not equal to event_addr_resolved %s",
1008                 rdma_event_str(cm_event->event));
1009         perror("rdma_resolve_addr");
1010         rdma_ack_cm_event(cm_event);
1011         ret = -EINVAL;
1012         goto err_resolve_get_addr;
1013     }
1014     rdma_ack_cm_event(cm_event);
1015 
1016     /* resolve route */
1017     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1018     if (ret) {
1019         ERROR(errp, "could not resolve rdma route");
1020         goto err_resolve_get_addr;
1021     }
1022 
1023     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1024     if (ret) {
1025         ERROR(errp, "could not perform event_route_resolved");
1026         goto err_resolve_get_addr;
1027     }
1028     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1029         ERROR(errp, "result not equal to event_route_resolved: %s",
1030                         rdma_event_str(cm_event->event));
1031         rdma_ack_cm_event(cm_event);
1032         ret = -EINVAL;
1033         goto err_resolve_get_addr;
1034     }
1035     rdma_ack_cm_event(cm_event);
1036     rdma->verbs = rdma->cm_id->verbs;
1037     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1038     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1039     return 0;
1040 
1041 err_resolve_get_addr:
1042     rdma_destroy_id(rdma->cm_id);
1043     rdma->cm_id = NULL;
1044 err_resolve_create_id:
1045     rdma_destroy_event_channel(rdma->channel);
1046     rdma->channel = NULL;
1047     return ret;
1048 }
1049 
1050 /*
1051  * Create protection domain and completion queues
1052  */
1053 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1054 {
1055     /* allocate pd */
1056     rdma->pd = ibv_alloc_pd(rdma->verbs);
1057     if (!rdma->pd) {
1058         error_report("failed to allocate protection domain");
1059         return -1;
1060     }
1061 
1062     /* create completion channel */
1063     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1064     if (!rdma->comp_channel) {
1065         error_report("failed to allocate completion channel");
1066         goto err_alloc_pd_cq;
1067     }
1068 
1069     /*
1070      * Completion queue can be filled by both read and write work requests,
1071      * so must reflect the sum of both possible queue sizes.
1072      */
1073     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1074             NULL, rdma->comp_channel, 0);
1075     if (!rdma->cq) {
1076         error_report("failed to allocate completion queue");
1077         goto err_alloc_pd_cq;
1078     }
1079 
1080     return 0;
1081 
1082 err_alloc_pd_cq:
1083     if (rdma->pd) {
1084         ibv_dealloc_pd(rdma->pd);
1085     }
1086     if (rdma->comp_channel) {
1087         ibv_destroy_comp_channel(rdma->comp_channel);
1088     }
1089     rdma->pd = NULL;
1090     rdma->comp_channel = NULL;
1091     return -1;
1092 
1093 }
1094 
1095 /*
1096  * Create queue pairs.
1097  */
1098 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1099 {
1100     struct ibv_qp_init_attr attr = { 0 };
1101     int ret;
1102 
1103     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1104     attr.cap.max_recv_wr = 3;
1105     attr.cap.max_send_sge = 1;
1106     attr.cap.max_recv_sge = 1;
1107     attr.send_cq = rdma->cq;
1108     attr.recv_cq = rdma->cq;
1109     attr.qp_type = IBV_QPT_RC;
1110 
1111     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1112     if (ret) {
1113         return -1;
1114     }
1115 
1116     rdma->qp = rdma->cm_id->qp;
1117     return 0;
1118 }
1119 
1120 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1121 {
1122     int i;
1123     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1124 
1125     for (i = 0; i < local->nb_blocks; i++) {
1126         local->block[i].mr =
1127             ibv_reg_mr(rdma->pd,
1128                     local->block[i].local_host_addr,
1129                     local->block[i].length,
1130                     IBV_ACCESS_LOCAL_WRITE |
1131                     IBV_ACCESS_REMOTE_WRITE
1132                     );
1133         if (!local->block[i].mr) {
1134             perror("Failed to register local dest ram block!\n");
1135             break;
1136         }
1137         rdma->total_registrations++;
1138     }
1139 
1140     if (i >= local->nb_blocks) {
1141         return 0;
1142     }
1143 
1144     for (i--; i >= 0; i--) {
1145         ibv_dereg_mr(local->block[i].mr);
1146         rdma->total_registrations--;
1147     }
1148 
1149     return -1;
1150 
1151 }
1152 
1153 /*
1154  * Find the ram block that corresponds to the page requested to be
1155  * transmitted by QEMU.
1156  *
1157  * Once the block is found, also identify which 'chunk' within that
1158  * block that the page belongs to.
1159  *
1160  * This search cannot fail or the migration will fail.
1161  */
1162 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1163                                       uintptr_t block_offset,
1164                                       uint64_t offset,
1165                                       uint64_t length,
1166                                       uint64_t *block_index,
1167                                       uint64_t *chunk_index)
1168 {
1169     uint64_t current_addr = block_offset + offset;
1170     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1171                                                 (void *) block_offset);
1172     assert(block);
1173     assert(current_addr >= block->offset);
1174     assert((current_addr + length) <= (block->offset + block->length));
1175 
1176     *block_index = block->index;
1177     *chunk_index = ram_chunk_index(block->local_host_addr,
1178                 block->local_host_addr + (current_addr - block->offset));
1179 
1180     return 0;
1181 }
1182 
1183 /*
1184  * Register a chunk with IB. If the chunk was already registered
1185  * previously, then skip.
1186  *
1187  * Also return the keys associated with the registration needed
1188  * to perform the actual RDMA operation.
1189  */
1190 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1191         RDMALocalBlock *block, uintptr_t host_addr,
1192         uint32_t *lkey, uint32_t *rkey, int chunk,
1193         uint8_t *chunk_start, uint8_t *chunk_end)
1194 {
1195     if (block->mr) {
1196         if (lkey) {
1197             *lkey = block->mr->lkey;
1198         }
1199         if (rkey) {
1200             *rkey = block->mr->rkey;
1201         }
1202         return 0;
1203     }
1204 
1205     /* allocate memory to store chunk MRs */
1206     if (!block->pmr) {
1207         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1208     }
1209 
1210     /*
1211      * If 'rkey', then we're the destination, so grant access to the source.
1212      *
1213      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1214      */
1215     if (!block->pmr[chunk]) {
1216         uint64_t len = chunk_end - chunk_start;
1217 
1218         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1219 
1220         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1221                 chunk_start, len,
1222                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1223                         IBV_ACCESS_REMOTE_WRITE) : 0));
1224 
1225         if (!block->pmr[chunk]) {
1226             perror("Failed to register chunk!");
1227             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1228                             " start %" PRIuPTR " end %" PRIuPTR
1229                             " host %" PRIuPTR
1230                             " local %" PRIuPTR " registrations: %d\n",
1231                             block->index, chunk, (uintptr_t)chunk_start,
1232                             (uintptr_t)chunk_end, host_addr,
1233                             (uintptr_t)block->local_host_addr,
1234                             rdma->total_registrations);
1235             return -1;
1236         }
1237         rdma->total_registrations++;
1238     }
1239 
1240     if (lkey) {
1241         *lkey = block->pmr[chunk]->lkey;
1242     }
1243     if (rkey) {
1244         *rkey = block->pmr[chunk]->rkey;
1245     }
1246     return 0;
1247 }
1248 
1249 /*
1250  * Register (at connection time) the memory used for control
1251  * channel messages.
1252  */
1253 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1254 {
1255     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1256             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1257             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1258     if (rdma->wr_data[idx].control_mr) {
1259         rdma->total_registrations++;
1260         return 0;
1261     }
1262     error_report("qemu_rdma_reg_control failed");
1263     return -1;
1264 }
1265 
1266 const char *print_wrid(int wrid)
1267 {
1268     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1269         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1270     }
1271     return wrid_desc[wrid];
1272 }
1273 
1274 /*
1275  * RDMA requires memory registration (mlock/pinning), but this is not good for
1276  * overcommitment.
1277  *
1278  * In preparation for the future where LRU information or workload-specific
1279  * writable writable working set memory access behavior is available to QEMU
1280  * it would be nice to have in place the ability to UN-register/UN-pin
1281  * particular memory regions from the RDMA hardware when it is determine that
1282  * those regions of memory will likely not be accessed again in the near future.
1283  *
1284  * While we do not yet have such information right now, the following
1285  * compile-time option allows us to perform a non-optimized version of this
1286  * behavior.
1287  *
1288  * By uncommenting this option, you will cause *all* RDMA transfers to be
1289  * unregistered immediately after the transfer completes on both sides of the
1290  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1291  *
1292  * This will have a terrible impact on migration performance, so until future
1293  * workload information or LRU information is available, do not attempt to use
1294  * this feature except for basic testing.
1295  */
1296 /* #define RDMA_UNREGISTRATION_EXAMPLE */
1297 
1298 /*
1299  * Perform a non-optimized memory unregistration after every transfer
1300  * for demonstration purposes, only if pin-all is not requested.
1301  *
1302  * Potential optimizations:
1303  * 1. Start a new thread to run this function continuously
1304         - for bit clearing
1305         - and for receipt of unregister messages
1306  * 2. Use an LRU.
1307  * 3. Use workload hints.
1308  */
1309 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1310 {
1311     while (rdma->unregistrations[rdma->unregister_current]) {
1312         int ret;
1313         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1314         uint64_t chunk =
1315             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1316         uint64_t index =
1317             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1318         RDMALocalBlock *block =
1319             &(rdma->local_ram_blocks.block[index]);
1320         RDMARegister reg = { .current_index = index };
1321         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1322                                  };
1323         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1324                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1325                                    .repeat = 1,
1326                                  };
1327 
1328         trace_qemu_rdma_unregister_waiting_proc(chunk,
1329                                                 rdma->unregister_current);
1330 
1331         rdma->unregistrations[rdma->unregister_current] = 0;
1332         rdma->unregister_current++;
1333 
1334         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1335             rdma->unregister_current = 0;
1336         }
1337 
1338 
1339         /*
1340          * Unregistration is speculative (because migration is single-threaded
1341          * and we cannot break the protocol's inifinband message ordering).
1342          * Thus, if the memory is currently being used for transmission,
1343          * then abort the attempt to unregister and try again
1344          * later the next time a completion is received for this memory.
1345          */
1346         clear_bit(chunk, block->unregister_bitmap);
1347 
1348         if (test_bit(chunk, block->transit_bitmap)) {
1349             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1350             continue;
1351         }
1352 
1353         trace_qemu_rdma_unregister_waiting_send(chunk);
1354 
1355         ret = ibv_dereg_mr(block->pmr[chunk]);
1356         block->pmr[chunk] = NULL;
1357         block->remote_keys[chunk] = 0;
1358 
1359         if (ret != 0) {
1360             perror("unregistration chunk failed");
1361             return -ret;
1362         }
1363         rdma->total_registrations--;
1364 
1365         reg.key.chunk = chunk;
1366         register_to_network(rdma, &reg);
1367         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1368                                 &resp, NULL, NULL);
1369         if (ret < 0) {
1370             return ret;
1371         }
1372 
1373         trace_qemu_rdma_unregister_waiting_complete(chunk);
1374     }
1375 
1376     return 0;
1377 }
1378 
1379 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1380                                          uint64_t chunk)
1381 {
1382     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1383 
1384     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1385     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1386 
1387     return result;
1388 }
1389 
1390 /*
1391  * Set bit for unregistration in the next iteration.
1392  * We cannot transmit right here, but will unpin later.
1393  */
1394 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1395                                         uint64_t chunk, uint64_t wr_id)
1396 {
1397     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1398         error_report("rdma migration: queue is full");
1399     } else {
1400         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1401 
1402         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1403             trace_qemu_rdma_signal_unregister_append(chunk,
1404                                                      rdma->unregister_next);
1405 
1406             rdma->unregistrations[rdma->unregister_next++] =
1407                     qemu_rdma_make_wrid(wr_id, index, chunk);
1408 
1409             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1410                 rdma->unregister_next = 0;
1411             }
1412         } else {
1413             trace_qemu_rdma_signal_unregister_already(chunk);
1414         }
1415     }
1416 }
1417 
1418 /*
1419  * Consult the connection manager to see a work request
1420  * (of any kind) has completed.
1421  * Return the work request ID that completed.
1422  */
1423 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1424                                uint32_t *byte_len)
1425 {
1426     int ret;
1427     struct ibv_wc wc;
1428     uint64_t wr_id;
1429 
1430     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1431 
1432     if (!ret) {
1433         *wr_id_out = RDMA_WRID_NONE;
1434         return 0;
1435     }
1436 
1437     if (ret < 0) {
1438         error_report("ibv_poll_cq return %d", ret);
1439         return ret;
1440     }
1441 
1442     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1443 
1444     if (wc.status != IBV_WC_SUCCESS) {
1445         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1446                         wc.status, ibv_wc_status_str(wc.status));
1447         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1448 
1449         return -1;
1450     }
1451 
1452     if (rdma->control_ready_expected &&
1453         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1454         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1455                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1456         rdma->control_ready_expected = 0;
1457     }
1458 
1459     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1460         uint64_t chunk =
1461             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1462         uint64_t index =
1463             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1464         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1465 
1466         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1467                                    index, chunk, block->local_host_addr,
1468                                    (void *)(uintptr_t)block->remote_host_addr);
1469 
1470         clear_bit(chunk, block->transit_bitmap);
1471 
1472         if (rdma->nb_sent > 0) {
1473             rdma->nb_sent--;
1474         }
1475 
1476         if (!rdma->pin_all) {
1477             /*
1478              * FYI: If one wanted to signal a specific chunk to be unregistered
1479              * using LRU or workload-specific information, this is the function
1480              * you would call to do so. That chunk would then get asynchronously
1481              * unregistered later.
1482              */
1483 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1484             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1485 #endif
1486         }
1487     } else {
1488         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1489     }
1490 
1491     *wr_id_out = wc.wr_id;
1492     if (byte_len) {
1493         *byte_len = wc.byte_len;
1494     }
1495 
1496     return  0;
1497 }
1498 
1499 /* Wait for activity on the completion channel.
1500  * Returns 0 on success, none-0 on error.
1501  */
1502 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
1503 {
1504     struct rdma_cm_event *cm_event;
1505     int ret = -1;
1506 
1507     /*
1508      * Coroutine doesn't start until migration_fd_process_incoming()
1509      * so don't yield unless we know we're running inside of a coroutine.
1510      */
1511     if (rdma->migration_started_on_destination &&
1512         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1513         yield_until_fd_readable(rdma->comp_channel->fd);
1514     } else {
1515         /* This is the source side, we're in a separate thread
1516          * or destination prior to migration_fd_process_incoming()
1517          * after postcopy, the destination also in a separate thread.
1518          * we can't yield; so we have to poll the fd.
1519          * But we need to be able to handle 'cancel' or an error
1520          * without hanging forever.
1521          */
1522         while (!rdma->error_state  && !rdma->received_error) {
1523             GPollFD pfds[2];
1524             pfds[0].fd = rdma->comp_channel->fd;
1525             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1526             pfds[0].revents = 0;
1527 
1528             pfds[1].fd = rdma->channel->fd;
1529             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1530             pfds[1].revents = 0;
1531 
1532             /* 0.1s timeout, should be fine for a 'cancel' */
1533             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1534             case 2:
1535             case 1: /* fd active */
1536                 if (pfds[0].revents) {
1537                     return 0;
1538                 }
1539 
1540                 if (pfds[1].revents) {
1541                     ret = rdma_get_cm_event(rdma->channel, &cm_event);
1542                     if (ret) {
1543                         error_report("failed to get cm event while wait "
1544                                      "completion channel");
1545                         return -EPIPE;
1546                     }
1547 
1548                     error_report("receive cm event while wait comp channel,"
1549                                  "cm event is %d", cm_event->event);
1550                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1551                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1552                         rdma_ack_cm_event(cm_event);
1553                         return -EPIPE;
1554                     }
1555                     rdma_ack_cm_event(cm_event);
1556                 }
1557                 break;
1558 
1559             case 0: /* Timeout, go around again */
1560                 break;
1561 
1562             default: /* Error of some type -
1563                       * I don't trust errno from qemu_poll_ns
1564                      */
1565                 error_report("%s: poll failed", __func__);
1566                 return -EPIPE;
1567             }
1568 
1569             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1570                 /* Bail out and let the cancellation happen */
1571                 return -EPIPE;
1572             }
1573         }
1574     }
1575 
1576     if (rdma->received_error) {
1577         return -EPIPE;
1578     }
1579     return rdma->error_state;
1580 }
1581 
1582 /*
1583  * Block until the next work request has completed.
1584  *
1585  * First poll to see if a work request has already completed,
1586  * otherwise block.
1587  *
1588  * If we encounter completed work requests for IDs other than
1589  * the one we're interested in, then that's generally an error.
1590  *
1591  * The only exception is actual RDMA Write completions. These
1592  * completions only need to be recorded, but do not actually
1593  * need further processing.
1594  */
1595 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1596                                     uint32_t *byte_len)
1597 {
1598     int num_cq_events = 0, ret = 0;
1599     struct ibv_cq *cq;
1600     void *cq_ctx;
1601     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1602 
1603     if (ibv_req_notify_cq(rdma->cq, 0)) {
1604         return -1;
1605     }
1606     /* poll cq first */
1607     while (wr_id != wrid_requested) {
1608         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1609         if (ret < 0) {
1610             return ret;
1611         }
1612 
1613         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1614 
1615         if (wr_id == RDMA_WRID_NONE) {
1616             break;
1617         }
1618         if (wr_id != wrid_requested) {
1619             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1620                        wrid_requested, print_wrid(wr_id), wr_id);
1621         }
1622     }
1623 
1624     if (wr_id == wrid_requested) {
1625         return 0;
1626     }
1627 
1628     while (1) {
1629         ret = qemu_rdma_wait_comp_channel(rdma);
1630         if (ret) {
1631             goto err_block_for_wrid;
1632         }
1633 
1634         ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1635         if (ret) {
1636             perror("ibv_get_cq_event");
1637             goto err_block_for_wrid;
1638         }
1639 
1640         num_cq_events++;
1641 
1642         ret = -ibv_req_notify_cq(cq, 0);
1643         if (ret) {
1644             goto err_block_for_wrid;
1645         }
1646 
1647         while (wr_id != wrid_requested) {
1648             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1649             if (ret < 0) {
1650                 goto err_block_for_wrid;
1651             }
1652 
1653             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1654 
1655             if (wr_id == RDMA_WRID_NONE) {
1656                 break;
1657             }
1658             if (wr_id != wrid_requested) {
1659                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1660                                    wrid_requested, print_wrid(wr_id), wr_id);
1661             }
1662         }
1663 
1664         if (wr_id == wrid_requested) {
1665             goto success_block_for_wrid;
1666         }
1667     }
1668 
1669 success_block_for_wrid:
1670     if (num_cq_events) {
1671         ibv_ack_cq_events(cq, num_cq_events);
1672     }
1673     return 0;
1674 
1675 err_block_for_wrid:
1676     if (num_cq_events) {
1677         ibv_ack_cq_events(cq, num_cq_events);
1678     }
1679 
1680     rdma->error_state = ret;
1681     return ret;
1682 }
1683 
1684 /*
1685  * Post a SEND message work request for the control channel
1686  * containing some data and block until the post completes.
1687  */
1688 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1689                                        RDMAControlHeader *head)
1690 {
1691     int ret = 0;
1692     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1693     struct ibv_send_wr *bad_wr;
1694     struct ibv_sge sge = {
1695                            .addr = (uintptr_t)(wr->control),
1696                            .length = head->len + sizeof(RDMAControlHeader),
1697                            .lkey = wr->control_mr->lkey,
1698                          };
1699     struct ibv_send_wr send_wr = {
1700                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1701                                    .opcode = IBV_WR_SEND,
1702                                    .send_flags = IBV_SEND_SIGNALED,
1703                                    .sg_list = &sge,
1704                                    .num_sge = 1,
1705                                 };
1706 
1707     trace_qemu_rdma_post_send_control(control_desc(head->type));
1708 
1709     /*
1710      * We don't actually need to do a memcpy() in here if we used
1711      * the "sge" properly, but since we're only sending control messages
1712      * (not RAM in a performance-critical path), then its OK for now.
1713      *
1714      * The copy makes the RDMAControlHeader simpler to manipulate
1715      * for the time being.
1716      */
1717     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1718     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1719     control_to_network((void *) wr->control);
1720 
1721     if (buf) {
1722         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1723     }
1724 
1725 
1726     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1727 
1728     if (ret > 0) {
1729         error_report("Failed to use post IB SEND for control");
1730         return -ret;
1731     }
1732 
1733     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1734     if (ret < 0) {
1735         error_report("rdma migration: send polling control error");
1736     }
1737 
1738     return ret;
1739 }
1740 
1741 /*
1742  * Post a RECV work request in anticipation of some future receipt
1743  * of data on the control channel.
1744  */
1745 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1746 {
1747     struct ibv_recv_wr *bad_wr;
1748     struct ibv_sge sge = {
1749                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1750                             .length = RDMA_CONTROL_MAX_BUFFER,
1751                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1752                          };
1753 
1754     struct ibv_recv_wr recv_wr = {
1755                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1756                                     .sg_list = &sge,
1757                                     .num_sge = 1,
1758                                  };
1759 
1760 
1761     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1762         return -1;
1763     }
1764 
1765     return 0;
1766 }
1767 
1768 /*
1769  * Block and wait for a RECV control channel message to arrive.
1770  */
1771 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1772                 RDMAControlHeader *head, int expecting, int idx)
1773 {
1774     uint32_t byte_len;
1775     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1776                                        &byte_len);
1777 
1778     if (ret < 0) {
1779         error_report("rdma migration: recv polling control error!");
1780         return ret;
1781     }
1782 
1783     network_to_control((void *) rdma->wr_data[idx].control);
1784     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1785 
1786     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1787 
1788     if (expecting == RDMA_CONTROL_NONE) {
1789         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1790                                              head->type);
1791     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1792         error_report("Was expecting a %s (%d) control message"
1793                 ", but got: %s (%d), length: %d",
1794                 control_desc(expecting), expecting,
1795                 control_desc(head->type), head->type, head->len);
1796         if (head->type == RDMA_CONTROL_ERROR) {
1797             rdma->received_error = true;
1798         }
1799         return -EIO;
1800     }
1801     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1802         error_report("too long length: %d", head->len);
1803         return -EINVAL;
1804     }
1805     if (sizeof(*head) + head->len != byte_len) {
1806         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1807         return -EINVAL;
1808     }
1809 
1810     return 0;
1811 }
1812 
1813 /*
1814  * When a RECV work request has completed, the work request's
1815  * buffer is pointed at the header.
1816  *
1817  * This will advance the pointer to the data portion
1818  * of the control message of the work request's buffer that
1819  * was populated after the work request finished.
1820  */
1821 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1822                                   RDMAControlHeader *head)
1823 {
1824     rdma->wr_data[idx].control_len = head->len;
1825     rdma->wr_data[idx].control_curr =
1826         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1827 }
1828 
1829 /*
1830  * This is an 'atomic' high-level operation to deliver a single, unified
1831  * control-channel message.
1832  *
1833  * Additionally, if the user is expecting some kind of reply to this message,
1834  * they can request a 'resp' response message be filled in by posting an
1835  * additional work request on behalf of the user and waiting for an additional
1836  * completion.
1837  *
1838  * The extra (optional) response is used during registration to us from having
1839  * to perform an *additional* exchange of message just to provide a response by
1840  * instead piggy-backing on the acknowledgement.
1841  */
1842 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1843                                    uint8_t *data, RDMAControlHeader *resp,
1844                                    int *resp_idx,
1845                                    int (*callback)(RDMAContext *rdma))
1846 {
1847     int ret = 0;
1848 
1849     /*
1850      * Wait until the dest is ready before attempting to deliver the message
1851      * by waiting for a READY message.
1852      */
1853     if (rdma->control_ready_expected) {
1854         RDMAControlHeader resp;
1855         ret = qemu_rdma_exchange_get_response(rdma,
1856                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1857         if (ret < 0) {
1858             return ret;
1859         }
1860     }
1861 
1862     /*
1863      * If the user is expecting a response, post a WR in anticipation of it.
1864      */
1865     if (resp) {
1866         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1867         if (ret) {
1868             error_report("rdma migration: error posting"
1869                     " extra control recv for anticipated result!");
1870             return ret;
1871         }
1872     }
1873 
1874     /*
1875      * Post a WR to replace the one we just consumed for the READY message.
1876      */
1877     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1878     if (ret) {
1879         error_report("rdma migration: error posting first control recv!");
1880         return ret;
1881     }
1882 
1883     /*
1884      * Deliver the control message that was requested.
1885      */
1886     ret = qemu_rdma_post_send_control(rdma, data, head);
1887 
1888     if (ret < 0) {
1889         error_report("Failed to send control buffer!");
1890         return ret;
1891     }
1892 
1893     /*
1894      * If we're expecting a response, block and wait for it.
1895      */
1896     if (resp) {
1897         if (callback) {
1898             trace_qemu_rdma_exchange_send_issue_callback();
1899             ret = callback(rdma);
1900             if (ret < 0) {
1901                 return ret;
1902             }
1903         }
1904 
1905         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1906         ret = qemu_rdma_exchange_get_response(rdma, resp,
1907                                               resp->type, RDMA_WRID_DATA);
1908 
1909         if (ret < 0) {
1910             return ret;
1911         }
1912 
1913         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1914         if (resp_idx) {
1915             *resp_idx = RDMA_WRID_DATA;
1916         }
1917         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1918     }
1919 
1920     rdma->control_ready_expected = 1;
1921 
1922     return 0;
1923 }
1924 
1925 /*
1926  * This is an 'atomic' high-level operation to receive a single, unified
1927  * control-channel message.
1928  */
1929 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1930                                 int expecting)
1931 {
1932     RDMAControlHeader ready = {
1933                                 .len = 0,
1934                                 .type = RDMA_CONTROL_READY,
1935                                 .repeat = 1,
1936                               };
1937     int ret;
1938 
1939     /*
1940      * Inform the source that we're ready to receive a message.
1941      */
1942     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1943 
1944     if (ret < 0) {
1945         error_report("Failed to send control buffer!");
1946         return ret;
1947     }
1948 
1949     /*
1950      * Block and wait for the message.
1951      */
1952     ret = qemu_rdma_exchange_get_response(rdma, head,
1953                                           expecting, RDMA_WRID_READY);
1954 
1955     if (ret < 0) {
1956         return ret;
1957     }
1958 
1959     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1960 
1961     /*
1962      * Post a new RECV work request to replace the one we just consumed.
1963      */
1964     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1965     if (ret) {
1966         error_report("rdma migration: error posting second control recv!");
1967         return ret;
1968     }
1969 
1970     return 0;
1971 }
1972 
1973 /*
1974  * Write an actual chunk of memory using RDMA.
1975  *
1976  * If we're using dynamic registration on the dest-side, we have to
1977  * send a registration command first.
1978  */
1979 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1980                                int current_index, uint64_t current_addr,
1981                                uint64_t length)
1982 {
1983     struct ibv_sge sge;
1984     struct ibv_send_wr send_wr = { 0 };
1985     struct ibv_send_wr *bad_wr;
1986     int reg_result_idx, ret, count = 0;
1987     uint64_t chunk, chunks;
1988     uint8_t *chunk_start, *chunk_end;
1989     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1990     RDMARegister reg;
1991     RDMARegisterResult *reg_result;
1992     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1993     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1994                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1995                                .repeat = 1,
1996                              };
1997 
1998 retry:
1999     sge.addr = (uintptr_t)(block->local_host_addr +
2000                             (current_addr - block->offset));
2001     sge.length = length;
2002 
2003     chunk = ram_chunk_index(block->local_host_addr,
2004                             (uint8_t *)(uintptr_t)sge.addr);
2005     chunk_start = ram_chunk_start(block, chunk);
2006 
2007     if (block->is_ram_block) {
2008         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2009 
2010         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2011             chunks--;
2012         }
2013     } else {
2014         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2015 
2016         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2017             chunks--;
2018         }
2019     }
2020 
2021     trace_qemu_rdma_write_one_top(chunks + 1,
2022                                   (chunks + 1) *
2023                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2024 
2025     chunk_end = ram_chunk_end(block, chunk + chunks);
2026 
2027     if (!rdma->pin_all) {
2028 #ifdef RDMA_UNREGISTRATION_EXAMPLE
2029         qemu_rdma_unregister_waiting(rdma);
2030 #endif
2031     }
2032 
2033     while (test_bit(chunk, block->transit_bitmap)) {
2034         (void)count;
2035         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2036                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
2037 
2038         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2039 
2040         if (ret < 0) {
2041             error_report("Failed to Wait for previous write to complete "
2042                     "block %d chunk %" PRIu64
2043                     " current %" PRIu64 " len %" PRIu64 " %d",
2044                     current_index, chunk, sge.addr, length, rdma->nb_sent);
2045             return ret;
2046         }
2047     }
2048 
2049     if (!rdma->pin_all || !block->is_ram_block) {
2050         if (!block->remote_keys[chunk]) {
2051             /*
2052              * This chunk has not yet been registered, so first check to see
2053              * if the entire chunk is zero. If so, tell the other size to
2054              * memset() + madvise() the entire chunk without RDMA.
2055              */
2056 
2057             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2058                 RDMACompress comp = {
2059                                         .offset = current_addr,
2060                                         .value = 0,
2061                                         .block_idx = current_index,
2062                                         .length = length,
2063                                     };
2064 
2065                 head.len = sizeof(comp);
2066                 head.type = RDMA_CONTROL_COMPRESS;
2067 
2068                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2069                                                current_index, current_addr);
2070 
2071                 compress_to_network(rdma, &comp);
2072                 ret = qemu_rdma_exchange_send(rdma, &head,
2073                                 (uint8_t *) &comp, NULL, NULL, NULL);
2074 
2075                 if (ret < 0) {
2076                     return -EIO;
2077                 }
2078 
2079                 acct_update_position(f, sge.length, true);
2080 
2081                 return 1;
2082             }
2083 
2084             /*
2085              * Otherwise, tell other side to register.
2086              */
2087             reg.current_index = current_index;
2088             if (block->is_ram_block) {
2089                 reg.key.current_addr = current_addr;
2090             } else {
2091                 reg.key.chunk = chunk;
2092             }
2093             reg.chunks = chunks;
2094 
2095             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2096                                               current_addr);
2097 
2098             register_to_network(rdma, &reg);
2099             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2100                                     &resp, &reg_result_idx, NULL);
2101             if (ret < 0) {
2102                 return ret;
2103             }
2104 
2105             /* try to overlap this single registration with the one we sent. */
2106             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2107                                                 &sge.lkey, NULL, chunk,
2108                                                 chunk_start, chunk_end)) {
2109                 error_report("cannot get lkey");
2110                 return -EINVAL;
2111             }
2112 
2113             reg_result = (RDMARegisterResult *)
2114                     rdma->wr_data[reg_result_idx].control_curr;
2115 
2116             network_to_result(reg_result);
2117 
2118             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2119                                                  reg_result->rkey, chunk);
2120 
2121             block->remote_keys[chunk] = reg_result->rkey;
2122             block->remote_host_addr = reg_result->host_addr;
2123         } else {
2124             /* already registered before */
2125             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2126                                                 &sge.lkey, NULL, chunk,
2127                                                 chunk_start, chunk_end)) {
2128                 error_report("cannot get lkey!");
2129                 return -EINVAL;
2130             }
2131         }
2132 
2133         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2134     } else {
2135         send_wr.wr.rdma.rkey = block->remote_rkey;
2136 
2137         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2138                                                      &sge.lkey, NULL, chunk,
2139                                                      chunk_start, chunk_end)) {
2140             error_report("cannot get lkey!");
2141             return -EINVAL;
2142         }
2143     }
2144 
2145     /*
2146      * Encode the ram block index and chunk within this wrid.
2147      * We will use this information at the time of completion
2148      * to figure out which bitmap to check against and then which
2149      * chunk in the bitmap to look for.
2150      */
2151     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2152                                         current_index, chunk);
2153 
2154     send_wr.opcode = IBV_WR_RDMA_WRITE;
2155     send_wr.send_flags = IBV_SEND_SIGNALED;
2156     send_wr.sg_list = &sge;
2157     send_wr.num_sge = 1;
2158     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2159                                 (current_addr - block->offset);
2160 
2161     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2162                                    sge.length);
2163 
2164     /*
2165      * ibv_post_send() does not return negative error numbers,
2166      * per the specification they are positive - no idea why.
2167      */
2168     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2169 
2170     if (ret == ENOMEM) {
2171         trace_qemu_rdma_write_one_queue_full();
2172         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2173         if (ret < 0) {
2174             error_report("rdma migration: failed to make "
2175                          "room in full send queue! %d", ret);
2176             return ret;
2177         }
2178 
2179         goto retry;
2180 
2181     } else if (ret > 0) {
2182         perror("rdma migration: post rdma write failed");
2183         return -ret;
2184     }
2185 
2186     set_bit(chunk, block->transit_bitmap);
2187     acct_update_position(f, sge.length, false);
2188     rdma->total_writes++;
2189 
2190     return 0;
2191 }
2192 
2193 /*
2194  * Push out any unwritten RDMA operations.
2195  *
2196  * We support sending out multiple chunks at the same time.
2197  * Not all of them need to get signaled in the completion queue.
2198  */
2199 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2200 {
2201     int ret;
2202 
2203     if (!rdma->current_length) {
2204         return 0;
2205     }
2206 
2207     ret = qemu_rdma_write_one(f, rdma,
2208             rdma->current_index, rdma->current_addr, rdma->current_length);
2209 
2210     if (ret < 0) {
2211         return ret;
2212     }
2213 
2214     if (ret == 0) {
2215         rdma->nb_sent++;
2216         trace_qemu_rdma_write_flush(rdma->nb_sent);
2217     }
2218 
2219     rdma->current_length = 0;
2220     rdma->current_addr = 0;
2221 
2222     return 0;
2223 }
2224 
2225 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2226                     uint64_t offset, uint64_t len)
2227 {
2228     RDMALocalBlock *block;
2229     uint8_t *host_addr;
2230     uint8_t *chunk_end;
2231 
2232     if (rdma->current_index < 0) {
2233         return 0;
2234     }
2235 
2236     if (rdma->current_chunk < 0) {
2237         return 0;
2238     }
2239 
2240     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2241     host_addr = block->local_host_addr + (offset - block->offset);
2242     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2243 
2244     if (rdma->current_length == 0) {
2245         return 0;
2246     }
2247 
2248     /*
2249      * Only merge into chunk sequentially.
2250      */
2251     if (offset != (rdma->current_addr + rdma->current_length)) {
2252         return 0;
2253     }
2254 
2255     if (offset < block->offset) {
2256         return 0;
2257     }
2258 
2259     if ((offset + len) > (block->offset + block->length)) {
2260         return 0;
2261     }
2262 
2263     if ((host_addr + len) > chunk_end) {
2264         return 0;
2265     }
2266 
2267     return 1;
2268 }
2269 
2270 /*
2271  * We're not actually writing here, but doing three things:
2272  *
2273  * 1. Identify the chunk the buffer belongs to.
2274  * 2. If the chunk is full or the buffer doesn't belong to the current
2275  *    chunk, then start a new chunk and flush() the old chunk.
2276  * 3. To keep the hardware busy, we also group chunks into batches
2277  *    and only require that a batch gets acknowledged in the completion
2278  *    queue instead of each individual chunk.
2279  */
2280 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2281                            uint64_t block_offset, uint64_t offset,
2282                            uint64_t len)
2283 {
2284     uint64_t current_addr = block_offset + offset;
2285     uint64_t index = rdma->current_index;
2286     uint64_t chunk = rdma->current_chunk;
2287     int ret;
2288 
2289     /* If we cannot merge it, we flush the current buffer first. */
2290     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2291         ret = qemu_rdma_write_flush(f, rdma);
2292         if (ret) {
2293             return ret;
2294         }
2295         rdma->current_length = 0;
2296         rdma->current_addr = current_addr;
2297 
2298         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2299                                          offset, len, &index, &chunk);
2300         if (ret) {
2301             error_report("ram block search failed");
2302             return ret;
2303         }
2304         rdma->current_index = index;
2305         rdma->current_chunk = chunk;
2306     }
2307 
2308     /* merge it */
2309     rdma->current_length += len;
2310 
2311     /* flush it if buffer is too large */
2312     if (rdma->current_length >= RDMA_MERGE_MAX) {
2313         return qemu_rdma_write_flush(f, rdma);
2314     }
2315 
2316     return 0;
2317 }
2318 
2319 static void qemu_rdma_cleanup(RDMAContext *rdma)
2320 {
2321     int idx;
2322 
2323     if (rdma->cm_id && rdma->connected) {
2324         if ((rdma->error_state ||
2325              migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2326             !rdma->received_error) {
2327             RDMAControlHeader head = { .len = 0,
2328                                        .type = RDMA_CONTROL_ERROR,
2329                                        .repeat = 1,
2330                                      };
2331             error_report("Early error. Sending error.");
2332             qemu_rdma_post_send_control(rdma, NULL, &head);
2333         }
2334 
2335         rdma_disconnect(rdma->cm_id);
2336         trace_qemu_rdma_cleanup_disconnect();
2337         rdma->connected = false;
2338     }
2339 
2340     if (rdma->channel) {
2341         qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2342     }
2343     g_free(rdma->dest_blocks);
2344     rdma->dest_blocks = NULL;
2345 
2346     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2347         if (rdma->wr_data[idx].control_mr) {
2348             rdma->total_registrations--;
2349             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2350         }
2351         rdma->wr_data[idx].control_mr = NULL;
2352     }
2353 
2354     if (rdma->local_ram_blocks.block) {
2355         while (rdma->local_ram_blocks.nb_blocks) {
2356             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2357         }
2358     }
2359 
2360     if (rdma->qp) {
2361         rdma_destroy_qp(rdma->cm_id);
2362         rdma->qp = NULL;
2363     }
2364     if (rdma->cq) {
2365         ibv_destroy_cq(rdma->cq);
2366         rdma->cq = NULL;
2367     }
2368     if (rdma->comp_channel) {
2369         ibv_destroy_comp_channel(rdma->comp_channel);
2370         rdma->comp_channel = NULL;
2371     }
2372     if (rdma->pd) {
2373         ibv_dealloc_pd(rdma->pd);
2374         rdma->pd = NULL;
2375     }
2376     if (rdma->cm_id) {
2377         rdma_destroy_id(rdma->cm_id);
2378         rdma->cm_id = NULL;
2379     }
2380 
2381     /* the destination side, listen_id and channel is shared */
2382     if (rdma->listen_id) {
2383         if (!rdma->is_return_path) {
2384             rdma_destroy_id(rdma->listen_id);
2385         }
2386         rdma->listen_id = NULL;
2387 
2388         if (rdma->channel) {
2389             if (!rdma->is_return_path) {
2390                 rdma_destroy_event_channel(rdma->channel);
2391             }
2392             rdma->channel = NULL;
2393         }
2394     }
2395 
2396     if (rdma->channel) {
2397         rdma_destroy_event_channel(rdma->channel);
2398         rdma->channel = NULL;
2399     }
2400     g_free(rdma->host);
2401     g_free(rdma->host_port);
2402     rdma->host = NULL;
2403     rdma->host_port = NULL;
2404 }
2405 
2406 
2407 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2408 {
2409     int ret, idx;
2410     Error *local_err = NULL, **temp = &local_err;
2411 
2412     /*
2413      * Will be validated against destination's actual capabilities
2414      * after the connect() completes.
2415      */
2416     rdma->pin_all = pin_all;
2417 
2418     ret = qemu_rdma_resolve_host(rdma, temp);
2419     if (ret) {
2420         goto err_rdma_source_init;
2421     }
2422 
2423     ret = qemu_rdma_alloc_pd_cq(rdma);
2424     if (ret) {
2425         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2426                     " limits may be too low. Please check $ ulimit -a # and "
2427                     "search for 'ulimit -l' in the output");
2428         goto err_rdma_source_init;
2429     }
2430 
2431     ret = qemu_rdma_alloc_qp(rdma);
2432     if (ret) {
2433         ERROR(temp, "rdma migration: error allocating qp!");
2434         goto err_rdma_source_init;
2435     }
2436 
2437     ret = qemu_rdma_init_ram_blocks(rdma);
2438     if (ret) {
2439         ERROR(temp, "rdma migration: error initializing ram blocks!");
2440         goto err_rdma_source_init;
2441     }
2442 
2443     /* Build the hash that maps from offset to RAMBlock */
2444     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2445     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2446         g_hash_table_insert(rdma->blockmap,
2447                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2448                 &rdma->local_ram_blocks.block[idx]);
2449     }
2450 
2451     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2452         ret = qemu_rdma_reg_control(rdma, idx);
2453         if (ret) {
2454             ERROR(temp, "rdma migration: error registering %d control!",
2455                                                             idx);
2456             goto err_rdma_source_init;
2457         }
2458     }
2459 
2460     return 0;
2461 
2462 err_rdma_source_init:
2463     error_propagate(errp, local_err);
2464     qemu_rdma_cleanup(rdma);
2465     return -1;
2466 }
2467 
2468 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2469                                      struct rdma_cm_event **cm_event,
2470                                      long msec, Error **errp)
2471 {
2472     int ret;
2473     struct pollfd poll_fd = {
2474                                 .fd = rdma->channel->fd,
2475                                 .events = POLLIN,
2476                                 .revents = 0
2477                             };
2478 
2479     do {
2480         ret = poll(&poll_fd, 1, msec);
2481     } while (ret < 0 && errno == EINTR);
2482 
2483     if (ret == 0) {
2484         ERROR(errp, "poll cm event timeout");
2485         return -1;
2486     } else if (ret < 0) {
2487         ERROR(errp, "failed to poll cm event, errno=%i", errno);
2488         return -1;
2489     } else if (poll_fd.revents & POLLIN) {
2490         return rdma_get_cm_event(rdma->channel, cm_event);
2491     } else {
2492         ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents);
2493         return -1;
2494     }
2495 }
2496 
2497 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
2498 {
2499     RDMACapabilities cap = {
2500                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2501                                 .flags = 0,
2502                            };
2503     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2504                                           .retry_count = 5,
2505                                           .private_data = &cap,
2506                                           .private_data_len = sizeof(cap),
2507                                         };
2508     struct rdma_cm_event *cm_event;
2509     int ret;
2510 
2511     /*
2512      * Only negotiate the capability with destination if the user
2513      * on the source first requested the capability.
2514      */
2515     if (rdma->pin_all) {
2516         trace_qemu_rdma_connect_pin_all_requested();
2517         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2518     }
2519 
2520     caps_to_network(&cap);
2521 
2522     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2523     if (ret) {
2524         ERROR(errp, "posting second control recv");
2525         goto err_rdma_source_connect;
2526     }
2527 
2528     ret = rdma_connect(rdma->cm_id, &conn_param);
2529     if (ret) {
2530         perror("rdma_connect");
2531         ERROR(errp, "connecting to destination!");
2532         goto err_rdma_source_connect;
2533     }
2534 
2535     if (return_path) {
2536         ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2537     } else {
2538         ret = rdma_get_cm_event(rdma->channel, &cm_event);
2539     }
2540     if (ret) {
2541         perror("rdma_get_cm_event after rdma_connect");
2542         ERROR(errp, "connecting to destination!");
2543         goto err_rdma_source_connect;
2544     }
2545 
2546     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2547         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2548         ERROR(errp, "connecting to destination!");
2549         rdma_ack_cm_event(cm_event);
2550         goto err_rdma_source_connect;
2551     }
2552     rdma->connected = true;
2553 
2554     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2555     network_to_caps(&cap);
2556 
2557     /*
2558      * Verify that the *requested* capabilities are supported by the destination
2559      * and disable them otherwise.
2560      */
2561     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2562         ERROR(errp, "Server cannot support pinning all memory. "
2563                         "Will register memory dynamically.");
2564         rdma->pin_all = false;
2565     }
2566 
2567     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2568 
2569     rdma_ack_cm_event(cm_event);
2570 
2571     rdma->control_ready_expected = 1;
2572     rdma->nb_sent = 0;
2573     return 0;
2574 
2575 err_rdma_source_connect:
2576     qemu_rdma_cleanup(rdma);
2577     return -1;
2578 }
2579 
2580 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2581 {
2582     int ret, idx;
2583     struct rdma_cm_id *listen_id;
2584     char ip[40] = "unknown";
2585     struct rdma_addrinfo *res, *e;
2586     char port_str[16];
2587 
2588     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2589         rdma->wr_data[idx].control_len = 0;
2590         rdma->wr_data[idx].control_curr = NULL;
2591     }
2592 
2593     if (!rdma->host || !rdma->host[0]) {
2594         ERROR(errp, "RDMA host is not set!");
2595         rdma->error_state = -EINVAL;
2596         return -1;
2597     }
2598     /* create CM channel */
2599     rdma->channel = rdma_create_event_channel();
2600     if (!rdma->channel) {
2601         ERROR(errp, "could not create rdma event channel");
2602         rdma->error_state = -EINVAL;
2603         return -1;
2604     }
2605 
2606     /* create CM id */
2607     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2608     if (ret) {
2609         ERROR(errp, "could not create cm_id!");
2610         goto err_dest_init_create_listen_id;
2611     }
2612 
2613     snprintf(port_str, 16, "%d", rdma->port);
2614     port_str[15] = '\0';
2615 
2616     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2617     if (ret < 0) {
2618         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2619         goto err_dest_init_bind_addr;
2620     }
2621 
2622     for (e = res; e != NULL; e = e->ai_next) {
2623         inet_ntop(e->ai_family,
2624             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2625         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2626         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2627         if (ret) {
2628             continue;
2629         }
2630         if (e->ai_family == AF_INET6) {
2631             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2632             if (ret) {
2633                 continue;
2634             }
2635         }
2636         break;
2637     }
2638 
2639     rdma_freeaddrinfo(res);
2640     if (!e) {
2641         ERROR(errp, "Error: could not rdma_bind_addr!");
2642         goto err_dest_init_bind_addr;
2643     }
2644 
2645     rdma->listen_id = listen_id;
2646     qemu_rdma_dump_gid("dest_init", listen_id);
2647     return 0;
2648 
2649 err_dest_init_bind_addr:
2650     rdma_destroy_id(listen_id);
2651 err_dest_init_create_listen_id:
2652     rdma_destroy_event_channel(rdma->channel);
2653     rdma->channel = NULL;
2654     rdma->error_state = ret;
2655     return ret;
2656 
2657 }
2658 
2659 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2660                                             RDMAContext *rdma)
2661 {
2662     int idx;
2663 
2664     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2665         rdma_return_path->wr_data[idx].control_len = 0;
2666         rdma_return_path->wr_data[idx].control_curr = NULL;
2667     }
2668 
2669     /*the CM channel and CM id is shared*/
2670     rdma_return_path->channel = rdma->channel;
2671     rdma_return_path->listen_id = rdma->listen_id;
2672 
2673     rdma->return_path = rdma_return_path;
2674     rdma_return_path->return_path = rdma;
2675     rdma_return_path->is_return_path = true;
2676 }
2677 
2678 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2679 {
2680     RDMAContext *rdma = NULL;
2681     InetSocketAddress *addr;
2682 
2683     if (host_port) {
2684         rdma = g_new0(RDMAContext, 1);
2685         rdma->current_index = -1;
2686         rdma->current_chunk = -1;
2687 
2688         addr = g_new(InetSocketAddress, 1);
2689         if (!inet_parse(addr, host_port, NULL)) {
2690             rdma->port = atoi(addr->port);
2691             rdma->host = g_strdup(addr->host);
2692             rdma->host_port = g_strdup(host_port);
2693         } else {
2694             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2695             g_free(rdma);
2696             rdma = NULL;
2697         }
2698 
2699         qapi_free_InetSocketAddress(addr);
2700     }
2701 
2702     return rdma;
2703 }
2704 
2705 /*
2706  * QEMUFile interface to the control channel.
2707  * SEND messages for control only.
2708  * VM's ram is handled with regular RDMA messages.
2709  */
2710 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2711                                        const struct iovec *iov,
2712                                        size_t niov,
2713                                        int *fds,
2714                                        size_t nfds,
2715                                        Error **errp)
2716 {
2717     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2718     QEMUFile *f = rioc->file;
2719     RDMAContext *rdma;
2720     int ret;
2721     ssize_t done = 0;
2722     size_t i;
2723     size_t len = 0;
2724 
2725     RCU_READ_LOCK_GUARD();
2726     rdma = qatomic_rcu_read(&rioc->rdmaout);
2727 
2728     if (!rdma) {
2729         return -EIO;
2730     }
2731 
2732     CHECK_ERROR_STATE();
2733 
2734     /*
2735      * Push out any writes that
2736      * we're queued up for VM's ram.
2737      */
2738     ret = qemu_rdma_write_flush(f, rdma);
2739     if (ret < 0) {
2740         rdma->error_state = ret;
2741         return ret;
2742     }
2743 
2744     for (i = 0; i < niov; i++) {
2745         size_t remaining = iov[i].iov_len;
2746         uint8_t * data = (void *)iov[i].iov_base;
2747         while (remaining) {
2748             RDMAControlHeader head;
2749 
2750             len = MIN(remaining, RDMA_SEND_INCREMENT);
2751             remaining -= len;
2752 
2753             head.len = len;
2754             head.type = RDMA_CONTROL_QEMU_FILE;
2755 
2756             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2757 
2758             if (ret < 0) {
2759                 rdma->error_state = ret;
2760                 return ret;
2761             }
2762 
2763             data += len;
2764             done += len;
2765         }
2766     }
2767 
2768     return done;
2769 }
2770 
2771 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2772                              size_t size, int idx)
2773 {
2774     size_t len = 0;
2775 
2776     if (rdma->wr_data[idx].control_len) {
2777         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2778 
2779         len = MIN(size, rdma->wr_data[idx].control_len);
2780         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2781         rdma->wr_data[idx].control_curr += len;
2782         rdma->wr_data[idx].control_len -= len;
2783     }
2784 
2785     return len;
2786 }
2787 
2788 /*
2789  * QEMUFile interface to the control channel.
2790  * RDMA links don't use bytestreams, so we have to
2791  * return bytes to QEMUFile opportunistically.
2792  */
2793 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2794                                       const struct iovec *iov,
2795                                       size_t niov,
2796                                       int **fds,
2797                                       size_t *nfds,
2798                                       Error **errp)
2799 {
2800     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2801     RDMAContext *rdma;
2802     RDMAControlHeader head;
2803     int ret = 0;
2804     ssize_t i;
2805     size_t done = 0;
2806 
2807     RCU_READ_LOCK_GUARD();
2808     rdma = qatomic_rcu_read(&rioc->rdmain);
2809 
2810     if (!rdma) {
2811         return -EIO;
2812     }
2813 
2814     CHECK_ERROR_STATE();
2815 
2816     for (i = 0; i < niov; i++) {
2817         size_t want = iov[i].iov_len;
2818         uint8_t *data = (void *)iov[i].iov_base;
2819 
2820         /*
2821          * First, we hold on to the last SEND message we
2822          * were given and dish out the bytes until we run
2823          * out of bytes.
2824          */
2825         ret = qemu_rdma_fill(rdma, data, want, 0);
2826         done += ret;
2827         want -= ret;
2828         /* Got what we needed, so go to next iovec */
2829         if (want == 0) {
2830             continue;
2831         }
2832 
2833         /* If we got any data so far, then don't wait
2834          * for more, just return what we have */
2835         if (done > 0) {
2836             break;
2837         }
2838 
2839 
2840         /* We've got nothing at all, so lets wait for
2841          * more to arrive
2842          */
2843         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2844 
2845         if (ret < 0) {
2846             rdma->error_state = ret;
2847             return ret;
2848         }
2849 
2850         /*
2851          * SEND was received with new bytes, now try again.
2852          */
2853         ret = qemu_rdma_fill(rdma, data, want, 0);
2854         done += ret;
2855         want -= ret;
2856 
2857         /* Still didn't get enough, so lets just return */
2858         if (want) {
2859             if (done == 0) {
2860                 return QIO_CHANNEL_ERR_BLOCK;
2861             } else {
2862                 break;
2863             }
2864         }
2865     }
2866     return done;
2867 }
2868 
2869 /*
2870  * Block until all the outstanding chunks have been delivered by the hardware.
2871  */
2872 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2873 {
2874     int ret;
2875 
2876     if (qemu_rdma_write_flush(f, rdma) < 0) {
2877         return -EIO;
2878     }
2879 
2880     while (rdma->nb_sent) {
2881         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2882         if (ret < 0) {
2883             error_report("rdma migration: complete polling error!");
2884             return -EIO;
2885         }
2886     }
2887 
2888     qemu_rdma_unregister_waiting(rdma);
2889 
2890     return 0;
2891 }
2892 
2893 
2894 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2895                                          bool blocking,
2896                                          Error **errp)
2897 {
2898     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2899     /* XXX we should make readv/writev actually honour this :-) */
2900     rioc->blocking = blocking;
2901     return 0;
2902 }
2903 
2904 
2905 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2906 struct QIOChannelRDMASource {
2907     GSource parent;
2908     QIOChannelRDMA *rioc;
2909     GIOCondition condition;
2910 };
2911 
2912 static gboolean
2913 qio_channel_rdma_source_prepare(GSource *source,
2914                                 gint *timeout)
2915 {
2916     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2917     RDMAContext *rdma;
2918     GIOCondition cond = 0;
2919     *timeout = -1;
2920 
2921     RCU_READ_LOCK_GUARD();
2922     if (rsource->condition == G_IO_IN) {
2923         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2924     } else {
2925         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2926     }
2927 
2928     if (!rdma) {
2929         error_report("RDMAContext is NULL when prepare Gsource");
2930         return FALSE;
2931     }
2932 
2933     if (rdma->wr_data[0].control_len) {
2934         cond |= G_IO_IN;
2935     }
2936     cond |= G_IO_OUT;
2937 
2938     return cond & rsource->condition;
2939 }
2940 
2941 static gboolean
2942 qio_channel_rdma_source_check(GSource *source)
2943 {
2944     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2945     RDMAContext *rdma;
2946     GIOCondition cond = 0;
2947 
2948     RCU_READ_LOCK_GUARD();
2949     if (rsource->condition == G_IO_IN) {
2950         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2951     } else {
2952         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2953     }
2954 
2955     if (!rdma) {
2956         error_report("RDMAContext is NULL when check Gsource");
2957         return FALSE;
2958     }
2959 
2960     if (rdma->wr_data[0].control_len) {
2961         cond |= G_IO_IN;
2962     }
2963     cond |= G_IO_OUT;
2964 
2965     return cond & rsource->condition;
2966 }
2967 
2968 static gboolean
2969 qio_channel_rdma_source_dispatch(GSource *source,
2970                                  GSourceFunc callback,
2971                                  gpointer user_data)
2972 {
2973     QIOChannelFunc func = (QIOChannelFunc)callback;
2974     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2975     RDMAContext *rdma;
2976     GIOCondition cond = 0;
2977 
2978     RCU_READ_LOCK_GUARD();
2979     if (rsource->condition == G_IO_IN) {
2980         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2981     } else {
2982         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2983     }
2984 
2985     if (!rdma) {
2986         error_report("RDMAContext is NULL when dispatch Gsource");
2987         return FALSE;
2988     }
2989 
2990     if (rdma->wr_data[0].control_len) {
2991         cond |= G_IO_IN;
2992     }
2993     cond |= G_IO_OUT;
2994 
2995     return (*func)(QIO_CHANNEL(rsource->rioc),
2996                    (cond & rsource->condition),
2997                    user_data);
2998 }
2999 
3000 static void
3001 qio_channel_rdma_source_finalize(GSource *source)
3002 {
3003     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3004 
3005     object_unref(OBJECT(ssource->rioc));
3006 }
3007 
3008 GSourceFuncs qio_channel_rdma_source_funcs = {
3009     qio_channel_rdma_source_prepare,
3010     qio_channel_rdma_source_check,
3011     qio_channel_rdma_source_dispatch,
3012     qio_channel_rdma_source_finalize
3013 };
3014 
3015 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3016                                               GIOCondition condition)
3017 {
3018     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3019     QIOChannelRDMASource *ssource;
3020     GSource *source;
3021 
3022     source = g_source_new(&qio_channel_rdma_source_funcs,
3023                           sizeof(QIOChannelRDMASource));
3024     ssource = (QIOChannelRDMASource *)source;
3025 
3026     ssource->rioc = rioc;
3027     object_ref(OBJECT(rioc));
3028 
3029     ssource->condition = condition;
3030 
3031     return source;
3032 }
3033 
3034 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3035                                                   AioContext *ctx,
3036                                                   IOHandler *io_read,
3037                                                   IOHandler *io_write,
3038                                                   void *opaque)
3039 {
3040     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3041     if (io_read) {
3042         aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
3043                            false, io_read, io_write, NULL, opaque);
3044     } else {
3045         aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
3046                            false, io_read, io_write, NULL, opaque);
3047     }
3048 }
3049 
3050 struct rdma_close_rcu {
3051     struct rcu_head rcu;
3052     RDMAContext *rdmain;
3053     RDMAContext *rdmaout;
3054 };
3055 
3056 /* callback from qio_channel_rdma_close via call_rcu */
3057 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3058 {
3059     if (rcu->rdmain) {
3060         qemu_rdma_cleanup(rcu->rdmain);
3061     }
3062 
3063     if (rcu->rdmaout) {
3064         qemu_rdma_cleanup(rcu->rdmaout);
3065     }
3066 
3067     g_free(rcu->rdmain);
3068     g_free(rcu->rdmaout);
3069     g_free(rcu);
3070 }
3071 
3072 static int qio_channel_rdma_close(QIOChannel *ioc,
3073                                   Error **errp)
3074 {
3075     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3076     RDMAContext *rdmain, *rdmaout;
3077     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3078 
3079     trace_qemu_rdma_close();
3080 
3081     rdmain = rioc->rdmain;
3082     if (rdmain) {
3083         qatomic_rcu_set(&rioc->rdmain, NULL);
3084     }
3085 
3086     rdmaout = rioc->rdmaout;
3087     if (rdmaout) {
3088         qatomic_rcu_set(&rioc->rdmaout, NULL);
3089     }
3090 
3091     rcu->rdmain = rdmain;
3092     rcu->rdmaout = rdmaout;
3093     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3094 
3095     return 0;
3096 }
3097 
3098 static int
3099 qio_channel_rdma_shutdown(QIOChannel *ioc,
3100                             QIOChannelShutdown how,
3101                             Error **errp)
3102 {
3103     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3104     RDMAContext *rdmain, *rdmaout;
3105 
3106     RCU_READ_LOCK_GUARD();
3107 
3108     rdmain = qatomic_rcu_read(&rioc->rdmain);
3109     rdmaout = qatomic_rcu_read(&rioc->rdmain);
3110 
3111     switch (how) {
3112     case QIO_CHANNEL_SHUTDOWN_READ:
3113         if (rdmain) {
3114             rdmain->error_state = -1;
3115         }
3116         break;
3117     case QIO_CHANNEL_SHUTDOWN_WRITE:
3118         if (rdmaout) {
3119             rdmaout->error_state = -1;
3120         }
3121         break;
3122     case QIO_CHANNEL_SHUTDOWN_BOTH:
3123     default:
3124         if (rdmain) {
3125             rdmain->error_state = -1;
3126         }
3127         if (rdmaout) {
3128             rdmaout->error_state = -1;
3129         }
3130         break;
3131     }
3132 
3133     return 0;
3134 }
3135 
3136 /*
3137  * Parameters:
3138  *    @offset == 0 :
3139  *        This means that 'block_offset' is a full virtual address that does not
3140  *        belong to a RAMBlock of the virtual machine and instead
3141  *        represents a private malloc'd memory area that the caller wishes to
3142  *        transfer.
3143  *
3144  *    @offset != 0 :
3145  *        Offset is an offset to be added to block_offset and used
3146  *        to also lookup the corresponding RAMBlock.
3147  *
3148  *    @size > 0 :
3149  *        Initiate an transfer this size.
3150  *
3151  *    @size == 0 :
3152  *        A 'hint' or 'advice' that means that we wish to speculatively
3153  *        and asynchronously unregister this memory. In this case, there is no
3154  *        guarantee that the unregister will actually happen, for example,
3155  *        if the memory is being actively transmitted. Additionally, the memory
3156  *        may be re-registered at any future time if a write within the same
3157  *        chunk was requested again, even if you attempted to unregister it
3158  *        here.
3159  *
3160  *    @size < 0 : TODO, not yet supported
3161  *        Unregister the memory NOW. This means that the caller does not
3162  *        expect there to be any future RDMA transfers and we just want to clean
3163  *        things up. This is used in case the upper layer owns the memory and
3164  *        cannot wait for qemu_fclose() to occur.
3165  *
3166  *    @bytes_sent : User-specificed pointer to indicate how many bytes were
3167  *                  sent. Usually, this will not be more than a few bytes of
3168  *                  the protocol because most transfers are sent asynchronously.
3169  */
3170 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
3171                                   ram_addr_t block_offset, ram_addr_t offset,
3172                                   size_t size, uint64_t *bytes_sent)
3173 {
3174     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3175     RDMAContext *rdma;
3176     int ret;
3177 
3178     RCU_READ_LOCK_GUARD();
3179     rdma = qatomic_rcu_read(&rioc->rdmaout);
3180 
3181     if (!rdma) {
3182         return -EIO;
3183     }
3184 
3185     CHECK_ERROR_STATE();
3186 
3187     if (migration_in_postcopy()) {
3188         return RAM_SAVE_CONTROL_NOT_SUPP;
3189     }
3190 
3191     qemu_fflush(f);
3192 
3193     if (size > 0) {
3194         /*
3195          * Add this page to the current 'chunk'. If the chunk
3196          * is full, or the page doesn't belong to the current chunk,
3197          * an actual RDMA write will occur and a new chunk will be formed.
3198          */
3199         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3200         if (ret < 0) {
3201             error_report("rdma migration: write error! %d", ret);
3202             goto err;
3203         }
3204 
3205         /*
3206          * We always return 1 bytes because the RDMA
3207          * protocol is completely asynchronous. We do not yet know
3208          * whether an  identified chunk is zero or not because we're
3209          * waiting for other pages to potentially be merged with
3210          * the current chunk. So, we have to call qemu_update_position()
3211          * later on when the actual write occurs.
3212          */
3213         if (bytes_sent) {
3214             *bytes_sent = 1;
3215         }
3216     } else {
3217         uint64_t index, chunk;
3218 
3219         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
3220         if (size < 0) {
3221             ret = qemu_rdma_drain_cq(f, rdma);
3222             if (ret < 0) {
3223                 fprintf(stderr, "rdma: failed to synchronously drain"
3224                                 " completion queue before unregistration.\n");
3225                 goto err;
3226             }
3227         }
3228         */
3229 
3230         ret = qemu_rdma_search_ram_block(rdma, block_offset,
3231                                          offset, size, &index, &chunk);
3232 
3233         if (ret) {
3234             error_report("ram block search failed");
3235             goto err;
3236         }
3237 
3238         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
3239 
3240         /*
3241          * TODO: Synchronous, guaranteed unregistration (should not occur during
3242          * fast-path). Otherwise, unregisters will process on the next call to
3243          * qemu_rdma_drain_cq()
3244         if (size < 0) {
3245             qemu_rdma_unregister_waiting(rdma);
3246         }
3247         */
3248     }
3249 
3250     /*
3251      * Drain the Completion Queue if possible, but do not block,
3252      * just poll.
3253      *
3254      * If nothing to poll, the end of the iteration will do this
3255      * again to make sure we don't overflow the request queue.
3256      */
3257     while (1) {
3258         uint64_t wr_id, wr_id_in;
3259         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
3260         if (ret < 0) {
3261             error_report("rdma migration: polling error! %d", ret);
3262             goto err;
3263         }
3264 
3265         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3266 
3267         if (wr_id == RDMA_WRID_NONE) {
3268             break;
3269         }
3270     }
3271 
3272     return RAM_SAVE_CONTROL_DELAYED;
3273 err:
3274     rdma->error_state = ret;
3275     return ret;
3276 }
3277 
3278 static void rdma_accept_incoming_migration(void *opaque);
3279 
3280 static void rdma_cm_poll_handler(void *opaque)
3281 {
3282     RDMAContext *rdma = opaque;
3283     int ret;
3284     struct rdma_cm_event *cm_event;
3285     MigrationIncomingState *mis = migration_incoming_get_current();
3286 
3287     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3288     if (ret) {
3289         error_report("get_cm_event failed %d", errno);
3290         return;
3291     }
3292 
3293     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3294         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3295         if (!rdma->error_state &&
3296             migration_incoming_get_current()->state !=
3297               MIGRATION_STATUS_COMPLETED) {
3298             error_report("receive cm event, cm event is %d", cm_event->event);
3299             rdma->error_state = -EPIPE;
3300             if (rdma->return_path) {
3301                 rdma->return_path->error_state = -EPIPE;
3302             }
3303         }
3304         rdma_ack_cm_event(cm_event);
3305 
3306         if (mis->migration_incoming_co) {
3307             qemu_coroutine_enter(mis->migration_incoming_co);
3308         }
3309         return;
3310     }
3311     rdma_ack_cm_event(cm_event);
3312 }
3313 
3314 static int qemu_rdma_accept(RDMAContext *rdma)
3315 {
3316     RDMACapabilities cap;
3317     struct rdma_conn_param conn_param = {
3318                                             .responder_resources = 2,
3319                                             .private_data = &cap,
3320                                             .private_data_len = sizeof(cap),
3321                                          };
3322     RDMAContext *rdma_return_path = NULL;
3323     struct rdma_cm_event *cm_event;
3324     struct ibv_context *verbs;
3325     int ret = -EINVAL;
3326     int idx;
3327 
3328     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3329     if (ret) {
3330         goto err_rdma_dest_wait;
3331     }
3332 
3333     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3334         rdma_ack_cm_event(cm_event);
3335         goto err_rdma_dest_wait;
3336     }
3337 
3338     /*
3339      * initialize the RDMAContext for return path for postcopy after first
3340      * connection request reached.
3341      */
3342     if (migrate_postcopy() && !rdma->is_return_path) {
3343         rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3344         if (rdma_return_path == NULL) {
3345             rdma_ack_cm_event(cm_event);
3346             goto err_rdma_dest_wait;
3347         }
3348 
3349         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3350     }
3351 
3352     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3353 
3354     network_to_caps(&cap);
3355 
3356     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3357             error_report("Unknown source RDMA version: %d, bailing...",
3358                             cap.version);
3359             rdma_ack_cm_event(cm_event);
3360             goto err_rdma_dest_wait;
3361     }
3362 
3363     /*
3364      * Respond with only the capabilities this version of QEMU knows about.
3365      */
3366     cap.flags &= known_capabilities;
3367 
3368     /*
3369      * Enable the ones that we do know about.
3370      * Add other checks here as new ones are introduced.
3371      */
3372     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3373         rdma->pin_all = true;
3374     }
3375 
3376     rdma->cm_id = cm_event->id;
3377     verbs = cm_event->id->verbs;
3378 
3379     rdma_ack_cm_event(cm_event);
3380 
3381     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3382 
3383     caps_to_network(&cap);
3384 
3385     trace_qemu_rdma_accept_pin_verbsc(verbs);
3386 
3387     if (!rdma->verbs) {
3388         rdma->verbs = verbs;
3389     } else if (rdma->verbs != verbs) {
3390             error_report("ibv context not matching %p, %p!", rdma->verbs,
3391                          verbs);
3392             goto err_rdma_dest_wait;
3393     }
3394 
3395     qemu_rdma_dump_id("dest_init", verbs);
3396 
3397     ret = qemu_rdma_alloc_pd_cq(rdma);
3398     if (ret) {
3399         error_report("rdma migration: error allocating pd and cq!");
3400         goto err_rdma_dest_wait;
3401     }
3402 
3403     ret = qemu_rdma_alloc_qp(rdma);
3404     if (ret) {
3405         error_report("rdma migration: error allocating qp!");
3406         goto err_rdma_dest_wait;
3407     }
3408 
3409     ret = qemu_rdma_init_ram_blocks(rdma);
3410     if (ret) {
3411         error_report("rdma migration: error initializing ram blocks!");
3412         goto err_rdma_dest_wait;
3413     }
3414 
3415     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3416         ret = qemu_rdma_reg_control(rdma, idx);
3417         if (ret) {
3418             error_report("rdma: error registering %d control", idx);
3419             goto err_rdma_dest_wait;
3420         }
3421     }
3422 
3423     /* Accept the second connection request for return path */
3424     if (migrate_postcopy() && !rdma->is_return_path) {
3425         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3426                             NULL,
3427                             (void *)(intptr_t)rdma->return_path);
3428     } else {
3429         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3430                             NULL, rdma);
3431     }
3432 
3433     ret = rdma_accept(rdma->cm_id, &conn_param);
3434     if (ret) {
3435         error_report("rdma_accept returns %d", ret);
3436         goto err_rdma_dest_wait;
3437     }
3438 
3439     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3440     if (ret) {
3441         error_report("rdma_accept get_cm_event failed %d", ret);
3442         goto err_rdma_dest_wait;
3443     }
3444 
3445     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3446         error_report("rdma_accept not event established");
3447         rdma_ack_cm_event(cm_event);
3448         goto err_rdma_dest_wait;
3449     }
3450 
3451     rdma_ack_cm_event(cm_event);
3452     rdma->connected = true;
3453 
3454     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3455     if (ret) {
3456         error_report("rdma migration: error posting second control recv");
3457         goto err_rdma_dest_wait;
3458     }
3459 
3460     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3461 
3462     return 0;
3463 
3464 err_rdma_dest_wait:
3465     rdma->error_state = ret;
3466     qemu_rdma_cleanup(rdma);
3467     g_free(rdma_return_path);
3468     return ret;
3469 }
3470 
3471 static int dest_ram_sort_func(const void *a, const void *b)
3472 {
3473     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3474     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3475 
3476     return (a_index < b_index) ? -1 : (a_index != b_index);
3477 }
3478 
3479 /*
3480  * During each iteration of the migration, we listen for instructions
3481  * by the source VM to perform dynamic page registrations before they
3482  * can perform RDMA operations.
3483  *
3484  * We respond with the 'rkey'.
3485  *
3486  * Keep doing this until the source tells us to stop.
3487  */
3488 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3489 {
3490     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3491                                .type = RDMA_CONTROL_REGISTER_RESULT,
3492                                .repeat = 0,
3493                              };
3494     RDMAControlHeader unreg_resp = { .len = 0,
3495                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3496                                .repeat = 0,
3497                              };
3498     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3499                                  .repeat = 1 };
3500     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3501     RDMAContext *rdma;
3502     RDMALocalBlocks *local;
3503     RDMAControlHeader head;
3504     RDMARegister *reg, *registers;
3505     RDMACompress *comp;
3506     RDMARegisterResult *reg_result;
3507     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3508     RDMALocalBlock *block;
3509     void *host_addr;
3510     int ret = 0;
3511     int idx = 0;
3512     int count = 0;
3513     int i = 0;
3514 
3515     RCU_READ_LOCK_GUARD();
3516     rdma = qatomic_rcu_read(&rioc->rdmain);
3517 
3518     if (!rdma) {
3519         return -EIO;
3520     }
3521 
3522     CHECK_ERROR_STATE();
3523 
3524     local = &rdma->local_ram_blocks;
3525     do {
3526         trace_qemu_rdma_registration_handle_wait();
3527 
3528         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3529 
3530         if (ret < 0) {
3531             break;
3532         }
3533 
3534         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3535             error_report("rdma: Too many requests in this message (%d)."
3536                             "Bailing.", head.repeat);
3537             ret = -EIO;
3538             break;
3539         }
3540 
3541         switch (head.type) {
3542         case RDMA_CONTROL_COMPRESS:
3543             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3544             network_to_compress(comp);
3545 
3546             trace_qemu_rdma_registration_handle_compress(comp->length,
3547                                                          comp->block_idx,
3548                                                          comp->offset);
3549             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3550                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3551                              (unsigned int)comp->block_idx,
3552                              rdma->local_ram_blocks.nb_blocks);
3553                 ret = -EIO;
3554                 goto out;
3555             }
3556             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3557 
3558             host_addr = block->local_host_addr +
3559                             (comp->offset - block->offset);
3560 
3561             ram_handle_compressed(host_addr, comp->value, comp->length);
3562             break;
3563 
3564         case RDMA_CONTROL_REGISTER_FINISHED:
3565             trace_qemu_rdma_registration_handle_finished();
3566             goto out;
3567 
3568         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3569             trace_qemu_rdma_registration_handle_ram_blocks();
3570 
3571             /* Sort our local RAM Block list so it's the same as the source,
3572              * we can do this since we've filled in a src_index in the list
3573              * as we received the RAMBlock list earlier.
3574              */
3575             qsort(rdma->local_ram_blocks.block,
3576                   rdma->local_ram_blocks.nb_blocks,
3577                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3578             for (i = 0; i < local->nb_blocks; i++) {
3579                 local->block[i].index = i;
3580             }
3581 
3582             if (rdma->pin_all) {
3583                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3584                 if (ret) {
3585                     error_report("rdma migration: error dest "
3586                                     "registering ram blocks");
3587                     goto out;
3588                 }
3589             }
3590 
3591             /*
3592              * Dest uses this to prepare to transmit the RAMBlock descriptions
3593              * to the source VM after connection setup.
3594              * Both sides use the "remote" structure to communicate and update
3595              * their "local" descriptions with what was sent.
3596              */
3597             for (i = 0; i < local->nb_blocks; i++) {
3598                 rdma->dest_blocks[i].remote_host_addr =
3599                     (uintptr_t)(local->block[i].local_host_addr);
3600 
3601                 if (rdma->pin_all) {
3602                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3603                 }
3604 
3605                 rdma->dest_blocks[i].offset = local->block[i].offset;
3606                 rdma->dest_blocks[i].length = local->block[i].length;
3607 
3608                 dest_block_to_network(&rdma->dest_blocks[i]);
3609                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3610                     local->block[i].block_name,
3611                     local->block[i].offset,
3612                     local->block[i].length,
3613                     local->block[i].local_host_addr,
3614                     local->block[i].src_index);
3615             }
3616 
3617             blocks.len = rdma->local_ram_blocks.nb_blocks
3618                                                 * sizeof(RDMADestBlock);
3619 
3620 
3621             ret = qemu_rdma_post_send_control(rdma,
3622                                         (uint8_t *) rdma->dest_blocks, &blocks);
3623 
3624             if (ret < 0) {
3625                 error_report("rdma migration: error sending remote info");
3626                 goto out;
3627             }
3628 
3629             break;
3630         case RDMA_CONTROL_REGISTER_REQUEST:
3631             trace_qemu_rdma_registration_handle_register(head.repeat);
3632 
3633             reg_resp.repeat = head.repeat;
3634             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3635 
3636             for (count = 0; count < head.repeat; count++) {
3637                 uint64_t chunk;
3638                 uint8_t *chunk_start, *chunk_end;
3639 
3640                 reg = &registers[count];
3641                 network_to_register(reg);
3642 
3643                 reg_result = &results[count];
3644 
3645                 trace_qemu_rdma_registration_handle_register_loop(count,
3646                          reg->current_index, reg->key.current_addr, reg->chunks);
3647 
3648                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3649                     error_report("rdma: 'register' bad block index %u (vs %d)",
3650                                  (unsigned int)reg->current_index,
3651                                  rdma->local_ram_blocks.nb_blocks);
3652                     ret = -ENOENT;
3653                     goto out;
3654                 }
3655                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3656                 if (block->is_ram_block) {
3657                     if (block->offset > reg->key.current_addr) {
3658                         error_report("rdma: bad register address for block %s"
3659                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3660                             block->block_name, block->offset,
3661                             reg->key.current_addr);
3662                         ret = -ERANGE;
3663                         goto out;
3664                     }
3665                     host_addr = (block->local_host_addr +
3666                                 (reg->key.current_addr - block->offset));
3667                     chunk = ram_chunk_index(block->local_host_addr,
3668                                             (uint8_t *) host_addr);
3669                 } else {
3670                     chunk = reg->key.chunk;
3671                     host_addr = block->local_host_addr +
3672                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3673                     /* Check for particularly bad chunk value */
3674                     if (host_addr < (void *)block->local_host_addr) {
3675                         error_report("rdma: bad chunk for block %s"
3676                             " chunk: %" PRIx64,
3677                             block->block_name, reg->key.chunk);
3678                         ret = -ERANGE;
3679                         goto out;
3680                     }
3681                 }
3682                 chunk_start = ram_chunk_start(block, chunk);
3683                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3684                 /* avoid "-Waddress-of-packed-member" warning */
3685                 uint32_t tmp_rkey = 0;
3686                 if (qemu_rdma_register_and_get_keys(rdma, block,
3687                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3688                             chunk, chunk_start, chunk_end)) {
3689                     error_report("cannot get rkey");
3690                     ret = -EINVAL;
3691                     goto out;
3692                 }
3693                 reg_result->rkey = tmp_rkey;
3694 
3695                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3696 
3697                 trace_qemu_rdma_registration_handle_register_rkey(
3698                                                            reg_result->rkey);
3699 
3700                 result_to_network(reg_result);
3701             }
3702 
3703             ret = qemu_rdma_post_send_control(rdma,
3704                             (uint8_t *) results, &reg_resp);
3705 
3706             if (ret < 0) {
3707                 error_report("Failed to send control buffer");
3708                 goto out;
3709             }
3710             break;
3711         case RDMA_CONTROL_UNREGISTER_REQUEST:
3712             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3713             unreg_resp.repeat = head.repeat;
3714             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3715 
3716             for (count = 0; count < head.repeat; count++) {
3717                 reg = &registers[count];
3718                 network_to_register(reg);
3719 
3720                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3721                            reg->current_index, reg->key.chunk);
3722 
3723                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3724 
3725                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3726                 block->pmr[reg->key.chunk] = NULL;
3727 
3728                 if (ret != 0) {
3729                     perror("rdma unregistration chunk failed");
3730                     ret = -ret;
3731                     goto out;
3732                 }
3733 
3734                 rdma->total_registrations--;
3735 
3736                 trace_qemu_rdma_registration_handle_unregister_success(
3737                                                        reg->key.chunk);
3738             }
3739 
3740             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3741 
3742             if (ret < 0) {
3743                 error_report("Failed to send control buffer");
3744                 goto out;
3745             }
3746             break;
3747         case RDMA_CONTROL_REGISTER_RESULT:
3748             error_report("Invalid RESULT message at dest.");
3749             ret = -EIO;
3750             goto out;
3751         default:
3752             error_report("Unknown control message %s", control_desc(head.type));
3753             ret = -EIO;
3754             goto out;
3755         }
3756     } while (1);
3757 out:
3758     if (ret < 0) {
3759         rdma->error_state = ret;
3760     }
3761     return ret;
3762 }
3763 
3764 /* Destination:
3765  * Called via a ram_control_load_hook during the initial RAM load section which
3766  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3767  * on the source.
3768  * We've already built our local RAMBlock list, but not yet sent the list to
3769  * the source.
3770  */
3771 static int
3772 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3773 {
3774     RDMAContext *rdma;
3775     int curr;
3776     int found = -1;
3777 
3778     RCU_READ_LOCK_GUARD();
3779     rdma = qatomic_rcu_read(&rioc->rdmain);
3780 
3781     if (!rdma) {
3782         return -EIO;
3783     }
3784 
3785     /* Find the matching RAMBlock in our local list */
3786     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3787         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3788             found = curr;
3789             break;
3790         }
3791     }
3792 
3793     if (found == -1) {
3794         error_report("RAMBlock '%s' not found on destination", name);
3795         return -ENOENT;
3796     }
3797 
3798     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3799     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3800     rdma->next_src_index++;
3801 
3802     return 0;
3803 }
3804 
3805 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3806 {
3807     switch (flags) {
3808     case RAM_CONTROL_BLOCK_REG:
3809         return rdma_block_notification_handle(opaque, data);
3810 
3811     case RAM_CONTROL_HOOK:
3812         return qemu_rdma_registration_handle(f, opaque);
3813 
3814     default:
3815         /* Shouldn't be called with any other values */
3816         abort();
3817     }
3818 }
3819 
3820 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3821                                         uint64_t flags, void *data)
3822 {
3823     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3824     RDMAContext *rdma;
3825 
3826     RCU_READ_LOCK_GUARD();
3827     rdma = qatomic_rcu_read(&rioc->rdmaout);
3828     if (!rdma) {
3829         return -EIO;
3830     }
3831 
3832     CHECK_ERROR_STATE();
3833 
3834     if (migration_in_postcopy()) {
3835         return 0;
3836     }
3837 
3838     trace_qemu_rdma_registration_start(flags);
3839     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3840     qemu_fflush(f);
3841 
3842     return 0;
3843 }
3844 
3845 /*
3846  * Inform dest that dynamic registrations are done for now.
3847  * First, flush writes, if any.
3848  */
3849 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3850                                        uint64_t flags, void *data)
3851 {
3852     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3853     RDMAContext *rdma;
3854     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3855     int ret = 0;
3856 
3857     RCU_READ_LOCK_GUARD();
3858     rdma = qatomic_rcu_read(&rioc->rdmaout);
3859     if (!rdma) {
3860         return -EIO;
3861     }
3862 
3863     CHECK_ERROR_STATE();
3864 
3865     if (migration_in_postcopy()) {
3866         return 0;
3867     }
3868 
3869     qemu_fflush(f);
3870     ret = qemu_rdma_drain_cq(f, rdma);
3871 
3872     if (ret < 0) {
3873         goto err;
3874     }
3875 
3876     if (flags == RAM_CONTROL_SETUP) {
3877         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3878         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3879         int reg_result_idx, i, nb_dest_blocks;
3880 
3881         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3882         trace_qemu_rdma_registration_stop_ram();
3883 
3884         /*
3885          * Make sure that we parallelize the pinning on both sides.
3886          * For very large guests, doing this serially takes a really
3887          * long time, so we have to 'interleave' the pinning locally
3888          * with the control messages by performing the pinning on this
3889          * side before we receive the control response from the other
3890          * side that the pinning has completed.
3891          */
3892         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3893                     &reg_result_idx, rdma->pin_all ?
3894                     qemu_rdma_reg_whole_ram_blocks : NULL);
3895         if (ret < 0) {
3896             fprintf(stderr, "receiving remote info!");
3897             return ret;
3898         }
3899 
3900         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3901 
3902         /*
3903          * The protocol uses two different sets of rkeys (mutually exclusive):
3904          * 1. One key to represent the virtual address of the entire ram block.
3905          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3906          * 2. One to represent individual chunks within a ram block.
3907          *    (dynamic chunk registration enabled - pin individual chunks.)
3908          *
3909          * Once the capability is successfully negotiated, the destination transmits
3910          * the keys to use (or sends them later) including the virtual addresses
3911          * and then propagates the remote ram block descriptions to his local copy.
3912          */
3913 
3914         if (local->nb_blocks != nb_dest_blocks) {
3915             fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) "
3916                     "Your QEMU command line parameters are probably "
3917                     "not identical on both the source and destination.",
3918                     local->nb_blocks, nb_dest_blocks);
3919             rdma->error_state = -EINVAL;
3920             return -EINVAL;
3921         }
3922 
3923         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3924         memcpy(rdma->dest_blocks,
3925             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3926         for (i = 0; i < nb_dest_blocks; i++) {
3927             network_to_dest_block(&rdma->dest_blocks[i]);
3928 
3929             /* We require that the blocks are in the same order */
3930             if (rdma->dest_blocks[i].length != local->block[i].length) {
3931                 fprintf(stderr, "Block %s/%d has a different length %" PRIu64
3932                         "vs %" PRIu64, local->block[i].block_name, i,
3933                         local->block[i].length,
3934                         rdma->dest_blocks[i].length);
3935                 rdma->error_state = -EINVAL;
3936                 return -EINVAL;
3937             }
3938             local->block[i].remote_host_addr =
3939                     rdma->dest_blocks[i].remote_host_addr;
3940             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3941         }
3942     }
3943 
3944     trace_qemu_rdma_registration_stop(flags);
3945 
3946     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3947     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3948 
3949     if (ret < 0) {
3950         goto err;
3951     }
3952 
3953     return 0;
3954 err:
3955     rdma->error_state = ret;
3956     return ret;
3957 }
3958 
3959 static const QEMUFileHooks rdma_read_hooks = {
3960     .hook_ram_load = rdma_load_hook,
3961 };
3962 
3963 static const QEMUFileHooks rdma_write_hooks = {
3964     .before_ram_iterate = qemu_rdma_registration_start,
3965     .after_ram_iterate  = qemu_rdma_registration_stop,
3966     .save_page          = qemu_rdma_save_page,
3967 };
3968 
3969 
3970 static void qio_channel_rdma_finalize(Object *obj)
3971 {
3972     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3973     if (rioc->rdmain) {
3974         qemu_rdma_cleanup(rioc->rdmain);
3975         g_free(rioc->rdmain);
3976         rioc->rdmain = NULL;
3977     }
3978     if (rioc->rdmaout) {
3979         qemu_rdma_cleanup(rioc->rdmaout);
3980         g_free(rioc->rdmaout);
3981         rioc->rdmaout = NULL;
3982     }
3983 }
3984 
3985 static void qio_channel_rdma_class_init(ObjectClass *klass,
3986                                         void *class_data G_GNUC_UNUSED)
3987 {
3988     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3989 
3990     ioc_klass->io_writev = qio_channel_rdma_writev;
3991     ioc_klass->io_readv = qio_channel_rdma_readv;
3992     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3993     ioc_klass->io_close = qio_channel_rdma_close;
3994     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3995     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3996     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3997 }
3998 
3999 static const TypeInfo qio_channel_rdma_info = {
4000     .parent = TYPE_QIO_CHANNEL,
4001     .name = TYPE_QIO_CHANNEL_RDMA,
4002     .instance_size = sizeof(QIOChannelRDMA),
4003     .instance_finalize = qio_channel_rdma_finalize,
4004     .class_init = qio_channel_rdma_class_init,
4005 };
4006 
4007 static void qio_channel_rdma_register_types(void)
4008 {
4009     type_register_static(&qio_channel_rdma_info);
4010 }
4011 
4012 type_init(qio_channel_rdma_register_types);
4013 
4014 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
4015 {
4016     QIOChannelRDMA *rioc;
4017 
4018     if (qemu_file_mode_is_not_valid(mode)) {
4019         return NULL;
4020     }
4021 
4022     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4023 
4024     if (mode[0] == 'w') {
4025         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
4026         rioc->rdmaout = rdma;
4027         rioc->rdmain = rdma->return_path;
4028         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
4029     } else {
4030         rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
4031         rioc->rdmain = rdma;
4032         rioc->rdmaout = rdma->return_path;
4033         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
4034     }
4035 
4036     return rioc->file;
4037 }
4038 
4039 static void rdma_accept_incoming_migration(void *opaque)
4040 {
4041     RDMAContext *rdma = opaque;
4042     int ret;
4043     QEMUFile *f;
4044     Error *local_err = NULL;
4045 
4046     trace_qemu_rdma_accept_incoming_migration();
4047     ret = qemu_rdma_accept(rdma);
4048 
4049     if (ret) {
4050         fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
4051         return;
4052     }
4053 
4054     trace_qemu_rdma_accept_incoming_migration_accepted();
4055 
4056     if (rdma->is_return_path) {
4057         return;
4058     }
4059 
4060     f = qemu_fopen_rdma(rdma, "rb");
4061     if (f == NULL) {
4062         fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
4063         qemu_rdma_cleanup(rdma);
4064         return;
4065     }
4066 
4067     rdma->migration_started_on_destination = 1;
4068     migration_fd_process_incoming(f, &local_err);
4069     if (local_err) {
4070         error_reportf_err(local_err, "RDMA ERROR:");
4071     }
4072 }
4073 
4074 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4075 {
4076     int ret;
4077     RDMAContext *rdma, *rdma_return_path = NULL;
4078     Error *local_err = NULL;
4079 
4080     trace_rdma_start_incoming_migration();
4081 
4082     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4083     if (ram_block_discard_is_required()) {
4084         error_setg(errp, "RDMA: cannot disable RAM discard");
4085         return;
4086     }
4087 
4088     rdma = qemu_rdma_data_init(host_port, &local_err);
4089     if (rdma == NULL) {
4090         goto err;
4091     }
4092 
4093     ret = qemu_rdma_dest_init(rdma, &local_err);
4094 
4095     if (ret) {
4096         goto err;
4097     }
4098 
4099     trace_rdma_start_incoming_migration_after_dest_init();
4100 
4101     ret = rdma_listen(rdma->listen_id, 5);
4102 
4103     if (ret) {
4104         ERROR(errp, "listening on socket!");
4105         goto cleanup_rdma;
4106     }
4107 
4108     trace_rdma_start_incoming_migration_after_rdma_listen();
4109 
4110     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4111                         NULL, (void *)(intptr_t)rdma);
4112     return;
4113 
4114 cleanup_rdma:
4115     qemu_rdma_cleanup(rdma);
4116 err:
4117     error_propagate(errp, local_err);
4118     if (rdma) {
4119         g_free(rdma->host);
4120         g_free(rdma->host_port);
4121     }
4122     g_free(rdma);
4123     g_free(rdma_return_path);
4124 }
4125 
4126 void rdma_start_outgoing_migration(void *opaque,
4127                             const char *host_port, Error **errp)
4128 {
4129     MigrationState *s = opaque;
4130     RDMAContext *rdma_return_path = NULL;
4131     RDMAContext *rdma;
4132     int ret = 0;
4133 
4134     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4135     if (ram_block_discard_is_required()) {
4136         error_setg(errp, "RDMA: cannot disable RAM discard");
4137         return;
4138     }
4139 
4140     rdma = qemu_rdma_data_init(host_port, errp);
4141     if (rdma == NULL) {
4142         goto err;
4143     }
4144 
4145     ret = qemu_rdma_source_init(rdma,
4146         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4147 
4148     if (ret) {
4149         goto err;
4150     }
4151 
4152     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4153     ret = qemu_rdma_connect(rdma, errp, false);
4154 
4155     if (ret) {
4156         goto err;
4157     }
4158 
4159     /* RDMA postcopy need a separate queue pair for return path */
4160     if (migrate_postcopy()) {
4161         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4162 
4163         if (rdma_return_path == NULL) {
4164             goto return_path_err;
4165         }
4166 
4167         ret = qemu_rdma_source_init(rdma_return_path,
4168             s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4169 
4170         if (ret) {
4171             goto return_path_err;
4172         }
4173 
4174         ret = qemu_rdma_connect(rdma_return_path, errp, true);
4175 
4176         if (ret) {
4177             goto return_path_err;
4178         }
4179 
4180         rdma->return_path = rdma_return_path;
4181         rdma_return_path->return_path = rdma;
4182         rdma_return_path->is_return_path = true;
4183     }
4184 
4185     trace_rdma_start_outgoing_migration_after_rdma_connect();
4186 
4187     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4188     migrate_fd_connect(s, NULL);
4189     return;
4190 return_path_err:
4191     qemu_rdma_cleanup(rdma);
4192 err:
4193     g_free(rdma);
4194     g_free(rdma_return_path);
4195 }
4196