xref: /openbmc/linux/drivers/md/dm-raid1.c (revision b454cc66)
1 /*
2  * Copyright (C) 2003 Sistina Software Limited.
3  *
4  * This file is released under the GPL.
5  */
6 
7 #include "dm.h"
8 #include "dm-bio-list.h"
9 #include "dm-io.h"
10 #include "dm-log.h"
11 #include "kcopyd.h"
12 
13 #include <linux/ctype.h>
14 #include <linux/init.h>
15 #include <linux/mempool.h>
16 #include <linux/module.h>
17 #include <linux/pagemap.h>
18 #include <linux/slab.h>
19 #include <linux/time.h>
20 #include <linux/vmalloc.h>
21 #include <linux/workqueue.h>
22 
23 #define DM_MSG_PREFIX "raid1"
24 
25 static struct workqueue_struct *_kmirrord_wq;
26 static struct work_struct _kmirrord_work;
27 static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
28 
29 static inline void wake(void)
30 {
31 	queue_work(_kmirrord_wq, &_kmirrord_work);
32 }
33 
34 /*-----------------------------------------------------------------
35  * Region hash
36  *
37  * The mirror splits itself up into discrete regions.  Each
38  * region can be in one of three states: clean, dirty,
39  * nosync.  There is no need to put clean regions in the hash.
40  *
41  * In addition to being present in the hash table a region _may_
42  * be present on one of three lists.
43  *
44  *   clean_regions: Regions on this list have no io pending to
45  *   them; they are in sync, we are no longer interested in them,
46  *   and they are dull.  rh_update_states() will remove them from
47  *   the hash table.
48  *
49  *   quiesced_regions: These regions have been spun down, ready
50  *   for recovery.  rh_recovery_start() will remove regions from
51  *   this list and hand them to kmirrord, which will schedule the
52  *   recovery io with kcopyd.
53  *
54  *   recovered_regions: Regions that kcopyd has successfully
55  *   recovered.  rh_update_states() will now schedule any delayed
56  *   io, up the recovery_count, and remove the region from the
57  *   hash.
58  *
59  * There are 2 locks:
60  *   A rw spin lock 'hash_lock' protects just the hash table,
61  *   this is never held in write mode from interrupt context,
62  *   which I believe means that we only have to disable irqs when
63  *   doing a write lock.
64  *
65  *   An ordinary spin lock 'region_lock' that protects the three
66  *   lists in the region_hash, together with the 'state', 'list' and
67  *   'delayed_bios' fields of the regions.  This is used from irq
68  *   context, so all other uses will have to suspend local irqs.
69  *---------------------------------------------------------------*/
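/*
 * Rough sketch of the transitions implemented below (the dirty log decides
 * which regions need recovery):
 *
 *   RH_CLEAN      -> RH_DIRTY       first write to the region (rh_inc)
 *   RH_DIRTY      -> RH_CLEAN       last pending write completes (rh_dec)
 *   RH_NOSYNC     -> RH_RECOVERING  rh_recovery_prepare() picks the region
 *   RH_RECOVERING -> (removed)      recovery finished; rh_update_states()
 *                                   marks the log in sync and frees the region
 */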
70 struct mirror_set;
71 struct region_hash {
72 	struct mirror_set *ms;
73 	uint32_t region_size;
74 	unsigned region_shift;
75 
76 	/* holds persistent region state */
77 	struct dirty_log *log;
78 
79 	/* hash table */
80 	rwlock_t hash_lock;
81 	mempool_t *region_pool;
82 	unsigned int mask;
83 	unsigned int nr_buckets;
84 	struct list_head *buckets;
85 
86 	spinlock_t region_lock;
87 	atomic_t recovery_in_flight;
88 	struct semaphore recovery_count;
89 	struct list_head clean_regions;
90 	struct list_head quiesced_regions;
91 	struct list_head recovered_regions;
92 };
93 
94 enum {
95 	RH_CLEAN,
96 	RH_DIRTY,
97 	RH_NOSYNC,
98 	RH_RECOVERING
99 };
100 
101 struct region {
102 	struct region_hash *rh;	/* FIXME: can we get rid of this ? */
103 	region_t key;
104 	int state;
105 
106 	struct list_head hash_list;
107 	struct list_head list;
108 
109 	atomic_t pending;
110 	struct bio_list delayed_bios;
111 };
112 
113 
114 /*-----------------------------------------------------------------
115  * Mirror set structures.
116  *---------------------------------------------------------------*/
117 struct mirror {
118 	atomic_t error_count;
119 	struct dm_dev *dev;
120 	sector_t offset;
121 };
122 
123 struct mirror_set {
124 	struct dm_target *ti;
125 	struct list_head list;
126 	struct region_hash rh;
127 	struct kcopyd_client *kcopyd_client;
128 
129 	spinlock_t lock;	/* protects the next two lists */
130 	struct bio_list reads;
131 	struct bio_list writes;
132 
133 	/* recovery */
134 	region_t nr_regions;
135 	int in_sync;
136 
137 	struct mirror *default_mirror;	/* Default mirror */
138 
139 	unsigned int nr_mirrors;
140 	struct mirror mirror[0];
141 };
142 
143 /*
144  * Conversion fns
145  */
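/*
 * For example, with ti->begin == 0 and region_shift == 10 (1024-sector,
 * i.e. 512 KiB regions), a bio at sector 5000 maps to region 4, and
 * region 4 starts back at sector 4096.
 */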
146 static inline region_t bio_to_region(struct region_hash *rh, struct bio *bio)
147 {
148 	return (bio->bi_sector - rh->ms->ti->begin) >> rh->region_shift;
149 }
150 
151 static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
152 {
153 	return region << rh->region_shift;
154 }
155 
156 /* FIXME move this */
157 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
158 
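/*
 * MIN_REGIONS is the number of struct region objects the mempool keeps in
 * reserve.  MAX_RECOVERY bounds the number of regions recovered in
 * parallel: rh_start_recovery() ups the recovery_count semaphore this many
 * times, and each recovery in progress holds one count.
 */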
159 #define MIN_REGIONS 64
160 #define MAX_RECOVERY 1
161 static int rh_init(struct region_hash *rh, struct mirror_set *ms,
162 		   struct dirty_log *log, uint32_t region_size,
163 		   region_t nr_regions)
164 {
165 	unsigned int nr_buckets, max_buckets;
166 	size_t i;
167 
168 	/*
169 	 * Calculate a suitable number of buckets for our hash
170 	 * table.
171 	 */
172 	max_buckets = nr_regions >> 6;
173 	for (nr_buckets = 128u; nr_buckets < max_buckets; nr_buckets <<= 1)
174 		;
175 	nr_buckets >>= 1;
176 
177 	rh->ms = ms;
178 	rh->log = log;
179 	rh->region_size = region_size;
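	/*
	 * region_size is a power of two (see _check_region_size()), so
	 * sector <-> region conversion is a plain shift, e.g. 1024-sector
	 * regions give region_shift == 10.
	 */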
180 	rh->region_shift = ffs(region_size) - 1;
181 	rwlock_init(&rh->hash_lock);
182 	rh->mask = nr_buckets - 1;
183 	rh->nr_buckets = nr_buckets;
184 
185 	rh->buckets = vmalloc(nr_buckets * sizeof(*rh->buckets));
186 	if (!rh->buckets) {
187 		DMERR("unable to allocate region hash memory");
188 		return -ENOMEM;
189 	}
190 
191 	for (i = 0; i < nr_buckets; i++)
192 		INIT_LIST_HEAD(rh->buckets + i);
193 
194 	spin_lock_init(&rh->region_lock);
195 	sema_init(&rh->recovery_count, 0);
196 	atomic_set(&rh->recovery_in_flight, 0);
197 	INIT_LIST_HEAD(&rh->clean_regions);
198 	INIT_LIST_HEAD(&rh->quiesced_regions);
199 	INIT_LIST_HEAD(&rh->recovered_regions);
200 
201 	rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
202 						      sizeof(struct region));
203 	if (!rh->region_pool) {
204 		vfree(rh->buckets);
205 		rh->buckets = NULL;
206 		return -ENOMEM;
207 	}
208 
209 	return 0;
210 }
211 
212 static void rh_exit(struct region_hash *rh)
213 {
214 	unsigned int h;
215 	struct region *reg, *nreg;
216 
217 	BUG_ON(!list_empty(&rh->quiesced_regions));
218 	for (h = 0; h < rh->nr_buckets; h++) {
219 		list_for_each_entry_safe(reg, nreg, rh->buckets + h, hash_list) {
220 			BUG_ON(atomic_read(&reg->pending));
221 			mempool_free(reg, rh->region_pool);
222 		}
223 	}
224 
225 	if (rh->log)
226 		dm_destroy_dirty_log(rh->log);
227 	if (rh->region_pool)
228 		mempool_destroy(rh->region_pool);
229 	vfree(rh->buckets);
230 }
231 
232 #define RH_HASH_MULT 2654435387U
233 
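/*
 * Simple multiplicative hash of the region number; the result is folded
 * into the table with the power-of-two bucket mask.
 */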
234 static inline unsigned int rh_hash(struct region_hash *rh, region_t region)
235 {
236 	return (unsigned int) ((region * RH_HASH_MULT) >> 12) & rh->mask;
237 }
238 
239 static struct region *__rh_lookup(struct region_hash *rh, region_t region)
240 {
241 	struct region *reg;
242 
243 	list_for_each_entry (reg, rh->buckets + rh_hash(rh, region), hash_list)
244 		if (reg->key == region)
245 			return reg;
246 
247 	return NULL;
248 }
249 
250 static void __rh_insert(struct region_hash *rh, struct region *reg)
251 {
252 	unsigned int h = rh_hash(rh, reg->key);
253 	list_add(&reg->hash_list, rh->buckets + h);
254 }
255 
256 static struct region *__rh_alloc(struct region_hash *rh, region_t region)
257 {
258 	struct region *reg, *nreg;
259 
260 	read_unlock(&rh->hash_lock);
261 	nreg = mempool_alloc(rh->region_pool, GFP_ATOMIC);
	/*
	 * The allocation must not fail: nreg is dereferenced unconditionally
	 * below, hence __GFP_NOFAIL on the fallback kmalloc().
	 */
262 	if (unlikely(!nreg))
263 		nreg = kmalloc(sizeof(struct region), GFP_NOIO | __GFP_NOFAIL);
264 	nreg->state = rh->log->type->in_sync(rh->log, region, 1) ?
265 		RH_CLEAN : RH_NOSYNC;
266 	nreg->rh = rh;
267 	nreg->key = region;
268 
269 	INIT_LIST_HEAD(&nreg->list);
270 
271 	atomic_set(&nreg->pending, 0);
272 	bio_list_init(&nreg->delayed_bios);
273 	write_lock_irq(&rh->hash_lock);
274 
275 	reg = __rh_lookup(rh, region);
276 	if (reg)
277 		/* we lost the race */
278 		mempool_free(nreg, rh->region_pool);
279 
280 	else {
281 		__rh_insert(rh, nreg);
282 		if (nreg->state == RH_CLEAN) {
283 			spin_lock(&rh->region_lock);
284 			list_add(&nreg->list, &rh->clean_regions);
285 			spin_unlock(&rh->region_lock);
286 		}
287 		reg = nreg;
288 	}
289 	write_unlock_irq(&rh->hash_lock);
290 	read_lock(&rh->hash_lock);
291 
292 	return reg;
293 }
294 
295 static inline struct region *__rh_find(struct region_hash *rh, region_t region)
296 {
297 	struct region *reg;
298 
299 	reg = __rh_lookup(rh, region);
300 	if (!reg)
301 		reg = __rh_alloc(rh, region);
302 
303 	return reg;
304 }
305 
306 static int rh_state(struct region_hash *rh, region_t region, int may_block)
307 {
308 	int r;
309 	struct region *reg;
310 
311 	read_lock(&rh->hash_lock);
312 	reg = __rh_lookup(rh, region);
313 	read_unlock(&rh->hash_lock);
314 
315 	if (reg)
316 		return reg->state;
317 
318 	/*
319 	 * The region wasn't in the hash, so we fall back to the
320 	 * dirty log.
321 	 */
322 	r = rh->log->type->in_sync(rh->log, region, may_block);
323 
324 	/*
325 	 * Any error from the dirty log (e.g. -EWOULDBLOCK) gets
326 	 * taken as RH_NOSYNC.
327 	 */
328 	return r == 1 ? RH_CLEAN : RH_NOSYNC;
329 }
330 
331 static inline int rh_in_sync(struct region_hash *rh,
332 			     region_t region, int may_block)
333 {
334 	int state = rh_state(rh, region, may_block);
335 	return state == RH_CLEAN || state == RH_DIRTY;
336 }
337 
338 static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
339 {
340 	struct bio *bio;
341 
342 	while ((bio = bio_list_pop(bio_list))) {
343 		queue_bio(ms, bio, WRITE);
344 	}
345 }
346 
347 static void complete_resync_work(struct region *reg, int success)
348 {
349 	struct region_hash *rh = reg->rh;
350 
351 	rh->log->type->set_region_sync(rh->log, reg->key, success);
352 	dispatch_bios(rh->ms, &reg->delayed_bios);
353 	if (atomic_dec_and_test(&rh->recovery_in_flight))
354 		wake_up_all(&_kmirrord_recovery_stopped);
355 	up(&rh->recovery_count);
356 }
357 
358 static void rh_update_states(struct region_hash *rh)
359 {
360 	struct region *reg, *next;
361 
362 	LIST_HEAD(clean);
363 	LIST_HEAD(recovered);
364 
365 	/*
366 	 * Quickly grab the lists.
367 	 */
368 	write_lock_irq(&rh->hash_lock);
369 	spin_lock(&rh->region_lock);
370 	if (!list_empty(&rh->clean_regions)) {
371 		list_splice(&rh->clean_regions, &clean);
372 		INIT_LIST_HEAD(&rh->clean_regions);
373 
374 		list_for_each_entry (reg, &clean, list) {
375 			rh->log->type->clear_region(rh->log, reg->key);
376 			list_del(&reg->hash_list);
377 		}
378 	}
379 
380 	if (!list_empty(&rh->recovered_regions)) {
381 		list_splice(&rh->recovered_regions, &recovered);
382 		INIT_LIST_HEAD(&rh->recovered_regions);
383 
384 		list_for_each_entry (reg, &recovered, list)
385 			list_del(&reg->hash_list);
386 	}
387 	spin_unlock(&rh->region_lock);
388 	write_unlock_irq(&rh->hash_lock);
389 
390 	/*
391 	 * All the regions on the recovered and clean lists have
392 	 * now been pulled out of the system, so no need to do
393 	 * any more locking.
394 	 */
395 	list_for_each_entry_safe (reg, next, &recovered, list) {
396 		rh->log->type->clear_region(rh->log, reg->key);
397 		complete_resync_work(reg, 1);
398 		mempool_free(reg, rh->region_pool);
399 	}
400 
401 	if (!list_empty(&recovered))
402 		rh->log->type->flush(rh->log);
403 
404 	list_for_each_entry_safe (reg, next, &clean, list)
405 		mempool_free(reg, rh->region_pool);
406 }
407 
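/*
 * rh_inc()/rh_dec() track the number of writes in flight to a region.
 * The first write to a clean region marks it dirty in the log; when the
 * last write completes, rh_dec() moves the region back to the clean list
 * (or to the quiesced list if recovery is waiting for it).
 */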
408 static void rh_inc(struct region_hash *rh, region_t region)
409 {
410 	struct region *reg;
411 
412 	read_lock(&rh->hash_lock);
413 	reg = __rh_find(rh, region);
414 
415 	spin_lock_irq(&rh->region_lock);
416 	atomic_inc(&reg->pending);
417 
418 	if (reg->state == RH_CLEAN) {
419 		reg->state = RH_DIRTY;
420 		list_del_init(&reg->list);	/* take off the clean list */
421 		spin_unlock_irq(&rh->region_lock);
422 
423 		rh->log->type->mark_region(rh->log, reg->key);
424 	} else
425 		spin_unlock_irq(&rh->region_lock);
426 
427 
428 	read_unlock(&rh->hash_lock);
429 }
430 
431 static void rh_inc_pending(struct region_hash *rh, struct bio_list *bios)
432 {
433 	struct bio *bio;
434 
435 	for (bio = bios->head; bio; bio = bio->bi_next)
436 		rh_inc(rh, bio_to_region(rh, bio));
437 }
438 
439 static void rh_dec(struct region_hash *rh, region_t region)
440 {
441 	unsigned long flags;
442 	struct region *reg;
443 	int should_wake = 0;
444 
445 	read_lock(&rh->hash_lock);
446 	reg = __rh_lookup(rh, region);
447 	read_unlock(&rh->hash_lock);
448 
449 	spin_lock_irqsave(&rh->region_lock, flags);
450 	if (atomic_dec_and_test(&reg->pending)) {
451 		/*
452 		 * There is no pending I/O for this region.
453 		 * We can move the region to the corresponding list for the next action.
454 		 * At this point, the region is not yet connected to any list.
455 		 *
456 		 * If the state is RH_NOSYNC, the region must be kept off
457 		 * the clean list.
458 		 * The hash entry for RH_NOSYNC will remain in memory
459 		 * until the region is recovered or the map is reloaded.
460 		 */
461 
462 		/* do nothing for RH_NOSYNC */
463 		if (reg->state == RH_RECOVERING) {
464 			list_add_tail(&reg->list, &rh->quiesced_regions);
465 		} else if (reg->state == RH_DIRTY) {
466 			reg->state = RH_CLEAN;
467 			list_add(&reg->list, &rh->clean_regions);
468 		}
469 		should_wake = 1;
470 	}
471 	spin_unlock_irqrestore(&rh->region_lock, flags);
472 
473 	if (should_wake)
474 		wake();
475 }
476 
477 /*
478  * Starts quiescing a region in preparation for recovery.
479  */
480 static int __rh_recovery_prepare(struct region_hash *rh)
481 {
482 	int r;
483 	struct region *reg;
484 	region_t region;
485 
486 	/*
487 	 * Ask the dirty log what's next.
488 	 */
489 	r = rh->log->type->get_resync_work(rh->log, &region);
490 	if (r <= 0)
491 		return r;
492 
493 	/*
494 	 * Get this region, and start it quiescing by setting the
495 	 * recovering flag.
496 	 */
497 	read_lock(&rh->hash_lock);
498 	reg = __rh_find(rh, region);
499 	read_unlock(&rh->hash_lock);
500 
501 	spin_lock_irq(&rh->region_lock);
502 	reg->state = RH_RECOVERING;
503 
504 	/*
	 * Already quiesced?  If io is still pending, just take the region off
	 * its current list; rh_dec() will move it to the quiesced list once
	 * the last io completes.
	 */
505 	if (atomic_read(&reg->pending))
506 		list_del_init(&reg->list);
507 	else
508 		list_move(&reg->list, &rh->quiesced_regions);
509 
510 	spin_unlock_irq(&rh->region_lock);
511 
512 	return 1;
513 }
514 
515 static void rh_recovery_prepare(struct region_hash *rh)
516 {
517 	/* Extra reference to avoid race with rh_stop_recovery */
518 	atomic_inc(&rh->recovery_in_flight);
519 
520 	while (!down_trylock(&rh->recovery_count)) {
521 		atomic_inc(&rh->recovery_in_flight);
522 		if (__rh_recovery_prepare(rh) <= 0) {
523 			atomic_dec(&rh->recovery_in_flight);
524 			up(&rh->recovery_count);
525 			break;
526 		}
527 	}
528 
529 	/* Drop the extra reference */
530 	if (atomic_dec_and_test(&rh->recovery_in_flight))
531 		wake_up_all(&_kmirrord_recovery_stopped);
532 }
533 
534 /*
535  * Returns the next quiesced region, or NULL if there are none.
536  */
537 static struct region *rh_recovery_start(struct region_hash *rh)
538 {
539 	struct region *reg = NULL;
540 
541 	spin_lock_irq(&rh->region_lock);
542 	if (!list_empty(&rh->quiesced_regions)) {
543 		reg = list_entry(rh->quiesced_regions.next,
544 				 struct region, list);
545 		list_del_init(&reg->list);	/* remove from the quiesced list */
546 	}
547 	spin_unlock_irq(&rh->region_lock);
548 
549 	return reg;
550 }
551 
552 /* FIXME: success ignored for now */
553 static void rh_recovery_end(struct region *reg, int success)
554 {
555 	struct region_hash *rh = reg->rh;
556 
557 	spin_lock_irq(&rh->region_lock);
558 	list_add(&reg->list, &reg->rh->recovered_regions);
559 	spin_unlock_irq(&rh->region_lock);
560 
561 	wake();
562 }
563 
564 static void rh_flush(struct region_hash *rh)
565 {
566 	rh->log->type->flush(rh->log);
567 }
568 
569 static void rh_delay(struct region_hash *rh, struct bio *bio)
570 {
571 	struct region *reg;
572 
573 	read_lock(&rh->hash_lock);
574 	reg = __rh_find(rh, bio_to_region(rh, bio));
575 	bio_list_add(&reg->delayed_bios, bio);
576 	read_unlock(&rh->hash_lock);
577 }
578 
579 static void rh_stop_recovery(struct region_hash *rh)
580 {
581 	int i;
582 
583 	/* wait for any recovering regions */
584 	for (i = 0; i < MAX_RECOVERY; i++)
585 		down(&rh->recovery_count);
586 }
587 
588 static void rh_start_recovery(struct region_hash *rh)
589 {
590 	int i;
591 
592 	for (i = 0; i < MAX_RECOVERY; i++)
593 		up(&rh->recovery_count);
594 
595 	wake();
596 }
597 
598 /*
599  * Every mirror should look like this one.
600  */
601 #define DEFAULT_MIRROR 0
602 
603 /*
604  * This is yucky.  We squirrel the mirror_set struct away inside
605  * bi_next for write bios.  This is safe since the bio
606  * doesn't get submitted to the lower levels of the block layer.
607  */
608 static struct mirror_set *bio_get_ms(struct bio *bio)
609 {
610 	return (struct mirror_set *) bio->bi_next;
611 }
612 
613 static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
614 {
615 	bio->bi_next = (struct bio *) ms;
616 }
617 
618 /*-----------------------------------------------------------------
619  * Recovery.
620  *
621  * When a mirror is first activated we may find that some regions
622  * are in the no-sync state.  We have to recover these by
623  * recopying from the default mirror to all the others.
624  *---------------------------------------------------------------*/
625 static void recovery_complete(int read_err, unsigned int write_err,
626 			      void *context)
627 {
628 	struct region *reg = (struct region *) context;
629 
630 	/* FIXME: better error handling */
631 	rh_recovery_end(reg, !(read_err || write_err));
632 }
633 
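/*
 * Kick off the kcopyd copy for one region: the source is the default
 * mirror, the destinations are all other mirrors.  recovery_complete()
 * above is invoked when kcopyd is done.
 */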
634 static int recover(struct mirror_set *ms, struct region *reg)
635 {
636 	int r;
637 	unsigned int i;
638 	struct io_region from, to[KCOPYD_MAX_REGIONS], *dest;
639 	struct mirror *m;
640 	unsigned long flags = 0;
641 
642 	/* fill in the source */
643 	m = ms->default_mirror;
644 	from.bdev = m->dev->bdev;
645 	from.sector = m->offset + region_to_sector(reg->rh, reg->key);
646 	if (reg->key == (ms->nr_regions - 1)) {
647 		/*
648 		 * The final region may be smaller than
649 		 * region_size.
650 		 */
651 		from.count = ms->ti->len & (reg->rh->region_size - 1);
652 		if (!from.count)
653 			from.count = reg->rh->region_size;
654 	} else
655 		from.count = reg->rh->region_size;
656 
657 	/* fill in the destinations */
658 	for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
659 		if (&ms->mirror[i] == ms->default_mirror)
660 			continue;
661 
662 		m = ms->mirror + i;
663 		dest->bdev = m->dev->bdev;
664 		dest->sector = m->offset + region_to_sector(reg->rh, reg->key);
665 		dest->count = from.count;
666 		dest++;
667 	}
668 
669 	/* hand to kcopyd */
670 	set_bit(KCOPYD_IGNORE_ERROR, &flags);
671 	r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags,
672 			recovery_complete, reg);
673 
674 	return r;
675 }
676 
677 static void do_recovery(struct mirror_set *ms)
678 {
679 	int r;
680 	struct region *reg;
681 	struct dirty_log *log = ms->rh.log;
682 
683 	/*
684 	 * Start quiescing some regions.
685 	 */
686 	rh_recovery_prepare(&ms->rh);
687 
688 	/*
689 	 * Copy any already quiesced regions.
690 	 */
691 	while ((reg = rh_recovery_start(&ms->rh))) {
692 		r = recover(ms, reg);
693 		if (r)
694 			rh_recovery_end(reg, 0);
695 	}
696 
697 	/*
698 	 * Update the in sync flag.
699 	 */
700 	if (!ms->in_sync &&
701 	    (log->type->get_sync_count(log) == ms->nr_regions)) {
702 		/* the sync is complete */
703 		dm_table_event(ms->ti->table);
704 		ms->in_sync = 1;
705 	}
706 }
707 
708 /*-----------------------------------------------------------------
709  * Reads
710  *---------------------------------------------------------------*/
711 static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
712 {
713 	/* FIXME: add read balancing */
714 	return ms->default_mirror;
715 }
716 
717 /*
718  * Remap a bio to a particular mirror.
719  */
720 static void map_bio(struct mirror_set *ms, struct mirror *m, struct bio *bio)
721 {
722 	bio->bi_bdev = m->dev->bdev;
723 	bio->bi_sector = m->offset + (bio->bi_sector - ms->ti->begin);
724 }
725 
726 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
727 {
728 	region_t region;
729 	struct bio *bio;
730 	struct mirror *m;
731 
732 	while ((bio = bio_list_pop(reads))) {
733 		region = bio_to_region(&ms->rh, bio);
734 
735 		/*
736 		 * We can only read balance if the region is in sync.
737 		 */
738 		if (rh_in_sync(&ms->rh, region, 0))
739 			m = choose_mirror(ms, bio->bi_sector);
740 		else
741 			m = ms->default_mirror;
742 
743 		map_bio(ms, m, bio);
744 		generic_make_request(bio);
745 	}
746 }
747 
748 /*-----------------------------------------------------------------
749  * Writes.
750  *
751  * We do different things with the write io depending on the
752  * state of the region that it's in:
753  *
754  * SYNC: 	increment pending, use kcopyd to write to *all* mirrors
755  * RECOVERING:	delay the io until recovery completes
756  * NOSYNC:	increment pending, just write to the default mirror
757  *---------------------------------------------------------------*/
758 static void write_callback(unsigned long error, void *context)
759 {
760 	unsigned int i;
761 	int uptodate = 1;
762 	struct bio *bio = (struct bio *) context;
763 	struct mirror_set *ms;
764 
765 	ms = bio_get_ms(bio);
766 	bio_set_ms(bio, NULL);
767 
768 	/*
769 	 * NOTE: We don't decrement the pending count here;
770 	 * instead it is done by the target's end_io function.
771 	 * This way we handle both writes to SYNC and NOSYNC
772 	 * regions with the same code.
773 	 */
774 
775 	if (error) {
776 		/*
777 		 * only error the io if all mirrors failed.
778 		 * FIXME: bogus
779 		 */
780 		uptodate = 0;
781 		for (i = 0; i < ms->nr_mirrors; i++)
782 			if (!test_bit(i, &error)) {
783 				uptodate = 1;
784 				break;
785 			}
786 	}
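	/*
	 * Note: uptodate is computed above but not yet acted upon; the bio
	 * is always completed without error (see the FIXME above).
	 */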
787 	bio_endio(bio, bio->bi_size, 0);
788 }
789 
790 static void do_write(struct mirror_set *ms, struct bio *bio)
791 {
792 	unsigned int i;
793 	struct io_region io[KCOPYD_MAX_REGIONS+1];
794 	struct mirror *m;
795 
796 	for (i = 0; i < ms->nr_mirrors; i++) {
797 		m = ms->mirror + i;
798 
799 		io[i].bdev = m->dev->bdev;
800 		io[i].sector = m->offset + (bio->bi_sector - ms->ti->begin);
801 		io[i].count = bio->bi_size >> 9;
802 	}
803 
804 	bio_set_ms(bio, ms);
805 	dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
806 			 bio->bi_io_vec + bio->bi_idx,
807 			 write_callback, bio);
808 }
809 
810 static void do_writes(struct mirror_set *ms, struct bio_list *writes)
811 {
812 	int state;
813 	struct bio *bio;
814 	struct bio_list sync, nosync, recover, *this_list = NULL;
815 
816 	if (!writes->head)
817 		return;
818 
819 	/*
820 	 * Classify each write.
821 	 */
822 	bio_list_init(&sync);
823 	bio_list_init(&nosync);
824 	bio_list_init(&recover);
825 
826 	while ((bio = bio_list_pop(writes))) {
827 		state = rh_state(&ms->rh, bio_to_region(&ms->rh, bio), 1);
828 		switch (state) {
829 		case RH_CLEAN:
830 		case RH_DIRTY:
831 			this_list = &sync;
832 			break;
833 
834 		case RH_NOSYNC:
835 			this_list = &nosync;
836 			break;
837 
838 		case RH_RECOVERING:
839 			this_list = &recover;
840 			break;
841 		}
842 
843 		bio_list_add(this_list, bio);
844 	}
845 
846 	/*
847 	 * Increment the pending counts for any regions that will
848 	 * be written to (writes to recover regions are going to
849 	 * be delayed).
850 	 */
851 	rh_inc_pending(&ms->rh, &sync);
852 	rh_inc_pending(&ms->rh, &nosync);
853 	rh_flush(&ms->rh);
854 
855 	/*
856 	 * Dispatch io.
857 	 */
858 	while ((bio = bio_list_pop(&sync)))
859 		do_write(ms, bio);
860 
861 	while ((bio = bio_list_pop(&recover)))
862 		rh_delay(&ms->rh, bio);
863 
864 	while ((bio = bio_list_pop(&nosync))) {
865 		map_bio(ms, ms->default_mirror, bio);
866 		generic_make_request(bio);
867 	}
868 }
869 
870 /*-----------------------------------------------------------------
871  * kmirrord
872  *---------------------------------------------------------------*/
873 static LIST_HEAD(_mirror_sets);
874 static DECLARE_RWSEM(_mirror_sets_lock);
875 
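/*
 * Process one mirror set: fold completed region state changes back into
 * the dirty log, push recovery along, then issue the reads and writes
 * that queue_bio() collected since the last pass.
 */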
876 static void do_mirror(struct mirror_set *ms)
877 {
878 	struct bio_list reads, writes;
879 
880 	spin_lock(&ms->lock);
881 	reads = ms->reads;
882 	writes = ms->writes;
883 	bio_list_init(&ms->reads);
884 	bio_list_init(&ms->writes);
885 	spin_unlock(&ms->lock);
886 
887 	rh_update_states(&ms->rh);
888 	do_recovery(ms);
889 	do_reads(ms, &reads);
890 	do_writes(ms, &writes);
891 }
892 
893 static void do_work(struct work_struct *ignored)
894 {
895 	struct mirror_set *ms;
896 
897 	down_read(&_mirror_sets_lock);
898 	list_for_each_entry (ms, &_mirror_sets, list)
899 		do_mirror(ms);
900 	up_read(&_mirror_sets_lock);
901 }
902 
903 /*-----------------------------------------------------------------
904  * Target functions
905  *---------------------------------------------------------------*/
906 static struct mirror_set *alloc_context(unsigned int nr_mirrors,
907 					uint32_t region_size,
908 					struct dm_target *ti,
909 					struct dirty_log *dl)
910 {
911 	size_t len;
912 	struct mirror_set *ms = NULL;
913 
914 	if (array_too_big(sizeof(*ms), sizeof(ms->mirror[0]), nr_mirrors))
915 		return NULL;
916 
917 	len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
918 
919 	ms = kmalloc(len, GFP_KERNEL);
920 	if (!ms) {
921 		ti->error = "Cannot allocate mirror context";
922 		return NULL;
923 	}
924 
925 	memset(ms, 0, len);
926 	spin_lock_init(&ms->lock);
927 
928 	ms->ti = ti;
929 	ms->nr_mirrors = nr_mirrors;
930 	ms->nr_regions = dm_sector_div_up(ti->len, region_size);
931 	ms->in_sync = 0;
932 	ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];
933 
934 	if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
935 		ti->error = "Error creating dirty region hash";
936 		kfree(ms);
937 		return NULL;
938 	}
939 
940 	return ms;
941 }
942 
943 static void free_context(struct mirror_set *ms, struct dm_target *ti,
944 			 unsigned int m)
945 {
946 	while (m--)
947 		dm_put_device(ti, ms->mirror[m].dev);
948 
949 	rh_exit(&ms->rh);
950 	kfree(ms);
951 }
952 
953 static inline int _check_region_size(struct dm_target *ti, uint32_t size)
954 {
955 	return !(size % (PAGE_SIZE >> 9) || (size & (size - 1)) ||
956 		 size > ti->len);
957 }
958 
959 static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
960 		      unsigned int mirror, char **argv)
961 {
962 	unsigned long long offset;
963 
964 	if (sscanf(argv[1], "%llu", &offset) != 1) {
965 		ti->error = "Invalid offset";
966 		return -EINVAL;
967 	}
968 
969 	if (dm_get_device(ti, argv[0], offset, ti->len,
970 			  dm_table_get_mode(ti->table),
971 			  &ms->mirror[mirror].dev)) {
972 		ti->error = "Device lookup failure";
973 		return -ENXIO;
974 	}
975 
976 	ms->mirror[mirror].offset = offset;
977 
978 	return 0;
979 }
980 
981 static int add_mirror_set(struct mirror_set *ms)
982 {
983 	down_write(&_mirror_sets_lock);
984 	list_add_tail(&ms->list, &_mirror_sets);
985 	up_write(&_mirror_sets_lock);
986 	wake();
987 
988 	return 0;
989 }
990 
991 static void del_mirror_set(struct mirror_set *ms)
992 {
993 	down_write(&_mirror_sets_lock);
994 	list_del(&ms->list);
995 	up_write(&_mirror_sets_lock);
996 }
997 
998 /*
999  * Create dirty log: log_type #log_params <log_params>
1000  */
1001 static struct dirty_log *create_dirty_log(struct dm_target *ti,
1002 					  unsigned int argc, char **argv,
1003 					  unsigned int *args_used)
1004 {
1005 	unsigned int param_count;
1006 	struct dirty_log *dl;
1007 
1008 	if (argc < 2) {
1009 		ti->error = "Insufficient mirror log arguments";
1010 		return NULL;
1011 	}
1012 
1013 	if (sscanf(argv[1], "%u", &param_count) != 1) {
1014 		ti->error = "Invalid mirror log argument count";
1015 		return NULL;
1016 	}
1017 
1018 	*args_used = 2 + param_count;
1019 
1020 	if (argc < *args_used) {
1021 		ti->error = "Insufficient mirror log arguments";
1022 		return NULL;
1023 	}
1024 
1025 	dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
1026 	if (!dl) {
1027 		ti->error = "Error creating mirror dirty log";
1028 		return NULL;
1029 	}
1030 
1031 	if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
1032 		ti->error = "Invalid region size";
1033 		dm_destroy_dirty_log(dl);
1034 		return NULL;
1035 	}
1036 
1037 	return dl;
1038 }
1039 
1040 /*
1041  * Construct a mirror mapping:
1042  *
1043  * log_type #log_params <log_params>
1044  * #mirrors [mirror_path offset]{2,}
1045  *
1046  * log_type is "core" or "disk"
1047  * #log_params is between 1 and 3
1048  */
1049 #define DM_IO_PAGES 64
1050 static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1051 {
1052 	int r;
1053 	unsigned int nr_mirrors, m, args_used;
1054 	struct mirror_set *ms;
1055 	struct dirty_log *dl;
1056 
1057 	dl = create_dirty_log(ti, argc, argv, &args_used);
1058 	if (!dl)
1059 		return -EINVAL;
1060 
1061 	argv += args_used;
1062 	argc -= args_used;
1063 
1064 	if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
1065 	    nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
1066 		ti->error = "Invalid number of mirrors";
1067 		dm_destroy_dirty_log(dl);
1068 		return -EINVAL;
1069 	}
1070 
1071 	argv++, argc--;
1072 
1073 	if (argc != nr_mirrors * 2) {
1074 		ti->error = "Wrong number of mirror arguments";
1075 		dm_destroy_dirty_log(dl);
1076 		return -EINVAL;
1077 	}
1078 
1079 	ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
1080 	if (!ms) {
1081 		dm_destroy_dirty_log(dl);
1082 		return -ENOMEM;
1083 	}
1084 
1085 	/* Get the mirror parameter sets */
1086 	for (m = 0; m < nr_mirrors; m++) {
1087 		r = get_mirror(ms, ti, m, argv);
1088 		if (r) {
1089 			free_context(ms, ti, m);
1090 			return r;
1091 		}
1092 		argv += 2;
1093 		argc -= 2;
1094 	}
1095 
1096 	ti->private = ms;
1097 	ti->split_io = ms->rh.region_size;
1098 
1099 	r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
1100 	if (r) {
1101 		free_context(ms, ti, ms->nr_mirrors);
1102 		return r;
1103 	}
1104 
1105 	add_mirror_set(ms);
1106 	return 0;
1107 }
1108 
1109 static void mirror_dtr(struct dm_target *ti)
1110 {
1111 	struct mirror_set *ms = (struct mirror_set *) ti->private;
1112 
1113 	del_mirror_set(ms);
1114 	kcopyd_client_destroy(ms->kcopyd_client);
1115 	free_context(ms, ti, ms->nr_mirrors);
1116 }
1117 
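/*
 * Hand a bio to the kmirrord daemon; the workqueue is only woken when a
 * previously empty list becomes non-empty.
 */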
1118 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
1119 {
1120 	int should_wake = 0;
1121 	struct bio_list *bl;
1122 
1123 	bl = (rw == WRITE) ? &ms->writes : &ms->reads;
1124 	spin_lock(&ms->lock);
1125 	should_wake = !(bl->head);
1126 	bio_list_add(bl, bio);
1127 	spin_unlock(&ms->lock);
1128 
1129 	if (should_wake)
1130 		wake();
1131 }
1132 
1133 /*
1134  * Mirror mapping function
1135  */
1136 static int mirror_map(struct dm_target *ti, struct bio *bio,
1137 		      union map_info *map_context)
1138 {
1139 	int r, rw = bio_rw(bio);
1140 	struct mirror *m;
1141 	struct mirror_set *ms = ti->private;
1142 
1143 	map_context->ll = bio_to_region(&ms->rh, bio);
1144 
1145 	if (rw == WRITE) {
1146 		queue_bio(ms, bio, rw);
1147 		return DM_MAPIO_SUBMITTED;
1148 	}
1149 
1150 	r = ms->rh.log->type->in_sync(ms->rh.log,
1151 				      bio_to_region(&ms->rh, bio), 0);
1152 	if (r < 0 && r != -EWOULDBLOCK)
1153 		return r;
1154 
1155 	if (r == -EWOULDBLOCK)	/* FIXME: ugly */
1156 		r = DM_MAPIO_SUBMITTED;
1157 
1158 	/*
1159 	 * We don't want to fast track a recovery just for a read
1160 	 * ahead.  So we just let it silently fail.
1161 	 * FIXME: get rid of this.
1162 	 */
1163 	if (!r && rw == READA)
1164 		return -EIO;
1165 
1166 	if (!r) {
1167 		/* Pass this io over to the daemon */
1168 		queue_bio(ms, bio, rw);
1169 		return DM_MAPIO_SUBMITTED;
1170 	}
1171 
1172 	m = choose_mirror(ms, bio->bi_sector);
1173 	if (!m)
1174 		return -EIO;
1175 
1176 	map_bio(ms, m, bio);
1177 	return DM_MAPIO_REMAPPED;
1178 }
1179 
1180 static int mirror_end_io(struct dm_target *ti, struct bio *bio,
1181 			 int error, union map_info *map_context)
1182 {
1183 	int rw = bio_rw(bio);
1184 	struct mirror_set *ms = (struct mirror_set *) ti->private;
1185 	region_t region = map_context->ll;
1186 
1187 	/*
1188 	 * We need to dec pending if this was a write.
1189 	 */
1190 	if (rw == WRITE)
1191 		rh_dec(&ms->rh, region);
1192 
1193 	return 0;
1194 }
1195 
1196 static void mirror_postsuspend(struct dm_target *ti)
1197 {
1198 	struct mirror_set *ms = (struct mirror_set *) ti->private;
1199 	struct dirty_log *log = ms->rh.log;
1200 
1201 	rh_stop_recovery(&ms->rh);
1202 
1203 	/* Wait for all I/O we generated to complete */
1204 	wait_event(_kmirrord_recovery_stopped,
1205 		   !atomic_read(&ms->rh.recovery_in_flight));
1206 
1207 	if (log->type->suspend && log->type->suspend(log))
1208 		/* FIXME: need better error handling */
1209 		DMWARN("log suspend failed");
1210 }
1211 
1212 static void mirror_resume(struct dm_target *ti)
1213 {
1214 	struct mirror_set *ms = (struct mirror_set *) ti->private;
1215 	struct dirty_log *log = ms->rh.log;
1216 	if (log->type->resume && log->type->resume(log))
1217 		/* FIXME: need better error handling */
1218 		DMWARN("log resume failed");
1219 	rh_start_recovery(&ms->rh);
1220 }
1221 
1222 static int mirror_status(struct dm_target *ti, status_type_t type,
1223 			 char *result, unsigned int maxlen)
1224 {
1225 	unsigned int m, sz;
1226 	struct mirror_set *ms = (struct mirror_set *) ti->private;
1227 
1228 	sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
1229 
1230 	switch (type) {
1231 	case STATUSTYPE_INFO:
1232 		DMEMIT("%d ", ms->nr_mirrors);
1233 		for (m = 0; m < ms->nr_mirrors; m++)
1234 			DMEMIT("%s ", ms->mirror[m].dev->name);
1235 
1236 		DMEMIT("%llu/%llu",
1237 			(unsigned long long)ms->rh.log->type->
1238 				get_sync_count(ms->rh.log),
1239 			(unsigned long long)ms->nr_regions);
1240 		break;
1241 
1242 	case STATUSTYPE_TABLE:
1243 		DMEMIT("%d", ms->nr_mirrors);
1244 		for (m = 0; m < ms->nr_mirrors; m++)
1245 			DMEMIT(" %s %llu", ms->mirror[m].dev->name,
1246 				(unsigned long long)ms->mirror[m].offset);
1247 	}
1248 
1249 	return 0;
1250 }
1251 
1252 static struct target_type mirror_target = {
1253 	.name	 = "mirror",
1254 	.version = {1, 0, 2},
1255 	.module	 = THIS_MODULE,
1256 	.ctr	 = mirror_ctr,
1257 	.dtr	 = mirror_dtr,
1258 	.map	 = mirror_map,
1259 	.end_io	 = mirror_end_io,
1260 	.postsuspend = mirror_postsuspend,
1261 	.resume	 = mirror_resume,
1262 	.status	 = mirror_status,
1263 };
1264 
1265 static int __init dm_mirror_init(void)
1266 {
1267 	int r;
1268 
1269 	r = dm_dirty_log_init();
1270 	if (r)
1271 		return r;
1272 
1273 	_kmirrord_wq = create_singlethread_workqueue("kmirrord");
1274 	if (!_kmirrord_wq) {
1275 		DMERR("couldn't start kmirrord");
1276 		dm_dirty_log_exit();
1277 		return -ENOMEM;
1278 	}
1279 	INIT_WORK(&_kmirrord_work, do_work);
1280 
1281 	r = dm_register_target(&mirror_target);
1282 	if (r < 0) {
1283 		DMERR("%s: Failed to register mirror target",
1284 		      mirror_target.name);
1285 		dm_dirty_log_exit();
1286 		destroy_workqueue(_kmirrord_wq);
1287 	}
1288 
1289 	return r;
1290 }
1291 
1292 static void __exit dm_mirror_exit(void)
1293 {
1294 	int r;
1295 
1296 	r = dm_unregister_target(&mirror_target);
1297 	if (r < 0)
1298 		DMERR("%s: unregister failed %d", mirror_target.name, r);
1299 
1300 	destroy_workqueue(_kmirrord_wq);
1301 	dm_dirty_log_exit();
1302 }
1303 
1304 /* Module hooks */
1305 module_init(dm_mirror_init);
1306 module_exit(dm_mirror_exit);
1307 
1308 MODULE_DESCRIPTION(DM_NAME " mirror target");
1309 MODULE_AUTHOR("Joe Thornber");
1310 MODULE_LICENSE("GPL");
1311