1 /* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 13 14 #include <linux/net.h> 15 #include <linux/skbuff.h> 16 #include <linux/export.h> 17 #include <net/sock.h> 18 #include <net/af_rxrpc.h> 19 #include "ar-internal.h" 20 21 /* 22 * removal a call's user ID from the socket tree to make the user ID available 23 * again and so that it won't be seen again in association with that call 24 */ 25 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 26 { 27 _debug("RELEASE CALL %d", call->debug_id); 28 29 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 30 write_lock_bh(&rx->call_lock); 31 rb_erase(&call->sock_node, &call->socket->calls); 32 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 33 write_unlock_bh(&rx->call_lock); 34 } 35 36 read_lock_bh(&call->state_lock); 37 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 38 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) 39 rxrpc_queue_call(call); 40 read_unlock_bh(&call->state_lock); 41 } 42 43 /* 44 * receive a message from an RxRPC socket 45 * - we need to be careful about two or more threads calling recvmsg 46 * simultaneously 47 */ 48 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 49 int flags) 50 { 51 struct rxrpc_skb_priv *sp; 52 struct rxrpc_call *call = NULL, *continue_call = NULL; 53 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 54 struct sk_buff *skb; 55 long timeo; 56 int copy, ret, ullen, offset, copied = 0; 57 u32 abort_code; 58 59 DEFINE_WAIT(wait); 60 61 _enter(",,,%zu,%d", len, flags); 62 63 if (flags & (MSG_OOB | MSG_TRUNC)) 64 return -EOPNOTSUPP; 65 66 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 67 68 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 69 msg->msg_flags |= MSG_MORE; 70 71 lock_sock(&rx->sk); 72 73 for (;;) { 74 /* return immediately if a client socket has no outstanding 75 * calls */ 76 if (RB_EMPTY_ROOT(&rx->calls)) { 77 if (copied) 78 goto out; 79 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 80 release_sock(&rx->sk); 81 if (continue_call) 82 rxrpc_put_call(continue_call); 83 return -ENODATA; 84 } 85 } 86 87 /* get the next message on the Rx queue */ 88 skb = skb_peek(&rx->sk.sk_receive_queue); 89 if (!skb) { 90 /* nothing remains on the queue */ 91 if (copied && 92 (flags & MSG_PEEK || timeo == 0)) 93 goto out; 94 95 /* wait for a message to turn up */ 96 release_sock(&rx->sk); 97 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 98 TASK_INTERRUPTIBLE); 99 ret = sock_error(&rx->sk); 100 if (ret) 101 goto wait_error; 102 103 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 104 if (signal_pending(current)) 105 goto wait_interrupted; 106 timeo = schedule_timeout(timeo); 107 } 108 finish_wait(sk_sleep(&rx->sk), &wait); 109 lock_sock(&rx->sk); 110 continue; 111 } 112 113 peek_next_packet: 114 rxrpc_see_skb(skb); 115 sp = rxrpc_skb(skb); 116 call = sp->call; 117 ASSERT(call != NULL); 118 119 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 120 121 /* make sure we wait for the state to be updated in this call */ 122 spin_lock_bh(&call->lock); 123 spin_unlock_bh(&call->lock); 124 125 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 126 _debug("packet from released call"); 127 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 128 BUG(); 129 rxrpc_free_skb(skb); 130 continue; 131 } 132 133 /* determine whether to continue last data receive */ 134 if (continue_call) { 135 _debug("maybe cont"); 136 if (call != continue_call || 137 skb->mark != RXRPC_SKB_MARK_DATA) { 138 release_sock(&rx->sk); 139 rxrpc_put_call(continue_call); 140 _leave(" = %d [noncont]", copied); 141 return copied; 142 } 143 } 144 145 rxrpc_get_call(call); 146 147 /* copy the peer address and timestamp */ 148 if (!continue_call) { 149 if (msg->msg_name) { 150 size_t len = 151 sizeof(call->conn->params.peer->srx); 152 memcpy(msg->msg_name, 153 &call->conn->params.peer->srx, len); 154 msg->msg_namelen = len; 155 } 156 sock_recv_timestamp(msg, &rx->sk, skb); 157 } 158 159 /* receive the message */ 160 if (skb->mark != RXRPC_SKB_MARK_DATA) 161 goto receive_non_data_message; 162 163 _debug("recvmsg DATA #%u { %d, %d }", 164 sp->hdr.seq, skb->len, sp->offset); 165 166 if (!continue_call) { 167 /* only set the control data once per recvmsg() */ 168 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 169 ullen, &call->user_call_ID); 170 if (ret < 0) 171 goto copy_error; 172 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 173 } 174 175 ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv); 176 ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1); 177 call->rx_data_recv = sp->hdr.seq; 178 179 ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten); 180 181 offset = sp->offset; 182 copy = skb->len - offset; 183 if (copy > len - copied) 184 copy = len - copied; 185 186 ret = skb_copy_datagram_msg(skb, offset, msg, copy); 187 188 if (ret < 0) 189 goto copy_error; 190 191 /* handle piecemeal consumption of data packets */ 192 _debug("copied %d+%d", copy, copied); 193 194 offset += copy; 195 copied += copy; 196 197 if (!(flags & MSG_PEEK)) 198 sp->offset = offset; 199 200 if (sp->offset < skb->len) { 201 _debug("buffer full"); 202 ASSERTCMP(copied, ==, len); 203 break; 204 } 205 206 /* we transferred the whole data packet */ 207 if (!(flags & MSG_PEEK)) 208 rxrpc_kernel_data_consumed(call, skb); 209 210 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 211 _debug("last"); 212 if (rxrpc_conn_is_client(call->conn)) { 213 /* last byte of reply received */ 214 ret = copied; 215 goto terminal_message; 216 } 217 218 /* last bit of request received */ 219 if (!(flags & MSG_PEEK)) { 220 _debug("eat packet"); 221 if (skb_dequeue(&rx->sk.sk_receive_queue) != 222 skb) 223 BUG(); 224 rxrpc_free_skb(skb); 225 } 226 msg->msg_flags &= ~MSG_MORE; 227 break; 228 } 229 230 /* move on to the next data message */ 231 _debug("next"); 232 if (!continue_call) 233 continue_call = sp->call; 234 else 235 rxrpc_put_call(call); 236 call = NULL; 237 238 if (flags & MSG_PEEK) { 239 _debug("peek next"); 240 skb = skb->next; 241 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 242 break; 243 goto peek_next_packet; 244 } 245 246 _debug("eat packet"); 247 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 248 BUG(); 249 rxrpc_free_skb(skb); 250 } 251 252 /* end of non-terminal data packet reception for the moment */ 253 _debug("end rcv data"); 254 out: 255 release_sock(&rx->sk); 256 if (call) 257 rxrpc_put_call(call); 258 if (continue_call) 259 rxrpc_put_call(continue_call); 260 _leave(" = %d [data]", copied); 261 return copied; 262 263 /* handle non-DATA messages such as aborts, incoming connections and 264 * final ACKs */ 265 receive_non_data_message: 266 _debug("non-data"); 267 268 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 269 _debug("RECV NEW CALL"); 270 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 271 if (ret < 0) 272 goto copy_error; 273 if (!(flags & MSG_PEEK)) { 274 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 275 BUG(); 276 rxrpc_free_skb(skb); 277 } 278 goto out; 279 } 280 281 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 282 ullen, &call->user_call_ID); 283 if (ret < 0) 284 goto copy_error; 285 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 286 287 switch (skb->mark) { 288 case RXRPC_SKB_MARK_DATA: 289 BUG(); 290 case RXRPC_SKB_MARK_FINAL_ACK: 291 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 292 break; 293 case RXRPC_SKB_MARK_BUSY: 294 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 295 break; 296 case RXRPC_SKB_MARK_REMOTE_ABORT: 297 abort_code = call->abort_code; 298 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 299 break; 300 case RXRPC_SKB_MARK_LOCAL_ABORT: 301 abort_code = call->abort_code; 302 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 303 if (call->error) { 304 abort_code = call->error; 305 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 306 &abort_code); 307 } 308 break; 309 case RXRPC_SKB_MARK_NET_ERROR: 310 _debug("RECV NET ERROR %d", sp->error); 311 abort_code = sp->error; 312 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 313 break; 314 case RXRPC_SKB_MARK_LOCAL_ERROR: 315 _debug("RECV LOCAL ERROR %d", sp->error); 316 abort_code = sp->error; 317 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 318 &abort_code); 319 break; 320 default: 321 pr_err("Unknown packet mark %u\n", skb->mark); 322 BUG(); 323 break; 324 } 325 326 if (ret < 0) 327 goto copy_error; 328 329 terminal_message: 330 _debug("terminal"); 331 msg->msg_flags &= ~MSG_MORE; 332 msg->msg_flags |= MSG_EOR; 333 334 if (!(flags & MSG_PEEK)) { 335 _net("free terminal skb %p", skb); 336 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 337 BUG(); 338 rxrpc_free_skb(skb); 339 rxrpc_remove_user_ID(rx, call); 340 } 341 342 release_sock(&rx->sk); 343 rxrpc_put_call(call); 344 if (continue_call) 345 rxrpc_put_call(continue_call); 346 _leave(" = %d", ret); 347 return ret; 348 349 copy_error: 350 _debug("copy error"); 351 release_sock(&rx->sk); 352 rxrpc_put_call(call); 353 if (continue_call) 354 rxrpc_put_call(continue_call); 355 _leave(" = %d", ret); 356 return ret; 357 358 wait_interrupted: 359 ret = sock_intr_errno(timeo); 360 wait_error: 361 finish_wait(sk_sleep(&rx->sk), &wait); 362 if (continue_call) 363 rxrpc_put_call(continue_call); 364 if (copied) 365 copied = ret; 366 _leave(" = %d [waitfail %d]", copied, ret); 367 return copied; 368 369 } 370 371 /** 372 * rxrpc_kernel_is_data_last - Determine if data message is last one 373 * @skb: Message holding data 374 * 375 * Determine if data message is last one for the parent call. 376 */ 377 bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 378 { 379 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 380 381 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 382 383 return sp->hdr.flags & RXRPC_LAST_PACKET; 384 } 385 386 EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 387 388 /** 389 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 390 * @skb: Message indicating an abort 391 * 392 * Get the abort code from an RxRPC abort message. 393 */ 394 u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 395 { 396 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 397 398 switch (skb->mark) { 399 case RXRPC_SKB_MARK_REMOTE_ABORT: 400 case RXRPC_SKB_MARK_LOCAL_ABORT: 401 return sp->call->abort_code; 402 default: 403 BUG(); 404 } 405 } 406 407 EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 408 409 /** 410 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 411 * @skb: Message indicating an error 412 * 413 * Get the error number from an RxRPC error message. 414 */ 415 int rxrpc_kernel_get_error_number(struct sk_buff *skb) 416 { 417 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 418 419 return sp->error; 420 } 421 422 EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 423