xref: /openbmc/linux/fs/ocfs2/cluster/heartbeat.c (revision 384740dc)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA.
20  */
21 
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36 
37 #include "heartbeat.h"
38 #include "tcp.h"
39 #include "nodemanager.h"
40 #include "quorum.h"
41 
42 #include "masklog.h"
43 
44 
/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 *
 * o2hb_live_lock is held around accesses to the live slot lists, the live
 * node bitmap, the pending node event list and the list of all regions.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
/* indexed by node number: the disk slots currently seeing that node alive */
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
/* bit N is set when o2hb_live_slots[N] gains its first entry and cleared
 * when it loses its last one */
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
/* node up/down events waiting to be delivered by o2hb_run_event_list() */
static LIST_HEAD(o2hb_node_events);
/* woken when a region's hr_steady_iterations counter reaches zero */
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

/* presumably every region is linked here through hr_all_item; the insertion
 * is not visible in this part of the file */
static LIST_HEAD(o2hb_all_regions);

/* one list of registered callback functions per callback type */
static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

/* 1 << 9 == 512 byte blocks */
#define O2HB_DEFAULT_BLOCK_BITS       9

/* number of consecutive unchanged samples after which a live node is
 * declared dead; see o2hb_check_slot() */
unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
74 
75 /* Only sets a new threshold if there are no active regions.
76  *
77  * No locking or otherwise interesting code is required for reading
78  * o2hb_dead_threshold as it can't change once regions are active and
79  * it's not interesting to anyone until then anyway. */
80 static void o2hb_dead_threshold_set(unsigned int threshold)
81 {
82 	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
83 		spin_lock(&o2hb_live_lock);
84 		if (list_empty(&o2hb_all_regions))
85 			o2hb_dead_threshold = threshold;
86 		spin_unlock(&o2hb_live_lock);
87 	}
88 }
89 
/* A node up/down notification.  Instances live on the stack of the code that
 * detects the transition and are queued on o2hb_node_events until
 * o2hb_run_event_list() delivers them. */
struct o2hb_node_event {
	/* link on o2hb_node_events; an empty list head means "not queued" */
	struct list_head        hn_item;
	/* which callback list to fire (O2HB_NODE_UP_CB / O2HB_NODE_DOWN_CB) */
	enum o2hb_callback_type hn_event_type;
	/* node and node number handed to each callback */
	struct o2nm_node        *hn_node;
	int                     hn_node_num;
};
96 
/* Per-node tracking state for one heartbeat block of a region. */
struct o2hb_disk_slot {
	/* the slot's heartbeat block as read from / written to disk;
	 * presumably points into the region's hr_slot_data pages -- the
	 * assignment is not visible in this part of the file */
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	/* node number this slot belongs to */
	u8			ds_node_num;
	/* hb_seq and hb_generation seen in the most recent valid sample */
	u64			ds_last_time;
	u64			ds_last_generation;
	/* samples in which hb_seq did not / did change; reset by the state
	 * transitions in o2hb_check_slot() */
	u16			ds_equal_samples;
	u16			ds_changed_samples;
	/* link on o2hb_live_slots[ds_node_num] while the node is considered
	 * live in this region; an empty list head otherwise */
	struct list_head	ds_live_item;
};
106 
/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, who cleans up the region */
struct o2hb_region {
	/* configfs item embedded in the region; see to_o2hb_region() */
	struct config_item	hr_item;

	/* link on o2hb_all_regions, removed under o2hb_live_lock on release */
	struct list_head	hr_all_item;
	/* when set, the heartbeat thread leaves its loop and skips the
	 * per-slot shutdown notifications */
	unsigned		hr_unclean_stop:1;

	/* protected by the hr_callback_sem
	 * NOTE(review): nothing called hr_callback_sem is visible in this
	 * file; this presumably means o2hb_callback_sem -- confirm. */
	struct task_struct 	*hr_task;

	/* number of heartbeat blocks (slots) in the region */
	unsigned int		hr_blocks;
	/* block number on the device at which slot 0 starts */
	unsigned long long	hr_start_block;

	/* heartbeat block size, as a shift and in bytes */
	unsigned int		hr_block_bits;
	unsigned int		hr_block_bytes;

	/* layout of hr_slot_data */
	unsigned int		hr_slots_per_page;
	unsigned int		hr_num_pages;

	/* pages that the slot I/O is performed against */
	struct page             **hr_slot_data;
	struct block_device	*hr_bdev;
	/* per-slot tracking state, indexed by node number */
	struct o2hb_disk_slot	*hr_slots;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of fragility. */
	atomic_t		hr_steady_iterations;

	/* device name used in log messages */
	char			hr_dev_name[BDEVNAME_SIZE];

	/* target time between two heartbeat iterations of the thread */
	unsigned int		hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64			hr_generation;

