1 /* 2 * net stream generic functions 3 * 4 * Copyright Red Hat 5 * 6 * SPDX-License-Identifier: GPL-2.0-or-later 7 */ 8 9 #include "qemu/osdep.h" 10 #include "qemu/iov.h" 11 #include "qapi/error.h" 12 #include "net/net.h" 13 #include "io/channel.h" 14 #include "io/net-listener.h" 15 16 #include "stream_data.h" 17 18 static gboolean net_stream_data_writable(QIOChannel *ioc, 19 GIOCondition condition, gpointer data) 20 { 21 NetStreamData *d = data; 22 23 d->ioc_write_tag = 0; 24 25 qemu_flush_queued_packets(&d->nc); 26 27 return G_SOURCE_REMOVE; 28 } 29 30 ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf, 31 size_t size) 32 { 33 uint32_t len = htonl(size); 34 struct iovec iov[] = { 35 { 36 .iov_base = &len, 37 .iov_len = sizeof(len), 38 }, { 39 .iov_base = (void *)buf, 40 .iov_len = size, 41 }, 42 }; 43 struct iovec local_iov[2]; 44 unsigned int nlocal_iov; 45 size_t remaining; 46 ssize_t ret; 47 48 remaining = iov_size(iov, 2) - d->send_index; 49 nlocal_iov = iov_copy(local_iov, 2, iov, 2, d->send_index, remaining); 50 ret = qio_channel_writev(d->ioc, local_iov, nlocal_iov, NULL); 51 if (ret == QIO_CHANNEL_ERR_BLOCK) { 52 ret = 0; /* handled further down */ 53 } 54 if (ret == -1) { 55 d->send_index = 0; 56 return -errno; 57 } 58 if (ret < (ssize_t)remaining) { 59 d->send_index += ret; 60 d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT, 61 net_stream_data_writable, d, 62 NULL); 63 return 0; 64 } 65 d->send_index = 0; 66 return size; 67 } 68 69 static void net_stream_data_send_completed(NetClientState *nc, ssize_t len) 70 { 71 NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc); 72 73 if (!d->ioc_read_tag) { 74 d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, 75 NULL); 76 } 77 } 78 79 void net_stream_data_rs_finalize(SocketReadState *rs) 80 { 81 NetStreamData *d = container_of(rs, NetStreamData, rs); 82 83 if (qemu_send_packet_async(&d->nc, rs->buf, 84 rs->packet_len, 85 net_stream_data_send_completed) == 0) { 86 if (d->ioc_read_tag) { 87 g_source_remove(d->ioc_read_tag); 88 d->ioc_read_tag = 0; 89 } 90 } 91 } 92 93 gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition, 94 NetStreamData *d) 95 { 96 int size; 97 int ret; 98 QEMU_UNINITIALIZED char buf1[NET_BUFSIZE]; 99 const char *buf; 100 101 size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL); 102 if (size < 0) { 103 if (errno != EWOULDBLOCK) { 104 goto eoc; 105 } 106 } else if (size == 0) { 107 /* end of connection */ 108 eoc: 109 d->ioc_read_tag = 0; 110 if (d->ioc_write_tag) { 111 g_source_remove(d->ioc_write_tag); 112 d->ioc_write_tag = 0; 113 } 114 if (d->listener) { 115 qemu_set_info_str(&d->nc, "listening"); 116 qio_net_listener_set_client_func(d->listener, 117 d->listen, d, NULL); 118 } 119 object_unref(OBJECT(d->ioc)); 120 d->ioc = NULL; 121 122 net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); 123 d->nc.link_down = true; 124 125 return G_SOURCE_REMOVE; 126 } 127 buf = buf1; 128 129 ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size); 130 131 if (ret == -1) { 132 goto eoc; 133 } 134 135 return G_SOURCE_CONTINUE; 136 } 137 138 void net_stream_data_listen(QIONetListener *listener, 139 QIOChannelSocket *cioc, 140 NetStreamData *d) 141 { 142 object_ref(OBJECT(cioc)); 143 144 qio_net_listener_set_client_func(d->listener, NULL, d, NULL); 145 146 d->ioc = QIO_CHANNEL(cioc); 147 qio_channel_set_name(d->ioc, "stream-server"); 148 d->nc.link_down = false; 149 150 d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL); 151 } 152 153 int net_stream_data_client_connected(QIOTask *task, NetStreamData *d) 154 { 155 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc); 156 SocketAddress *addr; 157 int ret; 158 Error *err = NULL; 159 160 if (qio_task_propagate_error(task, &err)) { 161 qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err)); 162 error_free(err); 163 goto error; 164 } 165 166 addr = qio_channel_socket_get_remote_address(sioc, NULL); 167 g_assert(addr != NULL); 168 169 ret = qemu_socket_try_set_nonblock(sioc->fd); 170 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 171 qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)", 172 addr->u.fd.str, -ret); 173 qapi_free_SocketAddress(addr); 174 goto error; 175 } 176 g_assert(ret == 0); 177 qapi_free_SocketAddress(addr); 178 179 net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); 180 181 /* Disable Nagle algorithm on TCP sockets to reduce latency */ 182 qio_channel_set_delay(d->ioc, false); 183 184 d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL); 185 d->nc.link_down = false; 186 187 return 0; 188 error: 189 object_unref(OBJECT(d->ioc)); 190 d->ioc = NULL; 191 192 return -1; 193 } 194