1d32ca5adSJuan Quintela /* 2d32ca5adSJuan Quintela * Multifd common code 3d32ca5adSJuan Quintela * 4d32ca5adSJuan Quintela * Copyright (c) 2019-2020 Red Hat Inc 5d32ca5adSJuan Quintela * 6d32ca5adSJuan Quintela * Authors: 7d32ca5adSJuan Quintela * Juan Quintela <quintela@redhat.com> 8d32ca5adSJuan Quintela * 9d32ca5adSJuan Quintela * This work is licensed under the terms of the GNU GPL, version 2 or later. 10d32ca5adSJuan Quintela * See the COPYING file in the top-level directory. 11d32ca5adSJuan Quintela */ 12d32ca5adSJuan Quintela 13d32ca5adSJuan Quintela #include "qemu/osdep.h" 14d32ca5adSJuan Quintela #include "qemu/rcu.h" 15d32ca5adSJuan Quintela #include "exec/target_page.h" 16d32ca5adSJuan Quintela #include "sysemu/sysemu.h" 17d32ca5adSJuan Quintela #include "exec/ramblock.h" 18d32ca5adSJuan Quintela #include "qemu/error-report.h" 19d32ca5adSJuan Quintela #include "qapi/error.h" 20d32ca5adSJuan Quintela #include "ram.h" 21d32ca5adSJuan Quintela #include "migration.h" 22d32ca5adSJuan Quintela #include "socket.h" 2329647140SChuan Zheng #include "tls.h" 24d32ca5adSJuan Quintela #include "qemu-file.h" 25d32ca5adSJuan Quintela #include "trace.h" 26d32ca5adSJuan Quintela #include "multifd.h" 27d32ca5adSJuan Quintela 28b5eea99eSLukas Straub #include "qemu/yank.h" 29b5eea99eSLukas Straub #include "io/channel-socket.h" 301a92d6d5SLukas Straub #include "yank_functions.h" 31b5eea99eSLukas Straub 32d32ca5adSJuan Quintela /* Multiple fd's */ 33d32ca5adSJuan Quintela 34d32ca5adSJuan Quintela #define MULTIFD_MAGIC 0x11223344U 35d32ca5adSJuan Quintela #define MULTIFD_VERSION 1 36d32ca5adSJuan Quintela 37d32ca5adSJuan Quintela typedef struct { 38d32ca5adSJuan Quintela uint32_t magic; 39d32ca5adSJuan Quintela uint32_t version; 40d32ca5adSJuan Quintela unsigned char uuid[16]; /* QemuUUID */ 41d32ca5adSJuan Quintela uint8_t id; 42d32ca5adSJuan Quintela uint8_t unused1[7]; /* Reserved for future use */ 43d32ca5adSJuan Quintela uint64_t unused2[4]; /* Reserved for future use */ 44d32ca5adSJuan Quintela } __attribute__((packed)) MultiFDInit_t; 45d32ca5adSJuan Quintela 46ab7cbb0bSJuan Quintela /* Multifd without compression */ 47ab7cbb0bSJuan Quintela 48ab7cbb0bSJuan Quintela /** 49ab7cbb0bSJuan Quintela * nocomp_send_setup: setup send side 50ab7cbb0bSJuan Quintela * 51ab7cbb0bSJuan Quintela * For no compression this function does nothing. 52ab7cbb0bSJuan Quintela * 53ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 54ab7cbb0bSJuan Quintela * 55ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 56ab7cbb0bSJuan Quintela * @errp: pointer to an error 57ab7cbb0bSJuan Quintela */ 58ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 59ab7cbb0bSJuan Quintela { 60ab7cbb0bSJuan Quintela return 0; 61ab7cbb0bSJuan Quintela } 62ab7cbb0bSJuan Quintela 63ab7cbb0bSJuan Quintela /** 64ab7cbb0bSJuan Quintela * nocomp_send_cleanup: cleanup send side 65ab7cbb0bSJuan Quintela * 66ab7cbb0bSJuan Quintela * For no compression this function does nothing. 67ab7cbb0bSJuan Quintela * 68ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 6918ede636SJuan Quintela * @errp: pointer to an error 70ab7cbb0bSJuan Quintela */ 71ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 72ab7cbb0bSJuan Quintela { 73ab7cbb0bSJuan Quintela return; 74ab7cbb0bSJuan Quintela } 75ab7cbb0bSJuan Quintela 76ab7cbb0bSJuan Quintela /** 77ab7cbb0bSJuan Quintela * nocomp_send_prepare: prepare date to be able to send 78ab7cbb0bSJuan Quintela * 79ab7cbb0bSJuan Quintela * For no compression we just have to calculate the size of the 80ab7cbb0bSJuan Quintela * packet. 81ab7cbb0bSJuan Quintela * 82ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 83ab7cbb0bSJuan Quintela * 84ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 85ab7cbb0bSJuan Quintela * @errp: pointer to an error 86ab7cbb0bSJuan Quintela */ 8702fb8104SJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 88ab7cbb0bSJuan Quintela { 89226468baSJuan Quintela MultiFDPages_t *pages = p->pages; 90226468baSJuan Quintela 91815956f0SJuan Quintela for (int i = 0; i < p->normal_num; i++) { 92815956f0SJuan Quintela p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i]; 93ddec20f8SJuan Quintela p->iov[p->iovs_num].iov_len = p->page_size; 94226468baSJuan Quintela p->iovs_num++; 95226468baSJuan Quintela } 96226468baSJuan Quintela 97ddec20f8SJuan Quintela p->next_packet_size = p->normal_num * p->page_size; 98ab7cbb0bSJuan Quintela p->flags |= MULTIFD_FLAG_NOCOMP; 99ab7cbb0bSJuan Quintela return 0; 100ab7cbb0bSJuan Quintela } 101ab7cbb0bSJuan Quintela 102ab7cbb0bSJuan Quintela /** 103ab7cbb0bSJuan Quintela * nocomp_recv_setup: setup receive side 104ab7cbb0bSJuan Quintela * 105ab7cbb0bSJuan Quintela * For no compression this function does nothing. 106ab7cbb0bSJuan Quintela * 107ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 108ab7cbb0bSJuan Quintela * 109ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 110ab7cbb0bSJuan Quintela * @errp: pointer to an error 111ab7cbb0bSJuan Quintela */ 112ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 113ab7cbb0bSJuan Quintela { 114ab7cbb0bSJuan Quintela return 0; 115ab7cbb0bSJuan Quintela } 116ab7cbb0bSJuan Quintela 117ab7cbb0bSJuan Quintela /** 118ab7cbb0bSJuan Quintela * nocomp_recv_cleanup: setup receive side 119ab7cbb0bSJuan Quintela * 120ab7cbb0bSJuan Quintela * For no compression this function does nothing. 121ab7cbb0bSJuan Quintela * 122ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 123ab7cbb0bSJuan Quintela */ 124ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p) 125ab7cbb0bSJuan Quintela { 126ab7cbb0bSJuan Quintela } 127ab7cbb0bSJuan Quintela 128ab7cbb0bSJuan Quintela /** 129ab7cbb0bSJuan Quintela * nocomp_recv_pages: read the data from the channel into actual pages 130ab7cbb0bSJuan Quintela * 131ab7cbb0bSJuan Quintela * For no compression we just need to read things into the correct place. 132ab7cbb0bSJuan Quintela * 133ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 134ab7cbb0bSJuan Quintela * 135ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 136ab7cbb0bSJuan Quintela * @errp: pointer to an error 137ab7cbb0bSJuan Quintela */ 13840a4bfe9SJuan Quintela static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp) 139ab7cbb0bSJuan Quintela { 140ab7cbb0bSJuan Quintela uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 141ab7cbb0bSJuan Quintela 142ab7cbb0bSJuan Quintela if (flags != MULTIFD_FLAG_NOCOMP) { 14304e11404SJuan Quintela error_setg(errp, "multifd %u: flags received %x flags expected %x", 144ab7cbb0bSJuan Quintela p->id, flags, MULTIFD_FLAG_NOCOMP); 145ab7cbb0bSJuan Quintela return -1; 146ab7cbb0bSJuan Quintela } 147cf2d4aa8SJuan Quintela for (int i = 0; i < p->normal_num; i++) { 148faf60935SJuan Quintela p->iov[i].iov_base = p->host + p->normal[i]; 149ddec20f8SJuan Quintela p->iov[i].iov_len = p->page_size; 150226468baSJuan Quintela } 151cf2d4aa8SJuan Quintela return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 152ab7cbb0bSJuan Quintela } 153ab7cbb0bSJuan Quintela 154ab7cbb0bSJuan Quintela static MultiFDMethods multifd_nocomp_ops = { 155ab7cbb0bSJuan Quintela .send_setup = nocomp_send_setup, 156ab7cbb0bSJuan Quintela .send_cleanup = nocomp_send_cleanup, 157ab7cbb0bSJuan Quintela .send_prepare = nocomp_send_prepare, 158ab7cbb0bSJuan Quintela .recv_setup = nocomp_recv_setup, 159ab7cbb0bSJuan Quintela .recv_cleanup = nocomp_recv_cleanup, 160ab7cbb0bSJuan Quintela .recv_pages = nocomp_recv_pages 161ab7cbb0bSJuan Quintela }; 162ab7cbb0bSJuan Quintela 163ab7cbb0bSJuan Quintela static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 164ab7cbb0bSJuan Quintela [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 165ab7cbb0bSJuan Quintela }; 166ab7cbb0bSJuan Quintela 1677ec2c2b3SJuan Quintela void multifd_register_ops(int method, MultiFDMethods *ops) 1687ec2c2b3SJuan Quintela { 1697ec2c2b3SJuan Quintela assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 1707ec2c2b3SJuan Quintela multifd_ops[method] = ops; 1717ec2c2b3SJuan Quintela } 1727ec2c2b3SJuan Quintela 173d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 174d32ca5adSJuan Quintela { 175d32ca5adSJuan Quintela MultiFDInit_t msg = {}; 176d32ca5adSJuan Quintela int ret; 177d32ca5adSJuan Quintela 178d32ca5adSJuan Quintela msg.magic = cpu_to_be32(MULTIFD_MAGIC); 179d32ca5adSJuan Quintela msg.version = cpu_to_be32(MULTIFD_VERSION); 180d32ca5adSJuan Quintela msg.id = p->id; 181d32ca5adSJuan Quintela memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 182d32ca5adSJuan Quintela 183d32ca5adSJuan Quintela ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); 184d32ca5adSJuan Quintela if (ret != 0) { 185d32ca5adSJuan Quintela return -1; 186d32ca5adSJuan Quintela } 187d32ca5adSJuan Quintela return 0; 188d32ca5adSJuan Quintela } 189d32ca5adSJuan Quintela 190d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 191d32ca5adSJuan Quintela { 192d32ca5adSJuan Quintela MultiFDInit_t msg; 193d32ca5adSJuan Quintela int ret; 194d32ca5adSJuan Quintela 195d32ca5adSJuan Quintela ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 196d32ca5adSJuan Quintela if (ret != 0) { 197d32ca5adSJuan Quintela return -1; 198d32ca5adSJuan Quintela } 199d32ca5adSJuan Quintela 200d32ca5adSJuan Quintela msg.magic = be32_to_cpu(msg.magic); 201d32ca5adSJuan Quintela msg.version = be32_to_cpu(msg.version); 202d32ca5adSJuan Quintela 203d32ca5adSJuan Quintela if (msg.magic != MULTIFD_MAGIC) { 204d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet magic %x " 205d32ca5adSJuan Quintela "expected %x", msg.magic, MULTIFD_MAGIC); 206d32ca5adSJuan Quintela return -1; 207d32ca5adSJuan Quintela } 208d32ca5adSJuan Quintela 209d32ca5adSJuan Quintela if (msg.version != MULTIFD_VERSION) { 21004e11404SJuan Quintela error_setg(errp, "multifd: received packet version %u " 21104e11404SJuan Quintela "expected %u", msg.version, MULTIFD_VERSION); 212d32ca5adSJuan Quintela return -1; 213d32ca5adSJuan Quintela } 214d32ca5adSJuan Quintela 215d32ca5adSJuan Quintela if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 216d32ca5adSJuan Quintela char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 217d32ca5adSJuan Quintela char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 218d32ca5adSJuan Quintela 219d32ca5adSJuan Quintela error_setg(errp, "multifd: received uuid '%s' and expected " 220d32ca5adSJuan Quintela "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 221d32ca5adSJuan Quintela g_free(uuid); 222d32ca5adSJuan Quintela g_free(msg_uuid); 223d32ca5adSJuan Quintela return -1; 224d32ca5adSJuan Quintela } 225d32ca5adSJuan Quintela 226d32ca5adSJuan Quintela if (msg.id > migrate_multifd_channels()) { 22704e11404SJuan Quintela error_setg(errp, "multifd: received channel version %u " 22804e11404SJuan Quintela "expected %u", msg.version, MULTIFD_VERSION); 229d32ca5adSJuan Quintela return -1; 230d32ca5adSJuan Quintela } 231d32ca5adSJuan Quintela 232d32ca5adSJuan Quintela return msg.id; 233d32ca5adSJuan Quintela } 234d32ca5adSJuan Quintela 235d32ca5adSJuan Quintela static MultiFDPages_t *multifd_pages_init(size_t size) 236d32ca5adSJuan Quintela { 237d32ca5adSJuan Quintela MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 238d32ca5adSJuan Quintela 239d32ca5adSJuan Quintela pages->allocated = size; 240d32ca5adSJuan Quintela pages->offset = g_new0(ram_addr_t, size); 241d32ca5adSJuan Quintela 242d32ca5adSJuan Quintela return pages; 243d32ca5adSJuan Quintela } 244d32ca5adSJuan Quintela 245d32ca5adSJuan Quintela static void multifd_pages_clear(MultiFDPages_t *pages) 246d32ca5adSJuan Quintela { 24790a3d2f9SJuan Quintela pages->num = 0; 248d32ca5adSJuan Quintela pages->allocated = 0; 249d32ca5adSJuan Quintela pages->packet_num = 0; 250d32ca5adSJuan Quintela pages->block = NULL; 251d32ca5adSJuan Quintela g_free(pages->offset); 252d32ca5adSJuan Quintela pages->offset = NULL; 253d32ca5adSJuan Quintela g_free(pages); 254d32ca5adSJuan Quintela } 255d32ca5adSJuan Quintela 256d32ca5adSJuan Quintela static void multifd_send_fill_packet(MultiFDSendParams *p) 257d32ca5adSJuan Quintela { 258d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 259d32ca5adSJuan Quintela int i; 260d32ca5adSJuan Quintela 261d32ca5adSJuan Quintela packet->flags = cpu_to_be32(p->flags); 262d32ca5adSJuan Quintela packet->pages_alloc = cpu_to_be32(p->pages->allocated); 2638c0ec0b2SJuan Quintela packet->normal_pages = cpu_to_be32(p->normal_num); 264d32ca5adSJuan Quintela packet->next_packet_size = cpu_to_be32(p->next_packet_size); 265d32ca5adSJuan Quintela packet->packet_num = cpu_to_be64(p->packet_num); 266d32ca5adSJuan Quintela 267d32ca5adSJuan Quintela if (p->pages->block) { 268d32ca5adSJuan Quintela strncpy(packet->ramblock, p->pages->block->idstr, 256); 269d32ca5adSJuan Quintela } 270d32ca5adSJuan Quintela 271815956f0SJuan Quintela for (i = 0; i < p->normal_num; i++) { 272d32ca5adSJuan Quintela /* there are architectures where ram_addr_t is 32 bit */ 273815956f0SJuan Quintela uint64_t temp = p->normal[i]; 274d32ca5adSJuan Quintela 275d32ca5adSJuan Quintela packet->offset[i] = cpu_to_be64(temp); 276d32ca5adSJuan Quintela } 277d32ca5adSJuan Quintela } 278d32ca5adSJuan Quintela 279d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 280d32ca5adSJuan Quintela { 281d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 282d32ca5adSJuan Quintela RAMBlock *block; 283d32ca5adSJuan Quintela int i; 284d32ca5adSJuan Quintela 285d32ca5adSJuan Quintela packet->magic = be32_to_cpu(packet->magic); 286d32ca5adSJuan Quintela if (packet->magic != MULTIFD_MAGIC) { 287d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 288d32ca5adSJuan Quintela "magic %x and expected magic %x", 289d32ca5adSJuan Quintela packet->magic, MULTIFD_MAGIC); 290d32ca5adSJuan Quintela return -1; 291d32ca5adSJuan Quintela } 292d32ca5adSJuan Quintela 293d32ca5adSJuan Quintela packet->version = be32_to_cpu(packet->version); 294d32ca5adSJuan Quintela if (packet->version != MULTIFD_VERSION) { 295d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 29604e11404SJuan Quintela "version %u and expected version %u", 297d32ca5adSJuan Quintela packet->version, MULTIFD_VERSION); 298d32ca5adSJuan Quintela return -1; 299d32ca5adSJuan Quintela } 300d32ca5adSJuan Quintela 301d32ca5adSJuan Quintela p->flags = be32_to_cpu(packet->flags); 302d32ca5adSJuan Quintela 303d32ca5adSJuan Quintela packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 304d32ca5adSJuan Quintela /* 305d32ca5adSJuan Quintela * If we received a packet that is 100 times bigger than expected 306d32ca5adSJuan Quintela * just stop migration. It is a magic number. 307d32ca5adSJuan Quintela */ 308d6f45ebaSJuan Quintela if (packet->pages_alloc > p->page_count) { 309d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 310cf2d4aa8SJuan Quintela "with size %u and expected a size of %u", 311d6f45ebaSJuan Quintela packet->pages_alloc, p->page_count) ; 312d32ca5adSJuan Quintela return -1; 313d32ca5adSJuan Quintela } 314d32ca5adSJuan Quintela 3158c0ec0b2SJuan Quintela p->normal_num = be32_to_cpu(packet->normal_pages); 316cf2d4aa8SJuan Quintela if (p->normal_num > packet->pages_alloc) { 317d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 31804e11404SJuan Quintela "with %u pages and expected maximum pages are %u", 319cf2d4aa8SJuan Quintela p->normal_num, packet->pages_alloc) ; 320d32ca5adSJuan Quintela return -1; 321d32ca5adSJuan Quintela } 322d32ca5adSJuan Quintela 323d32ca5adSJuan Quintela p->next_packet_size = be32_to_cpu(packet->next_packet_size); 324d32ca5adSJuan Quintela p->packet_num = be64_to_cpu(packet->packet_num); 325d32ca5adSJuan Quintela 326cf2d4aa8SJuan Quintela if (p->normal_num == 0) { 327d32ca5adSJuan Quintela return 0; 328d32ca5adSJuan Quintela } 329d32ca5adSJuan Quintela 330d32ca5adSJuan Quintela /* make sure that ramblock is 0 terminated */ 331d32ca5adSJuan Quintela packet->ramblock[255] = 0; 332d32ca5adSJuan Quintela block = qemu_ram_block_by_name(packet->ramblock); 333d32ca5adSJuan Quintela if (!block) { 334d32ca5adSJuan Quintela error_setg(errp, "multifd: unknown ram block %s", 335d32ca5adSJuan Quintela packet->ramblock); 336d32ca5adSJuan Quintela return -1; 337d32ca5adSJuan Quintela } 338d32ca5adSJuan Quintela 339faf60935SJuan Quintela p->host = block->host; 340cf2d4aa8SJuan Quintela for (i = 0; i < p->normal_num; i++) { 341d32ca5adSJuan Quintela uint64_t offset = be64_to_cpu(packet->offset[i]); 342d32ca5adSJuan Quintela 343ddec20f8SJuan Quintela if (offset > (block->used_length - p->page_size)) { 344d32ca5adSJuan Quintela error_setg(errp, "multifd: offset too long %" PRIu64 345d32ca5adSJuan Quintela " (max " RAM_ADDR_FMT ")", 346c1668bdeSDavid Hildenbrand offset, block->used_length); 347d32ca5adSJuan Quintela return -1; 348d32ca5adSJuan Quintela } 349cf2d4aa8SJuan Quintela p->normal[i] = offset; 350d32ca5adSJuan Quintela } 351d32ca5adSJuan Quintela 352d32ca5adSJuan Quintela return 0; 353d32ca5adSJuan Quintela } 354d32ca5adSJuan Quintela 355d32ca5adSJuan Quintela struct { 356d32ca5adSJuan Quintela MultiFDSendParams *params; 357d32ca5adSJuan Quintela /* array of pages to sent */ 358d32ca5adSJuan Quintela MultiFDPages_t *pages; 359d32ca5adSJuan Quintela /* global number of generated multifd packets */ 360d32ca5adSJuan Quintela uint64_t packet_num; 361d32ca5adSJuan Quintela /* send channels ready */ 362d32ca5adSJuan Quintela QemuSemaphore channels_ready; 363d32ca5adSJuan Quintela /* 364d32ca5adSJuan Quintela * Have we already run terminate threads. There is a race when it 365d32ca5adSJuan Quintela * happens that we got one error while we are exiting. 366d32ca5adSJuan Quintela * We will use atomic operations. Only valid values are 0 and 1. 367d32ca5adSJuan Quintela */ 368d32ca5adSJuan Quintela int exiting; 369ab7cbb0bSJuan Quintela /* multifd ops */ 370ab7cbb0bSJuan Quintela MultiFDMethods *ops; 371d32ca5adSJuan Quintela } *multifd_send_state; 372d32ca5adSJuan Quintela 373d32ca5adSJuan Quintela /* 374d32ca5adSJuan Quintela * How we use multifd_send_state->pages and channel->pages? 375d32ca5adSJuan Quintela * 376d32ca5adSJuan Quintela * We create a pages for each channel, and a main one. Each time that 377d32ca5adSJuan Quintela * we need to send a batch of pages we interchange the ones between 378d32ca5adSJuan Quintela * multifd_send_state and the channel that is sending it. There are 379d32ca5adSJuan Quintela * two reasons for that: 380d32ca5adSJuan Quintela * - to not have to do so many mallocs during migration 381d32ca5adSJuan Quintela * - to make easier to know what to free at the end of migration 382d32ca5adSJuan Quintela * 383d32ca5adSJuan Quintela * This way we always know who is the owner of each "pages" struct, 384d32ca5adSJuan Quintela * and we don't need any locking. It belongs to the migration thread 385d32ca5adSJuan Quintela * or to the channel thread. Switching is safe because the migration 386d32ca5adSJuan Quintela * thread is using the channel mutex when changing it, and the channel 387d32ca5adSJuan Quintela * have to had finish with its own, otherwise pending_job can't be 388d32ca5adSJuan Quintela * false. 389d32ca5adSJuan Quintela */ 390d32ca5adSJuan Quintela 391d32ca5adSJuan Quintela static int multifd_send_pages(QEMUFile *f) 392d32ca5adSJuan Quintela { 393d32ca5adSJuan Quintela int i; 394d32ca5adSJuan Quintela static int next_channel; 395d32ca5adSJuan Quintela MultiFDSendParams *p = NULL; /* make happy gcc */ 396d32ca5adSJuan Quintela MultiFDPages_t *pages = multifd_send_state->pages; 397d32ca5adSJuan Quintela uint64_t transferred; 398d32ca5adSJuan Quintela 399d73415a3SStefan Hajnoczi if (qatomic_read(&multifd_send_state->exiting)) { 400d32ca5adSJuan Quintela return -1; 401d32ca5adSJuan Quintela } 402d32ca5adSJuan Quintela 403d32ca5adSJuan Quintela qemu_sem_wait(&multifd_send_state->channels_ready); 4047e89a140SLaurent Vivier /* 4057e89a140SLaurent Vivier * next_channel can remain from a previous migration that was 4067e89a140SLaurent Vivier * using more channels, so ensure it doesn't overflow if the 4077e89a140SLaurent Vivier * limit is lower now. 4087e89a140SLaurent Vivier */ 4097e89a140SLaurent Vivier next_channel %= migrate_multifd_channels(); 410d32ca5adSJuan Quintela for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 411d32ca5adSJuan Quintela p = &multifd_send_state->params[i]; 412d32ca5adSJuan Quintela 413d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 414d32ca5adSJuan Quintela if (p->quit) { 415d32ca5adSJuan Quintela error_report("%s: channel %d has already quit!", __func__, i); 416d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 417d32ca5adSJuan Quintela return -1; 418d32ca5adSJuan Quintela } 419d32ca5adSJuan Quintela if (!p->pending_job) { 420d32ca5adSJuan Quintela p->pending_job++; 421d32ca5adSJuan Quintela next_channel = (i + 1) % migrate_multifd_channels(); 422d32ca5adSJuan Quintela break; 423d32ca5adSJuan Quintela } 424d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 425d32ca5adSJuan Quintela } 42690a3d2f9SJuan Quintela assert(!p->pages->num); 427d32ca5adSJuan Quintela assert(!p->pages->block); 428d32ca5adSJuan Quintela 429d32ca5adSJuan Quintela p->packet_num = multifd_send_state->packet_num++; 430d32ca5adSJuan Quintela multifd_send_state->pages = p->pages; 431d32ca5adSJuan Quintela p->pages = pages; 432ddec20f8SJuan Quintela transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; 433bc698c36SDaniel P. Berrangé qemu_file_acct_rate_limit(f, transferred); 434d32ca5adSJuan Quintela ram_counters.multifd_bytes += transferred; 43523b7576dSPeter Xu stat64_add(&ram_atomic_counters.transferred, transferred); 436d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 437d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 438d32ca5adSJuan Quintela 439d32ca5adSJuan Quintela return 1; 440d32ca5adSJuan Quintela } 441d32ca5adSJuan Quintela 442d32ca5adSJuan Quintela int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) 443d32ca5adSJuan Quintela { 444d32ca5adSJuan Quintela MultiFDPages_t *pages = multifd_send_state->pages; 445ddbe628cSZhenzhong Duan bool changed = false; 446d32ca5adSJuan Quintela 447d32ca5adSJuan Quintela if (!pages->block) { 448d32ca5adSJuan Quintela pages->block = block; 449d32ca5adSJuan Quintela } 450d32ca5adSJuan Quintela 451d32ca5adSJuan Quintela if (pages->block == block) { 45290a3d2f9SJuan Quintela pages->offset[pages->num] = offset; 45390a3d2f9SJuan Quintela pages->num++; 454d32ca5adSJuan Quintela 45590a3d2f9SJuan Quintela if (pages->num < pages->allocated) { 456d32ca5adSJuan Quintela return 1; 457d32ca5adSJuan Quintela } 458ddbe628cSZhenzhong Duan } else { 459ddbe628cSZhenzhong Duan changed = true; 460d32ca5adSJuan Quintela } 461d32ca5adSJuan Quintela 462d32ca5adSJuan Quintela if (multifd_send_pages(f) < 0) { 463d32ca5adSJuan Quintela return -1; 464d32ca5adSJuan Quintela } 465d32ca5adSJuan Quintela 466ddbe628cSZhenzhong Duan if (changed) { 467d32ca5adSJuan Quintela return multifd_queue_page(f, block, offset); 468d32ca5adSJuan Quintela } 469d32ca5adSJuan Quintela 470d32ca5adSJuan Quintela return 1; 471d32ca5adSJuan Quintela } 472d32ca5adSJuan Quintela 473d32ca5adSJuan Quintela static void multifd_send_terminate_threads(Error *err) 474d32ca5adSJuan Quintela { 475d32ca5adSJuan Quintela int i; 476d32ca5adSJuan Quintela 477d32ca5adSJuan Quintela trace_multifd_send_terminate_threads(err != NULL); 478d32ca5adSJuan Quintela 479d32ca5adSJuan Quintela if (err) { 480d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 481d32ca5adSJuan Quintela migrate_set_error(s, err); 482d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 483d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 484d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_DEVICE || 485d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 486d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 487d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 488d32ca5adSJuan Quintela } 489d32ca5adSJuan Quintela } 490d32ca5adSJuan Quintela 491d32ca5adSJuan Quintela /* 492d32ca5adSJuan Quintela * We don't want to exit each threads twice. Depending on where 493d32ca5adSJuan Quintela * we get the error, or if there are two independent errors in two 494d32ca5adSJuan Quintela * threads at the same time, we can end calling this function 495d32ca5adSJuan Quintela * twice. 496d32ca5adSJuan Quintela */ 497d73415a3SStefan Hajnoczi if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 498d32ca5adSJuan Quintela return; 499d32ca5adSJuan Quintela } 500d32ca5adSJuan Quintela 501d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 502d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 503d32ca5adSJuan Quintela 504d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 505d32ca5adSJuan Quintela p->quit = true; 506d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 507077fbb59SLi Zhang if (p->c) { 508077fbb59SLi Zhang qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 509077fbb59SLi Zhang } 510d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 511d32ca5adSJuan Quintela } 512d32ca5adSJuan Quintela } 513d32ca5adSJuan Quintela 514d32ca5adSJuan Quintela void multifd_save_cleanup(void) 515d32ca5adSJuan Quintela { 516d32ca5adSJuan Quintela int i; 517d32ca5adSJuan Quintela 518f444eedaSPeter Xu if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) { 519d32ca5adSJuan Quintela return; 520d32ca5adSJuan Quintela } 521d32ca5adSJuan Quintela multifd_send_terminate_threads(NULL); 522d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 523d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 524d32ca5adSJuan Quintela 525d32ca5adSJuan Quintela if (p->running) { 526d32ca5adSJuan Quintela qemu_thread_join(&p->thread); 527d32ca5adSJuan Quintela } 528d32ca5adSJuan Quintela } 529d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 530d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 531ab7cbb0bSJuan Quintela Error *local_err = NULL; 532d32ca5adSJuan Quintela 53320171ea8SLukas Straub if (p->registered_yank) { 53420171ea8SLukas Straub migration_ioc_unregister_yank(p->c); 53520171ea8SLukas Straub } 536d32ca5adSJuan Quintela socket_send_channel_destroy(p->c); 537d32ca5adSJuan Quintela p->c = NULL; 538d32ca5adSJuan Quintela qemu_mutex_destroy(&p->mutex); 539d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem); 540d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem_sync); 541d32ca5adSJuan Quintela g_free(p->name); 542d32ca5adSJuan Quintela p->name = NULL; 543d32ca5adSJuan Quintela multifd_pages_clear(p->pages); 544d32ca5adSJuan Quintela p->pages = NULL; 545d32ca5adSJuan Quintela p->packet_len = 0; 546d32ca5adSJuan Quintela g_free(p->packet); 547d32ca5adSJuan Quintela p->packet = NULL; 548226468baSJuan Quintela g_free(p->iov); 549226468baSJuan Quintela p->iov = NULL; 550815956f0SJuan Quintela g_free(p->normal); 551815956f0SJuan Quintela p->normal = NULL; 552ab7cbb0bSJuan Quintela multifd_send_state->ops->send_cleanup(p, &local_err); 553ab7cbb0bSJuan Quintela if (local_err) { 554ab7cbb0bSJuan Quintela migrate_set_error(migrate_get_current(), local_err); 55513f2cb21SPan Nengyuan error_free(local_err); 556ab7cbb0bSJuan Quintela } 557d32ca5adSJuan Quintela } 558d32ca5adSJuan Quintela qemu_sem_destroy(&multifd_send_state->channels_ready); 559d32ca5adSJuan Quintela g_free(multifd_send_state->params); 560d32ca5adSJuan Quintela multifd_send_state->params = NULL; 561d32ca5adSJuan Quintela multifd_pages_clear(multifd_send_state->pages); 562d32ca5adSJuan Quintela multifd_send_state->pages = NULL; 563d32ca5adSJuan Quintela g_free(multifd_send_state); 564d32ca5adSJuan Quintela multifd_send_state = NULL; 565d32ca5adSJuan Quintela } 566d32ca5adSJuan Quintela 5674cc47b43SLeonardo Bras static int multifd_zero_copy_flush(QIOChannel *c) 5684cc47b43SLeonardo Bras { 5694cc47b43SLeonardo Bras int ret; 5704cc47b43SLeonardo Bras Error *err = NULL; 5714cc47b43SLeonardo Bras 5724cc47b43SLeonardo Bras ret = qio_channel_flush(c, &err); 5734cc47b43SLeonardo Bras if (ret < 0) { 5744cc47b43SLeonardo Bras error_report_err(err); 5754cc47b43SLeonardo Bras return -1; 5764cc47b43SLeonardo Bras } 5774cc47b43SLeonardo Bras if (ret == 1) { 5784cc47b43SLeonardo Bras dirty_sync_missed_zero_copy(); 5794cc47b43SLeonardo Bras } 5804cc47b43SLeonardo Bras 5814cc47b43SLeonardo Bras return ret; 5824cc47b43SLeonardo Bras } 5834cc47b43SLeonardo Bras 58433d70973SLeonardo Bras int multifd_send_sync_main(QEMUFile *f) 585d32ca5adSJuan Quintela { 586d32ca5adSJuan Quintela int i; 5875b1d9babSLeonardo Bras bool flush_zero_copy; 588d32ca5adSJuan Quintela 589d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 59033d70973SLeonardo Bras return 0; 591d32ca5adSJuan Quintela } 59290a3d2f9SJuan Quintela if (multifd_send_state->pages->num) { 593d32ca5adSJuan Quintela if (multifd_send_pages(f) < 0) { 594d32ca5adSJuan Quintela error_report("%s: multifd_send_pages fail", __func__); 59533d70973SLeonardo Bras return -1; 596d32ca5adSJuan Quintela } 597d32ca5adSJuan Quintela } 5985b1d9babSLeonardo Bras 5995b1d9babSLeonardo Bras /* 6005b1d9babSLeonardo Bras * When using zero-copy, it's necessary to flush the pages before any of 6015b1d9babSLeonardo Bras * the pages can be sent again, so we'll make sure the new version of the 6025b1d9babSLeonardo Bras * pages will always arrive _later_ than the old pages. 6035b1d9babSLeonardo Bras * 6045b1d9babSLeonardo Bras * Currently we achieve this by flushing the zero-page requested writes 6055b1d9babSLeonardo Bras * per ram iteration, but in the future we could potentially optimize it 6065b1d9babSLeonardo Bras * to be less frequent, e.g. only after we finished one whole scanning of 6075b1d9babSLeonardo Bras * all the dirty bitmaps. 6085b1d9babSLeonardo Bras */ 6095b1d9babSLeonardo Bras 6105b1d9babSLeonardo Bras flush_zero_copy = migrate_use_zero_copy_send(); 6115b1d9babSLeonardo Bras 612d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 613d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 614d32ca5adSJuan Quintela 615d32ca5adSJuan Quintela trace_multifd_send_sync_main_signal(p->id); 616d32ca5adSJuan Quintela 617d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 618d32ca5adSJuan Quintela 619d32ca5adSJuan Quintela if (p->quit) { 620d32ca5adSJuan Quintela error_report("%s: channel %d has already quit", __func__, i); 621d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 62233d70973SLeonardo Bras return -1; 623d32ca5adSJuan Quintela } 624d32ca5adSJuan Quintela 625d32ca5adSJuan Quintela p->packet_num = multifd_send_state->packet_num++; 626d32ca5adSJuan Quintela p->flags |= MULTIFD_FLAG_SYNC; 627d32ca5adSJuan Quintela p->pending_job++; 628bc698c36SDaniel P. Berrangé qemu_file_acct_rate_limit(f, p->packet_len); 629d32ca5adSJuan Quintela ram_counters.multifd_bytes += p->packet_len; 63023b7576dSPeter Xu stat64_add(&ram_atomic_counters.transferred, p->packet_len); 631d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 632d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 633d32ca5adSJuan Quintela } 634d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 635d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 636d32ca5adSJuan Quintela 637d32ca5adSJuan Quintela trace_multifd_send_sync_main_wait(p->id); 638d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 639*ebfc5787SZhenzhong Duan 640*ebfc5787SZhenzhong Duan if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 641*ebfc5787SZhenzhong Duan return -1; 642*ebfc5787SZhenzhong Duan } 643d32ca5adSJuan Quintela } 644d32ca5adSJuan Quintela trace_multifd_send_sync_main(multifd_send_state->packet_num); 64533d70973SLeonardo Bras 64633d70973SLeonardo Bras return 0; 647d32ca5adSJuan Quintela } 648d32ca5adSJuan Quintela 649d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque) 650d32ca5adSJuan Quintela { 651d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 652d32ca5adSJuan Quintela Error *local_err = NULL; 653d32ca5adSJuan Quintela int ret = 0; 654b7dbdd8eSLeonardo Bras bool use_zero_copy_send = migrate_use_zero_copy_send(); 655d32ca5adSJuan Quintela 656d32ca5adSJuan Quintela trace_multifd_send_thread_start(p->id); 657d32ca5adSJuan Quintela rcu_register_thread(); 658d32ca5adSJuan Quintela 659d32ca5adSJuan Quintela if (multifd_send_initial_packet(p, &local_err) < 0) { 660d32ca5adSJuan Quintela ret = -1; 661d32ca5adSJuan Quintela goto out; 662d32ca5adSJuan Quintela } 663d32ca5adSJuan Quintela /* initial packet */ 664d32ca5adSJuan Quintela p->num_packets = 1; 665d32ca5adSJuan Quintela 666d32ca5adSJuan Quintela while (true) { 667d32ca5adSJuan Quintela qemu_sem_wait(&p->sem); 668d32ca5adSJuan Quintela 669d73415a3SStefan Hajnoczi if (qatomic_read(&multifd_send_state->exiting)) { 670d32ca5adSJuan Quintela break; 671d32ca5adSJuan Quintela } 672d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 673d32ca5adSJuan Quintela 674d32ca5adSJuan Quintela if (p->pending_job) { 675d32ca5adSJuan Quintela uint64_t packet_num = p->packet_num; 6761943c11aSJuan Quintela uint32_t flags = p->flags; 677815956f0SJuan Quintela p->normal_num = 0; 678d32ca5adSJuan Quintela 679b7dbdd8eSLeonardo Bras if (use_zero_copy_send) { 680b7dbdd8eSLeonardo Bras p->iovs_num = 0; 681b7dbdd8eSLeonardo Bras } else { 682b7dbdd8eSLeonardo Bras p->iovs_num = 1; 683b7dbdd8eSLeonardo Bras } 684b7dbdd8eSLeonardo Bras 685815956f0SJuan Quintela for (int i = 0; i < p->pages->num; i++) { 686815956f0SJuan Quintela p->normal[p->normal_num] = p->pages->offset[i]; 687815956f0SJuan Quintela p->normal_num++; 688815956f0SJuan Quintela } 689815956f0SJuan Quintela 690815956f0SJuan Quintela if (p->normal_num) { 69102fb8104SJuan Quintela ret = multifd_send_state->ops->send_prepare(p, &local_err); 692ab7cbb0bSJuan Quintela if (ret != 0) { 693ab7cbb0bSJuan Quintela qemu_mutex_unlock(&p->mutex); 694ab7cbb0bSJuan Quintela break; 695ab7cbb0bSJuan Quintela } 696ab7cbb0bSJuan Quintela } 697d32ca5adSJuan Quintela multifd_send_fill_packet(p); 698d32ca5adSJuan Quintela p->flags = 0; 699d32ca5adSJuan Quintela p->num_packets++; 700815956f0SJuan Quintela p->total_normal_pages += p->normal_num; 70190a3d2f9SJuan Quintela p->pages->num = 0; 702d32ca5adSJuan Quintela p->pages->block = NULL; 703d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 704d32ca5adSJuan Quintela 705815956f0SJuan Quintela trace_multifd_send(p->id, packet_num, p->normal_num, flags, 706d32ca5adSJuan Quintela p->next_packet_size); 707d32ca5adSJuan Quintela 708b7dbdd8eSLeonardo Bras if (use_zero_copy_send) { 709b7dbdd8eSLeonardo Bras /* Send header first, without zerocopy */ 710b7dbdd8eSLeonardo Bras ret = qio_channel_write_all(p->c, (void *)p->packet, 711b7dbdd8eSLeonardo Bras p->packet_len, &local_err); 712b7dbdd8eSLeonardo Bras if (ret != 0) { 713b7dbdd8eSLeonardo Bras break; 714b7dbdd8eSLeonardo Bras } 715b7dbdd8eSLeonardo Bras } else { 716b7dbdd8eSLeonardo Bras /* Send header using the same writev call */ 717d48c3a04SJuan Quintela p->iov[0].iov_len = p->packet_len; 718d48c3a04SJuan Quintela p->iov[0].iov_base = p->packet; 719b7dbdd8eSLeonardo Bras } 720d32ca5adSJuan Quintela 7215b1d9babSLeonardo Bras ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 7225b1d9babSLeonardo Bras 0, p->write_flags, &local_err); 723d32ca5adSJuan Quintela if (ret != 0) { 724d32ca5adSJuan Quintela break; 725d32ca5adSJuan Quintela } 726d32ca5adSJuan Quintela 727d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 728d32ca5adSJuan Quintela p->pending_job--; 729d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 730d32ca5adSJuan Quintela 731d32ca5adSJuan Quintela if (flags & MULTIFD_FLAG_SYNC) { 732d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 733d32ca5adSJuan Quintela } 734d32ca5adSJuan Quintela qemu_sem_post(&multifd_send_state->channels_ready); 735d32ca5adSJuan Quintela } else if (p->quit) { 736d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 737d32ca5adSJuan Quintela break; 738d32ca5adSJuan Quintela } else { 739d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 740d32ca5adSJuan Quintela /* sometimes there are spurious wakeups */ 741d32ca5adSJuan Quintela } 742d32ca5adSJuan Quintela } 743d32ca5adSJuan Quintela 744d32ca5adSJuan Quintela out: 745d32ca5adSJuan Quintela if (local_err) { 746d32ca5adSJuan Quintela trace_multifd_send_error(p->id); 747d32ca5adSJuan Quintela multifd_send_terminate_threads(local_err); 74813f2cb21SPan Nengyuan error_free(local_err); 749d32ca5adSJuan Quintela } 750d32ca5adSJuan Quintela 751d32ca5adSJuan Quintela /* 752d32ca5adSJuan Quintela * Error happen, I will exit, but I can't just leave, tell 753d32ca5adSJuan Quintela * who pay attention to me. 754d32ca5adSJuan Quintela */ 755d32ca5adSJuan Quintela if (ret != 0) { 756d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 757d32ca5adSJuan Quintela qemu_sem_post(&multifd_send_state->channels_ready); 758d32ca5adSJuan Quintela } 759d32ca5adSJuan Quintela 760d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 761d32ca5adSJuan Quintela p->running = false; 762d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 763d32ca5adSJuan Quintela 764d32ca5adSJuan Quintela rcu_unregister_thread(); 765815956f0SJuan Quintela trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages); 766d32ca5adSJuan Quintela 767d32ca5adSJuan Quintela return NULL; 768d32ca5adSJuan Quintela } 769d32ca5adSJuan Quintela 77029647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p, 77129647140SChuan Zheng QIOChannel *ioc, 77229647140SChuan Zheng Error *error); 77329647140SChuan Zheng 77429647140SChuan Zheng static void multifd_tls_outgoing_handshake(QIOTask *task, 77529647140SChuan Zheng gpointer opaque) 77629647140SChuan Zheng { 77729647140SChuan Zheng MultiFDSendParams *p = opaque; 77829647140SChuan Zheng QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 77929647140SChuan Zheng Error *err = NULL; 78029647140SChuan Zheng 781894f0214SChuan Zheng if (qio_task_propagate_error(task, &err)) { 782894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); 783894f0214SChuan Zheng } else { 784894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_complete(ioc); 785894f0214SChuan Zheng } 786fca67642SHao Wang 787fca67642SHao Wang if (!multifd_channel_connect(p, ioc, err)) { 788fca67642SHao Wang /* 789fca67642SHao Wang * Error happen, mark multifd_send_thread status as 'quit' although it 790fca67642SHao Wang * is not created, and then tell who pay attention to me. 791fca67642SHao Wang */ 792fca67642SHao Wang p->quit = true; 793fca67642SHao Wang qemu_sem_post(&multifd_send_state->channels_ready); 794fca67642SHao Wang qemu_sem_post(&p->sem_sync); 795fca67642SHao Wang } 79629647140SChuan Zheng } 79729647140SChuan Zheng 798a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque) 799a1af605bSChuan Zheng { 800a1af605bSChuan Zheng MultiFDSendParams *p = opaque; 801a1af605bSChuan Zheng QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); 802a1af605bSChuan Zheng 803a1af605bSChuan Zheng qio_channel_tls_handshake(tioc, 804a1af605bSChuan Zheng multifd_tls_outgoing_handshake, 805a1af605bSChuan Zheng p, 806a1af605bSChuan Zheng NULL, 807a1af605bSChuan Zheng NULL); 808a1af605bSChuan Zheng return NULL; 809a1af605bSChuan Zheng } 810a1af605bSChuan Zheng 81129647140SChuan Zheng static void multifd_tls_channel_connect(MultiFDSendParams *p, 81229647140SChuan Zheng QIOChannel *ioc, 81329647140SChuan Zheng Error **errp) 81429647140SChuan Zheng { 81529647140SChuan Zheng MigrationState *s = migrate_get_current(); 8167f692ec7SPeter Xu const char *hostname = s->hostname; 81729647140SChuan Zheng QIOChannelTLS *tioc; 81829647140SChuan Zheng 81929647140SChuan Zheng tioc = migration_tls_client_create(s, ioc, hostname, errp); 82029647140SChuan Zheng if (!tioc) { 82129647140SChuan Zheng return; 82229647140SChuan Zheng } 82329647140SChuan Zheng 8249e842408SChuan Zheng object_unref(OBJECT(ioc)); 825894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 82629647140SChuan Zheng qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 827a1af605bSChuan Zheng p->c = QIO_CHANNEL(tioc); 828a1af605bSChuan Zheng qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", 829a1af605bSChuan Zheng multifd_tls_handshake_thread, p, 830a1af605bSChuan Zheng QEMU_THREAD_JOINABLE); 83129647140SChuan Zheng } 83229647140SChuan Zheng 83329647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p, 83429647140SChuan Zheng QIOChannel *ioc, 83529647140SChuan Zheng Error *error) 83629647140SChuan Zheng { 837894f0214SChuan Zheng trace_multifd_set_outgoing_channel( 8387f692ec7SPeter Xu ioc, object_get_typename(OBJECT(ioc)), 8397f692ec7SPeter Xu migrate_get_current()->hostname, error); 840894f0214SChuan Zheng 84129647140SChuan Zheng if (!error) { 84285a8578eSPeter Xu if (migrate_channel_requires_tls_upgrade(ioc)) { 84329647140SChuan Zheng multifd_tls_channel_connect(p, ioc, &error); 84429647140SChuan Zheng if (!error) { 84529647140SChuan Zheng /* 84629647140SChuan Zheng * tls_channel_connect will call back to this 84729647140SChuan Zheng * function after the TLS handshake, 84829647140SChuan Zheng * so we mustn't call multifd_send_thread until then 84929647140SChuan Zheng */ 85029647140SChuan Zheng return true; 851a339149aSHao Wang } else { 852a339149aSHao Wang return false; 85329647140SChuan Zheng } 85429647140SChuan Zheng } else { 85520171ea8SLukas Straub migration_ioc_register_yank(ioc); 85620171ea8SLukas Straub p->registered_yank = true; 85729647140SChuan Zheng p->c = ioc; 85829647140SChuan Zheng qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 85929647140SChuan Zheng QEMU_THREAD_JOINABLE); 86029647140SChuan Zheng } 861a339149aSHao Wang return true; 86229647140SChuan Zheng } 86329647140SChuan Zheng 864a339149aSHao Wang return false; 86529647140SChuan Zheng } 86629647140SChuan Zheng 86703c7a42dSChuan Zheng static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, 86803c7a42dSChuan Zheng QIOChannel *ioc, Error *err) 86903c7a42dSChuan Zheng { 87003c7a42dSChuan Zheng migrate_set_error(migrate_get_current(), err); 87103c7a42dSChuan Zheng /* Error happen, we need to tell who pay attention to me */ 87203c7a42dSChuan Zheng qemu_sem_post(&multifd_send_state->channels_ready); 87303c7a42dSChuan Zheng qemu_sem_post(&p->sem_sync); 87403c7a42dSChuan Zheng /* 87503c7a42dSChuan Zheng * Although multifd_send_thread is not created, but main migration 87603c7a42dSChuan Zheng * thread neet to judge whether it is running, so we need to mark 87703c7a42dSChuan Zheng * its status. 87803c7a42dSChuan Zheng */ 87903c7a42dSChuan Zheng p->quit = true; 88003c7a42dSChuan Zheng object_unref(OBJECT(ioc)); 88103c7a42dSChuan Zheng error_free(err); 88203c7a42dSChuan Zheng } 88303c7a42dSChuan Zheng 884d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 885d32ca5adSJuan Quintela { 886d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 887d32ca5adSJuan Quintela QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); 888d32ca5adSJuan Quintela Error *local_err = NULL; 889d32ca5adSJuan Quintela 890d32ca5adSJuan Quintela trace_multifd_new_send_channel_async(p->id); 891d32ca5adSJuan Quintela if (qio_task_propagate_error(task, &local_err)) { 89203c7a42dSChuan Zheng goto cleanup; 893d32ca5adSJuan Quintela } else { 894d32ca5adSJuan Quintela p->c = QIO_CHANNEL(sioc); 895d32ca5adSJuan Quintela qio_channel_set_delay(p->c, false); 896d32ca5adSJuan Quintela p->running = true; 897a339149aSHao Wang if (!multifd_channel_connect(p, sioc, local_err)) { 89829647140SChuan Zheng goto cleanup; 89929647140SChuan Zheng } 90003c7a42dSChuan Zheng return; 901d32ca5adSJuan Quintela } 90203c7a42dSChuan Zheng 90303c7a42dSChuan Zheng cleanup: 90403c7a42dSChuan Zheng multifd_new_send_channel_cleanup(p, sioc, local_err); 905d32ca5adSJuan Quintela } 906d32ca5adSJuan Quintela 907d32ca5adSJuan Quintela int multifd_save_setup(Error **errp) 908d32ca5adSJuan Quintela { 909d32ca5adSJuan Quintela int thread_count; 910d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 911d32ca5adSJuan Quintela uint8_t i; 912d32ca5adSJuan Quintela 913d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 914d32ca5adSJuan Quintela return 0; 915d32ca5adSJuan Quintela } 916f444eedaSPeter Xu if (!migrate_multi_channels_is_allowed()) { 917b7acd657SLi Zhijian error_setg(errp, "multifd is not supported by current protocol"); 918b7acd657SLi Zhijian return -1; 919b7acd657SLi Zhijian } 920b7acd657SLi Zhijian 921d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 922d32ca5adSJuan Quintela multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 923d32ca5adSJuan Quintela multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 924d32ca5adSJuan Quintela multifd_send_state->pages = multifd_pages_init(page_count); 925d32ca5adSJuan Quintela qemu_sem_init(&multifd_send_state->channels_ready, 0); 926d73415a3SStefan Hajnoczi qatomic_set(&multifd_send_state->exiting, 0); 927ab7cbb0bSJuan Quintela multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 928d32ca5adSJuan Quintela 929d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 930d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 931d32ca5adSJuan Quintela 932d32ca5adSJuan Quintela qemu_mutex_init(&p->mutex); 933d32ca5adSJuan Quintela qemu_sem_init(&p->sem, 0); 934d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 935d32ca5adSJuan Quintela p->quit = false; 936d32ca5adSJuan Quintela p->pending_job = 0; 937d32ca5adSJuan Quintela p->id = i; 938d32ca5adSJuan Quintela p->pages = multifd_pages_init(page_count); 939d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 940d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 941d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 942d32ca5adSJuan Quintela p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 943d32ca5adSJuan Quintela p->packet->version = cpu_to_be32(MULTIFD_VERSION); 944d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdsend_%d", i); 945d48c3a04SJuan Quintela /* We need one extra place for the packet header */ 946d48c3a04SJuan Quintela p->iov = g_new0(struct iovec, page_count + 1); 947815956f0SJuan Quintela p->normal = g_new0(ram_addr_t, page_count); 948ddec20f8SJuan Quintela p->page_size = qemu_target_page_size(); 949d6f45ebaSJuan Quintela p->page_count = page_count; 9505b1d9babSLeonardo Bras 9515b1d9babSLeonardo Bras if (migrate_use_zero_copy_send()) { 9525b1d9babSLeonardo Bras p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 9535b1d9babSLeonardo Bras } else { 9545b1d9babSLeonardo Bras p->write_flags = 0; 9555b1d9babSLeonardo Bras } 9565b1d9babSLeonardo Bras 957d32ca5adSJuan Quintela socket_send_channel_create(multifd_new_send_channel_async, p); 958d32ca5adSJuan Quintela } 959ab7cbb0bSJuan Quintela 960ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 961ab7cbb0bSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 962ab7cbb0bSJuan Quintela Error *local_err = NULL; 963ab7cbb0bSJuan Quintela int ret; 964ab7cbb0bSJuan Quintela 965ab7cbb0bSJuan Quintela ret = multifd_send_state->ops->send_setup(p, &local_err); 966ab7cbb0bSJuan Quintela if (ret) { 967ab7cbb0bSJuan Quintela error_propagate(errp, local_err); 968ab7cbb0bSJuan Quintela return ret; 969ab7cbb0bSJuan Quintela } 970ab7cbb0bSJuan Quintela } 971d32ca5adSJuan Quintela return 0; 972d32ca5adSJuan Quintela } 973d32ca5adSJuan Quintela 974d32ca5adSJuan Quintela struct { 975d32ca5adSJuan Quintela MultiFDRecvParams *params; 976d32ca5adSJuan Quintela /* number of created threads */ 977d32ca5adSJuan Quintela int count; 978d32ca5adSJuan Quintela /* syncs main thread and channels */ 979d32ca5adSJuan Quintela QemuSemaphore sem_sync; 980d32ca5adSJuan Quintela /* global number of generated multifd packets */ 981d32ca5adSJuan Quintela uint64_t packet_num; 982ab7cbb0bSJuan Quintela /* multifd ops */ 983ab7cbb0bSJuan Quintela MultiFDMethods *ops; 984d32ca5adSJuan Quintela } *multifd_recv_state; 985d32ca5adSJuan Quintela 986d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err) 987d32ca5adSJuan Quintela { 988d32ca5adSJuan Quintela int i; 989d32ca5adSJuan Quintela 990d32ca5adSJuan Quintela trace_multifd_recv_terminate_threads(err != NULL); 991d32ca5adSJuan Quintela 992d32ca5adSJuan Quintela if (err) { 993d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 994d32ca5adSJuan Quintela migrate_set_error(s, err); 995d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 996d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 997d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 998d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 999d32ca5adSJuan Quintela } 1000d32ca5adSJuan Quintela } 1001d32ca5adSJuan Quintela 1002d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1003d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1004d32ca5adSJuan Quintela 1005d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1006d32ca5adSJuan Quintela p->quit = true; 1007d32ca5adSJuan Quintela /* 1008d32ca5adSJuan Quintela * We could arrive here for two reasons: 1009d32ca5adSJuan Quintela * - normal quit, i.e. everything went fine, just finished 1010d32ca5adSJuan Quintela * - error quit: We close the channels so the channel threads 1011d32ca5adSJuan Quintela * finish the qio_channel_read_all_eof() 1012d32ca5adSJuan Quintela */ 1013d32ca5adSJuan Quintela if (p->c) { 1014d32ca5adSJuan Quintela qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1015d32ca5adSJuan Quintela } 1016d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1017d32ca5adSJuan Quintela } 1018d32ca5adSJuan Quintela } 1019d32ca5adSJuan Quintela 1020d32ca5adSJuan Quintela int multifd_load_cleanup(Error **errp) 1021d32ca5adSJuan Quintela { 1022d32ca5adSJuan Quintela int i; 1023d32ca5adSJuan Quintela 1024f444eedaSPeter Xu if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) { 1025d32ca5adSJuan Quintela return 0; 1026d32ca5adSJuan Quintela } 1027d32ca5adSJuan Quintela multifd_recv_terminate_threads(NULL); 1028d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1029d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1030d32ca5adSJuan Quintela 1031d32ca5adSJuan Quintela if (p->running) { 1032d32ca5adSJuan Quintela p->quit = true; 1033d32ca5adSJuan Quintela /* 1034d32ca5adSJuan Quintela * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 1035d32ca5adSJuan Quintela * however try to wakeup it without harm in cleanup phase. 1036d32ca5adSJuan Quintela */ 1037d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 1038d32ca5adSJuan Quintela qemu_thread_join(&p->thread); 1039d32ca5adSJuan Quintela } 1040d32ca5adSJuan Quintela } 1041d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1042d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1043d32ca5adSJuan Quintela 104418711405SPeter Xu migration_ioc_unregister_yank(p->c); 1045d32ca5adSJuan Quintela object_unref(OBJECT(p->c)); 1046d32ca5adSJuan Quintela p->c = NULL; 1047d32ca5adSJuan Quintela qemu_mutex_destroy(&p->mutex); 1048d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem_sync); 1049d32ca5adSJuan Quintela g_free(p->name); 1050d32ca5adSJuan Quintela p->name = NULL; 1051d32ca5adSJuan Quintela p->packet_len = 0; 1052d32ca5adSJuan Quintela g_free(p->packet); 1053d32ca5adSJuan Quintela p->packet = NULL; 1054226468baSJuan Quintela g_free(p->iov); 1055226468baSJuan Quintela p->iov = NULL; 1056cf2d4aa8SJuan Quintela g_free(p->normal); 1057cf2d4aa8SJuan Quintela p->normal = NULL; 1058ab7cbb0bSJuan Quintela multifd_recv_state->ops->recv_cleanup(p); 1059d32ca5adSJuan Quintela } 1060d32ca5adSJuan Quintela qemu_sem_destroy(&multifd_recv_state->sem_sync); 1061d32ca5adSJuan Quintela g_free(multifd_recv_state->params); 1062d32ca5adSJuan Quintela multifd_recv_state->params = NULL; 1063d32ca5adSJuan Quintela g_free(multifd_recv_state); 1064d32ca5adSJuan Quintela multifd_recv_state = NULL; 1065d32ca5adSJuan Quintela 1066ab7cbb0bSJuan Quintela return 0; 1067d32ca5adSJuan Quintela } 1068d32ca5adSJuan Quintela 1069d32ca5adSJuan Quintela void multifd_recv_sync_main(void) 1070d32ca5adSJuan Quintela { 1071d32ca5adSJuan Quintela int i; 1072d32ca5adSJuan Quintela 1073d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 1074d32ca5adSJuan Quintela return; 1075d32ca5adSJuan Quintela } 1076d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1077d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1078d32ca5adSJuan Quintela 1079d32ca5adSJuan Quintela trace_multifd_recv_sync_main_wait(p->id); 1080d32ca5adSJuan Quintela qemu_sem_wait(&multifd_recv_state->sem_sync); 1081d32ca5adSJuan Quintela } 1082d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1083d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1084d32ca5adSJuan Quintela 10856e8a355dSDaniel Brodsky WITH_QEMU_LOCK_GUARD(&p->mutex) { 1086d32ca5adSJuan Quintela if (multifd_recv_state->packet_num < p->packet_num) { 1087d32ca5adSJuan Quintela multifd_recv_state->packet_num = p->packet_num; 1088d32ca5adSJuan Quintela } 10896e8a355dSDaniel Brodsky } 1090d32ca5adSJuan Quintela trace_multifd_recv_sync_main_signal(p->id); 1091d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 1092d32ca5adSJuan Quintela } 1093d32ca5adSJuan Quintela trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1094d32ca5adSJuan Quintela } 1095d32ca5adSJuan Quintela 1096d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque) 1097d32ca5adSJuan Quintela { 1098d32ca5adSJuan Quintela MultiFDRecvParams *p = opaque; 1099d32ca5adSJuan Quintela Error *local_err = NULL; 1100d32ca5adSJuan Quintela int ret; 1101d32ca5adSJuan Quintela 1102d32ca5adSJuan Quintela trace_multifd_recv_thread_start(p->id); 1103d32ca5adSJuan Quintela rcu_register_thread(); 1104d32ca5adSJuan Quintela 1105d32ca5adSJuan Quintela while (true) { 1106d32ca5adSJuan Quintela uint32_t flags; 1107d32ca5adSJuan Quintela 1108d32ca5adSJuan Quintela if (p->quit) { 1109d32ca5adSJuan Quintela break; 1110d32ca5adSJuan Quintela } 1111d32ca5adSJuan Quintela 1112d32ca5adSJuan Quintela ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1113d32ca5adSJuan Quintela p->packet_len, &local_err); 1114d32ca5adSJuan Quintela if (ret == 0) { /* EOF */ 1115d32ca5adSJuan Quintela break; 1116d32ca5adSJuan Quintela } 1117d32ca5adSJuan Quintela if (ret == -1) { /* Error */ 1118d32ca5adSJuan Quintela break; 1119d32ca5adSJuan Quintela } 1120d32ca5adSJuan Quintela 1121d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1122d32ca5adSJuan Quintela ret = multifd_recv_unfill_packet(p, &local_err); 1123d32ca5adSJuan Quintela if (ret) { 1124d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1125d32ca5adSJuan Quintela break; 1126d32ca5adSJuan Quintela } 1127d32ca5adSJuan Quintela 1128d32ca5adSJuan Quintela flags = p->flags; 1129ab7cbb0bSJuan Quintela /* recv methods don't know how to handle the SYNC flag */ 1130ab7cbb0bSJuan Quintela p->flags &= ~MULTIFD_FLAG_SYNC; 1131cf2d4aa8SJuan Quintela trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags, 1132d32ca5adSJuan Quintela p->next_packet_size); 1133d32ca5adSJuan Quintela p->num_packets++; 1134cf2d4aa8SJuan Quintela p->total_normal_pages += p->normal_num; 1135d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1136d32ca5adSJuan Quintela 1137cf2d4aa8SJuan Quintela if (p->normal_num) { 113840a4bfe9SJuan Quintela ret = multifd_recv_state->ops->recv_pages(p, &local_err); 1139d32ca5adSJuan Quintela if (ret != 0) { 1140d32ca5adSJuan Quintela break; 1141d32ca5adSJuan Quintela } 1142d32ca5adSJuan Quintela } 1143d32ca5adSJuan Quintela 1144d32ca5adSJuan Quintela if (flags & MULTIFD_FLAG_SYNC) { 1145d32ca5adSJuan Quintela qemu_sem_post(&multifd_recv_state->sem_sync); 1146d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 1147d32ca5adSJuan Quintela } 1148d32ca5adSJuan Quintela } 1149d32ca5adSJuan Quintela 1150d32ca5adSJuan Quintela if (local_err) { 1151d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 115213f2cb21SPan Nengyuan error_free(local_err); 1153d32ca5adSJuan Quintela } 1154d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1155d32ca5adSJuan Quintela p->running = false; 1156d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1157d32ca5adSJuan Quintela 1158d32ca5adSJuan Quintela rcu_unregister_thread(); 1159cf2d4aa8SJuan Quintela trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages); 1160d32ca5adSJuan Quintela 1161d32ca5adSJuan Quintela return NULL; 1162d32ca5adSJuan Quintela } 1163d32ca5adSJuan Quintela 1164d32ca5adSJuan Quintela int multifd_load_setup(Error **errp) 1165d32ca5adSJuan Quintela { 1166d32ca5adSJuan Quintela int thread_count; 1167d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1168d32ca5adSJuan Quintela uint8_t i; 1169d32ca5adSJuan Quintela 11706720c2b3Smanish.mishra /* 11716720c2b3Smanish.mishra * Return successfully if multiFD recv state is already initialised 11726720c2b3Smanish.mishra * or multiFD is not enabled. 11736720c2b3Smanish.mishra */ 11746720c2b3Smanish.mishra if (multifd_recv_state || !migrate_use_multifd()) { 1175d32ca5adSJuan Quintela return 0; 1176d32ca5adSJuan Quintela } 11776720c2b3Smanish.mishra 1178f444eedaSPeter Xu if (!migrate_multi_channels_is_allowed()) { 1179b7acd657SLi Zhijian error_setg(errp, "multifd is not supported by current protocol"); 1180b7acd657SLi Zhijian return -1; 1181b7acd657SLi Zhijian } 1182d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 1183d32ca5adSJuan Quintela multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1184d32ca5adSJuan Quintela multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1185d73415a3SStefan Hajnoczi qatomic_set(&multifd_recv_state->count, 0); 1186d32ca5adSJuan Quintela qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1187ab7cbb0bSJuan Quintela multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1188d32ca5adSJuan Quintela 1189d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 1190d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1191d32ca5adSJuan Quintela 1192d32ca5adSJuan Quintela qemu_mutex_init(&p->mutex); 1193d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 1194d32ca5adSJuan Quintela p->quit = false; 1195d32ca5adSJuan Quintela p->id = i; 1196d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 1197d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 1198d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 1199d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdrecv_%d", i); 1200226468baSJuan Quintela p->iov = g_new0(struct iovec, page_count); 1201cf2d4aa8SJuan Quintela p->normal = g_new0(ram_addr_t, page_count); 1202d6f45ebaSJuan Quintela p->page_count = page_count; 1203ddec20f8SJuan Quintela p->page_size = qemu_target_page_size(); 1204d32ca5adSJuan Quintela } 1205ab7cbb0bSJuan Quintela 1206ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 1207ab7cbb0bSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1208ab7cbb0bSJuan Quintela Error *local_err = NULL; 1209ab7cbb0bSJuan Quintela int ret; 1210ab7cbb0bSJuan Quintela 1211ab7cbb0bSJuan Quintela ret = multifd_recv_state->ops->recv_setup(p, &local_err); 1212ab7cbb0bSJuan Quintela if (ret) { 1213ab7cbb0bSJuan Quintela error_propagate(errp, local_err); 1214ab7cbb0bSJuan Quintela return ret; 1215ab7cbb0bSJuan Quintela } 1216ab7cbb0bSJuan Quintela } 1217d32ca5adSJuan Quintela return 0; 1218d32ca5adSJuan Quintela } 1219d32ca5adSJuan Quintela 1220d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void) 1221d32ca5adSJuan Quintela { 1222d32ca5adSJuan Quintela int thread_count = migrate_multifd_channels(); 1223d32ca5adSJuan Quintela 1224d32ca5adSJuan Quintela if (!migrate_use_multifd()) { 1225d32ca5adSJuan Quintela return true; 1226d32ca5adSJuan Quintela } 1227d32ca5adSJuan Quintela 1228a59136f3SDr. David Alan Gilbert if (!multifd_recv_state) { 1229a59136f3SDr. David Alan Gilbert /* Called before any connections created */ 1230a59136f3SDr. David Alan Gilbert return false; 1231a59136f3SDr. David Alan Gilbert } 1232a59136f3SDr. David Alan Gilbert 1233d73415a3SStefan Hajnoczi return thread_count == qatomic_read(&multifd_recv_state->count); 1234d32ca5adSJuan Quintela } 1235d32ca5adSJuan Quintela 1236d32ca5adSJuan Quintela /* 1237d32ca5adSJuan Quintela * Try to receive all multifd channels to get ready for the migration. 12386720c2b3Smanish.mishra * Sets @errp when failing to receive the current channel. 1239d32ca5adSJuan Quintela */ 12406720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1241d32ca5adSJuan Quintela { 1242d32ca5adSJuan Quintela MultiFDRecvParams *p; 1243d32ca5adSJuan Quintela Error *local_err = NULL; 1244d32ca5adSJuan Quintela int id; 1245d32ca5adSJuan Quintela 1246d32ca5adSJuan Quintela id = multifd_recv_initial_packet(ioc, &local_err); 1247d32ca5adSJuan Quintela if (id < 0) { 1248d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1249d32ca5adSJuan Quintela error_propagate_prepend(errp, local_err, 1250d32ca5adSJuan Quintela "failed to receive packet" 1251d32ca5adSJuan Quintela " via multifd channel %d: ", 1252d73415a3SStefan Hajnoczi qatomic_read(&multifd_recv_state->count)); 12536720c2b3Smanish.mishra return; 1254d32ca5adSJuan Quintela } 1255d32ca5adSJuan Quintela trace_multifd_recv_new_channel(id); 1256d32ca5adSJuan Quintela 1257d32ca5adSJuan Quintela p = &multifd_recv_state->params[id]; 1258d32ca5adSJuan Quintela if (p->c != NULL) { 1259d32ca5adSJuan Quintela error_setg(&local_err, "multifd: received id '%d' already setup'", 1260d32ca5adSJuan Quintela id); 1261d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1262d32ca5adSJuan Quintela error_propagate(errp, local_err); 12636720c2b3Smanish.mishra return; 1264d32ca5adSJuan Quintela } 1265d32ca5adSJuan Quintela p->c = ioc; 1266d32ca5adSJuan Quintela object_ref(OBJECT(ioc)); 1267d32ca5adSJuan Quintela /* initial packet */ 1268d32ca5adSJuan Quintela p->num_packets = 1; 1269d32ca5adSJuan Quintela 1270d32ca5adSJuan Quintela p->running = true; 1271d32ca5adSJuan Quintela qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1272d32ca5adSJuan Quintela QEMU_THREAD_JOINABLE); 1273d73415a3SStefan Hajnoczi qatomic_inc(&multifd_recv_state->count); 1274d32ca5adSJuan Quintela } 1275