	/* fires o2hb_write_timeout() unless re-armed in time;
	 * hr_last_timeout_start is the jiffies value at the last arming */
	struct delayed_work	hr_write_timeout_work;
	unsigned long		hr_last_timeout_start;

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;
};
152 
/* Lets a submitter wait for a batch of bios to finish. */
struct o2hb_bio_wait_ctxt {
	/* starts at 1 for the submitter, plus one per submitted bio; the
	 * completion fires when it drops to zero */
	atomic_t          wc_num_reqs;
	struct completion wc_io_complete;
	/* error reported by a failed bio, 0 if none failed */
	int               wc_error;
};
158 
159 static void o2hb_write_timeout(struct work_struct *work)
160 {
161 	struct o2hb_region *reg =
162 		container_of(work, struct o2hb_region,
163 			     hr_write_timeout_work.work);
164 
165 	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
166 	     "milliseconds\n", reg->hr_dev_name,
167 	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
168 	o2quo_disk_timeout();
169 }
170 
171 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
172 {
173 	mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
174 
175 	cancel_delayed_work(&reg->hr_write_timeout_work);
176 	reg->hr_last_timeout_start = jiffies;
177 	schedule_delayed_work(&reg->hr_write_timeout_work,
178 			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
179 }
180 
181 static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
182 {
183 	cancel_delayed_work(&reg->hr_write_timeout_work);
184 	flush_scheduled_work();
185 }
186 
187 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
188 {
189 	atomic_set(&wc->wc_num_reqs, 1);
190 	init_completion(&wc->wc_io_complete);
191 	wc->wc_error = 0;
192 }
193 
194 /* Used in error paths too */
195 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
196 				     unsigned int num)
197 {
198 	/* sadly atomic_sub_and_test() isn't available on all platforms.  The
199 	 * good news is that the fast path only completes one at a time */
200 	while(num--) {
201 		if (atomic_dec_and_test(&wc->wc_num_reqs)) {
202 			BUG_ON(num > 0);
203 			complete(&wc->wc_io_complete);
204 		}
205 	}
206 }
207 
208 static void o2hb_wait_on_io(struct o2hb_region *reg,
209 			    struct o2hb_bio_wait_ctxt *wc)
210 {
211 	struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
212 
213 	blk_run_address_space(mapping);
214 	o2hb_bio_wait_dec(wc, 1);
215 
216 	wait_for_completion(&wc->wc_io_complete);
217 }
218 
219 static void o2hb_bio_end_io(struct bio *bio,
220 			   int error)
221 {
222 	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
223 
224 	if (error) {
225 		mlog(ML_ERROR, "IO Error %d\n", error);
226 		wc->wc_error = error;
227 	}
228 
229 	o2hb_bio_wait_dec(wc, 1);
230 	bio_put(bio);
231 }
232 
233 /* Setup a Bio to cover I/O against num_slots slots starting at
234  * start_slot. */
235 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
236 				      struct o2hb_bio_wait_ctxt *wc,
237 				      unsigned int *current_slot,
238 				      unsigned int max_slots)
239 {
240 	int len, current_page;
241 	unsigned int vec_len, vec_start;
242 	unsigned int bits = reg->hr_block_bits;
243 	unsigned int spp = reg->hr_slots_per_page;
244 	unsigned int cs = *current_slot;
245 	struct bio *bio;
246 	struct page *page;
247 
248 	/* Testing has shown this allocation to take long enough under
249 	 * GFP_KERNEL that the local node can get fenced. It would be
250 	 * nicest if we could pre-allocate these bios and avoid this
251 	 * all together. */
252 	bio = bio_alloc(GFP_ATOMIC, 16);
253 	if (!bio) {
254 		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
255 		bio = ERR_PTR(-ENOMEM);
256 		goto bail;
257 	}
258 
259 	/* Must put everything in 512 byte sectors for the bio... */
260 	bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
261 	bio->bi_bdev = reg->hr_bdev;
262 	bio->bi_private = wc;
263 	bio->bi_end_io = o2hb_bio_end_io;
264 
265 	vec_start = (cs << bits) % PAGE_CACHE_SIZE;
266 	while(cs < max_slots) {
267 		current_page = cs / spp;
268 		page = reg->hr_slot_data[current_page];
269 
270 		vec_len = min(PAGE_CACHE_SIZE - vec_start,
271 			      (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
272 
273 		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
274 		     current_page, vec_len, vec_start);
275 
276 		len = bio_add_page(bio, page, vec_len, vec_start);
277 		if (len != vec_len) break;
278 
279 		cs += vec_len / (PAGE_CACHE_SIZE/spp);
280 		vec_start = 0;
281 	}
282 
283 bail:
284 	*current_slot = cs;
285 	return bio;
286 }
287 
288 static int o2hb_read_slots(struct o2hb_region *reg,
289 			   unsigned int max_slots)
290 {
291 	unsigned int current_slot=0;
292 	int status;
293 	struct o2hb_bio_wait_ctxt wc;
294 	struct bio *bio;
295 
296 	o2hb_bio_wait_init(&wc);
297 
298 	while(current_slot < max_slots) {
299 		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
300 		if (IS_ERR(bio)) {
301 			status = PTR_ERR(bio);
302 			mlog_errno(status);
303 			goto bail_and_wait;
304 		}
305 
306 		atomic_inc(&wc.wc_num_reqs);
307 		submit_bio(READ, bio);
308 	}
309 
310 	status = 0;
311 
312 bail_and_wait:
313 	o2hb_wait_on_io(reg, &wc);
314 	if (wc.wc_error && !status)
315 		status = wc.wc_error;
316 
317 	return status;
318 }
319 
320 static int o2hb_issue_node_write(struct o2hb_region *reg,
321 				 struct o2hb_bio_wait_ctxt *write_wc)
322 {
323 	int status;
324 	unsigned int slot;
325 	struct bio *bio;
326 
327 	o2hb_bio_wait_init(write_wc);
328 
329 	slot = o2nm_this_node();
330 
331 	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
332 	if (IS_ERR(bio)) {
333 		status = PTR_ERR(bio);
334 		mlog_errno(status);
335 		goto bail;
336 	}
337 
338 	atomic_inc(&write_wc->wc_num_reqs);
339 	submit_bio(WRITE, bio);
340 
341 	status = 0;
342 bail:
343 	return status;
344 }
345 
346 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
347 				     struct o2hb_disk_heartbeat_block *hb_block)
348 {
349 	__le32 old_cksum;
350 	u32 ret;
351 
352 	/* We want to compute the block crc with a 0 value in the
353 	 * hb_cksum field. Save it off here and replace after the
354 	 * crc. */
355 	old_cksum = hb_block->hb_cksum;
356 	hb_block->hb_cksum = 0;
357 
358 	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
359 
360 	hb_block->hb_cksum = old_cksum;
361 
362 	return ret;
363 }
364 
365 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
366 {
367 	mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
368 	     "cksum = 0x%x, generation 0x%llx\n",
369 	     (long long)le64_to_cpu(hb_block->hb_seq),
370 	     hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
371 	     (long long)le64_to_cpu(hb_block->hb_generation));
372 }
373 
374 static int o2hb_verify_crc(struct o2hb_region *reg,
375 			   struct o2hb_disk_heartbeat_block *hb_block)
376 {
377 	u32 read, computed;
378 
379 	read = le32_to_cpu(hb_block->hb_cksum);
380 	computed = o2hb_compute_block_crc_le(reg, hb_block);
381 
382 	return read == computed;
383 }
384 
385 /* We want to make sure that nobody is heartbeating on top of us --
386  * this will help detect an invalid configuration. */
387 static int o2hb_check_last_timestamp(struct o2hb_region *reg)
388 {
389 	int node_num, ret;
390 	struct o2hb_disk_slot *slot;
391 	struct o2hb_disk_heartbeat_block *hb_block;
392 
393 	node_num = o2nm_this_node();
394 
395 	ret = 1;
396 	slot = &reg->hr_slots[node_num];
397 	/* Don't check on our 1st timestamp */
398 	if (slot->ds_last_time) {
399 		hb_block = slot->ds_raw_block;
400 
401 		if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
402 			ret = 0;
403 	}
404 
405 	return ret;
406 }
407 
408 static inline void o2hb_prepare_block(struct o2hb_region *reg,
409 				      u64 generation)
410 {
411 	int node_num;
412 	u64 cputime;
413 	struct o2hb_disk_slot *slot;
414 	struct o2hb_disk_heartbeat_block *hb_block;
415 
416 	node_num = o2nm_this_node();
417 	slot = &reg->hr_slots[node_num];
418 
419 	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
420 	memset(hb_block, 0, reg->hr_block_bytes);
421 	/* TODO: time stuff */
422 	cputime = CURRENT_TIME.tv_sec;
423 	if (!cputime)
424 		cputime = 1;
425 
426 	hb_block->hb_seq = cpu_to_le64(cputime);
427 	hb_block->hb_node = node_num;
428 	hb_block->hb_generation = cpu_to_le64(generation);
429 	hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
430 
431 	/* This step must always happen last! */
432 	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
433 								   hb_block));
434 
435 	mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
436 	     (long long)generation,
437 	     le32_to_cpu(hb_block->hb_cksum));
438 }
439 
440 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
441 				struct o2nm_node *node,
442 				int idx)
443 {
444 	struct list_head *iter;
445 	struct o2hb_callback_func *f;
446 
447 	list_for_each(iter, &hbcall->list) {
448 		f = list_entry(iter, struct o2hb_callback_func, hc_item);
449 		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
450 		(f->hc_func)(node, idx, f->hc_data);
451 	}
452 }
453 
/* Will run the list in order until we process the passed event.
 *
 * queued_event is the caller's event.  If it was never queued (its hn_item
 * is still an empty list head) there is nothing to do.  Otherwise events are
 * taken off the head of o2hb_node_events one at a time and their callbacks
 * fired, until queued_event has been taken off the list. */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
	int empty;
	struct o2hb_callback *hbcall;
	struct o2hb_node_event *event;

	/* cheap check first, without taking the callback sem */
	spin_lock(&o2hb_live_lock);
	empty = list_empty(&queued_event->hn_item);
	spin_unlock(&o2hb_live_lock);
	if (empty)
		return;

	/* Holding callback sem assures we don't alter the callback
	 * lists when doing this, and serializes ourselves with other
	 * processes wanting callbacks. */
	down_write(&o2hb_callback_sem);

	spin_lock(&o2hb_live_lock);
	/* queued_event is re-checked every round: list_del_init() below
	 * leaves a dequeued event with an empty hn_item, which is how we
	 * notice that our own event has been handled */
	while (!list_empty(&o2hb_node_events)
	       && !list_empty(&queued_event->hn_item)) {
		event = list_entry(o2hb_node_events.next,
				   struct o2hb_node_event,
				   hn_item);
		list_del_init(&event->hn_item);
		/* the spinlock is dropped while the callbacks run and
		 * retaken before the list is looked at again */
		spin_unlock(&o2hb_live_lock);

		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
		     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
		     event->hn_node_num);

		hbcall = hbcall_from_type(event->hn_event_type);

		/* We should *never* have gotten on to the list with a
		 * bad type... This isn't something that we should try
		 * to recover from. */
		BUG_ON(IS_ERR(hbcall));

		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

		spin_lock(&o2hb_live_lock);
	}
	spin_unlock(&o2hb_live_lock);

	up_write(&o2hb_callback_sem);
}
500 
501 static void o2hb_queue_node_event(struct o2hb_node_event *event,
502 				  enum o2hb_callback_type type,
503 				  struct o2nm_node *node,
504 				  int node_num)
505 {
506 	assert_spin_locked(&o2hb_live_lock);
507 
508 	event->hn_event_type = type;
509 	event->hn_node = node;
510 	event->hn_node_num = node_num;
511 
512 	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
513 	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
514 
515 	list_add_tail(&event->hn_item, &o2hb_node_events);
516 }
517 
518 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
519 {
520 	struct o2hb_node_event event =
521 		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
522 	struct o2nm_node *node;
523 
524 	node = o2nm_get_node_by_num(slot->ds_node_num);
525 	if (!node)
526 		return;
527 
528 	spin_lock(&o2hb_live_lock);
529 	if (!list_empty(&slot->ds_live_item)) {
530 		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
531 		     slot->ds_node_num);
532 
533 		list_del_init(&slot->ds_live_item);
534 
535 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
536 			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
537 
538 			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
539 					      slot->ds_node_num);
540 		}
541 	}
542 	spin_unlock(&o2hb_live_lock);
543 
544 	o2hb_run_event_list(&event);
545 
546 	o2nm_node_put(node);
547 }
548 
/*
 * Examine the freshly read block of one slot and update the slot's live/dead
 * state.
 *
 * The block is copied into reg->hr_tmp_block and checked there.  A slot that
 * is not live becomes live once enough changed samples have been seen; a
 * live slot is dropped after o2hb_dead_threshold unchanged samples or when
 * its generation changes.  When the slot is the first to see the node alive,
 * or the last to see it go, a node up/down event is queued and delivered
 * before returning.
 *
 * Returns 1 if such an event was queued by this call, 0 otherwise (including
 * when the slot's node number is not known to the node manager).
 */
