1 /*
2 * net stream generic functions
3 *
4 * Copyright Red Hat
5 *
6 * SPDX-License-Identifier: GPL-2.0-or-later
7 */
8
9 #include "qemu/osdep.h"
10 #include "qemu/iov.h"
11 #include "qapi/error.h"
12 #include "net/net.h"
13 #include "io/channel.h"
14 #include "io/net-listener.h"
15
16 #include "stream_data.h"
17
net_stream_data_writable(QIOChannel * ioc,GIOCondition condition,gpointer data)18 static gboolean net_stream_data_writable(QIOChannel *ioc,
19 GIOCondition condition, gpointer data)
20 {
21 NetStreamData *d = data;
22
23 d->ioc_write_tag = 0;
24
25 qemu_flush_queued_packets(&d->nc);
26
27 return G_SOURCE_REMOVE;
28 }
29
/*
 * Write one packet to the stream channel, framed as a 32-bit length in
 * network byte order followed by the payload.
 *
 * d->send_index is the number of bytes of the current frame (header plus
 * payload) that were already written by an earlier, partial attempt; the
 * write resumes from that offset.
 *
 * @d:    stream state (channel, send offset, write-watch tag)
 * @buf:  packet payload
 * @size: payload length in bytes
 *
 * Returns: @size once the whole frame has been written,
 *          0 when the write was partial or would block (a G_IO_OUT watch
 *          is installed and the offset is remembered),
 *          -errno when the channel reported an error.
 */
ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf,
                                size_t size)
{
    uint32_t be_len = htonl(size);
    struct iovec frame[2] = {
        { .iov_base = &be_len,     .iov_len = sizeof(be_len) },
        { .iov_base = (void *)buf, .iov_len = size },
    };
    struct iovec pending[2];
    unsigned int npending;
    size_t todo;
    ssize_t written;

    /* Build an iovec covering only the part of the frame not yet sent. */
    todo = iov_size(frame, 2) - d->send_index;
    npending = iov_copy(pending, 2, frame, 2, d->send_index, todo);

    written = qio_channel_writev(d->ioc, pending, npending, NULL);
    if (written == QIO_CHANNEL_ERR_BLOCK) {
        /* Nothing went out; treated below like a zero-byte partial write. */
        written = 0;
    }
    if (written == -1) {
        /*
         * NOTE(review): errno is consulted after a channel call made with
         * a NULL Error; confirm the channel implementation leaves a
         * meaningful errno behind.
         */
        d->send_index = 0;
        return -errno;
    }

    if (written >= (ssize_t)todo) {
        /* Frame complete: the next packet starts from offset zero. */
        d->send_index = 0;
        return size;
    }

    /* Partial write: remember progress and wait for writability. */
    d->send_index += written;
    d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT,
                                             net_stream_data_writable, d,
                                             NULL);
    return 0;
}
68
/*
 * Completion callback handed to qemu_send_packet_async().
 *
 * If no read watch is currently installed on the channel, install one
 * (G_IO_IN, dispatching to d->send) and record its tag.
 *
 * @nc:  net client embedded in the NetStreamData
 * @len: unused here
 */
static void net_stream_data_send_completed(NetClientState *nc, ssize_t len)
{
    NetStreamData *stream = DO_UPCAST(NetStreamData, nc, nc);

    if (stream->ioc_read_tag) {
        /* A read watch already exists; nothing to re-arm. */
        return;
    }

    stream->ioc_read_tag = qio_channel_add_watch(stream->ioc, G_IO_IN,
                                                 stream->send, stream, NULL);
}
78
/*
 * Read-state finalize hook: hand the packet held in @rs (rs->buf,
 * rs->packet_len) to the net layer via qemu_send_packet_async().
 *
 * When that call returns 0, the channel's read watch (if any) is removed
 * and its tag cleared; net_stream_data_send_completed() is the callback
 * that installs it again.
 *
 * @rs: read state embedded in a NetStreamData
 */
