1d32ca5adSJuan Quintela /* 2d32ca5adSJuan Quintela * Multifd common code 3d32ca5adSJuan Quintela * 4d32ca5adSJuan Quintela * Copyright (c) 2019-2020 Red Hat Inc 5d32ca5adSJuan Quintela * 6d32ca5adSJuan Quintela * Authors: 7d32ca5adSJuan Quintela * Juan Quintela <quintela@redhat.com> 8d32ca5adSJuan Quintela * 9d32ca5adSJuan Quintela * This work is licensed under the terms of the GNU GPL, version 2 or later. 10d32ca5adSJuan Quintela * See the COPYING file in the top-level directory. 11d32ca5adSJuan Quintela */ 12d32ca5adSJuan Quintela 13d32ca5adSJuan Quintela #include "qemu/osdep.h" 14d32ca5adSJuan Quintela #include "qemu/rcu.h" 15d32ca5adSJuan Quintela #include "exec/target_page.h" 16d32ca5adSJuan Quintela #include "sysemu/sysemu.h" 17d32ca5adSJuan Quintela #include "exec/ramblock.h" 18d32ca5adSJuan Quintela #include "qemu/error-report.h" 19d32ca5adSJuan Quintela #include "qapi/error.h" 20d32ca5adSJuan Quintela #include "ram.h" 21d32ca5adSJuan Quintela #include "migration.h" 22d32ca5adSJuan Quintela #include "socket.h" 2329647140SChuan Zheng #include "tls.h" 24d32ca5adSJuan Quintela #include "qemu-file.h" 25d32ca5adSJuan Quintela #include "trace.h" 26d32ca5adSJuan Quintela #include "multifd.h" 27d32ca5adSJuan Quintela 28*b5eea99eSLukas Straub #include "qemu/yank.h" 29*b5eea99eSLukas Straub #include "io/channel-socket.h" 30*b5eea99eSLukas Straub 31d32ca5adSJuan Quintela /* Multiple fd's */ 32d32ca5adSJuan Quintela 33d32ca5adSJuan Quintela #define MULTIFD_MAGIC 0x11223344U 34d32ca5adSJuan Quintela #define MULTIFD_VERSION 1 35d32ca5adSJuan Quintela 36d32ca5adSJuan Quintela typedef struct { 37d32ca5adSJuan Quintela uint32_t magic; 38d32ca5adSJuan Quintela uint32_t version; 39d32ca5adSJuan Quintela unsigned char uuid[16]; /* QemuUUID */ 40d32ca5adSJuan Quintela uint8_t id; 41d32ca5adSJuan Quintela uint8_t unused1[7]; /* Reserved for future use */ 42d32ca5adSJuan Quintela uint64_t unused2[4]; /* Reserved for future use */ 43d32ca5adSJuan Quintela } __attribute__((packed)) 
MultiFDInit_t; 44d32ca5adSJuan Quintela 45ab7cbb0bSJuan Quintela /* Multifd without compression */ 46ab7cbb0bSJuan Quintela 47ab7cbb0bSJuan Quintela /** 48ab7cbb0bSJuan Quintela * nocomp_send_setup: setup send side 49ab7cbb0bSJuan Quintela * 50ab7cbb0bSJuan Quintela * For no compression this function does nothing. 51ab7cbb0bSJuan Quintela * 52ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 53ab7cbb0bSJuan Quintela * 54ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 55ab7cbb0bSJuan Quintela * @errp: pointer to an error 56ab7cbb0bSJuan Quintela */ 57ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 58ab7cbb0bSJuan Quintela { 59ab7cbb0bSJuan Quintela return 0; 60ab7cbb0bSJuan Quintela } 61ab7cbb0bSJuan Quintela 62ab7cbb0bSJuan Quintela /** 63ab7cbb0bSJuan Quintela * nocomp_send_cleanup: cleanup send side 64ab7cbb0bSJuan Quintela * 65ab7cbb0bSJuan Quintela * For no compression this function does nothing. 66ab7cbb0bSJuan Quintela * 67ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 68ab7cbb0bSJuan Quintela */ 69ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 70ab7cbb0bSJuan Quintela { 71ab7cbb0bSJuan Quintela return; 72ab7cbb0bSJuan Quintela } 73ab7cbb0bSJuan Quintela 74ab7cbb0bSJuan Quintela /** 75ab7cbb0bSJuan Quintela * nocomp_send_prepare: prepare date to be able to send 76ab7cbb0bSJuan Quintela * 77ab7cbb0bSJuan Quintela * For no compression we just have to calculate the size of the 78ab7cbb0bSJuan Quintela * packet. 
79ab7cbb0bSJuan Quintela * 80ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 81ab7cbb0bSJuan Quintela * 82ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 83ab7cbb0bSJuan Quintela * @used: number of pages used 84ab7cbb0bSJuan Quintela * @errp: pointer to an error 85ab7cbb0bSJuan Quintela */ 86ab7cbb0bSJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used, 87ab7cbb0bSJuan Quintela Error **errp) 88ab7cbb0bSJuan Quintela { 89ab7cbb0bSJuan Quintela p->next_packet_size = used * qemu_target_page_size(); 90ab7cbb0bSJuan Quintela p->flags |= MULTIFD_FLAG_NOCOMP; 91ab7cbb0bSJuan Quintela return 0; 92ab7cbb0bSJuan Quintela } 93ab7cbb0bSJuan Quintela 94ab7cbb0bSJuan Quintela /** 95ab7cbb0bSJuan Quintela * nocomp_send_write: do the actual write of the data 96ab7cbb0bSJuan Quintela * 97ab7cbb0bSJuan Quintela * For no compression we just have to write the data. 98ab7cbb0bSJuan Quintela * 99ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 100ab7cbb0bSJuan Quintela * 101ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 102ab7cbb0bSJuan Quintela * @used: number of pages used 103ab7cbb0bSJuan Quintela * @errp: pointer to an error 104ab7cbb0bSJuan Quintela */ 105ab7cbb0bSJuan Quintela static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) 106ab7cbb0bSJuan Quintela { 107ab7cbb0bSJuan Quintela return qio_channel_writev_all(p->c, p->pages->iov, used, errp); 108ab7cbb0bSJuan Quintela } 109ab7cbb0bSJuan Quintela 110ab7cbb0bSJuan Quintela /** 111ab7cbb0bSJuan Quintela * nocomp_recv_setup: setup receive side 112ab7cbb0bSJuan Quintela * 113ab7cbb0bSJuan Quintela * For no compression this function does nothing. 
114ab7cbb0bSJuan Quintela * 115ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 116ab7cbb0bSJuan Quintela * 117ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 118ab7cbb0bSJuan Quintela * @errp: pointer to an error 119ab7cbb0bSJuan Quintela */ 120ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 121ab7cbb0bSJuan Quintela { 122ab7cbb0bSJuan Quintela return 0; 123ab7cbb0bSJuan Quintela } 124ab7cbb0bSJuan Quintela 125ab7cbb0bSJuan Quintela /** 126ab7cbb0bSJuan Quintela * nocomp_recv_cleanup: setup receive side 127ab7cbb0bSJuan Quintela * 128ab7cbb0bSJuan Quintela * For no compression this function does nothing. 129ab7cbb0bSJuan Quintela * 130ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 131ab7cbb0bSJuan Quintela */ 132ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p) 133ab7cbb0bSJuan Quintela { 134ab7cbb0bSJuan Quintela } 135ab7cbb0bSJuan Quintela 136ab7cbb0bSJuan Quintela /** 137ab7cbb0bSJuan Quintela * nocomp_recv_pages: read the data from the channel into actual pages 138ab7cbb0bSJuan Quintela * 139ab7cbb0bSJuan Quintela * For no compression we just need to read things into the correct place. 
140ab7cbb0bSJuan Quintela * 141ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 142ab7cbb0bSJuan Quintela * 143ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 144ab7cbb0bSJuan Quintela * @used: number of pages used 145ab7cbb0bSJuan Quintela * @errp: pointer to an error 146ab7cbb0bSJuan Quintela */ 147ab7cbb0bSJuan Quintela static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) 148ab7cbb0bSJuan Quintela { 149ab7cbb0bSJuan Quintela uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 150ab7cbb0bSJuan Quintela 151ab7cbb0bSJuan Quintela if (flags != MULTIFD_FLAG_NOCOMP) { 152ab7cbb0bSJuan Quintela error_setg(errp, "multifd %d: flags received %x flags expected %x", 153ab7cbb0bSJuan Quintela p->id, flags, MULTIFD_FLAG_NOCOMP); 154ab7cbb0bSJuan Quintela return -1; 155ab7cbb0bSJuan Quintela } 156ab7cbb0bSJuan Quintela return qio_channel_readv_all(p->c, p->pages->iov, used, errp); 157ab7cbb0bSJuan Quintela } 158ab7cbb0bSJuan Quintela 159ab7cbb0bSJuan Quintela static MultiFDMethods multifd_nocomp_ops = { 160ab7cbb0bSJuan Quintela .send_setup = nocomp_send_setup, 161ab7cbb0bSJuan Quintela .send_cleanup = nocomp_send_cleanup, 162ab7cbb0bSJuan Quintela .send_prepare = nocomp_send_prepare, 163ab7cbb0bSJuan Quintela .send_write = nocomp_send_write, 164ab7cbb0bSJuan Quintela .recv_setup = nocomp_recv_setup, 165ab7cbb0bSJuan Quintela .recv_cleanup = nocomp_recv_cleanup, 166ab7cbb0bSJuan Quintela .recv_pages = nocomp_recv_pages 167ab7cbb0bSJuan Quintela }; 168ab7cbb0bSJuan Quintela 169ab7cbb0bSJuan Quintela static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 170ab7cbb0bSJuan Quintela [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 171ab7cbb0bSJuan Quintela }; 172ab7cbb0bSJuan Quintela 1737ec2c2b3SJuan Quintela void multifd_register_ops(int method, MultiFDMethods *ops) 1747ec2c2b3SJuan Quintela { 1757ec2c2b3SJuan Quintela assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 1767ec2c2b3SJuan Quintela 
multifd_ops[method] = ops; 1777ec2c2b3SJuan Quintela } 1787ec2c2b3SJuan Quintela 179d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 180d32ca5adSJuan Quintela { 181d32ca5adSJuan Quintela MultiFDInit_t msg = {}; 182d32ca5adSJuan Quintela int ret; 183d32ca5adSJuan Quintela 184d32ca5adSJuan Quintela msg.magic = cpu_to_be32(MULTIFD_MAGIC); 185d32ca5adSJuan Quintela msg.version = cpu_to_be32(MULTIFD_VERSION); 186d32ca5adSJuan Quintela msg.id = p->id; 187d32ca5adSJuan Quintela memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 188d32ca5adSJuan Quintela 189d32ca5adSJuan Quintela ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); 190d32ca5adSJuan Quintela if (ret != 0) { 191d32ca5adSJuan Quintela return -1; 192d32ca5adSJuan Quintela } 193d32ca5adSJuan Quintela return 0; 194d32ca5adSJuan Quintela } 195d32ca5adSJuan Quintela 196d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 197d32ca5adSJuan Quintela { 198d32ca5adSJuan Quintela MultiFDInit_t msg; 199d32ca5adSJuan Quintela int ret; 200d32ca5adSJuan Quintela 201d32ca5adSJuan Quintela ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 202d32ca5adSJuan Quintela if (ret != 0) { 203d32ca5adSJuan Quintela return -1; 204d32ca5adSJuan Quintela } 205d32ca5adSJuan Quintela 206d32ca5adSJuan Quintela msg.magic = be32_to_cpu(msg.magic); 207d32ca5adSJuan Quintela msg.version = be32_to_cpu(msg.version); 208d32ca5adSJuan Quintela 209d32ca5adSJuan Quintela if (msg.magic != MULTIFD_MAGIC) { 210d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet magic %x " 211d32ca5adSJuan Quintela "expected %x", msg.magic, MULTIFD_MAGIC); 212d32ca5adSJuan Quintela return -1; 213d32ca5adSJuan Quintela } 214d32ca5adSJuan Quintela 215d32ca5adSJuan Quintela if (msg.version != MULTIFD_VERSION) { 216d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet version %d " 217d32ca5adSJuan Quintela "expected %d", msg.version, 
MULTIFD_VERSION); 218d32ca5adSJuan Quintela return -1; 219d32ca5adSJuan Quintela } 220d32ca5adSJuan Quintela 221d32ca5adSJuan Quintela if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 222d32ca5adSJuan Quintela char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 223d32ca5adSJuan Quintela char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 224d32ca5adSJuan Quintela 225d32ca5adSJuan Quintela error_setg(errp, "multifd: received uuid '%s' and expected " 226d32ca5adSJuan Quintela "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 227d32ca5adSJuan Quintela g_free(uuid); 228d32ca5adSJuan Quintela g_free(msg_uuid); 229d32ca5adSJuan Quintela return -1; 230d32ca5adSJuan Quintela } 231d32ca5adSJuan Quintela 232d32ca5adSJuan Quintela if (msg.id > migrate_multifd_channels()) { 233d32ca5adSJuan Quintela error_setg(errp, "multifd: received channel version %d " 234d32ca5adSJuan Quintela "expected %d", msg.version, MULTIFD_VERSION); 235d32ca5adSJuan Quintela return -1; 236d32ca5adSJuan Quintela } 237d32ca5adSJuan Quintela 238d32ca5adSJuan Quintela return msg.id; 239d32ca5adSJuan Quintela } 240d32ca5adSJuan Quintela 241d32ca5adSJuan Quintela static MultiFDPages_t *multifd_pages_init(size_t size) 242d32ca5adSJuan Quintela { 243d32ca5adSJuan Quintela MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 244d32ca5adSJuan Quintela 245d32ca5adSJuan Quintela pages->allocated = size; 246d32ca5adSJuan Quintela pages->iov = g_new0(struct iovec, size); 247d32ca5adSJuan Quintela pages->offset = g_new0(ram_addr_t, size); 248d32ca5adSJuan Quintela 249d32ca5adSJuan Quintela return pages; 250d32ca5adSJuan Quintela } 251d32ca5adSJuan Quintela 252d32ca5adSJuan Quintela static void multifd_pages_clear(MultiFDPages_t *pages) 253d32ca5adSJuan Quintela { 254d32ca5adSJuan Quintela pages->used = 0; 255d32ca5adSJuan Quintela pages->allocated = 0; 256d32ca5adSJuan Quintela pages->packet_num = 0; 257d32ca5adSJuan Quintela pages->block = NULL; 258d32ca5adSJuan Quintela 
g_free(pages->iov); 259d32ca5adSJuan Quintela pages->iov = NULL; 260d32ca5adSJuan Quintela g_free(pages->offset); 261d32ca5adSJuan Quintela pages->offset = NULL; 262d32ca5adSJuan Quintela g_free(pages); 263d32ca5adSJuan Quintela } 264d32ca5adSJuan Quintela 265d32ca5adSJuan Quintela static void multifd_send_fill_packet(MultiFDSendParams *p) 266d32ca5adSJuan Quintela { 267d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 268d32ca5adSJuan Quintela int i; 269d32ca5adSJuan Quintela 270d32ca5adSJuan Quintela packet->flags = cpu_to_be32(p->flags); 271d32ca5adSJuan Quintela packet->pages_alloc = cpu_to_be32(p->pages->allocated); 272d32ca5adSJuan Quintela packet->pages_used = cpu_to_be32(p->pages->used); 273d32ca5adSJuan Quintela packet->next_packet_size = cpu_to_be32(p->next_packet_size); 274d32ca5adSJuan Quintela packet->packet_num = cpu_to_be64(p->packet_num); 275d32ca5adSJuan Quintela 276d32ca5adSJuan Quintela if (p->pages->block) { 277d32ca5adSJuan Quintela strncpy(packet->ramblock, p->pages->block->idstr, 256); 278d32ca5adSJuan Quintela } 279d32ca5adSJuan Quintela 280d32ca5adSJuan Quintela for (i = 0; i < p->pages->used; i++) { 281d32ca5adSJuan Quintela /* there are architectures where ram_addr_t is 32 bit */ 282d32ca5adSJuan Quintela uint64_t temp = p->pages->offset[i]; 283d32ca5adSJuan Quintela 284d32ca5adSJuan Quintela packet->offset[i] = cpu_to_be64(temp); 285d32ca5adSJuan Quintela } 286d32ca5adSJuan Quintela } 287d32ca5adSJuan Quintela 288d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 289d32ca5adSJuan Quintela { 290d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 291d32ca5adSJuan Quintela uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 292d32ca5adSJuan Quintela RAMBlock *block; 293d32ca5adSJuan Quintela int i; 294d32ca5adSJuan Quintela 295d32ca5adSJuan Quintela packet->magic = be32_to_cpu(packet->magic); 296d32ca5adSJuan Quintela if (packet->magic != MULTIFD_MAGIC) { 
297d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 298d32ca5adSJuan Quintela "magic %x and expected magic %x", 299d32ca5adSJuan Quintela packet->magic, MULTIFD_MAGIC); 300d32ca5adSJuan Quintela return -1; 301d32ca5adSJuan Quintela } 302d32ca5adSJuan Quintela 303d32ca5adSJuan Quintela packet->version = be32_to_cpu(packet->version); 304d32ca5adSJuan Quintela if (packet->version != MULTIFD_VERSION) { 305d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 306d32ca5adSJuan Quintela "version %d and expected version %d", 307d32ca5adSJuan Quintela packet->version, MULTIFD_VERSION); 308d32ca5adSJuan Quintela return -1; 309d32ca5adSJuan Quintela } 310d32ca5adSJuan Quintela 311d32ca5adSJuan Quintela p->flags = be32_to_cpu(packet->flags); 312d32ca5adSJuan Quintela 313d32ca5adSJuan Quintela packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 314d32ca5adSJuan Quintela /* 315d32ca5adSJuan Quintela * If we received a packet that is 100 times bigger than expected 316d32ca5adSJuan Quintela * just stop migration. It is a magic number. 317d32ca5adSJuan Quintela */ 318d32ca5adSJuan Quintela if (packet->pages_alloc > pages_max * 100) { 319d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 320d32ca5adSJuan Quintela "with size %d and expected a maximum size of %d", 321d32ca5adSJuan Quintela packet->pages_alloc, pages_max * 100) ; 322d32ca5adSJuan Quintela return -1; 323d32ca5adSJuan Quintela } 324d32ca5adSJuan Quintela /* 325d32ca5adSJuan Quintela * We received a packet that is bigger than expected but inside 326d32ca5adSJuan Quintela * reasonable limits (see previous comment). Just reallocate. 
327d32ca5adSJuan Quintela */ 328d32ca5adSJuan Quintela if (packet->pages_alloc > p->pages->allocated) { 329d32ca5adSJuan Quintela multifd_pages_clear(p->pages); 330d32ca5adSJuan Quintela p->pages = multifd_pages_init(packet->pages_alloc); 331d32ca5adSJuan Quintela } 332d32ca5adSJuan Quintela 333d32ca5adSJuan Quintela p->pages->used = be32_to_cpu(packet->pages_used); 334d32ca5adSJuan Quintela if (p->pages->used > packet->pages_alloc) { 335d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 336d32ca5adSJuan Quintela "with %d pages and expected maximum pages are %d", 337d32ca5adSJuan Quintela p->pages->used, packet->pages_alloc) ; 338d32ca5adSJuan Quintela return -1; 339d32ca5adSJuan Quintela } 340d32ca5adSJuan Quintela 341d32ca5adSJuan Quintela p->next_packet_size = be32_to_cpu(packet->next_packet_size); 342d32ca5adSJuan Quintela p->packet_num = be64_to_cpu(packet->packet_num); 343d32ca5adSJuan Quintela 344d32ca5adSJuan Quintela if (p->pages->used == 0) { 345d32ca5adSJuan Quintela return 0; 346d32ca5adSJuan Quintela } 347d32ca5adSJuan Quintela 348d32ca5adSJuan Quintela /* make sure that ramblock is 0 terminated */ 349d32ca5adSJuan Quintela packet->ramblock[255] = 0; 350d32ca5adSJuan Quintela block = qemu_ram_block_by_name(packet->ramblock); 351d32ca5adSJuan Quintela if (!block) { 352d32ca5adSJuan Quintela error_setg(errp, "multifd: unknown ram block %s", 353d32ca5adSJuan Quintela packet->ramblock); 354d32ca5adSJuan Quintela return -1; 355d32ca5adSJuan Quintela } 356d32ca5adSJuan Quintela 357d32ca5adSJuan Quintela for (i = 0; i < p->pages->used; i++) { 358d32ca5adSJuan Quintela uint64_t offset = be64_to_cpu(packet->offset[i]); 359d32ca5adSJuan Quintela 360d32ca5adSJuan Quintela if (offset > (block->used_length - qemu_target_page_size())) { 361d32ca5adSJuan Quintela error_setg(errp, "multifd: offset too long %" PRIu64 362d32ca5adSJuan Quintela " (max " RAM_ADDR_FMT ")", 363d32ca5adSJuan Quintela offset, block->max_length); 364d32ca5adSJuan Quintela 
return -1; 365d32ca5adSJuan Quintela } 366d32ca5adSJuan Quintela p->pages->iov[i].iov_base = block->host + offset; 367d32ca5adSJuan Quintela p->pages->iov[i].iov_len = qemu_target_page_size(); 368d32ca5adSJuan Quintela } 369d32ca5adSJuan Quintela 370d32ca5adSJuan Quintela return 0; 371d32ca5adSJuan Quintela } 372d32ca5adSJuan Quintela 373d32ca5adSJuan Quintela struct { 374d32ca5adSJuan Quintela MultiFDSendParams *params; 375d32ca5adSJuan Quintela /* array of pages to sent */ 376d32ca5adSJuan Quintela MultiFDPages_t *pages; 377d32ca5adSJuan Quintela /* global number of generated multifd packets */ 378d32ca5adSJuan Quintela uint64_t packet_num; 379d32ca5adSJuan Quintela /* send channels ready */ 380d32ca5adSJuan Quintela QemuSemaphore channels_ready; 381d32ca5adSJuan Quintela /* 382d32ca5adSJuan Quintela * Have we already run terminate threads. There is a race when it 383d32ca5adSJuan Quintela * happens that we got one error while we are exiting. 384d32ca5adSJuan Quintela * We will use atomic operations. Only valid values are 0 and 1. 385d32ca5adSJuan Quintela */ 386d32ca5adSJuan Quintela int exiting; 387ab7cbb0bSJuan Quintela /* multifd ops */ 388ab7cbb0bSJuan Quintela MultiFDMethods *ops; 389d32ca5adSJuan Quintela } *multifd_send_state; 390d32ca5adSJuan Quintela 391d32ca5adSJuan Quintela /* 392d32ca5adSJuan Quintela * How we use multifd_send_state->pages and channel->pages? 393d32ca5adSJuan Quintela * 394d32ca5adSJuan Quintela * We create a pages for each channel, and a main one. Each time that 395d32ca5adSJuan Quintela * we need to send a batch of pages we interchange the ones between 396d32ca5adSJuan Quintela * multifd_send_state and the channel that is sending it. 
There are 397d32ca5adSJuan Quintela * two reasons for that: 398d32ca5adSJuan Quintela * - to not have to do so many mallocs during migration 399d32ca5adSJuan Quintela * - to make easier to know what to free at the end of migration 400d32ca5adSJuan Quintela * 401d32ca5adSJuan Quintela * This way we always know who is the owner of each "pages" struct, 402d32ca5adSJuan Quintela * and we don't need any locking. It belongs to the migration thread 403d32ca5adSJuan Quintela * or to the channel thread. Switching is safe because the migration 404d32ca5adSJuan Quintela * thread is using the channel mutex when changing it, and the channel 405d32ca5adSJuan Quintela * have to had finish with its own, otherwise pending_job can't be 406d32ca5adSJuan Quintela * false. 407d32ca5adSJuan Quintela */ 408d32ca5adSJuan Quintela 409d32ca5adSJuan Quintela static int multifd_send_pages(QEMUFile *f) 410d32ca5adSJuan Quintela { 411d32ca5adSJuan Quintela int i; 412d32ca5adSJuan Quintela static int next_channel; 413d32ca5adSJuan Quintela MultiFDSendParams *p = NULL; /* make happy gcc */ 414d32ca5adSJuan Quintela MultiFDPages_t *pages = multifd_send_state->pages; 415d32ca5adSJuan Quintela uint64_t transferred; 416d32ca5adSJuan Quintela 417d73415a3SStefan Hajnoczi if (qatomic_read(&multifd_send_state->exiting)) { 418d32ca5adSJuan Quintela return -1; 419d32ca5adSJuan Quintela } 420d32ca5adSJuan Quintela 421d32ca5adSJuan Quintela qemu_sem_wait(&multifd_send_state->channels_ready); 4227e89a140SLaurent Vivier /* 4237e89a140SLaurent Vivier * next_channel can remain from a previous migration that was 4247e89a140SLaurent Vivier * using more channels, so ensure it doesn't overflow if the 4257e89a140SLaurent Vivier * limit is lower now. 
4267e89a140SLaurent Vivier */ 4277e89a140SLaurent Vivier next_channel %= migrate_multifd_channels(); 428d32ca5adSJuan Quintela for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 429d32ca5adSJuan Quintela p = &multifd_send_state->params[i]; 430d32ca5adSJuan Quintela 431d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 432d32ca5adSJuan Quintela if (p->quit) { 433d32ca5adSJuan Quintela error_report("%s: channel %d has already quit!", __func__, i); 434d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 435d32ca5adSJuan Quintela return -1; 436d32ca5adSJuan Quintela } 437d32ca5adSJuan Quintela if (!p->pending_job) { 438d32ca5adSJuan Quintela p->pending_job++; 439d32ca5adSJuan Quintela next_channel = (i + 1) % migrate_multifd_channels(); 440d32ca5adSJuan Quintela break; 441d32ca5adSJuan Quintela } 442d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 443d32ca5adSJuan Quintela } 444d32ca5adSJuan Quintela assert(!p->pages->used); 445d32ca5adSJuan Quintela assert(!p->pages->block); 446d32ca5adSJuan Quintela 447d32ca5adSJuan Quintela p->packet_num = multifd_send_state->packet_num++; 448d32ca5adSJuan Quintela multifd_send_state->pages = p->pages; 449d32ca5adSJuan Quintela p->pages = pages; 450d32ca5adSJuan Quintela transferred = ((uint64_t) pages->used) * qemu_target_page_size() 451d32ca5adSJuan Quintela + p->packet_len; 452d32ca5adSJuan Quintela qemu_file_update_transfer(f, transferred); 453d32ca5adSJuan Quintela ram_counters.multifd_bytes += transferred; 454df555094SPhilippe Mathieu-Daudé ram_counters.transferred += transferred; 455d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 456d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 457d32ca5adSJuan Quintela 458d32ca5adSJuan Quintela return 1; 459d32ca5adSJuan Quintela } 460d32ca5adSJuan Quintela 461d32ca5adSJuan Quintela int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) 462d32ca5adSJuan Quintela { 463d32ca5adSJuan Quintela MultiFDPages_t *pages = multifd_send_state->pages; 
464d32ca5adSJuan Quintela 465d32ca5adSJuan Quintela if (!pages->block) { 466d32ca5adSJuan Quintela pages->block = block; 467d32ca5adSJuan Quintela } 468d32ca5adSJuan Quintela 469d32ca5adSJuan Quintela if (pages->block == block) { 470d32ca5adSJuan Quintela pages->offset[pages->used] = offset; 471d32ca5adSJuan Quintela pages->iov[pages->used].iov_base = block->host + offset; 472d32ca5adSJuan Quintela pages->iov[pages->used].iov_len = qemu_target_page_size(); 473d32ca5adSJuan Quintela pages->used++; 474d32ca5adSJuan Quintela 475d32ca5adSJuan Quintela if (pages->used < pages->allocated) { 476d32ca5adSJuan Quintela return 1; 477d32ca5adSJuan Quintela } 478d32ca5adSJuan Quintela } 479d32ca5adSJuan Quintela 480d32ca5adSJuan Quintela if (multifd_send_pages(f) < 0) { 481d32ca5adSJuan Quintela return -1; 482d32ca5adSJuan Quintela } 483d32ca5adSJuan Quintela 484d32ca5adSJuan Quintela if (pages->block != block) { 485d32ca5adSJuan Quintela return multifd_queue_page(f, block, offset); 486d32ca5adSJuan Quintela } 487d32ca5adSJuan Quintela 488d32ca5adSJuan Quintela return 1; 489d32ca5adSJuan Quintela } 490d32ca5adSJuan Quintela 491d32ca5adSJuan Quintela static void multifd_send_terminate_threads(Error *err) 492d32ca5adSJuan Quintela { 493d32ca5adSJuan Quintela int i; 494d32ca5adSJuan Quintela 495d32ca5adSJuan Quintela trace_multifd_send_terminate_threads(err != NULL); 496d32ca5adSJuan Quintela 497d32ca5adSJuan Quintela if (err) { 498d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 499d32ca5adSJuan Quintela migrate_set_error(s, err); 500d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 501d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 502d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_DEVICE || 503d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 504d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 505d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 506d32ca5adSJuan Quintela } 507d32ca5adSJuan 
Quintela } 508d32ca5adSJuan Quintela 509d32ca5adSJuan Quintela /* 510d32ca5adSJuan Quintela * We don't want to exit each threads twice. Depending on where 511d32ca5adSJuan Quintela * we get the error, or if there are two independent errors in two 512d32ca5adSJuan Quintela * threads at the same time, we can end calling this function 513d32ca5adSJuan Quintela * twice. 514d32ca5adSJuan Quintela */ 515d73415a3SStefan Hajnoczi if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 516d32ca5adSJuan Quintela return; 517d32ca5adSJuan Quintela } 518d32ca5adSJuan Quintela 519d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 520d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 521d32ca5adSJuan Quintela 522d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 523d32ca5adSJuan Quintela p->quit = true; 524d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 525d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 526d32ca5adSJuan Quintela } 527d32ca5adSJuan Quintela } 528d32ca5adSJuan Quintela 529d32ca5adSJuan Quintela void multifd_save_cleanup(void) 530d32ca5adSJuan Quintela { 531d32ca5adSJuan Quintela int i; 532d32ca5adSJuan Quintela 533d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 534d32ca5adSJuan Quintela return; 535d32ca5adSJuan Quintela } 536d32ca5adSJuan Quintela multifd_send_terminate_threads(NULL); 537d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 538d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 539d32ca5adSJuan Quintela 540d32ca5adSJuan Quintela if (p->running) { 541d32ca5adSJuan Quintela qemu_thread_join(&p->thread); 542d32ca5adSJuan Quintela } 543d32ca5adSJuan Quintela } 544d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 545d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 546ab7cbb0bSJuan Quintela Error *local_err = NULL; 547d32ca5adSJuan Quintela 548d32ca5adSJuan Quintela socket_send_channel_destroy(p->c); 
549d32ca5adSJuan Quintela p->c = NULL; 550d32ca5adSJuan Quintela qemu_mutex_destroy(&p->mutex); 551d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem); 552d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem_sync); 553d32ca5adSJuan Quintela g_free(p->name); 554d32ca5adSJuan Quintela p->name = NULL; 5558e5fa059SChuan Zheng g_free(p->tls_hostname); 5568e5fa059SChuan Zheng p->tls_hostname = NULL; 557d32ca5adSJuan Quintela multifd_pages_clear(p->pages); 558d32ca5adSJuan Quintela p->pages = NULL; 559d32ca5adSJuan Quintela p->packet_len = 0; 560d32ca5adSJuan Quintela g_free(p->packet); 561d32ca5adSJuan Quintela p->packet = NULL; 562ab7cbb0bSJuan Quintela multifd_send_state->ops->send_cleanup(p, &local_err); 563ab7cbb0bSJuan Quintela if (local_err) { 564ab7cbb0bSJuan Quintela migrate_set_error(migrate_get_current(), local_err); 56513f2cb21SPan Nengyuan error_free(local_err); 566ab7cbb0bSJuan Quintela } 567d32ca5adSJuan Quintela } 568d32ca5adSJuan Quintela qemu_sem_destroy(&multifd_send_state->channels_ready); 569d32ca5adSJuan Quintela g_free(multifd_send_state->params); 570d32ca5adSJuan Quintela multifd_send_state->params = NULL; 571d32ca5adSJuan Quintela multifd_pages_clear(multifd_send_state->pages); 572d32ca5adSJuan Quintela multifd_send_state->pages = NULL; 573d32ca5adSJuan Quintela g_free(multifd_send_state); 574d32ca5adSJuan Quintela multifd_send_state = NULL; 575d32ca5adSJuan Quintela } 576d32ca5adSJuan Quintela 577d32ca5adSJuan Quintela void multifd_send_sync_main(QEMUFile *f) 578d32ca5adSJuan Quintela { 579d32ca5adSJuan Quintela int i; 580d32ca5adSJuan Quintela 581d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 582d32ca5adSJuan Quintela return; 583d32ca5adSJuan Quintela } 584d32ca5adSJuan Quintela if (multifd_send_state->pages->used) { 585d32ca5adSJuan Quintela if (multifd_send_pages(f) < 0) { 586d32ca5adSJuan Quintela error_report("%s: multifd_send_pages fail", __func__); 587d32ca5adSJuan Quintela return; 588d32ca5adSJuan Quintela } 589d32ca5adSJuan Quintela } 
/*
 * NOTE(review): this chunk opens part-way through the body of
 * multifd_send_sync_main(); its head is outside the visible range, so
 * the tail below is left undocumented.
 *
 * multifd_send_thread(opaque)
 *   Entry point of one sender thread; opaque is the MultiFDSendParams of
 *   the channel it serves. Always returns NULL.
 *   - Registers with RCU and sends the initial packet; a negative result
 *     sets ret = -1 and jumps straight to the out: label.
 *   - Main loop: waits on p->sem and leaves the loop once
 *     multifd_send_state->exiting reads non-zero. With p->pending_job
 *     set it snapshots used/packet_num/flags under p->mutex, calls
 *     ops->send_prepare() when used is non-zero, fills the packet header,
 *     resets p->flags and p->pages, and only then drops the mutex so the
 *     channel writes (header via qio_channel_write_all(), pages via
 *     ops->send_write()) run unlocked.
 *   - After a successful send it decrements p->pending_job under the
 *     mutex, posts p->sem_sync if the snapshot carried MULTIFD_FLAG_SYNC,
 *     and posts channels_ready.
 *   - With no pending job it exits if p->quit is set and otherwise treats
 *     the wakeup as spurious.
 *   - Exit path: a set local_err is handed to
 *     multifd_send_terminate_threads() and freed; a non-zero ret posts
 *     p->sem_sync and channels_ready; finally p->running is cleared under
 *     p->mutex and the thread unregisters from RCU.
 *
 * multifd_channel_connect() is forward-declared because
 * multifd_tls_outgoing_handshake() calls it before its definition.
 *
 * multifd_tls_outgoing_handshake(task, opaque)
 *   Completion callback given to qio_channel_tls_handshake(); opaque is
 *   the MultiFDSendParams. Moves any task error into err, emits the
 *   matching trace event, then calls multifd_channel_connect(p, ioc, err).
 *   NOTE(review): the bool result is discarded and err is not freed
 *   here; multifd_channel_connect() returns true without touching the
 *   semaphores when err is non-NULL, so a failed handshake appears to
 *   leak err and wake nobody. Confirm whether cleanup happens elsewhere.
 *
 * multifd_tls_handshake_thread(opaque)
 *   Thread body that casts p->c to QIOChannelTLS and starts the handshake
 *   with multifd_tls_outgoing_handshake as callback and p as its opaque
 *   argument (the argument list of that call continues on the next
 *   physical line). Returns NULL.
 */