static int o2hb_check_slot(struct o2hb_region *reg,
			   struct o2hb_disk_slot *slot)
{
	int changed = 0, gen_changed = 0;
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;
	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
	u64 cputime;
	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
	unsigned int slot_dead_ms;

	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

	/* Is this correct? Do we assume that the node doesn't exist
	 * if we're not configured for him? */
	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return 0;

	if (!o2hb_verify_crc(reg, hb_block)) {
		/* all paths from here will drop o2hb_live_lock for
		 * us. */
		spin_lock(&o2hb_live_lock);

		/* Don't print an error on the console in this case -
		 * a freshly formatted heartbeat area will not have a
		 * crc set on it. */
		if (list_empty(&slot->ds_live_item))
			goto out;

		/* The node is live but pushed out a bad crc. We
		 * consider it a transient miss but don't populate any
		 * other values as they may be junk. */
		mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
		     slot->ds_node_num, reg->hr_dev_name);
		o2hb_dump_slot(hb_block);

		slot->ds_equal_samples++;
		goto fire_callbacks;
	}

	/* we don't care if these wrap.. the state transitions below
	 * clear at the right places */
	cputime = le64_to_cpu(hb_block->hb_seq);
	if (slot->ds_last_time != cputime)
		slot->ds_changed_samples++;
	else
		slot->ds_equal_samples++;
	slot->ds_last_time = cputime;

	/* The node changed heartbeat generations. We assume this to
	 * mean it dropped off but came back before we timed out. We
	 * want to consider it down for the time being but don't want
	 * to lose any changed_samples state we might build up to
	 * considering it live again. */
	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
		gen_changed = 1;
		slot->ds_equal_samples = 0;
		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
		     "to 0x%llx)\n", slot->ds_node_num,
		     (long long)slot->ds_last_generation,
		     (long long)le64_to_cpu(hb_block->hb_generation));
	}

	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

	/* ds_last_time was already updated above, so "seq" and "last"
	 * print the same value here */
	mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
	     "seq %llu last %llu changed %u equal %u\n",
	     slot->ds_node_num, (long long)slot->ds_last_generation,
	     le32_to_cpu(hb_block->hb_cksum),
	     (unsigned long long)le64_to_cpu(hb_block->hb_seq),
	     (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
	     slot->ds_equal_samples);

	spin_lock(&o2hb_live_lock);

	/* o2hb_live_lock is held from here until 'out' */
fire_callbacks:
	/* dead nodes only come to life after some number of
	 * changes at any time during their dead time */
	if (list_empty(&slot->ds_live_item) &&
	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
		     slot->ds_node_num, (long long)slot->ds_last_generation);

		/* first on the list generates a callback */
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
					      slot->ds_node_num);

			changed = 1;
		}

		list_add_tail(&slot->ds_live_item,
			      &o2hb_live_slots[slot->ds_node_num]);

		slot->ds_equal_samples = 0;

		/* We want to be sure that all nodes agree on the
		 * number of milliseconds before a node will be
		 * considered dead. The self-fencing timeout is
		 * computed from this value, and a discrepancy might
		 * result in heartbeat calling a node dead when it
		 * hasn't self-fenced yet. */
		slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
		if (slot_dead_ms && slot_dead_ms != dead_ms) {
			/* TODO: Perhaps we can fail the region here. */
			mlog(ML_ERROR, "Node %d on device %s has a dead count "
			     "of %u ms, but our count is %u ms.\n"
			     "Please double check your configuration values "
			     "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
			     slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
			     dead_ms);
		}
		goto out;
	}

	/* if the list is dead, we're done.. */
	if (list_empty(&slot->ds_live_item))
		goto out;

	/* live nodes only go dead after enough consecutive missed
	 * samples..  reset the missed counter whenever we see
	 * activity */
	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
		mlog(ML_HEARTBEAT, "Node %d left my region\n",
		     slot->ds_node_num);

		/* last off the live_slot generates a callback */
		list_del_init(&slot->ds_live_item);
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);

			changed = 1;
		}

		/* We don't clear this because the node is still
		 * actually writing new blocks. */
		if (!gen_changed)
			slot->ds_changed_samples = 0;
		goto out;
	}
	/* still live and we saw activity: start counting afresh */
	if (slot->ds_changed_samples) {
		slot->ds_changed_samples = 0;
		slot->ds_equal_samples = 0;
	}