void net_stream_data_rs_finalize(SocketReadState *rs)
{
    NetStreamData *stream = container_of(rs, NetStreamData, rs);

    if (qemu_send_packet_async(&stream->nc, rs->buf, rs->packet_len,
                               net_stream_data_send_completed) != 0) {
        return;
    }

    if (!stream->ioc_read_tag) {
        return;
    }

    /* Stop reading from the channel until the completion callback runs. */
    g_source_remove(stream->ioc_read_tag);
    stream->ioc_read_tag = 0;
}
92
net_stream_data_send(QIOChannel * ioc,GIOCondition condition,NetStreamData * d)93 gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition,
94 NetStreamData *d)
95 {
96 int size;
97 int ret;
98 QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
99 const char *buf;
100
101 size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL);
102 if (size < 0) {
103 if (errno != EWOULDBLOCK) {
104 goto eoc;
105 }
106 } else if (size == 0) {
107 /* end of connection */
108 eoc:
109 d->ioc_read_tag = 0;
110 if (d->ioc_write_tag) {
111 g_source_remove(d->ioc_write_tag);
112 d->ioc_write_tag = 0;
113 }
114 if (d->listener) {
115 qemu_set_info_str(&d->nc, "listening");
116 qio_net_listener_set_client_func(d->listener,
117 d->listen, d, NULL);
118 }
119 object_unref(OBJECT(d->ioc));
120 d->ioc = NULL;
121
122 net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
123 d->nc.link_down = true;
124
125 return G_SOURCE_REMOVE;
126 }
127 buf = buf1;
128
129 ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size);
130
131 if (ret == -1) {
132 goto eoc;
133 }
134
135 return G_SOURCE_CONTINUE;
136 }
137
/*
 * Listener callback for a newly accepted client connection.
 *
 * Takes a reference on @cioc, clears the listener's client callback,
 * adopts the socket as the stream channel (named "stream-server"), marks
 * the link as up and installs a G_IO_IN watch dispatching to d->send.
 *
 * @listener: listener that accepted the connection (d->listener is the
 *            one actually reconfigured)
 * @cioc:     accepted client socket channel
 * @d:        stream state
 */
void net_stream_data_listen(QIONetListener *listener,
                            QIOChannelSocket *cioc,
                            NetStreamData *d)
{
    /* Keep the accepted socket alive for as long as d->ioc points to it. */
    object_ref(OBJECT(cioc));

    /* Clear the client callback while this connection is in use. */
    qio_net_listener_set_client_func(d->listener, NULL, d, NULL);

    d->ioc = QIO_CHANNEL(cioc);
    qio_channel_set_name(d->ioc, "stream-server");

    d->nc.link_down = false;
    d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN,
                                            d->send, d, NULL);
}
152
/*
 * Finish an outgoing connection attempt.
 *
 * On task failure the error text is stored in the net client's info
 * string.  Otherwise the socket is switched to non-blocking mode, the read
 * state is reset, the channel's delay setting is turned off, a G_IO_IN
 * watch dispatching to d->send is installed and the link is marked up.
 *
 * @task: connection task whose result is inspected
 * @d:    stream state; d->ioc must be a socket channel
 *
 * Returns: 0 on success; -1 on failure, in which case d->ioc has been
 *          released and set to NULL.
 */
int net_stream_data_client_connected(QIOTask *task, NetStreamData *d)
{
    QIOChannelSocket *sock = QIO_CHANNEL_SOCKET(d->ioc);
    SocketAddress *remote;
    Error *local_err = NULL;
    int rc;

    if (qio_task_propagate_error(task, &local_err)) {
        qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(local_err));
        error_free(local_err);
        goto fail;
    }

    remote = qio_channel_socket_get_remote_address(sock, NULL);
    g_assert(remote != NULL);

    rc = qemu_socket_try_set_nonblock(sock->fd);
    if (rc < 0 && remote->type == SOCKET_ADDRESS_TYPE_FD) {
        /* Only a user-supplied file descriptor is allowed to fail here. */
        qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
                          remote->u.fd.str, -rc);
        qapi_free_SocketAddress(remote);
        goto fail;
    }
    g_assert(rc == 0);
    qapi_free_SocketAddress(remote);

    net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);

    /* Disable Nagle algorithm on TCP sockets to reduce latency */
    qio_channel_set_delay(d->ioc, false);

    d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN,
                                            d->send, d, NULL);
    d->nc.link_down = false;

    return 0;

fail:
    object_unref(OBJECT(d->ioc));
    d->ioc = NULL;

    return -1;
}
194