1*adf684ceSLaurent Vivier /*
2*adf684ceSLaurent Vivier * net stream generic functions
3*adf684ceSLaurent Vivier *
4*adf684ceSLaurent Vivier * Copyright Red Hat
5*adf684ceSLaurent Vivier *
6*adf684ceSLaurent Vivier * SPDX-License-Identifier: GPL-2.0-or-later
7*adf684ceSLaurent Vivier */
8*adf684ceSLaurent Vivier
9*adf684ceSLaurent Vivier #include "qemu/osdep.h"
10*adf684ceSLaurent Vivier #include "qemu/iov.h"
11*adf684ceSLaurent Vivier #include "qapi/error.h"
12*adf684ceSLaurent Vivier #include "net/net.h"
13*adf684ceSLaurent Vivier #include "io/channel.h"
14*adf684ceSLaurent Vivier #include "io/net-listener.h"
15*adf684ceSLaurent Vivier
16*adf684ceSLaurent Vivier #include "stream_data.h"
17*adf684ceSLaurent Vivier
net_stream_data_writable(QIOChannel * ioc,GIOCondition condition,gpointer data)18*adf684ceSLaurent Vivier static gboolean net_stream_data_writable(QIOChannel *ioc,
19*adf684ceSLaurent Vivier GIOCondition condition, gpointer data)
20*adf684ceSLaurent Vivier {
21*adf684ceSLaurent Vivier NetStreamData *d = data;
22*adf684ceSLaurent Vivier
23*adf684ceSLaurent Vivier d->ioc_write_tag = 0;
24*adf684ceSLaurent Vivier
25*adf684ceSLaurent Vivier qemu_flush_queued_packets(&d->nc);
26*adf684ceSLaurent Vivier
27*adf684ceSLaurent Vivier return G_SOURCE_REMOVE;
28*adf684ceSLaurent Vivier }
29*adf684ceSLaurent Vivier
net_stream_data_receive(NetStreamData * d,const uint8_t * buf,size_t size)30*adf684ceSLaurent Vivier ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf,
31*adf684ceSLaurent Vivier size_t size)
32*adf684ceSLaurent Vivier {
33*adf684ceSLaurent Vivier uint32_t len = htonl(size);
34*adf684ceSLaurent Vivier struct iovec iov[] = {
35*adf684ceSLaurent Vivier {
36*adf684ceSLaurent Vivier .iov_base = &len,
37*adf684ceSLaurent Vivier .iov_len = sizeof(len),
38*adf684ceSLaurent Vivier }, {
39*adf684ceSLaurent Vivier .iov_base = (void *)buf,
40*adf684ceSLaurent Vivier .iov_len = size,
41*adf684ceSLaurent Vivier },
42*adf684ceSLaurent Vivier };
43*adf684ceSLaurent Vivier struct iovec local_iov[2];
44*adf684ceSLaurent Vivier unsigned int nlocal_iov;
45*adf684ceSLaurent Vivier size_t remaining;
46*adf684ceSLaurent Vivier ssize_t ret;
47*adf684ceSLaurent Vivier
48*adf684ceSLaurent Vivier remaining = iov_size(iov, 2) - d->send_index;
49*adf684ceSLaurent Vivier nlocal_iov = iov_copy(local_iov, 2, iov, 2, d->send_index, remaining);
50*adf684ceSLaurent Vivier ret = qio_channel_writev(d->ioc, local_iov, nlocal_iov, NULL);
51*adf684ceSLaurent Vivier if (ret == QIO_CHANNEL_ERR_BLOCK) {
52*adf684ceSLaurent Vivier ret = 0; /* handled further down */
53*adf684ceSLaurent Vivier }
54*adf684ceSLaurent Vivier if (ret == -1) {
55*adf684ceSLaurent Vivier d->send_index = 0;
56*adf684ceSLaurent Vivier return -errno;
57*adf684ceSLaurent Vivier }
58*adf684ceSLaurent Vivier if (ret < (ssize_t)remaining) {
59*adf684ceSLaurent Vivier d->send_index += ret;
60*adf684ceSLaurent Vivier d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT,
61*adf684ceSLaurent Vivier net_stream_data_writable, d,
62*adf684ceSLaurent Vivier NULL);
63*adf684ceSLaurent Vivier return 0;
64*adf684ceSLaurent Vivier }
65*adf684ceSLaurent Vivier d->send_index = 0;
66*adf684ceSLaurent Vivier return size;
67*adf684ceSLaurent Vivier }
68*adf684ceSLaurent Vivier
net_stream_data_send_completed(NetClientState * nc,ssize_t len)69*adf684ceSLaurent Vivier static void net_stream_data_send_completed(NetClientState *nc, ssize_t len)
70*adf684ceSLaurent Vivier {
71*adf684ceSLaurent Vivier NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
72*adf684ceSLaurent Vivier
73*adf684ceSLaurent Vivier if (!d->ioc_read_tag) {
74*adf684ceSLaurent Vivier d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d,
75*adf684ceSLaurent Vivier NULL);
76*adf684ceSLaurent Vivier }
77*adf684ceSLaurent Vivier }
78*adf684ceSLaurent Vivier
net_stream_data_rs_finalize(SocketReadState * rs)79*adf684ceSLaurent Vivier void net_stream_data_rs_finalize(SocketReadState *rs)
80*adf684ceSLaurent Vivier {
81*adf684ceSLaurent Vivier NetStreamData *d = container_of(rs, NetStreamData, rs);
82*adf684ceSLaurent Vivier
83*adf684ceSLaurent Vivier if (qemu_send_packet_async(&d->nc, rs->buf,
84*adf684ceSLaurent Vivier rs->packet_len,
85*adf684ceSLaurent Vivier net_stream_data_send_completed) == 0) {
86*adf684ceSLaurent Vivier if (d->ioc_read_tag) {
87*adf684ceSLaurent Vivier g_source_remove(d->ioc_read_tag);
88*adf684ceSLaurent Vivier d->ioc_read_tag = 0;
89*adf684ceSLaurent Vivier }
90*adf684ceSLaurent Vivier }
91*adf684ceSLaurent Vivier }
92*adf684ceSLaurent Vivier
net_stream_data_send(QIOChannel * ioc,GIOCondition condition,NetStreamData * d)93*adf684ceSLaurent Vivier gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition,
94*adf684ceSLaurent Vivier NetStreamData *d)
95*adf684ceSLaurent Vivier {
96*adf684ceSLaurent Vivier int size;
97*adf684ceSLaurent Vivier int ret;
98*adf684ceSLaurent Vivier QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
99*adf684ceSLaurent Vivier const char *buf;
100*adf684ceSLaurent Vivier
101*adf684ceSLaurent Vivier size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL);
102*adf684ceSLaurent Vivier if (size < 0) {
103*adf684ceSLaurent Vivier if (errno != EWOULDBLOCK) {
104*adf684ceSLaurent Vivier goto eoc;
105*adf684ceSLaurent Vivier }
106*adf684ceSLaurent Vivier } else if (size == 0) {
107*adf684ceSLaurent Vivier /* end of connection */
108*adf684ceSLaurent Vivier eoc:
109*adf684ceSLaurent Vivier d->ioc_read_tag = 0;
110*adf684ceSLaurent Vivier if (d->ioc_write_tag) {
111*adf684ceSLaurent Vivier g_source_remove(d->ioc_write_tag);
112*adf684ceSLaurent Vivier d->ioc_write_tag = 0;
113*adf684ceSLaurent Vivier }
114*adf684ceSLaurent Vivier if (d->listener) {
115*adf684ceSLaurent Vivier qemu_set_info_str(&d->nc, "listening");
116*adf684ceSLaurent Vivier qio_net_listener_set_client_func(d->listener,
117*adf684ceSLaurent Vivier d->listen, d, NULL);
118*adf684ceSLaurent Vivier }
119*adf684ceSLaurent Vivier object_unref(OBJECT(d->ioc));
120*adf684ceSLaurent Vivier d->ioc = NULL;
121*adf684ceSLaurent Vivier
122*adf684ceSLaurent Vivier net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
123*adf684ceSLaurent Vivier d->nc.link_down = true;
124*adf684ceSLaurent Vivier
125*adf684ceSLaurent Vivier return G_SOURCE_REMOVE;
126*adf684ceSLaurent Vivier }
127*adf684ceSLaurent Vivier buf = buf1;
128*adf684ceSLaurent Vivier
129*adf684ceSLaurent Vivier ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size);
130*adf684ceSLaurent Vivier
131*adf684ceSLaurent Vivier if (ret == -1) {
132*adf684ceSLaurent Vivier goto eoc;
133*adf684ceSLaurent Vivier }
134*adf684ceSLaurent Vivier
135*adf684ceSLaurent Vivier return G_SOURCE_CONTINUE;
136*adf684ceSLaurent Vivier }
137*adf684ceSLaurent Vivier
net_stream_data_listen(QIONetListener * listener,QIOChannelSocket * cioc,NetStreamData * d)138*adf684ceSLaurent Vivier void net_stream_data_listen(QIONetListener *listener,
139*adf684ceSLaurent Vivier QIOChannelSocket *cioc,
140*adf684ceSLaurent Vivier NetStreamData *d)
141*adf684ceSLaurent Vivier {
142*adf684ceSLaurent Vivier object_ref(OBJECT(cioc));
143*adf684ceSLaurent Vivier
144*adf684ceSLaurent Vivier qio_net_listener_set_client_func(d->listener, NULL, d, NULL);
145*adf684ceSLaurent Vivier
146*adf684ceSLaurent Vivier d->ioc = QIO_CHANNEL(cioc);
147*adf684ceSLaurent Vivier qio_channel_set_name(d->ioc, "stream-server");
148*adf684ceSLaurent Vivier d->nc.link_down = false;
149*adf684ceSLaurent Vivier
150*adf684ceSLaurent Vivier d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
151*adf684ceSLaurent Vivier }
152*adf684ceSLaurent Vivier
net_stream_data_client_connected(QIOTask * task,NetStreamData * d)153*adf684ceSLaurent Vivier int net_stream_data_client_connected(QIOTask *task, NetStreamData *d)
154*adf684ceSLaurent Vivier {
155*adf684ceSLaurent Vivier QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
156*adf684ceSLaurent Vivier SocketAddress *addr;
157*adf684ceSLaurent Vivier int ret;
158*adf684ceSLaurent Vivier Error *err = NULL;
159*adf684ceSLaurent Vivier
160*adf684ceSLaurent Vivier if (qio_task_propagate_error(task, &err)) {
161*adf684ceSLaurent Vivier qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
162*adf684ceSLaurent Vivier error_free(err);
163*adf684ceSLaurent Vivier goto error;
164*adf684ceSLaurent Vivier }
165*adf684ceSLaurent Vivier
166*adf684ceSLaurent Vivier addr = qio_channel_socket_get_remote_address(sioc, NULL);
167*adf684ceSLaurent Vivier g_assert(addr != NULL);
168*adf684ceSLaurent Vivier
169*adf684ceSLaurent Vivier ret = qemu_socket_try_set_nonblock(sioc->fd);
170*adf684ceSLaurent Vivier if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
171*adf684ceSLaurent Vivier qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
172*adf684ceSLaurent Vivier addr->u.fd.str, -ret);
173*adf684ceSLaurent Vivier qapi_free_SocketAddress(addr);
174*adf684ceSLaurent Vivier goto error;
175*adf684ceSLaurent Vivier }
176*adf684ceSLaurent Vivier g_assert(ret == 0);
177*adf684ceSLaurent Vivier qapi_free_SocketAddress(addr);
178*adf684ceSLaurent Vivier
179*adf684ceSLaurent Vivier net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
180*adf684ceSLaurent Vivier
181*adf684ceSLaurent Vivier /* Disable Nagle algorithm on TCP sockets to reduce latency */
182*adf684ceSLaurent Vivier qio_channel_set_delay(d->ioc, false);
183*adf684ceSLaurent Vivier
184*adf684ceSLaurent Vivier d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
185*adf684ceSLaurent Vivier d->nc.link_down = false;
186*adf684ceSLaurent Vivier
187*adf684ceSLaurent Vivier return 0;
188*adf684ceSLaurent Vivier error:
189*adf684ceSLaurent Vivier object_unref(OBJECT(d->ioc));
190*adf684ceSLaurent Vivier d->ioc = NULL;
191*adf684ceSLaurent Vivier
192*adf684ceSLaurent Vivier return -1;
193*adf684ceSLaurent Vivier }
194