/*
 * Copyright (C) 2006-2009 Red Hat, Inc.
 *
 * This file is released under the LGPL.
 */

#include <linux/bio.h>
#include <linux/slab.h>
#include <linux/jiffies.h>
#include <linux/dm-dirty-log.h>
#include <linux/device-mapper.h>
#include <linux/dm-log-userspace.h>
#include <linux/module.h>
#include <linux/workqueue.h>

#include "dm-log-userspace-transfer.h"

#define DM_LOG_USERSPACE_VSN "1.3.0"

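/*
 * A single mark or clear request, queued on log_c::mark_list or
 * log_c::clear_list until the next flush groups it into a message
 * to the userspace log server.
 */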
struct flush_entry {
	int type;
	region_t region;
	struct list_head list;
};

/*
 * This limit on the number of mark and clear requests is, to a degree,
 * arbitrary.  However, there is some basis for the choice in the limits
 * imposed on the size of the data payload by dm-log-userspace-transfer.c:
 * dm_consult_userspace().
 */
#define MAX_FLUSH_GROUP_COUNT 32
struct log_c {
	struct dm_target *ti;
	struct dm_dev *log_dev;
	uint32_t region_size;
	region_t region_count;
	uint64_t luid;
	char uuid[DM_UUID_LEN];

	char *usr_argv_str;
	uint32_t usr_argc;

	/*
	 * in_sync_hint gets set when doing is_remote_recovering.  It
	 * represents the first region that needs recovery.  IOW, the
	 * first zero bit of sync_bits.  This can be useful to limit
	 * traffic for calls like is_remote_recovering and get_resync_work,
	 * but take care in its use for anything else.
	 */
	uint64_t in_sync_hint;

	/*
	 * Mark and clear requests are held until a flush is issued
	 * so that we can group, and thereby limit, the amount of
	 * network traffic between kernel and userspace.  The 'flush_lock'
	 * is used to protect these lists.
	 */
	spinlock_t flush_lock;
	struct list_head mark_list;
	struct list_head clear_list;

	/*
	 * Workqueue for flushing clear region requests.
	 */
	struct workqueue_struct *dmlog_wq;
	struct delayed_work flush_log_work;
	atomic_t sched_flush;

	/*
	 * Combine userspace flush and mark requests for efficiency.
	 */
	uint32_t integrated_flush;
};

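/*
 * Shared pool of flush_entry structs.  A reserve is required because
 * userspace_mark_region() must never fail to queue a request.
 */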
static mempool_t *flush_entry_pool;

static void *flush_entry_alloc(gfp_t gfp_mask, void *pool_data)
{
	return kmalloc(sizeof(struct flush_entry), gfp_mask);
}

static void flush_entry_free(void *element, void *pool_data)
{
	kfree(element);
}

static int userspace_do_request(struct log_c *lc, const char *uuid,
				int request_type, char *data, size_t data_size,
				char *rdata, size_t *rdata_size)
{
	int r;

	/*
	 * If the server isn't there, -ESRCH is returned,
	 * and we must keep trying until the server is
	 * restored.
	 */
retry:
	r = dm_consult_userspace(uuid, lc->luid, request_type, data,
				 data_size, rdata, rdata_size);

	if (r != -ESRCH)
		return r;

	DMERR("Userspace log server not found.");
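	/*
	 * Poll every two seconds until the server re-registers, then
	 * replay the original constructor string so it can rebuild
	 * its state for this log.
	 */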
	while (1) {
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(2*HZ);
		DMWARN("Attempting to contact userspace log server...");
		r = dm_consult_userspace(uuid, lc->luid, DM_ULOG_CTR,
					 lc->usr_argv_str,
					 strlen(lc->usr_argv_str) + 1,
					 NULL, NULL);
		if (!r)
			break;
	}
	DMINFO("Reconnected to userspace log server... DM_ULOG_CTR complete");
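	/*
	 * Resume the freshly (re)constructed log before retrying the
	 * request that originally failed.
	 */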
	r = dm_consult_userspace(uuid, lc->luid, DM_ULOG_RESUME, NULL,
				 0, NULL, NULL);
	if (!r)
		goto retry;

	DMERR("Error trying to resume userspace log: %d", r);

	return -ESRCH;
}

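/*
 * build_constructor_string
 *
 * Concatenate the target length and the remaining constructor arguments
 * into a single space-separated string for the userspace server.  For
 * example (values are illustrative only), a 100MiB target might yield:
 *	"204800 clustered-disk 4 253:4 1024 nosync"
 *
 * Returns: string length on success, -ENOMEM on failure
 */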
static int build_constructor_string(struct dm_target *ti,
				    unsigned argc, char **argv,
				    char **ctr_str)
{
	int i, str_size;
	char *str = NULL;

	*ctr_str = NULL;

	/*
	 * Determine overall size of the string.
	 */
	for (i = 0, str_size = 0; i < argc; i++)
		str_size += strlen(argv[i]) + 1; /* +1 for space between args */

	str_size += 21; /* Max chars in a printed u64 number (20), plus NUL */

	str = kzalloc(str_size, GFP_KERNEL);
	if (!str) {
		DMWARN("Unable to allocate memory for constructor string");
		return -ENOMEM;
	}

	str_size = sprintf(str, "%llu", (unsigned long long)ti->len);
	for (i = 0; i < argc; i++)
		str_size += sprintf(str + str_size, " %s", argv[i]);

	*ctr_str = str;
	return str_size;
}

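/*
 * do_flush
 *
 * Delayed-work handler used with 'integrated_flush': issue the
 * DM_ULOG_FLUSH that was deferred when only clear region requests
 * were pending (see userspace_flush()).
 */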
static void do_flush(struct work_struct *work)
{
	int r;
	struct log_c *lc = container_of(work, struct log_c, flush_log_work.work);

	atomic_set(&lc->sched_flush, 0);

	r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH, NULL, 0, NULL, NULL);

	if (r)
		dm_table_event(lc->ti->table);
}

/*
 * userspace_ctr
 *
 * argv contains:
 *	<UUID> [integrated_flush] <other args>
 * Where 'other args' are the userspace implementation-specific log
 * arguments.
 *
 * Example:
 *	<UUID> [integrated_flush] clustered-disk <arg count> <log dev>
 *	<region_size> [[no]sync]
 *
 * This module strips off the <UUID> and uses it for identification
 * purposes when communicating with userspace about a log.
 *
 * If 'integrated_flush' is given, the kernel combines flush
 * and mark requests.
 *
 * The rest of the line, beginning with 'clustered-disk', is passed
 * to the userspace ctr function.
 */
static int userspace_ctr(struct dm_dirty_log *log, struct dm_target *ti,
			 unsigned argc, char **argv)
{
	int r = 0;
	int str_size;
	char *ctr_str = NULL;
	struct log_c *lc = NULL;
	uint64_t rdata;
	size_t rdata_size = sizeof(rdata);
	char *devices_rdata = NULL;
	size_t devices_rdata_size = DM_NAME_LEN;

	if (argc < 3) {
		DMWARN("Too few arguments to userspace dirty log");
		return -EINVAL;
	}

	lc = kzalloc(sizeof(*lc), GFP_KERNEL);
	if (!lc) {
		DMWARN("Unable to allocate userspace log context.");
		return -ENOMEM;
	}

	/* The ptr value is sufficient for local unique id */
	lc->luid = (unsigned long)lc;

	lc->ti = ti;

	if (strlen(argv[0]) > (DM_UUID_LEN - 1)) {
		DMWARN("UUID argument too long.");
		kfree(lc);
		return -EINVAL;
	}

	lc->usr_argc = argc;

	strncpy(lc->uuid, argv[0], DM_UUID_LEN);
	argc--;
	argv++;
	spin_lock_init(&lc->flush_lock);
	INIT_LIST_HEAD(&lc->mark_list);
	INIT_LIST_HEAD(&lc->clear_list);

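	/* An optional 'integrated_flush' flag may follow the UUID. */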
	if (!strcasecmp(argv[0], "integrated_flush")) {
		lc->integrated_flush = 1;
		argc--;
		argv++;
	}

	str_size = build_constructor_string(ti, argc, argv, &ctr_str);
	if (str_size < 0) {
		kfree(lc);
		return str_size;
	}

	devices_rdata = kzalloc(devices_rdata_size, GFP_KERNEL);
	if (!devices_rdata) {
		DMERR("Failed to allocate memory for device information");
		r = -ENOMEM;
		goto out;
	}

	/*
	 * Send table string and get back any opened device.
	 */
	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_CTR,
				 ctr_str, str_size,
				 devices_rdata, &devices_rdata_size);

	if (r < 0) {
		if (r == -ESRCH)
			DMERR("Userspace log server not found");
		else
			DMERR("Userspace log server failed to create log");
		goto out;
	}

	/* Since the region size does not change, get it now */
	rdata_size = sizeof(rdata);
	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_GET_REGION_SIZE,
				 NULL, 0, (char *)&rdata, &rdata_size);

	if (r) {
		DMERR("Failed to get region size of dirty log");
		goto out;
	}

	lc->region_size = (uint32_t)rdata;
	lc->region_count = dm_sector_div_up(ti->len, lc->region_size);

	if (devices_rdata_size) {
		if (devices_rdata[devices_rdata_size - 1] != '\0') {
			DMERR("DM_ULOG_CTR device return string not properly terminated");
			r = -EINVAL;
			goto out;
		}
		r = dm_get_device(ti, devices_rdata,
				  dm_table_get_mode(ti->table), &lc->log_dev);
		if (r)
			DMERR("Failed to register %s with device-mapper",
			      devices_rdata);
	}

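	/*
	 * 'integrated_flush' needs a workqueue to issue the deferred
	 * flush of clear region requests (see do_flush()).
	 */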
	if (lc->integrated_flush) {
		lc->dmlog_wq = alloc_workqueue("dmlogd", WQ_MEM_RECLAIM, 0);
		if (!lc->dmlog_wq) {
			DMERR("couldn't start dmlogd");
			r = -ENOMEM;
			goto out;
		}

		INIT_DELAYED_WORK(&lc->flush_log_work, do_flush);
		atomic_set(&lc->sched_flush, 0);
	}

out:
	kfree(devices_rdata);
	if (r) {
		kfree(lc);
		kfree(ctr_str);
	} else {
		lc->usr_argv_str = ctr_str;
		log->context = lc;
	}

	return r;
}

static void userspace_dtr(struct dm_dirty_log *log)
{
	struct log_c *lc = log->context;

	if (lc->integrated_flush) {
		/* Run any scheduled flush before tearing down the workqueue. */
		if (atomic_read(&lc->sched_flush))
			flush_delayed_work(&lc->flush_log_work);

		destroy_workqueue(lc->dmlog_wq);
	}

	(void) dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_DTR,
				    NULL, 0, NULL, NULL);

	if (lc->log_dev)
		dm_put_device(lc->ti, lc->log_dev);

	kfree(lc->usr_argv_str);
	kfree(lc);
}

static int userspace_presuspend(struct dm_dirty_log *log)
{
	int r;
	struct log_c *lc = log->context;

	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_PRESUSPEND,
				 NULL, 0, NULL, NULL);

	return r;
}

static int userspace_postsuspend(struct dm_dirty_log *log)
{
	int r;
	struct log_c *lc = log->context;

	/*
	 * Run any scheduled flush now, before telling the
	 * server to suspend.
	 */
	if (lc->integrated_flush && atomic_read(&lc->sched_flush))
		flush_delayed_work(&lc->flush_log_work);

	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_POSTSUSPEND,
				 NULL, 0, NULL, NULL);

	return r;
}

static int userspace_resume(struct dm_dirty_log *log)
{
	int r;
	struct log_c *lc = log->context;

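	/*
	 * in_sync_hint is only valid while the log is active;
	 * reset it here (see userspace_is_remote_recovering()).
	 */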
	lc->in_sync_hint = 0;
	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_RESUME,
				 NULL, 0, NULL, NULL);

	return r;
}

static uint32_t userspace_get_region_size(struct dm_dirty_log *log)
{
	struct log_c *lc = log->context;

	return lc->region_size;
}

/*
 * userspace_is_clean
 *
 * Check whether a region is clean.  If there is any sort of
 * failure when consulting the server, we return not clean.
 *
 * Returns: 1 if clean, 0 otherwise
 */
static int userspace_is_clean(struct dm_dirty_log *log, region_t region)
{
	int r;
	uint64_t region64 = (uint64_t)region;
	int64_t is_clean;
	size_t rdata_size;
	struct log_c *lc = log->context;

	rdata_size = sizeof(is_clean);
	r = userspace_do_request(lc, lc->uuid, DM_ULOG_IS_CLEAN,
				 (char *)&region64, sizeof(region64),
				 (char *)&is_clean, &rdata_size);

	return (r) ? 0 : (int)is_clean;
}

/*
 * userspace_in_sync
 *
 * Check if the region is in-sync.  If there is any sort
 * of failure when consulting the server, we assume that
 * the region is not in sync.
 *
 * If 'can_block' is not set, -EWOULDBLOCK is returned immediately.
 *
 * Returns: 1 if in-sync, 0 if not-in-sync, -EWOULDBLOCK
 */
static int userspace_in_sync(struct dm_dirty_log *log, region_t region,
			     int can_block)
{
	int r;
	uint64_t region64 = region;
	int64_t in_sync;
	size_t rdata_size;
	struct log_c *lc = log->context;

	/*
	 * We can never respond directly - even if in_sync_hint is
	 * set.  This is because another machine could see a device
	 * failure and mark the region out-of-sync.  If we don't go
	 * to userspace to ask, we might think the region is in-sync
	 * and allow a read to pick up data that is stale.  (This is
	 * very unlikely if a device actually fails; but it is very
	 * likely if a connection to one device from one machine fails.)
	 *
	 * There still might be a problem if the mirror caches the region
	 * state as in-sync... but then this call would not be made.  So,
	 * that is a mirror problem.
	 */
	if (!can_block)
		return -EWOULDBLOCK;

	rdata_size = sizeof(in_sync);
	r = userspace_do_request(lc, lc->uuid, DM_ULOG_IN_SYNC,
				 (char *)&region64, sizeof(region64),
				 (char *)&in_sync, &rdata_size);
	return (r) ? 0 : (int)in_sync;
}

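/*
 * flush_one_by_one
 *
 * Fallback used when a grouped send fails: resend each queued
 * request to the server individually.
 */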
static int flush_one_by_one(struct log_c *lc, struct list_head *flush_list)
{
	int r = 0;
	struct flush_entry *fe;

	list_for_each_entry(fe, flush_list, list) {
		r = userspace_do_request(lc, lc->uuid, fe->type,
					 (char *)&fe->region,
					 sizeof(fe->region),
					 NULL, NULL);
		if (r)
			break;
	}

	return r;
}

static int flush_by_group(struct log_c *lc, struct list_head *flush_list,
			  int flush_with_payload)
{
	int r = 0;
	int count;
	uint32_t type = 0;
	struct flush_entry *fe, *tmp_fe;
	LIST_HEAD(tmp_list);
	uint64_t group[MAX_FLUSH_GROUP_COUNT];

	/*
	 * Process the requests in groups of up to
	 * MAX_FLUSH_GROUP_COUNT regions.
	 */
	while (!list_empty(flush_list)) {
		count = 0;

		list_for_each_entry_safe(fe, tmp_fe, flush_list, list) {
			group[count] = fe->region;
			count++;

			list_move(&fe->list, &tmp_list);

			type = fe->type;
			if (count >= MAX_FLUSH_GROUP_COUNT)
				break;
		}

		if (flush_with_payload) {
			r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH,
						 (char *)(group),
						 count * sizeof(uint64_t),
						 NULL, NULL);
			/*
			 * Integrated flush failed.
			 */
			if (r)
				break;
		} else {
			r = userspace_do_request(lc, lc->uuid, type,
						 (char *)(group),
						 count * sizeof(uint64_t),
						 NULL, NULL);
			if (r) {
				/*
				 * Group send failed.  Attempt one-by-one.
				 */
				list_splice_init(&tmp_list, flush_list);
				r = flush_one_by_one(lc, flush_list);
				break;
			}
		}
	}

	/*
	 * Must collect the flush_entry structs that were successfully
	 * processed as a group so that they will be freed by the caller.
	 */
	list_splice_init(&tmp_list, flush_list);

	return r;
}

/*
 * userspace_flush
 *
 * This function is ok to block.
 * The flush happens in two stages.  First, it sends all
 * clear/mark requests that are on the list.  Then it
 * tells the server to commit them.  This gives the
 * server a chance to optimise the commit, instead of
 * doing it for every request.
 *
 * Additionally, we could implement another thread that
 * sends the requests up to the server - reducing the
 * load on flush.  Then the flush would have less in
 * the list and be responsible for the finishing commit.
 *
 * Returns: 0 on success, < 0 on failure
 */
static int userspace_flush(struct dm_dirty_log *log)
{
	int r = 0;
	unsigned long flags;
	struct log_c *lc = log->context;
	LIST_HEAD(mark_list);
	LIST_HEAD(clear_list);
	int mark_list_is_empty;
	int clear_list_is_empty;
	struct flush_entry *fe, *tmp_fe;

	spin_lock_irqsave(&lc->flush_lock, flags);
	list_splice_init(&lc->mark_list, &mark_list);
	list_splice_init(&lc->clear_list, &clear_list);
	spin_unlock_irqrestore(&lc->flush_lock, flags);

	mark_list_is_empty = list_empty(&mark_list);
	clear_list_is_empty = list_empty(&clear_list);

	if (mark_list_is_empty && clear_list_is_empty)
		return 0;

	r = flush_by_group(lc, &clear_list, 0);
	if (r)
		goto out;

	if (!lc->integrated_flush) {
		r = flush_by_group(lc, &mark_list, 0);
		if (r)
			goto out;
		r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH,
					 NULL, 0, NULL, NULL);
		goto out;
	}

	/*
	 * Send integrated flush request with mark_list as payload.
	 */
	r = flush_by_group(lc, &mark_list, 1);
	if (r)
		goto out;

	if (mark_list_is_empty && !atomic_read(&lc->sched_flush)) {
		/*
		 * When there are only clear region requests,
		 * we schedule a flush in the future.
		 */
		queue_delayed_work(lc->dmlog_wq, &lc->flush_log_work, 3 * HZ);
		atomic_set(&lc->sched_flush, 1);
	} else {
		/*
		 * Cancel pending flush because we
		 * have already flushed in mark_region.
		 */
		cancel_delayed_work(&lc->flush_log_work);
		atomic_set(&lc->sched_flush, 0);
	}

out:
	/*
	 * We can safely remove these entries, even after failure.
	 * Calling code will receive an error and will know that
	 * the log facility has failed.
	 */
	list_for_each_entry_safe(fe, tmp_fe, &mark_list, list) {
		list_del(&fe->list);
		mempool_free(fe, flush_entry_pool);
	}
	list_for_each_entry_safe(fe, tmp_fe, &clear_list, list) {
		list_del(&fe->list);
		mempool_free(fe, flush_entry_pool);
	}

	if (r)
		dm_table_event(lc->ti->table);

	return r;
}

/*
 * userspace_mark_region
 *
 * This function should avoid blocking unless absolutely required.
 * (Memory allocation is valid for blocking.)
 */
static void userspace_mark_region(struct dm_dirty_log *log, region_t region)
{
	unsigned long flags;
	struct log_c *lc = log->context;
	struct flush_entry *fe;

	/* Wait for an allocation, but _never_ fail */
	fe = mempool_alloc(flush_entry_pool, GFP_NOIO);
	BUG_ON(!fe);

	spin_lock_irqsave(&lc->flush_lock, flags);
	fe->type = DM_ULOG_MARK_REGION;
	fe->region = region;
	list_add(&fe->list, &lc->mark_list);
	spin_unlock_irqrestore(&lc->flush_lock, flags);
}

/*
 * userspace_clear_region
 *
 * This function must not block.
 * So, the alloc can't block.  In the worst case, it is ok to
 * fail.  It would simply mean we can't clear the region.
 * That does nothing to the current sync context, but it does
 * mean the region will be re-synced on a reload of the mirror
 * even though it is in-sync.
 */
static void userspace_clear_region(struct dm_dirty_log *log, region_t region)
{
	unsigned long flags;
	struct log_c *lc = log->context;
	struct flush_entry *fe;

	/*
	 * If we fail to allocate, we skip the clearing of
	 * the region.  This doesn't hurt us in any way, except
	 * to cause the region to be resync'ed when the
	 * device is activated next time.
	 */
	fe = mempool_alloc(flush_entry_pool, GFP_ATOMIC);
	if (!fe) {
		DMERR("Failed to allocate memory to clear region.");
		return;
	}

	spin_lock_irqsave(&lc->flush_lock, flags);
	fe->type = DM_ULOG_CLEAR_REGION;
	fe->region = region;
	list_add(&fe->list, &lc->clear_list);
	spin_unlock_irqrestore(&lc->flush_lock, flags);
}

/*
 * userspace_get_resync_work
 *
 * Get a region that needs recovery.  It is valid to return
 * an error for this function.
 *
 * Returns: 1 if region filled, 0 if no work, <0 on error
 */
static int userspace_get_resync_work(struct dm_dirty_log *log, region_t *region)
{
	int r;
	size_t rdata_size;
	struct log_c *lc = log->context;
	struct {
		int64_t i; /* 64-bit for mixed-arch compatibility */
		region_t r;
	} pkg;

	if (lc->in_sync_hint >= lc->region_count)
		return 0;

	rdata_size = sizeof(pkg);
	r = userspace_do_request(lc, lc->uuid, DM_ULOG_GET_RESYNC_WORK,
				 NULL, 0, (char *)&pkg, &rdata_size);

	*region = pkg.r;
	return (r) ? r : (int)pkg.i;
}

/*
 * userspace_set_region_sync
 *
 * Set the sync status of a given region.  This function
 * must not fail.
 */
static void userspace_set_region_sync(struct dm_dirty_log *log,
				      region_t region, int in_sync)
{
	int r;
	struct log_c *lc = log->context;
	struct {
		region_t r;
		int64_t i;
	} pkg;

	pkg.r = region;
	pkg.i = (int64_t)in_sync;

	r = userspace_do_request(lc, lc->uuid, DM_ULOG_SET_REGION_SYNC,
				 (char *)&pkg, sizeof(pkg), NULL, NULL);

	/*
	 * It would be nice to be able to report failures.
	 * However, it is easy enough to detect and resolve.
	 */
}

/*
 * userspace_get_sync_count
 *
 * If there is any sort of failure when consulting the server,
 * we assume that the sync count is zero.
 *
 * Returns: sync count on success, 0 on failure
 */
static region_t userspace_get_sync_count(struct dm_dirty_log *log)
{
	int r;
	size_t rdata_size;
	uint64_t sync_count;
	struct log_c *lc = log->context;

	rdata_size = sizeof(sync_count);
	r = userspace_do_request(lc, lc->uuid, DM_ULOG_GET_SYNC_COUNT,
				 NULL, 0, (char *)&sync_count, &rdata_size);

	if (r)
		return 0;

	if (sync_count >= lc->region_count)
		lc->in_sync_hint = lc->region_count;

	return (region_t)sync_count;
}

/*
 * userspace_status
 *
 * Returns: amount of space consumed
 */
static int userspace_status(struct dm_dirty_log *log, status_type_t status_type,
			    char *result, unsigned maxlen)
{
	int r = 0;
	char *table_args;
	size_t sz = (size_t)maxlen;
	struct log_c *lc = log->context;

	switch (status_type) {
	case STATUSTYPE_INFO:
		r = userspace_do_request(lc, lc->uuid, DM_ULOG_STATUS_INFO,
					 NULL, 0, result, &sz);

		if (r) {
			sz = 0;
			DMEMIT("%s 1 COM_FAILURE", log->type->name);
		}
		break;
	case STATUSTYPE_TABLE:
		sz = 0;
		table_args = strchr(lc->usr_argv_str, ' ');
		BUG_ON(!table_args); /* There will always be a ' ' */
		table_args++;

		DMEMIT("%s %u %s ", log->type->name, lc->usr_argc, lc->uuid);
		if (lc->integrated_flush)
			DMEMIT("integrated_flush ");
		DMEMIT("%s ", table_args);
		break;
	}
	return (r) ? 0 : (int)sz;
}

/*
 * userspace_is_remote_recovering
 *
 * Returns: 1 if region recovering, 0 otherwise
 */
static int userspace_is_remote_recovering(struct dm_dirty_log *log,
					  region_t region)
{
	int r;
	uint64_t region64 = region;
	struct log_c *lc = log->context;
	static unsigned long limit;
	struct {
		int64_t is_recovering;
		uint64_t in_sync_hint;
	} pkg;
	size_t rdata_size = sizeof(pkg);

	/*
	 * Once the mirror has been reported to be in-sync,
	 * it will never again ask for recovery work.  So,
	 * we can safely say there is not a remote machine
	 * recovering if the device is in-sync.  (in_sync_hint
	 * must be reset at resume time.)
	 */
	if (region < lc->in_sync_hint)
		return 0;
	else if (time_after(limit, jiffies))
		return 1;

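	/* Ask the server at most once per 250ms; assume 'recovering' in between. */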
	limit = jiffies + (HZ / 4);
	r = userspace_do_request(lc, lc->uuid, DM_ULOG_IS_REMOTE_RECOVERING,
				 (char *)&region64, sizeof(region64),
				 (char *)&pkg, &rdata_size);
	if (r)
		return 1;

	lc->in_sync_hint = pkg.in_sync_hint;

	return (int)pkg.is_recovering;
}

static struct dm_dirty_log_type _userspace_type = {
	.name = "userspace",
	.module = THIS_MODULE,
	.ctr = userspace_ctr,
	.dtr = userspace_dtr,
	.presuspend = userspace_presuspend,
	.postsuspend = userspace_postsuspend,
	.resume = userspace_resume,
	.get_region_size = userspace_get_region_size,
	.is_clean = userspace_is_clean,
	.in_sync = userspace_in_sync,
	.flush = userspace_flush,
	.mark_region = userspace_mark_region,
	.clear_region = userspace_clear_region,
	.get_resync_work = userspace_get_resync_work,
	.set_region_sync = userspace_set_region_sync,
	.get_sync_count = userspace_get_sync_count,
	.status = userspace_status,
	.is_remote_recovering = userspace_is_remote_recovering,
};

static int __init userspace_dirty_log_init(void)
{
	int r = 0;

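	/*
	 * Reserve a pool of flush_entry structs up front so that
	 * mark/clear requests can always be queued, even under
	 * memory pressure.
	 */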
	flush_entry_pool = mempool_create(100, flush_entry_alloc,
					  flush_entry_free, NULL);

	if (!flush_entry_pool) {
		DMWARN("Unable to create flush_entry_pool:  No memory.");
		return -ENOMEM;
	}

	r = dm_ulog_tfr_init();
	if (r) {
		DMWARN("Unable to initialize userspace log communications");
		mempool_destroy(flush_entry_pool);
		return r;
	}

	r = dm_dirty_log_type_register(&_userspace_type);
	if (r) {
		DMWARN("Couldn't register userspace dirty log type");
		dm_ulog_tfr_exit();
		mempool_destroy(flush_entry_pool);
		return r;
	}

	DMINFO("version " DM_LOG_USERSPACE_VSN " loaded");
	return 0;
}

static void __exit userspace_dirty_log_exit(void)
{
	dm_dirty_log_type_unregister(&_userspace_type);
	dm_ulog_tfr_exit();
	mempool_destroy(flush_entry_pool);

	DMINFO("version " DM_LOG_USERSPACE_VSN " unloaded");
}

module_init(userspace_dirty_log_init);
module_exit(userspace_dirty_log_exit);

MODULE_DESCRIPTION(DM_NAME " userspace dirty log link");
MODULE_AUTHOR("Jonathan Brassow <dm-devel@redhat.com>");
MODULE_LICENSE("GPL");