1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* -*- mode: c; c-basic-offset: 8; -*- 3 * vim: noexpandtab sw=8 ts=8 sts=0: 4 * 5 * Copyright (C) 2004, 2005 Oracle. All rights reserved. 6 */ 7 8 #include <linux/kernel.h> 9 #include <linux/sched.h> 10 #include <linux/jiffies.h> 11 #include <linux/module.h> 12 #include <linux/fs.h> 13 #include <linux/bio.h> 14 #include <linux/blkdev.h> 15 #include <linux/delay.h> 16 #include <linux/file.h> 17 #include <linux/kthread.h> 18 #include <linux/configfs.h> 19 #include <linux/random.h> 20 #include <linux/crc32.h> 21 #include <linux/time.h> 22 #include <linux/debugfs.h> 23 #include <linux/slab.h> 24 #include <linux/bitmap.h> 25 #include <linux/ktime.h> 26 #include "heartbeat.h" 27 #include "tcp.h" 28 #include "nodemanager.h" 29 #include "quorum.h" 30 31 #include "masklog.h" 32 33 34 /* 35 * The first heartbeat pass had one global thread that would serialize all hb 36 * callback calls. This global serializing sem should only be removed once 37 * we've made sure that all callees can deal with being called concurrently 38 * from multiple hb region threads. 39 */ 40 static DECLARE_RWSEM(o2hb_callback_sem); 41 42 /* 43 * multiple hb threads are watching multiple regions. A node is live 44 * whenever any of the threads sees activity from the node in its region. 45 */ 46 static DEFINE_SPINLOCK(o2hb_live_lock); 47 static struct list_head o2hb_live_slots[O2NM_MAX_NODES]; 48 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 49 static LIST_HEAD(o2hb_node_events); 50 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue); 51 52 /* 53 * In global heartbeat, we maintain a series of region bitmaps. 54 * - o2hb_region_bitmap allows us to limit the region number to max region. 55 * - o2hb_live_region_bitmap tracks live regions (seen steady iterations). 56 * - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes 57 * heartbeat on it. 58 * - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts. 59 */ 60 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 61 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 62 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 63 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 64 65 #define O2HB_DB_TYPE_LIVENODES 0 66 #define O2HB_DB_TYPE_LIVEREGIONS 1 67 #define O2HB_DB_TYPE_QUORUMREGIONS 2 68 #define O2HB_DB_TYPE_FAILEDREGIONS 3 69 #define O2HB_DB_TYPE_REGION_LIVENODES 4 70 #define O2HB_DB_TYPE_REGION_NUMBER 5 71 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME 6 72 #define O2HB_DB_TYPE_REGION_PINNED 7 73 struct o2hb_debug_buf { 74 int db_type; 75 int db_size; 76 int db_len; 77 void *db_data; 78 }; 79 80 static struct o2hb_debug_buf *o2hb_db_livenodes; 81 static struct o2hb_debug_buf *o2hb_db_liveregions; 82 static struct o2hb_debug_buf *o2hb_db_quorumregions; 83 static struct o2hb_debug_buf *o2hb_db_failedregions; 84 85 #define O2HB_DEBUG_DIR "o2hb" 86 #define O2HB_DEBUG_LIVENODES "livenodes" 87 #define O2HB_DEBUG_LIVEREGIONS "live_regions" 88 #define O2HB_DEBUG_QUORUMREGIONS "quorum_regions" 89 #define O2HB_DEBUG_FAILEDREGIONS "failed_regions" 90 #define O2HB_DEBUG_REGION_NUMBER "num" 91 #define O2HB_DEBUG_REGION_ELAPSED_TIME "elapsed_time_in_ms" 92 #define O2HB_DEBUG_REGION_PINNED "pinned" 93 94 static struct dentry *o2hb_debug_dir; 95 96 static LIST_HEAD(o2hb_all_regions); 97 98 static struct o2hb_callback { 99 struct list_head list; 100 } o2hb_callbacks[O2HB_NUM_CB]; 101 102 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type); 103 104 #define O2HB_DEFAULT_BLOCK_BITS 9 105 106 enum o2hb_heartbeat_modes { 107 O2HB_HEARTBEAT_LOCAL = 0, 108 O2HB_HEARTBEAT_GLOBAL, 109 O2HB_HEARTBEAT_NUM_MODES, 110 }; 111 112 static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = { 113 "local", /* O2HB_HEARTBEAT_LOCAL */ 114 "global", /* O2HB_HEARTBEAT_GLOBAL */ 115 }; 116 117 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD; 118 static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL; 119 120 /* 121 * o2hb_dependent_users tracks the number of registered callbacks that depend 122 * on heartbeat. o2net and o2dlm are two entities that register this callback. 123 * However only o2dlm depends on the heartbeat. It does not want the heartbeat 124 * to stop while a dlm domain is still active. 125 */ 126 static unsigned int o2hb_dependent_users; 127 128 /* 129 * In global heartbeat mode, all regions are pinned if there are one or more 130 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All 131 * regions are unpinned if the region count exceeds the cut off or the number 132 * of dependent users falls to zero. 133 */ 134 #define O2HB_PIN_CUT_OFF 3 135 136 /* 137 * In local heartbeat mode, we assume the dlm domain name to be the same as 138 * region uuid. This is true for domains created for the file system but not 139 * necessarily true for userdlm domains. This is a known limitation. 140 * 141 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution 142 * works for both file system and userdlm domains. 143 */ 144 static int o2hb_region_pin(const char *region_uuid); 145 static void o2hb_region_unpin(const char *region_uuid); 146 147 /* Only sets a new threshold if there are no active regions. 148 * 149 * No locking or otherwise interesting code is required for reading 150 * o2hb_dead_threshold as it can't change once regions are active and 151 * it's not interesting to anyone until then anyway. */ 152 static void o2hb_dead_threshold_set(unsigned int threshold) 153 { 154 if (threshold > O2HB_MIN_DEAD_THRESHOLD) { 155 spin_lock(&o2hb_live_lock); 156 if (list_empty(&o2hb_all_regions)) 157 o2hb_dead_threshold = threshold; 158 spin_unlock(&o2hb_live_lock); 159 } 160 } 161 162 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode) 163 { 164 int ret = -1; 165 166 if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) { 167 spin_lock(&o2hb_live_lock); 168 if (list_empty(&o2hb_all_regions)) { 169 o2hb_heartbeat_mode = hb_mode; 170 ret = 0; 171 } 172 spin_unlock(&o2hb_live_lock); 173 } 174 175 return ret; 176 } 177 178 struct o2hb_node_event { 179 struct list_head hn_item; 180 enum o2hb_callback_type hn_event_type; 181 struct o2nm_node *hn_node; 182 int hn_node_num; 183 }; 184 185 struct o2hb_disk_slot { 186 struct o2hb_disk_heartbeat_block *ds_raw_block; 187 u8 ds_node_num; 188 u64 ds_last_time; 189 u64 ds_last_generation; 190 u16 ds_equal_samples; 191 u16 ds_changed_samples; 192 struct list_head ds_live_item; 193 }; 194 195 /* each thread owns a region.. when we're asked to tear down the region 196 * we ask the thread to stop, who cleans up the region */ 197 struct o2hb_region { 198 struct config_item hr_item; 199 200 struct list_head hr_all_item; 201 unsigned hr_unclean_stop:1, 202 hr_aborted_start:1, 203 hr_item_pinned:1, 204 hr_item_dropped:1, 205 hr_node_deleted:1; 206 207 /* protected by the hr_callback_sem */ 208 struct task_struct *hr_task; 209 210 unsigned int hr_blocks; 211 unsigned long long hr_start_block; 212 213 unsigned int hr_block_bits; 214 unsigned int hr_block_bytes; 215 216 unsigned int hr_slots_per_page; 217 unsigned int hr_num_pages; 218 219 struct page **hr_slot_data; 220 struct block_device *hr_bdev; 221 struct o2hb_disk_slot *hr_slots; 222 223 /* live node map of this region */ 224 unsigned long hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 225 unsigned int hr_region_num; 226 227 struct dentry *hr_debug_dir; 228 struct o2hb_debug_buf *hr_db_livenodes; 229 struct o2hb_debug_buf *hr_db_regnum; 230 struct o2hb_debug_buf *hr_db_elapsed_time; 231 struct o2hb_debug_buf *hr_db_pinned; 232 233 /* let the person setting up hb wait for it to return until it 234 * has reached a 'steady' state. This will be fixed when we have 235 * a more complete api that doesn't lead to this sort of fragility. */ 236 atomic_t hr_steady_iterations; 237 238 /* terminate o2hb thread if it does not reach steady state 239 * (hr_steady_iterations == 0) within hr_unsteady_iterations */ 240 atomic_t hr_unsteady_iterations; 241 242 char hr_dev_name[BDEVNAME_SIZE]; 243 244 unsigned int hr_timeout_ms; 245 246 /* randomized as the region goes up and down so that a node 247 * recognizes a node going up and down in one iteration */ 248 u64 hr_generation; 249 250 struct delayed_work hr_write_timeout_work; 251 unsigned long hr_last_timeout_start; 252 253 /* negotiate timer, used to negotiate extending hb timeout. */ 254 struct delayed_work hr_nego_timeout_work; 255 unsigned long hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 256 257 /* Used during o2hb_check_slot to hold a copy of the block 258 * being checked because we temporarily have to zero out the 259 * crc field. */ 260 struct o2hb_disk_heartbeat_block *hr_tmp_block; 261 262 /* Message key for negotiate timeout message. */ 263 unsigned int hr_key; 264 struct list_head hr_handler_list; 265 266 /* last hb status, 0 for success, other value for error. */ 267 int hr_last_hb_status; 268 }; 269 270 struct o2hb_bio_wait_ctxt { 271 atomic_t wc_num_reqs; 272 struct completion wc_io_complete; 273 int wc_error; 274 }; 275 276 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2) 277 278 enum { 279 O2HB_NEGO_TIMEOUT_MSG = 1, 280 O2HB_NEGO_APPROVE_MSG = 2, 281 }; 282 283 struct o2hb_nego_msg { 284 u8 node_num; 285 }; 286 287 static void o2hb_write_timeout(struct work_struct *work) 288 { 289 int failed, quorum; 290 struct o2hb_region *reg = 291 container_of(work, struct o2hb_region, 292 hr_write_timeout_work.work); 293 294 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " 295 "milliseconds\n", reg->hr_dev_name, 296 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 297 298 if (o2hb_global_heartbeat_active()) { 299 spin_lock(&o2hb_live_lock); 300 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) 301 set_bit(reg->hr_region_num, o2hb_failed_region_bitmap); 302 failed = bitmap_weight(o2hb_failed_region_bitmap, 303 O2NM_MAX_REGIONS); 304 quorum = bitmap_weight(o2hb_quorum_region_bitmap, 305 O2NM_MAX_REGIONS); 306 spin_unlock(&o2hb_live_lock); 307 308 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n", 309 quorum, failed); 310 311 /* 312 * Fence if the number of failed regions >= half the number 313 * of quorum regions 314 */ 315 if ((failed << 1) < quorum) 316 return; 317 } 318 319 o2quo_disk_timeout(); 320 } 321 322 static void o2hb_arm_timeout(struct o2hb_region *reg) 323 { 324 /* Arm writeout only after thread reaches steady state */ 325 if (atomic_read(®->hr_steady_iterations) != 0) 326 return; 327 328 mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n", 329 O2HB_MAX_WRITE_TIMEOUT_MS); 330 331 if (o2hb_global_heartbeat_active()) { 332 spin_lock(&o2hb_live_lock); 333 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap); 334 spin_unlock(&o2hb_live_lock); 335 } 336 cancel_delayed_work(®->hr_write_timeout_work); 337 schedule_delayed_work(®->hr_write_timeout_work, 338 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)); 339 340 cancel_delayed_work(®->hr_nego_timeout_work); 341 /* negotiate timeout must be less than write timeout. */ 342 schedule_delayed_work(®->hr_nego_timeout_work, 343 msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS)); 344 memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap)); 345 } 346 347 static void o2hb_disarm_timeout(struct o2hb_region *reg) 348 { 349 cancel_delayed_work_sync(®->hr_write_timeout_work); 350 cancel_delayed_work_sync(®->hr_nego_timeout_work); 351 } 352 353 static int o2hb_send_nego_msg(int key, int type, u8 target) 354 { 355 struct o2hb_nego_msg msg; 356 int status, ret; 357 358 msg.node_num = o2nm_this_node(); 359 again: 360 ret = o2net_send_message(type, key, &msg, sizeof(msg), 361 target, &status); 362 363 if (ret == -EAGAIN || ret == -ENOMEM) { 364 msleep(100); 365 goto again; 366 } 367 368 return ret; 369 } 370 371 static void o2hb_nego_timeout(struct work_struct *work) 372 { 373 unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 374 int master_node, i, ret; 375 struct o2hb_region *reg; 376 377 reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work); 378 /* don't negotiate timeout if last hb failed since it is very 379 * possible io failed. Should let write timeout fence self. 380 */ 381 if (reg->hr_last_hb_status) 382 return; 383 384 o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap)); 385 /* lowest node as master node to make negotiate decision. */ 386 master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0); 387 388 if (master_node == o2nm_this_node()) { 389 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) { 390 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n", 391 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, 392 config_item_name(®->hr_item), reg->hr_dev_name); 393 set_bit(master_node, reg->hr_nego_node_bitmap); 394 } 395 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap, 396 sizeof(reg->hr_nego_node_bitmap))) { 397 /* check negotiate bitmap every second to do timeout 398 * approve decision. 399 */ 400 schedule_delayed_work(®->hr_nego_timeout_work, 401 msecs_to_jiffies(1000)); 402 403 return; 404 } 405 406 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n", 407 config_item_name(®->hr_item), reg->hr_dev_name); 408 /* approve negotiate timeout request. */ 409 o2hb_arm_timeout(reg); 410 411 i = -1; 412 while ((i = find_next_bit(live_node_bitmap, 413 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 414 if (i == master_node) 415 continue; 416 417 mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i); 418 ret = o2hb_send_nego_msg(reg->hr_key, 419 O2HB_NEGO_APPROVE_MSG, i); 420 if (ret) 421 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n", 422 i, ret); 423 } 424 } else { 425 /* negotiate timeout with master node. */ 426 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n", 427 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(®->hr_item), 428 reg->hr_dev_name, master_node); 429 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG, 430 master_node); 431 if (ret) 432 mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n", 433 master_node, ret); 434 } 435 } 436 437 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data, 438 void **ret_data) 439 { 440 struct o2hb_region *reg = data; 441 struct o2hb_nego_msg *nego_msg; 442 443 nego_msg = (struct o2hb_nego_msg *)msg->buf; 444 printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n", 445 nego_msg->node_num, config_item_name(®->hr_item), reg->hr_dev_name); 446 if (nego_msg->node_num < O2NM_MAX_NODES) 447 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap); 448 else 449 mlog(ML_ERROR, "got nego timeout message from bad node.\n"); 450 451 return 0; 452 } 453 454 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data, 455 void **ret_data) 456 { 457 struct o2hb_region *reg = data; 458 459 printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n", 460 config_item_name(®->hr_item), reg->hr_dev_name); 461 o2hb_arm_timeout(reg); 462 return 0; 463 } 464 465 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) 466 { 467 atomic_set(&wc->wc_num_reqs, 1); 468 init_completion(&wc->wc_io_complete); 469 wc->wc_error = 0; 470 } 471 472 /* Used in error paths too */ 473 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, 474 unsigned int num) 475 { 476 /* sadly atomic_sub_and_test() isn't available on all platforms. The 477 * good news is that the fast path only completes one at a time */ 478 while(num--) { 479 if (atomic_dec_and_test(&wc->wc_num_reqs)) { 480 BUG_ON(num > 0); 481 complete(&wc->wc_io_complete); 482 } 483 } 484 } 485 486 static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc) 487 { 488 o2hb_bio_wait_dec(wc, 1); 489 wait_for_completion(&wc->wc_io_complete); 490 } 491 492 static void o2hb_bio_end_io(struct bio *bio) 493 { 494 struct o2hb_bio_wait_ctxt *wc = bio->bi_private; 495 496 if (bio->bi_status) { 497 mlog(ML_ERROR, "IO Error %d\n", bio->bi_status); 498 wc->wc_error = blk_status_to_errno(bio->bi_status); 499 } 500 501 o2hb_bio_wait_dec(wc, 1); 502 bio_put(bio); 503 } 504 505 /* Setup a Bio to cover I/O against num_slots slots starting at 506 * start_slot. */ 507 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, 508 struct o2hb_bio_wait_ctxt *wc, 509 unsigned int *current_slot, 510 unsigned int max_slots, int op, 511 int op_flags) 512 { 513 int len, current_page; 514 unsigned int vec_len, vec_start; 515 unsigned int bits = reg->hr_block_bits; 516 unsigned int spp = reg->hr_slots_per_page; 517 unsigned int cs = *current_slot; 518 struct bio *bio; 519 struct page *page; 520 521 /* Testing has shown this allocation to take long enough under 522 * GFP_KERNEL that the local node can get fenced. It would be 523 * nicest if we could pre-allocate these bios and avoid this 524 * all together. */ 525 bio = bio_alloc(GFP_ATOMIC, 16); 526 if (!bio) { 527 mlog(ML_ERROR, "Could not alloc slots BIO!\n"); 528 bio = ERR_PTR(-ENOMEM); 529 goto bail; 530 } 531 532 /* Must put everything in 512 byte sectors for the bio... */ 533 bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9); 534 bio_set_dev(bio, reg->hr_bdev); 535 bio->bi_private = wc; 536 bio->bi_end_io = o2hb_bio_end_io; 537 bio_set_op_attrs(bio, op, op_flags); 538 539 vec_start = (cs << bits) % PAGE_SIZE; 540 while(cs < max_slots) { 541 current_page = cs / spp; 542 page = reg->hr_slot_data[current_page]; 543 544 vec_len = min(PAGE_SIZE - vec_start, 545 (max_slots-cs) * (PAGE_SIZE/spp) ); 546 547 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", 548 current_page, vec_len, vec_start); 549 550 len = bio_add_page(bio, page, vec_len, vec_start); 551 if (len != vec_len) break; 552 553 cs += vec_len / (PAGE_SIZE/spp); 554 vec_start = 0; 555 } 556 557 bail: 558 *current_slot = cs; 559 return bio; 560 } 561 562 static int o2hb_read_slots(struct o2hb_region *reg, 563 unsigned int begin_slot, 564 unsigned int max_slots) 565 { 566 unsigned int current_slot = begin_slot; 567 int status; 568 struct o2hb_bio_wait_ctxt wc; 569 struct bio *bio; 570 571 o2hb_bio_wait_init(&wc); 572 573 while(current_slot < max_slots) { 574 bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots, 575 REQ_OP_READ, 0); 576 if (IS_ERR(bio)) { 577 status = PTR_ERR(bio); 578 mlog_errno(status); 579 goto bail_and_wait; 580 } 581 582 atomic_inc(&wc.wc_num_reqs); 583 submit_bio(bio); 584 } 585 586 status = 0; 587 588 bail_and_wait: 589 o2hb_wait_on_io(&wc); 590 if (wc.wc_error && !status) 591 status = wc.wc_error; 592 593 return status; 594 } 595 596 static int o2hb_issue_node_write(struct o2hb_region *reg, 597 struct o2hb_bio_wait_ctxt *write_wc) 598 { 599 int status; 600 unsigned int slot; 601 struct bio *bio; 602 603 o2hb_bio_wait_init(write_wc); 604 605 slot = o2nm_this_node(); 606 607 bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE, 608 REQ_SYNC); 609 if (IS_ERR(bio)) { 610 status = PTR_ERR(bio); 611 mlog_errno(status); 612 goto bail; 613 } 614 615 atomic_inc(&write_wc->wc_num_reqs); 616 submit_bio(bio); 617 618 status = 0; 619 bail: 620 return status; 621 } 622 623 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, 624 struct o2hb_disk_heartbeat_block *hb_block) 625 { 626 __le32 old_cksum; 627 u32 ret; 628 629 /* We want to compute the block crc with a 0 value in the 630 * hb_cksum field. Save it off here and replace after the 631 * crc. */ 632 old_cksum = hb_block->hb_cksum; 633 hb_block->hb_cksum = 0; 634 635 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); 636 637 hb_block->hb_cksum = old_cksum; 638 639 return ret; 640 } 641 642 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block) 643 { 644 mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, " 645 "cksum = 0x%x, generation 0x%llx\n", 646 (long long)le64_to_cpu(hb_block->hb_seq), 647 hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum), 648 (long long)le64_to_cpu(hb_block->hb_generation)); 649 } 650 651 static int o2hb_verify_crc(struct o2hb_region *reg, 652 struct o2hb_disk_heartbeat_block *hb_block) 653 { 654 u32 read, computed; 655 656 read = le32_to_cpu(hb_block->hb_cksum); 657 computed = o2hb_compute_block_crc_le(reg, hb_block); 658 659 return read == computed; 660 } 661 662 /* 663 * Compare the slot data with what we wrote in the last iteration. 664 * If the match fails, print an appropriate error message. This is to 665 * detect errors like... another node hearting on the same slot, 666 * flaky device that is losing writes, etc. 667 * Returns 1 if check succeeds, 0 otherwise. 668 */ 669 static int o2hb_check_own_slot(struct o2hb_region *reg) 670 { 671 struct o2hb_disk_slot *slot; 672 struct o2hb_disk_heartbeat_block *hb_block; 673 char *errstr; 674 675 slot = ®->hr_slots[o2nm_this_node()]; 676 /* Don't check on our 1st timestamp */ 677 if (!slot->ds_last_time) 678 return 0; 679 680 hb_block = slot->ds_raw_block; 681 if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time && 682 le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation && 683 hb_block->hb_node == slot->ds_node_num) 684 return 1; 685 686 #define ERRSTR1 "Another node is heartbeating on device" 687 #define ERRSTR2 "Heartbeat generation mismatch on device" 688 #define ERRSTR3 "Heartbeat sequence mismatch on device" 689 690 if (hb_block->hb_node != slot->ds_node_num) 691 errstr = ERRSTR1; 692 else if (le64_to_cpu(hb_block->hb_generation) != 693 slot->ds_last_generation) 694 errstr = ERRSTR2; 695 else 696 errstr = ERRSTR3; 697 698 mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), " 699 "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name, 700 slot->ds_node_num, (unsigned long long)slot->ds_last_generation, 701 (unsigned long long)slot->ds_last_time, hb_block->hb_node, 702 (unsigned long long)le64_to_cpu(hb_block->hb_generation), 703 (unsigned long long)le64_to_cpu(hb_block->hb_seq)); 704 705 return 0; 706 } 707 708 static inline void o2hb_prepare_block(struct o2hb_region *reg, 709 u64 generation) 710 { 711 int node_num; 712 u64 cputime; 713 struct o2hb_disk_slot *slot; 714 struct o2hb_disk_heartbeat_block *hb_block; 715 716 node_num = o2nm_this_node(); 717 slot = ®->hr_slots[node_num]; 718 719 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; 720 memset(hb_block, 0, reg->hr_block_bytes); 721 /* TODO: time stuff */ 722 cputime = ktime_get_real_seconds(); 723 if (!cputime) 724 cputime = 1; 725 726 hb_block->hb_seq = cpu_to_le64(cputime); 727 hb_block->hb_node = node_num; 728 hb_block->hb_generation = cpu_to_le64(generation); 729 hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS); 730 731 /* This step must always happen last! */ 732 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg, 733 hb_block)); 734 735 mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n", 736 (long long)generation, 737 le32_to_cpu(hb_block->hb_cksum)); 738 } 739 740 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, 741 struct o2nm_node *node, 742 int idx) 743 { 744 struct o2hb_callback_func *f; 745 746 list_for_each_entry(f, &hbcall->list, hc_item) { 747 mlog(ML_HEARTBEAT, "calling funcs %p\n", f); 748 (f->hc_func)(node, idx, f->hc_data); 749 } 750 } 751 752 /* Will run the list in order until we process the passed event */ 753 static void o2hb_run_event_list(struct o2hb_node_event *queued_event) 754 { 755 struct o2hb_callback *hbcall; 756 struct o2hb_node_event *event; 757 758 /* Holding callback sem assures we don't alter the callback 759 * lists when doing this, and serializes ourselves with other 760 * processes wanting callbacks. */ 761 down_write(&o2hb_callback_sem); 762 763 spin_lock(&o2hb_live_lock); 764 while (!list_empty(&o2hb_node_events) 765 && !list_empty(&queued_event->hn_item)) { 766 event = list_entry(o2hb_node_events.next, 767 struct o2hb_node_event, 768 hn_item); 769 list_del_init(&event->hn_item); 770 spin_unlock(&o2hb_live_lock); 771 772 mlog(ML_HEARTBEAT, "Node %s event for %d\n", 773 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN", 774 event->hn_node_num); 775 776 hbcall = hbcall_from_type(event->hn_event_type); 777 778 /* We should *never* have gotten on to the list with a 779 * bad type... This isn't something that we should try 780 * to recover from. */ 781 BUG_ON(IS_ERR(hbcall)); 782 783 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num); 784 785 spin_lock(&o2hb_live_lock); 786 } 787 spin_unlock(&o2hb_live_lock); 788 789 up_write(&o2hb_callback_sem); 790 } 791 792 static void o2hb_queue_node_event(struct o2hb_node_event *event, 793 enum o2hb_callback_type type, 794 struct o2nm_node *node, 795 int node_num) 796 { 797 assert_spin_locked(&o2hb_live_lock); 798 799 BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB)); 800 801 event->hn_event_type = type; 802 event->hn_node = node; 803 event->hn_node_num = node_num; 804 805 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n", 806 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num); 807 808 list_add_tail(&event->hn_item, &o2hb_node_events); 809 } 810 811 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) 812 { 813 struct o2hb_node_event event = 814 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 815 struct o2nm_node *node; 816 int queued = 0; 817 818 node = o2nm_get_node_by_num(slot->ds_node_num); 819 if (!node) 820 return; 821 822 spin_lock(&o2hb_live_lock); 823 if (!list_empty(&slot->ds_live_item)) { 824 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n", 825 slot->ds_node_num); 826 827 list_del_init(&slot->ds_live_item); 828 829 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 830 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 831 832 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 833 slot->ds_node_num); 834 queued = 1; 835 } 836 } 837 spin_unlock(&o2hb_live_lock); 838 839 if (queued) 840 o2hb_run_event_list(&event); 841 842 o2nm_node_put(node); 843 } 844 845 static void o2hb_set_quorum_device(struct o2hb_region *reg) 846 { 847 if (!o2hb_global_heartbeat_active()) 848 return; 849 850 /* Prevent race with o2hb_heartbeat_group_drop_item() */ 851 if (kthread_should_stop()) 852 return; 853 854 /* Tag region as quorum only after thread reaches steady state */ 855 if (atomic_read(®->hr_steady_iterations) != 0) 856 return; 857 858 spin_lock(&o2hb_live_lock); 859 860 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) 861 goto unlock; 862 863 /* 864 * A region can be added to the quorum only when it sees all 865 * live nodes heartbeat on it. In other words, the region has been 866 * added to all nodes. 867 */ 868 if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap, 869 sizeof(o2hb_live_node_bitmap))) 870 goto unlock; 871 872 printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n", 873 config_item_name(®->hr_item), reg->hr_dev_name); 874 875 set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); 876 877 /* 878 * If global heartbeat active, unpin all regions if the 879 * region count > CUT_OFF 880 */ 881 if (bitmap_weight(o2hb_quorum_region_bitmap, 882 O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF) 883 o2hb_region_unpin(NULL); 884 unlock: 885 spin_unlock(&o2hb_live_lock); 886 } 887 888 static int o2hb_check_slot(struct o2hb_region *reg, 889 struct o2hb_disk_slot *slot) 890 { 891 int changed = 0, gen_changed = 0; 892 struct o2hb_node_event event = 893 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 894 struct o2nm_node *node; 895 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block; 896 u64 cputime; 897 unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; 898 unsigned int slot_dead_ms; 899 int tmp; 900 int queued = 0; 901 902 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); 903 904 /* 905 * If a node is no longer configured but is still in the livemap, we 906 * may need to clear that bit from the livemap. 907 */ 908 node = o2nm_get_node_by_num(slot->ds_node_num); 909 if (!node) { 910 spin_lock(&o2hb_live_lock); 911 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap); 912 spin_unlock(&o2hb_live_lock); 913 if (!tmp) 914 return 0; 915 } 916 917 if (!o2hb_verify_crc(reg, hb_block)) { 918 /* all paths from here will drop o2hb_live_lock for 919 * us. */ 920 spin_lock(&o2hb_live_lock); 921 922 /* Don't print an error on the console in this case - 923 * a freshly formatted heartbeat area will not have a 924 * crc set on it. */ 925 if (list_empty(&slot->ds_live_item)) 926 goto out; 927 928 /* The node is live but pushed out a bad crc. We 929 * consider it a transient miss but don't populate any 930 * other values as they may be junk. */ 931 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", 932 slot->ds_node_num, reg->hr_dev_name); 933 o2hb_dump_slot(hb_block); 934 935 slot->ds_equal_samples++; 936 goto fire_callbacks; 937 } 938 939 /* we don't care if these wrap.. the state transitions below 940 * clear at the right places */ 941 cputime = le64_to_cpu(hb_block->hb_seq); 942 if (slot->ds_last_time != cputime) 943 slot->ds_changed_samples++; 944 else 945 slot->ds_equal_samples++; 946 slot->ds_last_time = cputime; 947 948 /* The node changed heartbeat generations. We assume this to 949 * mean it dropped off but came back before we timed out. We 950 * want to consider it down for the time being but don't want 951 * to lose any changed_samples state we might build up to 952 * considering it live again. */ 953 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) { 954 gen_changed = 1; 955 slot->ds_equal_samples = 0; 956 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx " 957 "to 0x%llx)\n", slot->ds_node_num, 958 (long long)slot->ds_last_generation, 959 (long long)le64_to_cpu(hb_block->hb_generation)); 960 } 961 962 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 963 964 mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x " 965 "seq %llu last %llu changed %u equal %u\n", 966 slot->ds_node_num, (long long)slot->ds_last_generation, 967 le32_to_cpu(hb_block->hb_cksum), 968 (unsigned long long)le64_to_cpu(hb_block->hb_seq), 969 (unsigned long long)slot->ds_last_time, slot->ds_changed_samples, 970 slot->ds_equal_samples); 971 972 spin_lock(&o2hb_live_lock); 973 974 fire_callbacks: 975 /* dead nodes only come to life after some number of 976 * changes at any time during their dead time */ 977 if (list_empty(&slot->ds_live_item) && 978 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) { 979 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n", 980 slot->ds_node_num, (long long)slot->ds_last_generation); 981 982 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap); 983 984 /* first on the list generates a callback */ 985 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 986 mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes " 987 "bitmap\n", slot->ds_node_num); 988 set_bit(slot->ds_node_num, o2hb_live_node_bitmap); 989 990 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node, 991 slot->ds_node_num); 992 993 changed = 1; 994 queued = 1; 995 } 996 997 list_add_tail(&slot->ds_live_item, 998 &o2hb_live_slots[slot->ds_node_num]); 999 1000 slot->ds_equal_samples = 0; 1001 1002 /* We want to be sure that all nodes agree on the 1003 * number of milliseconds before a node will be 1004 * considered dead. The self-fencing timeout is 1005 * computed from this value, and a discrepancy might 1006 * result in heartbeat calling a node dead when it 1007 * hasn't self-fenced yet. */ 1008 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms); 1009 if (slot_dead_ms && slot_dead_ms != dead_ms) { 1010 /* TODO: Perhaps we can fail the region here. */ 1011 mlog(ML_ERROR, "Node %d on device %s has a dead count " 1012 "of %u ms, but our count is %u ms.\n" 1013 "Please double check your configuration values " 1014 "for 'O2CB_HEARTBEAT_THRESHOLD'\n", 1015 slot->ds_node_num, reg->hr_dev_name, slot_dead_ms, 1016 dead_ms); 1017 } 1018 goto out; 1019 } 1020 1021 /* if the list is dead, we're done.. */ 1022 if (list_empty(&slot->ds_live_item)) 1023 goto out; 1024 1025 /* live nodes only go dead after enough consequtive missed 1026 * samples.. reset the missed counter whenever we see 1027 * activity */ 1028 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) { 1029 mlog(ML_HEARTBEAT, "Node %d left my region\n", 1030 slot->ds_node_num); 1031 1032 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap); 1033 1034 /* last off the live_slot generates a callback */ 1035 list_del_init(&slot->ds_live_item); 1036 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 1037 mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live " 1038 "nodes bitmap\n", slot->ds_node_num); 1039 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 1040 1041 /* node can be null */ 1042 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, 1043 node, slot->ds_node_num); 1044 1045 changed = 1; 1046 queued = 1; 1047 } 1048 1049 /* We don't clear this because the node is still 1050 * actually writing new blocks. */ 1051 if (!gen_changed) 1052 slot->ds_changed_samples = 0; 1053 goto out; 1054 } 1055 if (slot->ds_changed_samples) { 1056 slot->ds_changed_samples = 0; 1057 slot->ds_equal_samples = 0; 1058 } 1059 out: 1060 spin_unlock(&o2hb_live_lock); 1061 1062 if (queued) 1063 o2hb_run_event_list(&event); 1064 1065 if (node) 1066 o2nm_node_put(node); 1067 return changed; 1068 } 1069 1070 static int o2hb_highest_node(unsigned long *nodes, int numbits) 1071 { 1072 return find_last_bit(nodes, numbits); 1073 } 1074 1075 static int o2hb_lowest_node(unsigned long *nodes, int numbits) 1076 { 1077 return find_first_bit(nodes, numbits); 1078 } 1079 1080 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) 1081 { 1082 int i, ret, highest_node, lowest_node; 1083 int membership_change = 0, own_slot_ok = 0; 1084 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1085 unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1086 struct o2hb_bio_wait_ctxt write_wc; 1087 1088 ret = o2nm_configured_node_map(configured_nodes, 1089 sizeof(configured_nodes)); 1090 if (ret) { 1091 mlog_errno(ret); 1092 goto bail; 1093 } 1094 1095 /* 1096 * If a node is not configured but is in the livemap, we still need 1097 * to read the slot so as to be able to remove it from the livemap. 1098 */ 1099 o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap)); 1100 i = -1; 1101 while ((i = find_next_bit(live_node_bitmap, 1102 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 1103 set_bit(i, configured_nodes); 1104 } 1105 1106 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); 1107 lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES); 1108 if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) { 1109 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n"); 1110 ret = -EINVAL; 1111 goto bail; 1112 } 1113 1114 /* No sense in reading the slots of nodes that don't exist 1115 * yet. Of course, if the node definitions have holes in them 1116 * then we're reading an empty slot anyway... Consider this 1117 * best-effort. */ 1118 ret = o2hb_read_slots(reg, lowest_node, highest_node + 1); 1119 if (ret < 0) { 1120 mlog_errno(ret); 1121 goto bail; 1122 } 1123 1124 /* With an up to date view of the slots, we can check that no 1125 * other node has been improperly configured to heartbeat in 1126 * our slot. */ 1127 own_slot_ok = o2hb_check_own_slot(reg); 1128 1129 /* fill in the proper info for our next heartbeat */ 1130 o2hb_prepare_block(reg, reg->hr_generation); 1131 1132 ret = o2hb_issue_node_write(reg, &write_wc); 1133 if (ret < 0) { 1134 mlog_errno(ret); 1135 goto bail; 1136 } 1137 1138 i = -1; 1139 while((i = find_next_bit(configured_nodes, 1140 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 1141 membership_change |= o2hb_check_slot(reg, ®->hr_slots[i]); 1142 } 1143 1144 /* 1145 * We have to be sure we've advertised ourselves on disk 1146 * before we can go to steady state. This ensures that 1147 * people we find in our steady state have seen us. 1148 */ 1149 o2hb_wait_on_io(&write_wc); 1150 if (write_wc.wc_error) { 1151 /* Do not re-arm the write timeout on I/O error - we 1152 * can't be sure that the new block ever made it to 1153 * disk */ 1154 mlog(ML_ERROR, "Write error %d on device \"%s\"\n", 1155 write_wc.wc_error, reg->hr_dev_name); 1156 ret = write_wc.wc_error; 1157 goto bail; 1158 } 1159 1160 /* Skip disarming the timeout if own slot has stale/bad data */ 1161 if (own_slot_ok) { 1162 o2hb_set_quorum_device(reg); 1163 o2hb_arm_timeout(reg); 1164 reg->hr_last_timeout_start = jiffies; 1165 } 1166 1167 bail: 1168 /* let the person who launched us know when things are steady */ 1169 if (atomic_read(®->hr_steady_iterations) != 0) { 1170 if (!ret && own_slot_ok && !membership_change) { 1171 if (atomic_dec_and_test(®->hr_steady_iterations)) 1172 wake_up(&o2hb_steady_queue); 1173 } 1174 } 1175 1176 if (atomic_read(®->hr_steady_iterations) != 0) { 1177 if (atomic_dec_and_test(®->hr_unsteady_iterations)) { 1178 printk(KERN_NOTICE "o2hb: Unable to stabilize " 1179 "heartbeat on region %s (%s)\n", 1180 config_item_name(®->hr_item), 1181 reg->hr_dev_name); 1182 atomic_set(®->hr_steady_iterations, 0); 1183 reg->hr_aborted_start = 1; 1184 wake_up(&o2hb_steady_queue); 1185 ret = -EIO; 1186 } 1187 } 1188 1189 return ret; 1190 } 1191 1192 /* 1193 * we ride the region ref that the region dir holds. before the region 1194 * dir is removed and drops it ref it will wait to tear down this 1195 * thread. 1196 */ 1197 static int o2hb_thread(void *data) 1198 { 1199 int i, ret; 1200 struct o2hb_region *reg = data; 1201 struct o2hb_bio_wait_ctxt write_wc; 1202 ktime_t before_hb, after_hb; 1203 unsigned int elapsed_msec; 1204 1205 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); 1206 1207 set_user_nice(current, MIN_NICE); 1208 1209 /* Pin node */ 1210 ret = o2nm_depend_this_node(); 1211 if (ret) { 1212 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret); 1213 reg->hr_node_deleted = 1; 1214 wake_up(&o2hb_steady_queue); 1215 return 0; 1216 } 1217 1218 while (!kthread_should_stop() && 1219 !reg->hr_unclean_stop && !reg->hr_aborted_start) { 1220 /* We track the time spent inside 1221 * o2hb_do_disk_heartbeat so that we avoid more than 1222 * hr_timeout_ms between disk writes. On busy systems 1223 * this should result in a heartbeat which is less 1224 * likely to time itself out. */ 1225 before_hb = ktime_get_real(); 1226 1227 ret = o2hb_do_disk_heartbeat(reg); 1228 reg->hr_last_hb_status = ret; 1229 1230 after_hb = ktime_get_real(); 1231 1232 elapsed_msec = (unsigned int) 1233 ktime_ms_delta(after_hb, before_hb); 1234 1235 mlog(ML_HEARTBEAT, 1236 "start = %lld, end = %lld, msec = %u, ret = %d\n", 1237 before_hb, after_hb, elapsed_msec, ret); 1238 1239 if (!kthread_should_stop() && 1240 elapsed_msec < reg->hr_timeout_ms) { 1241 /* the kthread api has blocked signals for us so no 1242 * need to record the return value. */ 1243 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); 1244 } 1245 } 1246 1247 o2hb_disarm_timeout(reg); 1248 1249 /* unclean stop is only used in very bad situation */ 1250 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) 1251 o2hb_shutdown_slot(®->hr_slots[i]); 1252 1253 /* Explicit down notification - avoid forcing the other nodes 1254 * to timeout on this region when we could just as easily 1255 * write a clear generation - thus indicating to them that 1256 * this node has left this region. 1257 */ 1258 if (!reg->hr_unclean_stop && !reg->hr_aborted_start) { 1259 o2hb_prepare_block(reg, 0); 1260 ret = o2hb_issue_node_write(reg, &write_wc); 1261 if (ret == 0) 1262 o2hb_wait_on_io(&write_wc); 1263 else 1264 mlog_errno(ret); 1265 } 1266 1267 /* Unpin node */ 1268 o2nm_undepend_this_node(); 1269 1270 mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n"); 1271 1272 return 0; 1273 } 1274 1275 #ifdef CONFIG_DEBUG_FS 1276 static int o2hb_debug_open(struct inode *inode, struct file *file) 1277 { 1278 struct o2hb_debug_buf *db = inode->i_private; 1279 struct o2hb_region *reg; 1280 unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1281 unsigned long lts; 1282 char *buf = NULL; 1283 int i = -1; 1284 int out = 0; 1285 1286 /* max_nodes should be the largest bitmap we pass here */ 1287 BUG_ON(sizeof(map) < db->db_size); 1288 1289 buf = kmalloc(PAGE_SIZE, GFP_KERNEL); 1290 if (!buf) 1291 goto bail; 1292 1293 switch (db->db_type) { 1294 case O2HB_DB_TYPE_LIVENODES: 1295 case O2HB_DB_TYPE_LIVEREGIONS: 1296 case O2HB_DB_TYPE_QUORUMREGIONS: 1297 case O2HB_DB_TYPE_FAILEDREGIONS: 1298 spin_lock(&o2hb_live_lock); 1299 memcpy(map, db->db_data, db->db_size); 1300 spin_unlock(&o2hb_live_lock); 1301 break; 1302 1303 case O2HB_DB_TYPE_REGION_LIVENODES: 1304 spin_lock(&o2hb_live_lock); 1305 reg = (struct o2hb_region *)db->db_data; 1306 memcpy(map, reg->hr_live_node_bitmap, db->db_size); 1307 spin_unlock(&o2hb_live_lock); 1308 break; 1309 1310 case O2HB_DB_TYPE_REGION_NUMBER: 1311 reg = (struct o2hb_region *)db->db_data; 1312 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n", 1313 reg->hr_region_num); 1314 goto done; 1315 1316 case O2HB_DB_TYPE_REGION_ELAPSED_TIME: 1317 reg = (struct o2hb_region *)db->db_data; 1318 lts = reg->hr_last_timeout_start; 1319 /* If 0, it has never been set before */ 1320 if (lts) 1321 lts = jiffies_to_msecs(jiffies - lts); 1322 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts); 1323 goto done; 1324 1325 case O2HB_DB_TYPE_REGION_PINNED: 1326 reg = (struct o2hb_region *)db->db_data; 1327 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n", 1328 !!reg->hr_item_pinned); 1329 goto done; 1330 1331 default: 1332 goto done; 1333 } 1334 1335 while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len) 1336 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i); 1337 out += snprintf(buf + out, PAGE_SIZE - out, "\n"); 1338 1339 done: 1340 i_size_write(inode, out); 1341 1342 file->private_data = buf; 1343 1344 return 0; 1345 bail: 1346 return -ENOMEM; 1347 } 1348 1349 static int o2hb_debug_release(struct inode *inode, struct file *file) 1350 { 1351 kfree(file->private_data); 1352 return 0; 1353 } 1354 1355 static ssize_t o2hb_debug_read(struct file *file, char __user *buf, 1356 size_t nbytes, loff_t *ppos) 1357 { 1358 return simple_read_from_buffer(buf, nbytes, ppos, file->private_data, 1359 i_size_read(file->f_mapping->host)); 1360 } 1361 #else 1362 static int o2hb_debug_open(struct inode *inode, struct file *file) 1363 { 1364 return 0; 1365 } 1366 static int o2hb_debug_release(struct inode *inode, struct file *file) 1367 { 1368 return 0; 1369 } 1370 static ssize_t o2hb_debug_read(struct file *file, char __user *buf, 1371 size_t nbytes, loff_t *ppos) 1372 { 1373 return 0; 1374 } 1375 #endif /* CONFIG_DEBUG_FS */ 1376 1377 static const struct file_operations o2hb_debug_fops = { 1378 .open = o2hb_debug_open, 1379 .release = o2hb_debug_release, 1380 .read = o2hb_debug_read, 1381 .llseek = generic_file_llseek, 1382 }; 1383 1384 void o2hb_exit(void) 1385 { 1386 debugfs_remove_recursive(o2hb_debug_dir); 1387 kfree(o2hb_db_livenodes); 1388 kfree(o2hb_db_liveregions); 1389 kfree(o2hb_db_quorumregions); 1390 kfree(o2hb_db_failedregions); 1391 } 1392 1393 static void o2hb_debug_create(const char *name, struct dentry *dir, 1394 struct o2hb_debug_buf **db, int db_len, int type, 1395 int size, int len, void *data) 1396 { 1397 *db = kmalloc(db_len, GFP_KERNEL); 1398 if (!*db) 1399 return; 1400 1401 (*db)->db_type = type; 1402 (*db)->db_size = size; 1403 (*db)->db_len = len; 1404 (*db)->db_data = data; 1405 1406 debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, &o2hb_debug_fops); 1407 } 1408 1409 static void o2hb_debug_init(void) 1410 { 1411 o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL); 1412 1413 o2hb_debug_create(O2HB_DEBUG_LIVENODES, o2hb_debug_dir, 1414 &o2hb_db_livenodes, sizeof(*o2hb_db_livenodes), 1415 O2HB_DB_TYPE_LIVENODES, sizeof(o2hb_live_node_bitmap), 1416 O2NM_MAX_NODES, o2hb_live_node_bitmap); 1417 1418 o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, o2hb_debug_dir, 1419 &o2hb_db_liveregions, sizeof(*o2hb_db_liveregions), 1420 O2HB_DB_TYPE_LIVEREGIONS, 1421 sizeof(o2hb_live_region_bitmap), O2NM_MAX_REGIONS, 1422 o2hb_live_region_bitmap); 1423 1424 o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, o2hb_debug_dir, 1425 &o2hb_db_quorumregions, 1426 sizeof(*o2hb_db_quorumregions), 1427 O2HB_DB_TYPE_QUORUMREGIONS, 1428 sizeof(o2hb_quorum_region_bitmap), O2NM_MAX_REGIONS, 1429 o2hb_quorum_region_bitmap); 1430 1431 o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, o2hb_debug_dir, 1432 &o2hb_db_failedregions, 1433 sizeof(*o2hb_db_failedregions), 1434 O2HB_DB_TYPE_FAILEDREGIONS, 1435 sizeof(o2hb_failed_region_bitmap), O2NM_MAX_REGIONS, 1436 o2hb_failed_region_bitmap); 1437 } 1438 1439 void o2hb_init(void) 1440 { 1441 int i; 1442 1443 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++) 1444 INIT_LIST_HEAD(&o2hb_callbacks[i].list); 1445 1446 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++) 1447 INIT_LIST_HEAD(&o2hb_live_slots[i]); 1448 1449 INIT_LIST_HEAD(&o2hb_node_events); 1450 1451 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap)); 1452 memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap)); 1453 memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap)); 1454 memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap)); 1455 memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap)); 1456 1457 o2hb_dependent_users = 0; 1458 1459 o2hb_debug_init(); 1460 } 1461 1462 /* if we're already in a callback then we're already serialized by the sem */ 1463 static void o2hb_fill_node_map_from_callback(unsigned long *map, 1464 unsigned bytes) 1465 { 1466 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); 1467 1468 memcpy(map, &o2hb_live_node_bitmap, bytes); 1469 } 1470 1471 /* 1472 * get a map of all nodes that are heartbeating in any regions 1473 */ 1474 void o2hb_fill_node_map(unsigned long *map, unsigned bytes) 1475 { 1476 /* callers want to serialize this map and callbacks so that they 1477 * can trust that they don't miss nodes coming to the party */ 1478 down_read(&o2hb_callback_sem); 1479 spin_lock(&o2hb_live_lock); 1480 o2hb_fill_node_map_from_callback(map, bytes); 1481 spin_unlock(&o2hb_live_lock); 1482 up_read(&o2hb_callback_sem); 1483 } 1484 EXPORT_SYMBOL_GPL(o2hb_fill_node_map); 1485 1486 /* 1487 * heartbeat configfs bits. The heartbeat set is a default set under 1488 * the cluster set in nodemanager.c. 1489 */ 1490 1491 static struct o2hb_region *to_o2hb_region(struct config_item *item) 1492 { 1493 return item ? container_of(item, struct o2hb_region, hr_item) : NULL; 1494 } 1495 1496 /* drop_item only drops its ref after killing the thread, nothing should 1497 * be using the region anymore. this has to clean up any state that 1498 * attributes might have built up. */ 1499 static void o2hb_region_release(struct config_item *item) 1500 { 1501 int i; 1502 struct page *page; 1503 struct o2hb_region *reg = to_o2hb_region(item); 1504 1505 mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name); 1506 1507 kfree(reg->hr_tmp_block); 1508 1509 if (reg->hr_slot_data) { 1510 for (i = 0; i < reg->hr_num_pages; i++) { 1511 page = reg->hr_slot_data[i]; 1512 if (page) 1513 __free_page(page); 1514 } 1515 kfree(reg->hr_slot_data); 1516 } 1517 1518 if (reg->hr_bdev) 1519 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); 1520 1521 kfree(reg->hr_slots); 1522 1523 debugfs_remove_recursive(reg->hr_debug_dir); 1524 kfree(reg->hr_db_livenodes); 1525 kfree(reg->hr_db_regnum); 1526 kfree(reg->hr_db_elapsed_time); 1527 kfree(reg->hr_db_pinned); 1528 1529 spin_lock(&o2hb_live_lock); 1530 list_del(®->hr_all_item); 1531 spin_unlock(&o2hb_live_lock); 1532 1533 o2net_unregister_handler_list(®->hr_handler_list); 1534 kfree(reg); 1535 } 1536 1537 static int o2hb_read_block_input(struct o2hb_region *reg, 1538 const char *page, 1539 unsigned long *ret_bytes, 1540 unsigned int *ret_bits) 1541 { 1542 unsigned long bytes; 1543 char *p = (char *)page; 1544 1545 bytes = simple_strtoul(p, &p, 0); 1546 if (!p || (*p && (*p != '\n'))) 1547 return -EINVAL; 1548 1549 /* Heartbeat and fs min / max block sizes are the same. */ 1550 if (bytes > 4096 || bytes < 512) 1551 return -ERANGE; 1552 if (hweight16(bytes) != 1) 1553 return -EINVAL; 1554 1555 if (ret_bytes) 1556 *ret_bytes = bytes; 1557 if (ret_bits) 1558 *ret_bits = ffs(bytes) - 1; 1559 1560 return 0; 1561 } 1562 1563 static ssize_t o2hb_region_block_bytes_show(struct config_item *item, 1564 char *page) 1565 { 1566 return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes); 1567 } 1568 1569 static ssize_t o2hb_region_block_bytes_store(struct config_item *item, 1570 const char *page, 1571 size_t count) 1572 { 1573 struct o2hb_region *reg = to_o2hb_region(item); 1574 int status; 1575 unsigned long block_bytes; 1576 unsigned int block_bits; 1577 1578 if (reg->hr_bdev) 1579 return -EINVAL; 1580 1581 status = o2hb_read_block_input(reg, page, &block_bytes, 1582 &block_bits); 1583 if (status) 1584 return status; 1585 1586 reg->hr_block_bytes = (unsigned int)block_bytes; 1587 reg->hr_block_bits = block_bits; 1588 1589 return count; 1590 } 1591 1592 static ssize_t o2hb_region_start_block_show(struct config_item *item, 1593 char *page) 1594 { 1595 return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block); 1596 } 1597 1598 static ssize_t o2hb_region_start_block_store(struct config_item *item, 1599 const char *page, 1600 size_t count) 1601 { 1602 struct o2hb_region *reg = to_o2hb_region(item); 1603 unsigned long long tmp; 1604 char *p = (char *)page; 1605 1606 if (reg->hr_bdev) 1607 return -EINVAL; 1608 1609 tmp = simple_strtoull(p, &p, 0); 1610 if (!p || (*p && (*p != '\n'))) 1611 return -EINVAL; 1612 1613 reg->hr_start_block = tmp; 1614 1615 return count; 1616 } 1617 1618 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page) 1619 { 1620 return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks); 1621 } 1622 1623 static ssize_t o2hb_region_blocks_store(struct config_item *item, 1624 const char *page, 1625 size_t count) 1626 { 1627 struct o2hb_region *reg = to_o2hb_region(item); 1628 unsigned long tmp; 1629 char *p = (char *)page; 1630 1631 if (reg->hr_bdev) 1632 return -EINVAL; 1633 1634 tmp = simple_strtoul(p, &p, 0); 1635 if (!p || (*p && (*p != '\n'))) 1636 return -EINVAL; 1637 1638 if (tmp > O2NM_MAX_NODES || tmp == 0) 1639 return -ERANGE; 1640 1641 reg->hr_blocks = (unsigned int)tmp; 1642 1643 return count; 1644 } 1645 1646 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page) 1647 { 1648 unsigned int ret = 0; 1649 1650 if (to_o2hb_region(item)->hr_bdev) 1651 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name); 1652 1653 return ret; 1654 } 1655 1656 static void o2hb_init_region_params(struct o2hb_region *reg) 1657 { 1658 reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits; 1659 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS; 1660 1661 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n", 1662 reg->hr_start_block, reg->hr_blocks); 1663 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n", 1664 reg->hr_block_bytes, reg->hr_block_bits); 1665 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms); 1666 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold); 1667 } 1668 1669 static int o2hb_map_slot_data(struct o2hb_region *reg) 1670 { 1671 int i, j; 1672 unsigned int last_slot; 1673 unsigned int spp = reg->hr_slots_per_page; 1674 struct page *page; 1675 char *raw; 1676 struct o2hb_disk_slot *slot; 1677 1678 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL); 1679 if (reg->hr_tmp_block == NULL) 1680 return -ENOMEM; 1681 1682 reg->hr_slots = kcalloc(reg->hr_blocks, 1683 sizeof(struct o2hb_disk_slot), GFP_KERNEL); 1684 if (reg->hr_slots == NULL) 1685 return -ENOMEM; 1686 1687 for(i = 0; i < reg->hr_blocks; i++) { 1688 slot = ®->hr_slots[i]; 1689 slot->ds_node_num = i; 1690 INIT_LIST_HEAD(&slot->ds_live_item); 1691 slot->ds_raw_block = NULL; 1692 } 1693 1694 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp; 1695 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks " 1696 "at %u blocks per page\n", 1697 reg->hr_num_pages, reg->hr_blocks, spp); 1698 1699 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *), 1700 GFP_KERNEL); 1701 if (!reg->hr_slot_data) 1702 return -ENOMEM; 1703 1704 for(i = 0; i < reg->hr_num_pages; i++) { 1705 page = alloc_page(GFP_KERNEL); 1706 if (!page) 1707 return -ENOMEM; 1708 1709 reg->hr_slot_data[i] = page; 1710 1711 last_slot = i * spp; 1712 raw = page_address(page); 1713 for (j = 0; 1714 (j < spp) && ((j + last_slot) < reg->hr_blocks); 1715 j++) { 1716 BUG_ON((j + last_slot) >= reg->hr_blocks); 1717 1718 slot = ®->hr_slots[j + last_slot]; 1719 slot->ds_raw_block = 1720 (struct o2hb_disk_heartbeat_block *) raw; 1721 1722 raw += reg->hr_block_bytes; 1723 } 1724 } 1725 1726 return 0; 1727 } 1728 1729 /* Read in all the slots available and populate the tracking 1730 * structures so that we can start with a baseline idea of what's 1731 * there. */ 1732 static int o2hb_populate_slot_data(struct o2hb_region *reg) 1733 { 1734 int ret, i; 1735 struct o2hb_disk_slot *slot; 1736 struct o2hb_disk_heartbeat_block *hb_block; 1737 1738 ret = o2hb_read_slots(reg, 0, reg->hr_blocks); 1739 if (ret) 1740 goto out; 1741 1742 /* We only want to get an idea of the values initially in each 1743 * slot, so we do no verification - o2hb_check_slot will 1744 * actually determine if each configured slot is valid and 1745 * whether any values have changed. */ 1746 for(i = 0; i < reg->hr_blocks; i++) { 1747 slot = ®->hr_slots[i]; 1748 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block; 1749 1750 /* Only fill the values that o2hb_check_slot uses to 1751 * determine changing slots */ 1752 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq); 1753 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 1754 } 1755 1756 out: 1757 return ret; 1758 } 1759 1760 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */ 1761 static ssize_t o2hb_region_dev_store(struct config_item *item, 1762 const char *page, 1763 size_t count) 1764 { 1765 struct o2hb_region *reg = to_o2hb_region(item); 1766 struct task_struct *hb_task; 1767 long fd; 1768 int sectsize; 1769 char *p = (char *)page; 1770 struct fd f; 1771 struct inode *inode; 1772 ssize_t ret = -EINVAL; 1773 int live_threshold; 1774 1775 if (reg->hr_bdev) 1776 goto out; 1777 1778 /* We can't heartbeat without having had our node number 1779 * configured yet. */ 1780 if (o2nm_this_node() == O2NM_MAX_NODES) 1781 goto out; 1782 1783 fd = simple_strtol(p, &p, 0); 1784 if (!p || (*p && (*p != '\n'))) 1785 goto out; 1786 1787 if (fd < 0 || fd >= INT_MAX) 1788 goto out; 1789 1790 f = fdget(fd); 1791 if (f.file == NULL) 1792 goto out; 1793 1794 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 || 1795 reg->hr_block_bytes == 0) 1796 goto out2; 1797 1798 inode = igrab(f.file->f_mapping->host); 1799 if (inode == NULL) 1800 goto out2; 1801 1802 if (!S_ISBLK(inode->i_mode)) 1803 goto out3; 1804 1805 reg->hr_bdev = I_BDEV(f.file->f_mapping->host); 1806 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL); 1807 if (ret) { 1808 reg->hr_bdev = NULL; 1809 goto out3; 1810 } 1811 inode = NULL; 1812 1813 bdevname(reg->hr_bdev, reg->hr_dev_name); 1814 1815 sectsize = bdev_logical_block_size(reg->hr_bdev); 1816 if (sectsize != reg->hr_block_bytes) { 1817 mlog(ML_ERROR, 1818 "blocksize %u incorrect for device, expected %d", 1819 reg->hr_block_bytes, sectsize); 1820 ret = -EINVAL; 1821 goto out3; 1822 } 1823 1824 o2hb_init_region_params(reg); 1825 1826 /* Generation of zero is invalid */ 1827 do { 1828 get_random_bytes(®->hr_generation, 1829 sizeof(reg->hr_generation)); 1830 } while (reg->hr_generation == 0); 1831 1832 ret = o2hb_map_slot_data(reg); 1833 if (ret) { 1834 mlog_errno(ret); 1835 goto out3; 1836 } 1837 1838 ret = o2hb_populate_slot_data(reg); 1839 if (ret) { 1840 mlog_errno(ret); 1841 goto out3; 1842 } 1843 1844 INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); 1845 INIT_DELAYED_WORK(®->hr_nego_timeout_work, o2hb_nego_timeout); 1846 1847 /* 1848 * A node is considered live after it has beat LIVE_THRESHOLD 1849 * times. We're not steady until we've given them a chance 1850 * _after_ our first read. 1851 * The default threshold is bare minimum so as to limit the delay 1852 * during mounts. For global heartbeat, the threshold doubled for the 1853 * first region. 1854 */ 1855 live_threshold = O2HB_LIVE_THRESHOLD; 1856 if (o2hb_global_heartbeat_active()) { 1857 spin_lock(&o2hb_live_lock); 1858 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1) 1859 live_threshold <<= 1; 1860 spin_unlock(&o2hb_live_lock); 1861 } 1862 ++live_threshold; 1863 atomic_set(®->hr_steady_iterations, live_threshold); 1864 /* unsteady_iterations is triple the steady_iterations */ 1865 atomic_set(®->hr_unsteady_iterations, (live_threshold * 3)); 1866 1867 hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s", 1868 reg->hr_item.ci_name); 1869 if (IS_ERR(hb_task)) { 1870 ret = PTR_ERR(hb_task); 1871 mlog_errno(ret); 1872 goto out3; 1873 } 1874 1875 spin_lock(&o2hb_live_lock); 1876 reg->hr_task = hb_task; 1877 spin_unlock(&o2hb_live_lock); 1878 1879 ret = wait_event_interruptible(o2hb_steady_queue, 1880 atomic_read(®->hr_steady_iterations) == 0 || 1881 reg->hr_node_deleted); 1882 if (ret) { 1883 atomic_set(®->hr_steady_iterations, 0); 1884 reg->hr_aborted_start = 1; 1885 } 1886 1887 if (reg->hr_aborted_start) { 1888 ret = -EIO; 1889 goto out3; 1890 } 1891 1892 if (reg->hr_node_deleted) { 1893 ret = -EINVAL; 1894 goto out3; 1895 } 1896 1897 /* Ok, we were woken. Make sure it wasn't by drop_item() */ 1898 spin_lock(&o2hb_live_lock); 1899 hb_task = reg->hr_task; 1900 if (o2hb_global_heartbeat_active()) 1901 set_bit(reg->hr_region_num, o2hb_live_region_bitmap); 1902 spin_unlock(&o2hb_live_lock); 1903 1904 if (hb_task) 1905 ret = count; 1906 else 1907 ret = -EIO; 1908 1909 if (hb_task && o2hb_global_heartbeat_active()) 1910 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n", 1911 config_item_name(®->hr_item), reg->hr_dev_name); 1912 1913 out3: 1914 iput(inode); 1915 out2: 1916 fdput(f); 1917 out: 1918 if (ret < 0) { 1919 if (reg->hr_bdev) { 1920 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); 1921 reg->hr_bdev = NULL; 1922 } 1923 } 1924 return ret; 1925 } 1926 1927 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page) 1928 { 1929 struct o2hb_region *reg = to_o2hb_region(item); 1930 pid_t pid = 0; 1931 1932 spin_lock(&o2hb_live_lock); 1933 if (reg->hr_task) 1934 pid = task_pid_nr(reg->hr_task); 1935 spin_unlock(&o2hb_live_lock); 1936 1937 if (!pid) 1938 return 0; 1939 1940 return sprintf(page, "%u\n", pid); 1941 } 1942 1943 CONFIGFS_ATTR(o2hb_region_, block_bytes); 1944 CONFIGFS_ATTR(o2hb_region_, start_block); 1945 CONFIGFS_ATTR(o2hb_region_, blocks); 1946 CONFIGFS_ATTR(o2hb_region_, dev); 1947 CONFIGFS_ATTR_RO(o2hb_region_, pid); 1948 1949 static struct configfs_attribute *o2hb_region_attrs[] = { 1950 &o2hb_region_attr_block_bytes, 1951 &o2hb_region_attr_start_block, 1952 &o2hb_region_attr_blocks, 1953 &o2hb_region_attr_dev, 1954 &o2hb_region_attr_pid, 1955 NULL, 1956 }; 1957 1958 static struct configfs_item_operations o2hb_region_item_ops = { 1959 .release = o2hb_region_release, 1960 }; 1961 1962 static const struct config_item_type o2hb_region_type = { 1963 .ct_item_ops = &o2hb_region_item_ops, 1964 .ct_attrs = o2hb_region_attrs, 1965 .ct_owner = THIS_MODULE, 1966 }; 1967 1968 /* heartbeat set */ 1969 1970 struct o2hb_heartbeat_group { 1971 struct config_group hs_group; 1972 /* some stuff? */ 1973 }; 1974 1975 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group) 1976 { 1977 return group ? 1978 container_of(group, struct o2hb_heartbeat_group, hs_group) 1979 : NULL; 1980 } 1981 1982 static void o2hb_debug_region_init(struct o2hb_region *reg, 1983 struct dentry *parent) 1984 { 1985 struct dentry *dir; 1986 1987 dir = debugfs_create_dir(config_item_name(®->hr_item), parent); 1988 reg->hr_debug_dir = dir; 1989 1990 o2hb_debug_create(O2HB_DEBUG_LIVENODES, dir, &(reg->hr_db_livenodes), 1991 sizeof(*(reg->hr_db_livenodes)), 1992 O2HB_DB_TYPE_REGION_LIVENODES, 1993 sizeof(reg->hr_live_node_bitmap), O2NM_MAX_NODES, 1994 reg); 1995 1996 o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, dir, &(reg->hr_db_regnum), 1997 sizeof(*(reg->hr_db_regnum)), 1998 O2HB_DB_TYPE_REGION_NUMBER, 0, O2NM_MAX_NODES, reg); 1999 2000 o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, dir, 2001 &(reg->hr_db_elapsed_time), 2002 sizeof(*(reg->hr_db_elapsed_time)), 2003 O2HB_DB_TYPE_REGION_ELAPSED_TIME, 0, 0, reg); 2004 2005 o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, dir, &(reg->hr_db_pinned), 2006 sizeof(*(reg->hr_db_pinned)), 2007 O2HB_DB_TYPE_REGION_PINNED, 0, 0, reg); 2008 2009 } 2010 2011 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group, 2012 const char *name) 2013 { 2014 struct o2hb_region *reg = NULL; 2015 int ret; 2016 2017 reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL); 2018 if (reg == NULL) 2019 return ERR_PTR(-ENOMEM); 2020 2021 if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) { 2022 ret = -ENAMETOOLONG; 2023 goto free; 2024 } 2025 2026 spin_lock(&o2hb_live_lock); 2027 reg->hr_region_num = 0; 2028 if (o2hb_global_heartbeat_active()) { 2029 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap, 2030 O2NM_MAX_REGIONS); 2031 if (reg->hr_region_num >= O2NM_MAX_REGIONS) { 2032 spin_unlock(&o2hb_live_lock); 2033 ret = -EFBIG; 2034 goto free; 2035 } 2036 set_bit(reg->hr_region_num, o2hb_region_bitmap); 2037 } 2038 list_add_tail(®->hr_all_item, &o2hb_all_regions); 2039 spin_unlock(&o2hb_live_lock); 2040 2041 config_item_init_type_name(®->hr_item, name, &o2hb_region_type); 2042 2043 /* this is the same way to generate msg key as dlm, for local heartbeat, 2044 * name is also the same, so make initial crc value different to avoid 2045 * message key conflict. 2046 */ 2047 reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS, 2048 name, strlen(name)); 2049 INIT_LIST_HEAD(®->hr_handler_list); 2050 ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key, 2051 sizeof(struct o2hb_nego_msg), 2052 o2hb_nego_timeout_handler, 2053 reg, NULL, ®->hr_handler_list); 2054 if (ret) 2055 goto free; 2056 2057 ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key, 2058 sizeof(struct o2hb_nego_msg), 2059 o2hb_nego_approve_handler, 2060 reg, NULL, ®->hr_handler_list); 2061 if (ret) 2062 goto unregister_handler; 2063 2064 o2hb_debug_region_init(reg, o2hb_debug_dir); 2065 2066 return ®->hr_item; 2067 2068 unregister_handler: 2069 o2net_unregister_handler_list(®->hr_handler_list); 2070 free: 2071 kfree(reg); 2072 return ERR_PTR(ret); 2073 } 2074 2075 static void o2hb_heartbeat_group_drop_item(struct config_group *group, 2076 struct config_item *item) 2077 { 2078 struct task_struct *hb_task; 2079 struct o2hb_region *reg = to_o2hb_region(item); 2080 int quorum_region = 0; 2081 2082 /* stop the thread when the user removes the region dir */ 2083 spin_lock(&o2hb_live_lock); 2084 hb_task = reg->hr_task; 2085 reg->hr_task = NULL; 2086 reg->hr_item_dropped = 1; 2087 spin_unlock(&o2hb_live_lock); 2088 2089 if (hb_task) 2090 kthread_stop(hb_task); 2091 2092 if (o2hb_global_heartbeat_active()) { 2093 spin_lock(&o2hb_live_lock); 2094 clear_bit(reg->hr_region_num, o2hb_region_bitmap); 2095 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap); 2096 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) 2097 quorum_region = 1; 2098 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); 2099 spin_unlock(&o2hb_live_lock); 2100 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n", 2101 ((atomic_read(®->hr_steady_iterations) == 0) ? 2102 "stopped" : "start aborted"), config_item_name(item), 2103 reg->hr_dev_name); 2104 } 2105 2106 /* 2107 * If we're racing a dev_write(), we need to wake them. They will 2108 * check reg->hr_task 2109 */ 2110 if (atomic_read(®->hr_steady_iterations) != 0) { 2111 reg->hr_aborted_start = 1; 2112 atomic_set(®->hr_steady_iterations, 0); 2113 wake_up(&o2hb_steady_queue); 2114 } 2115 2116 config_item_put(item); 2117 2118 if (!o2hb_global_heartbeat_active() || !quorum_region) 2119 return; 2120 2121 /* 2122 * If global heartbeat active and there are dependent users, 2123 * pin all regions if quorum region count <= CUT_OFF 2124 */ 2125 spin_lock(&o2hb_live_lock); 2126 2127 if (!o2hb_dependent_users) 2128 goto unlock; 2129 2130 if (bitmap_weight(o2hb_quorum_region_bitmap, 2131 O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) 2132 o2hb_region_pin(NULL); 2133 2134 unlock: 2135 spin_unlock(&o2hb_live_lock); 2136 } 2137 2138 static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item, 2139 char *page) 2140 { 2141 return sprintf(page, "%u\n", o2hb_dead_threshold); 2142 } 2143 2144 static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item, 2145 const char *page, size_t count) 2146 { 2147 unsigned long tmp; 2148 char *p = (char *)page; 2149 2150 tmp = simple_strtoul(p, &p, 10); 2151 if (!p || (*p && (*p != '\n'))) 2152 return -EINVAL; 2153 2154 /* this will validate ranges for us. */ 2155 o2hb_dead_threshold_set((unsigned int) tmp); 2156 2157 return count; 2158 } 2159 2160 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item, 2161 char *page) 2162 { 2163 return sprintf(page, "%s\n", 2164 o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]); 2165 } 2166 2167 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item, 2168 const char *page, size_t count) 2169 { 2170 unsigned int i; 2171 int ret; 2172 size_t len; 2173 2174 len = (page[count - 1] == '\n') ? count - 1 : count; 2175 if (!len) 2176 return -EINVAL; 2177 2178 for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) { 2179 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len)) 2180 continue; 2181 2182 ret = o2hb_global_heartbeat_mode_set(i); 2183 if (!ret) 2184 printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n", 2185 o2hb_heartbeat_mode_desc[i]); 2186 return count; 2187 } 2188 2189 return -EINVAL; 2190 2191 } 2192 2193 CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold); 2194 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode); 2195 2196 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = { 2197 &o2hb_heartbeat_group_attr_dead_threshold, 2198 &o2hb_heartbeat_group_attr_mode, 2199 NULL, 2200 }; 2201 2202 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = { 2203 .make_item = o2hb_heartbeat_group_make_item, 2204 .drop_item = o2hb_heartbeat_group_drop_item, 2205 }; 2206 2207 static const struct config_item_type o2hb_heartbeat_group_type = { 2208 .ct_group_ops = &o2hb_heartbeat_group_group_ops, 2209 .ct_attrs = o2hb_heartbeat_group_attrs, 2210 .ct_owner = THIS_MODULE, 2211 }; 2212 2213 /* this is just here to avoid touching group in heartbeat.h which the 2214 * entire damn world #includes */ 2215 struct config_group *o2hb_alloc_hb_set(void) 2216 { 2217 struct o2hb_heartbeat_group *hs = NULL; 2218 struct config_group *ret = NULL; 2219 2220 hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); 2221 if (hs == NULL) 2222 goto out; 2223 2224 config_group_init_type_name(&hs->hs_group, "heartbeat", 2225 &o2hb_heartbeat_group_type); 2226 2227 ret = &hs->hs_group; 2228 out: 2229 if (ret == NULL) 2230 kfree(hs); 2231 return ret; 2232 } 2233 2234 void o2hb_free_hb_set(struct config_group *group) 2235 { 2236 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group); 2237 kfree(hs); 2238 } 2239 2240 /* hb callback registration and issuing */ 2241 2242 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type) 2243 { 2244 if (type == O2HB_NUM_CB) 2245 return ERR_PTR(-EINVAL); 2246 2247 return &o2hb_callbacks[type]; 2248 } 2249 2250 void o2hb_setup_callback(struct o2hb_callback_func *hc, 2251 enum o2hb_callback_type type, 2252 o2hb_cb_func *func, 2253 void *data, 2254 int priority) 2255 { 2256 INIT_LIST_HEAD(&hc->hc_item); 2257 hc->hc_func = func; 2258 hc->hc_data = data; 2259 hc->hc_priority = priority; 2260 hc->hc_type = type; 2261 hc->hc_magic = O2HB_CB_MAGIC; 2262 } 2263 EXPORT_SYMBOL_GPL(o2hb_setup_callback); 2264 2265 /* 2266 * In local heartbeat mode, region_uuid passed matches the dlm domain name. 2267 * In global heartbeat mode, region_uuid passed is NULL. 2268 * 2269 * In local, we only pin the matching region. In global we pin all the active 2270 * regions. 2271 */ 2272 static int o2hb_region_pin(const char *region_uuid) 2273 { 2274 int ret = 0, found = 0; 2275 struct o2hb_region *reg; 2276 char *uuid; 2277 2278 assert_spin_locked(&o2hb_live_lock); 2279 2280 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { 2281 if (reg->hr_item_dropped) 2282 continue; 2283 2284 uuid = config_item_name(®->hr_item); 2285 2286 /* local heartbeat */ 2287 if (region_uuid) { 2288 if (strcmp(region_uuid, uuid)) 2289 continue; 2290 found = 1; 2291 } 2292 2293 if (reg->hr_item_pinned || reg->hr_item_dropped) 2294 goto skip_pin; 2295 2296 /* Ignore ENOENT only for local hb (userdlm domain) */ 2297 ret = o2nm_depend_item(®->hr_item); 2298 if (!ret) { 2299 mlog(ML_CLUSTER, "Pin region %s\n", uuid); 2300 reg->hr_item_pinned = 1; 2301 } else { 2302 if (ret == -ENOENT && found) 2303 ret = 0; 2304 else { 2305 mlog(ML_ERROR, "Pin region %s fails with %d\n", 2306 uuid, ret); 2307 break; 2308 } 2309 } 2310 skip_pin: 2311 if (found) 2312 break; 2313 } 2314 2315 return ret; 2316 } 2317 2318 /* 2319 * In local heartbeat mode, region_uuid passed matches the dlm domain name. 2320 * In global heartbeat mode, region_uuid passed is NULL. 2321 * 2322 * In local, we only unpin the matching region. In global we unpin all the 2323 * active regions. 2324 */ 2325 static void o2hb_region_unpin(const char *region_uuid) 2326 { 2327 struct o2hb_region *reg; 2328 char *uuid; 2329 int found = 0; 2330 2331 assert_spin_locked(&o2hb_live_lock); 2332 2333 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { 2334 if (reg->hr_item_dropped) 2335 continue; 2336 2337 uuid = config_item_name(®->hr_item); 2338 if (region_uuid) { 2339 if (strcmp(region_uuid, uuid)) 2340 continue; 2341 found = 1; 2342 } 2343 2344 if (reg->hr_item_pinned) { 2345 mlog(ML_CLUSTER, "Unpin region %s\n", uuid); 2346 o2nm_undepend_item(®->hr_item); 2347 reg->hr_item_pinned = 0; 2348 } 2349 if (found) 2350 break; 2351 } 2352 } 2353 2354 static int o2hb_region_inc_user(const char *region_uuid) 2355 { 2356 int ret = 0; 2357 2358 spin_lock(&o2hb_live_lock); 2359 2360 /* local heartbeat */ 2361 if (!o2hb_global_heartbeat_active()) { 2362 ret = o2hb_region_pin(region_uuid); 2363 goto unlock; 2364 } 2365 2366 /* 2367 * if global heartbeat active and this is the first dependent user, 2368 * pin all regions if quorum region count <= CUT_OFF 2369 */ 2370 o2hb_dependent_users++; 2371 if (o2hb_dependent_users > 1) 2372 goto unlock; 2373 2374 if (bitmap_weight(o2hb_quorum_region_bitmap, 2375 O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) 2376 ret = o2hb_region_pin(NULL); 2377 2378 unlock: 2379 spin_unlock(&o2hb_live_lock); 2380 return ret; 2381 } 2382 2383 static void o2hb_region_dec_user(const char *region_uuid) 2384 { 2385 spin_lock(&o2hb_live_lock); 2386 2387 /* local heartbeat */ 2388 if (!o2hb_global_heartbeat_active()) { 2389 o2hb_region_unpin(region_uuid); 2390 goto unlock; 2391 } 2392 2393 /* 2394 * if global heartbeat active and there are no dependent users, 2395 * unpin all quorum regions 2396 */ 2397 o2hb_dependent_users--; 2398 if (!o2hb_dependent_users) 2399 o2hb_region_unpin(NULL); 2400 2401 unlock: 2402 spin_unlock(&o2hb_live_lock); 2403 } 2404 2405 int o2hb_register_callback(const char *region_uuid, 2406 struct o2hb_callback_func *hc) 2407 { 2408 struct o2hb_callback_func *f; 2409 struct o2hb_callback *hbcall; 2410 int ret; 2411 2412 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 2413 BUG_ON(!list_empty(&hc->hc_item)); 2414 2415 hbcall = hbcall_from_type(hc->hc_type); 2416 if (IS_ERR(hbcall)) { 2417 ret = PTR_ERR(hbcall); 2418 goto out; 2419 } 2420 2421 if (region_uuid) { 2422 ret = o2hb_region_inc_user(region_uuid); 2423 if (ret) { 2424 mlog_errno(ret); 2425 goto out; 2426 } 2427 } 2428 2429 down_write(&o2hb_callback_sem); 2430 2431 list_for_each_entry(f, &hbcall->list, hc_item) { 2432 if (hc->hc_priority < f->hc_priority) { 2433 list_add_tail(&hc->hc_item, &f->hc_item); 2434 break; 2435 } 2436 } 2437 if (list_empty(&hc->hc_item)) 2438 list_add_tail(&hc->hc_item, &hbcall->list); 2439 2440 up_write(&o2hb_callback_sem); 2441 ret = 0; 2442 out: 2443 mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n", 2444 ret, __builtin_return_address(0), hc); 2445 return ret; 2446 } 2447 EXPORT_SYMBOL_GPL(o2hb_register_callback); 2448 2449 void o2hb_unregister_callback(const char *region_uuid, 2450 struct o2hb_callback_func *hc) 2451 { 2452 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 2453 2454 mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n", 2455 __builtin_return_address(0), hc); 2456 2457 /* XXX Can this happen _with_ a region reference? */ 2458 if (list_empty(&hc->hc_item)) 2459 return; 2460 2461 if (region_uuid) 2462 o2hb_region_dec_user(region_uuid); 2463 2464 down_write(&o2hb_callback_sem); 2465 2466 list_del_init(&hc->hc_item); 2467 2468 up_write(&o2hb_callback_sem); 2469 } 2470 EXPORT_SYMBOL_GPL(o2hb_unregister_callback); 2471 2472 int o2hb_check_node_heartbeating_no_sem(u8 node_num) 2473 { 2474 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 2475 2476 spin_lock(&o2hb_live_lock); 2477 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); 2478 spin_unlock(&o2hb_live_lock); 2479 if (!test_bit(node_num, testing_map)) { 2480 mlog(ML_HEARTBEAT, 2481 "node (%u) does not have heartbeating enabled.\n", 2482 node_num); 2483 return 0; 2484 } 2485 2486 return 1; 2487 } 2488 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem); 2489 2490 int o2hb_check_node_heartbeating_from_callback(u8 node_num) 2491 { 2492 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 2493 2494 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); 2495 if (!test_bit(node_num, testing_map)) { 2496 mlog(ML_HEARTBEAT, 2497 "node (%u) does not have heartbeating enabled.\n", 2498 node_num); 2499 return 0; 2500 } 2501 2502 return 1; 2503 } 2504 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback); 2505 2506 /* 2507 * this is just a hack until we get the plumbing which flips file systems 2508 * read only and drops the hb ref instead of killing the node dead. 2509 */ 2510 void o2hb_stop_all_regions(void) 2511 { 2512 struct o2hb_region *reg; 2513 2514 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n"); 2515 2516 spin_lock(&o2hb_live_lock); 2517 2518 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) 2519 reg->hr_unclean_stop = 1; 2520 2521 spin_unlock(&o2hb_live_lock); 2522 } 2523 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions); 2524 2525 int o2hb_get_all_regions(char *region_uuids, u8 max_regions) 2526 { 2527 struct o2hb_region *reg; 2528 int numregs = 0; 2529 char *p; 2530 2531 spin_lock(&o2hb_live_lock); 2532 2533 p = region_uuids; 2534 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { 2535 if (reg->hr_item_dropped) 2536 continue; 2537 2538 mlog(0, "Region: %s\n", config_item_name(®->hr_item)); 2539 if (numregs < max_regions) { 2540 memcpy(p, config_item_name(®->hr_item), 2541 O2HB_MAX_REGION_NAME_LEN); 2542 p += O2HB_MAX_REGION_NAME_LEN; 2543 } 2544 numregs++; 2545 } 2546 2547 spin_unlock(&o2hb_live_lock); 2548 2549 return numregs; 2550 } 2551 EXPORT_SYMBOL_GPL(o2hb_get_all_regions); 2552 2553 int o2hb_global_heartbeat_active(void) 2554 { 2555 return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL); 2556 } 2557 EXPORT_SYMBOL(o2hb_global_heartbeat_active); 2558