// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage RMBE
 * copy new RMBE data into user space
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/sched/signal.h>

#include <net/sock.h>

#include "smc.h"
#include "smc_core.h"
#include "smc_cdc.h"
#include "smc_tx.h" /* smc_tx_consumer_update() */
#include "smc_rx.h"

/* callback implementation to wakeup consumers blocked with smc_rx_wait().
 * indirectly called by smc_cdc_msg_recv_action().
 * Installed as sk->sk_data_ready by smc_rx_init().
 * @sk	socket whose waiters are to be woken
 */
static void smc_rx_wake_up(struct sock *sk)
{
	struct socket_wq *wq;

	/* derived from sock_def_readable() */
	/* called already in smc_listen_work() */
	rcu_read_lock();
	/* sk_wq is read via rcu_dereference(), hence the RCU read section */
	wq = rcu_dereference(sk->sk_wq);
	if (skwq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
						EPOLLRDNORM | EPOLLRDBAND);
	/* async notification: data is readable */
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	/* additionally signal hang-up once both directions are shut down
	 * or the socket is closed
	 */
	if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
	    (sk->sk_state == SMC_CLOSED))
		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
	rcu_read_unlock();
}

/* Update consumer cursor
 * @smc		smc socket whose connection is updated
 * @cons	consumer cursor (passed by value, the advanced copy is stored
 *		into conn->local_tx_ctrl.cons)
 * @len		number of Bytes consumed
 * Returns:
 * 1 if we should end our receive, 0 otherwise
 *
 * Besides advancing the cursor, this handles the bookkeeping for a pending
 * urgent byte and finally calls smc_tx_consumer_update().
 */
static int smc_rx_update_consumer(struct smc_sock *smc,
				  union smc_host_cursor cons, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool force = false;
	int diff, rc = 0;

	/* advance the local cursor copy by the consumed bytes */
	smc_curs_add(conn->rmb_desc->len, &cons, len);

	/* did we process urgent data? */
	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
		/* position of the advanced cursor relative to the urgent
		 * cursor
		 */
		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
				     &conn->urg_curs);
		if (sock_flag(sk, SOCK_URGINLINE)) {
			/* inline urgent data: cursor reached the urgent
			 * cursor, so mark urgent data as read and tell the
			 * caller to end the receive
			 */
			if (diff == 0) {
				force = true;
				rc = 1;
				conn->urg_state = SMC_URG_READ;
			}
		} else {
			if (diff == 1) {
				/* skip urgent byte */
				force = true;
				smc_curs_add(conn->rmb_desc->len, &cons, 1);
				conn->urg_rx_skip_pend = false;
			} else if (diff < -1)
				/* we read past urgent byte */
				conn->urg_state = SMC_URG_READ;
		}
	}

	/* publish the new consumer cursor in the local control data */
	smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);

	/* send consumer cursor update if required */
	/* similar to advertising new TCP rcv_wnd if required */
	smc_tx_consumer_update(conn, force);

	return rc;
}

/* Advance the connection's current consumer cursor by @len bytes.
 * Convenience wrapper around smc_rx_update_consumer() that starts from
 * conn->local_tx_ctrl.cons; the return value is ignored.
 * @smc	smc socket
 * @len	number of Bytes consumed
 */
static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;

	smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
	smc_rx_update_consumer(smc, cons, len);
}

/* Per pipe buffer private data: allocated in smc_rx_splice() and consumed
 * (and freed) in smc_rx_pipe_buf_release().
 */
struct smc_spd_priv {
	struct smc_sock *smc;	/* socket the spliced data belongs to */
	size_t		 len;	/* number of Bytes spliced in this buffer */
};

/* pipe_buf_operations.release callback for buffers created by
 * smc_rx_splice().
 * Unless the socket is in state SMC_CLOSED, SMC_PEERFINCLOSEWAIT or
 * SMC_APPFINCLOSEWAIT, the consumer cursor is advanced by the spliced
 * length and splice_pending is reduced; waiters are woken once
 * splice_pending drops to zero.
 * In all cases the private data is freed and the page and socket references
 * are dropped (counterparts of get_page()/sock_hold() in smc_rx_splice()).
 * @pipe	pipe the buffer belongs to (unused here)
 * @buf		pipe buffer being released; buf->private holds a
 *		struct smc_spd_priv
 */
