/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"


/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

static LIST_HEAD(o2hb_all_regions);

static struct o2hb_callback {
        struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS 9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
                spin_lock(&o2hb_live_lock);
                if (list_empty(&o2hb_all_regions))
                        o2hb_dead_threshold = threshold;
                spin_unlock(&o2hb_live_lock);
        }
}

struct o2hb_node_event {
        struct list_head        hn_item;
        enum o2hb_callback_type hn_event_type;
        struct o2nm_node        *hn_node;
        int                     hn_node_num;
};

struct o2hb_disk_slot {
        struct o2hb_disk_heartbeat_block *ds_raw_block;
        u8                      ds_node_num;
        u64                     ds_last_time;
        u64                     ds_last_generation;
        u16                     ds_equal_samples;
        u16                     ds_changed_samples;
        struct list_head        ds_live_item;
};
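
/*
 * Rough semantics of the per-slot counters (see o2hb_check_slot()):
 * ds_changed_samples counts reads on which the slot's hb_seq advanced,
 * ds_equal_samples counts reads on which it stood still.  A dead slot
 * comes to life after O2HB_LIVE_THRESHOLD changed samples; a live slot
 * is declared down after o2hb_dead_threshold equal samples or on a
 * generation change.
 */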

/* each thread owns a region; when we're asked to tear down the region
 * we ask the thread to stop, and it cleans the region up */
struct o2hb_region {
        struct config_item      hr_item;

        struct list_head        hr_all_item;
        unsigned                hr_unclean_stop:1;

        /* protected by the hr_callback_sem */
        struct task_struct      *hr_task;

        unsigned int            hr_blocks;
        unsigned long long      hr_start_block;

        unsigned int            hr_block_bits;
        unsigned int            hr_block_bytes;

        unsigned int            hr_slots_per_page;
        unsigned int            hr_num_pages;

        struct page             **hr_slot_data;
        struct block_device     *hr_bdev;
        struct o2hb_disk_slot   *hr_slots;

        /* let whoever is setting up hb wait until it has reached a
         * 'steady' state before returning.  This will be fixed when we
         * have a more complete api that doesn't lead to this sort of
         * fragility. */
        atomic_t                hr_steady_iterations;

        char                    hr_dev_name[BDEVNAME_SIZE];

        unsigned int            hr_timeout_ms;

        /* randomized each time the region comes up so that other nodes
         * can recognize this node going down and back up within a
         * single one of their iterations */
        u64                     hr_generation;

        struct delayed_work     hr_write_timeout_work;
        unsigned long           hr_last_timeout_start;

        /* Used during o2hb_check_slot to hold a copy of the block
         * being checked because we temporarily have to zero out the
         * crc field. */
        struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

struct o2hb_bio_wait_ctxt {
        atomic_t          wc_num_reqs;
        struct completion wc_io_complete;
        int               wc_error;
};

static void o2hb_write_timeout(struct work_struct *work)
{
        struct o2hb_region *reg =
                container_of(work, struct o2hb_region,
                             hr_write_timeout_work.work);

        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
             "milliseconds\n", reg->hr_dev_name,
             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
        o2quo_disk_timeout();
}

static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
        mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

        cancel_delayed_work(&reg->hr_write_timeout_work);
        reg->hr_last_timeout_start = jiffies;
        schedule_delayed_work(&reg->hr_write_timeout_work,
                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
        cancel_delayed_work(&reg->hr_write_timeout_work);
        flush_scheduled_work();
}

static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
{
        atomic_set(&wc->wc_num_reqs, 1);
        init_completion(&wc->wc_io_complete);
        wc->wc_error = 0;
}
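
/*
 * wc_num_reqs is biased by one at init time; the submitter drops that
 * bias in o2hb_wait_on_io() once all bios have been issued, so the
 * completion can't fire while requests are still being submitted.
 */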

/* Used in error paths too */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
                                     unsigned int num)
{
        /* sadly atomic_sub_and_test() isn't available on all platforms.  The
         * good news is that the fast path only completes one at a time */
        while (num--) {
                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
                        BUG_ON(num > 0);
                        complete(&wc->wc_io_complete);
                }
        }
}

static void o2hb_wait_on_io(struct o2hb_region *reg,
                            struct o2hb_bio_wait_ctxt *wc)
{
        struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;

        blk_run_address_space(mapping);
        o2hb_bio_wait_dec(wc, 1);

        wait_for_completion(&wc->wc_io_complete);
}

static void o2hb_bio_end_io(struct bio *bio,
                            int error)
{
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

        if (error) {
                mlog(ML_ERROR, "IO Error %d\n", error);
                wc->wc_error = error;
        }

        o2hb_bio_wait_dec(wc, 1);
        bio_put(bio);
}

/* Set up a bio to cover I/O against the slots from *current_slot up to
 * max_slots. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
                                      struct o2hb_bio_wait_ctxt *wc,
                                      unsigned int *current_slot,
                                      unsigned int max_slots)
{
        int len, current_page;
        unsigned int vec_len, vec_start;
        unsigned int bits = reg->hr_block_bits;
        unsigned int spp = reg->hr_slots_per_page;
        unsigned int cs = *current_slot;
        struct bio *bio;
        struct page *page;

        /* Testing has shown this allocation to take long enough under
         * GFP_KERNEL that the local node can get fenced.  It would be
         * nicest if we could pre-allocate these bios and avoid this
         * altogether. */
        bio = bio_alloc(GFP_ATOMIC, 16);
        if (!bio) {
                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
                bio = ERR_PTR(-ENOMEM);
                goto bail;
        }

        /* Must put everything in 512 byte sectors for the bio... */
        bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
        bio->bi_bdev = reg->hr_bdev;
        bio->bi_private = wc;
        bio->bi_end_io = o2hb_bio_end_io;

        vec_start = (cs << bits) % PAGE_CACHE_SIZE;
        while (cs < max_slots) {
                current_page = cs / spp;
                page = reg->hr_slot_data[current_page];

                vec_len = min(PAGE_CACHE_SIZE - vec_start,
                              (max_slots - cs) * (PAGE_CACHE_SIZE / spp));

                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
                     current_page, vec_len, vec_start);

                len = bio_add_page(bio, page, vec_len, vec_start);
                if (len != vec_len)
                        break;

                cs += vec_len / (PAGE_CACHE_SIZE / spp);
                vec_start = 0;
        }

bail:
        *current_slot = cs;
        return bio;
}

static int o2hb_read_slots(struct o2hb_region *reg,
                           unsigned int max_slots)
{
        unsigned int current_slot = 0;
        int status;
        struct o2hb_bio_wait_ctxt wc;
        struct bio *bio;

        o2hb_bio_wait_init(&wc);

        while (current_slot < max_slots) {
                bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
                if (IS_ERR(bio)) {
                        status = PTR_ERR(bio);
                        mlog_errno(status);
                        goto bail_and_wait;
                }

                atomic_inc(&wc.wc_num_reqs);
                submit_bio(READ, bio);
        }

        status = 0;

bail_and_wait:
        o2hb_wait_on_io(reg, &wc);
        if (wc.wc_error && !status)
                status = wc.wc_error;

        return status;
}
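
/*
 * Note the asymmetry between the I/O paths: reads cover every slot up
 * to the caller's max_slots, while the write below only ever touches
 * this node's own slot.
 */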

static int o2hb_issue_node_write(struct o2hb_region *reg,
                                 struct o2hb_bio_wait_ctxt *write_wc)
{
        int status;
        unsigned int slot;
        struct bio *bio;

        o2hb_bio_wait_init(write_wc);

        slot = o2nm_this_node();

        bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot + 1);
        if (IS_ERR(bio)) {
                status = PTR_ERR(bio);
                mlog_errno(status);
                goto bail;
        }

        atomic_inc(&write_wc->wc_num_reqs);
        submit_bio(WRITE, bio);

        status = 0;
bail:
        return status;
}

static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
                                     struct o2hb_disk_heartbeat_block *hb_block)
{
        __le32 old_cksum;
        u32 ret;

        /* We want to compute the block crc with a 0 value in the
         * hb_cksum field.  Save it off here and replace after the
         * crc. */
        old_cksum = hb_block->hb_cksum;
        hb_block->hb_cksum = 0;

        ret = crc32_le(0, (unsigned char *)hb_block, reg->hr_block_bytes);

        hb_block->hb_cksum = old_cksum;

        return ret;
}

static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
{
        mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
             "cksum = 0x%x, generation 0x%llx\n",
             (long long)le64_to_cpu(hb_block->hb_seq),
             hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
             (long long)le64_to_cpu(hb_block->hb_generation));
}

static int o2hb_verify_crc(struct o2hb_region *reg,
                           struct o2hb_disk_heartbeat_block *hb_block)
{
        u32 read, computed;

        read = le32_to_cpu(hb_block->hb_cksum);
        computed = o2hb_compute_block_crc_le(reg, hb_block);

        return read == computed;
}

/* We want to make sure that nobody is heartbeating on top of us --
 * this will help detect an invalid configuration. */
static int o2hb_check_last_timestamp(struct o2hb_region *reg)
{
        int node_num, ret;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();

        ret = 1;
        slot = &reg->hr_slots[node_num];
        /* Don't check on our 1st timestamp */
        if (slot->ds_last_time) {
                hb_block = slot->ds_raw_block;

                if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
                        ret = 0;
        }

        return ret;
}
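
/*
 * Each pass stamps our slot with the current time in seconds (forced
 * nonzero so a cleared block can't masquerade as a heartbeat), our
 * node number, the region generation and our notion of the dead time
 * in ms.  The checksum must be filled in last because it covers the
 * whole block with hb_cksum itself zeroed.
 */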

static inline void o2hb_prepare_block(struct o2hb_region *reg,
                                      u64 generation)
{
        int node_num;
        u64 cputime;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();
        slot = &reg->hr_slots[node_num];

        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
        memset(hb_block, 0, reg->hr_block_bytes);
        /* TODO: time stuff */
        cputime = CURRENT_TIME.tv_sec;
        if (!cputime)
                cputime = 1;

        hb_block->hb_seq = cpu_to_le64(cputime);
        hb_block->hb_node = node_num;
        hb_block->hb_generation = cpu_to_le64(generation);
        hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);

        /* This step must always happen last! */
        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
                                                                   hb_block));

        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
             (long long)generation,
             le32_to_cpu(hb_block->hb_cksum));
}

static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
                                struct o2nm_node *node,
                                int idx)
{
        struct list_head *iter;
        struct o2hb_callback_func *f;

        list_for_each(iter, &hbcall->list) {
                f = list_entry(iter, struct o2hb_callback_func, hc_item);
                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
                (f->hc_func)(node, idx, f->hc_data);
        }
}

/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
        int empty;
        struct o2hb_callback *hbcall;
        struct o2hb_node_event *event;

        spin_lock(&o2hb_live_lock);
        empty = list_empty(&queued_event->hn_item);
        spin_unlock(&o2hb_live_lock);
        if (empty)
                return;

        /* Holding callback sem assures we don't alter the callback
         * lists when doing this, and serializes ourselves with other
         * processes wanting callbacks. */
        down_write(&o2hb_callback_sem);

        spin_lock(&o2hb_live_lock);
        while (!list_empty(&o2hb_node_events)
               && !list_empty(&queued_event->hn_item)) {
                event = list_entry(o2hb_node_events.next,
                                   struct o2hb_node_event,
                                   hn_item);
                list_del_init(&event->hn_item);
                spin_unlock(&o2hb_live_lock);

                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
                     event->hn_node_num);

                hbcall = hbcall_from_type(event->hn_event_type);

                /* We should *never* have gotten on to the list with a
                 * bad type...  This isn't something that we should try
                 * to recover from. */
                BUG_ON(IS_ERR(hbcall));

                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

                spin_lock(&o2hb_live_lock);
        }
        spin_unlock(&o2hb_live_lock);

        up_write(&o2hb_callback_sem);
}
"UP" : "DOWN", node_num); 514 515 list_add_tail(&event->hn_item, &o2hb_node_events); 516 } 517 518 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) 519 { 520 struct o2hb_node_event event = 521 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 522 struct o2nm_node *node; 523 524 node = o2nm_get_node_by_num(slot->ds_node_num); 525 if (!node) 526 return; 527 528 spin_lock(&o2hb_live_lock); 529 if (!list_empty(&slot->ds_live_item)) { 530 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n", 531 slot->ds_node_num); 532 533 list_del_init(&slot->ds_live_item); 534 535 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 536 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 537 538 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 539 slot->ds_node_num); 540 } 541 } 542 spin_unlock(&o2hb_live_lock); 543 544 o2hb_run_event_list(&event); 545 546 o2nm_node_put(node); 547 } 548 549 static int o2hb_check_slot(struct o2hb_region *reg, 550 struct o2hb_disk_slot *slot) 551 { 552 int changed = 0, gen_changed = 0; 553 struct o2hb_node_event event = 554 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 555 struct o2nm_node *node; 556 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block; 557 u64 cputime; 558 unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; 559 unsigned int slot_dead_ms; 560 561 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); 562 563 /* Is this correct? Do we assume that the node doesn't exist 564 * if we're not configured for him? */ 565 node = o2nm_get_node_by_num(slot->ds_node_num); 566 if (!node) 567 return 0; 568 569 if (!o2hb_verify_crc(reg, hb_block)) { 570 /* all paths from here will drop o2hb_live_lock for 571 * us. */ 572 spin_lock(&o2hb_live_lock); 573 574 /* Don't print an error on the console in this case - 575 * a freshly formatted heartbeat area will not have a 576 * crc set on it. */ 577 if (list_empty(&slot->ds_live_item)) 578 goto out; 579 580 /* The node is live but pushed out a bad crc. We 581 * consider it a transient miss but don't populate any 582 * other values as they may be junk. */ 583 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", 584 slot->ds_node_num, reg->hr_dev_name); 585 o2hb_dump_slot(hb_block); 586 587 slot->ds_equal_samples++; 588 goto fire_callbacks; 589 } 590 591 /* we don't care if these wrap.. the state transitions below 592 * clear at the right places */ 593 cputime = le64_to_cpu(hb_block->hb_seq); 594 if (slot->ds_last_time != cputime) 595 slot->ds_changed_samples++; 596 else 597 slot->ds_equal_samples++; 598 slot->ds_last_time = cputime; 599 600 /* The node changed heartbeat generations. We assume this to 601 * mean it dropped off but came back before we timed out. We 602 * want to consider it down for the time being but don't want 603 * to lose any changed_samples state we might build up to 604 * considering it live again. 

static int o2hb_check_slot(struct o2hb_region *reg,
                           struct o2hb_disk_slot *slot)
{
        int changed = 0, gen_changed = 0;
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;
        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
        unsigned int slot_dead_ms;

        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

        /* Is this correct?  Do we assume that the node doesn't exist
         * if we're not configured for it? */
        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return 0;

        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
                 * us. */
                spin_lock(&o2hb_live_lock);

                /* Don't print an error on the console in this case -
                 * a freshly formatted heartbeat area will not have a
                 * crc set on it. */
                if (list_empty(&slot->ds_live_item))
                        goto out;

                /* The node is live but pushed out a bad crc.  We
                 * consider it a transient miss but don't populate any
                 * other values as they may be junk. */
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
                     slot->ds_node_num, reg->hr_dev_name);
                o2hb_dump_slot(hb_block);

                slot->ds_equal_samples++;
                goto fire_callbacks;
        }

        /* we don't care if these wrap.. the state transitions below
         * clear at the right places */
        cputime = le64_to_cpu(hb_block->hb_seq);
        if (slot->ds_last_time != cputime)
                slot->ds_changed_samples++;
        else
                slot->ds_equal_samples++;
        slot->ds_last_time = cputime;

        /* The node changed heartbeat generations.  We assume this to
         * mean it dropped off but came back before we timed out.  We
         * want to consider it down for the time being but don't want
         * to lose any changed_samples state we might build up to
         * considering it live again. */
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
                     "to 0x%llx)\n", slot->ds_node_num,
                     (long long)slot->ds_last_generation,
                     (long long)le64_to_cpu(hb_block->hb_generation));
        }

        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
             "seq %llu last %llu changed %u equal %u\n",
             slot->ds_node_num, (long long)slot->ds_last_generation,
             le32_to_cpu(hb_block->hb_cksum),
             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);

        spin_lock(&o2hb_live_lock);

fire_callbacks:
        /* dead nodes only come to life after some number of
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
                     slot->ds_node_num, (long long)slot->ds_last_generation);

                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                list_add_tail(&slot->ds_live_item,
                              &o2hb_live_slots[slot->ds_node_num]);

                slot->ds_equal_samples = 0;

                /* We want to be sure that all nodes agree on the
                 * number of milliseconds before a node will be
                 * considered dead.  The self-fencing timeout is
                 * computed from this value, and a discrepancy might
                 * result in heartbeat calling a node dead when it
                 * hasn't self-fenced yet. */
                slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
                if (slot_dead_ms && slot_dead_ms != dead_ms) {
                        /* TODO: Perhaps we can fail the region here. */
                        mlog(ML_ERROR, "Node %d on device %s has a dead count "
                             "of %u ms, but our count is %u ms.\n"
                             "Please double check your configuration values "
                             "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
                             slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
                             dead_ms);
                }
                goto out;
        }

        /* if the list is dead, we're done.. */
        if (list_empty(&slot->ds_live_item))
                goto out;

        /* live nodes only go dead after enough consecutive missed
         * samples..  reset the missed counter whenever we see
         * activity */
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
                     slot->ds_node_num);

                /* last off the live_slot generates a callback */
                list_del_init(&slot->ds_live_item);
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                /* We don't clear this because the node is still
                 * actually writing new blocks. */
                if (!gen_changed)
                        slot->ds_changed_samples = 0;
                goto out;
        }
        if (slot->ds_changed_samples) {
                slot->ds_changed_samples = 0;
                slot->ds_equal_samples = 0;
        }
out:
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
        return changed;
}

/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it. */
static int o2hb_highest_node(unsigned long *nodes,
                             int numbits)
{
        int highest, node;

        highest = numbits;
        node = -1;
        while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
                if (node >= numbits)
                        break;

                highest = node;
        }

        return highest;
}

static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct o2hb_bio_wait_ctxt write_wc;

        ret = o2nm_configured_node_map(configured_nodes,
                                       sizeof(configured_nodes));
        if (ret) {
                mlog_errno(ret);
                return ret;
        }

        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
                return -EINVAL;
        }

        /* No sense in reading the slots of nodes that don't exist
         * yet.  Of course, if the node definitions have holes in them
         * then we're reading an empty slot anyway...  Consider this
         * best-effort. */
        ret = o2hb_read_slots(reg, highest_node + 1);
        if (ret < 0) {
                mlog_errno(ret);
                return ret;
        }

        /* With an up to date view of the slots, we can check that no
         * other node has been improperly configured to heartbeat in
         * our slot. */
        if (!o2hb_check_last_timestamp(reg))
                mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
                     "in our slot!\n", reg->hr_dev_name);

        /* fill in the proper info for our next heartbeat */
        o2hb_prepare_block(reg, reg->hr_generation);

        /* And fire off the write.  Note that we don't wait on this I/O
         * until later. */
        ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
                return ret;
        }

        i = -1;
        while ((i = find_next_bit(configured_nodes,
                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
                change |= o2hb_check_slot(reg, &reg->hr_slots[i]);

        /*
         * We have to be sure we've advertised ourselves on disk
         * before we can go to steady state.  This ensures that
         * people we find in our steady state have seen us.
         */
        o2hb_wait_on_io(reg, &write_wc);
        if (write_wc.wc_error) {
                /* Do not re-arm the write timeout on I/O error - we
                 * can't be sure that the new block ever made it to
                 * disk */
                mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
                     write_wc.wc_error, reg->hr_dev_name);
                return write_wc.wc_error;
        }

        o2hb_arm_write_timeout(reg);

        /* let the person who launched us know when things are steady */
        if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
                if (atomic_dec_and_test(&reg->hr_steady_iterations))
                        wake_up(&o2hb_steady_queue);
        }

        return 0;
}
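
/*
 * To recap one heartbeat pass: read every configured slot, verify that
 * nobody else wrote our slot, stamp and asynchronously write our own
 * block, run the state machine over each slot, then wait for our write
 * to land before re-arming the write timeout and, if nothing changed,
 * counting down toward steady state.
 */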

/* Subtract b from a, storing the result in a.  If a is before b, a is
 * simply zeroed. */
static void o2hb_tv_subtract(struct timeval *a,
                             struct timeval *b)
{
        /* just return 0 when a is before b */
        if (a->tv_sec < b->tv_sec ||
            (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
                a->tv_sec = 0;
                a->tv_usec = 0;
                return;
        }

        a->tv_sec -= b->tv_sec;
        a->tv_usec -= b->tv_usec;
        while (a->tv_usec < 0) {
                a->tv_sec--;
                a->tv_usec += 1000000;
        }
}

static unsigned int o2hb_elapsed_msecs(struct timeval *start,
                                       struct timeval *end)
{
        struct timeval res = *end;

        o2hb_tv_subtract(&res, start);

        return res.tv_sec * 1000 + res.tv_usec / 1000;
}

/*
 * we ride the region ref that the region dir holds.  before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
        int i, ret;
        struct o2hb_region *reg = data;
        struct o2hb_bio_wait_ctxt write_wc;
        struct timeval before_hb, after_hb;
        unsigned int elapsed_msec;

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

        set_user_nice(current, -20);

        while (!kthread_should_stop() && !reg->hr_unclean_stop) {
                /* We track the time spent inside
                 * o2hb_do_disk_heartbeat so that we avoid more than
                 * hr_timeout_ms between disk writes.  On busy systems
                 * this should result in a heartbeat which is less
                 * likely to time itself out. */
                do_gettimeofday(&before_hb);

                i = 0;
                do {
                        ret = o2hb_do_disk_heartbeat(reg);
                } while (ret && ++i < 2);

                do_gettimeofday(&after_hb);
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);

                mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
                     before_hb.tv_sec, (unsigned long)before_hb.tv_usec,
                     after_hb.tv_sec, (unsigned long)after_hb.tv_usec,
                     elapsed_msec);

                if (elapsed_msec < reg->hr_timeout_ms) {
                        /* the kthread api has blocked signals for us so no
                         * need to record the return value. */
                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
                }
        }

        o2hb_disarm_write_timeout(reg);

        /* unclean stop is only used in very bad situations */
        for (i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
                o2hb_shutdown_slot(&reg->hr_slots[i]);

        /* Explicit down notification - avoid forcing the other nodes
         * to timeout on this region when we could just as easily
         * write a clear generation - thus indicating to them that
         * this node has left this region.
         *
         * XXX: Should we skip this on unclean_stop? */
        o2hb_prepare_block(reg, 0);
        ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret == 0)
                o2hb_wait_on_io(reg, &write_wc);
        else
                mlog_errno(ret);

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");

        return 0;
}

void o2hb_init(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
                INIT_LIST_HEAD(&o2hb_callbacks[i].list);

        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
                INIT_LIST_HEAD(&o2hb_live_slots[i]);

        INIT_LIST_HEAD(&o2hb_node_events);

        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
}

/* if we're already in a callback then we're already serialized by the sem */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
                                             unsigned bytes)
{
        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));

        memcpy(map, &o2hb_live_node_bitmap, bytes);
}

/*
 * get a map of all nodes that are heartbeating in any regions
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
        /* callers want to serialize this map and callbacks so that they
         * can trust that they don't miss nodes coming to the party */
        down_read(&o2hb_callback_sem);
        spin_lock(&o2hb_live_lock);
        o2hb_fill_node_map_from_callback(map, bytes);
        spin_unlock(&o2hb_live_lock);
        up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);

/*
 * heartbeat configfs bits.  The heartbeat set is a default set under
 * the cluster set in nodemanager.c.
 */

static struct o2hb_region *to_o2hb_region(struct config_item *item)
{
        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
}

/* drop_item only drops its ref after killing the thread, nothing should
 * be using the region anymore.  this has to clean up any state that
 * attributes might have built up. */
static void o2hb_region_release(struct config_item *item)
{
        int i;
        struct page *page;
        struct o2hb_region *reg = to_o2hb_region(item);

        if (reg->hr_tmp_block)
                kfree(reg->hr_tmp_block);

        if (reg->hr_slot_data) {
                for (i = 0; i < reg->hr_num_pages; i++) {
                        page = reg->hr_slot_data[i];
                        if (page)
                                __free_page(page);
                }
                kfree(reg->hr_slot_data);
        }

        if (reg->hr_bdev)
                blkdev_put(reg->hr_bdev);

        if (reg->hr_slots)
                kfree(reg->hr_slots);

        spin_lock(&o2hb_live_lock);
        list_del(&reg->hr_all_item);
        spin_unlock(&o2hb_live_lock);

        kfree(reg);
}
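
/*
 * Parse the block size attribute: the value must be a power of two in
 * the range [512, 4096] (hweight16() == 1 enforces the power of two,
 * ffs() - 1 recovers the shift).
 */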

static int o2hb_read_block_input(struct o2hb_region *reg,
                                 const char *page,
                                 size_t count,
                                 unsigned long *ret_bytes,
                                 unsigned int *ret_bits)
{
        unsigned long bytes;
        char *p = (char *)page;

        bytes = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* Heartbeat and fs min / max block sizes are the same. */
        if (bytes > 4096 || bytes < 512)
                return -ERANGE;
        if (hweight16(bytes) != 1)
                return -EINVAL;

        if (ret_bytes)
                *ret_bytes = bytes;
        if (ret_bits)
                *ret_bits = ffs(bytes) - 1;

        return 0;
}

static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%u\n", reg->hr_block_bytes);
}

static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        int status;
        unsigned long block_bytes;
        unsigned int block_bits;

        if (reg->hr_bdev)
                return -EINVAL;

        status = o2hb_read_block_input(reg, page, count,
                                       &block_bytes, &block_bits);
        if (status)
                return status;

        reg->hr_block_bytes = (unsigned int)block_bytes;
        reg->hr_block_bits = block_bits;

        return count;
}

static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%llu\n", reg->hr_start_block);
}

static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        unsigned long long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoull(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        reg->hr_start_block = tmp;

        return count;
}

static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
                                       char *page)
{
        return sprintf(page, "%u\n", reg->hr_blocks);
}

static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
                                        const char *page,
                                        size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        if (tmp > O2NM_MAX_NODES || tmp == 0)
                return -ERANGE;

        reg->hr_blocks = (unsigned int)tmp;

        return count;
}

static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
                                    char *page)
{
        unsigned int ret = 0;

        if (reg->hr_bdev)
                ret = sprintf(page, "%s\n", reg->hr_dev_name);

        return ret;
}

static void o2hb_init_region_params(struct o2hb_region *reg)
{
        reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;

        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
             reg->hr_start_block, reg->hr_blocks);
        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
             reg->hr_block_bytes, reg->hr_block_bits);
        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
}
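
/*
 * Slot blocks are packed into whole pages: with spp slots per page,
 * slot i lives in page i / spp at byte offset (i % spp) *
 * hr_block_bytes.  o2hb_map_slot_data() below allocates those pages and
 * points each slot's ds_raw_block at its offset.
 */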

static int o2hb_map_slot_data(struct o2hb_region *reg)
{
        int i, j;
        unsigned int last_slot;
        unsigned int spp = reg->hr_slots_per_page;
        struct page *page;
        char *raw;
        struct o2hb_disk_slot *slot;

        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
        if (reg->hr_tmp_block == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        reg->hr_slots = kcalloc(reg->hr_blocks,
                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
        if (reg->hr_slots == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for (i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                slot->ds_node_num = i;
                INIT_LIST_HEAD(&slot->ds_live_item);
                slot->ds_raw_block = NULL;
        }

        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
             "at %u blocks per page\n",
             reg->hr_num_pages, reg->hr_blocks, spp);

        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
                                    GFP_KERNEL);
        if (!reg->hr_slot_data) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for (i = 0; i < reg->hr_num_pages; i++) {
                page = alloc_page(GFP_KERNEL);
                if (!page) {
                        mlog_errno(-ENOMEM);
                        return -ENOMEM;
                }

                reg->hr_slot_data[i] = page;

                last_slot = i * spp;
                raw = page_address(page);
                for (j = 0;
                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
                     j++) {
                        BUG_ON((j + last_slot) >= reg->hr_blocks);

                        slot = &reg->hr_slots[j + last_slot];
                        slot->ds_raw_block =
                                (struct o2hb_disk_heartbeat_block *)raw;

                        raw += reg->hr_block_bytes;
                }
        }

        return 0;
}

/* Read in all the slots available and populate the tracking
 * structures so that we can start with a baseline idea of what's
 * there. */
static int o2hb_populate_slot_data(struct o2hb_region *reg)
{
        int ret, i;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        mlog_entry_void();

        ret = o2hb_read_slots(reg, reg->hr_blocks);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        /* We only want to get an idea of the values initially in each
         * slot, so we do no verification - o2hb_check_slot will
         * actually determine if each configured slot is valid and
         * whether any values have changed. */
        for (i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;

                /* Only fill the values that o2hb_check_slot uses to
                 * determine changing slots */
                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
        }

out:
        mlog_exit(ret);
        return ret;
}
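
/*
 * Sketch of how userspace typically drives a region (paths here are
 * illustrative; they assume configfs is mounted at /sys/kernel/config
 * and depend on the cluster name):
 *
 *      mkdir /sys/kernel/config/cluster/<cluster>/heartbeat/<region-uuid>
 *      echo <bytes>  > .../block_bytes
 *      echo <block>  > .../start_block
 *      echo <blocks> > .../blocks
 *      echo <fd>     > .../dev
 *
 * where <fd> is a file descriptor the writing process holds open on
 * the heartbeat block device.  Writing "dev" is the commit point
 * handled below.
 */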

/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                                     const char *page,
                                     size_t count)
{
        struct task_struct *hb_task;
        long fd;
        int sectsize;
        char *p = (char *)page;
        struct file *filp = NULL;
        struct inode *inode = NULL;
        ssize_t ret = -EINVAL;

        if (reg->hr_bdev)
                goto out;

        /* We can't heartbeat without having had our node number
         * configured yet. */
        if (o2nm_this_node() == O2NM_MAX_NODES)
                goto out;

        fd = simple_strtol(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                goto out;

        if (fd < 0 || fd >= INT_MAX)
                goto out;

        filp = fget(fd);
        if (filp == NULL)
                goto out;

        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
            reg->hr_block_bytes == 0)
                goto out;

        inode = igrab(filp->f_mapping->host);
        if (inode == NULL)
                goto out;

        if (!S_ISBLK(inode->i_mode))
                goto out;

        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
        if (ret) {
                reg->hr_bdev = NULL;
                goto out;
        }
        inode = NULL;

        bdevname(reg->hr_bdev, reg->hr_dev_name);

        sectsize = bdev_hardsect_size(reg->hr_bdev);
        if (sectsize != reg->hr_block_bytes) {
                mlog(ML_ERROR,
                     "blocksize %u incorrect for device, expected %d\n",
                     reg->hr_block_bytes, sectsize);
                ret = -EINVAL;
                goto out;
        }

        o2hb_init_region_params(reg);

        /* Generation of zero is invalid */
        do {
                get_random_bytes(&reg->hr_generation,
                                 sizeof(reg->hr_generation));
        } while (reg->hr_generation == 0);

        ret = o2hb_map_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        ret = o2hb_populate_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);

        /*
         * A node is considered live after it has beat LIVE_THRESHOLD
         * times.  We're not steady until we've given them a chance
         * _after_ our first read.
         */
        atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);

        hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
                              reg->hr_item.ci_name);
        if (IS_ERR(hb_task)) {
                ret = PTR_ERR(hb_task);
                mlog_errno(ret);
                goto out;
        }

        spin_lock(&o2hb_live_lock);
        reg->hr_task = hb_task;
        spin_unlock(&o2hb_live_lock);

        ret = wait_event_interruptible(o2hb_steady_queue,
                                atomic_read(&reg->hr_steady_iterations) == 0);
        if (ret) {
                /* We got interrupted (hello ptrace!).  Clean up */
                spin_lock(&o2hb_live_lock);
                hb_task = reg->hr_task;
                reg->hr_task = NULL;
                spin_unlock(&o2hb_live_lock);

                if (hb_task)
                        kthread_stop(hb_task);
                goto out;
        }

        /* Ok, we were woken.  Make sure it wasn't by drop_item() */
        spin_lock(&o2hb_live_lock);
        hb_task = reg->hr_task;
        spin_unlock(&o2hb_live_lock);

        if (hb_task)
                ret = count;
        else
                ret = -EIO;

out:
        if (filp)
                fput(filp);
        if (inode)
                iput(inode);
        if (ret < 0) {
                if (reg->hr_bdev) {
                        blkdev_put(reg->hr_bdev);
                        reg->hr_bdev = NULL;
                }
        }
        return ret;
}

static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
                                    char *page)
{
        pid_t pid = 0;

        spin_lock(&o2hb_live_lock);
        if (reg->hr_task)
                pid = task_pid_nr(reg->hr_task);
        spin_unlock(&o2hb_live_lock);

        if (!pid)
                return 0;

        return sprintf(page, "%u\n", pid);
}

struct o2hb_region_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_region *, char *);
        ssize_t (*store)(struct o2hb_region *, const char *, size_t);
};

static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "block_bytes",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_block_bytes_read,
        .store  = o2hb_region_block_bytes_write,
};

static struct o2hb_region_attribute o2hb_region_attr_start_block = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "start_block",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_start_block_read,
        .store  = o2hb_region_start_block_write,
};

static struct o2hb_region_attribute o2hb_region_attr_blocks = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "blocks",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_blocks_read,
        .store  = o2hb_region_blocks_write,
};

static struct o2hb_region_attribute o2hb_region_attr_dev = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dev",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_dev_read,
        .store  = o2hb_region_dev_write,
};

static struct o2hb_region_attribute o2hb_region_attr_pid = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "pid",
                    .ca_mode = S_IRUGO | S_IRUSR },
        .show   = o2hb_region_pid_read,
};

static struct configfs_attribute *o2hb_region_attrs[] = {
        &o2hb_region_attr_block_bytes.attr,
        &o2hb_region_attr_start_block.attr,
        &o2hb_region_attr_blocks.attr,
        &o2hb_region_attr_dev.attr,
        &o2hb_region_attr_pid.attr,
        NULL,
};

static ssize_t o2hb_region_show(struct config_item *item,
                                struct configfs_attribute *attr,
                                char *page)
{
        struct o2hb_region *reg = to_o2hb_region(item);
        struct o2hb_region_attribute *o2hb_region_attr =
                container_of(attr, struct o2hb_region_attribute, attr);
        ssize_t ret = 0;

        if (o2hb_region_attr->show)
                ret = o2hb_region_attr->show(reg, page);
        return ret;
}

static ssize_t o2hb_region_store(struct config_item *item,
                                 struct configfs_attribute *attr,
                                 const char *page, size_t count)
{
        struct o2hb_region *reg = to_o2hb_region(item);
        struct o2hb_region_attribute *o2hb_region_attr =
                container_of(attr, struct o2hb_region_attribute, attr);
        ssize_t ret = -EINVAL;

        if (o2hb_region_attr->store)
                ret = o2hb_region_attr->store(reg, page, count);
        return ret;
}

static struct configfs_item_operations o2hb_region_item_ops = {
        .release                = o2hb_region_release,
        .show_attribute         = o2hb_region_show,
        .store_attribute        = o2hb_region_store,
};

static struct config_item_type o2hb_region_type = {
        .ct_item_ops    = &o2hb_region_item_ops,
        .ct_attrs       = o2hb_region_attrs,
        .ct_owner       = THIS_MODULE,
};

/* heartbeat set */

struct o2hb_heartbeat_group {
        struct config_group hs_group;
        /* some stuff? */
};

static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
{
        return group ?
                container_of(group, struct o2hb_heartbeat_group, hs_group)
                : NULL;
}

static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
                                                          const char *name)
{
        struct o2hb_region *reg = NULL;

        reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
        if (reg == NULL)
                return ERR_PTR(-ENOMEM);

        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);

        spin_lock(&o2hb_live_lock);
        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
        spin_unlock(&o2hb_live_lock);

        return &reg->hr_item;
}

static void o2hb_heartbeat_group_drop_item(struct config_group *group,
                                           struct config_item *item)
{
        struct task_struct *hb_task;
        struct o2hb_region *reg = to_o2hb_region(item);

        /* stop the thread when the user removes the region dir */
        spin_lock(&o2hb_live_lock);
        hb_task = reg->hr_task;
        reg->hr_task = NULL;
        spin_unlock(&o2hb_live_lock);

        if (hb_task)
                kthread_stop(hb_task);

        /*
         * If we're racing a dev_write(), we need to wake them.  They will
         * check reg->hr_task
         */
        if (atomic_read(&reg->hr_steady_iterations) != 0) {
                atomic_set(&reg->hr_steady_iterations, 0);
                wake_up(&o2hb_steady_queue);
        }

        config_item_put(item);
}

struct o2hb_heartbeat_group_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
        ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
};

static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
                                         struct configfs_attribute *attr,
                                         char *page)
{
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
        ssize_t ret = 0;

        if (o2hb_heartbeat_group_attr->show)
                ret = o2hb_heartbeat_group_attr->show(reg, page);
        return ret;
}

static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
                                          struct configfs_attribute *attr,
                                          const char *page, size_t count)
{
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
        ssize_t ret = -EINVAL;

        if (o2hb_heartbeat_group_attr->store)
                ret = o2hb_heartbeat_group_attr->store(reg, page, count);
        return ret;
}

static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
                                                   char *page)
{
        return sprintf(page, "%u\n", o2hb_dead_threshold);
}
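
/*
 * The store side funnels through o2hb_dead_threshold_set(), which only
 * accepts values above O2HB_MIN_DEAD_THRESHOLD and only while no
 * regions are active; a live node is declared dead after this many
 * consecutive unchanged samples.
 */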

static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
                                                    const char *page,
                                                    size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        tmp = simple_strtoul(p, &p, 10);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* this will validate ranges for us. */
        o2hb_dead_threshold_set((unsigned int)tmp);

        return count;
}

static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dead_threshold",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_heartbeat_group_threshold_show,
        .store  = o2hb_heartbeat_group_threshold_store,
};

static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
        &o2hb_heartbeat_group_attr_threshold.attr,
        NULL,
};

static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
        .show_attribute         = o2hb_heartbeat_group_show,
        .store_attribute        = o2hb_heartbeat_group_store,
};

static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
        .make_item      = o2hb_heartbeat_group_make_item,
        .drop_item      = o2hb_heartbeat_group_drop_item,
};

static struct config_item_type o2hb_heartbeat_group_type = {
        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
        .ct_item_ops    = &o2hb_heartbeat_group_item_ops,
        .ct_attrs       = o2hb_heartbeat_group_attrs,
        .ct_owner       = THIS_MODULE,
};

/* this is just here to avoid touching group in heartbeat.h which the
 * entire damn world #includes */
struct config_group *o2hb_alloc_hb_set(void)
{
        struct o2hb_heartbeat_group *hs = NULL;
        struct config_group *ret = NULL;

        hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
        if (hs == NULL)
                goto out;

        config_group_init_type_name(&hs->hs_group, "heartbeat",
                                    &o2hb_heartbeat_group_type);

        ret = &hs->hs_group;
out:
        if (ret == NULL)
                kfree(hs);
        return ret;
}

void o2hb_free_hb_set(struct config_group *group)
{
        struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
        kfree(hs);
}

/* hb callback registration and issuing */

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
{
        if (type == O2HB_NUM_CB)
                return ERR_PTR(-EINVAL);

        return &o2hb_callbacks[type];
}

void o2hb_setup_callback(struct o2hb_callback_func *hc,
                         enum o2hb_callback_type type,
                         o2hb_cb_func *func,
                         void *data,
                         int priority)
{
        INIT_LIST_HEAD(&hc->hc_item);
        hc->hc_func = func;
        hc->hc_data = data;
        hc->hc_priority = priority;
        hc->hc_type = type;
        hc->hc_magic = O2HB_CB_MAGIC;
}
EXPORT_SYMBOL_GPL(o2hb_setup_callback);

static struct o2hb_region *o2hb_find_region(const char *region_uuid)
{
        struct o2hb_region *p, *reg = NULL;

        assert_spin_locked(&o2hb_live_lock);

        list_for_each_entry(p, &o2hb_all_regions, hr_all_item) {
                if (!strcmp(region_uuid, config_item_name(&p->hr_item))) {
                        reg = p;
                        break;
                }
        }

        return reg;
}
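
/*
 * Registering a callback against a specific region pins both our own
 * node configuration and the region's config item via the o2nm depend
 * calls below, so neither can be torn down out from under a registered
 * callback.
 */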

static int o2hb_region_get(const char *region_uuid)
{
        int ret = 0;
        struct o2hb_region *reg;

        spin_lock(&o2hb_live_lock);

        reg = o2hb_find_region(region_uuid);
        if (!reg)
                ret = -ENOENT;
        spin_unlock(&o2hb_live_lock);

        if (ret)
                goto out;

        ret = o2nm_depend_this_node();
        if (ret)
                goto out;

        ret = o2nm_depend_item(&reg->hr_item);
        if (ret)
                o2nm_undepend_this_node();

out:
        return ret;
}

static void o2hb_region_put(const char *region_uuid)
{
        struct o2hb_region *reg;

        spin_lock(&o2hb_live_lock);

        reg = o2hb_find_region(region_uuid);

        spin_unlock(&o2hb_live_lock);

        if (reg) {
                o2nm_undepend_item(&reg->hr_item);
                o2nm_undepend_this_node();
        }
}

int o2hb_register_callback(const char *region_uuid,
                           struct o2hb_callback_func *hc)
{
        struct o2hb_callback_func *tmp;
        struct list_head *iter;
        struct o2hb_callback *hbcall;
        int ret;

        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
        BUG_ON(!list_empty(&hc->hc_item));

        hbcall = hbcall_from_type(hc->hc_type);
        if (IS_ERR(hbcall)) {
                ret = PTR_ERR(hbcall);
                goto out;
        }

        if (region_uuid) {
                ret = o2hb_region_get(region_uuid);
                if (ret)
                        goto out;
        }

        down_write(&o2hb_callback_sem);

        list_for_each(iter, &hbcall->list) {
                tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
                if (hc->hc_priority < tmp->hc_priority) {
                        list_add_tail(&hc->hc_item, iter);
                        break;
                }
        }
        if (list_empty(&hc->hc_item))
                list_add_tail(&hc->hc_item, &hbcall->list);

        up_write(&o2hb_callback_sem);
        ret = 0;
out:
        mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
             ret, __builtin_return_address(0), hc);
        return ret;
}
EXPORT_SYMBOL_GPL(o2hb_register_callback);

void o2hb_unregister_callback(const char *region_uuid,
                              struct o2hb_callback_func *hc)
{
        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);

        mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
             __builtin_return_address(0), hc);

        /* XXX Can this happen _with_ a region reference? */
        if (list_empty(&hc->hc_item))
                return;

        if (region_uuid)
                o2hb_region_put(region_uuid);

        down_write(&o2hb_callback_sem);

        list_del_init(&hc->hc_item);

        up_write(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_unregister_callback);

int o2hb_check_node_heartbeating(u8 node_num)
{
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

        o2hb_fill_node_map(testing_map, sizeof(testing_map));
        if (!test_bit(node_num, testing_map)) {
                mlog(ML_HEARTBEAT,
                     "node (%u) does not have heartbeating enabled.\n",
                     node_num);
                return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);

int o2hb_check_node_heartbeating_from_callback(u8 node_num)
{
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
        if (!test_bit(node_num, testing_map)) {
                mlog(ML_HEARTBEAT,
                     "node (%u) does not have heartbeating enabled.\n",
                     node_num);
                return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
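
/*
 * Sketch of typical consumer usage of the callback api above (names
 * here are illustrative, not from this file):
 *
 *      static struct o2hb_callback_func my_up_cb;
 *
 *      o2hb_setup_callback(&my_up_cb, O2HB_NODE_UP_CB, my_up_func,
 *                          my_data, my_priority);
 *      ret = o2hb_register_callback(region_uuid, &my_up_cb);
 *      ...
 *      o2hb_unregister_callback(region_uuid, &my_up_cb);
 */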

/* Makes sure our local node is configured with a node number, and is
 * heartbeating. */
int o2hb_check_local_node_heartbeating(void)
{
        u8 node_num;

        /* if this node was set then we have networking */
        node_num = o2nm_this_node();
        if (node_num == O2NM_MAX_NODES) {
                mlog(ML_HEARTBEAT, "this node has not been configured.\n");
                return 0;
        }

        return o2hb_check_node_heartbeating(node_num);
}
EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);

/*
 * this is just a hack until we get the plumbing which flips file systems
 * read only and drops the hb ref instead of killing the node dead.
 */
void o2hb_stop_all_regions(void)
{
        struct o2hb_region *reg;

        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");

        spin_lock(&o2hb_live_lock);

        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
                reg->hr_unclean_stop = 1;

        spin_unlock(&o2hb_live_lock);
}
EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);