590d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 591d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 592d32ca5adSJuan Quintela 593d32ca5adSJuan Quintela trace_multifd_send_sync_main_signal(p->id); 594d32ca5adSJuan Quintela 595d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 596d32ca5adSJuan Quintela 597d32ca5adSJuan Quintela if (p->quit) { 598d32ca5adSJuan Quintela error_report("%s: channel %d has already quit", __func__, i); 599d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 600d32ca5adSJuan Quintela return; 601d32ca5adSJuan Quintela } 602d32ca5adSJuan Quintela 603d32ca5adSJuan Quintela p->packet_num = multifd_send_state->packet_num++; 604d32ca5adSJuan Quintela p->flags |= MULTIFD_FLAG_SYNC; 605d32ca5adSJuan Quintela p->pending_job++; 606d32ca5adSJuan Quintela qemu_file_update_transfer(f, p->packet_len); 607d32ca5adSJuan Quintela ram_counters.multifd_bytes += p->packet_len; 608d32ca5adSJuan Quintela ram_counters.transferred += p->packet_len; 609d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 610d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 611d32ca5adSJuan Quintela } 612d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 613d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 614d32ca5adSJuan Quintela 615d32ca5adSJuan Quintela trace_multifd_send_sync_main_wait(p->id); 616d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 617d32ca5adSJuan Quintela } 618d32ca5adSJuan Quintela trace_multifd_send_sync_main(multifd_send_state->packet_num); 619d32ca5adSJuan Quintela } 620d32ca5adSJuan Quintela 621d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque) 622d32ca5adSJuan Quintela { 623d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 624d32ca5adSJuan Quintela Error *local_err = NULL; 625d32ca5adSJuan Quintela int ret = 0; 626d32ca5adSJuan Quintela uint32_t flags = 0; 627d32ca5adSJuan Quintela 628d32ca5adSJuan Quintela 
/* multifd_send_thread(): thread prologue and the head of the send loop follow. */
trace_multifd_send_thread_start(p->id); 629d32ca5adSJuan Quintela rcu_register_thread(); 630d32ca5adSJuan Quintela 631d32ca5adSJuan Quintela if (multifd_send_initial_packet(p, &local_err) < 0) { 632d32ca5adSJuan Quintela ret = -1; 633d32ca5adSJuan Quintela goto out; 634d32ca5adSJuan Quintela } 635d32ca5adSJuan Quintela /* initial packet */ 636d32ca5adSJuan Quintela p->num_packets = 1; 637d32ca5adSJuan Quintela 638d32ca5adSJuan Quintela while (true) { 639d32ca5adSJuan Quintela qemu_sem_wait(&p->sem); 640d32ca5adSJuan Quintela 641d73415a3SStefan Hajnoczi if (qatomic_read(&multifd_send_state->exiting)) { 642d32ca5adSJuan Quintela break; 643d32ca5adSJuan Quintela } 644d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 645d32ca5adSJuan Quintela 646d32ca5adSJuan Quintela if (p->pending_job) { 647d32ca5adSJuan Quintela uint32_t used = p->pages->used; 648d32ca5adSJuan Quintela uint64_t packet_num = p->packet_num; 649d32ca5adSJuan Quintela flags = p->flags; 650d32ca5adSJuan Quintela 651ab7cbb0bSJuan Quintela if (used) { 652ab7cbb0bSJuan Quintela ret = multifd_send_state->ops->send_prepare(p, used, 653ab7cbb0bSJuan Quintela &local_err); 654ab7cbb0bSJuan Quintela if (ret != 0) { 655ab7cbb0bSJuan Quintela qemu_mutex_unlock(&p->mutex); 656ab7cbb0bSJuan Quintela break; 657ab7cbb0bSJuan Quintela } 658ab7cbb0bSJuan Quintela } 659d32ca5adSJuan Quintela multifd_send_fill_packet(p); 660d32ca5adSJuan Quintela p->flags = 0; 661d32ca5adSJuan Quintela p->num_packets++; 662d32ca5adSJuan Quintela p->num_pages += used; 663d32ca5adSJuan Quintela p->pages->used = 0; 664d32ca5adSJuan Quintela p->pages->block = NULL; 665d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 666d32ca5adSJuan Quintela 667d32ca5adSJuan Quintela trace_multifd_send(p->id, packet_num, used, flags, 668d32ca5adSJuan Quintela p->next_packet_size); 669d32ca5adSJuan Quintela 670d32ca5adSJuan Quintela ret = qio_channel_write_all(p->c, (void *)p->packet, 671d32ca5adSJuan Quintela p->packet_len, &local_err); 672d32ca5adSJuan 
/* multifd_send_thread(): a non-zero ret from the header write above leaves the loop and reaches the out: label. */
Quintela if (ret != 0) { 673d32ca5adSJuan Quintela break; 674d32ca5adSJuan Quintela } 675d32ca5adSJuan Quintela 676d32ca5adSJuan Quintela if (used) { 677ab7cbb0bSJuan Quintela ret = multifd_send_state->ops->send_write(p, used, &local_err); 678d32ca5adSJuan Quintela if (ret != 0) { 679d32ca5adSJuan Quintela break; 680d32ca5adSJuan Quintela } 681d32ca5adSJuan Quintela } 682d32ca5adSJuan Quintela 683d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 684d32ca5adSJuan Quintela p->pending_job--; 685d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 686d32ca5adSJuan Quintela 687d32ca5adSJuan Quintela if (flags & MULTIFD_FLAG_SYNC) { 688d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 689d32ca5adSJuan Quintela } 690d32ca5adSJuan Quintela qemu_sem_post(&multifd_send_state->channels_ready); 691d32ca5adSJuan Quintela } else if (p->quit) { 692d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 693d32ca5adSJuan Quintela break; 694d32ca5adSJuan Quintela } else { 695d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 696d32ca5adSJuan Quintela /* sometimes there are spurious wakeups */ 697d32ca5adSJuan Quintela } 698d32ca5adSJuan Quintela } 699d32ca5adSJuan Quintela 700d32ca5adSJuan Quintela out: 701d32ca5adSJuan Quintela if (local_err) { 702d32ca5adSJuan Quintela trace_multifd_send_error(p->id); 703d32ca5adSJuan Quintela multifd_send_terminate_threads(local_err); 70413f2cb21SPan Nengyuan error_free(local_err); 705d32ca5adSJuan Quintela } 706d32ca5adSJuan Quintela 707d32ca5adSJuan Quintela /* 708d32ca5adSJuan Quintela * Error happen, I will exit, but I can't just leave, tell 709d32ca5adSJuan Quintela * who pay attention to me. 
710d32ca5adSJuan Quintela */ 711d32ca5adSJuan Quintela if (ret != 0) { 712d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 713d32ca5adSJuan Quintela qemu_sem_post(&multifd_send_state->channels_ready); 714d32ca5adSJuan Quintela } 715d32ca5adSJuan Quintela 716d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 717d32ca5adSJuan Quintela p->running = false; 718d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 719d32ca5adSJuan Quintela 720d32ca5adSJuan Quintela rcu_unregister_thread(); 721d32ca5adSJuan Quintela trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); 722d32ca5adSJuan Quintela 723d32ca5adSJuan Quintela return NULL; 724d32ca5adSJuan Quintela } 725d32ca5adSJuan Quintela 72629647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p, 72729647140SChuan Zheng QIOChannel *ioc, 72829647140SChuan Zheng Error *error); 72929647140SChuan Zheng 73029647140SChuan Zheng static void multifd_tls_outgoing_handshake(QIOTask *task, 73129647140SChuan Zheng gpointer opaque) 73229647140SChuan Zheng { 73329647140SChuan Zheng MultiFDSendParams *p = opaque; 73429647140SChuan Zheng QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 73529647140SChuan Zheng Error *err = NULL; 73629647140SChuan Zheng 737894f0214SChuan Zheng if (qio_task_propagate_error(task, &err)) { 738894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); 739894f0214SChuan Zheng } else { 740894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_complete(ioc); 741894f0214SChuan Zheng } 74229647140SChuan Zheng multifd_channel_connect(p, ioc, err); 74329647140SChuan Zheng } 74429647140SChuan Zheng 745a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque) 746a1af605bSChuan Zheng { 747a1af605bSChuan Zheng MultiFDSendParams *p = opaque; 748a1af605bSChuan Zheng QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); 749a1af605bSChuan Zheng 750a1af605bSChuan Zheng qio_channel_tls_handshake(tioc, 751a1af605bSChuan Zheng 
/*
 * The first tokens on this physical line finish the
 * qio_channel_tls_handshake() argument list begun in
 * multifd_tls_handshake_thread().
 *
 * multifd_tls_channel_connect(p, ioc, errp)
 *   Wraps ioc in a TLS client channel created by
 *   migration_tls_client_create() for host p->tls_hostname. If that
 *   returns NULL the function returns at once (errp is presumably set by
 *   the callee - confirm). Otherwise it drops one reference on ioc, names
 *   the TLS channel multifd-tls-outgoing, stores it in p->c and starts a
 *   joinable thread running multifd_tls_handshake_thread() on p.
 *   NOTE(review): the thread handle goes into p->thread, the same field
 *   multifd_channel_connect() uses for multifd_send_thread; verify that
 *   the handshake thread is still joined somewhere.
 *
 * multifd_channel_connect(p, ioc, error)
 *   Returns true on failure and false otherwise.
 *   - error already set on entry: nothing is done and true is returned.
 *   - TLS credentials configured (non-empty parameters.tls_creds) and
 *     ioc is not yet a QIOChannelTLS: starts the TLS upgrade via
 *     multifd_tls_channel_connect(); returns false if that left error
 *     NULL, true otherwise. The send thread is not created on this path;
 *     the handshake callback calls this function again with the TLS
 *     channel.
 *   - otherwise: stores ioc in p->c, creates the joinable
 *     multifd_send_thread named p->name and returns false.
 *   NOTE(review): error is received by value, so an Error stored through
 *   &error by multifd_tls_channel_connect() never reaches the caller and
 *   is not freed in this function.
 */
multifd_tls_outgoing_handshake, 752a1af605bSChuan Zheng p, 753a1af605bSChuan Zheng NULL, 754a1af605bSChuan Zheng NULL); 755a1af605bSChuan Zheng return NULL; 756a1af605bSChuan Zheng } 757a1af605bSChuan Zheng 75829647140SChuan Zheng static void multifd_tls_channel_connect(MultiFDSendParams *p, 75929647140SChuan Zheng QIOChannel *ioc, 76029647140SChuan Zheng Error **errp) 76129647140SChuan Zheng { 76229647140SChuan Zheng MigrationState *s = migrate_get_current(); 76329647140SChuan Zheng const char *hostname = p->tls_hostname; 76429647140SChuan Zheng QIOChannelTLS *tioc; 76529647140SChuan Zheng 76629647140SChuan Zheng tioc = migration_tls_client_create(s, ioc, hostname, errp); 76729647140SChuan Zheng if (!tioc) { 76829647140SChuan Zheng return; 76929647140SChuan Zheng } 77029647140SChuan Zheng 7719e842408SChuan Zheng object_unref(OBJECT(ioc)); 772894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 77329647140SChuan Zheng qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 774a1af605bSChuan Zheng p->c = QIO_CHANNEL(tioc); 775a1af605bSChuan Zheng qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", 776a1af605bSChuan Zheng multifd_tls_handshake_thread, p, 777a1af605bSChuan Zheng QEMU_THREAD_JOINABLE); 77829647140SChuan Zheng } 77929647140SChuan Zheng 78029647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p, 78129647140SChuan Zheng QIOChannel *ioc, 78229647140SChuan Zheng Error *error) 78329647140SChuan Zheng { 78429647140SChuan Zheng MigrationState *s = migrate_get_current(); 78529647140SChuan Zheng 786894f0214SChuan Zheng trace_multifd_set_outgoing_channel( 787894f0214SChuan Zheng ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error); 788894f0214SChuan Zheng 78929647140SChuan Zheng if (!error) { 79029647140SChuan Zheng if (s->parameters.tls_creds && 79129647140SChuan Zheng *s->parameters.tls_creds && 79229647140SChuan Zheng !object_dynamic_cast(OBJECT(ioc), 79329647140SChuan Zheng 
/*
 * The first tokens on this physical line continue the TLS check inside
 * multifd_channel_connect().
 *
 * multifd_new_send_channel_cleanup(p, ioc, err)
 *   Failure path for a send channel whose thread was never started:
 *   records err with migrate_set_error(), posts channels_ready and
 *   p->sem_sync, sets p->quit, drops a reference on ioc and frees err.
 *
 * multifd_new_send_channel_async(task, opaque)
 *   Callback handed to socket_send_channel_create() by
 *   multifd_save_setup(); opaque is the MultiFDSendParams. If the task
 *   carries an error it goes straight to the cleanup helper. Otherwise it
 *   stores the channel in p->c, calls qio_channel_set_delay(p->c, false),
 *   sets p->running and calls multifd_channel_connect(); a true result
 *   also ends in the cleanup helper.
 *   NOTE(review): on that second path local_err is still NULL when it is
 *   passed to multifd_new_send_channel_cleanup(); check that
 *   migrate_set_error() tolerates NULL.
 *
 * multifd_save_setup(errp)
 *   Returns 0 without doing anything when multifd is not in use.
 *   Otherwise allocates multifd_send_state with one MultiFDSendParams per
 *   channel (migrate_multifd_channels()), selects ops from multifd_ops[]
 *   by migrate_multifd_compression(), and for each channel initialises
 *   the mutex, both semaphores, the page array and a zeroed packet buffer
 *   of sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count bytes
 *   whose magic and version are stored big-endian, then passes
 *   multifd_new_send_channel_async and p to socket_send_channel_create().
 *   A second pass calls ops->send_setup() per channel; the first non-zero
 *   result is propagated through errp and returned. Returns 0 on success.
 *   NOTE(review): that early return frees nothing allocated above;
 *   presumably a cleanup routine elsewhere handles it - confirm.
 */
TYPE_QIO_CHANNEL_TLS)) { 79429647140SChuan Zheng multifd_tls_channel_connect(p, ioc, &error); 79529647140SChuan Zheng if (!error) { 79629647140SChuan Zheng /* 79729647140SChuan Zheng * tls_channel_connect will call back to this 79829647140SChuan Zheng * function after the TLS handshake, 79929647140SChuan Zheng * so we mustn't call multifd_send_thread until then 80029647140SChuan Zheng */ 80129647140SChuan Zheng return false; 80229647140SChuan Zheng } else { 80329647140SChuan Zheng return true; 80429647140SChuan Zheng } 80529647140SChuan Zheng } else { 80629647140SChuan Zheng /* update for tls qio channel */ 80729647140SChuan Zheng p->c = ioc; 80829647140SChuan Zheng qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 80929647140SChuan Zheng QEMU_THREAD_JOINABLE); 81029647140SChuan Zheng } 81129647140SChuan Zheng return false; 81229647140SChuan Zheng } 81329647140SChuan Zheng 81429647140SChuan Zheng return true; 81529647140SChuan Zheng } 81629647140SChuan Zheng 81703c7a42dSChuan Zheng static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, 81803c7a42dSChuan Zheng QIOChannel *ioc, Error *err) 81903c7a42dSChuan Zheng { 82003c7a42dSChuan Zheng migrate_set_error(migrate_get_current(), err); 82103c7a42dSChuan Zheng /* Error happen, we need to tell who pay attention to me */ 82203c7a42dSChuan Zheng qemu_sem_post(&multifd_send_state->channels_ready); 82303c7a42dSChuan Zheng qemu_sem_post(&p->sem_sync); 82403c7a42dSChuan Zheng /* 82503c7a42dSChuan Zheng * Although multifd_send_thread is not created, but main migration 82603c7a42dSChuan Zheng * thread neet to judge whether it is running, so we need to mark 82703c7a42dSChuan Zheng * its status. 
82803c7a42dSChuan Zheng */ 82903c7a42dSChuan Zheng p->quit = true; 83003c7a42dSChuan Zheng object_unref(OBJECT(ioc)); 83103c7a42dSChuan Zheng error_free(err); 83203c7a42dSChuan Zheng } 83303c7a42dSChuan Zheng 834d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 835d32ca5adSJuan Quintela { 836d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 837d32ca5adSJuan Quintela QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); 838d32ca5adSJuan Quintela Error *local_err = NULL; 839d32ca5adSJuan Quintela 840d32ca5adSJuan Quintela trace_multifd_new_send_channel_async(p->id); 841d32ca5adSJuan Quintela if (qio_task_propagate_error(task, &local_err)) { 84203c7a42dSChuan Zheng goto cleanup; 843d32ca5adSJuan Quintela } else { 844d32ca5adSJuan Quintela p->c = QIO_CHANNEL(sioc); 845d32ca5adSJuan Quintela qio_channel_set_delay(p->c, false); 846d32ca5adSJuan Quintela p->running = true; 84729647140SChuan Zheng if (multifd_channel_connect(p, sioc, local_err)) { 84829647140SChuan Zheng goto cleanup; 84929647140SChuan Zheng } 85003c7a42dSChuan Zheng return; 851d32ca5adSJuan Quintela } 85203c7a42dSChuan Zheng 85303c7a42dSChuan Zheng cleanup: 85403c7a42dSChuan Zheng multifd_new_send_channel_cleanup(p, sioc, local_err); 855d32ca5adSJuan Quintela } 856d32ca5adSJuan Quintela 857d32ca5adSJuan Quintela int multifd_save_setup(Error **errp) 858d32ca5adSJuan Quintela { 859d32ca5adSJuan Quintela int thread_count; 860d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 861d32ca5adSJuan Quintela uint8_t i; 8628e5fa059SChuan Zheng MigrationState *s; 863d32ca5adSJuan Quintela 864d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 865d32ca5adSJuan Quintela return 0; 866d32ca5adSJuan Quintela } 8678e5fa059SChuan Zheng s = migrate_get_current(); 868d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 869d32ca5adSJuan Quintela multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 
/* multifd_save_setup() continues: remaining global state, then the per-channel initialisation loop and the send_setup loop. */
870d32ca5adSJuan Quintela multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 871d32ca5adSJuan Quintela multifd_send_state->pages = multifd_pages_init(page_count); 872d32ca5adSJuan Quintela qemu_sem_init(&multifd_send_state->channels_ready, 0); 873d73415a3SStefan Hajnoczi qatomic_set(&multifd_send_state->exiting, 0); 874ab7cbb0bSJuan Quintela multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 875d32ca5adSJuan Quintela 876d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 877d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 878d32ca5adSJuan Quintela 879d32ca5adSJuan Quintela qemu_mutex_init(&p->mutex); 880d32ca5adSJuan Quintela qemu_sem_init(&p->sem, 0); 881d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 882d32ca5adSJuan Quintela p->quit = false; 883d32ca5adSJuan Quintela p->pending_job = 0; 884d32ca5adSJuan Quintela p->id = i; 885d32ca5adSJuan Quintela p->pages = multifd_pages_init(page_count); 886d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 887d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 888d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 889d32ca5adSJuan Quintela p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 890d32ca5adSJuan Quintela p->packet->version = cpu_to_be32(MULTIFD_VERSION); 891d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdsend_%d", i); 8928e5fa059SChuan Zheng p->tls_hostname = g_strdup(s->hostname); 893d32ca5adSJuan Quintela socket_send_channel_create(multifd_new_send_channel_async, p); 894d32ca5adSJuan Quintela } 895ab7cbb0bSJuan Quintela 896ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 897ab7cbb0bSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 898ab7cbb0bSJuan Quintela Error *local_err = NULL; 899ab7cbb0bSJuan Quintela int ret; 900ab7cbb0bSJuan Quintela 901ab7cbb0bSJuan Quintela ret = multifd_send_state->ops->send_setup(p, &local_err); 902ab7cbb0bSJuan Quintela if (ret) { 
/*
 * The first tokens on this physical line finish multifd_save_setup().
 *
 * multifd_recv_state
 *   File-scope pointer to an anonymous struct holding the receive-side
 *   state; the fields carry their own comments below.
 *
 * multifd_recv_terminate_threads(err)
 *   err may be NULL. When set, it is recorded with migrate_set_error()
 *   and a migration state of SETUP or ACTIVE is moved to FAILED. In
 *   either case every channel gets p->quit set under p->mutex and, if
 *   p->c exists, qio_channel_shutdown() in both directions.
 *
 * multifd_load_cleanup(errp)
 *   Returns 0 at once when multifd is not in use. Otherwise calls
 *   multifd_recv_terminate_threads(NULL), then for each channel still
 *   marked running sets p->quit, posts p->sem_sync and joins the thread.
 *   A second pass unregisters the yank function when p->c is a
 *   QIOChannelSocket whose reference count is 1, drops the reference on
 *   p->c, destroys the mutex and semaphore, frees name, pages and packet
 *   and calls ops->recv_cleanup(). Last it frees params and
 *   multifd_recv_state itself. Always returns 0; errp is never written.
 *
 * multifd_recv_sync_main()
 *   Only the head of this function is on these lines; the body follows
 *   on the next physical line.
 */
