1d32ca5adSJuan Quintela /* 2d32ca5adSJuan Quintela * Multifd common code 3d32ca5adSJuan Quintela * 4d32ca5adSJuan Quintela * Copyright (c) 2019-2020 Red Hat Inc 5d32ca5adSJuan Quintela * 6d32ca5adSJuan Quintela * Authors: 7d32ca5adSJuan Quintela * Juan Quintela <quintela@redhat.com> 8d32ca5adSJuan Quintela * 9d32ca5adSJuan Quintela * This work is licensed under the terms of the GNU GPL, version 2 or later. 10d32ca5adSJuan Quintela * See the COPYING file in the top-level directory. 11d32ca5adSJuan Quintela */ 12d32ca5adSJuan Quintela 13d32ca5adSJuan Quintela #include "qemu/osdep.h" 14d32ca5adSJuan Quintela #include "qemu/rcu.h" 15d32ca5adSJuan Quintela #include "exec/target_page.h" 16d32ca5adSJuan Quintela #include "sysemu/sysemu.h" 17d32ca5adSJuan Quintela #include "exec/ramblock.h" 18d32ca5adSJuan Quintela #include "qemu/error-report.h" 19d32ca5adSJuan Quintela #include "qapi/error.h" 20d32ca5adSJuan Quintela #include "ram.h" 21d32ca5adSJuan Quintela #include "migration.h" 22947701ccSJuan Quintela #include "migration-stats.h" 23d32ca5adSJuan Quintela #include "socket.h" 2429647140SChuan Zheng #include "tls.h" 25d32ca5adSJuan Quintela #include "qemu-file.h" 26d32ca5adSJuan Quintela #include "trace.h" 27d32ca5adSJuan Quintela #include "multifd.h" 281b1f4ab6SJiang Jiacheng #include "threadinfo.h" 29b4bc342cSJuan Quintela #include "options.h" 30b5eea99eSLukas Straub #include "qemu/yank.h" 31b5eea99eSLukas Straub #include "io/channel-socket.h" 321a92d6d5SLukas Straub #include "yank_functions.h" 33b5eea99eSLukas Straub 34d32ca5adSJuan Quintela /* Multiple fd's */ 35d32ca5adSJuan Quintela 36d32ca5adSJuan Quintela #define MULTIFD_MAGIC 0x11223344U 37d32ca5adSJuan Quintela #define MULTIFD_VERSION 1 38d32ca5adSJuan Quintela 39d32ca5adSJuan Quintela typedef struct { 40d32ca5adSJuan Quintela uint32_t magic; 41d32ca5adSJuan Quintela uint32_t version; 42d32ca5adSJuan Quintela unsigned char uuid[16]; /* QemuUUID */ 43d32ca5adSJuan Quintela uint8_t id; 44d32ca5adSJuan Quintela uint8_t unused1[7]; /* Reserved for future use */ 45d32ca5adSJuan Quintela uint64_t unused2[4]; /* Reserved for future use */ 46d32ca5adSJuan Quintela } __attribute__((packed)) MultiFDInit_t; 47d32ca5adSJuan Quintela 4898ea497dSPeter Xu struct { 4998ea497dSPeter Xu MultiFDSendParams *params; 5098ea497dSPeter Xu /* array of pages to sent */ 5198ea497dSPeter Xu MultiFDPages_t *pages; 5298ea497dSPeter Xu /* 5398ea497dSPeter Xu * Global number of generated multifd packets. 5498ea497dSPeter Xu * 5598ea497dSPeter Xu * Note that we used 'uintptr_t' because it'll naturally support atomic 5698ea497dSPeter Xu * operations on both 32bit / 64 bits hosts. It means on 32bit systems 5798ea497dSPeter Xu * multifd will overflow the packet_num easier, but that should be 5898ea497dSPeter Xu * fine. 5998ea497dSPeter Xu * 6098ea497dSPeter Xu * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 6198ea497dSPeter Xu * hosts, however so far it does not support atomic fetch_add() yet. 6298ea497dSPeter Xu * Make it easy for now. 6398ea497dSPeter Xu */ 6498ea497dSPeter Xu uintptr_t packet_num; 6598ea497dSPeter Xu /* send channels ready */ 6698ea497dSPeter Xu QemuSemaphore channels_ready; 6798ea497dSPeter Xu /* 6898ea497dSPeter Xu * Have we already run terminate threads. There is a race when it 6998ea497dSPeter Xu * happens that we got one error while we are exiting. 7098ea497dSPeter Xu * We will use atomic operations. Only valid values are 0 and 1. 7198ea497dSPeter Xu */ 7298ea497dSPeter Xu int exiting; 7398ea497dSPeter Xu /* multifd ops */ 7498ea497dSPeter Xu MultiFDMethods *ops; 7598ea497dSPeter Xu } *multifd_send_state; 7698ea497dSPeter Xu 77ab7cbb0bSJuan Quintela /* Multifd without compression */ 78ab7cbb0bSJuan Quintela 79ab7cbb0bSJuan Quintela /** 80ab7cbb0bSJuan Quintela * nocomp_send_setup: setup send side 81ab7cbb0bSJuan Quintela * 82ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 83ab7cbb0bSJuan Quintela * @errp: pointer to an error 84ab7cbb0bSJuan Quintela */ 85ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 86ab7cbb0bSJuan Quintela { 8725a1f878SPeter Xu if (migrate_zero_copy_send()) { 8825a1f878SPeter Xu p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 8925a1f878SPeter Xu } 9025a1f878SPeter Xu 91ab7cbb0bSJuan Quintela return 0; 92ab7cbb0bSJuan Quintela } 93ab7cbb0bSJuan Quintela 94ab7cbb0bSJuan Quintela /** 95ab7cbb0bSJuan Quintela * nocomp_send_cleanup: cleanup send side 96ab7cbb0bSJuan Quintela * 97ab7cbb0bSJuan Quintela * For no compression this function does nothing. 98ab7cbb0bSJuan Quintela * 99ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 10018ede636SJuan Quintela * @errp: pointer to an error 101ab7cbb0bSJuan Quintela */ 102ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 103ab7cbb0bSJuan Quintela { 104ab7cbb0bSJuan Quintela return; 105ab7cbb0bSJuan Quintela } 106ab7cbb0bSJuan Quintela 107ab7cbb0bSJuan Quintela /** 108ab7cbb0bSJuan Quintela * nocomp_send_prepare: prepare date to be able to send 109ab7cbb0bSJuan Quintela * 110ab7cbb0bSJuan Quintela * For no compression we just have to calculate the size of the 111ab7cbb0bSJuan Quintela * packet. 112ab7cbb0bSJuan Quintela * 113ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 114ab7cbb0bSJuan Quintela * 115ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 116ab7cbb0bSJuan Quintela * @errp: pointer to an error 117ab7cbb0bSJuan Quintela */ 11802fb8104SJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 119ab7cbb0bSJuan Quintela { 12025a1f878SPeter Xu bool use_zero_copy_send = migrate_zero_copy_send(); 121226468baSJuan Quintela MultiFDPages_t *pages = p->pages; 12225a1f878SPeter Xu int ret; 12325a1f878SPeter Xu 12425a1f878SPeter Xu if (!use_zero_copy_send) { 12525a1f878SPeter Xu /* 12625a1f878SPeter Xu * Only !zerocopy needs the header in IOV; zerocopy will 12725a1f878SPeter Xu * send it separately. 12825a1f878SPeter Xu */ 12925a1f878SPeter Xu multifd_send_prepare_header(p); 13025a1f878SPeter Xu } 131226468baSJuan Quintela 132efd8c543SPeter Xu for (int i = 0; i < pages->num; i++) { 133efd8c543SPeter Xu p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; 134ddec20f8SJuan Quintela p->iov[p->iovs_num].iov_len = p->page_size; 135226468baSJuan Quintela p->iovs_num++; 136226468baSJuan Quintela } 137226468baSJuan Quintela 138efd8c543SPeter Xu p->next_packet_size = pages->num * p->page_size; 139ab7cbb0bSJuan Quintela p->flags |= MULTIFD_FLAG_NOCOMP; 14025a1f878SPeter Xu 14125a1f878SPeter Xu multifd_send_fill_packet(p); 14225a1f878SPeter Xu 14325a1f878SPeter Xu if (use_zero_copy_send) { 14425a1f878SPeter Xu /* Send header first, without zerocopy */ 14525a1f878SPeter Xu ret = qio_channel_write_all(p->c, (void *)p->packet, 14625a1f878SPeter Xu p->packet_len, errp); 14725a1f878SPeter Xu if (ret != 0) { 14825a1f878SPeter Xu return -1; 14925a1f878SPeter Xu } 15025a1f878SPeter Xu } 15125a1f878SPeter Xu 152ab7cbb0bSJuan Quintela return 0; 153ab7cbb0bSJuan Quintela } 154ab7cbb0bSJuan Quintela 155ab7cbb0bSJuan Quintela /** 156ab7cbb0bSJuan Quintela * nocomp_recv_setup: setup receive side 157ab7cbb0bSJuan Quintela * 158ab7cbb0bSJuan Quintela * For no compression this function does nothing. 159ab7cbb0bSJuan Quintela * 160ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 161ab7cbb0bSJuan Quintela * 162ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 163ab7cbb0bSJuan Quintela * @errp: pointer to an error 164ab7cbb0bSJuan Quintela */ 165ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 166ab7cbb0bSJuan Quintela { 167ab7cbb0bSJuan Quintela return 0; 168ab7cbb0bSJuan Quintela } 169ab7cbb0bSJuan Quintela 170ab7cbb0bSJuan Quintela /** 171ab7cbb0bSJuan Quintela * nocomp_recv_cleanup: setup receive side 172ab7cbb0bSJuan Quintela * 173ab7cbb0bSJuan Quintela * For no compression this function does nothing. 174ab7cbb0bSJuan Quintela * 175ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 176ab7cbb0bSJuan Quintela */ 177ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p) 178ab7cbb0bSJuan Quintela { 179ab7cbb0bSJuan Quintela } 180ab7cbb0bSJuan Quintela 181ab7cbb0bSJuan Quintela /** 182ab7cbb0bSJuan Quintela * nocomp_recv_pages: read the data from the channel into actual pages 183ab7cbb0bSJuan Quintela * 184ab7cbb0bSJuan Quintela * For no compression we just need to read things into the correct place. 185ab7cbb0bSJuan Quintela * 186ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 187ab7cbb0bSJuan Quintela * 188ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 189ab7cbb0bSJuan Quintela * @errp: pointer to an error 190ab7cbb0bSJuan Quintela */ 19140a4bfe9SJuan Quintela static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp) 192ab7cbb0bSJuan Quintela { 193ab7cbb0bSJuan Quintela uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 194ab7cbb0bSJuan Quintela 195ab7cbb0bSJuan Quintela if (flags != MULTIFD_FLAG_NOCOMP) { 19604e11404SJuan Quintela error_setg(errp, "multifd %u: flags received %x flags expected %x", 197ab7cbb0bSJuan Quintela p->id, flags, MULTIFD_FLAG_NOCOMP); 198ab7cbb0bSJuan Quintela return -1; 199ab7cbb0bSJuan Quintela } 200cf2d4aa8SJuan Quintela for (int i = 0; i < p->normal_num; i++) { 201faf60935SJuan Quintela p->iov[i].iov_base = p->host + p->normal[i]; 202ddec20f8SJuan Quintela p->iov[i].iov_len = p->page_size; 203226468baSJuan Quintela } 204cf2d4aa8SJuan Quintela return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 205ab7cbb0bSJuan Quintela } 206ab7cbb0bSJuan Quintela 207ab7cbb0bSJuan Quintela static MultiFDMethods multifd_nocomp_ops = { 208ab7cbb0bSJuan Quintela .send_setup = nocomp_send_setup, 209ab7cbb0bSJuan Quintela .send_cleanup = nocomp_send_cleanup, 210ab7cbb0bSJuan Quintela .send_prepare = nocomp_send_prepare, 211ab7cbb0bSJuan Quintela .recv_setup = nocomp_recv_setup, 212ab7cbb0bSJuan Quintela .recv_cleanup = nocomp_recv_cleanup, 213ab7cbb0bSJuan Quintela .recv_pages = nocomp_recv_pages 214ab7cbb0bSJuan Quintela }; 215ab7cbb0bSJuan Quintela 216ab7cbb0bSJuan Quintela static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 217ab7cbb0bSJuan Quintela [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 218ab7cbb0bSJuan Quintela }; 219ab7cbb0bSJuan Quintela 2207ec2c2b3SJuan Quintela void multifd_register_ops(int method, MultiFDMethods *ops) 2217ec2c2b3SJuan Quintela { 2227ec2c2b3SJuan Quintela assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 2237ec2c2b3SJuan Quintela multifd_ops[method] = ops; 2247ec2c2b3SJuan Quintela } 2257ec2c2b3SJuan Quintela 226836eca47SPeter Xu /* Reset a MultiFDPages_t* object for the next use */ 227836eca47SPeter Xu static void multifd_pages_reset(MultiFDPages_t *pages) 228836eca47SPeter Xu { 229836eca47SPeter Xu /* 230836eca47SPeter Xu * We don't need to touch offset[] array, because it will be 231836eca47SPeter Xu * overwritten later when reused. 232836eca47SPeter Xu */ 233836eca47SPeter Xu pages->num = 0; 234836eca47SPeter Xu pages->block = NULL; 235836eca47SPeter Xu } 236836eca47SPeter Xu 237d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 238d32ca5adSJuan Quintela { 239d32ca5adSJuan Quintela MultiFDInit_t msg = {}; 240cbec7eb7SJuan Quintela size_t size = sizeof(msg); 241d32ca5adSJuan Quintela int ret; 242d32ca5adSJuan Quintela 243d32ca5adSJuan Quintela msg.magic = cpu_to_be32(MULTIFD_MAGIC); 244d32ca5adSJuan Quintela msg.version = cpu_to_be32(MULTIFD_VERSION); 245d32ca5adSJuan Quintela msg.id = p->id; 246d32ca5adSJuan Quintela memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 247d32ca5adSJuan Quintela 248cbec7eb7SJuan Quintela ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 249d32ca5adSJuan Quintela if (ret != 0) { 250d32ca5adSJuan Quintela return -1; 251d32ca5adSJuan Quintela } 252cbec7eb7SJuan Quintela stat64_add(&mig_stats.multifd_bytes, size); 253d32ca5adSJuan Quintela return 0; 254d32ca5adSJuan Quintela } 255d32ca5adSJuan Quintela 256d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 257d32ca5adSJuan Quintela { 258d32ca5adSJuan Quintela MultiFDInit_t msg; 259d32ca5adSJuan Quintela int ret; 260d32ca5adSJuan Quintela 261d32ca5adSJuan Quintela ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 262d32ca5adSJuan Quintela if (ret != 0) { 263d32ca5adSJuan Quintela return -1; 264d32ca5adSJuan Quintela } 265d32ca5adSJuan Quintela 266d32ca5adSJuan Quintela msg.magic = be32_to_cpu(msg.magic); 267d32ca5adSJuan Quintela msg.version = be32_to_cpu(msg.version); 268d32ca5adSJuan Quintela 269d32ca5adSJuan Quintela if (msg.magic != MULTIFD_MAGIC) { 270d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet magic %x " 271d32ca5adSJuan Quintela "expected %x", msg.magic, MULTIFD_MAGIC); 272d32ca5adSJuan Quintela return -1; 273d32ca5adSJuan Quintela } 274d32ca5adSJuan Quintela 275d32ca5adSJuan Quintela if (msg.version != MULTIFD_VERSION) { 27604e11404SJuan Quintela error_setg(errp, "multifd: received packet version %u " 27704e11404SJuan Quintela "expected %u", msg.version, MULTIFD_VERSION); 278d32ca5adSJuan Quintela return -1; 279d32ca5adSJuan Quintela } 280d32ca5adSJuan Quintela 281d32ca5adSJuan Quintela if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 282d32ca5adSJuan Quintela char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 283d32ca5adSJuan Quintela char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 284d32ca5adSJuan Quintela 285d32ca5adSJuan Quintela error_setg(errp, "multifd: received uuid '%s' and expected " 286d32ca5adSJuan Quintela "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 287d32ca5adSJuan Quintela g_free(uuid); 288d32ca5adSJuan Quintela g_free(msg_uuid); 289d32ca5adSJuan Quintela return -1; 290d32ca5adSJuan Quintela } 291d32ca5adSJuan Quintela 292d32ca5adSJuan Quintela if (msg.id > migrate_multifd_channels()) { 293c77b4085SAvihai Horon error_setg(errp, "multifd: received channel id %u is greater than " 294c77b4085SAvihai Horon "number of channels %u", msg.id, migrate_multifd_channels()); 295d32ca5adSJuan Quintela return -1; 296d32ca5adSJuan Quintela } 297d32ca5adSJuan Quintela 298d32ca5adSJuan Quintela return msg.id; 299d32ca5adSJuan Quintela } 300d32ca5adSJuan Quintela 3016074f816SFabiano Rosas static MultiFDPages_t *multifd_pages_init(uint32_t n) 302d32ca5adSJuan Quintela { 303d32ca5adSJuan Quintela MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 304d32ca5adSJuan Quintela 3056074f816SFabiano Rosas pages->allocated = n; 3066074f816SFabiano Rosas pages->offset = g_new0(ram_addr_t, n); 307d32ca5adSJuan Quintela 308d32ca5adSJuan Quintela return pages; 309d32ca5adSJuan Quintela } 310d32ca5adSJuan Quintela 311d32ca5adSJuan Quintela static void multifd_pages_clear(MultiFDPages_t *pages) 312d32ca5adSJuan Quintela { 313836eca47SPeter Xu multifd_pages_reset(pages); 314d32ca5adSJuan Quintela pages->allocated = 0; 315d32ca5adSJuan Quintela g_free(pages->offset); 316d32ca5adSJuan Quintela pages->offset = NULL; 317d32ca5adSJuan Quintela g_free(pages); 318d32ca5adSJuan Quintela } 319d32ca5adSJuan Quintela 32025a1f878SPeter Xu void multifd_send_fill_packet(MultiFDSendParams *p) 321d32ca5adSJuan Quintela { 322d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 323efd8c543SPeter Xu MultiFDPages_t *pages = p->pages; 32498ea497dSPeter Xu uint64_t packet_num; 325d32ca5adSJuan Quintela int i; 326d32ca5adSJuan Quintela 327d32ca5adSJuan Quintela packet->flags = cpu_to_be32(p->flags); 328d32ca5adSJuan Quintela packet->pages_alloc = cpu_to_be32(p->pages->allocated); 329efd8c543SPeter Xu packet->normal_pages = cpu_to_be32(pages->num); 330d32ca5adSJuan Quintela packet->next_packet_size = cpu_to_be32(p->next_packet_size); 33198ea497dSPeter Xu 33298ea497dSPeter Xu packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 33398ea497dSPeter Xu packet->packet_num = cpu_to_be64(packet_num); 334d32ca5adSJuan Quintela 335efd8c543SPeter Xu if (pages->block) { 336efd8c543SPeter Xu strncpy(packet->ramblock, pages->block->idstr, 256); 337d32ca5adSJuan Quintela } 338d32ca5adSJuan Quintela 339efd8c543SPeter Xu for (i = 0; i < pages->num; i++) { 340d32ca5adSJuan Quintela /* there are architectures where ram_addr_t is 32 bit */ 341efd8c543SPeter Xu uint64_t temp = pages->offset[i]; 342d32ca5adSJuan Quintela 343d32ca5adSJuan Quintela packet->offset[i] = cpu_to_be64(temp); 344d32ca5adSJuan Quintela } 34505b7ec18SPeter Xu 34605b7ec18SPeter Xu p->packets_sent++; 347db7e1cc5SPeter Xu p->total_normal_pages += pages->num; 3488a9ef173SPeter Xu 34998ea497dSPeter Xu trace_multifd_send(p->id, packet_num, pages->num, p->flags, 3508a9ef173SPeter Xu p->next_packet_size); 351d32ca5adSJuan Quintela } 352d32ca5adSJuan Quintela 353d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 354d32ca5adSJuan Quintela { 355d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 356d32ca5adSJuan Quintela int i; 357d32ca5adSJuan Quintela 358d32ca5adSJuan Quintela packet->magic = be32_to_cpu(packet->magic); 359d32ca5adSJuan Quintela if (packet->magic != MULTIFD_MAGIC) { 360d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 361d32ca5adSJuan Quintela "magic %x and expected magic %x", 362d32ca5adSJuan Quintela packet->magic, MULTIFD_MAGIC); 363d32ca5adSJuan Quintela return -1; 364d32ca5adSJuan Quintela } 365d32ca5adSJuan Quintela 366d32ca5adSJuan Quintela packet->version = be32_to_cpu(packet->version); 367d32ca5adSJuan Quintela if (packet->version != MULTIFD_VERSION) { 368d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 36904e11404SJuan Quintela "version %u and expected version %u", 370d32ca5adSJuan Quintela packet->version, MULTIFD_VERSION); 371d32ca5adSJuan Quintela return -1; 372d32ca5adSJuan Quintela } 373d32ca5adSJuan Quintela 374d32ca5adSJuan Quintela p->flags = be32_to_cpu(packet->flags); 375d32ca5adSJuan Quintela 376d32ca5adSJuan Quintela packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 377d32ca5adSJuan Quintela /* 378d32ca5adSJuan Quintela * If we received a packet that is 100 times bigger than expected 379d32ca5adSJuan Quintela * just stop migration. It is a magic number. 380d32ca5adSJuan Quintela */ 381d6f45ebaSJuan Quintela if (packet->pages_alloc > p->page_count) { 382d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 383cf2d4aa8SJuan Quintela "with size %u and expected a size of %u", 384d6f45ebaSJuan Quintela packet->pages_alloc, p->page_count) ; 385d32ca5adSJuan Quintela return -1; 386d32ca5adSJuan Quintela } 387d32ca5adSJuan Quintela 3888c0ec0b2SJuan Quintela p->normal_num = be32_to_cpu(packet->normal_pages); 389cf2d4aa8SJuan Quintela if (p->normal_num > packet->pages_alloc) { 390d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 39104e11404SJuan Quintela "with %u pages and expected maximum pages are %u", 392cf2d4aa8SJuan Quintela p->normal_num, packet->pages_alloc) ; 393d32ca5adSJuan Quintela return -1; 394d32ca5adSJuan Quintela } 395d32ca5adSJuan Quintela 396d32ca5adSJuan Quintela p->next_packet_size = be32_to_cpu(packet->next_packet_size); 397d32ca5adSJuan Quintela p->packet_num = be64_to_cpu(packet->packet_num); 39805b7ec18SPeter Xu p->packets_recved++; 399db7e1cc5SPeter Xu p->total_normal_pages += p->normal_num; 400d32ca5adSJuan Quintela 4018a9ef173SPeter Xu trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, 4028a9ef173SPeter Xu p->next_packet_size); 4038a9ef173SPeter Xu 404cf2d4aa8SJuan Quintela if (p->normal_num == 0) { 405d32ca5adSJuan Quintela return 0; 406d32ca5adSJuan Quintela } 407d32ca5adSJuan Quintela 408d32ca5adSJuan Quintela /* make sure that ramblock is 0 terminated */ 409d32ca5adSJuan Quintela packet->ramblock[255] = 0; 4105d1d1fcfSLukas Straub p->block = qemu_ram_block_by_name(packet->ramblock); 4115d1d1fcfSLukas Straub if (!p->block) { 412d32ca5adSJuan Quintela error_setg(errp, "multifd: unknown ram block %s", 413d32ca5adSJuan Quintela packet->ramblock); 414d32ca5adSJuan Quintela return -1; 415d32ca5adSJuan Quintela } 416d32ca5adSJuan Quintela 4175d1d1fcfSLukas Straub p->host = p->block->host; 418cf2d4aa8SJuan Quintela for (i = 0; i < p->normal_num; i++) { 419d32ca5adSJuan Quintela uint64_t offset = be64_to_cpu(packet->offset[i]); 420d32ca5adSJuan Quintela 4215d1d1fcfSLukas Straub if (offset > (p->block->used_length - p->page_size)) { 422d32ca5adSJuan Quintela error_setg(errp, "multifd: offset too long %" PRIu64 423d32ca5adSJuan Quintela " (max " RAM_ADDR_FMT ")", 4245d1d1fcfSLukas Straub offset, p->block->used_length); 425d32ca5adSJuan Quintela return -1; 426d32ca5adSJuan Quintela } 427cf2d4aa8SJuan Quintela p->normal[i] = offset; 428d32ca5adSJuan Quintela } 429d32ca5adSJuan Quintela 430d32ca5adSJuan Quintela return 0; 431d32ca5adSJuan Quintela } 432d32ca5adSJuan Quintela 43315f3f21dSPeter Xu static bool multifd_send_should_exit(void) 43415f3f21dSPeter Xu { 43515f3f21dSPeter Xu return qatomic_read(&multifd_send_state->exiting); 43615f3f21dSPeter Xu } 43715f3f21dSPeter Xu 438d32ca5adSJuan Quintela /* 43948c0f5d5SPeter Xu * The migration thread can wait on either of the two semaphores. This 44048c0f5d5SPeter Xu * function can be used to kick the main thread out of waiting on either of 44148c0f5d5SPeter Xu * them. Should mostly only be called when something wrong happened with 44248c0f5d5SPeter Xu * the current multifd send thread. 44348c0f5d5SPeter Xu */ 44448c0f5d5SPeter Xu static void multifd_send_kick_main(MultiFDSendParams *p) 44548c0f5d5SPeter Xu { 44648c0f5d5SPeter Xu qemu_sem_post(&p->sem_sync); 44748c0f5d5SPeter Xu qemu_sem_post(&multifd_send_state->channels_ready); 44848c0f5d5SPeter Xu } 44948c0f5d5SPeter Xu 45048c0f5d5SPeter Xu /* 451d32ca5adSJuan Quintela * How we use multifd_send_state->pages and channel->pages? 452d32ca5adSJuan Quintela * 453d32ca5adSJuan Quintela * We create a pages for each channel, and a main one. Each time that 454d32ca5adSJuan Quintela * we need to send a batch of pages we interchange the ones between 455d32ca5adSJuan Quintela * multifd_send_state and the channel that is sending it. There are 456d32ca5adSJuan Quintela * two reasons for that: 457d32ca5adSJuan Quintela * - to not have to do so many mallocs during migration 458d32ca5adSJuan Quintela * - to make easier to know what to free at the end of migration 459d32ca5adSJuan Quintela * 460d32ca5adSJuan Quintela * This way we always know who is the owner of each "pages" struct, 461d32ca5adSJuan Quintela * and we don't need any locking. It belongs to the migration thread 462d32ca5adSJuan Quintela * or to the channel thread. Switching is safe because the migration 463d32ca5adSJuan Quintela * thread is using the channel mutex when changing it, and the channel 464d32ca5adSJuan Quintela * have to had finish with its own, otherwise pending_job can't be 465d32ca5adSJuan Quintela * false. 4663b40964aSPeter Xu * 4673b40964aSPeter Xu * Returns true if succeed, false otherwise. 468d32ca5adSJuan Quintela */ 4693b40964aSPeter Xu static bool multifd_send_pages(void) 470d32ca5adSJuan Quintela { 471d32ca5adSJuan Quintela int i; 472d32ca5adSJuan Quintela static int next_channel; 473d32ca5adSJuan Quintela MultiFDSendParams *p = NULL; /* make happy gcc */ 474d32ca5adSJuan Quintela MultiFDPages_t *pages = multifd_send_state->pages; 475d32ca5adSJuan Quintela 47615f3f21dSPeter Xu if (multifd_send_should_exit()) { 4773b40964aSPeter Xu return false; 478d32ca5adSJuan Quintela } 479d32ca5adSJuan Quintela 480e3cce9afSPeter Xu /* We wait here, until at least one channel is ready */ 481d32ca5adSJuan Quintela qemu_sem_wait(&multifd_send_state->channels_ready); 482e3cce9afSPeter Xu 4837e89a140SLaurent Vivier /* 4847e89a140SLaurent Vivier * next_channel can remain from a previous migration that was 4857e89a140SLaurent Vivier * using more channels, so ensure it doesn't overflow if the 4867e89a140SLaurent Vivier * limit is lower now. 4877e89a140SLaurent Vivier */ 4887e89a140SLaurent Vivier next_channel %= migrate_multifd_channels(); 489d32ca5adSJuan Quintela for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 49015f3f21dSPeter Xu if (multifd_send_should_exit()) { 4913b40964aSPeter Xu return false; 492d32ca5adSJuan Quintela } 49315f3f21dSPeter Xu p = &multifd_send_state->params[i]; 494e3cce9afSPeter Xu /* 495e3cce9afSPeter Xu * Lockless read to p->pending_job is safe, because only multifd 496e3cce9afSPeter Xu * sender thread can clear it. 497e3cce9afSPeter Xu */ 498f5f48a78SPeter Xu if (qatomic_read(&p->pending_job) == false) { 499d32ca5adSJuan Quintela next_channel = (i + 1) % migrate_multifd_channels(); 500d32ca5adSJuan Quintela break; 501d32ca5adSJuan Quintela } 502d32ca5adSJuan Quintela } 503e3cce9afSPeter Xu 504e3cce9afSPeter Xu /* 505*488c84acSPeter Xu * Make sure we read p->pending_job before all the rest. Pairs with 506*488c84acSPeter Xu * qatomic_store_release() in multifd_send_thread(). 507e3cce9afSPeter Xu */ 508*488c84acSPeter Xu smp_mb_acquire(); 509*488c84acSPeter Xu assert(!p->pages->num); 510d32ca5adSJuan Quintela multifd_send_state->pages = p->pages; 511d32ca5adSJuan Quintela p->pages = pages; 512*488c84acSPeter Xu /* 513*488c84acSPeter Xu * Making sure p->pages is setup before marking pending_job=true. Pairs 514*488c84acSPeter Xu * with the qatomic_load_acquire() in multifd_send_thread(). 515*488c84acSPeter Xu */ 516*488c84acSPeter Xu qatomic_store_release(&p->pending_job, true); 517d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 518d32ca5adSJuan Quintela 5193b40964aSPeter Xu return true; 520d32ca5adSJuan Quintela } 521d32ca5adSJuan Quintela 522f88f86c4SPeter Xu static inline bool multifd_queue_empty(MultiFDPages_t *pages) 523f88f86c4SPeter Xu { 524f88f86c4SPeter Xu return pages->num == 0; 525f88f86c4SPeter Xu } 526f88f86c4SPeter Xu 527f88f86c4SPeter Xu static inline bool multifd_queue_full(MultiFDPages_t *pages) 528f88f86c4SPeter Xu { 529f88f86c4SPeter Xu return pages->num == pages->allocated; 530f88f86c4SPeter Xu } 531f88f86c4SPeter Xu 532f88f86c4SPeter Xu static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) 533f88f86c4SPeter Xu { 534f88f86c4SPeter Xu pages->offset[pages->num++] = offset; 535f88f86c4SPeter Xu } 536f88f86c4SPeter Xu 537d6556d17SPeter Xu /* Returns true if enqueue successful, false otherwise */ 538d6556d17SPeter Xu bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) 539d32ca5adSJuan Quintela { 540f88f86c4SPeter Xu MultiFDPages_t *pages; 541d32ca5adSJuan Quintela 542f88f86c4SPeter Xu retry: 543f88f86c4SPeter Xu pages = multifd_send_state->pages; 544f88f86c4SPeter Xu 545f88f86c4SPeter Xu /* If the queue is empty, we can already enqueue now */ 546f88f86c4SPeter Xu if (multifd_queue_empty(pages)) { 547d32ca5adSJuan Quintela pages->block = block; 548f88f86c4SPeter Xu multifd_enqueue(pages, offset); 549d6556d17SPeter Xu return true; 550d32ca5adSJuan Quintela } 551d32ca5adSJuan Quintela 552f88f86c4SPeter Xu /* 553f88f86c4SPeter Xu * Not empty, meanwhile we need a flush. It can because of either: 554f88f86c4SPeter Xu * 555f88f86c4SPeter Xu * (1) The page is not on the same ramblock of previous ones, or, 556f88f86c4SPeter Xu * (2) The queue is full. 557f88f86c4SPeter Xu * 558f88f86c4SPeter Xu * After flush, always retry. 559f88f86c4SPeter Xu */ 560f88f86c4SPeter Xu if (pages->block != block || multifd_queue_full(pages)) { 5613b40964aSPeter Xu if (!multifd_send_pages()) { 562d6556d17SPeter Xu return false; 563d32ca5adSJuan Quintela } 564f88f86c4SPeter Xu goto retry; 565d32ca5adSJuan Quintela } 566d32ca5adSJuan Quintela 567f88f86c4SPeter Xu /* Not empty, and we still have space, do it! */ 568f88f86c4SPeter Xu multifd_enqueue(pages, offset); 569d6556d17SPeter Xu return true; 570d32ca5adSJuan Quintela } 571d32ca5adSJuan Quintela 5723ab4441dSPeter Xu /* Multifd send side hit an error; remember it and prepare to quit */ 5733ab4441dSPeter Xu static void multifd_send_set_error(Error *err) 574d32ca5adSJuan Quintela { 57515f3f21dSPeter Xu /* 57615f3f21dSPeter Xu * We don't want to exit each threads twice. Depending on where 57715f3f21dSPeter Xu * we get the error, or if there are two independent errors in two 57815f3f21dSPeter Xu * threads at the same time, we can end calling this function 57915f3f21dSPeter Xu * twice. 58015f3f21dSPeter Xu */ 58115f3f21dSPeter Xu if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 58215f3f21dSPeter Xu return; 58315f3f21dSPeter Xu } 58415f3f21dSPeter Xu 585d32ca5adSJuan Quintela if (err) { 586d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 587d32ca5adSJuan Quintela migrate_set_error(s, err); 588d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 589d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 590d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_DEVICE || 591d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 592d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 593d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 594d32ca5adSJuan Quintela } 595d32ca5adSJuan Quintela } 5963ab4441dSPeter Xu } 597d32ca5adSJuan Quintela 5983ab4441dSPeter Xu static void multifd_send_terminate_threads(void) 5993ab4441dSPeter Xu { 6003ab4441dSPeter Xu int i; 6013ab4441dSPeter Xu 6023ab4441dSPeter Xu trace_multifd_send_terminate_threads(); 6033ab4441dSPeter Xu 6043ab4441dSPeter Xu /* 6053ab4441dSPeter Xu * Tell everyone we're quitting. No xchg() needed here; we simply 6063ab4441dSPeter Xu * always set it. 6073ab4441dSPeter Xu */ 6083ab4441dSPeter Xu qatomic_set(&multifd_send_state->exiting, 1); 60912808db3SPeter Xu 61012808db3SPeter Xu /* 61112808db3SPeter Xu * Firstly, kick all threads out; no matter whether they are just idle, 61212808db3SPeter Xu * or blocked in an IO system call. 61312808db3SPeter Xu */ 614d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 615d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 616d32ca5adSJuan Quintela 617d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 618077fbb59SLi Zhang if (p->c) { 619077fbb59SLi Zhang qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 620077fbb59SLi Zhang } 621d32ca5adSJuan Quintela } 62212808db3SPeter Xu 62312808db3SPeter Xu /* 62412808db3SPeter Xu * Finally recycle all the threads. 62512808db3SPeter Xu * 62612808db3SPeter Xu * TODO: p->running is still buggy, e.g. we can reach here without the 62712808db3SPeter Xu * corresponding multifd_new_send_channel_async() get invoked yet, 62812808db3SPeter Xu * then a new thread can even be created after this function returns. 62912808db3SPeter Xu */ 63012808db3SPeter Xu for (i = 0; i < migrate_multifd_channels(); i++) { 63112808db3SPeter Xu MultiFDSendParams *p = &multifd_send_state->params[i]; 63212808db3SPeter Xu 63312808db3SPeter Xu if (p->running) { 63412808db3SPeter Xu qemu_thread_join(&p->thread); 63512808db3SPeter Xu } 63612808db3SPeter Xu } 637d32ca5adSJuan Quintela } 638d32ca5adSJuan Quintela 6390e92f644SFabiano Rosas static int multifd_send_channel_destroy(QIOChannel *send) 6400e92f644SFabiano Rosas { 6410e92f644SFabiano Rosas return socket_send_channel_destroy(send); 6420e92f644SFabiano Rosas } 6430e92f644SFabiano Rosas 64412808db3SPeter Xu static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 645d32ca5adSJuan Quintela { 64620171ea8SLukas Straub if (p->registered_yank) { 64720171ea8SLukas Straub migration_ioc_unregister_yank(p->c); 64820171ea8SLukas Straub } 6490e92f644SFabiano Rosas multifd_send_channel_destroy(p->c); 650d32ca5adSJuan Quintela p->c = NULL; 651d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem); 652d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem_sync); 653d32ca5adSJuan Quintela g_free(p->name); 654d32ca5adSJuan Quintela p->name = NULL; 655d32ca5adSJuan Quintela multifd_pages_clear(p->pages); 656d32ca5adSJuan Quintela p->pages = NULL; 657d32ca5adSJuan Quintela p->packet_len = 0; 658d32ca5adSJuan Quintela g_free(p->packet); 659d32ca5adSJuan Quintela p->packet = NULL; 660226468baSJuan Quintela g_free(p->iov); 661226468baSJuan Quintela p->iov = NULL; 66212808db3SPeter Xu multifd_send_state->ops->send_cleanup(p, errp); 66312808db3SPeter Xu 66412808db3SPeter Xu return *errp == NULL; 665ab7cbb0bSJuan Quintela } 66612808db3SPeter Xu 66712808db3SPeter Xu static void multifd_send_cleanup_state(void) 66812808db3SPeter Xu { 669d32ca5adSJuan Quintela qemu_sem_destroy(&multifd_send_state->channels_ready); 670d32ca5adSJuan Quintela g_free(multifd_send_state->params); 671d32ca5adSJuan Quintela multifd_send_state->params = NULL; 672d32ca5adSJuan Quintela multifd_pages_clear(multifd_send_state->pages); 673d32ca5adSJuan Quintela multifd_send_state->pages = NULL; 674d32ca5adSJuan Quintela g_free(multifd_send_state); 675d32ca5adSJuan Quintela multifd_send_state = NULL; 676d32ca5adSJuan Quintela } 677d32ca5adSJuan Quintela 678cde85c37SPeter Xu void multifd_send_shutdown(void) 67912808db3SPeter Xu { 68012808db3SPeter Xu int i; 68112808db3SPeter Xu 68212808db3SPeter Xu if (!migrate_multifd()) { 68312808db3SPeter Xu return; 68412808db3SPeter Xu } 68512808db3SPeter Xu 68612808db3SPeter Xu multifd_send_terminate_threads(); 68712808db3SPeter Xu 68812808db3SPeter Xu for (i = 0; i < migrate_multifd_channels(); i++) { 68912808db3SPeter Xu MultiFDSendParams *p = &multifd_send_state->params[i]; 69012808db3SPeter Xu Error *local_err = NULL; 69112808db3SPeter Xu 69212808db3SPeter Xu if (!multifd_send_cleanup_channel(p, &local_err)) { 69312808db3SPeter Xu migrate_set_error(migrate_get_current(), local_err); 69412808db3SPeter Xu error_free(local_err); 69512808db3SPeter Xu } 69612808db3SPeter Xu } 69712808db3SPeter Xu 69812808db3SPeter Xu multifd_send_cleanup_state(); 69912808db3SPeter Xu } 70012808db3SPeter Xu 7014cc47b43SLeonardo Bras static int multifd_zero_copy_flush(QIOChannel *c) 7024cc47b43SLeonardo Bras { 7034cc47b43SLeonardo Bras int ret; 7044cc47b43SLeonardo Bras Error *err = NULL; 7054cc47b43SLeonardo Bras 7064cc47b43SLeonardo Bras ret = qio_channel_flush(c, &err); 7074cc47b43SLeonardo Bras if (ret < 0) { 7084cc47b43SLeonardo Bras error_report_err(err); 7094cc47b43SLeonardo Bras return -1; 7104cc47b43SLeonardo Bras } 7114cc47b43SLeonardo Bras if (ret == 1) { 712aff3f660SJuan Quintela stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 7134cc47b43SLeonardo Bras } 7144cc47b43SLeonardo Bras 7154cc47b43SLeonardo Bras return ret; 7164cc47b43SLeonardo Bras } 7174cc47b43SLeonardo Bras 7189346fa18SFabiano Rosas int multifd_send_sync_main(void) 719d32ca5adSJuan Quintela { 720d32ca5adSJuan Quintela int i; 7215b1d9babSLeonardo Bras bool flush_zero_copy; 722d32ca5adSJuan Quintela 72351b07548SJuan Quintela if (!migrate_multifd()) { 72433d70973SLeonardo Bras return 0; 725d32ca5adSJuan Quintela } 72690a3d2f9SJuan Quintela if (multifd_send_state->pages->num) { 7273b40964aSPeter Xu if (!multifd_send_pages()) { 728d32ca5adSJuan Quintela error_report("%s: multifd_send_pages fail", __func__); 72933d70973SLeonardo Bras return -1; 730d32ca5adSJuan Quintela } 731d32ca5adSJuan Quintela } 7325b1d9babSLeonardo Bras 733b4bc342cSJuan Quintela flush_zero_copy = migrate_zero_copy_send(); 7345b1d9babSLeonardo Bras 735d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 736d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 737d32ca5adSJuan Quintela 73815f3f21dSPeter Xu if (multifd_send_should_exit()) { 73933d70973SLeonardo Bras return -1; 740d32ca5adSJuan Quintela } 741d32ca5adSJuan Quintela 74215f3f21dSPeter Xu trace_multifd_send_sync_main_signal(p->id); 74315f3f21dSPeter Xu 744f5f48a78SPeter Xu /* 745f5f48a78SPeter Xu * We should be the only user so far, so not possible to be set by 746f5f48a78SPeter Xu * others concurrently. 747f5f48a78SPeter Xu */ 748f5f48a78SPeter Xu assert(qatomic_read(&p->pending_sync) == false); 749f5f48a78SPeter Xu qatomic_set(&p->pending_sync, true); 750d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 751d32ca5adSJuan Quintela } 752d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 753d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 754d32ca5adSJuan Quintela 75515f3f21dSPeter Xu if (multifd_send_should_exit()) { 75615f3f21dSPeter Xu return -1; 75715f3f21dSPeter Xu } 75815f3f21dSPeter Xu 759d2026ee1SJuan Quintela qemu_sem_wait(&multifd_send_state->channels_ready); 760d32ca5adSJuan Quintela trace_multifd_send_sync_main_wait(p->id); 761d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 762ebfc5787SZhenzhong Duan 763ebfc5787SZhenzhong Duan if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 764ebfc5787SZhenzhong Duan return -1; 765ebfc5787SZhenzhong Duan } 766d32ca5adSJuan Quintela } 767d32ca5adSJuan Quintela trace_multifd_send_sync_main(multifd_send_state->packet_num); 76833d70973SLeonardo Bras 76933d70973SLeonardo Bras return 0; 770d32ca5adSJuan Quintela } 771d32ca5adSJuan Quintela 772d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque) 773d32ca5adSJuan Quintela { 774d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 7751b1f4ab6SJiang Jiacheng MigrationThread *thread = NULL; 776d32ca5adSJuan Quintela Error *local_err = NULL; 777d32ca5adSJuan Quintela int ret = 0; 778d32ca5adSJuan Quintela 779788fa680SFabiano Rosas thread = migration_threads_add(p->name, qemu_get_thread_id()); 7801b1f4ab6SJiang Jiacheng 781d32ca5adSJuan Quintela trace_multifd_send_thread_start(p->id); 782d32ca5adSJuan Quintela rcu_register_thread(); 783d32ca5adSJuan Quintela 784d32ca5adSJuan Quintela if (multifd_send_initial_packet(p, &local_err) < 0) { 785d32ca5adSJuan Quintela ret = -1; 786d32ca5adSJuan Quintela goto out; 787d32ca5adSJuan Quintela } 788d32ca5adSJuan Quintela 789d32ca5adSJuan Quintela while (true) { 790d2026ee1SJuan Quintela qemu_sem_post(&multifd_send_state->channels_ready); 791d32ca5adSJuan Quintela qemu_sem_wait(&p->sem); 792d32ca5adSJuan Quintela 79315f3f21dSPeter Xu if (multifd_send_should_exit()) { 794d32ca5adSJuan Quintela break; 795d32ca5adSJuan Quintela } 796d32ca5adSJuan Quintela 797*488c84acSPeter Xu /* 798*488c84acSPeter Xu * Read pending_job flag before p->pages. Pairs with the 799*488c84acSPeter Xu * qatomic_store_release() in multifd_send_pages(). 800*488c84acSPeter Xu */ 801*488c84acSPeter Xu if (qatomic_load_acquire(&p->pending_job)) { 802efd8c543SPeter Xu MultiFDPages_t *pages = p->pages; 803d32ca5adSJuan Quintela 804b7dbdd8eSLeonardo Bras p->iovs_num = 0; 80583c560fbSPeter Xu assert(pages->num); 80683c560fbSPeter Xu 80702fb8104SJuan Quintela ret = multifd_send_state->ops->send_prepare(p, &local_err); 808ab7cbb0bSJuan Quintela if (ret != 0) { 809ab7cbb0bSJuan Quintela break; 810ab7cbb0bSJuan Quintela } 81183c560fbSPeter Xu 8125b1d9babSLeonardo Bras ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 8135b1d9babSLeonardo Bras 0, p->write_flags, &local_err); 814d32ca5adSJuan Quintela if (ret != 0) { 815d32ca5adSJuan Quintela break; 816d32ca5adSJuan Quintela } 817d32ca5adSJuan Quintela 81868b6e000SElena Ufimtseva stat64_add(&mig_stats.multifd_bytes, 81968b6e000SElena Ufimtseva p->next_packet_size + p->packet_len); 820836eca47SPeter Xu 821836eca47SPeter Xu multifd_pages_reset(p->pages); 8221618f552SElena Ufimtseva p->next_packet_size = 0; 823*488c84acSPeter Xu 824*488c84acSPeter Xu /* 825*488c84acSPeter Xu * Making sure p->pages is published before saying "we're 826*488c84acSPeter Xu * free". Pairs with the smp_mb_acquire() in 827*488c84acSPeter Xu * multifd_send_pages(). 828*488c84acSPeter Xu */ 829*488c84acSPeter Xu qatomic_store_release(&p->pending_job, false); 830859ebaf3SPeter Xu } else { 831*488c84acSPeter Xu /* 832*488c84acSPeter Xu * If not a normal job, must be a sync request. Note that 833*488c84acSPeter Xu * pending_sync is a standalone flag (unlike pending_job), so 834*488c84acSPeter Xu * it doesn't require explicit memory barriers. 835*488c84acSPeter Xu */ 836859ebaf3SPeter Xu assert(qatomic_read(&p->pending_sync)); 837f5f48a78SPeter Xu p->flags = MULTIFD_FLAG_SYNC; 838f5f48a78SPeter Xu multifd_send_fill_packet(p); 839f5f48a78SPeter Xu ret = qio_channel_write_all(p->c, (void *)p->packet, 840f5f48a78SPeter Xu p->packet_len, &local_err); 841f5f48a78SPeter Xu if (ret != 0) { 842f5f48a78SPeter Xu break; 843d32ca5adSJuan Quintela } 844f5f48a78SPeter Xu /* p->next_packet_size will always be zero for a SYNC packet */ 845f5f48a78SPeter Xu stat64_add(&mig_stats.multifd_bytes, p->packet_len); 846f5f48a78SPeter Xu p->flags = 0; 847f5f48a78SPeter Xu qatomic_set(&p->pending_sync, false); 848f5f48a78SPeter Xu qemu_sem_post(&p->sem_sync); 849d32ca5adSJuan Quintela } 850d32ca5adSJuan Quintela } 851d32ca5adSJuan Quintela 852d32ca5adSJuan Quintela out: 853ee8a7c9cSFabiano Rosas if (ret) { 854ee8a7c9cSFabiano Rosas assert(local_err); 855d32ca5adSJuan Quintela trace_multifd_send_error(p->id); 8563ab4441dSPeter Xu multifd_send_set_error(local_err); 85748c0f5d5SPeter Xu multifd_send_kick_main(p); 858ee8a7c9cSFabiano Rosas error_free(local_err); 859d32ca5adSJuan Quintela } 860d32ca5adSJuan Quintela 861d32ca5adSJuan Quintela p->running = false; 862d32ca5adSJuan Quintela rcu_unregister_thread(); 863788fa680SFabiano Rosas migration_threads_remove(thread); 86405b7ec18SPeter Xu trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages); 865d32ca5adSJuan Quintela 866d32ca5adSJuan Quintela return NULL; 867d32ca5adSJuan Quintela } 868d32ca5adSJuan Quintela 86929647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p, 87029647140SChuan Zheng QIOChannel *ioc, 871967e3889SFabiano Rosas Error **errp); 87229647140SChuan Zheng 87329647140SChuan Zheng static void multifd_tls_outgoing_handshake(QIOTask *task, 87429647140SChuan Zheng gpointer opaque) 87529647140SChuan Zheng { 87629647140SChuan Zheng MultiFDSendParams *p = opaque; 87729647140SChuan Zheng QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 87829647140SChuan Zheng Error *err = NULL; 87929647140SChuan Zheng 880967e3889SFabiano Rosas if (!qio_task_propagate_error(task, &err)) { 881894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_complete(ioc); 882967e3889SFabiano Rosas if (multifd_channel_connect(p, ioc, &err)) { 883967e3889SFabiano Rosas return; 884967e3889SFabiano Rosas } 885894f0214SChuan Zheng } 886fca67642SHao Wang 887967e3889SFabiano Rosas trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); 888967e3889SFabiano Rosas 8893ab4441dSPeter Xu multifd_send_set_error(err); 89048c0f5d5SPeter Xu multifd_send_kick_main(p); 8916ae208ceSAvihai Horon error_free(err); 892fca67642SHao Wang } 89329647140SChuan Zheng 894a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque) 895a1af605bSChuan Zheng { 896a1af605bSChuan Zheng MultiFDSendParams *p = opaque; 897a1af605bSChuan Zheng QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); 898a1af605bSChuan Zheng 899a1af605bSChuan Zheng qio_channel_tls_handshake(tioc, 900a1af605bSChuan Zheng multifd_tls_outgoing_handshake, 901a1af605bSChuan Zheng p, 902a1af605bSChuan Zheng NULL, 903a1af605bSChuan Zheng NULL); 904a1af605bSChuan Zheng return NULL; 905a1af605bSChuan Zheng } 906a1af605bSChuan Zheng 907967e3889SFabiano Rosas static bool multifd_tls_channel_connect(MultiFDSendParams *p, 90829647140SChuan Zheng QIOChannel *ioc, 90929647140SChuan Zheng Error **errp) 91029647140SChuan Zheng { 91129647140SChuan Zheng MigrationState *s = migrate_get_current(); 9127f692ec7SPeter Xu const char *hostname = s->hostname; 91329647140SChuan Zheng QIOChannelTLS *tioc; 91429647140SChuan Zheng 9150deb7e9bSJuan Quintela tioc = migration_tls_client_create(ioc, hostname, errp); 91629647140SChuan Zheng if (!tioc) { 917967e3889SFabiano Rosas return false; 91829647140SChuan Zheng } 91929647140SChuan Zheng 9209e842408SChuan Zheng object_unref(OBJECT(ioc)); 921894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 92229647140SChuan Zheng qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 923a1af605bSChuan Zheng p->c = QIO_CHANNEL(tioc); 924a1af605bSChuan Zheng qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", 925a1af605bSChuan Zheng multifd_tls_handshake_thread, p, 926a1af605bSChuan Zheng QEMU_THREAD_JOINABLE); 927967e3889SFabiano Rosas return true; 92829647140SChuan Zheng } 92929647140SChuan Zheng 93029647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p, 93129647140SChuan Zheng QIOChannel *ioc, 932967e3889SFabiano Rosas Error **errp) 93329647140SChuan Zheng { 934894f0214SChuan Zheng trace_multifd_set_outgoing_channel( 9357f692ec7SPeter Xu ioc, object_get_typename(OBJECT(ioc)), 936967e3889SFabiano Rosas migrate_get_current()->hostname); 937894f0214SChuan Zheng 93885a8578eSPeter Xu if (migrate_channel_requires_tls_upgrade(ioc)) { 93929647140SChuan Zheng /* 94029647140SChuan Zheng * tls_channel_connect will call back to this 94129647140SChuan Zheng * function after the TLS handshake, 94229647140SChuan Zheng * so we mustn't call multifd_send_thread until then 94329647140SChuan Zheng */ 944967e3889SFabiano Rosas return multifd_tls_channel_connect(p, ioc, errp); 945a4395f5dSAvihai Horon } 946967e3889SFabiano Rosas 94720171ea8SLukas Straub migration_ioc_register_yank(ioc); 94820171ea8SLukas Straub p->registered_yank = true; 94929647140SChuan Zheng p->c = ioc; 95029647140SChuan Zheng qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 95129647140SChuan Zheng QEMU_THREAD_JOINABLE); 952a339149aSHao Wang return true; 95329647140SChuan Zheng } 95429647140SChuan Zheng 955d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 956d32ca5adSJuan Quintela { 957d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 9580e92f644SFabiano Rosas QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 959d32ca5adSJuan Quintela Error *local_err = NULL; 960d32ca5adSJuan Quintela 961d32ca5adSJuan Quintela trace_multifd_new_send_channel_async(p->id); 962bca762c2SLi Zhang if (!qio_task_propagate_error(task, &local_err)) { 9630a08c794SFabiano Rosas qio_channel_set_delay(ioc, false); 964d32ca5adSJuan Quintela p->running = true; 965967e3889SFabiano Rosas if (multifd_channel_connect(p, ioc, &local_err)) { 96603c7a42dSChuan Zheng return; 967d32ca5adSJuan Quintela } 968bca762c2SLi Zhang } 96903c7a42dSChuan Zheng 970967e3889SFabiano Rosas trace_multifd_new_send_channel_async_error(p->id, local_err); 9713ab4441dSPeter Xu multifd_send_set_error(local_err); 97215f3f21dSPeter Xu multifd_send_kick_main(p); 97315f3f21dSPeter Xu object_unref(OBJECT(ioc)); 97415f3f21dSPeter Xu error_free(local_err); 9750e92f644SFabiano Rosas } 9760e92f644SFabiano Rosas 9770e92f644SFabiano Rosas static void multifd_new_send_channel_create(gpointer opaque) 9780e92f644SFabiano Rosas { 9790e92f644SFabiano Rosas socket_send_channel_create(multifd_new_send_channel_async, opaque); 980d32ca5adSJuan Quintela } 981d32ca5adSJuan Quintela 982cde85c37SPeter Xu int multifd_send_setup(Error **errp) 983d32ca5adSJuan Quintela { 984d32ca5adSJuan Quintela int thread_count; 985d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 986d32ca5adSJuan Quintela uint8_t i; 987d32ca5adSJuan Quintela 98851b07548SJuan Quintela if (!migrate_multifd()) { 989d32ca5adSJuan Quintela return 0; 990d32ca5adSJuan Quintela } 991b7acd657SLi Zhijian 992d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 993d32ca5adSJuan Quintela multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 994d32ca5adSJuan Quintela multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 995d32ca5adSJuan Quintela multifd_send_state->pages = multifd_pages_init(page_count); 996d32ca5adSJuan Quintela qemu_sem_init(&multifd_send_state->channels_ready, 0); 997d73415a3SStefan Hajnoczi qatomic_set(&multifd_send_state->exiting, 0); 998ab7cbb0bSJuan Quintela multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 999d32ca5adSJuan Quintela 1000d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 1001d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 1002d32ca5adSJuan Quintela 1003d32ca5adSJuan Quintela qemu_sem_init(&p->sem, 0); 1004d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 1005d32ca5adSJuan Quintela p->id = i; 1006d32ca5adSJuan Quintela p->pages = multifd_pages_init(page_count); 1007d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 1008d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 1009d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 1010d32ca5adSJuan Quintela p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 1011d32ca5adSJuan Quintela p->packet->version = cpu_to_be32(MULTIFD_VERSION); 1012d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdsend_%d", i); 1013d48c3a04SJuan Quintela /* We need one extra place for the packet header */ 1014d48c3a04SJuan Quintela p->iov = g_new0(struct iovec, page_count + 1); 1015ddec20f8SJuan Quintela p->page_size = qemu_target_page_size(); 1016d6f45ebaSJuan Quintela p->page_count = page_count; 10175b1d9babSLeonardo Bras p->write_flags = 0; 10180e92f644SFabiano Rosas multifd_new_send_channel_create(p); 1019d32ca5adSJuan Quintela } 1020ab7cbb0bSJuan Quintela 1021ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 1022ab7cbb0bSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 1023ab7cbb0bSJuan Quintela int ret; 1024ab7cbb0bSJuan Quintela 10253fc58efaSAvihai Horon ret = multifd_send_state->ops->send_setup(p, errp); 1026ab7cbb0bSJuan Quintela if (ret) { 1027ab7cbb0bSJuan Quintela return ret; 1028ab7cbb0bSJuan Quintela } 1029ab7cbb0bSJuan Quintela } 1030d32ca5adSJuan Quintela return 0; 1031d32ca5adSJuan Quintela } 1032d32ca5adSJuan Quintela 1033d32ca5adSJuan Quintela struct { 1034d32ca5adSJuan Quintela MultiFDRecvParams *params; 1035d32ca5adSJuan Quintela /* number of created threads */ 1036d32ca5adSJuan Quintela int count; 1037d32ca5adSJuan Quintela /* syncs main thread and channels */ 1038d32ca5adSJuan Quintela QemuSemaphore sem_sync; 1039d32ca5adSJuan Quintela /* global number of generated multifd packets */ 1040d32ca5adSJuan Quintela uint64_t packet_num; 1041ab7cbb0bSJuan Quintela /* multifd ops */ 1042ab7cbb0bSJuan Quintela MultiFDMethods *ops; 1043d32ca5adSJuan Quintela } *multifd_recv_state; 1044d32ca5adSJuan Quintela 1045d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err) 1046d32ca5adSJuan Quintela { 1047d32ca5adSJuan Quintela int i; 1048d32ca5adSJuan Quintela 1049d32ca5adSJuan Quintela trace_multifd_recv_terminate_threads(err != NULL); 1050d32ca5adSJuan Quintela 1051d32ca5adSJuan Quintela if (err) { 1052d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 1053d32ca5adSJuan Quintela migrate_set_error(s, err); 1054d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 1055d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 1056d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 1057d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 1058d32ca5adSJuan Quintela } 1059d32ca5adSJuan Quintela } 1060d32ca5adSJuan Quintela 1061d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1062d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1063d32ca5adSJuan Quintela 1064d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1065d32ca5adSJuan Quintela p->quit = true; 1066d32ca5adSJuan Quintela /* 1067d32ca5adSJuan Quintela * We could arrive here for two reasons: 1068d32ca5adSJuan Quintela * - normal quit, i.e. everything went fine, just finished 1069d32ca5adSJuan Quintela * - error quit: We close the channels so the channel threads 1070d32ca5adSJuan Quintela * finish the qio_channel_read_all_eof() 1071d32ca5adSJuan Quintela */ 1072d32ca5adSJuan Quintela if (p->c) { 1073d32ca5adSJuan Quintela qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1074d32ca5adSJuan Quintela } 1075d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1076d32ca5adSJuan Quintela } 1077d32ca5adSJuan Quintela } 1078d32ca5adSJuan Quintela 1079cde85c37SPeter Xu void multifd_recv_shutdown(void) 1080cfc3bcf3SLeonardo Bras { 108151b07548SJuan Quintela if (migrate_multifd()) { 1082cfc3bcf3SLeonardo Bras multifd_recv_terminate_threads(NULL); 1083cfc3bcf3SLeonardo Bras } 1084cfc3bcf3SLeonardo Bras } 1085cfc3bcf3SLeonardo Bras 10865e6ea8a1SPeter Xu static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 10875e6ea8a1SPeter Xu { 10885e6ea8a1SPeter Xu migration_ioc_unregister_yank(p->c); 10895e6ea8a1SPeter Xu object_unref(OBJECT(p->c)); 10905e6ea8a1SPeter Xu p->c = NULL; 10915e6ea8a1SPeter Xu qemu_mutex_destroy(&p->mutex); 10925e6ea8a1SPeter Xu qemu_sem_destroy(&p->sem_sync); 10935e6ea8a1SPeter Xu g_free(p->name); 10945e6ea8a1SPeter Xu p->name = NULL; 10955e6ea8a1SPeter Xu p->packet_len = 0; 10965e6ea8a1SPeter Xu g_free(p->packet); 10975e6ea8a1SPeter Xu p->packet = NULL; 10985e6ea8a1SPeter Xu g_free(p->iov); 10995e6ea8a1SPeter Xu p->iov = NULL; 11005e6ea8a1SPeter Xu g_free(p->normal); 11015e6ea8a1SPeter Xu p->normal = NULL; 11025e6ea8a1SPeter Xu multifd_recv_state->ops->recv_cleanup(p); 11035e6ea8a1SPeter Xu } 11045e6ea8a1SPeter Xu 11055e6ea8a1SPeter Xu static void multifd_recv_cleanup_state(void) 11065e6ea8a1SPeter Xu { 11075e6ea8a1SPeter Xu qemu_sem_destroy(&multifd_recv_state->sem_sync); 11085e6ea8a1SPeter Xu g_free(multifd_recv_state->params); 11095e6ea8a1SPeter Xu multifd_recv_state->params = NULL; 11105e6ea8a1SPeter Xu g_free(multifd_recv_state); 11115e6ea8a1SPeter Xu multifd_recv_state = NULL; 11125e6ea8a1SPeter Xu } 11135e6ea8a1SPeter Xu 1114cde85c37SPeter Xu void multifd_recv_cleanup(void) 1115d32ca5adSJuan Quintela { 1116d32ca5adSJuan Quintela int i; 1117d32ca5adSJuan Quintela 111851b07548SJuan Quintela if (!migrate_multifd()) { 1119e5bac1f5SLeonardo Bras return; 1120d32ca5adSJuan Quintela } 1121d32ca5adSJuan Quintela multifd_recv_terminate_threads(NULL); 1122d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1123d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1124d32ca5adSJuan Quintela 1125d32ca5adSJuan Quintela if (p->running) { 1126d32ca5adSJuan Quintela /* 1127d32ca5adSJuan Quintela * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 1128d32ca5adSJuan Quintela * however try to wakeup it without harm in cleanup phase. 1129d32ca5adSJuan Quintela */ 1130d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 1131d32ca5adSJuan Quintela } 113210351fbaSLeonardo Bras 113310351fbaSLeonardo Bras qemu_thread_join(&p->thread); 1134d32ca5adSJuan Quintela } 1135d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 11365e6ea8a1SPeter Xu multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1137d32ca5adSJuan Quintela } 11385e6ea8a1SPeter Xu multifd_recv_cleanup_state(); 1139d32ca5adSJuan Quintela } 1140d32ca5adSJuan Quintela 1141d32ca5adSJuan Quintela void multifd_recv_sync_main(void) 1142d32ca5adSJuan Quintela { 1143d32ca5adSJuan Quintela int i; 1144d32ca5adSJuan Quintela 114551b07548SJuan Quintela if (!migrate_multifd()) { 1146d32ca5adSJuan Quintela return; 1147d32ca5adSJuan Quintela } 1148d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1149d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1150d32ca5adSJuan Quintela 1151d32ca5adSJuan Quintela trace_multifd_recv_sync_main_wait(p->id); 1152d32ca5adSJuan Quintela qemu_sem_wait(&multifd_recv_state->sem_sync); 1153d32ca5adSJuan Quintela } 1154d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1155d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1156d32ca5adSJuan Quintela 11576e8a355dSDaniel Brodsky WITH_QEMU_LOCK_GUARD(&p->mutex) { 1158d32ca5adSJuan Quintela if (multifd_recv_state->packet_num < p->packet_num) { 1159d32ca5adSJuan Quintela multifd_recv_state->packet_num = p->packet_num; 1160d32ca5adSJuan Quintela } 11616e8a355dSDaniel Brodsky } 1162d32ca5adSJuan Quintela trace_multifd_recv_sync_main_signal(p->id); 1163d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 1164d32ca5adSJuan Quintela } 1165d32ca5adSJuan Quintela trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1166d32ca5adSJuan Quintela } 1167d32ca5adSJuan Quintela 1168d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque) 1169d32ca5adSJuan Quintela { 1170d32ca5adSJuan Quintela MultiFDRecvParams *p = opaque; 1171d32ca5adSJuan Quintela Error *local_err = NULL; 1172d32ca5adSJuan Quintela int ret; 1173d32ca5adSJuan Quintela 1174d32ca5adSJuan Quintela trace_multifd_recv_thread_start(p->id); 1175d32ca5adSJuan Quintela rcu_register_thread(); 1176d32ca5adSJuan Quintela 1177d32ca5adSJuan Quintela while (true) { 1178d32ca5adSJuan Quintela uint32_t flags; 1179d32ca5adSJuan Quintela 1180d32ca5adSJuan Quintela if (p->quit) { 1181d32ca5adSJuan Quintela break; 1182d32ca5adSJuan Quintela } 1183d32ca5adSJuan Quintela 1184d32ca5adSJuan Quintela ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1185d32ca5adSJuan Quintela p->packet_len, &local_err); 1186bca762c2SLi Zhang if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1187d32ca5adSJuan Quintela break; 1188d32ca5adSJuan Quintela } 1189d32ca5adSJuan Quintela 1190d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1191d32ca5adSJuan Quintela ret = multifd_recv_unfill_packet(p, &local_err); 1192d32ca5adSJuan Quintela if (ret) { 1193d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1194d32ca5adSJuan Quintela break; 1195d32ca5adSJuan Quintela } 1196d32ca5adSJuan Quintela 1197d32ca5adSJuan Quintela flags = p->flags; 1198ab7cbb0bSJuan Quintela /* recv methods don't know how to handle the SYNC flag */ 1199ab7cbb0bSJuan Quintela p->flags &= ~MULTIFD_FLAG_SYNC; 1200d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1201d32ca5adSJuan Quintela 1202cf2d4aa8SJuan Quintela if (p->normal_num) { 120340a4bfe9SJuan Quintela ret = multifd_recv_state->ops->recv_pages(p, &local_err); 1204d32ca5adSJuan Quintela if (ret != 0) { 1205d32ca5adSJuan Quintela break; 1206d32ca5adSJuan Quintela } 1207d32ca5adSJuan Quintela } 1208d32ca5adSJuan Quintela 1209d32ca5adSJuan Quintela if (flags & MULTIFD_FLAG_SYNC) { 1210d32ca5adSJuan Quintela qemu_sem_post(&multifd_recv_state->sem_sync); 1211d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 1212d32ca5adSJuan Quintela } 1213d32ca5adSJuan Quintela } 1214d32ca5adSJuan Quintela 1215d32ca5adSJuan Quintela if (local_err) { 1216d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 121713f2cb21SPan Nengyuan error_free(local_err); 1218d32ca5adSJuan Quintela } 1219d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1220d32ca5adSJuan Quintela p->running = false; 1221d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1222d32ca5adSJuan Quintela 1223d32ca5adSJuan Quintela rcu_unregister_thread(); 122405b7ec18SPeter Xu trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages); 1225d32ca5adSJuan Quintela 1226d32ca5adSJuan Quintela return NULL; 1227d32ca5adSJuan Quintela } 1228d32ca5adSJuan Quintela 1229cde85c37SPeter Xu int multifd_recv_setup(Error **errp) 1230d32ca5adSJuan Quintela { 1231d32ca5adSJuan Quintela int thread_count; 1232d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1233d32ca5adSJuan Quintela uint8_t i; 1234d32ca5adSJuan Quintela 12356720c2b3Smanish.mishra /* 12366720c2b3Smanish.mishra * Return successfully if multiFD recv state is already initialised 12376720c2b3Smanish.mishra * or multiFD is not enabled. 12386720c2b3Smanish.mishra */ 123951b07548SJuan Quintela if (multifd_recv_state || !migrate_multifd()) { 1240d32ca5adSJuan Quintela return 0; 1241d32ca5adSJuan Quintela } 12426720c2b3Smanish.mishra 1243d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 1244d32ca5adSJuan Quintela multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1245d32ca5adSJuan Quintela multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1246d73415a3SStefan Hajnoczi qatomic_set(&multifd_recv_state->count, 0); 1247d32ca5adSJuan Quintela qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1248ab7cbb0bSJuan Quintela multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1249d32ca5adSJuan Quintela 1250d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 1251d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1252d32ca5adSJuan Quintela 1253d32ca5adSJuan Quintela qemu_mutex_init(&p->mutex); 1254d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 1255d32ca5adSJuan Quintela p->quit = false; 1256d32ca5adSJuan Quintela p->id = i; 1257d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 1258d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 1259d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 1260d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdrecv_%d", i); 1261226468baSJuan Quintela p->iov = g_new0(struct iovec, page_count); 1262cf2d4aa8SJuan Quintela p->normal = g_new0(ram_addr_t, page_count); 1263d6f45ebaSJuan Quintela p->page_count = page_count; 1264ddec20f8SJuan Quintela p->page_size = qemu_target_page_size(); 1265d32ca5adSJuan Quintela } 1266ab7cbb0bSJuan Quintela 1267ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 1268ab7cbb0bSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1269ab7cbb0bSJuan Quintela int ret; 1270ab7cbb0bSJuan Quintela 12713fc58efaSAvihai Horon ret = multifd_recv_state->ops->recv_setup(p, errp); 1272ab7cbb0bSJuan Quintela if (ret) { 1273ab7cbb0bSJuan Quintela return ret; 1274ab7cbb0bSJuan Quintela } 1275ab7cbb0bSJuan Quintela } 1276d32ca5adSJuan Quintela return 0; 1277d32ca5adSJuan Quintela } 1278d32ca5adSJuan Quintela 1279d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void) 1280d32ca5adSJuan Quintela { 1281d32ca5adSJuan Quintela int thread_count = migrate_multifd_channels(); 1282d32ca5adSJuan Quintela 128351b07548SJuan Quintela if (!migrate_multifd()) { 1284d32ca5adSJuan Quintela return true; 1285d32ca5adSJuan Quintela } 1286d32ca5adSJuan Quintela 1287a59136f3SDr. David Alan Gilbert if (!multifd_recv_state) { 1288a59136f3SDr. David Alan Gilbert /* Called before any connections created */ 1289a59136f3SDr. David Alan Gilbert return false; 1290a59136f3SDr. David Alan Gilbert } 1291a59136f3SDr. David Alan Gilbert 1292d73415a3SStefan Hajnoczi return thread_count == qatomic_read(&multifd_recv_state->count); 1293d32ca5adSJuan Quintela } 1294d32ca5adSJuan Quintela 1295d32ca5adSJuan Quintela /* 1296d32ca5adSJuan Quintela * Try to receive all multifd channels to get ready for the migration. 12976720c2b3Smanish.mishra * Sets @errp when failing to receive the current channel. 1298d32ca5adSJuan Quintela */ 12996720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1300d32ca5adSJuan Quintela { 1301d32ca5adSJuan Quintela MultiFDRecvParams *p; 1302d32ca5adSJuan Quintela Error *local_err = NULL; 1303d32ca5adSJuan Quintela int id; 1304d32ca5adSJuan Quintela 1305d32ca5adSJuan Quintela id = multifd_recv_initial_packet(ioc, &local_err); 1306d32ca5adSJuan Quintela if (id < 0) { 1307d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1308d32ca5adSJuan Quintela error_propagate_prepend(errp, local_err, 1309d32ca5adSJuan Quintela "failed to receive packet" 1310d32ca5adSJuan Quintela " via multifd channel %d: ", 1311d73415a3SStefan Hajnoczi qatomic_read(&multifd_recv_state->count)); 13126720c2b3Smanish.mishra return; 1313d32ca5adSJuan Quintela } 1314d32ca5adSJuan Quintela trace_multifd_recv_new_channel(id); 1315d32ca5adSJuan Quintela 1316d32ca5adSJuan Quintela p = &multifd_recv_state->params[id]; 1317d32ca5adSJuan Quintela if (p->c != NULL) { 1318d32ca5adSJuan Quintela error_setg(&local_err, "multifd: received id '%d' already setup'", 1319d32ca5adSJuan Quintela id); 1320d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1321d32ca5adSJuan Quintela error_propagate(errp, local_err); 13226720c2b3Smanish.mishra return; 1323d32ca5adSJuan Quintela } 1324d32ca5adSJuan Quintela p->c = ioc; 1325d32ca5adSJuan Quintela object_ref(OBJECT(ioc)); 1326d32ca5adSJuan Quintela 1327d32ca5adSJuan Quintela p->running = true; 1328d32ca5adSJuan Quintela qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1329d32ca5adSJuan Quintela QEMU_THREAD_JOINABLE); 1330d73415a3SStefan Hajnoczi qatomic_inc(&multifd_recv_state->count); 1331d32ca5adSJuan Quintela } 1332