1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* In-kernel rxperf server for testing purposes. 3 * 4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) "rxperf: " fmt 9 #include <linux/module.h> 10 #include <linux/slab.h> 11 #include <net/sock.h> 12 #include <net/af_rxrpc.h> 13 14 MODULE_DESCRIPTION("rxperf test server (afs)"); 15 MODULE_AUTHOR("Red Hat, Inc."); 16 MODULE_LICENSE("GPL"); 17 18 #define RXPERF_PORT 7009 19 #define RX_PERF_SERVICE 147 20 #define RX_PERF_VERSION 3 21 #define RX_PERF_SEND 0 22 #define RX_PERF_RECV 1 23 #define RX_PERF_RPC 3 24 #define RX_PERF_FILE 4 25 #define RX_PERF_MAGIC_COOKIE 0x4711 26 27 struct rxperf_proto_params { 28 __be32 version; 29 __be32 type; 30 __be32 rsize; 31 __be32 wsize; 32 } __packed; 33 34 static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 }; 35 static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 }; 36 37 enum rxperf_call_state { 38 RXPERF_CALL_SV_AWAIT_PARAMS, /* Server: Awaiting parameter block */ 39 RXPERF_CALL_SV_AWAIT_REQUEST, /* Server: Awaiting request data */ 40 RXPERF_CALL_SV_REPLYING, /* Server: Replying */ 41 RXPERF_CALL_SV_AWAIT_ACK, /* Server: Awaiting final ACK */ 42 RXPERF_CALL_COMPLETE, /* Completed or failed */ 43 }; 44 45 struct rxperf_call { 46 struct rxrpc_call *rxcall; 47 struct iov_iter iter; 48 struct kvec kvec[1]; 49 struct work_struct work; 50 const char *type; 51 size_t iov_len; 52 size_t req_len; /* Size of request blob */ 53 size_t reply_len; /* Size of reply blob */ 54 unsigned int debug_id; 55 unsigned int operation_id; 56 struct rxperf_proto_params params; 57 __be32 tmp[2]; 58 s32 abort_code; 59 enum rxperf_call_state state; 60 short error; 61 unsigned short unmarshal; 62 u16 service_id; 63 int (*deliver)(struct rxperf_call *call); 64 void (*processor)(struct work_struct *work); 65 }; 66 67 static struct socket *rxperf_socket; 68 static struct key 
*rxperf_sec_keyring; /* Ring of security/crypto keys */ 69 static struct workqueue_struct *rxperf_workqueue; 70 71 static void rxperf_deliver_to_call(struct work_struct *work); 72 static int rxperf_deliver_param_block(struct rxperf_call *call); 73 static int rxperf_deliver_request(struct rxperf_call *call); 74 static int rxperf_process_call(struct rxperf_call *call); 75 static void rxperf_charge_preallocation(struct work_struct *work); 76 77 static DECLARE_WORK(rxperf_charge_preallocation_work, 78 rxperf_charge_preallocation); 79 80 static inline void rxperf_set_call_state(struct rxperf_call *call, 81 enum rxperf_call_state to) 82 { 83 call->state = to; 84 } 85 86 static inline void rxperf_set_call_complete(struct rxperf_call *call, 87 int error, s32 remote_abort) 88 { 89 if (call->state != RXPERF_CALL_COMPLETE) { 90 call->abort_code = remote_abort; 91 call->error = error; 92 call->state = RXPERF_CALL_COMPLETE; 93 } 94 } 95 96 static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall, 97 unsigned long user_call_ID) 98 { 99 kfree((struct rxperf_call *)user_call_ID); 100 } 101 102 static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall, 103 unsigned long user_call_ID) 104 { 105 queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work); 106 } 107 108 static void rxperf_queue_call_work(struct rxperf_call *call) 109 { 110 queue_work(rxperf_workqueue, &call->work); 111 } 112 113 static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall, 114 unsigned long call_user_ID) 115 { 116 struct rxperf_call *call = (struct rxperf_call *)call_user_ID; 117 118 if (call->state != RXPERF_CALL_COMPLETE) 119 rxperf_queue_call_work(call); 120 } 121 122 static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID) 123 { 124 struct rxperf_call *call = (struct rxperf_call *)user_call_ID; 125 126 call->rxcall = rxcall; 127 } 128 129 static void rxperf_notify_end_reply_tx(struct sock *sock, 130 struct rxrpc_call *rxcall, 131 
unsigned long call_user_ID) 132 { 133 rxperf_set_call_state((struct rxperf_call *)call_user_ID, 134 RXPERF_CALL_SV_AWAIT_ACK); 135 } 136 137 /* 138 * Charge the incoming call preallocation. 139 */ 140 static void rxperf_charge_preallocation(struct work_struct *work) 141 { 142 struct rxperf_call *call; 143 144 for (;;) { 145 call = kzalloc(sizeof(*call), GFP_KERNEL); 146 if (!call) 147 break; 148 149 call->type = "unset"; 150 call->debug_id = atomic_inc_return(&rxrpc_debug_id); 151 call->deliver = rxperf_deliver_param_block; 152 call->state = RXPERF_CALL_SV_AWAIT_PARAMS; 153 call->service_id = RX_PERF_SERVICE; 154 call->iov_len = sizeof(call->params); 155 call->kvec[0].iov_len = sizeof(call->params); 156 call->kvec[0].iov_base = &call->params; 157 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len); 158 INIT_WORK(&call->work, rxperf_deliver_to_call); 159 160 if (rxrpc_kernel_charge_accept(rxperf_socket, 161 rxperf_notify_rx, 162 rxperf_rx_attach, 163 (unsigned long)call, 164 GFP_KERNEL, 165 call->debug_id) < 0) 166 break; 167 call = NULL; 168 } 169 170 kfree(call); 171 } 172 173 /* 174 * Open an rxrpc socket and bind it to be a server for callback notifications 175 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT 176 */ 177 static int rxperf_open_socket(void) 178 { 179 struct sockaddr_rxrpc srx; 180 struct socket *socket; 181 int ret; 182 183 ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, 184 &socket); 185 if (ret < 0) 186 goto error_1; 187 188 socket->sk->sk_allocation = GFP_NOFS; 189 190 /* bind the callback manager's address to make this a server socket */ 191 memset(&srx, 0, sizeof(srx)); 192 srx.srx_family = AF_RXRPC; 193 srx.srx_service = RX_PERF_SERVICE; 194 srx.transport_type = SOCK_DGRAM; 195 srx.transport_len = sizeof(srx.transport.sin6); 196 srx.transport.sin6.sin6_family = AF_INET6; 197 srx.transport.sin6.sin6_port = htons(RXPERF_PORT); 198 199 ret = rxrpc_sock_set_min_security_level(socket->sk, 
200 RXRPC_SECURITY_ENCRYPT); 201 if (ret < 0) 202 goto error_2; 203 204 ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring); 205 206 ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx)); 207 if (ret < 0) 208 goto error_2; 209 210 rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call, 211 rxperf_rx_discard_new_call); 212 213 ret = kernel_listen(socket, INT_MAX); 214 if (ret < 0) 215 goto error_2; 216 217 rxperf_socket = socket; 218 rxperf_charge_preallocation(&rxperf_charge_preallocation_work); 219 return 0; 220 221 error_2: 222 sock_release(socket); 223 error_1: 224 pr_err("Can't set up rxperf socket: %d\n", ret); 225 return ret; 226 } 227 228 /* 229 * close the rxrpc socket rxperf was using 230 */ 231 static void rxperf_close_socket(void) 232 { 233 kernel_listen(rxperf_socket, 0); 234 kernel_sock_shutdown(rxperf_socket, SHUT_RDWR); 235 flush_workqueue(rxperf_workqueue); 236 sock_release(rxperf_socket); 237 } 238 239 /* 240 * Log remote abort codes that indicate that we have a protocol disagreement 241 * with the server. 
*/
static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	/* Map known abort codes to a human-readable description; anything
	 * else is not a marshalling disagreement and is not logged.
	 */
	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

	/* Cap the output at three reports to avoid log spam; the read/update
	 * of the counter is not atomic, so an occasional extra line may slip
	 * through under concurrency - acceptable for test code.
	 */
	m = max;
	if (m < 3) {
		max = m + 1;
		pr_info("Peer reported %s failure on %s\n", msg, call->type);
	}
}