out:
	spin_unlock(&o2hb_live_lock);

	/* does nothing unless an event was queued above */
	o2hb_run_event_list(&event);

	o2nm_node_put(node);
	return changed;
}
708 
/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it.
 *
 * Returns the index of the highest bit set within the first numbits bits of
 * nodes, or numbits when none of them is set.
 *
 * find_next_bit() signals "no further bit" by returning a value >= numbits
 * (never -1), so that is the only termination test the walk needs. */
static int o2hb_highest_node(unsigned long *nodes,
			     int numbits)
{
	int highest = numbits;
	int node;

	for (node = find_next_bit(nodes, numbits, 0);
	     node < numbits;
	     node = find_next_bit(nodes, numbits, node + 1))
		highest = node;

	return highest;
}
727 
728 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
729 {
730 	int i, ret, highest_node, change = 0;
731 	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
732 	struct o2hb_bio_wait_ctxt write_wc;
733 
734 	ret = o2nm_configured_node_map(configured_nodes,
735 				       sizeof(configured_nodes));
736 	if (ret) {
737 		mlog_errno(ret);
738 		return ret;
739 	}
740 
741 	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
742 	if (highest_node >= O2NM_MAX_NODES) {
743 		mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
744 		return -EINVAL;
745 	}
746 
747 	/* No sense in reading the slots of nodes that don't exist
748 	 * yet. Of course, if the node definitions have holes in them
749 	 * then we're reading an empty slot anyway... Consider this
750 	 * best-effort. */
751 	ret = o2hb_read_slots(reg, highest_node + 1);
752 	if (ret < 0) {
753 		mlog_errno(ret);
754 		return ret;
755 	}
756 
757 	/* With an up to date view of the slots, we can check that no
758 	 * other node has been improperly configured to heartbeat in
759 	 * our slot. */
760 	if (!o2hb_check_last_timestamp(reg))
761 		mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
762 		     "in our slot!\n", reg->hr_dev_name);
763 
764 	/* fill in the proper info for our next heartbeat */
765 	o2hb_prepare_block(reg, reg->hr_generation);
766 
767 	/* And fire off the write. Note that we don't wait on this I/O
768 	 * until later. */
769 	ret = o2hb_issue_node_write(reg, &write_wc);
770 	if (ret < 0) {
771 		mlog_errno(ret);
772 		return ret;
773 	}
774 
775 	i = -1;
776 	while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
777 
778 		change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
779 	}
780 
781 	/*
782 	 * We have to be sure we've advertised ourselves on disk
783 	 * before we can go to steady state.  This ensures that
784 	 * people we find in our steady state have seen us.
785 	 */
786 	o2hb_wait_on_io(reg, &write_wc);
787 	if (write_wc.wc_error) {
788 		/* Do not re-arm the write timeout on I/O error - we
789 		 * can't be sure that the new block ever made it to
790 		 * disk */
791 		mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
792 		     write_wc.wc_error, reg->hr_dev_name);
793 		return write_wc.wc_error;
794 	}
795 
796 	o2hb_arm_write_timeout(reg);
797 
798 	/* let the person who launched us know when things are steady */
799 	if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
800 		if (atomic_dec_and_test(&reg->hr_steady_iterations))
801 			wake_up(&o2hb_steady_queue);
802 	}
803 
804 	return 0;
805 }
806 
/* Subtract b from a, storing the result in a.  a is expected to be the
 * later of the two; if it is in fact earlier than b the result is clamped
 * to zero rather than going negative. */
static void o2hb_tv_subtract(struct timeval *a,
			     struct timeval *b)
{
	int a_before_b;

	a_before_b = a->tv_sec < b->tv_sec ||
		     (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec);
	if (a_before_b) {
		a->tv_usec = 0;
		a->tv_sec = 0;
		return;
	}

	a->tv_sec -= b->tv_sec;
	a->tv_usec -= b->tv_usec;

	/* borrow from the seconds while the microseconds are negative */
	while (a->tv_usec < 0) {
		a->tv_usec += 1000000;
		a->tv_sec--;
	}
}
827 
/*
 * Milliseconds from start to end, rounded down; 0 when end is earlier than
 * start.  Neither argument is modified.
 */
static unsigned int o2hb_elapsed_msecs(struct timeval *start,
				       struct timeval *end)
{
	struct timeval delta;

	delta = *end;
	o2hb_tv_subtract(&delta, start);

	return delta.tv_sec * 1000 + delta.tv_usec / 1000;
}
837 
838 /*
839  * we ride the region ref that the region dir holds.  before the region
840  * dir is removed and drops it ref it will wait to tear down this
841  * thread.
842  */
843 static int o2hb_thread(void *data)
844 {
845 	int i, ret;
846 	struct o2hb_region *reg = data;
847 	struct o2hb_bio_wait_ctxt write_wc;
848 	struct timeval before_hb, after_hb;
849 	unsigned int elapsed_msec;
850 
851 	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
852 
853 	set_user_nice(current, -20);
854 
855 	while (!kthread_should_stop() && !reg->hr_unclean_stop) {
856 		/* We track the time spent inside
857 		 * o2hb_do_disk_heartbeat so that we avoid more then
858 		 * hr_timeout_ms between disk writes. On busy systems
859 		 * this should result in a heartbeat which is less
860 		 * likely to time itself out. */
861 		do_gettimeofday(&before_hb);
862 
863 		i = 0;
864 		do {
865 			ret = o2hb_do_disk_heartbeat(reg);
866 		} while (ret && ++i < 2);
867 
868 		do_gettimeofday(&after_hb);
869 		elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
870 
871 		mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
872 		     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
873 		     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
874 		     elapsed_msec);
875 
876 		if (elapsed_msec < reg->hr_timeout_ms) {
877 			/* the kthread api has blocked signals for us so no
878 			 * need to record the return value. */
879 			msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
880 		}
881 	}
882 
883 	o2hb_disarm_write_timeout(reg);
884 
885 	/* unclean stop is only used in very bad situation */
886 	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
887 		o2hb_shutdown_slot(&reg->hr_slots[i]);
888 
889 	/* Explicit down notification - avoid forcing the other nodes
890 	 * to timeout on this region when we could just as easily
891 	 * write a clear generation - thus indicating to them that
892 	 * this node has left this region.
893 	 *
894 	 * XXX: Should we skip this on unclean_stop? */
895 	o2hb_prepare_block(reg, 0);
896 	ret = o2hb_issue_node_write(reg, &write_wc);
897 	if (ret == 0) {
898 		o2hb_wait_on_io(reg, &write_wc);
899 	} else {
900 		mlog_errno(ret);
901 	}
902 
903 	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
904 
905 	return 0;
906 }
907 
908 void o2hb_init(void)
909 {
910 	int i;
911 
912 	for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
913 		INIT_LIST_HEAD(&o2hb_callbacks[i].list);
914 
915 	for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
916 		INIT_LIST_HEAD(&o2hb_live_slots[i]);
917 
918 	INIT_LIST_HEAD(&o2hb_node_events);
919 
920 	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
921 }
922 
923 /* if we're already in a callback then we're already serialized by the sem */
924 static void o2hb_fill_node_map_from_callback(unsigned long *map,
925 					     unsigned bytes)
926 {
927 	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
928 
929 	memcpy(map, &o2hb_live_node_bitmap, bytes);
930 }
931 
/*
 * get a map of all nodes that are heartbeating in any regions
 *
 * map is a buffer of 'bytes' bytes that receives a copy of the live node
 * bitmap; see o2hb_fill_node_map_from_callback() for the size requirement.
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
	/* callers want to serialize this map and callbacks so that they
	 * can trust that they don't miss nodes coming to the party */
	down_read(&o2hb_callback_sem);
	/* the bitmap itself is modified under o2hb_live_lock */
	spin_lock(&o2hb_live_lock);
	o2hb_fill_node_map_from_callback(map, bytes);
	spin_unlock(&o2hb_live_lock);
	up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
