// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage RMBE
 * copy new RMBE data into user space
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/sched/signal.h>

#include <net/sock.h>

#include "smc.h"
#include "smc_core.h"
#include "smc_cdc.h"
#include "smc_tx.h" /* smc_tx_consumer_update() */
#include "smc_rx.h"

/* callback implementation to wake up consumers blocked with smc_rx_wait().
 * indirectly called by smc_cdc_msg_recv_action().
 */
static void smc_rx_wake_up(struct sock *sk)
{
        struct socket_wq *wq;

        /* derived from sock_def_readable() */
        /* called already in smc_listen_work() */
        rcu_read_lock();
        wq = rcu_dereference(sk->sk_wq);
        if (skwq_has_sleeper(wq))
                wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
                                                EPOLLRDNORM | EPOLLRDBAND);
        sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
        if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
            (sk->sk_state == SMC_CLOSED))
                sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
        rcu_read_unlock();
}

/* Update consumer cursor
 * @conn   connection to update
 * @cons   consumer cursor
 * @len    number of bytes consumed
 */
static void smc_rx_update_consumer(struct smc_connection *conn,
                                   union smc_host_cursor cons, size_t len)
{
        smc_curs_add(conn->rmb_desc->len, &cons, len);
        smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
                       conn);
        /* send consumer cursor update if required */
        /* similar to advertising new TCP rcv_wnd if required */
        smc_tx_consumer_update(conn);
}

struct smc_spd_priv {
        struct smc_sock *smc;
        size_t           len;
};

static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
                                    struct pipe_buffer *buf)
{
        struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
        struct smc_sock *smc = priv->smc;
        struct smc_connection *conn;
        union smc_host_cursor cons;
        struct sock *sk = &smc->sk;

        if (sk->sk_state == SMC_CLOSED ||
            sk->sk_state == SMC_PEERFINCLOSEWAIT ||
            sk->sk_state == SMC_APPFINCLOSEWAIT)
                goto out;
        conn = &smc->conn;
        lock_sock(sk);
        smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
                       conn);
        smc_rx_update_consumer(conn, cons, priv->len);
        release_sock(sk);
        if (atomic_sub_and_test(priv->len, &conn->splice_pending))
                smc_rx_wake_up(sk);
out:
        kfree(priv);
        put_page(buf->page);
        sock_put(sk);
}

static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe,
                                   struct pipe_buffer *buf)
{
        return 1;
}

static const struct pipe_buf_operations smc_pipe_ops = {
        .can_merge = 0,
        .confirm = generic_pipe_buf_confirm,
        .release = smc_rx_pipe_buf_release,
        .steal = smc_rx_pipe_buf_nosteal,
        .get = generic_pipe_buf_get
};

static void smc_rx_spd_release(struct splice_pipe_desc *spd,
                               unsigned int i)
{
        put_page(spd->pages[i]);
}

static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
                         struct smc_sock *smc)
{
        struct splice_pipe_desc spd;
        struct partial_page partial;
        struct smc_spd_priv *priv;
        struct page *page;
        int bytes;

        page = virt_to_page(smc->conn.rmb_desc->cpu_addr);
        priv = kzalloc(sizeof(*priv), GFP_KERNEL);
        if (!priv)
                return -ENOMEM;
        priv->len = len;
        priv->smc = smc;
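        /* describe the readable span within the RMB page; priv travels in
         * partial.private so that smc_rx_pipe_buf_release() can later credit
         * exactly these bytes back to the consumer cursor
         */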
        partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
        partial.len = len;
        partial.private = (unsigned long)priv;

        spd.nr_pages_max = 1;
        spd.nr_pages = 1;
        spd.pages = &page;
        spd.partial = &partial;
        spd.ops = &smc_pipe_ops;
        spd.spd_release = smc_rx_spd_release;

        bytes = splice_to_pipe(pipe, &spd);
        if (bytes > 0) {
                sock_hold(&smc->sk);
                get_page(smc->conn.rmb_desc->pages);
                atomic_add(bytes, &smc->conn.splice_pending);
        }

        return bytes;
}

static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
{
        return atomic_read(&conn->bytes_to_rcv) &&
               !atomic_read(&conn->splice_pending);
}

/* blocks rcvbuf consumer until fcrit(conn) holds, or timeout, or interruption
 * @smc    smc socket
 * @timeo  pointer to max jiffies to wait, pointer to value 0 for no timeout
 * @fcrit  add'l criterion to evaluate as function pointer
 * Returns:
 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
 */
int smc_rx_wait(struct smc_sock *smc, long *timeo,
                int (*fcrit)(struct smc_connection *conn))
{
        DEFINE_WAIT_FUNC(wait, woken_wake_function);
        struct smc_connection *conn = &smc->conn;
        struct sock *sk = &smc->sk;
        int rc;

        if (fcrit(conn))
                return 1;
        sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
        add_wait_queue(sk_sleep(sk), &wait);
        rc = sk_wait_event(sk, timeo,
                           sk->sk_err ||
                           sk->sk_shutdown & RCV_SHUTDOWN ||
                           fcrit(conn) ||
                           smc_cdc_rxed_any_close_or_senddone(conn),
                           &wait);
        remove_wait_queue(sk_sleep(sk), &wait);
        sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
        return rc;
}

/* smc_rx_recvmsg - receive data from RMBE
 * @msg:   copy data to receive buffer
 * @pipe:  copy data to pipe if set - indicates splice() call
 *
 * rcvbuf consumer: main API called by socket layer.
 * Called under sk lock.
 * Returns the number of bytes read/spliced, or a negative error code.
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
                   struct pipe_inode_info *pipe, size_t len, int flags)
{
        size_t copylen, read_done = 0, read_remaining = len;
        size_t chunk_len, chunk_off, chunk_len_sum;
        struct smc_connection *conn = &smc->conn;
        int (*func)(struct smc_connection *conn);
        union smc_host_cursor cons;
        int readable, chunk;
        char *rcvbuf_base;
        struct sock *sk;
        int splbytes;
        long timeo;
        int target;             /* Read at least this many bytes */
        int rc;

        if (unlikely(flags & MSG_ERRQUEUE))
                return -EINVAL; /* future work for sk.sk_family == AF_SMC */
        if (flags & MSG_OOB)
                return -EINVAL; /* future work */

        sk = &smc->sk;
        if (sk->sk_state == SMC_LISTEN)
                return -ENOTCONN;
        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
        target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

        /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
        rcvbuf_base = conn->rmb_desc->cpu_addr;
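
        /* receive loop: wait until data (or, with SO_RCVLOWAT, 'target'
         * bytes) is readable, then copy at most two chunks per pass --
         * two because the readable region may wrap past the end of the
         * ring buffer
         */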
        do { /* while (read_remaining) */
                if (read_done >= target || (pipe && read_done))
                        break;

                if (atomic_read(&conn->bytes_to_rcv))
                        goto copy;

                if (sk->sk_shutdown & RCV_SHUTDOWN ||
                    smc_cdc_rxed_any_close_or_senddone(conn) ||
                    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
                        break;

                if (read_done) {
                        if (sk->sk_err ||
                            sk->sk_state == SMC_CLOSED ||
                            !timeo ||
                            signal_pending(current))
                                break;
                } else {
                        if (sk->sk_err) {
                                read_done = sock_error(sk);
                                break;
                        }
                        if (sk->sk_state == SMC_CLOSED) {
                                if (!sock_flag(sk, SOCK_DONE)) {
                                        /* This occurs when user tries to read
                                         * from a never-connected socket.
                                         */
                                        read_done = -ENOTCONN;
                                        break;
                                }
                                break;
                        }
                        if (signal_pending(current)) {
                                read_done = sock_intr_errno(timeo);
                                break;
                        }
                        if (!timeo)
                                return -EAGAIN;
                }

                if (!smc_rx_data_available(conn)) {
                        smc_rx_wait(smc, &timeo, smc_rx_data_available);
                        continue;
                }

copy:
                /* initialize variables for 1st iteration of subsequent loop */
                /* could be just 1 byte, even after waiting on data above */
                readable = atomic_read(&conn->bytes_to_rcv);
                splbytes = atomic_read(&conn->splice_pending);
                if (!readable || (msg && splbytes)) {
                        if (splbytes)
                                func = smc_rx_data_available_and_no_splice_pend;
                        else
                                func = smc_rx_data_available;
                        smc_rx_wait(smc, &timeo, func);
                        continue;
                }

                /* not more than what user space asked for */
                copylen = min_t(size_t, read_remaining, readable);
                smc_curs_write(&cons,
                               smc_curs_read(&conn->local_tx_ctrl.cons, conn),
                               conn);
                /* subsequent splice() calls pick up where previous left off */
                if (splbytes)
                        smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
                /* determine chunks where to read from rcvbuf */
                /* either unwrapped case, or 1st chunk of wrapped case */
                chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
                                  cons.count);
                chunk_len_sum = chunk_len;
                chunk_off = cons.count;
                smc_rmb_sync_sg_for_cpu(conn);
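                /* copy in at most two pieces: the run up to the end of the
                 * ring buffer on the 1st iteration, then (only if the data
                 * wraps) the remainder from offset 0 on the 2nd
                 */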
                for (chunk = 0; chunk < 2; chunk++) {
                        if (!(flags & MSG_TRUNC)) {
                                if (msg) {
                                        rc = memcpy_to_msg(msg, rcvbuf_base +
                                                           chunk_off,
                                                           chunk_len);
                                } else {
                                        rc = smc_rx_splice(pipe, rcvbuf_base +
                                                           chunk_off, chunk_len,
                                                           smc);
                                }
                                if (rc < 0) {
                                        if (!read_done)
                                                read_done = -EFAULT;
                                        smc_rmb_sync_sg_for_device(conn);
                                        goto out;
                                }
                        }
                        read_remaining -= chunk_len;
                        read_done += chunk_len;

                        if (chunk_len_sum == copylen)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        chunk_len = copylen - chunk_len; /* remainder */
                        chunk_len_sum += chunk_len;
                        chunk_off = 0; /* modulo offset in recv ring buffer */
                }
                smc_rmb_sync_sg_for_device(conn);

                /* update cursors */
                if (!(flags & MSG_PEEK)) {
                        /* increased in recv tasklet smc_cdc_msg_rcv() */
                        smp_mb__before_atomic();
                        atomic_sub(copylen, &conn->bytes_to_rcv);
                        /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
                        smp_mb__after_atomic();
                        if (msg)
                                smc_rx_update_consumer(conn, cons, copylen);
                }
        } while (read_remaining);
out:
        return read_done;
}

/* Initialize receive properties on connection establishment. NB: not __init! */
void smc_rx_init(struct smc_sock *smc)
{
        smc->sk.sk_data_ready = smc_rx_wake_up;
        atomic_set(&smc->conn.splice_pending, 0);
}