903ab7cbb0bSJuan Quintela error_propagate(errp, local_err); 904ab7cbb0bSJuan Quintela return ret; 905ab7cbb0bSJuan Quintela } 906ab7cbb0bSJuan Quintela } 907d32ca5adSJuan Quintela return 0; 908d32ca5adSJuan Quintela } 909d32ca5adSJuan Quintela 910d32ca5adSJuan Quintela struct { 911d32ca5adSJuan Quintela MultiFDRecvParams *params; 912d32ca5adSJuan Quintela /* number of created threads */ 913d32ca5adSJuan Quintela int count; 914d32ca5adSJuan Quintela /* syncs main thread and channels */ 915d32ca5adSJuan Quintela QemuSemaphore sem_sync; 916d32ca5adSJuan Quintela /* global number of generated multifd packets */ 917d32ca5adSJuan Quintela uint64_t packet_num; 918ab7cbb0bSJuan Quintela /* multifd ops */ 919ab7cbb0bSJuan Quintela MultiFDMethods *ops; 920d32ca5adSJuan Quintela } *multifd_recv_state; 921d32ca5adSJuan Quintela 922d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err) 923d32ca5adSJuan Quintela { 924d32ca5adSJuan Quintela int i; 925d32ca5adSJuan Quintela 926d32ca5adSJuan Quintela trace_multifd_recv_terminate_threads(err != NULL); 927d32ca5adSJuan Quintela 928d32ca5adSJuan Quintela if (err) { 929d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 930d32ca5adSJuan Quintela migrate_set_error(s, err); 931d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 932d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 933d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 934d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 935d32ca5adSJuan Quintela } 936d32ca5adSJuan Quintela } 937d32ca5adSJuan Quintela 938d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 939d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 940d32ca5adSJuan Quintela 941d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 942d32ca5adSJuan Quintela p->quit = true; 943d32ca5adSJuan Quintela /* 944d32ca5adSJuan Quintela * We could arrive here for two reasons: 945d32ca5adSJuan Quintela * - 
normal quit, i.e. everything went fine, just finished 946d32ca5adSJuan Quintela * - error quit: We close the channels so the channel threads 947d32ca5adSJuan Quintela * finish the qio_channel_read_all_eof() 948d32ca5adSJuan Quintela */ 949d32ca5adSJuan Quintela if (p->c) { 950d32ca5adSJuan Quintela qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 951d32ca5adSJuan Quintela } 952d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 953d32ca5adSJuan Quintela } 954d32ca5adSJuan Quintela } 955d32ca5adSJuan Quintela 956d32ca5adSJuan Quintela int multifd_load_cleanup(Error **errp) 957d32ca5adSJuan Quintela { 958d32ca5adSJuan Quintela int i; 959d32ca5adSJuan Quintela 960d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 961d32ca5adSJuan Quintela return 0; 962d32ca5adSJuan Quintela } 963d32ca5adSJuan Quintela multifd_recv_terminate_threads(NULL); 964d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 965d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 966d32ca5adSJuan Quintela 967d32ca5adSJuan Quintela if (p->running) { 968d32ca5adSJuan Quintela p->quit = true; 969d32ca5adSJuan Quintela /* 970d32ca5adSJuan Quintela * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 971d32ca5adSJuan Quintela * however try to wakeup it without harm in cleanup phase. 
972d32ca5adSJuan Quintela */ 973d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 974d32ca5adSJuan Quintela qemu_thread_join(&p->thread); 975d32ca5adSJuan Quintela } 976d32ca5adSJuan Quintela } 977d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 978d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 979d32ca5adSJuan Quintela 980*b5eea99eSLukas Straub if (object_dynamic_cast(OBJECT(p->c), TYPE_QIO_CHANNEL_SOCKET) 981*b5eea99eSLukas Straub && OBJECT(p->c)->ref == 1) { 982*b5eea99eSLukas Straub yank_unregister_function(MIGRATION_YANK_INSTANCE, 983*b5eea99eSLukas Straub yank_generic_iochannel, 984*b5eea99eSLukas Straub QIO_CHANNEL(p->c)); 985*b5eea99eSLukas Straub } 986*b5eea99eSLukas Straub 987d32ca5adSJuan Quintela object_unref(OBJECT(p->c)); 988d32ca5adSJuan Quintela p->c = NULL; 989d32ca5adSJuan Quintela qemu_mutex_destroy(&p->mutex); 990d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem_sync); 991d32ca5adSJuan Quintela g_free(p->name); 992d32ca5adSJuan Quintela p->name = NULL; 993d32ca5adSJuan Quintela multifd_pages_clear(p->pages); 994d32ca5adSJuan Quintela p->pages = NULL; 995d32ca5adSJuan Quintela p->packet_len = 0; 996d32ca5adSJuan Quintela g_free(p->packet); 997d32ca5adSJuan Quintela p->packet = NULL; 998ab7cbb0bSJuan Quintela multifd_recv_state->ops->recv_cleanup(p); 999d32ca5adSJuan Quintela } 1000d32ca5adSJuan Quintela qemu_sem_destroy(&multifd_recv_state->sem_sync); 1001d32ca5adSJuan Quintela g_free(multifd_recv_state->params); 1002d32ca5adSJuan Quintela multifd_recv_state->params = NULL; 1003d32ca5adSJuan Quintela g_free(multifd_recv_state); 1004d32ca5adSJuan Quintela multifd_recv_state = NULL; 1005d32ca5adSJuan Quintela 1006ab7cbb0bSJuan Quintela return 0; 1007d32ca5adSJuan Quintela } 1008d32ca5adSJuan Quintela 1009d32ca5adSJuan Quintela void multifd_recv_sync_main(void) 1010d32ca5adSJuan Quintela { 1011d32ca5adSJuan Quintela int i; 1012d32ca5adSJuan Quintela 1013d32ca5adSJuan Quintela if 
/*
 * multifd_recv_sync_main() (body continues here)
 *   No-op when multifd is not in use. Otherwise waits on
 *   multifd_recv_state->sem_sync once per channel, then for every channel
 *   raises the global packet_num to p->packet_num if that is larger
 *   (compared under p->mutex) and posts p->sem_sync.
 *
 * multifd_recv_thread(opaque)
 *   Entry point of one receiver thread; opaque is the MultiFDRecvParams.
 *   Always returns NULL. Loops until p->quit is seen:
 *   - reads p->packet_len bytes into p->packet with
 *     qio_channel_read_all_eof(); 0 (EOF) or -1 (error) ends the loop;
 *   - under p->mutex, decodes the header with
 *     multifd_recv_unfill_packet(), snapshots used and flags, clears
 *     MULTIFD_FLAG_SYNC from p->flags and updates the counters;
 *   - calls ops->recv_pages() when used is non-zero;
 *   - if the snapshot carried MULTIFD_FLAG_SYNC, posts the global
 *     sem_sync and blocks on p->sem_sync.
 *   On exit a set local_err is passed to
 *   multifd_recv_terminate_threads() and freed, p->running is cleared
 *   under p->mutex and the thread unregisters from RCU.
 *   NOTE(review): p->quit is tested at the top of the loop without
 *   holding p->mutex.
 */
(!migrate_use_multifd()) { 1014d32ca5adSJuan Quintela return; 1015d32ca5adSJuan Quintela } 1016d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1017d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1018d32ca5adSJuan Quintela 1019d32ca5adSJuan Quintela trace_multifd_recv_sync_main_wait(p->id); 1020d32ca5adSJuan Quintela qemu_sem_wait(&multifd_recv_state->sem_sync); 1021d32ca5adSJuan Quintela } 1022d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1023d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1024d32ca5adSJuan Quintela 10256e8a355dSDaniel Brodsky WITH_QEMU_LOCK_GUARD(&p->mutex) { 1026d32ca5adSJuan Quintela if (multifd_recv_state->packet_num < p->packet_num) { 1027d32ca5adSJuan Quintela multifd_recv_state->packet_num = p->packet_num; 1028d32ca5adSJuan Quintela } 10296e8a355dSDaniel Brodsky } 1030d32ca5adSJuan Quintela trace_multifd_recv_sync_main_signal(p->id); 1031d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 1032d32ca5adSJuan Quintela } 1033d32ca5adSJuan Quintela trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1034d32ca5adSJuan Quintela } 1035d32ca5adSJuan Quintela 1036d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque) 1037d32ca5adSJuan Quintela { 1038d32ca5adSJuan Quintela MultiFDRecvParams *p = opaque; 1039d32ca5adSJuan Quintela Error *local_err = NULL; 1040d32ca5adSJuan Quintela int ret; 1041d32ca5adSJuan Quintela 1042d32ca5adSJuan Quintela trace_multifd_recv_thread_start(p->id); 1043d32ca5adSJuan Quintela rcu_register_thread(); 1044d32ca5adSJuan Quintela 1045d32ca5adSJuan Quintela while (true) { 1046d32ca5adSJuan Quintela uint32_t used; 1047d32ca5adSJuan Quintela uint32_t flags; 1048d32ca5adSJuan Quintela 1049d32ca5adSJuan Quintela if (p->quit) { 1050d32ca5adSJuan Quintela break; 1051d32ca5adSJuan Quintela } 1052d32ca5adSJuan Quintela 1053d32ca5adSJuan Quintela ret = qio_channel_read_all_eof(p->c, (void 
*)p->packet, 1054d32ca5adSJuan Quintela p->packet_len, &local_err); 1055d32ca5adSJuan Quintela if (ret == 0) { /* EOF */ 1056d32ca5adSJuan Quintela break; 1057d32ca5adSJuan Quintela } 1058d32ca5adSJuan Quintela if (ret == -1) { /* Error */ 1059d32ca5adSJuan Quintela break; 1060d32ca5adSJuan Quintela } 1061d32ca5adSJuan Quintela 1062d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1063d32ca5adSJuan Quintela ret = multifd_recv_unfill_packet(p, &local_err); 1064d32ca5adSJuan Quintela if (ret) { 1065d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1066d32ca5adSJuan Quintela break; 1067d32ca5adSJuan Quintela } 1068d32ca5adSJuan Quintela 1069d32ca5adSJuan Quintela used = p->pages->used; 1070d32ca5adSJuan Quintela flags = p->flags; 1071ab7cbb0bSJuan Quintela /* recv methods don't know how to handle the SYNC flag */ 1072ab7cbb0bSJuan Quintela p->flags &= ~MULTIFD_FLAG_SYNC; 1073d32ca5adSJuan Quintela trace_multifd_recv(p->id, p->packet_num, used, flags, 1074d32ca5adSJuan Quintela p->next_packet_size); 1075d32ca5adSJuan Quintela p->num_packets++; 1076d32ca5adSJuan Quintela p->num_pages += used; 1077d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1078d32ca5adSJuan Quintela 1079d32ca5adSJuan Quintela if (used) { 1080ab7cbb0bSJuan Quintela ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); 1081d32ca5adSJuan Quintela if (ret != 0) { 1082d32ca5adSJuan Quintela break; 1083d32ca5adSJuan Quintela } 1084d32ca5adSJuan Quintela } 1085d32ca5adSJuan Quintela 1086d32ca5adSJuan Quintela if (flags & MULTIFD_FLAG_SYNC) { 1087d32ca5adSJuan Quintela qemu_sem_post(&multifd_recv_state->sem_sync); 1088d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 1089d32ca5adSJuan Quintela } 1090d32ca5adSJuan Quintela } 1091d32ca5adSJuan Quintela 1092d32ca5adSJuan Quintela if (local_err) { 1093d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 109413f2cb21SPan Nengyuan error_free(local_err); 1095d32ca5adSJuan Quintela } 1096d32ca5adSJuan Quintela 
/*
 * The first tokens on this physical line finish multifd_recv_thread().
 *
 * multifd_load_setup(errp)
 *   Returns 0 without doing anything when multifd is not in use.
 *   Otherwise allocates multifd_recv_state and one MultiFDRecvParams per
 *   channel, zeroes count, initialises sem_sync and picks ops from
 *   multifd_ops[] by migrate_multifd_compression(). Each channel gets its
 *   mutex, sem_sync, page array, packet buffer and a multifdrecv_<id>
 *   name. A second pass calls ops->recv_setup() per channel; the first
 *   non-zero result is propagated through errp and returned. Returns 0
 *   on success. The function continues on the next physical line.
 */
