1d32ca5adSJuan Quintela /* 2d32ca5adSJuan Quintela * Multifd common code 3d32ca5adSJuan Quintela * 4d32ca5adSJuan Quintela * Copyright (c) 2019-2020 Red Hat Inc 5d32ca5adSJuan Quintela * 6d32ca5adSJuan Quintela * Authors: 7d32ca5adSJuan Quintela * Juan Quintela <quintela@redhat.com> 8d32ca5adSJuan Quintela * 9d32ca5adSJuan Quintela * This work is licensed under the terms of the GNU GPL, version 2 or later. 10d32ca5adSJuan Quintela * See the COPYING file in the top-level directory. 11d32ca5adSJuan Quintela */ 12d32ca5adSJuan Quintela 13d32ca5adSJuan Quintela #include "qemu/osdep.h" 14d32ca5adSJuan Quintela #include "qemu/rcu.h" 15d32ca5adSJuan Quintela #include "exec/target_page.h" 16d32ca5adSJuan Quintela #include "sysemu/sysemu.h" 17d32ca5adSJuan Quintela #include "exec/ramblock.h" 18d32ca5adSJuan Quintela #include "qemu/error-report.h" 19d32ca5adSJuan Quintela #include "qapi/error.h" 20decdc767SFabiano Rosas #include "fd.h" 21b7b03eb6SFabiano Rosas #include "file.h" 22d32ca5adSJuan Quintela #include "migration.h" 23947701ccSJuan Quintela #include "migration-stats.h" 24d32ca5adSJuan Quintela #include "socket.h" 2529647140SChuan Zheng #include "tls.h" 26d32ca5adSJuan Quintela #include "qemu-file.h" 27d32ca5adSJuan Quintela #include "trace.h" 28d32ca5adSJuan Quintela #include "multifd.h" 291b1f4ab6SJiang Jiacheng #include "threadinfo.h" 30b4bc342cSJuan Quintela #include "options.h" 31b5eea99eSLukas Straub #include "qemu/yank.h" 32b7b03eb6SFabiano Rosas #include "io/channel-file.h" 33b5eea99eSLukas Straub #include "io/channel-socket.h" 341a92d6d5SLukas Straub #include "yank_functions.h" 35b5eea99eSLukas Straub 36d32ca5adSJuan Quintela /* Multiple fd's */ 37d32ca5adSJuan Quintela 38d32ca5adSJuan Quintela #define MULTIFD_MAGIC 0x11223344U 39d32ca5adSJuan Quintela #define MULTIFD_VERSION 1 40d32ca5adSJuan Quintela 41d32ca5adSJuan Quintela typedef struct { 42d32ca5adSJuan Quintela uint32_t magic; 43d32ca5adSJuan Quintela uint32_t version; 44d32ca5adSJuan Quintela unsigned char uuid[16]; /* QemuUUID */ 45d32ca5adSJuan Quintela uint8_t id; 46d32ca5adSJuan Quintela uint8_t unused1[7]; /* Reserved for future use */ 47d32ca5adSJuan Quintela uint64_t unused2[4]; /* Reserved for future use */ 48d32ca5adSJuan Quintela } __attribute__((packed)) MultiFDInit_t; 49d32ca5adSJuan Quintela 5098ea497dSPeter Xu struct { 5198ea497dSPeter Xu MultiFDSendParams *params; 5298ea497dSPeter Xu /* array of pages to sent */ 5398ea497dSPeter Xu MultiFDPages_t *pages; 5498ea497dSPeter Xu /* 5598ea497dSPeter Xu * Global number of generated multifd packets. 5698ea497dSPeter Xu * 5798ea497dSPeter Xu * Note that we used 'uintptr_t' because it'll naturally support atomic 5898ea497dSPeter Xu * operations on both 32bit / 64 bits hosts. It means on 32bit systems 5998ea497dSPeter Xu * multifd will overflow the packet_num easier, but that should be 6098ea497dSPeter Xu * fine. 6198ea497dSPeter Xu * 6298ea497dSPeter Xu * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 6398ea497dSPeter Xu * hosts, however so far it does not support atomic fetch_add() yet. 6498ea497dSPeter Xu * Make it easy for now. 6598ea497dSPeter Xu */ 6698ea497dSPeter Xu uintptr_t packet_num; 6793fa9dc2SFabiano Rosas /* 6893fa9dc2SFabiano Rosas * Synchronization point past which no more channels will be 6993fa9dc2SFabiano Rosas * created. 7093fa9dc2SFabiano Rosas */ 7193fa9dc2SFabiano Rosas QemuSemaphore channels_created; 7298ea497dSPeter Xu /* send channels ready */ 7398ea497dSPeter Xu QemuSemaphore channels_ready; 7498ea497dSPeter Xu /* 7598ea497dSPeter Xu * Have we already run terminate threads. There is a race when it 7698ea497dSPeter Xu * happens that we got one error while we are exiting. 7798ea497dSPeter Xu * We will use atomic operations. Only valid values are 0 and 1. 7898ea497dSPeter Xu */ 7998ea497dSPeter Xu int exiting; 8098ea497dSPeter Xu /* multifd ops */ 8198ea497dSPeter Xu MultiFDMethods *ops; 8298ea497dSPeter Xu } *multifd_send_state; 8398ea497dSPeter Xu 8411dd7be5SFabiano Rosas struct { 8511dd7be5SFabiano Rosas MultiFDRecvParams *params; 86d117ed06SFabiano Rosas MultiFDRecvData *data; 8711dd7be5SFabiano Rosas /* number of created threads */ 8811dd7be5SFabiano Rosas int count; 89d117ed06SFabiano Rosas /* 90d117ed06SFabiano Rosas * This is always posted by the recv threads, the migration thread 91d117ed06SFabiano Rosas * uses it to wait for recv threads to finish assigned tasks. 92d117ed06SFabiano Rosas */ 9311dd7be5SFabiano Rosas QemuSemaphore sem_sync; 9411dd7be5SFabiano Rosas /* global number of generated multifd packets */ 9511dd7be5SFabiano Rosas uint64_t packet_num; 9611dd7be5SFabiano Rosas int exiting; 9711dd7be5SFabiano Rosas /* multifd ops */ 9811dd7be5SFabiano Rosas MultiFDMethods *ops; 9911dd7be5SFabiano Rosas } *multifd_recv_state; 10011dd7be5SFabiano Rosas 10106833d83SFabiano Rosas static bool multifd_use_packets(void) 10206833d83SFabiano Rosas { 10306833d83SFabiano Rosas return !migrate_mapped_ram(); 10406833d83SFabiano Rosas } 10506833d83SFabiano Rosas 106a8a3e710SFabiano Rosas void multifd_send_channel_created(void) 107a8a3e710SFabiano Rosas { 108a8a3e710SFabiano Rosas qemu_sem_post(&multifd_send_state->channels_created); 109a8a3e710SFabiano Rosas } 110a8a3e710SFabiano Rosas 111f427d90bSFabiano Rosas static void multifd_set_file_bitmap(MultiFDSendParams *p) 112f427d90bSFabiano Rosas { 113f427d90bSFabiano Rosas MultiFDPages_t *pages = p->pages; 114f427d90bSFabiano Rosas 115f427d90bSFabiano Rosas assert(pages->block); 116f427d90bSFabiano Rosas 117f427d90bSFabiano Rosas for (int i = 0; i < p->pages->num; i++) { 118f427d90bSFabiano Rosas ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]); 119f427d90bSFabiano Rosas } 120f427d90bSFabiano Rosas } 121f427d90bSFabiano Rosas 122ab7cbb0bSJuan Quintela /* Multifd without compression */ 123ab7cbb0bSJuan Quintela 124ab7cbb0bSJuan Quintela /** 125ab7cbb0bSJuan Quintela * nocomp_send_setup: setup send side 126ab7cbb0bSJuan Quintela * 127ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 128ab7cbb0bSJuan Quintela * @errp: pointer to an error 129ab7cbb0bSJuan Quintela */ 130ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 131ab7cbb0bSJuan Quintela { 13225a1f878SPeter Xu if (migrate_zero_copy_send()) { 13325a1f878SPeter Xu p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 13425a1f878SPeter Xu } 13525a1f878SPeter Xu 136ab7cbb0bSJuan Quintela return 0; 137ab7cbb0bSJuan Quintela } 138ab7cbb0bSJuan Quintela 139ab7cbb0bSJuan Quintela /** 140ab7cbb0bSJuan Quintela * nocomp_send_cleanup: cleanup send side 141ab7cbb0bSJuan Quintela * 142ab7cbb0bSJuan Quintela * For no compression this function does nothing. 143ab7cbb0bSJuan Quintela * 144ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 14518ede636SJuan Quintela * @errp: pointer to an error 146ab7cbb0bSJuan Quintela */ 147ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 148ab7cbb0bSJuan Quintela { 149ab7cbb0bSJuan Quintela return; 150ab7cbb0bSJuan Quintela } 151ab7cbb0bSJuan Quintela 15206833d83SFabiano Rosas static void multifd_send_prepare_iovs(MultiFDSendParams *p) 15306833d83SFabiano Rosas { 15406833d83SFabiano Rosas MultiFDPages_t *pages = p->pages; 15506833d83SFabiano Rosas 15606833d83SFabiano Rosas for (int i = 0; i < pages->num; i++) { 15706833d83SFabiano Rosas p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; 15806833d83SFabiano Rosas p->iov[p->iovs_num].iov_len = p->page_size; 15906833d83SFabiano Rosas p->iovs_num++; 16006833d83SFabiano Rosas } 16106833d83SFabiano Rosas 16206833d83SFabiano Rosas p->next_packet_size = pages->num * p->page_size; 16306833d83SFabiano Rosas } 16406833d83SFabiano Rosas 165ab7cbb0bSJuan Quintela /** 166ab7cbb0bSJuan Quintela * nocomp_send_prepare: prepare date to be able to send 167ab7cbb0bSJuan Quintela * 168ab7cbb0bSJuan Quintela * For no compression we just have to calculate the size of the 169ab7cbb0bSJuan Quintela * packet. 170ab7cbb0bSJuan Quintela * 171ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 172ab7cbb0bSJuan Quintela * 173ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 174ab7cbb0bSJuan Quintela * @errp: pointer to an error 175ab7cbb0bSJuan Quintela */ 17602fb8104SJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 177ab7cbb0bSJuan Quintela { 17825a1f878SPeter Xu bool use_zero_copy_send = migrate_zero_copy_send(); 17925a1f878SPeter Xu int ret; 18025a1f878SPeter Xu 18106833d83SFabiano Rosas if (!multifd_use_packets()) { 18206833d83SFabiano Rosas multifd_send_prepare_iovs(p); 183f427d90bSFabiano Rosas multifd_set_file_bitmap(p); 184f427d90bSFabiano Rosas 18506833d83SFabiano Rosas return 0; 18606833d83SFabiano Rosas } 18706833d83SFabiano Rosas 18825a1f878SPeter Xu if (!use_zero_copy_send) { 18925a1f878SPeter Xu /* 19025a1f878SPeter Xu * Only !zerocopy needs the header in IOV; zerocopy will 19125a1f878SPeter Xu * send it separately. 19225a1f878SPeter Xu */ 19325a1f878SPeter Xu multifd_send_prepare_header(p); 19425a1f878SPeter Xu } 195226468baSJuan Quintela 19606833d83SFabiano Rosas multifd_send_prepare_iovs(p); 197ab7cbb0bSJuan Quintela p->flags |= MULTIFD_FLAG_NOCOMP; 19825a1f878SPeter Xu 19925a1f878SPeter Xu multifd_send_fill_packet(p); 20025a1f878SPeter Xu 20125a1f878SPeter Xu if (use_zero_copy_send) { 20225a1f878SPeter Xu /* Send header first, without zerocopy */ 20325a1f878SPeter Xu ret = qio_channel_write_all(p->c, (void *)p->packet, 20425a1f878SPeter Xu p->packet_len, errp); 20525a1f878SPeter Xu if (ret != 0) { 20625a1f878SPeter Xu return -1; 20725a1f878SPeter Xu } 20825a1f878SPeter Xu } 20925a1f878SPeter Xu 210ab7cbb0bSJuan Quintela return 0; 211ab7cbb0bSJuan Quintela } 212ab7cbb0bSJuan Quintela 213ab7cbb0bSJuan Quintela /** 214ab7cbb0bSJuan Quintela * nocomp_recv_setup: setup receive side 215ab7cbb0bSJuan Quintela * 216ab7cbb0bSJuan Quintela * For no compression this function does nothing. 217ab7cbb0bSJuan Quintela * 218ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 219ab7cbb0bSJuan Quintela * 220ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 221ab7cbb0bSJuan Quintela * @errp: pointer to an error 222ab7cbb0bSJuan Quintela */ 223ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 224ab7cbb0bSJuan Quintela { 225ab7cbb0bSJuan Quintela return 0; 226ab7cbb0bSJuan Quintela } 227ab7cbb0bSJuan Quintela 228ab7cbb0bSJuan Quintela /** 229ab7cbb0bSJuan Quintela * nocomp_recv_cleanup: setup receive side 230ab7cbb0bSJuan Quintela * 231ab7cbb0bSJuan Quintela * For no compression this function does nothing. 232ab7cbb0bSJuan Quintela * 233ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 234ab7cbb0bSJuan Quintela */ 235ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p) 236ab7cbb0bSJuan Quintela { 237ab7cbb0bSJuan Quintela } 238ab7cbb0bSJuan Quintela 239ab7cbb0bSJuan Quintela /** 2409db19125SFabiano Rosas * nocomp_recv: read the data from the channel 241ab7cbb0bSJuan Quintela * 242ab7cbb0bSJuan Quintela * For no compression we just need to read things into the correct place. 243ab7cbb0bSJuan Quintela * 244ab7cbb0bSJuan Quintela * Returns 0 for success or -1 for error 245ab7cbb0bSJuan Quintela * 246ab7cbb0bSJuan Quintela * @p: Params for the channel that we are using 247ab7cbb0bSJuan Quintela * @errp: pointer to an error 248ab7cbb0bSJuan Quintela */ 2499db19125SFabiano Rosas static int nocomp_recv(MultiFDRecvParams *p, Error **errp) 250ab7cbb0bSJuan Quintela { 25106833d83SFabiano Rosas uint32_t flags; 25206833d83SFabiano Rosas 25306833d83SFabiano Rosas if (!multifd_use_packets()) { 254a49d15a3SFabiano Rosas return multifd_file_recv_data(p, errp); 25506833d83SFabiano Rosas } 25606833d83SFabiano Rosas 25706833d83SFabiano Rosas flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 258ab7cbb0bSJuan Quintela 259ab7cbb0bSJuan Quintela if (flags != MULTIFD_FLAG_NOCOMP) { 26004e11404SJuan Quintela error_setg(errp, "multifd %u: flags received %x flags expected %x", 261ab7cbb0bSJuan Quintela p->id, flags, MULTIFD_FLAG_NOCOMP); 262ab7cbb0bSJuan Quintela return -1; 263ab7cbb0bSJuan Quintela } 264cf2d4aa8SJuan Quintela for (int i = 0; i < p->normal_num; i++) { 265faf60935SJuan Quintela p->iov[i].iov_base = p->host + p->normal[i]; 266ddec20f8SJuan Quintela p->iov[i].iov_len = p->page_size; 267226468baSJuan Quintela } 268cf2d4aa8SJuan Quintela return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 269ab7cbb0bSJuan Quintela } 270ab7cbb0bSJuan Quintela 271ab7cbb0bSJuan Quintela static MultiFDMethods multifd_nocomp_ops = { 272ab7cbb0bSJuan Quintela .send_setup = nocomp_send_setup, 273ab7cbb0bSJuan Quintela .send_cleanup = nocomp_send_cleanup, 274ab7cbb0bSJuan Quintela .send_prepare = nocomp_send_prepare, 275ab7cbb0bSJuan Quintela .recv_setup = nocomp_recv_setup, 276ab7cbb0bSJuan Quintela .recv_cleanup = nocomp_recv_cleanup, 2779db19125SFabiano Rosas .recv = nocomp_recv 278ab7cbb0bSJuan Quintela }; 279ab7cbb0bSJuan Quintela 280ab7cbb0bSJuan Quintela static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 281ab7cbb0bSJuan Quintela [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 282ab7cbb0bSJuan Quintela }; 283ab7cbb0bSJuan Quintela 2847ec2c2b3SJuan Quintela void multifd_register_ops(int method, MultiFDMethods *ops) 2857ec2c2b3SJuan Quintela { 2867ec2c2b3SJuan Quintela assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 2877ec2c2b3SJuan Quintela multifd_ops[method] = ops; 2887ec2c2b3SJuan Quintela } 2897ec2c2b3SJuan Quintela 290836eca47SPeter Xu /* Reset a MultiFDPages_t* object for the next use */ 291836eca47SPeter Xu static void multifd_pages_reset(MultiFDPages_t *pages) 292836eca47SPeter Xu { 293836eca47SPeter Xu /* 294836eca47SPeter Xu * We don't need to touch offset[] array, because it will be 295836eca47SPeter Xu * overwritten later when reused. 296836eca47SPeter Xu */ 297836eca47SPeter Xu pages->num = 0; 298836eca47SPeter Xu pages->block = NULL; 299836eca47SPeter Xu } 300836eca47SPeter Xu 301d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 302d32ca5adSJuan Quintela { 303d32ca5adSJuan Quintela MultiFDInit_t msg = {}; 304cbec7eb7SJuan Quintela size_t size = sizeof(msg); 305d32ca5adSJuan Quintela int ret; 306d32ca5adSJuan Quintela 307d32ca5adSJuan Quintela msg.magic = cpu_to_be32(MULTIFD_MAGIC); 308d32ca5adSJuan Quintela msg.version = cpu_to_be32(MULTIFD_VERSION); 309d32ca5adSJuan Quintela msg.id = p->id; 310d32ca5adSJuan Quintela memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 311d32ca5adSJuan Quintela 312cbec7eb7SJuan Quintela ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 313d32ca5adSJuan Quintela if (ret != 0) { 314d32ca5adSJuan Quintela return -1; 315d32ca5adSJuan Quintela } 316cbec7eb7SJuan Quintela stat64_add(&mig_stats.multifd_bytes, size); 317d32ca5adSJuan Quintela return 0; 318d32ca5adSJuan Quintela } 319d32ca5adSJuan Quintela 320d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 321d32ca5adSJuan Quintela { 322d32ca5adSJuan Quintela MultiFDInit_t msg; 323d32ca5adSJuan Quintela int ret; 324d32ca5adSJuan Quintela 325d32ca5adSJuan Quintela ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 326d32ca5adSJuan Quintela if (ret != 0) { 327d32ca5adSJuan Quintela return -1; 328d32ca5adSJuan Quintela } 329d32ca5adSJuan Quintela 330d32ca5adSJuan Quintela msg.magic = be32_to_cpu(msg.magic); 331d32ca5adSJuan Quintela msg.version = be32_to_cpu(msg.version); 332d32ca5adSJuan Quintela 333d32ca5adSJuan Quintela if (msg.magic != MULTIFD_MAGIC) { 334d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet magic %x " 335d32ca5adSJuan Quintela "expected %x", msg.magic, MULTIFD_MAGIC); 336d32ca5adSJuan Quintela return -1; 337d32ca5adSJuan Quintela } 338d32ca5adSJuan Quintela 339d32ca5adSJuan Quintela if (msg.version != MULTIFD_VERSION) { 34004e11404SJuan Quintela error_setg(errp, "multifd: received packet version %u " 34104e11404SJuan Quintela "expected %u", msg.version, MULTIFD_VERSION); 342d32ca5adSJuan Quintela return -1; 343d32ca5adSJuan Quintela } 344d32ca5adSJuan Quintela 345d32ca5adSJuan Quintela if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 346d32ca5adSJuan Quintela char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 347d32ca5adSJuan Quintela char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 348d32ca5adSJuan Quintela 349d32ca5adSJuan Quintela error_setg(errp, "multifd: received uuid '%s' and expected " 350d32ca5adSJuan Quintela "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 351d32ca5adSJuan Quintela g_free(uuid); 352d32ca5adSJuan Quintela g_free(msg_uuid); 353d32ca5adSJuan Quintela return -1; 354d32ca5adSJuan Quintela } 355d32ca5adSJuan Quintela 356d32ca5adSJuan Quintela if (msg.id > migrate_multifd_channels()) { 357c77b4085SAvihai Horon error_setg(errp, "multifd: received channel id %u is greater than " 358c77b4085SAvihai Horon "number of channels %u", msg.id, migrate_multifd_channels()); 359d32ca5adSJuan Quintela return -1; 360d32ca5adSJuan Quintela } 361d32ca5adSJuan Quintela 362d32ca5adSJuan Quintela return msg.id; 363d32ca5adSJuan Quintela } 364d32ca5adSJuan Quintela 3656074f816SFabiano Rosas static MultiFDPages_t *multifd_pages_init(uint32_t n) 366d32ca5adSJuan Quintela { 367d32ca5adSJuan Quintela MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 368d32ca5adSJuan Quintela 3696074f816SFabiano Rosas pages->allocated = n; 3706074f816SFabiano Rosas pages->offset = g_new0(ram_addr_t, n); 371d32ca5adSJuan Quintela 372d32ca5adSJuan Quintela return pages; 373d32ca5adSJuan Quintela } 374d32ca5adSJuan Quintela 375d32ca5adSJuan Quintela static void multifd_pages_clear(MultiFDPages_t *pages) 376d32ca5adSJuan Quintela { 377836eca47SPeter Xu multifd_pages_reset(pages); 378d32ca5adSJuan Quintela pages->allocated = 0; 379d32ca5adSJuan Quintela g_free(pages->offset); 380d32ca5adSJuan Quintela pages->offset = NULL; 381d32ca5adSJuan Quintela g_free(pages); 382d32ca5adSJuan Quintela } 383d32ca5adSJuan Quintela 38425a1f878SPeter Xu void multifd_send_fill_packet(MultiFDSendParams *p) 385d32ca5adSJuan Quintela { 386d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 387efd8c543SPeter Xu MultiFDPages_t *pages = p->pages; 38898ea497dSPeter Xu uint64_t packet_num; 389d32ca5adSJuan Quintela int i; 390d32ca5adSJuan Quintela 391d32ca5adSJuan Quintela packet->flags = cpu_to_be32(p->flags); 392d32ca5adSJuan Quintela packet->pages_alloc = cpu_to_be32(p->pages->allocated); 393efd8c543SPeter Xu packet->normal_pages = cpu_to_be32(pages->num); 394d32ca5adSJuan Quintela packet->next_packet_size = cpu_to_be32(p->next_packet_size); 39598ea497dSPeter Xu 39698ea497dSPeter Xu packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 39798ea497dSPeter Xu packet->packet_num = cpu_to_be64(packet_num); 398d32ca5adSJuan Quintela 399efd8c543SPeter Xu if (pages->block) { 400efd8c543SPeter Xu strncpy(packet->ramblock, pages->block->idstr, 256); 401d32ca5adSJuan Quintela } 402d32ca5adSJuan Quintela 403efd8c543SPeter Xu for (i = 0; i < pages->num; i++) { 404d32ca5adSJuan Quintela /* there are architectures where ram_addr_t is 32 bit */ 405efd8c543SPeter Xu uint64_t temp = pages->offset[i]; 406d32ca5adSJuan Quintela 407d32ca5adSJuan Quintela packet->offset[i] = cpu_to_be64(temp); 408d32ca5adSJuan Quintela } 40905b7ec18SPeter Xu 41005b7ec18SPeter Xu p->packets_sent++; 411db7e1cc5SPeter Xu p->total_normal_pages += pages->num; 4128a9ef173SPeter Xu 41398ea497dSPeter Xu trace_multifd_send(p->id, packet_num, pages->num, p->flags, 4148a9ef173SPeter Xu p->next_packet_size); 415d32ca5adSJuan Quintela } 416d32ca5adSJuan Quintela 417d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 418d32ca5adSJuan Quintela { 419d32ca5adSJuan Quintela MultiFDPacket_t *packet = p->packet; 420d32ca5adSJuan Quintela int i; 421d32ca5adSJuan Quintela 422d32ca5adSJuan Quintela packet->magic = be32_to_cpu(packet->magic); 423d32ca5adSJuan Quintela if (packet->magic != MULTIFD_MAGIC) { 424d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 425d32ca5adSJuan Quintela "magic %x and expected magic %x", 426d32ca5adSJuan Quintela packet->magic, MULTIFD_MAGIC); 427d32ca5adSJuan Quintela return -1; 428d32ca5adSJuan Quintela } 429d32ca5adSJuan Quintela 430d32ca5adSJuan Quintela packet->version = be32_to_cpu(packet->version); 431d32ca5adSJuan Quintela if (packet->version != MULTIFD_VERSION) { 432d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 43304e11404SJuan Quintela "version %u and expected version %u", 434d32ca5adSJuan Quintela packet->version, MULTIFD_VERSION); 435d32ca5adSJuan Quintela return -1; 436d32ca5adSJuan Quintela } 437d32ca5adSJuan Quintela 438d32ca5adSJuan Quintela p->flags = be32_to_cpu(packet->flags); 439d32ca5adSJuan Quintela 440d32ca5adSJuan Quintela packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 441d32ca5adSJuan Quintela /* 442d32ca5adSJuan Quintela * If we received a packet that is 100 times bigger than expected 443d32ca5adSJuan Quintela * just stop migration. It is a magic number. 444d32ca5adSJuan Quintela */ 445d6f45ebaSJuan Quintela if (packet->pages_alloc > p->page_count) { 446d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 447cf2d4aa8SJuan Quintela "with size %u and expected a size of %u", 448d6f45ebaSJuan Quintela packet->pages_alloc, p->page_count) ; 449d32ca5adSJuan Quintela return -1; 450d32ca5adSJuan Quintela } 451d32ca5adSJuan Quintela 4528c0ec0b2SJuan Quintela p->normal_num = be32_to_cpu(packet->normal_pages); 453cf2d4aa8SJuan Quintela if (p->normal_num > packet->pages_alloc) { 454d32ca5adSJuan Quintela error_setg(errp, "multifd: received packet " 45504e11404SJuan Quintela "with %u pages and expected maximum pages are %u", 456cf2d4aa8SJuan Quintela p->normal_num, packet->pages_alloc) ; 457d32ca5adSJuan Quintela return -1; 458d32ca5adSJuan Quintela } 459d32ca5adSJuan Quintela 460d32ca5adSJuan Quintela p->next_packet_size = be32_to_cpu(packet->next_packet_size); 461d32ca5adSJuan Quintela p->packet_num = be64_to_cpu(packet->packet_num); 46205b7ec18SPeter Xu p->packets_recved++; 463db7e1cc5SPeter Xu p->total_normal_pages += p->normal_num; 464d32ca5adSJuan Quintela 4658a9ef173SPeter Xu trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, 4668a9ef173SPeter Xu p->next_packet_size); 4678a9ef173SPeter Xu 468cf2d4aa8SJuan Quintela if (p->normal_num == 0) { 469d32ca5adSJuan Quintela return 0; 470d32ca5adSJuan Quintela } 471d32ca5adSJuan Quintela 472d32ca5adSJuan Quintela /* make sure that ramblock is 0 terminated */ 473d32ca5adSJuan Quintela packet->ramblock[255] = 0; 4745d1d1fcfSLukas Straub p->block = qemu_ram_block_by_name(packet->ramblock); 4755d1d1fcfSLukas Straub if (!p->block) { 476d32ca5adSJuan Quintela error_setg(errp, "multifd: unknown ram block %s", 477d32ca5adSJuan Quintela packet->ramblock); 478d32ca5adSJuan Quintela return -1; 479d32ca5adSJuan Quintela } 480d32ca5adSJuan Quintela 4815d1d1fcfSLukas Straub p->host = p->block->host; 482cf2d4aa8SJuan Quintela for (i = 0; i < p->normal_num; i++) { 483d32ca5adSJuan Quintela uint64_t offset = be64_to_cpu(packet->offset[i]); 484d32ca5adSJuan Quintela 4855d1d1fcfSLukas Straub if (offset > (p->block->used_length - p->page_size)) { 486d32ca5adSJuan Quintela error_setg(errp, "multifd: offset too long %" PRIu64 487d32ca5adSJuan Quintela " (max " RAM_ADDR_FMT ")", 4885d1d1fcfSLukas Straub offset, p->block->used_length); 489d32ca5adSJuan Quintela return -1; 490d32ca5adSJuan Quintela } 491cf2d4aa8SJuan Quintela p->normal[i] = offset; 492d32ca5adSJuan Quintela } 493d32ca5adSJuan Quintela 494d32ca5adSJuan Quintela return 0; 495d32ca5adSJuan Quintela } 496d32ca5adSJuan Quintela 49715f3f21dSPeter Xu static bool multifd_send_should_exit(void) 49815f3f21dSPeter Xu { 49915f3f21dSPeter Xu return qatomic_read(&multifd_send_state->exiting); 50015f3f21dSPeter Xu } 50115f3f21dSPeter Xu 50211dd7be5SFabiano Rosas static bool multifd_recv_should_exit(void) 50311dd7be5SFabiano Rosas { 50411dd7be5SFabiano Rosas return qatomic_read(&multifd_recv_state->exiting); 50511dd7be5SFabiano Rosas } 50611dd7be5SFabiano Rosas 507d32ca5adSJuan Quintela /* 50848c0f5d5SPeter Xu * The migration thread can wait on either of the two semaphores. This 50948c0f5d5SPeter Xu * function can be used to kick the main thread out of waiting on either of 51048c0f5d5SPeter Xu * them. Should mostly only be called when something wrong happened with 51148c0f5d5SPeter Xu * the current multifd send thread. 51248c0f5d5SPeter Xu */ 51348c0f5d5SPeter Xu static void multifd_send_kick_main(MultiFDSendParams *p) 51448c0f5d5SPeter Xu { 51548c0f5d5SPeter Xu qemu_sem_post(&p->sem_sync); 51648c0f5d5SPeter Xu qemu_sem_post(&multifd_send_state->channels_ready); 51748c0f5d5SPeter Xu } 51848c0f5d5SPeter Xu 51948c0f5d5SPeter Xu /* 520d32ca5adSJuan Quintela * How we use multifd_send_state->pages and channel->pages? 521d32ca5adSJuan Quintela * 522d32ca5adSJuan Quintela * We create a pages for each channel, and a main one. Each time that 523d32ca5adSJuan Quintela * we need to send a batch of pages we interchange the ones between 524d32ca5adSJuan Quintela * multifd_send_state and the channel that is sending it. There are 525d32ca5adSJuan Quintela * two reasons for that: 526d32ca5adSJuan Quintela * - to not have to do so many mallocs during migration 527d32ca5adSJuan Quintela * - to make easier to know what to free at the end of migration 528d32ca5adSJuan Quintela * 529d32ca5adSJuan Quintela * This way we always know who is the owner of each "pages" struct, 530d32ca5adSJuan Quintela * and we don't need any locking. It belongs to the migration thread 531d32ca5adSJuan Quintela * or to the channel thread. Switching is safe because the migration 532d32ca5adSJuan Quintela * thread is using the channel mutex when changing it, and the channel 533d32ca5adSJuan Quintela * have to had finish with its own, otherwise pending_job can't be 534d32ca5adSJuan Quintela * false. 5353b40964aSPeter Xu * 5363b40964aSPeter Xu * Returns true if succeed, false otherwise. 537d32ca5adSJuan Quintela */ 5383b40964aSPeter Xu static bool multifd_send_pages(void) 539d32ca5adSJuan Quintela { 540d32ca5adSJuan Quintela int i; 541d32ca5adSJuan Quintela static int next_channel; 542d32ca5adSJuan Quintela MultiFDSendParams *p = NULL; /* make happy gcc */ 543d32ca5adSJuan Quintela MultiFDPages_t *pages = multifd_send_state->pages; 544d32ca5adSJuan Quintela 54515f3f21dSPeter Xu if (multifd_send_should_exit()) { 5463b40964aSPeter Xu return false; 547d32ca5adSJuan Quintela } 548d32ca5adSJuan Quintela 549e3cce9afSPeter Xu /* We wait here, until at least one channel is ready */ 550d32ca5adSJuan Quintela qemu_sem_wait(&multifd_send_state->channels_ready); 551e3cce9afSPeter Xu 5527e89a140SLaurent Vivier /* 5537e89a140SLaurent Vivier * next_channel can remain from a previous migration that was 5547e89a140SLaurent Vivier * using more channels, so ensure it doesn't overflow if the 5557e89a140SLaurent Vivier * limit is lower now. 5567e89a140SLaurent Vivier */ 5577e89a140SLaurent Vivier next_channel %= migrate_multifd_channels(); 558d32ca5adSJuan Quintela for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 55915f3f21dSPeter Xu if (multifd_send_should_exit()) { 5603b40964aSPeter Xu return false; 561d32ca5adSJuan Quintela } 56215f3f21dSPeter Xu p = &multifd_send_state->params[i]; 563e3cce9afSPeter Xu /* 564e3cce9afSPeter Xu * Lockless read to p->pending_job is safe, because only multifd 565e3cce9afSPeter Xu * sender thread can clear it. 566e3cce9afSPeter Xu */ 567f5f48a78SPeter Xu if (qatomic_read(&p->pending_job) == false) { 568d32ca5adSJuan Quintela next_channel = (i + 1) % migrate_multifd_channels(); 569d32ca5adSJuan Quintela break; 570d32ca5adSJuan Quintela } 571d32ca5adSJuan Quintela } 572e3cce9afSPeter Xu 573e3cce9afSPeter Xu /* 574488c84acSPeter Xu * Make sure we read p->pending_job before all the rest. Pairs with 575488c84acSPeter Xu * qatomic_store_release() in multifd_send_thread(). 576e3cce9afSPeter Xu */ 577488c84acSPeter Xu smp_mb_acquire(); 578488c84acSPeter Xu assert(!p->pages->num); 579d32ca5adSJuan Quintela multifd_send_state->pages = p->pages; 580d32ca5adSJuan Quintela p->pages = pages; 581488c84acSPeter Xu /* 582488c84acSPeter Xu * Making sure p->pages is setup before marking pending_job=true. Pairs 583488c84acSPeter Xu * with the qatomic_load_acquire() in multifd_send_thread(). 584488c84acSPeter Xu */ 585488c84acSPeter Xu qatomic_store_release(&p->pending_job, true); 586d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 587d32ca5adSJuan Quintela 5883b40964aSPeter Xu return true; 589d32ca5adSJuan Quintela } 590d32ca5adSJuan Quintela 591f88f86c4SPeter Xu static inline bool multifd_queue_empty(MultiFDPages_t *pages) 592f88f86c4SPeter Xu { 593f88f86c4SPeter Xu return pages->num == 0; 594f88f86c4SPeter Xu } 595f88f86c4SPeter Xu 596f88f86c4SPeter Xu static inline bool multifd_queue_full(MultiFDPages_t *pages) 597f88f86c4SPeter Xu { 598f88f86c4SPeter Xu return pages->num == pages->allocated; 599f88f86c4SPeter Xu } 600f88f86c4SPeter Xu 601f88f86c4SPeter Xu static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) 602f88f86c4SPeter Xu { 603f88f86c4SPeter Xu pages->offset[pages->num++] = offset; 604f88f86c4SPeter Xu } 605f88f86c4SPeter Xu 606d6556d17SPeter Xu /* Returns true if enqueue successful, false otherwise */ 607d6556d17SPeter Xu bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) 608d32ca5adSJuan Quintela { 609f88f86c4SPeter Xu MultiFDPages_t *pages; 610d32ca5adSJuan Quintela 611f88f86c4SPeter Xu retry: 612f88f86c4SPeter Xu pages = multifd_send_state->pages; 613f88f86c4SPeter Xu 614f88f86c4SPeter Xu /* If the queue is empty, we can already enqueue now */ 615f88f86c4SPeter Xu if (multifd_queue_empty(pages)) { 616d32ca5adSJuan Quintela pages->block = block; 617f88f86c4SPeter Xu multifd_enqueue(pages, offset); 618d6556d17SPeter Xu return true; 619d32ca5adSJuan Quintela } 620d32ca5adSJuan Quintela 621f88f86c4SPeter Xu /* 622f88f86c4SPeter Xu * Not empty, meanwhile we need a flush. It can because of either: 623f88f86c4SPeter Xu * 624f88f86c4SPeter Xu * (1) The page is not on the same ramblock of previous ones, or, 625f88f86c4SPeter Xu * (2) The queue is full. 626f88f86c4SPeter Xu * 627f88f86c4SPeter Xu * After flush, always retry. 628f88f86c4SPeter Xu */ 629f88f86c4SPeter Xu if (pages->block != block || multifd_queue_full(pages)) { 6303b40964aSPeter Xu if (!multifd_send_pages()) { 631d6556d17SPeter Xu return false; 632d32ca5adSJuan Quintela } 633f88f86c4SPeter Xu goto retry; 634d32ca5adSJuan Quintela } 635d32ca5adSJuan Quintela 636f88f86c4SPeter Xu /* Not empty, and we still have space, do it! */ 637f88f86c4SPeter Xu multifd_enqueue(pages, offset); 638d6556d17SPeter Xu return true; 639d32ca5adSJuan Quintela } 640d32ca5adSJuan Quintela 6413ab4441dSPeter Xu /* Multifd send side hit an error; remember it and prepare to quit */ 6423ab4441dSPeter Xu static void multifd_send_set_error(Error *err) 643d32ca5adSJuan Quintela { 64415f3f21dSPeter Xu /* 64515f3f21dSPeter Xu * We don't want to exit each threads twice. Depending on where 64615f3f21dSPeter Xu * we get the error, or if there are two independent errors in two 64715f3f21dSPeter Xu * threads at the same time, we can end calling this function 64815f3f21dSPeter Xu * twice. 64915f3f21dSPeter Xu */ 65015f3f21dSPeter Xu if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 65115f3f21dSPeter Xu return; 65215f3f21dSPeter Xu } 65315f3f21dSPeter Xu 654d32ca5adSJuan Quintela if (err) { 655d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 656d32ca5adSJuan Quintela migrate_set_error(s, err); 657d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 658d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 659d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_DEVICE || 660d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 661d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 662d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 663d32ca5adSJuan Quintela } 664d32ca5adSJuan Quintela } 6653ab4441dSPeter Xu } 666d32ca5adSJuan Quintela 6673ab4441dSPeter Xu static void multifd_send_terminate_threads(void) 6683ab4441dSPeter Xu { 6693ab4441dSPeter Xu int i; 6703ab4441dSPeter Xu 6713ab4441dSPeter Xu trace_multifd_send_terminate_threads(); 6723ab4441dSPeter Xu 6733ab4441dSPeter Xu /* 6743ab4441dSPeter Xu * Tell everyone we're quitting. No xchg() needed here; we simply 6753ab4441dSPeter Xu * always set it. 6763ab4441dSPeter Xu */ 6773ab4441dSPeter Xu qatomic_set(&multifd_send_state->exiting, 1); 67812808db3SPeter Xu 67912808db3SPeter Xu /* 68012808db3SPeter Xu * Firstly, kick all threads out; no matter whether they are just idle, 68112808db3SPeter Xu * or blocked in an IO system call. 68212808db3SPeter Xu */ 683d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 684d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 685d32ca5adSJuan Quintela 686d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 687077fbb59SLi Zhang if (p->c) { 688077fbb59SLi Zhang qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 689077fbb59SLi Zhang } 690d32ca5adSJuan Quintela } 69112808db3SPeter Xu 69212808db3SPeter Xu /* 69312808db3SPeter Xu * Finally recycle all the threads. 69412808db3SPeter Xu */ 69512808db3SPeter Xu for (i = 0; i < migrate_multifd_channels(); i++) { 69612808db3SPeter Xu MultiFDSendParams *p = &multifd_send_state->params[i]; 69712808db3SPeter Xu 698e1921f10SFabiano Rosas if (p->tls_thread_created) { 699e1921f10SFabiano Rosas qemu_thread_join(&p->tls_thread); 700e1921f10SFabiano Rosas } 701e1921f10SFabiano Rosas 702a2a63c4aSFabiano Rosas if (p->thread_created) { 70312808db3SPeter Xu qemu_thread_join(&p->thread); 70412808db3SPeter Xu } 70512808db3SPeter Xu } 706d32ca5adSJuan Quintela } 707d32ca5adSJuan Quintela 70812808db3SPeter Xu static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 709d32ca5adSJuan Quintela { 7100518b5d8SPeter Xu if (p->c) { 71120171ea8SLukas Straub migration_ioc_unregister_yank(p->c); 712*1a6e217cSPeter Xu /* 713*1a6e217cSPeter Xu * An explicit close() on the channel here is normally not 714*1a6e217cSPeter Xu * required, but can be helpful for "file:" iochannels, where it 715*1a6e217cSPeter Xu * will include fdatasync() to make sure the data is flushed to the 716*1a6e217cSPeter Xu * disk backend. 717*1a6e217cSPeter Xu * 718*1a6e217cSPeter Xu * The object_unref() cannot guarantee that because: (1) finalize() 719*1a6e217cSPeter Xu * of the iochannel is only triggered on the last reference, and 720*1a6e217cSPeter Xu * it's not guaranteed that we always hold the last refcount when 721*1a6e217cSPeter Xu * reaching here, and, (2) even if finalize() is invoked, it only 722*1a6e217cSPeter Xu * does a close(fd) without data flush. 723*1a6e217cSPeter Xu */ 724b7b03eb6SFabiano Rosas qio_channel_close(p->c, &error_abort); 725c9a7e83cSPeter Xu object_unref(OBJECT(p->c)); 726d32ca5adSJuan Quintela p->c = NULL; 7270518b5d8SPeter Xu } 728d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem); 729d32ca5adSJuan Quintela qemu_sem_destroy(&p->sem_sync); 730d32ca5adSJuan Quintela g_free(p->name); 731d32ca5adSJuan Quintela p->name = NULL; 732d32ca5adSJuan Quintela multifd_pages_clear(p->pages); 733d32ca5adSJuan Quintela p->pages = NULL; 734d32ca5adSJuan Quintela p->packet_len = 0; 735d32ca5adSJuan Quintela g_free(p->packet); 736d32ca5adSJuan Quintela p->packet = NULL; 737226468baSJuan Quintela g_free(p->iov); 738226468baSJuan Quintela p->iov = NULL; 73912808db3SPeter Xu multifd_send_state->ops->send_cleanup(p, errp); 74012808db3SPeter Xu 74112808db3SPeter Xu return *errp == NULL; 742ab7cbb0bSJuan Quintela } 74312808db3SPeter Xu 74412808db3SPeter Xu static void multifd_send_cleanup_state(void) 74512808db3SPeter Xu { 746b7b03eb6SFabiano Rosas file_cleanup_outgoing_migration(); 747decdc767SFabiano Rosas fd_cleanup_outgoing_migration(); 74872b90b96SPeter Xu socket_cleanup_outgoing_migration(); 74993fa9dc2SFabiano Rosas qemu_sem_destroy(&multifd_send_state->channels_created); 750d32ca5adSJuan Quintela qemu_sem_destroy(&multifd_send_state->channels_ready); 751d32ca5adSJuan Quintela g_free(multifd_send_state->params); 752d32ca5adSJuan Quintela multifd_send_state->params = NULL; 753d32ca5adSJuan Quintela multifd_pages_clear(multifd_send_state->pages); 754d32ca5adSJuan Quintela multifd_send_state->pages = NULL; 755d32ca5adSJuan Quintela g_free(multifd_send_state); 756d32ca5adSJuan Quintela multifd_send_state = NULL; 757d32ca5adSJuan Quintela } 758d32ca5adSJuan Quintela 759cde85c37SPeter Xu void multifd_send_shutdown(void) 76012808db3SPeter Xu { 76112808db3SPeter Xu int i; 76212808db3SPeter Xu 76312808db3SPeter Xu if (!migrate_multifd()) { 76412808db3SPeter Xu return; 76512808db3SPeter Xu } 76612808db3SPeter Xu 76712808db3SPeter Xu multifd_send_terminate_threads(); 76812808db3SPeter Xu 76912808db3SPeter Xu for (i = 0; i < migrate_multifd_channels(); i++) { 77012808db3SPeter Xu MultiFDSendParams *p = &multifd_send_state->params[i]; 77112808db3SPeter Xu Error *local_err = NULL; 77212808db3SPeter Xu 77312808db3SPeter Xu if (!multifd_send_cleanup_channel(p, &local_err)) { 77412808db3SPeter Xu migrate_set_error(migrate_get_current(), local_err); 77512808db3SPeter Xu error_free(local_err); 77612808db3SPeter Xu } 77712808db3SPeter Xu } 77812808db3SPeter Xu 77912808db3SPeter Xu multifd_send_cleanup_state(); 78012808db3SPeter Xu } 78112808db3SPeter Xu 7824cc47b43SLeonardo Bras static int multifd_zero_copy_flush(QIOChannel *c) 7834cc47b43SLeonardo Bras { 7844cc47b43SLeonardo Bras int ret; 7854cc47b43SLeonardo Bras Error *err = NULL; 7864cc47b43SLeonardo Bras 7874cc47b43SLeonardo Bras ret = qio_channel_flush(c, &err); 7884cc47b43SLeonardo Bras if (ret < 0) { 7894cc47b43SLeonardo Bras error_report_err(err); 7904cc47b43SLeonardo Bras return -1; 7914cc47b43SLeonardo Bras } 7924cc47b43SLeonardo Bras if (ret == 1) { 793aff3f660SJuan Quintela stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 7944cc47b43SLeonardo Bras } 7954cc47b43SLeonardo Bras 7964cc47b43SLeonardo Bras return ret; 7974cc47b43SLeonardo Bras } 7984cc47b43SLeonardo Bras 7999346fa18SFabiano Rosas int multifd_send_sync_main(void) 800d32ca5adSJuan Quintela { 801d32ca5adSJuan Quintela int i; 8025b1d9babSLeonardo Bras bool flush_zero_copy; 803d32ca5adSJuan Quintela 80451b07548SJuan Quintela if (!migrate_multifd()) { 80533d70973SLeonardo Bras return 0; 806d32ca5adSJuan Quintela } 80790a3d2f9SJuan Quintela if (multifd_send_state->pages->num) { 8083b40964aSPeter Xu if (!multifd_send_pages()) { 809d32ca5adSJuan Quintela error_report("%s: multifd_send_pages fail", __func__); 81033d70973SLeonardo Bras return -1; 811d32ca5adSJuan Quintela } 812d32ca5adSJuan Quintela } 8135b1d9babSLeonardo Bras 814b4bc342cSJuan Quintela flush_zero_copy = migrate_zero_copy_send(); 8155b1d9babSLeonardo Bras 816d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 817d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 818d32ca5adSJuan Quintela 81915f3f21dSPeter Xu if (multifd_send_should_exit()) { 82033d70973SLeonardo Bras return -1; 821d32ca5adSJuan Quintela } 822d32ca5adSJuan Quintela 82315f3f21dSPeter Xu trace_multifd_send_sync_main_signal(p->id); 82415f3f21dSPeter Xu 825f5f48a78SPeter Xu /* 826f5f48a78SPeter Xu * We should be the only user so far, so not possible to be set by 827f5f48a78SPeter Xu * others concurrently. 828f5f48a78SPeter Xu */ 829f5f48a78SPeter Xu assert(qatomic_read(&p->pending_sync) == false); 830f5f48a78SPeter Xu qatomic_set(&p->pending_sync, true); 831d32ca5adSJuan Quintela qemu_sem_post(&p->sem); 832d32ca5adSJuan Quintela } 833d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 834d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 835d32ca5adSJuan Quintela 83615f3f21dSPeter Xu if (multifd_send_should_exit()) { 83715f3f21dSPeter Xu return -1; 83815f3f21dSPeter Xu } 83915f3f21dSPeter Xu 840d2026ee1SJuan Quintela qemu_sem_wait(&multifd_send_state->channels_ready); 841d32ca5adSJuan Quintela trace_multifd_send_sync_main_wait(p->id); 842d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 843ebfc5787SZhenzhong Duan 844ebfc5787SZhenzhong Duan if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 845ebfc5787SZhenzhong Duan return -1; 846ebfc5787SZhenzhong Duan } 847d32ca5adSJuan Quintela } 848d32ca5adSJuan Quintela trace_multifd_send_sync_main(multifd_send_state->packet_num); 84933d70973SLeonardo Bras 85033d70973SLeonardo Bras return 0; 851d32ca5adSJuan Quintela } 852d32ca5adSJuan Quintela 853d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque) 854d32ca5adSJuan Quintela { 855d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 8561b1f4ab6SJiang Jiacheng MigrationThread *thread = NULL; 857d32ca5adSJuan Quintela Error *local_err = NULL; 858d32ca5adSJuan Quintela int ret = 0; 85906833d83SFabiano Rosas bool use_packets = multifd_use_packets(); 860d32ca5adSJuan Quintela 861788fa680SFabiano Rosas thread = migration_threads_add(p->name, qemu_get_thread_id()); 8621b1f4ab6SJiang Jiacheng 863d32ca5adSJuan Quintela trace_multifd_send_thread_start(p->id); 864d32ca5adSJuan Quintela rcu_register_thread(); 865d32ca5adSJuan Quintela 86606833d83SFabiano Rosas if (use_packets) { 867d32ca5adSJuan Quintela if (multifd_send_initial_packet(p, &local_err) < 0) { 868d32ca5adSJuan Quintela ret = -1; 869d32ca5adSJuan Quintela goto out; 870d32ca5adSJuan Quintela } 87106833d83SFabiano Rosas } 872d32ca5adSJuan Quintela 873d32ca5adSJuan Quintela while (true) { 874d2026ee1SJuan Quintela qemu_sem_post(&multifd_send_state->channels_ready); 875d32ca5adSJuan Quintela qemu_sem_wait(&p->sem); 876d32ca5adSJuan Quintela 87715f3f21dSPeter Xu if (multifd_send_should_exit()) { 878d32ca5adSJuan Quintela break; 879d32ca5adSJuan Quintela } 880d32ca5adSJuan Quintela 881488c84acSPeter Xu /* 882488c84acSPeter Xu * Read pending_job flag before p->pages. Pairs with the 883488c84acSPeter Xu * qatomic_store_release() in multifd_send_pages(). 884488c84acSPeter Xu */ 885488c84acSPeter Xu if (qatomic_load_acquire(&p->pending_job)) { 886efd8c543SPeter Xu MultiFDPages_t *pages = p->pages; 887d32ca5adSJuan Quintela 888b7dbdd8eSLeonardo Bras p->iovs_num = 0; 88983c560fbSPeter Xu assert(pages->num); 89083c560fbSPeter Xu 89102fb8104SJuan Quintela ret = multifd_send_state->ops->send_prepare(p, &local_err); 892ab7cbb0bSJuan Quintela if (ret != 0) { 893ab7cbb0bSJuan Quintela break; 894ab7cbb0bSJuan Quintela } 89583c560fbSPeter Xu 896f427d90bSFabiano Rosas if (migrate_mapped_ram()) { 897f427d90bSFabiano Rosas ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 898f427d90bSFabiano Rosas p->pages->block, &local_err); 899f427d90bSFabiano Rosas } else { 900f427d90bSFabiano Rosas ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 901f427d90bSFabiano Rosas NULL, 0, p->write_flags, 902f427d90bSFabiano Rosas &local_err); 903f427d90bSFabiano Rosas } 904f427d90bSFabiano Rosas 905d32ca5adSJuan Quintela if (ret != 0) { 906d32ca5adSJuan Quintela break; 907d32ca5adSJuan Quintela } 908d32ca5adSJuan Quintela 90968b6e000SElena Ufimtseva stat64_add(&mig_stats.multifd_bytes, 91068b6e000SElena Ufimtseva p->next_packet_size + p->packet_len); 911836eca47SPeter Xu 912836eca47SPeter Xu multifd_pages_reset(p->pages); 9131618f552SElena Ufimtseva p->next_packet_size = 0; 914488c84acSPeter Xu 915488c84acSPeter Xu /* 916488c84acSPeter Xu * Making sure p->pages is published before saying "we're 917488c84acSPeter Xu * free". Pairs with the smp_mb_acquire() in 918488c84acSPeter Xu * multifd_send_pages(). 919488c84acSPeter Xu */ 920488c84acSPeter Xu qatomic_store_release(&p->pending_job, false); 921859ebaf3SPeter Xu } else { 922488c84acSPeter Xu /* 923488c84acSPeter Xu * If not a normal job, must be a sync request. Note that 924488c84acSPeter Xu * pending_sync is a standalone flag (unlike pending_job), so 925488c84acSPeter Xu * it doesn't require explicit memory barriers. 926488c84acSPeter Xu */ 927859ebaf3SPeter Xu assert(qatomic_read(&p->pending_sync)); 92806833d83SFabiano Rosas 92906833d83SFabiano Rosas if (use_packets) { 930f5f48a78SPeter Xu p->flags = MULTIFD_FLAG_SYNC; 931f5f48a78SPeter Xu multifd_send_fill_packet(p); 932f5f48a78SPeter Xu ret = qio_channel_write_all(p->c, (void *)p->packet, 933f5f48a78SPeter Xu p->packet_len, &local_err); 934f5f48a78SPeter Xu if (ret != 0) { 935f5f48a78SPeter Xu break; 936d32ca5adSJuan Quintela } 937f5f48a78SPeter Xu /* p->next_packet_size will always be zero for a SYNC packet */ 938f5f48a78SPeter Xu stat64_add(&mig_stats.multifd_bytes, p->packet_len); 939f5f48a78SPeter Xu p->flags = 0; 94006833d83SFabiano Rosas } 94106833d83SFabiano Rosas 942f5f48a78SPeter Xu qatomic_set(&p->pending_sync, false); 943f5f48a78SPeter Xu qemu_sem_post(&p->sem_sync); 944d32ca5adSJuan Quintela } 945d32ca5adSJuan Quintela } 946d32ca5adSJuan Quintela 947d32ca5adSJuan Quintela out: 948ee8a7c9cSFabiano Rosas if (ret) { 949ee8a7c9cSFabiano Rosas assert(local_err); 950d32ca5adSJuan Quintela trace_multifd_send_error(p->id); 9513ab4441dSPeter Xu multifd_send_set_error(local_err); 95248c0f5d5SPeter Xu multifd_send_kick_main(p); 953ee8a7c9cSFabiano Rosas error_free(local_err); 954d32ca5adSJuan Quintela } 955d32ca5adSJuan Quintela 956d32ca5adSJuan Quintela rcu_unregister_thread(); 957788fa680SFabiano Rosas migration_threads_remove(thread); 95805b7ec18SPeter Xu trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages); 959d32ca5adSJuan Quintela 960d32ca5adSJuan Quintela return NULL; 961d32ca5adSJuan Quintela } 962d32ca5adSJuan Quintela 9632576ae48SFabiano Rosas static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 96429647140SChuan Zheng 9659221e3c6SPeter Xu typedef struct { 9669221e3c6SPeter Xu MultiFDSendParams *p; 9679221e3c6SPeter Xu QIOChannelTLS *tioc; 9689221e3c6SPeter Xu } MultiFDTLSThreadArgs; 9699221e3c6SPeter Xu 970a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque) 971a1af605bSChuan Zheng { 9729221e3c6SPeter Xu MultiFDTLSThreadArgs *args = opaque; 973a1af605bSChuan Zheng 9749221e3c6SPeter Xu qio_channel_tls_handshake(args->tioc, 9752576ae48SFabiano Rosas multifd_new_send_channel_async, 9769221e3c6SPeter Xu args->p, 977a1af605bSChuan Zheng NULL, 978a1af605bSChuan Zheng NULL); 9799221e3c6SPeter Xu g_free(args); 9809221e3c6SPeter Xu 981a1af605bSChuan Zheng return NULL; 982a1af605bSChuan Zheng } 983a1af605bSChuan Zheng 984967e3889SFabiano Rosas static bool multifd_tls_channel_connect(MultiFDSendParams *p, 98529647140SChuan Zheng QIOChannel *ioc, 98629647140SChuan Zheng Error **errp) 98729647140SChuan Zheng { 98829647140SChuan Zheng MigrationState *s = migrate_get_current(); 9897f692ec7SPeter Xu const char *hostname = s->hostname; 9909221e3c6SPeter Xu MultiFDTLSThreadArgs *args; 99129647140SChuan Zheng QIOChannelTLS *tioc; 99229647140SChuan Zheng 9930deb7e9bSJuan Quintela tioc = migration_tls_client_create(ioc, hostname, errp); 99429647140SChuan Zheng if (!tioc) { 995967e3889SFabiano Rosas return false; 99629647140SChuan Zheng } 99729647140SChuan Zheng 9982576ae48SFabiano Rosas /* 9992576ae48SFabiano Rosas * Ownership of the socket channel now transfers to the newly 10002576ae48SFabiano Rosas * created TLS channel, which has already taken a reference. 10012576ae48SFabiano Rosas */ 10029e842408SChuan Zheng object_unref(OBJECT(ioc)); 1003894f0214SChuan Zheng trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 100429647140SChuan Zheng qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 10059221e3c6SPeter Xu 10069221e3c6SPeter Xu args = g_new0(MultiFDTLSThreadArgs, 1); 10079221e3c6SPeter Xu args->tioc = tioc; 10089221e3c6SPeter Xu args->p = p; 1009e1921f10SFabiano Rosas 1010e1921f10SFabiano Rosas p->tls_thread_created = true; 1011e1921f10SFabiano Rosas qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", 10129221e3c6SPeter Xu multifd_tls_handshake_thread, args, 1013a1af605bSChuan Zheng QEMU_THREAD_JOINABLE); 1014967e3889SFabiano Rosas return true; 101529647140SChuan Zheng } 101629647140SChuan Zheng 1017b7b03eb6SFabiano Rosas void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 101829647140SChuan Zheng { 10192576ae48SFabiano Rosas qio_channel_set_delay(ioc, false); 1020967e3889SFabiano Rosas 102120171ea8SLukas Straub migration_ioc_register_yank(ioc); 10229221e3c6SPeter Xu /* Setup p->c only if the channel is completely setup */ 102329647140SChuan Zheng p->c = ioc; 1024a2a63c4aSFabiano Rosas 1025a2a63c4aSFabiano Rosas p->thread_created = true; 102629647140SChuan Zheng qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 102729647140SChuan Zheng QEMU_THREAD_JOINABLE); 102829647140SChuan Zheng } 102929647140SChuan Zheng 10302576ae48SFabiano Rosas /* 10312576ae48SFabiano Rosas * When TLS is enabled this function is called once to establish the 10322576ae48SFabiano Rosas * TLS connection and a second time after the TLS handshake to create 10332576ae48SFabiano Rosas * the multifd channel. Without TLS it goes straight into the channel 10342576ae48SFabiano Rosas * creation. 10352576ae48SFabiano Rosas */ 1036d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 1037d32ca5adSJuan Quintela { 1038d32ca5adSJuan Quintela MultiFDSendParams *p = opaque; 10390e92f644SFabiano Rosas QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 1040d32ca5adSJuan Quintela Error *local_err = NULL; 10412576ae48SFabiano Rosas bool ret; 1042d32ca5adSJuan Quintela 1043d32ca5adSJuan Quintela trace_multifd_new_send_channel_async(p->id); 10442576ae48SFabiano Rosas 10452576ae48SFabiano Rosas if (qio_task_propagate_error(task, &local_err)) { 10462576ae48SFabiano Rosas ret = false; 10472576ae48SFabiano Rosas goto out; 1048bca762c2SLi Zhang } 104903c7a42dSChuan Zheng 10502576ae48SFabiano Rosas trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 10512576ae48SFabiano Rosas migrate_get_current()->hostname); 10522576ae48SFabiano Rosas 10532576ae48SFabiano Rosas if (migrate_channel_requires_tls_upgrade(ioc)) { 10542576ae48SFabiano Rosas ret = multifd_tls_channel_connect(p, ioc, &local_err); 105593fa9dc2SFabiano Rosas if (ret) { 105693fa9dc2SFabiano Rosas return; 105793fa9dc2SFabiano Rosas } 10582576ae48SFabiano Rosas } else { 1059770de49cSPeter Xu multifd_channel_connect(p, ioc); 1060770de49cSPeter Xu ret = true; 10612576ae48SFabiano Rosas } 10622576ae48SFabiano Rosas 106393fa9dc2SFabiano Rosas out: 106493fa9dc2SFabiano Rosas /* 106593fa9dc2SFabiano Rosas * Here we're not interested whether creation succeeded, only that 106693fa9dc2SFabiano Rosas * it happened at all. 106793fa9dc2SFabiano Rosas */ 1068a8a3e710SFabiano Rosas multifd_send_channel_created(); 106993fa9dc2SFabiano Rosas 10702576ae48SFabiano Rosas if (ret) { 10712576ae48SFabiano Rosas return; 10722576ae48SFabiano Rosas } 10732576ae48SFabiano Rosas 1074967e3889SFabiano Rosas trace_multifd_new_send_channel_async_error(p->id, local_err); 10753ab4441dSPeter Xu multifd_send_set_error(local_err); 10762576ae48SFabiano Rosas /* 10779221e3c6SPeter Xu * For error cases (TLS or non-TLS), IO channel is always freed here 10789221e3c6SPeter Xu * rather than when cleanup multifd: since p->c is not set, multifd 10799221e3c6SPeter Xu * cleanup code doesn't even know its existence. 10802576ae48SFabiano Rosas */ 108115f3f21dSPeter Xu object_unref(OBJECT(ioc)); 108215f3f21dSPeter Xu error_free(local_err); 10830e92f644SFabiano Rosas } 10840e92f644SFabiano Rosas 1085b7b03eb6SFabiano Rosas static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 10860e92f644SFabiano Rosas { 1087b7b03eb6SFabiano Rosas if (!multifd_use_packets()) { 1088b7b03eb6SFabiano Rosas return file_send_channel_create(opaque, errp); 1089b7b03eb6SFabiano Rosas } 1090b7b03eb6SFabiano Rosas 10910e92f644SFabiano Rosas socket_send_channel_create(multifd_new_send_channel_async, opaque); 1092b7b03eb6SFabiano Rosas return true; 1093d32ca5adSJuan Quintela } 1094d32ca5adSJuan Quintela 1095bd8b0a8fSFabiano Rosas bool multifd_send_setup(void) 1096d32ca5adSJuan Quintela { 1097bd8b0a8fSFabiano Rosas MigrationState *s = migrate_get_current(); 1098bd8b0a8fSFabiano Rosas Error *local_err = NULL; 1099bd8b0a8fSFabiano Rosas int thread_count, ret = 0; 1100d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 110106833d83SFabiano Rosas bool use_packets = multifd_use_packets(); 1102d32ca5adSJuan Quintela uint8_t i; 1103d32ca5adSJuan Quintela 110451b07548SJuan Quintela if (!migrate_multifd()) { 1105bd8b0a8fSFabiano Rosas return true; 1106d32ca5adSJuan Quintela } 1107b7acd657SLi Zhijian 1108d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 1109d32ca5adSJuan Quintela multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 1110d32ca5adSJuan Quintela multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 1111d32ca5adSJuan Quintela multifd_send_state->pages = multifd_pages_init(page_count); 111293fa9dc2SFabiano Rosas qemu_sem_init(&multifd_send_state->channels_created, 0); 1113d32ca5adSJuan Quintela qemu_sem_init(&multifd_send_state->channels_ready, 0); 1114d73415a3SStefan Hajnoczi qatomic_set(&multifd_send_state->exiting, 0); 1115ab7cbb0bSJuan Quintela multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 1116d32ca5adSJuan Quintela 1117d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 1118d32ca5adSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 1119d32ca5adSJuan Quintela 1120d32ca5adSJuan Quintela qemu_sem_init(&p->sem, 0); 1121d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 1122d32ca5adSJuan Quintela p->id = i; 1123d32ca5adSJuan Quintela p->pages = multifd_pages_init(page_count); 112406833d83SFabiano Rosas 112506833d83SFabiano Rosas if (use_packets) { 1126d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 1127d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 1128d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 1129d32ca5adSJuan Quintela p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 1130d32ca5adSJuan Quintela p->packet->version = cpu_to_be32(MULTIFD_VERSION); 113106833d83SFabiano Rosas 1132d48c3a04SJuan Quintela /* We need one extra place for the packet header */ 1133d48c3a04SJuan Quintela p->iov = g_new0(struct iovec, page_count + 1); 113406833d83SFabiano Rosas } else { 113506833d83SFabiano Rosas p->iov = g_new0(struct iovec, page_count); 113606833d83SFabiano Rosas } 113706833d83SFabiano Rosas p->name = g_strdup_printf("multifdsend_%d", i); 1138ddec20f8SJuan Quintela p->page_size = qemu_target_page_size(); 1139d6f45ebaSJuan Quintela p->page_count = page_count; 11405b1d9babSLeonardo Bras p->write_flags = 0; 1141b7b03eb6SFabiano Rosas 1142b7b03eb6SFabiano Rosas if (!multifd_new_send_channel_create(p, &local_err)) { 1143b7b03eb6SFabiano Rosas return false; 1144b7b03eb6SFabiano Rosas } 1145d32ca5adSJuan Quintela } 1146ab7cbb0bSJuan Quintela 114793fa9dc2SFabiano Rosas /* 114893fa9dc2SFabiano Rosas * Wait until channel creation has started for all channels. The 114993fa9dc2SFabiano Rosas * creation can still fail, but no more channels will be created 115093fa9dc2SFabiano Rosas * past this point. 115193fa9dc2SFabiano Rosas */ 115293fa9dc2SFabiano Rosas for (i = 0; i < thread_count; i++) { 115393fa9dc2SFabiano Rosas qemu_sem_wait(&multifd_send_state->channels_created); 115493fa9dc2SFabiano Rosas } 115593fa9dc2SFabiano Rosas 1156ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 1157ab7cbb0bSJuan Quintela MultiFDSendParams *p = &multifd_send_state->params[i]; 1158ab7cbb0bSJuan Quintela 1159bd8b0a8fSFabiano Rosas ret = multifd_send_state->ops->send_setup(p, &local_err); 1160ab7cbb0bSJuan Quintela if (ret) { 1161bd8b0a8fSFabiano Rosas break; 1162ab7cbb0bSJuan Quintela } 1163ab7cbb0bSJuan Quintela } 1164bd8b0a8fSFabiano Rosas 1165bd8b0a8fSFabiano Rosas if (ret) { 1166bd8b0a8fSFabiano Rosas migrate_set_error(s, local_err); 1167bd8b0a8fSFabiano Rosas error_report_err(local_err); 1168bd8b0a8fSFabiano Rosas migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 1169bd8b0a8fSFabiano Rosas MIGRATION_STATUS_FAILED); 1170bd8b0a8fSFabiano Rosas return false; 1171bd8b0a8fSFabiano Rosas } 1172bd8b0a8fSFabiano Rosas 1173bd8b0a8fSFabiano Rosas return true; 1174d32ca5adSJuan Quintela } 1175d32ca5adSJuan Quintela 1176d117ed06SFabiano Rosas bool multifd_recv(void) 1177d117ed06SFabiano Rosas { 1178d117ed06SFabiano Rosas int i; 1179d117ed06SFabiano Rosas static int next_recv_channel; 1180d117ed06SFabiano Rosas MultiFDRecvParams *p = NULL; 1181d117ed06SFabiano Rosas MultiFDRecvData *data = multifd_recv_state->data; 1182d117ed06SFabiano Rosas 1183d117ed06SFabiano Rosas /* 1184d117ed06SFabiano Rosas * next_channel can remain from a previous migration that was 1185d117ed06SFabiano Rosas * using more channels, so ensure it doesn't overflow if the 1186d117ed06SFabiano Rosas * limit is lower now. 1187d117ed06SFabiano Rosas */ 1188d117ed06SFabiano Rosas next_recv_channel %= migrate_multifd_channels(); 1189d117ed06SFabiano Rosas for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 1190d117ed06SFabiano Rosas if (multifd_recv_should_exit()) { 1191d117ed06SFabiano Rosas return false; 1192d117ed06SFabiano Rosas } 1193d117ed06SFabiano Rosas 1194d117ed06SFabiano Rosas p = &multifd_recv_state->params[i]; 1195d117ed06SFabiano Rosas 1196d117ed06SFabiano Rosas if (qatomic_read(&p->pending_job) == false) { 1197d117ed06SFabiano Rosas next_recv_channel = (i + 1) % migrate_multifd_channels(); 1198d117ed06SFabiano Rosas break; 1199d117ed06SFabiano Rosas } 1200d117ed06SFabiano Rosas } 1201d117ed06SFabiano Rosas 1202d117ed06SFabiano Rosas /* 1203d117ed06SFabiano Rosas * Order pending_job read before manipulating p->data below. Pairs 1204d117ed06SFabiano Rosas * with qatomic_store_release() at multifd_recv_thread(). 1205d117ed06SFabiano Rosas */ 1206d117ed06SFabiano Rosas smp_mb_acquire(); 1207d117ed06SFabiano Rosas 1208d117ed06SFabiano Rosas assert(!p->data->size); 1209d117ed06SFabiano Rosas multifd_recv_state->data = p->data; 1210d117ed06SFabiano Rosas p->data = data; 1211d117ed06SFabiano Rosas 1212d117ed06SFabiano Rosas /* 1213d117ed06SFabiano Rosas * Order p->data update before setting pending_job. Pairs with 1214d117ed06SFabiano Rosas * qatomic_load_acquire() at multifd_recv_thread(). 1215d117ed06SFabiano Rosas */ 1216d117ed06SFabiano Rosas qatomic_store_release(&p->pending_job, true); 1217d117ed06SFabiano Rosas qemu_sem_post(&p->sem); 1218d117ed06SFabiano Rosas 1219d117ed06SFabiano Rosas return true; 1220d117ed06SFabiano Rosas } 1221d117ed06SFabiano Rosas 1222d117ed06SFabiano Rosas MultiFDRecvData *multifd_get_recv_data(void) 1223d117ed06SFabiano Rosas { 1224d117ed06SFabiano Rosas return multifd_recv_state->data; 1225d117ed06SFabiano Rosas } 1226d117ed06SFabiano Rosas 1227d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err) 1228d32ca5adSJuan Quintela { 1229d32ca5adSJuan Quintela int i; 1230d32ca5adSJuan Quintela 1231d32ca5adSJuan Quintela trace_multifd_recv_terminate_threads(err != NULL); 1232d32ca5adSJuan Quintela 123311dd7be5SFabiano Rosas if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 123411dd7be5SFabiano Rosas return; 123511dd7be5SFabiano Rosas } 123611dd7be5SFabiano Rosas 1237d32ca5adSJuan Quintela if (err) { 1238d32ca5adSJuan Quintela MigrationState *s = migrate_get_current(); 1239d32ca5adSJuan Quintela migrate_set_error(s, err); 1240d32ca5adSJuan Quintela if (s->state == MIGRATION_STATUS_SETUP || 1241d32ca5adSJuan Quintela s->state == MIGRATION_STATUS_ACTIVE) { 1242d32ca5adSJuan Quintela migrate_set_state(&s->state, s->state, 1243d32ca5adSJuan Quintela MIGRATION_STATUS_FAILED); 1244d32ca5adSJuan Quintela } 1245d32ca5adSJuan Quintela } 1246d32ca5adSJuan Quintela 1247d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1248d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1249d32ca5adSJuan Quintela 1250d32ca5adSJuan Quintela /* 1251d117ed06SFabiano Rosas * The migration thread and channels interact differently 1252d117ed06SFabiano Rosas * depending on the presence of packets. 1253d13f0026SFabiano Rosas */ 125406833d83SFabiano Rosas if (multifd_use_packets()) { 1255d117ed06SFabiano Rosas /* 1256d117ed06SFabiano Rosas * The channel receives as long as there are packets. When 1257d117ed06SFabiano Rosas * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 1258d117ed06SFabiano Rosas * channel waits for the migration thread to sync. If the 1259d117ed06SFabiano Rosas * sync never happens, do it here. 1260d117ed06SFabiano Rosas */ 1261d13f0026SFabiano Rosas qemu_sem_post(&p->sem_sync); 1262d117ed06SFabiano Rosas } else { 1263d117ed06SFabiano Rosas /* 1264d117ed06SFabiano Rosas * The channel waits for the migration thread to give it 1265d117ed06SFabiano Rosas * work. When the migration thread runs out of work, it 1266d117ed06SFabiano Rosas * releases the channel and waits for any pending work to 1267d117ed06SFabiano Rosas * finish. If we reach here (e.g. due to error) before the 1268d117ed06SFabiano Rosas * work runs out, release the channel. 1269d117ed06SFabiano Rosas */ 1270d117ed06SFabiano Rosas qemu_sem_post(&p->sem); 127106833d83SFabiano Rosas } 1272d13f0026SFabiano Rosas 1273d13f0026SFabiano Rosas /* 1274d32ca5adSJuan Quintela * We could arrive here for two reasons: 1275d32ca5adSJuan Quintela * - normal quit, i.e. everything went fine, just finished 1276d32ca5adSJuan Quintela * - error quit: We close the channels so the channel threads 1277d32ca5adSJuan Quintela * finish the qio_channel_read_all_eof() 1278d32ca5adSJuan Quintela */ 1279d32ca5adSJuan Quintela if (p->c) { 1280d32ca5adSJuan Quintela qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1281d32ca5adSJuan Quintela } 1282d32ca5adSJuan Quintela } 1283d32ca5adSJuan Quintela } 1284d32ca5adSJuan Quintela 1285cde85c37SPeter Xu void multifd_recv_shutdown(void) 1286cfc3bcf3SLeonardo Bras { 128751b07548SJuan Quintela if (migrate_multifd()) { 1288cfc3bcf3SLeonardo Bras multifd_recv_terminate_threads(NULL); 1289cfc3bcf3SLeonardo Bras } 1290cfc3bcf3SLeonardo Bras } 1291cfc3bcf3SLeonardo Bras 12925e6ea8a1SPeter Xu static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 12935e6ea8a1SPeter Xu { 12945e6ea8a1SPeter Xu migration_ioc_unregister_yank(p->c); 12955e6ea8a1SPeter Xu object_unref(OBJECT(p->c)); 12965e6ea8a1SPeter Xu p->c = NULL; 12975e6ea8a1SPeter Xu qemu_mutex_destroy(&p->mutex); 12985e6ea8a1SPeter Xu qemu_sem_destroy(&p->sem_sync); 1299d117ed06SFabiano Rosas qemu_sem_destroy(&p->sem); 13005e6ea8a1SPeter Xu g_free(p->name); 13015e6ea8a1SPeter Xu p->name = NULL; 13025e6ea8a1SPeter Xu p->packet_len = 0; 13035e6ea8a1SPeter Xu g_free(p->packet); 13045e6ea8a1SPeter Xu p->packet = NULL; 13055e6ea8a1SPeter Xu g_free(p->iov); 13065e6ea8a1SPeter Xu p->iov = NULL; 13075e6ea8a1SPeter Xu g_free(p->normal); 13085e6ea8a1SPeter Xu p->normal = NULL; 13095e6ea8a1SPeter Xu multifd_recv_state->ops->recv_cleanup(p); 13105e6ea8a1SPeter Xu } 13115e6ea8a1SPeter Xu 13125e6ea8a1SPeter Xu static void multifd_recv_cleanup_state(void) 13135e6ea8a1SPeter Xu { 13145e6ea8a1SPeter Xu qemu_sem_destroy(&multifd_recv_state->sem_sync); 13155e6ea8a1SPeter Xu g_free(multifd_recv_state->params); 13165e6ea8a1SPeter Xu multifd_recv_state->params = NULL; 1317d117ed06SFabiano Rosas g_free(multifd_recv_state->data); 1318d117ed06SFabiano Rosas multifd_recv_state->data = NULL; 13195e6ea8a1SPeter Xu g_free(multifd_recv_state); 13205e6ea8a1SPeter Xu multifd_recv_state = NULL; 13215e6ea8a1SPeter Xu } 13225e6ea8a1SPeter Xu 1323cde85c37SPeter Xu void multifd_recv_cleanup(void) 1324d32ca5adSJuan Quintela { 1325d32ca5adSJuan Quintela int i; 1326d32ca5adSJuan Quintela 132751b07548SJuan Quintela if (!migrate_multifd()) { 1328e5bac1f5SLeonardo Bras return; 1329d32ca5adSJuan Quintela } 1330d32ca5adSJuan Quintela multifd_recv_terminate_threads(NULL); 1331d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 1332d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1333d32ca5adSJuan Quintela 1334a2a63c4aSFabiano Rosas if (p->thread_created) { 133510351fbaSLeonardo Bras qemu_thread_join(&p->thread); 1336d32ca5adSJuan Quintela } 1337a2a63c4aSFabiano Rosas } 1338d32ca5adSJuan Quintela for (i = 0; i < migrate_multifd_channels(); i++) { 13395e6ea8a1SPeter Xu multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1340d32ca5adSJuan Quintela } 13415e6ea8a1SPeter Xu multifd_recv_cleanup_state(); 1342d32ca5adSJuan Quintela } 1343d32ca5adSJuan Quintela 1344d32ca5adSJuan Quintela void multifd_recv_sync_main(void) 1345d32ca5adSJuan Quintela { 13464aac6b1eSFabiano Rosas int thread_count = migrate_multifd_channels(); 1347a49d15a3SFabiano Rosas bool file_based = !multifd_use_packets(); 1348d32ca5adSJuan Quintela int i; 1349d32ca5adSJuan Quintela 1350a49d15a3SFabiano Rosas if (!migrate_multifd()) { 1351d32ca5adSJuan Quintela return; 1352d32ca5adSJuan Quintela } 1353d32ca5adSJuan Quintela 13544aac6b1eSFabiano Rosas /* 1355a49d15a3SFabiano Rosas * File-based channels don't use packets and therefore need to 1356a49d15a3SFabiano Rosas * wait for more work. Release them to start the sync. 1357a49d15a3SFabiano Rosas */ 1358a49d15a3SFabiano Rosas if (file_based) { 1359a49d15a3SFabiano Rosas for (i = 0; i < thread_count; i++) { 1360a49d15a3SFabiano Rosas MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1361a49d15a3SFabiano Rosas 1362a49d15a3SFabiano Rosas trace_multifd_recv_sync_main_signal(p->id); 1363a49d15a3SFabiano Rosas qemu_sem_post(&p->sem); 1364a49d15a3SFabiano Rosas } 1365a49d15a3SFabiano Rosas } 1366a49d15a3SFabiano Rosas 1367a49d15a3SFabiano Rosas /* 13684aac6b1eSFabiano Rosas * Initiate the synchronization by waiting for all channels. 1369a49d15a3SFabiano Rosas * 13704aac6b1eSFabiano Rosas * For socket-based migration this means each channel has received 13714aac6b1eSFabiano Rosas * the SYNC packet on the stream. 1372a49d15a3SFabiano Rosas * 1373a49d15a3SFabiano Rosas * For file-based migration this means each channel is done with 1374a49d15a3SFabiano Rosas * the work (pending_job=false). 13754aac6b1eSFabiano Rosas */ 13764aac6b1eSFabiano Rosas for (i = 0; i < thread_count; i++) { 13774aac6b1eSFabiano Rosas trace_multifd_recv_sync_main_wait(i); 1378d32ca5adSJuan Quintela qemu_sem_wait(&multifd_recv_state->sem_sync); 1379d32ca5adSJuan Quintela } 13804aac6b1eSFabiano Rosas 1381a49d15a3SFabiano Rosas if (file_based) { 1382a49d15a3SFabiano Rosas /* 1383a49d15a3SFabiano Rosas * For file-based loading is done in one iteration. We're 1384a49d15a3SFabiano Rosas * done. 1385a49d15a3SFabiano Rosas */ 1386a49d15a3SFabiano Rosas return; 1387a49d15a3SFabiano Rosas } 1388a49d15a3SFabiano Rosas 13894aac6b1eSFabiano Rosas /* 13904aac6b1eSFabiano Rosas * Sync done. Release the channels for the next iteration. 13914aac6b1eSFabiano Rosas */ 13924aac6b1eSFabiano Rosas for (i = 0; i < thread_count; i++) { 1393d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1394d32ca5adSJuan Quintela 13956e8a355dSDaniel Brodsky WITH_QEMU_LOCK_GUARD(&p->mutex) { 1396d32ca5adSJuan Quintela if (multifd_recv_state->packet_num < p->packet_num) { 1397d32ca5adSJuan Quintela multifd_recv_state->packet_num = p->packet_num; 1398d32ca5adSJuan Quintela } 13996e8a355dSDaniel Brodsky } 1400d32ca5adSJuan Quintela trace_multifd_recv_sync_main_signal(p->id); 1401d32ca5adSJuan Quintela qemu_sem_post(&p->sem_sync); 1402d32ca5adSJuan Quintela } 1403d32ca5adSJuan Quintela trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1404d32ca5adSJuan Quintela } 1405d32ca5adSJuan Quintela 1406d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque) 1407d32ca5adSJuan Quintela { 1408d32ca5adSJuan Quintela MultiFDRecvParams *p = opaque; 1409d32ca5adSJuan Quintela Error *local_err = NULL; 141006833d83SFabiano Rosas bool use_packets = multifd_use_packets(); 1411d32ca5adSJuan Quintela int ret; 1412d32ca5adSJuan Quintela 1413d32ca5adSJuan Quintela trace_multifd_recv_thread_start(p->id); 1414d32ca5adSJuan Quintela rcu_register_thread(); 1415d32ca5adSJuan Quintela 1416d32ca5adSJuan Quintela while (true) { 141706833d83SFabiano Rosas uint32_t flags = 0; 14189db19125SFabiano Rosas bool has_data = false; 14199db19125SFabiano Rosas p->normal_num = 0; 1420d32ca5adSJuan Quintela 1421d117ed06SFabiano Rosas if (use_packets) { 142211dd7be5SFabiano Rosas if (multifd_recv_should_exit()) { 1423d32ca5adSJuan Quintela break; 1424d32ca5adSJuan Quintela } 1425d32ca5adSJuan Quintela 1426d32ca5adSJuan Quintela ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1427d32ca5adSJuan Quintela p->packet_len, &local_err); 1428bca762c2SLi Zhang if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1429d32ca5adSJuan Quintela break; 1430d32ca5adSJuan Quintela } 1431d32ca5adSJuan Quintela 1432d32ca5adSJuan Quintela qemu_mutex_lock(&p->mutex); 1433d32ca5adSJuan Quintela ret = multifd_recv_unfill_packet(p, &local_err); 1434d32ca5adSJuan Quintela if (ret) { 1435d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1436d32ca5adSJuan Quintela break; 1437d32ca5adSJuan Quintela } 1438d32ca5adSJuan Quintela 1439d32ca5adSJuan Quintela flags = p->flags; 1440ab7cbb0bSJuan Quintela /* recv methods don't know how to handle the SYNC flag */ 1441ab7cbb0bSJuan Quintela p->flags &= ~MULTIFD_FLAG_SYNC; 14429db19125SFabiano Rosas has_data = !!p->normal_num; 1443d32ca5adSJuan Quintela qemu_mutex_unlock(&p->mutex); 1444d117ed06SFabiano Rosas } else { 1445d117ed06SFabiano Rosas /* 1446d117ed06SFabiano Rosas * No packets, so we need to wait for the vmstate code to 1447d117ed06SFabiano Rosas * give us work. 1448d117ed06SFabiano Rosas */ 1449d117ed06SFabiano Rosas qemu_sem_wait(&p->sem); 1450d117ed06SFabiano Rosas 1451d117ed06SFabiano Rosas if (multifd_recv_should_exit()) { 1452d117ed06SFabiano Rosas break; 1453d117ed06SFabiano Rosas } 1454d117ed06SFabiano Rosas 1455d117ed06SFabiano Rosas /* pairs with qatomic_store_release() at multifd_recv() */ 1456d117ed06SFabiano Rosas if (!qatomic_load_acquire(&p->pending_job)) { 1457d117ed06SFabiano Rosas /* 1458d117ed06SFabiano Rosas * Migration thread did not send work, this is 1459d117ed06SFabiano Rosas * equivalent to pending_sync on the sending 1460d117ed06SFabiano Rosas * side. Post sem_sync to notify we reached this 1461d117ed06SFabiano Rosas * point. 1462d117ed06SFabiano Rosas */ 1463d117ed06SFabiano Rosas qemu_sem_post(&multifd_recv_state->sem_sync); 1464d117ed06SFabiano Rosas continue; 1465d117ed06SFabiano Rosas } 1466d117ed06SFabiano Rosas 1467d117ed06SFabiano Rosas has_data = !!p->data->size; 146806833d83SFabiano Rosas } 1469d32ca5adSJuan Quintela 14709db19125SFabiano Rosas if (has_data) { 14719db19125SFabiano Rosas ret = multifd_recv_state->ops->recv(p, &local_err); 1472d32ca5adSJuan Quintela if (ret != 0) { 1473d32ca5adSJuan Quintela break; 1474d32ca5adSJuan Quintela } 1475d32ca5adSJuan Quintela } 1476d32ca5adSJuan Quintela 147706833d83SFabiano Rosas if (use_packets) { 1478d32ca5adSJuan Quintela if (flags & MULTIFD_FLAG_SYNC) { 1479d32ca5adSJuan Quintela qemu_sem_post(&multifd_recv_state->sem_sync); 1480d32ca5adSJuan Quintela qemu_sem_wait(&p->sem_sync); 1481d32ca5adSJuan Quintela } 1482d117ed06SFabiano Rosas } else { 1483d117ed06SFabiano Rosas p->total_normal_pages += p->data->size / qemu_target_page_size(); 1484d117ed06SFabiano Rosas p->data->size = 0; 1485d117ed06SFabiano Rosas /* 1486d117ed06SFabiano Rosas * Order data->size update before clearing 1487d117ed06SFabiano Rosas * pending_job. Pairs with smp_mb_acquire() at 1488d117ed06SFabiano Rosas * multifd_recv(). 1489d117ed06SFabiano Rosas */ 1490d117ed06SFabiano Rosas qatomic_store_release(&p->pending_job, false); 1491d32ca5adSJuan Quintela } 149206833d83SFabiano Rosas } 1493d32ca5adSJuan Quintela 1494d32ca5adSJuan Quintela if (local_err) { 1495d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 149613f2cb21SPan Nengyuan error_free(local_err); 1497d32ca5adSJuan Quintela } 1498d32ca5adSJuan Quintela 1499d32ca5adSJuan Quintela rcu_unregister_thread(); 150005b7ec18SPeter Xu trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages); 1501d32ca5adSJuan Quintela 1502d32ca5adSJuan Quintela return NULL; 1503d32ca5adSJuan Quintela } 1504d32ca5adSJuan Quintela 1505cde85c37SPeter Xu int multifd_recv_setup(Error **errp) 1506d32ca5adSJuan Quintela { 1507d32ca5adSJuan Quintela int thread_count; 1508d32ca5adSJuan Quintela uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 150906833d83SFabiano Rosas bool use_packets = multifd_use_packets(); 1510d32ca5adSJuan Quintela uint8_t i; 1511d32ca5adSJuan Quintela 15126720c2b3Smanish.mishra /* 15136720c2b3Smanish.mishra * Return successfully if multiFD recv state is already initialised 15146720c2b3Smanish.mishra * or multiFD is not enabled. 15156720c2b3Smanish.mishra */ 151651b07548SJuan Quintela if (multifd_recv_state || !migrate_multifd()) { 1517d32ca5adSJuan Quintela return 0; 1518d32ca5adSJuan Quintela } 15196720c2b3Smanish.mishra 1520d32ca5adSJuan Quintela thread_count = migrate_multifd_channels(); 1521d32ca5adSJuan Quintela multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1522d32ca5adSJuan Quintela multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1523d117ed06SFabiano Rosas 1524d117ed06SFabiano Rosas multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1525d117ed06SFabiano Rosas multifd_recv_state->data->size = 0; 1526d117ed06SFabiano Rosas 1527d73415a3SStefan Hajnoczi qatomic_set(&multifd_recv_state->count, 0); 152811dd7be5SFabiano Rosas qatomic_set(&multifd_recv_state->exiting, 0); 1529d32ca5adSJuan Quintela qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1530ab7cbb0bSJuan Quintela multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1531d32ca5adSJuan Quintela 1532d32ca5adSJuan Quintela for (i = 0; i < thread_count; i++) { 1533d32ca5adSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1534d32ca5adSJuan Quintela 1535d32ca5adSJuan Quintela qemu_mutex_init(&p->mutex); 1536d32ca5adSJuan Quintela qemu_sem_init(&p->sem_sync, 0); 1537d117ed06SFabiano Rosas qemu_sem_init(&p->sem, 0); 1538d117ed06SFabiano Rosas p->pending_job = false; 1539d32ca5adSJuan Quintela p->id = i; 154006833d83SFabiano Rosas 1541d117ed06SFabiano Rosas p->data = g_new0(MultiFDRecvData, 1); 1542d117ed06SFabiano Rosas p->data->size = 0; 1543d117ed06SFabiano Rosas 154406833d83SFabiano Rosas if (use_packets) { 1545d32ca5adSJuan Quintela p->packet_len = sizeof(MultiFDPacket_t) 1546d32ca5adSJuan Quintela + sizeof(uint64_t) * page_count; 1547d32ca5adSJuan Quintela p->packet = g_malloc0(p->packet_len); 154806833d83SFabiano Rosas } 1549d32ca5adSJuan Quintela p->name = g_strdup_printf("multifdrecv_%d", i); 1550226468baSJuan Quintela p->iov = g_new0(struct iovec, page_count); 1551cf2d4aa8SJuan Quintela p->normal = g_new0(ram_addr_t, page_count); 1552d6f45ebaSJuan Quintela p->page_count = page_count; 1553ddec20f8SJuan Quintela p->page_size = qemu_target_page_size(); 1554d32ca5adSJuan Quintela } 1555ab7cbb0bSJuan Quintela 1556ab7cbb0bSJuan Quintela for (i = 0; i < thread_count; i++) { 1557ab7cbb0bSJuan Quintela MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1558ab7cbb0bSJuan Quintela int ret; 1559ab7cbb0bSJuan Quintela 15603fc58efaSAvihai Horon ret = multifd_recv_state->ops->recv_setup(p, errp); 1561ab7cbb0bSJuan Quintela if (ret) { 1562ab7cbb0bSJuan Quintela return ret; 1563ab7cbb0bSJuan Quintela } 1564ab7cbb0bSJuan Quintela } 1565d32ca5adSJuan Quintela return 0; 1566d32ca5adSJuan Quintela } 1567d32ca5adSJuan Quintela 1568d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void) 1569d32ca5adSJuan Quintela { 1570d32ca5adSJuan Quintela int thread_count = migrate_multifd_channels(); 1571d32ca5adSJuan Quintela 157251b07548SJuan Quintela if (!migrate_multifd()) { 1573d32ca5adSJuan Quintela return true; 1574d32ca5adSJuan Quintela } 1575d32ca5adSJuan Quintela 1576a59136f3SDr. David Alan Gilbert if (!multifd_recv_state) { 1577a59136f3SDr. David Alan Gilbert /* Called before any connections created */ 1578a59136f3SDr. David Alan Gilbert return false; 1579a59136f3SDr. David Alan Gilbert } 1580a59136f3SDr. David Alan Gilbert 1581d73415a3SStefan Hajnoczi return thread_count == qatomic_read(&multifd_recv_state->count); 1582d32ca5adSJuan Quintela } 1583d32ca5adSJuan Quintela 1584d32ca5adSJuan Quintela /* 1585d32ca5adSJuan Quintela * Try to receive all multifd channels to get ready for the migration. 15866720c2b3Smanish.mishra * Sets @errp when failing to receive the current channel. 1587d32ca5adSJuan Quintela */ 15886720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1589d32ca5adSJuan Quintela { 1590d32ca5adSJuan Quintela MultiFDRecvParams *p; 1591d32ca5adSJuan Quintela Error *local_err = NULL; 159206833d83SFabiano Rosas bool use_packets = multifd_use_packets(); 1593d32ca5adSJuan Quintela int id; 1594d32ca5adSJuan Quintela 159506833d83SFabiano Rosas if (use_packets) { 1596d32ca5adSJuan Quintela id = multifd_recv_initial_packet(ioc, &local_err); 1597d32ca5adSJuan Quintela if (id < 0) { 1598d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1599d32ca5adSJuan Quintela error_propagate_prepend(errp, local_err, 1600d32ca5adSJuan Quintela "failed to receive packet" 1601d32ca5adSJuan Quintela " via multifd channel %d: ", 1602d73415a3SStefan Hajnoczi qatomic_read(&multifd_recv_state->count)); 16036720c2b3Smanish.mishra return; 1604d32ca5adSJuan Quintela } 1605d32ca5adSJuan Quintela trace_multifd_recv_new_channel(id); 160606833d83SFabiano Rosas } else { 16072dd7ee7aSFabiano Rosas id = qatomic_read(&multifd_recv_state->count); 160806833d83SFabiano Rosas } 1609d32ca5adSJuan Quintela 1610d32ca5adSJuan Quintela p = &multifd_recv_state->params[id]; 1611d32ca5adSJuan Quintela if (p->c != NULL) { 1612d32ca5adSJuan Quintela error_setg(&local_err, "multifd: received id '%d' already setup'", 1613d32ca5adSJuan Quintela id); 1614d32ca5adSJuan Quintela multifd_recv_terminate_threads(local_err); 1615d32ca5adSJuan Quintela error_propagate(errp, local_err); 16166720c2b3Smanish.mishra return; 1617d32ca5adSJuan Quintela } 1618d32ca5adSJuan Quintela p->c = ioc; 1619d32ca5adSJuan Quintela object_ref(OBJECT(ioc)); 1620d32ca5adSJuan Quintela 1621a2a63c4aSFabiano Rosas p->thread_created = true; 1622d32ca5adSJuan Quintela qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1623d32ca5adSJuan Quintela QEMU_THREAD_JOINABLE); 1624d73415a3SStefan Hajnoczi qatomic_inc(&multifd_recv_state->count); 1625d32ca5adSJuan Quintela } 1626