static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
	struct smc_sock *smc = priv->smc;
	struct smc_connection *conn;
	struct sock *sk = &smc->sk;

	/* in these states only the resources are released, the cursor is
	 * not touched anymore
	 */
	if (sk->sk_state == SMC_CLOSED ||
	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
	    sk->sk_state == SMC_APPFINCLOSEWAIT)
		goto out;
	conn = &smc->conn;
	/* cursor update is done under the socket lock */
	lock_sock(sk);
	smc_rx_update_cons(smc, priv->len);
	release_sock(sk);
	/* last outstanding spliced bytes consumed: wake up waiters */
	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
		smc_rx_wake_up(sk);
out:
	kfree(priv);
	put_page(buf->page);
	sock_put(sk);
}

/* pipe buffer operations for RMB pages spliced into a pipe */
static const struct pipe_buf_operations smc_pipe_ops = {
	.release = smc_rx_pipe_buf_release,
	.get = generic_pipe_buf_get
};

/* splice_pipe_desc.spd_release callback: drop the reference of page @i
 * of @spd.
 * NOTE(review): smc_rx_splice() takes its page reference only after
 * splice_to_pipe() succeeded - confirm that this callback cannot run
 * without a matching get_page().
 */
static void smc_rx_spd_release(struct splice_pipe_desc *spd,
			       unsigned int i)
{
	put_page(spd->pages[i]);
}

/* Splice @len bytes starting at @src from the RMB into @pipe.
 * @pipe	destination pipe
 * @src		start address of the data inside the RMB
 * @len		number of Bytes to splice
 * @smc		smc socket owning the RMB
 * Returns:
 * result of splice_to_pipe(), or -ENOMEM if the private data could not be
 * allocated.
 *
 * On success a socket reference and a page reference are taken and
 * splice_pending is increased; all three are undone in
 * smc_rx_pipe_buf_release().
 */
static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
			 struct smc_sock *smc)
{
	struct splice_pipe_desc spd;
	struct partial_page partial;
	struct smc_spd_priv *priv;
	int bytes;

	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
	if (!priv)
		return -ENOMEM;
	priv->len = len;
	priv->smc = smc;
	/* offset of the data relative to the start of the RMB */
	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
	partial.len = len;
	/* handed to smc_rx_pipe_buf_release() via buf->private */
	partial.private = (unsigned long)priv;

	/* describe a single page entry: the RMB pages */
	spd.nr_pages_max = 1;
	spd.nr_pages = 1;
	spd.pages = &smc->conn.rmb_desc->pages;
	spd.partial = &partial;
	spd.ops = &smc_pipe_ops;
	spd.spd_release = smc_rx_spd_release;

	bytes = splice_to_pipe(pipe, &spd);
	if (bytes > 0) {
		/* references are dropped in smc_rx_pipe_buf_release() */
		sock_hold(&smc->sk);
		get_page(smc->conn.rmb_desc->pages);
		atomic_add(bytes, &smc->conn.splice_pending);
	}
	/* NOTE(review): priv is not freed here when bytes <= 0 - verify
	 * whether it is released elsewhere on that path.
	 */

	return bytes;
}

/* wait criterion for smc_rx_wait(): data is available to receive and no
 * spliced bytes are pending anymore
 */
static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
{
	return atomic_read(&conn->bytes_to_rcv) &&
	       !atomic_read(&conn->splice_pending);
}

/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
 *   @smc    smc socket
 *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
 *   @fcrit  add'l criterion to evaluate as function pointer
 * Returns:
 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
 */
int smc_rx_wait(struct smc_sock *smc, long *timeo,
		int (*fcrit)(struct smc_connection *conn))
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct smc_cdc_conn_state_flags *cflags =
					&conn->local_tx_ctrl.conn_state_flags;
	struct sock *sk = &smc->sk;
	int rc;

	/* fast path: criterion already fulfilled, no need to sleep */
	if (fcrit(conn))
		return 1;
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	add_wait_queue(sk_sleep(sk), &wait);
	/* wait ends on socket error, peer abort, receive shutdown, killed
	 * connection or when the caller's criterion is fulfilled
	 */
	rc = sk_wait_event(sk, timeo,
			   sk->sk_err ||
			   cflags->peer_conn_abort ||
			   sk->sk_shutdown & RCV_SHUTDOWN ||
			   conn->killed ||
			   fcrit(conn),
			   &wait);
	remove_wait_queue(sk_sleep(sk), &wait);
	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	return rc;
}

