1 /*
2  * Copyright (C) 2006-2009 Red Hat, Inc.
3  *
4  * This file is released under the LGPL.
5  */
6 
#include <linux/bio.h>
#include <linux/jiffies.h>
#include <linux/slab.h>
#include <linux/dm-dirty-log.h>
#include <linux/device-mapper.h>
#include <linux/dm-log-userspace.h>

#include "dm-log-userspace-transfer.h"
14 
15 #define DM_LOG_USERSPACE_VSN "1.1.0"
16 
/*
 * A deferred mark/clear request.  Entries are queued on log_c's
 * mark_list/clear_list and sent to userspace in batches at flush time.
 */
struct flush_entry {
	int type;		/* DM_ULOG_MARK_REGION or DM_ULOG_CLEAR_REGION */
	region_t region;	/* region the request applies to */
	struct list_head list;	/* linkage on mark_list or clear_list */
};
22 
23 /*
24  * This limit on the number of mark and clear request is, to a degree,
25  * arbitrary.  However, there is some basis for the choice in the limits
26  * imposed on the size of data payload by dm-log-userspace-transfer.c:
27  * dm_consult_userspace().
28  */
29 #define MAX_FLUSH_GROUP_COUNT 32
30 
/* Per-log instance context, stored in dm_dirty_log::context. */
struct log_c {
	struct dm_target *ti;
	struct dm_dev *log_dev;	/* device returned by userspace CTR, if any */
	uint32_t region_size;	/* fetched once from userspace at ctr time */
	region_t region_count;	/* ti->len divided into region_size chunks */
	uint64_t luid;		/* local unique id: the pointer value of this struct */
	char uuid[DM_UUID_LEN];	/* argv[0]; identifies the log to userspace */

	/* Saved constructor argument string, used to re-issue DM_ULOG_CTR
	 * when the userspace server must be reconnected. */
	char *usr_argv_str;
	uint32_t usr_argc;

	/*
	 * in_sync_hint gets set when doing is_remote_recovering.  It
	 * represents the first region that needs recovery.  IOW, the
	 * first zero bit of sync_bits.  This can be useful for to limit
	 * traffic for calls like is_remote_recovering and get_resync_work,
	 * but be take care in its use for anything else.
	 */
	uint64_t in_sync_hint;

	/*
	 * Mark and clear requests are held until a flush is issued
	 * so that we can group, and thereby limit, the amount of
	 * network traffic between kernel and userspace.  The 'flush_lock'
	 * is used to protect these lists.
	 */
	spinlock_t flush_lock;
	struct list_head mark_list;
	struct list_head clear_list;
};
61 
62 static mempool_t *flush_entry_pool;
63 
64 static void *flush_entry_alloc(gfp_t gfp_mask, void *pool_data)
65 {
66 	return kmalloc(sizeof(struct flush_entry), gfp_mask);
67 }
68 
/* mempool free callback: release a flush_entry from flush_entry_alloc(). */
static void flush_entry_free(void *element, void *pool_data)
{
	struct flush_entry *fe = element;

	kfree(fe);
}
73 
/*
 * userspace_do_request
 *
 * Relay a request to the userspace log server.  If the server has
 * gone away (-ESRCH), poll for it to return, re-issue the log
 * constructor (DM_ULOG_CTR) with the saved argument string, resume
 * the log, and then retry the original request.
 *
 * Returns: the server's result for the request, or -ESRCH if the
 * log could not be resumed after reconnecting.
 */
