/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 *
 * ----
 *
 * Callers for this were originally written against a very simple synchronous
 * API.  This implementation reflects those simple callers.  Some day I'm sure
 * we'll need to move to a more robust posting/callback mechanism.
 *
 * Transmit calls pass in kernel virtual addresses and block copying them into
 * the socket's tx buffers via a usual blocking sendmsg.  They'll block waiting
 * for a failed socket to time out.  TX callers can also pass in a pointer to
 * an 'int' which gets filled with an errno off the wire in response to the
 * message they send.
 *
 * Handlers for unsolicited messages are registered.  Each socket has a page
 * that incoming data is copied into.  First the header, then the data.
 * Handlers are called from only one thread with a reference to this per-socket
 * page.  After the handler call the header is rewritten into the status reply
 * and the page is reused for the next message, so it can't be referenced
 * beyond the call.  Handlers may block but are discouraged from doing so.
 *
 * Any framing errors (bad magic, large payload lengths) close a connection.
 *
 * Our sock_container holds the state we associate with a socket.  Its current
 * framing state is held there as well as the refcounting we do around when it
 * is safe to tear down the socket.  The socket is only finally torn down from
 * the container when the container loses all of its references -- so as long
 * as you hold a ref on the container you can trust that the socket is valid
 * for use with kernel socket APIs.
 *
 * Connections are initiated between a pair of nodes when the node with the
 * higher node number gets a heartbeat callback which indicates that the lower
 * numbered node has started heartbeating.  The lower numbered node is passive
 * and only accepts the connection if the higher numbered node is heartbeating.
 */

#include <linux/kernel.h>
#include <linux/jiffies.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/kref.h>
#include <net/tcp.h>

#include <asm/uaccess.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#define MLOG_MASK_PREFIX ML_TCP
#include "masklog.h"
#include "quorum.h"

#include "tcp_internal.h"

/*
 * The linux network stack isn't sparse endian clean.. It has macros like
 * ntohs() which perform the endian checks and structs like sockaddr_in
 * which aren't annotated.  So __force is found here to get the build
 * clean.  When they emerge from the dark ages and annotate the code
 * we can remove these.
 */

/*
 * Format string and matching argument list for describing the peer node
 * of a sock container in log messages: name, number, IPv4 address and
 * port (the port is converted with ntohs()).
 */
#define SC_NODEF_FMT "node %s (num %u) at %u.%u.%u.%u:%u"
#define SC_NODEF_ARGS(sc) sc->sc_node->nd_name, sc->sc_node->nd_num, \
	NIPQUAD(sc->sc_node->nd_ipv4_address), \
	ntohs(sc->sc_node->nd_ipv4_port)

/*
 * In the following two log macros, the whitespace after the ',' just
 * before ##args is intentional.  Otherwise, gcc 2.95 will eat the
 * previous token if args expands to nothing.
 *
 * msglog() dumps an o2net_msg header under ML_MSG.  Every field is decoded
 * with be16_to_cpu()/be32_to_cpu(), so hdr is expected to point at a header
 * in network byte order.
 *
 * sclog() dumps sock container state (refcount, socket, node, rx page and
 * offset) under ML_SOCKET.
 */
#define msglog(hdr, fmt, args...) do { \
	typeof(hdr) __hdr = (hdr); \
	mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d " \
	     "key %08x num %u] " fmt, \
	     be16_to_cpu(__hdr->magic), be16_to_cpu(__hdr->data_len), \
	     be16_to_cpu(__hdr->msg_type), be32_to_cpu(__hdr->status), \
	     be32_to_cpu(__hdr->sys_status), be32_to_cpu(__hdr->key), \
	     be32_to_cpu(__hdr->msg_num) , ##args); \
} while (0)

#define sclog(sc, fmt, args...) do { \
	typeof(sc) __sc = (sc); \
	mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \
	     "pg_off %zu] " fmt, __sc, \
	     atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock, \
	     __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \
	     ##args); \
} while (0)

/* rb-tree of registered message handlers, keyed by (key, msg_type);
 * o2net_handler_lock guards the tree (read for lookups, write for
 * register/unregister) */
static rwlock_t o2net_handler_lock = RW_LOCK_UNLOCKED;
static struct rb_root o2net_handler_tree = RB_ROOT;

/* per-node connection state, indexed by node number */
static struct o2net_node o2net_nodes[O2NM_MAX_NODES];

/* XXX someday we'll need better accounting */
static struct socket *o2net_listen_sock = NULL;