qemu_mutex_lock(&p->mutex); 1097d32ca5adSJuan Quintela p->running = false; 1098d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1099d32ca5adSJuan Quintela 1100d32ca5adSJuan Quintela rcu_unregister_thread(); 1101d32ca5adSJuan Quintela trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); 1102d32ca5adSJuan Quintela 1103d32ca5adSJuan Quintela return NULL; 1104d32ca5adSJuan Quintela } 1105d32ca5adSJuan Quintela 1106d32ca5adSJuan Quintela int multifd_load_setup(Error **errp) 1107d32ca5adSJuan Quintela { 1108d32ca5adSJuan Quintela int thread_count; 1109d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1110d32ca5adSJuan Quintela uint8_t i; 1111d32ca5adSJuan Quintela 1112d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 1113d32ca5adSJuan Quintela return 0; 1114d32ca5adSJuan Quintela } 1115d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 1116d32ca5adSJuan Quintela multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1117d32ca5adSJuan Quintela multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1118d73415a3SStefan Hajnoczi qatomic_set(&multifd_recv_state->count, 0); 1119d32ca5adSJuan Quintela qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1120ab7cbb0bSJuan Quintela multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1121d32ca5adSJuan Quintela 1122d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 1123d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1124d32ca5adSJuan Quintela 1125d32ca5adSJuan Quintela qemu_mutex_init(&p->mutex); 1126d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 1127d32ca5adSJuan Quintela p->quit = false; 1128d32ca5adSJuan Quintela p->id = i; 1129d32ca5adSJuan Quintela p->pages = multifd_pages_init(page_count); 1130d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 1131d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 1132d32ca5adSJuan Quintela p->packet = 
/*
 * The first tokens on this physical line continue multifd_load_setup().
 *
 * multifd_recv_all_channels_created()
 *   True when multifd is not in use, otherwise true once
 *   multifd_recv_state->count equals migrate_multifd_channels().
 *
 * multifd_recv_new_channel(ioc, errp)
 *   The return contract is stated in the comment that precedes the
 *   definition. Reads the channel id with multifd_recv_initial_packet();
 *   a negative id terminates all receive threads and propagates the
 *   error with a prefix. If params[id] already has a channel, an error is
 *   raised and propagated the same way. Otherwise it stores ioc in p->c
 *   with an extra reference, sets num_packets to 1 for the initial
 *   packet, marks the channel running, creates the joinable
 *   multifd_recv_thread and increments count.
 *   NOTE(review): id indexes params[] with no upper-bound check here;
 *   presumably multifd_recv_initial_packet() validates it - confirm.
 *   NOTE(review): the already-setup message ends with an unbalanced
 *   single quote.
 */
