1 /*
2 * QEMU System Emulator
3 *
4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2022 Red Hat, Inc.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 * THE SOFTWARE.
24 */
25
26 #include "qemu/osdep.h"
27
28 #include "net/net.h"
29 #include "clients.h"
30 #include "qapi/error.h"
31 #include "io/net-listener.h"
32 #include "qapi/qapi-events-net.h"
33 #include "qapi/qapi-visit-sockets.h"
34 #include "qapi/clone-visitor.h"
35
36 #include "stream_data.h"
37
38 typedef struct NetStreamState {
39 NetStreamData data;
40 uint32_t reconnect_ms;
41 guint timer_tag;
42 SocketAddress *addr;
43 } NetStreamState;
44
45 static void net_stream_arm_reconnect(NetStreamState *s);
46
net_stream_receive(NetClientState * nc,const uint8_t * buf,size_t size)47 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
48 size_t size)
49 {
50 NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
51
52 return net_stream_data_receive(d, buf, size);
53 }
54
net_stream_send(QIOChannel * ioc,GIOCondition condition,gpointer data)55 static gboolean net_stream_send(QIOChannel *ioc,
56 GIOCondition condition,
57 gpointer data)
58 {
59 if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) {
60 NetStreamState *s = DO_UPCAST(NetStreamState, data, data);
61
62 qapi_event_send_netdev_stream_disconnected(s->data.nc.name);
63 net_stream_arm_reconnect(s);
64
65 return G_SOURCE_REMOVE;
66 }
67
68 return G_SOURCE_CONTINUE;
69 }
70
net_stream_cleanup(NetClientState * nc)71 static void net_stream_cleanup(NetClientState *nc)
72 {
73 NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc);
74 if (s->timer_tag) {
75 g_source_remove(s->timer_tag);
76 s->timer_tag = 0;
77 }
78 if (s->addr) {
79 qapi_free_SocketAddress(s->addr);
80 s->addr = NULL;
81 }
82 if (s->data.ioc) {
83 if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) {
84 if (s->data.ioc_read_tag) {
85 g_source_remove(s->data.ioc_read_tag);
86 s->data.ioc_read_tag = 0;
87 }
88 if (s->data.ioc_write_tag) {
89 g_source_remove(s->data.ioc_write_tag);
90 s->data.ioc_write_tag = 0;
91 }
92 }
93 object_unref(OBJECT(s->data.ioc));
94 s->data.ioc = NULL;
95 }
96 if (s->data.listen_ioc) {
97 if (s->data.listener) {
98 qio_net_listener_disconnect(s->data.listener);
99 object_unref(OBJECT(s->data.listener));
100 s->data.listener = NULL;
101 }
102 object_unref(OBJECT(s->data.listen_ioc));
103 s->data.listen_ioc = NULL;
104 }
105 }
106
107 static NetClientInfo net_stream_info = {
108 .type = NET_CLIENT_DRIVER_STREAM,
109 .size = sizeof(NetStreamState),
110 .receive = net_stream_receive,
111 .cleanup = net_stream_cleanup,
112 };
113
net_stream_listen(QIONetListener * listener,QIOChannelSocket * cioc,gpointer data)114 static void net_stream_listen(QIONetListener *listener,
115 QIOChannelSocket *cioc, gpointer data)
116 {
117 NetStreamData *d = data;
118 SocketAddress *addr;
119 char *uri;
120
121 net_stream_data_listen(listener, cioc, data);
122
123 if (cioc->localAddr.ss_family == AF_UNIX) {
124 addr = qio_channel_socket_get_local_address(cioc, NULL);
125 } else {
126 addr = qio_channel_socket_get_remote_address(cioc, NULL);
127 }
128 g_assert(addr != NULL);
129 uri = socket_uri(addr);
130 qemu_set_info_str(&d->nc, "%s", uri);
131 g_free(uri);
132 qapi_event_send_netdev_stream_connected(d->nc.name, addr);
133 qapi_free_SocketAddress(addr);
134 }
135
net_stream_server_listening(QIOTask * task,gpointer opaque)136 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
137 {
138 NetStreamData *d = opaque;
139 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc);
140 SocketAddress *addr;
141 int ret;
142 Error *err = NULL;
143
144 if (qio_task_propagate_error(task, &err)) {
145 qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
146 error_free(err);
147 return;
148 }
149
150 addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
151 g_assert(addr != NULL);
152 ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
153 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
154 qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
155 addr->u.fd.str, -ret);
156 return;
157 }
158 g_assert(ret == 0);
159 qapi_free_SocketAddress(addr);
160
161 d->nc.link_down = true;
162 d->listener = qio_net_listener_new();
163
164 qemu_set_info_str(&d->nc, "listening");
165 net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
166 qio_net_listener_set_client_func(d->listener, d->listen, d,
167 NULL);
168 qio_net_listener_add(d->listener, listen_sioc);
169 }
170
net_stream_server_init(NetClientState * peer,const char * model,const char * name,SocketAddress * addr,Error ** errp)171 static int net_stream_server_init(NetClientState *peer,
172 const char *model,
173 const char *name,
174 SocketAddress *addr,
175 Error **errp)
176 {
177 NetClientState *nc;
178 NetStreamData *d;
179 QIOChannelSocket *listen_sioc = qio_channel_socket_new();
180
181 nc = qemu_new_net_client(&net_stream_info, peer, model, name);
182 d = DO_UPCAST(NetStreamData, nc, nc);
183 d->send = net_stream_send;
184 d->listen = net_stream_listen;
185 qemu_set_info_str(&d->nc, "initializing");
186
187 d->listen_ioc = QIO_CHANNEL(listen_sioc);
188 qio_channel_socket_listen_async(listen_sioc, addr, 0,
189 net_stream_server_listening, d,
190 NULL, NULL);
191
192 return 0;
193 }
194
net_stream_client_connected(QIOTask * task,gpointer opaque)195 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
196 {
197 NetStreamState *s = opaque;
198 NetStreamData *d = &s->data;
199 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
200 SocketAddress *addr;
201 gchar *uri;
202
203 if (net_stream_data_client_connected(task, d) == -1) {
204 net_stream_arm_reconnect(s);
205 return;
206 }
207
208 addr = qio_channel_socket_get_remote_address(sioc, NULL);
209 g_assert(addr != NULL);
210 uri = socket_uri(addr);
211 qemu_set_info_str(&d->nc, "%s", uri);
212 g_free(uri);
213 qapi_event_send_netdev_stream_connected(d->nc.name, addr);
214 qapi_free_SocketAddress(addr);
215 }
216
net_stream_reconnect(gpointer data)217 static gboolean net_stream_reconnect(gpointer data)
218 {
219 NetStreamState *s = data;
220 QIOChannelSocket *sioc;
221
222 s->timer_tag = 0;
223
224 sioc = qio_channel_socket_new();
225 s->data.ioc = QIO_CHANNEL(sioc);
226 qio_channel_socket_connect_async(sioc, s->addr,
227 net_stream_client_connected, s,
228 NULL, NULL);
229 return G_SOURCE_REMOVE;
230 }
231
net_stream_arm_reconnect(NetStreamState * s)232 static void net_stream_arm_reconnect(NetStreamState *s)
233 {
234 if (s->reconnect_ms && s->timer_tag == 0) {
235 qemu_set_info_str(&s->data.nc, "connecting");
236 s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
237 }
238 }
239
net_stream_client_init(NetClientState * peer,const char * model,const char * name,SocketAddress * addr,uint32_t reconnect_ms,Error ** errp)240 static int net_stream_client_init(NetClientState *peer,
241 const char *model,
242 const char *name,
243 SocketAddress *addr,
244 uint32_t reconnect_ms,
245 Error **errp)
246 {
247 NetStreamState *s;
248 NetClientState *nc;
249 QIOChannelSocket *sioc = qio_channel_socket_new();
250
251 nc = qemu_new_net_client(&net_stream_info, peer, model, name);
252 s = DO_UPCAST(NetStreamState, data.nc, nc);
253 qemu_set_info_str(&s->data.nc, "connecting");
254
255 s->data.ioc = QIO_CHANNEL(sioc);
256 s->data.nc.link_down = true;
257 s->data.send = net_stream_send;
258 s->data.listen = net_stream_listen;
259
260 s->reconnect_ms = reconnect_ms;
261 if (reconnect_ms) {
262 s->addr = QAPI_CLONE(SocketAddress, addr);
263 }
264 qio_channel_socket_connect_async(sioc, addr,
265 net_stream_client_connected, s,
266 NULL, NULL);
267
268 return 0;
269 }
270
net_init_stream(const Netdev * netdev,const char * name,NetClientState * peer,Error ** errp)271 int net_init_stream(const Netdev *netdev, const char *name,
272 NetClientState *peer, Error **errp)
273 {
274 const NetdevStreamOptions *sock;
275
276 assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
277 sock = &netdev->u.stream;
278
279 if (!sock->has_server || !sock->server) {
280 uint32_t reconnect_ms = 0;
281
282 if (sock->has_reconnect && sock->has_reconnect_ms) {
283 error_setg(errp, "'reconnect' and 'reconnect-ms' are mutually "
284 "exclusive");
285 return -1;
286 } else if (sock->has_reconnect_ms) {
287 reconnect_ms = sock->reconnect_ms;
288 } else if (sock->has_reconnect) {
289 reconnect_ms = sock->reconnect * 1000u;
290 }
291
292 return net_stream_client_init(peer, "stream", name, sock->addr,
293 reconnect_ms, errp);
294 }
295 if (sock->has_reconnect || sock->has_reconnect_ms) {
296 error_setg(errp, "'reconnect' and 'reconnect-ms' options are "
297 "incompatible with socket in server mode");
298 return -1;
299 }
300 return net_stream_server_init(peer, "stream", name, sock->addr, errp);
301 }
302