1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2022 Red Hat, Inc. 6 * 7 * Permission is hereby granted, free of charge, to any person obtaining a copy 8 * of this software and associated documentation files (the "Software"), to deal 9 * in the Software without restriction, including without limitation the rights 10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 * copies of the Software, and to permit persons to whom the Software is 12 * furnished to do so, subject to the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be included in 15 * all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23 * THE SOFTWARE. 24 */ 25 26 #include "qemu/osdep.h" 27 28 #include "net/net.h" 29 #include "clients.h" 30 #include "monitor/monitor.h" 31 #include "qapi/error.h" 32 #include "qemu/error-report.h" 33 #include "qemu/option.h" 34 #include "qemu/sockets.h" 35 #include "qemu/iov.h" 36 #include "qemu/main-loop.h" 37 #include "qemu/cutils.h" 38 #include "io/channel.h" 39 #include "io/channel-socket.h" 40 #include "io/net-listener.h" 41 #include "qapi/qapi-events-net.h" 42 #include "qapi/qapi-visit-sockets.h" 43 #include "qapi/clone-visitor.h" 44 45 typedef struct NetStreamState { 46 NetClientState nc; 47 QIOChannel *listen_ioc; 48 QIONetListener *listener; 49 QIOChannel *ioc; 50 guint ioc_read_tag; 51 guint ioc_write_tag; 52 SocketReadState rs; 53 unsigned int send_index; /* number of bytes sent*/ 54 uint32_t reconnect; 55 guint timer_tag; 56 SocketAddress *addr; 57 } NetStreamState; 58 59 static void net_stream_listen(QIONetListener *listener, 60 QIOChannelSocket *cioc, 61 void *opaque); 62 static void net_stream_arm_reconnect(NetStreamState *s); 63 64 static gboolean net_stream_writable(QIOChannel *ioc, 65 GIOCondition condition, 66 gpointer data) 67 { 68 NetStreamState *s = data; 69 70 s->ioc_write_tag = 0; 71 72 qemu_flush_queued_packets(&s->nc); 73 74 return G_SOURCE_REMOVE; 75 } 76 77 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf, 78 size_t size) 79 { 80 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 81 uint32_t len = htonl(size); 82 struct iovec iov[] = { 83 { 84 .iov_base = &len, 85 .iov_len = sizeof(len), 86 }, { 87 .iov_base = (void *)buf, 88 .iov_len = size, 89 }, 90 }; 91 struct iovec local_iov[2]; 92 unsigned int nlocal_iov; 93 size_t remaining; 94 ssize_t ret; 95 96 remaining = iov_size(iov, 2) - s->send_index; 97 nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining); 98 ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL); 99 if (ret == QIO_CHANNEL_ERR_BLOCK) { 100 ret = 0; /* handled further down */ 101 } 102 if (ret == -1) { 103 s->send_index = 0; 104 return -errno; 105 } 106 if (ret < (ssize_t)remaining) { 107 s->send_index += ret; 108 s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT, 109 net_stream_writable, s, NULL); 110 return 0; 111 } 112 s->send_index = 0; 113 return size; 114 } 115 116 static gboolean net_stream_send(QIOChannel *ioc, 117 GIOCondition condition, 118 gpointer data); 119 120 static void net_stream_send_completed(NetClientState *nc, ssize_t len) 121 { 122 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 123 124 if (!s->ioc_read_tag) { 125 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, 126 net_stream_send, s, NULL); 127 } 128 } 129 130 static void net_stream_rs_finalize(SocketReadState *rs) 131 { 132 NetStreamState *s = container_of(rs, NetStreamState, rs); 133 134 if (qemu_send_packet_async(&s->nc, rs->buf, 135 rs->packet_len, 136 net_stream_send_completed) == 0) { 137 if (s->ioc_read_tag) { 138 g_source_remove(s->ioc_read_tag); 139 s->ioc_read_tag = 0; 140 } 141 } 142 } 143 144 static gboolean net_stream_send(QIOChannel *ioc, 145 GIOCondition condition, 146 gpointer data) 147 { 148 NetStreamState *s = data; 149 int size; 150 int ret; 151 char buf1[NET_BUFSIZE]; 152 const char *buf; 153 154 size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL); 155 if (size < 0) { 156 if (errno != EWOULDBLOCK) { 157 goto eoc; 158 } 159 } else if (size == 0) { 160 /* end of connection */ 161 eoc: 162 s->ioc_read_tag = 0; 163 if (s->ioc_write_tag) { 164 g_source_remove(s->ioc_write_tag); 165 s->ioc_write_tag = 0; 166 } 167 if (s->listener) { 168 qio_net_listener_set_client_func(s->listener, net_stream_listen, 169 s, NULL); 170 } 171 object_unref(OBJECT(s->ioc)); 172 s->ioc = NULL; 173 174 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 175 s->nc.link_down = true; 176 qemu_set_info_str(&s->nc, "listening"); 177 178 qapi_event_send_netdev_stream_disconnected(s->nc.name); 179 net_stream_arm_reconnect(s); 180 181 return G_SOURCE_REMOVE; 182 } 183 buf = buf1; 184 185 ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size); 186 187 if (ret == -1) { 188 goto eoc; 189 } 190 191 return G_SOURCE_CONTINUE; 192 } 193 194 static void net_stream_cleanup(NetClientState *nc) 195 { 196 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); 197 if (s->timer_tag) { 198 g_source_remove(s->timer_tag); 199 s->timer_tag = 0; 200 } 201 if (s->addr) { 202 qapi_free_SocketAddress(s->addr); 203 s->addr = NULL; 204 } 205 if (s->ioc) { 206 if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) { 207 if (s->ioc_read_tag) { 208 g_source_remove(s->ioc_read_tag); 209 s->ioc_read_tag = 0; 210 } 211 if (s->ioc_write_tag) { 212 g_source_remove(s->ioc_write_tag); 213 s->ioc_write_tag = 0; 214 } 215 } 216 object_unref(OBJECT(s->ioc)); 217 s->ioc = NULL; 218 } 219 if (s->listen_ioc) { 220 if (s->listener) { 221 qio_net_listener_disconnect(s->listener); 222 object_unref(OBJECT(s->listener)); 223 s->listener = NULL; 224 } 225 object_unref(OBJECT(s->listen_ioc)); 226 s->listen_ioc = NULL; 227 } 228 } 229 230 static NetClientInfo net_stream_info = { 231 .type = NET_CLIENT_DRIVER_STREAM, 232 .size = sizeof(NetStreamState), 233 .receive = net_stream_receive, 234 .cleanup = net_stream_cleanup, 235 }; 236 237 static void net_stream_listen(QIONetListener *listener, 238 QIOChannelSocket *cioc, 239 void *opaque) 240 { 241 NetStreamState *s = opaque; 242 SocketAddress *addr; 243 char *uri; 244 245 object_ref(OBJECT(cioc)); 246 247 qio_net_listener_set_client_func(s->listener, NULL, s, NULL); 248 249 s->ioc = QIO_CHANNEL(cioc); 250 qio_channel_set_name(s->ioc, "stream-server"); 251 s->nc.link_down = false; 252 253 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 254 s, NULL); 255 256 if (cioc->localAddr.ss_family == AF_UNIX) { 257 addr = qio_channel_socket_get_local_address(cioc, NULL); 258 } else { 259 addr = qio_channel_socket_get_remote_address(cioc, NULL); 260 } 261 g_assert(addr != NULL); 262 uri = socket_uri(addr); 263 qemu_set_info_str(&s->nc, "%s", uri); 264 g_free(uri); 265 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 266 qapi_free_SocketAddress(addr); 267 } 268 269 static void net_stream_server_listening(QIOTask *task, gpointer opaque) 270 { 271 NetStreamState *s = opaque; 272 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc); 273 SocketAddress *addr; 274 int ret; 275 276 if (listen_sioc->fd < 0) { 277 qemu_set_info_str(&s->nc, "connection error"); 278 return; 279 } 280 281 addr = qio_channel_socket_get_local_address(listen_sioc, NULL); 282 g_assert(addr != NULL); 283 ret = qemu_socket_try_set_nonblock(listen_sioc->fd); 284 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 285 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 286 addr->u.fd.str, -ret); 287 return; 288 } 289 g_assert(ret == 0); 290 qapi_free_SocketAddress(addr); 291 292 s->nc.link_down = true; 293 s->listener = qio_net_listener_new(); 294 295 qemu_set_info_str(&s->nc, "listening"); 296 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 297 qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL); 298 qio_net_listener_add(s->listener, listen_sioc); 299 } 300 301 static int net_stream_server_init(NetClientState *peer, 302 const char *model, 303 const char *name, 304 SocketAddress *addr, 305 Error **errp) 306 { 307 NetClientState *nc; 308 NetStreamState *s; 309 QIOChannelSocket *listen_sioc = qio_channel_socket_new(); 310 311 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 312 s = DO_UPCAST(NetStreamState, nc, nc); 313 qemu_set_info_str(&s->nc, "initializing"); 314 315 s->listen_ioc = QIO_CHANNEL(listen_sioc); 316 qio_channel_socket_listen_async(listen_sioc, addr, 0, 317 net_stream_server_listening, s, 318 NULL, NULL); 319 320 return 0; 321 } 322 323 static void net_stream_client_connected(QIOTask *task, gpointer opaque) 324 { 325 NetStreamState *s = opaque; 326 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc); 327 SocketAddress *addr; 328 gchar *uri; 329 int ret; 330 331 if (sioc->fd < 0) { 332 qemu_set_info_str(&s->nc, "connection error"); 333 goto error; 334 } 335 336 addr = qio_channel_socket_get_remote_address(sioc, NULL); 337 g_assert(addr != NULL); 338 uri = socket_uri(addr); 339 qemu_set_info_str(&s->nc, "%s", uri); 340 g_free(uri); 341 342 ret = qemu_socket_try_set_nonblock(sioc->fd); 343 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 344 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", 345 addr->u.fd.str, -ret); 346 qapi_free_SocketAddress(addr); 347 goto error; 348 } 349 g_assert(ret == 0); 350 351 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); 352 353 /* Disable Nagle algorithm on TCP sockets to reduce latency */ 354 qio_channel_set_delay(s->ioc, false); 355 356 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, 357 s, NULL); 358 s->nc.link_down = false; 359 qapi_event_send_netdev_stream_connected(s->nc.name, addr); 360 qapi_free_SocketAddress(addr); 361 362 return; 363 error: 364 object_unref(OBJECT(s->ioc)); 365 s->ioc = NULL; 366 net_stream_arm_reconnect(s); 367 } 368 369 static gboolean net_stream_reconnect(gpointer data) 370 { 371 NetStreamState *s = data; 372 QIOChannelSocket *sioc; 373 374 s->timer_tag = 0; 375 376 sioc = qio_channel_socket_new(); 377 s->ioc = QIO_CHANNEL(sioc); 378 qio_channel_socket_connect_async(sioc, s->addr, 379 net_stream_client_connected, s, 380 NULL, NULL); 381 return G_SOURCE_REMOVE; 382 } 383 384 static void net_stream_arm_reconnect(NetStreamState *s) 385 { 386 if (s->reconnect && s->timer_tag == 0) { 387 s->timer_tag = g_timeout_add_seconds(s->reconnect, 388 net_stream_reconnect, s); 389 } 390 } 391 392 static int net_stream_client_init(NetClientState *peer, 393 const char *model, 394 const char *name, 395 SocketAddress *addr, 396 uint32_t reconnect, 397 Error **errp) 398 { 399 NetStreamState *s; 400 NetClientState *nc; 401 QIOChannelSocket *sioc = qio_channel_socket_new(); 402 403 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 404 s = DO_UPCAST(NetStreamState, nc, nc); 405 qemu_set_info_str(&s->nc, "connecting"); 406 407 s->ioc = QIO_CHANNEL(sioc); 408 s->nc.link_down = true; 409 410 s->reconnect = reconnect; 411 if (reconnect) { 412 s->addr = QAPI_CLONE(SocketAddress, addr); 413 } 414 qio_channel_socket_connect_async(sioc, addr, 415 net_stream_client_connected, s, 416 NULL, NULL); 417 418 return 0; 419 } 420 421 int net_init_stream(const Netdev *netdev, const char *name, 422 NetClientState *peer, Error **errp) 423 { 424 const NetdevStreamOptions *sock; 425 426 assert(netdev->type == NET_CLIENT_DRIVER_STREAM); 427 sock = &netdev->u.stream; 428 429 if (!sock->has_server || !sock->server) { 430 return net_stream_client_init(peer, "stream", name, sock->addr, 431 sock->has_reconnect ? sock->reconnect : 0, 432 errp); 433 } 434 if (sock->has_reconnect) { 435 error_setg(errp, "'reconnect' option is incompatible with " 436 "socket in server mode"); 437 return -1; 438 } 439 return net_stream_server_init(peer, "stream", name, sock->addr, errp); 440 } 441