/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"


/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

static LIST_HEAD(o2hb_all_regions);

static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS	9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
		spin_lock(&o2hb_live_lock);
		if (list_empty(&o2hb_all_regions))
			o2hb_dead_threshold = threshold;
		spin_unlock(&o2hb_live_lock);
	}
}

struct o2hb_node_event {
	struct list_head	hn_item;
	enum o2hb_callback_type	hn_event_type;
	struct o2nm_node	*hn_node;
	int			hn_node_num;
};

struct o2hb_disk_slot {
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	u8			ds_node_num;
	u64			ds_last_time;
	u64			ds_last_generation;
	u16			ds_equal_samples;
	u16			ds_changed_samples;
	struct list_head	ds_live_item;
};

/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, and it cleans up the region as it exits */
struct o2hb_region {
	struct config_item	hr_item;

	struct list_head	hr_all_item;
	unsigned		hr_unclean_stop:1;

	/* protected by the hr_callback_sem */
	struct task_struct	*hr_task;

	unsigned int		hr_blocks;
	unsigned long long	hr_start_block;

	unsigned int		hr_block_bits;
	unsigned int		hr_block_bytes;

	unsigned int		hr_slots_per_page;
	unsigned int		hr_num_pages;

	struct page		**hr_slot_data;
	struct block_device	*hr_bdev;
	struct o2hb_disk_slot	*hr_slots;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of
	 * fragility. */
	atomic_t		hr_steady_iterations;

	char			hr_dev_name[BDEVNAME_SIZE];

	unsigned int		hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64			hr_generation;

	struct delayed_work	hr_write_timeout_work;
	unsigned long		hr_last_timeout_start;

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

struct o2hb_bio_wait_ctxt {
	atomic_t		wc_num_reqs;
	struct completion	wc_io_complete;
	int			wc_error;
};

static void o2hb_write_timeout(struct work_struct *work)
{
	struct o2hb_region *reg =
		container_of(work, struct o2hb_region,
			     hr_write_timeout_work.work);

	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
	     "milliseconds\n", reg->hr_dev_name,
	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
	o2quo_disk_timeout();
}

static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
	mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

	cancel_delayed_work(&reg->hr_write_timeout_work);
	reg->hr_last_timeout_start = jiffies;
	schedule_delayed_work(&reg->hr_write_timeout_work,
			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
	cancel_delayed_work(&reg->hr_write_timeout_work);
	flush_scheduled_work();
}

static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
{
	atomic_set(&wc->wc_num_reqs, 1);
	init_completion(&wc->wc_io_complete);
	wc->wc_error = 0;
}

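/*
 * A note on the bio wait context (summarizing the code below): the count
 * is biased.  o2hb_bio_wait_init() starts wc_num_reqs at 1 so completions
 * racing with submission can never hit zero early.  Each submitted bio
 * takes an extra reference (atomic_inc), each bio completion drops one,
 * and the waiter drops the initial bias in o2hb_wait_on_io() before
 * sleeping on wc_io_complete.  Whoever takes the count to zero fires the
 * completion.
 */
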
/* Used in error paths too */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
				     unsigned int num)
{
	/* sadly atomic_sub_and_test() isn't available on all platforms.  The
	 * good news is that the fast path only completes one at a time */
	while(num--) {
		if (atomic_dec_and_test(&wc->wc_num_reqs)) {
			BUG_ON(num > 0);
			complete(&wc->wc_io_complete);
		}
	}
}

static void o2hb_wait_on_io(struct o2hb_region *reg,
			    struct o2hb_bio_wait_ctxt *wc)
{
	struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;

	blk_run_address_space(mapping);
	/* drop the initial bias taken in o2hb_bio_wait_init() */
	o2hb_bio_wait_dec(wc, 1);

	wait_for_completion(&wc->wc_io_complete);
}

static int o2hb_bio_end_io(struct bio *bio,
			   unsigned int bytes_done,
			   int error)
{
	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

	if (error) {
		mlog(ML_ERROR, "IO Error %d\n", error);
		wc->wc_error = error;
	}

	/* this may be called for partial completions; only the final
	 * completion, with bi_size drained to zero, counts */
	if (bio->bi_size)
		return 1;

	o2hb_bio_wait_dec(wc, 1);
	bio_put(bio);
	return 0;
}

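/*
 * Worked example of the slot-to-page math used below (a sketch, assuming
 * 4k pages): with 512 byte blocks, hr_block_bits is 9 and
 * hr_slots_per_page is PAGE_CACHE_SIZE >> 9 == 8.  Slot 10 then lives on
 * page 10 / 8 == 1 at byte offset (10 << 9) % PAGE_CACHE_SIZE == 1024,
 * and each page vec can cover at most 8 slots worth of data.
 */
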
/* Set up a bio to cover I/O against num_slots slots starting at
 * start_slot. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
				      struct o2hb_bio_wait_ctxt *wc,
				      unsigned int *current_slot,
				      unsigned int max_slots)
{
	int len, current_page;
	unsigned int vec_len, vec_start;
	unsigned int bits = reg->hr_block_bits;
	unsigned int spp = reg->hr_slots_per_page;
	unsigned int cs = *current_slot;
	struct bio *bio;
	struct page *page;

	/* Testing has shown this allocation to take long enough under
	 * GFP_KERNEL that the local node can get fenced.  It would be
	 * nicest if we could pre-allocate these bios and avoid this
	 * altogether. */
	bio = bio_alloc(GFP_ATOMIC, 16);
	if (!bio) {
		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
		bio = ERR_PTR(-ENOMEM);
		goto bail;
	}

	/* Must put everything in 512 byte sectors for the bio... */
	bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
	bio->bi_bdev = reg->hr_bdev;
	bio->bi_private = wc;
	bio->bi_end_io = o2hb_bio_end_io;

	vec_start = (cs << bits) % PAGE_CACHE_SIZE;
	while(cs < max_slots) {
		current_page = cs / spp;
		page = reg->hr_slot_data[current_page];

		vec_len = min(PAGE_CACHE_SIZE,
			      (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );

		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
		     current_page, vec_len, vec_start);

		len = bio_add_page(bio, page, vec_len, vec_start);
		if (len != vec_len)
			break;

		cs += vec_len / (PAGE_CACHE_SIZE/spp);
		vec_start = 0;
	}

bail:
	*current_slot = cs;
	return bio;
}

static int o2hb_read_slots(struct o2hb_region *reg,
			   unsigned int max_slots)
{
	unsigned int current_slot = 0;
	int status;
	struct o2hb_bio_wait_ctxt wc;
	struct bio *bio;

	o2hb_bio_wait_init(&wc);

	while(current_slot < max_slots) {
		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
		if (IS_ERR(bio)) {
			status = PTR_ERR(bio);
			mlog_errno(status);
			goto bail_and_wait;
		}

		atomic_inc(&wc.wc_num_reqs);
		submit_bio(READ, bio);
	}

	status = 0;

bail_and_wait:
	o2hb_wait_on_io(reg, &wc);
	if (wc.wc_error && !status)
		status = wc.wc_error;

	return status;
}

static int o2hb_issue_node_write(struct o2hb_region *reg,
				 struct o2hb_bio_wait_ctxt *write_wc)
{
	int status;
	unsigned int slot;
	struct bio *bio;

	o2hb_bio_wait_init(write_wc);

	slot = o2nm_this_node();

	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
	if (IS_ERR(bio)) {
		status = PTR_ERR(bio);
		mlog_errno(status);
		goto bail;
	}

	atomic_inc(&write_wc->wc_num_reqs);
	submit_bio(WRITE, bio);

	status = 0;
bail:
	return status;
}

static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
				     struct o2hb_disk_heartbeat_block *hb_block)
{
	__le32 old_cksum;
	u32 ret;

	/* We want to compute the block crc with a 0 value in the
	 * hb_cksum field.  Save it off here and replace after the
	 * crc. */
	old_cksum = hb_block->hb_cksum;
	hb_block->hb_cksum = 0;

	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);

	hb_block->hb_cksum = old_cksum;

	return ret;
}

static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
{
	mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
	     "cksum = 0x%x, generation 0x%llx\n",
	     (long long)le64_to_cpu(hb_block->hb_seq),
	     hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
	     (long long)le64_to_cpu(hb_block->hb_generation));
}

static int o2hb_verify_crc(struct o2hb_region *reg,
			   struct o2hb_disk_heartbeat_block *hb_block)
{
	u32 read, computed;

	read = le32_to_cpu(hb_block->hb_cksum);
	computed = o2hb_compute_block_crc_le(reg, hb_block);

	return read == computed;
}

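/*
 * A note on the on-disk format (descriptive only; the authoritative
 * definition of struct o2hb_disk_heartbeat_block lives in the heartbeat
 * headers): this file only reads and writes hb_seq (a timestamp the slot
 * owner bumps every pass), hb_node, hb_generation (re-randomized each
 * time a region comes up), hb_dead_ms (the owner's idea of the dead
 * threshold) and hb_cksum, a crc32_le over the whole block computed with
 * hb_cksum itself zeroed.
 */
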
/* We want to make sure that nobody is heartbeating on top of us --
 * this will help detect an invalid configuration. */
static int o2hb_check_last_timestamp(struct o2hb_region *reg)
{
	int node_num, ret;
	struct o2hb_disk_slot *slot;
	struct o2hb_disk_heartbeat_block *hb_block;

	node_num = o2nm_this_node();

	ret = 1;
	slot = &reg->hr_slots[node_num];
	/* Don't check on our 1st timestamp */
	if (slot->ds_last_time) {
		hb_block = slot->ds_raw_block;

		if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
			ret = 0;
	}

	return ret;
}

static inline void o2hb_prepare_block(struct o2hb_region *reg,
				      u64 generation)
{
	int node_num;
	u64 cputime;
	struct o2hb_disk_slot *slot;
	struct o2hb_disk_heartbeat_block *hb_block;

	node_num = o2nm_this_node();
	slot = &reg->hr_slots[node_num];

	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
	memset(hb_block, 0, reg->hr_block_bytes);
	/* TODO: time stuff */
	cputime = CURRENT_TIME.tv_sec;
	if (!cputime)
		cputime = 1;

	hb_block->hb_seq = cpu_to_le64(cputime);
	hb_block->hb_node = node_num;
	hb_block->hb_generation = cpu_to_le64(generation);
	hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);

	/* This step must always happen last! */
	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
								   hb_block));

	mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
	     (long long)generation,
	     le32_to_cpu(hb_block->hb_cksum));
}

static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
				struct o2nm_node *node,
				int idx)
{
	struct list_head *iter;
	struct o2hb_callback_func *f;

	list_for_each(iter, &hbcall->list) {
		f = list_entry(iter, struct o2hb_callback_func, hc_item);
		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
		(f->hc_func)(node, idx, f->hc_data);
	}
}

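/*
 * Node up/down events are queued on o2hb_node_events under o2hb_live_lock
 * and delivered in FIFO order with o2hb_callback_sem held for write, so
 * callbacks never run concurrently with callback (un)registration.  Note
 * that the loop below drains every queued event it finds, not just the
 * caller's own, which is why it stops once the passed-in event has been
 * processed.
 */
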
/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
	int empty;
	struct o2hb_callback *hbcall;
	struct o2hb_node_event *event;

	spin_lock(&o2hb_live_lock);
	empty = list_empty(&queued_event->hn_item);
	spin_unlock(&o2hb_live_lock);
	if (empty)
		return;

	/* Holding callback sem assures we don't alter the callback
	 * lists when doing this, and serializes ourselves with other
	 * processes wanting callbacks. */
	down_write(&o2hb_callback_sem);

	spin_lock(&o2hb_live_lock);
	while (!list_empty(&o2hb_node_events)
	       && !list_empty(&queued_event->hn_item)) {
		event = list_entry(o2hb_node_events.next,
				   struct o2hb_node_event,
				   hn_item);
		list_del_init(&event->hn_item);
		spin_unlock(&o2hb_live_lock);

		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
		     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
		     event->hn_node_num);

		hbcall = hbcall_from_type(event->hn_event_type);

		/* We should *never* have gotten on to the list with a
		 * bad type... This isn't something that we should try
		 * to recover from. */
		BUG_ON(IS_ERR(hbcall));

		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

		spin_lock(&o2hb_live_lock);
	}
	spin_unlock(&o2hb_live_lock);

	up_write(&o2hb_callback_sem);
}

static void o2hb_queue_node_event(struct o2hb_node_event *event,
				  enum o2hb_callback_type type,
				  struct o2nm_node *node,
				  int node_num)
{
	assert_spin_locked(&o2hb_live_lock);

	event->hn_event_type = type;
	event->hn_node = node;
	event->hn_node_num = node_num;

	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);

	list_add_tail(&event->hn_item, &o2hb_node_events);
}

static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
{
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;

	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return;

	spin_lock(&o2hb_live_lock);
	if (!list_empty(&slot->ds_live_item)) {
		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
		     slot->ds_node_num);

		list_del_init(&slot->ds_live_item);

		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);
		}
	}
	spin_unlock(&o2hb_live_lock);

	o2hb_run_event_list(&event);

	o2nm_node_put(node);
}

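/*
 * Per-slot liveness state machine, summarizing the logic below: a slot
 * that is off the live list joins it, possibly firing a node-up event,
 * once it accumulates O2HB_LIVE_THRESHOLD changed samples; a live slot
 * leaves it after o2hb_dead_threshold consecutive equal samples or a
 * generation change.  A node may have a live slot in several regions;
 * only the first slot onto, and last slot off of, o2hb_live_slots for a
 * node fires a callback.
 */
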
static int o2hb_check_slot(struct o2hb_region *reg,
			   struct o2hb_disk_slot *slot)
{
	int changed = 0, gen_changed = 0;
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;
	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
	u64 cputime;
	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
	unsigned int slot_dead_ms;

	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

	/* Is this correct?  Do we assume that the node doesn't exist
	 * if we're not configured for it? */
	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return 0;

	if (!o2hb_verify_crc(reg, hb_block)) {
		/* all paths from here will drop o2hb_live_lock for
		 * us. */
		spin_lock(&o2hb_live_lock);

		/* Don't print an error on the console in this case -
		 * a freshly formatted heartbeat area will not have a
		 * crc set on it. */
		if (list_empty(&slot->ds_live_item))
			goto out;

		/* The node is live but pushed out a bad crc.  We
		 * consider it a transient miss but don't populate any
		 * other values as they may be junk. */
		mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
		     slot->ds_node_num, reg->hr_dev_name);
		o2hb_dump_slot(hb_block);

		slot->ds_equal_samples++;
		goto fire_callbacks;
	}

	/* we don't care if these wrap.. the state transitions below
	 * clear at the right places */
	cputime = le64_to_cpu(hb_block->hb_seq);
	if (slot->ds_last_time != cputime)
		slot->ds_changed_samples++;
	else
		slot->ds_equal_samples++;
	slot->ds_last_time = cputime;

	/* The node changed heartbeat generations.  We assume this to
	 * mean it dropped off but came back before we timed out.  We
	 * want to consider it down for the time being but don't want
	 * to lose any changed_samples state we might build up to
	 * considering it live again. */
	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
		gen_changed = 1;
		slot->ds_equal_samples = 0;
		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
		     "to 0x%llx)\n", slot->ds_node_num,
		     (long long)slot->ds_last_generation,
		     (long long)le64_to_cpu(hb_block->hb_generation));
	}

	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

	mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
	     "seq %llu last %llu changed %u equal %u\n",
	     slot->ds_node_num, (long long)slot->ds_last_generation,
	     le32_to_cpu(hb_block->hb_cksum),
	     (unsigned long long)le64_to_cpu(hb_block->hb_seq),
	     (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
	     slot->ds_equal_samples);

	spin_lock(&o2hb_live_lock);

fire_callbacks:
	/* dead nodes only come to life after some number of
	 * changes at any time during their dead time */
	if (list_empty(&slot->ds_live_item) &&
	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
		     slot->ds_node_num, (long long)slot->ds_last_generation);

		/* first on the list generates a callback */
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
					      slot->ds_node_num);

			changed = 1;
		}

		list_add_tail(&slot->ds_live_item,
			      &o2hb_live_slots[slot->ds_node_num]);

		slot->ds_equal_samples = 0;

		/* We want to be sure that all nodes agree on the
		 * number of milliseconds before a node will be
		 * considered dead.  The self-fencing timeout is
		 * computed from this value, and a discrepancy might
		 * result in heartbeat calling a node dead when it
		 * hasn't self-fenced yet. */
		slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
		if (slot_dead_ms && slot_dead_ms != dead_ms) {
			/* TODO: Perhaps we can fail the region here. */
			mlog(ML_ERROR, "Node %d on device %s has a dead count "
			     "of %u ms, but our count is %u ms.\n"
			     "Please double check your configuration values "
			     "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
			     slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
			     dead_ms);
		}
		goto out;
	}

	/* if the list is dead, we're done.. */
	if (list_empty(&slot->ds_live_item))
		goto out;

	/* live nodes only go dead after enough consecutive missed
	 * samples.. reset the missed counter whenever we see
	 * activity */
	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
		mlog(ML_HEARTBEAT, "Node %d left my region\n",
		     slot->ds_node_num);

		/* last off the live_slot generates a callback */
		list_del_init(&slot->ds_live_item);
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);

			changed = 1;
		}

		/* We don't clear this because the node is still
		 * actually writing new blocks. */
		if (!gen_changed)
			slot->ds_changed_samples = 0;
		goto out;
	}
	if (slot->ds_changed_samples) {
		slot->ds_changed_samples = 0;
		slot->ds_equal_samples = 0;
	}
out:
	spin_unlock(&o2hb_live_lock);

	o2hb_run_event_list(&event);

	o2nm_node_put(node);
	return changed;
}

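/*
 * For scale (a sketch, using the defaults in heartbeat.h as of this
 * writing): with O2HB_REGION_TIMEOUT_MS at 2000 and a dead threshold of
 * O2HB_DEFAULT_DEAD_THRESHOLD == 31, dead_ms above works out to 62000 -
 * a node must go roughly a minute of 2-second heartbeats without writing
 * a changed block before it is declared dead.
 */
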
/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it. */
static int o2hb_highest_node(unsigned long *nodes,
			     int numbits)
{
	int highest, node;

	highest = numbits;
	node = -1;
	while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
		if (node >= numbits)
			break;

		highest = node;
	}

	return highest;
}

static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
	int i, ret, highest_node, change = 0;
	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
	struct o2hb_bio_wait_ctxt write_wc;

	ret = o2nm_configured_node_map(configured_nodes,
				       sizeof(configured_nodes));
	if (ret) {
		mlog_errno(ret);
		return ret;
	}

	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
	if (highest_node >= O2NM_MAX_NODES) {
		mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
		return -EINVAL;
	}

	/* No sense in reading the slots of nodes that don't exist
	 * yet.  Of course, if the node definitions have holes in them
	 * then we're reading an empty slot anyway... Consider this
	 * best-effort. */
	ret = o2hb_read_slots(reg, highest_node + 1);
	if (ret < 0) {
		mlog_errno(ret);
		return ret;
	}

	/* With an up to date view of the slots, we can check that no
	 * other node has been improperly configured to heartbeat in
	 * our slot. */
	if (!o2hb_check_last_timestamp(reg))
		mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
		     "in our slot!\n", reg->hr_dev_name);

	/* fill in the proper info for our next heartbeat */
	o2hb_prepare_block(reg, reg->hr_generation);

	/* And fire off the write.  Note that we don't wait on this I/O
	 * until later. */
	ret = o2hb_issue_node_write(reg, &write_wc);
	if (ret < 0) {
		mlog_errno(ret);
		return ret;
	}

	i = -1;
	while((i = find_next_bit(configured_nodes,
				 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {

		change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
	}

	/*
	 * We have to be sure we've advertised ourselves on disk
	 * before we can go to steady state.  This ensures that
	 * people we find in our steady state have seen us.
	 */
	o2hb_wait_on_io(reg, &write_wc);
	if (write_wc.wc_error) {
		/* Do not re-arm the write timeout on I/O error - we
		 * can't be sure that the new block ever made it to
		 * disk */
		mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
		     write_wc.wc_error, reg->hr_dev_name);
		return write_wc.wc_error;
	}

	o2hb_arm_write_timeout(reg);

	/* let the person who launched us know when things are steady */
	if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
		if (atomic_dec_and_test(&reg->hr_steady_iterations))
			wake_up(&o2hb_steady_queue);
	}

	return 0;
}

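/*
 * Timekeeping helpers for the heartbeat loop.  A quick worked example:
 * with start = {5, 900000} and end = {7, 100000}, the subtraction below
 * borrows one second to give {1, 200000}, and o2hb_elapsed_msecs()
 * reports 1200 ms.
 */
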
/* Subtract b from a, storing the result in a.  If a is earlier than b,
 * clamp the result to zero rather than going negative. */
static void o2hb_tv_subtract(struct timeval *a,
			     struct timeval *b)
{
	/* just return 0 when b is after a */
	if (a->tv_sec < b->tv_sec ||
	    (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
		a->tv_sec = 0;
		a->tv_usec = 0;
		return;
	}

	a->tv_sec -= b->tv_sec;
	a->tv_usec -= b->tv_usec;
	while ( a->tv_usec < 0 ) {
		a->tv_sec--;
		a->tv_usec += 1000000;
	}
}

static unsigned int o2hb_elapsed_msecs(struct timeval *start,
				       struct timeval *end)
{
	struct timeval res = *end;

	o2hb_tv_subtract(&res, start);

	return res.tv_sec * 1000 + res.tv_usec / 1000;
}

/*
 * we ride the region ref that the region dir holds.  before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
	int i, ret;
	struct o2hb_region *reg = data;
	struct o2hb_bio_wait_ctxt write_wc;
	struct timeval before_hb, after_hb;
	unsigned int elapsed_msec;

	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

	set_user_nice(current, -20);

	while (!kthread_should_stop() && !reg->hr_unclean_stop) {
		/* We track the time spent inside
		 * o2hb_do_disk_heartbeat so that we avoid more than
		 * hr_timeout_ms between disk writes.  On busy systems
		 * this should result in a heartbeat which is less
		 * likely to time itself out. */
		do_gettimeofday(&before_hb);

		i = 0;
		do {
			ret = o2hb_do_disk_heartbeat(reg);
		} while (ret && ++i < 2);

		do_gettimeofday(&after_hb);
		elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);

		mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
		     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
		     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
		     elapsed_msec);

		if (elapsed_msec < reg->hr_timeout_ms) {
			/* the kthread api has blocked signals for us so no
			 * need to record the return value. */
			msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
		}
	}

	o2hb_disarm_write_timeout(reg);

	/* unclean stop is only used in very bad situations */
	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
		o2hb_shutdown_slot(&reg->hr_slots[i]);

	/* Explicit down notification - avoid forcing the other nodes
	 * to timeout on this region when we could just as easily
	 * write a clear generation - thus indicating to them that
	 * this node has left this region.
	 *
	 * XXX: Should we skip this on unclean_stop? */
	o2hb_prepare_block(reg, 0);
	ret = o2hb_issue_node_write(reg, &write_wc);
	if (ret == 0) {
		o2hb_wait_on_io(reg, &write_wc);
	} else {
		mlog_errno(ret);
	}

	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");

	return 0;
}

void o2hb_init(void)
{
	int i;

	for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
		INIT_LIST_HEAD(&o2hb_callbacks[i].list);

	for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
		INIT_LIST_HEAD(&o2hb_live_slots[i]);

	INIT_LIST_HEAD(&o2hb_node_events);

	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
}

/* if we're already in a callback then we're already serialized by the sem */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
					     unsigned bytes)
{
	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));

	memcpy(map, &o2hb_live_node_bitmap, bytes);
}

/*
 * get a map of all nodes that are heartbeating in any regions
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
	/* callers want to serialize this map and callbacks so that they
	 * can trust that they don't miss nodes coming to the party */
	down_read(&o2hb_callback_sem);
	spin_lock(&o2hb_live_lock);
	o2hb_fill_node_map_from_callback(map, bytes);
	spin_unlock(&o2hb_live_lock);
	up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);

/*
 * heartbeat configfs bits.  The heartbeat set is a default set under
 * the cluster set in nodemanager.c.
 */

static struct o2hb_region *to_o2hb_region(struct config_item *item)
{
	return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
}

/* drop_item only drops its ref after killing the thread, nothing should
 * be using the region anymore.  this has to clean up any state that
 * attributes might have built up. */
static void o2hb_region_release(struct config_item *item)
{
	int i;
	struct page *page;
	struct o2hb_region *reg = to_o2hb_region(item);

	if (reg->hr_tmp_block)
		kfree(reg->hr_tmp_block);

	if (reg->hr_slot_data) {
		for (i = 0; i < reg->hr_num_pages; i++) {
			page = reg->hr_slot_data[i];
			if (page)
				__free_page(page);
		}
		kfree(reg->hr_slot_data);
	}

	if (reg->hr_bdev)
		blkdev_put(reg->hr_bdev);

	if (reg->hr_slots)
		kfree(reg->hr_slots);

	spin_lock(&o2hb_live_lock);
	list_del(&reg->hr_all_item);
	spin_unlock(&o2hb_live_lock);

	kfree(reg);
}

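/*
 * The attribute handlers below back the configfs files that a userspace
 * cluster tool (o2cb in practice) uses to bring a region up.  As a
 * sketch of the expected sequence, assuming the usual configfs mount at
 * /sys/kernel/config and a cluster named "mycluster": the tool creates
 * cluster/mycluster/heartbeat/<region>/, writes block_bytes, start_block
 * and blocks, and finally writes the number of a block device file
 * descriptor it holds open into 'dev'.  The 'dev' write validates the
 * earlier values and starts the heartbeat thread, so it must come last.
 */
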
static int o2hb_read_block_input(struct o2hb_region *reg,
				 const char *page,
				 size_t count,
				 unsigned long *ret_bytes,
				 unsigned int *ret_bits)
{
	unsigned long bytes;
	char *p = (char *)page;

	bytes = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	/* Heartbeat and fs min / max block sizes are the same. */
	if (bytes > 4096 || bytes < 512)
		return -ERANGE;
	if (hweight16(bytes) != 1)
		return -EINVAL;

	if (ret_bytes)
		*ret_bytes = bytes;
	if (ret_bits)
		*ret_bits = ffs(bytes) - 1;

	return 0;
}

static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
					    char *page)
{
	return sprintf(page, "%u\n", reg->hr_block_bytes);
}

static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
					     const char *page,
					     size_t count)
{
	int status;
	unsigned long block_bytes;
	unsigned int block_bits;

	if (reg->hr_bdev)
		return -EINVAL;

	status = o2hb_read_block_input(reg, page, count,
				       &block_bytes, &block_bits);
	if (status)
		return status;

	reg->hr_block_bytes = (unsigned int)block_bytes;
	reg->hr_block_bits = block_bits;

	return count;
}

static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
					    char *page)
{
	return sprintf(page, "%llu\n", reg->hr_start_block);
}

static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
					     const char *page,
					     size_t count)
{
	unsigned long long tmp;
	char *p = (char *)page;

	if (reg->hr_bdev)
		return -EINVAL;

	tmp = simple_strtoull(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	reg->hr_start_block = tmp;

	return count;
}

static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
				       char *page)
{
	return sprintf(page, "%u\n", reg->hr_blocks);
}

static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
					const char *page,
					size_t count)
{
	unsigned long tmp;
	char *p = (char *)page;

	if (reg->hr_bdev)
		return -EINVAL;

	tmp = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	if (tmp > O2NM_MAX_NODES || tmp == 0)
		return -ERANGE;

	reg->hr_blocks = (unsigned int)tmp;

	return count;
}

static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
				    char *page)
{
	unsigned int ret = 0;

	if (reg->hr_bdev)
		ret = sprintf(page, "%s\n", reg->hr_dev_name);

	return ret;
}

static void o2hb_init_region_params(struct o2hb_region *reg)
{
	reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
	reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;

	mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
	     reg->hr_start_block, reg->hr_blocks);
	mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
	     reg->hr_block_bytes, reg->hr_block_bits);
	mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
	mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
}

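/*
 * Worked example for the allocation below (a sketch, assuming 4k pages
 * and 512 byte blocks): a 255 block region with 8 slots per page needs
 * hr_num_pages = (255 + 8 - 1) / 8 == 32 pages, the last of which only
 * backs the final 7 slots; the (j + last_slot) < hr_blocks guard stops
 * the per-page loop there.
 */
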
static int o2hb_map_slot_data(struct o2hb_region *reg)
{
	int i, j;
	unsigned int last_slot;
	unsigned int spp = reg->hr_slots_per_page;
	struct page *page;
	char *raw;
	struct o2hb_disk_slot *slot;

	reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
	if (reg->hr_tmp_block == NULL) {
		mlog_errno(-ENOMEM);
		return -ENOMEM;
	}

	reg->hr_slots = kcalloc(reg->hr_blocks,
				sizeof(struct o2hb_disk_slot), GFP_KERNEL);
	if (reg->hr_slots == NULL) {
		mlog_errno(-ENOMEM);
		return -ENOMEM;
	}

	for(i = 0; i < reg->hr_blocks; i++) {
		slot = &reg->hr_slots[i];
		slot->ds_node_num = i;
		INIT_LIST_HEAD(&slot->ds_live_item);
		slot->ds_raw_block = NULL;
	}

	reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
	mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
			   "at %u blocks per page\n",
	     reg->hr_num_pages, reg->hr_blocks, spp);

	reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
				    GFP_KERNEL);
	if (!reg->hr_slot_data) {
		mlog_errno(-ENOMEM);
		return -ENOMEM;
	}

	for(i = 0; i < reg->hr_num_pages; i++) {
		page = alloc_page(GFP_KERNEL);
		if (!page) {
			mlog_errno(-ENOMEM);
			return -ENOMEM;
		}

		reg->hr_slot_data[i] = page;

		/* index of the first slot backed by this page */
		last_slot = i * spp;
		raw = page_address(page);
		for (j = 0;
		     (j < spp) && ((j + last_slot) < reg->hr_blocks);
		     j++) {
			BUG_ON((j + last_slot) >= reg->hr_blocks);

			slot = &reg->hr_slots[j + last_slot];
			slot->ds_raw_block =
				(struct o2hb_disk_heartbeat_block *) raw;

			raw += reg->hr_block_bytes;
		}
	}

	return 0;
}

/* Read in all the slots available and populate the tracking
 * structures so that we can start with a baseline idea of what's
 * there. */
static int o2hb_populate_slot_data(struct o2hb_region *reg)
{
	int ret, i;
	struct o2hb_disk_slot *slot;
	struct o2hb_disk_heartbeat_block *hb_block;

	mlog_entry_void();

	ret = o2hb_read_slots(reg, reg->hr_blocks);
	if (ret) {
		mlog_errno(ret);
		goto out;
	}

	/* We only want to get an idea of the values initially in each
	 * slot, so we do no verification - o2hb_check_slot will
	 * actually determine if each configured slot is valid and
	 * whether any values have changed. */
	for(i = 0; i < reg->hr_blocks; i++) {
		slot = &reg->hr_slots[i];
		hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;

		/* Only fill the values that o2hb_check_slot uses to
		 * determine changing slots */
		slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
		slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
	}

out:
	mlog_exit(ret);
	return ret;
}

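/*
 * Bringing a region online happens in one long attribute write,
 * summarizing the function below: parse the caller's block device fd,
 * validate that the earlier attribute writes and the device sector size
 * agree, pick a random non-zero generation, map and pre-read the slot
 * data, then start the heartbeat thread and sleep until it reports a
 * steady state (or fail and undo).
 */
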
/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
				     const char *page,
				     size_t count)
{
	struct task_struct *hb_task;
	long fd;
	int sectsize;
	char *p = (char *)page;
	struct file *filp = NULL;
	struct inode *inode = NULL;
	ssize_t ret = -EINVAL;

	if (reg->hr_bdev)
		goto out;

	/* We can't heartbeat without having had our node number
	 * configured yet. */
	if (o2nm_this_node() == O2NM_MAX_NODES)
		goto out;

	fd = simple_strtol(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		goto out;

	if (fd < 0 || fd >= INT_MAX)
		goto out;

	filp = fget(fd);
	if (filp == NULL)
		goto out;

	if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
	    reg->hr_block_bytes == 0)
		goto out;

	inode = igrab(filp->f_mapping->host);
	if (inode == NULL)
		goto out;

	if (!S_ISBLK(inode->i_mode))
		goto out;

	reg->hr_bdev = I_BDEV(filp->f_mapping->host);
	ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
	if (ret) {
		reg->hr_bdev = NULL;
		goto out;
	}
	inode = NULL;

	bdevname(reg->hr_bdev, reg->hr_dev_name);

	sectsize = bdev_hardsect_size(reg->hr_bdev);
	if (sectsize != reg->hr_block_bytes) {
		mlog(ML_ERROR,
		     "blocksize %u incorrect for device, expected %d\n",
		     reg->hr_block_bytes, sectsize);
		ret = -EINVAL;
		goto out;
	}

	o2hb_init_region_params(reg);

	/* Generation of zero is invalid */
	do {
		get_random_bytes(&reg->hr_generation,
				 sizeof(reg->hr_generation));
	} while (reg->hr_generation == 0);

	ret = o2hb_map_slot_data(reg);
	if (ret) {
		mlog_errno(ret);
		goto out;
	}

	ret = o2hb_populate_slot_data(reg);
	if (ret) {
		mlog_errno(ret);
		goto out;
	}

	INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);

	/*
	 * A node is considered live after it has beat LIVE_THRESHOLD
	 * times.  We're not steady until we've given them a chance
	 * _after_ our first read.
	 */
	atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);

	hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
			      reg->hr_item.ci_name);
	if (IS_ERR(hb_task)) {
		ret = PTR_ERR(hb_task);
		mlog_errno(ret);
		goto out;
	}

	spin_lock(&o2hb_live_lock);
	reg->hr_task = hb_task;
	spin_unlock(&o2hb_live_lock);

	ret = wait_event_interruptible(o2hb_steady_queue,
				atomic_read(&reg->hr_steady_iterations) == 0);
	if (ret) {
		spin_lock(&o2hb_live_lock);
		hb_task = reg->hr_task;
		reg->hr_task = NULL;
		spin_unlock(&o2hb_live_lock);

		if (hb_task)
			kthread_stop(hb_task);
		goto out;
	}

	ret = count;
out:
	if (filp)
		fput(filp);
	if (inode)
		iput(inode);
	if (ret < 0) {
		if (reg->hr_bdev) {
			blkdev_put(reg->hr_bdev);
			reg->hr_bdev = NULL;
		}
	}
	return ret;
}

static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
				    char *page)
{
	pid_t pid = 0;

	spin_lock(&o2hb_live_lock);
	if (reg->hr_task)
		pid = reg->hr_task->pid;
	spin_unlock(&o2hb_live_lock);

	if (!pid)
		return 0;

	return sprintf(page, "%u\n", pid);
}

struct o2hb_region_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct o2hb_region *, char *);
	ssize_t (*store)(struct o2hb_region *, const char *, size_t);
};

static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "block_bytes",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_block_bytes_read,
	.store	= o2hb_region_block_bytes_write,
};

static struct o2hb_region_attribute o2hb_region_attr_start_block = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "start_block",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_start_block_read,
	.store	= o2hb_region_start_block_write,
};

static struct o2hb_region_attribute o2hb_region_attr_blocks = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "blocks",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_blocks_read,
	.store	= o2hb_region_blocks_write,
};

static struct o2hb_region_attribute o2hb_region_attr_dev = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "dev",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_dev_read,
	.store	= o2hb_region_dev_write,
};

static struct o2hb_region_attribute o2hb_region_attr_pid = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "pid",
		    .ca_mode = S_IRUGO | S_IRUSR },
	.show	= o2hb_region_pid_read,
};

static struct configfs_attribute *o2hb_region_attrs[] = {
	&o2hb_region_attr_block_bytes.attr,
	&o2hb_region_attr_start_block.attr,
	&o2hb_region_attr_blocks.attr,
	&o2hb_region_attr_dev.attr,
	&o2hb_region_attr_pid.attr,
	NULL,
};

static ssize_t o2hb_region_show(struct config_item *item,
				struct configfs_attribute *attr,
				char *page)
{
	struct o2hb_region *reg = to_o2hb_region(item);
	struct o2hb_region_attribute *o2hb_region_attr =
		container_of(attr, struct o2hb_region_attribute, attr);
	ssize_t ret = 0;

	if (o2hb_region_attr->show)
		ret = o2hb_region_attr->show(reg, page);
	return ret;
}

static ssize_t o2hb_region_store(struct config_item *item,
				 struct configfs_attribute *attr,
				 const char *page, size_t count)
{
	struct o2hb_region *reg = to_o2hb_region(item);
	struct o2hb_region_attribute *o2hb_region_attr =
		container_of(attr, struct o2hb_region_attribute, attr);
	ssize_t ret = -EINVAL;

	if (o2hb_region_attr->store)
		ret = o2hb_region_attr->store(reg, page, count);
	return ret;
}

static struct configfs_item_operations o2hb_region_item_ops = {
	.release		= o2hb_region_release,
	.show_attribute		= o2hb_region_show,
	.store_attribute	= o2hb_region_store,
};

static struct config_item_type o2hb_region_type = {
	.ct_item_ops	= &o2hb_region_item_ops,
	.ct_attrs	= o2hb_region_attrs,
	.ct_owner	= THIS_MODULE,
};

/* heartbeat set */

struct o2hb_heartbeat_group {
	struct config_group hs_group;
	/* some stuff? */
};

static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
{
	return group ?
		container_of(group, struct o2hb_heartbeat_group, hs_group)
		: NULL;
}

static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
							  const char *name)
{
	struct o2hb_region *reg = NULL;
	struct config_item *ret = NULL;

	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
	if (reg == NULL)
		goto out; /* ENOMEM */

	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);

	ret = &reg->hr_item;

	spin_lock(&o2hb_live_lock);
	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
	spin_unlock(&o2hb_live_lock);
out:
	if (ret == NULL)
		kfree(reg);

	return ret;
}

static void o2hb_heartbeat_group_drop_item(struct config_group *group,
					   struct config_item *item)
{
	struct task_struct *hb_task;
	struct o2hb_region *reg = to_o2hb_region(item);

	/* stop the thread when the user removes the region dir */
	spin_lock(&o2hb_live_lock);
	hb_task = reg->hr_task;
	reg->hr_task = NULL;
	spin_unlock(&o2hb_live_lock);

	if (hb_task)
		kthread_stop(hb_task);

	config_item_put(item);
}

struct o2hb_heartbeat_group_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
	ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
};

static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
					 struct configfs_attribute *attr,
					 char *page)
{
	struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
	struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
		container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
	ssize_t ret = 0;

	if (o2hb_heartbeat_group_attr->show)
		ret = o2hb_heartbeat_group_attr->show(reg, page);
	return ret;
}

static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
					  struct configfs_attribute *attr,
					  const char *page, size_t count)
{
	struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
	struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
		container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
	ssize_t ret = -EINVAL;

	if (o2hb_heartbeat_group_attr->store)
		ret = o2hb_heartbeat_group_attr->store(reg, page, count);
	return ret;
}

static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
						   char *page)
{
	return sprintf(page, "%u\n", o2hb_dead_threshold);
}

static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
						    const char *page,
						    size_t count)
{
	unsigned long tmp;
	char *p = (char *)page;

	tmp = simple_strtoul(p, &p, 10);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	/* this will validate ranges for us. */
	o2hb_dead_threshold_set((unsigned int) tmp);

	return count;
}

static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "dead_threshold",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_heartbeat_group_threshold_show,
	.store	= o2hb_heartbeat_group_threshold_store,
};

static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
	&o2hb_heartbeat_group_attr_threshold.attr,
	NULL,
};

static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
	.show_attribute		= o2hb_heartbeat_group_show,
	.store_attribute	= o2hb_heartbeat_group_store,
};

static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
	.make_item	= o2hb_heartbeat_group_make_item,
	.drop_item	= o2hb_heartbeat_group_drop_item,
};

static struct config_item_type o2hb_heartbeat_group_type = {
	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
	.ct_item_ops	= &o2hb_heartbeat_group_item_ops,
	.ct_attrs	= o2hb_heartbeat_group_attrs,
	.ct_owner	= THIS_MODULE,
};

/* this is just here to avoid touching group in heartbeat.h which the
 * entire damn world #includes */
struct config_group *o2hb_alloc_hb_set(void)
{
	struct o2hb_heartbeat_group *hs = NULL;
	struct config_group *ret = NULL;

	hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
	if (hs == NULL)
		goto out;

	config_group_init_type_name(&hs->hs_group, "heartbeat",
				    &o2hb_heartbeat_group_type);

	ret = &hs->hs_group;
out:
	if (ret == NULL)
		kfree(hs);
	return ret;
}

void o2hb_free_hb_set(struct config_group *group)
{
	struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
	kfree(hs);
}

/* hb callback registration and issuing */

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
{
	if (type == O2HB_NUM_CB)
		return ERR_PTR(-EINVAL);

	return &o2hb_callbacks[type];
}

void o2hb_setup_callback(struct o2hb_callback_func *hc,
			 enum o2hb_callback_type type,
			 o2hb_cb_func *func,
			 void *data,
			 int priority)
{
	INIT_LIST_HEAD(&hc->hc_item);
	hc->hc_func = func;
	hc->hc_data = data;
	hc->hc_priority = priority;
	hc->hc_type = type;
	hc->hc_magic = O2HB_CB_MAGIC;
}
EXPORT_SYMBOL_GPL(o2hb_setup_callback);

int o2hb_register_callback(struct o2hb_callback_func *hc)
{
	struct o2hb_callback_func *tmp;
	struct list_head *iter;
	struct o2hb_callback *hbcall;
	int ret;

	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
	BUG_ON(!list_empty(&hc->hc_item));

	hbcall = hbcall_from_type(hc->hc_type);
	if (IS_ERR(hbcall)) {
		ret = PTR_ERR(hbcall);
		goto out;
	}

	down_write(&o2hb_callback_sem);

	/* callbacks are kept sorted by ascending priority; insert
	 * before the first entry with a higher priority */
	list_for_each(iter, &hbcall->list) {
		tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
		if (hc->hc_priority < tmp->hc_priority) {
			list_add_tail(&hc->hc_item, iter);
			break;
		}
	}
	if (list_empty(&hc->hc_item))
		list_add_tail(&hc->hc_item, &hbcall->list);

	up_write(&o2hb_callback_sem);
	ret = 0;
out:
	mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
	     ret, __builtin_return_address(0), hc);
	return ret;
}
EXPORT_SYMBOL_GPL(o2hb_register_callback);

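/*
 * Example usage (a sketch; my_node_down and my_private are hypothetical
 * caller-side names, but the pattern mirrors how o2net consumes these
 * hooks):
 *
 *	static struct o2hb_callback_func my_hb_down;
 *
 *	o2hb_setup_callback(&my_hb_down, O2HB_NODE_DOWN_CB,
 *			    my_node_down, my_private, 0);
 *	if (o2hb_register_callback(&my_hb_down))
 *		...bail...
 *	...
 *	o2hb_unregister_callback(&my_hb_down);
 */
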
void o2hb_unregister_callback(struct o2hb_callback_func *hc)
{
	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);

	mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
	     __builtin_return_address(0), hc);

	if (list_empty(&hc->hc_item))
		return;

	down_write(&o2hb_callback_sem);

	list_del_init(&hc->hc_item);

	up_write(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_unregister_callback);

int o2hb_check_node_heartbeating(u8 node_num)
{
	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

	o2hb_fill_node_map(testing_map, sizeof(testing_map));
	if (!test_bit(node_num, testing_map)) {
		mlog(ML_HEARTBEAT,
		     "node (%u) does not have heartbeating enabled.\n",
		     node_num);
		return 0;
	}

	return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);

int o2hb_check_node_heartbeating_from_callback(u8 node_num)
{
	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
	if (!test_bit(node_num, testing_map)) {
		mlog(ML_HEARTBEAT,
		     "node (%u) does not have heartbeating enabled.\n",
		     node_num);
		return 0;
	}

	return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);

/* Makes sure our local node is configured with a node number, and is
 * heartbeating. */
int o2hb_check_local_node_heartbeating(void)
{
	u8 node_num;

	/* if this node was set then we have networking */
	node_num = o2nm_this_node();
	if (node_num == O2NM_MAX_NODES) {
		mlog(ML_HEARTBEAT, "this node has not been configured.\n");
		return 0;
	}

	return o2hb_check_node_heartbeating(node_num);
}
EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);

/*
 * this is just a hack until we get the plumbing which flips file systems
 * read only and drops the hb ref instead of killing the node dead.
 */
void o2hb_stop_all_regions(void)
{
	struct o2hb_region *reg;

	mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");

	spin_lock(&o2hb_live_lock);

	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
		reg->hr_unclean_stop = 1;

	spin_unlock(&o2hb_live_lock);
}
EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);