xref: /openbmc/qemu/migration/rdma.c (revision 8f3f329f5e0117bd1a23a79ab751f8a7d3471e4b)
1  /*
2   * RDMA protocol and interfaces
3   *
4   * Copyright IBM, Corp. 2010-2013
5   * Copyright Red Hat, Inc. 2015-2016
6   *
7   * Authors:
8   *  Michael R. Hines <mrhines@us.ibm.com>
9   *  Jiuxing Liu <jl@us.ibm.com>
10   *  Daniel P. Berrange <berrange@redhat.com>
11   *
12   * This work is licensed under the terms of the GNU GPL, version 2 or
13   * later.  See the COPYING file in the top-level directory.
14   *
15   */
16  
17  #include "qemu/osdep.h"
18  #include "qapi/error.h"
19  #include "qemu/cutils.h"
20  #include "exec/target_page.h"
21  #include "rdma.h"
22  #include "migration.h"
23  #include "migration-stats.h"
24  #include "qemu-file.h"
25  #include "ram.h"
26  #include "qemu/error-report.h"
27  #include "qemu/main-loop.h"
28  #include "qemu/module.h"
29  #include "qemu/rcu.h"
30  #include "qemu/sockets.h"
31  #include "qemu/bitmap.h"
32  #include "qemu/coroutine.h"
33  #include "exec/memory.h"
34  #include <sys/socket.h>
35  #include <netdb.h>
36  #include <arpa/inet.h>
37  #include <rdma/rdma_cma.h>
38  #include "trace.h"
39  #include "qom/object.h"
40  #include "options.h"
41  #include <poll.h>
42  
43  #define RDMA_RESOLVE_TIMEOUT_MS 10000
44  
45  /* Do not merge data if larger than this. */
46  #define RDMA_MERGE_MAX (2 * 1024 * 1024)
47  #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
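/*
 * With RDMA_MERGE_MAX = 2 MiB and 4 KiB pages this evaluates to 512,
 * which bounds the number of outstanding signaled sends (it is used as
 * max_send_wr when the queue pair is created below).
 */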
48  
49  #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
50  
51  /*
52   * This is only for non-live state being migrated.
53   * Instead of RDMA_WRITE messages, we use RDMA_SEND
54   * messages for that state, which requires a different
55   * delivery design than main memory.
56   */
57  #define RDMA_SEND_INCREMENT 32768
58  
59  /*
60   * Maximum size infiniband SEND message
61   */
62  #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
63  #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
64  
65  #define RDMA_CONTROL_VERSION_CURRENT 1
66  /*
67   * Capabilities for negotiation.
68   */
69  #define RDMA_CAPABILITY_PIN_ALL 0x01
70  
71  /*
72   * Add the other flags above to this list of known capabilities
73   * as they are introduced.
74   */
75  static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
76  
77  /*
78   * A work request ID is 64-bits and we split up these bits
79   * into 3 parts:
80   *
81   * bits 0-15 : type of control message, 2^16
82   * bits 16-29: ram block index, 2^14
83   * bits 30-63: ram block chunk number, 2^34
84   *
85   * The last two bit ranges are only used for RDMA writes,
86   * in order to track their completion and potentially
87   * also track unregistration status of the message.
88   */
89  #define RDMA_WRID_TYPE_SHIFT  0UL
90  #define RDMA_WRID_BLOCK_SHIFT 16UL
91  #define RDMA_WRID_CHUNK_SHIFT 30UL
92  
93  #define RDMA_WRID_TYPE_MASK \
94      ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
95  
96  #define RDMA_WRID_BLOCK_MASK \
97      (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
98  
99  #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
100  
101  /*
102   * RDMA migration protocol:
103   * 1. RDMA Writes (data messages, i.e. RAM)
104   * 2. IB Send/Recv (control channel messages)
105   */
106  enum {
107      RDMA_WRID_NONE = 0,
108      RDMA_WRID_RDMA_WRITE = 1,
109      RDMA_WRID_SEND_CONTROL = 2000,
110      RDMA_WRID_RECV_CONTROL = 4000,
111  };
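
/*
 * Worked example of the wrid layout above (the block and chunk numbers
 * are made up for illustration): an RDMA write to chunk 17 of RAM block 3
 * would carry
 *
 *     wrid = RDMA_WRID_RDMA_WRITE
 *          | (3ULL  << RDMA_WRID_BLOCK_SHIFT)
 *          | (17ULL << RDMA_WRID_CHUNK_SHIFT);
 *
 * and a completion handler can recover the parts with
 *
 *     type  =  wrid & RDMA_WRID_TYPE_MASK;                             -> 1
 *     block = (wrid & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;  -> 3
 *     chunk = (wrid & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;  -> 17
 *
 * See qemu_rdma_make_wrid() and qemu_rdma_poll() below for the real uses.
 */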
112  
113  /*
114   * Work request IDs for IB SEND messages only (not RDMA writes).
115   * This is used by the migration protocol to transmit
116   * control messages (such as device state and registration commands)
117   *
118   * We could use more WRs, but we have enough for now.
119   */
120  enum {
121      RDMA_WRID_READY = 0,
122      RDMA_WRID_DATA,
123      RDMA_WRID_CONTROL,
124      RDMA_WRID_MAX,
125  };
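
/*
 * For illustration: receive buffers for control messages are posted per
 * slot above, and their completions come back with wr_id offset by
 * RDMA_WRID_RECV_CONTROL, e.g. the completion for the RDMA_WRID_READY
 * slot arrives as wr_id == RDMA_WRID_RECV_CONTROL + RDMA_WRID_READY
 * (see the "wr_id - RDMA_WRID_RECV_CONTROL" arithmetic in
 * qemu_rdma_poll() below).
 */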
126  
127  /*
128   * SEND/RECV IB Control Messages.
129   */
130  enum {
131      RDMA_CONTROL_NONE = 0,
132      RDMA_CONTROL_ERROR,
133      RDMA_CONTROL_READY,               /* ready to receive */
134      RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
135      RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
136      RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
137      RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
138      RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
139      RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
140      RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
141      RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
142      RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
143  };
144  
145  
146  /*
147   * Memory and MR structures used to represent an IB Send/Recv work request.
148   * This is *not* used for RDMA writes, only IB Send/Recv.
149   */
150  typedef struct {
151      uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
152      struct   ibv_mr *control_mr;               /* registration metadata */
153      size_t   control_len;                      /* length of the message */
154      uint8_t *control_curr;                     /* start of unconsumed bytes */
155  } RDMAWorkRequestData;
156  
157  /*
158   * Negotiate RDMA capabilities during connection-setup time.
159   */
160  typedef struct {
161      uint32_t version;
162      uint32_t flags;
163  } RDMACapabilities;
164  
165  static void caps_to_network(RDMACapabilities *cap)
166  {
167      cap->version = htonl(cap->version);
168      cap->flags = htonl(cap->flags);
169  }
170  
171  static void network_to_caps(RDMACapabilities *cap)
172  {
173      cap->version = ntohl(cap->version);
174      cap->flags = ntohl(cap->flags);
175  }
176  
177  /*
178   * Representation of a RAMBlock from an RDMA perspective.
179   * This is not transmitted, only local.
180   * This and subsequent structures cannot be linked lists
181   * because we're using a single IB message to transmit
182   * the information. It's small anyway, so a list is overkill.
183   */
184  typedef struct RDMALocalBlock {
185      char          *block_name;
186      uint8_t       *local_host_addr; /* local virtual address */
187      uint64_t       remote_host_addr; /* remote virtual address */
188      uint64_t       offset;
189      uint64_t       length;
190      struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
191      struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
192      uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
193      uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
194      int            index;           /* which block are we */
195      unsigned int   src_index;       /* (Only used on dest) */
196      bool           is_ram_block;
197      int            nb_chunks;
198      unsigned long *transit_bitmap;
199      unsigned long *unregister_bitmap;
200  } RDMALocalBlock;
201  
202  /*
203   * Also represents a RAMblock, but only on the dest.
204   * This gets transmitted by the dest during connection-time
205   * to the source VM and then is used to populate the
206   * corresponding RDMALocalBlock with
207   * the information needed to perform the actual RDMA.
208   */
209  typedef struct QEMU_PACKED RDMADestBlock {
210      uint64_t remote_host_addr;
211      uint64_t offset;
212      uint64_t length;
213      uint32_t remote_rkey;
214      uint32_t padding;
215  } RDMADestBlock;
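
/*
 * With QEMU_PACKED this is exactly 32 bytes on the wire (3 x uint64_t +
 * 2 x uint32_t), so the entire array of dest blocks fits comfortably in
 * one RDMA_CONTROL_MAX_BUFFER-sized control message; that is why the
 * blocks are kept in a flat array rather than a linked list (see the
 * comment above RDMALocalBlock).
 */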
216  
217  static const char *control_desc(unsigned int rdma_control)
218  {
219      static const char *strs[] = {
220          [RDMA_CONTROL_NONE] = "NONE",
221          [RDMA_CONTROL_ERROR] = "ERROR",
222          [RDMA_CONTROL_READY] = "READY",
223          [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
224          [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
225          [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
226          [RDMA_CONTROL_COMPRESS] = "COMPRESS",
227          [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
228          [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
229          [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
230          [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
231          [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
232      };
233  
234      if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
235          return "??BAD CONTROL VALUE??";
236      }
237  
238      return strs[rdma_control];
239  }
240  
241  #if !defined(htonll)
242  static uint64_t htonll(uint64_t v)
243  {
244      union { uint32_t lv[2]; uint64_t llv; } u;
245      u.lv[0] = htonl(v >> 32);
246      u.lv[1] = htonl(v & 0xFFFFFFFFULL);
247      return u.llv;
248  }
249  #endif
250  
251  #if !defined(ntohll)
252  static uint64_t ntohll(uint64_t v)
253  {
254      union { uint32_t lv[2]; uint64_t llv; } u;
255      u.llv = v;
256      return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
257  }
258  #endif
259  
260  static void dest_block_to_network(RDMADestBlock *db)
261  {
262      db->remote_host_addr = htonll(db->remote_host_addr);
263      db->offset = htonll(db->offset);
264      db->length = htonll(db->length);
265      db->remote_rkey = htonl(db->remote_rkey);
266  }
267  
268  static void network_to_dest_block(RDMADestBlock *db)
269  {
270      db->remote_host_addr = ntohll(db->remote_host_addr);
271      db->offset = ntohll(db->offset);
272      db->length = ntohll(db->length);
273      db->remote_rkey = ntohl(db->remote_rkey);
274  }
275  
276  /*
277   * Virtual address of the above structures used for transmitting
278   * the RAMBlock descriptions at connection-time.
279   * This structure is *not* transmitted.
280   */
281  typedef struct RDMALocalBlocks {
282      int nb_blocks;
283      bool     init;             /* main memory init complete */
284      RDMALocalBlock *block;
285  } RDMALocalBlocks;
286  
287  /*
288   * Main data structure for RDMA state.
289   * While there is only one copy of this structure being allocated right now,
290   * this is the place where you would start if you wanted to consider
291   * having more than one RDMA connection open at the same time.
292   */
293  typedef struct RDMAContext {
294      char *host;
295      int port;
296  
297      RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
298  
299      /*
300       * This is used by *_exchange_send() to figure out whether
301       * the initial "READY" message has already been received.
302       * This is because other functions may potentially poll() and detect
303       * the READY message before send() does, in which case we need to
304       * know if it completed.
305       */
306      int control_ready_expected;
307  
308      /* number of outstanding writes */
309      int nb_sent;
310  
311      /* store info about current buffer so that we can
312         merge it with future sends */
313      uint64_t current_addr;
314      uint64_t current_length;
315      /* index of ram block the current buffer belongs to */
316      int current_index;
317      /* index of the chunk in the current ram block */
318      int current_chunk;
319  
320      bool pin_all;
321  
322      /*
323       * infiniband-specific variables for opening the device
324       * and maintaining connection state and so forth.
325       *
326       * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
327       * cm_id->verbs, cm_id->channel, and cm_id->qp.
328       */
329      struct rdma_cm_id *cm_id;               /* connection manager ID */
330      struct rdma_cm_id *listen_id;
331      bool connected;
332  
333      struct ibv_context          *verbs;
334      struct rdma_event_channel   *channel;
335      struct ibv_qp *qp;                      /* queue pair */
336      struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
337      struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
338      struct ibv_pd *pd;                      /* protection domain */
339      struct ibv_cq *recv_cq;                 /* receive completion queue */
340      struct ibv_cq *send_cq;                 /* send completion queue */
341  
342      /*
343       * If a previous write failed (perhaps because of a failed
344       * memory registration), then do not attempt any future work
345       * and remember the error state.
346       */
347      bool errored;
348      bool error_reported;
349      bool received_error;
350  
351      /*
352       * Description of ram blocks used throughout the code.
353       */
354      RDMALocalBlocks local_ram_blocks;
355      RDMADestBlock  *dest_blocks;
356  
357      /* Index of the next RAMBlock received during block registration */
358      unsigned int    next_src_index;
359  
360      /*
361       * Set once migration has started on the *destination*.
362       * From then on, use the coroutine yield function.
363       * The source runs in a thread, so we don't care.
364       */
365      int migration_started_on_destination;
366  
367      int total_registrations;
368      int total_writes;
369  
370      int unregister_current, unregister_next;
371      uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
372  
373      GHashTable *blockmap;
374  
375      /* the RDMAContext for return path */
376      struct RDMAContext *return_path;
377      bool is_return_path;
378  } RDMAContext;
379  
380  #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
381  OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
382  
383  
384  
385  struct QIOChannelRDMA {
386      QIOChannel parent;
387      RDMAContext *rdmain;
388      RDMAContext *rdmaout;
389      QEMUFile *file;
390      bool blocking; /* XXX we don't actually honour this yet */
391  };
392  
393  /*
394   * Main structure for IB Send/Recv control messages.
395   * This gets prepended at the beginning of every Send/Recv.
396   */
397  typedef struct QEMU_PACKED {
398      uint32_t len;     /* Total length of data portion */
399      uint32_t type;    /* which control command to perform */
400      uint32_t repeat;  /* number of commands in data portion of same type */
401      uint32_t padding;
402  } RDMAControlHeader;
403  
404  static void control_to_network(RDMAControlHeader *control)
405  {
406      control->type = htonl(control->type);
407      control->len = htonl(control->len);
408      control->repeat = htonl(control->repeat);
409  }
410  
411  static void network_to_control(RDMAControlHeader *control)
412  {
413      control->type = ntohl(control->type);
414      control->len = ntohl(control->len);
415      control->repeat = ntohl(control->repeat);
416  }
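
/*
 * A minimal sketch of how these two helpers are used (the field values are
 * taken from qemu_rdma_unregister_waiting() below): headers are built in
 * host byte order and run through control_to_network() before being copied
 * into the wire buffer; the receiver undoes this with network_to_control().
 *
 *     RDMAControlHeader head = { .len = sizeof(RDMARegister),
 *                                .type = RDMA_CONTROL_UNREGISTER_REQUEST,
 *                                .repeat = 1 };
 *     control_to_network(&head);
 */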
417  
418  /*
419   * Register a single Chunk.
420   * Information sent by the source VM to inform the dest
421   * to register a single chunk of memory before we can perform
422   * the actual RDMA operation.
423   */
424  typedef struct QEMU_PACKED {
425      union QEMU_PACKED {
426          uint64_t current_addr;  /* offset into the ram_addr_t space */
427          uint64_t chunk;         /* chunk to lookup if unregistering */
428      } key;
429      uint32_t current_index; /* which ramblock the chunk belongs to */
430      uint32_t padding;
431      uint64_t chunks;            /* how many sequential chunks to register */
432  } RDMARegister;
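
/*
 * Sketch of the two uses of the key union above: a REGISTER_REQUEST
 * carries key.current_addr (an offset into the ram_addr_t space), while
 * an UNREGISTER_REQUEST carries key.chunk, e.g.
 *
 *     RDMARegister reg = { .current_index = index };
 *     reg.key.chunk = chunk;
 *     register_to_network(rdma, &reg);
 *
 * as done in qemu_rdma_unregister_waiting() further down.
 */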
433  
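/*
 * Returns whether the RDMA context is in an error state, reporting it
 * the first time so that the user sees a single message.
 */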
434  static bool rdma_errored(RDMAContext *rdma)
435  {
436      if (rdma->errored && !rdma->error_reported) {
437          error_report("RDMA is in an error state waiting migration"
438                       " to abort!");
439          rdma->error_reported = true;
440      }
441      return rdma->errored;
442  }
443  
444  static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
445  {
446      RDMALocalBlock *local_block;
447      local_block  = &rdma->local_ram_blocks.block[reg->current_index];
448  
449      if (local_block->is_ram_block) {
450          /*
451           * current_addr as passed in is an address in the local ram_addr_t
452           * space, we need to translate this for the destination
453           */
454          reg->key.current_addr -= local_block->offset;
455          reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
456      }
457      reg->key.current_addr = htonll(reg->key.current_addr);
458      reg->current_index = htonl(reg->current_index);
459      reg->chunks = htonll(reg->chunks);
460  }
461  
462  static void network_to_register(RDMARegister *reg)
463  {
464      reg->key.current_addr = ntohll(reg->key.current_addr);
465      reg->current_index = ntohl(reg->current_index);
466      reg->chunks = ntohll(reg->chunks);
467  }
468  
469  typedef struct QEMU_PACKED {
470      uint32_t value;     /* if zero, we will madvise() */
471      uint32_t block_idx; /* which ram block index */
472      uint64_t offset;    /* Address in remote ram_addr_t space */
473      uint64_t length;    /* length of the chunk */
474  } RDMACompress;
475  
476  static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
477  {
478      comp->value = htonl(comp->value);
479      /*
480       * comp->offset as passed in is an address in the local ram_addr_t
481       * space, we need to translate this for the destination
482       */
483      comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
484      comp->offset += rdma->dest_blocks[comp->block_idx].offset;
485      comp->block_idx = htonl(comp->block_idx);
486      comp->offset = htonll(comp->offset);
487      comp->length = htonll(comp->length);
488  }
489  
490  static void network_to_compress(RDMACompress *comp)
491  {
492      comp->value = ntohl(comp->value);
493      comp->block_idx = ntohl(comp->block_idx);
494      comp->offset = ntohll(comp->offset);
495      comp->length = ntohll(comp->length);
496  }
497  
498  /*
499   * The result of the dest's memory registration produces an "rkey"
500   * which the source VM must reference in order to perform
501   * the RDMA operation.
502   */
503  typedef struct QEMU_PACKED {
504      uint32_t rkey;
505      uint32_t padding;
506      uint64_t host_addr;
507  } RDMARegisterResult;
508  
509  static void result_to_network(RDMARegisterResult *result)
510  {
511      result->rkey = htonl(result->rkey);
512      result->host_addr = htonll(result->host_addr);
513  };
514  
515  static void network_to_result(RDMARegisterResult *result)
516  {
517      result->rkey = ntohl(result->rkey);
518      result->host_addr = ntohll(result->host_addr);
519  };
520  
521  static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
522                                     uint8_t *data, RDMAControlHeader *resp,
523                                     int *resp_idx,
524                                     int (*callback)(RDMAContext *rdma,
525                                                     Error **errp),
526                                     Error **errp);
527  
528  static inline uint64_t ram_chunk_index(const uint8_t *start,
529                                         const uint8_t *host)
530  {
531      return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
532  }
533  
534  static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
535                                         uint64_t i)
536  {
537      return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
538                                    (i << RDMA_REG_CHUNK_SHIFT));
539  }
540  
541  static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
542                                       uint64_t i)
543  {
544      uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
545                                           (1UL << RDMA_REG_CHUNK_SHIFT);
546  
547      if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
548          result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
549      }
550  
551      return result;
552  }
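
/*
 * Worked example (RDMA_REG_CHUNK_SHIFT is 20, i.e. 1 MB chunks): for a
 * hypothetical block of 2.5 MB starting at local_host_addr,
 * ram_chunk_index(start, start + length) is 2, so rdma_add_block() below
 * sets nb_chunks = 3; chunks 0 and 1 each span a full 1 MB, while
 * ram_chunk_end() clamps chunk 2 to local_host_addr + length.
 */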
553  
554  static void rdma_add_block(RDMAContext *rdma, const char *block_name,
555                             void *host_addr,
556                             ram_addr_t block_offset, uint64_t length)
557  {
558      RDMALocalBlocks *local = &rdma->local_ram_blocks;
559      RDMALocalBlock *block;
560      RDMALocalBlock *old = local->block;
561  
562      local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
563  
564      if (local->nb_blocks) {
565          if (rdma->blockmap) {
566              for (int x = 0; x < local->nb_blocks; x++) {
567                  g_hash_table_remove(rdma->blockmap,
568                                      (void *)(uintptr_t)old[x].offset);
569                  g_hash_table_insert(rdma->blockmap,
570                                      (void *)(uintptr_t)old[x].offset,
571                                      &local->block[x]);
572              }
573          }
574          memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
575          g_free(old);
576      }
577  
578      block = &local->block[local->nb_blocks];
579  
580      block->block_name = g_strdup(block_name);
581      block->local_host_addr = host_addr;
582      block->offset = block_offset;
583      block->length = length;
584      block->index = local->nb_blocks;
585      block->src_index = ~0U; /* Filled in by the receipt of the block list */
586      block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
587      block->transit_bitmap = bitmap_new(block->nb_chunks);
588      bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
589      block->unregister_bitmap = bitmap_new(block->nb_chunks);
590      bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
591      block->remote_keys = g_new0(uint32_t, block->nb_chunks);
592  
593      block->is_ram_block = local->init ? false : true;
594  
595      if (rdma->blockmap) {
596          g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
597      }
598  
599      trace_rdma_add_block(block_name, local->nb_blocks,
600                           (uintptr_t) block->local_host_addr,
601                           block->offset, block->length,
602                           (uintptr_t) (block->local_host_addr + block->length),
603                           BITS_TO_LONGS(block->nb_chunks) *
604                               sizeof(unsigned long) * 8,
605                           block->nb_chunks);
606  
607      local->nb_blocks++;
608  }
609  
610  /*
611   * Memory regions need to be registered with the device and queue pairs set up
612   * in advance before the migration starts. This tells us where the RAM blocks
613   * are so that we can register them individually.
614   */
615  static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
616  {
617      const char *block_name = qemu_ram_get_idstr(rb);
618      void *host_addr = qemu_ram_get_host_addr(rb);
619      ram_addr_t block_offset = qemu_ram_get_offset(rb);
620      ram_addr_t length = qemu_ram_get_used_length(rb);
621      rdma_add_block(opaque, block_name, host_addr, block_offset, length);
622      return 0;
623  }
624  
625  /*
626   * Identify the RAMBlocks and their quantity. They will be used to
627   * identify chunk boundaries inside each RAMBlock and also be referenced
628   * during dynamic page registration.
629   */
630  static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
631  {
632      RDMALocalBlocks *local = &rdma->local_ram_blocks;
633      int ret;
634  
635      assert(rdma->blockmap == NULL);
636      memset(local, 0, sizeof *local);
637      ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
638      assert(!ret);
639      trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
640      rdma->dest_blocks = g_new0(RDMADestBlock,
641                                 rdma->local_ram_blocks.nb_blocks);
642      local->init = true;
643  }
644  
645  /*
646   * Note: If used outside of cleanup, the caller must ensure that the destination
647   * block structures are also updated.
648   */
649  static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
650  {
651      RDMALocalBlocks *local = &rdma->local_ram_blocks;
652      RDMALocalBlock *old = local->block;
653  
654      if (rdma->blockmap) {
655          g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
656      }
657      if (block->pmr) {
658          for (int j = 0; j < block->nb_chunks; j++) {
659              if (!block->pmr[j]) {
660                  continue;
661              }
662              ibv_dereg_mr(block->pmr[j]);
663              rdma->total_registrations--;
664          }
665          g_free(block->pmr);
666          block->pmr = NULL;
667      }
668  
669      if (block->mr) {
670          ibv_dereg_mr(block->mr);
671          rdma->total_registrations--;
672          block->mr = NULL;
673      }
674  
675      g_free(block->transit_bitmap);
676      block->transit_bitmap = NULL;
677  
678      g_free(block->unregister_bitmap);
679      block->unregister_bitmap = NULL;
680  
681      g_free(block->remote_keys);
682      block->remote_keys = NULL;
683  
684      g_free(block->block_name);
685      block->block_name = NULL;
686  
687      if (rdma->blockmap) {
688          for (int x = 0; x < local->nb_blocks; x++) {
689              g_hash_table_remove(rdma->blockmap,
690                                  (void *)(uintptr_t)old[x].offset);
691          }
692      }
693  
694      if (local->nb_blocks > 1) {
695  
696          local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
697  
698          if (block->index) {
699              memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
700          }
701  
702          if (block->index < (local->nb_blocks - 1)) {
703              memcpy(local->block + block->index, old + (block->index + 1),
704                  sizeof(RDMALocalBlock) *
705                      (local->nb_blocks - (block->index + 1)));
706              for (int x = block->index; x < local->nb_blocks - 1; x++) {
707                  local->block[x].index--;
708              }
709          }
710      } else {
711          assert(block == local->block);
712          local->block = NULL;
713      }
714  
715      trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
716                             block->offset, block->length,
717                              (uintptr_t)(block->local_host_addr + block->length),
718                             BITS_TO_LONGS(block->nb_chunks) *
719                                 sizeof(unsigned long) * 8, block->nb_chunks);
720  
721      g_free(old);
722  
723      local->nb_blocks--;
724  
725      if (local->nb_blocks && rdma->blockmap) {
726          for (int x = 0; x < local->nb_blocks; x++) {
727              g_hash_table_insert(rdma->blockmap,
728                                  (void *)(uintptr_t)local->block[x].offset,
729                                  &local->block[x]);
730          }
731      }
732  }
733  
734  /*
735   * Trace RDMA device open, with device details.
736   */
737  static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
738  {
739      struct ibv_port_attr port;
740  
741      if (ibv_query_port(verbs, 1, &port)) {
742          trace_qemu_rdma_dump_id_failed(who);
743          return;
744      }
745  
746      trace_qemu_rdma_dump_id(who,
747                  verbs->device->name,
748                  verbs->device->dev_name,
749                  verbs->device->dev_path,
750                  verbs->device->ibdev_path,
751                  port.link_layer,
752                  port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband"
753                  : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet"
754                  : "Unknown");
755  }
756  
757  /*
758   * Trace RDMA gid addressing information.
759   * Useful for understanding the RDMA device hierarchy in the kernel.
760   */
761  static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
762  {
763      char sgid[33];
764      char dgid[33];
765      inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
766      inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
767      trace_qemu_rdma_dump_gid(who, sgid, dgid);
768  }
769  
770  /*
771   * As of now, IPv6 over RoCE / iWARP is not supported by linux.
772   * We will try the next addrinfo struct, and fail if there are
773   * no other valid addresses to bind against.
774   *
775   * If the user is listening on '[::]', then we will not have opened a device
776   * yet and have no way of verifying if the device is RoCE or not.
777   *
778   * In this case, the source VM will throw an error for ALL types of
779   * connections (both IPv4 and IPv6) if the destination machine does not have
780   * a regular infiniband network available for use.
781   *
782   * The only way to guarantee that an error is thrown for broken kernels is
783   * for the management software to choose a *specific* interface at bind time
784   * and validate what type of hardware it is.
785   *
786   * Unfortunately, this puts the user in a fix:
787   *
788   *  If the source VM connects with an IPv4 address without knowing that the
789   *  destination has bound to '[::]' the migration will unconditionally fail
790   *  unless the management software is explicitly listening on the IPv4
791   *  address while using a RoCE-based device.
792   *
793   *  If the source VM connects with an IPv6 address, then we're OK because we can
794   *  throw an error on the source (and similarly on the destination).
795   *
796   *  But in mixed environments, this will be broken for a while until it is fixed
797   *  inside linux.
798   *
799   * We do provide a *tiny* bit of help in this function: We can list all of the
800   * devices in the system and check to see if all the devices are RoCE or
801   * Infiniband.
802   *
803   * If we detect that we have a *pure* RoCE environment, then we can safely
804   * throw an error even if the management software has specified '[::]' as the
805   * bind address.
806   *
807   * However, if there are multiple heterogeneous devices, then we cannot make
808   * this assumption and the user just has to be sure they know what they are
809   * doing.
810   *
811   * Patches are being reviewed on linux-rdma.
812   */
813  static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
814  {
815      /* This bug only exists in linux, to our knowledge. */
816  #ifdef CONFIG_LINUX
817      struct ibv_port_attr port_attr;
818  
819      /*
820       * Verbs are only NULL if management has bound to '[::]'.
821       *
822       * Let's iterate through all the devices and see if there are any pure IB
823       * devices (non-ethernet).
824       *
825       * If not, then we can safely proceed with the migration.
826       * Otherwise, there are no guarantees until the bug is fixed in linux.
827       */
828      if (!verbs) {
829          int num_devices;
830          struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
831          bool roce_found = false;
832          bool ib_found = false;
833  
834          for (int x = 0; x < num_devices; x++) {
835              verbs = ibv_open_device(dev_list[x]);
836              /*
837               * ibv_open_device() is not documented to set errno.  If
838               * it does, it's somebody else's doc bug.  If it doesn't,
839               * the use of errno below is wrong.
840               * TODO Find out whether ibv_open_device() sets errno.
841               */
842              if (!verbs) {
843                  if (errno == EPERM) {
844                      continue;
845                  } else {
846                      error_setg_errno(errp, errno,
847                                       "could not open RDMA device context");
848                      return -1;
849                  }
850              }
851  
852              if (ibv_query_port(verbs, 1, &port_attr)) {
853                  ibv_close_device(verbs);
854                  error_setg(errp,
855                             "RDMA ERROR: Could not query initial IB port");
856                  return -1;
857              }
858  
859              if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
860                  ib_found = true;
861              } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
862                  roce_found = true;
863              }
864  
865              ibv_close_device(verbs);
866  
867          }
868  
869          if (roce_found) {
870              if (ib_found) {
871                  warn_report("migrations may fail:"
872                              " IPv6 over RoCE / iWARP in linux"
873                              " is broken. But since you appear to have a"
874                              " mixed RoCE / IB environment, be sure to only"
875                              " migrate over the IB fabric until the kernel "
876                              " fixes the bug.");
877              } else {
878                  error_setg(errp, "RDMA ERROR: "
879                             "You only have RoCE / iWARP devices in your systems"
880                             " and your management software has specified '[::]'"
881                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
882                  return -1;
883              }
884          }
885  
886          return 0;
887      }
888  
889      /*
890       * If we have a verbs context, that means that something other than '[::]'
891       * was used by the management software for binding, in which case we can
892       * actually warn the user about a potentially broken kernel.
893       */
894  
895      /* IB ports start with 1, not 0 */
896      if (ibv_query_port(verbs, 1, &port_attr)) {
897          error_setg(errp, "RDMA ERROR: Could not query initial IB port");
898          return -1;
899      }
900  
901      if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
902          error_setg(errp, "RDMA ERROR: "
903                     "Linux kernel's RoCE / iWARP does not support IPv6 "
904                     "(but patches on linux-rdma in progress)");
905          return -1;
906      }
907  
908  #endif
909  
910      return 0;
911  }
912  
913  /*
914   * Figure out which RDMA device corresponds to the requested IP hostname
915   * Also create the initial connection manager identifiers for opening
916   * the connection.
917   */
918  static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
919  {
920      Error *err = NULL;
921      int ret;
922      struct rdma_addrinfo *res;
923      char port_str[16];
924      struct rdma_cm_event *cm_event;
925      char ip[40] = "unknown";
926  
927      if (rdma->host == NULL || !strcmp(rdma->host, "")) {
928          error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
929          return -1;
930      }
931  
932      /* create CM channel */
933      rdma->channel = rdma_create_event_channel();
934      if (!rdma->channel) {
935          error_setg(errp, "RDMA ERROR: could not create CM channel");
936          return -1;
937      }
938  
939      /* create CM id */
940      ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
941      if (ret < 0) {
942          error_setg(errp, "RDMA ERROR: could not create channel id");
943          goto err_resolve_create_id;
944      }
945  
946      snprintf(port_str, 16, "%d", rdma->port);
947      port_str[15] = '\0';
948  
949      ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
950      if (ret) {
951          error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
952                     rdma->host);
953          goto err_resolve_get_addr;
954      }
955  
956      /* Try all addresses, saving the first error in @err */
957      for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) {
958          Error **local_errp = err ? NULL : &err;
959  
960          inet_ntop(e->ai_family,
961              &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
962          trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
963  
964          ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
965                  RDMA_RESOLVE_TIMEOUT_MS);
966          if (ret >= 0) {
967              if (e->ai_family == AF_INET6) {
968                  ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs,
969                                                     local_errp);
970                  if (ret < 0) {
971                      continue;
972                  }
973              }
974              error_free(err);
975              goto route;
976          }
977      }
978  
979      rdma_freeaddrinfo(res);
980      if (err) {
981          error_propagate(errp, err);
982      } else {
983          error_setg(errp, "RDMA ERROR: could not resolve address %s",
984                     rdma->host);
985      }
986      goto err_resolve_get_addr;
987  
988  route:
989      rdma_freeaddrinfo(res);
990      qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
991  
992      ret = rdma_get_cm_event(rdma->channel, &cm_event);
993      if (ret < 0) {
994          error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
995          goto err_resolve_get_addr;
996      }
997  
998      if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
999          error_setg(errp,
1000                     "RDMA ERROR: result not equal to event_addr_resolved %s",
1001                     rdma_event_str(cm_event->event));
1002          rdma_ack_cm_event(cm_event);
1003          goto err_resolve_get_addr;
1004      }
1005      rdma_ack_cm_event(cm_event);
1006  
1007      /* resolve route */
1008      ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1009      if (ret < 0) {
1010          error_setg(errp, "RDMA ERROR: could not resolve rdma route");
1011          goto err_resolve_get_addr;
1012      }
1013  
1014      ret = rdma_get_cm_event(rdma->channel, &cm_event);
1015      if (ret < 0) {
1016          error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
1017          goto err_resolve_get_addr;
1018      }
1019      if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1020          error_setg(errp, "RDMA ERROR: "
1021                     "result not equal to event_route_resolved: %s",
1022                     rdma_event_str(cm_event->event));
1023          rdma_ack_cm_event(cm_event);
1024          goto err_resolve_get_addr;
1025      }
1026      rdma_ack_cm_event(cm_event);
1027      rdma->verbs = rdma->cm_id->verbs;
1028      qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1029      qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1030      return 0;
1031  
1032  err_resolve_get_addr:
1033      rdma_destroy_id(rdma->cm_id);
1034      rdma->cm_id = NULL;
1035  err_resolve_create_id:
1036      rdma_destroy_event_channel(rdma->channel);
1037      rdma->channel = NULL;
1038      return -1;
1039  }
1040  
1041  /*
1042   * Create protection domain and completion queues
1043   */
1044  static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
1045  {
1046      /* allocate pd */
1047      rdma->pd = ibv_alloc_pd(rdma->verbs);
1048      if (!rdma->pd) {
1049          error_setg(errp, "failed to allocate protection domain");
1050          return -1;
1051      }
1052  
1053      /* create receive completion channel */
1054      rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1055      if (!rdma->recv_comp_channel) {
1056          error_setg(errp, "failed to allocate receive completion channel");
1057          goto err_alloc_pd_cq;
1058      }
1059  
1060      /*
1061       * Completion queue can be filled by read work requests.
1062       */
1063      rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1064                                    NULL, rdma->recv_comp_channel, 0);
1065      if (!rdma->recv_cq) {
1066          error_setg(errp, "failed to allocate receive completion queue");
1067          goto err_alloc_pd_cq;
1068      }
1069  
1070      /* create send completion channel */
1071      rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1072      if (!rdma->send_comp_channel) {
1073          error_setg(errp, "failed to allocate send completion channel");
1074          goto err_alloc_pd_cq;
1075      }
1076  
1077      rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1078                                    NULL, rdma->send_comp_channel, 0);
1079      if (!rdma->send_cq) {
1080          error_setg(errp, "failed to allocate send completion queue");
1081          goto err_alloc_pd_cq;
1082      }
1083  
1084      return 0;
1085  
1086  err_alloc_pd_cq:
1087      if (rdma->pd) {
1088          ibv_dealloc_pd(rdma->pd);
1089      }
1090      if (rdma->recv_comp_channel) {
1091          ibv_destroy_comp_channel(rdma->recv_comp_channel);
1092      }
1093      if (rdma->send_comp_channel) {
1094          ibv_destroy_comp_channel(rdma->send_comp_channel);
1095      }
1096      if (rdma->recv_cq) {
1097          ibv_destroy_cq(rdma->recv_cq);
1098          rdma->recv_cq = NULL;
1099      }
1100      rdma->pd = NULL;
1101      rdma->recv_comp_channel = NULL;
1102      rdma->send_comp_channel = NULL;
1103      return -1;
1104  
1105  }
1106  
1107  /*
1108   * Create queue pairs.
1109   */
1110  static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1111  {
1112      struct ibv_qp_init_attr attr = { 0 };
1113  
1114      attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1115      attr.cap.max_recv_wr = 3;
1116      attr.cap.max_send_sge = 1;
1117      attr.cap.max_recv_sge = 1;
1118      attr.send_cq = rdma->send_cq;
1119      attr.recv_cq = rdma->recv_cq;
1120      attr.qp_type = IBV_QPT_RC;
1121  
1122      if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) {
1123          return -1;
1124      }
1125  
1126      rdma->qp = rdma->cm_id->qp;
1127      return 0;
1128  }
1129  
1130  /* Check whether On-Demand Paging is supported by the RDMA device */
1131  static bool rdma_support_odp(struct ibv_context *dev)
1132  {
1133      struct ibv_device_attr_ex attr = {0};
1134  
1135      if (ibv_query_device_ex(dev, NULL, &attr)) {
1136          return false;
1137      }
1138  
1139      if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1140          return true;
1141      }
1142  
1143      return false;
1144  }
1145  
1146  /*
1147   * Use ibv_advise_mr to avoid RNR NAK errors as far as possible.
1148   * A responder MR registered with ODP will send an RNR NAK back to
1149   * the requester in the face of a page fault.
1150   */
1151  static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1152                                           uint32_t len,  uint32_t lkey,
1153                                           const char *name, bool wr)
1154  {
1155  #ifdef HAVE_IBV_ADVISE_MR
1156      int ret;
1157      int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1158                   IBV_ADVISE_MR_ADVICE_PREFETCH;
1159      struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1160  
1161      ret = ibv_advise_mr(pd, advice,
1162                          IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1163      /* ignore the error */
1164      trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret));
1165  #endif
1166  }
1167  
1168  static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
1169  {
1170      int i;
1171      RDMALocalBlocks *local = &rdma->local_ram_blocks;
1172  
1173      for (i = 0; i < local->nb_blocks; i++) {
1174          int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1175  
1176          local->block[i].mr =
1177              ibv_reg_mr(rdma->pd,
1178                      local->block[i].local_host_addr,
1179                      local->block[i].length, access
1180                      );
1181          /*
1182           * ibv_reg_mr() is not documented to set errno.  If it does,
1183           * it's somebody else's doc bug.  If it doesn't, the use of
1184           * errno below is wrong.
1185           * TODO Find out whether ibv_reg_mr() sets errno.
1186           */
1187          if (!local->block[i].mr &&
1188              errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1189                  access |= IBV_ACCESS_ON_DEMAND;
1190                  /* register ODP mr */
1191                  local->block[i].mr =
1192                      ibv_reg_mr(rdma->pd,
1193                                 local->block[i].local_host_addr,
1194                                 local->block[i].length, access);
1195                  trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1196  
1197                  if (local->block[i].mr) {
1198                      qemu_rdma_advise_prefetch_mr(rdma->pd,
1199                                      (uintptr_t)local->block[i].local_host_addr,
1200                                      local->block[i].length,
1201                                      local->block[i].mr->lkey,
1202                                      local->block[i].block_name,
1203                                      true);
1204                  }
1205          }
1206  
1207          if (!local->block[i].mr) {
1208              error_setg_errno(errp, errno,
1209                               "Failed to register local dest ram block!");
1210              goto err;
1211          }
1212          rdma->total_registrations++;
1213      }
1214  
1215      return 0;
1216  
1217  err:
1218      for (i--; i >= 0; i--) {
1219          ibv_dereg_mr(local->block[i].mr);
1220          local->block[i].mr = NULL;
1221          rdma->total_registrations--;
1222      }
1223  
1224      return -1;
1225  
1226  }
1227  
1228  /*
1229   * Find the ram block that corresponds to the page requested to be
1230   * transmitted by QEMU.
1231   *
1232   * Once the block is found, also identify which 'chunk' within that
1233   * block the page belongs to.
1234   */
1235  static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1236                                         uintptr_t block_offset,
1237                                         uint64_t offset,
1238                                         uint64_t length,
1239                                         uint64_t *block_index,
1240                                         uint64_t *chunk_index)
1241  {
1242      uint64_t current_addr = block_offset + offset;
1243      RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1244                                                  (void *) block_offset);
1245      assert(block);
1246      assert(current_addr >= block->offset);
1247      assert((current_addr + length) <= (block->offset + block->length));
1248  
1249      *block_index = block->index;
1250      *chunk_index = ram_chunk_index(block->local_host_addr,
1251                  block->local_host_addr + (current_addr - block->offset));
1252  }
1253  
1254  /*
1255   * Register a chunk with IB. If the chunk was already registered
1256   * previously, then skip.
1257   *
1258   * Also return the keys associated with the registration needed
1259   * to perform the actual RDMA operation.
1260   */
1261  static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1262          RDMALocalBlock *block, uintptr_t host_addr,
1263          uint32_t *lkey, uint32_t *rkey, int chunk,
1264          uint8_t *chunk_start, uint8_t *chunk_end)
1265  {
1266      if (block->mr) {
1267          if (lkey) {
1268              *lkey = block->mr->lkey;
1269          }
1270          if (rkey) {
1271              *rkey = block->mr->rkey;
1272          }
1273          return 0;
1274      }
1275  
1276      /* allocate memory to store chunk MRs */
1277      if (!block->pmr) {
1278          block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1279      }
1280  
1281      /*
1282       * If 'rkey', then we're the destination, so grant access to the source.
1283       *
1284       * If 'lkey', then we're the source VM, so grant access only to ourselves.
1285       */
1286      if (!block->pmr[chunk]) {
1287          uint64_t len = chunk_end - chunk_start;
1288          int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1289                       0;
1290  
1291          trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1292  
1293          block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1294          /*
1295           * ibv_reg_mr() is not documented to set errno.  If it does,
1296           * it's somebody else's doc bug.  If it doesn't, the use of
1297           * errno below is wrong.
1298           * TODO Find out whether ibv_reg_mr() sets errno.
1299           */
1300          if (!block->pmr[chunk] &&
1301              errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1302              access |= IBV_ACCESS_ON_DEMAND;
1303              /* register ODP mr */
1304              block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1305              trace_qemu_rdma_register_odp_mr(block->block_name);
1306  
1307              if (block->pmr[chunk]) {
1308                  qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1309                                              len, block->pmr[chunk]->lkey,
1310                                              block->block_name, rkey);
1311  
1312              }
1313          }
1314      }
1315      if (!block->pmr[chunk]) {
1316          return -1;
1317      }
1318      rdma->total_registrations++;
1319  
1320      if (lkey) {
1321          *lkey = block->pmr[chunk]->lkey;
1322      }
1323      if (rkey) {
1324          *rkey = block->pmr[chunk]->rkey;
1325      }
1326      return 0;
1327  }
1328  
1329  /*
1330   * Register (at connection time) the memory used for control
1331   * channel messages.
1332   */
1333  static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1334  {
1335      rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1336              rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1337              IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1338      if (rdma->wr_data[idx].control_mr) {
1339          rdma->total_registrations++;
1340          return 0;
1341      }
1342      return -1;
1343  }
1344  
1345  /*
1346   * Perform a non-optimized memory unregistration after every transfer
1347   * for demonstration purposes, only if pin-all is not requested.
1348   *
1349   * Potential optimizations:
1350   * 1. Start a new thread to run this function continuously
1351   *      - for bit clearing
1352   *      - and for receipt of unregister messages
1353   * 2. Use an LRU.
1354   * 3. Use workload hints.
1355   */
1356  static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1357  {
1358      Error *err = NULL;
1359  
1360      while (rdma->unregistrations[rdma->unregister_current]) {
1361          int ret;
1362          uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1363          uint64_t chunk =
1364              (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1365          uint64_t index =
1366              (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1367          RDMALocalBlock *block =
1368              &(rdma->local_ram_blocks.block[index]);
1369          RDMARegister reg = { .current_index = index };
1370          RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1371                                   };
1372          RDMAControlHeader head = { .len = sizeof(RDMARegister),
1373                                     .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1374                                     .repeat = 1,
1375                                   };
1376  
1377          trace_qemu_rdma_unregister_waiting_proc(chunk,
1378                                                  rdma->unregister_current);
1379  
1380          rdma->unregistrations[rdma->unregister_current] = 0;
1381          rdma->unregister_current++;
1382  
1383          if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1384              rdma->unregister_current = 0;
1385          }
1386  
1387  
1388          /*
1389           * Unregistration is speculative (because migration is single-threaded
1390           * and we cannot break the protocol's infiniband message ordering).
1391           * Thus, if the memory is currently being used for transmission,
1392           * then abort the attempt to unregister and try again
1393           * later the next time a completion is received for this memory.
1394           */
1395          clear_bit(chunk, block->unregister_bitmap);
1396  
1397          if (test_bit(chunk, block->transit_bitmap)) {
1398              trace_qemu_rdma_unregister_waiting_inflight(chunk);
1399              continue;
1400          }
1401  
1402          trace_qemu_rdma_unregister_waiting_send(chunk);
1403  
1404          ret = ibv_dereg_mr(block->pmr[chunk]);
1405          block->pmr[chunk] = NULL;
1406          block->remote_keys[chunk] = 0;
1407  
1408          if (ret != 0) {
1409              error_report("unregistration chunk failed: %s",
1410                           strerror(ret));
1411              return -1;
1412          }
1413          rdma->total_registrations--;
1414  
1415          reg.key.chunk = chunk;
1416          register_to_network(rdma, &reg);
1417          ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1418                                        &resp, NULL, NULL, &err);
1419          if (ret < 0) {
1420              error_report_err(err);
1421              return -1;
1422          }
1423  
1424          trace_qemu_rdma_unregister_waiting_complete(chunk);
1425      }
1426  
1427      return 0;
1428  }
1429  
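/*
 * Compose a 64-bit work request ID from the message type, the RAM block
 * index and the chunk number, using the RDMA_WRID_*_SHIFT layout defined
 * at the top of this file.
 */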
1430  static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1431                                           uint64_t chunk)
1432  {
1433      uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1434  
1435      result |= (index << RDMA_WRID_BLOCK_SHIFT);
1436      result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1437  
1438      return result;
1439  }
1440  
1441  /*
1442   * Consult the connection manager to see whether a work request
1443   * (of any kind) has completed.
1444   * Return the work request ID that completed.
1445   */
1446  static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1447                            uint64_t *wr_id_out, uint32_t *byte_len)
1448  {
1449      int ret;
1450      struct ibv_wc wc;
1451      uint64_t wr_id;
1452  
1453      ret = ibv_poll_cq(cq, 1, &wc);
1454  
1455      if (!ret) {
1456          *wr_id_out = RDMA_WRID_NONE;
1457          return 0;
1458      }
1459  
1460      if (ret < 0) {
1461          return -1;
1462      }
1463  
1464      wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1465  
1466      if (wc.status != IBV_WC_SUCCESS) {
1467          return -1;
1468      }
1469  
1470      if (rdma->control_ready_expected &&
1471          (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1472          trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1473                                    rdma->nb_sent);
1474          rdma->control_ready_expected = 0;
1475      }
1476  
1477      if (wr_id == RDMA_WRID_RDMA_WRITE) {
1478          uint64_t chunk =
1479              (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1480          uint64_t index =
1481              (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1482          RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1483  
1484          trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1485                                     index, chunk, block->local_host_addr,
1486                                     (void *)(uintptr_t)block->remote_host_addr);
1487  
1488          clear_bit(chunk, block->transit_bitmap);
1489  
1490          if (rdma->nb_sent > 0) {
1491              rdma->nb_sent--;
1492          }
1493      } else {
1494          trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1495      }
1496  
1497      *wr_id_out = wc.wr_id;
1498      if (byte_len) {
1499          *byte_len = wc.byte_len;
1500      }
1501  
1502      return 0;
1503  }
1504  
1505  /* Wait for activity on the completion channel.
1506   * Returns 0 on success, non-0 on error.
1507   */
1508  static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1509                                         struct ibv_comp_channel *comp_channel)
1510  {
1511      struct rdma_cm_event *cm_event;
1512  
1513      /*
1514       * Coroutine doesn't start until migration_fd_process_incoming()
1515       * so don't yield unless we know we're running inside of a coroutine.
1516       */
1517      if (rdma->migration_started_on_destination &&
1518          migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1519          yield_until_fd_readable(comp_channel->fd);
1520      } else {
1521          /* This is the source side (we're in a separate thread), or the
1522           * destination prior to migration_fd_process_incoming(); after
1523           * postcopy, the destination is also in a separate thread.
1524           * We can't yield, so we have to poll the fd.
1525           * But we need to be able to handle 'cancel' or an error
1526           * without hanging forever.
1527           */
1528          while (!rdma->errored && !rdma->received_error) {
1529              GPollFD pfds[2];
1530              pfds[0].fd = comp_channel->fd;
1531              pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1532              pfds[0].revents = 0;
1533  
1534              pfds[1].fd = rdma->channel->fd;
1535              pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1536              pfds[1].revents = 0;
1537  
1538              /* 0.1s timeout, should be fine for a 'cancel' */
1539              switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1540              case 2:
1541              case 1: /* fd active */
1542                  if (pfds[0].revents) {
1543                      return 0;
1544                  }
1545  
1546                  if (pfds[1].revents) {
1547                      if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
1548                          return -1;
1549                      }
1550  
1551                      if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1552                          cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1553                          rdma_ack_cm_event(cm_event);
1554                          return -1;
1555                      }
1556                      rdma_ack_cm_event(cm_event);
1557                  }
1558                  break;
1559  
1560              case 0: /* Timeout, go around again */
1561                  break;
1562  
1563              default: /* Error of some type -
1564                        * I don't trust errno from qemu_poll_ns
1565                       */
1566                  return -1;
1567              }
1568  
1569              if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1570                  /* Bail out and let the cancellation happen */
1571                  return -1;
1572              }
1573          }
1574      }
1575  
1576      if (rdma->received_error) {
1577          return -1;
1578      }
1579      return -rdma->errored;
1580  }
1581  
1582  static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
1583  {
1584      return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1585             rdma->recv_comp_channel;
1586  }
1587  
1588  static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
1589  {
1590      return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1591  }
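
/*
 * Both helpers above apply the same rule: wrids below RDMA_WRID_RECV_CONTROL
 * (i.e. RDMA_WRID_RDMA_WRITE and RDMA_WRID_SEND_CONTROL) resolve to the
 * send-side completion channel and CQ, while RDMA_WRID_RECV_CONTROL + idx
 * resolves to the receive side.
 */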
1592  
1593  /*
1594   * Block until the next work request has completed.
1595   *
1596   * First poll to see if a work request has already completed,
1597   * otherwise block.
1598   *
1599   * If we encounter completed work requests for IDs other than
1600   * the one we're interested in, then that's generally an error.
1601   *
1602   * The only exception is actual RDMA Write completions. These
1603   * completions only need to be recorded, but do not actually
1604   * need further processing.
1605   */
1606  static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1607                                      uint64_t wrid_requested,
1608                                      uint32_t *byte_len)
1609  {
1610      int num_cq_events = 0, ret;
1611      struct ibv_cq *cq;
1612      void *cq_ctx;
1613      uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1614      struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1615      struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1616  
1617      if (ibv_req_notify_cq(poll_cq, 0)) {
1618          return -1;
1619      }
1620      /* poll cq first */
1621      while (wr_id != wrid_requested) {
1622          ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1623          if (ret < 0) {
1624              return -1;
1625          }
1626  
1627          wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1628  
1629          if (wr_id == RDMA_WRID_NONE) {
1630              break;
1631          }
1632          if (wr_id != wrid_requested) {
1633              trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1634          }
1635      }
1636  
1637      if (wr_id == wrid_requested) {
1638          return 0;
1639      }
1640  
1641      while (1) {
1642          ret = qemu_rdma_wait_comp_channel(rdma, ch);
1643          if (ret < 0) {
1644              goto err_block_for_wrid;
1645          }
1646  
1647          ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1648          if (ret < 0) {
1649              goto err_block_for_wrid;
1650          }
1651  
1652          num_cq_events++;
1653  
1654          if (ibv_req_notify_cq(cq, 0)) {
1655              goto err_block_for_wrid;
1656          }
1657  
1658          while (wr_id != wrid_requested) {
1659              ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1660              if (ret < 0) {
1661                  goto err_block_for_wrid;
1662              }
1663  
1664              wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1665  
1666              if (wr_id == RDMA_WRID_NONE) {
1667                  break;
1668              }
1669              if (wr_id != wrid_requested) {
1670                  trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1671              }
1672          }
1673  
1674          if (wr_id == wrid_requested) {
1675              goto success_block_for_wrid;
1676          }
1677      }
1678  
1679  success_block_for_wrid:
1680      if (num_cq_events) {
1681          ibv_ack_cq_events(cq, num_cq_events);
1682      }
1683      return 0;
1684  
1685  err_block_for_wrid:
1686      if (num_cq_events) {
1687          ibv_ack_cq_events(cq, num_cq_events);
1688      }
1689  
1690      rdma->errored = true;
1691      return -1;
1692  }
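
/*
 * Illustrative usage sketch only, kept compiled out: this mirrors how the
 * callers below wait for completions, e.g. qemu_rdma_post_send_control()
 * blocks for its signalled SEND and qemu_rdma_exchange_get_response() blocks
 * for a RECV and its byte count.  example_wait_for_completions() is a
 * hypothetical helper.
 */
#if 0
static int example_wait_for_completions(RDMAContext *rdma, int idx)
{
    uint32_t byte_len;

    /* Block until the signalled control SEND has completed. */
    if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL) < 0) {
        return -1;
    }

    /* Block until the RECV posted into slot 'idx' completes; byte_len
     * then holds the size of the incoming control message. */
    return qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
                                    &byte_len);
}
#endif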
1693  
1694  /*
1695   * Post a SEND message work request for the control channel
1696   * containing some data and block until the post completes.
1697   */
1698  static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1699                                         RDMAControlHeader *head,
1700                                         Error **errp)
1701  {
1702      int ret;
1703      RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1704      struct ibv_send_wr *bad_wr;
1705      struct ibv_sge sge = {
1706                             .addr = (uintptr_t)(wr->control),
1707                             .length = head->len + sizeof(RDMAControlHeader),
1708                             .lkey = wr->control_mr->lkey,
1709                           };
1710      struct ibv_send_wr send_wr = {
1711                                     .wr_id = RDMA_WRID_SEND_CONTROL,
1712                                     .opcode = IBV_WR_SEND,
1713                                     .send_flags = IBV_SEND_SIGNALED,
1714                                     .sg_list = &sge,
1715                                     .num_sge = 1,
1716                                  };
1717  
1718      trace_qemu_rdma_post_send_control(control_desc(head->type));
1719  
1720      /*
1721       * We don't actually need to do a memcpy() in here if we used
1722       * the "sge" properly, but since we're only sending control messages
1723       * (not RAM in a performance-critical path), then it's OK for now.
1724       *
1725       * The copy makes the RDMAControlHeader simpler to manipulate
1726       * for the time being.
1727       */
1728      assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1729      memcpy(wr->control, head, sizeof(RDMAControlHeader));
1730      control_to_network((void *) wr->control);
1731  
1732      if (buf) {
1733          memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1734      }
1735  
1736  
1737      ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1738  
1739      if (ret > 0) {
1740          error_setg(errp, "Failed to post IB SEND for control");
1741          return -1;
1742      }
1743  
1744      ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1745      if (ret < 0) {
1746          error_setg(errp, "rdma migration: send polling control error");
1747          return -1;
1748      }
1749  
1750      return 0;
1751  }
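
/*
 * Layout of the control buffer built above, for reference:
 *
 *   wr->control: [ RDMAControlHeader (network byte order) | payload ]
 *   sge.length = sizeof(RDMAControlHeader) + head->len
 *
 * The payload is optional: a NULL 'buf' sends a header-only message, as the
 * READY message in qemu_rdma_exchange_recv() below does.
 */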
1752  
1753  /*
1754   * Post a RECV work request in anticipation of some future receipt
1755   * of data on the control channel.
1756   */
1757  static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
1758                                         Error **errp)
1759  {
1760      struct ibv_recv_wr *bad_wr;
1761      struct ibv_sge sge = {
1762                              .addr = (uintptr_t)(rdma->wr_data[idx].control),
1763                              .length = RDMA_CONTROL_MAX_BUFFER,
1764                              .lkey = rdma->wr_data[idx].control_mr->lkey,
1765                           };
1766  
1767      struct ibv_recv_wr recv_wr = {
1768                                      .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1769                                      .sg_list = &sge,
1770                                      .num_sge = 1,
1771                                   };
1772  
1773  
1774      if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1775          error_setg(errp, "error posting control recv");
1776          return -1;
1777      }
1778  
1779      return 0;
1780  }
1781  
1782  /*
1783   * Block and wait for a RECV control channel message to arrive.
1784   */
1785  static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1786                  RDMAControlHeader *head, uint32_t expecting, int idx,
1787                  Error **errp)
1788  {
1789      uint32_t byte_len;
1790      int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1791                                         &byte_len);
1792  
1793      if (ret < 0) {
1794          error_setg(errp, "rdma migration: recv polling control error!");
1795          return -1;
1796      }
1797  
1798      network_to_control((void *) rdma->wr_data[idx].control);
1799      memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1800  
1801      trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1802  
1803      if (expecting == RDMA_CONTROL_NONE) {
1804          trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1805                                               head->type);
1806      } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1807          error_setg(errp, "Was expecting a %s (%d) control message"
1808                  ", but got: %s (%d), length: %d",
1809                  control_desc(expecting), expecting,
1810                  control_desc(head->type), head->type, head->len);
1811          if (head->type == RDMA_CONTROL_ERROR) {
1812              rdma->received_error = true;
1813          }
1814          return -1;
1815      }
1816      if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1817          error_setg(errp, "too long length: %d", head->len);
1818          return -1;
1819      }
1820      if (sizeof(*head) + head->len != byte_len) {
1821          error_setg(errp, "Malformed length: %d byte_len %d",
1822                     head->len, byte_len);
1823          return -1;
1824      }
1825  
1826      return 0;
1827  }
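
/*
 * The two length checks above enforce, for a completed RECV of byte_len
 * bytes, that:
 *
 *   head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)
 *   sizeof(*head) + head->len == byte_len
 *
 * i.e. the advertised payload must both fit in the posted buffer and match
 * what the hardware actually delivered.
 */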
1828  
1829  /*
1830   * When a RECV work request has completed, the work request's
1831   * buffer is pointed at the header.
1832   *
1833   * This will advance the pointer to the data portion
1834   * of the control message of the work request's buffer that
1835   * was populated after the work request finished.
1836   */
1837  static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1838                                    RDMAControlHeader *head)
1839  {
1840      rdma->wr_data[idx].control_len = head->len;
1841      rdma->wr_data[idx].control_curr =
1842          rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1843  }
1844  
1845  /*
1846   * This is an 'atomic' high-level operation to deliver a single, unified
1847   * control-channel message.
1848   *
1849   * Additionally, if the user is expecting some kind of reply to this message,
1850   * they can request a 'resp' response message be filled in by posting an
1851   * additional work request on behalf of the user and waiting for an additional
1852   * completion.
1853   *
1854   * The extra (optional) response is used during registration to save us from
1855   * having to perform an *additional* exchange of messages just to provide a
1856   * response, by instead piggy-backing on the acknowledgement.
1857   */
1858  static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1859                                     uint8_t *data, RDMAControlHeader *resp,
1860                                     int *resp_idx,
1861                                     int (*callback)(RDMAContext *rdma,
1862                                                     Error **errp),
1863                                     Error **errp)
1864  {
1865      int ret;
1866  
1867      /*
1868       * Wait until the dest is ready before attempting to deliver the message
1869       * by waiting for a READY message.
1870       */
1871      if (rdma->control_ready_expected) {
1872          RDMAControlHeader resp_ignored;
1873  
1874          ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1875                                                RDMA_CONTROL_READY,
1876                                                RDMA_WRID_READY, errp);
1877          if (ret < 0) {
1878              return -1;
1879          }
1880      }
1881  
1882      /*
1883       * If the user is expecting a response, post a WR in anticipation of it.
1884       */
1885      if (resp) {
1886          ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
1887          if (ret < 0) {
1888              return -1;
1889          }
1890      }
1891  
1892      /*
1893       * Post a WR to replace the one we just consumed for the READY message.
1894       */
1895      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1896      if (ret < 0) {
1897          return -1;
1898      }
1899  
1900      /*
1901       * Deliver the control message that was requested.
1902       */
1903      ret = qemu_rdma_post_send_control(rdma, data, head, errp);
1904  
1905      if (ret < 0) {
1906          return -1;
1907      }
1908  
1909      /*
1910       * If we're expecting a response, block and wait for it.
1911       */
1912      if (resp) {
1913          if (callback) {
1914              trace_qemu_rdma_exchange_send_issue_callback();
1915              ret = callback(rdma, errp);
1916              if (ret < 0) {
1917                  return -1;
1918              }
1919          }
1920  
1921          trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1922          ret = qemu_rdma_exchange_get_response(rdma, resp,
1923                                                resp->type, RDMA_WRID_DATA,
1924                                                errp);
1925  
1926          if (ret < 0) {
1927              return -1;
1928          }
1929  
1930          qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1931          if (resp_idx) {
1932              *resp_idx = RDMA_WRID_DATA;
1933          }
1934          trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1935      }
1936  
1937      rdma->control_ready_expected = 1;
1938  
1939      return 0;
1940  }
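
/*
 * Illustrative usage sketch only, kept compiled out: a request with a
 * piggy-backed response, in the style of the registration exchange performed
 * by qemu_rdma_write_one() below.  example_register_chunk() is a
 * hypothetical helper.
 */
#if 0
static int example_register_chunk(RDMAContext *rdma, RDMARegister *reg,
                                  Error **errp)
{
    RDMAControlHeader head = { .len = sizeof(*reg),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    int resp_idx;

    register_to_network(rdma, reg);
    if (qemu_rdma_exchange_send(rdma, &head, (uint8_t *) reg,
                                &resp, &resp_idx, NULL, errp) < 0) {
        return -1;
    }

    /* The response payload now sits at rdma->wr_data[resp_idx].control_curr. */
    return 0;
}
#endif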
1941  
1942  /*
1943   * This is an 'atomic' high-level operation to receive a single, unified
1944   * control-channel message.
1945   */
1946  static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1947                                     uint32_t expecting, Error **errp)
1948  {
1949      RDMAControlHeader ready = {
1950                                  .len = 0,
1951                                  .type = RDMA_CONTROL_READY,
1952                                  .repeat = 1,
1953                                };
1954      int ret;
1955  
1956      /*
1957       * Inform the source that we're ready to receive a message.
1958       */
1959      ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
1960  
1961      if (ret < 0) {
1962          return -1;
1963      }
1964  
1965      /*
1966       * Block and wait for the message.
1967       */
1968      ret = qemu_rdma_exchange_get_response(rdma, head,
1969                                            expecting, RDMA_WRID_READY, errp);
1970  
1971      if (ret < 0) {
1972          return -1;
1973      }
1974  
1975      qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1976  
1977      /*
1978       * Post a new RECV work request to replace the one we just consumed.
1979       */
1980      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1981      if (ret < 0) {
1982          return -1;
1983      }
1984  
1985      return 0;
1986  }
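
/*
 * Illustrative usage sketch only, kept compiled out: the destination-side
 * pattern for pulling one QEMUFile control message, as done by
 * qio_channel_rdma_readv() further below.  example_recv_qemu_file() is a
 * hypothetical helper.
 */
#if 0
static int example_recv_qemu_file(RDMAContext *rdma, Error **errp)
{
    RDMAControlHeader head;

    if (qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
                                errp) < 0) {
        return -1;
    }

    /* head.len bytes of file data can now be drained via qemu_rdma_fill(). */
    return 0;
}
#endif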
1987  
1988  /*
1989   * Write an actual chunk of memory using RDMA.
1990   *
1991   * If we're using dynamic registration on the dest-side, we have to
1992   * send a registration command first.
1993   */
1994  static int qemu_rdma_write_one(RDMAContext *rdma,
1995                                 int current_index, uint64_t current_addr,
1996                                 uint64_t length, Error **errp)
1997  {
1998      struct ibv_sge sge;
1999      struct ibv_send_wr send_wr = { 0 };
2000      struct ibv_send_wr *bad_wr;
2001      int reg_result_idx, ret, count = 0;
2002      uint64_t chunk, chunks;
2003      uint8_t *chunk_start, *chunk_end;
2004      RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2005      RDMARegister reg;
2006      RDMARegisterResult *reg_result;
2007      RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2008      RDMAControlHeader head = { .len = sizeof(RDMARegister),
2009                                 .type = RDMA_CONTROL_REGISTER_REQUEST,
2010                                 .repeat = 1,
2011                               };
2012  
2013  retry:
2014      sge.addr = (uintptr_t)(block->local_host_addr +
2015                              (current_addr - block->offset));
2016      sge.length = length;
2017  
2018      chunk = ram_chunk_index(block->local_host_addr,
2019                              (uint8_t *)(uintptr_t)sge.addr);
2020      chunk_start = ram_chunk_start(block, chunk);
2021  
2022      if (block->is_ram_block) {
2023          chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2024  
2025          if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2026              chunks--;
2027          }
2028      } else {
2029          chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2030  
2031          if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2032              chunks--;
2033          }
2034      }
2035  
2036      trace_qemu_rdma_write_one_top(chunks + 1,
2037                                    (chunks + 1) *
2038                                    (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2039  
2040      chunk_end = ram_chunk_end(block, chunk + chunks);
2041  
2042  
2043      while (test_bit(chunk, block->transit_bitmap)) {
2044          (void)count;
2045          trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2046                  sge.addr, length, rdma->nb_sent, block->nb_chunks);
2047  
2048          ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2049  
2050          if (ret < 0) {
2051              error_setg(errp, "Failed to wait for previous write to complete "
2052                      "block %d chunk %" PRIu64
2053                      " current %" PRIu64 " len %" PRIu64 " %d",
2054                      current_index, chunk, sge.addr, length, rdma->nb_sent);
2055              return -1;
2056          }
2057      }
2058  
2059      if (!rdma->pin_all || !block->is_ram_block) {
2060          if (!block->remote_keys[chunk]) {
2061              /*
2062               * This chunk has not yet been registered, so first check to see
2063               * if the entire chunk is zero. If so, tell the other side to
2064               * memset() + madvise() the entire chunk without RDMA.
2065               */
2066  
2067              if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2068                  RDMACompress comp = {
2069                                          .offset = current_addr,
2070                                          .value = 0,
2071                                          .block_idx = current_index,
2072                                          .length = length,
2073                                      };
2074  
2075                  head.len = sizeof(comp);
2076                  head.type = RDMA_CONTROL_COMPRESS;
2077  
2078                  trace_qemu_rdma_write_one_zero(chunk, sge.length,
2079                                                 current_index, current_addr);
2080  
2081                  compress_to_network(rdma, &comp);
2082                  ret = qemu_rdma_exchange_send(rdma, &head,
2083                                  (uint8_t *) &comp, NULL, NULL, NULL, errp);
2084  
2085                  if (ret < 0) {
2086                      return -1;
2087                  }
2088  
2089                  /*
2090                   * TODO: Here we are sending something, but we are not
2091                   * accounting for anything transferred.  The following is wrong:
2092                   *
2093                   * stat64_add(&mig_stats.rdma_bytes, sge.length);
2094                   *
2095                   * because we are using some kind of compression.  I
2096                   * would think that head.len would be closer to a
2097                   * correct value.
2098                   */
2099                  stat64_add(&mig_stats.zero_pages,
2100                             sge.length / qemu_target_page_size());
2101                  return 1;
2102              }
2103  
2104              /*
2105               * Otherwise, tell other side to register.
2106               */
2107              reg.current_index = current_index;
2108              if (block->is_ram_block) {
2109                  reg.key.current_addr = current_addr;
2110              } else {
2111                  reg.key.chunk = chunk;
2112              }
2113              reg.chunks = chunks;
2114  
2115              trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2116                                                current_addr);
2117  
2118              register_to_network(rdma, &reg);
2119              ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2120                                      &resp, &reg_result_idx, NULL, errp);
2121              if (ret < 0) {
2122                  return -1;
2123              }
2124  
2125              /* try to overlap this single registration with the one we sent. */
2126              if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2127                                                  &sge.lkey, NULL, chunk,
2128                                                  chunk_start, chunk_end)) {
2129                  error_setg(errp, "cannot get lkey");
2130                  return -1;
2131              }
2132  
2133              reg_result = (RDMARegisterResult *)
2134                      rdma->wr_data[reg_result_idx].control_curr;
2135  
2136              network_to_result(reg_result);
2137  
2138              trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2139                                                   reg_result->rkey, chunk);
2140  
2141              block->remote_keys[chunk] = reg_result->rkey;
2142              block->remote_host_addr = reg_result->host_addr;
2143          } else {
2144              /* already registered before */
2145              if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2146                                                  &sge.lkey, NULL, chunk,
2147                                                  chunk_start, chunk_end)) {
2148                  error_setg(errp, "cannot get lkey!");
2149                  return -1;
2150              }
2151          }
2152  
2153          send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2154      } else {
2155          send_wr.wr.rdma.rkey = block->remote_rkey;
2156  
2157          if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2158                                                       &sge.lkey, NULL, chunk,
2159                                                       chunk_start, chunk_end)) {
2160              error_setg(errp, "cannot get lkey!");
2161              return -1;
2162          }
2163      }
2164  
2165      /*
2166       * Encode the ram block index and chunk within this wrid.
2167       * We will use this information at the time of completion
2168       * to figure out which bitmap to check against and then which
2169       * chunk in the bitmap to look for.
2170       */
2171      send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2172                                          current_index, chunk);
2173  
2174      send_wr.opcode = IBV_WR_RDMA_WRITE;
2175      send_wr.send_flags = IBV_SEND_SIGNALED;
2176      send_wr.sg_list = &sge;
2177      send_wr.num_sge = 1;
2178      send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2179                                  (current_addr - block->offset);
2180  
2181      trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2182                                     sge.length);
2183  
2184      /*
2185       * ibv_post_send() does not return negative error numbers,
2186       * per the specification they are positive - no idea why.
2187       */
2188      ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2189  
2190      if (ret == ENOMEM) {
2191          trace_qemu_rdma_write_one_queue_full();
2192          ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2193          if (ret < 0) {
2194              error_setg(errp, "rdma migration: failed to make "
2195                           "room in full send queue!");
2196              return -1;
2197          }
2198  
2199          goto retry;
2200  
2201      } else if (ret > 0) {
2202          error_setg_errno(errp, ret,
2203                           "rdma migration: post rdma write failed");
2204          return -1;
2205      }
2206  
2207      set_bit(chunk, block->transit_bitmap);
2208      stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
2209      /*
2210       * We are adding to transferred the amount of data written, but no
2211       * overhead at all.  I will assume that RDMA is magical and doesn't
2212       * need to transfer (at least) the addresses where it wants to
2213       * write the pages.  Here it looks like it should be something
2214       * like:
2215       *     sizeof(send_wr) + sge.length
2216       * but this being RDMA, who knows.
2217       */
2218      stat64_add(&mig_stats.rdma_bytes, sge.length);
2219      ram_transferred_add(sge.length);
2220      rdma->total_writes++;
2221  
2222      return 0;
2223  }
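
/*
 * Worked example of the chunk arithmetic above, taking a chunk size of
 * (1UL << RDMA_REG_CHUNK_SHIFT) bytes, i.e. 1 MB with the current shift,
 * and assuming the write starts at a chunk boundary:
 *
 *   length = 1 MB   -> chunks = 1, then decremented to 0 (one chunk spanned)
 *   length = 1.5 MB -> chunks = 1 (two chunks spanned: chunk and chunk + 1)
 *
 * so 'chunk + chunks' is the index of the last chunk touched by the write,
 * which is what ram_chunk_end() is asked for.
 */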
2224  
2225  /*
2226   * Push out any unwritten RDMA operations.
2227   *
2228   * We support sending out multiple chunks at the same time.
2229   * Not all of them need to get signaled in the completion queue.
2230   */
2231  static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
2232  {
2233      int ret;
2234  
2235      if (!rdma->current_length) {
2236          return 0;
2237      }
2238  
2239      ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
2240                                rdma->current_length, errp);
2241  
2242      if (ret < 0) {
2243          return -1;
2244      }
2245  
2246      if (ret == 0) {
2247          rdma->nb_sent++;
2248          trace_qemu_rdma_write_flush(rdma->nb_sent);
2249      }
2250  
2251      rdma->current_length = 0;
2252      rdma->current_addr = 0;
2253  
2254      return 0;
2255  }
2256  
2257  static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
2258                      uint64_t offset, uint64_t len)
2259  {
2260      RDMALocalBlock *block;
2261      uint8_t *host_addr;
2262      uint8_t *chunk_end;
2263  
2264      if (rdma->current_index < 0) {
2265          return false;
2266      }
2267  
2268      if (rdma->current_chunk < 0) {
2269          return false;
2270      }
2271  
2272      block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2273      host_addr = block->local_host_addr + (offset - block->offset);
2274      chunk_end = ram_chunk_end(block, rdma->current_chunk);
2275  
2276      if (rdma->current_length == 0) {
2277          return false;
2278      }
2279  
2280      /*
2281       * Only merge into chunk sequentially.
2282       */
2283      if (offset != (rdma->current_addr + rdma->current_length)) {
2284          return false;
2285      }
2286  
2287      if (offset < block->offset) {
2288          return false;
2289      }
2290  
2291      if ((offset + len) > (block->offset + block->length)) {
2292          return false;
2293      }
2294  
2295      if ((host_addr + len) > chunk_end) {
2296          return false;
2297      }
2298  
2299      return true;
2300  }
2301  
2302  /*
2303   * We're not actually writing here, but doing three things:
2304   *
2305   * 1. Identify the chunk the buffer belongs to.
2306   * 2. If the chunk is full or the buffer doesn't belong to the current
2307   *    chunk, then start a new chunk and flush() the old chunk.
2308   * 3. To keep the hardware busy, we also group chunks into batches
2309   *    and only require that a batch gets acknowledged in the completion
2310   *    queue instead of each individual chunk.
2311   */
2312  static int qemu_rdma_write(RDMAContext *rdma,
2313                             uint64_t block_offset, uint64_t offset,
2314                             uint64_t len, Error **errp)
2315  {
2316      uint64_t current_addr = block_offset + offset;
2317      uint64_t index = rdma->current_index;
2318      uint64_t chunk = rdma->current_chunk;
2319  
2320      /* If we cannot merge it, we flush the current buffer first. */
2321      if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2322          if (qemu_rdma_write_flush(rdma, errp) < 0) {
2323              return -1;
2324          }
2325          rdma->current_length = 0;
2326          rdma->current_addr = current_addr;
2327  
2328          qemu_rdma_search_ram_block(rdma, block_offset,
2329                                     offset, len, &index, &chunk);
2330          rdma->current_index = index;
2331          rdma->current_chunk = chunk;
2332      }
2333  
2334      /* merge it */
2335      rdma->current_length += len;
2336  
2337      /* flush it if buffer is too large */
2338      if (rdma->current_length >= RDMA_MERGE_MAX) {
2339          return qemu_rdma_write_flush(rdma, errp);
2340      }
2341  
2342      return 0;
2343  }
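
/*
 * Illustrative sketch only, kept compiled out, of the merge/flush behaviour
 * above: sequential writes inside one chunk coalesce into a single pending
 * buffer, and anything that cannot be merged forces a flush first.
 * example_merge_two_pages() is a hypothetical helper and assumes both pages
 * fall inside the same chunk of the same RAM block.
 */
#if 0
static int example_merge_two_pages(RDMAContext *rdma, uint64_t block_offset,
                                   uint64_t page_size, Error **errp)
{
    /* The first page starts a new pending buffer... */
    if (qemu_rdma_write(rdma, block_offset, 0, page_size, errp) < 0) {
        return -1;
    }
    /* ...and the adjacent page merges into it without posting a WR yet. */
    if (qemu_rdma_write(rdma, block_offset, page_size, page_size, errp) < 0) {
        return -1;
    }
    /* A non-adjacent offset (or a full buffer) would flush instead. */
    return qemu_rdma_write_flush(rdma, errp);
}
#endif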
2344  
2345  static void qemu_rdma_cleanup(RDMAContext *rdma)
2346  {
2347      Error *err = NULL;
2348  
2349      if (rdma->cm_id && rdma->connected) {
2350          if ((rdma->errored ||
2351               migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2352              !rdma->received_error) {
2353              RDMAControlHeader head = { .len = 0,
2354                                         .type = RDMA_CONTROL_ERROR,
2355                                         .repeat = 1,
2356                                       };
2357              warn_report("Early error. Sending error.");
2358              if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) {
2359                  warn_report_err(err);
2360              }
2361          }
2362  
2363          rdma_disconnect(rdma->cm_id);
2364          trace_qemu_rdma_cleanup_disconnect();
2365          rdma->connected = false;
2366      }
2367  
2368      if (rdma->channel) {
2369          qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2370      }
2371      g_free(rdma->dest_blocks);
2372      rdma->dest_blocks = NULL;
2373  
2374      for (int i = 0; i < RDMA_WRID_MAX; i++) {
2375          if (rdma->wr_data[i].control_mr) {
2376              rdma->total_registrations--;
2377              ibv_dereg_mr(rdma->wr_data[i].control_mr);
2378          }
2379          rdma->wr_data[i].control_mr = NULL;
2380      }
2381  
2382      if (rdma->local_ram_blocks.block) {
2383          while (rdma->local_ram_blocks.nb_blocks) {
2384              rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2385          }
2386      }
2387  
2388      if (rdma->qp) {
2389          rdma_destroy_qp(rdma->cm_id);
2390          rdma->qp = NULL;
2391      }
2392      if (rdma->recv_cq) {
2393          ibv_destroy_cq(rdma->recv_cq);
2394          rdma->recv_cq = NULL;
2395      }
2396      if (rdma->send_cq) {
2397          ibv_destroy_cq(rdma->send_cq);
2398          rdma->send_cq = NULL;
2399      }
2400      if (rdma->recv_comp_channel) {
2401          ibv_destroy_comp_channel(rdma->recv_comp_channel);
2402          rdma->recv_comp_channel = NULL;
2403      }
2404      if (rdma->send_comp_channel) {
2405          ibv_destroy_comp_channel(rdma->send_comp_channel);
2406          rdma->send_comp_channel = NULL;
2407      }
2408      if (rdma->pd) {
2409          ibv_dealloc_pd(rdma->pd);
2410          rdma->pd = NULL;
2411      }
2412      if (rdma->cm_id) {
2413          rdma_destroy_id(rdma->cm_id);
2414          rdma->cm_id = NULL;
2415      }
2416  
2417      /* on the destination side, listen_id and channel are shared */
2418      if (rdma->listen_id) {
2419          if (!rdma->is_return_path) {
2420              rdma_destroy_id(rdma->listen_id);
2421          }
2422          rdma->listen_id = NULL;
2423  
2424          if (rdma->channel) {
2425              if (!rdma->is_return_path) {
2426                  rdma_destroy_event_channel(rdma->channel);
2427              }
2428              rdma->channel = NULL;
2429          }
2430      }
2431  
2432      if (rdma->channel) {
2433          rdma_destroy_event_channel(rdma->channel);
2434          rdma->channel = NULL;
2435      }
2436      g_free(rdma->host);
2437      rdma->host = NULL;
2438  }
2439  
2440  
2441  static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2442  {
2443      int ret;
2444  
2445      /*
2446       * Will be validated against destination's actual capabilities
2447       * after the connect() completes.
2448       */
2449      rdma->pin_all = pin_all;
2450  
2451      ret = qemu_rdma_resolve_host(rdma, errp);
2452      if (ret < 0) {
2453          goto err_rdma_source_init;
2454      }
2455  
2456      ret = qemu_rdma_alloc_pd_cq(rdma, errp);
2457      if (ret < 0) {
2458          goto err_rdma_source_init;
2459      }
2460  
2461      ret = qemu_rdma_alloc_qp(rdma);
2462      if (ret < 0) {
2463          error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2464          goto err_rdma_source_init;
2465      }
2466  
2467      qemu_rdma_init_ram_blocks(rdma);
2468  
2469      /* Build the hash that maps from offset to RAMBlock */
2470      rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2471      for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
2472          g_hash_table_insert(rdma->blockmap,
2473                  (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset,
2474                  &rdma->local_ram_blocks.block[i]);
2475      }
2476  
2477      for (int i = 0; i < RDMA_WRID_MAX; i++) {
2478          ret = qemu_rdma_reg_control(rdma, i);
2479          if (ret < 0) {
2480              error_setg(errp, "RDMA ERROR: rdma migration: error "
2481                         "registering %d control!", i);
2482              goto err_rdma_source_init;
2483          }
2484      }
2485  
2486      return 0;
2487  
2488  err_rdma_source_init:
2489      qemu_rdma_cleanup(rdma);
2490      return -1;
2491  }
2492  
2493  static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2494                                       struct rdma_cm_event **cm_event,
2495                                       long msec, Error **errp)
2496  {
2497      int ret;
2498      struct pollfd poll_fd = {
2499                                  .fd = rdma->channel->fd,
2500                                  .events = POLLIN,
2501                                  .revents = 0
2502                              };
2503  
2504      do {
2505          ret = poll(&poll_fd, 1, msec);
2506      } while (ret < 0 && errno == EINTR);
2507  
2508      if (ret == 0) {
2509          error_setg(errp, "RDMA ERROR: poll cm event timeout");
2510          return -1;
2511      } else if (ret < 0) {
2512          error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
2513                     errno);
2514          return -1;
2515      } else if (poll_fd.revents & POLLIN) {
2516          if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
2517              error_setg(errp, "RDMA ERROR: failed to get cm event");
2518              return -1;
2519          }
2520          return 0;
2521      } else {
2522          error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
2523                     poll_fd.revents);
2524          return -1;
2525      }
2526  }
2527  
2528  static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
2529                               Error **errp)
2530  {
2531      RDMACapabilities cap = {
2532                                  .version = RDMA_CONTROL_VERSION_CURRENT,
2533                                  .flags = 0,
2534                             };
2535      struct rdma_conn_param conn_param = { .initiator_depth = 2,
2536                                            .retry_count = 5,
2537                                            .private_data = &cap,
2538                                            .private_data_len = sizeof(cap),
2539                                          };
2540      struct rdma_cm_event *cm_event;
2541      int ret;
2542  
2543      /*
2544       * Only negotiate the capability with the destination if the user
2545       * on the source first requested the capability.
2546       */
2547      if (rdma->pin_all) {
2548          trace_qemu_rdma_connect_pin_all_requested();
2549          cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2550      }
2551  
2552      caps_to_network(&cap);
2553  
2554      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
2555      if (ret < 0) {
2556          goto err_rdma_source_connect;
2557      }
2558  
2559      ret = rdma_connect(rdma->cm_id, &conn_param);
2560      if (ret < 0) {
2561          error_setg_errno(errp, errno,
2562                           "RDMA ERROR: connecting to destination!");
2563          goto err_rdma_source_connect;
2564      }
2565  
2566      if (return_path) {
2567          ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2568      } else {
2569          ret = rdma_get_cm_event(rdma->channel, &cm_event);
2570          if (ret < 0) {
2571              error_setg_errno(errp, errno,
2572                               "RDMA ERROR: failed to get cm event");
2573          }
2574      }
2575      if (ret < 0) {
2576          goto err_rdma_source_connect;
2577      }
2578  
2579      if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2580          error_setg(errp, "RDMA ERROR: connecting to destination!");
2581          rdma_ack_cm_event(cm_event);
2582          goto err_rdma_source_connect;
2583      }
2584      rdma->connected = true;
2585  
2586      memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2587      network_to_caps(&cap);
2588  
2589      /*
2590       * Verify that the *requested* capabilities are supported by the destination
2591       * and disable them otherwise.
2592       */
2593      if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2594          warn_report("RDMA: Server cannot support pinning all memory. "
2595                      "Will register memory dynamically.");
2596          rdma->pin_all = false;
2597      }
2598  
2599      trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2600  
2601      rdma_ack_cm_event(cm_event);
2602  
2603      rdma->control_ready_expected = 1;
2604      rdma->nb_sent = 0;
2605      return 0;
2606  
2607  err_rdma_source_connect:
2608      qemu_rdma_cleanup(rdma);
2609      return -1;
2610  }
2611  
2612  static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2613  {
2614      Error *err = NULL;
2615      int ret;
2616      struct rdma_cm_id *listen_id;
2617      char ip[40] = "unknown";
2618      struct rdma_addrinfo *res, *e;
2619      char port_str[16];
2620      int reuse = 1;
2621  
2622      for (int i = 0; i < RDMA_WRID_MAX; i++) {
2623          rdma->wr_data[i].control_len = 0;
2624          rdma->wr_data[i].control_curr = NULL;
2625      }
2626  
2627      if (!rdma->host || !rdma->host[0]) {
2628          error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2629          rdma->errored = true;
2630          return -1;
2631      }
2632      /* create CM channel */
2633      rdma->channel = rdma_create_event_channel();
2634      if (!rdma->channel) {
2635          error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2636          rdma->errored = true;
2637          return -1;
2638      }
2639  
2640      /* create CM id */
2641      ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2642      if (ret < 0) {
2643          error_setg(errp, "RDMA ERROR: could not create cm_id!");
2644          goto err_dest_init_create_listen_id;
2645      }
2646  
2647      snprintf(port_str, 16, "%d", rdma->port);
2648      port_str[15] = '\0';
2649  
2650      ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2651      if (ret) {
2652          error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2653                     rdma->host);
2654          goto err_dest_init_bind_addr;
2655      }
2656  
2657      ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2658                            &reuse, sizeof reuse);
2659      if (ret < 0) {
2660          error_setg(errp, "RDMA ERROR: could not set REUSEADDR option");
2661          goto err_dest_init_bind_addr;
2662      }
2663  
2664      /* Try all addresses, saving the first error in @err */
2665      for (e = res; e != NULL; e = e->ai_next) {
2666          Error **local_errp = err ? NULL : &err;
2667  
2668          inet_ntop(e->ai_family,
2669              &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2670          trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2671          ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2672          if (ret < 0) {
2673              continue;
2674          }
2675          if (e->ai_family == AF_INET6) {
2676              ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs,
2677                                                 local_errp);
2678              if (ret < 0) {
2679                  continue;
2680              }
2681          }
2682          error_free(err);
2683          break;
2684      }
2685  
2686      rdma_freeaddrinfo(res);
2687      if (!e) {
2688          if (err) {
2689              error_propagate(errp, err);
2690          } else {
2691              error_setg(errp, "RDMA ERROR: could not rdma_bind_addr!");
2692          }
2693          goto err_dest_init_bind_addr;
2694      }
2695  
2696      rdma->listen_id = listen_id;
2697      qemu_rdma_dump_gid("dest_init", listen_id);
2698      return 0;
2699  
2700  err_dest_init_bind_addr:
2701      rdma_destroy_id(listen_id);
2702  err_dest_init_create_listen_id:
2703      rdma_destroy_event_channel(rdma->channel);
2704      rdma->channel = NULL;
2705      rdma->errored = true;
2706      return -1;
2707  
2708  }
2709  
2710  static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2711                                              RDMAContext *rdma)
2712  {
2713      for (int i = 0; i < RDMA_WRID_MAX; i++) {
2714          rdma_return_path->wr_data[i].control_len = 0;
2715          rdma_return_path->wr_data[i].control_curr = NULL;
2716      }
2717  
2718      /* the CM channel and CM id are shared */
2719      rdma_return_path->channel = rdma->channel;
2720      rdma_return_path->listen_id = rdma->listen_id;
2721  
2722      rdma->return_path = rdma_return_path;
2723      rdma_return_path->return_path = rdma;
2724      rdma_return_path->is_return_path = true;
2725  }
2726  
2727  static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp)
2728  {
2729      RDMAContext *rdma = NULL;
2730  
2731      rdma = g_new0(RDMAContext, 1);
2732      rdma->current_index = -1;
2733      rdma->current_chunk = -1;
2734  
2735      rdma->host = g_strdup(saddr->host);
2736      rdma->port = atoi(saddr->port);
2737      return rdma;
2738  }
2739  
2740  /*
2741   * QEMUFile interface to the control channel.
2742   * SEND messages for control only.
2743   * VM's ram is handled with regular RDMA messages.
2744   */
2745  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2746                                         const struct iovec *iov,
2747                                         size_t niov,
2748                                         int *fds,
2749                                         size_t nfds,
2750                                         int flags,
2751                                         Error **errp)
2752  {
2753      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2754      RDMAContext *rdma;
2755      int ret;
2756      ssize_t done = 0;
2757      size_t len;
2758  
2759      RCU_READ_LOCK_GUARD();
2760      rdma = qatomic_rcu_read(&rioc->rdmaout);
2761  
2762      if (!rdma) {
2763          error_setg(errp, "RDMA control channel output is not set");
2764          return -1;
2765      }
2766  
2767      if (rdma->errored) {
2768          error_setg(errp,
2769                     "RDMA is in an error state waiting for migration to abort!");
2770          return -1;
2771      }
2772  
2773      /*
2774       * Push out any writes that we've
2775       * queued up for the VM's RAM.
2776       */
2777      ret = qemu_rdma_write_flush(rdma, errp);
2778      if (ret < 0) {
2779          rdma->errored = true;
2780          return -1;
2781      }
2782  
2783      for (int i = 0; i < niov; i++) {
2784          size_t remaining = iov[i].iov_len;
2785          uint8_t *data = (void *)iov[i].iov_base;
2786          while (remaining) {
2787              RDMAControlHeader head = {};
2788  
2789              len = MIN(remaining, RDMA_SEND_INCREMENT);
2790              remaining -= len;
2791  
2792              head.len = len;
2793              head.type = RDMA_CONTROL_QEMU_FILE;
2794  
2795              ret = qemu_rdma_exchange_send(rdma, &head,
2796                                            data, NULL, NULL, NULL, errp);
2797  
2798              if (ret < 0) {
2799                  rdma->errored = true;
2800                  return -1;
2801              }
2802  
2803              data += len;
2804              done += len;
2805          }
2806      }
2807  
2808      return done;
2809  }
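
/*
 * Note: an iovec element larger than RDMA_SEND_INCREMENT is split into
 * ceil(iov_len / RDMA_SEND_INCREMENT) RDMA_CONTROL_QEMU_FILE SEND messages,
 * and 'done' accumulates to the full byte count handed back to QEMUFile.
 */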
2810  
2811  static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2812                               size_t size, int idx)
2813  {
2814      size_t len = 0;
2815  
2816      if (rdma->wr_data[idx].control_len) {
2817          trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2818  
2819          len = MIN(size, rdma->wr_data[idx].control_len);
2820          memcpy(buf, rdma->wr_data[idx].control_curr, len);
2821          rdma->wr_data[idx].control_curr += len;
2822          rdma->wr_data[idx].control_len -= len;
2823      }
2824  
2825      return len;
2826  }
2827  
2828  /*
2829   * QEMUFile interface to the control channel.
2830   * RDMA links don't use bytestreams, so we have to
2831   * return bytes to QEMUFile opportunistically.
2832   */
2833  static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2834                                        const struct iovec *iov,
2835                                        size_t niov,
2836                                        int **fds,
2837                                        size_t *nfds,
2838                                        int flags,
2839                                        Error **errp)
2840  {
2841      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2842      RDMAContext *rdma;
2843      RDMAControlHeader head;
2844      int ret;
2845      ssize_t done = 0;
2846      size_t len;
2847  
2848      RCU_READ_LOCK_GUARD();
2849      rdma = qatomic_rcu_read(&rioc->rdmain);
2850  
2851      if (!rdma) {
2852          error_setg(errp, "RDMA control channel input is not set");
2853          return -1;
2854      }
2855  
2856      if (rdma->errored) {
2857          error_setg(errp,
2858                     "RDMA is in an error state waiting for migration to abort!");
2859          return -1;
2860      }
2861  
2862      for (int i = 0; i < niov; i++) {
2863          size_t want = iov[i].iov_len;
2864          uint8_t *data = (void *)iov[i].iov_base;
2865  
2866          /*
2867           * First, we hold on to the last SEND message we
2868           * were given and dish out the bytes until we run
2869           * out of bytes.
2870           */
2871          len = qemu_rdma_fill(rdma, data, want, 0);
2872          done += len;
2873          want -= len;
2874          /* Got what we needed, so go to next iovec */
2875          if (want == 0) {
2876              continue;
2877          }
2878  
2879          /* If we got any data so far, then don't wait
2880           * for more; just return what we have */
2881          if (done > 0) {
2882              break;
2883          }
2884  
2885  
2886          /* We've got nothing at all, so let's wait for
2887           * more to arrive
2888           */
2889          ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
2890                                        errp);
2891  
2892          if (ret < 0) {
2893              rdma->errored = true;
2894              return -1;
2895          }
2896  
2897          /*
2898           * SEND was received with new bytes, now try again.
2899           */
2900          len = qemu_rdma_fill(rdma, data, want, 0);
2901          done += len;
2902          want -= len;
2903  
2904          /* Still didn't get enough, so let's just return */
2905          if (want) {
2906              if (done == 0) {
2907                  return QIO_CHANNEL_ERR_BLOCK;
2908              } else {
2909                  break;
2910              }
2911          }
2912      }
2913      return done;
2914  }
2915  
2916  /*
2917   * Block until all the outstanding chunks have been delivered by the hardware.
2918   */
2919  static int qemu_rdma_drain_cq(RDMAContext *rdma)
2920  {
2921      Error *err = NULL;
2922  
2923      if (qemu_rdma_write_flush(rdma, &err) < 0) {
2924          error_report_err(err);
2925          return -1;
2926      }
2927  
2928      while (rdma->nb_sent) {
2929          if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
2930              error_report("rdma migration: complete polling error!");
2931              return -1;
2932          }
2933      }
2934  
2935      qemu_rdma_unregister_waiting(rdma);
2936  
2937      return 0;
2938  }
2939  
2940  
2941  static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2942                                           bool blocking,
2943                                           Error **errp)
2944  {
2945      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2946      /* XXX we should make readv/writev actually honour this :-) */
2947      rioc->blocking = blocking;
2948      return 0;
2949  }
2950  
2951  
2952  typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2953  struct QIOChannelRDMASource {
2954      GSource parent;
2955      QIOChannelRDMA *rioc;
2956      GIOCondition condition;
2957  };
2958  
2959  static gboolean
2960  qio_channel_rdma_source_prepare(GSource *source,
2961                                  gint *timeout)
2962  {
2963      QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2964      RDMAContext *rdma;
2965      GIOCondition cond = 0;
2966      *timeout = -1;
2967  
2968      RCU_READ_LOCK_GUARD();
2969      if (rsource->condition == G_IO_IN) {
2970          rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2971      } else {
2972          rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2973      }
2974  
2975      if (!rdma) {
2976          error_report("RDMAContext is NULL when preparing GSource");
2977          return FALSE;
2978      }
2979  
2980      if (rdma->wr_data[0].control_len) {
2981          cond |= G_IO_IN;
2982      }
2983      cond |= G_IO_OUT;
2984  
2985      return cond & rsource->condition;
2986  }
2987  
2988  static gboolean
2989  qio_channel_rdma_source_check(GSource *source)
2990  {
2991      QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2992      RDMAContext *rdma;
2993      GIOCondition cond = 0;
2994  
2995      RCU_READ_LOCK_GUARD();
2996      if (rsource->condition == G_IO_IN) {
2997          rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2998      } else {
2999          rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3000      }
3001  
3002      if (!rdma) {
3003          error_report("RDMAContext is NULL when checking GSource");
3004          return FALSE;
3005      }
3006  
3007      if (rdma->wr_data[0].control_len) {
3008          cond |= G_IO_IN;
3009      }
3010      cond |= G_IO_OUT;
3011  
3012      return cond & rsource->condition;
3013  }
3014  
3015  static gboolean
3016  qio_channel_rdma_source_dispatch(GSource *source,
3017                                   GSourceFunc callback,
3018                                   gpointer user_data)
3019  {
3020      QIOChannelFunc func = (QIOChannelFunc)callback;
3021      QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3022      RDMAContext *rdma;
3023      GIOCondition cond = 0;
3024  
3025      RCU_READ_LOCK_GUARD();
3026      if (rsource->condition == G_IO_IN) {
3027          rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3028      } else {
3029          rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3030      }
3031  
3032      if (!rdma) {
3033          error_report("RDMAContext is NULL when dispatching GSource");
3034          return FALSE;
3035      }
3036  
3037      if (rdma->wr_data[0].control_len) {
3038          cond |= G_IO_IN;
3039      }
3040      cond |= G_IO_OUT;
3041  
3042      return (*func)(QIO_CHANNEL(rsource->rioc),
3043                     (cond & rsource->condition),
3044                     user_data);
3045  }
3046  
3047  static void
3048  qio_channel_rdma_source_finalize(GSource *source)
3049  {
3050      QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3051  
3052      object_unref(OBJECT(ssource->rioc));
3053  }
3054  
3055  static GSourceFuncs qio_channel_rdma_source_funcs = {
3056      qio_channel_rdma_source_prepare,
3057      qio_channel_rdma_source_check,
3058      qio_channel_rdma_source_dispatch,
3059      qio_channel_rdma_source_finalize
3060  };
3061  
3062  static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3063                                                GIOCondition condition)
3064  {
3065      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3066      QIOChannelRDMASource *ssource;
3067      GSource *source;
3068  
3069      source = g_source_new(&qio_channel_rdma_source_funcs,
3070                            sizeof(QIOChannelRDMASource));
3071      ssource = (QIOChannelRDMASource *)source;
3072  
3073      ssource->rioc = rioc;
3074      object_ref(OBJECT(rioc));
3075  
3076      ssource->condition = condition;
3077  
3078      return source;
3079  }
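
/*
 * Illustrative sketch (editorial addition): the watch above is normally
 * consumed through the generic qio_channel_add_watch() helper rather than
 * by calling qio_channel_rdma_create_watch() directly.  The callback and
 * helper names below are hypothetical.
 */
static gboolean rdma_channel_readable(QIOChannel *ioc, GIOCondition cond,
                                      gpointer opaque)
{
    /* A buffered control SEND is ready to read (see source_check above). */
    return G_SOURCE_CONTINUE;
}

static void watch_rdma_channel(QIOChannelRDMA *rioc)
{
    qio_channel_add_watch(QIO_CHANNEL(rioc), G_IO_IN,
                          rdma_channel_readable, NULL, NULL);
}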
3080  
3081  static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3082                                                  AioContext *read_ctx,
3083                                                  IOHandler *io_read,
3084                                                  AioContext *write_ctx,
3085                                                  IOHandler *io_write,
3086                                                  void *opaque)
3087  {
3088      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3089      if (io_read) {
3090          aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
3091                             io_read, io_write, NULL, NULL, opaque);
3092          aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
3093                             io_read, io_write, NULL, NULL, opaque);
3094      } else {
3095          aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
3096                             io_read, io_write, NULL, NULL, opaque);
3097          aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
3098                             io_read, io_write, NULL, NULL, opaque);
3099      }
3100  }
3101  
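/*
 * Editorial note: channel teardown is deferred via RCU.  Readers access
 * rioc->rdmain / rioc->rdmaout under RCU_READ_LOCK_GUARD() with
 * qatomic_rcu_read(); qio_channel_rdma_close() merely clears the pointers
 * and hands the contexts to call_rcu(), so qemu_rdma_cleanup() only runs
 * once every in-flight reader has left its read-side critical section.
 */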
3102  struct rdma_close_rcu {
3103      struct rcu_head rcu;
3104      RDMAContext *rdmain;
3105      RDMAContext *rdmaout;
3106  };
3107  
3108  /* callback from qio_channel_rdma_close via call_rcu */
3109  static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3110  {
3111      if (rcu->rdmain) {
3112          qemu_rdma_cleanup(rcu->rdmain);
3113      }
3114  
3115      if (rcu->rdmaout) {
3116          qemu_rdma_cleanup(rcu->rdmaout);
3117      }
3118  
3119      g_free(rcu->rdmain);
3120      g_free(rcu->rdmaout);
3121      g_free(rcu);
3122  }
3123  
3124  static int qio_channel_rdma_close(QIOChannel *ioc,
3125                                    Error **errp)
3126  {
3127      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3128      RDMAContext *rdmain, *rdmaout;
3129      struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3130  
3131      trace_qemu_rdma_close();
3132  
3133      rdmain = rioc->rdmain;
3134      if (rdmain) {
3135          qatomic_rcu_set(&rioc->rdmain, NULL);
3136      }
3137  
3138      rdmaout = rioc->rdmaout;
3139      if (rdmaout) {
3140          qatomic_rcu_set(&rioc->rdmaout, NULL);
3141      }
3142  
3143      rcu->rdmain = rdmain;
3144      rcu->rdmaout = rdmaout;
3145      call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3146  
3147      return 0;
3148  }
3149  
3150  static int
3151  qio_channel_rdma_shutdown(QIOChannel *ioc,
3152                              QIOChannelShutdown how,
3153                              Error **errp)
3154  {
3155      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3156      RDMAContext *rdmain, *rdmaout;
3157  
3158      RCU_READ_LOCK_GUARD();
3159  
3160      rdmain = qatomic_rcu_read(&rioc->rdmain);
3161      rdmaout = qatomic_rcu_read(&rioc->rdmaout);
3162  
3163      switch (how) {
3164      case QIO_CHANNEL_SHUTDOWN_READ:
3165          if (rdmain) {
3166              rdmain->errored = true;
3167          }
3168          break;
3169      case QIO_CHANNEL_SHUTDOWN_WRITE:
3170          if (rdmaout) {
3171              rdmaout->errored = true;
3172          }
3173          break;
3174      case QIO_CHANNEL_SHUTDOWN_BOTH:
3175      default:
3176          if (rdmain) {
3177              rdmain->errored = true;
3178          }
3179          if (rdmaout) {
3180              rdmaout->errored = true;
3181          }
3182          break;
3183      }
3184  
3185      return 0;
3186  }
3187  
3188  /*
3189   * Parameters:
3190   *    @offset == 0 :
3191   *        This means that 'block_offset' is a full virtual address that does not
3192   *        belong to a RAMBlock of the virtual machine and instead
3193   *        represents a private malloc'd memory area that the caller wishes to
3194   *        transfer.
3195   *
3196   *    @offset != 0 :
3197   *        Offset is an offset to be added to block_offset and used
3198   *        to also lookup the corresponding RAMBlock.
3199   *
3200   *    @size : Number of bytes to transfer
3201   *
3202   *    @pages_sent : User-specified pointer to indicate how many pages were
3203   *                  sent. Usually, this will not be more than a few bytes of
3204   *                  the protocol because most transfers are sent asynchronously.
3205   */
3206  static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3207                                 ram_addr_t offset, size_t size)
3208  {
3209      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3210      Error *err = NULL;
3211      RDMAContext *rdma;
3212      int ret;
3213  
3214      RCU_READ_LOCK_GUARD();
3215      rdma = qatomic_rcu_read(&rioc->rdmaout);
3216  
3217      if (!rdma) {
3218          return -1;
3219      }
3220  
3221      if (rdma_errored(rdma)) {
3222          return -1;
3223      }
3224  
3225      qemu_fflush(f);
3226  
3227      /*
3228       * Add this page to the current 'chunk'. If the chunk
3229       * is full, or the page doesn't belong to the current chunk,
3230       * an actual RDMA write will occur and a new chunk will be formed.
3231       */
3232      ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
3233      if (ret < 0) {
3234          error_report_err(err);
3235          goto err;
3236      }
3237  
3238      /*
3239       * Drain the Completion Queue if possible, but do not block,
3240       * just poll.
3241       *
3242       * If nothing to poll, the end of the iteration will do this
3243       * again to make sure we don't overflow the request queue.
3244       */
3245      while (1) {
3246          uint64_t wr_id, wr_id_in;
3247          ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3248  
3249          if (ret < 0) {
3250              error_report("rdma migration: polling error");
3251              goto err;
3252          }
3253  
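        /* The low-order bits of a work request id encode the message type. */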
3254          wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3255  
3256          if (wr_id == RDMA_WRID_NONE) {
3257              break;
3258          }
3259      }
3260  
3261      while (1) {
3262          uint64_t wr_id, wr_id_in;
3263          ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3264  
3265          if (ret < 0) {
3266              error_report("rdma migration: polling error");
3267              goto err;
3268          }
3269  
3270          wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3271  
3272          if (wr_id == RDMA_WRID_NONE) {
3273              break;
3274          }
3275      }
3276  
3277      return RAM_SAVE_CONTROL_DELAYED;
3278  
3279  err:
3280      rdma->errored = true;
3281      return -1;
3282  }
3283  
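/*
 * Editorial note: returning RAM_SAVE_CONTROL_DELAYED signals that the page
 * was merged into the current RDMA chunk and will go out asynchronously when
 * the chunk is flushed, rather than being written through the QEMUFile.
 */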
3284  int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset,
3285                             ram_addr_t offset, size_t size)
3286  {
3287      if (!migrate_rdma() || migration_in_postcopy()) {
3288          return RAM_SAVE_CONTROL_NOT_SUPP;
3289      }
3290  
3291      int ret = qemu_rdma_save_page(f, block_offset, offset, size);
3292  
3293      if (ret != RAM_SAVE_CONTROL_DELAYED &&
3294          ret != RAM_SAVE_CONTROL_NOT_SUPP) {
3295          if (ret < 0) {
3296              qemu_file_set_error(f, ret);
3297          }
3298      }
3299      return ret;
3300  }
3301  
3302  static void rdma_accept_incoming_migration(void *opaque);
3303  
3304  static void rdma_cm_poll_handler(void *opaque)
3305  {
3306      RDMAContext *rdma = opaque;
3307      struct rdma_cm_event *cm_event;
3308      MigrationIncomingState *mis = migration_incoming_get_current();
3309  
3310      if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
3311          error_report("get_cm_event failed %d", errno);
3312          return;
3313      }
3314  
3315      if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3316          cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3317          if (!rdma->errored &&
3318              migration_incoming_get_current()->state !=
3319                MIGRATION_STATUS_COMPLETED) {
3320              error_report("received unexpected cm event %d", cm_event->event);
3321              rdma->errored = true;
3322              if (rdma->return_path) {
3323                  rdma->return_path->errored = true;
3324              }
3325          }
3326          rdma_ack_cm_event(cm_event);
3327          if (mis->loadvm_co) {
3328              qemu_coroutine_enter(mis->loadvm_co);
3329          }
3330          return;
3331      }
3332      rdma_ack_cm_event(cm_event);
3333  }
3334  
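/*
 * Editorial note: destination-side accept sequence, as implemented below:
 *   1. wait for RDMA_CM_EVENT_CONNECT_REQUEST and read the capability blob,
 *   2. negotiate capabilities (currently only RDMA_CAPABILITY_PIN_ALL),
 *   3. allocate the protection domain, completion queues and queue pair,
 *   4. rdma_accept() and wait for RDMA_CM_EVENT_ESTABLISHED,
 *   5. post the first control-channel receive (RDMA_WRID_READY).
 */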
3335  static int qemu_rdma_accept(RDMAContext *rdma)
3336  {
3337      Error *err = NULL;
3338      RDMACapabilities cap;
3339      struct rdma_conn_param conn_param = {
3340                                              .responder_resources = 2,
3341                                              .private_data = &cap,
3342                                              .private_data_len = sizeof(cap),
3343                                           };
3344      RDMAContext *rdma_return_path = NULL;
3345      g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1);
3346      struct rdma_cm_event *cm_event;
3347      struct ibv_context *verbs;
3348      int ret;
3349  
3350      ret = rdma_get_cm_event(rdma->channel, &cm_event);
3351      if (ret < 0) {
3352          goto err_rdma_dest_wait;
3353      }
3354  
3355      if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3356          rdma_ack_cm_event(cm_event);
3357          goto err_rdma_dest_wait;
3358      }
3359  
3360      isock->host = g_strdup(rdma->host);
3361      isock->port = g_strdup_printf("%d", rdma->port);
3362  
3363      /*
3364       * Initialize the RDMAContext for the return path (used by postcopy)
3365       * after the first connection request has been received.
3366       */
3367      if ((migrate_postcopy() || migrate_return_path())
3368          && !rdma->is_return_path) {
3369          rdma_return_path = qemu_rdma_data_init(isock, NULL);
3370          if (rdma_return_path == NULL) {
3371              rdma_ack_cm_event(cm_event);
3372              goto err_rdma_dest_wait;
3373          }
3374  
3375          qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3376      }
3377  
3378      memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3379  
3380      network_to_caps(&cap);
3381  
3382      if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3383          error_report("Unknown source RDMA version: %d, bailing...",
3384                       cap.version);
3385          rdma_ack_cm_event(cm_event);
3386          goto err_rdma_dest_wait;
3387      }
3388  
3389      /*
3390       * Respond with only the capabilities this version of QEMU knows about.
3391       */
3392      cap.flags &= known_capabilities;
3393  
3394      /*
3395       * Enable the ones that we do know about.
3396       * Add other checks here as new ones are introduced.
3397       */
3398      if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3399          rdma->pin_all = true;
3400      }
3401  
3402      rdma->cm_id = cm_event->id;
3403      verbs = cm_event->id->verbs;
3404  
3405      rdma_ack_cm_event(cm_event);
3406  
3407      trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3408  
3409      caps_to_network(&cap);
3410  
3411      trace_qemu_rdma_accept_pin_verbsc(verbs);
3412  
3413      if (!rdma->verbs) {
3414          rdma->verbs = verbs;
3415      } else if (rdma->verbs != verbs) {
3416          error_report("ibv context not matching %p, %p!", rdma->verbs,
3417                       verbs);
3418          goto err_rdma_dest_wait;
3419      }
3420  
3421      qemu_rdma_dump_id("dest_init", verbs);
3422  
3423      ret = qemu_rdma_alloc_pd_cq(rdma, &err);
3424      if (ret < 0) {
3425          error_report_err(err);
3426          goto err_rdma_dest_wait;
3427      }
3428  
3429      ret = qemu_rdma_alloc_qp(rdma);
3430      if (ret < 0) {
3431          error_report("rdma migration: error allocating qp!");
3432          goto err_rdma_dest_wait;
3433      }
3434  
3435      qemu_rdma_init_ram_blocks(rdma);
3436  
3437      for (int i = 0; i < RDMA_WRID_MAX; i++) {
3438          ret = qemu_rdma_reg_control(rdma, i);
3439          if (ret < 0) {
3440              error_report("rdma: error registering %d control", i);
3441              goto err_rdma_dest_wait;
3442          }
3443      }
3444  
3445      /* Accept the second connection request for return path */
3446      if ((migrate_postcopy() || migrate_return_path())
3447          && !rdma->is_return_path) {
3448          qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3449                              NULL,
3450                              (void *)(intptr_t)rdma->return_path);
3451      } else {
3452          qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3453                              NULL, rdma);
3454      }
3455  
3456      ret = rdma_accept(rdma->cm_id, &conn_param);
3457      if (ret < 0) {
3458          error_report("rdma_accept failed");
3459          goto err_rdma_dest_wait;
3460      }
3461  
3462      ret = rdma_get_cm_event(rdma->channel, &cm_event);
3463      if (ret < 0) {
3464          error_report("rdma_accept get_cm_event failed");
3465          goto err_rdma_dest_wait;
3466      }
3467  
3468      if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3469          error_report("rdma_accept: unexpected event, not ESTABLISHED");
3470          rdma_ack_cm_event(cm_event);
3471          goto err_rdma_dest_wait;
3472      }
3473  
3474      rdma_ack_cm_event(cm_event);
3475      rdma->connected = true;
3476  
3477      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
3478      if (ret < 0) {
3479          error_report_err(err);
3480          goto err_rdma_dest_wait;
3481      }
3482  
3483      qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3484  
3485      return 0;
3486  
3487  err_rdma_dest_wait:
3488      rdma->errored = true;
3489      qemu_rdma_cleanup(rdma);
3490      g_free(rdma_return_path);
3491      return -1;
3492  }
3493  
3494  static int dest_ram_sort_func(const void *a, const void *b)
3495  {
3496      unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3497      unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3498  
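    /* qsort()-style three-way compare: -1 if a < b, 0 if equal, 1 if a > b. */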
3499      return (a_index < b_index) ? -1 : (a_index != b_index);
3500  }
3501  
3502  /*
3503   * During each iteration of the migration, we listen for instructions
3504   * from the source VM to perform dynamic page registrations before it
3505   * can perform RDMA operations.
3506   *
3507   * We respond with the 'rkey'.
3508   *
3509   * Keep doing this until the source tells us to stop.
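 *
 * (Editorial note) The control messages handled below are COMPRESS (zero
 * pages), RAM_BLOCKS_REQUEST, REGISTER_REQUEST, UNREGISTER_REQUEST and
 * REGISTER_FINISHED, which ends one round of the handling loop.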
3510   */
3511  int rdma_registration_handle(QEMUFile *f)
3512  {
3513      RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3514                                 .type = RDMA_CONTROL_REGISTER_RESULT,
3515                                 .repeat = 0,
3516                               };
3517      RDMAControlHeader unreg_resp = { .len = 0,
3518                                 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3519                                 .repeat = 0,
3520                               };
3521      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3522                                   .repeat = 1 };
3523      QIOChannelRDMA *rioc;
3524      Error *err = NULL;
3525      RDMAContext *rdma;
3526      RDMALocalBlocks *local;
3527      RDMAControlHeader head;
3528      RDMARegister *reg, *registers;
3529      RDMACompress *comp;
3530      RDMARegisterResult *reg_result;
3531      static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3532      RDMALocalBlock *block;
3533      void *host_addr;
3534      int ret;
3535      int idx = 0;
3536  
3537      if (!migrate_rdma()) {
3538          return 0;
3539      }
3540  
3541      RCU_READ_LOCK_GUARD();
3542      rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3543      rdma = qatomic_rcu_read(&rioc->rdmain);
3544  
3545      if (!rdma) {
3546          return -1;
3547      }
3548  
3549      if (rdma_errored(rdma)) {
3550          return -1;
3551      }
3552  
3553      local = &rdma->local_ram_blocks;
3554      do {
3555          trace_rdma_registration_handle_wait();
3556  
3557          ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
3558  
3559          if (ret < 0) {
3560              error_report_err(err);
3561              break;
3562          }
3563  
3564          if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3565              error_report("rdma: Too many requests in this message (%d). "
3566                           "Bailing.", head.repeat);
3567              break;
3568          }
3569  
3570          switch (head.type) {
3571          case RDMA_CONTROL_COMPRESS:
3572              comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3573              network_to_compress(comp);
3574  
3575              trace_rdma_registration_handle_compress(comp->length,
3576                                                      comp->block_idx,
3577                                                      comp->offset);
3578              if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3579                  error_report("rdma: 'compress' bad block index %u (vs %d)",
3580                               (unsigned int)comp->block_idx,
3581                               rdma->local_ram_blocks.nb_blocks);
3582                  goto err;
3583              }
3584              block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3585  
3586              host_addr = block->local_host_addr +
3587                              (comp->offset - block->offset);
3588              if (comp->value) {
3589                  error_report("rdma: Zero page with non-zero (%d) value",
3590                               comp->value);
3591                  goto err;
3592              }
3593              ram_handle_zero(host_addr, comp->length);
3594              break;
3595  
3596          case RDMA_CONTROL_REGISTER_FINISHED:
3597              trace_rdma_registration_handle_finished();
3598              return 0;
3599  
3600          case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3601              trace_rdma_registration_handle_ram_blocks();
3602  
3603              /* Sort our local RAM Block list so it's the same as the source's;
3604               * we can do this because we've filled in a src_index in the list
3605               * as we received the RAMBlock list earlier.
3606               */
3607              qsort(rdma->local_ram_blocks.block,
3608                    rdma->local_ram_blocks.nb_blocks,
3609                    sizeof(RDMALocalBlock), dest_ram_sort_func);
3610              for (int i = 0; i < local->nb_blocks; i++) {
3611                  local->block[i].index = i;
3612              }
3613  
3614              if (rdma->pin_all) {
3615                  ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
3616                  if (ret < 0) {
3617                      error_report_err(err);
3618                      goto err;
3619                  }
3620              }
3621  
3622              /*
3623               * Dest uses this to prepare to transmit the RAMBlock descriptions
3624               * to the source VM after connection setup.
3625               * Both sides use the "remote" structure to communicate and update
3626               * their "local" descriptions with what was sent.
3627               */
3628              for (int i = 0; i < local->nb_blocks; i++) {
3629                  rdma->dest_blocks[i].remote_host_addr =
3630                      (uintptr_t)(local->block[i].local_host_addr);
3631  
3632                  if (rdma->pin_all) {
3633                      rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3634                  }
3635  
3636                  rdma->dest_blocks[i].offset = local->block[i].offset;
3637                  rdma->dest_blocks[i].length = local->block[i].length;
3638  
3639                  dest_block_to_network(&rdma->dest_blocks[i]);
3640                  trace_rdma_registration_handle_ram_blocks_loop(
3641                      local->block[i].block_name,
3642                      local->block[i].offset,
3643                      local->block[i].length,
3644                      local->block[i].local_host_addr,
3645                      local->block[i].src_index);
3646              }
3647  
3648              blocks.len = rdma->local_ram_blocks.nb_blocks
3649                                                  * sizeof(RDMADestBlock);
3650  
3651  
3652              ret = qemu_rdma_post_send_control(rdma,
3653                                      (uint8_t *) rdma->dest_blocks, &blocks,
3654                                      &err);
3655  
3656              if (ret < 0) {
3657                  error_report_err(err);
3658                  goto err;
3659              }
3660  
3661              break;
3662          case RDMA_CONTROL_REGISTER_REQUEST:
3663              trace_rdma_registration_handle_register(head.repeat);
3664  
3665              reg_resp.repeat = head.repeat;
3666              registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3667  
3668              for (int count = 0; count < head.repeat; count++) {
3669                  uint64_t chunk;
3670                  uint8_t *chunk_start, *chunk_end;
3671  
3672                  reg = &registers[count];
3673                  network_to_register(reg);
3674  
3675                  reg_result = &results[count];
3676  
3677                  trace_rdma_registration_handle_register_loop(count,
3678                           reg->current_index, reg->key.current_addr, reg->chunks);
3679  
3680                  if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3681                      error_report("rdma: 'register' bad block index %u (vs %d)",
3682                                   (unsigned int)reg->current_index,
3683                                   rdma->local_ram_blocks.nb_blocks);
3684                      goto err;
3685                  }
3686                  block = &(rdma->local_ram_blocks.block[reg->current_index]);
3687                  if (block->is_ram_block) {
3688                      if (block->offset > reg->key.current_addr) {
3689                          error_report("rdma: bad register address for block %s"
3690                              " offset: %" PRIx64 " current_addr: %" PRIx64,
3691                              block->block_name, block->offset,
3692                              reg->key.current_addr);
3693                          goto err;
3694                      }
3695                      host_addr = (block->local_host_addr +
3696                                  (reg->key.current_addr - block->offset));
3697                      chunk = ram_chunk_index(block->local_host_addr,
3698                                              (uint8_t *) host_addr);
3699                  } else {
3700                      chunk = reg->key.chunk;
3701                      host_addr = block->local_host_addr +
3702                          (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3703                      /* Check for particularly bad chunk value */
3704                      if (host_addr < (void *)block->local_host_addr) {
3705                          error_report("rdma: bad chunk for block %s"
3706                              " chunk: %" PRIx64,
3707                              block->block_name, reg->key.chunk);
3708                          goto err;
3709                      }
3710                  }
3711                  chunk_start = ram_chunk_start(block, chunk);
3712                  chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3713                  /* avoid "-Waddress-of-packed-member" warning */
3714                  uint32_t tmp_rkey = 0;
3715                  if (qemu_rdma_register_and_get_keys(rdma, block,
3716                              (uintptr_t)host_addr, NULL, &tmp_rkey,
3717                              chunk, chunk_start, chunk_end)) {
3718                      error_report("cannot get rkey");
3719                      goto err;
3720                  }
3721                  reg_result->rkey = tmp_rkey;
3722  
3723                  reg_result->host_addr = (uintptr_t)block->local_host_addr;
3724  
3725                  trace_rdma_registration_handle_register_rkey(reg_result->rkey);
3726  
3727                  result_to_network(reg_result);
3728              }
3729  
3730              ret = qemu_rdma_post_send_control(rdma,
3731                              (uint8_t *) results, &reg_resp, &err);
3732  
3733              if (ret < 0) {
3734                  error_report_err(err);
3735                  goto err;
3736              }
3737              break;
3738          case RDMA_CONTROL_UNREGISTER_REQUEST:
3739              trace_rdma_registration_handle_unregister(head.repeat);
3740              unreg_resp.repeat = head.repeat;
3741              registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3742  
3743              for (int count = 0; count < head.repeat; count++) {
3744                  reg = &registers[count];
3745                  network_to_register(reg);
3746  
3747                  trace_rdma_registration_handle_unregister_loop(count,
3748                             reg->current_index, reg->key.chunk);
3749  
3750                  block = &(rdma->local_ram_blocks.block[reg->current_index]);
3751  
3752                  ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3753                  block->pmr[reg->key.chunk] = NULL;
3754  
3755                  if (ret != 0) {
3756                      error_report("rdma unregistration chunk failed: %s",
3757                                   strerror(errno));
3758                      goto err;
3759                  }
3760  
3761                  rdma->total_registrations--;
3762  
3763                  trace_rdma_registration_handle_unregister_success(reg->key.chunk);
3764              }
3765  
3766              ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
3767  
3768              if (ret < 0) {
3769                  error_report_err(err);
3770                  goto err;
3771              }
3772              break;
3773          case RDMA_CONTROL_REGISTER_RESULT:
3774              error_report("Invalid RESULT message at dest.");
3775              goto err;
3776          default:
3777              error_report("Unknown control message %s", control_desc(head.type));
3778              goto err;
3779          }
3780      } while (1);
3781  
3782  err:
3783      rdma->errored = true;
3784      return -1;
3785  }
3786  
3787  /* Destination:
3788   * Called during the initial RAM load section which lists the
3789   * RAMBlocks by name.  This lets us know the order of the RAMBlocks on
3790   * the source.  We've already built our local RAMBlock list, but not
3791   * yet sent the list to the source.
3792   */
3793  int rdma_block_notification_handle(QEMUFile *f, const char *name)
3794  {
3795      int curr;
3796      int found = -1;
3797  
3798      if (!migrate_rdma()) {
3799          return 0;
3800      }
3801  
3802      RCU_READ_LOCK_GUARD();
3803      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3804      RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain);
3805  
3806      if (!rdma) {
3807          return -1;
3808      }
3809  
3810      /* Find the matching RAMBlock in our local list */
3811      for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3812          if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3813              found = curr;
3814              break;
3815          }
3816      }
3817  
3818      if (found == -1) {
3819          error_report("RAMBlock '%s' not found on destination", name);
3820          return -1;
3821      }
3822  
3823      rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3824      trace_rdma_block_notification_handle(name, rdma->next_src_index);
3825      rdma->next_src_index++;
3826  
3827      return 0;
3828  }
3829  
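/*
 * Editorial note: called at the start of each RAM save round on the source.
 * Writing RAM_SAVE_FLAG_HOOK into the stream makes the destination enter
 * rdma_registration_handle() and service registration requests for this
 * round.
 */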
3830  int rdma_registration_start(QEMUFile *f, uint64_t flags)
3831  {
3832      if (!migrate_rdma() || migration_in_postcopy()) {
3833          return 0;
3834      }
3835  
3836      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3837      RCU_READ_LOCK_GUARD();
3838      RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout);
3839      if (!rdma) {
3840          return -1;
3841      }
3842  
3843      if (rdma_errored(rdma)) {
3844          return -1;
3845      }
3846  
3847      trace_rdma_registration_start(flags);
3848      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3849      return qemu_fflush(f);
3850  }
3851  
3852  /*
3853   * Inform dest that dynamic registrations are done for now.
3854   * First, flush writes, if any.
3855   */
3856  int rdma_registration_stop(QEMUFile *f, uint64_t flags)
3857  {
3858      QIOChannelRDMA *rioc;
3859      Error *err = NULL;
3860      RDMAContext *rdma;
3861      RDMAControlHeader head = { .len = 0, .repeat = 1 };
3862      int ret;
3863  
3864      if (!migrate_rdma() || migration_in_postcopy()) {
3865          return 0;
3866      }
3867  
3868      RCU_READ_LOCK_GUARD();
3869      rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3870      rdma = qatomic_rcu_read(&rioc->rdmaout);
3871      if (!rdma) {
3872          return -1;
3873      }
3874  
3875      if (rdma_errored(rdma)) {
3876          return -1;
3877      }
3878  
3879      qemu_fflush(f);
3880      ret = qemu_rdma_drain_cq(rdma);
3881  
3882      if (ret < 0) {
3883          goto err;
3884      }
3885  
3886      if (flags == RAM_CONTROL_SETUP) {
3887          RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3888          RDMALocalBlocks *local = &rdma->local_ram_blocks;
3889          int reg_result_idx, nb_dest_blocks;
3890  
3891          head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3892          trace_rdma_registration_stop_ram();
3893  
3894          /*
3895           * Make sure that we parallelize the pinning on both sides.
3896           * For very large guests, doing this serially takes a really
3897           * long time, so we have to 'interleave' the pinning locally
3898           * with the control messages by performing the pinning on this
3899           * side before we receive the control response from the other
3900           * side that the pinning has completed.
3901           */
3902          ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3903                      &reg_result_idx, rdma->pin_all ?
3904                      qemu_rdma_reg_whole_ram_blocks : NULL,
3905                      &err);
3906          if (ret < 0) {
3907              error_report_err(err);
3908              return -1;
3909          }
3910  
3911          nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3912  
3913          /*
3914           * The protocol uses two different sets of rkeys (mutually exclusive):
3915           * 1. One key to represent the virtual address of the entire ram block.
3916           *    (dynamic chunk registration disabled - pin everything with one rkey.)
3917           * 2. One to represent individual chunks within a ram block.
3918           *    (dynamic chunk registration enabled - pin individual chunks.)
3919           *
3920           * Once the capability is successfully negotiated, the destination transmits
3921           * the keys to use (or sends them later) including the virtual addresses
3922           * and then propagates the remote ram block descriptions to its local copy.
3923           */
3924  
3925          if (local->nb_blocks != nb_dest_blocks) {
3926              error_report("ram blocks mismatch (Number of blocks %d vs %d)",
3927                           local->nb_blocks, nb_dest_blocks);
3928              error_printf("Your QEMU command line parameters are probably "
3929                           "not identical on both the source and destination.");
3930              rdma->errored = true;
3931              return -1;
3932          }
3933  
3934          qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3935          memcpy(rdma->dest_blocks,
3936              rdma->wr_data[reg_result_idx].control_curr, resp.len);
3937          for (int i = 0; i < nb_dest_blocks; i++) {
3938              network_to_dest_block(&rdma->dest_blocks[i]);
3939  
3940              /* We require that the blocks are in the same order */
3941              if (rdma->dest_blocks[i].length != local->block[i].length) {
3942                  error_report("Block %s/%d has a different length %" PRIu64
3943                               " vs %" PRIu64,
3944                               local->block[i].block_name, i,
3945                               local->block[i].length,
3946                               rdma->dest_blocks[i].length);
3947                  rdma->errored = true;
3948                  return -1;
3949              }
3950              local->block[i].remote_host_addr =
3951                      rdma->dest_blocks[i].remote_host_addr;
3952              local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3953          }
3954      }
3955  
3956      trace_rdma_registration_stop(flags);
3957  
3958      head.type = RDMA_CONTROL_REGISTER_FINISHED;
3959      ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
3960  
3961      if (ret < 0) {
3962          error_report_err(err);
3963          goto err;
3964      }
3965  
3966      return 0;
3967  err:
3968      rdma->errored = true;
3969      return -1;
3970  }
3971  
3972  static void qio_channel_rdma_finalize(Object *obj)
3973  {
3974      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3975      if (rioc->rdmain) {
3976          qemu_rdma_cleanup(rioc->rdmain);
3977          g_free(rioc->rdmain);
3978          rioc->rdmain = NULL;
3979      }
3980      if (rioc->rdmaout) {
3981          qemu_rdma_cleanup(rioc->rdmaout);
3982          g_free(rioc->rdmaout);
3983          rioc->rdmaout = NULL;
3984      }
3985  }
3986  
3987  static void qio_channel_rdma_class_init(ObjectClass *klass,
3988                                          void *class_data G_GNUC_UNUSED)
3989  {
3990      QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3991  
3992      ioc_klass->io_writev = qio_channel_rdma_writev;
3993      ioc_klass->io_readv = qio_channel_rdma_readv;
3994      ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3995      ioc_klass->io_close = qio_channel_rdma_close;
3996      ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3997      ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3998      ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3999  }
4000  
4001  static const TypeInfo qio_channel_rdma_info = {
4002      .parent = TYPE_QIO_CHANNEL,
4003      .name = TYPE_QIO_CHANNEL_RDMA,
4004      .instance_size = sizeof(QIOChannelRDMA),
4005      .instance_finalize = qio_channel_rdma_finalize,
4006      .class_init = qio_channel_rdma_class_init,
4007  };
4008  
4009  static void qio_channel_rdma_register_types(void)
4010  {
4011      type_register_static(&qio_channel_rdma_info);
4012  }
4013  
4014  type_init(qio_channel_rdma_register_types);
4015  
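/*
 * Editorial note: the main and return-path contexts are cross-wired below.
 * The incoming side reads on @rdma and, when a return path exists, writes on
 * rdma->return_path; rdma_new_output() does the opposite.
 */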
4016  static QEMUFile *rdma_new_input(RDMAContext *rdma)
4017  {
4018      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4019  
4020      rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4021      rioc->rdmain = rdma;
4022      rioc->rdmaout = rdma->return_path;
4023  
4024      return rioc->file;
4025  }
4026  
4027  static QEMUFile *rdma_new_output(RDMAContext *rdma)
4028  {
4029      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4030  
4031      rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4032      rioc->rdmaout = rdma;
4033      rioc->rdmain = rdma->return_path;
4034  
4035      return rioc->file;
4036  }
4037  
4038  static void rdma_accept_incoming_migration(void *opaque)
4039  {
4040      RDMAContext *rdma = opaque;
4041      QEMUFile *f;
4042  
4043      trace_qemu_rdma_accept_incoming_migration();
4044      if (qemu_rdma_accept(rdma) < 0) {
4045          error_report("RDMA ERROR: Migration initialization failed");
4046          return;
4047      }
4048  
4049      trace_qemu_rdma_accept_incoming_migration_accepted();
4050  
4051      if (rdma->is_return_path) {
4052          return;
4053      }
4054  
4055      f = rdma_new_input(rdma);
4056      if (f == NULL) {
4057          error_report("RDMA ERROR: could not open RDMA for input");
4058          qemu_rdma_cleanup(rdma);
4059          return;
4060      }
4061  
4062      rdma->migration_started_on_destination = 1;
4063      migration_fd_process_incoming(f);
4064  }
4065  
4066  void rdma_start_incoming_migration(InetSocketAddress *host_port,
4067                                     Error **errp)
4068  {
4069      MigrationState *s = migrate_get_current();
4070      int ret;
4071      RDMAContext *rdma;
4072  
4073      trace_rdma_start_incoming_migration();
4074  
4075      /* Avoid ram_block_discard_disable(), cannot change during migration. */
4076      if (ram_block_discard_is_required()) {
4077          error_setg(errp, "RDMA: cannot disable RAM discard");
4078          return;
4079      }
4080  
4081      rdma = qemu_rdma_data_init(host_port, errp);
4082      if (rdma == NULL) {
4083          goto err;
4084      }
4085  
4086      ret = qemu_rdma_dest_init(rdma, errp);
4087      if (ret < 0) {
4088          goto err;
4089      }
4090  
4091      trace_rdma_start_incoming_migration_after_dest_init();
4092  
4093      ret = rdma_listen(rdma->listen_id, 5);
4094  
4095      if (ret < 0) {
4096          error_setg(errp, "RDMA ERROR: listening on socket!");
4097          goto cleanup_rdma;
4098      }
4099  
4100      trace_rdma_start_incoming_migration_after_rdma_listen();
4101      s->rdma_migration = true;
4102      qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4103                          NULL, (void *)(intptr_t)rdma);
4104      return;
4105  
4106  cleanup_rdma:
4107      qemu_rdma_cleanup(rdma);
4108  err:
4109      if (rdma) {
4110          g_free(rdma->host);
4111      }
4112      g_free(rdma);
4113  }
4114  
4115  void rdma_start_outgoing_migration(void *opaque,
4116                              InetSocketAddress *host_port, Error **errp)
4117  {
4118      MigrationState *s = opaque;
4119      RDMAContext *rdma_return_path = NULL;
4120      RDMAContext *rdma;
4121      int ret;
4122  
4123      /* Avoid ram_block_discard_disable(), cannot change during migration. */
4124      if (ram_block_discard_is_required()) {
4125          error_setg(errp, "RDMA: cannot disable RAM discard");
4126          return;
4127      }
4128  
4129      rdma = qemu_rdma_data_init(host_port, errp);
4130      if (rdma == NULL) {
4131          goto err;
4132      }
4133  
4134      ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
4135  
4136      if (ret < 0) {
4137          goto err;
4138      }
4139  
4140      trace_rdma_start_outgoing_migration_after_rdma_source_init();
4141      ret = qemu_rdma_connect(rdma, false, errp);
4142  
4143      if (ret < 0) {
4144          goto err;
4145      }
4146  
4147      /* RDMA postcopy needs a separate queue pair for the return path */
4148      if (migrate_postcopy() || migrate_return_path()) {
4149          rdma_return_path = qemu_rdma_data_init(host_port, errp);
4150  
4151          if (rdma_return_path == NULL) {
4152              goto return_path_err;
4153          }
4154  
4155          ret = qemu_rdma_source_init(rdma_return_path,
4156                                      migrate_rdma_pin_all(), errp);
4157  
4158          if (ret < 0) {
4159              goto return_path_err;
4160          }
4161  
4162          ret = qemu_rdma_connect(rdma_return_path, true, errp);
4163  
4164          if (ret < 0) {
4165              goto return_path_err;
4166          }
4167  
4168          rdma->return_path = rdma_return_path;
4169          rdma_return_path->return_path = rdma;
4170          rdma_return_path->is_return_path = true;
4171      }
4172  
4173      trace_rdma_start_outgoing_migration_after_rdma_connect();
4174  
4175      s->to_dst_file = rdma_new_output(rdma);
4176      s->rdma_migration = true;
4177      migrate_fd_connect(s, NULL);
4178      return;
4179  return_path_err:
4180      qemu_rdma_cleanup(rdma);
4181  err:
4182      g_free(rdma);
4183      g_free(rdma_return_path);
4184  }
4185