1 /* 2 * QEMU System Emulator 3 * 4 * Copyright (c) 2003-2008 Fabrice Bellard 5 * Copyright (c) 2022 Red Hat, Inc. 6 * 7 * Permission is hereby granted, free of charge, to any person obtaining a copy 8 * of this software and associated documentation files (the "Software"), to deal 9 * in the Software without restriction, including without limitation the rights 10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 * copies of the Software, and to permit persons to whom the Software is 12 * furnished to do so, subject to the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be included in 15 * all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23 * THE SOFTWARE. 24 */ 25 26 #include "qemu/osdep.h" 27 28 #include "net/net.h" 29 #include "clients.h" 30 #include "qapi/error.h" 31 #include "io/net-listener.h" 32 #include "qapi/qapi-events-net.h" 33 #include "qapi/qapi-visit-sockets.h" 34 #include "qapi/clone-visitor.h" 35 36 #include "stream_data.h" 37 38 typedef struct NetStreamState { 39 NetStreamData data; 40 uint32_t reconnect_ms; 41 guint timer_tag; 42 SocketAddress *addr; 43 } NetStreamState; 44 45 static void net_stream_arm_reconnect(NetStreamState *s); 46 47 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf, 48 size_t size) 49 { 50 NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc); 51 52 return net_stream_data_receive(d, buf, size); 53 } 54 55 static gboolean net_stream_send(QIOChannel *ioc, 56 GIOCondition condition, 57 gpointer data) 58 { 59 if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) { 60 NetStreamState *s = DO_UPCAST(NetStreamState, data, data); 61 62 qapi_event_send_netdev_stream_disconnected(s->data.nc.name); 63 net_stream_arm_reconnect(s); 64 65 return G_SOURCE_REMOVE; 66 } 67 68 return G_SOURCE_CONTINUE; 69 } 70 71 static void net_stream_cleanup(NetClientState *nc) 72 { 73 NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc); 74 if (s->timer_tag) { 75 g_source_remove(s->timer_tag); 76 s->timer_tag = 0; 77 } 78 if (s->addr) { 79 qapi_free_SocketAddress(s->addr); 80 s->addr = NULL; 81 } 82 if (s->data.ioc) { 83 if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) { 84 if (s->data.ioc_read_tag) { 85 g_source_remove(s->data.ioc_read_tag); 86 s->data.ioc_read_tag = 0; 87 } 88 if (s->data.ioc_write_tag) { 89 g_source_remove(s->data.ioc_write_tag); 90 s->data.ioc_write_tag = 0; 91 } 92 } 93 object_unref(OBJECT(s->data.ioc)); 94 s->data.ioc = NULL; 95 } 96 if (s->data.listen_ioc) { 97 if (s->data.listener) { 98 qio_net_listener_disconnect(s->data.listener); 99 object_unref(OBJECT(s->data.listener)); 100 s->data.listener = NULL; 101 } 102 object_unref(OBJECT(s->data.listen_ioc)); 103 s->data.listen_ioc = NULL; 104 } 105 } 106 107 static NetClientInfo net_stream_info = { 108 .type = NET_CLIENT_DRIVER_STREAM, 109 .size = sizeof(NetStreamState), 110 .receive = net_stream_receive, 111 .cleanup = net_stream_cleanup, 112 }; 113 114 static void net_stream_listen(QIONetListener *listener, 115 QIOChannelSocket *cioc, gpointer data) 116 { 117 NetStreamData *d = data; 118 SocketAddress *addr; 119 char *uri; 120 121 net_stream_data_listen(listener, cioc, data); 122 123 if (cioc->localAddr.ss_family == AF_UNIX) { 124 addr = qio_channel_socket_get_local_address(cioc, NULL); 125 } else { 126 addr = qio_channel_socket_get_remote_address(cioc, NULL); 127 } 128 g_assert(addr != NULL); 129 uri = socket_uri(addr); 130 qemu_set_info_str(&d->nc, "%s", uri); 131 g_free(uri); 132 qapi_event_send_netdev_stream_connected(d->nc.name, addr); 133 qapi_free_SocketAddress(addr); 134 } 135 136 static void net_stream_server_listening(QIOTask *task, gpointer opaque) 137 { 138 NetStreamData *d = opaque; 139 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc); 140 SocketAddress *addr; 141 int ret; 142 Error *err = NULL; 143 144 if (qio_task_propagate_error(task, &err)) { 145 qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err)); 146 error_free(err); 147 return; 148 } 149 150 addr = qio_channel_socket_get_local_address(listen_sioc, NULL); 151 g_assert(addr != NULL); 152 ret = qemu_socket_try_set_nonblock(listen_sioc->fd); 153 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { 154 qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)", 155 addr->u.fd.str, -ret); 156 return; 157 } 158 g_assert(ret == 0); 159 qapi_free_SocketAddress(addr); 160 161 d->nc.link_down = true; 162 d->listener = qio_net_listener_new(); 163 164 qemu_set_info_str(&d->nc, "listening"); 165 net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false); 166 qio_net_listener_set_client_func(d->listener, d->listen, d, 167 NULL); 168 qio_net_listener_add(d->listener, listen_sioc); 169 } 170 171 static int net_stream_server_init(NetClientState *peer, 172 const char *model, 173 const char *name, 174 SocketAddress *addr, 175 Error **errp) 176 { 177 NetClientState *nc; 178 NetStreamData *d; 179 QIOChannelSocket *listen_sioc = qio_channel_socket_new(); 180 181 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 182 d = DO_UPCAST(NetStreamData, nc, nc); 183 d->send = net_stream_send; 184 d->listen = net_stream_listen; 185 qemu_set_info_str(&d->nc, "initializing"); 186 187 d->listen_ioc = QIO_CHANNEL(listen_sioc); 188 qio_channel_socket_listen_async(listen_sioc, addr, 0, 189 net_stream_server_listening, d, 190 NULL, NULL); 191 192 return 0; 193 } 194 195 static void net_stream_client_connected(QIOTask *task, gpointer opaque) 196 { 197 NetStreamState *s = opaque; 198 NetStreamData *d = &s->data; 199 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc); 200 SocketAddress *addr; 201 gchar *uri; 202 203 if (net_stream_data_client_connected(task, d) == -1) { 204 net_stream_arm_reconnect(s); 205 return; 206 } 207 208 addr = qio_channel_socket_get_remote_address(sioc, NULL); 209 g_assert(addr != NULL); 210 uri = socket_uri(addr); 211 qemu_set_info_str(&d->nc, "%s", uri); 212 g_free(uri); 213 qapi_event_send_netdev_stream_connected(d->nc.name, addr); 214 qapi_free_SocketAddress(addr); 215 } 216 217 static gboolean net_stream_reconnect(gpointer data) 218 { 219 NetStreamState *s = data; 220 QIOChannelSocket *sioc; 221 222 s->timer_tag = 0; 223 224 sioc = qio_channel_socket_new(); 225 s->data.ioc = QIO_CHANNEL(sioc); 226 qio_channel_socket_connect_async(sioc, s->addr, 227 net_stream_client_connected, s, 228 NULL, NULL); 229 return G_SOURCE_REMOVE; 230 } 231 232 static void net_stream_arm_reconnect(NetStreamState *s) 233 { 234 if (s->reconnect_ms && s->timer_tag == 0) { 235 qemu_set_info_str(&s->data.nc, "connecting"); 236 s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s); 237 } 238 } 239 240 static int net_stream_client_init(NetClientState *peer, 241 const char *model, 242 const char *name, 243 SocketAddress *addr, 244 uint32_t reconnect_ms, 245 Error **errp) 246 { 247 NetStreamState *s; 248 NetClientState *nc; 249 QIOChannelSocket *sioc = qio_channel_socket_new(); 250 251 nc = qemu_new_net_client(&net_stream_info, peer, model, name); 252 s = DO_UPCAST(NetStreamState, data.nc, nc); 253 qemu_set_info_str(&s->data.nc, "connecting"); 254 255 s->data.ioc = QIO_CHANNEL(sioc); 256 s->data.nc.link_down = true; 257 s->data.send = net_stream_send; 258 s->data.listen = net_stream_listen; 259 260 s->reconnect_ms = reconnect_ms; 261 if (reconnect_ms) { 262 s->addr = QAPI_CLONE(SocketAddress, addr); 263 } 264 qio_channel_socket_connect_async(sioc, addr, 265 net_stream_client_connected, s, 266 NULL, NULL); 267 268 return 0; 269 } 270 271 int net_init_stream(const Netdev *netdev, const char *name, 272 NetClientState *peer, Error **errp) 273 { 274 const NetdevStreamOptions *sock; 275 276 assert(netdev->type == NET_CLIENT_DRIVER_STREAM); 277 sock = &netdev->u.stream; 278 279 if (!sock->has_server || !sock->server) { 280 uint32_t reconnect_ms = 0; 281 282 if (sock->has_reconnect && sock->has_reconnect_ms) { 283 error_setg(errp, "'reconnect' and 'reconnect-ms' are mutually " 284 "exclusive"); 285 return -1; 286 } else if (sock->has_reconnect_ms) { 287 reconnect_ms = sock->reconnect_ms; 288 } else if (sock->has_reconnect) { 289 reconnect_ms = sock->reconnect * 1000u; 290 } 291 292 return net_stream_client_init(peer, "stream", name, sock->addr, 293 reconnect_ms, errp); 294 } 295 if (sock->has_reconnect || sock->has_reconnect_ms) { 296 error_setg(errp, "'reconnect' and 'reconnect-ms' options are " 297 "incompatible with socket in server mode"); 298 return -1; 299 } 300 return net_stream_server_init(peer, "stream", name, sock->addr, errp); 301 } 302