static int userspace_do_request(struct log_c *lc, const char *uuid,
				int request_type, char *data, size_t data_size,
				char *rdata, size_t *rdata_size)
{
	int r;

	/*
	 * If the server isn't there, -ESRCH is returned,
	 * and we must keep trying until the server is
	 * restored.
	 */
retry:
	r = dm_consult_userspace(uuid, lc->luid, request_type, data,
				 data_size, rdata, rdata_size);

	if (r != -ESRCH)
		return r;

	DMERR(" Userspace log server not found.");
	while (1) {
		/* Sleep ~2s between reconnect attempts. */
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(2*HZ);
		DMWARN("Attempting to contact userspace log server...");
		r = dm_consult_userspace(uuid, lc->luid, DM_ULOG_CTR,
					 lc->usr_argv_str,
					 strlen(lc->usr_argv_str) + 1,
					 NULL, NULL);
		if (!r)
			break;
	}
	DMINFO("Reconnected to userspace log server... DM_ULOG_CTR complete");
	/* Resume the freshly re-created log before retrying the request. */
	r = dm_consult_userspace(uuid, lc->luid, DM_ULOG_RESUME, NULL,
				 0, NULL, NULL);
	if (!r)
		goto retry;

	DMERR("Error trying to resume userspace log: %d", r);

	return -ESRCH;
}
114 
115 static int build_constructor_string(struct dm_target *ti,
116 				    unsigned argc, char **argv,
117 				    char **ctr_str)
118 {
119 	int i, str_size;
120 	char *str = NULL;
121 
122 	*ctr_str = NULL;
123 
124 	for (i = 0, str_size = 0; i < argc; i++)
125 		str_size += strlen(argv[i]) + 1; /* +1 for space between args */
126 
127 	str_size += 20; /* Max number of chars in a printed u64 number */
128 
129 	str = kzalloc(str_size, GFP_KERNEL);
130 	if (!str) {
131 		DMWARN("Unable to allocate memory for constructor string");
132 		return -ENOMEM;
133 	}
134 
135 	str_size = sprintf(str, "%llu", (unsigned long long)ti->len);
136 	for (i = 0; i < argc; i++)
137 		str_size += sprintf(str + str_size, " %s", argv[i]);
138 
139 	*ctr_str = str;
140 	return str_size;
141 }
142 
143 /*
144  * userspace_ctr
145  *
146  * argv contains:
147  *	<UUID> <other args>
148  * Where 'other args' is the userspace implementation specific log
149  * arguments.  An example might be:
150  *	<UUID> clustered-disk <arg count> <log dev> <region_size> [[no]sync]
151  *
152  * So, this module will strip off the <UUID> for identification purposes
153  * when communicating with userspace about a log; but will pass on everything
154  * else.
155  */
static int userspace_ctr(struct dm_dirty_log *log, struct dm_target *ti,
			 unsigned argc, char **argv)
{
	int r = 0;
	int str_size;
	char *ctr_str = NULL;
	struct log_c *lc = NULL;
	uint64_t rdata;
	size_t rdata_size = sizeof(rdata);
	char *devices_rdata = NULL;
	size_t devices_rdata_size = DM_NAME_LEN;

	if (argc < 3) {
		DMWARN("Too few arguments to userspace dirty log");
		return -EINVAL;
	}

	lc = kzalloc(sizeof(*lc), GFP_KERNEL);
	if (!lc) {
		DMWARN("Unable to allocate userspace log context.");
		return -ENOMEM;
	}

	/* The ptr value is sufficient for local unique id */
	lc->luid = (unsigned long)lc;

	lc->ti = ti;

	if (strlen(argv[0]) > (DM_UUID_LEN - 1)) {
		DMWARN("UUID argument too long.");
		kfree(lc);
		return -EINVAL;
	}

	/* Length was validated above, so the copy is NUL-terminated. */
	strncpy(lc->uuid, argv[0], DM_UUID_LEN);
	spin_lock_init(&lc->flush_lock);
	INIT_LIST_HEAD(&lc->mark_list);
	INIT_LIST_HEAD(&lc->clear_list);

	/* Build "<ti->len> <argv[1..]>", stripping the UUID (argv[0]). */
	str_size = build_constructor_string(ti, argc - 1, argv + 1, &ctr_str);
	if (str_size < 0) {
		kfree(lc);
		return str_size;
	}

	devices_rdata = kzalloc(devices_rdata_size, GFP_KERNEL);
	if (!devices_rdata) {
		DMERR("Failed to allocate memory for device information");
		r = -ENOMEM;
		goto out;
	}

	/*
	 * Send table string and get back any opened device.
	 */
	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_CTR,
				 ctr_str, str_size,
				 devices_rdata, &devices_rdata_size);

	if (r < 0) {
		if (r == -ESRCH)
			DMERR("Userspace log server not found");
		else
			DMERR("Userspace log server failed to create log");
		goto out;
	}

	/* Since the region size does not change, get it now */
	rdata_size = sizeof(rdata);
	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_GET_REGION_SIZE,
				 NULL, 0, (char *)&rdata, &rdata_size);

	if (r) {
		DMERR("Failed to get region size of dirty log");
		goto out;
	}

	lc->region_size = (uint32_t)rdata;
	lc->region_count = dm_sector_div_up(ti->len, lc->region_size);

	/* A non-empty reply names a device userspace wants us to hold open. */
	if (devices_rdata_size) {
		if (devices_rdata[devices_rdata_size - 1] != '\0') {
			DMERR("DM_ULOG_CTR device return string not properly terminated");
			r = -EINVAL;
			goto out;
		}
		r = dm_get_device(ti, devices_rdata,
				  dm_table_get_mode(ti->table), &lc->log_dev);
		if (r)
			DMERR("Failed to register %s with device-mapper",
			      devices_rdata);
	}