946 
947 /*
948  * heartbeat configfs bits.  The heartbeat set is a default set under
949  * the cluster set in nodemanager.c.
950  */
951 
952 static struct o2hb_region *to_o2hb_region(struct config_item *item)
953 {
954 	return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
955 }
956 
957 /* drop_item only drops its ref after killing the thread, nothing should
958  * be using the region anymore.  this has to clean up any state that
959  * attributes might have built up. */
960 static void o2hb_region_release(struct config_item *item)
961 {
962 	int i;
963 	struct page *page;
964 	struct o2hb_region *reg = to_o2hb_region(item);
965 
966 	if (reg->hr_tmp_block)
967 		kfree(reg->hr_tmp_block);
968 
969 	if (reg->hr_slot_data) {
970 		for (i = 0; i < reg->hr_num_pages; i++) {
971 			page = reg->hr_slot_data[i];
972 			if (page)
973 				__free_page(page);
974 		}
975 		kfree(reg->hr_slot_data);
976 	}
977 
978 	if (reg->hr_bdev)
979 		blkdev_put(reg->hr_bdev);
980 
981 	if (reg->hr_slots)
982 		kfree(reg->hr_slots);
983 
984 	spin_lock(&o2hb_live_lock);
985 	list_del(&reg->hr_all_item);
986 	spin_unlock(&o2hb_live_lock);
987 
988 	kfree(reg);
989 }
990 
991 static int o2hb_read_block_input(struct o2hb_region *reg,
992 				 const char *page,
993 				 size_t count,
994 				 unsigned long *ret_bytes,
995 				 unsigned int *ret_bits)
996 {
997 	unsigned long bytes;
998 	char *p = (char *)page;
999 
1000 	bytes = simple_strtoul(p, &p, 0);
1001 	if (!p || (*p && (*p != '\n')))
1002 		return -EINVAL;
1003 
1004 	/* Heartbeat and fs min / max block sizes are the same. */
1005 	if (bytes > 4096 || bytes < 512)
1006 		return -ERANGE;
1007 	if (hweight16(bytes) != 1)
1008 		return -EINVAL;
1009 
1010 	if (ret_bytes)
1011 		*ret_bytes = bytes;
1012 	if (ret_bits)
1013 		*ret_bits = ffs(bytes) - 1;
1014 
1015 	return 0;
1016 }
1017 
1018 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1019 					    char *page)
1020 {
1021 	return sprintf(page, "%u\n", reg->hr_block_bytes);
1022 }
1023 
1024 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1025 					     const char *page,
1026 					     size_t count)
1027 {
1028 	int status;
1029 	unsigned long block_bytes;
1030 	unsigned int block_bits;
1031 
1032 	if (reg->hr_bdev)
1033 		return -EINVAL;
1034 
1035 	status = o2hb_read_block_input(reg, page, count,
1036 				       &block_bytes, &block_bits);
1037 	if (status)
1038 		return status;
1039 
1040 	reg->hr_block_bytes = (unsigned int)block_bytes;
1041 	reg->hr_block_bits = block_bits;
1042 
1043 	return count;
1044 }
1045 
1046 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1047 					    char *page)
1048 {
1049 	return sprintf(page, "%llu\n", reg->hr_start_block);
1050 }
1051 
1052 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1053 					     const char *page,
1054 					     size_t count)
1055 {
1056 	unsigned long long tmp;
1057 	char *p = (char *)page;
1058 
1059 	if (reg->hr_bdev)
1060 		return -EINVAL;
1061 
1062 	tmp = simple_strtoull(p, &p, 0);
1063 	if (!p || (*p && (*p != '\n')))
1064 		return -EINVAL;
1065 
1066 	reg->hr_start_block = tmp;
1067 
1068 	return count;
1069 }
1070 
1071 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1072 				       char *page)
1073 {
1074 	return sprintf(page, "%d\n", reg->hr_blocks);
1075 }
1076 
1077 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1078 					const char *page,
1079 					size_t count)
1080 {
1081 	unsigned long tmp;
1082 	char *p = (char *)page;
1083 
1084 	if (reg->hr_bdev)
1085 		return -EINVAL;
1086 
1087 	tmp = simple_strtoul(p, &p, 0);
1088 	if (!p || (*p && (*p != '\n')))
1089 		return -EINVAL;
1090 
1091 	if (tmp > O2NM_MAX_NODES || tmp == 0)
1092 		return -ERANGE;
1093 
1094 	reg->hr_blocks = (unsigned int)tmp;
1095 
1096 	return count;
1097 }
1098 
1099 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1100 				    char *page)
1101 {
1102 	unsigned int ret = 0;
1103 
1104 	if (reg->hr_bdev)
1105 		ret = sprintf(page, "%s\n", reg->hr_dev_name);
1106 
1107 	return ret;
1108 }
1109 
1110 static void o2hb_init_region_params(struct o2hb_region *reg)
1111 {
1112 	reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1113 	reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1114 
1115 	mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1116 	     reg->hr_start_block, reg->hr_blocks);
1117 	mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1118 	     reg->hr_block_bytes, reg->hr_block_bits);
1119 	mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1120 	mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1121 }
1122 
1123 static int o2hb_map_slot_data(struct o2hb_region *reg)
1124 {
1125 	int i, j;
1126 	unsigned int last_slot;
1127 	unsigned int spp = reg->hr_slots_per_page;
1128 	struct page *page;
1129 	char *raw;
1130 	struct o2hb_disk_slot *slot;
1131 
1132 	reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1133 	if (reg->hr_tmp_block == NULL) {
1134 		mlog_errno(-ENOMEM);
1135 		return -ENOMEM;
1136 	}
1137 
1138 	reg->hr_slots = kcalloc(reg->hr_blocks,
1139 				sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1140 	if (reg->hr_slots == NULL) {
1141 		mlog_errno(-ENOMEM);
1142 		return -ENOMEM;
1143 	}
1144 
1145 	for(i = 0; i < reg->hr_blocks; i++) {
1146 		slot = &reg->hr_slots[i];
1147 		slot->ds_node_num = i;
1148 		INIT_LIST_HEAD(&slot->ds_live_item);
1149 		slot->ds_raw_block = NULL;
1150 	}
1151 
1152 	reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1153 	mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1154 			   "at %u blocks per page\n",
1155 	     reg->hr_num_pages, reg->hr_blocks, spp);
1156 
1157 	reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1158 				    GFP_KERNEL);
1159 	if (!reg->hr_slot_data) {
1160 		mlog_errno(-ENOMEM);
1161 		return -ENOMEM;
1162 	}
1163 
1164 	for(i = 0; i < reg->hr_num_pages; i++) {
1165 		page = alloc_page(GFP_KERNEL);
1166 		if (!page) {
1167 			mlog_errno(-ENOMEM);
1168 			return -ENOMEM;
1169 		}
1170 
1171 		reg->hr_slot_data[i] = page;
1172 
1173 		last_slot = i * spp;
1174 		raw = page_address(page);
1175 		for (j = 0;
1176 		     (j < spp) && ((j + last_slot) < reg->hr_blocks);
1177 		     j++) {
1178 			BUG_ON((j + last_slot) >= reg->hr_blocks);
1179 
1180 			slot = &reg->hr_slots[j + last_slot];
1181 			slot->ds_raw_block =
1182 				(struct o2hb_disk_heartbeat_block *) raw;
1183 
1184 			raw += reg->hr_block_bytes;
1185 		}
1186 	}
1187 
1188 	return 0;
1189 }
1190 
1191 /* Read in all the slots available and populate the tracking
1192  * structures so that we can start with a baseline idea of what's
1193  * there. */
1194 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1195 {
1196 	int ret, i;
1197 	struct o2hb_disk_slot *slot;
1198 	struct o2hb_disk_heartbeat_block *hb_block;
1199 
1200 	mlog_entry_void();
1201 
1202 	ret = o2hb_read_slots(reg, reg->hr_blocks);
1203 	if (ret) {
1204 		mlog_errno(ret);
1205 		goto out;
1206 	}
1207 
1208 	/* We only want to get an idea of the values initially in each
1209 	 * slot, so we do no verification - o2hb_check_slot will
1210 	 * actually determine if each configured slot is valid and
1211 	 * whether any values have changed. */
1212 	for(i = 0; i < reg->hr_blocks; i++) {
1213 		slot = &reg->hr_slots[i];
1214 		hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1215 
1216 		/* Only fill the values that o2hb_check_slot uses to
1217 		 * determine changing slots */
1218 		slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1219 		slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1220 	}
1221 
1222 out:
1223 	mlog_exit(ret);
1224 	return ret;
1225 }
1226 
1227 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1228 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1229 				     const char *page,
1230 				     size_t count)
1231 {
1232 	struct task_struct *hb_task;
1233 	long fd;
1234 	int sectsize;
1235 	char *p = (char *)page;
1236 	struct file *filp = NULL;
1237 	struct inode *inode = NULL;
1238 	ssize_t ret = -EINVAL;
1239 
1240 	if (reg->hr_bdev)
1241 		goto out;
1242 
1243 	/* We can't heartbeat without having had our node number
1244 	 * configured yet. */
1245 	if (o2nm_this_node() == O2NM_MAX_NODES)
1246 		goto out;
1247 
1248 	fd = simple_strtol(p, &p, 0);
1249 	if (!p || (*p && (*p != '\n')))
1250 		goto out;
1251 
1252 	if (fd < 0 || fd >= INT_MAX)
1253 		goto out;
1254 
1255 	filp = fget(fd);
1256 	if (filp == NULL)
1257 		goto out;
1258 
1259 	if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1260 	    reg->hr_block_bytes == 0)
1261 		goto out;
1262 
1263 	inode = igrab(filp->f_mapping->host);
1264 	if (inode == NULL)
1265 		goto out;
1266 
1267 	if (!S_ISBLK(inode->i_mode))
1268 		goto out;
1269 
1270 	reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1271 	ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
1272 	if (ret) {
1273 		reg->hr_bdev = NULL;
1274 		goto out;
1275 	}
1276 	inode = NULL;
1277 
1278 	bdevname(reg->hr_bdev, reg->hr_dev_name);
1279 
1280 	sectsize = bdev_hardsect_size(reg->hr_bdev);
1281 	if (sectsize != reg->hr_block_bytes) {
1282 		mlog(ML_ERROR,
1283 		     "blocksize %u incorrect for device, expected %d",
1284 		     reg->hr_block_bytes, sectsize);
1285 		ret = -EINVAL;
1286 		goto out;
1287 	}
1288 
1289 	o2hb_init_region_params(reg);
1290 
1291 	/* Generation of zero is invalid */
1292 	do {
1293 		get_random_bytes(&reg->hr_generation,
1294 				 sizeof(reg->hr_generation));
1295 	} while (reg->hr_generation == 0);
1296 
1297 	ret = o2hb_map_slot_data(reg);
1298 	if (ret) {
1299 		mlog_errno(ret);
1300 		goto out;
1301 	}
1302 
1303 	ret = o2hb_populate_slot_data(reg);
1304 	if (ret) {
1305 		mlog_errno(ret);
1306 		goto out;
1307 	}
1308 
1309 	INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1310 
1311 	/*
1312 	 * A node is considered live after it has beat LIVE_THRESHOLD
1313 	 * times.  We're not steady until we've given them a chance
1314 	 * _after_ our first read.
1315 	 */
1316 	atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
1317 
1318 	hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1319 			      reg->hr_item.ci_name);
1320 	if (IS_ERR(hb_task)) {
1321 		ret = PTR_ERR(hb_task);
1322 		mlog_errno(ret);
1323 		goto out;
1324 	}
1325 
1326 	spin_lock(&o2hb_live_lock);
1327 	reg->hr_task = hb_task;
1328 	spin_unlock(&o2hb_live_lock);
1329 
1330 	ret = wait_event_interruptible(o2hb_steady_queue,
1331 				atomic_read(&reg->hr_steady_iterations) == 0);
1332 	if (ret) {
1333 		/* We got interrupted (hello ptrace!).  Clean up */
1334 		spin_lock(&o2hb_live_lock);
1335 		hb_task = reg->hr_task;
1336 		reg->hr_task = NULL;
1337 		spin_unlock(&o2hb_live_lock);
1338 
1339 		if (hb_task)
1340 			kthread_stop(hb_task);
1341 		goto out;
1342 	}
1343 
1344 	/* Ok, we were woken.  Make sure it wasn't by drop_item() */
1345 	spin_lock(&o2hb_live_lock);
1346 	hb_task = reg->hr_task;
1347 	spin_unlock(&o2hb_live_lock);
1348 
1349 	if (hb_task)
1350 		ret = count;
1351 	else
1352 		ret = -EIO;
1353 
1354 out:
1355 	if (filp)
1356 		fput(filp);
1357 	if (inode)
1358 		iput(inode);
1359 	if (ret < 0) {
1360 		if (reg->hr_bdev) {
1361 			blkdev_put(reg->hr_bdev);
1362 			reg->hr_bdev = NULL;
1363 		}
1364 	}
1365 	return ret;
1366 }
1367 
1368 static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
1369                                       char *page)
1370 {
1371 	pid_t pid = 0;
1372 
1373 	spin_lock(&o2hb_live_lock);
1374 	if (reg->hr_task)
1375 		pid = task_pid_nr(reg->hr_task);
1376 	spin_unlock(&o2hb_live_lock);
1377 
1378 	if (!pid)
1379 		return 0;
1380 
1381 	return sprintf(page, "%u\n", pid);
1382 }
1383 
1384 struct o2hb_region_attribute {
1385 	struct configfs_attribute attr;
1386 	ssize_t (*show)(struct o2hb_region *, char *);
1387 	ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1388 };
1389 
1390 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1391 	.attr	= { .ca_owner = THIS_MODULE,
1392 		    .ca_name = "block_bytes",
1393 		    .ca_mode = S_IRUGO | S_IWUSR },
1394 	.show	= o2hb_region_block_bytes_read,
1395 	.store	= o2hb_region_block_bytes_write,
1396 };
1397 
1398 static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1399 	.attr	= { .ca_owner = THIS_MODULE,
1400 		    .ca_name = "start_block",
1401 		    .ca_mode = S_IRUGO | S_IWUSR },
1402 	.show	= o2hb_region_start_block_read,
1403 	.store	= o2hb_region_start_block_write,
1404 };
1405 
1406 static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1407 	.attr	= { .ca_owner = THIS_MODULE,
1408 		    .ca_name = "blocks",
1409 		    .ca_mode = S_IRUGO | S_IWUSR },
1410 	.show	= o2hb_region_blocks_read,
1411 	.store	= o2hb_region_blocks_write,
1412 };
1413 
1414 static struct o2hb_region_attribute o2hb_region_attr_dev = {
1415 	.attr	= { .ca_owner = THIS_MODULE,
1416 		    .ca_name = "dev",
1417 		    .ca_mode = S_IRUGO | S_IWUSR },
1418 	.show	= o2hb_region_dev_read,
1419 	.store	= o2hb_region_dev_write,
1420 };
1421 
1422 static struct o2hb_region_attribute o2hb_region_attr_pid = {
1423        .attr   = { .ca_owner = THIS_MODULE,
1424                    .ca_name = "pid",
1425                    .ca_mode = S_IRUGO | S_IRUSR },
1426        .show   = o2hb_region_pid_read,
1427 };
1428 
1429 static struct configfs_attribute *o2hb_region_attrs[] = {
1430 	&o2hb_region_attr_block_bytes.attr,
1431 	&o2hb_region_attr_start_block.attr,
1432 	&o2hb_region_attr_blocks.attr,
1433 	&o2hb_region_attr_dev.attr,
1434 	&o2hb_region_attr_pid.attr,
1435 	NULL,
1436 };
1437 
1438 static ssize_t o2hb_region_show(struct config_item *item,
1439 				struct configfs_attribute *attr,
1440 				char *page)
1441 {
1442 	struct o2hb_region *reg = to_o2hb_region(item);
1443 	struct o2hb_region_attribute *o2hb_region_attr =
1444 		container_of(attr, struct o2hb_region_attribute, attr);
1445 	ssize_t ret = 0;
1446 
1447 	if (o2hb_region_attr->show)
1448 		ret = o2hb_region_attr->show(reg, page);
1449 	return ret;
1450 }
1451 
1452 static ssize_t o2hb_region_store(struct config_item *item,
1453 				 struct configfs_attribute *attr,
1454 				 const char *page, size_t count)
1455 {
1456 	struct o2hb_region *reg = to_o2hb_region(item);
1457 	struct o2hb_region_attribute *o2hb_region_attr =
1458 		container_of(attr, struct o2hb_region_attribute, attr);
1459 	ssize_t ret = -EINVAL;
1460 
1461 	if (o2hb_region_attr->store)
1462 		ret = o2hb_region_attr->store(reg, page, count);
1463 	return ret;
1464 }
1465 
1466 static struct configfs_item_operations o2hb_region_item_ops = {
1467 	.release		= o2hb_region_release,
1468 	.show_attribute		= o2hb_region_show,
1469 	.store_attribute	= o2hb_region_store,
1470 };
1471 
1472 static struct config_item_type o2hb_region_type = {
1473 	.ct_item_ops	= &o2hb_region_item_ops,
1474 	.ct_attrs	= o2hb_region_attrs,
1475 	.ct_owner	= THIS_MODULE,
1476 };
1477 
1478 /* heartbeat set */
1479 
1480 struct o2hb_heartbeat_group {
1481 	struct config_group hs_group;
1482 	/* some stuff? */
1483 };
1484 
1485 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1486 {
1487 	return group ?
1488 		container_of(group, struct o2hb_heartbeat_group, hs_group)
1489 		: NULL;
1490 }
1491 
1492 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1493 							  const char *name)
1494 {
1495 	struct o2hb_region *reg = NULL;
1496 
1497 	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
1498 	if (reg == NULL)
1499 		return ERR_PTR(-ENOMEM);
1500 
1501 	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1502 
1503 	spin_lock(&o2hb_live_lock);
1504 	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1505 	spin_unlock(&o2hb_live_lock);
1506 
1507 	return &reg->hr_item;
1508 }
1509 
1510 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1511 					   struct config_item *item)
1512 {
1513 	struct task_struct *hb_task;
1514 	struct o2hb_region *reg = to_o2hb_region(item);
1515 
1516 	/* stop the thread when the user removes the region dir */
1517 	spin_lock(&o2hb_live_lock);
1518 	hb_task = reg->hr_task;
1519 	reg->hr_task = NULL;
1520 	spin_unlock(&o2hb_live_lock);
1521 
1522 	if (hb_task)
1523 		kthread_stop(hb_task);
1524 
1525 	/*
1526 	 * If we're racing a dev_write(), we need to wake them.  They will
1527 	 * check reg->hr_task
1528 	 */
1529 	if (atomic_read(&reg->hr_steady_iterations) != 0) {
1530 		atomic_set(&reg->hr_steady_iterations, 0);
1531 		wake_up(&o2hb_steady_queue);
1532 	}
1533 
1534 	config_item_put(item);
1535 }
1536 
1537 struct o2hb_heartbeat_group_attribute {
1538 	struct configfs_attribute attr;
1539 	ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1540 	ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1541 };
1542 
1543 static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1544 					 struct configfs_attribute *attr,
1545 					 char *page)
1546 {
1547 	struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1548 	struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1549 		container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1550 	ssize_t ret = 0;
1551 
1552 	if (o2hb_heartbeat_group_attr->show)
1553 		ret = o2hb_heartbeat_group_attr->show(reg, page);
1554 	return ret;
1555 }
1556 
1557 static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1558 					  struct configfs_attribute *attr,
1559 					  const char *page, size_t count)
1560 {
1561 	struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1562 	struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1563 		container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1564 	ssize_t ret = -EINVAL;
1565 
1566 	if (o2hb_heartbeat_group_attr->store)
1567 		ret = o2hb_heartbeat_group_attr->store(reg, page, count);
1568 	return ret;
1569 }
1570 
1571 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1572 						     char *page)
1573 {
1574 	return sprintf(page, "%u\n", o2hb_dead_threshold);
1575 }
1576 
1577 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1578 						    const char *page,
1579 						    size_t count)
1580 {
1581 	unsigned long tmp;
1582 	char *p = (char *)page;
1583 
1584 	tmp = simple_strtoul(p, &p, 10);
1585 	if (!p || (*p && (*p != '\n')))
1586                 return -EINVAL;
1587 
1588 	/* this will validate ranges for us. */
1589 	o2hb_dead_threshold_set((unsigned int) tmp);
1590 
1591 	return count;
1592 }
1593 
1594 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1595 	.attr	= { .ca_owner = THIS_MODULE,
1596 		    .ca_name = "dead_threshold",
1597 		    .ca_mode = S_IRUGO | S_IWUSR },
1598 	.show	= o2hb_heartbeat_group_threshold_show,
1599 	.store	= o2hb_heartbeat_group_threshold_store,
1600 };
1601 
1602 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1603 	&o2hb_heartbeat_group_attr_threshold.attr,
1604 	NULL,
1605 };
1606 
1607 static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
1608 	.show_attribute		= o2hb_heartbeat_group_show,
1609 	.store_attribute	= o2hb_heartbeat_group_store,
1610 };
1611 
1612 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1613 	.make_item	= o2hb_heartbeat_group_make_item,
1614 	.drop_item	= o2hb_heartbeat_group_drop_item,
1615 };
1616 
1617 static struct config_item_type o2hb_heartbeat_group_type = {
1618 	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
1619 	.ct_item_ops	= &o2hb_hearbeat_group_item_ops,
1620 	.ct_attrs	= o2hb_heartbeat_group_attrs,
1621 	.ct_owner	= THIS_MODULE,
1622 };
1623 
1624 /* this is just here to avoid touching group in heartbeat.h which the
1625  * entire damn world #includes */
1626 struct config_group *o2hb_alloc_hb_set(void)
1627 {
1628 	struct o2hb_heartbeat_group *hs = NULL;
1629 	struct config_group *ret = NULL;
1630 
1631 	hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1632 	if (hs == NULL)
1633 		goto out;
1634 
1635 	config_group_init_type_name(&hs->hs_group, "heartbeat",
1636 				    &o2hb_heartbeat_group_type);
1637 
1638 	ret = &hs->hs_group;
1639 out:
1640 	if (ret == NULL)
1641 		kfree(hs);
1642 	return ret;
1643 }
1644 
/* Free a heartbeat set obtained from o2hb_alloc_hb_set().  A NULL
 * @group is accepted. */
