/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"


/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls. This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions. A node is live
 * whenever any of the threads sees activity from the node in its region.
 */
static spinlock_t o2hb_live_lock = SPIN_LOCK_UNLOCKED;
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

static LIST_HEAD(o2hb_all_regions);

static struct o2hb_callback {
        struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS 9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
                spin_lock(&o2hb_live_lock);
                if (list_empty(&o2hb_all_regions))
                        o2hb_dead_threshold = threshold;
                spin_unlock(&o2hb_live_lock);
        }
}

struct o2hb_node_event {
        struct list_head        hn_item;
        enum o2hb_callback_type hn_event_type;
        struct o2nm_node        *hn_node;
        int                     hn_node_num;
};

struct o2hb_disk_slot {
        struct o2hb_disk_heartbeat_block *ds_raw_block;
        u8                      ds_node_num;
        u64                     ds_last_time;
        u64                     ds_last_generation;
        u16                     ds_equal_samples;
        u16                     ds_changed_samples;
        struct list_head        ds_live_item;
};
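/*
 * A note on the sampling counters above: each pass over a slot compares
 * the hb_seq it reads against ds_last_time.  ds_changed_samples counts
 * reads where the sequence advanced; once it reaches O2HB_LIVE_THRESHOLD
 * a dead node is taken live.  ds_equal_samples counts reads where the
 * sequence stood still; once it reaches o2hb_dead_threshold a live node
 * is taken down.  See o2hb_check_slot() for the actual transitions.
 */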
/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, and it cleans up the region */
struct o2hb_region {
        struct config_item      hr_item;

        struct list_head        hr_all_item;
        unsigned                hr_unclean_stop:1;

        /* protected by the o2hb_callback_sem */
        struct task_struct      *hr_task;

        unsigned int            hr_blocks;
        unsigned long long      hr_start_block;

        unsigned int            hr_block_bits;
        unsigned int            hr_block_bytes;

        unsigned int            hr_slots_per_page;
        unsigned int            hr_num_pages;

        struct page             **hr_slot_data;
        struct block_device     *hr_bdev;
        struct o2hb_disk_slot   *hr_slots;

        /* lets the person setting up hb wait until it has reached a
         * 'steady' state before returning. This will be fixed when we
         * have a more complete api that doesn't lead to this sort of
         * fragility. */
        atomic_t                hr_steady_iterations;

        char                    hr_dev_name[BDEVNAME_SIZE];

        unsigned int            hr_timeout_ms;

        /* randomized as the region goes up and down so that a node
         * recognizes another node going up and down in one iteration */
        u64                     hr_generation;

        struct work_struct      hr_write_timeout_work;
        unsigned long           hr_last_timeout_start;

        /* Used during o2hb_check_slot to hold a copy of the block
         * being checked because we temporarily have to zero out the
         * crc field. */
        struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

struct o2hb_bio_wait_ctxt {
        atomic_t          wc_num_reqs;
        struct completion wc_io_complete;
};

static void o2hb_write_timeout(void *arg)
{
        struct o2hb_region *reg = arg;

        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
             "milliseconds\n", reg->hr_dev_name,
             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
        o2quo_disk_timeout();
}

static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
        mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

        cancel_delayed_work(&reg->hr_write_timeout_work);
        reg->hr_last_timeout_start = jiffies;
        schedule_delayed_work(&reg->hr_write_timeout_work,
                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
        cancel_delayed_work(&reg->hr_write_timeout_work);
        flush_scheduled_work();
}

static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
                                      unsigned int num_ios)
{
        atomic_set(&wc->wc_num_reqs, num_ios);
        init_completion(&wc->wc_io_complete);
}
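/*
 * The wait context protocol, for reference: the submitter initializes
 * wc_num_reqs to the number of bios it intends to issue, each bio
 * completion drops the count by one via o2hb_bio_wait_dec() below, and
 * error paths drop whatever remains for bios that were never submitted.
 * The submitter then sleeps in o2hb_wait_on_io() until the count hits
 * zero and wc_io_complete fires.
 */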
/* Used in error paths too */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
                                     unsigned int num)
{
        /* sadly atomic_sub_and_test() isn't available on all platforms.  The
         * good news is that the fast path only completes one at a time */
        while(num--) {
                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
                        BUG_ON(num > 0);
                        complete(&wc->wc_io_complete);
                }
        }
}

static void o2hb_wait_on_io(struct o2hb_region *reg,
                            struct o2hb_bio_wait_ctxt *wc)
{
        struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;

        blk_run_address_space(mapping);

        wait_for_completion(&wc->wc_io_complete);
}

static int o2hb_bio_end_io(struct bio *bio,
                           unsigned int bytes_done,
                           int error)
{
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

        if (error)
                mlog(ML_ERROR, "IO Error %d\n", error);

        if (bio->bi_size)
                return 1;

        o2hb_bio_wait_dec(wc, 1);
        return 0;
}

/* Set up a bio to cover I/O against num_slots slots starting at
 * start_slot. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
                                      struct o2hb_bio_wait_ctxt *wc,
                                      unsigned int start_slot,
                                      unsigned int num_slots)
{
        int i, nr_vecs, len, first_page, last_page;
        unsigned int vec_len, vec_start;
        unsigned int bits = reg->hr_block_bits;
        unsigned int spp = reg->hr_slots_per_page;
        struct bio *bio;
        struct page *page;

        nr_vecs = (num_slots + spp - 1) / spp;

        /* Testing has shown this allocation to take long enough under
         * GFP_KERNEL that the local node can get fenced. It would be
         * nicest if we could pre-allocate these bios and avoid this
         * altogether. */
        bio = bio_alloc(GFP_ATOMIC, nr_vecs);
        if (!bio) {
                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
                bio = ERR_PTR(-ENOMEM);
                goto bail;
        }

        /* Must put everything in 512 byte sectors for the bio... */
        bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
        bio->bi_bdev = reg->hr_bdev;
        bio->bi_private = wc;
        bio->bi_end_io = o2hb_bio_end_io;

        first_page = start_slot / spp;
        last_page = first_page + nr_vecs;
        vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
        for(i = first_page; i < last_page; i++) {
                page = reg->hr_slot_data[i];

                vec_len = PAGE_CACHE_SIZE;
                /* last page might be short */
                if (((i + 1) * spp) > (start_slot + num_slots))
                        vec_len = ((num_slots + start_slot) % spp) << bits;
                vec_len -= vec_start;

                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
                     i, vec_len, vec_start);

                len = bio_add_page(bio, page, vec_len, vec_start);
                if (len != vec_len) {
                        bio_put(bio);
                        bio = ERR_PTR(-EIO);

                        mlog(ML_ERROR, "Error adding page to bio i = %d, "
                             "vec_len = %u, len = %d, start = %u\n",
                             i, vec_len, len, vec_start);
                        goto bail;
                }

                vec_start = 0;
        }

bail:
        return bio;
}
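/*
 * Worked example of the vector math above, assuming 512 byte blocks
 * (bits = 9) and 4096 byte pages, so spp = 8.  A read of slots 8-23
 * (start_slot = 8, num_slots = 16) computes nr_vecs = (16 + 7) / 8 = 2
 * and maps pages 1 and 2 in full.  A single-slot write to slot 10
 * (num_slots = 1) maps one vector in page 1 with vec_start =
 * (10 << 9) % 4096 = 1024 and vec_len = ((11 % 8) << 9) - 1024 = 512,
 * i.e. exactly that slot's block.
 */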
/*
 * Compute the maximum number of sectors the bdev can handle in one bio,
 * as a power of two.
 *
 * Stolen from oracleasm, thanks Joel!
 */
static int compute_max_sectors(struct block_device *bdev)
{
        int max_pages, max_sectors, pow_two_sectors;

        struct request_queue *q;

        q = bdev_get_queue(bdev);
        max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
        if (max_pages > BIO_MAX_PAGES)
                max_pages = BIO_MAX_PAGES;
        if (max_pages > q->max_phys_segments)
                max_pages = q->max_phys_segments;
        if (max_pages > q->max_hw_segments)
                max_pages = q->max_hw_segments;
        max_pages--; /* Handle I/Os that straddle a page */

        max_sectors = max_pages << (PAGE_SHIFT - 9);

        /* Why is fls() 1-based???? */
        pow_two_sectors = 1 << (fls(max_sectors) - 1);

        return pow_two_sectors;
}

static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
                                               unsigned int num_slots,
                                               unsigned int *num_bios,
                                               unsigned int *slots_per_bio)
{
        unsigned int max_sectors, io_sectors;

        max_sectors = compute_max_sectors(reg->hr_bdev);

        io_sectors = num_slots << (reg->hr_block_bits - 9);

        *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
        *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);

        mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
             "device can handle %u sectors of I/O\n", io_sectors, num_slots,
             max_sectors);
        mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
             *num_bios, *slots_per_bio);
}

static int o2hb_read_slots(struct o2hb_region *reg,
                           unsigned int max_slots)
{
        unsigned int num_bios, slots_per_bio, start_slot, num_slots;
        int i, status;
        struct o2hb_bio_wait_ctxt wc;
        struct bio **bios;
        struct bio *bio;

        o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);

        bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
        if (!bios) {
                status = -ENOMEM;
                mlog_errno(status);
                return status;
        }

        o2hb_bio_wait_init(&wc, num_bios);

        num_slots = slots_per_bio;
        for(i = 0; i < num_bios; i++) {
                start_slot = i * slots_per_bio;

                /* adjust num_slots at last bio */
                if (max_slots < (start_slot + num_slots))
                        num_slots = max_slots - start_slot;

                bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
                if (IS_ERR(bio)) {
                        o2hb_bio_wait_dec(&wc, num_bios - i);

                        status = PTR_ERR(bio);
                        mlog_errno(status);
                        goto bail_and_wait;
                }
                bios[i] = bio;

                submit_bio(READ, bio);
        }

        status = 0;

bail_and_wait:
        o2hb_wait_on_io(reg, &wc);

        if (bios) {
                for(i = 0; i < num_bios; i++)
                        if (bios[i])
                                bio_put(bios[i]);
                kfree(bios);
        }

        return status;
}

static int o2hb_issue_node_write(struct o2hb_region *reg,
                                 struct bio **write_bio,
                                 struct o2hb_bio_wait_ctxt *write_wc)
{
        int status;
        unsigned int slot;
        struct bio *bio;

        o2hb_bio_wait_init(write_wc, 1);

        slot = o2nm_this_node();

        bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
        if (IS_ERR(bio)) {
                status = PTR_ERR(bio);
                mlog_errno(status);
                goto bail;
        }

        submit_bio(WRITE, bio);

        *write_bio = bio;
        status = 0;
bail:
        return status;
}
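/*
 * Worked example of the request limits (illustrative queue values):
 * with 512 byte blocks and a queue advertising max_sectors = 512 and
 * segment limits above 64, compute_max_sectors() yields max_pages =
 * 64 - 1 = 63, max_sectors = 504, rounded down to the power of two
 * 256.  Reading a 255 slot region is then io_sectors = 255, so
 * num_bios = 1 and slots_per_bio = 256; one bio covers everything.
 */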
static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
                                     struct o2hb_disk_heartbeat_block *hb_block)
{
        __le32 old_cksum;
        u32 ret;

        /* We want to compute the block crc with a 0 value in the
         * hb_cksum field. Save it off here and replace after the
         * crc. */
        old_cksum = hb_block->hb_cksum;
        hb_block->hb_cksum = 0;

        ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);

        hb_block->hb_cksum = old_cksum;

        return ret;
}

static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
{
        mlog(ML_ERROR, "Dump slot information: seq = 0x%"MLFx64", node = %u, "
             "cksum = 0x%x, generation 0x%"MLFx64"\n",
             le64_to_cpu(hb_block->hb_seq), hb_block->hb_node,
             le32_to_cpu(hb_block->hb_cksum),
             le64_to_cpu(hb_block->hb_generation));
}

static int o2hb_verify_crc(struct o2hb_region *reg,
                           struct o2hb_disk_heartbeat_block *hb_block)
{
        u32 read, computed;

        read = le32_to_cpu(hb_block->hb_cksum);
        computed = o2hb_compute_block_crc_le(reg, hb_block);

        return read == computed;
}

/* We want to make sure that nobody is heartbeating on top of us --
 * this will help detect an invalid configuration. */
static int o2hb_check_last_timestamp(struct o2hb_region *reg)
{
        int node_num, ret;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();

        ret = 1;
        slot = &reg->hr_slots[node_num];
        /* Don't check on our 1st timestamp */
        if (slot->ds_last_time) {
                hb_block = slot->ds_raw_block;

                if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
                        ret = 0;
        }

        return ret;
}

static inline void o2hb_prepare_block(struct o2hb_region *reg,
                                      u64 generation)
{
        int node_num;
        u64 cputime;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();
        slot = &reg->hr_slots[node_num];

        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
        memset(hb_block, 0, reg->hr_block_bytes);
        /* TODO: time stuff */
        cputime = CURRENT_TIME.tv_sec;
        if (!cputime)
                cputime = 1;

        hb_block->hb_seq = cpu_to_le64(cputime);
        hb_block->hb_node = node_num;
        hb_block->hb_generation = cpu_to_le64(generation);

        /* This step must always happen last! */
        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
                                                                   hb_block));

        mlog(ML_HB_BIO, "our node generation = 0x%"MLFx64", cksum = 0x%x\n",
             cpu_to_le64(generation), le32_to_cpu(hb_block->hb_cksum));
}

static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
                                struct o2nm_node *node,
                                int idx)
{
        struct list_head *iter;
        struct o2hb_callback_func *f;

        list_for_each(iter, &hbcall->list) {
                f = list_entry(iter, struct o2hb_callback_func, hc_item);
                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
                (f->hc_func)(node, idx, f->hc_data);
        }
}
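/*
 * The list walked above is kept sorted by ascending hc_priority at
 * registration time (see o2hb_register_callback() below), so callbacks
 * fire lowest priority value first.
 */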
/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
        int empty;
        struct o2hb_callback *hbcall;
        struct o2hb_node_event *event;

        spin_lock(&o2hb_live_lock);
        empty = list_empty(&queued_event->hn_item);
        spin_unlock(&o2hb_live_lock);
        if (empty)
                return;

        /* Holding callback sem assures we don't alter the callback
         * lists when doing this, and serializes ourselves with other
         * processes wanting callbacks. */
        down_write(&o2hb_callback_sem);

        spin_lock(&o2hb_live_lock);
        while (!list_empty(&o2hb_node_events)
               && !list_empty(&queued_event->hn_item)) {
                event = list_entry(o2hb_node_events.next,
                                   struct o2hb_node_event,
                                   hn_item);
                list_del_init(&event->hn_item);
                spin_unlock(&o2hb_live_lock);

                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
                     event->hn_node_num);

                hbcall = hbcall_from_type(event->hn_event_type);

                /* We should *never* have gotten on to the list with a
                 * bad type... This isn't something that we should try
                 * to recover from. */
                BUG_ON(IS_ERR(hbcall));

                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

                spin_lock(&o2hb_live_lock);
        }
        spin_unlock(&o2hb_live_lock);

        up_write(&o2hb_callback_sem);
}

static void o2hb_queue_node_event(struct o2hb_node_event *event,
                                  enum o2hb_callback_type type,
                                  struct o2nm_node *node,
                                  int node_num)
{
        assert_spin_locked(&o2hb_live_lock);

        event->hn_event_type = type;
        event->hn_node = node;
        event->hn_node_num = node_num;

        mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
             type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);

        list_add_tail(&event->hn_item, &o2hb_node_events);
}

static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
{
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;

        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return;

        spin_lock(&o2hb_live_lock);
        if (!list_empty(&slot->ds_live_item)) {
                mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
                     slot->ds_node_num);

                list_del_init(&slot->ds_live_item);

                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);
                }
        }
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
}
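/*
 * The pattern above repeats in o2hb_check_slot() below: a node event is
 * declared on the stack, queued on o2hb_node_events under
 * o2hb_live_lock, and only dispatched by o2hb_run_event_list() after
 * the spinlock has been dropped, since taking the callback rwsem and
 * running the callbacks can sleep.
 */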
static int o2hb_check_slot(struct o2hb_region *reg,
                           struct o2hb_disk_slot *slot)
{
        int changed = 0, gen_changed = 0;
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;

        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

        /* Is this correct? Do we assume that the node doesn't exist
         * if we're not configured for it? */
        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return 0;

        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
                 * us. */
                spin_lock(&o2hb_live_lock);

                /* Don't print an error on the console in this case -
                 * a freshly formatted heartbeat area will not have a
                 * crc set on it. */
                if (list_empty(&slot->ds_live_item))
                        goto out;

                /* The node is live but pushed out a bad crc. We
                 * consider it a transient miss but don't populate any
                 * other values as they may be junk. */
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
                     slot->ds_node_num, reg->hr_dev_name);
                o2hb_dump_slot(hb_block);

                slot->ds_equal_samples++;
                goto fire_callbacks;
        }

        /* we don't care if these wrap.. the state transitions below
         * clear at the right places */
        cputime = le64_to_cpu(hb_block->hb_seq);
        if (slot->ds_last_time != cputime)
                slot->ds_changed_samples++;
        else
                slot->ds_equal_samples++;
        slot->ds_last_time = cputime;

        /* The node changed heartbeat generations. We assume this to
         * mean it dropped off but came back before we timed out. We
         * want to consider it down for the time being but don't want
         * to lose any changed_samples state we might build up to
         * considering it live again. */
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%"MLFx64" "
                     "to 0x%"MLFx64")\n", slot->ds_node_num,
                     slot->ds_last_generation,
                     le64_to_cpu(hb_block->hb_generation));
        }

        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

        mlog(ML_HEARTBEAT, "Slot %d gen 0x%"MLFx64" cksum 0x%x "
             "seq %"MLFu64" last %"MLFu64" changed %u equal %u\n",
             slot->ds_node_num, slot->ds_last_generation,
             le32_to_cpu(hb_block->hb_cksum), le64_to_cpu(hb_block->hb_seq),
             slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);

        spin_lock(&o2hb_live_lock);

fire_callbacks:
        /* dead nodes only come to life after some number of
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
                mlog(ML_HEARTBEAT, "Node %d (id 0x%"MLFx64") joined my "
                     "region\n", slot->ds_node_num, slot->ds_last_generation);

                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                list_add_tail(&slot->ds_live_item,
                              &o2hb_live_slots[slot->ds_node_num]);

                slot->ds_equal_samples = 0;
                goto out;
        }

        /* if the list is dead, we're done.. */
        if (list_empty(&slot->ds_live_item))
                goto out;

        /* live nodes only go dead after enough consecutive missed
         * samples.. reset the missed counter whenever we see
         * activity */
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
                     slot->ds_node_num);

                /* last off the live_slot generates a callback */
                list_del_init(&slot->ds_live_item);
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                /* We don't clear this because the node is still
                 * actually writing new blocks. */
                if (!gen_changed)
                        slot->ds_changed_samples = 0;
                goto out;
        }
        if (slot->ds_changed_samples) {
                slot->ds_changed_samples = 0;
                slot->ds_equal_samples = 0;
        }
out:
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
        return changed;
}
/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it. */
static int o2hb_highest_node(unsigned long *nodes,
                             int numbits)
{
        int highest, node;

        highest = numbits;
        node = -1;
        /* find_next_bit returns numbits when no further bits are set */
        while ((node = find_next_bit(nodes, numbits, node + 1)) < numbits)
                highest = node;

        return highest;
}

static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;

        if (o2nm_configured_node_map(configured_nodes, sizeof(configured_nodes)))
                return;

        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
                return;
        }

        /* No sense in reading the slots of nodes that don't exist
         * yet. Of course, if the node definitions have holes in them
         * then we're reading an empty slot anyway... Consider this
         * best-effort. */
        ret = o2hb_read_slots(reg, highest_node + 1);
        if (ret < 0) {
                mlog_errno(ret);
                return;
        }

        /* With an up to date view of the slots, we can check that no
         * other node has been improperly configured to heartbeat in
         * our slot. */
        if (!o2hb_check_last_timestamp(reg))
                mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
                     "in our slot!\n", reg->hr_dev_name);

        /* fill in the proper info for our next heartbeat */
        o2hb_prepare_block(reg, reg->hr_generation);

        /* And fire off the write. Note that we don't wait on this I/O
         * until later. */
        ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
                return;
        }

        i = -1;
        while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
                change |= o2hb_check_slot(reg, &reg->hr_slots[i]);

        /*
         * We have to be sure we've advertised ourselves on disk
         * before we can go to steady state.  This ensures that
         * people we find in our steady state have seen us.
         */
        o2hb_wait_on_io(reg, &write_wc);
        bio_put(write_bio);
        o2hb_arm_write_timeout(reg);

        /* let the person who launched us know when things are steady */
        if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
                if (atomic_dec_and_test(&reg->hr_steady_iterations))
                        wake_up(&o2hb_steady_queue);
        }
}

/* Subtract b from a, storing the result in a. a *must* have a larger
 * value than b. */
static void o2hb_tv_subtract(struct timeval *a,
                             struct timeval *b)
{
        /* just return 0 when a is before b */
        if (a->tv_sec < b->tv_sec ||
            (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
                a->tv_sec = 0;
                a->tv_usec = 0;
                return;
        }

        a->tv_sec -= b->tv_sec;
        a->tv_usec -= b->tv_usec;
        while ( a->tv_usec < 0 ) {
                a->tv_sec--;
                a->tv_usec += 1000000;
        }
}

static unsigned int o2hb_elapsed_msecs(struct timeval *start,
                                       struct timeval *end)
{
        struct timeval res = *end;

        o2hb_tv_subtract(&res, start);

        return res.tv_sec * 1000 + res.tv_usec / 1000;
}
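/*
 * For example, start = { 10, 900000 } and end = { 12, 100000 } leaves
 * res = { 1, 200000 } after the borrow in o2hb_tv_subtract(), which
 * o2hb_elapsed_msecs() reports as 1 * 1000 + 200000 / 1000 = 1200 ms.
 */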
/*
 * we ride the region ref that the region dir holds.  before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
        int i, ret;
        struct o2hb_region *reg = data;
        struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;
        struct timeval before_hb, after_hb;
        unsigned int elapsed_msec;

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

        set_user_nice(current, -20);

        while (!kthread_should_stop() && !reg->hr_unclean_stop) {
                /* We track the time spent inside
                 * o2hb_do_disk_heartbeat so that we avoid more than
                 * hr_timeout_ms between disk writes. On busy systems
                 * this should result in a heartbeat which is less
                 * likely to time itself out. */
                do_gettimeofday(&before_hb);

                o2hb_do_disk_heartbeat(reg);

                do_gettimeofday(&after_hb);
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);

                mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
                     before_hb.tv_sec, before_hb.tv_usec,
                     after_hb.tv_sec, after_hb.tv_usec, elapsed_msec);

                if (elapsed_msec < reg->hr_timeout_ms) {
                        /* the kthread api has blocked signals for us so no
                         * need to record the return value. */
                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
                }
        }

        o2hb_disarm_write_timeout(reg);

        /* unclean stop is only used in very bad situations */
        for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
                o2hb_shutdown_slot(&reg->hr_slots[i]);

        /* Explicit down notification - avoid forcing the other nodes
         * to timeout on this region when we could just as easily
         * write a clear generation - thus indicating to them that
         * this node has left this region.
         *
         * XXX: Should we skip this on unclean_stop? */
        o2hb_prepare_block(reg, 0);
        ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
        if (ret == 0) {
                o2hb_wait_on_io(reg, &write_wc);
                bio_put(write_bio);
        } else {
                mlog_errno(ret);
        }

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");

        return 0;
}
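/*
 * A note on timing: the loop above keeps our writes at most
 * hr_timeout_ms apart, while remote nodes allow o2hb_dead_threshold
 * unchanged samples before declaring us dead.  With illustrative
 * values of hr_timeout_ms = 2000 and a dead threshold of 7, a node
 * that goes silent is declared down after roughly 14 seconds.
 */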
void o2hb_init(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
                INIT_LIST_HEAD(&o2hb_callbacks[i].list);

        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
                INIT_LIST_HEAD(&o2hb_live_slots[i]);

        INIT_LIST_HEAD(&o2hb_node_events);

        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
}

/* if we're already in a callback then we're already serialized by the sem */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
                                             unsigned bytes)
{
        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));

        memcpy(map, &o2hb_live_node_bitmap, bytes);
}

/*
 * get a map of all nodes that are heartbeating in any regions
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
        /* callers want to serialize this map and callbacks so that they
         * can trust that they don't miss nodes coming to the party */
        down_read(&o2hb_callback_sem);
        spin_lock(&o2hb_live_lock);
        o2hb_fill_node_map_from_callback(map, bytes);
        spin_unlock(&o2hb_live_lock);
        up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);

/*
 * heartbeat configfs bits.  The heartbeat set is a default set under
 * the cluster set in nodemanager.c.
 */

static struct o2hb_region *to_o2hb_region(struct config_item *item)
{
        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
}

/* drop_item only drops its ref after killing the thread, nothing should
 * be using the region anymore. this has to clean up any state that
 * attributes might have built up. */
static void o2hb_region_release(struct config_item *item)
{
        int i;
        struct page *page;
        struct o2hb_region *reg = to_o2hb_region(item);

        if (reg->hr_tmp_block)
                kfree(reg->hr_tmp_block);

        if (reg->hr_slot_data) {
                for (i = 0; i < reg->hr_num_pages; i++) {
                        page = reg->hr_slot_data[i];
                        if (page)
                                __free_page(page);
                }
                kfree(reg->hr_slot_data);
        }

        if (reg->hr_bdev)
                blkdev_put(reg->hr_bdev);

        if (reg->hr_slots)
                kfree(reg->hr_slots);

        spin_lock(&o2hb_live_lock);
        list_del(&reg->hr_all_item);
        spin_unlock(&o2hb_live_lock);

        kfree(reg);
}

static int o2hb_read_block_input(struct o2hb_region *reg,
                                 const char *page,
                                 size_t count,
                                 unsigned long *ret_bytes,
                                 unsigned int *ret_bits)
{
        unsigned long bytes;
        char *p = (char *)page;

        bytes = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* Heartbeat and fs min / max block sizes are the same. */
        if (bytes > 4096 || bytes < 512)
                return -ERANGE;
        if (hweight16(bytes) != 1)
                return -EINVAL;

        if (ret_bytes)
                *ret_bytes = bytes;
        if (ret_bits)
                *ret_bits = ffs(bytes) - 1;

        return 0;
}
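/*
 * For example, a write of "4096\n" to the block_bytes attribute below
 * parses to bytes = 4096, passes the 512..4096 power-of-two checks
 * (hweight16(4096) == 1), and yields ret_bits = ffs(4096) - 1 = 12.
 */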
static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%u\n", reg->hr_block_bytes);
}

static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        int status;
        unsigned long block_bytes;
        unsigned int block_bits;

        if (reg->hr_bdev)
                return -EINVAL;

        status = o2hb_read_block_input(reg, page, count,
                                       &block_bytes, &block_bits);
        if (status)
                return status;

        reg->hr_block_bytes = (unsigned int)block_bytes;
        reg->hr_block_bits = block_bits;

        return count;
}

static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%llu\n", reg->hr_start_block);
}

static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        unsigned long long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoull(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        reg->hr_start_block = tmp;

        return count;
}

static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
                                       char *page)
{
        return sprintf(page, "%u\n", reg->hr_blocks);
}

static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
                                        const char *page,
                                        size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        if (tmp > O2NM_MAX_NODES || tmp == 0)
                return -ERANGE;

        reg->hr_blocks = (unsigned int)tmp;

        return count;
}

static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
                                    char *page)
{
        unsigned int ret = 0;

        if (reg->hr_bdev)
                ret = sprintf(page, "%s\n", reg->hr_dev_name);

        return ret;
}

static void o2hb_init_region_params(struct o2hb_region *reg)
{
        reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;

        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
             reg->hr_start_block, reg->hr_blocks);
        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
             reg->hr_block_bytes, reg->hr_block_bits);
        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
}

static int o2hb_map_slot_data(struct o2hb_region *reg)
{
        int i, j;
        unsigned int last_slot;
        unsigned int spp = reg->hr_slots_per_page;
        struct page *page;
        char *raw;
        struct o2hb_disk_slot *slot;

        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
        if (reg->hr_tmp_block == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        reg->hr_slots = kcalloc(reg->hr_blocks,
                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
        if (reg->hr_slots == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for(i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                slot->ds_node_num = i;
                INIT_LIST_HEAD(&slot->ds_live_item);
                slot->ds_raw_block = NULL;
        }

        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
             "at %u blocks per page\n",
             reg->hr_num_pages, reg->hr_blocks, spp);

        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
                                    GFP_KERNEL);
        if (!reg->hr_slot_data) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for(i = 0; i < reg->hr_num_pages; i++) {
                page = alloc_page(GFP_KERNEL);
                if (!page) {
                        mlog_errno(-ENOMEM);
                        return -ENOMEM;
                }

                reg->hr_slot_data[i] = page;

                last_slot = i * spp;
                raw = page_address(page);
                for (j = 0;
                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
                     j++) {
                        BUG_ON((j + last_slot) >= reg->hr_blocks);

                        slot = &reg->hr_slots[j + last_slot];
                        slot->ds_raw_block =
                                (struct o2hb_disk_heartbeat_block *) raw;

                        raw += reg->hr_block_bytes;
                }
        }

        return 0;
}
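/*
 * Worked example of the mapping above: with 512 byte blocks and 4096
 * byte pages, spp = 8, so a 255 block region needs (255 + 7) / 8 = 32
 * pages; slot i lives in page i / 8 at byte offset (i % 8) * 512.
 */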
/* Read in all the slots available and populate the tracking
 * structures so that we can start with a baseline idea of what's
 * there. */
static int o2hb_populate_slot_data(struct o2hb_region *reg)
{
        int ret, i;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        mlog_entry_void();

        ret = o2hb_read_slots(reg, reg->hr_blocks);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        /* We only want to get an idea of the values initially in each
         * slot, so we do no verification - o2hb_check_slot will
         * actually determine if each configured slot is valid and
         * whether any values have changed. */
        for(i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;

                /* Only fill the values that o2hb_check_slot uses to
                 * determine changing slots */
                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
        }

out:
        mlog_exit(ret);
        return ret;
}

/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                                     const char *page,
                                     size_t count)
{
        long fd;
        int sectsize;
        char *p = (char *)page;
        struct file *filp = NULL;
        struct inode *inode = NULL;
        ssize_t ret = -EINVAL;

        if (reg->hr_bdev)
                goto out;

        /* We can't heartbeat until our node number has been
         * configured. */
        if (o2nm_this_node() == O2NM_MAX_NODES)
                goto out;

        fd = simple_strtol(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                goto out;

        if (fd < 0 || fd >= INT_MAX)
                goto out;

        filp = fget(fd);
        if (filp == NULL)
                goto out;

        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
            reg->hr_block_bytes == 0)
                goto out;

        inode = igrab(filp->f_mapping->host);
        if (inode == NULL)
                goto out;

        if (!S_ISBLK(inode->i_mode))
                goto out;

        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
        if (ret) {
                reg->hr_bdev = NULL;
                goto out;
        }
        inode = NULL;

        bdevname(reg->hr_bdev, reg->hr_dev_name);

        sectsize = bdev_hardsect_size(reg->hr_bdev);
        if (sectsize != reg->hr_block_bytes) {
                mlog(ML_ERROR,
                     "blocksize %u incorrect for device, expected %d",
                     reg->hr_block_bytes, sectsize);
                ret = -EINVAL;
                goto out;
        }

        o2hb_init_region_params(reg);

        /* Generation of zero is invalid */
        do {
                get_random_bytes(&reg->hr_generation,
                                 sizeof(reg->hr_generation));
        } while (reg->hr_generation == 0);

        ret = o2hb_map_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        ret = o2hb_populate_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg);
        /*
         * A node is considered live after it has beat LIVE_THRESHOLD
         * times.  We're not steady until we've given them a chance
         * _after_ our first read.
         */
        atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);

        reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
                                   reg->hr_item.ci_name);
        if (IS_ERR(reg->hr_task)) {
                ret = PTR_ERR(reg->hr_task);
                mlog_errno(ret);
                reg->hr_task = NULL;
                goto out;
        }

        ret = wait_event_interruptible(o2hb_steady_queue,
                                atomic_read(&reg->hr_steady_iterations) == 0);
        if (ret) {
                kthread_stop(reg->hr_task);
                reg->hr_task = NULL;
                goto out;
        }

        ret = count;
out:
        if (filp)
                fput(filp);
        if (inode)
                iput(inode);
        if (ret < 0) {
                if (reg->hr_bdev) {
                        blkdev_put(reg->hr_bdev);
                        reg->hr_bdev = NULL;
                }
        }
        return ret;
}

struct o2hb_region_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_region *, char *);
        ssize_t (*store)(struct o2hb_region *, const char *, size_t);
};

static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "block_bytes",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_block_bytes_read,
        .store  = o2hb_region_block_bytes_write,
};

static struct o2hb_region_attribute o2hb_region_attr_start_block = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "start_block",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_start_block_read,
        .store  = o2hb_region_start_block_write,
};

static struct o2hb_region_attribute o2hb_region_attr_blocks = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "blocks",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_blocks_read,
        .store  = o2hb_region_blocks_write,
};

static struct o2hb_region_attribute o2hb_region_attr_dev = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dev",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_dev_read,
        .store  = o2hb_region_dev_write,
};

static struct configfs_attribute *o2hb_region_attrs[] = {
        &o2hb_region_attr_block_bytes.attr,
        &o2hb_region_attr_start_block.attr,
        &o2hb_region_attr_blocks.attr,
        &o2hb_region_attr_dev.attr,
        NULL,
};

static ssize_t o2hb_region_show(struct config_item *item,
                                struct configfs_attribute *attr,
                                char *page)
{
        struct o2hb_region *reg = to_o2hb_region(item);
        struct o2hb_region_attribute *o2hb_region_attr =
                container_of(attr, struct o2hb_region_attribute, attr);
        ssize_t ret = 0;

        if (o2hb_region_attr->show)
                ret = o2hb_region_attr->show(reg, page);
        return ret;
}

static ssize_t o2hb_region_store(struct config_item *item,
                                 struct configfs_attribute *attr,
                                 const char *page, size_t count)
{
        struct o2hb_region *reg = to_o2hb_region(item);
        struct o2hb_region_attribute *o2hb_region_attr =
                container_of(attr, struct o2hb_region_attribute, attr);
        ssize_t ret = -EINVAL;

        if (o2hb_region_attr->store)
                ret = o2hb_region_attr->store(reg, page, count);
        return ret;
}

static struct configfs_item_operations o2hb_region_item_ops = {
        .release                = o2hb_region_release,
        .show_attribute         = o2hb_region_show,
        .store_attribute        = o2hb_region_store,
};

static struct config_item_type o2hb_region_type = {
        .ct_item_ops    = &o2hb_region_item_ops,
        .ct_attrs       = o2hb_region_attrs,
        .ct_owner       = THIS_MODULE,
};
/* heartbeat set */

struct o2hb_heartbeat_group {
        struct config_group hs_group;
        /* some stuff? */
};

static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
{
        return group ?
                container_of(group, struct o2hb_heartbeat_group, hs_group)
                : NULL;
}

static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
                                                          const char *name)
{
        struct o2hb_region *reg = NULL;
        struct config_item *ret = NULL;

        reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
        if (reg == NULL)
                goto out; /* ENOMEM */

        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);

        ret = &reg->hr_item;

        spin_lock(&o2hb_live_lock);
        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
        spin_unlock(&o2hb_live_lock);
out:
        if (ret == NULL)
                kfree(reg);

        return ret;
}

static void o2hb_heartbeat_group_drop_item(struct config_group *group,
                                           struct config_item *item)
{
        struct o2hb_region *reg = to_o2hb_region(item);

        /* stop the thread when the user removes the region dir */
        if (reg->hr_task) {
                kthread_stop(reg->hr_task);
                reg->hr_task = NULL;
        }

        config_item_put(item);
}

struct o2hb_heartbeat_group_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
        ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
};

static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
                                         struct configfs_attribute *attr,
                                         char *page)
{
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
        ssize_t ret = 0;

        if (o2hb_heartbeat_group_attr->show)
                ret = o2hb_heartbeat_group_attr->show(reg, page);
        return ret;
}

static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
                                          struct configfs_attribute *attr,
                                          const char *page, size_t count)
{
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
        ssize_t ret = -EINVAL;

        if (o2hb_heartbeat_group_attr->store)
                ret = o2hb_heartbeat_group_attr->store(reg, page, count);
        return ret;
}

static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
                                                   char *page)
{
        return sprintf(page, "%u\n", o2hb_dead_threshold);
}
static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
                                                    const char *page,
                                                    size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        tmp = simple_strtoul(p, &p, 10);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* this will validate ranges for us. */
        o2hb_dead_threshold_set((unsigned int) tmp);

        return count;
}

static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dead_threshold",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_heartbeat_group_threshold_show,
        .store  = o2hb_heartbeat_group_threshold_store,
};

static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
        &o2hb_heartbeat_group_attr_threshold.attr,
        NULL,
};

static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
        .show_attribute         = o2hb_heartbeat_group_show,
        .store_attribute        = o2hb_heartbeat_group_store,
};

static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
        .make_item      = o2hb_heartbeat_group_make_item,
        .drop_item      = o2hb_heartbeat_group_drop_item,
};

static struct config_item_type o2hb_heartbeat_group_type = {
        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
        .ct_item_ops    = &o2hb_heartbeat_group_item_ops,
        .ct_attrs       = o2hb_heartbeat_group_attrs,
        .ct_owner       = THIS_MODULE,
};

/* this is just here to avoid touching group in heartbeat.h which the
 * entire damn world #includes */
struct config_group *o2hb_alloc_hb_set(void)
{
        struct o2hb_heartbeat_group *hs = NULL;
        struct config_group *ret = NULL;

        hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
        if (hs == NULL)
                goto out;

        config_group_init_type_name(&hs->hs_group, "heartbeat",
                                    &o2hb_heartbeat_group_type);

        ret = &hs->hs_group;
out:
        if (ret == NULL)
                kfree(hs);
        return ret;
}

void o2hb_free_hb_set(struct config_group *group)
{
        struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
        kfree(hs);
}

/* hb callback registration and issuing */

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
{
        if (type == O2HB_NUM_CB)
                return ERR_PTR(-EINVAL);

        return &o2hb_callbacks[type];
}

void o2hb_setup_callback(struct o2hb_callback_func *hc,
                         enum o2hb_callback_type type,
                         o2hb_cb_func *func,
                         void *data,
                         int priority)
{
        INIT_LIST_HEAD(&hc->hc_item);
        hc->hc_func = func;
        hc->hc_data = data;
        hc->hc_priority = priority;
        hc->hc_type = type;
        hc->hc_magic = O2HB_CB_MAGIC;
}
EXPORT_SYMBOL_GPL(o2hb_setup_callback);
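/*
 * Sketch of typical consumer usage (the handler and its arguments are
 * illustrative):
 *
 *      static void my_node_down(struct o2nm_node *node, int node_num,
 *                               void *data)
 *      {
 *              ... react to the node going away ...
 *      }
 *
 *      struct o2hb_callback_func hc;
 *
 *      o2hb_setup_callback(&hc, O2HB_NODE_DOWN_CB, my_node_down, NULL, 0);
 *      ret = o2hb_register_callback(&hc);
 *      ...
 *      o2hb_unregister_callback(&hc);
 */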
int o2hb_register_callback(struct o2hb_callback_func *hc)
{
        struct o2hb_callback_func *tmp;
        struct list_head *iter;
        struct o2hb_callback *hbcall;
        int ret;

        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
        BUG_ON(!list_empty(&hc->hc_item));

        hbcall = hbcall_from_type(hc->hc_type);
        if (IS_ERR(hbcall)) {
                ret = PTR_ERR(hbcall);
                goto out;
        }

        down_write(&o2hb_callback_sem);

        /* keep the list sorted by ascending priority */
        list_for_each(iter, &hbcall->list) {
                tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
                if (hc->hc_priority < tmp->hc_priority) {
                        list_add_tail(&hc->hc_item, iter);
                        break;
                }
        }
        if (list_empty(&hc->hc_item))
                list_add_tail(&hc->hc_item, &hbcall->list);

        up_write(&o2hb_callback_sem);
        ret = 0;
out:
        mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
             ret, __builtin_return_address(0), hc);
        return ret;
}
EXPORT_SYMBOL_GPL(o2hb_register_callback);

int o2hb_unregister_callback(struct o2hb_callback_func *hc)
{
        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);

        mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
             __builtin_return_address(0), hc);

        if (list_empty(&hc->hc_item))
                return 0;

        down_write(&o2hb_callback_sem);

        list_del_init(&hc->hc_item);

        up_write(&o2hb_callback_sem);

        return 0;
}
EXPORT_SYMBOL_GPL(o2hb_unregister_callback);

int o2hb_check_node_heartbeating(u8 node_num)
{
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

        o2hb_fill_node_map(testing_map, sizeof(testing_map));
        if (!test_bit(node_num, testing_map)) {
                mlog(ML_HEARTBEAT,
                     "node (%u) does not have heartbeating enabled.\n",
                     node_num);
                return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);

int o2hb_check_node_heartbeating_from_callback(u8 node_num)
{
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
        if (!test_bit(node_num, testing_map)) {
                mlog(ML_HEARTBEAT,
                     "node (%u) does not have heartbeating enabled.\n",
                     node_num);
                return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);

/* Makes sure our local node is configured with a node number, and is
 * heartbeating. */
int o2hb_check_local_node_heartbeating(void)
{
        u8 node_num;

        /* if this node was set then we have networking */
        node_num = o2nm_this_node();
        if (node_num == O2NM_MAX_NODES) {
                mlog(ML_HEARTBEAT, "this node has not been configured.\n");
                return 0;
        }

        return o2hb_check_node_heartbeating(node_num);
}
EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);

/*
 * this is just a hack until we get the plumbing which flips file systems
 * read only and drops the hb ref instead of killing the node dead.
 */
void o2hb_stop_all_regions(void)
{
        struct o2hb_region *reg;

        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");

        spin_lock(&o2hb_live_lock);

        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
                reg->hr_unclean_stop = 1;

        spin_unlock(&o2hb_live_lock);
}
EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);