1 /* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 13 14 #include <linux/net.h> 15 #include <linux/skbuff.h> 16 #include <linux/export.h> 17 #include <net/sock.h> 18 #include <net/af_rxrpc.h> 19 #include "ar-internal.h" 20 21 /* 22 * removal a call's user ID from the socket tree to make the user ID available 23 * again and so that it won't be seen again in association with that call 24 */ 25 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 26 { 27 _debug("RELEASE CALL %d", call->debug_id); 28 29 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 30 write_lock_bh(&rx->call_lock); 31 rb_erase(&call->sock_node, &call->socket->calls); 32 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 33 write_unlock_bh(&rx->call_lock); 34 } 35 36 read_lock_bh(&call->state_lock); 37 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 38 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) 39 rxrpc_queue_call(call); 40 read_unlock_bh(&call->state_lock); 41 } 42 43 /* 44 * receive a message from an RxRPC socket 45 * - we need to be careful about two or more threads calling recvmsg 46 * simultaneously 47 */ 48 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 49 int flags) 50 { 51 struct rxrpc_skb_priv *sp; 52 struct rxrpc_call *call = NULL, *continue_call = NULL; 53 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 54 struct sk_buff *skb; 55 long timeo; 56 int copy, ret, ullen, offset, copied = 0; 57 u32 abort_code; 58 59 DEFINE_WAIT(wait); 60 61 _enter(",,,%zu,%d", len, flags); 62 63 if (flags & (MSG_OOB | MSG_TRUNC)) 64 return -EOPNOTSUPP; 65 66 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 67 68 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 69 msg->msg_flags |= MSG_MORE; 70 71 lock_sock(&rx->sk); 72 73 for (;;) { 74 /* return immediately if a client socket has no outstanding 75 * calls */ 76 if (RB_EMPTY_ROOT(&rx->calls)) { 77 if (copied) 78 goto out; 79 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 80 release_sock(&rx->sk); 81 if (continue_call) 82 rxrpc_put_call(continue_call); 83 return -ENODATA; 84 } 85 } 86 87 /* get the next message on the Rx queue */ 88 skb = skb_peek(&rx->sk.sk_receive_queue); 89 if (!skb) { 90 /* nothing remains on the queue */ 91 if (copied && 92 (flags & MSG_PEEK || timeo == 0)) 93 goto out; 94 95 /* wait for a message to turn up */ 96 release_sock(&rx->sk); 97 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 98 TASK_INTERRUPTIBLE); 99 ret = sock_error(&rx->sk); 100 if (ret) 101 goto wait_error; 102 103 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 104 if (signal_pending(current)) 105 goto wait_interrupted; 106 timeo = schedule_timeout(timeo); 107 } 108 finish_wait(sk_sleep(&rx->sk), &wait); 109 lock_sock(&rx->sk); 110 continue; 111 } 112 113 peek_next_packet: 114 sp = rxrpc_skb(skb); 115 call = sp->call; 116 ASSERT(call != NULL); 117 118 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 119 120 /* make sure we wait for the state to be updated in this call */ 121 spin_lock_bh(&call->lock); 122 spin_unlock_bh(&call->lock); 123 124 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 125 _debug("packet from released call"); 126 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 127 BUG(); 128 rxrpc_free_skb(skb); 129 continue; 130 } 131 132 /* determine whether to continue last data receive */ 133 if (continue_call) { 134 _debug("maybe cont"); 135 if (call != continue_call || 136 skb->mark != RXRPC_SKB_MARK_DATA) { 137 release_sock(&rx->sk); 138 rxrpc_put_call(continue_call); 139 _leave(" = %d [noncont]", copied); 140 return copied; 141 } 142 } 143 144 rxrpc_get_call(call); 145 146 /* copy the peer address and timestamp */ 147 if (!continue_call) { 148 if (msg->msg_name) { 149 size_t len = 150 sizeof(call->conn->params.peer->srx); 151 memcpy(msg->msg_name, 152 &call->conn->params.peer->srx, len); 153 msg->msg_namelen = len; 154 } 155 sock_recv_timestamp(msg, &rx->sk, skb); 156 } 157 158 /* receive the message */ 159 if (skb->mark != RXRPC_SKB_MARK_DATA) 160 goto receive_non_data_message; 161 162 _debug("recvmsg DATA #%u { %d, %d }", 163 sp->hdr.seq, skb->len, sp->offset); 164 165 if (!continue_call) { 166 /* only set the control data once per recvmsg() */ 167 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 168 ullen, &call->user_call_ID); 169 if (ret < 0) 170 goto copy_error; 171 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 172 } 173 174 ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv); 175 ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1); 176 call->rx_data_recv = sp->hdr.seq; 177 178 ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten); 179 180 offset = sp->offset; 181 copy = skb->len - offset; 182 if (copy > len - copied) 183 copy = len - copied; 184 185 ret = skb_copy_datagram_msg(skb, offset, msg, copy); 186 187 if (ret < 0) 188 goto copy_error; 189 190 /* handle piecemeal consumption of data packets */ 191 _debug("copied %d+%d", copy, copied); 192 193 offset += copy; 194 copied += copy; 195 196 if (!(flags & MSG_PEEK)) 197 sp->offset = offset; 198 199 if (sp->offset < skb->len) { 200 _debug("buffer full"); 201 ASSERTCMP(copied, ==, len); 202 break; 203 } 204 205 /* we transferred the whole data packet */ 206 if (!(flags & MSG_PEEK)) 207 rxrpc_kernel_data_consumed(call, skb); 208 209 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 210 _debug("last"); 211 if (rxrpc_conn_is_client(call->conn)) { 212 /* last byte of reply received */ 213 ret = copied; 214 goto terminal_message; 215 } 216 217 /* last bit of request received */ 218 if (!(flags & MSG_PEEK)) { 219 _debug("eat packet"); 220 if (skb_dequeue(&rx->sk.sk_receive_queue) != 221 skb) 222 BUG(); 223 rxrpc_free_skb(skb); 224 } 225 msg->msg_flags &= ~MSG_MORE; 226 break; 227 } 228 229 /* move on to the next data message */ 230 _debug("next"); 231 if (!continue_call) 232 continue_call = sp->call; 233 else 234 rxrpc_put_call(call); 235 call = NULL; 236 237 if (flags & MSG_PEEK) { 238 _debug("peek next"); 239 skb = skb->next; 240 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 241 break; 242 goto peek_next_packet; 243 } 244 245 _debug("eat packet"); 246 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 247 BUG(); 248 rxrpc_free_skb(skb); 249 } 250 251 /* end of non-terminal data packet reception for the moment */ 252 _debug("end rcv data"); 253 out: 254 release_sock(&rx->sk); 255 if (call) 256 rxrpc_put_call(call); 257 if (continue_call) 258 rxrpc_put_call(continue_call); 259 _leave(" = %d [data]", copied); 260 return copied; 261 262 /* handle non-DATA messages such as aborts, incoming connections and 263 * final ACKs */ 264 receive_non_data_message: 265 _debug("non-data"); 266 267 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 268 _debug("RECV NEW CALL"); 269 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 270 if (ret < 0) 271 goto copy_error; 272 if (!(flags & MSG_PEEK)) { 273 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 274 BUG(); 275 rxrpc_free_skb(skb); 276 } 277 goto out; 278 } 279 280 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 281 ullen, &call->user_call_ID); 282 if (ret < 0) 283 goto copy_error; 284 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 285 286 switch (skb->mark) { 287 case RXRPC_SKB_MARK_DATA: 288 BUG(); 289 case RXRPC_SKB_MARK_FINAL_ACK: 290 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 291 break; 292 case RXRPC_SKB_MARK_BUSY: 293 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 294 break; 295 case RXRPC_SKB_MARK_REMOTE_ABORT: 296 abort_code = call->remote_abort; 297 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 298 break; 299 case RXRPC_SKB_MARK_LOCAL_ABORT: 300 abort_code = call->local_abort; 301 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 302 break; 303 case RXRPC_SKB_MARK_NET_ERROR: 304 _debug("RECV NET ERROR %d", sp->error); 305 abort_code = sp->error; 306 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 307 break; 308 case RXRPC_SKB_MARK_LOCAL_ERROR: 309 _debug("RECV LOCAL ERROR %d", sp->error); 310 abort_code = sp->error; 311 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 312 &abort_code); 313 break; 314 default: 315 pr_err("Unknown packet mark %u\n", skb->mark); 316 BUG(); 317 break; 318 } 319 320 if (ret < 0) 321 goto copy_error; 322 323 terminal_message: 324 _debug("terminal"); 325 msg->msg_flags &= ~MSG_MORE; 326 msg->msg_flags |= MSG_EOR; 327 328 if (!(flags & MSG_PEEK)) { 329 _net("free terminal skb %p", skb); 330 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 331 BUG(); 332 rxrpc_free_skb(skb); 333 rxrpc_remove_user_ID(rx, call); 334 } 335 336 release_sock(&rx->sk); 337 rxrpc_put_call(call); 338 if (continue_call) 339 rxrpc_put_call(continue_call); 340 _leave(" = %d", ret); 341 return ret; 342 343 copy_error: 344 _debug("copy error"); 345 release_sock(&rx->sk); 346 rxrpc_put_call(call); 347 if (continue_call) 348 rxrpc_put_call(continue_call); 349 _leave(" = %d", ret); 350 return ret; 351 352 wait_interrupted: 353 ret = sock_intr_errno(timeo); 354 wait_error: 355 finish_wait(sk_sleep(&rx->sk), &wait); 356 if (continue_call) 357 rxrpc_put_call(continue_call); 358 if (copied) 359 copied = ret; 360 _leave(" = %d [waitfail %d]", copied, ret); 361 return copied; 362 363 } 364 365 /** 366 * rxrpc_kernel_is_data_last - Determine if data message is last one 367 * @skb: Message holding data 368 * 369 * Determine if data message is last one for the parent call. 370 */ 371 bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 372 { 373 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 374 375 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 376 377 return sp->hdr.flags & RXRPC_LAST_PACKET; 378 } 379 380 EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 381 382 /** 383 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 384 * @skb: Message indicating an abort 385 * 386 * Get the abort code from an RxRPC abort message. 387 */ 388 u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 389 { 390 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 391 392 switch (skb->mark) { 393 case RXRPC_SKB_MARK_REMOTE_ABORT: 394 return sp->call->remote_abort; 395 case RXRPC_SKB_MARK_LOCAL_ABORT: 396 return sp->call->local_abort; 397 default: 398 BUG(); 399 } 400 } 401 402 EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 403 404 /** 405 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 406 * @skb: Message indicating an error 407 * 408 * Get the error number from an RxRPC error message. 409 */ 410 int rxrpc_kernel_get_error_number(struct sk_buff *skb) 411 { 412 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 413 414 return sp->error; 415 } 416 417 EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 418