out:
	kfree(devices_rdata);
	if (r) {
		kfree(lc);
		kfree(ctr_str);
	} else {
		/* Keep ctr_str: needed to re-issue DM_ULOG_CTR on reconnect. */
		lc->usr_argv_str = ctr_str;
		lc->usr_argc = argc;
		log->context = lc;
	}

	return r;
}
261 
262 static void userspace_dtr(struct dm_dirty_log *log)
263 {
264 	struct log_c *lc = log->context;
265 
266 	(void) dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_DTR,
267 				 NULL, 0,
268 				 NULL, NULL);
269 
270 	if (lc->log_dev)
271 		dm_put_device(lc->ti, lc->log_dev);
272 
273 	kfree(lc->usr_argv_str);
274 	kfree(lc);
275 
276 	return;
277 }
278 
279 static int userspace_presuspend(struct dm_dirty_log *log)
280 {
281 	int r;
282 	struct log_c *lc = log->context;
283 
284 	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_PRESUSPEND,
285 				 NULL, 0,
286 				 NULL, NULL);
287 
288 	return r;
289 }
290 
291 static int userspace_postsuspend(struct dm_dirty_log *log)
292 {
293 	int r;
294 	struct log_c *lc = log->context;
295 
296 	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_POSTSUSPEND,
297 				 NULL, 0,
298 				 NULL, NULL);
299 
300 	return r;
301 }
302 
303 static int userspace_resume(struct dm_dirty_log *log)
304 {
305 	int r;
306 	struct log_c *lc = log->context;
307 
308 	lc->in_sync_hint = 0;
309 	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_RESUME,
310 				 NULL, 0,
311 				 NULL, NULL);
312 
313 	return r;
314 }
315 
316 static uint32_t userspace_get_region_size(struct dm_dirty_log *log)
317 {
318 	struct log_c *lc = log->context;
319 
320 	return lc->region_size;
321 }
322 
323 /*
324  * userspace_is_clean
325  *
326  * Check whether a region is clean.  If there is any sort of
327  * failure when consulting the server, we return not clean.
328  *
329  * Returns: 1 if clean, 0 otherwise
330  */
331 static int userspace_is_clean(struct dm_dirty_log *log, region_t region)
332 {
333 	int r;
334 	uint64_t region64 = (uint64_t)region;
335 	int64_t is_clean;
336 	size_t rdata_size;
337 	struct log_c *lc = log->context;
338 
339 	rdata_size = sizeof(is_clean);
340 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_IS_CLEAN,
341 				 (char *)&region64, sizeof(region64),
342 				 (char *)&is_clean, &rdata_size);
343 
344 	return (r) ? 0 : (int)is_clean;
345 }
346 
347 /*
348  * userspace_in_sync
349  *
350  * Check if the region is in-sync.  If there is any sort
351  * of failure when consulting the server, we assume that
352  * the region is not in sync.
353  *
354  * If 'can_block' is set, return immediately
355  *
356  * Returns: 1 if in-sync, 0 if not-in-sync, -EWOULDBLOCK
357  */
358 static int userspace_in_sync(struct dm_dirty_log *log, region_t region,
359 			     int can_block)
360 {
361 	int r;
362 	uint64_t region64 = region;
363 	int64_t in_sync;
364 	size_t rdata_size;
365 	struct log_c *lc = log->context;
366 
367 	/*
368 	 * We can never respond directly - even if in_sync_hint is
369 	 * set.  This is because another machine could see a device
370 	 * failure and mark the region out-of-sync.  If we don't go
371 	 * to userspace to ask, we might think the region is in-sync
372 	 * and allow a read to pick up data that is stale.  (This is
373 	 * very unlikely if a device actually fails; but it is very
374 	 * likely if a connection to one device from one machine fails.)
375 	 *
376 	 * There still might be a problem if the mirror caches the region
377 	 * state as in-sync... but then this call would not be made.  So,
378 	 * that is a mirror problem.
379 	 */
380 	if (!can_block)
381 		return -EWOULDBLOCK;
382 
383 	rdata_size = sizeof(in_sync);
384 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_IN_SYNC,
385 				 (char *)&region64, sizeof(region64),
386 				 (char *)&in_sync, &rdata_size);
387 	return (r) ? 0 : (int)in_sync;
388 }
389 
390 static int flush_one_by_one(struct log_c *lc, struct list_head *flush_list)
391 {
392 	int r = 0;
393 	struct flush_entry *fe;
394 
395 	list_for_each_entry(fe, flush_list, list) {
396 		r = userspace_do_request(lc, lc->uuid, fe->type,
397 					 (char *)&fe->region,
398 					 sizeof(fe->region),
399 					 NULL, NULL);
400 		if (r)
401 			break;
402 	}
403 
404 	return r;
405 }
406 
/*
 * flush_by_group
 *
 * Drain 'flush_list' by sending requests to userspace in batches of
 * up to MAX_FLUSH_GROUP_COUNT regions per call.  If a batched send
 * fails, fall back to flush_one_by_one().  On return, all entries
 * are back on 'flush_list' (processed or not) so the caller can free
 * them.
 *
 * Returns: 0 on success, error from userspace_do_request() otherwise
 */
