1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmdomain.c 5 * 6 * defines domain join / leave apis 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 #include <linux/module.h> 28 #include <linux/types.h> 29 #include <linux/slab.h> 30 #include <linux/highmem.h> 31 #include <linux/init.h> 32 #include <linux/spinlock.h> 33 #include <linux/delay.h> 34 #include <linux/err.h> 35 #include <linux/debugfs.h> 36 37 #include "cluster/heartbeat.h" 38 #include "cluster/nodemanager.h" 39 #include "cluster/tcp.h" 40 41 #include "dlmapi.h" 42 #include "dlmcommon.h" 43 #include "dlmdomain.h" 44 #include "dlmdebug.h" 45 46 #include "dlmver.h" 47 48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 49 #include "cluster/masklog.h" 50 51 /* 52 * ocfs2 node maps are array of long int, which limits to send them freely 53 * across the wire due to endianness issues. To workaround this, we convert 54 * long ints to byte arrays. Following 3 routines are helper functions to 55 * set/test/copy bits within those array of bytes 56 */ 57 static inline void byte_set_bit(u8 nr, u8 map[]) 58 { 59 map[nr >> 3] |= (1UL << (nr & 7)); 60 } 61 62 static inline int byte_test_bit(u8 nr, u8 map[]) 63 { 64 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0; 65 } 66 67 static inline void byte_copymap(u8 dmap[], unsigned long smap[], 68 unsigned int sz) 69 { 70 unsigned int nn; 71 72 if (!sz) 73 return; 74 75 memset(dmap, 0, ((sz + 7) >> 3)); 76 for (nn = 0 ; nn < sz; nn++) 77 if (test_bit(nn, smap)) 78 byte_set_bit(nn, dmap); 79 } 80 81 static void dlm_free_pagevec(void **vec, int pages) 82 { 83 while (pages--) 84 free_page((unsigned long)vec[pages]); 85 kfree(vec); 86 } 87 88 static void **dlm_alloc_pagevec(int pages) 89 { 90 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL); 91 int i; 92 93 if (!vec) 94 return NULL; 95 96 for (i = 0; i < pages; i++) 97 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) 98 goto out_free; 99 100 mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n", 101 pages, (unsigned long)DLM_HASH_PAGES, 102 (unsigned long)DLM_BUCKETS_PER_PAGE); 103 return vec; 104 out_free: 105 dlm_free_pagevec(vec, i); 106 return NULL; 107 } 108 109 /* 110 * 111 * spinlock lock ordering: if multiple locks are needed, obey this ordering: 112 * dlm_domain_lock 113 * struct dlm_ctxt->spinlock 114 * struct dlm_lock_resource->spinlock 115 * struct dlm_ctxt->master_lock 116 * struct dlm_ctxt->ast_lock 117 * dlm_master_list_entry->spinlock 118 * dlm_lock->spinlock 119 * 120 */ 121 122 DEFINE_SPINLOCK(dlm_domain_lock); 123 LIST_HEAD(dlm_domains); 124 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 125 126 /* 127 * The supported protocol version for DLM communication. Running domains 128 * will have a negotiated version with the same major number and a minor 129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should 130 * be used to determine what a running domain is actually using. 131 * 132 * New in version 1.1: 133 * - Message DLM_QUERY_REGION added to support global heartbeat 134 */ 135 static const struct dlm_protocol_version dlm_protocol = { 136 .pv_major = 1, 137 .pv_minor = 0, 138 }; 139 140 #define DLM_DOMAIN_BACKOFF_MS 200 141 142 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 143 void **ret_data); 144 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 145 void **ret_data); 146 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 147 void **ret_data); 148 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len, 149 void *data, void **ret_data); 150 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 151 void **ret_data); 152 static int dlm_protocol_compare(struct dlm_protocol_version *existing, 153 struct dlm_protocol_version *request); 154 155 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 156 157 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) 158 { 159 if (!hlist_unhashed(&lockres->hash_node)) { 160 hlist_del_init(&lockres->hash_node); 161 dlm_lockres_put(lockres); 162 } 163 } 164 165 void __dlm_insert_lockres(struct dlm_ctxt *dlm, 166 struct dlm_lock_resource *res) 167 { 168 struct hlist_head *bucket; 169 struct qstr *q; 170 171 assert_spin_locked(&dlm->spinlock); 172 173 q = &res->lockname; 174 bucket = dlm_lockres_hash(dlm, q->hash); 175 176 /* get a reference for our hashtable */ 177 dlm_lockres_get(res); 178 179 hlist_add_head(&res->hash_node, bucket); 180 } 181 182 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, 183 const char *name, 184 unsigned int len, 185 unsigned int hash) 186 { 187 struct hlist_head *bucket; 188 struct hlist_node *list; 189 190 mlog_entry("%.*s\n", len, name); 191 192 assert_spin_locked(&dlm->spinlock); 193 194 bucket = dlm_lockres_hash(dlm, hash); 195 196 hlist_for_each(list, bucket) { 197 struct dlm_lock_resource *res = hlist_entry(list, 198 struct dlm_lock_resource, hash_node); 199 if (res->lockname.name[0] != name[0]) 200 continue; 201 if (unlikely(res->lockname.len != len)) 202 continue; 203 if (memcmp(res->lockname.name + 1, name + 1, len - 1)) 204 continue; 205 dlm_lockres_get(res); 206 return res; 207 } 208 return NULL; 209 } 210 211 /* intended to be called by functions which do not care about lock 212 * resources which are being purged (most net _handler functions). 213 * this will return NULL for any lock resource which is found but 214 * currently in the process of dropping its mastery reference. 215 * use __dlm_lookup_lockres_full when you need the lock resource 216 * regardless (e.g. dlm_get_lock_resource) */ 217 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 218 const char *name, 219 unsigned int len, 220 unsigned int hash) 221 { 222 struct dlm_lock_resource *res = NULL; 223 224 mlog_entry("%.*s\n", len, name); 225 226 assert_spin_locked(&dlm->spinlock); 227 228 res = __dlm_lookup_lockres_full(dlm, name, len, hash); 229 if (res) { 230 spin_lock(&res->spinlock); 231 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 232 spin_unlock(&res->spinlock); 233 dlm_lockres_put(res); 234 return NULL; 235 } 236 spin_unlock(&res->spinlock); 237 } 238 239 return res; 240 } 241 242 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 243 const char *name, 244 unsigned int len) 245 { 246 struct dlm_lock_resource *res; 247 unsigned int hash = dlm_lockid_hash(name, len); 248 249 spin_lock(&dlm->spinlock); 250 res = __dlm_lookup_lockres(dlm, name, len, hash); 251 spin_unlock(&dlm->spinlock); 252 return res; 253 } 254 255 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len) 256 { 257 struct dlm_ctxt *tmp = NULL; 258 struct list_head *iter; 259 260 assert_spin_locked(&dlm_domain_lock); 261 262 /* tmp->name here is always NULL terminated, 263 * but domain may not be! */ 264 list_for_each(iter, &dlm_domains) { 265 tmp = list_entry (iter, struct dlm_ctxt, list); 266 if (strlen(tmp->name) == len && 267 memcmp(tmp->name, domain, len)==0) 268 break; 269 tmp = NULL; 270 } 271 272 return tmp; 273 } 274 275 /* For null terminated domain strings ONLY */ 276 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain) 277 { 278 assert_spin_locked(&dlm_domain_lock); 279 280 return __dlm_lookup_domain_full(domain, strlen(domain)); 281 } 282 283 284 /* returns true on one of two conditions: 285 * 1) the domain does not exist 286 * 2) the domain exists and it's state is "joined" */ 287 static int dlm_wait_on_domain_helper(const char *domain) 288 { 289 int ret = 0; 290 struct dlm_ctxt *tmp = NULL; 291 292 spin_lock(&dlm_domain_lock); 293 294 tmp = __dlm_lookup_domain(domain); 295 if (!tmp) 296 ret = 1; 297 else if (tmp->dlm_state == DLM_CTXT_JOINED) 298 ret = 1; 299 300 spin_unlock(&dlm_domain_lock); 301 return ret; 302 } 303 304 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) 305 { 306 dlm_destroy_debugfs_subroot(dlm); 307 308 if (dlm->lockres_hash) 309 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 310 311 if (dlm->master_hash) 312 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); 313 314 if (dlm->name) 315 kfree(dlm->name); 316 317 kfree(dlm); 318 } 319 320 /* A little strange - this function will be called while holding 321 * dlm_domain_lock and is expected to be holding it on the way out. We 322 * will however drop and reacquire it multiple times */ 323 static void dlm_ctxt_release(struct kref *kref) 324 { 325 struct dlm_ctxt *dlm; 326 327 dlm = container_of(kref, struct dlm_ctxt, dlm_refs); 328 329 BUG_ON(dlm->num_joins); 330 BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED); 331 332 /* we may still be in the list if we hit an error during join. */ 333 list_del_init(&dlm->list); 334 335 spin_unlock(&dlm_domain_lock); 336 337 mlog(0, "freeing memory from domain %s\n", dlm->name); 338 339 wake_up(&dlm_domain_events); 340 341 dlm_free_ctxt_mem(dlm); 342 343 spin_lock(&dlm_domain_lock); 344 } 345 346 void dlm_put(struct dlm_ctxt *dlm) 347 { 348 spin_lock(&dlm_domain_lock); 349 kref_put(&dlm->dlm_refs, dlm_ctxt_release); 350 spin_unlock(&dlm_domain_lock); 351 } 352 353 static void __dlm_get(struct dlm_ctxt *dlm) 354 { 355 kref_get(&dlm->dlm_refs); 356 } 357 358 /* given a questionable reference to a dlm object, gets a reference if 359 * it can find it in the list, otherwise returns NULL in which case 360 * you shouldn't trust your pointer. */ 361 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm) 362 { 363 struct list_head *iter; 364 struct dlm_ctxt *target = NULL; 365 366 spin_lock(&dlm_domain_lock); 367 368 list_for_each(iter, &dlm_domains) { 369 target = list_entry (iter, struct dlm_ctxt, list); 370 371 if (target == dlm) { 372 __dlm_get(target); 373 break; 374 } 375 376 target = NULL; 377 } 378 379 spin_unlock(&dlm_domain_lock); 380 381 return target; 382 } 383 384 int dlm_domain_fully_joined(struct dlm_ctxt *dlm) 385 { 386 int ret; 387 388 spin_lock(&dlm_domain_lock); 389 ret = (dlm->dlm_state == DLM_CTXT_JOINED) || 390 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN); 391 spin_unlock(&dlm_domain_lock); 392 393 return ret; 394 } 395 396 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm) 397 { 398 if (dlm->dlm_worker) { 399 flush_workqueue(dlm->dlm_worker); 400 destroy_workqueue(dlm->dlm_worker); 401 dlm->dlm_worker = NULL; 402 } 403 } 404 405 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) 406 { 407 dlm_unregister_domain_handlers(dlm); 408 dlm_debug_shutdown(dlm); 409 dlm_complete_thread(dlm); 410 dlm_complete_recovery_thread(dlm); 411 dlm_destroy_dlm_worker(dlm); 412 413 /* We've left the domain. Now we can take ourselves out of the 414 * list and allow the kref stuff to help us free the 415 * memory. */ 416 spin_lock(&dlm_domain_lock); 417 list_del_init(&dlm->list); 418 spin_unlock(&dlm_domain_lock); 419 420 /* Wake up anyone waiting for us to remove this domain */ 421 wake_up(&dlm_domain_events); 422 } 423 424 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm) 425 { 426 int i, num, n, ret = 0; 427 struct dlm_lock_resource *res; 428 struct hlist_node *iter; 429 struct hlist_head *bucket; 430 int dropped; 431 432 mlog(0, "Migrating locks from domain %s\n", dlm->name); 433 434 num = 0; 435 spin_lock(&dlm->spinlock); 436 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 437 redo_bucket: 438 n = 0; 439 bucket = dlm_lockres_hash(dlm, i); 440 iter = bucket->first; 441 while (iter) { 442 n++; 443 res = hlist_entry(iter, struct dlm_lock_resource, 444 hash_node); 445 dlm_lockres_get(res); 446 /* migrate, if necessary. this will drop the dlm 447 * spinlock and retake it if it does migration. */ 448 dropped = dlm_empty_lockres(dlm, res); 449 450 spin_lock(&res->spinlock); 451 __dlm_lockres_calc_usage(dlm, res); 452 iter = res->hash_node.next; 453 spin_unlock(&res->spinlock); 454 455 dlm_lockres_put(res); 456 457 if (dropped) 458 goto redo_bucket; 459 } 460 cond_resched_lock(&dlm->spinlock); 461 num += n; 462 mlog(0, "%s: touched %d lockreses in bucket %d " 463 "(tot=%d)\n", dlm->name, n, i, num); 464 } 465 spin_unlock(&dlm->spinlock); 466 wake_up(&dlm->dlm_thread_wq); 467 468 /* let the dlm thread take care of purging, keep scanning until 469 * nothing remains in the hash */ 470 if (num) { 471 mlog(0, "%s: %d lock resources in hash last pass\n", 472 dlm->name, num); 473 ret = -EAGAIN; 474 } 475 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 476 return ret; 477 } 478 479 static int dlm_no_joining_node(struct dlm_ctxt *dlm) 480 { 481 int ret; 482 483 spin_lock(&dlm->spinlock); 484 ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN; 485 spin_unlock(&dlm->spinlock); 486 487 return ret; 488 } 489 490 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm) 491 { 492 /* Yikes, a double spinlock! I need domain_lock for the dlm 493 * state and the dlm spinlock for join state... Sorry! */ 494 again: 495 spin_lock(&dlm_domain_lock); 496 spin_lock(&dlm->spinlock); 497 498 if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 499 mlog(0, "Node %d is joining, we wait on it.\n", 500 dlm->joining_node); 501 spin_unlock(&dlm->spinlock); 502 spin_unlock(&dlm_domain_lock); 503 504 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm)); 505 goto again; 506 } 507 508 dlm->dlm_state = DLM_CTXT_LEAVING; 509 spin_unlock(&dlm->spinlock); 510 spin_unlock(&dlm_domain_lock); 511 } 512 513 static void __dlm_print_nodes(struct dlm_ctxt *dlm) 514 { 515 int node = -1; 516 517 assert_spin_locked(&dlm->spinlock); 518 519 printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name); 520 521 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 522 node + 1)) < O2NM_MAX_NODES) { 523 printk("%d ", node); 524 } 525 printk("\n"); 526 } 527 528 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 529 void **ret_data) 530 { 531 struct dlm_ctxt *dlm = data; 532 unsigned int node; 533 struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; 534 535 mlog_entry("%p %u %p", msg, len, data); 536 537 if (!dlm_grab(dlm)) 538 return 0; 539 540 node = exit_msg->node_idx; 541 542 printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name); 543 544 spin_lock(&dlm->spinlock); 545 clear_bit(node, dlm->domain_map); 546 __dlm_print_nodes(dlm); 547 548 /* notify anything attached to the heartbeat events */ 549 dlm_hb_event_notify_attached(dlm, node, 0); 550 551 spin_unlock(&dlm->spinlock); 552 553 dlm_put(dlm); 554 555 return 0; 556 } 557 558 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, 559 unsigned int node) 560 { 561 int status; 562 struct dlm_exit_domain leave_msg; 563 564 mlog(0, "Asking node %u if we can leave the domain %s me = %u\n", 565 node, dlm->name, dlm->node_num); 566 567 memset(&leave_msg, 0, sizeof(leave_msg)); 568 leave_msg.node_idx = dlm->node_num; 569 570 status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key, 571 &leave_msg, sizeof(leave_msg), node, 572 NULL); 573 if (status < 0) 574 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 575 "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node); 576 mlog(0, "status return %d from o2net_send_message\n", status); 577 578 return status; 579 } 580 581 582 static void dlm_leave_domain(struct dlm_ctxt *dlm) 583 { 584 int node, clear_node, status; 585 586 /* At this point we've migrated away all our locks and won't 587 * accept mastership of new ones. The dlm is responsible for 588 * almost nothing now. We make sure not to confuse any joining 589 * nodes and then commence shutdown procedure. */ 590 591 spin_lock(&dlm->spinlock); 592 /* Clear ourselves from the domain map */ 593 clear_bit(dlm->node_num, dlm->domain_map); 594 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 595 0)) < O2NM_MAX_NODES) { 596 /* Drop the dlm spinlock. This is safe wrt the domain_map. 597 * -nodes cannot be added now as the 598 * query_join_handlers knows to respond with OK_NO_MAP 599 * -we catch the right network errors if a node is 600 * removed from the map while we're sending him the 601 * exit message. */ 602 spin_unlock(&dlm->spinlock); 603 604 clear_node = 1; 605 606 status = dlm_send_one_domain_exit(dlm, node); 607 if (status < 0 && 608 status != -ENOPROTOOPT && 609 status != -ENOTCONN) { 610 mlog(ML_NOTICE, "Error %d sending domain exit message " 611 "to node %d\n", status, node); 612 613 /* Not sure what to do here but lets sleep for 614 * a bit in case this was a transient 615 * error... */ 616 msleep(DLM_DOMAIN_BACKOFF_MS); 617 clear_node = 0; 618 } 619 620 spin_lock(&dlm->spinlock); 621 /* If we're not clearing the node bit then we intend 622 * to loop back around to try again. */ 623 if (clear_node) 624 clear_bit(node, dlm->domain_map); 625 } 626 spin_unlock(&dlm->spinlock); 627 } 628 629 int dlm_joined(struct dlm_ctxt *dlm) 630 { 631 int ret = 0; 632 633 spin_lock(&dlm_domain_lock); 634 635 if (dlm->dlm_state == DLM_CTXT_JOINED) 636 ret = 1; 637 638 spin_unlock(&dlm_domain_lock); 639 640 return ret; 641 } 642 643 int dlm_shutting_down(struct dlm_ctxt *dlm) 644 { 645 int ret = 0; 646 647 spin_lock(&dlm_domain_lock); 648 649 if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN) 650 ret = 1; 651 652 spin_unlock(&dlm_domain_lock); 653 654 return ret; 655 } 656 657 void dlm_unregister_domain(struct dlm_ctxt *dlm) 658 { 659 int leave = 0; 660 struct dlm_lock_resource *res; 661 662 spin_lock(&dlm_domain_lock); 663 BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED); 664 BUG_ON(!dlm->num_joins); 665 666 dlm->num_joins--; 667 if (!dlm->num_joins) { 668 /* We mark it "in shutdown" now so new register 669 * requests wait until we've completely left the 670 * domain. Don't use DLM_CTXT_LEAVING yet as we still 671 * want new domain joins to communicate with us at 672 * least until we've completed migration of our 673 * resources. */ 674 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN; 675 leave = 1; 676 } 677 spin_unlock(&dlm_domain_lock); 678 679 if (leave) { 680 mlog(0, "shutting down domain %s\n", dlm->name); 681 682 /* We changed dlm state, notify the thread */ 683 dlm_kick_thread(dlm, NULL); 684 685 while (dlm_migrate_all_locks(dlm)) { 686 /* Give dlm_thread time to purge the lockres' */ 687 msleep(500); 688 mlog(0, "%s: more migration to do\n", dlm->name); 689 } 690 691 /* This list should be empty. If not, print remaining lockres */ 692 if (!list_empty(&dlm->tracking_list)) { 693 mlog(ML_ERROR, "Following lockres' are still on the " 694 "tracking list:\n"); 695 list_for_each_entry(res, &dlm->tracking_list, tracking) 696 dlm_print_one_lock_resource(res); 697 } 698 699 dlm_mark_domain_leaving(dlm); 700 dlm_leave_domain(dlm); 701 dlm_force_free_mles(dlm); 702 dlm_complete_dlm_shutdown(dlm); 703 } 704 dlm_put(dlm); 705 } 706 EXPORT_SYMBOL_GPL(dlm_unregister_domain); 707 708 static int dlm_query_join_proto_check(char *proto_type, int node, 709 struct dlm_protocol_version *ours, 710 struct dlm_protocol_version *request) 711 { 712 int rc; 713 struct dlm_protocol_version proto = *request; 714 715 if (!dlm_protocol_compare(ours, &proto)) { 716 mlog(0, 717 "node %u wanted to join with %s locking protocol " 718 "%u.%u, we respond with %u.%u\n", 719 node, proto_type, 720 request->pv_major, 721 request->pv_minor, 722 proto.pv_major, proto.pv_minor); 723 request->pv_minor = proto.pv_minor; 724 rc = 0; 725 } else { 726 mlog(ML_NOTICE, 727 "Node %u wanted to join with %s locking " 728 "protocol %u.%u, but we have %u.%u, disallowing\n", 729 node, proto_type, 730 request->pv_major, 731 request->pv_minor, 732 ours->pv_major, 733 ours->pv_minor); 734 rc = 1; 735 } 736 737 return rc; 738 } 739 740 /* 741 * struct dlm_query_join_packet is made up of four one-byte fields. They 742 * are effectively in big-endian order already. However, little-endian 743 * machines swap them before putting the packet on the wire (because 744 * query_join's response is a status, and that status is treated as a u32 745 * on the wire). Thus, a big-endian and little-endian machines will treat 746 * this structure differently. 747 * 748 * The solution is to have little-endian machines swap the structure when 749 * converting from the structure to the u32 representation. This will 750 * result in the structure having the correct format on the wire no matter 751 * the host endian format. 752 */ 753 static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet, 754 u32 *wire) 755 { 756 union dlm_query_join_response response; 757 758 response.packet = *packet; 759 *wire = cpu_to_be32(response.intval); 760 } 761 762 static void dlm_query_join_wire_to_packet(u32 wire, 763 struct dlm_query_join_packet *packet) 764 { 765 union dlm_query_join_response response; 766 767 response.intval = cpu_to_be32(wire); 768 *packet = response.packet; 769 } 770 771 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 772 void **ret_data) 773 { 774 struct dlm_query_join_request *query; 775 struct dlm_query_join_packet packet = { 776 .code = JOIN_DISALLOW, 777 }; 778 struct dlm_ctxt *dlm = NULL; 779 u32 response; 780 u8 nodenum; 781 782 query = (struct dlm_query_join_request *) msg->buf; 783 784 mlog(0, "node %u wants to join domain %s\n", query->node_idx, 785 query->domain); 786 787 /* 788 * If heartbeat doesn't consider the node live, tell it 789 * to back off and try again. This gives heartbeat a chance 790 * to catch up. 791 */ 792 if (!o2hb_check_node_heartbeating(query->node_idx)) { 793 mlog(0, "node %u is not in our live map yet\n", 794 query->node_idx); 795 796 packet.code = JOIN_DISALLOW; 797 goto respond; 798 } 799 800 packet.code = JOIN_OK_NO_MAP; 801 802 spin_lock(&dlm_domain_lock); 803 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 804 if (!dlm) 805 goto unlock_respond; 806 807 /* 808 * There is a small window where the joining node may not see the 809 * node(s) that just left but still part of the cluster. DISALLOW 810 * join request if joining node has different node map. 811 */ 812 nodenum=0; 813 while (nodenum < O2NM_MAX_NODES) { 814 if (test_bit(nodenum, dlm->domain_map)) { 815 if (!byte_test_bit(nodenum, query->node_map)) { 816 mlog(0, "disallow join as node %u does not " 817 "have node %u in its nodemap\n", 818 query->node_idx, nodenum); 819 packet.code = JOIN_DISALLOW; 820 goto unlock_respond; 821 } 822 } 823 nodenum++; 824 } 825 826 /* Once the dlm ctxt is marked as leaving then we don't want 827 * to be put in someone's domain map. 828 * Also, explicitly disallow joining at certain troublesome 829 * times (ie. during recovery). */ 830 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) { 831 int bit = query->node_idx; 832 spin_lock(&dlm->spinlock); 833 834 if (dlm->dlm_state == DLM_CTXT_NEW && 835 dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) { 836 /*If this is a brand new context and we 837 * haven't started our join process yet, then 838 * the other node won the race. */ 839 packet.code = JOIN_OK_NO_MAP; 840 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 841 /* Disallow parallel joins. */ 842 packet.code = JOIN_DISALLOW; 843 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 844 mlog(0, "node %u trying to join, but recovery " 845 "is ongoing.\n", bit); 846 packet.code = JOIN_DISALLOW; 847 } else if (test_bit(bit, dlm->recovery_map)) { 848 mlog(0, "node %u trying to join, but it " 849 "still needs recovery.\n", bit); 850 packet.code = JOIN_DISALLOW; 851 } else if (test_bit(bit, dlm->domain_map)) { 852 mlog(0, "node %u trying to join, but it " 853 "is still in the domain! needs recovery?\n", 854 bit); 855 packet.code = JOIN_DISALLOW; 856 } else { 857 /* Alright we're fully a part of this domain 858 * so we keep some state as to who's joining 859 * and indicate to him that needs to be fixed 860 * up. */ 861 862 /* Make sure we speak compatible locking protocols. */ 863 if (dlm_query_join_proto_check("DLM", bit, 864 &dlm->dlm_locking_proto, 865 &query->dlm_proto)) { 866 packet.code = JOIN_PROTOCOL_MISMATCH; 867 } else if (dlm_query_join_proto_check("fs", bit, 868 &dlm->fs_locking_proto, 869 &query->fs_proto)) { 870 packet.code = JOIN_PROTOCOL_MISMATCH; 871 } else { 872 packet.dlm_minor = query->dlm_proto.pv_minor; 873 packet.fs_minor = query->fs_proto.pv_minor; 874 packet.code = JOIN_OK; 875 __dlm_set_joining_node(dlm, query->node_idx); 876 } 877 } 878 879 spin_unlock(&dlm->spinlock); 880 } 881 unlock_respond: 882 spin_unlock(&dlm_domain_lock); 883 884 respond: 885 mlog(0, "We respond with %u\n", packet.code); 886 887 dlm_query_join_packet_to_wire(&packet, &response); 888 return response; 889 } 890 891 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 892 void **ret_data) 893 { 894 struct dlm_assert_joined *assert; 895 struct dlm_ctxt *dlm = NULL; 896 897 assert = (struct dlm_assert_joined *) msg->buf; 898 899 mlog(0, "node %u asserts join on domain %s\n", assert->node_idx, 900 assert->domain); 901 902 spin_lock(&dlm_domain_lock); 903 dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len); 904 /* XXX should we consider no dlm ctxt an error? */ 905 if (dlm) { 906 spin_lock(&dlm->spinlock); 907 908 /* Alright, this node has officially joined our 909 * domain. Set him in the map and clean up our 910 * leftover join state. */ 911 BUG_ON(dlm->joining_node != assert->node_idx); 912 set_bit(assert->node_idx, dlm->domain_map); 913 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 914 915 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n", 916 assert->node_idx, dlm->name); 917 __dlm_print_nodes(dlm); 918 919 /* notify anything attached to the heartbeat events */ 920 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1); 921 922 spin_unlock(&dlm->spinlock); 923 } 924 spin_unlock(&dlm_domain_lock); 925 926 return 0; 927 } 928 929 static int dlm_match_regions(struct dlm_ctxt *dlm, 930 struct dlm_query_region *qr) 931 { 932 char *local = NULL, *remote = qr->qr_regions; 933 char *l, *r; 934 int localnr, i, j, foundit; 935 int status = 0; 936 937 if (!o2hb_global_heartbeat_active()) { 938 if (qr->qr_numregions) { 939 mlog(ML_ERROR, "Domain %s: Joining node %d has global " 940 "heartbeat enabled but local node %d does not\n", 941 qr->qr_domain, qr->qr_node, dlm->node_num); 942 status = -EINVAL; 943 } 944 goto bail; 945 } 946 947 if (o2hb_global_heartbeat_active() && !qr->qr_numregions) { 948 mlog(ML_ERROR, "Domain %s: Local node %d has global " 949 "heartbeat enabled but joining node %d does not\n", 950 qr->qr_domain, dlm->node_num, qr->qr_node); 951 status = -EINVAL; 952 goto bail; 953 } 954 955 r = remote; 956 for (i = 0; i < qr->qr_numregions; ++i) { 957 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r); 958 r += O2HB_MAX_REGION_NAME_LEN; 959 } 960 961 local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL); 962 if (!local) { 963 status = -ENOMEM; 964 goto bail; 965 } 966 967 localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS); 968 969 /* compare local regions with remote */ 970 l = local; 971 for (i = 0; i < localnr; ++i) { 972 foundit = 0; 973 r = remote; 974 for (j = 0; j <= qr->qr_numregions; ++j) { 975 if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) { 976 foundit = 1; 977 break; 978 } 979 r += O2HB_MAX_REGION_NAME_LEN; 980 } 981 if (!foundit) { 982 status = -EINVAL; 983 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered " 984 "in local node %d but not in joining node %d\n", 985 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l, 986 dlm->node_num, qr->qr_node); 987 goto bail; 988 } 989 l += O2HB_MAX_REGION_NAME_LEN; 990 } 991 992 /* compare remote with local regions */ 993 r = remote; 994 for (i = 0; i < qr->qr_numregions; ++i) { 995 foundit = 0; 996 l = local; 997 for (j = 0; j < localnr; ++j) { 998 if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) { 999 foundit = 1; 1000 break; 1001 } 1002 l += O2HB_MAX_REGION_NAME_LEN; 1003 } 1004 if (!foundit) { 1005 status = -EINVAL; 1006 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered " 1007 "in joining node %d but not in local node %d\n", 1008 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r, 1009 qr->qr_node, dlm->node_num); 1010 goto bail; 1011 } 1012 r += O2HB_MAX_REGION_NAME_LEN; 1013 } 1014 1015 bail: 1016 kfree(local); 1017 1018 return status; 1019 } 1020 1021 static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map) 1022 { 1023 struct dlm_query_region *qr = NULL; 1024 int status, ret = 0, i; 1025 char *p; 1026 1027 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES) 1028 goto bail; 1029 1030 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL); 1031 if (!qr) { 1032 ret = -ENOMEM; 1033 mlog_errno(ret); 1034 goto bail; 1035 } 1036 1037 qr->qr_node = dlm->node_num; 1038 qr->qr_namelen = strlen(dlm->name); 1039 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen); 1040 /* if local hb, the numregions will be zero */ 1041 if (o2hb_global_heartbeat_active()) 1042 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions, 1043 O2NM_MAX_REGIONS); 1044 1045 p = qr->qr_regions; 1046 for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN) 1047 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p); 1048 1049 i = -1; 1050 while ((i = find_next_bit(node_map, O2NM_MAX_NODES, 1051 i + 1)) < O2NM_MAX_NODES) { 1052 if (i == dlm->node_num) 1053 continue; 1054 1055 mlog(0, "Sending regions to node %d\n", i); 1056 1057 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr, 1058 sizeof(struct dlm_query_region), 1059 i, &status); 1060 if (ret >= 0) 1061 ret = status; 1062 if (ret) { 1063 mlog(ML_ERROR, "Region mismatch %d, node %d\n", 1064 ret, i); 1065 break; 1066 } 1067 } 1068 1069 bail: 1070 kfree(qr); 1071 return ret; 1072 } 1073 1074 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len, 1075 void *data, void **ret_data) 1076 { 1077 struct dlm_query_region *qr; 1078 struct dlm_ctxt *dlm = NULL; 1079 int status = 0; 1080 int locked = 0; 1081 1082 qr = (struct dlm_query_region *) msg->buf; 1083 1084 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node, 1085 qr->qr_domain); 1086 1087 status = -EINVAL; 1088 1089 spin_lock(&dlm_domain_lock); 1090 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen); 1091 if (!dlm) { 1092 mlog(ML_ERROR, "Node %d queried hb regions on domain %s " 1093 "before join domain\n", qr->qr_node, qr->qr_domain); 1094 goto bail; 1095 } 1096 1097 spin_lock(&dlm->spinlock); 1098 locked = 1; 1099 if (dlm->joining_node != qr->qr_node) { 1100 mlog(ML_ERROR, "Node %d queried hb regions on domain %s " 1101 "but joining node is %d\n", qr->qr_node, qr->qr_domain, 1102 dlm->joining_node); 1103 goto bail; 1104 } 1105 1106 /* Support for global heartbeat was added in 1.1 */ 1107 if (dlm->dlm_locking_proto.pv_major == 1 && 1108 dlm->dlm_locking_proto.pv_minor == 0) { 1109 mlog(ML_ERROR, "Node %d queried hb regions on domain %s " 1110 "but active dlm protocol is %d.%d\n", qr->qr_node, 1111 qr->qr_domain, dlm->dlm_locking_proto.pv_major, 1112 dlm->dlm_locking_proto.pv_minor); 1113 goto bail; 1114 } 1115 1116 status = dlm_match_regions(dlm, qr); 1117 1118 bail: 1119 if (locked) 1120 spin_unlock(&dlm->spinlock); 1121 spin_unlock(&dlm_domain_lock); 1122 1123 return status; 1124 } 1125 1126 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 1127 void **ret_data) 1128 { 1129 struct dlm_cancel_join *cancel; 1130 struct dlm_ctxt *dlm = NULL; 1131 1132 cancel = (struct dlm_cancel_join *) msg->buf; 1133 1134 mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx, 1135 cancel->domain); 1136 1137 spin_lock(&dlm_domain_lock); 1138 dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len); 1139 1140 if (dlm) { 1141 spin_lock(&dlm->spinlock); 1142 1143 /* Yikes, this guy wants to cancel his join. No 1144 * problem, we simply cleanup our join state. */ 1145 BUG_ON(dlm->joining_node != cancel->node_idx); 1146 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 1147 1148 spin_unlock(&dlm->spinlock); 1149 } 1150 spin_unlock(&dlm_domain_lock); 1151 1152 return 0; 1153 } 1154 1155 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm, 1156 unsigned int node) 1157 { 1158 int status; 1159 struct dlm_cancel_join cancel_msg; 1160 1161 memset(&cancel_msg, 0, sizeof(cancel_msg)); 1162 cancel_msg.node_idx = dlm->node_num; 1163 cancel_msg.name_len = strlen(dlm->name); 1164 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len); 1165 1166 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1167 &cancel_msg, sizeof(cancel_msg), node, 1168 NULL); 1169 if (status < 0) { 1170 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1171 "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1172 node); 1173 goto bail; 1174 } 1175 1176 bail: 1177 return status; 1178 } 1179 1180 /* map_size should be in bytes. */ 1181 static int dlm_send_join_cancels(struct dlm_ctxt *dlm, 1182 unsigned long *node_map, 1183 unsigned int map_size) 1184 { 1185 int status, tmpstat; 1186 unsigned int node; 1187 1188 if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) * 1189 sizeof(unsigned long))) { 1190 mlog(ML_ERROR, 1191 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n", 1192 map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES)); 1193 return -EINVAL; 1194 } 1195 1196 status = 0; 1197 node = -1; 1198 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 1199 node + 1)) < O2NM_MAX_NODES) { 1200 if (node == dlm->node_num) 1201 continue; 1202 1203 tmpstat = dlm_send_one_join_cancel(dlm, node); 1204 if (tmpstat) { 1205 mlog(ML_ERROR, "Error return %d cancelling join on " 1206 "node %d\n", tmpstat, node); 1207 if (!status) 1208 status = tmpstat; 1209 } 1210 } 1211 1212 if (status) 1213 mlog_errno(status); 1214 return status; 1215 } 1216 1217 static int dlm_request_join(struct dlm_ctxt *dlm, 1218 int node, 1219 enum dlm_query_join_response_code *response) 1220 { 1221 int status; 1222 struct dlm_query_join_request join_msg; 1223 struct dlm_query_join_packet packet; 1224 u32 join_resp; 1225 1226 mlog(0, "querying node %d\n", node); 1227 1228 memset(&join_msg, 0, sizeof(join_msg)); 1229 join_msg.node_idx = dlm->node_num; 1230 join_msg.name_len = strlen(dlm->name); 1231 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 1232 join_msg.dlm_proto = dlm->dlm_locking_proto; 1233 join_msg.fs_proto = dlm->fs_locking_proto; 1234 1235 /* copy live node map to join message */ 1236 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); 1237 1238 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 1239 sizeof(join_msg), node, &join_resp); 1240 if (status < 0 && status != -ENOPROTOOPT) { 1241 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1242 "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1243 node); 1244 goto bail; 1245 } 1246 dlm_query_join_wire_to_packet(join_resp, &packet); 1247 1248 /* -ENOPROTOOPT from the net code means the other side isn't 1249 listening for our message type -- that's fine, it means 1250 his dlm isn't up, so we can consider him a 'yes' but not 1251 joined into the domain. */ 1252 if (status == -ENOPROTOOPT) { 1253 status = 0; 1254 *response = JOIN_OK_NO_MAP; 1255 } else if (packet.code == JOIN_DISALLOW || 1256 packet.code == JOIN_OK_NO_MAP) { 1257 *response = packet.code; 1258 } else if (packet.code == JOIN_PROTOCOL_MISMATCH) { 1259 mlog(ML_NOTICE, 1260 "This node requested DLM locking protocol %u.%u and " 1261 "filesystem locking protocol %u.%u. At least one of " 1262 "the protocol versions on node %d is not compatible, " 1263 "disconnecting\n", 1264 dlm->dlm_locking_proto.pv_major, 1265 dlm->dlm_locking_proto.pv_minor, 1266 dlm->fs_locking_proto.pv_major, 1267 dlm->fs_locking_proto.pv_minor, 1268 node); 1269 status = -EPROTO; 1270 *response = packet.code; 1271 } else if (packet.code == JOIN_OK) { 1272 *response = packet.code; 1273 /* Use the same locking protocol as the remote node */ 1274 dlm->dlm_locking_proto.pv_minor = packet.dlm_minor; 1275 dlm->fs_locking_proto.pv_minor = packet.fs_minor; 1276 mlog(0, 1277 "Node %d responds JOIN_OK with DLM locking protocol " 1278 "%u.%u and fs locking protocol %u.%u\n", 1279 node, 1280 dlm->dlm_locking_proto.pv_major, 1281 dlm->dlm_locking_proto.pv_minor, 1282 dlm->fs_locking_proto.pv_major, 1283 dlm->fs_locking_proto.pv_minor); 1284 } else { 1285 status = -EINVAL; 1286 mlog(ML_ERROR, "invalid response %d from node %u\n", 1287 packet.code, node); 1288 } 1289 1290 mlog(0, "status %d, node %d response is %d\n", status, node, 1291 *response); 1292 1293 bail: 1294 return status; 1295 } 1296 1297 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm, 1298 unsigned int node) 1299 { 1300 int status; 1301 struct dlm_assert_joined assert_msg; 1302 1303 mlog(0, "Sending join assert to node %u\n", node); 1304 1305 memset(&assert_msg, 0, sizeof(assert_msg)); 1306 assert_msg.node_idx = dlm->node_num; 1307 assert_msg.name_len = strlen(dlm->name); 1308 memcpy(assert_msg.domain, dlm->name, assert_msg.name_len); 1309 1310 status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1311 &assert_msg, sizeof(assert_msg), node, 1312 NULL); 1313 if (status < 0) 1314 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1315 "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1316 node); 1317 1318 return status; 1319 } 1320 1321 static void dlm_send_join_asserts(struct dlm_ctxt *dlm, 1322 unsigned long *node_map) 1323 { 1324 int status, node, live; 1325 1326 status = 0; 1327 node = -1; 1328 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 1329 node + 1)) < O2NM_MAX_NODES) { 1330 if (node == dlm->node_num) 1331 continue; 1332 1333 do { 1334 /* It is very important that this message be 1335 * received so we spin until either the node 1336 * has died or it gets the message. */ 1337 status = dlm_send_one_join_assert(dlm, node); 1338 1339 spin_lock(&dlm->spinlock); 1340 live = test_bit(node, dlm->live_nodes_map); 1341 spin_unlock(&dlm->spinlock); 1342 1343 if (status) { 1344 mlog(ML_ERROR, "Error return %d asserting " 1345 "join on node %d\n", status, node); 1346 1347 /* give us some time between errors... */ 1348 if (live) 1349 msleep(DLM_DOMAIN_BACKOFF_MS); 1350 } 1351 } while (status && live); 1352 } 1353 } 1354 1355 struct domain_join_ctxt { 1356 unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1357 unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1358 }; 1359 1360 static int dlm_should_restart_join(struct dlm_ctxt *dlm, 1361 struct domain_join_ctxt *ctxt, 1362 enum dlm_query_join_response_code response) 1363 { 1364 int ret; 1365 1366 if (response == JOIN_DISALLOW) { 1367 mlog(0, "Latest response of disallow -- should restart\n"); 1368 return 1; 1369 } 1370 1371 spin_lock(&dlm->spinlock); 1372 /* For now, we restart the process if the node maps have 1373 * changed at all */ 1374 ret = memcmp(ctxt->live_map, dlm->live_nodes_map, 1375 sizeof(dlm->live_nodes_map)); 1376 spin_unlock(&dlm->spinlock); 1377 1378 if (ret) 1379 mlog(0, "Node maps changed -- should restart\n"); 1380 1381 return ret; 1382 } 1383 1384 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) 1385 { 1386 int status = 0, tmpstat, node; 1387 struct domain_join_ctxt *ctxt; 1388 enum dlm_query_join_response_code response = JOIN_DISALLOW; 1389 1390 mlog_entry("%p", dlm); 1391 1392 ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL); 1393 if (!ctxt) { 1394 status = -ENOMEM; 1395 mlog_errno(status); 1396 goto bail; 1397 } 1398 1399 /* group sem locking should work for us here -- we're already 1400 * registered for heartbeat events so filling this should be 1401 * atomic wrt getting those handlers called. */ 1402 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map)); 1403 1404 spin_lock(&dlm->spinlock); 1405 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map)); 1406 1407 __dlm_set_joining_node(dlm, dlm->node_num); 1408 1409 spin_unlock(&dlm->spinlock); 1410 1411 node = -1; 1412 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES, 1413 node + 1)) < O2NM_MAX_NODES) { 1414 if (node == dlm->node_num) 1415 continue; 1416 1417 status = dlm_request_join(dlm, node, &response); 1418 if (status < 0) { 1419 mlog_errno(status); 1420 goto bail; 1421 } 1422 1423 /* Ok, either we got a response or the node doesn't have a 1424 * dlm up. */ 1425 if (response == JOIN_OK) 1426 set_bit(node, ctxt->yes_resp_map); 1427 1428 if (dlm_should_restart_join(dlm, ctxt, response)) { 1429 status = -EAGAIN; 1430 goto bail; 1431 } 1432 } 1433 1434 mlog(0, "Yay, done querying nodes!\n"); 1435 1436 /* Yay, everyone agree's we can join the domain. My domain is 1437 * comprised of all nodes who were put in the 1438 * yes_resp_map. Copy that into our domain map and send a join 1439 * assert message to clean up everyone elses state. */ 1440 spin_lock(&dlm->spinlock); 1441 memcpy(dlm->domain_map, ctxt->yes_resp_map, 1442 sizeof(ctxt->yes_resp_map)); 1443 set_bit(dlm->node_num, dlm->domain_map); 1444 spin_unlock(&dlm->spinlock); 1445 1446 /* Support for global heartbeat was added in 1.1 */ 1447 if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) { 1448 status = dlm_send_regions(dlm, ctxt->yes_resp_map); 1449 if (status) { 1450 mlog_errno(status); 1451 goto bail; 1452 } 1453 } 1454 1455 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 1456 1457 /* Joined state *must* be set before the joining node 1458 * information, otherwise the query_join handler may read no 1459 * current joiner but a state of NEW and tell joining nodes 1460 * we're not in the domain. */ 1461 spin_lock(&dlm_domain_lock); 1462 dlm->dlm_state = DLM_CTXT_JOINED; 1463 dlm->num_joins++; 1464 spin_unlock(&dlm_domain_lock); 1465 1466 bail: 1467 spin_lock(&dlm->spinlock); 1468 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 1469 if (!status) 1470 __dlm_print_nodes(dlm); 1471 spin_unlock(&dlm->spinlock); 1472 1473 if (ctxt) { 1474 /* Do we need to send a cancel message to any nodes? */ 1475 if (status < 0) { 1476 tmpstat = dlm_send_join_cancels(dlm, 1477 ctxt->yes_resp_map, 1478 sizeof(ctxt->yes_resp_map)); 1479 if (tmpstat < 0) 1480 mlog_errno(tmpstat); 1481 } 1482 kfree(ctxt); 1483 } 1484 1485 mlog(0, "returning %d\n", status); 1486 return status; 1487 } 1488 1489 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm) 1490 { 1491 o2hb_unregister_callback(NULL, &dlm->dlm_hb_up); 1492 o2hb_unregister_callback(NULL, &dlm->dlm_hb_down); 1493 o2net_unregister_handler_list(&dlm->dlm_domain_handlers); 1494 } 1495 1496 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) 1497 { 1498 int status; 1499 1500 mlog(0, "registering handlers.\n"); 1501 1502 o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB, 1503 dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI); 1504 status = o2hb_register_callback(NULL, &dlm->dlm_hb_down); 1505 if (status) 1506 goto bail; 1507 1508 o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB, 1509 dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI); 1510 status = o2hb_register_callback(NULL, &dlm->dlm_hb_up); 1511 if (status) 1512 goto bail; 1513 1514 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1515 sizeof(struct dlm_master_request), 1516 dlm_master_request_handler, 1517 dlm, NULL, &dlm->dlm_domain_handlers); 1518 if (status) 1519 goto bail; 1520 1521 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1522 sizeof(struct dlm_assert_master), 1523 dlm_assert_master_handler, 1524 dlm, dlm_assert_master_post_handler, 1525 &dlm->dlm_domain_handlers); 1526 if (status) 1527 goto bail; 1528 1529 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1530 sizeof(struct dlm_create_lock), 1531 dlm_create_lock_handler, 1532 dlm, NULL, &dlm->dlm_domain_handlers); 1533 if (status) 1534 goto bail; 1535 1536 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1537 DLM_CONVERT_LOCK_MAX_LEN, 1538 dlm_convert_lock_handler, 1539 dlm, NULL, &dlm->dlm_domain_handlers); 1540 if (status) 1541 goto bail; 1542 1543 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1544 DLM_UNLOCK_LOCK_MAX_LEN, 1545 dlm_unlock_lock_handler, 1546 dlm, NULL, &dlm->dlm_domain_handlers); 1547 if (status) 1548 goto bail; 1549 1550 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1551 DLM_PROXY_AST_MAX_LEN, 1552 dlm_proxy_ast_handler, 1553 dlm, NULL, &dlm->dlm_domain_handlers); 1554 if (status) 1555 goto bail; 1556 1557 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1558 sizeof(struct dlm_exit_domain), 1559 dlm_exit_domain_handler, 1560 dlm, NULL, &dlm->dlm_domain_handlers); 1561 if (status) 1562 goto bail; 1563 1564 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key, 1565 sizeof(struct dlm_deref_lockres), 1566 dlm_deref_lockres_handler, 1567 dlm, NULL, &dlm->dlm_domain_handlers); 1568 if (status) 1569 goto bail; 1570 1571 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1572 sizeof(struct dlm_migrate_request), 1573 dlm_migrate_request_handler, 1574 dlm, NULL, &dlm->dlm_domain_handlers); 1575 if (status) 1576 goto bail; 1577 1578 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1579 DLM_MIG_LOCKRES_MAX_LEN, 1580 dlm_mig_lockres_handler, 1581 dlm, NULL, &dlm->dlm_domain_handlers); 1582 if (status) 1583 goto bail; 1584 1585 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1586 sizeof(struct dlm_master_requery), 1587 dlm_master_requery_handler, 1588 dlm, NULL, &dlm->dlm_domain_handlers); 1589 if (status) 1590 goto bail; 1591 1592 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1593 sizeof(struct dlm_lock_request), 1594 dlm_request_all_locks_handler, 1595 dlm, NULL, &dlm->dlm_domain_handlers); 1596 if (status) 1597 goto bail; 1598 1599 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1600 sizeof(struct dlm_reco_data_done), 1601 dlm_reco_data_done_handler, 1602 dlm, NULL, &dlm->dlm_domain_handlers); 1603 if (status) 1604 goto bail; 1605 1606 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1607 sizeof(struct dlm_begin_reco), 1608 dlm_begin_reco_handler, 1609 dlm, NULL, &dlm->dlm_domain_handlers); 1610 if (status) 1611 goto bail; 1612 1613 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1614 sizeof(struct dlm_finalize_reco), 1615 dlm_finalize_reco_handler, 1616 dlm, NULL, &dlm->dlm_domain_handlers); 1617 if (status) 1618 goto bail; 1619 1620 bail: 1621 if (status) 1622 dlm_unregister_domain_handlers(dlm); 1623 1624 return status; 1625 } 1626 1627 static int dlm_join_domain(struct dlm_ctxt *dlm) 1628 { 1629 int status; 1630 unsigned int backoff; 1631 unsigned int total_backoff = 0; 1632 1633 BUG_ON(!dlm); 1634 1635 mlog(0, "Join domain %s\n", dlm->name); 1636 1637 status = dlm_register_domain_handlers(dlm); 1638 if (status) { 1639 mlog_errno(status); 1640 goto bail; 1641 } 1642 1643 status = dlm_debug_init(dlm); 1644 if (status < 0) { 1645 mlog_errno(status); 1646 goto bail; 1647 } 1648 1649 status = dlm_launch_thread(dlm); 1650 if (status < 0) { 1651 mlog_errno(status); 1652 goto bail; 1653 } 1654 1655 status = dlm_launch_recovery_thread(dlm); 1656 if (status < 0) { 1657 mlog_errno(status); 1658 goto bail; 1659 } 1660 1661 dlm->dlm_worker = create_singlethread_workqueue("dlm_wq"); 1662 if (!dlm->dlm_worker) { 1663 status = -ENOMEM; 1664 mlog_errno(status); 1665 goto bail; 1666 } 1667 1668 do { 1669 status = dlm_try_to_join_domain(dlm); 1670 1671 /* If we're racing another node to the join, then we 1672 * need to back off temporarily and let them 1673 * complete. */ 1674 #define DLM_JOIN_TIMEOUT_MSECS 90000 1675 if (status == -EAGAIN) { 1676 if (signal_pending(current)) { 1677 status = -ERESTARTSYS; 1678 goto bail; 1679 } 1680 1681 if (total_backoff > 1682 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) { 1683 status = -ERESTARTSYS; 1684 mlog(ML_NOTICE, "Timed out joining dlm domain " 1685 "%s after %u msecs\n", dlm->name, 1686 jiffies_to_msecs(total_backoff)); 1687 goto bail; 1688 } 1689 1690 /* 1691 * <chip> After you! 1692 * <dale> No, after you! 1693 * <chip> I insist! 1694 * <dale> But you first! 1695 * ... 1696 */ 1697 backoff = (unsigned int)(jiffies & 0x3); 1698 backoff *= DLM_DOMAIN_BACKOFF_MS; 1699 total_backoff += backoff; 1700 mlog(0, "backoff %d\n", backoff); 1701 msleep(backoff); 1702 } 1703 } while (status == -EAGAIN); 1704 1705 if (status < 0) { 1706 mlog_errno(status); 1707 goto bail; 1708 } 1709 1710 status = 0; 1711 bail: 1712 wake_up(&dlm_domain_events); 1713 1714 if (status) { 1715 dlm_unregister_domain_handlers(dlm); 1716 dlm_debug_shutdown(dlm); 1717 dlm_complete_thread(dlm); 1718 dlm_complete_recovery_thread(dlm); 1719 dlm_destroy_dlm_worker(dlm); 1720 } 1721 1722 return status; 1723 } 1724 1725 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, 1726 u32 key) 1727 { 1728 int i; 1729 int ret; 1730 struct dlm_ctxt *dlm = NULL; 1731 1732 dlm = kzalloc(sizeof(*dlm), GFP_KERNEL); 1733 if (!dlm) { 1734 mlog_errno(-ENOMEM); 1735 goto leave; 1736 } 1737 1738 dlm->name = kstrdup(domain, GFP_KERNEL); 1739 if (dlm->name == NULL) { 1740 mlog_errno(-ENOMEM); 1741 kfree(dlm); 1742 dlm = NULL; 1743 goto leave; 1744 } 1745 1746 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES); 1747 if (!dlm->lockres_hash) { 1748 mlog_errno(-ENOMEM); 1749 kfree(dlm->name); 1750 kfree(dlm); 1751 dlm = NULL; 1752 goto leave; 1753 } 1754 1755 for (i = 0; i < DLM_HASH_BUCKETS; i++) 1756 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); 1757 1758 dlm->master_hash = (struct hlist_head **) 1759 dlm_alloc_pagevec(DLM_HASH_PAGES); 1760 if (!dlm->master_hash) { 1761 mlog_errno(-ENOMEM); 1762 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 1763 kfree(dlm->name); 1764 kfree(dlm); 1765 dlm = NULL; 1766 goto leave; 1767 } 1768 1769 for (i = 0; i < DLM_HASH_BUCKETS; i++) 1770 INIT_HLIST_HEAD(dlm_master_hash(dlm, i)); 1771 1772 dlm->key = key; 1773 dlm->node_num = o2nm_this_node(); 1774 1775 ret = dlm_create_debugfs_subroot(dlm); 1776 if (ret < 0) { 1777 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); 1778 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 1779 kfree(dlm->name); 1780 kfree(dlm); 1781 dlm = NULL; 1782 goto leave; 1783 } 1784 1785 spin_lock_init(&dlm->spinlock); 1786 spin_lock_init(&dlm->master_lock); 1787 spin_lock_init(&dlm->ast_lock); 1788 spin_lock_init(&dlm->track_lock); 1789 INIT_LIST_HEAD(&dlm->list); 1790 INIT_LIST_HEAD(&dlm->dirty_list); 1791 INIT_LIST_HEAD(&dlm->reco.resources); 1792 INIT_LIST_HEAD(&dlm->reco.received); 1793 INIT_LIST_HEAD(&dlm->reco.node_data); 1794 INIT_LIST_HEAD(&dlm->purge_list); 1795 INIT_LIST_HEAD(&dlm->dlm_domain_handlers); 1796 INIT_LIST_HEAD(&dlm->tracking_list); 1797 dlm->reco.state = 0; 1798 1799 INIT_LIST_HEAD(&dlm->pending_asts); 1800 INIT_LIST_HEAD(&dlm->pending_basts); 1801 1802 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n", 1803 dlm->recovery_map, &(dlm->recovery_map[0])); 1804 1805 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map)); 1806 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map)); 1807 memset(dlm->domain_map, 0, sizeof(dlm->domain_map)); 1808 1809 dlm->dlm_thread_task = NULL; 1810 dlm->dlm_reco_thread_task = NULL; 1811 dlm->dlm_worker = NULL; 1812 init_waitqueue_head(&dlm->dlm_thread_wq); 1813 init_waitqueue_head(&dlm->dlm_reco_thread_wq); 1814 init_waitqueue_head(&dlm->reco.event); 1815 init_waitqueue_head(&dlm->ast_wq); 1816 init_waitqueue_head(&dlm->migration_wq); 1817 INIT_LIST_HEAD(&dlm->mle_hb_events); 1818 1819 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN; 1820 init_waitqueue_head(&dlm->dlm_join_events); 1821 1822 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 1823 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 1824 1825 atomic_set(&dlm->res_tot_count, 0); 1826 atomic_set(&dlm->res_cur_count, 0); 1827 for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) { 1828 atomic_set(&dlm->mle_tot_count[i], 0); 1829 atomic_set(&dlm->mle_cur_count[i], 0); 1830 } 1831 1832 spin_lock_init(&dlm->work_lock); 1833 INIT_LIST_HEAD(&dlm->work_list); 1834 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work); 1835 1836 kref_init(&dlm->dlm_refs); 1837 dlm->dlm_state = DLM_CTXT_NEW; 1838 1839 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks); 1840 1841 mlog(0, "context init: refcount %u\n", 1842 atomic_read(&dlm->dlm_refs.refcount)); 1843 1844 leave: 1845 return dlm; 1846 } 1847 1848 /* 1849 * Compare a requested locking protocol version against the current one. 1850 * 1851 * If the major numbers are different, they are incompatible. 1852 * If the current minor is greater than the request, they are incompatible. 1853 * If the current minor is less than or equal to the request, they are 1854 * compatible, and the requester should run at the current minor version. 1855 */ 1856 static int dlm_protocol_compare(struct dlm_protocol_version *existing, 1857 struct dlm_protocol_version *request) 1858 { 1859 if (existing->pv_major != request->pv_major) 1860 return 1; 1861 1862 if (existing->pv_minor > request->pv_minor) 1863 return 1; 1864 1865 if (existing->pv_minor < request->pv_minor) 1866 request->pv_minor = existing->pv_minor; 1867 1868 return 0; 1869 } 1870 1871 /* 1872 * dlm_register_domain: one-time setup per "domain". 1873 * 1874 * The filesystem passes in the requested locking version via proto. 1875 * If registration was successful, proto will contain the negotiated 1876 * locking protocol. 1877 */ 1878 struct dlm_ctxt * dlm_register_domain(const char *domain, 1879 u32 key, 1880 struct dlm_protocol_version *fs_proto) 1881 { 1882 int ret; 1883 struct dlm_ctxt *dlm = NULL; 1884 struct dlm_ctxt *new_ctxt = NULL; 1885 1886 if (strlen(domain) >= O2NM_MAX_NAME_LEN) { 1887 ret = -ENAMETOOLONG; 1888 mlog(ML_ERROR, "domain name length too long\n"); 1889 goto leave; 1890 } 1891 1892 if (!o2hb_check_local_node_heartbeating()) { 1893 mlog(ML_ERROR, "the local node has not been configured, or is " 1894 "not heartbeating\n"); 1895 ret = -EPROTO; 1896 goto leave; 1897 } 1898 1899 mlog(0, "register called for domain \"%s\"\n", domain); 1900 1901 retry: 1902 dlm = NULL; 1903 if (signal_pending(current)) { 1904 ret = -ERESTARTSYS; 1905 mlog_errno(ret); 1906 goto leave; 1907 } 1908 1909 spin_lock(&dlm_domain_lock); 1910 1911 dlm = __dlm_lookup_domain(domain); 1912 if (dlm) { 1913 if (dlm->dlm_state != DLM_CTXT_JOINED) { 1914 spin_unlock(&dlm_domain_lock); 1915 1916 mlog(0, "This ctxt is not joined yet!\n"); 1917 wait_event_interruptible(dlm_domain_events, 1918 dlm_wait_on_domain_helper( 1919 domain)); 1920 goto retry; 1921 } 1922 1923 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) { 1924 spin_unlock(&dlm_domain_lock); 1925 mlog(ML_ERROR, 1926 "Requested locking protocol version is not " 1927 "compatible with already registered domain " 1928 "\"%s\"\n", domain); 1929 ret = -EPROTO; 1930 goto leave; 1931 } 1932 1933 __dlm_get(dlm); 1934 dlm->num_joins++; 1935 1936 spin_unlock(&dlm_domain_lock); 1937 1938 ret = 0; 1939 goto leave; 1940 } 1941 1942 /* doesn't exist */ 1943 if (!new_ctxt) { 1944 spin_unlock(&dlm_domain_lock); 1945 1946 new_ctxt = dlm_alloc_ctxt(domain, key); 1947 if (new_ctxt) 1948 goto retry; 1949 1950 ret = -ENOMEM; 1951 mlog_errno(ret); 1952 goto leave; 1953 } 1954 1955 /* a little variable switch-a-roo here... */ 1956 dlm = new_ctxt; 1957 new_ctxt = NULL; 1958 1959 /* add the new domain */ 1960 list_add_tail(&dlm->list, &dlm_domains); 1961 spin_unlock(&dlm_domain_lock); 1962 1963 /* 1964 * Pass the locking protocol version into the join. If the join 1965 * succeeds, it will have the negotiated protocol set. 1966 */ 1967 dlm->dlm_locking_proto = dlm_protocol; 1968 dlm->fs_locking_proto = *fs_proto; 1969 1970 ret = dlm_join_domain(dlm); 1971 if (ret) { 1972 mlog_errno(ret); 1973 dlm_put(dlm); 1974 goto leave; 1975 } 1976 1977 /* Tell the caller what locking protocol we negotiated */ 1978 *fs_proto = dlm->fs_locking_proto; 1979 1980 ret = 0; 1981 leave: 1982 if (new_ctxt) 1983 dlm_free_ctxt_mem(new_ctxt); 1984 1985 if (ret < 0) 1986 dlm = ERR_PTR(ret); 1987 1988 return dlm; 1989 } 1990 EXPORT_SYMBOL_GPL(dlm_register_domain); 1991 1992 static LIST_HEAD(dlm_join_handlers); 1993 1994 static void dlm_unregister_net_handlers(void) 1995 { 1996 o2net_unregister_handler_list(&dlm_join_handlers); 1997 } 1998 1999 static int dlm_register_net_handlers(void) 2000 { 2001 int status = 0; 2002 2003 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 2004 sizeof(struct dlm_query_join_request), 2005 dlm_query_join_handler, 2006 NULL, NULL, &dlm_join_handlers); 2007 if (status) 2008 goto bail; 2009 2010 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 2011 sizeof(struct dlm_assert_joined), 2012 dlm_assert_joined_handler, 2013 NULL, NULL, &dlm_join_handlers); 2014 if (status) 2015 goto bail; 2016 2017 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 2018 sizeof(struct dlm_cancel_join), 2019 dlm_cancel_join_handler, 2020 NULL, NULL, &dlm_join_handlers); 2021 if (status) 2022 goto bail; 2023 2024 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY, 2025 sizeof(struct dlm_query_region), 2026 dlm_query_region_handler, 2027 NULL, NULL, &dlm_join_handlers); 2028 2029 bail: 2030 if (status < 0) 2031 dlm_unregister_net_handlers(); 2032 2033 return status; 2034 } 2035 2036 /* Domain eviction callback handling. 2037 * 2038 * The file system requires notification of node death *before* the 2039 * dlm completes it's recovery work, otherwise it may be able to 2040 * acquire locks on resources requiring recovery. Since the dlm can 2041 * evict a node from it's domain *before* heartbeat fires, a similar 2042 * mechanism is required. */ 2043 2044 /* Eviction is not expected to happen often, so a per-domain lock is 2045 * not necessary. Eviction callbacks are allowed to sleep for short 2046 * periods of time. */ 2047 static DECLARE_RWSEM(dlm_callback_sem); 2048 2049 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm, 2050 int node_num) 2051 { 2052 struct list_head *iter; 2053 struct dlm_eviction_cb *cb; 2054 2055 down_read(&dlm_callback_sem); 2056 list_for_each(iter, &dlm->dlm_eviction_callbacks) { 2057 cb = list_entry(iter, struct dlm_eviction_cb, ec_item); 2058 2059 cb->ec_func(node_num, cb->ec_data); 2060 } 2061 up_read(&dlm_callback_sem); 2062 } 2063 2064 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb, 2065 dlm_eviction_func *f, 2066 void *data) 2067 { 2068 INIT_LIST_HEAD(&cb->ec_item); 2069 cb->ec_func = f; 2070 cb->ec_data = data; 2071 } 2072 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb); 2073 2074 void dlm_register_eviction_cb(struct dlm_ctxt *dlm, 2075 struct dlm_eviction_cb *cb) 2076 { 2077 down_write(&dlm_callback_sem); 2078 list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks); 2079 up_write(&dlm_callback_sem); 2080 } 2081 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb); 2082 2083 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb) 2084 { 2085 down_write(&dlm_callback_sem); 2086 list_del_init(&cb->ec_item); 2087 up_write(&dlm_callback_sem); 2088 } 2089 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb); 2090 2091 static int __init dlm_init(void) 2092 { 2093 int status; 2094 2095 dlm_print_version(); 2096 2097 status = dlm_init_mle_cache(); 2098 if (status) { 2099 mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n"); 2100 goto error; 2101 } 2102 2103 status = dlm_init_master_caches(); 2104 if (status) { 2105 mlog(ML_ERROR, "Could not create o2dlm_lockres and " 2106 "o2dlm_lockname slabcaches\n"); 2107 goto error; 2108 } 2109 2110 status = dlm_init_lock_cache(); 2111 if (status) { 2112 mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n"); 2113 goto error; 2114 } 2115 2116 status = dlm_register_net_handlers(); 2117 if (status) { 2118 mlog(ML_ERROR, "Unable to register network handlers\n"); 2119 goto error; 2120 } 2121 2122 status = dlm_create_debugfs_root(); 2123 if (status) 2124 goto error; 2125 2126 return 0; 2127 error: 2128 dlm_unregister_net_handlers(); 2129 dlm_destroy_lock_cache(); 2130 dlm_destroy_master_caches(); 2131 dlm_destroy_mle_cache(); 2132 return -1; 2133 } 2134 2135 static void __exit dlm_exit (void) 2136 { 2137 dlm_destroy_debugfs_root(); 2138 dlm_unregister_net_handlers(); 2139 dlm_destroy_lock_cache(); 2140 dlm_destroy_master_caches(); 2141 dlm_destroy_mle_cache(); 2142 } 2143 2144 MODULE_AUTHOR("Oracle"); 2145 MODULE_LICENSE("GPL"); 2146 2147 module_init(dlm_init); 2148 module_exit(dlm_exit); 2149