/*
 * listen work is only queued by the listening socket callbacks on the
 * o2net_wq.  teardown detaches the callbacks before destroying the workqueue.
 * quorum work is queued as sock containers are shut down.  stop_listening
 * tears down all the node's sock containers, preventing future shutdowns
 * and queued quorum work, before canceling delayed quorum work and
 * destroying the work queue.
126 */ 127 static struct workqueue_struct *o2net_wq; 128 static struct work_struct o2net_listen_work; 129 130 static struct o2hb_callback_func o2net_hb_up, o2net_hb_down; 131 #define O2NET_HB_PRI 0x1 132 133 static struct o2net_handshake *o2net_hand; 134 static struct o2net_msg *o2net_keep_req, *o2net_keep_resp; 135 136 static int o2net_sys_err_translations[O2NET_ERR_MAX] = 137 {[O2NET_ERR_NONE] = 0, 138 [O2NET_ERR_NO_HNDLR] = -ENOPROTOOPT, 139 [O2NET_ERR_OVERFLOW] = -EOVERFLOW, 140 [O2NET_ERR_DIED] = -EHOSTDOWN,}; 141 142 /* can't quite avoid *all* internal declarations :/ */ 143 static void o2net_sc_connect_completed(void *arg); 144 static void o2net_rx_until_empty(void *arg); 145 static void o2net_shutdown_sc(void *arg); 146 static void o2net_listen_data_ready(struct sock *sk, int bytes); 147 static void o2net_sc_send_keep_req(void *arg); 148 static void o2net_idle_timer(unsigned long data); 149 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); 150 151 static inline int o2net_sys_err_to_errno(enum o2net_system_error err) 152 { 153 int trans; 154 BUG_ON(err >= O2NET_ERR_MAX); 155 trans = o2net_sys_err_translations[err]; 156 157 /* Just in case we mess up the translation table above */ 158 BUG_ON(err != O2NET_ERR_NONE && trans == 0); 159 return trans; 160 } 161 162 static struct o2net_node * o2net_nn_from_num(u8 node_num) 163 { 164 BUG_ON(node_num >= ARRAY_SIZE(o2net_nodes)); 165 return &o2net_nodes[node_num]; 166 } 167 168 static u8 o2net_num_from_nn(struct o2net_node *nn) 169 { 170 BUG_ON(nn == NULL); 171 return nn - o2net_nodes; 172 } 173 174 /* ------------------------------------------------------------ */ 175 176 static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw) 177 { 178 int ret = 0; 179 180 do { 181 if (!idr_pre_get(&nn->nn_status_idr, GFP_ATOMIC)) { 182 ret = -EAGAIN; 183 break; 184 } 185 spin_lock(&nn->nn_lock); 186 ret = idr_get_new(&nn->nn_status_idr, nsw, &nsw->ns_id); 187 if (ret == 0) 188 
list_add_tail(&nsw->ns_node_item, 189 &nn->nn_status_list); 190 spin_unlock(&nn->nn_lock); 191 } while (ret == -EAGAIN); 192 193 if (ret == 0) { 194 init_waitqueue_head(&nsw->ns_wq); 195 nsw->ns_sys_status = O2NET_ERR_NONE; 196 nsw->ns_status = 0; 197 } 198 199 return ret; 200 } 201 202 static void o2net_complete_nsw_locked(struct o2net_node *nn, 203 struct o2net_status_wait *nsw, 204 enum o2net_system_error sys_status, 205 s32 status) 206 { 207 assert_spin_locked(&nn->nn_lock); 208 209 if (!list_empty(&nsw->ns_node_item)) { 210 list_del_init(&nsw->ns_node_item); 211 nsw->ns_sys_status = sys_status; 212 nsw->ns_status = status; 213 idr_remove(&nn->nn_status_idr, nsw->ns_id); 214 wake_up(&nsw->ns_wq); 215 } 216 } 217 218 static void o2net_complete_nsw(struct o2net_node *nn, 219 struct o2net_status_wait *nsw, 220 u64 id, enum o2net_system_error sys_status, 221 s32 status) 222 { 223 spin_lock(&nn->nn_lock); 224 if (nsw == NULL) { 225 if (id > INT_MAX) 226 goto out; 227 228 nsw = idr_find(&nn->nn_status_idr, id); 229 if (nsw == NULL) 230 goto out; 231 } 232 233 o2net_complete_nsw_locked(nn, nsw, sys_status, status); 234 235 out: 236 spin_unlock(&nn->nn_lock); 237 return; 238 } 239 240 static void o2net_complete_nodes_nsw(struct o2net_node *nn) 241 { 242 struct list_head *iter, *tmp; 243 unsigned int num_kills = 0; 244 struct o2net_status_wait *nsw; 245 246 assert_spin_locked(&nn->nn_lock); 247 248 list_for_each_safe(iter, tmp, &nn->nn_status_list) { 249 nsw = list_entry(iter, struct o2net_status_wait, ns_node_item); 250 o2net_complete_nsw_locked(nn, nsw, O2NET_ERR_DIED, 0); 251 num_kills++; 252 } 253 254 mlog(0, "completed %d messages for node %u\n", num_kills, 255 o2net_num_from_nn(nn)); 256 } 257 258 static int o2net_nsw_completed(struct o2net_node *nn, 259 struct o2net_status_wait *nsw) 260 { 261 int completed; 262 spin_lock(&nn->nn_lock); 263 completed = list_empty(&nsw->ns_node_item); 264 spin_unlock(&nn->nn_lock); 265 return completed; 266 } 267 268 /* 
------------------------------------------------------------ */ 269 270 static void sc_kref_release(struct kref *kref) 271 { 272 struct o2net_sock_container *sc = container_of(kref, 273 struct o2net_sock_container, sc_kref); 274 sclog(sc, "releasing\n"); 275 276 if (sc->sc_sock) { 277 sock_release(sc->sc_sock); 278 sc->sc_sock = NULL; 279 } 280 281 o2nm_node_put(sc->sc_node); 282 sc->sc_node = NULL; 283 284 kfree(sc); 285 } 286 287 static void sc_put(struct o2net_sock_container *sc) 288 { 289 sclog(sc, "put\n"); 290 kref_put(&sc->sc_kref, sc_kref_release); 291 } 292 static void sc_get(struct o2net_sock_container *sc) 293 { 294 sclog(sc, "get\n"); 295 kref_get(&sc->sc_kref); 296 } 297 static struct o2net_sock_container *sc_alloc(struct o2nm_node *node) 298 { 299 struct o2net_sock_container *sc, *ret = NULL; 300 struct page *page = NULL; 301 302 page = alloc_page(GFP_NOFS); 303 sc = kcalloc(1, sizeof(*sc), GFP_NOFS); 304 if (sc == NULL || page == NULL) 305 goto out; 306 307 kref_init(&sc->sc_kref); 308 o2nm_node_get(node); 309 sc->sc_node = node; 310 311 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc); 312 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc); 313 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc); 314 INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc); 315 316 init_timer(&sc->sc_idle_timeout); 317 sc->sc_idle_timeout.function = o2net_idle_timer; 318 sc->sc_idle_timeout.data = (unsigned long)sc; 319 320 sclog(sc, "alloced\n"); 321 322 ret = sc; 323 sc->sc_page = page; 324 sc = NULL; 325 page = NULL; 326 327 out: 328 if (page) 329 __free_page(page); 330 kfree(sc); 331 332 return ret; 333 } 334 335 /* ------------------------------------------------------------ */ 336 337 static void o2net_sc_queue_work(struct o2net_sock_container *sc, 338 struct work_struct *work) 339 { 340 sc_get(sc); 341 if (!queue_work(o2net_wq, work)) 342 sc_put(sc); 343 } 344 static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc, 
345 struct work_struct *work, 346 int delay) 347 { 348 sc_get(sc); 349 if (!queue_delayed_work(o2net_wq, work, delay)) 350 sc_put(sc); 351 } 352 static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc, 353 struct work_struct *work) 354 { 355 if (cancel_delayed_work(work)) 356 sc_put(sc); 357 } 358 359 static void o2net_set_nn_state(struct o2net_node *nn, 360 struct o2net_sock_container *sc, 361 unsigned valid, int err) 362 { 363 int was_valid = nn->nn_sc_valid; 364 int was_err = nn->nn_persistent_error; 365 struct o2net_sock_container *old_sc = nn->nn_sc; 366 367 assert_spin_locked(&nn->nn_lock); 368 369 /* the node num comparison and single connect/accept path should stop 370 * an non-null sc from being overwritten with another */ 371 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); 372 mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid); 373 mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc); 374 375 /* we won't reconnect after our valid conn goes away for 376 * this hb iteration.. here so it shows up in the logs */ 377 if (was_valid && !valid && err == 0) 378 err = -ENOTCONN; 379 380 mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n", 381 o2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid, 382 nn->nn_persistent_error, err); 383 384 nn->nn_sc = sc; 385 nn->nn_sc_valid = valid ? 
1 : 0; 386 nn->nn_persistent_error = err; 387 388 /* mirrors o2net_tx_can_proceed() */ 389 if (nn->nn_persistent_error || nn->nn_sc_valid) 390 wake_up(&nn->nn_sc_wq); 391 392 if (!was_err && nn->nn_persistent_error) { 393 o2quo_conn_err(o2net_num_from_nn(nn)); 394 queue_delayed_work(o2net_wq, &nn->nn_still_up, 395 msecs_to_jiffies(O2NET_QUORUM_DELAY_MS)); 396 } 397 398 if (was_valid && !valid) { 399 mlog(ML_NOTICE, "no longer connected to " SC_NODEF_FMT "\n", 400 SC_NODEF_ARGS(old_sc)); 401 o2net_complete_nodes_nsw(nn); 402 } 403 404 if (!was_valid && valid) { 405 o2quo_conn_up(o2net_num_from_nn(nn)); 406 /* this is a bit of a hack. we only try reconnecting 407 * when heartbeating starts until we get a connection. 408 * if that connection then dies we don't try reconnecting. 409 * the only way to start connecting again is to down 410 * heartbeat and bring it back up. */ 411 cancel_delayed_work(&nn->nn_connect_expired); 412 mlog(ML_NOTICE, "%s " SC_NODEF_FMT "\n", 413 o2nm_this_node() > sc->sc_node->nd_num ? 414 "connected to" : "accepted connection from", 415 SC_NODEF_ARGS(sc)); 416 } 417 418 /* trigger the connecting worker func as long as we're not valid, 419 * it will back off if it shouldn't connect. This can be called 420 * from node config teardown and so needs to be careful about 421 * the work queue actually being up. 
*/ 422 if (!valid && o2net_wq) { 423 unsigned long delay; 424 /* delay if we're withing a RECONNECT_DELAY of the 425 * last attempt */ 426 delay = (nn->nn_last_connect_attempt + 427 msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) 428 - jiffies; 429 if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) 430 delay = 0; 431 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay); 432 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay); 433 } 434 435 /* keep track of the nn's sc ref for the caller */ 436 if ((old_sc == NULL) && sc) 437 sc_get(sc); 438 if (old_sc && (old_sc != sc)) { 439 o2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work); 440 sc_put(old_sc); 441 } 442 } 443 444 /* see o2net_register_callbacks() */ 445 static void o2net_data_ready(struct sock *sk, int bytes) 446 { 447 void (*ready)(struct sock *sk, int bytes); 448 449 read_lock(&sk->sk_callback_lock); 450 if (sk->sk_user_data) { 451 struct o2net_sock_container *sc = sk->sk_user_data; 452 sclog(sc, "data_ready hit\n"); 453 do_gettimeofday(&sc->sc_tv_data_ready); 454 o2net_sc_queue_work(sc, &sc->sc_rx_work); 455 ready = sc->sc_data_ready; 456 } else { 457 ready = sk->sk_data_ready; 458 } 459 read_unlock(&sk->sk_callback_lock); 460 461 ready(sk, bytes); 462 } 463 464 /* see o2net_register_callbacks() */ 465 static void o2net_state_change(struct sock *sk) 466 { 467 void (*state_change)(struct sock *sk); 468 struct o2net_sock_container *sc; 469 470 read_lock(&sk->sk_callback_lock); 471 sc = sk->sk_user_data; 472 if (sc == NULL) { 473 state_change = sk->sk_state_change; 474 goto out; 475 } 476 477 sclog(sc, "state_change to %d\n", sk->sk_state); 478 479 state_change = sc->sc_state_change; 480 481 switch(sk->sk_state) { 482 /* ignore connecting sockets as they make progress */ 483 case TCP_SYN_SENT: 484 case TCP_SYN_RECV: 485 break; 486 case TCP_ESTABLISHED: 487 o2net_sc_queue_work(sc, &sc->sc_connect_work); 488 break; 489 default: 490 o2net_sc_queue_work(sc, &sc->sc_shutdown_work); 491 break; 492 
} 493 out: 494 read_unlock(&sk->sk_callback_lock); 495 state_change(sk); 496 } 497 498 /* 499 * we register callbacks so we can queue work on events before calling 500 * the original callbacks. our callbacks our careful to test user_data 501 * to discover when they've reaced with o2net_unregister_callbacks(). 502 */ 503 static void o2net_register_callbacks(struct sock *sk, 504 struct o2net_sock_container *sc) 505 { 506 write_lock_bh(&sk->sk_callback_lock); 507 508 /* accepted sockets inherit the old listen socket data ready */ 509 if (sk->sk_data_ready == o2net_listen_data_ready) { 510 sk->sk_data_ready = sk->sk_user_data; 511 sk->sk_user_data = NULL; 512 } 513 514 BUG_ON(sk->sk_user_data != NULL); 515 sk->sk_user_data = sc; 516 sc_get(sc); 517 518 sc->sc_data_ready = sk->sk_data_ready; 519 sc->sc_state_change = sk->sk_state_change; 520 sk->sk_data_ready = o2net_data_ready; 521 sk->sk_state_change = o2net_state_change; 522 523 write_unlock_bh(&sk->sk_callback_lock); 524 } 525 526 static int o2net_unregister_callbacks(struct sock *sk, 527 struct o2net_sock_container *sc) 528 { 529 int ret = 0; 530 531 write_lock_bh(&sk->sk_callback_lock); 532 if (sk->sk_user_data == sc) { 533 ret = 1; 534 sk->sk_user_data = NULL; 535 sk->sk_data_ready = sc->sc_data_ready; 536 sk->sk_state_change = sc->sc_state_change; 537 } 538 write_unlock_bh(&sk->sk_callback_lock); 539 540 return ret; 541 } 542 543 /* 544 * this is a little helper that is called by callers who have seen a problem 545 * with an sc and want to detach it from the nn if someone already hasn't beat 546 * them to it. if an error is given then the shutdown will be persistent 547 * and pending transmits will be canceled. 
*/
static void o2net_ensure_shutdown(struct o2net_node *nn,
			           struct o2net_sock_container *sc,
				   int err)
{
	/* only the container currently installed on the nn is detached;
	 * a stale sc that was already replaced is left alone */
	spin_lock(&nn->nn_lock);
	if (nn->nn_sc == sc)
		o2net_set_nn_state(nn, NULL, 0, err);
	spin_unlock(&nn->nn_lock);
}

