xref: /openbmc/qemu/block/replication.c (revision 7f709ce7)
1 /*
2  * Replication Block filter
3  *
4  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
5  * Copyright (c) 2016 Intel Corporation
6  * Copyright (c) 2016 FUJITSU LIMITED
7  *
8  * Author:
9  *   Wen Congyang <wency@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "block/nbd.h"
18 #include "block/blockjob.h"
19 #include "block/block_int.h"
20 #include "block/block_backup.h"
21 #include "sysemu/block-backend.h"
22 #include "qapi/error.h"
23 #include "replication.h"
24 
/*
 * Life-cycle stage of one replication filter node.  The stage decides
 * whether guest I/O is accepted (see replication_get_io_status()) and which
 * control operations (start/stop/get_error) are allowed.
 */
typedef enum {
    BLOCK_REPLICATION_NONE,             /* block replication is not started */
    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
    BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
    BLOCK_REPLICATION_DONE,             /* block replication is done */
} ReplicationStage;
32 
/* Per-node state of the replication filter driver (bs->opaque). */
typedef struct BDRVReplicationState {
    ReplicationMode mode;       /* primary or secondary, from the "mode" option */
    ReplicationStage stage;     /* current life-cycle stage */
    BdrvChild *active_disk;     /* secondary side: bs->file */
    BdrvChild *hidden_disk;     /* secondary side: backing of the active disk */
    BdrvChild *secondary_disk;  /* secondary side: backing of the hidden disk */
    char *top_id;               /* copy of the "top-id" option (secondary only) */
    ReplicationState *rs;       /* handle returned by replication_new() */
    Error *blocker;             /* op blocker installed on the top BDS */
    int orig_hidden_flags;      /* hidden disk flags saved before r/w reopen */
    int orig_secondary_flags;   /* secondary disk flags saved before r/w reopen */
    int error;                  /* non-zero once an I/O or job error was seen */
} BDRVReplicationState;
46 
/*
 * Forward declarations of the ReplicationOps callbacks; they are referenced
 * by the replication_ops table before their definitions appear.
 */
static void replication_start(ReplicationState *rs, ReplicationMode mode,
                              Error **errp);
static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
static void replication_get_error(ReplicationState *rs, Error **errp);
static void replication_stop(ReplicationState *rs, bool failover,
                             Error **errp);