g_malloc0(p->packet_len); 1133d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdrecv_%d", i); 1134d32ca5adSJuan Quintela } 1135ab7cbb0bSJuan Quintela 1136ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 1137ab7cbb0bSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1138ab7cbb0bSJuan Quintela Error *local_err = NULL; 1139ab7cbb0bSJuan Quintela int ret; 1140ab7cbb0bSJuan Quintela 1141ab7cbb0bSJuan Quintela ret = multifd_recv_state->ops->recv_setup(p, &local_err); 1142ab7cbb0bSJuan Quintela if (ret) { 1143ab7cbb0bSJuan Quintela error_propagate(errp, local_err); 1144ab7cbb0bSJuan Quintela return ret; 1145ab7cbb0bSJuan Quintela } 1146ab7cbb0bSJuan Quintela } 1147d32ca5adSJuan Quintela return 0; 1148d32ca5adSJuan Quintela } 1149d32ca5adSJuan Quintela 1150d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void) 1151d32ca5adSJuan Quintela { 1152d32ca5adSJuan Quintela int thread_count = migrate_multifd_channels(); 1153d32ca5adSJuan Quintela 1154d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 1155d32ca5adSJuan Quintela return true; 1156d32ca5adSJuan Quintela } 1157d32ca5adSJuan Quintela 1158d73415a3SStefan Hajnoczi return thread_count == qatomic_read(&multifd_recv_state->count); 1159d32ca5adSJuan Quintela } 1160d32ca5adSJuan Quintela 1161d32ca5adSJuan Quintela /* 1162d32ca5adSJuan Quintela * Try to receive all multifd channels to get ready for the migration. 11633a4452d8Szhaolichang * - Return true and do not set @errp when correctly receiving all channels; 1164d32ca5adSJuan Quintela * - Return false and do not set @errp when correctly receiving the current one; 1165d32ca5adSJuan Quintela * - Return false and set @errp when failing to receive the current channel. 
1166d32ca5adSJuan Quintela */ 1167d32ca5adSJuan Quintela bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1168d32ca5adSJuan Quintela { 1169d32ca5adSJuan Quintela MultiFDRecvParams *p; 1170d32ca5adSJuan Quintela Error *local_err = NULL; 1171d32ca5adSJuan Quintela int id; 1172d32ca5adSJuan Quintela 1173d32ca5adSJuan Quintela id = multifd_recv_initial_packet(ioc, &local_err); 1174d32ca5adSJuan Quintela if (id < 0) { 1175d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1176d32ca5adSJuan Quintela error_propagate_prepend(errp, local_err, 1177d32ca5adSJuan Quintela "failed to receive packet" 1178d32ca5adSJuan Quintela " via multifd channel %d: ", 1179d73415a3SStefan Hajnoczi qatomic_read(&multifd_recv_state->count)); 1180d32ca5adSJuan Quintela return false; 1181d32ca5adSJuan Quintela } 1182d32ca5adSJuan Quintela trace_multifd_recv_new_channel(id); 1183d32ca5adSJuan Quintela 1184d32ca5adSJuan Quintela p = &multifd_recv_state->params[id]; 1185d32ca5adSJuan Quintela if (p->c != NULL) { 1186d32ca5adSJuan Quintela error_setg(&local_err, "multifd: received id '%d' already setup'", 1187d32ca5adSJuan Quintela id); 1188d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1189d32ca5adSJuan Quintela error_propagate(errp, local_err); 1190d32ca5adSJuan Quintela return false; 1191d32ca5adSJuan Quintela } 1192d32ca5adSJuan Quintela p->c = ioc; 1193d32ca5adSJuan Quintela object_ref(OBJECT(ioc)); 1194d32ca5adSJuan Quintela /* initial packet */ 1195d32ca5adSJuan Quintela p->num_packets = 1; 1196d32ca5adSJuan Quintela 1197d32ca5adSJuan Quintela p->running = true; 1198d32ca5adSJuan Quintela qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1199d32ca5adSJuan Quintela QEMU_THREAD_JOINABLE); 1200d73415a3SStefan Hajnoczi qatomic_inc(&multifd_recv_state->count); 1201d73415a3SStefan Hajnoczi return qatomic_read(&multifd_recv_state->count) == 1202d32ca5adSJuan Quintela migrate_multifd_channels(); 1203d32ca5adSJuan Quintela } 1204