/*
 * This work queue function performs the blocking parts of socket shutdown.  A
 * few paths lead here.  set_nn_state will trigger this callback if it sees an
 * sc detached from the nn.  state_change will also trigger this callback
 * directly when it sees errors.  In that case we need to call set_nn_state
 * ourselves as state_change couldn't get the nn_lock and call set_nn_state
 * itself.
 *
 * The final sc_put() drops the reference o2net_sc_queue_work() took when this
 * work was queued; that reference is what keeps sc alive across the earlier
 * sc_put() of the callbacks' reference.
 */
static void o2net_shutdown_sc(void *arg)
{
	struct o2net_sock_container *sc = arg;
	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);

	sclog(sc, "shutting down\n");

	/* drop the callbacks ref and call shutdown only once */
	if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
		/* we shouldn't flush as we're in the thread, the
		 * races with pending sc work structs are harmless */
		del_timer_sync(&sc->sc_idle_timeout);
		o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
		sc_put(sc);
		sc->sc_sock->ops->shutdown(sc->sc_sock,
					   RCV_SHUTDOWN|SEND_SHUTDOWN);
	}

	/* not fatal so failed connects before the other guy has our
	 * heartbeat can be retried */
	o2net_ensure_shutdown(nn, sc, 0);
	sc_put(sc);
}

/* ------------------------------------------------------------ */

/*
 * Ordering function for o2net_handler_tree: compares the raw in-memory bytes
 * of the key first, then of the message type.  The result is a consistent
 * total order for the tree, not a numeric comparison.
 */
static int o2net_handler_cmp(struct o2net_msg_handler *nmh, u32 msg_type,
			     u32 key)
{
	int ret = memcmp(&nmh->nh_key, &key, sizeof(key));

	if (ret == 0)
		ret = memcmp(&nmh->nh_msg_type, &msg_type, sizeof(msg_type));

	return ret;
}

/*
 * Find the handler for (@msg_type, @key) in o2net_handler_tree.  Returns it,
 * or NULL if absent.  When @ret_p / @ret_parent are non-NULL they receive the
 * link slot and parent where the search stopped, which is what
 * rb_link_node() needs to insert a new handler there.
 *
 * NOTE(review): does no locking itself; every caller in this file holds
 * o2net_handler_lock.
 */
static struct o2net_msg_handler *
o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
			  struct rb_node **ret_parent)
{
	struct rb_node **p = &o2net_handler_tree.rb_node;
	struct rb_node *parent = NULL;
	struct o2net_msg_handler *nmh, *ret = NULL;
	int cmp;

	while (*p) {
		parent = *p;
		nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
		cmp = o2net_handler_cmp(nmh, msg_type, key);

		if (cmp < 0)
			p = &(*p)->rb_left;
		else if (cmp > 0)
			p = &(*p)->rb_right;
		else {
			ret = nmh;
			break;
		}
	}

	if (ret_p != NULL)
		*ret_p = p;
	if (ret_parent != NULL)
		*ret_parent = parent;

	return ret;
}

/* kref release for a message handler: just frees it */
static void o2net_handler_kref_release(struct kref *kref)
{
	struct o2net_msg_handler *nmh;
	nmh = container_of(kref, struct o2net_msg_handler, nh_kref);

	kfree(nmh);
}

/* drop a handler reference, freeing the handler on the last put */
static void o2net_handler_put(struct o2net_msg_handler *nmh)
{
	kref_put(&nmh->nh_kref, o2net_handler_kref_release);
}

