1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2022 Red Hat, Inc. 6 * 7 * Permission is hereby granted, free of charge, to any person obtaining a copy 8 * of this software and associated documentation files (the "Software"), to deal 9 * in the Software without restriction, including without limitation the rights 10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 * copies of the Software, and to permit persons to whom the Software is 12 * furnished to do so, subject to the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be included in 15 * all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23 * THE SOFTWARE. 24 */ 25 26 #include "qemu/osdep.h" 27 28 #include "net/net.h" 29 #include "clients.h" 30 #include "monitor/monitor.h" 31 #include "qapi/error.h" 32 #include "qemu/error-report.h" 33 #include "qemu/option.h" 34 #include "qemu/sockets.h" 35 #include "qemu/iov.h" 36 #include "qemu/main-loop.h" 37 #include "qemu/cutils.h" 38 #include "io/channel.h" 39 #include "io/channel-socket.h" 40 #include "io/net-listener.h" 41 #include "qapi/qapi-events-net.h" 42 #include "qapi/qapi-visit-sockets.h" 43 #include "qapi/clone-visitor.h" 44 45 typedef struct NetStreamState { 46 NetClientState nc; 47 QIOChannel *listen_ioc; 48 QIONetListener *listener; 49 QIOChannel *ioc; 50 guint ioc_read_tag; 51 guint ioc_write_tag; 52 SocketReadState rs; 53 unsigned int send_index; /* number of bytes sent*/ 54 uint32_t reconnect; 55 guint timer_tag; 56 SocketAddress *addr; 57 } NetStreamState; 58 59 static void net_stream_listen(QIONetListener *listener, 60 QIOChannelSocket *cioc, 61 void *opaque); 62 static void net_stream_arm_reconnect(NetStreamState *s); 63 64 static gboolean net_stream_writable(QIOChannel *ioc, 65 GIOCondition condition, 66 gpointer data) 67 { 68 NetStreamState *s = data; 69 70 s->ioc_write_tag = 0; 71 72 qemu_flush_queued_packets(&s->nc); 73 74 return G_SOURCE_REMOVE; 75 } 76 77 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf, 78 size_t size) 79 { 80 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 81 uint32_t len = htonl(size); 82 struct iovec iov[] = { 83 { 84 .iov_base = &len, 85 .iov_len = sizeof(len), 86 }, { 87 .iov_base = (void *)buf, 88 .iov_len = size, 89 }, 90 }; 91 struct iovec local_iov[2]; 92 unsigned int nlocal_iov; 93 size_t remaining; 94 ssize_t ret; 95 96 remaining = iov_size(iov, 2) - s->send_index; 97 nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining); 98 ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL); 99 if (ret == QIO_CHANNEL_ERR_BLOCK) { 100 ret = 0; /* handled further down */ 101 } 102 if (ret == -1) { 103 s->send_index = 0; 104 return -errno; 105 } 106 if (ret < (ssize_t)remaining) { 107 s->send_index += ret; 108 s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT, 109 net_stream_writable, s, NULL); 110 return 0; 111 } 112 s->send_index = 0; 113 return size; 114 } 115 116 static gboolean net_stream_send(QIOChannel *ioc, 117 GIOCondition condition, 118 gpointer data); 119 120 static void net_stream_send_completed(NetClientState *nc, ssize_t len) 121 { 122 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 123 124 if (!s->ioc_read_tag) { 125 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, 126 net_stream_send, s, NULL); 127 } 128 } 129 130 static void net_stream_rs_finalize(SocketReadState *rs) 131 { 132 NetStreamState *s = container_of(rs, NetStreamState, rs); 133 134 if (qemu_send_packet_async(&s->nc, rs->buf, 135 rs->packet_len, 136 net_stream_send_completed) == 0) { 137 if (s->ioc_read_tag) { 138 g_source_remove(s->ioc_read_tag); 139 s->ioc_read_tag = 0; 140 } 141 } 142 } 143 144 static gboolean net_stream_send(QIOChannel *ioc, 145 GIOCondition condition, 146 gpointer data) 147 { 148 NetStreamState *s = data; 149 int size; 150 int ret; 151 char buf1[NET_BUFSIZE]; 152 const char *buf; 153 154 size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL); 155 if (size < 0) { 156 if (errno != EWOULDBLOCK) { 157 goto eoc; 158 } 159 } else if (size == 0) { 160 /* end of connection */ 161 eoc: 162 s->ioc_read_tag = 0; 163 if (s->ioc_write_tag) { 164 g_source_remove(s->ioc_write_tag); 165 s->ioc_write_tag = 0; 166 } 167 if (s->listener) { 168 qio_net_listener_set_client_func(s->listener, net_stream_listen, 169 s, NULL); 170 } 171 object_unref(OBJECT(s->ioc)); 172 s->ioc = NULL; 173 174 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 175 s->nc.link_down = true; 176 qemu_set_info_str(&s->nc, "%s", ""); 177 178 qapi_event_send_netdev_stream_disconnected(s->nc.name); 179 net_stream_arm_reconnect(s); 180 181 return G_SOURCE_REMOVE; 182 } 183 buf = buf1; 184 185 ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size); 186 187 if (ret == -1) { 188 goto eoc; 189 } 190 191 return G_SOURCE_CONTINUE; 192 } 193 194 static void net_stream_cleanup(NetClientState *nc) 195 { 196 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 197 if (s->timer_tag) { 198 g_source_remove(s->timer_tag); 199 s->timer_tag = 0; 200 } 201 if (s->addr) { 202 qapi_free_SocketAddress(s->addr); 203 s->addr = NULL; 204 } 205 if (s->ioc) { 206 if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) { 207 if (s->ioc_read_tag) { 208 g_source_remove(s->ioc_read_tag); 209 s->ioc_read_tag = 0; 210 } 211 if (s->ioc_write_tag) { 212 g_source_remove(s->ioc_write_tag); 213 s->ioc_write_tag = 0; 214 } 215 } 216 object_unref(OBJECT(s->ioc)); 217 s->ioc = NULL; 218 } 219 if (s->listen_ioc) { 220 if (s->listener) { 221 qio_net_listener_disconnect(s->listener); 222 object_unref(OBJECT(s->listener)); 223 s->listener = NULL; 224 } 225 object_unref(OBJECT(s->listen_ioc)); 226 s->listen_ioc = NULL; 227 } 228 } 229 230 static NetClientInfo net_stream_info = { 231 .type = NET_CLIENT_DRIVER_STREAM, 232 .size = sizeof(NetStreamState), 233 .receive = net_stream_receive, 234 .cleanup = net_stream_cleanup, 235 }; 236 237 static void net_stream_listen(QIONetListener *listener, 238 QIOChannelSocket *cioc, 239 void *opaque) 240 { 241 NetStreamState *s = opaque; 242 SocketAddress *addr; 243 char *uri; 244 245 object_ref(OBJECT(cioc)); 246 247 qio_net_listener_set_client_func(s->listener, NULL, s, NULL); 248 249 s->ioc = QIO_CHANNEL(cioc); 250 qio_channel_set_name(s->ioc, "stream-server"); 251 s->nc.link_down = false; 252 253 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 254 s, NULL); 255 256 if (cioc->localAddr.ss_family == AF_UNIX) { 257 addr = qio_channel_socket_get_local_address(cioc, NULL); 258 } else { 259 addr = qio_channel_socket_get_remote_address(cioc, NULL); 260 } 261 g_assert(addr != NULL); 262 uri = socket_uri(addr); 263 qemu_set_info_str(&s->nc, "%s", uri); 264 g_free(uri); 265 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 266 qapi_free_SocketAddress(addr); 267 } 268 269 static void net_stream_server_listening(QIOTask *task, gpointer opaque) 270 { 271 NetStreamState *s = opaque; 272 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc); 273 SocketAddress *addr; 274 int ret; 275 276 if (listen_sioc->fd < 0) { 277 qemu_set_info_str(&s->nc, "connection error"); 278 return; 279 } 280 281 addr = qio_channel_socket_get_local_address(listen_sioc, NULL); 282 g_assert(addr != NULL); 283 ret = qemu_socket_try_set_nonblock(listen_sioc->fd); 284 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 285 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 286 addr->u.fd.str, -ret); 287 return; 288 } 289 g_assert(ret == 0); 290 qapi_free_SocketAddress(addr); 291 292 s->nc.link_down = true; 293 s->listener = qio_net_listener_new(); 294 295 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 296 qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL); 297 qio_net_listener_add(s->listener, listen_sioc); 298 } 299 300 static int net_stream_server_init(NetClientState *peer, 301 const char *model, 302 const char *name, 303 SocketAddress *addr, 304 Error **errp) 305 { 306 NetClientState *nc; 307 NetStreamState *s; 308 QIOChannelSocket *listen_sioc = qio_channel_socket_new(); 309 310 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 311 s = DO_UPCAST(NetStreamState, nc, nc); 312 313 s->listen_ioc = QIO_CHANNEL(listen_sioc); 314 qio_channel_socket_listen_async(listen_sioc, addr, 0, 315 net_stream_server_listening, s, 316 NULL, NULL); 317 318 return 0; 319 } 320 321 static void net_stream_client_connected(QIOTask *task, gpointer opaque) 322 { 323 NetStreamState *s = opaque; 324 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc); 325 SocketAddress *addr; 326 gchar *uri; 327 int ret; 328 329 if (sioc->fd < 0) { 330 qemu_set_info_str(&s->nc, "connection error"); 331 goto error; 332 } 333 334 addr = qio_channel_socket_get_remote_address(sioc, NULL); 335 g_assert(addr != NULL); 336 uri = socket_uri(addr); 337 qemu_set_info_str(&s->nc, "%s", uri); 338 g_free(uri); 339 340 ret = qemu_socket_try_set_nonblock(sioc->fd); 341 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 342 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 343 addr->u.fd.str, -ret); 344 qapi_free_SocketAddress(addr); 345 goto error; 346 } 347 g_assert(ret == 0); 348 349 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 350 351 /* Disable Nagle algorithm on TCP sockets to reduce latency */ 352 qio_channel_set_delay(s->ioc, false); 353 354 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 355 s, NULL); 356 s->nc.link_down = false; 357 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 358 qapi_free_SocketAddress(addr); 359 360 return; 361 error: 362 object_unref(OBJECT(s->ioc)); 363 s->ioc = NULL; 364 net_stream_arm_reconnect(s); 365 } 366 367 static gboolean net_stream_reconnect(gpointer data) 368 { 369 NetStreamState *s = data; 370 QIOChannelSocket *sioc; 371 372 s->timer_tag = 0; 373 374 sioc = qio_channel_socket_new(); 375 s->ioc = QIO_CHANNEL(sioc); 376 qio_channel_socket_connect_async(sioc, s->addr, 377 net_stream_client_connected, s, 378 NULL, NULL); 379 return G_SOURCE_REMOVE; 380 } 381 382 static void net_stream_arm_reconnect(NetStreamState *s) 383 { 384 if (s->reconnect && s->timer_tag == 0) { 385 s->timer_tag = g_timeout_add_seconds(s->reconnect, 386 net_stream_reconnect, s); 387 } 388 } 389 390 static int net_stream_client_init(NetClientState *peer, 391 const char *model, 392 const char *name, 393 SocketAddress *addr, 394 uint32_t reconnect, 395 Error **errp) 396 { 397 NetStreamState *s; 398 NetClientState *nc; 399 QIOChannelSocket *sioc = qio_channel_socket_new(); 400 401 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 402 s = DO_UPCAST(NetStreamState, nc, nc); 403 404 s->ioc = QIO_CHANNEL(sioc); 405 s->nc.link_down = true; 406 407 s->reconnect = reconnect; 408 if (reconnect) { 409 s->addr = QAPI_CLONE(SocketAddress, addr); 410 } 411 qio_channel_socket_connect_async(sioc, addr, 412 net_stream_client_connected, s, 413 NULL, NULL); 414 415 return 0; 416 } 417 418 int net_init_stream(const Netdev *netdev, const char *name, 419 NetClientState *peer, Error **errp) 420 { 421 const NetdevStreamOptions *sock; 422 423 assert(netdev->type == NET_CLIENT_DRIVER_STREAM); 424 sock = &netdev->u.stream; 425 426 if (!sock->has_server || !sock->server) { 427 return net_stream_client_init(peer, "stream", name, sock->addr, 428 sock->has_reconnect ? sock->reconnect : 0, 429 errp); 430 } 431 if (sock->has_reconnect) { 432 error_setg(errp, "'reconnect' option is incompatible with " 433 "socket in server mode"); 434 return -1; 435 } 436 return net_stream_server_init(peer, "stream", name, sock->addr, errp); 437 } 438