static int flush_by_group(struct log_c *lc, struct list_head *flush_list)
{
	int r = 0;
	int count;
	uint32_t type = 0;
	struct flush_entry *fe, *tmp_fe;
	LIST_HEAD(tmp_list);
	uint64_t group[MAX_FLUSH_GROUP_COUNT];

	/*
	 * Group process the requests
	 */
	while (!list_empty(flush_list)) {
		count = 0;

		/* Pull up to MAX_FLUSH_GROUP_COUNT regions into 'group'. */
		list_for_each_entry_safe(fe, tmp_fe, flush_list, list) {
			group[count] = fe->region;
			count++;

			list_move(&fe->list, &tmp_list);

			/* All entries on one list share the same type. */
			type = fe->type;
			if (count >= MAX_FLUSH_GROUP_COUNT)
				break;
		}

		r = userspace_do_request(lc, lc->uuid, type,
					 (char *)(group),
					 count * sizeof(uint64_t),
					 NULL, NULL);
		if (r) {
			/* Group send failed.  Attempt one-by-one. */
			list_splice_init(&tmp_list, flush_list);
			r = flush_one_by_one(lc, flush_list);
			break;
		}
	}

	/*
	 * Must collect flush_entrys that were successfully processed
	 * as a group so that they will be free'd by the caller.
	 */
	list_splice_init(&tmp_list, flush_list);

	return r;
}
453 
454 /*
455  * userspace_flush
456  *
457  * This function is ok to block.
458  * The flush happens in two stages.  First, it sends all
459  * clear/mark requests that are on the list.  Then it
460  * tells the server to commit them.  This gives the
461  * server a chance to optimise the commit, instead of
462  * doing it for every request.
463  *
464  * Additionally, we could implement another thread that
465  * sends the requests up to the server - reducing the
466  * load on flush.  Then the flush would have less in
467  * the list and be responsible for the finishing commit.
468  *
469  * Returns: 0 on success, < 0 on failure
470  */
static int userspace_flush(struct dm_dirty_log *log)
{
	int r = 0;
	unsigned long flags;
	struct log_c *lc = log->context;
	LIST_HEAD(mark_list);
	LIST_HEAD(clear_list);
	struct flush_entry *fe, *tmp_fe;

	/* Take ownership of the queued requests under the lock, then
	 * process them without holding it. */
	spin_lock_irqsave(&lc->flush_lock, flags);
	list_splice_init(&lc->mark_list, &mark_list);
	list_splice_init(&lc->clear_list, &clear_list);
	spin_unlock_irqrestore(&lc->flush_lock, flags);

	if (list_empty(&mark_list) && list_empty(&clear_list))
		return 0;

	/* Stage 1: push all marks, then all clears, to the server. */
	r = flush_by_group(lc, &mark_list);
	if (r)
		goto fail;

	r = flush_by_group(lc, &clear_list);
	if (r)
		goto fail;

	/* Stage 2: tell the server to commit everything at once. */
	r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH,
				 NULL, 0, NULL, NULL);