/* max_len is protection for the handler func.  incoming messages won't
 * be given to the handler if their payload is longer than the max.
*/ 651 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, 652 o2net_msg_handler_func *func, void *data, 653 struct list_head *unreg_list) 654 { 655 struct o2net_msg_handler *nmh = NULL; 656 struct rb_node **p, *parent; 657 int ret = 0; 658 659 if (max_len > O2NET_MAX_PAYLOAD_BYTES) { 660 mlog(0, "max_len for message handler out of range: %u\n", 661 max_len); 662 ret = -EINVAL; 663 goto out; 664 } 665 666 if (!msg_type) { 667 mlog(0, "no message type provided: %u, %p\n", msg_type, func); 668 ret = -EINVAL; 669 goto out; 670 671 } 672 if (!func) { 673 mlog(0, "no message handler provided: %u, %p\n", 674 msg_type, func); 675 ret = -EINVAL; 676 goto out; 677 } 678 679 nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS); 680 if (nmh == NULL) { 681 ret = -ENOMEM; 682 goto out; 683 } 684 685 nmh->nh_func = func; 686 nmh->nh_func_data = data; 687 nmh->nh_msg_type = msg_type; 688 nmh->nh_max_len = max_len; 689 nmh->nh_key = key; 690 /* the tree and list get this ref.. they're both removed in 691 * unregister when this ref is dropped */ 692 kref_init(&nmh->nh_kref); 693 INIT_LIST_HEAD(&nmh->nh_unregister_item); 694 695 write_lock(&o2net_handler_lock); 696 if (o2net_handler_tree_lookup(msg_type, key, &p, &parent)) 697 ret = -EEXIST; 698 else { 699 rb_link_node(&nmh->nh_node, parent, p); 700 rb_insert_color(&nmh->nh_node, &o2net_handler_tree); 701 list_add_tail(&nmh->nh_unregister_item, unreg_list); 702 703 mlog(ML_TCP, "registered handler func %p type %u key %08x\n", 704 func, msg_type, key); 705 /* we've had some trouble with handlers seemingly vanishing. 
*/ 706 mlog_bug_on_msg(o2net_handler_tree_lookup(msg_type, key, &p, 707 &parent) == NULL, 708 "couldn't find handler we *just* registerd " 709 "for type %u key %08x\n", msg_type, key); 710 } 711 write_unlock(&o2net_handler_lock); 712 if (ret) 713 goto out; 714 715 out: 716 if (ret) 717 kfree(nmh); 718 719 return ret; 720 } 721 EXPORT_SYMBOL_GPL(o2net_register_handler); 722 723 void o2net_unregister_handler_list(struct list_head *list) 724 { 725 struct list_head *pos, *n; 726 struct o2net_msg_handler *nmh; 727 728 write_lock(&o2net_handler_lock); 729 list_for_each_safe(pos, n, list) { 730 nmh = list_entry(pos, struct o2net_msg_handler, 731 nh_unregister_item); 732 mlog(ML_TCP, "unregistering handler func %p type %u key %08x\n", 733 nmh->nh_func, nmh->nh_msg_type, nmh->nh_key); 734 rb_erase(&nmh->nh_node, &o2net_handler_tree); 735 list_del_init(&nmh->nh_unregister_item); 736 kref_put(&nmh->nh_kref, o2net_handler_kref_release); 737 } 738 write_unlock(&o2net_handler_lock); 739 } 740 EXPORT_SYMBOL_GPL(o2net_unregister_handler_list); 741 742 static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key) 743 { 744 struct o2net_msg_handler *nmh; 745 746 read_lock(&o2net_handler_lock); 747 nmh = o2net_handler_tree_lookup(msg_type, key, NULL, NULL); 748 if (nmh) 749 kref_get(&nmh->nh_kref); 750 read_unlock(&o2net_handler_lock); 751 752 return nmh; 753 } 754 755 /* ------------------------------------------------------------ */ 756 757 static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len) 758 { 759 int ret; 760 mm_segment_t oldfs; 761 struct kvec vec = { 762 .iov_len = len, 763 .iov_base = data, 764 }; 765 struct msghdr msg = { 766 .msg_iovlen = 1, 767 .msg_iov = (struct iovec *)&vec, 768 .msg_flags = MSG_DONTWAIT, 769 }; 770 771 oldfs = get_fs(); 772 set_fs(get_ds()); 773 ret = sock_recvmsg(sock, &msg, len, msg.msg_flags); 774 set_fs(oldfs); 775 776 return ret; 777 } 778 779 static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec, 
780 size_t veclen, size_t total) 781 { 782 int ret; 783 mm_segment_t oldfs; 784 struct msghdr msg = { 785 .msg_iov = (struct iovec *)vec, 786 .msg_iovlen = veclen, 787 }; 788 789 if (sock == NULL) { 790 ret = -EINVAL; 791 goto out; 792 } 793 794 oldfs = get_fs(); 795 set_fs(get_ds()); 796 ret = sock_sendmsg(sock, &msg, total); 797 set_fs(oldfs); 798 if (ret != total) { 799 mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, 800 total); 801 if (ret >= 0) 802 ret = -EPIPE; /* should be smarter, I bet */ 803 goto out; 804 } 805 806 ret = 0; 807 out: 808 if (ret < 0) 809 mlog(0, "returning error: %d\n", ret); 810 return ret; 811 } 812 813 static void o2net_sendpage(struct o2net_sock_container *sc, 814 void *kmalloced_virt, 815 size_t size) 816 { 817 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); 818 ssize_t ret; 819 820 821 ret = sc->sc_sock->ops->sendpage(sc->sc_sock, 822 virt_to_page(kmalloced_virt), 823 (long)kmalloced_virt & ~PAGE_MASK, 824 size, MSG_DONTWAIT); 825 if (ret != size) { 826 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT 827 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret); 828 o2net_ensure_shutdown(nn, sc, 0); 829 } 830 } 831 832 static void o2net_init_msg(struct o2net_msg *msg, u16 data_len, u16 msg_type, u32 key) 833 { 834 memset(msg, 0, sizeof(struct o2net_msg)); 835 msg->magic = cpu_to_be16(O2NET_MSG_MAGIC); 836 msg->data_len = cpu_to_be16(data_len); 837 msg->msg_type = cpu_to_be16(msg_type); 838 msg->sys_status = cpu_to_be32(O2NET_ERR_NONE); 839 msg->status = 0; 840 msg->key = cpu_to_be32(key); 841 } 842 843 static int o2net_tx_can_proceed(struct o2net_node *nn, 844 struct o2net_sock_container **sc_ret, 845 int *error) 846 { 847 int ret = 0; 848 849 spin_lock(&nn->nn_lock); 850 if (nn->nn_persistent_error) { 851 ret = 1; 852 *sc_ret = NULL; 853 *error = nn->nn_persistent_error; 854 } else if (nn->nn_sc_valid) { 855 kref_get(&nn->nn_sc->sc_kref); 856 857 ret = 1; 858 *sc_ret = nn->nn_sc; 859 *error = 0; 860 } 
861 spin_unlock(&nn->nn_lock); 862 863 return ret; 864 } 865 866 int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec, 867 size_t caller_veclen, u8 target_node, int *status) 868 { 869 int ret, error = 0; 870 struct o2net_msg *msg = NULL; 871 size_t veclen, caller_bytes = 0; 872 struct kvec *vec = NULL; 873 struct o2net_sock_container *sc = NULL; 874 struct o2net_node *nn = o2net_nn_from_num(target_node); 875 struct o2net_status_wait nsw = { 876 .ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item), 877 }; 878 879 if (o2net_wq == NULL) { 880 mlog(0, "attempt to tx without o2netd running\n"); 881 ret = -ESRCH; 882 goto out; 883 } 884 885 if (caller_veclen == 0) { 886 mlog(0, "bad kvec array length\n"); 887 ret = -EINVAL; 888 goto out; 889 } 890 891 caller_bytes = iov_length((struct iovec *)caller_vec, caller_veclen); 892 if (caller_bytes > O2NET_MAX_PAYLOAD_BYTES) { 893 mlog(0, "total payload len %zu too large\n", caller_bytes); 894 ret = -EINVAL; 895 goto out; 896 } 897 898 if (target_node == o2nm_this_node()) { 899 ret = -ELOOP; 900 goto out; 901 } 902 903 ret = wait_event_interruptible(nn->nn_sc_wq, 904 o2net_tx_can_proceed(nn, &sc, &error)); 905 if (!ret && error) 906 ret = error; 907 if (ret) 908 goto out; 909 910 veclen = caller_veclen + 1; 911 vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC); 912 if (vec == NULL) { 913 mlog(0, "failed to %zu element kvec!\n", veclen); 914 ret = -ENOMEM; 915 goto out; 916 } 917 918 msg = kmalloc(sizeof(struct o2net_msg), GFP_ATOMIC); 919 if (!msg) { 920 mlog(0, "failed to allocate a o2net_msg!\n"); 921 ret = -ENOMEM; 922 goto out; 923 } 924 925 o2net_init_msg(msg, caller_bytes, msg_type, key); 926 927 vec[0].iov_len = sizeof(struct o2net_msg); 928 vec[0].iov_base = msg; 929 memcpy(&vec[1], caller_vec, caller_veclen * sizeof(struct kvec)); 930 931 ret = o2net_prep_nsw(nn, &nsw); 932 if (ret) 933 goto out; 934 935 msg->msg_num = cpu_to_be32(nsw.ns_id); 936 937 /* finally, convert the message header to 
network byte-order 938 * and send */ 939 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen, 940 sizeof(struct o2net_msg) + caller_bytes); 941 msglog(msg, "sending returned %d\n", ret); 942 if (ret < 0) { 943 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret); 944 goto out; 945 } 946 947 /* wait on other node's handler */ 948 wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw)); 949 950 /* Note that we avoid overwriting the callers status return 951 * variable if a system error was reported on the other 952 * side. Callers beware. */ 953 ret = o2net_sys_err_to_errno(nsw.ns_sys_status); 954 if (status && !ret) 955 *status = nsw.ns_status; 956 957 mlog(0, "woken, returning system status %d, user status %d\n", 958 ret, nsw.ns_status); 959 out: 960 if (sc) 961 sc_put(sc); 962 if (vec) 963 kfree(vec); 964 if (msg) 965 kfree(msg); 966 o2net_complete_nsw(nn, &nsw, 0, 0, 0); 967 return ret; 968 } 969 EXPORT_SYMBOL_GPL(o2net_send_message_vec); 970 971 int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len, 972 u8 target_node, int *status) 973 { 974 struct kvec vec = { 975 .iov_base = data, 976 .iov_len = len, 977 }; 978 return o2net_send_message_vec(msg_type, key, &vec, 1, 979 target_node, status); 980 } 981 EXPORT_SYMBOL_GPL(o2net_send_message); 982 983 static int o2net_send_status_magic(struct socket *sock, struct o2net_msg *hdr, 984 enum o2net_system_error syserr, int err) 985 { 986 struct kvec vec = { 987 .iov_base = hdr, 988 .iov_len = sizeof(struct o2net_msg), 989 }; 990 991 BUG_ON(syserr >= O2NET_ERR_MAX); 992 993 /* leave other fields intact from the incoming message, msg_num 994 * in particular */ 995 hdr->sys_status = cpu_to_be32(syserr); 996 hdr->status = cpu_to_be32(err); 997 hdr->magic = cpu_to_be16(O2NET_MSG_STATUS_MAGIC); // twiddle the magic 998 hdr->data_len = 0; 999 1000 msglog(hdr, "about to send status magic %d\n", err); 1001 /* hdr has been in host byteorder this whole time */ 1002 return o2net_send_tcp_msg(sock, &vec, 1, 
sizeof(struct o2net_msg)); 1003 } 1004 1005 /* this returns -errno if the header was unknown or too large, etc. 1006 * after this is called the buffer us reused for the next message */ 1007 static int o2net_process_message(struct o2net_sock_container *sc, 1008 struct o2net_msg *hdr) 1009 { 1010 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); 1011 int ret = 0, handler_status; 1012 enum o2net_system_error syserr; 1013 struct o2net_msg_handler *nmh = NULL; 1014 1015 msglog(hdr, "processing message\n"); 1016 1017 o2net_sc_postpone_idle(sc); 1018 1019 switch(be16_to_cpu(hdr->magic)) { 1020 case O2NET_MSG_STATUS_MAGIC: 1021 /* special type for returning message status */ 1022 o2net_complete_nsw(nn, NULL, 1023 be32_to_cpu(hdr->msg_num), 1024 be32_to_cpu(hdr->sys_status), 1025 be32_to_cpu(hdr->status)); 1026 goto out; 1027 case O2NET_MSG_KEEP_REQ_MAGIC: 1028 o2net_sendpage(sc, o2net_keep_resp, 1029 sizeof(*o2net_keep_resp)); 1030 goto out; 1031 case O2NET_MSG_KEEP_RESP_MAGIC: 1032 goto out; 1033 case O2NET_MSG_MAGIC: 1034 break; 1035 default: 1036 msglog(hdr, "bad magic\n"); 1037 ret = -EINVAL; 1038 goto out; 1039 break; 1040 } 1041 1042 /* find a handler for it */ 1043 handler_status = 0; 1044 nmh = o2net_handler_get(be16_to_cpu(hdr->msg_type), 1045 be32_to_cpu(hdr->key)); 1046 if (!nmh) { 1047 mlog(ML_TCP, "couldn't find handler for type %u key %08x\n", 1048 be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key)); 1049 syserr = O2NET_ERR_NO_HNDLR; 1050 goto out_respond; 1051 } 1052 1053 syserr = O2NET_ERR_NONE; 1054 1055 if (be16_to_cpu(hdr->data_len) > nmh->nh_max_len) 1056 syserr = O2NET_ERR_OVERFLOW; 1057 1058 if (syserr != O2NET_ERR_NONE) 1059 goto out_respond; 1060 1061 do_gettimeofday(&sc->sc_tv_func_start); 1062 sc->sc_msg_key = be32_to_cpu(hdr->key); 1063 sc->sc_msg_type = be16_to_cpu(hdr->msg_type); 1064 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) + 1065 be16_to_cpu(hdr->data_len), 1066 nmh->nh_func_data); 1067 
do_gettimeofday(&sc->sc_tv_func_stop); 1068 1069 out_respond: 1070 /* this destroys the hdr, so don't use it after this */ 1071 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr, 1072 handler_status); 1073 hdr = NULL; 1074 mlog(0, "sending handler status %d, syserr %d returned %d\n", 1075 handler_status, syserr, ret); 1076 1077 out: 1078 if (nmh) 1079 o2net_handler_put(nmh); 1080 return ret; 1081 } 1082 1083 static int o2net_check_handshake(struct o2net_sock_container *sc) 1084 { 1085 struct o2net_handshake *hand = page_address(sc->sc_page); 1086 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); 1087 1088 if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) { 1089 mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol " 1090 "version %llu but %llu is required, disconnecting\n", 1091 SC_NODEF_ARGS(sc), 1092 (unsigned long long)be64_to_cpu(hand->protocol_version), 1093 O2NET_PROTOCOL_VERSION); 1094 1095 /* don't bother reconnecting if its the wrong version. */ 1096 o2net_ensure_shutdown(nn, sc, -ENOTCONN); 1097 return -1; 1098 } 1099 1100 sc->sc_handshake_ok = 1; 1101 1102 spin_lock(&nn->nn_lock); 1103 /* set valid and queue the idle timers only if it hasn't been 1104 * shut down already */ 1105 if (nn->nn_sc == sc) { 1106 o2net_sc_postpone_idle(sc); 1107 o2net_set_nn_state(nn, sc, 1, 0); 1108 } 1109 spin_unlock(&nn->nn_lock); 1110 1111 /* shift everything up as though it wasn't there */ 1112 sc->sc_page_off -= sizeof(struct o2net_handshake); 1113 if (sc->sc_page_off) 1114 memmove(hand, hand + 1, sc->sc_page_off); 1115 1116 return 0; 1117 } 1118 1119 /* this demuxes the queued rx bytes into header or payload bits and calls 1120 * handlers as each full message is read off the socket. 
it returns -error, 1121 * == 0 eof, or > 0 for progress made.*/ 1122 static int o2net_advance_rx(struct o2net_sock_container *sc) 1123 { 1124 struct o2net_msg *hdr; 1125 int ret = 0; 1126 void *data; 1127 size_t datalen; 1128 1129 sclog(sc, "receiving\n"); 1130 do_gettimeofday(&sc->sc_tv_advance_start); 1131 1132 /* do we need more header? */ 1133 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1134 data = page_address(sc->sc_page) + sc->sc_page_off; 1135 datalen = sizeof(struct o2net_msg) - sc->sc_page_off; 1136 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1137 if (ret > 0) { 1138 sc->sc_page_off += ret; 1139 1140 /* this working relies on the handshake being 1141 * smaller than the normal message header */ 1142 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&& 1143 !sc->sc_handshake_ok && o2net_check_handshake(sc)) { 1144 ret = -EPROTO; 1145 goto out; 1146 } 1147 1148 /* only swab incoming here.. we can 1149 * only get here once as we cross from 1150 * being under to over */ 1151 if (sc->sc_page_off == sizeof(struct o2net_msg)) { 1152 hdr = page_address(sc->sc_page); 1153 if (be16_to_cpu(hdr->data_len) > 1154 O2NET_MAX_PAYLOAD_BYTES) 1155 ret = -EOVERFLOW; 1156 } 1157 } 1158 if (ret <= 0) 1159 goto out; 1160 } 1161 1162 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1163 /* oof, still don't have a header */ 1164 goto out; 1165 } 1166 1167 /* this was swabbed above when we first read it */ 1168 hdr = page_address(sc->sc_page); 1169 1170 msglog(hdr, "at page_off %zu\n", sc->sc_page_off); 1171 1172 /* do we need more payload? 
*/ 1173 if (sc->sc_page_off - sizeof(struct o2net_msg) < be16_to_cpu(hdr->data_len)) { 1174 /* need more payload */ 1175 data = page_address(sc->sc_page) + sc->sc_page_off; 1176 datalen = (sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len)) - 1177 sc->sc_page_off; 1178 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1179 if (ret > 0) 1180 sc->sc_page_off += ret; 1181 if (ret <= 0) 1182 goto out; 1183 } 1184 1185 if (sc->sc_page_off - sizeof(struct o2net_msg) == be16_to_cpu(hdr->data_len)) { 1186 /* we can only get here once, the first time we read 1187 * the payload.. so set ret to progress if the handler 1188 * works out. after calling this the message is toast */ 1189 ret = o2net_process_message(sc, hdr); 1190 if (ret == 0) 1191 ret = 1; 1192 sc->sc_page_off = 0; 1193 } 1194 1195 out: 1196 sclog(sc, "ret = %d\n", ret); 1197 do_gettimeofday(&sc->sc_tv_advance_stop); 1198 return ret; 1199 } 1200 1201 /* this work func is triggerd by data ready. it reads until it can read no 1202 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing 1203 * our work the work struct will be marked and we'll be called again. */ 1204 static void o2net_rx_until_empty(void *arg) 1205 { 1206 struct o2net_sock_container *sc = arg; 1207 int ret; 1208 1209 do { 1210 ret = o2net_advance_rx(sc); 1211 } while (ret > 0); 1212 1213 if (ret <= 0 && ret != -EAGAIN) { 1214 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); 1215 sclog(sc, "saw error %d, closing\n", ret); 1216 /* not permanent so read failed handshake can retry */ 1217 o2net_ensure_shutdown(nn, sc, 0); 1218 } 1219 1220 sc_put(sc); 1221 } 1222 1223 static int o2net_set_nodelay(struct socket *sock) 1224 { 1225 int ret, val = 1; 1226 mm_segment_t oldfs; 1227 1228 oldfs = get_fs(); 1229 set_fs(KERNEL_DS); 1230 1231 /* 1232 * Dear unsuspecting programmer, 1233 * 1234 * Don't use sock_setsockopt() for SOL_TCP. 
It doesn't check its level 1235 * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will 1236 * silently turn into SO_DEBUG. 1237 * 1238 * Yours, 1239 * Keeper of hilariously fragile interfaces. 1240 */ 1241 ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY, 1242 (char __user *)&val, sizeof(val)); 1243 1244 set_fs(oldfs); 1245 return ret; 1246 } 1247 1248 /* ------------------------------------------------------------ */ 1249 1250 /* called when a connect completes and after a sock is accepted. the 1251 * rx path will see the response and mark the sc valid */ 1252 static void o2net_sc_connect_completed(void *arg) 1253 { 1254 struct o2net_sock_container *sc = arg; 1255 1256 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n", 1257 (unsigned long long)O2NET_PROTOCOL_VERSION, 1258 (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); 1259 1260 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1261 sc_put(sc); 1262 } 1263 1264 /* this is called as a work_struct func. */ 1265 static void o2net_sc_send_keep_req(void *arg) 1266 { 1267 struct o2net_sock_container *sc = arg; 1268 1269 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req)); 1270 sc_put(sc); 1271 } 1272 1273 /* socket shutdown does a del_timer_sync against this as it tears down. 
1274 * we can't start this timer until we've got to the point in sc buildup 1275 * where shutdown is going to be involved */ 1276 static void o2net_idle_timer(unsigned long data) 1277 { 1278 struct o2net_sock_container *sc = (struct o2net_sock_container *)data; 1279 struct timeval now; 1280 1281 do_gettimeofday(&now); 1282 1283 mlog(ML_NOTICE, "connection to " SC_NODEF_FMT " has been idle for 10 " 1284 "seconds, shutting it down.\n", SC_NODEF_ARGS(sc)); 1285 mlog(ML_NOTICE, "here are some times that might help debug the " 1286 "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " 1287 "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", 1288 sc->sc_tv_timer.tv_sec, sc->sc_tv_timer.tv_usec, 1289 now.tv_sec, now.tv_usec, 1290 sc->sc_tv_data_ready.tv_sec, sc->sc_tv_data_ready.tv_usec, 1291 sc->sc_tv_advance_start.tv_sec, sc->sc_tv_advance_start.tv_usec, 1292 sc->sc_tv_advance_stop.tv_sec, sc->sc_tv_advance_stop.tv_usec, 1293 sc->sc_msg_key, sc->sc_msg_type, 1294 sc->sc_tv_func_start.tv_sec, sc->sc_tv_func_start.tv_usec, 1295 sc->sc_tv_func_stop.tv_sec, sc->sc_tv_func_stop.tv_usec); 1296 1297 o2net_sc_queue_work(sc, &sc->sc_shutdown_work); 1298 } 1299 1300 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) 1301 { 1302 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); 1303 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, 1304 O2NET_KEEPALIVE_DELAY_SECS * HZ); 1305 do_gettimeofday(&sc->sc_tv_timer); 1306 mod_timer(&sc->sc_idle_timeout, 1307 jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ)); 1308 } 1309 1310 /* this work func is kicked whenever a path sets the nn state which doesn't 1311 * have valid set. This includes seeing hb come up, losing a connection, 1312 * having a connect attempt fail, etc. 
This centralizes the logic which decides 1313 * if a connect attempt should be made or if we should give up and all future 1314 * transmit attempts should fail */ 1315 static void o2net_start_connect(void *arg) 1316 { 1317 struct o2net_node *nn = arg; 1318 struct o2net_sock_container *sc = NULL; 1319 struct o2nm_node *node = NULL; 1320 struct socket *sock = NULL; 1321 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; 1322 int ret = 0; 1323 1324 /* if we're greater we initiate tx, otherwise we accept */ 1325 if (o2nm_this_node() <= o2net_num_from_nn(nn)) 1326 goto out; 1327 1328 /* watch for racing with tearing a node down */ 1329 node = o2nm_get_node_by_num(o2net_num_from_nn(nn)); 1330 if (node == NULL) { 1331 ret = 0; 1332 goto out; 1333 } 1334 1335 spin_lock(&nn->nn_lock); 1336 /* see if we already have one pending or have given up */ 1337 if (nn->nn_sc || nn->nn_persistent_error) 1338 arg = NULL; 1339 spin_unlock(&nn->nn_lock); 1340 if (arg == NULL) /* *shrug*, needed some indicator */ 1341 goto out; 1342 1343 nn->nn_last_connect_attempt = jiffies; 1344 1345 sc = sc_alloc(node); 1346 if (sc == NULL) { 1347 mlog(0, "couldn't allocate sc\n"); 1348 ret = -ENOMEM; 1349 goto out; 1350 } 1351 1352 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); 1353 if (ret < 0) { 1354 mlog(0, "can't create socket: %d\n", ret); 1355 goto out; 1356 } 1357 sc->sc_sock = sock; /* freed by sc_kref_release */ 1358 1359 sock->sk->sk_allocation = GFP_ATOMIC; 1360 1361 myaddr.sin_family = AF_INET; 1362 myaddr.sin_port = (__force u16)htons(0); /* any port */ 1363 1364 ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr, 1365 sizeof(myaddr)); 1366 if (ret) { 1367 mlog(0, "bind failed: %d\n", ret); 1368 goto out; 1369 } 1370 1371 ret = o2net_set_nodelay(sc->sc_sock); 1372 if (ret) { 1373 mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret); 1374 goto out; 1375 } 1376 1377 o2net_register_callbacks(sc->sc_sock->sk, sc); 1378 1379 spin_lock(&nn->nn_lock); 1380 /* 
handshake completion will set nn->nn_sc_valid */ 1381 o2net_set_nn_state(nn, sc, 0, 0); 1382 spin_unlock(&nn->nn_lock); 1383 1384 remoteaddr.sin_family = AF_INET; 1385 remoteaddr.sin_addr.s_addr = (__force u32)node->nd_ipv4_address; 1386 remoteaddr.sin_port = (__force u16)node->nd_ipv4_port; 1387 1388 ret = sc->sc_sock->ops->connect(sc->sc_sock, 1389 (struct sockaddr *)&remoteaddr, 1390 sizeof(remoteaddr), 1391 O_NONBLOCK); 1392 if (ret == -EINPROGRESS) 1393 ret = 0; 1394 1395 out: 1396 if (ret) { 1397 mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed " 1398 "with errno %d\n", SC_NODEF_ARGS(sc), ret); 1399 /* 0 err so that another will be queued and attempted 1400 * from set_nn_state */ 1401 if (sc) 1402 o2net_ensure_shutdown(nn, sc, 0); 1403 } 1404 if (sc) 1405 sc_put(sc); 1406 if (node) 1407 o2nm_node_put(node); 1408 1409 return; 1410 } 1411 1412 static void o2net_connect_expired(void *arg) 1413 { 1414 struct o2net_node *nn = arg; 1415 1416 spin_lock(&nn->nn_lock); 1417 if (!nn->nn_sc_valid) { 1418 mlog(ML_ERROR, "no connection established with node %u after " 1419 "%u seconds, giving up and returning errors.\n", 1420 o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS); 1421 1422 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); 1423 } 1424 spin_unlock(&nn->nn_lock); 1425 } 1426 1427 static void o2net_still_up(void *arg) 1428 { 1429 struct o2net_node *nn = arg; 1430 1431 o2quo_hb_still_up(o2net_num_from_nn(nn)); 1432 } 1433 1434 /* ------------------------------------------------------------ */ 1435 1436 void o2net_disconnect_node(struct o2nm_node *node) 1437 { 1438 struct o2net_node *nn = o2net_nn_from_num(node->nd_num); 1439 1440 /* don't reconnect until it's heartbeating again */ 1441 spin_lock(&nn->nn_lock); 1442 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); 1443 spin_unlock(&nn->nn_lock); 1444 1445 if (o2net_wq) { 1446 cancel_delayed_work(&nn->nn_connect_expired); 1447 cancel_delayed_work(&nn->nn_connect_work); 1448 cancel_delayed_work(&nn->nn_still_up); 1449 
flush_workqueue(o2net_wq); 1450 } 1451 } 1452 1453 static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num, 1454 void *data) 1455 { 1456 o2quo_hb_down(node_num); 1457 1458 if (node_num != o2nm_this_node()) 1459 o2net_disconnect_node(node); 1460 } 1461 1462 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, 1463 void *data) 1464 { 1465 struct o2net_node *nn = o2net_nn_from_num(node_num); 1466 1467 o2quo_hb_up(node_num); 1468 1469 /* ensure an immediate connect attempt */ 1470 nn->nn_last_connect_attempt = jiffies - 1471 (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1); 1472 1473 if (node_num != o2nm_this_node()) { 1474 /* heartbeat doesn't work unless a local node number is 1475 * configured and doing so brings up the o2net_wq, so we can 1476 * use it.. */ 1477 queue_delayed_work(o2net_wq, &nn->nn_connect_expired, 1478 O2NET_IDLE_TIMEOUT_SECS * HZ); 1479 1480 /* believe it or not, accept and node hearbeating testing 1481 * can succeed for this node before we got here.. 
so 1482 * only use set_nn_state to clear the persistent error 1483 * if that hasn't already happened */ 1484 spin_lock(&nn->nn_lock); 1485 if (nn->nn_persistent_error) 1486 o2net_set_nn_state(nn, NULL, 0, 0); 1487 spin_unlock(&nn->nn_lock); 1488 } 1489 } 1490 1491 void o2net_unregister_hb_callbacks(void) 1492 { 1493 int ret; 1494 1495 ret = o2hb_unregister_callback(&o2net_hb_up); 1496 if (ret < 0) 1497 mlog(ML_ERROR, "Status return %d unregistering heartbeat up " 1498 "callback!\n", ret); 1499 1500 ret = o2hb_unregister_callback(&o2net_hb_down); 1501 if (ret < 0) 1502 mlog(ML_ERROR, "Status return %d unregistering heartbeat down " 1503 "callback!\n", ret); 1504 } 1505 1506 int o2net_register_hb_callbacks(void) 1507 { 1508 int ret; 1509 1510 o2hb_setup_callback(&o2net_hb_down, O2HB_NODE_DOWN_CB, 1511 o2net_hb_node_down_cb, NULL, O2NET_HB_PRI); 1512 o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB, 1513 o2net_hb_node_up_cb, NULL, O2NET_HB_PRI); 1514 1515 ret = o2hb_register_callback(&o2net_hb_up); 1516 if (ret == 0) 1517 ret = o2hb_register_callback(&o2net_hb_down); 1518 1519 if (ret) 1520 o2net_unregister_hb_callbacks(); 1521 1522 return ret; 1523 } 1524 1525 /* ------------------------------------------------------------ */ 1526 1527 static int o2net_accept_one(struct socket *sock) 1528 { 1529 int ret, slen; 1530 struct sockaddr_in sin; 1531 struct socket *new_sock = NULL; 1532 struct o2nm_node *node = NULL; 1533 struct o2net_sock_container *sc = NULL; 1534 struct o2net_node *nn; 1535 1536 BUG_ON(sock == NULL); 1537 ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type, 1538 sock->sk->sk_protocol, &new_sock); 1539 if (ret) 1540 goto out; 1541 1542 new_sock->type = sock->type; 1543 new_sock->ops = sock->ops; 1544 ret = sock->ops->accept(sock, new_sock, O_NONBLOCK); 1545 if (ret < 0) 1546 goto out; 1547 1548 new_sock->sk->sk_allocation = GFP_ATOMIC; 1549 1550 ret = o2net_set_nodelay(new_sock); 1551 if (ret) { 1552 mlog(ML_ERROR, "setting TCP_NODELAY failed 
with %d\n", ret); 1553 goto out; 1554 } 1555 1556 slen = sizeof(sin); 1557 ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin, 1558 &slen, 1); 1559 if (ret < 0) 1560 goto out; 1561 1562 node = o2nm_get_node_by_ip((__force __be32)sin.sin_addr.s_addr); 1563 if (node == NULL) { 1564 mlog(ML_NOTICE, "attempt to connect from unknown node at " 1565 "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr), 1566 ntohs((__force __be16)sin.sin_port)); 1567 ret = -EINVAL; 1568 goto out; 1569 } 1570 1571 if (o2nm_this_node() > node->nd_num) { 1572 mlog(ML_NOTICE, "unexpected connect attempted from a lower " 1573 "numbered node '%s' at " "%u.%u.%u.%u:%d with num %u\n", 1574 node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 1575 ntohs((__force __be16)sin.sin_port), node->nd_num); 1576 ret = -EINVAL; 1577 goto out; 1578 } 1579 1580 /* this happens all the time when the other node sees our heartbeat 1581 * and tries to connect before we see their heartbeat */ 1582 if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) { 1583 mlog(ML_CONN, "attempt to connect from node '%s' at " 1584 "%u.%u.%u.%u:%d but it isn't heartbeating\n", 1585 node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 1586 ntohs((__force __be16)sin.sin_port)); 1587 ret = -EINVAL; 1588 goto out; 1589 } 1590 1591 nn = o2net_nn_from_num(node->nd_num); 1592 1593 spin_lock(&nn->nn_lock); 1594 if (nn->nn_sc) 1595 ret = -EBUSY; 1596 else 1597 ret = 0; 1598 spin_unlock(&nn->nn_lock); 1599 if (ret) { 1600 mlog(ML_NOTICE, "attempt to connect from node '%s' at " 1601 "%u.%u.%u.%u:%d but it already has an open connection\n", 1602 node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 1603 ntohs((__force __be16)sin.sin_port)); 1604 goto out; 1605 } 1606 1607 sc = sc_alloc(node); 1608 if (sc == NULL) { 1609 ret = -ENOMEM; 1610 goto out; 1611 } 1612 1613 sc->sc_sock = new_sock; 1614 new_sock = NULL; 1615 1616 spin_lock(&nn->nn_lock); 1617 o2net_set_nn_state(nn, sc, 0, 0); 1618 spin_unlock(&nn->nn_lock); 1619 1620 
o2net_register_callbacks(sc->sc_sock->sk, sc); 1621 o2net_sc_queue_work(sc, &sc->sc_rx_work); 1622 1623 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1624 1625 out: 1626 if (new_sock) 1627 sock_release(new_sock); 1628 if (node) 1629 o2nm_node_put(node); 1630 if (sc) 1631 sc_put(sc); 1632 return ret; 1633 } 1634 1635 static void o2net_accept_many(void *arg) 1636 { 1637 struct socket *sock = arg; 1638 while (o2net_accept_one(sock) == 0) 1639 cond_resched(); 1640 } 1641 1642 static void o2net_listen_data_ready(struct sock *sk, int bytes) 1643 { 1644 void (*ready)(struct sock *sk, int bytes); 1645 1646 read_lock(&sk->sk_callback_lock); 1647 ready = sk->sk_user_data; 1648 if (ready == NULL) { /* check for teardown race */ 1649 ready = sk->sk_data_ready; 1650 goto out; 1651 } 1652 1653 /* ->sk_data_ready is also called for a newly established child socket 1654 * before it has been accepted and the acceptor has set up their 1655 * data_ready.. we only want to queue listen work for our listening 1656 * socket */ 1657 if (sk->sk_state == TCP_LISTEN) { 1658 mlog(ML_TCP, "bytes: %d\n", bytes); 1659 queue_work(o2net_wq, &o2net_listen_work); 1660 } 1661 1662 out: 1663 read_unlock(&sk->sk_callback_lock); 1664 ready(sk, bytes); 1665 } 1666 1667 static int o2net_open_listening_sock(__be16 port) 1668 { 1669 struct socket *sock = NULL; 1670 int ret; 1671 struct sockaddr_in sin = { 1672 .sin_family = PF_INET, 1673 .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) }, 1674 .sin_port = (__force u16)port, 1675 }; 1676 1677 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); 1678 if (ret < 0) { 1679 mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret); 1680 goto out; 1681 } 1682 1683 sock->sk->sk_allocation = GFP_ATOMIC; 1684 1685 write_lock_bh(&sock->sk->sk_callback_lock); 1686 sock->sk->sk_user_data = sock->sk->sk_data_ready; 1687 sock->sk->sk_data_ready = o2net_listen_data_ready; 1688 write_unlock_bh(&sock->sk->sk_callback_lock); 1689 1690 o2net_listen_sock = 
sock; 1691 INIT_WORK(&o2net_listen_work, o2net_accept_many, sock); 1692 1693 sock->sk->sk_reuse = 1; 1694 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); 1695 if (ret < 0) { 1696 mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", 1697 ntohs(port), ret); 1698 goto out; 1699 } 1700 1701 ret = sock->ops->listen(sock, 64); 1702 if (ret < 0) { 1703 mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", 1704 ntohs(port), ret); 1705 } 1706 1707 out: 1708 if (ret) { 1709 o2net_listen_sock = NULL; 1710 if (sock) 1711 sock_release(sock); 1712 } 1713 return ret; 1714 } 1715 1716 /* 1717 * called from node manager when we should bring up our network listening 1718 * socket. node manager handles all the serialization to only call this 1719 * once and to match it with o2net_stop_listening(). note, 1720 * o2nm_this_node() doesn't work yet as we're being called while it 1721 * is being set up. 1722 */ 1723 int o2net_start_listening(struct o2nm_node *node) 1724 { 1725 int ret = 0; 1726 1727 BUG_ON(o2net_wq != NULL); 1728 BUG_ON(o2net_listen_sock != NULL); 1729 1730 mlog(ML_KTHREAD, "starting o2net thread...\n"); 1731 o2net_wq = create_singlethread_workqueue("o2net"); 1732 if (o2net_wq == NULL) { 1733 mlog(ML_ERROR, "unable to launch o2net thread\n"); 1734 return -ENOMEM; /* ? 
*/ 1735 } 1736 1737 ret = o2net_open_listening_sock(node->nd_ipv4_port); 1738 if (ret) { 1739 destroy_workqueue(o2net_wq); 1740 o2net_wq = NULL; 1741 } else 1742 o2quo_conn_up(node->nd_num); 1743 1744 return ret; 1745 } 1746 1747 /* again, o2nm_this_node() doesn't work here as we're involved in 1748 * tearing it down */ 1749 void o2net_stop_listening(struct o2nm_node *node) 1750 { 1751 struct socket *sock = o2net_listen_sock; 1752 size_t i; 1753 1754 BUG_ON(o2net_wq == NULL); 1755 BUG_ON(o2net_listen_sock == NULL); 1756 1757 /* stop the listening socket from generating work */ 1758 write_lock_bh(&sock->sk->sk_callback_lock); 1759 sock->sk->sk_data_ready = sock->sk->sk_user_data; 1760 sock->sk->sk_user_data = NULL; 1761 write_unlock_bh(&sock->sk->sk_callback_lock); 1762 1763 for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) { 1764 struct o2nm_node *node = o2nm_get_node_by_num(i); 1765 if (node) { 1766 o2net_disconnect_node(node); 1767 o2nm_node_put(node); 1768 } 1769 } 1770 1771 /* finish all work and tear down the work queue */ 1772 mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n"); 1773 destroy_workqueue(o2net_wq); 1774 o2net_wq = NULL; 1775 1776 sock_release(o2net_listen_sock); 1777 o2net_listen_sock = NULL; 1778 1779 o2quo_conn_err(node->nd_num); 1780 } 1781 1782 /* ------------------------------------------------------------ */ 1783 1784 int o2net_init(void) 1785 { 1786 unsigned long i; 1787 1788 o2quo_init(); 1789 1790 o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL); 1791 o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); 1792 o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); 1793 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) { 1794 kfree(o2net_hand); 1795 kfree(o2net_keep_req); 1796 kfree(o2net_keep_resp); 1797 return -ENOMEM; 1798 } 1799 1800 o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION); 1801 o2net_hand->connector_id = cpu_to_be64(1); 1802 1803 o2net_keep_req->magic = 
cpu_to_be16(O2NET_MSG_KEEP_REQ_MAGIC); 1804 o2net_keep_resp->magic = cpu_to_be16(O2NET_MSG_KEEP_RESP_MAGIC); 1805 1806 for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) { 1807 struct o2net_node *nn = o2net_nn_from_num(i); 1808 1809 spin_lock_init(&nn->nn_lock); 1810 INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn); 1811 INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn); 1812 INIT_WORK(&nn->nn_still_up, o2net_still_up, nn); 1813 /* until we see hb from a node we'll return einval */ 1814 nn->nn_persistent_error = -ENOTCONN; 1815 init_waitqueue_head(&nn->nn_sc_wq); 1816 idr_init(&nn->nn_status_idr); 1817 INIT_LIST_HEAD(&nn->nn_status_list); 1818 } 1819 1820 return 0; 1821 } 1822 1823 void o2net_exit(void) 1824 { 1825 o2quo_exit(); 1826 kfree(o2net_hand); 1827 kfree(o2net_keep_req); 1828 kfree(o2net_keep_resp); 1829 } 1830