53 
/* Names of the runtime options understood by replication_open() */
#define REPLICATION_MODE        "mode"
#define REPLICATION_TOP_ID      "top-id"
/* Runtime option list: both options are plain strings */
static QemuOptsList replication_runtime_opts = {
    .name = "replication",
    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
    .desc = {
        {
            .name = REPLICATION_MODE,
            .type = QEMU_OPT_STRING,
        },
        {
            .name = REPLICATION_TOP_ID,
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
71 
/* Callback table handed to replication_new() in replication_open() */
static ReplicationOps replication_ops = {
    .start = replication_start,
    .checkpoint = replication_do_checkpoint,
    .get_error = replication_get_error,
    .stop = replication_stop,
};
78 
79 static int replication_open(BlockDriverState *bs, QDict *options,
80                             int flags, Error **errp)
81 {
82     int ret;
83     BDRVReplicationState *s = bs->opaque;
84     Error *local_err = NULL;
85     QemuOpts *opts = NULL;
86     const char *mode;
87     const char *top_id;
88 
89     bs->file = bdrv_open_child(NULL, options, "file", bs, &child_file,
90                                false, errp);
91     if (!bs->file) {
92         return -EINVAL;
93     }
94 
95     ret = -EINVAL;
96     opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
97     qemu_opts_absorb_qdict(opts, options, &local_err);
98     if (local_err) {
99         goto fail;
100     }
101 
102     mode = qemu_opt_get(opts, REPLICATION_MODE);
103     if (!mode) {
104         error_setg(&local_err, "Missing the option mode");
105         goto fail;
106     }
107 
108     if (!strcmp(mode, "primary")) {
109         s->mode = REPLICATION_MODE_PRIMARY;
110         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
111         if (top_id) {
112             error_setg(&local_err, "The primary side does not support option top-id");
113             goto fail;
114         }
115     } else if (!strcmp(mode, "secondary")) {
116         s->mode = REPLICATION_MODE_SECONDARY;
117         top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
118         s->top_id = g_strdup(top_id);
119         if (!s->top_id) {
120             error_setg(&local_err, "Missing the option top-id");
121             goto fail;
122         }
123     } else {
124         error_setg(&local_err,
125                    "The option mode's value should be primary or secondary");
126         goto fail;
127     }
128 
129     s->rs = replication_new(bs, &replication_ops);
130 
131     ret = 0;
132 
133 fail:
134     qemu_opts_del(opts);
135     error_propagate(errp, local_err);
136 
137     return ret;
138 }
139 
140 static void replication_close(BlockDriverState *bs)
141 {
142     BDRVReplicationState *s = bs->opaque;
143 
144     if (s->stage == BLOCK_REPLICATION_RUNNING) {
145         replication_stop(s->rs, false, NULL);
146     }
147     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
148         block_job_cancel_sync(s->active_disk->bs->job);
149     }
150 
151     if (s->mode == REPLICATION_MODE_SECONDARY) {
152         g_free(s->top_id);
153     }
154 
155     replication_remove(s->rs);
156 }
157 
158 static void replication_child_perm(BlockDriverState *bs, BdrvChild *c,
159                                    const BdrvChildRole *role,
160                                    BlockReopenQueue *reopen_queue,
161                                    uint64_t perm, uint64_t shared,
162                                    uint64_t *nperm, uint64_t *nshared)
163 {
164     *nperm = BLK_PERM_CONSISTENT_READ;
165     if ((bs->open_flags & (BDRV_O_INACTIVE | BDRV_O_RDWR)) == BDRV_O_RDWR) {
166         *nperm |= BLK_PERM_WRITE;
167     }
168     *nshared = BLK_PERM_CONSISTENT_READ \
169                | BLK_PERM_WRITE \
170                | BLK_PERM_WRITE_UNCHANGED;
171     return;
172 }
173 
174 static int64_t replication_getlength(BlockDriverState *bs)
175 {
176     return bdrv_getlength(bs->file->bs);
177 }
178 
179 static int replication_get_io_status(BDRVReplicationState *s)
180 {
181     switch (s->stage) {
182     case BLOCK_REPLICATION_NONE:
183         return -EIO;
184     case BLOCK_REPLICATION_RUNNING:
185         return 0;
186     case BLOCK_REPLICATION_FAILOVER:
187         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
188     case BLOCK_REPLICATION_FAILOVER_FAILED:
189         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
190     case BLOCK_REPLICATION_DONE:
191         /*
192          * active commit job completes, and active disk and secondary_disk
193          * is swapped, so we can operate bs->file directly
194          */
195         return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
196     default:
197         abort();
198     }
199 }
200 
201 static int replication_return_value(BDRVReplicationState *s, int ret)
202 {
203     if (s->mode == REPLICATION_MODE_SECONDARY) {
204         return ret;
205     }
206 
207     if (ret < 0) {
208         s->error = ret;
209         ret = 0;
210     }
211 
212     return ret;
213 }
214 
215 static coroutine_fn int replication_co_readv(BlockDriverState *bs,
216                                              int64_t sector_num,
217                                              int remaining_sectors,
218                                              QEMUIOVector *qiov)
219 {
220     BDRVReplicationState *s = bs->opaque;
221     BdrvChild *child = s->secondary_disk;
222     BlockJob *job = NULL;
223     CowRequest req;
224     int ret;
225 
226     if (s->mode == REPLICATION_MODE_PRIMARY) {
227         /* We only use it to forward primary write requests */
228         return -EIO;
229     }
230 
231     ret = replication_get_io_status(s);
232     if (ret < 0) {
233         return ret;
234     }
235 
236     if (child && child->bs) {
237         job = child->bs->job;
238     }
239 
240     if (job) {
241         uint64_t remaining_bytes = remaining_sectors * BDRV_SECTOR_SIZE;
242 
243         backup_wait_for_overlapping_requests(child->bs->job,
244                                              sector_num * BDRV_SECTOR_SIZE,
245                                              remaining_bytes);
246         backup_cow_request_begin(&req, child->bs->job,
247                                  sector_num * BDRV_SECTOR_SIZE,
248                                  remaining_bytes);
249         ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors,
250                             qiov);
251         backup_cow_request_end(&req);
252         goto out;
253     }
254 
255     ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors, qiov);
256 out:
257     return replication_return_value(s, ret);
258 }
259 
260 static coroutine_fn int replication_co_writev(BlockDriverState *bs,
261                                               int64_t sector_num,
262                                               int remaining_sectors,
263                                               QEMUIOVector *qiov)
264 {
265     BDRVReplicationState *s = bs->opaque;
266     QEMUIOVector hd_qiov;
267     uint64_t bytes_done = 0;
268     BdrvChild *top = bs->file;
269     BdrvChild *base = s->secondary_disk;
270     BdrvChild *target;
271     int ret;
272     int64_t n;
273 
274     ret = replication_get_io_status(s);
275     if (ret < 0) {
276         goto out;
277     }
278 
279     if (ret == 0) {
280         ret = bdrv_co_writev(top, sector_num,
281                              remaining_sectors, qiov);
282         return replication_return_value(s, ret);
283     }
284 
285     /*
286      * Failover failed, only write to active disk if the sectors
287      * have already been allocated in active disk/hidden disk.
288      */
289     qemu_iovec_init(&hd_qiov, qiov->niov);
290     while (remaining_sectors > 0) {
291         int64_t count;
292 
293         ret = bdrv_is_allocated_above(top->bs, base->bs,
294                                       sector_num * BDRV_SECTOR_SIZE,
295                                       remaining_sectors * BDRV_SECTOR_SIZE,
296                                       &count);
297         if (ret < 0) {
298             goto out1;
299         }
300 
301         assert(QEMU_IS_ALIGNED(count, BDRV_SECTOR_SIZE));
302         n = count >> BDRV_SECTOR_BITS;
303         qemu_iovec_reset(&hd_qiov);
304         qemu_iovec_concat(&hd_qiov, qiov, bytes_done, count);
305 
306         target = ret ? top : base;
307         ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
308         if (ret < 0) {
309             goto out1;
310         }
311 
312         remaining_sectors -= n;
313         sector_num += n;
314         bytes_done += count;
315     }
316 
317 out1:
318     qemu_iovec_destroy(&hd_qiov);
319 out:
320     return ret;
321 }
322 
323 static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
324                                                     BlockDriverState *candidate)
325 {
326     return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
327 }
328 
329 static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
330 {
331     Error *local_err = NULL;
332     int ret;
333 
334     if (!s->secondary_disk->bs->job) {
335         error_setg(errp, "Backup job was cancelled unexpectedly");
336         return;
337     }
338 
339     backup_do_checkpoint(s->secondary_disk->bs->job, &local_err);
340     if (local_err) {
341         error_propagate(errp, local_err);
342         return;
343     }
344 
345     if (!s->active_disk->bs->drv) {
346         error_setg(errp, "Active disk %s is ejected",
347                    s->active_disk->bs->node_name);
348         return;
349     }
350 
351     ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
352     if (ret < 0) {
353         error_setg(errp, "Cannot make active disk empty");
354         return;
355     }
356 
357     if (!s->hidden_disk->bs->drv) {
358         error_setg(errp, "Hidden disk %s is ejected",
359                    s->hidden_disk->bs->node_name);
360         return;
361     }
362 
363     ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
364     if (ret < 0) {
365         error_setg(errp, "Cannot make hidden disk empty");
366         return;
367     }
368 }
369 
370 static void reopen_backing_file(BlockDriverState *bs, bool writable,
371                                 Error **errp)
372 {
373     BDRVReplicationState *s = bs->opaque;
374     BlockReopenQueue *reopen_queue = NULL;
375     int orig_hidden_flags, orig_secondary_flags;
376     int new_hidden_flags, new_secondary_flags;
377     Error *local_err = NULL;
378 
379     if (writable) {
380         orig_hidden_flags = s->orig_hidden_flags =
381                                 bdrv_get_flags(s->hidden_disk->bs);
382         new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
383                                                     ~BDRV_O_INACTIVE;
384         orig_secondary_flags = s->orig_secondary_flags =
385                                 bdrv_get_flags(s->secondary_disk->bs);
386         new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
387                                                      ~BDRV_O_INACTIVE;
388     } else {
389         orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
390                                                     ~BDRV_O_INACTIVE;
391         new_hidden_flags = s->orig_hidden_flags;
392         orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
393                                                     ~BDRV_O_INACTIVE;
394         new_secondary_flags = s->orig_secondary_flags;
395     }
396 
397     if (orig_hidden_flags != new_hidden_flags) {
398         reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
399                                          new_hidden_flags);
400     }
401 
402     if (!(orig_secondary_flags & BDRV_O_RDWR)) {
403         reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
404                                          NULL, new_secondary_flags);
405     }
406 
407     if (reopen_queue) {
408         bdrv_reopen_multiple(bdrv_get_aio_context(bs),
409                              reopen_queue, &local_err);
410         error_propagate(errp, local_err);
411     }
412 }
413 
414 static void backup_job_cleanup(BlockDriverState *bs)
415 {
416     BDRVReplicationState *s = bs->opaque;
417     BlockDriverState *top_bs;
418 
419     top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
420     if (!top_bs) {
421         return;
422     }
423     bdrv_op_unblock_all(top_bs, s->blocker);
424     error_free(s->blocker);
425     reopen_backing_file(bs, false, NULL);
426 }
427 
428 static void backup_job_completed(void *opaque, int ret)
429 {
430     BlockDriverState *bs = opaque;
431     BDRVReplicationState *s = bs->opaque;
432 
433     if (s->stage != BLOCK_REPLICATION_FAILOVER) {
434         /* The backup job is cancelled unexpectedly */
435         s->error = -EIO;
436     }
437 
438     backup_job_cleanup(bs);
439 }
440 
441 static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
442 {
443     BdrvChild *child;
444 
445     /* The bs itself is the top_bs */
446     if (top_bs == bs) {
447         return true;
448     }
449 
450     /* Iterate over top_bs's children */
451     QLIST_FOREACH(child, &top_bs->children, next) {
452         if (child->bs == bs || check_top_bs(child->bs, bs)) {
453             return true;
454         }
455     }
456 
457     return false;
458 }
459 
460 static void replication_start(ReplicationState *rs, ReplicationMode mode,
461                               Error **errp)
462 {
463     BlockDriverState *bs = rs->opaque;
464     BDRVReplicationState *s;
465     BlockDriverState *top_bs;
466     int64_t active_length, hidden_length, disk_length;
467     AioContext *aio_context;
468     Error *local_err = NULL;
469     BlockJob *job;
470 
471     aio_context = bdrv_get_aio_context(bs);
472     aio_context_acquire(aio_context);
473     s = bs->opaque;
474 
475     if (s->stage != BLOCK_REPLICATION_NONE) {
476         error_setg(errp, "Block replication is running or done");
477         aio_context_release(aio_context);
478         return;
479     }
480 
481     if (s->mode != mode) {
482         error_setg(errp, "The parameter mode's value is invalid, needs %d,"
483                    " but got %d", s->mode, mode);
484         aio_context_release(aio_context);
485         return;
486     }
487 
488     switch (s->mode) {
489     case REPLICATION_MODE_PRIMARY:
490         break;
491     case REPLICATION_MODE_SECONDARY:
492         s->active_disk = bs->file;
493         if (!s->active_disk || !s->active_disk->bs ||
494                                     !s->active_disk->bs->backing) {
495             error_setg(errp, "Active disk doesn't have backing file");
496             aio_context_release(aio_context);
497             return;
498         }
499 
500         s->hidden_disk = s->active_disk->bs->backing;
501         if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
502             error_setg(errp, "Hidden disk doesn't have backing file");
503             aio_context_release(aio_context);
504             return;
505         }
506 
507         s->secondary_disk = s->hidden_disk->bs->backing;
508         if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
509             error_setg(errp, "The secondary disk doesn't have block backend");
510             aio_context_release(aio_context);
511             return;
512         }
513 
514         /* verify the length */
515         active_length = bdrv_getlength(s->active_disk->bs);
516         hidden_length = bdrv_getlength(s->hidden_disk->bs);
517         disk_length = bdrv_getlength(s->secondary_disk->bs);
518         if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
519             active_length != hidden_length || hidden_length != disk_length) {
520             error_setg(errp, "Active disk, hidden disk, secondary disk's length"
521                        " are not the same");
522             aio_context_release(aio_context);
523             return;
524         }
525 
526         /* Must be true, or the bdrv_getlength() calls would have failed */
527         assert(s->active_disk->bs->drv && s->hidden_disk->bs->drv);
528 
529         if (!s->active_disk->bs->drv->bdrv_make_empty ||
530             !s->hidden_disk->bs->drv->bdrv_make_empty) {
531             error_setg(errp,
532                        "Active disk or hidden disk doesn't support make_empty");
533             aio_context_release(aio_context);
534             return;
535         }
536 
537         /* reopen the backing file in r/w mode */
538         reopen_backing_file(bs, true, &local_err);
539         if (local_err) {
540             error_propagate(errp, local_err);
541             aio_context_release(aio_context);
542             return;
543         }
544 
545         /* start backup job now */
546         error_setg(&s->blocker,
547                    "Block device is in use by internal backup job");
548 
549         top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
550         if (!top_bs || !bdrv_is_root_node(top_bs) ||
551             !check_top_bs(top_bs, bs)) {
552             error_setg(errp, "No top_bs or it is invalid");
553             reopen_backing_file(bs, false, NULL);
554             aio_context_release(aio_context);
555             return;
556         }
557         bdrv_op_block_all(top_bs, s->blocker);
558         bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
559 
560         job = backup_job_create(NULL, s->secondary_disk->bs, s->hidden_disk->bs,
561                                 0, MIRROR_SYNC_MODE_NONE, NULL, false,
562                                 BLOCKDEV_ON_ERROR_REPORT,
563                                 BLOCKDEV_ON_ERROR_REPORT, BLOCK_JOB_INTERNAL,
564                                 backup_job_completed, bs, NULL, &local_err);
565         if (local_err) {
566             error_propagate(errp, local_err);
567             backup_job_cleanup(bs);
568             aio_context_release(aio_context);
569             return;
570         }
571         block_job_start(job);
572         break;
573     default:
574         aio_context_release(aio_context);
575         abort();
576     }
577 
578     s->stage = BLOCK_REPLICATION_RUNNING;
579 
580     if (s->mode == REPLICATION_MODE_SECONDARY) {
581         secondary_do_checkpoint(s, errp);
582     }
583 
584     s->error = 0;
585     aio_context_release(aio_context);
586 }
587 
588 static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
589 {
590     BlockDriverState *bs = rs->opaque;
591     BDRVReplicationState *s;
592     AioContext *aio_context;
593 
594     aio_context = bdrv_get_aio_context(bs);
595     aio_context_acquire(aio_context);
596     s = bs->opaque;
597 
598     if (s->mode == REPLICATION_MODE_SECONDARY) {
599         secondary_do_checkpoint(s, errp);
600     }
601     aio_context_release(aio_context);
602 }
603 
604 static void replication_get_error(ReplicationState *rs, Error **errp)
605 {
606     BlockDriverState *bs = rs->opaque;
607     BDRVReplicationState *s;
608     AioContext *aio_context;
609 
610     aio_context = bdrv_get_aio_context(bs);
611     aio_context_acquire(aio_context);
612     s = bs->opaque;
613 
614     if (s->stage != BLOCK_REPLICATION_RUNNING) {
615         error_setg(errp, "Block replication is not running");
616         aio_context_release(aio_context);
617         return;
618     }
619 
620     if (s->error) {
621         error_setg(errp, "I/O error occurred");
622         aio_context_release(aio_context);
623         return;
624     }
625     aio_context_release(aio_context);
626 }
627 
628 static void replication_done(void *opaque, int ret)
629 {
630     BlockDriverState *bs = opaque;
631     BDRVReplicationState *s = bs->opaque;
632 
633     if (ret == 0) {
634         s->stage = BLOCK_REPLICATION_DONE;
635 
636         /* refresh top bs's filename */
637         bdrv_refresh_filename(bs);
638         s->active_disk = NULL;
639         s->secondary_disk = NULL;
640         s->hidden_disk = NULL;
641         s->error = 0;
642     } else {
643         s->stage = BLOCK_REPLICATION_FAILOVER_FAILED;
644         s->error = -EIO;
645     }
646 }
647 
648 static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
649 {
650     BlockDriverState *bs = rs->opaque;
651     BDRVReplicationState *s;
652     AioContext *aio_context;
653 
654     aio_context = bdrv_get_aio_context(bs);
655     aio_context_acquire(aio_context);
656     s = bs->opaque;
657 
658     if (s->stage != BLOCK_REPLICATION_RUNNING) {
659         error_setg(errp, "Block replication is not running");
660         aio_context_release(aio_context);
661         return;
662     }
663 
664     switch (s->mode) {
665     case REPLICATION_MODE_PRIMARY:
666         s->stage = BLOCK_REPLICATION_DONE;
667         s->error = 0;
668         break;
669     case REPLICATION_MODE_SECONDARY:
670         /*
671          * This BDS will be closed, and the job should be completed
672          * before the BDS is closed, because we will access hidden
673          * disk, secondary disk in backup_job_completed().
674          */
675         if (s->secondary_disk->bs->job) {
676             block_job_cancel_sync(s->secondary_disk->bs->job);
677         }
678 
679         if (!failover) {
680             secondary_do_checkpoint(s, errp);
681             s->stage = BLOCK_REPLICATION_DONE;
682             aio_context_release(aio_context);
683             return;
684         }
685 
686         s->stage = BLOCK_REPLICATION_FAILOVER;
687         commit_active_start(NULL, s->active_disk->bs, s->secondary_disk->bs,
688                             BLOCK_JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
689                             NULL, replication_done, bs, true, errp);
690         break;
691     default:
692         aio_context_release(aio_context);
693         abort();
694     }
695     aio_context_release(aio_context);
696 }
697 
/* Driver description of the "replication" block filter */
BlockDriver bdrv_replication = {
    .format_name                = "replication",
    .protocol_name              = "replication",
    .instance_size              = sizeof(BDRVReplicationState),

    /* node life cycle and permissions */
    .bdrv_open                  = replication_open,
    .bdrv_close                 = replication_close,
    .bdrv_child_perm            = replication_child_perm,

    /* size query and sector-based I/O */
    .bdrv_getlength             = replication_getlength,
    .bdrv_co_readv              = replication_co_readv,
    .bdrv_co_writev             = replication_co_writev,

    /* filter behaviour: queries are forwarded to bs->file */
    .is_filter                  = true,
    .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,

    .has_variable_length        = true,
};
716 
/* Register the replication driver with the block layer. */
static void bdrv_replication_init(void)
{
    bdrv_register(&bdrv_replication);
}

/* NOTE(review): presumably arranges for the init function to run at block
 * module initialisation - confirm against the block_init() definition. */
block_init(bdrv_replication_init);
723