fail:
	/*
	 * We can safely remove these entries, even if failure.
	 * Calling code will receive an error and will know that
	 * the log facility has failed.
	 */
	list_for_each_entry_safe(fe, tmp_fe, &mark_list, list) {
		list_del(&fe->list);
		mempool_free(fe, flush_entry_pool);
	}
	list_for_each_entry_safe(fe, tmp_fe, &clear_list, list) {
		list_del(&fe->list);
		mempool_free(fe, flush_entry_pool);
	}

	/* Let device-mapper know the log has failed. */
	if (r)
		dm_table_event(lc->ti->table);

	return r;
}
519 
520 /*
521  * userspace_mark_region
522  *
523  * This function should avoid blocking unless absolutely required.
524  * (Memory allocation is valid for blocking.)
525  */
526 static void userspace_mark_region(struct dm_dirty_log *log, region_t region)
527 {
528 	unsigned long flags;
529 	struct log_c *lc = log->context;
530 	struct flush_entry *fe;
531 
532 	/* Wait for an allocation, but _never_ fail */
533 	fe = mempool_alloc(flush_entry_pool, GFP_NOIO);
534 	BUG_ON(!fe);
535 
536 	spin_lock_irqsave(&lc->flush_lock, flags);
537 	fe->type = DM_ULOG_MARK_REGION;
538 	fe->region = region;
539 	list_add(&fe->list, &lc->mark_list);
540 	spin_unlock_irqrestore(&lc->flush_lock, flags);
541 
542 	return;
543 }
544 
545 /*
546  * userspace_clear_region
547  *
548  * This function must not block.
549  * So, the alloc can't block.  In the worst case, it is ok to
550  * fail.  It would simply mean we can't clear the region.
551  * Does nothing to current sync context, but does mean
552  * the region will be re-sync'ed on a reload of the mirror
553  * even though it is in-sync.
554  */
555 static void userspace_clear_region(struct dm_dirty_log *log, region_t region)
556 {
557 	unsigned long flags;
558 	struct log_c *lc = log->context;
559 	struct flush_entry *fe;
560 
561 	/*
562 	 * If we fail to allocate, we skip the clearing of
563 	 * the region.  This doesn't hurt us in any way, except
564 	 * to cause the region to be resync'ed when the
565 	 * device is activated next time.
566 	 */
567 	fe = mempool_alloc(flush_entry_pool, GFP_ATOMIC);
568 	if (!fe) {
569 		DMERR("Failed to allocate memory to clear region.");
570 		return;
571 	}
572 
573 	spin_lock_irqsave(&lc->flush_lock, flags);
574 	fe->type = DM_ULOG_CLEAR_REGION;
575 	fe->region = region;
576 	list_add(&fe->list, &lc->clear_list);
577 	spin_unlock_irqrestore(&lc->flush_lock, flags);
578 
579 	return;
580 }
581 
582 /*
583  * userspace_get_resync_work
584  *
585  * Get a region that needs recovery.  It is valid to return
586  * an error for this function.
587  *
588  * Returns: 1 if region filled, 0 if no work, <0 on error
589  */
590 static int userspace_get_resync_work(struct dm_dirty_log *log, region_t *region)
591 {
592 	int r;
593 	size_t rdata_size;
594 	struct log_c *lc = log->context;
595 	struct {
596 		int64_t i; /* 64-bit for mix arch compatibility */
597 		region_t r;
598 	} pkg;
599 
600 	if (lc->in_sync_hint >= lc->region_count)
601 		return 0;
602 
603 	rdata_size = sizeof(pkg);
604 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_GET_RESYNC_WORK,
605 				 NULL, 0,
606 				 (char *)&pkg, &rdata_size);
607 
608 	*region = pkg.r;
609 	return (r) ? r : (int)pkg.i;
610 }
611 
612 /*
613  * userspace_set_region_sync
614  *
615  * Set the sync status of a given region.  This function
616  * must not fail.
617  */
618 static void userspace_set_region_sync(struct dm_dirty_log *log,
619 				      region_t region, int in_sync)
620 {
621 	int r;
622 	struct log_c *lc = log->context;
623 	struct {
624 		region_t r;
625 		int64_t i;
626 	} pkg;
627 
628 	pkg.r = region;
629 	pkg.i = (int64_t)in_sync;
630 
631 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_SET_REGION_SYNC,
632 				 (char *)&pkg, sizeof(pkg),
633 				 NULL, NULL);
634 
635 	/*
636 	 * It would be nice to be able to report failures.
637 	 * However, it is easy emough to detect and resolve.
638 	 */
639 	return;
640 }
641 
642 /*
643  * userspace_get_sync_count
644  *
645  * If there is any sort of failure when consulting the server,
646  * we assume that the sync count is zero.
647  *
648  * Returns: sync count on success, 0 on failure
649  */
650 static region_t userspace_get_sync_count(struct dm_dirty_log *log)
651 {
652 	int r;
653 	size_t rdata_size;
654 	uint64_t sync_count;
655 	struct log_c *lc = log->context;
656 
657 	rdata_size = sizeof(sync_count);
658 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_GET_SYNC_COUNT,
659 				 NULL, 0,
660 				 (char *)&sync_count, &rdata_size);
661 
662 	if (r)
663 		return 0;
664 
665 	if (sync_count >= lc->region_count)
666 		lc->in_sync_hint = lc->region_count;
667 
668 	return (region_t)sync_count;
669 }
670 
671 /*
672  * userspace_status
673  *
674  * Returns: amount of space consumed
675  */
static int userspace_status(struct dm_dirty_log *log, status_type_t status_type,
			    char *result, unsigned maxlen)
{
	int r = 0;
	char *table_args;
	size_t sz = (size_t)maxlen;
	struct log_c *lc = log->context;

	switch (status_type) {
	case STATUSTYPE_INFO:
		/* INFO text comes straight from the userspace server. */
		r = userspace_do_request(lc, lc->uuid, DM_ULOG_STATUS_INFO,
					 NULL, 0,
					 result, &sz);

		if (r) {
			sz = 0;
			DMEMIT("%s 1 COM_FAILURE", log->type->name);
		}
		break;
	case STATUSTYPE_TABLE:
		sz = 0;
		/* usr_argv_str is "<ti->len> <args...>"; skip the length. */
		table_args = strchr(lc->usr_argv_str, ' ');
		BUG_ON(!table_args); /* There will always be a ' ' */
		table_args++;

		DMEMIT("%s %u %s %s ", log->type->name, lc->usr_argc,
		       lc->uuid, table_args);
		break;
	}
	/* Returns the amount of 'result' consumed, 0 on failure. */
	return (r) ? 0 : (int)sz;
}
707 
708 /*
709  * userspace_is_remote_recovering
710  *
711  * Returns: 1 if region recovering, 0 otherwise
712  */
713 static int userspace_is_remote_recovering(struct dm_dirty_log *log,
714 					  region_t region)
715 {
716 	int r;
717 	uint64_t region64 = region;
718 	struct log_c *lc = log->context;
719 	static unsigned long long limit;
720 	struct {
721 		int64_t is_recovering;
722 		uint64_t in_sync_hint;
723 	} pkg;
724 	size_t rdata_size = sizeof(pkg);
725 
726 	/*
727 	 * Once the mirror has been reported to be in-sync,
728 	 * it will never again ask for recovery work.  So,
729 	 * we can safely say there is not a remote machine
730 	 * recovering if the device is in-sync.  (in_sync_hint
731 	 * must be reset at resume time.)
732 	 */
733 	if (region < lc->in_sync_hint)
734 		return 0;
735 	else if (jiffies < limit)
736 		return 1;
737 
738 	limit = jiffies + (HZ / 4);
739 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_IS_REMOTE_RECOVERING,
740 				 (char *)&region64, sizeof(region64),
741 				 (char *)&pkg, &rdata_size);
742 	if (r)
743 		return 1;
744 
745 	lc->in_sync_hint = pkg.in_sync_hint;
746 
747 	return (int)pkg.is_recovering;
748 }
749 
/* Dirty-log operations registered with the dm-dirty-log core. */
static struct dm_dirty_log_type _userspace_type = {
	.name = "userspace",
	.module = THIS_MODULE,
	.ctr = userspace_ctr,
	.dtr = userspace_dtr,
	.presuspend = userspace_presuspend,
	.postsuspend = userspace_postsuspend,
	.resume = userspace_resume,
	.get_region_size = userspace_get_region_size,
	.is_clean = userspace_is_clean,
	.in_sync = userspace_in_sync,
	.flush = userspace_flush,
	.mark_region = userspace_mark_region,
	.clear_region = userspace_clear_region,
	.get_resync_work = userspace_get_resync_work,
	.set_region_sync = userspace_set_region_sync,
	.get_sync_count = userspace_get_sync_count,
	.status = userspace_status,
	.is_remote_recovering = userspace_is_remote_recovering,
};
770 
771 static int __init userspace_dirty_log_init(void)
772 {
773 	int r = 0;
774 
775 	flush_entry_pool = mempool_create(100, flush_entry_alloc,
776 					  flush_entry_free, NULL);
777 
778 	if (!flush_entry_pool) {
779 		DMWARN("Unable to create flush_entry_pool:  No memory.");
780 		return -ENOMEM;
781 	}
782 
783 	r = dm_ulog_tfr_init();
784 	if (r) {
785 		DMWARN("Unable to initialize userspace log communications");
786 		mempool_destroy(flush_entry_pool);
787 		return r;
788 	}
789 
790 	r = dm_dirty_log_type_register(&_userspace_type);
791 	if (r) {
792 		DMWARN("Couldn't register userspace dirty log type");
793 		dm_ulog_tfr_exit();
794 		mempool_destroy(flush_entry_pool);
795 		return r;
796 	}
797 
798 	DMINFO("version " DM_LOG_USERSPACE_VSN " loaded");
799 	return 0;
800 }
801 
802 static void __exit userspace_dirty_log_exit(void)
803 {
804 	dm_dirty_log_type_unregister(&_userspace_type);
805 	dm_ulog_tfr_exit();
806 	mempool_destroy(flush_entry_pool);
807 
808 	DMINFO("version " DM_LOG_USERSPACE_VSN " unloaded");
809 	return;
810 }
811 
812 module_init(userspace_dirty_log_init);
813 module_exit(userspace_dirty_log_exit);
814 
815 MODULE_DESCRIPTION(DM_NAME " userspace dirty log link");
816 MODULE_AUTHOR("Jonathan Brassow <dm-devel@redhat.com>");
817 MODULE_LICENSE("GPL");
818