/*
 * deliver messages to a call
 */
static void rxperf_deliver_to_call(struct work_struct *work)
{
	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
	enum rxperf_call_state state;
	u32 abort_code, remote_abort = 0;
	int ret = 0;

	if (call->state == RXPERF_CALL_COMPLETE)
		return;

	/* Loop while the call is in a state that has more work to do; the
	 * comma expression re-samples call->state on every iteration.
	 */
	while (state = call->state,
	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
	       state == RXPERF_CALL_SV_AWAIT_ACK
	       ) {
		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
			/* Reply fully queued; just poll whether the call is
			 * still alive.  If not, tear it down now.
			 */
			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
				goto call_complete;
			return;
		}

		/* Run the current unmarshalling stage; a 0 return means the
		 * request is complete and the reply can be generated.
		 */
		ret = call->deliver(call);
		if (ret == 0)
			ret = rxperf_process_call(call);

		/* Translate the delivery/processing result into an rxrpc
		 * action: keep going, wait for more data, or abort the call
		 * with an appropriate abort code.
		 */
		switch (ret) {
		case 0:
			continue;
		case -EINPROGRESS:
		case -EAGAIN:
			/* Need more data; rxperf_notify_rx will requeue us. */
			return;
		case -ECONNABORTED:
			rxperf_log_error(call, call->abort_code);
			goto call_complete;
		case -EOPNOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret, "GOP");
			goto call_complete;
		case -ENOTSUPP:
			abort_code = RX_USER_ABORT;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret, "GUA");
			goto call_complete;
		case -EIO:
			pr_err("Call %u in bad state %u\n",
			       call->debug_id, call->state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		case -ENOMEM:
		case -EFAULT:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RXGEN_SS_UNMARSHAL, ret, "GUM");
			goto call_complete;
		default:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RX_CALL_DEAD, ret, "GER");
			goto call_complete;
		}
	}

