/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmdomain.c
 *
 * defines domain join / leave apis
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/spinlock.h>
#include <linux/delay.h>
#include <linux/err.h>
#include <linux/debugfs.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
#include "cluster/masklog.h"

/*
 * ocfs2 node maps are arrays of long int, which cannot be sent freely
 * across the wire because of endianness issues. To work around this, we
 * convert the long ints to byte arrays.
Following 3 routines are helper functions to 53 * set/test/copy bits within those array of bytes 54 */ 55 static inline void byte_set_bit(u8 nr, u8 map[]) 56 { 57 map[nr >> 3] |= (1UL << (nr & 7)); 58 } 59 60 static inline int byte_test_bit(u8 nr, u8 map[]) 61 { 62 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0; 63 } 64 65 static inline void byte_copymap(u8 dmap[], unsigned long smap[], 66 unsigned int sz) 67 { 68 unsigned int nn; 69 70 if (!sz) 71 return; 72 73 memset(dmap, 0, ((sz + 7) >> 3)); 74 for (nn = 0 ; nn < sz; nn++) 75 if (test_bit(nn, smap)) 76 byte_set_bit(nn, dmap); 77 } 78 79 static void dlm_free_pagevec(void **vec, int pages) 80 { 81 while (pages--) 82 free_page((unsigned long)vec[pages]); 83 kfree(vec); 84 } 85 86 static void **dlm_alloc_pagevec(int pages) 87 { 88 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL); 89 int i; 90 91 if (!vec) 92 return NULL; 93 94 for (i = 0; i < pages; i++) 95 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) 96 goto out_free; 97 98 mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n", 99 pages, (unsigned long)DLM_HASH_PAGES, 100 (unsigned long)DLM_BUCKETS_PER_PAGE); 101 return vec; 102 out_free: 103 dlm_free_pagevec(vec, i); 104 return NULL; 105 } 106 107 /* 108 * 109 * spinlock lock ordering: if multiple locks are needed, obey this ordering: 110 * dlm_domain_lock 111 * struct dlm_ctxt->spinlock 112 * struct dlm_lock_resource->spinlock 113 * struct dlm_ctxt->master_lock 114 * struct dlm_ctxt->ast_lock 115 * dlm_master_list_entry->spinlock 116 * dlm_lock->spinlock 117 * 118 */ 119 120 DEFINE_SPINLOCK(dlm_domain_lock); 121 LIST_HEAD(dlm_domains); 122 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 123 124 /* 125 * The supported protocol version for DLM communication. Running domains 126 * will have a negotiated version with the same major number and a minor 127 * number equal or smaller. 
The dlm_ctxt->dlm_locking_proto field should 128 * be used to determine what a running domain is actually using. 129 * 130 * New in version 1.1: 131 * - Message DLM_QUERY_REGION added to support global heartbeat 132 * - Message DLM_QUERY_NODEINFO added to allow online node removes 133 * New in version 1.2: 134 * - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain 135 * New in version 1.3: 136 * - Message DLM_DEREF_LOCKRES_DONE added to inform non-master that the 137 * refmap is cleared 138 */ 139 static const struct dlm_protocol_version dlm_protocol = { 140 .pv_major = 1, 141 .pv_minor = 3, 142 }; 143 144 #define DLM_DOMAIN_BACKOFF_MS 200 145 146 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 147 void **ret_data); 148 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 149 void **ret_data); 150 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 151 void **ret_data); 152 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len, 153 void *data, void **ret_data); 154 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 155 void **ret_data); 156 static int dlm_protocol_compare(struct dlm_protocol_version *existing, 157 struct dlm_protocol_version *request); 158 159 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 160 161 void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 162 { 163 if (hlist_unhashed(&res->hash_node)) 164 return; 165 166 mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len, 167 res->lockname.name); 168 hlist_del_init(&res->hash_node); 169 dlm_lockres_put(res); 170 } 171 172 void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 173 { 174 struct hlist_head *bucket; 175 176 assert_spin_locked(&dlm->spinlock); 177 178 bucket = dlm_lockres_hash(dlm, res->lockname.hash); 179 180 /* get a reference for our hashtable */ 181 
dlm_lockres_get(res); 182 183 hlist_add_head(&res->hash_node, bucket); 184 185 mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len, 186 res->lockname.name); 187 } 188 189 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, 190 const char *name, 191 unsigned int len, 192 unsigned int hash) 193 { 194 struct hlist_head *bucket; 195 struct dlm_lock_resource *res; 196 197 mlog(0, "%.*s\n", len, name); 198 199 assert_spin_locked(&dlm->spinlock); 200 201 bucket = dlm_lockres_hash(dlm, hash); 202 203 hlist_for_each_entry(res, bucket, hash_node) { 204 if (res->lockname.name[0] != name[0]) 205 continue; 206 if (unlikely(res->lockname.len != len)) 207 continue; 208 if (memcmp(res->lockname.name + 1, name + 1, len - 1)) 209 continue; 210 dlm_lockres_get(res); 211 return res; 212 } 213 return NULL; 214 } 215 216 /* intended to be called by functions which do not care about lock 217 * resources which are being purged (most net _handler functions). 218 * this will return NULL for any lock resource which is found but 219 * currently in the process of dropping its mastery reference. 220 * use __dlm_lookup_lockres_full when you need the lock resource 221 * regardless (e.g. 
dlm_get_lock_resource) */ 222 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 223 const char *name, 224 unsigned int len, 225 unsigned int hash) 226 { 227 struct dlm_lock_resource *res = NULL; 228 229 mlog(0, "%.*s\n", len, name); 230 231 assert_spin_locked(&dlm->spinlock); 232 233 res = __dlm_lookup_lockres_full(dlm, name, len, hash); 234 if (res) { 235 spin_lock(&res->spinlock); 236 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 237 spin_unlock(&res->spinlock); 238 dlm_lockres_put(res); 239 return NULL; 240 } 241 spin_unlock(&res->spinlock); 242 } 243 244 return res; 245 } 246 247 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 248 const char *name, 249 unsigned int len) 250 { 251 struct dlm_lock_resource *res; 252 unsigned int hash = dlm_lockid_hash(name, len); 253 254 spin_lock(&dlm->spinlock); 255 res = __dlm_lookup_lockres(dlm, name, len, hash); 256 spin_unlock(&dlm->spinlock); 257 return res; 258 } 259 260 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len) 261 { 262 struct dlm_ctxt *tmp; 263 264 assert_spin_locked(&dlm_domain_lock); 265 266 /* tmp->name here is always NULL terminated, 267 * but domain may not be! 
*/ 268 list_for_each_entry(tmp, &dlm_domains, list) { 269 if (strlen(tmp->name) == len && 270 memcmp(tmp->name, domain, len)==0) 271 return tmp; 272 } 273 274 return NULL; 275 } 276 277 /* For null terminated domain strings ONLY */ 278 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain) 279 { 280 assert_spin_locked(&dlm_domain_lock); 281 282 return __dlm_lookup_domain_full(domain, strlen(domain)); 283 } 284 285 286 /* returns true on one of two conditions: 287 * 1) the domain does not exist 288 * 2) the domain exists and it's state is "joined" */ 289 static int dlm_wait_on_domain_helper(const char *domain) 290 { 291 int ret = 0; 292 struct dlm_ctxt *tmp = NULL; 293 294 spin_lock(&dlm_domain_lock); 295 296 tmp = __dlm_lookup_domain(domain); 297 if (!tmp) 298 ret = 1; 299 else if (tmp->dlm_state == DLM_CTXT_JOINED) 300 ret = 1; 301 302 spin_unlock(&dlm_domain_lock); 303 return ret; 304 } 305 306 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) 307 { 308 dlm_destroy_debugfs_subroot(dlm); 309 310 if (dlm->lockres_hash) 311 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 312 313 if (dlm->master_hash) 314 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); 315 316 kfree(dlm->name); 317 kfree(dlm); 318 } 319 320 /* A little strange - this function will be called while holding 321 * dlm_domain_lock and is expected to be holding it on the way out. We 322 * will however drop and reacquire it multiple times */ 323 static void dlm_ctxt_release(struct kref *kref) 324 { 325 struct dlm_ctxt *dlm; 326 327 dlm = container_of(kref, struct dlm_ctxt, dlm_refs); 328 329 BUG_ON(dlm->num_joins); 330 BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED); 331 332 /* we may still be in the list if we hit an error during join. 
*/ 333 list_del_init(&dlm->list); 334 335 spin_unlock(&dlm_domain_lock); 336 337 mlog(0, "freeing memory from domain %s\n", dlm->name); 338 339 wake_up(&dlm_domain_events); 340 341 dlm_free_ctxt_mem(dlm); 342 343 spin_lock(&dlm_domain_lock); 344 } 345 346 void dlm_put(struct dlm_ctxt *dlm) 347 { 348 spin_lock(&dlm_domain_lock); 349 kref_put(&dlm->dlm_refs, dlm_ctxt_release); 350 spin_unlock(&dlm_domain_lock); 351 } 352 353 static void __dlm_get(struct dlm_ctxt *dlm) 354 { 355 kref_get(&dlm->dlm_refs); 356 } 357 358 /* given a questionable reference to a dlm object, gets a reference if 359 * it can find it in the list, otherwise returns NULL in which case 360 * you shouldn't trust your pointer. */ 361 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm) 362 { 363 struct dlm_ctxt *target; 364 struct dlm_ctxt *ret = NULL; 365 366 spin_lock(&dlm_domain_lock); 367 368 list_for_each_entry(target, &dlm_domains, list) { 369 if (target == dlm) { 370 __dlm_get(target); 371 ret = target; 372 break; 373 } 374 } 375 376 spin_unlock(&dlm_domain_lock); 377 378 return ret; 379 } 380 381 int dlm_domain_fully_joined(struct dlm_ctxt *dlm) 382 { 383 int ret; 384 385 spin_lock(&dlm_domain_lock); 386 ret = (dlm->dlm_state == DLM_CTXT_JOINED) || 387 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN); 388 spin_unlock(&dlm_domain_lock); 389 390 return ret; 391 } 392 393 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm) 394 { 395 if (dlm->dlm_worker) { 396 flush_workqueue(dlm->dlm_worker); 397 destroy_workqueue(dlm->dlm_worker); 398 dlm->dlm_worker = NULL; 399 } 400 } 401 402 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) 403 { 404 dlm_unregister_domain_handlers(dlm); 405 dlm_debug_shutdown(dlm); 406 dlm_complete_thread(dlm); 407 dlm_complete_recovery_thread(dlm); 408 dlm_destroy_dlm_worker(dlm); 409 410 /* We've left the domain. Now we can take ourselves out of the 411 * list and allow the kref stuff to help us free the 412 * memory. 
*/ 413 spin_lock(&dlm_domain_lock); 414 list_del_init(&dlm->list); 415 spin_unlock(&dlm_domain_lock); 416 417 /* Wake up anyone waiting for us to remove this domain */ 418 wake_up(&dlm_domain_events); 419 } 420 421 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm) 422 { 423 int i, num, n, ret = 0; 424 struct dlm_lock_resource *res; 425 struct hlist_node *iter; 426 struct hlist_head *bucket; 427 int dropped; 428 429 mlog(0, "Migrating locks from domain %s\n", dlm->name); 430 431 num = 0; 432 spin_lock(&dlm->spinlock); 433 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 434 redo_bucket: 435 n = 0; 436 bucket = dlm_lockres_hash(dlm, i); 437 iter = bucket->first; 438 while (iter) { 439 n++; 440 res = hlist_entry(iter, struct dlm_lock_resource, 441 hash_node); 442 dlm_lockres_get(res); 443 /* migrate, if necessary. this will drop the dlm 444 * spinlock and retake it if it does migration. */ 445 dropped = dlm_empty_lockres(dlm, res); 446 447 spin_lock(&res->spinlock); 448 if (dropped) 449 __dlm_lockres_calc_usage(dlm, res); 450 else 451 iter = res->hash_node.next; 452 spin_unlock(&res->spinlock); 453 454 dlm_lockres_put(res); 455 456 if (dropped) { 457 cond_resched_lock(&dlm->spinlock); 458 goto redo_bucket; 459 } 460 } 461 cond_resched_lock(&dlm->spinlock); 462 num += n; 463 } 464 spin_unlock(&dlm->spinlock); 465 wake_up(&dlm->dlm_thread_wq); 466 467 /* let the dlm thread take care of purging, keep scanning until 468 * nothing remains in the hash */ 469 if (num) { 470 mlog(0, "%s: %d lock resources in hash last pass\n", 471 dlm->name, num); 472 ret = -EAGAIN; 473 } 474 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 475 return ret; 476 } 477 478 static int dlm_no_joining_node(struct dlm_ctxt *dlm) 479 { 480 int ret; 481 482 spin_lock(&dlm->spinlock); 483 ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN; 484 spin_unlock(&dlm->spinlock); 485 486 return ret; 487 } 488 489 static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len, 490 void 
*data, void **ret_data) 491 { 492 struct dlm_ctxt *dlm = data; 493 unsigned int node; 494 struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; 495 496 if (!dlm_grab(dlm)) 497 return 0; 498 499 node = exit_msg->node_idx; 500 mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node); 501 502 spin_lock(&dlm->spinlock); 503 set_bit(node, dlm->exit_domain_map); 504 spin_unlock(&dlm->spinlock); 505 506 dlm_put(dlm); 507 508 return 0; 509 } 510 511 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm) 512 { 513 /* Yikes, a double spinlock! I need domain_lock for the dlm 514 * state and the dlm spinlock for join state... Sorry! */ 515 again: 516 spin_lock(&dlm_domain_lock); 517 spin_lock(&dlm->spinlock); 518 519 if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 520 mlog(0, "Node %d is joining, we wait on it.\n", 521 dlm->joining_node); 522 spin_unlock(&dlm->spinlock); 523 spin_unlock(&dlm_domain_lock); 524 525 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm)); 526 goto again; 527 } 528 529 dlm->dlm_state = DLM_CTXT_LEAVING; 530 spin_unlock(&dlm->spinlock); 531 spin_unlock(&dlm_domain_lock); 532 } 533 534 static void __dlm_print_nodes(struct dlm_ctxt *dlm) 535 { 536 int node = -1, num = 0; 537 538 assert_spin_locked(&dlm->spinlock); 539 540 printk("( "); 541 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 542 node + 1)) < O2NM_MAX_NODES) { 543 printk("%d ", node); 544 ++num; 545 } 546 printk(") %u nodes\n", num); 547 } 548 549 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 550 void **ret_data) 551 { 552 struct dlm_ctxt *dlm = data; 553 unsigned int node; 554 struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; 555 556 mlog(0, "%p %u %p", msg, len, data); 557 558 if (!dlm_grab(dlm)) 559 return 0; 560 561 node = exit_msg->node_idx; 562 563 spin_lock(&dlm->spinlock); 564 clear_bit(node, dlm->domain_map); 565 clear_bit(node, dlm->exit_domain_map); 566 
printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name); 567 __dlm_print_nodes(dlm); 568 569 /* notify anything attached to the heartbeat events */ 570 dlm_hb_event_notify_attached(dlm, node, 0); 571 572 spin_unlock(&dlm->spinlock); 573 574 dlm_put(dlm); 575 576 return 0; 577 } 578 579 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type, 580 unsigned int node) 581 { 582 int status; 583 struct dlm_exit_domain leave_msg; 584 585 mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name, 586 msg_type, node); 587 588 memset(&leave_msg, 0, sizeof(leave_msg)); 589 leave_msg.node_idx = dlm->node_num; 590 591 status = o2net_send_message(msg_type, dlm->key, &leave_msg, 592 sizeof(leave_msg), node, NULL); 593 if (status < 0) 594 mlog(ML_ERROR, "Error %d sending domain exit message %u " 595 "to node %u on domain %s\n", status, msg_type, node, 596 dlm->name); 597 598 return status; 599 } 600 601 static void dlm_begin_exit_domain(struct dlm_ctxt *dlm) 602 { 603 int node = -1; 604 605 /* Support for begin exit domain was added in 1.2 */ 606 if (dlm->dlm_locking_proto.pv_major == 1 && 607 dlm->dlm_locking_proto.pv_minor < 2) 608 return; 609 610 /* 611 * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely 612 * informational. Meaning if a node does not receive the message, 613 * so be it. 614 */ 615 spin_lock(&dlm->spinlock); 616 while (1) { 617 node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1); 618 if (node >= O2NM_MAX_NODES) 619 break; 620 if (node == dlm->node_num) 621 continue; 622 623 spin_unlock(&dlm->spinlock); 624 dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node); 625 spin_lock(&dlm->spinlock); 626 } 627 spin_unlock(&dlm->spinlock); 628 } 629 630 static void dlm_leave_domain(struct dlm_ctxt *dlm) 631 { 632 int node, clear_node, status; 633 634 /* At this point we've migrated away all our locks and won't 635 * accept mastership of new ones. 
The dlm is responsible for 636 * almost nothing now. We make sure not to confuse any joining 637 * nodes and then commence shutdown procedure. */ 638 639 spin_lock(&dlm->spinlock); 640 /* Clear ourselves from the domain map */ 641 clear_bit(dlm->node_num, dlm->domain_map); 642 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 643 0)) < O2NM_MAX_NODES) { 644 /* Drop the dlm spinlock. This is safe wrt the domain_map. 645 * -nodes cannot be added now as the 646 * query_join_handlers knows to respond with OK_NO_MAP 647 * -we catch the right network errors if a node is 648 * removed from the map while we're sending him the 649 * exit message. */ 650 spin_unlock(&dlm->spinlock); 651 652 clear_node = 1; 653 654 status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG, 655 node); 656 if (status < 0 && 657 status != -ENOPROTOOPT && 658 status != -ENOTCONN) { 659 mlog(ML_NOTICE, "Error %d sending domain exit message " 660 "to node %d\n", status, node); 661 662 /* Not sure what to do here but lets sleep for 663 * a bit in case this was a transient 664 * error... */ 665 msleep(DLM_DOMAIN_BACKOFF_MS); 666 clear_node = 0; 667 } 668 669 spin_lock(&dlm->spinlock); 670 /* If we're not clearing the node bit then we intend 671 * to loop back around to try again. 
*/ 672 if (clear_node) 673 clear_bit(node, dlm->domain_map); 674 } 675 spin_unlock(&dlm->spinlock); 676 } 677 678 int dlm_shutting_down(struct dlm_ctxt *dlm) 679 { 680 int ret = 0; 681 682 spin_lock(&dlm_domain_lock); 683 684 if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN) 685 ret = 1; 686 687 spin_unlock(&dlm_domain_lock); 688 689 return ret; 690 } 691 692 void dlm_unregister_domain(struct dlm_ctxt *dlm) 693 { 694 int leave = 0; 695 struct dlm_lock_resource *res; 696 697 spin_lock(&dlm_domain_lock); 698 BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED); 699 BUG_ON(!dlm->num_joins); 700 701 dlm->num_joins--; 702 if (!dlm->num_joins) { 703 /* We mark it "in shutdown" now so new register 704 * requests wait until we've completely left the 705 * domain. Don't use DLM_CTXT_LEAVING yet as we still 706 * want new domain joins to communicate with us at 707 * least until we've completed migration of our 708 * resources. */ 709 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN; 710 leave = 1; 711 } 712 spin_unlock(&dlm_domain_lock); 713 714 if (leave) { 715 mlog(0, "shutting down domain %s\n", dlm->name); 716 dlm_begin_exit_domain(dlm); 717 718 /* We changed dlm state, notify the thread */ 719 dlm_kick_thread(dlm, NULL); 720 721 while (dlm_migrate_all_locks(dlm)) { 722 /* Give dlm_thread time to purge the lockres' */ 723 msleep(500); 724 mlog(0, "%s: more migration to do\n", dlm->name); 725 } 726 727 /* This list should be empty. 
If not, print remaining lockres */ 728 if (!list_empty(&dlm->tracking_list)) { 729 mlog(ML_ERROR, "Following lockres' are still on the " 730 "tracking list:\n"); 731 list_for_each_entry(res, &dlm->tracking_list, tracking) 732 dlm_print_one_lock_resource(res); 733 } 734 735 dlm_mark_domain_leaving(dlm); 736 dlm_leave_domain(dlm); 737 printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name); 738 dlm_force_free_mles(dlm); 739 dlm_complete_dlm_shutdown(dlm); 740 } 741 dlm_put(dlm); 742 } 743 EXPORT_SYMBOL_GPL(dlm_unregister_domain); 744 745 static int dlm_query_join_proto_check(char *proto_type, int node, 746 struct dlm_protocol_version *ours, 747 struct dlm_protocol_version *request) 748 { 749 int rc; 750 struct dlm_protocol_version proto = *request; 751 752 if (!dlm_protocol_compare(ours, &proto)) { 753 mlog(0, 754 "node %u wanted to join with %s locking protocol " 755 "%u.%u, we respond with %u.%u\n", 756 node, proto_type, 757 request->pv_major, 758 request->pv_minor, 759 proto.pv_major, proto.pv_minor); 760 request->pv_minor = proto.pv_minor; 761 rc = 0; 762 } else { 763 mlog(ML_NOTICE, 764 "Node %u wanted to join with %s locking " 765 "protocol %u.%u, but we have %u.%u, disallowing\n", 766 node, proto_type, 767 request->pv_major, 768 request->pv_minor, 769 ours->pv_major, 770 ours->pv_minor); 771 rc = 1; 772 } 773 774 return rc; 775 } 776 777 /* 778 * struct dlm_query_join_packet is made up of four one-byte fields. They 779 * are effectively in big-endian order already. However, little-endian 780 * machines swap them before putting the packet on the wire (because 781 * query_join's response is a status, and that status is treated as a u32 782 * on the wire). Thus, a big-endian and little-endian machines will treat 783 * this structure differently. 784 * 785 * The solution is to have little-endian machines swap the structure when 786 * converting from the structure to the u32 representation. 
This will 787 * result in the structure having the correct format on the wire no matter 788 * the host endian format. 789 */ 790 static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet, 791 u32 *wire) 792 { 793 union dlm_query_join_response response; 794 795 response.packet = *packet; 796 *wire = be32_to_cpu(response.intval); 797 } 798 799 static void dlm_query_join_wire_to_packet(u32 wire, 800 struct dlm_query_join_packet *packet) 801 { 802 union dlm_query_join_response response; 803 804 response.intval = cpu_to_be32(wire); 805 *packet = response.packet; 806 } 807 808 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 809 void **ret_data) 810 { 811 struct dlm_query_join_request *query; 812 struct dlm_query_join_packet packet = { 813 .code = JOIN_DISALLOW, 814 }; 815 struct dlm_ctxt *dlm = NULL; 816 u32 response; 817 u8 nodenum; 818 819 query = (struct dlm_query_join_request *) msg->buf; 820 821 mlog(0, "node %u wants to join domain %s\n", query->node_idx, 822 query->domain); 823 824 /* 825 * If heartbeat doesn't consider the node live, tell it 826 * to back off and try again. This gives heartbeat a chance 827 * to catch up. 828 */ 829 if (!o2hb_check_node_heartbeating_no_sem(query->node_idx)) { 830 mlog(0, "node %u is not in our live map yet\n", 831 query->node_idx); 832 833 packet.code = JOIN_DISALLOW; 834 goto respond; 835 } 836 837 packet.code = JOIN_OK_NO_MAP; 838 839 spin_lock(&dlm_domain_lock); 840 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 841 if (!dlm) 842 goto unlock_respond; 843 844 /* 845 * There is a small window where the joining node may not see the 846 * node(s) that just left but still part of the cluster. DISALLOW 847 * join request if joining node has different node map. 
848 */ 849 nodenum=0; 850 while (nodenum < O2NM_MAX_NODES) { 851 if (test_bit(nodenum, dlm->domain_map)) { 852 if (!byte_test_bit(nodenum, query->node_map)) { 853 mlog(0, "disallow join as node %u does not " 854 "have node %u in its nodemap\n", 855 query->node_idx, nodenum); 856 packet.code = JOIN_DISALLOW; 857 goto unlock_respond; 858 } 859 } 860 nodenum++; 861 } 862 863 /* Once the dlm ctxt is marked as leaving then we don't want 864 * to be put in someone's domain map. 865 * Also, explicitly disallow joining at certain troublesome 866 * times (ie. during recovery). */ 867 if (dlm->dlm_state != DLM_CTXT_LEAVING) { 868 int bit = query->node_idx; 869 spin_lock(&dlm->spinlock); 870 871 if (dlm->dlm_state == DLM_CTXT_NEW && 872 dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) { 873 /*If this is a brand new context and we 874 * haven't started our join process yet, then 875 * the other node won the race. */ 876 packet.code = JOIN_OK_NO_MAP; 877 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 878 /* Disallow parallel joins. */ 879 packet.code = JOIN_DISALLOW; 880 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 881 mlog(0, "node %u trying to join, but recovery " 882 "is ongoing.\n", bit); 883 packet.code = JOIN_DISALLOW; 884 } else if (test_bit(bit, dlm->recovery_map)) { 885 mlog(0, "node %u trying to join, but it " 886 "still needs recovery.\n", bit); 887 packet.code = JOIN_DISALLOW; 888 } else if (test_bit(bit, dlm->domain_map)) { 889 mlog(0, "node %u trying to join, but it " 890 "is still in the domain! needs recovery?\n", 891 bit); 892 packet.code = JOIN_DISALLOW; 893 } else { 894 /* Alright we're fully a part of this domain 895 * so we keep some state as to who's joining 896 * and indicate to him that needs to be fixed 897 * up. */ 898 899 /* Make sure we speak compatible locking protocols. 
*/ 900 if (dlm_query_join_proto_check("DLM", bit, 901 &dlm->dlm_locking_proto, 902 &query->dlm_proto)) { 903 packet.code = JOIN_PROTOCOL_MISMATCH; 904 } else if (dlm_query_join_proto_check("fs", bit, 905 &dlm->fs_locking_proto, 906 &query->fs_proto)) { 907 packet.code = JOIN_PROTOCOL_MISMATCH; 908 } else { 909 packet.dlm_minor = query->dlm_proto.pv_minor; 910 packet.fs_minor = query->fs_proto.pv_minor; 911 packet.code = JOIN_OK; 912 __dlm_set_joining_node(dlm, query->node_idx); 913 } 914 } 915 916 spin_unlock(&dlm->spinlock); 917 } 918 unlock_respond: 919 spin_unlock(&dlm_domain_lock); 920 921 respond: 922 mlog(0, "We respond with %u\n", packet.code); 923 924 dlm_query_join_packet_to_wire(&packet, &response); 925 return response; 926 } 927 928 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 929 void **ret_data) 930 { 931 struct dlm_assert_joined *assert; 932 struct dlm_ctxt *dlm = NULL; 933 934 assert = (struct dlm_assert_joined *) msg->buf; 935 936 mlog(0, "node %u asserts join on domain %s\n", assert->node_idx, 937 assert->domain); 938 939 spin_lock(&dlm_domain_lock); 940 dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len); 941 /* XXX should we consider no dlm ctxt an error? */ 942 if (dlm) { 943 spin_lock(&dlm->spinlock); 944 945 /* Alright, this node has officially joined our 946 * domain. Set him in the map and clean up our 947 * leftover join state. 
*/ 948 BUG_ON(dlm->joining_node != assert->node_idx); 949 950 if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 951 mlog(0, "dlm recovery is ongoing, disallow join\n"); 952 spin_unlock(&dlm->spinlock); 953 spin_unlock(&dlm_domain_lock); 954 return -EAGAIN; 955 } 956 957 set_bit(assert->node_idx, dlm->domain_map); 958 clear_bit(assert->node_idx, dlm->exit_domain_map); 959 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 960 961 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ", 962 assert->node_idx, dlm->name); 963 __dlm_print_nodes(dlm); 964 965 /* notify anything attached to the heartbeat events */ 966 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1); 967 968 spin_unlock(&dlm->spinlock); 969 } 970 spin_unlock(&dlm_domain_lock); 971 972 return 0; 973 } 974 975 static int dlm_match_regions(struct dlm_ctxt *dlm, 976 struct dlm_query_region *qr, 977 char *local, int locallen) 978 { 979 char *remote = qr->qr_regions; 980 char *l, *r; 981 int localnr, i, j, foundit; 982 int status = 0; 983 984 if (!o2hb_global_heartbeat_active()) { 985 if (qr->qr_numregions) { 986 mlog(ML_ERROR, "Domain %s: Joining node %d has global " 987 "heartbeat enabled but local node %d does not\n", 988 qr->qr_domain, qr->qr_node, dlm->node_num); 989 status = -EINVAL; 990 } 991 goto bail; 992 } 993 994 if (o2hb_global_heartbeat_active() && !qr->qr_numregions) { 995 mlog(ML_ERROR, "Domain %s: Local node %d has global " 996 "heartbeat enabled but joining node %d does not\n", 997 qr->qr_domain, dlm->node_num, qr->qr_node); 998 status = -EINVAL; 999 goto bail; 1000 } 1001 1002 r = remote; 1003 for (i = 0; i < qr->qr_numregions; ++i) { 1004 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r); 1005 r += O2HB_MAX_REGION_NAME_LEN; 1006 } 1007 1008 localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN); 1009 localnr = o2hb_get_all_regions(local, (u8)localnr); 1010 1011 /* compare local regions with remote */ 1012 l = local; 1013 for (i = 0; i < localnr; ++i) { 1014 foundit = 0; 
1015 r = remote; 1016 for (j = 0; j <= qr->qr_numregions; ++j) { 1017 if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) { 1018 foundit = 1; 1019 break; 1020 } 1021 r += O2HB_MAX_REGION_NAME_LEN; 1022 } 1023 if (!foundit) { 1024 status = -EINVAL; 1025 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered " 1026 "in local node %d but not in joining node %d\n", 1027 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l, 1028 dlm->node_num, qr->qr_node); 1029 goto bail; 1030 } 1031 l += O2HB_MAX_REGION_NAME_LEN; 1032 } 1033 1034 /* compare remote with local regions */ 1035 r = remote; 1036 for (i = 0; i < qr->qr_numregions; ++i) { 1037 foundit = 0; 1038 l = local; 1039 for (j = 0; j < localnr; ++j) { 1040 if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) { 1041 foundit = 1; 1042 break; 1043 } 1044 l += O2HB_MAX_REGION_NAME_LEN; 1045 } 1046 if (!foundit) { 1047 status = -EINVAL; 1048 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered " 1049 "in joining node %d but not in local node %d\n", 1050 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r, 1051 qr->qr_node, dlm->node_num); 1052 goto bail; 1053 } 1054 r += O2HB_MAX_REGION_NAME_LEN; 1055 } 1056 1057 bail: 1058 return status; 1059 } 1060 1061 static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map) 1062 { 1063 struct dlm_query_region *qr = NULL; 1064 int status, ret = 0, i; 1065 char *p; 1066 1067 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES) 1068 goto bail; 1069 1070 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL); 1071 if (!qr) { 1072 ret = -ENOMEM; 1073 mlog_errno(ret); 1074 goto bail; 1075 } 1076 1077 qr->qr_node = dlm->node_num; 1078 qr->qr_namelen = strlen(dlm->name); 1079 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen); 1080 /* if local hb, the numregions will be zero */ 1081 if (o2hb_global_heartbeat_active()) 1082 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions, 1083 O2NM_MAX_REGIONS); 1084 1085 p = qr->qr_regions; 1086 for (i = 0; i < qr->qr_numregions; ++i, p += 
O2HB_MAX_REGION_NAME_LEN) 1087 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p); 1088 1089 i = -1; 1090 while ((i = find_next_bit(node_map, O2NM_MAX_NODES, 1091 i + 1)) < O2NM_MAX_NODES) { 1092 if (i == dlm->node_num) 1093 continue; 1094 1095 mlog(0, "Sending regions to node %d\n", i); 1096 1097 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr, 1098 sizeof(struct dlm_query_region), 1099 i, &status); 1100 if (ret >= 0) 1101 ret = status; 1102 if (ret) { 1103 mlog(ML_ERROR, "Region mismatch %d, node %d\n", 1104 ret, i); 1105 break; 1106 } 1107 } 1108 1109 bail: 1110 kfree(qr); 1111 return ret; 1112 } 1113 1114 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len, 1115 void *data, void **ret_data) 1116 { 1117 struct dlm_query_region *qr; 1118 struct dlm_ctxt *dlm = NULL; 1119 char *local = NULL; 1120 int status = 0; 1121 1122 qr = (struct dlm_query_region *) msg->buf; 1123 1124 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node, 1125 qr->qr_domain); 1126 1127 /* buffer used in dlm_mast_regions() */ 1128 local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL); 1129 if (!local) 1130 return -ENOMEM; 1131 1132 status = -EINVAL; 1133 1134 spin_lock(&dlm_domain_lock); 1135 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen); 1136 if (!dlm) { 1137 mlog(ML_ERROR, "Node %d queried hb regions on domain %s " 1138 "before join domain\n", qr->qr_node, qr->qr_domain); 1139 goto out_domain_lock; 1140 } 1141 1142 spin_lock(&dlm->spinlock); 1143 if (dlm->joining_node != qr->qr_node) { 1144 mlog(ML_ERROR, "Node %d queried hb regions on domain %s " 1145 "but joining node is %d\n", qr->qr_node, qr->qr_domain, 1146 dlm->joining_node); 1147 goto out_dlm_lock; 1148 } 1149 1150 /* Support for global heartbeat was added in 1.1 */ 1151 if (dlm->dlm_locking_proto.pv_major == 1 && 1152 dlm->dlm_locking_proto.pv_minor == 0) { 1153 mlog(ML_ERROR, "Node %d queried hb regions on domain %s " 1154 "but active dlm protocol is %d.%d\n", qr->qr_node, 1155 
qr->qr_domain, dlm->dlm_locking_proto.pv_major, 1156 dlm->dlm_locking_proto.pv_minor); 1157 goto out_dlm_lock; 1158 } 1159 1160 status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions)); 1161 1162 out_dlm_lock: 1163 spin_unlock(&dlm->spinlock); 1164 1165 out_domain_lock: 1166 spin_unlock(&dlm_domain_lock); 1167 1168 kfree(local); 1169 1170 return status; 1171 } 1172 1173 static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn) 1174 { 1175 struct o2nm_node *local; 1176 struct dlm_node_info *remote; 1177 int i, j; 1178 int status = 0; 1179 1180 for (j = 0; j < qn->qn_numnodes; ++j) 1181 mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum, 1182 &(qn->qn_nodes[j].ni_ipv4_address), 1183 ntohs(qn->qn_nodes[j].ni_ipv4_port)); 1184 1185 for (i = 0; i < O2NM_MAX_NODES && !status; ++i) { 1186 local = o2nm_get_node_by_num(i); 1187 remote = NULL; 1188 for (j = 0; j < qn->qn_numnodes; ++j) { 1189 if (qn->qn_nodes[j].ni_nodenum == i) { 1190 remote = &(qn->qn_nodes[j]); 1191 break; 1192 } 1193 } 1194 1195 if (!local && !remote) 1196 continue; 1197 1198 if ((local && !remote) || (!local && remote)) 1199 status = -EINVAL; 1200 1201 if (!status && 1202 ((remote->ni_nodenum != local->nd_num) || 1203 (remote->ni_ipv4_port != local->nd_ipv4_port) || 1204 (remote->ni_ipv4_address != local->nd_ipv4_address))) 1205 status = -EINVAL; 1206 1207 if (status) { 1208 if (remote && !local) 1209 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) " 1210 "registered in joining node %d but not in " 1211 "local node %d\n", qn->qn_domain, 1212 remote->ni_nodenum, 1213 &(remote->ni_ipv4_address), 1214 ntohs(remote->ni_ipv4_port), 1215 qn->qn_nodenum, dlm->node_num); 1216 if (local && !remote) 1217 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) " 1218 "registered in local node %d but not in " 1219 "joining node %d\n", qn->qn_domain, 1220 local->nd_num, &(local->nd_ipv4_address), 1221 ntohs(local->nd_ipv4_port), 1222 dlm->node_num, qn->qn_nodenum); 1223 BUG_ON((!local && 
!remote)); 1224 } 1225 1226 if (local) 1227 o2nm_node_put(local); 1228 } 1229 1230 return status; 1231 } 1232 1233 static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map) 1234 { 1235 struct dlm_query_nodeinfo *qn = NULL; 1236 struct o2nm_node *node; 1237 int ret = 0, status, count, i; 1238 1239 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES) 1240 goto bail; 1241 1242 qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL); 1243 if (!qn) { 1244 ret = -ENOMEM; 1245 mlog_errno(ret); 1246 goto bail; 1247 } 1248 1249 for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) { 1250 node = o2nm_get_node_by_num(i); 1251 if (!node) 1252 continue; 1253 qn->qn_nodes[count].ni_nodenum = node->nd_num; 1254 qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port; 1255 qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address; 1256 mlog(0, "Node %3d, %pI4:%u\n", node->nd_num, 1257 &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port)); 1258 ++count; 1259 o2nm_node_put(node); 1260 } 1261 1262 qn->qn_nodenum = dlm->node_num; 1263 qn->qn_numnodes = count; 1264 qn->qn_namelen = strlen(dlm->name); 1265 memcpy(qn->qn_domain, dlm->name, qn->qn_namelen); 1266 1267 i = -1; 1268 while ((i = find_next_bit(node_map, O2NM_MAX_NODES, 1269 i + 1)) < O2NM_MAX_NODES) { 1270 if (i == dlm->node_num) 1271 continue; 1272 1273 mlog(0, "Sending nodeinfo to node %d\n", i); 1274 1275 ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY, 1276 qn, sizeof(struct dlm_query_nodeinfo), 1277 i, &status); 1278 if (ret >= 0) 1279 ret = status; 1280 if (ret) { 1281 mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i); 1282 break; 1283 } 1284 } 1285 1286 bail: 1287 kfree(qn); 1288 return ret; 1289 } 1290 1291 static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len, 1292 void *data, void **ret_data) 1293 { 1294 struct dlm_query_nodeinfo *qn; 1295 struct dlm_ctxt *dlm = NULL; 1296 int locked = 0, status = -EINVAL; 1297 1298 qn = (struct dlm_query_nodeinfo *) msg->buf; 
1299 1300 mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum, 1301 qn->qn_domain); 1302 1303 spin_lock(&dlm_domain_lock); 1304 dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen); 1305 if (!dlm) { 1306 mlog(ML_ERROR, "Node %d queried nodes on domain %s before " 1307 "join domain\n", qn->qn_nodenum, qn->qn_domain); 1308 goto bail; 1309 } 1310 1311 spin_lock(&dlm->spinlock); 1312 locked = 1; 1313 if (dlm->joining_node != qn->qn_nodenum) { 1314 mlog(ML_ERROR, "Node %d queried nodes on domain %s but " 1315 "joining node is %d\n", qn->qn_nodenum, qn->qn_domain, 1316 dlm->joining_node); 1317 goto bail; 1318 } 1319 1320 /* Support for node query was added in 1.1 */ 1321 if (dlm->dlm_locking_proto.pv_major == 1 && 1322 dlm->dlm_locking_proto.pv_minor == 0) { 1323 mlog(ML_ERROR, "Node %d queried nodes on domain %s " 1324 "but active dlm protocol is %d.%d\n", qn->qn_nodenum, 1325 qn->qn_domain, dlm->dlm_locking_proto.pv_major, 1326 dlm->dlm_locking_proto.pv_minor); 1327 goto bail; 1328 } 1329 1330 status = dlm_match_nodes(dlm, qn); 1331 1332 bail: 1333 if (locked) 1334 spin_unlock(&dlm->spinlock); 1335 spin_unlock(&dlm_domain_lock); 1336 1337 return status; 1338 } 1339 1340 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 1341 void **ret_data) 1342 { 1343 struct dlm_cancel_join *cancel; 1344 struct dlm_ctxt *dlm = NULL; 1345 1346 cancel = (struct dlm_cancel_join *) msg->buf; 1347 1348 mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx, 1349 cancel->domain); 1350 1351 spin_lock(&dlm_domain_lock); 1352 dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len); 1353 1354 if (dlm) { 1355 spin_lock(&dlm->spinlock); 1356 1357 /* Yikes, this guy wants to cancel his join. No 1358 * problem, we simply cleanup our join state. 
*/ 1359 BUG_ON(dlm->joining_node != cancel->node_idx); 1360 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 1361 1362 spin_unlock(&dlm->spinlock); 1363 } 1364 spin_unlock(&dlm_domain_lock); 1365 1366 return 0; 1367 } 1368 1369 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm, 1370 unsigned int node) 1371 { 1372 int status; 1373 struct dlm_cancel_join cancel_msg; 1374 1375 memset(&cancel_msg, 0, sizeof(cancel_msg)); 1376 cancel_msg.node_idx = dlm->node_num; 1377 cancel_msg.name_len = strlen(dlm->name); 1378 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len); 1379 1380 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1381 &cancel_msg, sizeof(cancel_msg), node, 1382 NULL); 1383 if (status < 0) { 1384 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1385 "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1386 node); 1387 goto bail; 1388 } 1389 1390 bail: 1391 return status; 1392 } 1393 1394 /* map_size should be in bytes. */ 1395 static int dlm_send_join_cancels(struct dlm_ctxt *dlm, 1396 unsigned long *node_map, 1397 unsigned int map_size) 1398 { 1399 int status, tmpstat; 1400 int node; 1401 1402 if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) * 1403 sizeof(unsigned long))) { 1404 mlog(ML_ERROR, 1405 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n", 1406 map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES)); 1407 return -EINVAL; 1408 } 1409 1410 status = 0; 1411 node = -1; 1412 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 1413 node + 1)) < O2NM_MAX_NODES) { 1414 if (node == dlm->node_num) 1415 continue; 1416 1417 tmpstat = dlm_send_one_join_cancel(dlm, node); 1418 if (tmpstat) { 1419 mlog(ML_ERROR, "Error return %d cancelling join on " 1420 "node %d\n", tmpstat, node); 1421 if (!status) 1422 status = tmpstat; 1423 } 1424 } 1425 1426 if (status) 1427 mlog_errno(status); 1428 return status; 1429 } 1430 1431 static int dlm_request_join(struct dlm_ctxt *dlm, 1432 int node, 1433 enum 
dlm_query_join_response_code *response) 1434 { 1435 int status; 1436 struct dlm_query_join_request join_msg; 1437 struct dlm_query_join_packet packet; 1438 u32 join_resp; 1439 1440 mlog(0, "querying node %d\n", node); 1441 1442 memset(&join_msg, 0, sizeof(join_msg)); 1443 join_msg.node_idx = dlm->node_num; 1444 join_msg.name_len = strlen(dlm->name); 1445 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 1446 join_msg.dlm_proto = dlm->dlm_locking_proto; 1447 join_msg.fs_proto = dlm->fs_locking_proto; 1448 1449 /* copy live node map to join message */ 1450 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); 1451 1452 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 1453 sizeof(join_msg), node, &join_resp); 1454 if (status < 0 && status != -ENOPROTOOPT) { 1455 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1456 "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1457 node); 1458 goto bail; 1459 } 1460 dlm_query_join_wire_to_packet(join_resp, &packet); 1461 1462 /* -ENOPROTOOPT from the net code means the other side isn't 1463 listening for our message type -- that's fine, it means 1464 his dlm isn't up, so we can consider him a 'yes' but not 1465 joined into the domain. */ 1466 if (status == -ENOPROTOOPT) { 1467 status = 0; 1468 *response = JOIN_OK_NO_MAP; 1469 } else { 1470 *response = packet.code; 1471 switch (packet.code) { 1472 case JOIN_DISALLOW: 1473 case JOIN_OK_NO_MAP: 1474 break; 1475 case JOIN_PROTOCOL_MISMATCH: 1476 mlog(ML_NOTICE, 1477 "This node requested DLM locking protocol %u.%u and " 1478 "filesystem locking protocol %u.%u. 
At least one of " 1479 "the protocol versions on node %d is not compatible, " 1480 "disconnecting\n", 1481 dlm->dlm_locking_proto.pv_major, 1482 dlm->dlm_locking_proto.pv_minor, 1483 dlm->fs_locking_proto.pv_major, 1484 dlm->fs_locking_proto.pv_minor, 1485 node); 1486 status = -EPROTO; 1487 break; 1488 case JOIN_OK: 1489 /* Use the same locking protocol as the remote node */ 1490 dlm->dlm_locking_proto.pv_minor = packet.dlm_minor; 1491 dlm->fs_locking_proto.pv_minor = packet.fs_minor; 1492 mlog(0, 1493 "Node %d responds JOIN_OK with DLM locking protocol " 1494 "%u.%u and fs locking protocol %u.%u\n", 1495 node, 1496 dlm->dlm_locking_proto.pv_major, 1497 dlm->dlm_locking_proto.pv_minor, 1498 dlm->fs_locking_proto.pv_major, 1499 dlm->fs_locking_proto.pv_minor); 1500 break; 1501 default: 1502 status = -EINVAL; 1503 mlog(ML_ERROR, "invalid response %d from node %u\n", 1504 packet.code, node); 1505 /* Reset response to JOIN_DISALLOW */ 1506 *response = JOIN_DISALLOW; 1507 break; 1508 } 1509 } 1510 1511 mlog(0, "status %d, node %d response is %d\n", status, node, 1512 *response); 1513 1514 bail: 1515 return status; 1516 } 1517 1518 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm, 1519 unsigned int node) 1520 { 1521 int status; 1522 int ret; 1523 struct dlm_assert_joined assert_msg; 1524 1525 mlog(0, "Sending join assert to node %u\n", node); 1526 1527 memset(&assert_msg, 0, sizeof(assert_msg)); 1528 assert_msg.node_idx = dlm->node_num; 1529 assert_msg.name_len = strlen(dlm->name); 1530 memcpy(assert_msg.domain, dlm->name, assert_msg.name_len); 1531 1532 status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1533 &assert_msg, sizeof(assert_msg), node, 1534 &ret); 1535 if (status < 0) 1536 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1537 "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1538 node); 1539 else 1540 status = ret; 1541 1542 return status; 1543 } 1544 1545 static void dlm_send_join_asserts(struct dlm_ctxt *dlm, 1546 
unsigned long *node_map) 1547 { 1548 int status, node, live; 1549 1550 status = 0; 1551 node = -1; 1552 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 1553 node + 1)) < O2NM_MAX_NODES) { 1554 if (node == dlm->node_num) 1555 continue; 1556 1557 do { 1558 /* It is very important that this message be 1559 * received so we spin until either the node 1560 * has died or it gets the message. */ 1561 status = dlm_send_one_join_assert(dlm, node); 1562 1563 spin_lock(&dlm->spinlock); 1564 live = test_bit(node, dlm->live_nodes_map); 1565 spin_unlock(&dlm->spinlock); 1566 1567 if (status) { 1568 mlog(ML_ERROR, "Error return %d asserting " 1569 "join on node %d\n", status, node); 1570 1571 /* give us some time between errors... */ 1572 if (live) 1573 msleep(DLM_DOMAIN_BACKOFF_MS); 1574 } 1575 } while (status && live); 1576 } 1577 } 1578 1579 struct domain_join_ctxt { 1580 unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1581 unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1582 }; 1583 1584 static int dlm_should_restart_join(struct dlm_ctxt *dlm, 1585 struct domain_join_ctxt *ctxt, 1586 enum dlm_query_join_response_code response) 1587 { 1588 int ret; 1589 1590 if (response == JOIN_DISALLOW) { 1591 mlog(0, "Latest response of disallow -- should restart\n"); 1592 return 1; 1593 } 1594 1595 spin_lock(&dlm->spinlock); 1596 /* For now, we restart the process if the node maps have 1597 * changed at all */ 1598 ret = memcmp(ctxt->live_map, dlm->live_nodes_map, 1599 sizeof(dlm->live_nodes_map)); 1600 spin_unlock(&dlm->spinlock); 1601 1602 if (ret) 1603 mlog(0, "Node maps changed -- should restart\n"); 1604 1605 return ret; 1606 } 1607 1608 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) 1609 { 1610 int status = 0, tmpstat, node; 1611 struct domain_join_ctxt *ctxt; 1612 enum dlm_query_join_response_code response = JOIN_DISALLOW; 1613 1614 mlog(0, "%p", dlm); 1615 1616 ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL); 1617 if (!ctxt) { 1618 status = -ENOMEM; 1619 
mlog_errno(status); 1620 goto bail; 1621 } 1622 1623 /* group sem locking should work for us here -- we're already 1624 * registered for heartbeat events so filling this should be 1625 * atomic wrt getting those handlers called. */ 1626 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map)); 1627 1628 spin_lock(&dlm->spinlock); 1629 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map)); 1630 1631 __dlm_set_joining_node(dlm, dlm->node_num); 1632 1633 spin_unlock(&dlm->spinlock); 1634 1635 node = -1; 1636 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES, 1637 node + 1)) < O2NM_MAX_NODES) { 1638 if (node == dlm->node_num) 1639 continue; 1640 1641 status = dlm_request_join(dlm, node, &response); 1642 if (status < 0) { 1643 mlog_errno(status); 1644 goto bail; 1645 } 1646 1647 /* Ok, either we got a response or the node doesn't have a 1648 * dlm up. */ 1649 if (response == JOIN_OK) 1650 set_bit(node, ctxt->yes_resp_map); 1651 1652 if (dlm_should_restart_join(dlm, ctxt, response)) { 1653 status = -EAGAIN; 1654 goto bail; 1655 } 1656 } 1657 1658 mlog(0, "Yay, done querying nodes!\n"); 1659 1660 /* Yay, everyone agree's we can join the domain. My domain is 1661 * comprised of all nodes who were put in the 1662 * yes_resp_map. Copy that into our domain map and send a join 1663 * assert message to clean up everyone elses state. 
*/ 1664 spin_lock(&dlm->spinlock); 1665 memcpy(dlm->domain_map, ctxt->yes_resp_map, 1666 sizeof(ctxt->yes_resp_map)); 1667 set_bit(dlm->node_num, dlm->domain_map); 1668 spin_unlock(&dlm->spinlock); 1669 1670 /* Support for global heartbeat and node info was added in 1.1 */ 1671 if (dlm->dlm_locking_proto.pv_major > 1 || 1672 dlm->dlm_locking_proto.pv_minor > 0) { 1673 status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map); 1674 if (status) { 1675 mlog_errno(status); 1676 goto bail; 1677 } 1678 status = dlm_send_regions(dlm, ctxt->yes_resp_map); 1679 if (status) { 1680 mlog_errno(status); 1681 goto bail; 1682 } 1683 } 1684 1685 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 1686 1687 /* Joined state *must* be set before the joining node 1688 * information, otherwise the query_join handler may read no 1689 * current joiner but a state of NEW and tell joining nodes 1690 * we're not in the domain. */ 1691 spin_lock(&dlm_domain_lock); 1692 dlm->dlm_state = DLM_CTXT_JOINED; 1693 dlm->num_joins++; 1694 spin_unlock(&dlm_domain_lock); 1695 1696 bail: 1697 spin_lock(&dlm->spinlock); 1698 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 1699 if (!status) { 1700 printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name); 1701 __dlm_print_nodes(dlm); 1702 } 1703 spin_unlock(&dlm->spinlock); 1704 1705 if (ctxt) { 1706 /* Do we need to send a cancel message to any nodes? 
*/ 1707 if (status < 0) { 1708 tmpstat = dlm_send_join_cancels(dlm, 1709 ctxt->yes_resp_map, 1710 sizeof(ctxt->yes_resp_map)); 1711 if (tmpstat < 0) 1712 mlog_errno(tmpstat); 1713 } 1714 kfree(ctxt); 1715 } 1716 1717 mlog(0, "returning %d\n", status); 1718 return status; 1719 } 1720 1721 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm) 1722 { 1723 o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up); 1724 o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down); 1725 o2net_unregister_handler_list(&dlm->dlm_domain_handlers); 1726 } 1727 1728 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) 1729 { 1730 int status; 1731 1732 mlog(0, "registering handlers.\n"); 1733 1734 o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB, 1735 dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI); 1736 o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB, 1737 dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI); 1738 1739 status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down); 1740 if (status) 1741 goto bail; 1742 1743 status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up); 1744 if (status) 1745 goto bail; 1746 1747 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1748 sizeof(struct dlm_master_request), 1749 dlm_master_request_handler, 1750 dlm, NULL, &dlm->dlm_domain_handlers); 1751 if (status) 1752 goto bail; 1753 1754 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1755 sizeof(struct dlm_assert_master), 1756 dlm_assert_master_handler, 1757 dlm, dlm_assert_master_post_handler, 1758 &dlm->dlm_domain_handlers); 1759 if (status) 1760 goto bail; 1761 1762 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1763 sizeof(struct dlm_create_lock), 1764 dlm_create_lock_handler, 1765 dlm, NULL, &dlm->dlm_domain_handlers); 1766 if (status) 1767 goto bail; 1768 1769 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1770 DLM_CONVERT_LOCK_MAX_LEN, 1771 dlm_convert_lock_handler, 1772 dlm, NULL, 
&dlm->dlm_domain_handlers); 1773 if (status) 1774 goto bail; 1775 1776 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1777 DLM_UNLOCK_LOCK_MAX_LEN, 1778 dlm_unlock_lock_handler, 1779 dlm, NULL, &dlm->dlm_domain_handlers); 1780 if (status) 1781 goto bail; 1782 1783 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1784 DLM_PROXY_AST_MAX_LEN, 1785 dlm_proxy_ast_handler, 1786 dlm, NULL, &dlm->dlm_domain_handlers); 1787 if (status) 1788 goto bail; 1789 1790 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1791 sizeof(struct dlm_exit_domain), 1792 dlm_exit_domain_handler, 1793 dlm, NULL, &dlm->dlm_domain_handlers); 1794 if (status) 1795 goto bail; 1796 1797 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key, 1798 sizeof(struct dlm_deref_lockres), 1799 dlm_deref_lockres_handler, 1800 dlm, NULL, &dlm->dlm_domain_handlers); 1801 if (status) 1802 goto bail; 1803 1804 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1805 sizeof(struct dlm_migrate_request), 1806 dlm_migrate_request_handler, 1807 dlm, NULL, &dlm->dlm_domain_handlers); 1808 if (status) 1809 goto bail; 1810 1811 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1812 DLM_MIG_LOCKRES_MAX_LEN, 1813 dlm_mig_lockres_handler, 1814 dlm, NULL, &dlm->dlm_domain_handlers); 1815 if (status) 1816 goto bail; 1817 1818 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1819 sizeof(struct dlm_master_requery), 1820 dlm_master_requery_handler, 1821 dlm, NULL, &dlm->dlm_domain_handlers); 1822 if (status) 1823 goto bail; 1824 1825 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1826 sizeof(struct dlm_lock_request), 1827 dlm_request_all_locks_handler, 1828 dlm, NULL, &dlm->dlm_domain_handlers); 1829 if (status) 1830 goto bail; 1831 1832 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1833 sizeof(struct dlm_reco_data_done), 1834 dlm_reco_data_done_handler, 1835 dlm, NULL, &dlm->dlm_domain_handlers); 
1836 if (status) 1837 goto bail; 1838 1839 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1840 sizeof(struct dlm_begin_reco), 1841 dlm_begin_reco_handler, 1842 dlm, NULL, &dlm->dlm_domain_handlers); 1843 if (status) 1844 goto bail; 1845 1846 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1847 sizeof(struct dlm_finalize_reco), 1848 dlm_finalize_reco_handler, 1849 dlm, NULL, &dlm->dlm_domain_handlers); 1850 if (status) 1851 goto bail; 1852 1853 status = o2net_register_handler(DLM_BEGIN_EXIT_DOMAIN_MSG, dlm->key, 1854 sizeof(struct dlm_exit_domain), 1855 dlm_begin_exit_domain_handler, 1856 dlm, NULL, &dlm->dlm_domain_handlers); 1857 if (status) 1858 goto bail; 1859 1860 status = o2net_register_handler(DLM_DEREF_LOCKRES_DONE, dlm->key, 1861 sizeof(struct dlm_deref_lockres_done), 1862 dlm_deref_lockres_done_handler, 1863 dlm, NULL, &dlm->dlm_domain_handlers); 1864 bail: 1865 if (status) 1866 dlm_unregister_domain_handlers(dlm); 1867 1868 return status; 1869 } 1870 1871 static int dlm_join_domain(struct dlm_ctxt *dlm) 1872 { 1873 int status; 1874 unsigned int backoff; 1875 unsigned int total_backoff = 0; 1876 char wq_name[O2NM_MAX_NAME_LEN]; 1877 1878 BUG_ON(!dlm); 1879 1880 mlog(0, "Join domain %s\n", dlm->name); 1881 1882 status = dlm_register_domain_handlers(dlm); 1883 if (status) { 1884 mlog_errno(status); 1885 goto bail; 1886 } 1887 1888 status = dlm_launch_thread(dlm); 1889 if (status < 0) { 1890 mlog_errno(status); 1891 goto bail; 1892 } 1893 1894 status = dlm_launch_recovery_thread(dlm); 1895 if (status < 0) { 1896 mlog_errno(status); 1897 goto bail; 1898 } 1899 1900 status = dlm_debug_init(dlm); 1901 if (status < 0) { 1902 mlog_errno(status); 1903 goto bail; 1904 } 1905 1906 snprintf(wq_name, O2NM_MAX_NAME_LEN, "dlm_wq-%s", dlm->name); 1907 dlm->dlm_worker = create_singlethread_workqueue(wq_name); 1908 if (!dlm->dlm_worker) { 1909 status = -ENOMEM; 1910 mlog_errno(status); 1911 goto bail; 1912 } 1913 1914 do { 1915 status = 
dlm_try_to_join_domain(dlm); 1916 1917 /* If we're racing another node to the join, then we 1918 * need to back off temporarily and let them 1919 * complete. */ 1920 #define DLM_JOIN_TIMEOUT_MSECS 90000 1921 if (status == -EAGAIN) { 1922 if (signal_pending(current)) { 1923 status = -ERESTARTSYS; 1924 goto bail; 1925 } 1926 1927 if (total_backoff > DLM_JOIN_TIMEOUT_MSECS) { 1928 status = -ERESTARTSYS; 1929 mlog(ML_NOTICE, "Timed out joining dlm domain " 1930 "%s after %u msecs\n", dlm->name, 1931 total_backoff); 1932 goto bail; 1933 } 1934 1935 /* 1936 * <chip> After you! 1937 * <dale> No, after you! 1938 * <chip> I insist! 1939 * <dale> But you first! 1940 * ... 1941 */ 1942 backoff = (unsigned int)(jiffies & 0x3); 1943 backoff *= DLM_DOMAIN_BACKOFF_MS; 1944 total_backoff += backoff; 1945 mlog(0, "backoff %d\n", backoff); 1946 msleep(backoff); 1947 } 1948 } while (status == -EAGAIN); 1949 1950 if (status < 0) { 1951 mlog_errno(status); 1952 goto bail; 1953 } 1954 1955 status = 0; 1956 bail: 1957 wake_up(&dlm_domain_events); 1958 1959 if (status) { 1960 dlm_unregister_domain_handlers(dlm); 1961 dlm_debug_shutdown(dlm); 1962 dlm_complete_thread(dlm); 1963 dlm_complete_recovery_thread(dlm); 1964 dlm_destroy_dlm_worker(dlm); 1965 } 1966 1967 return status; 1968 } 1969 1970 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, 1971 u32 key) 1972 { 1973 int i; 1974 int ret; 1975 struct dlm_ctxt *dlm = NULL; 1976 1977 dlm = kzalloc(sizeof(*dlm), GFP_KERNEL); 1978 if (!dlm) { 1979 ret = -ENOMEM; 1980 mlog_errno(ret); 1981 goto leave; 1982 } 1983 1984 dlm->name = kstrdup(domain, GFP_KERNEL); 1985 if (dlm->name == NULL) { 1986 ret = -ENOMEM; 1987 mlog_errno(ret); 1988 goto leave; 1989 } 1990 1991 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES); 1992 if (!dlm->lockres_hash) { 1993 ret = -ENOMEM; 1994 mlog_errno(ret); 1995 goto leave; 1996 } 1997 1998 for (i = 0; i < DLM_HASH_BUCKETS; i++) 1999 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); 2000 
2001 dlm->master_hash = (struct hlist_head **) 2002 dlm_alloc_pagevec(DLM_HASH_PAGES); 2003 if (!dlm->master_hash) { 2004 ret = -ENOMEM; 2005 mlog_errno(ret); 2006 goto leave; 2007 } 2008 2009 for (i = 0; i < DLM_HASH_BUCKETS; i++) 2010 INIT_HLIST_HEAD(dlm_master_hash(dlm, i)); 2011 2012 dlm->key = key; 2013 dlm->node_num = o2nm_this_node(); 2014 2015 ret = dlm_create_debugfs_subroot(dlm); 2016 if (ret < 0) 2017 goto leave; 2018 2019 spin_lock_init(&dlm->spinlock); 2020 spin_lock_init(&dlm->master_lock); 2021 spin_lock_init(&dlm->ast_lock); 2022 spin_lock_init(&dlm->track_lock); 2023 INIT_LIST_HEAD(&dlm->list); 2024 INIT_LIST_HEAD(&dlm->dirty_list); 2025 INIT_LIST_HEAD(&dlm->reco.resources); 2026 INIT_LIST_HEAD(&dlm->reco.node_data); 2027 INIT_LIST_HEAD(&dlm->purge_list); 2028 INIT_LIST_HEAD(&dlm->dlm_domain_handlers); 2029 INIT_LIST_HEAD(&dlm->tracking_list); 2030 dlm->reco.state = 0; 2031 2032 INIT_LIST_HEAD(&dlm->pending_asts); 2033 INIT_LIST_HEAD(&dlm->pending_basts); 2034 2035 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n", 2036 dlm->recovery_map, &(dlm->recovery_map[0])); 2037 2038 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map)); 2039 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map)); 2040 memset(dlm->domain_map, 0, sizeof(dlm->domain_map)); 2041 2042 dlm->dlm_thread_task = NULL; 2043 dlm->dlm_reco_thread_task = NULL; 2044 dlm->dlm_worker = NULL; 2045 init_waitqueue_head(&dlm->dlm_thread_wq); 2046 init_waitqueue_head(&dlm->dlm_reco_thread_wq); 2047 init_waitqueue_head(&dlm->reco.event); 2048 init_waitqueue_head(&dlm->ast_wq); 2049 init_waitqueue_head(&dlm->migration_wq); 2050 INIT_LIST_HEAD(&dlm->mle_hb_events); 2051 2052 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN; 2053 init_waitqueue_head(&dlm->dlm_join_events); 2054 2055 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 2056 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 2057 2058 atomic_set(&dlm->res_tot_count, 0); 2059 atomic_set(&dlm->res_cur_count, 0); 2060 for (i = 0; i < 
DLM_MLE_NUM_TYPES; ++i) { 2061 atomic_set(&dlm->mle_tot_count[i], 0); 2062 atomic_set(&dlm->mle_cur_count[i], 0); 2063 } 2064 2065 spin_lock_init(&dlm->work_lock); 2066 INIT_LIST_HEAD(&dlm->work_list); 2067 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work); 2068 2069 kref_init(&dlm->dlm_refs); 2070 dlm->dlm_state = DLM_CTXT_NEW; 2071 2072 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks); 2073 2074 mlog(0, "context init: refcount %u\n", 2075 atomic_read(&dlm->dlm_refs.refcount)); 2076 2077 leave: 2078 if (ret < 0 && dlm) { 2079 if (dlm->master_hash) 2080 dlm_free_pagevec((void **)dlm->master_hash, 2081 DLM_HASH_PAGES); 2082 2083 if (dlm->lockres_hash) 2084 dlm_free_pagevec((void **)dlm->lockres_hash, 2085 DLM_HASH_PAGES); 2086 2087 kfree(dlm->name); 2088 kfree(dlm); 2089 dlm = NULL; 2090 } 2091 return dlm; 2092 } 2093 2094 /* 2095 * Compare a requested locking protocol version against the current one. 2096 * 2097 * If the major numbers are different, they are incompatible. 2098 * If the current minor is greater than the request, they are incompatible. 2099 * If the current minor is less than or equal to the request, they are 2100 * compatible, and the requester should run at the current minor version. 2101 */ 2102 static int dlm_protocol_compare(struct dlm_protocol_version *existing, 2103 struct dlm_protocol_version *request) 2104 { 2105 if (existing->pv_major != request->pv_major) 2106 return 1; 2107 2108 if (existing->pv_minor > request->pv_minor) 2109 return 1; 2110 2111 if (existing->pv_minor < request->pv_minor) 2112 request->pv_minor = existing->pv_minor; 2113 2114 return 0; 2115 } 2116 2117 /* 2118 * dlm_register_domain: one-time setup per "domain". 2119 * 2120 * The filesystem passes in the requested locking version via proto. 2121 * If registration was successful, proto will contain the negotiated 2122 * locking protocol. 
2123 */ 2124 struct dlm_ctxt * dlm_register_domain(const char *domain, 2125 u32 key, 2126 struct dlm_protocol_version *fs_proto) 2127 { 2128 int ret; 2129 struct dlm_ctxt *dlm = NULL; 2130 struct dlm_ctxt *new_ctxt = NULL; 2131 2132 if (strlen(domain) >= O2NM_MAX_NAME_LEN) { 2133 ret = -ENAMETOOLONG; 2134 mlog(ML_ERROR, "domain name length too long\n"); 2135 goto leave; 2136 } 2137 2138 mlog(0, "register called for domain \"%s\"\n", domain); 2139 2140 retry: 2141 dlm = NULL; 2142 if (signal_pending(current)) { 2143 ret = -ERESTARTSYS; 2144 mlog_errno(ret); 2145 goto leave; 2146 } 2147 2148 spin_lock(&dlm_domain_lock); 2149 2150 dlm = __dlm_lookup_domain(domain); 2151 if (dlm) { 2152 if (dlm->dlm_state != DLM_CTXT_JOINED) { 2153 spin_unlock(&dlm_domain_lock); 2154 2155 mlog(0, "This ctxt is not joined yet!\n"); 2156 wait_event_interruptible(dlm_domain_events, 2157 dlm_wait_on_domain_helper( 2158 domain)); 2159 goto retry; 2160 } 2161 2162 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) { 2163 spin_unlock(&dlm_domain_lock); 2164 mlog(ML_ERROR, 2165 "Requested locking protocol version is not " 2166 "compatible with already registered domain " 2167 "\"%s\"\n", domain); 2168 ret = -EPROTO; 2169 goto leave; 2170 } 2171 2172 __dlm_get(dlm); 2173 dlm->num_joins++; 2174 2175 spin_unlock(&dlm_domain_lock); 2176 2177 ret = 0; 2178 goto leave; 2179 } 2180 2181 /* doesn't exist */ 2182 if (!new_ctxt) { 2183 spin_unlock(&dlm_domain_lock); 2184 2185 new_ctxt = dlm_alloc_ctxt(domain, key); 2186 if (new_ctxt) 2187 goto retry; 2188 2189 ret = -ENOMEM; 2190 mlog_errno(ret); 2191 goto leave; 2192 } 2193 2194 /* a little variable switch-a-roo here... */ 2195 dlm = new_ctxt; 2196 new_ctxt = NULL; 2197 2198 /* add the new domain */ 2199 list_add_tail(&dlm->list, &dlm_domains); 2200 spin_unlock(&dlm_domain_lock); 2201 2202 /* 2203 * Pass the locking protocol version into the join. If the join 2204 * succeeds, it will have the negotiated protocol set. 
2205 */ 2206 dlm->dlm_locking_proto = dlm_protocol; 2207 dlm->fs_locking_proto = *fs_proto; 2208 2209 ret = dlm_join_domain(dlm); 2210 if (ret) { 2211 mlog_errno(ret); 2212 dlm_put(dlm); 2213 goto leave; 2214 } 2215 2216 /* Tell the caller what locking protocol we negotiated */ 2217 *fs_proto = dlm->fs_locking_proto; 2218 2219 ret = 0; 2220 leave: 2221 if (new_ctxt) 2222 dlm_free_ctxt_mem(new_ctxt); 2223 2224 if (ret < 0) 2225 dlm = ERR_PTR(ret); 2226 2227 return dlm; 2228 } 2229 EXPORT_SYMBOL_GPL(dlm_register_domain); 2230 2231 static LIST_HEAD(dlm_join_handlers); 2232 2233 static void dlm_unregister_net_handlers(void) 2234 { 2235 o2net_unregister_handler_list(&dlm_join_handlers); 2236 } 2237 2238 static int dlm_register_net_handlers(void) 2239 { 2240 int status = 0; 2241 2242 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 2243 sizeof(struct dlm_query_join_request), 2244 dlm_query_join_handler, 2245 NULL, NULL, &dlm_join_handlers); 2246 if (status) 2247 goto bail; 2248 2249 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 2250 sizeof(struct dlm_assert_joined), 2251 dlm_assert_joined_handler, 2252 NULL, NULL, &dlm_join_handlers); 2253 if (status) 2254 goto bail; 2255 2256 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 2257 sizeof(struct dlm_cancel_join), 2258 dlm_cancel_join_handler, 2259 NULL, NULL, &dlm_join_handlers); 2260 if (status) 2261 goto bail; 2262 2263 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY, 2264 sizeof(struct dlm_query_region), 2265 dlm_query_region_handler, 2266 NULL, NULL, &dlm_join_handlers); 2267 2268 if (status) 2269 goto bail; 2270 2271 status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY, 2272 sizeof(struct dlm_query_nodeinfo), 2273 dlm_query_nodeinfo_handler, 2274 NULL, NULL, &dlm_join_handlers); 2275 bail: 2276 if (status < 0) 2277 dlm_unregister_net_handlers(); 2278 2279 return status; 2280 } 2281 2282 /* Domain eviction callback handling. 
2283 * 2284 * The file system requires notification of node death *before* the 2285 * dlm completes it's recovery work, otherwise it may be able to 2286 * acquire locks on resources requiring recovery. Since the dlm can 2287 * evict a node from it's domain *before* heartbeat fires, a similar 2288 * mechanism is required. */ 2289 2290 /* Eviction is not expected to happen often, so a per-domain lock is 2291 * not necessary. Eviction callbacks are allowed to sleep for short 2292 * periods of time. */ 2293 static DECLARE_RWSEM(dlm_callback_sem); 2294 2295 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm, 2296 int node_num) 2297 { 2298 struct dlm_eviction_cb *cb; 2299 2300 down_read(&dlm_callback_sem); 2301 list_for_each_entry(cb, &dlm->dlm_eviction_callbacks, ec_item) { 2302 cb->ec_func(node_num, cb->ec_data); 2303 } 2304 up_read(&dlm_callback_sem); 2305 } 2306 2307 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb, 2308 dlm_eviction_func *f, 2309 void *data) 2310 { 2311 INIT_LIST_HEAD(&cb->ec_item); 2312 cb->ec_func = f; 2313 cb->ec_data = data; 2314 } 2315 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb); 2316 2317 void dlm_register_eviction_cb(struct dlm_ctxt *dlm, 2318 struct dlm_eviction_cb *cb) 2319 { 2320 down_write(&dlm_callback_sem); 2321 list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks); 2322 up_write(&dlm_callback_sem); 2323 } 2324 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb); 2325 2326 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb) 2327 { 2328 down_write(&dlm_callback_sem); 2329 list_del_init(&cb->ec_item); 2330 up_write(&dlm_callback_sem); 2331 } 2332 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb); 2333 2334 static int __init dlm_init(void) 2335 { 2336 int status; 2337 2338 status = dlm_init_mle_cache(); 2339 if (status) { 2340 mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n"); 2341 goto error; 2342 } 2343 2344 status = dlm_init_master_caches(); 2345 if (status) { 2346 mlog(ML_ERROR, "Could not create o2dlm_lockres and " 
2347 "o2dlm_lockname slabcaches\n"); 2348 goto error; 2349 } 2350 2351 status = dlm_init_lock_cache(); 2352 if (status) { 2353 mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n"); 2354 goto error; 2355 } 2356 2357 status = dlm_register_net_handlers(); 2358 if (status) { 2359 mlog(ML_ERROR, "Unable to register network handlers\n"); 2360 goto error; 2361 } 2362 2363 status = dlm_create_debugfs_root(); 2364 if (status) 2365 goto error; 2366 2367 return 0; 2368 error: 2369 dlm_unregister_net_handlers(); 2370 dlm_destroy_lock_cache(); 2371 dlm_destroy_master_caches(); 2372 dlm_destroy_mle_cache(); 2373 return -1; 2374 } 2375 2376 static void __exit dlm_exit (void) 2377 { 2378 dlm_destroy_debugfs_root(); 2379 dlm_unregister_net_handlers(); 2380 dlm_destroy_lock_cache(); 2381 dlm_destroy_master_caches(); 2382 dlm_destroy_mle_cache(); 2383 } 2384 2385 MODULE_AUTHOR("Oracle"); 2386 MODULE_LICENSE("GPL"); 2387 MODULE_DESCRIPTION("OCFS2 Distributed Lock Management"); 2388 2389 module_init(dlm_init); 2390 module_exit(dlm_exit); 2391