1*adf684ceSLaurent Vivier /* 2*adf684ceSLaurent Vivier * net stream generic functions 3*adf684ceSLaurent Vivier * 4*adf684ceSLaurent Vivier * Copyright Red Hat 5*adf684ceSLaurent Vivier * 6*adf684ceSLaurent Vivier * SPDX-License-Identifier: GPL-2.0-or-later 7*adf684ceSLaurent Vivier */ 8*adf684ceSLaurent Vivier 9*adf684ceSLaurent Vivier #include "qemu/osdep.h" 10*adf684ceSLaurent Vivier #include "qemu/iov.h" 11*adf684ceSLaurent Vivier #include "qapi/error.h" 12*adf684ceSLaurent Vivier #include "net/net.h" 13*adf684ceSLaurent Vivier #include "io/channel.h" 14*adf684ceSLaurent Vivier #include "io/net-listener.h" 15*adf684ceSLaurent Vivier 16*adf684ceSLaurent Vivier #include "stream_data.h" 17*adf684ceSLaurent Vivier 18*adf684ceSLaurent Vivier static gboolean net_stream_data_writable(QIOChannel *ioc, 19*adf684ceSLaurent Vivier GIOCondition condition, gpointer data) 20*adf684ceSLaurent Vivier { 21*adf684ceSLaurent Vivier NetStreamData *d = data; 22*adf684ceSLaurent Vivier 23*adf684ceSLaurent Vivier d->ioc_write_tag = 0; 24*adf684ceSLaurent Vivier 25*adf684ceSLaurent Vivier qemu_flush_queued_packets(&d->nc); 26*adf684ceSLaurent Vivier 27*adf684ceSLaurent Vivier return G_SOURCE_REMOVE; 28*adf684ceSLaurent Vivier } 29*adf684ceSLaurent Vivier 30*adf684ceSLaurent Vivier ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf, 31*adf684ceSLaurent Vivier size_t size) 32*adf684ceSLaurent Vivier { 33*adf684ceSLaurent Vivier uint32_t len = htonl(size); 34*adf684ceSLaurent Vivier struct iovec iov[] = { 35*adf684ceSLaurent Vivier { 36*adf684ceSLaurent Vivier .iov_base = &len, 37*adf684ceSLaurent Vivier .iov_len = sizeof(len), 38*adf684ceSLaurent Vivier }, { 39*adf684ceSLaurent Vivier .iov_base = (void *)buf, 40*adf684ceSLaurent Vivier .iov_len = size, 41*adf684ceSLaurent Vivier }, 42*adf684ceSLaurent Vivier }; 43*adf684ceSLaurent Vivier struct iovec local_iov[2]; 44*adf684ceSLaurent Vivier unsigned int nlocal_iov; 45*adf684ceSLaurent Vivier size_t remaining; 46*adf684ceSLaurent Vivier ssize_t ret; 47*adf684ceSLaurent Vivier 48*adf684ceSLaurent Vivier remaining = iov_size(iov, 2) - d->send_index; 49*adf684ceSLaurent Vivier nlocal_iov = iov_copy(local_iov, 2, iov, 2, d->send_index, remaining); 50*adf684ceSLaurent Vivier ret = qio_channel_writev(d->ioc, local_iov, nlocal_iov, NULL); 51*adf684ceSLaurent Vivier if (ret == QIO_CHANNEL_ERR_BLOCK) { 52*adf684ceSLaurent Vivier ret = 0; /* handled further down */ 53*adf684ceSLaurent Vivier } 54*adf684ceSLaurent Vivier if (ret == -1) { 55*adf684ceSLaurent Vivier d->send_index = 0; 56*adf684ceSLaurent Vivier return -errno; 57*adf684ceSLaurent Vivier } 58*adf684ceSLaurent Vivier if (ret < (ssize_t)remaining) { 59*adf684ceSLaurent Vivier d->send_index += ret; 60*adf684ceSLaurent Vivier d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT, 61*adf684ceSLaurent Vivier net_stream_data_writable, d, 62*adf684ceSLaurent Vivier NULL); 63*adf684ceSLaurent Vivier return 0; 64*adf684ceSLaurent Vivier } 65*adf684ceSLaurent Vivier d->send_index = 0; 66*adf684ceSLaurent Vivier return size; 67*adf684ceSLaurent Vivier } 68*adf684ceSLaurent Vivier 69*adf684ceSLaurent Vivier static void net_stream_data_send_completed(NetClientState *nc, ssize_t len) 70*adf684ceSLaurent Vivier { 71*adf684ceSLaurent Vivier NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc); 72*adf684ceSLaurent Vivier 73*adf684ceSLaurent Vivier if (!d->ioc_read_tag) { 74*adf684ceSLaurent Vivier d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, 75*adf684ceSLaurent Vivier NULL); 76*adf684ceSLaurent Vivier } 77*adf684ceSLaurent Vivier } 78*adf684ceSLaurent Vivier 79*adf684ceSLaurent Vivier void net_stream_data_rs_finalize(SocketReadState *rs) 80*adf684ceSLaurent Vivier { 81*adf684ceSLaurent Vivier NetStreamData *d = container_of(rs, NetStreamData, rs); 82*adf684ceSLaurent Vivier 83*adf684ceSLaurent Vivier if (qemu_send_packet_async(&d->nc, rs->buf, 84*adf684ceSLaurent Vivier rs->packet_len, 85*adf684ceSLaurent Vivier net_stream_data_send_completed) == 0) { 86*adf684ceSLaurent Vivier if (d->ioc_read_tag) { 87*adf684ceSLaurent Vivier g_source_remove(d->ioc_read_tag); 88*adf684ceSLaurent Vivier d->ioc_read_tag = 0; 89*adf684ceSLaurent Vivier } 90*adf684ceSLaurent Vivier } 91*adf684ceSLaurent Vivier } 92*adf684ceSLaurent Vivier 93*adf684ceSLaurent Vivier gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition, 94*adf684ceSLaurent Vivier NetStreamData *d) 95*adf684ceSLaurent Vivier { 96*adf684ceSLaurent Vivier int size; 97*adf684ceSLaurent Vivier int ret; 98*adf684ceSLaurent Vivier QEMU_UNINITIALIZED char buf1[NET_BUFSIZE]; 99*adf684ceSLaurent Vivier const char *buf; 100*adf684ceSLaurent Vivier 101*adf684ceSLaurent Vivier size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL); 102*adf684ceSLaurent Vivier if (size < 0) { 103*adf684ceSLaurent Vivier if (errno != EWOULDBLOCK) { 104*adf684ceSLaurent Vivier goto eoc; 105*adf684ceSLaurent Vivier } 106*adf684ceSLaurent Vivier } else if (size == 0) { 107*adf684ceSLaurent Vivier /* end of connection */ 108*adf684ceSLaurent Vivier eoc: 109*adf684ceSLaurent Vivier d->ioc_read_tag = 0; 110*adf684ceSLaurent Vivier if (d->ioc_write_tag) { 111*adf684ceSLaurent Vivier g_source_remove(d->ioc_write_tag); 112*adf684ceSLaurent Vivier d->ioc_write_tag = 0; 113*adf684ceSLaurent Vivier } 114*adf684ceSLaurent Vivier if (d->listener) { 115*adf684ceSLaurent Vivier qemu_set_info_str(&d->nc, "listening"); 116*adf684ceSLaurent Vivier qio_net_listener_set_client_func(d->listener, 117*adf684ceSLaurent Vivier d->listen, d, NULL); 118*adf684ceSLaurent Vivier } 119*adf684ceSLaurent Vivier object_unref(OBJECT(d->ioc)); 120*adf684ceSLaurent Vivier d->ioc = NULL; 121*adf684ceSLaurent Vivier 122*adf684ceSLaurent Vivier net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); 123*adf684ceSLaurent Vivier d->nc.link_down = true; 124*adf684ceSLaurent Vivier 125*adf684ceSLaurent Vivier return G_SOURCE_REMOVE; 126*adf684ceSLaurent Vivier } 127*adf684ceSLaurent Vivier buf = buf1; 128*adf684ceSLaurent Vivier 129*adf684ceSLaurent Vivier ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size); 130*adf684ceSLaurent Vivier 131*adf684ceSLaurent Vivier if (ret == -1) { 132*adf684ceSLaurent Vivier goto eoc; 133*adf684ceSLaurent Vivier } 134*adf684ceSLaurent Vivier 135*adf684ceSLaurent Vivier return G_SOURCE_CONTINUE; 136*adf684ceSLaurent Vivier } 137*adf684ceSLaurent Vivier 138*adf684ceSLaurent Vivier void net_stream_data_listen(QIONetListener *listener, 139*adf684ceSLaurent Vivier QIOChannelSocket *cioc, 140*adf684ceSLaurent Vivier NetStreamData *d) 141*adf684ceSLaurent Vivier { 142*adf684ceSLaurent Vivier object_ref(OBJECT(cioc)); 143*adf684ceSLaurent Vivier 144*adf684ceSLaurent Vivier qio_net_listener_set_client_func(d->listener, NULL, d, NULL); 145*adf684ceSLaurent Vivier 146*adf684ceSLaurent Vivier d->ioc = QIO_CHANNEL(cioc); 147*adf684ceSLaurent Vivier qio_channel_set_name(d->ioc, "stream-server"); 148*adf684ceSLaurent Vivier d->nc.link_down = false; 149*adf684ceSLaurent Vivier 150*adf684ceSLaurent Vivier d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL); 151*adf684ceSLaurent Vivier } 152*adf684ceSLaurent Vivier 153*adf684ceSLaurent Vivier int net_stream_data_client_connected(QIOTask *task, NetStreamData *d) 154*adf684ceSLaurent Vivier { 155*adf684ceSLaurent Vivier QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc); 156*adf684ceSLaurent Vivier SocketAddress *addr; 157*adf684ceSLaurent Vivier int ret; 158*adf684ceSLaurent Vivier Error *err = NULL; 159*adf684ceSLaurent Vivier 160*adf684ceSLaurent Vivier if (qio_task_propagate_error(task, &err)) { 161*adf684ceSLaurent Vivier qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err)); 162*adf684ceSLaurent Vivier error_free(err); 163*adf684ceSLaurent Vivier goto error; 164*adf684ceSLaurent Vivier } 165*adf684ceSLaurent Vivier 166*adf684ceSLaurent Vivier addr = qio_channel_socket_get_remote_address(sioc, NULL); 167*adf684ceSLaurent Vivier g_assert(addr != NULL); 168*adf684ceSLaurent Vivier 169*adf684ceSLaurent Vivier ret = qemu_socket_try_set_nonblock(sioc->fd); 170*adf684ceSLaurent Vivier if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 171*adf684ceSLaurent Vivier qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)", 172*adf684ceSLaurent Vivier addr->u.fd.str, -ret); 173*adf684ceSLaurent Vivier qapi_free_SocketAddress(addr); 174*adf684ceSLaurent Vivier goto error; 175*adf684ceSLaurent Vivier } 176*adf684ceSLaurent Vivier g_assert(ret == 0); 177*adf684ceSLaurent Vivier qapi_free_SocketAddress(addr); 178*adf684ceSLaurent Vivier 179*adf684ceSLaurent Vivier net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); 180*adf684ceSLaurent Vivier 181*adf684ceSLaurent Vivier /* Disable Nagle algorithm on TCP sockets to reduce latency */ 182*adf684ceSLaurent Vivier qio_channel_set_delay(d->ioc, false); 183*adf684ceSLaurent Vivier 184*adf684ceSLaurent Vivier d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL); 185*adf684ceSLaurent Vivier d->nc.link_down = false; 186*adf684ceSLaurent Vivier 187*adf684ceSLaurent Vivier return 0; 188*adf684ceSLaurent Vivier error: 189*adf684ceSLaurent Vivier object_unref(OBJECT(d->ioc)); 190*adf684ceSLaurent Vivier d->ioc = NULL; 191*adf684ceSLaurent Vivier 192*adf684ceSLaurent Vivier return -1; 193*adf684ceSLaurent Vivier } 194