1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2022 Red Hat, Inc. 6 * 7 * Permission is hereby granted, free of charge, to any person obtaining a copy 8 * of this software and associated documentation files (the "Software"), to deal 9 * in the Software without restriction, including without limitation the rights 10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 * copies of the Software, and to permit persons to whom the Software is 12 * furnished to do so, subject to the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be included in 15 * all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23 * THE SOFTWARE. 24 */ 25 26 #include "qemu/osdep.h" 27 28 #include "net/net.h" 29 #include "clients.h" 30 #include "monitor/monitor.h" 31 #include "qapi/error.h" 32 #include "qemu/error-report.h" 33 #include "qemu/option.h" 34 #include "qemu/sockets.h" 35 #include "qemu/iov.h" 36 #include "qemu/main-loop.h" 37 #include "qemu/cutils.h" 38 #include "io/channel.h" 39 #include "io/channel-socket.h" 40 #include "io/net-listener.h" 41 #include "qapi/qapi-events-net.h" 42 43 typedef struct NetStreamState { 44 NetClientState nc; 45 QIOChannel *listen_ioc; 46 QIONetListener *listener; 47 QIOChannel *ioc; 48 guint ioc_read_tag; 49 guint ioc_write_tag; 50 SocketReadState rs; 51 unsigned int send_index; /* number of bytes sent*/ 52 } NetStreamState; 53 54 static void net_stream_listen(QIONetListener *listener, 55 QIOChannelSocket *cioc, 56 void *opaque); 57 58 static gboolean net_stream_writable(QIOChannel *ioc, 59 GIOCondition condition, 60 gpointer data) 61 { 62 NetStreamState *s = data; 63 64 s->ioc_write_tag = 0; 65 66 qemu_flush_queued_packets(&s->nc); 67 68 return G_SOURCE_REMOVE; 69 } 70 71 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf, 72 size_t size) 73 { 74 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 75 uint32_t len = htonl(size); 76 struct iovec iov[] = { 77 { 78 .iov_base = &len, 79 .iov_len = sizeof(len), 80 }, { 81 .iov_base = (void *)buf, 82 .iov_len = size, 83 }, 84 }; 85 struct iovec local_iov[2]; 86 unsigned int nlocal_iov; 87 size_t remaining; 88 ssize_t ret; 89 90 remaining = iov_size(iov, 2) - s->send_index; 91 nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining); 92 ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL); 93 if (ret == QIO_CHANNEL_ERR_BLOCK) { 94 ret = 0; /* handled further down */ 95 } 96 if (ret == -1) { 97 s->send_index = 0; 98 return -errno; 99 } 100 if (ret < (ssize_t)remaining) { 101 s->send_index += ret; 102 s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT, 103 net_stream_writable, s, NULL); 104 return 0; 105 } 106 s->send_index = 0; 107 return size; 108 } 109 110 static gboolean net_stream_send(QIOChannel *ioc, 111 GIOCondition condition, 112 gpointer data); 113 114 static void net_stream_send_completed(NetClientState *nc, ssize_t len) 115 { 116 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 117 118 if (!s->ioc_read_tag) { 119 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, 120 net_stream_send, s, NULL); 121 } 122 } 123 124 static void net_stream_rs_finalize(SocketReadState *rs) 125 { 126 NetStreamState *s = container_of(rs, NetStreamState, rs); 127 128 if (qemu_send_packet_async(&s->nc, rs->buf, 129 rs->packet_len, 130 net_stream_send_completed) == 0) { 131 if (s->ioc_read_tag) { 132 g_source_remove(s->ioc_read_tag); 133 s->ioc_read_tag = 0; 134 } 135 } 136 } 137 138 static gboolean net_stream_send(QIOChannel *ioc, 139 GIOCondition condition, 140 gpointer data) 141 { 142 NetStreamState *s = data; 143 int size; 144 int ret; 145 char buf1[NET_BUFSIZE]; 146 const char *buf; 147 148 size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL); 149 if (size < 0) { 150 if (errno != EWOULDBLOCK) { 151 goto eoc; 152 } 153 } else if (size == 0) { 154 /* end of connection */ 155 eoc: 156 s->ioc_read_tag = 0; 157 if (s->ioc_write_tag) { 158 g_source_remove(s->ioc_write_tag); 159 s->ioc_write_tag = 0; 160 } 161 if (s->listener) { 162 qio_net_listener_set_client_func(s->listener, net_stream_listen, 163 s, NULL); 164 } 165 object_unref(OBJECT(s->ioc)); 166 s->ioc = NULL; 167 168 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 169 s->nc.link_down = true; 170 qemu_set_info_str(&s->nc, "%s", ""); 171 172 qapi_event_send_netdev_stream_disconnected(s->nc.name); 173 174 return G_SOURCE_REMOVE; 175 } 176 buf = buf1; 177 178 ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size); 179 180 if (ret == -1) { 181 goto eoc; 182 } 183 184 return G_SOURCE_CONTINUE; 185 } 186 187 static void net_stream_cleanup(NetClientState *nc) 188 { 189 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 190 if (s->ioc) { 191 if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) { 192 if (s->ioc_read_tag) { 193 g_source_remove(s->ioc_read_tag); 194 s->ioc_read_tag = 0; 195 } 196 if (s->ioc_write_tag) { 197 g_source_remove(s->ioc_write_tag); 198 s->ioc_write_tag = 0; 199 } 200 } 201 object_unref(OBJECT(s->ioc)); 202 s->ioc = NULL; 203 } 204 if (s->listen_ioc) { 205 if (s->listener) { 206 qio_net_listener_disconnect(s->listener); 207 object_unref(OBJECT(s->listener)); 208 s->listener = NULL; 209 } 210 object_unref(OBJECT(s->listen_ioc)); 211 s->listen_ioc = NULL; 212 } 213 } 214 215 static NetClientInfo net_stream_info = { 216 .type = NET_CLIENT_DRIVER_STREAM, 217 .size = sizeof(NetStreamState), 218 .receive = net_stream_receive, 219 .cleanup = net_stream_cleanup, 220 }; 221 222 static void net_stream_listen(QIONetListener *listener, 223 QIOChannelSocket *cioc, 224 void *opaque) 225 { 226 NetStreamState *s = opaque; 227 SocketAddress *addr; 228 char *uri; 229 230 object_ref(OBJECT(cioc)); 231 232 qio_net_listener_set_client_func(s->listener, NULL, s, NULL); 233 234 s->ioc = QIO_CHANNEL(cioc); 235 qio_channel_set_name(s->ioc, "stream-server"); 236 s->nc.link_down = false; 237 238 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 239 s, NULL); 240 241 if (cioc->localAddr.ss_family == AF_UNIX) { 242 addr = qio_channel_socket_get_local_address(cioc, NULL); 243 } else { 244 addr = qio_channel_socket_get_remote_address(cioc, NULL); 245 } 246 g_assert(addr != NULL); 247 uri = socket_uri(addr); 248 qemu_set_info_str(&s->nc, "%s", uri); 249 g_free(uri); 250 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 251 qapi_free_SocketAddress(addr); 252 } 253 254 static void net_stream_server_listening(QIOTask *task, gpointer opaque) 255 { 256 NetStreamState *s = opaque; 257 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc); 258 SocketAddress *addr; 259 int ret; 260 261 if (listen_sioc->fd < 0) { 262 qemu_set_info_str(&s->nc, "connection error"); 263 return; 264 } 265 266 addr = qio_channel_socket_get_local_address(listen_sioc, NULL); 267 g_assert(addr != NULL); 268 ret = qemu_socket_try_set_nonblock(listen_sioc->fd); 269 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 270 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 271 addr->u.fd.str, -ret); 272 return; 273 } 274 g_assert(ret == 0); 275 qapi_free_SocketAddress(addr); 276 277 s->nc.link_down = true; 278 s->listener = qio_net_listener_new(); 279 280 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 281 qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL); 282 qio_net_listener_add(s->listener, listen_sioc); 283 } 284 285 static int net_stream_server_init(NetClientState *peer, 286 const char *model, 287 const char *name, 288 SocketAddress *addr, 289 Error **errp) 290 { 291 NetClientState *nc; 292 NetStreamState *s; 293 QIOChannelSocket *listen_sioc = qio_channel_socket_new(); 294 295 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 296 s = DO_UPCAST(NetStreamState, nc, nc); 297 298 s->listen_ioc = QIO_CHANNEL(listen_sioc); 299 qio_channel_socket_listen_async(listen_sioc, addr, 0, 300 net_stream_server_listening, s, 301 NULL, NULL); 302 303 return 0; 304 } 305 306 static void net_stream_client_connected(QIOTask *task, gpointer opaque) 307 { 308 NetStreamState *s = opaque; 309 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc); 310 SocketAddress *addr; 311 gchar *uri; 312 int ret; 313 314 if (sioc->fd < 0) { 315 qemu_set_info_str(&s->nc, "connection error"); 316 goto error; 317 } 318 319 addr = qio_channel_socket_get_remote_address(sioc, NULL); 320 g_assert(addr != NULL); 321 uri = socket_uri(addr); 322 qemu_set_info_str(&s->nc, "%s", uri); 323 g_free(uri); 324 325 ret = qemu_socket_try_set_nonblock(sioc->fd); 326 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 327 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 328 addr->u.fd.str, -ret); 329 qapi_free_SocketAddress(addr); 330 goto error; 331 } 332 g_assert(ret == 0); 333 334 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 335 336 /* Disable Nagle algorithm on TCP sockets to reduce latency */ 337 qio_channel_set_delay(s->ioc, false); 338 339 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 340 s, NULL); 341 s->nc.link_down = false; 342 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 343 qapi_free_SocketAddress(addr); 344 345 return; 346 error: 347 object_unref(OBJECT(s->ioc)); 348 s->ioc = NULL; 349 } 350 351 static int net_stream_client_init(NetClientState *peer, 352 const char *model, 353 const char *name, 354 SocketAddress *addr, 355 Error **errp) 356 { 357 NetStreamState *s; 358 NetClientState *nc; 359 QIOChannelSocket *sioc = qio_channel_socket_new(); 360 361 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 362 s = DO_UPCAST(NetStreamState, nc, nc); 363 364 s->ioc = QIO_CHANNEL(sioc); 365 s->nc.link_down = true; 366 367 qio_channel_socket_connect_async(sioc, addr, 368 net_stream_client_connected, s, 369 NULL, NULL); 370 371 return 0; 372 } 373 374 int net_init_stream(const Netdev *netdev, const char *name, 375 NetClientState *peer, Error **errp) 376 { 377 const NetdevStreamOptions *sock; 378 379 assert(netdev->type == NET_CLIENT_DRIVER_STREAM); 380 sock = &netdev->u.stream; 381 382 if (!sock->has_server || !sock->server) { 383 return net_stream_client_init(peer, "stream", name, sock->addr, errp); 384 } 385 return net_stream_server_init(peer, "stream", name, sock->addr, errp); 386 } 387