xref: /openbmc/qemu/migration/rdma.c (revision 14b6d44d4720681a57b5d2c58cabdfc6364f8263)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  *
6  * Authors:
7  *  Michael R. Hines <mrhines@us.ibm.com>
8  *  Jiuxing Liu <jl@us.ibm.com>
9  *
10  * This work is licensed under the terms of the GNU GPL, version 2 or
11  * later.  See the COPYING file in the top-level directory.
12  *
13  */
14 #include "qemu/osdep.h"
15 #include "qapi/error.h"
16 #include "qemu-common.h"
17 #include "migration/migration.h"
18 #include "migration/qemu-file.h"
19 #include "exec/cpu-common.h"
20 #include "qemu/error-report.h"
21 #include "qemu/main-loop.h"
22 #include "qemu/sockets.h"
23 #include "qemu/bitmap.h"
24 #include "qemu/coroutine.h"
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <arpa/inet.h>
28 #include <rdma/rdma_cma.h>
29 #include "trace.h"
30 
31 /*
32  * Print an error on both the Monitor and the Log file.
33  */
34 #define ERROR(errp, fmt, ...) \
35     do { \
36         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
37         if (errp && (*(errp) == NULL)) { \
38             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
39         } \
40     } while (0)
41 
42 #define RDMA_RESOLVE_TIMEOUT_MS 10000
43 
44 /* Do not merge data if larger than this. */
45 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
46 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
47 
48 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
49 
50 /*
51  * This is only for non-live state being migrated.
52  * Instead of RDMA_WRITE messages, we use RDMA_SEND
53  * messages for that state, which requires a different
54  * delivery design than main memory.
55  */
56 #define RDMA_SEND_INCREMENT 32768
57 
58 /*
59  * Maximum size infiniband SEND message
60  */
61 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
62 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
63 
64 #define RDMA_CONTROL_VERSION_CURRENT 1
65 /*
66  * Capabilities for negotiation.
67  */
68 #define RDMA_CAPABILITY_PIN_ALL 0x01
69 
70 /*
71  * Add the other flags above to this list of known capabilities
72  * as they are introduced.
73  */
74 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
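/*
 * Illustration only (not part of the protocol code): capabilities travel as
 * a bitmask in RDMACapabilities.flags, so a negotiated feature is tested the
 * usual way, e.g.:
 *
 *     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
 *         ... the peer asked for pin-all ...
 *     }
 */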
75 
76 #define CHECK_ERROR_STATE() \
77     do { \
78         if (rdma->error_state) { \
79             if (!rdma->error_reported) { \
80                 error_report("RDMA is in an error state waiting for migration" \
81                                 " to abort!"); \
82                 rdma->error_reported = 1; \
83             } \
84             return rdma->error_state; \
85         } \
86     } while (0);
87 
88 /*
89  * A work request ID is 64-bits and we split up these bits
90  * into 3 parts:
91  *
92  * bits 0-15 : type of control message, 2^16
93  * bits 16-29: ram block index, 2^14
94  * bits 30-63: ram block chunk number, 2^34
95  *
96  * The last two bit ranges are only used for RDMA writes,
97  * in order to track their completion and potentially
98  * also track unregistration status of the message.
99  */
100 #define RDMA_WRID_TYPE_SHIFT  0UL
101 #define RDMA_WRID_BLOCK_SHIFT 16UL
102 #define RDMA_WRID_CHUNK_SHIFT 30UL
103 
104 #define RDMA_WRID_TYPE_MASK \
105     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
106 
107 #define RDMA_WRID_BLOCK_MASK \
108     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
109 
110 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
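/*
 * Worked example (illustration only): an RDMA write to chunk 5 of ram block
 * index 2 is tagged with
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE                 (0x1)
 *           | (2UL << RDMA_WRID_BLOCK_SHIFT)       (0x20000)
 *           | (5UL << RDMA_WRID_CHUNK_SHIFT)       (0x140000000)
 *           = 0x140020001
 *
 * and is decomposed on completion with the masks above:
 *
 *     type  =  wr_id & RDMA_WRID_TYPE_MASK                             = 1
 *     block = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT  = 2
 *     chunk = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT  = 5
 */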
111 
112 /*
113  * RDMA migration protocol:
114  * 1. RDMA Writes (data messages, i.e. RAM)
115  * 2. IB Send/Recv (control channel messages)
116  */
117 enum {
118     RDMA_WRID_NONE = 0,
119     RDMA_WRID_RDMA_WRITE = 1,
120     RDMA_WRID_SEND_CONTROL = 2000,
121     RDMA_WRID_RECV_CONTROL = 4000,
122 };
123 
124 static const char *wrid_desc[] = {
125     [RDMA_WRID_NONE] = "NONE",
126     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
127     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
128     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
129 };
130 
131 /*
132  * Work request IDs for IB SEND messages only (not RDMA writes).
133  * This is used by the migration protocol to transmit
134  * control messages (such as device state and registration commands)
135  *
136  * We could use more WRs, but we have enough for now.
137  */
138 enum {
139     RDMA_WRID_READY = 0,
140     RDMA_WRID_DATA,
141     RDMA_WRID_CONTROL,
142     RDMA_WRID_MAX,
143 };
144 
145 /*
146  * SEND/RECV IB Control Messages.
147  */
148 enum {
149     RDMA_CONTROL_NONE = 0,
150     RDMA_CONTROL_ERROR,
151     RDMA_CONTROL_READY,               /* ready to receive */
152     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
153     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
154     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
155     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
156     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
157     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
158     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
159     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
160     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
161 };
162 
163 static const char *control_desc[] = {
164     [RDMA_CONTROL_NONE] = "NONE",
165     [RDMA_CONTROL_ERROR] = "ERROR",
166     [RDMA_CONTROL_READY] = "READY",
167     [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
168     [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
169     [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
170     [RDMA_CONTROL_COMPRESS] = "COMPRESS",
171     [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
172     [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
173     [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
174     [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
175     [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
176 };
177 
178 /*
179  * Memory and MR structures used to represent an IB Send/Recv work request.
180  * This is *not* used for RDMA writes, only IB Send/Recv.
181  */
182 typedef struct {
183     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
184     struct   ibv_mr *control_mr;               /* registration metadata */
185     size_t   control_len;                      /* length of the message */
186     uint8_t *control_curr;                     /* start of unconsumed bytes */
187 } RDMAWorkRequestData;
188 
189 /*
190  * Negotiate RDMA capabilities during connection-setup time.
191  */
192 typedef struct {
193     uint32_t version;
194     uint32_t flags;
195 } RDMACapabilities;
196 
197 static void caps_to_network(RDMACapabilities *cap)
198 {
199     cap->version = htonl(cap->version);
200     cap->flags = htonl(cap->flags);
201 }
202 
203 static void network_to_caps(RDMACapabilities *cap)
204 {
205     cap->version = ntohl(cap->version);
206     cap->flags = ntohl(cap->flags);
207 }
208 
209 /*
210  * Representation of a RAMBlock from an RDMA perspective.
211  * This is not transmitted, only local.
212  * This and subsequent structures cannot be linked lists
213  * because we're using a single IB message to transmit
214  * the information. It's small anyway, so a list is overkill.
215  */
216 typedef struct RDMALocalBlock {
217     char          *block_name;
218     uint8_t       *local_host_addr; /* local virtual address */
219     uint64_t       remote_host_addr; /* remote virtual address */
220     uint64_t       offset;
221     uint64_t       length;
222     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
223     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
224     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
225     uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
226     int            index;           /* which block are we */
227     unsigned int   src_index;       /* (Only used on dest) */
228     bool           is_ram_block;
229     int            nb_chunks;
230     unsigned long *transit_bitmap;
231     unsigned long *unregister_bitmap;
232 } RDMALocalBlock;
233 
234 /*
235  * Also represents a RAMBlock, but only on the dest.
236  * This gets transmitted by the dest during connection-time
237  * to the source VM and then is used to populate the
238  * corresponding RDMALocalBlock with
239  * the information needed to perform the actual RDMA.
240  */
241 typedef struct QEMU_PACKED RDMADestBlock {
242     uint64_t remote_host_addr;
243     uint64_t offset;
244     uint64_t length;
245     uint32_t remote_rkey;
246     uint32_t padding;
247 } RDMADestBlock;
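/*
 * Note: with QEMU_PACKED each RDMADestBlock is exactly 32 bytes on the wire
 * (3 * 8 + 2 * 4), so even a large block list fits comfortably in a single
 * control message of at most RDMA_CONTROL_MAX_BUFFER (512 KB) bytes.
 */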
248 
249 static uint64_t htonll(uint64_t v)
250 {
251     union { uint32_t lv[2]; uint64_t llv; } u;
252     u.lv[0] = htonl(v >> 32);
253     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
254     return u.llv;
255 }
256 
257 static uint64_t ntohll(uint64_t v)
{
258     union { uint32_t lv[2]; uint64_t llv; } u;
259     u.llv = v;
260     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
261 }
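/*
 * Illustration only: the two helpers above are inverses, so
 * ntohll(htonll(x)) == x on any host.  On a little-endian machine,
 * htonll(0x0102030405060708ULL) returns 0x0807060504030201ULL, i.e. the
 * bytes end up in network (big-endian) order.
 */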
262 
263 static void dest_block_to_network(RDMADestBlock *db)
264 {
265     db->remote_host_addr = htonll(db->remote_host_addr);
266     db->offset = htonll(db->offset);
267     db->length = htonll(db->length);
268     db->remote_rkey = htonl(db->remote_rkey);
269 }
270 
271 static void network_to_dest_block(RDMADestBlock *db)
272 {
273     db->remote_host_addr = ntohll(db->remote_host_addr);
274     db->offset = ntohll(db->offset);
275     db->length = ntohll(db->length);
276     db->remote_rkey = ntohl(db->remote_rkey);
277 }
278 
279 /*
280  * Local table of the structures above, used for transmitting
281  * the RAMBlock descriptions at connection-time.
282  * This structure itself is *not* transmitted.
283  */
284 typedef struct RDMALocalBlocks {
285     int nb_blocks;
286     bool     init;             /* main memory init complete */
287     RDMALocalBlock *block;
288 } RDMALocalBlocks;
289 
290 /*
291  * Main data structure for RDMA state.
292  * While there is only one copy of this structure being allocated right now,
293  * this is the place where one would start if you wanted to consider
294  * having more than one RDMA connection open at the same time.
295  */
296 typedef struct RDMAContext {
297     char *host;
298     int port;
299 
300     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
301 
302     /*
303      * This is used by *_exchange_send() to figure out whether or not
304      * the initial "READY" message has already been received.
305      * This is because other functions may potentially poll() and detect
306      * the READY message before send() does, in which case we need to
307      * know if it completed.
308      */
309     int control_ready_expected;
310 
311     /* number of outstanding writes */
312     int nb_sent;
313 
314     /* store info about current buffer so that we can
315        merge it with future sends */
316     uint64_t current_addr;
317     uint64_t current_length;
318     /* index of ram block the current buffer belongs to */
319     int current_index;
320     /* index of the chunk in the current ram block */
321     int current_chunk;
322 
323     bool pin_all;
324 
325     /*
326      * infiniband-specific variables for opening the device
327      * and maintaining connection state and so forth.
328      *
329      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
330      * cm_id->verbs, cm_id->channel, and cm_id->qp.
331      */
332     struct rdma_cm_id *cm_id;               /* connection manager ID */
333     struct rdma_cm_id *listen_id;
334     bool connected;
335 
336     struct ibv_context          *verbs;
337     struct rdma_event_channel   *channel;
338     struct ibv_qp *qp;                      /* queue pair */
339     struct ibv_comp_channel *comp_channel;  /* completion channel */
340     struct ibv_pd *pd;                      /* protection domain */
341     struct ibv_cq *cq;                      /* completion queue */
342 
343     /*
344      * If a previous write failed (perhaps because of a failed
345      * memory registration), then do not attempt any future work
346      * and remember the error state.
347      */
348     int error_state;
349     int error_reported;
350 
351     /*
352      * Description of ram blocks used throughout the code.
353      */
354     RDMALocalBlocks local_ram_blocks;
355     RDMADestBlock  *dest_blocks;
356 
357     /* Index of the next RAMBlock received during block registration */
358     unsigned int    next_src_index;
359 
360     /*
361      * Set when migration has started on the *destination*; in that case
362      * we must use the coroutine yield function while waiting for
363      * completions.  The source runs in its own thread, so we don't care.
364      */
365     int migration_started_on_destination;
366 
367     int total_registrations;
368     int total_writes;
369 
370     int unregister_current, unregister_next;
371     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
372 
373     GHashTable *blockmap;
374 } RDMAContext;
375 
376 /*
377  * Interface to the rest of the migration call stack.
378  */
379 typedef struct QEMUFileRDMA {
380     RDMAContext *rdma;
381     size_t len;
382     void *file;
383 } QEMUFileRDMA;
384 
385 /*
386  * Main structure for IB Send/Recv control messages.
387  * This gets prepended to every Send/Recv message.
388  */
389 typedef struct QEMU_PACKED {
390     uint32_t len;     /* Total length of data portion */
391     uint32_t type;    /* which control command to perform */
392     uint32_t repeat;  /* number of commands in data portion of same type */
393     uint32_t padding;
394 } RDMAControlHeader;
395 
396 static void control_to_network(RDMAControlHeader *control)
397 {
398     control->type = htonl(control->type);
399     control->len = htonl(control->len);
400     control->repeat = htonl(control->repeat);
401 }
402 
403 static void network_to_control(RDMAControlHeader *control)
404 {
405     control->type = ntohl(control->type);
406     control->len = ntohl(control->len);
407     control->repeat = ntohl(control->repeat);
408 }
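/*
 * Illustration only: to carry N bytes of QEMUFile data, the sender fills in
 * a header such as
 *
 *     RDMAControlHeader head = { .len = N,
 *                                .type = RDMA_CONTROL_QEMU_FILE,
 *                                .repeat = 1 };
 *
 * and the resulting IB SEND carries sizeof(RDMAControlHeader) + N bytes
 * (see the sge.length computation in qemu_rdma_post_send_control() below).
 */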
409 
410 /*
411  * Register a single Chunk.
412  * Information sent by the source VM to inform the dest
413  * to register a single chunk of memory before we can perform
414  * the actual RDMA operation.
415  */
416 typedef struct QEMU_PACKED {
417     union QEMU_PACKED {
418         uint64_t current_addr;  /* offset into the ram_addr_t space */
419         uint64_t chunk;         /* chunk to lookup if unregistering */
420     } key;
421     uint32_t current_index; /* which ramblock the chunk belongs to */
422     uint32_t padding;
423     uint64_t chunks;            /* how many sequential chunks to register */
424 } RDMARegister;
425 
426 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
427 {
428     RDMALocalBlock *local_block;
429     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
430 
431     if (local_block->is_ram_block) {
432         /*
433          * current_addr as passed in is an address in the local ram_addr_t
434          * space, we need to translate this for the destination
435          */
436         reg->key.current_addr -= local_block->offset;
437         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
438     }
439     reg->key.current_addr = htonll(reg->key.current_addr);
440     reg->current_index = htonl(reg->current_index);
441     reg->chunks = htonll(reg->chunks);
442 }
443 
444 static void network_to_register(RDMARegister *reg)
445 {
446     reg->key.current_addr = ntohll(reg->key.current_addr);
447     reg->current_index = ntohl(reg->current_index);
448     reg->chunks = ntohll(reg->chunks);
449 }
450 
451 typedef struct QEMU_PACKED {
452     uint32_t value;     /* if zero, we will madvise() */
453     uint32_t block_idx; /* which ram block index */
454     uint64_t offset;    /* Address in remote ram_addr_t space */
455     uint64_t length;    /* length of the chunk */
456 } RDMACompress;
457 
458 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
459 {
460     comp->value = htonl(comp->value);
461     /*
462      * comp->offset as passed in is an address in the local ram_addr_t
463      * space, we need to translate this for the destination
464      */
465     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
466     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
467     comp->block_idx = htonl(comp->block_idx);
468     comp->offset = htonll(comp->offset);
469     comp->length = htonll(comp->length);
470 }
471 
472 static void network_to_compress(RDMACompress *comp)
473 {
474     comp->value = ntohl(comp->value);
475     comp->block_idx = ntohl(comp->block_idx);
476     comp->offset = ntohll(comp->offset);
477     comp->length = ntohll(comp->length);
478 }
479 
480 /*
481  * The result of the dest's memory registration produces an "rkey"
482  * which the source VM must reference in order to perform
483  * the RDMA operation.
484  */
485 typedef struct QEMU_PACKED {
486     uint32_t rkey;
487     uint32_t padding;
488     uint64_t host_addr;
489 } RDMARegisterResult;
490 
491 static void result_to_network(RDMARegisterResult *result)
492 {
493     result->rkey = htonl(result->rkey);
494     result->host_addr = htonll(result->host_addr);
495 }
496 
497 static void network_to_result(RDMARegisterResult *result)
498 {
499     result->rkey = ntohl(result->rkey);
500     result->host_addr = ntohll(result->host_addr);
501 }
502 
503 const char *print_wrid(int wrid);
504 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
505                                    uint8_t *data, RDMAControlHeader *resp,
506                                    int *resp_idx,
507                                    int (*callback)(RDMAContext *rdma));
508 
509 static inline uint64_t ram_chunk_index(const uint8_t *start,
510                                        const uint8_t *host)
511 {
512     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
513 }
514 
515 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
516                                        uint64_t i)
517 {
518     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
519                                   (i << RDMA_REG_CHUNK_SHIFT));
520 }
521 
522 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
523                                      uint64_t i)
524 {
525     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
526                                          (1UL << RDMA_REG_CHUNK_SHIFT);
527 
528     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
529         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
530     }
531 
532     return result;
533 }
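/*
 * Worked example (illustration only): with RDMA_REG_CHUNK_SHIFT == 20,
 * chunks are 1 MB.  For a host pointer 3.5 MB past local_host_addr,
 * ram_chunk_index() returns 3, ram_chunk_start(block, 3) is
 * local_host_addr + 3 MB, and ram_chunk_end(block, 3) is
 * local_host_addr + 4 MB, clamped to the end of the block if the block
 * is shorter than that.
 */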
534 
535 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
536                          void *host_addr,
537                          ram_addr_t block_offset, uint64_t length)
538 {
539     RDMALocalBlocks *local = &rdma->local_ram_blocks;
540     RDMALocalBlock *block;
541     RDMALocalBlock *old = local->block;
542 
543     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
544 
545     if (local->nb_blocks) {
546         int x;
547 
548         if (rdma->blockmap) {
549             for (x = 0; x < local->nb_blocks; x++) {
550                 g_hash_table_remove(rdma->blockmap,
551                                     (void *)(uintptr_t)old[x].offset);
552                 g_hash_table_insert(rdma->blockmap,
553                                     (void *)(uintptr_t)old[x].offset,
554                                     &local->block[x]);
555             }
556         }
557         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
558         g_free(old);
559     }
560 
561     block = &local->block[local->nb_blocks];
562 
563     block->block_name = g_strdup(block_name);
564     block->local_host_addr = host_addr;
565     block->offset = block_offset;
566     block->length = length;
567     block->index = local->nb_blocks;
568     block->src_index = ~0U; /* Filled in by the receipt of the block list */
569     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
570     block->transit_bitmap = bitmap_new(block->nb_chunks);
571     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
572     block->unregister_bitmap = bitmap_new(block->nb_chunks);
573     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
574     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
575 
576     block->is_ram_block = !local->init;
577 
578     if (rdma->blockmap) {
579         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
580     }
581 
582     trace_rdma_add_block(block_name, local->nb_blocks,
583                          (uintptr_t) block->local_host_addr,
584                          block->offset, block->length,
585                          (uintptr_t) (block->local_host_addr + block->length),
586                          BITS_TO_LONGS(block->nb_chunks) *
587                              sizeof(unsigned long) * 8,
588                          block->nb_chunks);
589 
590     local->nb_blocks++;
591 
592     return 0;
593 }
594 
595 /*
596  * Memory regions need to be registered with the device and queue pairs set up
597  * in advance before the migration starts. This tells us where the RAM blocks
598  * are so that we can register them individually.
599  */
600 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
601     ram_addr_t block_offset, ram_addr_t length, void *opaque)
602 {
603     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
604 }
605 
606 /*
607  * Identify the RAMBlocks and their quantity. They will be referenced to
608  * identify chunk boundaries inside each RAMBlock and also be referenced
609  * during dynamic page registration.
610  */
611 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
612 {
613     RDMALocalBlocks *local = &rdma->local_ram_blocks;
614 
615     assert(rdma->blockmap == NULL);
616     memset(local, 0, sizeof *local);
617     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
618     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
619     rdma->dest_blocks = g_new0(RDMADestBlock,
620                                rdma->local_ram_blocks.nb_blocks);
621     local->init = true;
622     return 0;
623 }
624 
625 /*
626  * Note: If used outside of cleanup, the caller must ensure that the destination
627  * block structures are also updated
628  */
629 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
630 {
631     RDMALocalBlocks *local = &rdma->local_ram_blocks;
632     RDMALocalBlock *old = local->block;
633     int x;
634 
635     if (rdma->blockmap) {
636         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
637     }
638     if (block->pmr) {
639         int j;
640 
641         for (j = 0; j < block->nb_chunks; j++) {
642             if (!block->pmr[j]) {
643                 continue;
644             }
645             ibv_dereg_mr(block->pmr[j]);
646             rdma->total_registrations--;
647         }
648         g_free(block->pmr);
649         block->pmr = NULL;
650     }
651 
652     if (block->mr) {
653         ibv_dereg_mr(block->mr);
654         rdma->total_registrations--;
655         block->mr = NULL;
656     }
657 
658     g_free(block->transit_bitmap);
659     block->transit_bitmap = NULL;
660 
661     g_free(block->unregister_bitmap);
662     block->unregister_bitmap = NULL;
663 
664     g_free(block->remote_keys);
665     block->remote_keys = NULL;
666 
667     g_free(block->block_name);
668     block->block_name = NULL;
669 
670     if (rdma->blockmap) {
671         for (x = 0; x < local->nb_blocks; x++) {
672             g_hash_table_remove(rdma->blockmap,
673                                 (void *)(uintptr_t)old[x].offset);
674         }
675     }
676 
677     if (local->nb_blocks > 1) {
678 
679         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
680 
681         if (block->index) {
682             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
683         }
684 
685         if (block->index < (local->nb_blocks - 1)) {
686             memcpy(local->block + block->index, old + (block->index + 1),
687                 sizeof(RDMALocalBlock) *
688                     (local->nb_blocks - (block->index + 1)));
689         }
690     } else {
691         assert(block == local->block);
692         local->block = NULL;
693     }
694 
695     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
696                            block->offset, block->length,
697                             (uintptr_t)(block->local_host_addr + block->length),
698                            BITS_TO_LONGS(block->nb_chunks) *
699                                sizeof(unsigned long) * 8, block->nb_chunks);
700 
701     g_free(old);
702 
703     local->nb_blocks--;
704 
705     if (local->nb_blocks && rdma->blockmap) {
706         for (x = 0; x < local->nb_blocks; x++) {
707             g_hash_table_insert(rdma->blockmap,
708                                 (void *)(uintptr_t)local->block[x].offset,
709                                 &local->block[x]);
710         }
711     }
712 
713     return 0;
714 }
715 
716 /*
717  * Put in the log file which RDMA device was opened and the details
718  * associated with that device.
719  */
720 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
721 {
722     struct ibv_port_attr port;
723 
724     if (ibv_query_port(verbs, 1, &port)) {
725         error_report("Failed to query port information");
726         return;
727     }
728 
729     printf("%s RDMA Device opened: kernel name %s "
730            "uverbs device name %s, "
731            "infiniband_verbs class device path %s, "
732            "infiniband class device path %s, "
733            "transport: (%d) %s\n",
734                 who,
735                 verbs->device->name,
736                 verbs->device->dev_name,
737                 verbs->device->dev_path,
738                 verbs->device->ibdev_path,
739                 port.link_layer,
740                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
741                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
742                     ? "Ethernet" : "Unknown"));
743 }
744 
745 /*
746  * Put in the log file the RDMA gid addressing information,
747  * useful for folks who have trouble understanding the
748  * RDMA device hierarchy in the kernel.
749  */
750 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
751 {
752     char sgid[33];
753     char dgid[33];
754     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
755     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
756     trace_qemu_rdma_dump_gid(who, sgid, dgid);
757 }
758 
759 /*
760  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
761  * We will try the next addrinfo struct, and fail if there are
762  * no other valid addresses to bind against.
763  *
764  * If the user is listening on '[::]', then we will not have opened a device
765  * yet and have no way of verifying if the device is RoCE or not.
766  *
767  * In this case, the source VM will throw an error for ALL types of
768  * connections (both IPv4 and IPv6) if the destination machine does not have
769  * a regular infiniband network available for use.
770  *
771  * The only way to guarantee that an error is thrown for broken kernels is
772  * for the management software to choose a *specific* interface at bind time
773  * and validate what type of hardware it is.
774  *
775  * Unfortunately, this puts the user in a fix:
776  *
777  *  If the source VM connects with an IPv4 address without knowing that the
778  *  destination has bound to '[::]' the migration will unconditionally fail
779  *  unless the management software is explicitly listening on the IPv4
780  *  address while using a RoCE-based device.
781  *
782  *  If the source VM connects with an IPv6 address, then we're OK because we can
783  *  throw an error on the source (and similarly on the destination).
784  *
785  *  But in mixed environments, this will be broken for a while until it is fixed
786  *  inside linux.
787  *
788  * We do provide a *tiny* bit of help in this function: We can list all of the
789  * devices in the system and check to see if all the devices are RoCE or
790  * Infiniband.
791  *
792  * If we detect that we have a *pure* RoCE environment, then we can safely
793  * throw an error even if the management software has specified '[::]' as the
794  * bind address.
795  *
796  * However, if there are multiple heterogeneous devices, then we cannot make
797  * this assumption and the user just has to be sure they know what they are
798  * doing.
799  *
800  * Patches are being reviewed on linux-rdma.
801  */
802 static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
803 {
804     struct ibv_port_attr port_attr;
805 
806     /* This bug only exists in linux, to our knowledge. */
807 #ifdef CONFIG_LINUX
808 
809     /*
810      * Verbs are only NULL if management has bound to '[::]'.
811      *
812      * Let's iterate through all the devices and see if there are any pure IB
813      * devices (non-ethernet).
814      *
815      * If not, then we can safely proceed with the migration.
816      * Otherwise, there are no guarantees until the bug is fixed in linux.
817      */
818     if (!verbs) {
819         int num_devices, x;
820         struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
821         bool roce_found = false;
822         bool ib_found = false;
823 
824         for (x = 0; x < num_devices; x++) {
825             verbs = ibv_open_device(dev_list[x]);
826             if (!verbs) {
827                 if (errno == EPERM) {
828                     continue;
829                 } else {
830                     return -EINVAL;
831                 }
832             }
833 
834             if (ibv_query_port(verbs, 1, &port_attr)) {
835                 ibv_close_device(verbs);
836                 ERROR(errp, "Could not query initial IB port");
837                 return -EINVAL;
838             }
839 
840             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
841                 ib_found = true;
842             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
843                 roce_found = true;
844             }
845 
846             ibv_close_device(verbs);
847 
848         }
849 
850         if (roce_found) {
851             if (ib_found) {
852                 fprintf(stderr, "WARN: migrations may fail:"
853                                 " IPv6 over RoCE / iWARP in linux"
854                                 " is broken. But since you appear to have a"
855                                 " mixed RoCE / IB environment, be sure to only"
856                                 " migrate over the IB fabric until the kernel "
857                                 " fixes the bug.\n");
858             } else {
859                 ERROR(errp, "You only have RoCE / iWARP devices in your system"
860                             " and your management software has specified '[::]'"
861                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
862                 return -ENONET;
863             }
864         }
865 
866         return 0;
867     }
868 
869     /*
870      * If we have a verbs context, that means that something other than '[::]'
871      * was used by the management software for binding, in which case we can
872      * actually warn the user about a potentially broken kernel.
873      */
874 
875     /* IB ports start with 1, not 0 */
876     if (ibv_query_port(verbs, 1, &port_attr)) {
877         ERROR(errp, "Could not query initial IB port");
878         return -EINVAL;
879     }
880 
881     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
882         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
883                     "(but patches on linux-rdma in progress)");
884         return -ENONET;
885     }
886 
887 #endif
888 
889     return 0;
890 }
891 
892 /*
893  * Figure out which RDMA device corresponds to the requested IP hostname.
894  * Also create the initial connection manager identifiers for opening
895  * the connection.
896  */
897 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
898 {
899     int ret;
900     struct rdma_addrinfo *res;
901     char port_str[16];
902     struct rdma_cm_event *cm_event;
903     char ip[40] = "unknown";
904     struct rdma_addrinfo *e;
905 
906     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
907         ERROR(errp, "RDMA hostname has not been set");
908         return -EINVAL;
909     }
910 
911     /* create CM channel */
912     rdma->channel = rdma_create_event_channel();
913     if (!rdma->channel) {
914         ERROR(errp, "could not create CM channel");
915         return -EINVAL;
916     }
917 
918     /* create CM id */
919     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
920     if (ret) {
921         ERROR(errp, "could not create channel id");
922         goto err_resolve_create_id;
923     }
924 
925     snprintf(port_str, 16, "%d", rdma->port);
926     port_str[15] = '\0';
927 
928     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
929     if (ret < 0) {
930         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
931         goto err_resolve_get_addr;
932     }
933 
934     for (e = res; e != NULL; e = e->ai_next) {
935         inet_ntop(e->ai_family,
936             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
937         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
938 
939         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
940                 RDMA_RESOLVE_TIMEOUT_MS);
941         if (!ret) {
942             if (e->ai_family == AF_INET6) {
943                 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
944                 if (ret) {
945                     continue;
946                 }
947             }
948             goto route;
949         }
950     }
951 
952     ERROR(errp, "could not resolve address %s", rdma->host);
953     goto err_resolve_get_addr;
954 
955 route:
956     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
957 
958     ret = rdma_get_cm_event(rdma->channel, &cm_event);
959     if (ret) {
960         ERROR(errp, "could not perform event_addr_resolved");
961         goto err_resolve_get_addr;
962     }
963 
964     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
965         ERROR(errp, "result not equal to event_addr_resolved %s",
966                 rdma_event_str(cm_event->event));
967         perror("rdma_resolve_addr");
968         rdma_ack_cm_event(cm_event);
969         ret = -EINVAL;
970         goto err_resolve_get_addr;
971     }
972     rdma_ack_cm_event(cm_event);
973 
974     /* resolve route */
975     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
976     if (ret) {
977         ERROR(errp, "could not resolve rdma route");
978         goto err_resolve_get_addr;
979     }
980 
981     ret = rdma_get_cm_event(rdma->channel, &cm_event);
982     if (ret) {
983         ERROR(errp, "could not perform event_route_resolved");
984         goto err_resolve_get_addr;
985     }
986     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
987         ERROR(errp, "result not equal to event_route_resolved: %s",
988                         rdma_event_str(cm_event->event));
989         rdma_ack_cm_event(cm_event);
990         ret = -EINVAL;
991         goto err_resolve_get_addr;
992     }
993     rdma_ack_cm_event(cm_event);
994     rdma->verbs = rdma->cm_id->verbs;
995     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
996     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
997     return 0;
998 
999 err_resolve_get_addr:
1000     rdma_destroy_id(rdma->cm_id);
1001     rdma->cm_id = NULL;
1002 err_resolve_create_id:
1003     rdma_destroy_event_channel(rdma->channel);
1004     rdma->channel = NULL;
1005     return ret;
1006 }
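/*
 * Note: the function above follows the standard librdmacm active-side
 * sequence: rdma_create_event_channel() -> rdma_create_id() ->
 * rdma_getaddrinfo() -> rdma_resolve_addr() (wait for
 * RDMA_CM_EVENT_ADDR_RESOLVED) -> rdma_resolve_route() (wait for
 * RDMA_CM_EVENT_ROUTE_RESOLVED), after which cm_id->verbs is cached in
 * rdma->verbs for the PD/CQ/QP setup that follows.
 */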
1007 
1008 /*
1009  * Create protection domain and completion queues
1010  */
1011 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1012 {
1013     /* allocate pd */
1014     rdma->pd = ibv_alloc_pd(rdma->verbs);
1015     if (!rdma->pd) {
1016         error_report("failed to allocate protection domain");
1017         return -1;
1018     }
1019 
1020     /* create completion channel */
1021     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1022     if (!rdma->comp_channel) {
1023         error_report("failed to allocate completion channel");
1024         goto err_alloc_pd_cq;
1025     }
1026 
1027     /*
1028      * Completion queue can be filled by both read and write work requests,
1029      * so must reflect the sum of both possible queue sizes.
1030      */
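    /*
     * RDMA_SIGNALED_SEND_MAX is RDMA_MERGE_MAX / 4096 = 512, so the CQ
     * below is created with 3 * 512 = 1536 entries, well above the
     * max_send_wr + max_recv_wr later requested in qemu_rdma_alloc_qp().
     */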
1031     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1032             NULL, rdma->comp_channel, 0);
1033     if (!rdma->cq) {
1034         error_report("failed to allocate completion queue");
1035         goto err_alloc_pd_cq;
1036     }
1037 
1038     return 0;
1039 
1040 err_alloc_pd_cq:
1041     if (rdma->pd) {
1042         ibv_dealloc_pd(rdma->pd);
1043     }
1044     if (rdma->comp_channel) {
1045         ibv_destroy_comp_channel(rdma->comp_channel);
1046     }
1047     rdma->pd = NULL;
1048     rdma->comp_channel = NULL;
1049     return -1;
1050 
1051 }
1052 
1053 /*
1054  * Create queue pairs.
1055  */
1056 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1057 {
1058     struct ibv_qp_init_attr attr = { 0 };
1059     int ret;
1060 
1061     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1062     attr.cap.max_recv_wr = 3;
1063     attr.cap.max_send_sge = 1;
1064     attr.cap.max_recv_sge = 1;
1065     attr.send_cq = rdma->cq;
1066     attr.recv_cq = rdma->cq;
1067     attr.qp_type = IBV_QPT_RC;
1068 
1069     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1070     if (ret) {
1071         return -1;
1072     }
1073 
1074     rdma->qp = rdma->cm_id->qp;
1075     return 0;
1076 }
1077 
1078 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1079 {
1080     int i;
1081     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1082 
1083     for (i = 0; i < local->nb_blocks; i++) {
1084         local->block[i].mr =
1085             ibv_reg_mr(rdma->pd,
1086                     local->block[i].local_host_addr,
1087                     local->block[i].length,
1088                     IBV_ACCESS_LOCAL_WRITE |
1089                     IBV_ACCESS_REMOTE_WRITE
1090                     );
1091         if (!local->block[i].mr) {
1092             perror("Failed to register local dest ram block!");
1093             break;
1094         }
1095         rdma->total_registrations++;
1096     }
1097 
1098     if (i >= local->nb_blocks) {
1099         return 0;
1100     }
1101 
1102     for (i--; i >= 0; i--) {
1103         ibv_dereg_mr(local->block[i].mr);
1104         rdma->total_registrations--;
1105     }
1106 
1107     return -1;
1108 
1109 }
1110 
1111 /*
1112  * Find the ram block that corresponds to the page requested to be
1113  * transmitted by QEMU.
1114  *
1115  * Once the block is found, also identify which 'chunk' within that
1116  * block that the page belongs to.
1117  *
1118  * This search cannot fail or the migration will fail.
1119  */
1120 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1121                                       uintptr_t block_offset,
1122                                       uint64_t offset,
1123                                       uint64_t length,
1124                                       uint64_t *block_index,
1125                                       uint64_t *chunk_index)
1126 {
1127     uint64_t current_addr = block_offset + offset;
1128     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1129                                                 (void *) block_offset);
1130     assert(block);
1131     assert(current_addr >= block->offset);
1132     assert((current_addr + length) <= (block->offset + block->length));
1133 
1134     *block_index = block->index;
1135     *chunk_index = ram_chunk_index(block->local_host_addr,
1136                 block->local_host_addr + (current_addr - block->offset));
1137 
1138     return 0;
1139 }
1140 
1141 /*
1142  * Register a chunk with IB. If the chunk was already registered
1143  * previously, then skip.
1144  *
1145  * Also return the keys associated with the registration needed
1146  * to perform the actual RDMA operation.
1147  */
1148 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1149         RDMALocalBlock *block, uintptr_t host_addr,
1150         uint32_t *lkey, uint32_t *rkey, int chunk,
1151         uint8_t *chunk_start, uint8_t *chunk_end)
1152 {
1153     if (block->mr) {
1154         if (lkey) {
1155             *lkey = block->mr->lkey;
1156         }
1157         if (rkey) {
1158             *rkey = block->mr->rkey;
1159         }
1160         return 0;
1161     }
1162 
1163     /* allocate memory to store chunk MRs */
1164     if (!block->pmr) {
1165         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1166     }
1167 
1168     /*
1169      * If 'rkey', then we're the destination, so grant access to the source.
1170      *
1171      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1172      */
1173     if (!block->pmr[chunk]) {
1174         uint64_t len = chunk_end - chunk_start;
1175 
1176         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1177 
1178         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1179                 chunk_start, len,
1180                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1181                         IBV_ACCESS_REMOTE_WRITE) : 0));
1182 
1183         if (!block->pmr[chunk]) {
1184             perror("Failed to register chunk!");
1185             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1186                             " start %" PRIuPTR " end %" PRIuPTR
1187                             " host %" PRIuPTR
1188                             " local %" PRIuPTR " registrations: %d\n",
1189                             block->index, chunk, (uintptr_t)chunk_start,
1190                             (uintptr_t)chunk_end, host_addr,
1191                             (uintptr_t)block->local_host_addr,
1192                             rdma->total_registrations);
1193             return -1;
1194         }
1195         rdma->total_registrations++;
1196     }
1197 
1198     if (lkey) {
1199         *lkey = block->pmr[chunk]->lkey;
1200     }
1201     if (rkey) {
1202         *rkey = block->pmr[chunk]->rkey;
1203     }
1204     return 0;
1205 }
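/*
 * Note (standard ibverbs usage, not specific to this file): the lkey
 * returned here is what the local side places in its scatter/gather entry
 * (ibv_sge.lkey) when posting a work request, while the rkey is what the
 * destination hands back in an RDMARegisterResult so the source can name
 * the remote memory (wr.rdma.rkey) in its RDMA write.
 */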
1206 
1207 /*
1208  * Register (at connection time) the memory used for control
1209  * channel messages.
1210  */
1211 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1212 {
1213     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1214             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1215             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1216     if (rdma->wr_data[idx].control_mr) {
1217         rdma->total_registrations++;
1218         return 0;
1219     }
1220     error_report("qemu_rdma_reg_control failed");
1221     return -1;
1222 }
1223 
1224 const char *print_wrid(int wrid)
1225 {
1226     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1227         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1228     }
1229     return wrid_desc[wrid];
1230 }
1231 
1232 /*
1233  * RDMA requires memory registration (mlock/pinning), but this is not good for
1234  * overcommitment.
1235  *
1236  * In preparation for the future where LRU information or workload-specific
1237  * writable working set memory access behavior is available to QEMU
1238  * it would be nice to have in place the ability to UN-register/UN-pin
1239  * particular memory regions from the RDMA hardware when it is determined that
1240  * those regions of memory will likely not be accessed again in the near future.
1241  *
1242  * While we do not yet have such information right now, the following
1243  * compile-time option allows us to perform a non-optimized version of this
1244  * behavior.
1245  *
1246  * By uncommenting this option, you will cause *all* RDMA transfers to be
1247  * unregistered immediately after the transfer completes on both sides of the
1248  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1249  *
1250  * This will have a terrible impact on migration performance, so until future
1251  * workload information or LRU information is available, do not attempt to use
1252  * this feature except for basic testing.
1253  */
1254 //#define RDMA_UNREGISTRATION_EXAMPLE
1255 
1256 /*
1257  * Perform a non-optimized memory unregistration after every transfer
1258  * for demonstration purposes, only if pin-all is not requested.
1259  *
1260  * Potential optimizations:
1261  * 1. Start a new thread to run this function continuously
1262  *      - for bit clearing
1263  *      - and for receipt of unregister messages
1264  * 2. Use an LRU.
1265  * 3. Use workload hints.
1266  */
1267 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1268 {
1269     while (rdma->unregistrations[rdma->unregister_current]) {
1270         int ret;
1271         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1272         uint64_t chunk =
1273             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1274         uint64_t index =
1275             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1276         RDMALocalBlock *block =
1277             &(rdma->local_ram_blocks.block[index]);
1278         RDMARegister reg = { .current_index = index };
1279         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1280                                  };
1281         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1282                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1283                                    .repeat = 1,
1284                                  };
1285 
1286         trace_qemu_rdma_unregister_waiting_proc(chunk,
1287                                                 rdma->unregister_current);
1288 
1289         rdma->unregistrations[rdma->unregister_current] = 0;
1290         rdma->unregister_current++;
1291 
1292         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1293             rdma->unregister_current = 0;
1294         }
1295 
1296 
1297         /*
1298          * Unregistration is speculative (because migration is single-threaded
1299          * and we cannot break the protocol's InfiniBand message ordering).
1300          * Thus, if the memory is currently being used for transmission,
1301          * then abort the attempt to unregister and try again
1302          * later the next time a completion is received for this memory.
1303          */
1304         clear_bit(chunk, block->unregister_bitmap);
1305 
1306         if (test_bit(chunk, block->transit_bitmap)) {
1307             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1308             continue;
1309         }
1310 
1311         trace_qemu_rdma_unregister_waiting_send(chunk);
1312 
1313         ret = ibv_dereg_mr(block->pmr[chunk]);
1314         block->pmr[chunk] = NULL;
1315         block->remote_keys[chunk] = 0;
1316 
1317         if (ret != 0) {
1318             perror("unregistration chunk failed");
1319             return -ret;
1320         }
1321         rdma->total_registrations--;
1322 
1323         reg.key.chunk = chunk;
1324         register_to_network(rdma, &reg);
1325         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1326                                 &resp, NULL, NULL);
1327         if (ret < 0) {
1328             return ret;
1329         }
1330 
1331         trace_qemu_rdma_unregister_waiting_complete(chunk);
1332     }
1333 
1334     return 0;
1335 }
1336 
1337 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1338                                          uint64_t chunk)
1339 {
1340     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1341 
1342     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1343     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1344 
1345     return result;
1346 }
1347 
1348 /*
1349  * Set bit for unregistration in the next iteration.
1350  * We cannot transmit right here, but will unpin later.
1351  */
1352 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1353                                         uint64_t chunk, uint64_t wr_id)
1354 {
1355     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1356         error_report("rdma migration: queue is full");
1357     } else {
1358         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1359 
1360         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1361             trace_qemu_rdma_signal_unregister_append(chunk,
1362                                                      rdma->unregister_next);
1363 
1364             rdma->unregistrations[rdma->unregister_next++] =
1365                     qemu_rdma_make_wrid(wr_id, index, chunk);
1366 
1367             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1368                 rdma->unregister_next = 0;
1369             }
1370         } else {
1371             trace_qemu_rdma_signal_unregister_already(chunk);
1372         }
1373     }
1374 }
1375 
1376 /*
1377  * Poll the completion queue to see if a work request
1378  * (of any kind) has completed.
1379  * Return the work request ID that completed.
1380  */
1381 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1382                                uint32_t *byte_len)
1383 {
1384     int ret;
1385     struct ibv_wc wc;
1386     uint64_t wr_id;
1387 
1388     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1389 
1390     if (!ret) {
1391         *wr_id_out = RDMA_WRID_NONE;
1392         return 0;
1393     }
1394 
1395     if (ret < 0) {
1396         error_report("ibv_poll_cq return %d", ret);
1397         return ret;
1398     }
1399 
1400     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1401 
1402     if (wc.status != IBV_WC_SUCCESS) {
1403         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1404                         wc.status, ibv_wc_status_str(wc.status));
1405         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1406 
1407         return -1;
1408     }
1409 
1410     if (rdma->control_ready_expected &&
1411         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1412         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1413                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1414         rdma->control_ready_expected = 0;
1415     }
1416 
1417     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1418         uint64_t chunk =
1419             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1420         uint64_t index =
1421             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1422         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1423 
1424         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1425                                    index, chunk, block->local_host_addr,
1426                                    (void *)(uintptr_t)block->remote_host_addr);
1427 
1428         clear_bit(chunk, block->transit_bitmap);
1429 
1430         if (rdma->nb_sent > 0) {
1431             rdma->nb_sent--;
1432         }
1433 
1434         if (!rdma->pin_all) {
1435             /*
1436              * FYI: If one wanted to signal a specific chunk to be unregistered
1437              * using LRU or workload-specific information, this is the function
1438              * you would call to do so. That chunk would then get asynchronously
1439              * unregistered later.
1440              */
1441 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1442             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1443 #endif
1444         }
1445     } else {
1446         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1447     }
1448 
1449     *wr_id_out = wc.wr_id;
1450     if (byte_len) {
1451         *byte_len = wc.byte_len;
1452     }
1453 
1454     return 0;
1455 }
1456 
1457 /*
1458  * Block until the next work request has completed.
1459  *
1460  * First poll to see if a work request has already completed,
1461  * otherwise block.
1462  *
1463  * If we encounter completed work requests for IDs other than
1464  * the one we're interested in, then that's generally an error.
1465  *
1466  * The only exception is actual RDMA Write completions. These
1467  * completions only need to be recorded, but do not actually
1468  * need further processing.
1469  */
1470 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1471                                     uint32_t *byte_len)
1472 {
1473     int num_cq_events = 0, ret = 0;
1474     struct ibv_cq *cq;
1475     void *cq_ctx;
1476     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1477 
1478     if (ibv_req_notify_cq(rdma->cq, 0)) {
1479         return -1;
1480     }
1481     /* poll cq first */
1482     while (wr_id != wrid_requested) {
1483         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1484         if (ret < 0) {
1485             return ret;
1486         }
1487 
1488         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1489 
1490         if (wr_id == RDMA_WRID_NONE) {
1491             break;
1492         }
1493         if (wr_id != wrid_requested) {
1494             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1495                        wrid_requested, print_wrid(wr_id), wr_id);
1496         }
1497     }
1498 
1499     if (wr_id == wrid_requested) {
1500         return 0;
1501     }
1502 
1503     while (1) {
1504         /*
1505          * Coroutine doesn't start until process_incoming_migration()
1506          * so don't yield unless we know we're running inside of a coroutine.
1507          */
1508         if (rdma->migration_started_on_destination) {
1509             yield_until_fd_readable(rdma->comp_channel->fd);
1510         }
1511 
1512         if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1513             perror("ibv_get_cq_event");
1514             goto err_block_for_wrid;
1515         }
1516 
1517         num_cq_events++;
1518 
1519         if (ibv_req_notify_cq(cq, 0)) {
1520             goto err_block_for_wrid;
1521         }
1522 
1523         while (wr_id != wrid_requested) {
1524             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1525             if (ret < 0) {
1526                 goto err_block_for_wrid;
1527             }
1528 
1529             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1530 
1531             if (wr_id == RDMA_WRID_NONE) {
1532                 break;
1533             }
1534             if (wr_id != wrid_requested) {
1535                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1536                                    wrid_requested, print_wrid(wr_id), wr_id);
1537             }
1538         }
1539 
1540         if (wr_id == wrid_requested) {
1541             goto success_block_for_wrid;
1542         }
1543     }
1544 
1545 success_block_for_wrid:
1546     if (num_cq_events) {
1547         ibv_ack_cq_events(cq, num_cq_events);
1548     }
1549     return 0;
1550 
1551 err_block_for_wrid:
1552     if (num_cq_events) {
1553         ibv_ack_cq_events(cq, num_cq_events);
1554     }
1555     return ret;
1556 }
1557 
1558 /*
1559  * Post a SEND message work request for the control channel
1560  * containing some data and block until the post completes.
1561  */
1562 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1563                                        RDMAControlHeader *head)
1564 {
1565     int ret = 0;
1566     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1567     struct ibv_send_wr *bad_wr;
1568     struct ibv_sge sge = {
1569                            .addr = (uintptr_t)(wr->control),
1570                            .length = head->len + sizeof(RDMAControlHeader),
1571                            .lkey = wr->control_mr->lkey,
1572                          };
1573     struct ibv_send_wr send_wr = {
1574                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1575                                    .opcode = IBV_WR_SEND,
1576                                    .send_flags = IBV_SEND_SIGNALED,
1577                                    .sg_list = &sge,
1578                                    .num_sge = 1,
1579                                 };
1580 
1581     trace_qemu_rdma_post_send_control(control_desc[head->type]);
1582 
1583     /*
1584      * We don't actually need to do a memcpy() in here if we used
1585      * the "sge" properly, but since we're only sending control messages
1586      * (not RAM in a performance-critical path), then it's OK for now.
1587      *
1588      * The copy makes the RDMAControlHeader simpler to manipulate
1589      * for the time being.
1590      */
1591     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1592     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1593     control_to_network((void *) wr->control);
1594 
1595     if (buf) {
1596         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1597     }
1598 
1599 
1600     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1601 
1602     if (ret > 0) {
1603         error_report("Failed to use post IB SEND for control");
1604         return -ret;
1605     }
1606 
1607     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1608     if (ret < 0) {
1609         error_report("rdma migration: send polling control error");
1610     }
1611 
1612     return ret;
1613 }
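
/*
 * Usage sketch (illustrative only; `payload` and `payload_len` are
 * hypothetical names, not part of this file): a caller fills in an
 * RDMAControlHeader describing its payload and posts both through the
 * pre-registered control buffer:
 *
 *     RDMAControlHeader head = { .len    = payload_len,
 *                                .type   = RDMA_CONTROL_QEMU_FILE,
 *                                .repeat = 1 };
 *
 *     if (qemu_rdma_post_send_control(rdma, payload, &head) < 0) {
 *         return -EIO;
 *     }
 *
 * Since the call blocks until the SEND work request completes, the control
 * buffer can safely be reused as soon as it returns.
 */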
1614 
1615 /*
1616  * Post a RECV work request in anticipation of some future receipt
1617  * of data on the control channel.
1618  */
1619 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1620 {
1621     struct ibv_recv_wr *bad_wr;
1622     struct ibv_sge sge = {
1623                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1624                             .length = RDMA_CONTROL_MAX_BUFFER,
1625                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1626                          };
1627 
1628     struct ibv_recv_wr recv_wr = {
1629                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1630                                     .sg_list = &sge,
1631                                     .num_sge = 1,
1632                                  };
1633 
1634 
1635     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1636         return -1;
1637     }
1638 
1639     return 0;
1640 }
1641 
1642 /*
1643  * Block and wait for a RECV control channel message to arrive.
1644  */
1645 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1646                 RDMAControlHeader *head, int expecting, int idx)
1647 {
1648     uint32_t byte_len;
1649     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1650                                        &byte_len);
1651 
1652     if (ret < 0) {
1653         error_report("rdma migration: recv polling control error!");
1654         return ret;
1655     }
1656 
1657     network_to_control((void *) rdma->wr_data[idx].control);
1658     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1659 
1660     trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1661 
1662     if (expecting == RDMA_CONTROL_NONE) {
1663         trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1664                                              head->type);
1665     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1666         error_report("Was expecting a %s (%d) control message"
1667                 ", but got: %s (%d), length: %d",
1668                 control_desc[expecting], expecting,
1669                 control_desc[head->type], head->type, head->len);
1670         return -EIO;
1671     }
1672     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1673         error_report("too long length: %d", head->len);
1674         return -EINVAL;
1675     }
1676     if (sizeof(*head) + head->len != byte_len) {
1677         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1678         return -EINVAL;
1679     }
1680 
1681     return 0;
1682 }
1683 
1684 /*
1685  * When a RECV work request has completed, the work request's
1686  * buffer is pointed at the header.
1687  *
1688  * This will advance the pointer to the data portion
1689  * of the control message of the work request's buffer that
1690  * was populated after the work request finished.
1691  */
1692 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1693                                   RDMAControlHeader *head)
1694 {
1695     rdma->wr_data[idx].control_len = head->len;
1696     rdma->wr_data[idx].control_curr =
1697         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1698 }
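
/*
 * For reference, the layout of a completed RECV buffer that the two helpers
 * above operate on (illustrative diagram, not generated output):
 *
 *     wr_data[idx].control
 *     |<- sizeof(RDMAControlHeader) ->|<-------- head->len bytes -------->|
 *     +-------------------------------+-----------------------------------+
 *     |       RDMAControlHeader       |       control message data        |
 *     +-------------------------------+-----------------------------------+
 *                                     ^
 *                                     wr_data[idx].control_curr
 */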
1699 
1700 /*
1701  * This is an 'atomic' high-level operation to deliver a single, unified
1702  * control-channel message.
1703  *
1704  * Additionally, if the user is expecting some kind of reply to this message,
1705  * they can request a 'resp' response message be filled in by posting an
1706  * additional work request on behalf of the user and waiting for an additional
1707  * completion.
1708  *
1709  * The extra (optional) response is used during registration to save us from
1710  * having to perform an *additional* exchange of messages just to provide a
1711  * response, by instead piggy-backing it on the acknowledgement.
1712  */
1713 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1714                                    uint8_t *data, RDMAControlHeader *resp,
1715                                    int *resp_idx,
1716                                    int (*callback)(RDMAContext *rdma))
1717 {
1718     int ret = 0;
1719 
1720     /*
1721      * Wait until the dest is ready before attempting to deliver the message
1722      * by waiting for a READY message.
1723      */
1724     if (rdma->control_ready_expected) {
1725         RDMAControlHeader resp;
1726         ret = qemu_rdma_exchange_get_response(rdma,
1727                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1728         if (ret < 0) {
1729             return ret;
1730         }
1731     }
1732 
1733     /*
1734      * If the user is expecting a response, post a WR in anticipation of it.
1735      */
1736     if (resp) {
1737         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1738         if (ret) {
1739             error_report("rdma migration: error posting"
1740                     " extra control recv for anticipated result!");
1741             return ret;
1742         }
1743     }
1744 
1745     /*
1746      * Post a WR to replace the one we just consumed for the READY message.
1747      */
1748     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1749     if (ret) {
1750         error_report("rdma migration: error posting first control recv!");
1751         return ret;
1752     }
1753 
1754     /*
1755      * Deliver the control message that was requested.
1756      */
1757     ret = qemu_rdma_post_send_control(rdma, data, head);
1758 
1759     if (ret < 0) {
1760         error_report("Failed to send control buffer!");
1761         return ret;
1762     }
1763 
1764     /*
1765      * If we're expecting a response, block and wait for it.
1766      */
1767     if (resp) {
1768         if (callback) {
1769             trace_qemu_rdma_exchange_send_issue_callback();
1770             ret = callback(rdma);
1771             if (ret < 0) {
1772                 return ret;
1773             }
1774         }
1775 
1776         trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1777         ret = qemu_rdma_exchange_get_response(rdma, resp,
1778                                               resp->type, RDMA_WRID_DATA);
1779 
1780         if (ret < 0) {
1781             return ret;
1782         }
1783 
1784         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1785         if (resp_idx) {
1786             *resp_idx = RDMA_WRID_DATA;
1787         }
1788         trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1789     }
1790 
1791     rdma->control_ready_expected = 1;
1792 
1793     return 0;
1794 }
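
/*
 * Usage sketch for the piggy-backed response path (illustrative; it mirrors
 * the registration request issued from qemu_rdma_write_one() below):
 *
 *     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
 *     RDMAControlHeader head = { .len    = sizeof(RDMARegister),
 *                                .type   = RDMA_CONTROL_REGISTER_REQUEST,
 *                                .repeat = 1 };
 *     int reg_result_idx;
 *
 *     ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
 *                                   &resp, &reg_result_idx, NULL);
 *     if (ret == 0) {
 *         reg_result = (RDMARegisterResult *)
 *                          rdma->wr_data[reg_result_idx].control_curr;
 *     }
 *
 * The reply arrives on the RDMA_WRID_DATA slot, so the READY handshake for
 * the next message is left undisturbed.
 */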
1795 
1796 /*
1797  * This is an 'atomic' high-level operation to receive a single, unified
1798  * control-channel message.
1799  */
1800 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1801                                 int expecting)
1802 {
1803     RDMAControlHeader ready = {
1804                                 .len = 0,
1805                                 .type = RDMA_CONTROL_READY,
1806                                 .repeat = 1,
1807                               };
1808     int ret;
1809 
1810     /*
1811      * Inform the source that we're ready to receive a message.
1812      */
1813     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1814 
1815     if (ret < 0) {
1816         error_report("Failed to send control buffer!");
1817         return ret;
1818     }
1819 
1820     /*
1821      * Block and wait for the message.
1822      */
1823     ret = qemu_rdma_exchange_get_response(rdma, head,
1824                                           expecting, RDMA_WRID_READY);
1825 
1826     if (ret < 0) {
1827         return ret;
1828     }
1829 
1830     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1831 
1832     /*
1833      * Post a new RECV work request to replace the one we just consumed.
1834      */
1835     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1836     if (ret) {
1837         error_report("rdma migration: error posting second control recv!");
1838         return ret;
1839     }
1840 
1841     return 0;
1842 }
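
/*
 * Summary of the control-channel handshake implemented by the exchange
 * helpers above (illustrative; each arrow is one SEND landing in a
 * pre-posted RECV):
 *
 *     source                                 destination
 *       |  <--------------- READY --------------  |  (qemu_rdma_exchange_recv)
 *       |  ---------- requested message ------->  |  (qemu_rdma_exchange_send)
 *       |  <---- optional piggy-backed reply ---  |
 */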
1843 
1844 /*
1845  * Write an actual chunk of memory using RDMA.
1846  *
1847  * If we're using dynamic registration on the dest-side, we have to
1848  * send a registration command first.
1849  */
1850 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1851                                int current_index, uint64_t current_addr,
1852                                uint64_t length)
1853 {
1854     struct ibv_sge sge;
1855     struct ibv_send_wr send_wr = { 0 };
1856     struct ibv_send_wr *bad_wr;
1857     int reg_result_idx, ret, count = 0;
1858     uint64_t chunk, chunks;
1859     uint8_t *chunk_start, *chunk_end;
1860     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1861     RDMARegister reg;
1862     RDMARegisterResult *reg_result;
1863     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1864     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1865                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1866                                .repeat = 1,
1867                              };
1868 
1869 retry:
1870     sge.addr = (uintptr_t)(block->local_host_addr +
1871                             (current_addr - block->offset));
1872     sge.length = length;
1873 
1874     chunk = ram_chunk_index(block->local_host_addr,
1875                             (uint8_t *)(uintptr_t)sge.addr);
1876     chunk_start = ram_chunk_start(block, chunk);
1877 
1878     if (block->is_ram_block) {
1879         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1880 
1881         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1882             chunks--;
1883         }
1884     } else {
1885         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1886 
1887         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1888             chunks--;
1889         }
1890     }
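
    /*
     * Worked example for the computation above (illustrative): if the length
     * is an exact multiple of the chunk size, say two chunks' worth, the
     * division yields 2 even though only chunks `chunk` and `chunk + 1` are
     * touched; the zero-remainder test therefore decrements `chunks` to 1 so
     * that ram_chunk_end() below is asked for the end of `chunk + 1` rather
     * than for a third, untouched chunk.
     */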
1891 
1892     trace_qemu_rdma_write_one_top(chunks + 1,
1893                                   (chunks + 1) *
1894                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1895 
1896     chunk_end = ram_chunk_end(block, chunk + chunks);
1897 
1898     if (!rdma->pin_all) {
1899 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1900         qemu_rdma_unregister_waiting(rdma);
1901 #endif
1902     }
1903 
1904     while (test_bit(chunk, block->transit_bitmap)) {
1905         (void)count;
1906         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1907                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1908 
1909         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1910 
1911         if (ret < 0) {
1912             error_report("Failed to Wait for previous write to complete "
1913                     "block %d chunk %" PRIu64
1914                     " current %" PRIu64 " len %" PRIu64 " %d",
1915                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1916             return ret;
1917         }
1918     }
1919 
1920     if (!rdma->pin_all || !block->is_ram_block) {
1921         if (!block->remote_keys[chunk]) {
1922             /*
1923              * This chunk has not yet been registered, so first check to see
1924              * if the entire chunk is zero. If so, tell the other side to
1925              * memset() + madvise() the entire chunk without RDMA.
1926              */
1927 
1928             if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
1929                                                    length)
1930                    && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
1931                                                     length) == length) {
1932                 RDMACompress comp = {
1933                                         .offset = current_addr,
1934                                         .value = 0,
1935                                         .block_idx = current_index,
1936                                         .length = length,
1937                                     };
1938 
1939                 head.len = sizeof(comp);
1940                 head.type = RDMA_CONTROL_COMPRESS;
1941 
1942                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1943                                                current_index, current_addr);
1944 
1945                 compress_to_network(rdma, &comp);
1946                 ret = qemu_rdma_exchange_send(rdma, &head,
1947                                 (uint8_t *) &comp, NULL, NULL, NULL);
1948 
1949                 if (ret < 0) {
1950                     return -EIO;
1951                 }
1952 
1953                 acct_update_position(f, sge.length, true);
1954 
1955                 return 1;
1956             }
1957 
1958             /*
1959              * Otherwise, tell other side to register.
1960              */
1961             reg.current_index = current_index;
1962             if (block->is_ram_block) {
1963                 reg.key.current_addr = current_addr;
1964             } else {
1965                 reg.key.chunk = chunk;
1966             }
1967             reg.chunks = chunks;
1968 
1969             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1970                                               current_addr);
1971 
1972             register_to_network(rdma, &reg);
1973             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1974                                     &resp, &reg_result_idx, NULL);
1975             if (ret < 0) {
1976                 return ret;
1977             }
1978 
1979             /* try to overlap this single registration with the one we sent. */
1980             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1981                                                 &sge.lkey, NULL, chunk,
1982                                                 chunk_start, chunk_end)) {
1983                 error_report("cannot get lkey");
1984                 return -EINVAL;
1985             }
1986 
1987             reg_result = (RDMARegisterResult *)
1988                     rdma->wr_data[reg_result_idx].control_curr;
1989 
1990             network_to_result(reg_result);
1991 
1992             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
1993                                                  reg_result->rkey, chunk);
1994 
1995             block->remote_keys[chunk] = reg_result->rkey;
1996             block->remote_host_addr = reg_result->host_addr;
1997         } else {
1998             /* already registered before */
1999             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2000                                                 &sge.lkey, NULL, chunk,
2001                                                 chunk_start, chunk_end)) {
2002                 error_report("cannot get lkey!");
2003                 return -EINVAL;
2004             }
2005         }
2006 
2007         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2008     } else {
2009         send_wr.wr.rdma.rkey = block->remote_rkey;
2010 
2011         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2012                                                      &sge.lkey, NULL, chunk,
2013                                                      chunk_start, chunk_end)) {
2014             error_report("cannot get lkey!");
2015             return -EINVAL;
2016         }
2017     }
2018 
2019     /*
2020      * Encode the ram block index and chunk within this wrid.
2021      * We will use this information at the time of completion
2022      * to figure out which bitmap to check against and then which
2023      * chunk in the bitmap to look for.
2024      */
2025     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2026                                         current_index, chunk);
2027 
2028     send_wr.opcode = IBV_WR_RDMA_WRITE;
2029     send_wr.send_flags = IBV_SEND_SIGNALED;
2030     send_wr.sg_list = &sge;
2031     send_wr.num_sge = 1;
2032     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2033                                 (current_addr - block->offset);
2034 
2035     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2036                                    sge.length);
2037 
2038     /*
2039      * ibv_post_send() does not return negative error numbers,
2040      * per the specification they are positive - no idea why.
2041      */
2042     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2043 
2044     if (ret == ENOMEM) {
2045         trace_qemu_rdma_write_one_queue_full();
2046         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2047         if (ret < 0) {
2048             error_report("rdma migration: failed to make "
2049                          "room in full send queue! %d", ret);
2050             return ret;
2051         }
2052 
2053         goto retry;
2054 
2055     } else if (ret > 0) {
2056         perror("rdma migration: post rdma write failed");
2057         return -ret;
2058     }
2059 
2060     set_bit(chunk, block->transit_bitmap);
2061     acct_update_position(f, sge.length, false);
2062     rdma->total_writes++;
2063 
2064     return 0;
2065 }
2066 
2067 /*
2068  * Push out any unwritten RDMA operations.
2069  *
2070  * We support sending out multiple chunks at the same time.
2071  * Not all of them need to get signaled in the completion queue.
2072  */
2073 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2074 {
2075     int ret;
2076 
2077     if (!rdma->current_length) {
2078         return 0;
2079     }
2080 
2081     ret = qemu_rdma_write_one(f, rdma,
2082             rdma->current_index, rdma->current_addr, rdma->current_length);
2083 
2084     if (ret < 0) {
2085         return ret;
2086     }
2087 
2088     if (ret == 0) {
2089         rdma->nb_sent++;
2090         trace_qemu_rdma_write_flush(rdma->nb_sent);
2091     }
2092 
2093     rdma->current_length = 0;
2094     rdma->current_addr = 0;
2095 
2096     return 0;
2097 }
2098 
2099 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2100                     uint64_t offset, uint64_t len)
2101 {
2102     RDMALocalBlock *block;
2103     uint8_t *host_addr;
2104     uint8_t *chunk_end;
2105 
2106     if (rdma->current_index < 0) {
2107         return 0;
2108     }
2109 
2110     if (rdma->current_chunk < 0) {
2111         return 0;
2112     }
2113 
2114     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2115     host_addr = block->local_host_addr + (offset - block->offset);
2116     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2117 
2118     if (rdma->current_length == 0) {
2119         return 0;
2120     }
2121 
2122     /*
2123      * Only merge into chunk sequentially.
2124      */
2125     if (offset != (rdma->current_addr + rdma->current_length)) {
2126         return 0;
2127     }
2128 
2129     if (offset < block->offset) {
2130         return 0;
2131     }
2132 
2133     if ((offset + len) > (block->offset + block->length)) {
2134         return 0;
2135     }
2136 
2137     if ((host_addr + len) > chunk_end) {
2138         return 0;
2139     }
2140 
2141     return 1;
2142 }
2143 
2144 /*
2145  * We're not actually writing here, but doing three things:
2146  *
2147  * 1. Identify the chunk the buffer belongs to.
2148  * 2. If the chunk is full or the buffer doesn't belong to the current
2149  *    chunk, then start a new chunk and flush() the old chunk.
2150  * 3. To keep the hardware busy, we also group chunks into batches
2151  *    and only require that a batch gets acknowledged in the completion
2152  *    queue instead of each individual chunk.
2153  */
2154 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2155                            uint64_t block_offset, uint64_t offset,
2156                            uint64_t len)
2157 {
2158     uint64_t current_addr = block_offset + offset;
2159     uint64_t index = rdma->current_index;
2160     uint64_t chunk = rdma->current_chunk;
2161     int ret;
2162 
2163     /* If we cannot merge it, we flush the current buffer first. */
2164     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2165         ret = qemu_rdma_write_flush(f, rdma);
2166         if (ret) {
2167             return ret;
2168         }
2169         rdma->current_length = 0;
2170         rdma->current_addr = current_addr;
2171 
2172         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2173                                          offset, len, &index, &chunk);
2174         if (ret) {
2175             error_report("ram block search failed");
2176             return ret;
2177         }
2178         rdma->current_index = index;
2179         rdma->current_chunk = chunk;
2180     }
2181 
2182     /* merge it */
2183     rdma->current_length += len;
2184 
2185     /* flush it if buffer is too large */
2186     if (rdma->current_length >= RDMA_MERGE_MAX) {
2187         return qemu_rdma_write_flush(f, rdma);
2188     }
2189 
2190     return 0;
2191 }
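
/*
 * Illustrative sequence (hypothetical offsets): three consecutive pages
 * written at offsets 0x0000, 0x1000 and 0x2000 of the same chunk are merged
 * into a single pending region by the function above; the RDMA write is only
 * posted once the region stops growing contiguously, would spill past the
 * current chunk, or reaches RDMA_MERGE_MAX, at which point
 * qemu_rdma_write_flush() issues one qemu_rdma_write_one() covering the
 * whole merged range.
 */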
2192 
2193 static void qemu_rdma_cleanup(RDMAContext *rdma)
2194 {
2195     struct rdma_cm_event *cm_event;
2196     int ret, idx;
2197 
2198     if (rdma->cm_id && rdma->connected) {
2199         if (rdma->error_state) {
2200             RDMAControlHeader head = { .len = 0,
2201                                        .type = RDMA_CONTROL_ERROR,
2202                                        .repeat = 1,
2203                                      };
2204             error_report("Early error. Sending error.");
2205             qemu_rdma_post_send_control(rdma, NULL, &head);
2206         }
2207 
2208         ret = rdma_disconnect(rdma->cm_id);
2209         if (!ret) {
2210             trace_qemu_rdma_cleanup_waiting_for_disconnect();
2211             ret = rdma_get_cm_event(rdma->channel, &cm_event);
2212             if (!ret) {
2213                 rdma_ack_cm_event(cm_event);
2214             }
2215         }
2216         trace_qemu_rdma_cleanup_disconnect();
2217         rdma->connected = false;
2218     }
2219 
2220     g_free(rdma->dest_blocks);
2221     rdma->dest_blocks = NULL;
2222 
2223     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2224         if (rdma->wr_data[idx].control_mr) {
2225             rdma->total_registrations--;
2226             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2227         }
2228         rdma->wr_data[idx].control_mr = NULL;
2229     }
2230 
2231     if (rdma->local_ram_blocks.block) {
2232         while (rdma->local_ram_blocks.nb_blocks) {
2233             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2234         }
2235     }
2236 
2237     if (rdma->qp) {
2238         rdma_destroy_qp(rdma->cm_id);
2239         rdma->qp = NULL;
2240     }
2241     if (rdma->cq) {
2242         ibv_destroy_cq(rdma->cq);
2243         rdma->cq = NULL;
2244     }
2245     if (rdma->comp_channel) {
2246         ibv_destroy_comp_channel(rdma->comp_channel);
2247         rdma->comp_channel = NULL;
2248     }
2249     if (rdma->pd) {
2250         ibv_dealloc_pd(rdma->pd);
2251         rdma->pd = NULL;
2252     }
2253     if (rdma->cm_id) {
2254         rdma_destroy_id(rdma->cm_id);
2255         rdma->cm_id = NULL;
2256     }
2257     if (rdma->listen_id) {
2258         rdma_destroy_id(rdma->listen_id);
2259         rdma->listen_id = NULL;
2260     }
2261     if (rdma->channel) {
2262         rdma_destroy_event_channel(rdma->channel);
2263         rdma->channel = NULL;
2264     }
2265     g_free(rdma->host);
2266     rdma->host = NULL;
2267 }
2268 
2269 
2270 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2271 {
2272     int ret, idx;
2273     Error *local_err = NULL, **temp = &local_err;
2274 
2275     /*
2276      * Will be validated against destination's actual capabilities
2277      * after the connect() completes.
2278      */
2279     rdma->pin_all = pin_all;
2280 
2281     ret = qemu_rdma_resolve_host(rdma, temp);
2282     if (ret) {
2283         goto err_rdma_source_init;
2284     }
2285 
2286     ret = qemu_rdma_alloc_pd_cq(rdma);
2287     if (ret) {
2288         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2289                     " limits may be too low. Please check $ ulimit -a # and "
2290                     "search for 'ulimit -l' in the output");
2291         goto err_rdma_source_init;
2292     }
2293 
2294     ret = qemu_rdma_alloc_qp(rdma);
2295     if (ret) {
2296         ERROR(temp, "rdma migration: error allocating qp!");
2297         goto err_rdma_source_init;
2298     }
2299 
2300     ret = qemu_rdma_init_ram_blocks(rdma);
2301     if (ret) {
2302         ERROR(temp, "rdma migration: error initializing ram blocks!");
2303         goto err_rdma_source_init;
2304     }
2305 
2306     /* Build the hash that maps from offset to RAMBlock */
2307     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2308     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2309         g_hash_table_insert(rdma->blockmap,
2310                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2311                 &rdma->local_ram_blocks.block[idx]);
2312     }
2313 
2314     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2315         ret = qemu_rdma_reg_control(rdma, idx);
2316         if (ret) {
2317             ERROR(temp, "rdma migration: error registering %d control!",
2318                                                             idx);
2319             goto err_rdma_source_init;
2320         }
2321     }
2322 
2323     return 0;
2324 
2325 err_rdma_source_init:
2326     error_propagate(errp, local_err);
2327     qemu_rdma_cleanup(rdma);
2328     return -1;
2329 }
2330 
2331 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2332 {
2333     RDMACapabilities cap = {
2334                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2335                                 .flags = 0,
2336                            };
2337     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2338                                           .retry_count = 5,
2339                                           .private_data = &cap,
2340                                           .private_data_len = sizeof(cap),
2341                                         };
2342     struct rdma_cm_event *cm_event;
2343     int ret;
2344 
2345     /*
2346      * Only negotiate the capability with destination if the user
2347      * on the source first requested the capability.
2348      */
2349     if (rdma->pin_all) {
2350         trace_qemu_rdma_connect_pin_all_requested();
2351         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2352     }
2353 
2354     caps_to_network(&cap);
2355 
2356     ret = rdma_connect(rdma->cm_id, &conn_param);
2357     if (ret) {
2358         perror("rdma_connect");
2359         ERROR(errp, "connecting to destination!");
2360         goto err_rdma_source_connect;
2361     }
2362 
2363     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2364     if (ret) {
2365         perror("rdma_get_cm_event after rdma_connect");
2366         ERROR(errp, "connecting to destination!");
2367         rdma_ack_cm_event(cm_event);
2368         goto err_rdma_source_connect;
2369     }
2370 
2371     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2372         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2373         ERROR(errp, "connecting to destination!");
2374         rdma_ack_cm_event(cm_event);
2375         goto err_rdma_source_connect;
2376     }
2377     rdma->connected = true;
2378 
2379     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2380     network_to_caps(&cap);
2381 
2382     /*
2383      * Verify that the *requested* capabilities are supported by the destination
2384      * and disable them otherwise.
2385      */
2386     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2387         ERROR(errp, "Server cannot support pinning all memory. "
2388                         "Will register memory dynamically.");
2389         rdma->pin_all = false;
2390     }
2391 
2392     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2393 
2394     rdma_ack_cm_event(cm_event);
2395 
2396     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2397     if (ret) {
2398         ERROR(errp, "posting second control recv!");
2399         goto err_rdma_source_connect;
2400     }
2401 
2402     rdma->control_ready_expected = 1;
2403     rdma->nb_sent = 0;
2404     return 0;
2405 
2406 err_rdma_source_connect:
2407     qemu_rdma_cleanup(rdma);
2408     return -1;
2409 }
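
/*
 * Illustrative negotiation outcome: the source sends cap.flags with
 * RDMA_CAPABILITY_PIN_ALL set when pin-all was requested; the destination
 * masks the flags against known_capabilities and echoes back only what it
 * supports, so a missing bit in the reply clears rdma->pin_all above (with a
 * warning) and the migration falls back to dynamic chunk registration.
 */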
2410 
2411 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2412 {
2413     int ret, idx;
2414     struct rdma_cm_id *listen_id;
2415     char ip[40] = "unknown";
2416     struct rdma_addrinfo *res, *e;
2417     char port_str[16];
2418 
2419     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2420         rdma->wr_data[idx].control_len = 0;
2421         rdma->wr_data[idx].control_curr = NULL;
2422     }
2423 
2424     if (!rdma->host || !rdma->host[0]) {
2425         ERROR(errp, "RDMA host is not set!");
2426         rdma->error_state = -EINVAL;
2427         return -1;
2428     }
2429     /* create CM channel */
2430     rdma->channel = rdma_create_event_channel();
2431     if (!rdma->channel) {
2432         ERROR(errp, "could not create rdma event channel");
2433         rdma->error_state = -EINVAL;
2434         return -1;
2435     }
2436 
2437     /* create CM id */
2438     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2439     if (ret) {
2440         ERROR(errp, "could not create cm_id!");
2441         goto err_dest_init_create_listen_id;
2442     }
2443 
2444     snprintf(port_str, 16, "%d", rdma->port);
2445     port_str[15] = '\0';
2446 
2447     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2448     if (ret < 0) {
2449         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2450         goto err_dest_init_bind_addr;
2451     }
2452 
2453     for (e = res; e != NULL; e = e->ai_next) {
2454         inet_ntop(e->ai_family,
2455             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2456         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2457         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2458         if (ret) {
2459             continue;
2460         }
2461         if (e->ai_family == AF_INET6) {
2462             ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
2463             if (ret) {
2464                 continue;
2465             }
2466         }
2467         break;
2468     }
2469 
2470     if (!e) {
2471         ERROR(errp, "Error: could not rdma_bind_addr!");
2472         goto err_dest_init_bind_addr;
2473     }
2474 
2475     rdma->listen_id = listen_id;
2476     qemu_rdma_dump_gid("dest_init", listen_id);
2477     return 0;
2478 
2479 err_dest_init_bind_addr:
2480     rdma_destroy_id(listen_id);
2481 err_dest_init_create_listen_id:
2482     rdma_destroy_event_channel(rdma->channel);
2483     rdma->channel = NULL;
2484     rdma->error_state = ret;
2485     return ret;
2486 
2487 }
2488 
2489 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2490 {
2491     RDMAContext *rdma = NULL;
2492     InetSocketAddress *addr;
2493 
2494     if (host_port) {
2495         rdma = g_new0(RDMAContext, 1);
2496         rdma->current_index = -1;
2497         rdma->current_chunk = -1;
2498 
2499         addr = inet_parse(host_port, NULL);
2500         if (addr != NULL) {
2501             rdma->port = atoi(addr->port);
2502             rdma->host = g_strdup(addr->host);
2503         } else {
2504             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2505             g_free(rdma);
2506             rdma = NULL;
2507         }
2508 
2509         qapi_free_InetSocketAddress(addr);
2510     }
2511 
2512     return rdma;
2513 }
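
/*
 * Usage sketch (the address shown is hypothetical): the caller passes the
 * "host:port" portion of the migration URI, e.g.
 *
 *     RDMAContext *rdma = qemu_rdma_data_init("192.168.1.10:4444", &local_err);
 *
 * after which rdma->host is "192.168.1.10" and rdma->port is 4444; a NULL
 * return means the address could not be parsed.
 */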
2514 
2515 /*
2516  * QEMUFile interface to the control channel.
2517  * SEND messages for control only.
2518  * VM's ram is handled with regular RDMA messages.
2519  */
2520 static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
2521                                     int64_t pos, size_t size)
2522 {
2523     QEMUFileRDMA *r = opaque;
2524     QEMUFile *f = r->file;
2525     RDMAContext *rdma = r->rdma;
2526     size_t remaining = size;
2527     uint8_t * data = (void *) buf;
2528     int ret;
2529 
2530     CHECK_ERROR_STATE();
2531 
2532     /*
2533      * Push out any writes that we have
2534      * queued up for the VM's RAM.
2535      */
2536     ret = qemu_rdma_write_flush(f, rdma);
2537     if (ret < 0) {
2538         rdma->error_state = ret;
2539         return ret;
2540     }
2541 
2542     while (remaining) {
2543         RDMAControlHeader head;
2544 
2545         r->len = MIN(remaining, RDMA_SEND_INCREMENT);
2546         remaining -= r->len;
2547 
2548         /* Guaranteed to fit because of the MIN() against RDMA_SEND_INCREMENT above */
2549         head.len = (uint32_t)r->len;
2550         head.type = RDMA_CONTROL_QEMU_FILE;
2551 
2552         ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2553 
2554         if (ret < 0) {
2555             rdma->error_state = ret;
2556             return ret;
2557         }
2558 
2559         data += r->len;
2560     }
2561 
2562     return size;
2563 }
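
/*
 * Note (illustrative): a single qemu_rdma_put_buffer() call larger than
 * RDMA_SEND_INCREMENT is split by the loop above into several
 * RDMA_CONTROL_QEMU_FILE messages, each with its own
 * qemu_rdma_exchange_send() round trip; the destination reassembles the
 * byte stream in qemu_rdma_get_buffer()/qemu_rdma_fill() below.
 */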
2564 
2565 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2566                              size_t size, int idx)
2567 {
2568     size_t len = 0;
2569 
2570     if (rdma->wr_data[idx].control_len) {
2571         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2572 
2573         len = MIN(size, rdma->wr_data[idx].control_len);
2574         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2575         rdma->wr_data[idx].control_curr += len;
2576         rdma->wr_data[idx].control_len -= len;
2577     }
2578 
2579     return len;
2580 }
2581 
2582 /*
2583  * QEMUFile interface to the control channel.
2584  * RDMA links don't use bytestreams, so we have to
2585  * return bytes to QEMUFile opportunistically.
2586  */
2587 static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
2588                                     int64_t pos, size_t size)
2589 {
2590     QEMUFileRDMA *r = opaque;
2591     RDMAContext *rdma = r->rdma;
2592     RDMAControlHeader head;
2593     int ret = 0;
2594 
2595     CHECK_ERROR_STATE();
2596 
2597     /*
2598      * First, we hold on to the last SEND message we
2599      * were given and dish out the bytes until we run
2600      * out of bytes.
2601      */
2602     r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
2603     if (r->len) {
2604         return r->len;
2605     }
2606 
2607     /*
2608      * Once we run out, we block and wait for another
2609      * SEND message to arrive.
2610      */
2611     ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2612 
2613     if (ret < 0) {
2614         rdma->error_state = ret;
2615         return ret;
2616     }
2617 
2618     /*
2619      * SEND was received with new bytes, now try again.
2620      */
2621     return qemu_rdma_fill(r->rdma, buf, size, 0);
2622 }
2623 
2624 /*
2625  * Block until all the outstanding chunks have been delivered by the hardware.
2626  */
2627 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2628 {
2629     int ret;
2630 
2631     if (qemu_rdma_write_flush(f, rdma) < 0) {
2632         return -EIO;
2633     }
2634 
2635     while (rdma->nb_sent) {
2636         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2637         if (ret < 0) {
2638             error_report("rdma migration: complete polling error!");
2639             return -EIO;
2640         }
2641     }
2642 
2643     qemu_rdma_unregister_waiting(rdma);
2644 
2645     return 0;
2646 }
2647 
2648 static int qemu_rdma_close(void *opaque)
2649 {
2650     trace_qemu_rdma_close();
2651     QEMUFileRDMA *r = opaque;
2652     if (r->rdma) {
2653         qemu_rdma_cleanup(r->rdma);
2654         g_free(r->rdma);
2655     }
2656     g_free(r);
2657     return 0;
2658 }
2659 
2660 /*
2661  * Parameters:
2662  *    @offset == 0 :
2663  *        This means that 'block_offset' is a full virtual address that does not
2664  *        belong to a RAMBlock of the virtual machine and instead
2665  *        represents a private malloc'd memory area that the caller wishes to
2666  *        transfer.
2667  *
2668  *    @offset != 0 :
2669  *        Offset is an offset to be added to block_offset and used
2670  *        to also lookup the corresponding RAMBlock.
2671  *
2672  *    @size > 0 :
2673  *        Initiate a transfer of this size.
2674  *
2675  *    @size == 0 :
2676  *        A 'hint' or 'advice' that means that we wish to speculatively
2677  *        and asynchronously unregister this memory. In this case, there is no
2678  *        guarantee that the unregister will actually happen, for example,
2679  *        if the memory is being actively transmitted. Additionally, the memory
2680  *        may be re-registered at any future time if a write within the same
2681  *        chunk was requested again, even if you attempted to unregister it
2682  *        here.
2683  *
2684  *    @size < 0 : TODO, not yet supported
2685  *        Unregister the memory NOW. This means that the caller does not
2686  *        expect there to be any future RDMA transfers and we just want to clean
2687  *        things up. This is used in case the upper layer owns the memory and
2688  *        cannot wait for qemu_fclose() to occur.
2689  *
2690  *    @bytes_sent : User-specified pointer to indicate how many bytes were
2691  *                  sent. Usually, this will not be more than a few bytes of
2692  *                  the protocol because most transfers are sent asynchronously.
2693  */
2694 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2695                                   ram_addr_t block_offset, ram_addr_t offset,
2696                                   size_t size, uint64_t *bytes_sent)
2697 {
2698     QEMUFileRDMA *rfile = opaque;
2699     RDMAContext *rdma = rfile->rdma;
2700     int ret;
2701 
2702     CHECK_ERROR_STATE();
2703 
2704     qemu_fflush(f);
2705 
2706     if (size > 0) {
2707         /*
2708          * Add this page to the current 'chunk'. If the chunk
2709          * is full, or the page doesn't belong to the current chunk,
2710          * an actual RDMA write will occur and a new chunk will be formed.
2711          */
2712         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2713         if (ret < 0) {
2714             error_report("rdma migration: write error! %d", ret);
2715             goto err;
2716         }
2717 
2718         /*
2719          * We always return 1 byte because the RDMA
2720          * protocol is completely asynchronous. We do not yet know
2721          * whether an identified chunk is zero or not because we're
2722          * waiting for other pages to potentially be merged with
2723          * the current chunk. So, we have to call qemu_update_position()
2724          * later on when the actual write occurs.
2725          */
2726         if (bytes_sent) {
2727             *bytes_sent = 1;
2728         }
2729     } else {
2730         uint64_t index, chunk;
2731 
2732         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2733         if (size < 0) {
2734             ret = qemu_rdma_drain_cq(f, rdma);
2735             if (ret < 0) {
2736                 fprintf(stderr, "rdma: failed to synchronously drain"
2737                                 " completion queue before unregistration.\n");
2738                 goto err;
2739             }
2740         }
2741         */
2742 
2743         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2744                                          offset, size, &index, &chunk);
2745 
2746         if (ret) {
2747             error_report("ram block search failed");
2748             goto err;
2749         }
2750 
2751         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2752 
2753         /*
2754          * TODO: Synchronous, guaranteed unregistration (should not occur during
2755          * fast-path). Otherwise, unregisters will process on the next call to
2756          * qemu_rdma_drain_cq()
2757         if (size < 0) {
2758             qemu_rdma_unregister_waiting(rdma);
2759         }
2760         */
2761     }
2762 
2763     /*
2764      * Drain the Completion Queue if possible, but do not block,
2765      * just poll.
2766      *
2767      * If nothing to poll, the end of the iteration will do this
2768      * again to make sure we don't overflow the request queue.
2769      */
2770     while (1) {
2771         uint64_t wr_id, wr_id_in;
2772         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2773         if (ret < 0) {
2774             error_report("rdma migration: polling error! %d", ret);
2775             goto err;
2776         }
2777 
2778         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2779 
2780         if (wr_id == RDMA_WRID_NONE) {
2781             break;
2782         }
2783     }
2784 
2785     return RAM_SAVE_CONTROL_DELAYED;
2786 err:
2787     rdma->error_state = ret;
2788     return ret;
2789 }
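
/*
 * Illustrative call patterns for the hook above (hypothetical call sites):
 *
 *     // queue one page; the RDMA write is posted lazily as chunks fill up
 *     qemu_rdma_save_page(f, opaque, block_offset, offset,
 *                         TARGET_PAGE_SIZE, &bytes_sent);
 *
 *     // size == 0: hint that the chunk containing this page may be
 *     // speculatively unregistered
 *     qemu_rdma_save_page(f, opaque, block_offset, offset, 0, NULL);
 *
 * Both forms return RAM_SAVE_CONTROL_DELAYED on success because delivery is
 * asynchronous.
 */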
2790 
2791 static int qemu_rdma_accept(RDMAContext *rdma)
2792 {
2793     RDMACapabilities cap;
2794     struct rdma_conn_param conn_param = {
2795                                             .responder_resources = 2,
2796                                             .private_data = &cap,
2797                                             .private_data_len = sizeof(cap),
2798                                          };
2799     struct rdma_cm_event *cm_event;
2800     struct ibv_context *verbs;
2801     int ret = -EINVAL;
2802     int idx;
2803 
2804     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2805     if (ret) {
2806         goto err_rdma_dest_wait;
2807     }
2808 
2809     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2810         rdma_ack_cm_event(cm_event);
2811         goto err_rdma_dest_wait;
2812     }
2813 
2814     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2815 
2816     network_to_caps(&cap);
2817 
2818     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2819             error_report("Unknown source RDMA version: %d, bailing...",
2820                             cap.version);
2821             rdma_ack_cm_event(cm_event);
2822             goto err_rdma_dest_wait;
2823     }
2824 
2825     /*
2826      * Respond with only the capabilities this version of QEMU knows about.
2827      */
2828     cap.flags &= known_capabilities;
2829 
2830     /*
2831      * Enable the ones that we do know about.
2832      * Add other checks here as new ones are introduced.
2833      */
2834     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2835         rdma->pin_all = true;
2836     }
2837 
2838     rdma->cm_id = cm_event->id;
2839     verbs = cm_event->id->verbs;
2840 
2841     rdma_ack_cm_event(cm_event);
2842 
2843     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
2844 
2845     caps_to_network(&cap);
2846 
2847     trace_qemu_rdma_accept_pin_verbsc(verbs);
2848 
2849     if (!rdma->verbs) {
2850         rdma->verbs = verbs;
2851     } else if (rdma->verbs != verbs) {
2852             error_report("ibv context not matching %p, %p!", rdma->verbs,
2853                          verbs);
2854             goto err_rdma_dest_wait;
2855     }
2856 
2857     qemu_rdma_dump_id("dest_init", verbs);
2858 
2859     ret = qemu_rdma_alloc_pd_cq(rdma);
2860     if (ret) {
2861         error_report("rdma migration: error allocating pd and cq!");
2862         goto err_rdma_dest_wait;
2863     }
2864 
2865     ret = qemu_rdma_alloc_qp(rdma);
2866     if (ret) {
2867         error_report("rdma migration: error allocating qp!");
2868         goto err_rdma_dest_wait;
2869     }
2870 
2871     ret = qemu_rdma_init_ram_blocks(rdma);
2872     if (ret) {
2873         error_report("rdma migration: error initializing ram blocks!");
2874         goto err_rdma_dest_wait;
2875     }
2876 
2877     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2878         ret = qemu_rdma_reg_control(rdma, idx);
2879         if (ret) {
2880             error_report("rdma: error registering %d control", idx);
2881             goto err_rdma_dest_wait;
2882         }
2883     }
2884 
2885     qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2886 
2887     ret = rdma_accept(rdma->cm_id, &conn_param);
2888     if (ret) {
2889         error_report("rdma_accept returns %d", ret);
2890         goto err_rdma_dest_wait;
2891     }
2892 
2893     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2894     if (ret) {
2895         error_report("rdma_accept get_cm_event failed %d", ret);
2896         goto err_rdma_dest_wait;
2897     }
2898 
2899     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2900         error_report("rdma_accept not event established");
2901         rdma_ack_cm_event(cm_event);
2902         goto err_rdma_dest_wait;
2903     }
2904 
2905     rdma_ack_cm_event(cm_event);
2906     rdma->connected = true;
2907 
2908     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2909     if (ret) {
2910         error_report("rdma migration: error posting second control recv");
2911         goto err_rdma_dest_wait;
2912     }
2913 
2914     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
2915 
2916     return 0;
2917 
2918 err_rdma_dest_wait:
2919     rdma->error_state = ret;
2920     qemu_rdma_cleanup(rdma);
2921     return ret;
2922 }
2923 
2924 static int dest_ram_sort_func(const void *a, const void *b)
2925 {
2926     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
2927     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
2928 
2929     return (a_index < b_index) ? -1 : (a_index != b_index);
2930 }
2931 
2932 /*
2933  * During each iteration of the migration, we listen for instructions
2934  * from the source VM to perform dynamic page registrations before it
2935  * can perform RDMA operations.
2936  *
2937  * We respond with the 'rkey'.
2938  *
2939  * Keep doing this until the source tells us to stop.
2940  */
2941 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
2942 {
2943     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
2944                                .type = RDMA_CONTROL_REGISTER_RESULT,
2945                                .repeat = 0,
2946                              };
2947     RDMAControlHeader unreg_resp = { .len = 0,
2948                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
2949                                .repeat = 0,
2950                              };
2951     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
2952                                  .repeat = 1 };
2953     QEMUFileRDMA *rfile = opaque;
2954     RDMAContext *rdma = rfile->rdma;
2955     RDMALocalBlocks *local = &rdma->local_ram_blocks;
2956     RDMAControlHeader head;
2957     RDMARegister *reg, *registers;
2958     RDMACompress *comp;
2959     RDMARegisterResult *reg_result;
2960     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
2961     RDMALocalBlock *block;
2962     void *host_addr;
2963     int ret = 0;
2964     int idx = 0;
2965     int count = 0;
2966     int i = 0;
2967 
2968     CHECK_ERROR_STATE();
2969 
2970     do {
2971         trace_qemu_rdma_registration_handle_wait();
2972 
2973         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
2974 
2975         if (ret < 0) {
2976             break;
2977         }
2978 
2979         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
2980             error_report("rdma: Too many requests in this message (%d)."
2981                             "Bailing.", head.repeat);
2982             ret = -EIO;
2983             break;
2984         }
2985 
2986         switch (head.type) {
2987         case RDMA_CONTROL_COMPRESS:
2988             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
2989             network_to_compress(comp);
2990 
2991             trace_qemu_rdma_registration_handle_compress(comp->length,
2992                                                          comp->block_idx,
2993                                                          comp->offset);
2994             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
2995                 error_report("rdma: 'compress' bad block index %u (vs %d)",
2996                              (unsigned int)comp->block_idx,
2997                              rdma->local_ram_blocks.nb_blocks);
2998                 ret = -EIO;
2999                 goto out;
3000             }
3001             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3002 
3003             host_addr = block->local_host_addr +
3004                             (comp->offset - block->offset);
3005 
3006             ram_handle_compressed(host_addr, comp->value, comp->length);
3007             break;
3008 
3009         case RDMA_CONTROL_REGISTER_FINISHED:
3010             trace_qemu_rdma_registration_handle_finished();
3011             goto out;
3012 
3013         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3014             trace_qemu_rdma_registration_handle_ram_blocks();
3015 
3016             /* Sort our local RAM Block list so it matches the source's;
3017              * we can do this since we filled in a src_index in the list
3018              * as we received the RAMBlock list earlier.
3019              */
3020             qsort(rdma->local_ram_blocks.block,
3021                   rdma->local_ram_blocks.nb_blocks,
3022                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3023             if (rdma->pin_all) {
3024                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3025                 if (ret) {
3026                     error_report("rdma migration: error dest "
3027                                     "registering ram blocks");
3028                     goto out;
3029                 }
3030             }
3031 
3032             /*
3033              * Dest uses this to prepare to transmit the RAMBlock descriptions
3034              * to the source VM after connection setup.
3035              * Both sides use the "remote" structure to communicate and update
3036              * their "local" descriptions with what was sent.
3037              */
3038             for (i = 0; i < local->nb_blocks; i++) {
3039                 rdma->dest_blocks[i].remote_host_addr =
3040                     (uintptr_t)(local->block[i].local_host_addr);
3041 
3042                 if (rdma->pin_all) {
3043                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3044                 }
3045 
3046                 rdma->dest_blocks[i].offset = local->block[i].offset;
3047                 rdma->dest_blocks[i].length = local->block[i].length;
3048 
3049                 dest_block_to_network(&rdma->dest_blocks[i]);
3050                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3051                     local->block[i].block_name,
3052                     local->block[i].offset,
3053                     local->block[i].length,
3054                     local->block[i].local_host_addr,
3055                     local->block[i].src_index);
3056             }
3057 
3058             blocks.len = rdma->local_ram_blocks.nb_blocks
3059                                                 * sizeof(RDMADestBlock);
3060 
3061 
3062             ret = qemu_rdma_post_send_control(rdma,
3063                                         (uint8_t *) rdma->dest_blocks, &blocks);
3064 
3065             if (ret < 0) {
3066                 error_report("rdma migration: error sending remote info");
3067                 goto out;
3068             }
3069 
3070             break;
3071         case RDMA_CONTROL_REGISTER_REQUEST:
3072             trace_qemu_rdma_registration_handle_register(head.repeat);
3073 
3074             reg_resp.repeat = head.repeat;
3075             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3076 
3077             for (count = 0; count < head.repeat; count++) {
3078                 uint64_t chunk;
3079                 uint8_t *chunk_start, *chunk_end;
3080 
3081                 reg = &registers[count];
3082                 network_to_register(reg);
3083 
3084                 reg_result = &results[count];
3085 
3086                 trace_qemu_rdma_registration_handle_register_loop(count,
3087                          reg->current_index, reg->key.current_addr, reg->chunks);
3088 
3089                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3090                     error_report("rdma: 'register' bad block index %u (vs %d)",
3091                                  (unsigned int)reg->current_index,
3092                                  rdma->local_ram_blocks.nb_blocks);
3093                     ret = -ENOENT;
3094                     goto out;
3095                 }
3096                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3097                 if (block->is_ram_block) {
3098                     if (block->offset > reg->key.current_addr) {
3099                         error_report("rdma: bad register address for block %s"
3100                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3101                             block->block_name, block->offset,
3102                             reg->key.current_addr);
3103                         ret = -ERANGE;
3104                         goto out;
3105                     }
3106                     host_addr = (block->local_host_addr +
3107                                 (reg->key.current_addr - block->offset));
3108                     chunk = ram_chunk_index(block->local_host_addr,
3109                                             (uint8_t *) host_addr);
3110                 } else {
3111                     chunk = reg->key.chunk;
3112                     host_addr = block->local_host_addr +
3113                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3114                     /* Check for particularly bad chunk value */
3115                     if (host_addr < (void *)block->local_host_addr) {
3116                         error_report("rdma: bad chunk for block %s"
3117                             " chunk: %" PRIx64,
3118                             block->block_name, reg->key.chunk);
3119                         ret = -ERANGE;
3120                         goto out;
3121                     }
3122                 }
3123                 chunk_start = ram_chunk_start(block, chunk);
3124                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3125                 if (qemu_rdma_register_and_get_keys(rdma, block,
3126                             (uintptr_t)host_addr, NULL, &reg_result->rkey,
3127                             chunk, chunk_start, chunk_end)) {
3128                     error_report("cannot get rkey");
3129                     ret = -EINVAL;
3130                     goto out;
3131                 }
3132 
3133                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3134 
3135                 trace_qemu_rdma_registration_handle_register_rkey(
3136                                                            reg_result->rkey);
3137 
3138                 result_to_network(reg_result);
3139             }
3140 
3141             ret = qemu_rdma_post_send_control(rdma,
3142                             (uint8_t *) results, &reg_resp);
3143 
3144             if (ret < 0) {
3145                 error_report("Failed to send control buffer");
3146                 goto out;
3147             }
3148             break;
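        /*
         * The source no longer needs one or more chunks.  Deregister the
         * corresponding memory regions and acknowledge with an (empty)
         * unregister result message.
         */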
3149         case RDMA_CONTROL_UNREGISTER_REQUEST:
3150             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3151             unreg_resp.repeat = head.repeat;
3152             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3153 
3154             for (count = 0; count < head.repeat; count++) {
3155                 reg = &registers[count];
3156                 network_to_register(reg);
3157 
3158                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3159                            reg->current_index, reg->key.chunk);
3160 
3161                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3162 
3163                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3164                 block->pmr[reg->key.chunk] = NULL;
3165 
3166                 if (ret != 0) {
3167                     perror("rdma chunk unregistration failed");
3168                     ret = -ret;
3169                     goto out;
3170                 }
3171 
3172                 rdma->total_registrations--;
3173 
3174                 trace_qemu_rdma_registration_handle_unregister_success(
3175                                                        reg->key.chunk);
3176             }
3177 
3178             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3179 
3180             if (ret < 0) {
3181                 error_report("Failed to send control buffer");
3182                 goto out;
3183             }
3184             break;
3185         case RDMA_CONTROL_REGISTER_RESULT:
3186             error_report("Invalid RESULT message at destination");
3187             ret = -EIO;
3188             goto out;
3189         default:
3190             error_report("Unknown control message %u", head.type);
3191             ret = -EIO;
3192             goto out;
3193         }
3194     } while (1);
3195 out:
3196     if (ret < 0) {
3197         rdma->error_state = ret;
3198     }
3199     return ret;
3200 }
3201 
3202 /* Destination:
3203  * Called via a ram_control_load_hook during the initial RAM load section which
3204  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3205  * on the source.
3206  * We've already built our local RAMBlock list, but not yet sent the list to
3207  * the source.
3208  */
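/*
 * For example (illustrative block names only): if the source announces
 * "pc.ram" first, the matching local block is assigned src_index 0 here,
 * regardless of its position in our local list.
 */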
3209 static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
3210 {
3211     RDMAContext *rdma = rfile->rdma;
3212     int curr;
3213     int found = -1;
3214 
3215     /* Find the matching RAMBlock in our local list */
3216     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3217         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3218             found = curr;
3219             break;
3220         }
3221     }
3222 
3223     if (found == -1) {
3224         error_report("RAMBlock '%s' not found on destination", name);
3225         return -ENOENT;
3226     }
3227 
3228     rdma->local_ram_blocks.block[found].src_index = rdma->next_src_index;
3229     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3230     rdma->next_src_index++;
3231 
3232     return 0;
3233 }
3234 
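/*
 * Dispatch ram_control_load_hook() calls on the destination to the
 * appropriate RDMA handler.
 */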
3235 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3236 {
3237     switch (flags) {
3238     case RAM_CONTROL_BLOCK_REG:
3239         return rdma_block_notification_handle(opaque, data);
3240 
3241     case RAM_CONTROL_HOOK:
3242         return qemu_rdma_registration_handle(f, opaque);
3243 
3244     default:
3245         /* Shouldn't be called with any other values */
3246         abort();
3247     }
3248 }
3249 
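/*
 * Source: called before each RAM iteration.  Write RAM_SAVE_FLAG_HOOK into
 * the stream so that the destination's RAM load code calls back into
 * rdma_load_hook() (RAM_CONTROL_HOOK) and services registration requests
 * for this round in qemu_rdma_registration_handle().
 */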
3250 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3251                                         uint64_t flags, void *data)
3252 {
3253     QEMUFileRDMA *rfile = opaque;
3254     RDMAContext *rdma = rfile->rdma;
3255 
3256     CHECK_ERROR_STATE();
3257 
3258     trace_qemu_rdma_registration_start(flags);
3259     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3260     qemu_fflush(f);
3261 
3262     return 0;
3263 }
3264 
3265 /*
3266  * Inform dest that dynamic registrations are done for now.
3267  * First, flush writes, if any.
3268  */
3269 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3270                                        uint64_t flags, void *data)
3271 {
3272     Error *local_err = NULL, **errp = &local_err;
3273     QEMUFileRDMA *rfile = opaque;
3274     RDMAContext *rdma = rfile->rdma;
3275     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3276     int ret = 0;
3277 
3278     CHECK_ERROR_STATE();
3279 
3280     qemu_fflush(f);
3281     ret = qemu_rdma_drain_cq(f, rdma);
3282 
3283     if (ret < 0) {
3284         goto err;
3285     }
3286 
3287     if (flags == RAM_CONTROL_SETUP) {
3288         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3289         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3290         int reg_result_idx, i, nb_dest_blocks;
3291 
3292         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3293         trace_qemu_rdma_registration_stop_ram();
3294 
3295         /*
3296          * Make sure that we parallelize the pinning on both sides.
3297          * For very large guests, doing this serially takes a really
3298          * long time, so we 'interleave' the pinning locally with the
3299          * control messages: the pinning on this side is started before
3300          * we wait for the control response telling us that the other
3301          * side's pinning has completed.
3302          */
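        /*
         * When pin_all is enabled, qemu_rdma_reg_whole_ram_blocks is passed
         * as the callback below so that the local pinning runs before we
         * block waiting for the destination's RAM_BLOCKS_RESULT, overlapping
         * it with the pinning on the remote side.
         */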
3303         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3304                     &reg_result_idx, rdma->pin_all ?
3305                     qemu_rdma_reg_whole_ram_blocks : NULL);
3306         if (ret < 0) {
3307             ERROR(errp, "receiving remote info!");
3308             return ret;
3309         }
3310 
3311         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3312 
3313         /*
3314          * The protocol uses two different sets of rkeys (mutually exclusive):
3315          * 1. One key to represent the virtual address of the entire ram block.
3316          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3317          * 2. One to represent individual chunks within a ram block.
3318          *    (dynamic chunk registration enabled - pin individual chunks.)
3319          *
3320          * Once the capability is successfully negotiated, the destination transmits
3321          * the keys to use (or sends them later), including the virtual addresses,
3322          * and the source merges those block descriptions into its local copy.
3323          */
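        /*
         * In pin_all mode the RAM_BLOCKS_RESULT therefore carries one rkey
         * per block, stored below as remote_rkey; otherwise chunks are
         * registered on demand during the transfer via
         * RDMA_CONTROL_REGISTER_REQUEST (see qemu_rdma_registration_handle).
         */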
3324 
3325         if (local->nb_blocks != nb_dest_blocks) {
3326             ERROR(errp, "ram blocks mismatch (%d on source vs %d on destination). "
3327                         "Your QEMU command line parameters are probably "
3328                         "not identical on both the source and destination.",
3329                         local->nb_blocks, nb_dest_blocks);
3330             rdma->error_state = -EINVAL;
3331             return -EINVAL;
3332         }
3333 
3334         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3335         memcpy(rdma->dest_blocks,
3336             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3337         for (i = 0; i < nb_dest_blocks; i++) {
3338             network_to_dest_block(&rdma->dest_blocks[i]);
3339 
3340             /* We require that the blocks are in the same order */
3341             if (rdma->dest_blocks[i].length != local->block[i].length) {
3342                 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3343                             " vs %" PRIu64, local->block[i].block_name, i,
3344                             local->block[i].length,
3345                             rdma->dest_blocks[i].length);
3346                 rdma->error_state = -EINVAL;
3347                 return -EINVAL;
3348             }
3349             local->block[i].remote_host_addr =
3350                     rdma->dest_blocks[i].remote_host_addr;
3351             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3352         }
3353     }
3354 
3355     trace_qemu_rdma_registration_stop(flags);
3356 
3357     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3358     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3359 
3360     if (ret < 0) {
3361         goto err;
3362     }
3363 
3364     return 0;
3365 err:
3366     rdma->error_state = ret;
3367     return ret;
3368 }
3369 
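/*
 * Expose the completion channel's file descriptor; wired up below as the
 * QEMUFile get_fd callback so the fd can be polled for completion events.
 */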
3370 static int qemu_rdma_get_fd(void *opaque)
3371 {
3372     QEMUFileRDMA *rfile = opaque;
3373     RDMAContext *rdma = rfile->rdma;
3374 
3375     return rdma->comp_channel->fd;
3376 }
3377 
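/* QEMUFile ops used by the destination (incoming/read) side. */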
3378 static const QEMUFileOps rdma_read_ops = {
3379     .get_buffer    = qemu_rdma_get_buffer,
3380     .get_fd        = qemu_rdma_get_fd,
3381     .close         = qemu_rdma_close,
3382     .hook_ram_load = rdma_load_hook,
3383 };
3384 
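/* QEMUFile ops used by the source (outgoing/write) side. */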
3385 static const QEMUFileOps rdma_write_ops = {
3386     .put_buffer         = qemu_rdma_put_buffer,
3387     .close              = qemu_rdma_close,
3388     .before_ram_iterate = qemu_rdma_registration_start,
3389     .after_ram_iterate  = qemu_rdma_registration_stop,
3390     .save_page          = qemu_rdma_save_page,
3391 };
3392 
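/*
 * Wrap an RDMAContext in a QEMUFile: a "w" mode selects the write (source)
 * ops, anything else the read (destination) ops.
 */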
3393 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3394 {
3395     QEMUFileRDMA *r;
3396 
3397     if (qemu_file_mode_is_not_valid(mode)) {
3398         return NULL;
3399     }
3400 
3401     r = g_new0(QEMUFileRDMA, 1);
3402     r->rdma = rdma;
3403 
3404     if (mode[0] == 'w') {
3405         r->file = qemu_fopen_ops(r, &rdma_write_ops);
3406     } else {
3407         r->file = qemu_fopen_ops(r, &rdma_read_ops);
3408     }
3409 
3410     return r->file;
3411 }
3412 
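/*
 * fd handler installed by rdma_start_incoming_migration(): complete the
 * RDMA accept, wrap the connection in a QEMUFile and hand it over to the
 * incoming migration code.
 */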
3413 static void rdma_accept_incoming_migration(void *opaque)
3414 {
3415     RDMAContext *rdma = opaque;
3416     int ret;
3417     QEMUFile *f;
3418     Error *local_err = NULL, **errp = &local_err;
3419 
3420     trace_qemu_rdma_accept_incoming_migration();
3421     ret = qemu_rdma_accept(rdma);
3422 
3423     if (ret) {
3424         ERROR(errp, "RDMA Migration initialization failed!");
3425         return;
3426     }
3427 
3428     trace_qemu_rdma_accept_incoming_migration_accepted();
3429 
3430     f = qemu_fopen_rdma(rdma, "rb");
3431     if (f == NULL) {
3432         ERROR(errp, "could not qemu_fopen_rdma!");
3433         qemu_rdma_cleanup(rdma);
3434         return;
3435     }
3436 
3437     rdma->migration_started_on_destination = 1;
3438     process_incoming_migration(f);
3439 }
3440 
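/*
 * Destination entry point: initialize an RDMA listener for host_port and
 * install rdma_accept_incoming_migration() as the handler for the rdma_cm
 * event channel's file descriptor.
 */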
3441 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3442 {
3443     int ret;
3444     RDMAContext *rdma;
3445     Error *local_err = NULL;
3446 
3447     trace_rdma_start_incoming_migration();
3448     rdma = qemu_rdma_data_init(host_port, &local_err);
3449 
3450     if (rdma == NULL) {
3451         goto err;
3452     }
3453 
3454     ret = qemu_rdma_dest_init(rdma, &local_err);
3455 
3456     if (ret) {
3457         goto err;
3458     }
3459 
3460     trace_rdma_start_incoming_migration_after_dest_init();
3461 
3462     ret = rdma_listen(rdma->listen_id, 5);
3463 
3464     if (ret) {
3465         ERROR(errp, "listening on socket!");
3466         goto err;
3467     }
3468 
3469     trace_rdma_start_incoming_migration_after_rdma_listen();
3470 
3471     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3472                         NULL, (void *)(intptr_t)rdma);
3473     return;
3474 err:
3475     error_propagate(errp, local_err);
3476     g_free(rdma);
3477 }
3478 
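/*
 * Source entry point: create and connect the RDMA channel to host_port,
 * then hand the resulting QEMUFile to the generic migration code via
 * migrate_fd_connect().
 */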
3479 void rdma_start_outgoing_migration(void *opaque,
3480                             const char *host_port, Error **errp)
3481 {
3482     MigrationState *s = opaque;
3483     Error *local_err = NULL, **temp = &local_err;
3484     RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
3485     int ret = 0;
3486 
3487     if (rdma == NULL) {
3488         ERROR(temp, "Failed to initialize RDMA data structures!");
3489         goto err;
3490     }
3491 
3492     ret = qemu_rdma_source_init(rdma, &local_err,
3493         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
3494 
3495     if (ret) {
3496         goto err;
3497     }
3498 
3499     trace_rdma_start_outgoing_migration_after_rdma_source_init();
3500     ret = qemu_rdma_connect(rdma, &local_err);
3501 
3502     if (ret) {
3503         goto err;
3504     }
3505 
3506     trace_rdma_start_outgoing_migration_after_rdma_connect();
3507 
3508     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3509     migrate_fd_connect(s);
3510     return;
3511 err:
3512     error_propagate(errp, local_err);
3513     g_free(rdma);
3514     migrate_fd_error(s);
3515 }
3516