call_complete:
	rxperf_set_call_complete(call, ret, remote_abort);
	/* The call may have been requeued */
	rxrpc_kernel_end_call(rxperf_socket, call->rxcall);
	cancel_work(&call->work);
	kfree(call);
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
{
	u32 remote_abort = 0;
	int ret;

	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
				     &call->iov_len, want_more, &remote_abort,
				     &call->service_id);
	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
	/* 0 means the requested amount arrived; -EAGAIN means come back when
	 * more data is available.  Both pass straight through.
	 */
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		/* End of the received data: advance the state machine from
		 * awaiting-request to replying.
		 */
		switch (call->state) {
		case RXPERF_CALL_SV_AWAIT_REQUEST:
			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
			break;
		case RXPERF_CALL_COMPLETE:
			pr_debug("premature completion %d", call->error);
			return call->error;
		default:
			break;
		}
		return 0;
	}

	/* Hard error: record it (and any remote abort code) on the call. */
	rxperf_set_call_complete(call, ret, remote_abort);
	return ret;
}

/*
 * Grab the operation ID from an incoming manager call.
380 */ 381 static int rxperf_deliver_param_block(struct rxperf_call *call) 382 { 383 u32 version; 384 int ret; 385 386 /* Extract the parameter block */ 387 ret = rxperf_extract_data(call, true); 388 if (ret < 0) 389 return ret; 390 391 version = ntohl(call->params.version); 392 call->operation_id = ntohl(call->params.type); 393 call->deliver = rxperf_deliver_request; 394 395 if (version != RX_PERF_VERSION) { 396 pr_info("Version mismatch %x\n", version); 397 return -ENOTSUPP; 398 } 399 400 switch (call->operation_id) { 401 case RX_PERF_SEND: 402 call->type = "send"; 403 call->reply_len = 0; 404 call->iov_len = 4; /* Expect req size */ 405 break; 406 case RX_PERF_RECV: 407 call->type = "recv"; 408 call->req_len = 0; 409 call->iov_len = 4; /* Expect reply size */ 410 break; 411 case RX_PERF_RPC: 412 call->type = "rpc"; 413 call->iov_len = 8; /* Expect req size and reply size */ 414 break; 415 case RX_PERF_FILE: 416 call->type = "file"; 417 fallthrough; 418 default: 419 return -EOPNOTSUPP; 420 } 421 422 rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST); 423 return call->deliver(call); 424 } 425 426 /* 427 * Deliver the request data. 
*/
static int rxperf_deliver_request(struct rxperf_call *call)
{
	int ret;

	/* call->unmarshal records which phase we've reached so that this
	 * function can be reentered after -EAGAIN without repeating work.
	 */
	switch (call->unmarshal) {
	case 0:
		/* Phase 0: aim the iterator at the size word(s) decided on by
		 * rxperf_deliver_param_block (4 or 8 bytes in call->tmp).
		 */
		call->kvec[0].iov_len = call->iov_len;
		call->kvec[0].iov_base = call->tmp;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		call->unmarshal++;
		fallthrough;
	case 1:
		/* Phase 1: read the request/reply sizes for this operation. */
		ret = rxperf_extract_data(call, true);
		if (ret < 0)
			return ret;

		switch (call->operation_id) {
		case RX_PERF_SEND:
			call->type = "send";
			call->req_len = ntohl(call->tmp[0]);
			call->reply_len = 0;
			break;
		case RX_PERF_RECV:
			call->type = "recv";
			call->req_len = 0;
			call->reply_len = ntohl(call->tmp[0]);
			break;
		case RX_PERF_RPC:
			call->type = "rpc";
			call->req_len = ntohl(call->tmp[0]);
			call->reply_len = ntohl(call->tmp[1]);
			break;
		default:
			pr_info("Can't parse extra params\n");
			return -EIO;
		}

		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
			 call->type, call->req_len, call->reply_len);

		/* The request payload is throughput ballast only; discard it
		 * unread rather than copying it anywhere.
		 */
		call->iov_len = call->req_len;
		iov_iter_discard(&call->iter, READ, call->req_len);
		call->unmarshal++;
		fallthrough;
	case 2:
		/* Phase 2: drain the request data to the end of the call. */
		ret = rxperf_extract_data(call, false);
		if (ret < 0)
			return ret;
		call->unmarshal++;
		fallthrough;
	default:
		/* All phases done; caller may now generate the reply. */
		return 0;
	}
}

