/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmdomain.c
 *
 * defines domain join / leave apis
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/spinlock.h>
#include <linux/delay.h>
#include <linux/err.h>
#include <linux/debugfs.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#include "dlmver.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
#include "cluster/masklog.h"

/*
 * ocfs2 node maps are arrays of long ints, which makes it unsafe to send
 * them across the wire as-is due to endianness issues. To work around
 * this, we convert the long ints to byte arrays. The following three
 * routines are helper functions that set/test/copy bits within those
 * byte arrays.
 */
static inline void byte_set_bit(u8 nr, u8 map[])
{
	map[nr >> 3] |= (1UL << (nr & 7));
}

static inline int byte_test_bit(u8 nr, u8 map[])
{
	return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
}

static inline void byte_copymap(u8 dmap[], unsigned long smap[],
				unsigned int sz)
{
	unsigned int nn;

	if (!sz)
		return;

	memset(dmap, 0, ((sz + 7) >> 3));
	for (nn = 0 ; nn < sz; nn++)
		if (test_bit(nn, smap))
			byte_set_bit(nn, dmap);
}

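/*
 * For example, with sz == 16 and bits 1 and 10 set in smap,
 * byte_copymap() yields dmap[0] == 0x02 and dmap[1] == 0x04 -- the
 * same byte layout on every architecture, which is what makes these
 * maps safe to put on the wire.
 */
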
static void dlm_free_pagevec(void **vec, int pages)
{
	while (pages--)
		free_page((unsigned long)vec[pages]);
	kfree(vec);
}

static void **dlm_alloc_pagevec(int pages)
{
	void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
	int i;

	if (!vec)
		return NULL;

	for (i = 0; i < pages; i++)
		if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
			goto out_free;

	mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
	     pages, (unsigned long)DLM_HASH_PAGES,
	     (unsigned long)DLM_BUCKETS_PER_PAGE);
	return vec;
out_free:
	dlm_free_pagevec(vec, i);
	return NULL;
}

/*
 *
 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
 *    dlm_domain_lock
 *    struct dlm_ctxt->spinlock
 *    struct dlm_lock_resource->spinlock
 *    struct dlm_ctxt->master_lock
 *    struct dlm_ctxt->ast_lock
 *    dlm_master_list_entry->spinlock
 *    dlm_lock->spinlock
 *
 */

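/*
 * For instance, dlm_migrate_all_locks() below takes dlm->spinlock and
 * then res->spinlock while holding it, never the other way around.
 */
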
DEFINE_SPINLOCK(dlm_domain_lock);
LIST_HEAD(dlm_domains);
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);

/*
 * The supported protocol version for DLM communication. Running domains
 * will have a negotiated version with the same major number and a minor
 * number equal to or smaller than ours. The dlm_ctxt->dlm_locking_proto
 * field should be used to determine what a running domain is actually
 * using.
 */
static const struct dlm_protocol_version dlm_protocol = {
	.pv_major = 1,
	.pv_minor = 0,
};

#define DLM_DOMAIN_BACKOFF_MS 200

static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data);
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
				     void **ret_data);
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
				struct dlm_protocol_version *request);

static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);

void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
{
	if (!hlist_unhashed(&lockres->hash_node)) {
		hlist_del_init(&lockres->hash_node);
		dlm_lockres_put(lockres);
	}
}

void __dlm_insert_lockres(struct dlm_ctxt *dlm,
			  struct dlm_lock_resource *res)
{
	struct hlist_head *bucket;
	struct qstr *q;

	assert_spin_locked(&dlm->spinlock);

	q = &res->lockname;
	bucket = dlm_lockres_hash(dlm, q->hash);

	/* get a reference for our hashtable */
	dlm_lockres_get(res);

	hlist_add_head(&res->hash_node, bucket);
}

struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
						     const char *name,
						     unsigned int len,
						     unsigned int hash)
{
	struct hlist_head *bucket;
	struct hlist_node *list;

	mlog_entry("%.*s\n", len, name);

	assert_spin_locked(&dlm->spinlock);

	bucket = dlm_lockres_hash(dlm, hash);

	hlist_for_each(list, bucket) {
		struct dlm_lock_resource *res = hlist_entry(list,
			struct dlm_lock_resource, hash_node);
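		/* cheapest comparisons first: first byte, then length,
		 * then a memcmp() of the remaining bytes */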
		if (res->lockname.name[0] != name[0])
			continue;
		if (unlikely(res->lockname.len != len))
			continue;
		if (memcmp(res->lockname.name + 1, name + 1, len - 1))
			continue;
		dlm_lockres_get(res);
		return res;
	}
	return NULL;
}

/* intended to be called by functions which do not care about lock
 * resources which are being purged (most net _handler functions).
 * this will return NULL for any lock resource which is found but
 * currently in the process of dropping its mastery reference.
 * use __dlm_lookup_lockres_full when you need the lock resource
 * regardless (e.g. dlm_get_lock_resource) */
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
						const char *name,
						unsigned int len,
						unsigned int hash)
{
	struct dlm_lock_resource *res = NULL;

	mlog_entry("%.*s\n", len, name);

	assert_spin_locked(&dlm->spinlock);

	res = __dlm_lookup_lockres_full(dlm, name, len, hash);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
			spin_unlock(&res->spinlock);
			dlm_lockres_put(res);
			return NULL;
		}
		spin_unlock(&res->spinlock);
	}

	return res;
}

struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
					      const char *name,
					      unsigned int len)
{
	struct dlm_lock_resource *res;
	unsigned int hash = dlm_lockid_hash(name, len);

	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, len, hash);
	spin_unlock(&dlm->spinlock);
	return res;
}

static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
{
	struct dlm_ctxt *tmp = NULL;
	struct list_head *iter;

	assert_spin_locked(&dlm_domain_lock);

	/* tmp->name here is always NULL terminated,
	 * but domain may not be! */
	list_for_each(iter, &dlm_domains) {
		tmp = list_entry(iter, struct dlm_ctxt, list);
		if (strlen(tmp->name) == len &&
		    memcmp(tmp->name, domain, len) == 0)
			break;
		tmp = NULL;
	}

	return tmp;
}

/* For null terminated domain strings ONLY */
static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
{
	assert_spin_locked(&dlm_domain_lock);

	return __dlm_lookup_domain_full(domain, strlen(domain));
}


/* returns true on one of two conditions:
 * 1) the domain does not exist
 * 2) the domain exists and its state is "joined" */
static int dlm_wait_on_domain_helper(const char *domain)
{
	int ret = 0;
	struct dlm_ctxt *tmp = NULL;

	spin_lock(&dlm_domain_lock);

	tmp = __dlm_lookup_domain(domain);
	if (!tmp)
		ret = 1;
	else if (tmp->dlm_state == DLM_CTXT_JOINED)
		ret = 1;

	spin_unlock(&dlm_domain_lock);
	return ret;
}

static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
{
	dlm_destroy_debugfs_subroot(dlm);

	if (dlm->lockres_hash)
		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);

	if (dlm->name)
		kfree(dlm->name);

	kfree(dlm);
}

/* A little strange - this function will be called while holding
 * dlm_domain_lock and is expected to be holding it on the way out. We
 * will however drop and reacquire it multiple times */
static void dlm_ctxt_release(struct kref *kref)
{
	struct dlm_ctxt *dlm;

	dlm = container_of(kref, struct dlm_ctxt, dlm_refs);

	BUG_ON(dlm->num_joins);
	BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);

	/* we may still be in the list if we hit an error during join. */
	list_del_init(&dlm->list);

	spin_unlock(&dlm_domain_lock);

	mlog(0, "freeing memory from domain %s\n", dlm->name);

	wake_up(&dlm_domain_events);

	dlm_free_ctxt_mem(dlm);

	spin_lock(&dlm_domain_lock);
}

void dlm_put(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm_domain_lock);
	kref_put(&dlm->dlm_refs, dlm_ctxt_release);
	spin_unlock(&dlm_domain_lock);
}

static void __dlm_get(struct dlm_ctxt *dlm)
{
	kref_get(&dlm->dlm_refs);
}

/* given a questionable reference to a dlm object, gets a reference if
 * it can find it in the list, otherwise returns NULL in which case
 * you shouldn't trust your pointer. */
struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
{
	struct list_head *iter;
	struct dlm_ctxt *target = NULL;

	spin_lock(&dlm_domain_lock);

	list_for_each(iter, &dlm_domains) {
		target = list_entry(iter, struct dlm_ctxt, list);

		if (target == dlm) {
			__dlm_get(target);
			break;
		}

		target = NULL;
	}

	spin_unlock(&dlm_domain_lock);

	return target;
}

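/* A domain that is in shutdown still counts as joined here: other
 * nodes must be able to talk to us until our resource migration has
 * completed. */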
int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
{
	int ret;

	spin_lock(&dlm_domain_lock);
	ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
		(dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
	spin_unlock(&dlm_domain_lock);

	return ret;
}

static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_worker) {
		flush_workqueue(dlm->dlm_worker);
		destroy_workqueue(dlm->dlm_worker);
		dlm->dlm_worker = NULL;
	}
}

static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
{
	dlm_unregister_domain_handlers(dlm);
	dlm_debug_shutdown(dlm);
	dlm_complete_thread(dlm);
	dlm_complete_recovery_thread(dlm);
	dlm_destroy_dlm_worker(dlm);

	/* We've left the domain. Now we can take ourselves out of the
	 * list and allow the kref stuff to help us free the
	 * memory. */
	spin_lock(&dlm_domain_lock);
	list_del_init(&dlm->list);
	spin_unlock(&dlm_domain_lock);

	/* Wake up anyone waiting for us to remove this domain */
	wake_up(&dlm_domain_events);
}

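/* Returns 0 once every lock resource has been migrated or purged, and
 * -EAGAIN while a pass still finds resources in the hash; the caller
 * is expected to sleep briefly and retry. */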
static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
{
	int i, num, n, ret = 0;
	struct dlm_lock_resource *res;
	struct hlist_node *iter;
	struct hlist_head *bucket;
	int dropped;

	mlog(0, "Migrating locks from domain %s\n", dlm->name);

	num = 0;
	spin_lock(&dlm->spinlock);
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
redo_bucket:
		n = 0;
		bucket = dlm_lockres_hash(dlm, i);
		iter = bucket->first;
		while (iter) {
			n++;
			res = hlist_entry(iter, struct dlm_lock_resource,
					  hash_node);
			dlm_lockres_get(res);
			/* migrate, if necessary. this will drop the dlm
			 * spinlock and retake it if it does migration. */
			dropped = dlm_empty_lockres(dlm, res);

			spin_lock(&res->spinlock);
			__dlm_lockres_calc_usage(dlm, res);
			iter = res->hash_node.next;
			spin_unlock(&res->spinlock);

			dlm_lockres_put(res);

			if (dropped)
				goto redo_bucket;
		}
		cond_resched_lock(&dlm->spinlock);
		num += n;
		mlog(0, "%s: touched %d lockreses in bucket %d "
		     "(tot=%d)\n", dlm->name, n, i, num);
	}
	spin_unlock(&dlm->spinlock);
	wake_up(&dlm->dlm_thread_wq);

	/* let the dlm thread take care of purging, keep scanning until
	 * nothing remains in the hash */
	if (num) {
		mlog(0, "%s: %d lock resources in hash last pass\n",
		     dlm->name, num);
		ret = -EAGAIN;
	}
	mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
	return ret;
}

static int dlm_no_joining_node(struct dlm_ctxt *dlm)
{
	int ret;

	spin_lock(&dlm->spinlock);
	ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
	spin_unlock(&dlm->spinlock);

	return ret;
}

static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
{
	/* Yikes, a double spinlock! I need domain_lock for the dlm
	 * state and the dlm spinlock for join state... Sorry! */
again:
	spin_lock(&dlm_domain_lock);
	spin_lock(&dlm->spinlock);

	if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "Node %d is joining, we wait on it.\n",
		     dlm->joining_node);
		spin_unlock(&dlm->spinlock);
		spin_unlock(&dlm_domain_lock);

		wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
		goto again;
	}

	dlm->dlm_state = DLM_CTXT_LEAVING;
	spin_unlock(&dlm->spinlock);
	spin_unlock(&dlm_domain_lock);
}

static void __dlm_print_nodes(struct dlm_ctxt *dlm)
{
	int node = -1;

	assert_spin_locked(&dlm->spinlock);

	printk(KERN_INFO "ocfs2_dlm: Nodes in domain (\"%s\"): ", dlm->name);

	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		printk("%d ", node);
	}
	printk("\n");
}

static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	unsigned int node;
	struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;

	mlog_entry("%p %u %p", msg, len, data);

	if (!dlm_grab(dlm))
		return 0;

	node = exit_msg->node_idx;

	printk(KERN_INFO "ocfs2_dlm: Node %u leaves domain %s\n", node, dlm->name);

	spin_lock(&dlm->spinlock);
	clear_bit(node, dlm->domain_map);
	__dlm_print_nodes(dlm);

	/* notify anything attached to the heartbeat events */
	dlm_hb_event_notify_attached(dlm, node, 0);

	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);

	return 0;
}

static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
				    unsigned int node)
{
	int status;
	struct dlm_exit_domain leave_msg;

	mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
	     node, dlm->name, dlm->node_num);

	memset(&leave_msg, 0, sizeof(leave_msg));
	leave_msg.node_idx = dlm->node_num;

	status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
				    &leave_msg, sizeof(leave_msg), node,
				    NULL);

	mlog(0, "status return %d from o2net_send_message\n", status);

	return status;
}


static void dlm_leave_domain(struct dlm_ctxt *dlm)
{
	int node, clear_node, status;

	/* At this point we've migrated away all our locks and won't
	 * accept mastership of new ones. The dlm is responsible for
	 * almost nothing now. We make sure not to confuse any joining
	 * nodes and then commence shutdown procedure. */

	spin_lock(&dlm->spinlock);
	/* Clear ourselves from the domain map */
	clear_bit(dlm->node_num, dlm->domain_map);
	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
				     0)) < O2NM_MAX_NODES) {
		/* Drop the dlm spinlock. This is safe wrt the domain_map.
		 * -nodes cannot be added now as the
		 *   query_join_handlers knows to respond with OK_NO_MAP
		 * -we catch the right network errors if a node is
		 *   removed from the map while we're sending him the
		 *   exit message. */
		spin_unlock(&dlm->spinlock);

		clear_node = 1;

		status = dlm_send_one_domain_exit(dlm, node);
		if (status < 0 &&
		    status != -ENOPROTOOPT &&
		    status != -ENOTCONN) {
			mlog(ML_NOTICE, "Error %d sending domain exit message "
			     "to node %d\n", status, node);

			/* Not sure what to do here but lets sleep for
			 * a bit in case this was a transient
			 * error... */
			msleep(DLM_DOMAIN_BACKOFF_MS);
			clear_node = 0;
		}

		spin_lock(&dlm->spinlock);
		/* If we're not clearing the node bit then we intend
		 * to loop back around to try again. */
		if (clear_node)
			clear_bit(node, dlm->domain_map);
	}
	spin_unlock(&dlm->spinlock);
}

int dlm_joined(struct dlm_ctxt *dlm)
{
	int ret = 0;

	spin_lock(&dlm_domain_lock);

	if (dlm->dlm_state == DLM_CTXT_JOINED)
		ret = 1;

	spin_unlock(&dlm_domain_lock);

	return ret;
}

int dlm_shutting_down(struct dlm_ctxt *dlm)
{
	int ret = 0;

	spin_lock(&dlm_domain_lock);

	if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
		ret = 1;

	spin_unlock(&dlm_domain_lock);

	return ret;
}

void dlm_unregister_domain(struct dlm_ctxt *dlm)
{
	int leave = 0;
	struct dlm_lock_resource *res;

	spin_lock(&dlm_domain_lock);
	BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
	BUG_ON(!dlm->num_joins);

	dlm->num_joins--;
	if (!dlm->num_joins) {
		/* We mark it "in shutdown" now so new register
		 * requests wait until we've completely left the
		 * domain. Don't use DLM_CTXT_LEAVING yet as we still
		 * want new domain joins to communicate with us at
		 * least until we've completed migration of our
		 * resources. */
		dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
		leave = 1;
	}
	spin_unlock(&dlm_domain_lock);

	if (leave) {
		mlog(0, "shutting down domain %s\n", dlm->name);

		/* We changed dlm state, notify the thread */
		dlm_kick_thread(dlm, NULL);

		while (dlm_migrate_all_locks(dlm)) {
			/* Give dlm_thread time to purge the lockres' */
			msleep(500);
			mlog(0, "%s: more migration to do\n", dlm->name);
		}

		/* This list should be empty. If not, print remaining lockres */
		if (!list_empty(&dlm->tracking_list)) {
			mlog(ML_ERROR, "Following lockres' are still on the "
			     "tracking list:\n");
			list_for_each_entry(res, &dlm->tracking_list, tracking)
				dlm_print_one_lock_resource(res);
		}

		dlm_mark_domain_leaving(dlm);
		dlm_leave_domain(dlm);
		dlm_complete_dlm_shutdown(dlm);
	}
	dlm_put(dlm);
}
EXPORT_SYMBOL_GPL(dlm_unregister_domain);

static int dlm_query_join_proto_check(char *proto_type, int node,
				      struct dlm_protocol_version *ours,
				      struct dlm_protocol_version *request)
{
	int rc;
	struct dlm_protocol_version proto = *request;

	if (!dlm_protocol_compare(ours, &proto)) {
		mlog(0,
		     "node %u wanted to join with %s locking protocol "
		     "%u.%u, we respond with %u.%u\n",
		     node, proto_type,
		     request->pv_major,
		     request->pv_minor,
		     proto.pv_major, proto.pv_minor);
		request->pv_minor = proto.pv_minor;
		rc = 0;
	} else {
		mlog(ML_NOTICE,
		     "Node %u wanted to join with %s locking "
		     "protocol %u.%u, but we have %u.%u, disallowing\n",
		     node, proto_type,
		     request->pv_major,
		     request->pv_minor,
		     ours->pv_major,
		     ours->pv_minor);
		rc = 1;
	}

	return rc;
}

/*
 * struct dlm_query_join_packet is made up of four one-byte fields. They
 * are effectively in big-endian order already. However, little-endian
 * machines swap them before putting the packet on the wire (because
 * query_join's response is a status, and that status is treated as a u32
 * on the wire). Thus, big-endian and little-endian machines will treat
 * this structure differently.
 *
 * The solution is to have little-endian machines swap the structure when
 * converting from the structure to the u32 representation. This will
 * result in the structure having the correct format on the wire no matter
 * the host endian format.
 */
static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
					  u32 *wire)
{
	union dlm_query_join_response response;

	response.packet = *packet;
	*wire = cpu_to_be32(response.intval);
}

static void dlm_query_join_wire_to_packet(u32 wire,
					  struct dlm_query_join_packet *packet)
{
	union dlm_query_join_response response;

	response.intval = cpu_to_be32(wire);
	*packet = response.packet;
}

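/*
 * Concretely: after dlm_query_join_packet_to_wire(), byte 0 of the
 * packet sits in the most-significant byte of the u32 on hosts of
 * either endianness, so the network layer's own big-endian conversion
 * of the status puts the packet bytes on the wire in the same order
 * everywhere.
 */
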
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data)
{
	struct dlm_query_join_request *query;
	struct dlm_query_join_packet packet = {
		.code = JOIN_DISALLOW,
	};
	struct dlm_ctxt *dlm = NULL;
	u32 response;
	u8 nodenum;

	query = (struct dlm_query_join_request *) msg->buf;

	mlog(0, "node %u wants to join domain %s\n", query->node_idx,
	     query->domain);

	/*
	 * If heartbeat doesn't consider the node live, tell it
	 * to back off and try again. This gives heartbeat a chance
	 * to catch up.
	 */
	if (!o2hb_check_node_heartbeating(query->node_idx)) {
		mlog(0, "node %u is not in our live map yet\n",
		     query->node_idx);

		packet.code = JOIN_DISALLOW;
		goto respond;
	}

	packet.code = JOIN_OK_NO_MAP;

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
	if (!dlm)
		goto unlock_respond;

	/*
	 * There is a small window where the joining node may not see the
	 * node(s) that just left but are still part of the cluster. Disallow
	 * the join request if the joining node has a different node map.
	 */
	nodenum = 0;
	while (nodenum < O2NM_MAX_NODES) {
		if (test_bit(nodenum, dlm->domain_map)) {
			if (!byte_test_bit(nodenum, query->node_map)) {
				mlog(0, "disallow join as node %u does not "
				     "have node %u in its nodemap\n",
				     query->node_idx, nodenum);
				packet.code = JOIN_DISALLOW;
				goto unlock_respond;
			}
		}
		nodenum++;
	}

	/* Once the dlm ctxt is marked as leaving then we don't want
	 * to be put in someone's domain map.
	 * Also, explicitly disallow joining at certain troublesome
	 * times (ie. during recovery). */
	if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
		int bit = query->node_idx;
		spin_lock(&dlm->spinlock);

		if (dlm->dlm_state == DLM_CTXT_NEW &&
		    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* If this is a brand new context and we
			 * haven't started our join process yet, then
			 * the other node won the race. */
			packet.code = JOIN_OK_NO_MAP;
		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* Disallow parallel joins. */
			packet.code = JOIN_DISALLOW;
		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
			mlog(0, "node %u trying to join, but recovery "
			     "is ongoing.\n", bit);
			packet.code = JOIN_DISALLOW;
		} else if (test_bit(bit, dlm->recovery_map)) {
			mlog(0, "node %u trying to join, but it "
			     "still needs recovery.\n", bit);
			packet.code = JOIN_DISALLOW;
		} else if (test_bit(bit, dlm->domain_map)) {
			mlog(0, "node %u trying to join, but it "
			     "is still in the domain! needs recovery?\n",
			     bit);
			packet.code = JOIN_DISALLOW;
		} else {
			/* Alright we're fully a part of this domain
			 * so we keep some state as to who's joining
			 * and indicate to him what needs to be fixed
			 * up. */

			/* Make sure we speak compatible locking protocols. */
			if (dlm_query_join_proto_check("DLM", bit,
						       &dlm->dlm_locking_proto,
						       &query->dlm_proto)) {
				packet.code = JOIN_PROTOCOL_MISMATCH;
			} else if (dlm_query_join_proto_check("fs", bit,
							      &dlm->fs_locking_proto,
							      &query->fs_proto)) {
				packet.code = JOIN_PROTOCOL_MISMATCH;
			} else {
				packet.dlm_minor = query->dlm_proto.pv_minor;
				packet.fs_minor = query->fs_proto.pv_minor;
				packet.code = JOIN_OK;
				__dlm_set_joining_node(dlm, query->node_idx);
			}
		}

		spin_unlock(&dlm->spinlock);
	}
unlock_respond:
	spin_unlock(&dlm_domain_lock);

respond:
	mlog(0, "We respond with %u\n", packet.code);

	dlm_query_join_packet_to_wire(&packet, &response);
	return response;
}

static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
				     void **ret_data)
{
	struct dlm_assert_joined *assert;
	struct dlm_ctxt *dlm = NULL;

	assert = (struct dlm_assert_joined *) msg->buf;

	mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
	     assert->domain);

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
	/* XXX should we consider no dlm ctxt an error? */
	if (dlm) {
		spin_lock(&dlm->spinlock);

		/* Alright, this node has officially joined our
		 * domain. Set him in the map and clean up our
		 * leftover join state. */
		BUG_ON(dlm->joining_node != assert->node_idx);
		set_bit(assert->node_idx, dlm->domain_map);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);

		printk(KERN_INFO "ocfs2_dlm: Node %u joins domain %s\n",
		       assert->node_idx, dlm->name);
		__dlm_print_nodes(dlm);

		/* notify anything attached to the heartbeat events */
		dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);

		spin_unlock(&dlm->spinlock);
	}
	spin_unlock(&dlm_domain_lock);

	return 0;
}

static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data)
{
	struct dlm_cancel_join *cancel;
	struct dlm_ctxt *dlm = NULL;

	cancel = (struct dlm_cancel_join *) msg->buf;

	mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
	     cancel->domain);

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);

	if (dlm) {
		spin_lock(&dlm->spinlock);

		/* Yikes, this guy wants to cancel his join. No
		 * problem, we simply cleanup our join state. */
		BUG_ON(dlm->joining_node != cancel->node_idx);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);

		spin_unlock(&dlm->spinlock);
	}
	spin_unlock(&dlm_domain_lock);

	return 0;
}

static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
				    unsigned int node)
{
	int status;
	struct dlm_cancel_join cancel_msg;

	memset(&cancel_msg, 0, sizeof(cancel_msg));
	cancel_msg.node_idx = dlm->node_num;
	cancel_msg.name_len = strlen(dlm->name);
	memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);

	status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
				    &cancel_msg, sizeof(cancel_msg), node,
				    NULL);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

bail:
	return status;
}

/* map_size should be in bytes. */
static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
				 unsigned long *node_map,
				 unsigned int map_size)
{
	int status, tmpstat;
	unsigned int node;

	if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
			 sizeof(unsigned long))) {
		mlog(ML_ERROR,
		     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
		     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
		return -EINVAL;
	}

	status = 0;
	node = -1;
	while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		if (node == dlm->node_num)
			continue;

		tmpstat = dlm_send_one_join_cancel(dlm, node);
		if (tmpstat) {
			mlog(ML_ERROR, "Error return %d cancelling join on "
			     "node %d\n", tmpstat, node);
			if (!status)
				status = tmpstat;
		}
	}

	if (status)
		mlog_errno(status);
	return status;
}

static int dlm_request_join(struct dlm_ctxt *dlm,
			    int node,
			    enum dlm_query_join_response_code *response)
{
	int status;
	struct dlm_query_join_request join_msg;
	struct dlm_query_join_packet packet;
	u32 join_resp;

	mlog(0, "querying node %d\n", node);

	memset(&join_msg, 0, sizeof(join_msg));
	join_msg.node_idx = dlm->node_num;
	join_msg.name_len = strlen(dlm->name);
	memcpy(join_msg.domain, dlm->name, join_msg.name_len);
	join_msg.dlm_proto = dlm->dlm_locking_proto;
	join_msg.fs_proto = dlm->fs_locking_proto;

	/* copy live node map to join message */
	byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);

	status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
				    sizeof(join_msg), node,
				    &join_resp);
	if (status < 0 && status != -ENOPROTOOPT) {
		mlog_errno(status);
		goto bail;
	}
	dlm_query_join_wire_to_packet(join_resp, &packet);

	/* -ENOPROTOOPT from the net code means the other side isn't
	   listening for our message type -- that's fine, it means
	   his dlm isn't up, so we can consider him a 'yes' but not
	   joined into the domain. */
	if (status == -ENOPROTOOPT) {
		status = 0;
		*response = JOIN_OK_NO_MAP;
	} else if (packet.code == JOIN_DISALLOW ||
		   packet.code == JOIN_OK_NO_MAP) {
		*response = packet.code;
	} else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
		mlog(ML_NOTICE,
		     "This node requested DLM locking protocol %u.%u and "
		     "filesystem locking protocol %u.%u. At least one of "
		     "the protocol versions on node %d is not compatible, "
		     "disconnecting\n",
		     dlm->dlm_locking_proto.pv_major,
		     dlm->dlm_locking_proto.pv_minor,
		     dlm->fs_locking_proto.pv_major,
		     dlm->fs_locking_proto.pv_minor,
		     node);
		status = -EPROTO;
		*response = packet.code;
	} else if (packet.code == JOIN_OK) {
		*response = packet.code;
		/* Use the same locking protocol as the remote node */
		dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
		dlm->fs_locking_proto.pv_minor = packet.fs_minor;
		mlog(0,
		     "Node %d responds JOIN_OK with DLM locking protocol "
		     "%u.%u and fs locking protocol %u.%u\n",
		     node,
		     dlm->dlm_locking_proto.pv_major,
		     dlm->dlm_locking_proto.pv_minor,
		     dlm->fs_locking_proto.pv_major,
		     dlm->fs_locking_proto.pv_minor);
	} else {
		status = -EINVAL;
		mlog(ML_ERROR, "invalid response %d from node %u\n",
		     packet.code, node);
	}

	mlog(0, "status %d, node %d response is %d\n", status, node,
	     *response);

bail:
	return status;
}

static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
				    unsigned int node)
{
	int status;
	struct dlm_assert_joined assert_msg;

	mlog(0, "Sending join assert to node %u\n", node);

	memset(&assert_msg, 0, sizeof(assert_msg));
	assert_msg.node_idx = dlm->node_num;
	assert_msg.name_len = strlen(dlm->name);
	memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);

	status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
				    &assert_msg, sizeof(assert_msg), node,
				    NULL);
	if (status < 0)
		mlog_errno(status);

	return status;
}

static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
				  unsigned long *node_map)
{
	int status, node, live;

	status = 0;
	node = -1;
	while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		if (node == dlm->node_num)
			continue;

		do {
			/* It is very important that this message be
			 * received so we spin until either the node
			 * has died or it gets the message. */
			status = dlm_send_one_join_assert(dlm, node);

			spin_lock(&dlm->spinlock);
			live = test_bit(node, dlm->live_nodes_map);
			spin_unlock(&dlm->spinlock);

			if (status) {
				mlog(ML_ERROR, "Error return %d asserting "
				     "join on node %d\n", status, node);

				/* give us some time between errors... */
				if (live)
					msleep(DLM_DOMAIN_BACKOFF_MS);
			}
		} while (status && live);
	}
}

struct domain_join_ctxt {
	unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

static int dlm_should_restart_join(struct dlm_ctxt *dlm,
				   struct domain_join_ctxt *ctxt,
				   enum dlm_query_join_response_code response)
{
	int ret;

	if (response == JOIN_DISALLOW) {
		mlog(0, "Latest response of disallow -- should restart\n");
		return 1;
	}

	spin_lock(&dlm->spinlock);
	/* For now, we restart the process if the node maps have
	 * changed at all */
	ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
		     sizeof(dlm->live_nodes_map));
	spin_unlock(&dlm->spinlock);

	if (ret)
		mlog(0, "Node maps changed -- should restart\n");

	return ret;
}

static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
{
	int status = 0, tmpstat, node;
	struct domain_join_ctxt *ctxt;
	enum dlm_query_join_response_code response = JOIN_DISALLOW;

	mlog_entry("%p", dlm);

	ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
	if (!ctxt) {
		status = -ENOMEM;
		mlog_errno(status);
		goto bail;
	}

	/* group sem locking should work for us here -- we're already
	 * registered for heartbeat events so filling this should be
	 * atomic wrt getting those handlers called. */
	o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));

	spin_lock(&dlm->spinlock);
	memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));

	__dlm_set_joining_node(dlm, dlm->node_num);

	spin_unlock(&dlm->spinlock);

	node = -1;
	while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		if (node == dlm->node_num)
			continue;

		status = dlm_request_join(dlm, node, &response);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}

		/* Ok, either we got a response or the node doesn't have a
		 * dlm up. */
		if (response == JOIN_OK)
			set_bit(node, ctxt->yes_resp_map);

		if (dlm_should_restart_join(dlm, ctxt, response)) {
			status = -EAGAIN;
			goto bail;
		}
	}

	mlog(0, "Yay, done querying nodes!\n");

	/* Yay, everyone agrees we can join the domain. My domain is
	 * comprised of all nodes who were put in the
	 * yes_resp_map. Copy that into our domain map and send a join
	 * assert message to clean up everyone else's state. */
	spin_lock(&dlm->spinlock);
	memcpy(dlm->domain_map, ctxt->yes_resp_map,
	       sizeof(ctxt->yes_resp_map));
	set_bit(dlm->node_num, dlm->domain_map);
	spin_unlock(&dlm->spinlock);

	dlm_send_join_asserts(dlm, ctxt->yes_resp_map);

	/* Joined state *must* be set before the joining node
	 * information, otherwise the query_join handler may read no
	 * current joiner but a state of NEW and tell joining nodes
	 * we're not in the domain. */
	spin_lock(&dlm_domain_lock);
	dlm->dlm_state = DLM_CTXT_JOINED;
	dlm->num_joins++;
	spin_unlock(&dlm_domain_lock);

bail:
	spin_lock(&dlm->spinlock);
	__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
	if (!status)
		__dlm_print_nodes(dlm);
	spin_unlock(&dlm->spinlock);

	if (ctxt) {
		/* Do we need to send a cancel message to any nodes? */
		if (status < 0) {
			tmpstat = dlm_send_join_cancels(dlm,
							ctxt->yes_resp_map,
							sizeof(ctxt->yes_resp_map));
			if (tmpstat < 0)
				mlog_errno(tmpstat);
		}
		kfree(ctxt);
	}

	mlog(0, "returning %d\n", status);
	return status;
}

static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
{
	o2hb_unregister_callback(NULL, &dlm->dlm_hb_up);
	o2hb_unregister_callback(NULL, &dlm->dlm_hb_down);
	o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
}

static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
{
	int status;

	mlog(0, "registering handlers.\n");

	o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
			    dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
	status = o2hb_register_callback(NULL, &dlm->dlm_hb_down);
	if (status)
		goto bail;

	o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
			    dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
	status = o2hb_register_callback(NULL, &dlm->dlm_hb_up);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
					sizeof(struct dlm_master_request),
					dlm_master_request_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
					sizeof(struct dlm_assert_master),
					dlm_assert_master_handler,
					dlm, dlm_assert_master_post_handler,
					&dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
					sizeof(struct dlm_create_lock),
					dlm_create_lock_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
					DLM_CONVERT_LOCK_MAX_LEN,
					dlm_convert_lock_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
					DLM_UNLOCK_LOCK_MAX_LEN,
					dlm_unlock_lock_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
					DLM_PROXY_AST_MAX_LEN,
					dlm_proxy_ast_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
					sizeof(struct dlm_exit_domain),
					dlm_exit_domain_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
					sizeof(struct dlm_deref_lockres),
					dlm_deref_lockres_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
					sizeof(struct dlm_migrate_request),
					dlm_migrate_request_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
					DLM_MIG_LOCKRES_MAX_LEN,
					dlm_mig_lockres_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
					sizeof(struct dlm_master_requery),
					dlm_master_requery_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
					sizeof(struct dlm_lock_request),
					dlm_request_all_locks_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
					sizeof(struct dlm_reco_data_done),
					dlm_reco_data_done_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
					sizeof(struct dlm_begin_reco),
					dlm_begin_reco_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
					sizeof(struct dlm_finalize_reco),
					dlm_finalize_reco_handler,
					dlm, NULL, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

bail:
	if (status)
		dlm_unregister_domain_handlers(dlm);

	return status;
}

static int dlm_join_domain(struct dlm_ctxt *dlm)
{
	int status;
	unsigned int backoff;
	unsigned int total_backoff = 0;

	BUG_ON(!dlm);

	mlog(0, "Join domain %s\n", dlm->name);

	status = dlm_register_domain_handlers(dlm);
	if (status) {
		mlog_errno(status);
		goto bail;
	}

	status = dlm_debug_init(dlm);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = dlm_launch_thread(dlm);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = dlm_launch_recovery_thread(dlm);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
	if (!dlm->dlm_worker) {
		status = -ENOMEM;
		mlog_errno(status);
		goto bail;
	}

	do {
		status = dlm_try_to_join_domain(dlm);

		/* If we're racing another node to the join, then we
		 * need to back off temporarily and let them
		 * complete. */
#define DLM_JOIN_TIMEOUT_MSECS	90000
		if (status == -EAGAIN) {
			if (signal_pending(current)) {
				status = -ERESTARTSYS;
				goto bail;
			}

			if (total_backoff >
			    msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
				status = -ERESTARTSYS;
				mlog(ML_NOTICE, "Timed out joining dlm domain "
				     "%s after %u msecs\n", dlm->name,
				     jiffies_to_msecs(total_backoff));
				goto bail;
			}

			/*
			 * <chip> After you!
			 * <dale> No, after you!
			 * <chip> I insist!
			 * <dale> But you first!
			 * ...
			 */
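			/* Back off for a pseudo-random multiple (0-3)
			 * of DLM_DOMAIN_BACKOFF_MS so that two racing
			 * nodes are unlikely to collide again on the
			 * next attempt. */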
			backoff = (unsigned int)(jiffies & 0x3);
			backoff *= DLM_DOMAIN_BACKOFF_MS;
			total_backoff += backoff;
			mlog(0, "backoff %d\n", backoff);
			msleep(backoff);
		}
	} while (status == -EAGAIN);

	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = 0;
bail:
	wake_up(&dlm_domain_events);

	if (status) {
		dlm_unregister_domain_handlers(dlm);
		dlm_debug_shutdown(dlm);
		dlm_complete_thread(dlm);
		dlm_complete_recovery_thread(dlm);
		dlm_destroy_dlm_worker(dlm);
	}

	return status;
}

static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
				       u32 key)
{
	int i;
	int ret;
	struct dlm_ctxt *dlm = NULL;

	dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
	if (!dlm) {
		mlog_errno(-ENOMEM);
		goto leave;
	}

	dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
	if (dlm->name == NULL) {
		mlog_errno(-ENOMEM);
		kfree(dlm);
		dlm = NULL;
		goto leave;
	}

	dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
	if (!dlm->lockres_hash) {
		mlog_errno(-ENOMEM);
		kfree(dlm->name);
		kfree(dlm);
		dlm = NULL;
		goto leave;
	}

	for (i = 0; i < DLM_HASH_BUCKETS; i++)
		INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));

	strcpy(dlm->name, domain);
	dlm->key = key;
	dlm->node_num = o2nm_this_node();

	ret = dlm_create_debugfs_subroot(dlm);
	if (ret < 0) {
		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
		kfree(dlm->name);
		kfree(dlm);
		dlm = NULL;
		goto leave;
	}

	spin_lock_init(&dlm->spinlock);
	spin_lock_init(&dlm->master_lock);
	spin_lock_init(&dlm->ast_lock);
	INIT_LIST_HEAD(&dlm->list);
	INIT_LIST_HEAD(&dlm->dirty_list);
	INIT_LIST_HEAD(&dlm->reco.resources);
	INIT_LIST_HEAD(&dlm->reco.received);
	INIT_LIST_HEAD(&dlm->reco.node_data);
	INIT_LIST_HEAD(&dlm->purge_list);
	INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
	INIT_LIST_HEAD(&dlm->tracking_list);
	dlm->reco.state = 0;

	INIT_LIST_HEAD(&dlm->pending_asts);
	INIT_LIST_HEAD(&dlm->pending_basts);

	mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
	     dlm->recovery_map, &(dlm->recovery_map[0]));

	memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
	memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
	memset(dlm->domain_map, 0, sizeof(dlm->domain_map));

	dlm->dlm_thread_task = NULL;
	dlm->dlm_reco_thread_task = NULL;
	dlm->dlm_worker = NULL;
	init_waitqueue_head(&dlm->dlm_thread_wq);
	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
	init_waitqueue_head(&dlm->reco.event);
	init_waitqueue_head(&dlm->ast_wq);
	init_waitqueue_head(&dlm->migration_wq);
	INIT_LIST_HEAD(&dlm->master_list);
	INIT_LIST_HEAD(&dlm->mle_hb_events);

	dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
	init_waitqueue_head(&dlm->dlm_join_events);

	dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
	dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
	atomic_set(&dlm->local_resources, 0);
	atomic_set(&dlm->remote_resources, 0);
	atomic_set(&dlm->unknown_resources, 0);

	spin_lock_init(&dlm->work_lock);
	INIT_LIST_HEAD(&dlm->work_list);
	INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);

	kref_init(&dlm->dlm_refs);
	dlm->dlm_state = DLM_CTXT_NEW;

	INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);

	mlog(0, "context init: refcount %u\n",
	     atomic_read(&dlm->dlm_refs.refcount));

leave:
	return dlm;
}

/*
 * Compare a requested locking protocol version against the current one.
 *
 * If the major numbers are different, they are incompatible.
 * If the current minor is greater than the request, they are incompatible.
 * If the current minor is less than or equal to the request, they are
 * compatible, and the requester should run at the current minor version.
 */
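/*
 * For example: existing 1.0 vs. requested 1.2 is compatible, and the
 * request is downgraded to 1.0; existing 1.2 vs. requested 1.0 is
 * incompatible; any major mismatch (1.x vs. 2.x) is incompatible.
 */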
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
				struct dlm_protocol_version *request)
{
	if (existing->pv_major != request->pv_major)
		return 1;

	if (existing->pv_minor > request->pv_minor)
		return 1;

	if (existing->pv_minor < request->pv_minor)
		request->pv_minor = existing->pv_minor;

	return 0;
}

/*
 * dlm_register_domain: one-time setup per "domain".
 *
 * The filesystem passes in the requested locking version via proto.
 * If registration was successful, proto will contain the negotiated
 * locking protocol.
 */
struct dlm_ctxt * dlm_register_domain(const char *domain,
				      u32 key,
				      struct dlm_protocol_version *fs_proto)
{
	int ret;
	struct dlm_ctxt *dlm = NULL;
	struct dlm_ctxt *new_ctxt = NULL;

	if (strlen(domain) > O2NM_MAX_NAME_LEN) {
		ret = -ENAMETOOLONG;
		mlog(ML_ERROR, "domain name length too long\n");
		goto leave;
	}

	if (!o2hb_check_local_node_heartbeating()) {
		mlog(ML_ERROR, "the local node has not been configured, or is "
		     "not heartbeating\n");
		ret = -EPROTO;
		goto leave;
	}

	mlog(0, "register called for domain \"%s\"\n", domain);

retry:
	dlm = NULL;
	if (signal_pending(current)) {
		ret = -ERESTARTSYS;
		mlog_errno(ret);
		goto leave;
	}

	spin_lock(&dlm_domain_lock);

	dlm = __dlm_lookup_domain(domain);
	if (dlm) {
		if (dlm->dlm_state != DLM_CTXT_JOINED) {
			spin_unlock(&dlm_domain_lock);

			mlog(0, "This ctxt is not joined yet!\n");
			wait_event_interruptible(dlm_domain_events,
						 dlm_wait_on_domain_helper(
							 domain));
			goto retry;
		}

		if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
			/* must not return to the caller with
			 * dlm_domain_lock still held */
			spin_unlock(&dlm_domain_lock);
			mlog(ML_ERROR,
			     "Requested locking protocol version is not "
			     "compatible with already registered domain "
			     "\"%s\"\n", domain);
			ret = -EPROTO;
			goto leave;
		}

		__dlm_get(dlm);
		dlm->num_joins++;

		spin_unlock(&dlm_domain_lock);

		ret = 0;
		goto leave;
	}

	/* doesn't exist */
	if (!new_ctxt) {
		spin_unlock(&dlm_domain_lock);

		new_ctxt = dlm_alloc_ctxt(domain, key);
		if (new_ctxt)
			goto retry;

		ret = -ENOMEM;
		mlog_errno(ret);
		goto leave;
	}

	/* a little variable switch-a-roo here... */
	dlm = new_ctxt;
	new_ctxt = NULL;

	/* add the new domain */
	list_add_tail(&dlm->list, &dlm_domains);
	spin_unlock(&dlm_domain_lock);

	/*
	 * Pass the locking protocol version into the join. If the join
	 * succeeds, it will have the negotiated protocol set.
	 */
	dlm->dlm_locking_proto = dlm_protocol;
	dlm->fs_locking_proto = *fs_proto;

	ret = dlm_join_domain(dlm);
	if (ret) {
		mlog_errno(ret);
		dlm_put(dlm);
		goto leave;
	}

	/* Tell the caller what locking protocol we negotiated */
	*fs_proto = dlm->fs_locking_proto;

	ret = 0;
leave:
	if (new_ctxt)
		dlm_free_ctxt_mem(new_ctxt);

	if (ret < 0)
		dlm = ERR_PTR(ret);

	return dlm;
}
EXPORT_SYMBOL_GPL(dlm_register_domain);

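/*
 * A minimal, purely illustrative caller (the real consumer is the
 * ocfs2 filesystem; "mydomain" and key are hypothetical):
 *
 *	struct dlm_protocol_version proto = { .pv_major = 1, .pv_minor = 0 };
 *	struct dlm_ctxt *dlm = dlm_register_domain("mydomain", key, &proto);
 *
 *	if (IS_ERR(dlm))
 *		return PTR_ERR(dlm);
 *	// ... take and drop locks ...
 *	dlm_unregister_domain(dlm);
 *
 * On success, proto holds the negotiated minor version.
 */
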
static LIST_HEAD(dlm_join_handlers);

static void dlm_unregister_net_handlers(void)
{
	o2net_unregister_handler_list(&dlm_join_handlers);
}

static int dlm_register_net_handlers(void)
{
	int status = 0;

	status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
					sizeof(struct dlm_query_join_request),
					dlm_query_join_handler,
					NULL, NULL, &dlm_join_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
					sizeof(struct dlm_assert_joined),
					dlm_assert_joined_handler,
					NULL, NULL, &dlm_join_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
					sizeof(struct dlm_cancel_join),
					dlm_cancel_join_handler,
					NULL, NULL, &dlm_join_handlers);

bail:
	if (status < 0)
		dlm_unregister_net_handlers();

	return status;
}

/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */

/* Eviction is not expected to happen often, so a per-domain lock is
 * not necessary. Eviction callbacks are allowed to sleep for short
 * periods of time. */
static DECLARE_RWSEM(dlm_callback_sem);

void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
					int node_num)
{
	struct list_head *iter;
	struct dlm_eviction_cb *cb;

	down_read(&dlm_callback_sem);
	list_for_each(iter, &dlm->dlm_eviction_callbacks) {
		cb = list_entry(iter, struct dlm_eviction_cb, ec_item);

		cb->ec_func(node_num, cb->ec_data);
	}
	up_read(&dlm_callback_sem);
}

void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
			   dlm_eviction_func *f,
			   void *data)
{
	INIT_LIST_HEAD(&cb->ec_item);
	cb->ec_func = f;
	cb->ec_data = data;
}
EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);

void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
			      struct dlm_eviction_cb *cb)
{
	down_write(&dlm_callback_sem);
	list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
	up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);

void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
{
	down_write(&dlm_callback_sem);
	list_del_init(&cb->ec_item);
	up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);

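/*
 * Illustrative (hypothetical) usage of the eviction callbacks, with
 * my_evict_handler and my_data supplied by the caller:
 *
 *	static struct dlm_eviction_cb my_cb;
 *
 *	dlm_setup_eviction_cb(&my_cb, my_evict_handler, my_data);
 *	dlm_register_eviction_cb(dlm, &my_cb);
 *	// ...
 *	dlm_unregister_eviction_cb(&my_cb);
 */
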
static int __init dlm_init(void)
{
	int status;

	dlm_print_version();

	status = dlm_init_mle_cache();
	if (status) {
		mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
		goto error;
	}

	status = dlm_init_master_caches();
	if (status) {
		mlog(ML_ERROR, "Could not create o2dlm_lockres and "
		     "o2dlm_lockname slabcaches\n");
		goto error;
	}

	status = dlm_init_lock_cache();
	if (status) {
		mlog(ML_ERROR, "Could not create o2dlm_lock slabcache\n");
		goto error;
	}

	status = dlm_register_net_handlers();
	if (status) {
		mlog(ML_ERROR, "Unable to register network handlers\n");
		goto error;
	}

	status = dlm_create_debugfs_root();
	if (status)
		goto error;

	return 0;
error:
	dlm_unregister_net_handlers();
	dlm_destroy_lock_cache();
	dlm_destroy_master_caches();
	dlm_destroy_mle_cache();
	return -1;
}

static void __exit dlm_exit(void)
{
	dlm_destroy_debugfs_root();
	dlm_unregister_net_handlers();
	dlm_destroy_lock_cache();
	dlm_destroy_master_caches();
	dlm_destroy_mle_cache();
}

MODULE_AUTHOR("Oracle");
MODULE_LICENSE("GPL");

module_init(dlm_init);
module_exit(dlm_exit);