/* Receive the urgent (out-of-band) byte; called from smc_rx_recvmsg() for
 * MSG_OOB requests.
 * @smc		smc socket
 * @msg		message to copy the urgent byte to; msg_flags is updated
 * @len		size of the receive buffer
 * @flags	receive flags (MSG_PEEK and MSG_TRUNC are evaluated)
 * Returns:
 * -EINVAL if SOCK_URGINLINE is set or no valid urgent byte is present,
 * -EFAULT if copying the byte failed,
 * 1 if @len > 0, otherwise @len with MSG_TRUNC set in msg->msg_flags.
 */
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
			   int flags)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;
	struct sock *sk = &smc->sk;
	int rc = 0;

	/* NOTE(review): this check rejects every state other than
	 * SMC_URG_VALID, so the two returns after the SMC_URG_VALID branch
	 * below look unreachable - confirm intent.
	 */
	if (sock_flag(sk, SOCK_URGINLINE) ||
	    !(conn->urg_state == SMC_URG_VALID) ||
	    conn->urg_state == SMC_URG_READ)
		return -EINVAL;

	if (conn->urg_state == SMC_URG_VALID) {
		/* peeking leaves the urgent byte available */
		if (!(flags & MSG_PEEK))
			smc->conn.urg_state = SMC_URG_READ;
		msg->msg_flags |= MSG_OOB;
		if (len > 0) {
			if (!(flags & MSG_TRUNC))
				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
			len = 1;
			smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
			/* remember that the urgent byte still has to be
			 * skipped by smc_rx_update_consumer() later on
			 */
			if (smc_curs_diff(conn->rmb_desc->len, &cons,
					  &conn->urg_curs) > 1)
				conn->urg_rx_skip_pend = true;
			/* Urgent Byte was already accounted for, but trigger
			 * skipping the urgent byte in non-inline case
			 */
			if (!(flags & MSG_PEEK))
				smc_rx_update_consumer(smc, cons, 0);
		} else {
			/* no room in the receive buffer for the byte */
			msg->msg_flags |= MSG_TRUNC;
		}

		return rc ? -EFAULT : len;
	}

	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
		return 0;

	return -EAGAIN;
}

/* Check whether smc_rx_recvmsg() has data to copy.
 * Returns true if smc_rx_data_available() reports data. Otherwise, if an
 * urgent byte is valid, the consumer update is triggered with length 0
 * (which lets smc_rx_update_consumer() process the urgent byte) and false
 * is returned.
 */
static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
{
	struct smc_connection *conn = &smc->conn;

	if (smc_rx_data_available(conn))
		return true;
	else if (conn->urg_state == SMC_URG_VALID)
		/* we received a single urgent Byte - skip */
		smc_rx_update_cons(smc, 0);
	return false;
}