/*
 * Process a call for which we've received the request.
*/
static int rxperf_process_call(struct rxperf_call *call)
{
	struct msghdr msg = {};
	struct bio_vec bv[1];
	struct kvec iov[1];
	ssize_t n;
	size_t reply_len = call->reply_len, len;

	/* Declare the total reply size up front: the requested payload plus
	 * the trailing magic cookie.
	 */
	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
				   reply_len + sizeof(rxperf_magic_cookie));

	/* Stream the reply payload as zero-filled pages, one page at a time,
	 * with MSG_MORE set because the cookie still follows.
	 */
	while (reply_len > 0) {
		len = min_t(size_t, reply_len, PAGE_SIZE);
		bv[0].bv_page = ZERO_PAGE(0);
		bv[0].bv_offset = 0;
		bv[0].bv_len = len;
		iov_iter_bvec(&msg.msg_iter, WRITE, bv, 1, len);
		msg.msg_flags = MSG_MORE;
		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
					   len, rxperf_notify_end_reply_tx);
		if (n < 0)
			return n;
		if (n == 0)
			return -EIO;
		reply_len -= n;
	}

	/* Terminate the reply with the magic cookie; msg_flags cleared so
	 * rxrpc knows this is the final chunk.
	 */
	len = sizeof(rxperf_magic_cookie);
	iov[0].iov_base = (void *)rxperf_magic_cookie;
	iov[0].iov_len = len;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
	msg.msg_flags = 0;
	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
				   rxperf_notify_end_reply_tx);
	if (n >= 0)
		return 0;	/* Success */

	if (n == -ENOMEM)
		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
					RXGEN_SS_MARSHAL, -ENOMEM, "GOM");
	return n;
}

/*
 * Add a key to the security keyring.
 */
static int rxperf_add_key(struct key *keyring)
{
	key_ref_t kref;
	int ret;

	/* Create (or update) the server security key "rxrpc_s" with the
	 * description "<service>:<security index>" (147:2 = rxkad) holding
	 * the shared secret.
	 */
	kref = key_create_or_update(make_key_ref(keyring, true),
				    "rxrpc_s",
				    __stringify(RX_PERF_SERVICE) ":2",
				    secret,
				    sizeof(secret),
				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
				    | KEY_USR_VIEW,
				    KEY_ALLOC_NOT_IN_QUOTA);

	if (IS_ERR(kref)) {
		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
		return PTR_ERR(kref);
	}

	ret = key_link(keyring, key_ref_to_ptr(kref));
	if (ret < 0)
		pr_err("Can't link rxperf server key: %d\n", ret);
	key_ref_put(kref);
	return ret;
}

/*
 * Initialise the rxperf server.
*/
static int __init rxperf_init(void)
{
	struct key *keyring;
	int ret = -ENOMEM;

	pr_info("Server registering\n");

	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
	if (!rxperf_workqueue)
		goto error_workqueue;

	keyring = keyring_alloc("rxperf_server",
				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
				KEY_POS_WRITE |
				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
				KEY_USR_WRITE |
				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
				KEY_ALLOC_NOT_IN_QUOTA,
				NULL, NULL);
	if (IS_ERR(keyring)) {
		/* NOTE(review): on this path ret is still the initial -ENOMEM,
		 * so the real PTR_ERR(keyring) is logged but not returned.
		 */
		pr_err("Can't allocate rxperf server keyring: %ld\n",
		       PTR_ERR(keyring));
		goto error_keyring;
	}
	rxperf_sec_keyring = keyring;
	ret = rxperf_add_key(keyring);
	if (ret < 0)
		goto error_key;

	ret = rxperf_open_socket();
	if (ret < 0)
		goto error_socket;
	return 0;

	/* Unwind in reverse order of acquisition. */
error_socket:
error_key:
	key_put(rxperf_sec_keyring);
error_keyring:
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
error_workqueue:
	pr_err("Failed to register: %d\n", ret);
	return ret;
}
late_initcall(rxperf_init); /* Must be called after net/ to create socket */

static void __exit rxperf_exit(void)
{
	pr_info("Server unregistering.\n");

	/* Close the socket first so no new work is queued, then drop the
	 * keyring and the (already flushed) workqueue.
	 */
	rxperf_close_socket();
	key_put(rxperf_sec_keyring);
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
}
module_exit(rxperf_exit);