1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2022 Red Hat, Inc. 6 * 7 * Permission is hereby granted, free of charge, to any person obtaining a copy 8 * of this software and associated documentation files (the "Software"), to deal 9 * in the Software without restriction, including without limitation the rights 10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 * copies of the Software, and to permit persons to whom the Software is 12 * furnished to do so, subject to the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be included in 15 * all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23 * THE SOFTWARE. 24 */ 25 26 #include "qemu/osdep.h" 27 28 #include "net/net.h" 29 #include "clients.h" 30 #include "monitor/monitor.h" 31 #include "qapi/error.h" 32 #include "qemu/error-report.h" 33 #include "qemu/option.h" 34 #include "qemu/sockets.h" 35 #include "qemu/iov.h" 36 #include "qemu/main-loop.h" 37 #include "qemu/cutils.h" 38 #include "io/channel.h" 39 #include "io/channel-socket.h" 40 #include "io/net-listener.h" 41 #include "qapi/qapi-events-net.h" 42 #include "qapi/qapi-visit-sockets.h" 43 #include "qapi/clone-visitor.h" 44 45 typedef struct NetStreamState { 46 NetClientState nc; 47 QIOChannel *listen_ioc; 48 QIONetListener *listener; 49 QIOChannel *ioc; 50 guint ioc_read_tag; 51 guint ioc_write_tag; 52 SocketReadState rs; 53 unsigned int send_index; /* number of bytes sent*/ 54 uint32_t reconnect_ms; 55 guint timer_tag; 56 SocketAddress *addr; 57 } NetStreamState; 58 59 static void net_stream_listen(QIONetListener *listener, 60 QIOChannelSocket *cioc, 61 void *opaque); 62 static void net_stream_arm_reconnect(NetStreamState *s); 63 64 static gboolean net_stream_writable(QIOChannel *ioc, 65 GIOCondition condition, 66 gpointer data) 67 { 68 NetStreamState *s = data; 69 70 s->ioc_write_tag = 0; 71 72 qemu_flush_queued_packets(&s->nc); 73 74 return G_SOURCE_REMOVE; 75 } 76 77 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf, 78 size_t size) 79 { 80 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 81 uint32_t len = htonl(size); 82 struct iovec iov[] = { 83 { 84 .iov_base = &len, 85 .iov_len = sizeof(len), 86 }, { 87 .iov_base = (void *)buf, 88 .iov_len = size, 89 }, 90 }; 91 struct iovec local_iov[2]; 92 unsigned int nlocal_iov; 93 size_t remaining; 94 ssize_t ret; 95 96 remaining = iov_size(iov, 2) - s->send_index; 97 nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining); 98 ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL); 99 if (ret == QIO_CHANNEL_ERR_BLOCK) { 100 ret = 0; /* handled further down */ 101 } 102 if (ret == -1) { 103 s->send_index = 0; 104 return -errno; 105 } 106 if (ret < (ssize_t)remaining) { 107 s->send_index += ret; 108 s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT, 109 net_stream_writable, s, NULL); 110 return 0; 111 } 112 s->send_index = 0; 113 return size; 114 } 115 116 static gboolean net_stream_send(QIOChannel *ioc, 117 GIOCondition condition, 118 gpointer data); 119 120 static void net_stream_send_completed(NetClientState *nc, ssize_t len) 121 { 122 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 123 124 if (!s->ioc_read_tag) { 125 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, 126 net_stream_send, s, NULL); 127 } 128 } 129 130 static void net_stream_rs_finalize(SocketReadState *rs) 131 { 132 NetStreamState *s = container_of(rs, NetStreamState, rs); 133 134 if (qemu_send_packet_async(&s->nc, rs->buf, 135 rs->packet_len, 136 net_stream_send_completed) == 0) { 137 if (s->ioc_read_tag) { 138 g_source_remove(s->ioc_read_tag); 139 s->ioc_read_tag = 0; 140 } 141 } 142 } 143 144 static gboolean net_stream_send(QIOChannel *ioc, 145 GIOCondition condition, 146 gpointer data) 147 { 148 NetStreamState *s = data; 149 int size; 150 int ret; 151 char buf1[NET_BUFSIZE]; 152 const char *buf; 153 154 size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL); 155 if (size < 0) { 156 if (errno != EWOULDBLOCK) { 157 goto eoc; 158 } 159 } else if (size == 0) { 160 /* end of connection */ 161 eoc: 162 s->ioc_read_tag = 0; 163 if (s->ioc_write_tag) { 164 g_source_remove(s->ioc_write_tag); 165 s->ioc_write_tag = 0; 166 } 167 if (s->listener) { 168 qemu_set_info_str(&s->nc, "listening"); 169 qio_net_listener_set_client_func(s->listener, net_stream_listen, 170 s, NULL); 171 } 172 object_unref(OBJECT(s->ioc)); 173 s->ioc = NULL; 174 175 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 176 s->nc.link_down = true; 177 178 qapi_event_send_netdev_stream_disconnected(s->nc.name); 179 net_stream_arm_reconnect(s); 180 181 return G_SOURCE_REMOVE; 182 } 183 buf = buf1; 184 185 ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size); 186 187 if (ret == -1) { 188 goto eoc; 189 } 190 191 return G_SOURCE_CONTINUE; 192 } 193 194 static void net_stream_cleanup(NetClientState *nc) 195 { 196 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 197 if (s->timer_tag) { 198 g_source_remove(s->timer_tag); 199 s->timer_tag = 0; 200 } 201 if (s->addr) { 202 qapi_free_SocketAddress(s->addr); 203 s->addr = NULL; 204 } 205 if (s->ioc) { 206 if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) { 207 if (s->ioc_read_tag) { 208 g_source_remove(s->ioc_read_tag); 209 s->ioc_read_tag = 0; 210 } 211 if (s->ioc_write_tag) { 212 g_source_remove(s->ioc_write_tag); 213 s->ioc_write_tag = 0; 214 } 215 } 216 object_unref(OBJECT(s->ioc)); 217 s->ioc = NULL; 218 } 219 if (s->listen_ioc) { 220 if (s->listener) { 221 qio_net_listener_disconnect(s->listener); 222 object_unref(OBJECT(s->listener)); 223 s->listener = NULL; 224 } 225 object_unref(OBJECT(s->listen_ioc)); 226 s->listen_ioc = NULL; 227 } 228 } 229 230 static NetClientInfo net_stream_info = { 231 .type = NET_CLIENT_DRIVER_STREAM, 232 .size = sizeof(NetStreamState), 233 .receive = net_stream_receive, 234 .cleanup = net_stream_cleanup, 235 }; 236 237 static void net_stream_listen(QIONetListener *listener, 238 QIOChannelSocket *cioc, 239 void *opaque) 240 { 241 NetStreamState *s = opaque; 242 SocketAddress *addr; 243 char *uri; 244 245 object_ref(OBJECT(cioc)); 246 247 qio_net_listener_set_client_func(s->listener, NULL, s, NULL); 248 249 s->ioc = QIO_CHANNEL(cioc); 250 qio_channel_set_name(s->ioc, "stream-server"); 251 s->nc.link_down = false; 252 253 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 254 s, NULL); 255 256 if (cioc->localAddr.ss_family == AF_UNIX) { 257 addr = qio_channel_socket_get_local_address(cioc, NULL); 258 } else { 259 addr = qio_channel_socket_get_remote_address(cioc, NULL); 260 } 261 g_assert(addr != NULL); 262 uri = socket_uri(addr); 263 qemu_set_info_str(&s->nc, "%s", uri); 264 g_free(uri); 265 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 266 qapi_free_SocketAddress(addr); 267 } 268 269 static void net_stream_server_listening(QIOTask *task, gpointer opaque) 270 { 271 NetStreamState *s = opaque; 272 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc); 273 SocketAddress *addr; 274 int ret; 275 Error *err = NULL; 276 277 if (qio_task_propagate_error(task, &err)) { 278 qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err)); 279 error_free(err); 280 return; 281 } 282 283 addr = qio_channel_socket_get_local_address(listen_sioc, NULL); 284 g_assert(addr != NULL); 285 ret = qemu_socket_try_set_nonblock(listen_sioc->fd); 286 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 287 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 288 addr->u.fd.str, -ret); 289 return; 290 } 291 g_assert(ret == 0); 292 qapi_free_SocketAddress(addr); 293 294 s->nc.link_down = true; 295 s->listener = qio_net_listener_new(); 296 297 qemu_set_info_str(&s->nc, "listening"); 298 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 299 qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL); 300 qio_net_listener_add(s->listener, listen_sioc); 301 } 302 303 static int net_stream_server_init(NetClientState *peer, 304 const char *model, 305 const char *name, 306 SocketAddress *addr, 307 Error **errp) 308 { 309 NetClientState *nc; 310 NetStreamState *s; 311 QIOChannelSocket *listen_sioc = qio_channel_socket_new(); 312 313 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 314 s = DO_UPCAST(NetStreamState, nc, nc); 315 qemu_set_info_str(&s->nc, "initializing"); 316 317 s->listen_ioc = QIO_CHANNEL(listen_sioc); 318 qio_channel_socket_listen_async(listen_sioc, addr, 0, 319 net_stream_server_listening, s, 320 NULL, NULL); 321 322 return 0; 323 } 324 325 static void net_stream_client_connected(QIOTask *task, gpointer opaque) 326 { 327 NetStreamState *s = opaque; 328 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc); 329 SocketAddress *addr; 330 gchar *uri; 331 int ret; 332 Error *err = NULL; 333 334 if (qio_task_propagate_error(task, &err)) { 335 qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err)); 336 error_free(err); 337 goto error; 338 } 339 340 addr = qio_channel_socket_get_remote_address(sioc, NULL); 341 g_assert(addr != NULL); 342 uri = socket_uri(addr); 343 qemu_set_info_str(&s->nc, "%s", uri); 344 g_free(uri); 345 346 ret = qemu_socket_try_set_nonblock(sioc->fd); 347 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 348 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 349 addr->u.fd.str, -ret); 350 qapi_free_SocketAddress(addr); 351 goto error; 352 } 353 g_assert(ret == 0); 354 355 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 356 357 /* Disable Nagle algorithm on TCP sockets to reduce latency */ 358 qio_channel_set_delay(s->ioc, false); 359 360 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 361 s, NULL); 362 s->nc.link_down = false; 363 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 364 qapi_free_SocketAddress(addr); 365 366 return; 367 error: 368 object_unref(OBJECT(s->ioc)); 369 s->ioc = NULL; 370 net_stream_arm_reconnect(s); 371 } 372 373 static gboolean net_stream_reconnect(gpointer data) 374 { 375 NetStreamState *s = data; 376 QIOChannelSocket *sioc; 377 378 s->timer_tag = 0; 379 380 sioc = qio_channel_socket_new(); 381 s->ioc = QIO_CHANNEL(sioc); 382 qio_channel_socket_connect_async(sioc, s->addr, 383 net_stream_client_connected, s, 384 NULL, NULL); 385 return G_SOURCE_REMOVE; 386 } 387 388 static void net_stream_arm_reconnect(NetStreamState *s) 389 { 390 if (s->reconnect_ms && s->timer_tag == 0) { 391 qemu_set_info_str(&s->nc, "connecting"); 392 s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s); 393 } 394 } 395 396 static int net_stream_client_init(NetClientState *peer, 397 const char *model, 398 const char *name, 399 SocketAddress *addr, 400 uint32_t reconnect_ms, 401 Error **errp) 402 { 403 NetStreamState *s; 404 NetClientState *nc; 405 QIOChannelSocket *sioc = qio_channel_socket_new(); 406 407 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 408 s = DO_UPCAST(NetStreamState, nc, nc); 409 qemu_set_info_str(&s->nc, "connecting"); 410 411 s->ioc = QIO_CHANNEL(sioc); 412 s->nc.link_down = true; 413 414 s->reconnect_ms = reconnect_ms; 415 if (reconnect_ms) { 416 s->addr = QAPI_CLONE(SocketAddress, addr); 417 } 418 qio_channel_socket_connect_async(sioc, addr, 419 net_stream_client_connected, s, 420 NULL, NULL); 421 422 return 0; 423 } 424 425 int net_init_stream(const Netdev *netdev, const char *name, 426 NetClientState *peer, Error **errp) 427 { 428 const NetdevStreamOptions *sock; 429 430 assert(netdev->type == NET_CLIENT_DRIVER_STREAM); 431 sock = &netdev->u.stream; 432 433 if (!sock->has_server || !sock->server) { 434 uint32_t reconnect_ms = 0; 435 436 if (sock->has_reconnect && sock->has_reconnect_ms) { 437 error_setg(errp, "'reconnect' and 'reconnect-ms' are mutually " 438 "exclusive"); 439 return -1; 440 } else if (sock->has_reconnect_ms) { 441 reconnect_ms = sock->reconnect_ms; 442 } else if (sock->has_reconnect) { 443 reconnect_ms = sock->reconnect * 1000u; 444 } 445 446 return net_stream_client_init(peer, "stream", name, sock->addr, 447 reconnect_ms, errp); 448 } 449 if (sock->has_reconnect || sock->has_reconnect_ms) { 450 error_setg(errp, "'reconnect' and 'reconnect-ms' options are " 451 "incompatible with socket in server mode"); 452 return -1; 453 } 454 return net_stream_server_init(peer, "stream", name, sock->addr, errp); 455 } 456