/* smc_rx_recvmsg - receive data from RMBE
 * @smc:	smc socket to receive from
 * @msg:	copy data to receive buffer
 * @pipe:	copy data to pipe if set - indicates splice() call
 * @len:	maximum number of Bytes to receive
 * @flags:	receive flags (MSG_ERRQUEUE, MSG_OOB, MSG_DONTWAIT,
 *		MSG_WAITALL, MSG_TRUNC and MSG_PEEK are evaluated)
 *
 * rcvbuf consumer: main API called by socket layer.
 * Called under sk lock.
 *
 * Returns the number of Bytes received, or a negative error code
 * (-EINVAL for MSG_ERRQUEUE, -ENOTCONN for a listening or never connected
 * socket, -EAGAIN if nothing was read and no wait time is left, -EFAULT if
 * the first copy failed, or the socket/signal error).
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
		   struct pipe_inode_info *pipe, size_t len, int flags)
{
	size_t copylen, read_done = 0, read_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	int (*func)(struct smc_connection *conn);
	union smc_host_cursor cons;
	int readable, chunk;
	char *rcvbuf_base;
	struct sock *sk;
	int splbytes;
	long timeo;
	int target;		/* Read at least these many bytes */
	int rc;

	if (unlikely(flags & MSG_ERRQUEUE))
		return -EINVAL; /* future work for sk.sk_family == AF_SMC */

	sk = &smc->sk;
	if (sk->sk_state == SMC_LISTEN)
		return -ENOTCONN;
	/* urgent data is handled separately */
	if (flags & MSG_OOB)
		return smc_rx_recv_urg(smc, msg, len, flags);
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
	rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;

	do { /* while (read_remaining) */
		/* done once the low-water mark is reached; a splice() call
		 * returns as soon as anything was spliced
		 */
		if (read_done >= target || (pipe && read_done))
			break;

		if (conn->killed)
			break;

		if (smc_rx_recvmsg_data_available(smc))
			goto copy;

		if (sk->sk_shutdown & RCV_SHUTDOWN) {
			/* smc_cdc_msg_recv_action() could have run after
			 * above smc_rx_recvmsg_data_available()
			 */
			if (smc_rx_recvmsg_data_available(smc))
				goto copy;
			break;
		}

		if (read_done) {
			/* something was read already: return it rather than
			 * reporting an error
			 */
			if (sk->sk_err ||
			    sk->sk_state == SMC_CLOSED ||
			    !timeo ||
			    signal_pending(current))
				break;
		} else {
			/* nothing read so far: report the error condition */
			if (sk->sk_err) {
				read_done = sock_error(sk);
				break;
			}
			if (sk->sk_state == SMC_CLOSED) {
				if (!sock_flag(sk, SOCK_DONE)) {
					/* This occurs when user tries to read
					 * from never connected socket.
					 */
					read_done = -ENOTCONN;
					break;
				}
				break;
			}
			if (signal_pending(current)) {
				read_done = sock_intr_errno(timeo);
				break;
			}
			if (!timeo)
				return -EAGAIN;
		}

		if (!smc_rx_data_available(conn)) {
			smc_rx_wait(smc, &timeo, smc_rx_data_available);
			continue;
		}

copy:
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after waiting on data above */
		readable = atomic_read(&conn->bytes_to_rcv);
		splbytes = atomic_read(&conn->splice_pending);
		/* a recvmsg() call (msg set) has to wait until all spliced
		 * bytes have been consumed from the pipe
		 */
		if (!readable || (msg && splbytes)) {
			if (splbytes)
				func = smc_rx_data_available_and_no_splice_pend;
			else
				func = smc_rx_data_available;
			smc_rx_wait(smc, &timeo, func);
			continue;
		}

		smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
		/* subsequent splice() calls pick up where previous left */
		if (splbytes)
			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
		if (conn->urg_state == SMC_URG_VALID &&
		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
		    readable > 1)
			readable--;	/* always stop at urgent Byte */
		/* not more than what user space asked for */
		copylen = min_t(size_t, read_remaining, readable);
		/* determine chunks where to read from rcvbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
				  cons.count);
		chunk_len_sum = chunk_len;
		chunk_off = cons.count;
		smc_rmb_sync_sg_for_cpu(conn);
		/* at most two chunks: up to the end of the buffer, then the
		 * remainder from the start of the buffer
		 */
		for (chunk = 0; chunk < 2; chunk++) {
			/* with MSG_TRUNC the data is only counted, not
			 * copied
			 */
			if (!(flags & MSG_TRUNC)) {
				if (msg) {
					rc = memcpy_to_msg(msg, rcvbuf_base +
							   chunk_off,
							   chunk_len);
				} else {
					rc = smc_rx_splice(pipe, rcvbuf_base +
							chunk_off, chunk_len,
							smc);
				}
				if (rc < 0) {
					/* keep the count of Bytes already
					 * read, if any
					 */
					if (!read_done)
						read_done = -EFAULT;
					smc_rmb_sync_sg_for_device(conn);
					goto out;
				}
			}
			read_remaining -= chunk_len;
			read_done += chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in recv ring buffer */
		}
		smc_rmb_sync_sg_for_device(conn);

		/* update cursors */
		/* with MSG_PEEK the data stays in the receive buffer */
		if (!(flags & MSG_PEEK)) {
			/* increased in recv tasklet smc_cdc_msg_rcv() */
			smp_mb__before_atomic();
			atomic_sub(copylen, &conn->bytes_to_rcv);
			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
			smp_mb__after_atomic();
			/* for splice() the consumer cursor is updated in
			 * smc_rx_pipe_buf_release() instead
			 */
			if (msg && smc_rx_update_consumer(smc, cons, copylen))
				goto out;
		}
	} while (read_remaining);
out:
	return read_done;
}

/* Initialize receive properties on connection establishment. NB: not __init!
 * Installs smc_rx_wake_up() as data-ready callback, resets splice_pending
 * and sets the urgent state to SMC_URG_READ.
 */
void smc_rx_init(struct smc_sock *smc)
{
	smc->sk.sk_data_ready = smc_rx_wake_up;
	atomic_set(&smc->conn.splice_pending, 0);
	smc->conn.urg_state = SMC_URG_READ;
}