void o2hb_free_hb_set(struct config_group *group)
{
	kfree(to_o2hb_heartbeat_group(group));
}
1650 
/* hb callback registration and issuing */
1652 
1653 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1654 {
1655 	if (type == O2HB_NUM_CB)
1656 		return ERR_PTR(-EINVAL);
1657 
1658 	return &o2hb_callbacks[type];
1659 }
1660 
1661 void o2hb_setup_callback(struct o2hb_callback_func *hc,
1662 			 enum o2hb_callback_type type,
1663 			 o2hb_cb_func *func,
1664 			 void *data,
1665 			 int priority)
1666 {
1667 	INIT_LIST_HEAD(&hc->hc_item);
1668 	hc->hc_func = func;
1669 	hc->hc_data = data;
1670 	hc->hc_priority = priority;
1671 	hc->hc_type = type;
1672 	hc->hc_magic = O2HB_CB_MAGIC;
1673 }
1674 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1675 
1676 static struct o2hb_region *o2hb_find_region(const char *region_uuid)
1677 {
1678 	struct o2hb_region *p, *reg = NULL;
1679 
1680 	assert_spin_locked(&o2hb_live_lock);
1681 
1682 	list_for_each_entry(p, &o2hb_all_regions, hr_all_item) {
1683 		if (!strcmp(region_uuid, config_item_name(&p->hr_item))) {
1684 			reg = p;
1685 			break;
1686 		}
1687 	}
1688 
1689 	return reg;
1690 }
1691 
1692 static int o2hb_region_get(const char *region_uuid)
1693 {
1694 	int ret = 0;
1695 	struct o2hb_region *reg;
1696 
1697 	spin_lock(&o2hb_live_lock);
1698 
1699 	reg = o2hb_find_region(region_uuid);
1700 	if (!reg)
1701 		ret = -ENOENT;
1702 	spin_unlock(&o2hb_live_lock);
1703 
1704 	if (ret)
1705 		goto out;
1706 
1707 	ret = o2nm_depend_this_node();
1708 	if (ret)
1709 		goto out;
1710 
1711 	ret = o2nm_depend_item(&reg->hr_item);
1712 	if (ret)
1713 		o2nm_undepend_this_node();
1714 
1715 out:
1716 	return ret;
1717 }
1718 
1719 static void o2hb_region_put(const char *region_uuid)
1720 {
1721 	struct o2hb_region *reg;
1722 
1723 	spin_lock(&o2hb_live_lock);
1724 
1725 	reg = o2hb_find_region(region_uuid);
1726 
1727 	spin_unlock(&o2hb_live_lock);
1728 
1729 	if (reg) {
1730 		o2nm_undepend_item(&reg->hr_item);
1731 		o2nm_undepend_this_node();
1732 	}
1733 }
1734 
1735 int o2hb_register_callback(const char *region_uuid,
1736 			   struct o2hb_callback_func *hc)
1737 {
1738 	struct o2hb_callback_func *tmp;
1739 	struct list_head *iter;
1740 	struct o2hb_callback *hbcall;
1741 	int ret;
1742 
1743 	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1744 	BUG_ON(!list_empty(&hc->hc_item));
1745 
1746 	hbcall = hbcall_from_type(hc->hc_type);
1747 	if (IS_ERR(hbcall)) {
1748 		ret = PTR_ERR(hbcall);
1749 		goto out;
1750 	}
1751 
1752 	if (region_uuid) {
1753 		ret = o2hb_region_get(region_uuid);
1754 		if (ret)
1755 			goto out;
1756 	}
1757 
1758 	down_write(&o2hb_callback_sem);
1759 
1760 	list_for_each(iter, &hbcall->list) {
1761 		tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1762 		if (hc->hc_priority < tmp->hc_priority) {
1763 			list_add_tail(&hc->hc_item, iter);
1764 			break;
1765 		}
1766 	}
1767 	if (list_empty(&hc->hc_item))
1768 		list_add_tail(&hc->hc_item, &hbcall->list);
1769 
1770 	up_write(&o2hb_callback_sem);
1771 	ret = 0;
1772 out:
1773 	mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1774 	     ret, __builtin_return_address(0), hc);
1775 	return ret;
1776 }
1777 EXPORT_SYMBOL_GPL(o2hb_register_callback);
1778 
1779 void o2hb_unregister_callback(const char *region_uuid,
1780 			      struct o2hb_callback_func *hc)
1781 {
1782 	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1783 
1784 	mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1785 	     __builtin_return_address(0), hc);
1786 
1787 	/* XXX Can this happen _with_ a region reference? */
1788 	if (list_empty(&hc->hc_item))
1789 		return;
1790 
1791 	if (region_uuid)
1792 		o2hb_region_put(region_uuid);
1793 
1794 	down_write(&o2hb_callback_sem);
1795 
1796 	list_del_init(&hc->hc_item);
1797 
1798 	up_write(&o2hb_callback_sem);
1799 }
1800 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
1801 
1802 int o2hb_check_node_heartbeating(u8 node_num)
1803 {
1804 	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1805 
1806 	o2hb_fill_node_map(testing_map, sizeof(testing_map));
1807 	if (!test_bit(node_num, testing_map)) {
1808 		mlog(ML_HEARTBEAT,
1809 		     "node (%u) does not have heartbeating enabled.\n",
1810 		     node_num);
1811 		return 0;
1812 	}
1813 
1814 	return 1;
1815 }
1816 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1817 
1818 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1819 {
1820 	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1821 
1822 	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1823 	if (!test_bit(node_num, testing_map)) {
1824 		mlog(ML_HEARTBEAT,
1825 		     "node (%u) does not have heartbeating enabled.\n",
1826 		     node_num);
1827 		return 0;
1828 	}
1829 
1830 	return 1;
1831 }
1832 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
1833 
1834 /* Makes sure our local node is configured with a node number, and is
1835  * heartbeating. */
1836 int o2hb_check_local_node_heartbeating(void)
1837 {
1838 	u8 node_num;
1839 
1840 	/* if this node was set then we have networking */
1841 	node_num = o2nm_this_node();
1842 	if (node_num == O2NM_MAX_NODES) {
1843 		mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1844 		return 0;
1845 	}
1846 
1847 	return o2hb_check_node_heartbeating(node_num);
1848 }
1849 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1850 
1851 /*
1852  * this is just a hack until we get the plumbing which flips file systems
1853  * read only and drops the hb ref instead of killing the node dead.
1854  */
1855 void o2hb_stop_all_regions(void)
1856 {
1857 	struct o2hb_region *reg;
1858 
1859 	mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1860 
1861 	spin_lock(&o2hb_live_lock);
1862 
1863 	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1864 		reg->hr_unclean_stop = 1;
1865 
1866 	spin_unlock(&o2hb_live_lock);
1867 }
1868 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
1869