xref: /openbmc/qemu/block/graph-lock.c (revision 587d82fa)
1 /*
2  * Graph lock: rwlock to protect block layer graph manipulations (add/remove
3  * edges and nodes)
4  *
5  *  Copyright (c) 2022 Red Hat
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "qemu/osdep.h"
22 #include "qemu/main-loop.h"
23 #include "block/graph-lock.h"
24 #include "block/block.h"
25 #include "block/block_int.h"
26 
27 /* Protects aio_context_list and orphaned_reader_count */
28 static QemuMutex aio_context_list_lock;
29 
30 /*
 * Non-zero while a writer holds, or is trying to take, the graph lock:
 * bdrv_graph_wrlock() sets it and bdrv_graph_wrunlock() clears it.
 * Written and read with atomic operations.
 */
31 static int has_writer;
32 
33 /*
34  * A reader coroutine could move from one AioContext to another.
35  * If this happens, there is no problem from the point of view of the
36  * counters. The problem is that the total count becomes
37  * unbalanced if one of the two AioContexts gets deleted.
38  * The count of readers must remain correct, so the AioContext's
39  * balance is transferred to this global variable.
40  * Protected by aio_context_list_lock.
41  */
42 static uint32_t orphaned_reader_count;
43 
44 /*
 * Queue of reader coroutines waiting for the writer to finish; they are
 * woken up by bdrv_graph_wrunlock().
 */
45 static CoQueue reader_queue;
46 
/*
 * Per-AioContext part of the graph lock. One instance is allocated by
 * register_aiocontext() and stored in ctx->bdrv_graph.
 */
47 struct BdrvGraphRWlock {
48     /*
     * How many readers are currently reading the graph, as accounted in
     * this AioContext. Only the sum over all contexts (plus
     * orphaned_reader_count) is meaningful, see reader_count().
     */
49     uint32_t reader_count;
50 
51     /*
52      * List of BdrvGraphRWlock kept in graph-lock.c
53      * Protected by aio_context_list_lock
54      */
55     QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
56 };
57 
58 /*
59  * List of BdrvGraphRWlock, one per registered AioContext. This list lets
60  * each reader modify only its own AioContext's counter without touching
61  * the others, improving performance by avoiding cacheline bouncing.
62  */
63 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
64     QTAILQ_HEAD_INITIALIZER(aio_context_list);
65 
66 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
67 {
68     qemu_mutex_init(&aio_context_list_lock);
69     qemu_co_queue_init(&reader_queue);
70 }
71 
72 void register_aiocontext(AioContext *ctx)
73 {
74     ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
75     QEMU_LOCK_GUARD(&aio_context_list_lock);
76     assert(ctx->bdrv_graph->reader_count == 0);
77     QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
78 }
79 
80 void unregister_aiocontext(AioContext *ctx)
81 {
82     QEMU_LOCK_GUARD(&aio_context_list_lock);
83     orphaned_reader_count += ctx->bdrv_graph->reader_count;
84     QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
85     g_free(ctx->bdrv_graph);
86 }
87 
88 static uint32_t reader_count(void)
89 {
90     BdrvGraphRWlock *brdv_graph;
91     uint32_t rd;
92 
93     QEMU_LOCK_GUARD(&aio_context_list_lock);
94 
95     /* rd can temporarly be negative, but the total will *always* be >= 0 */
96     rd = orphaned_reader_count;
97     QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) {
98         rd += qatomic_read(&brdv_graph->reader_count);
99     }
100 
101     /* shouldn't overflow unless there are 2^31 readers */
102     assert((int32_t)rd >= 0);
103     return rd;
104 }
105 
/*
 * Take the graph lock for writing.
 *
 * Must be called from global state code (see GLOBAL_STATE_CODE()) and while
 * no other writer is active (asserted). The function polls the main
 * AioContext until reader_count() drops to zero, then publishes
 * has_writer = 1 and re-checks the reader count; it only returns once
 * reader_count() was observed as 0 with has_writer set. The lock is
 * released with bdrv_graph_wrunlock().
 */
106 void bdrv_graph_wrlock(void)
107 {
108     GLOBAL_STATE_CODE();
109     assert(!qatomic_read(&has_writer));
110 
111     /* Make sure that constantly arriving new I/O doesn't cause starvation */
112     bdrv_drain_all_begin_nopoll();
113 
114     /*
115      * reader_count == 0: this means any new reader will read has_writer as 1
116      * reader_count >= 1: we don't know if that reader read has_writer == 0
117      *                    or 1, but we need to wait.
118      * Wait by allowing other coroutines (and possible readers) to continue.
119      */
120     do {
121         /*
122          * has_writer must be 0 while polling, otherwise we get a deadlock if
123          * any callback involved during AIO_WAIT_WHILE() tries to acquire the
124          * reader lock.
125          */
126         qatomic_set(&has_writer, 0);
127         AIO_WAIT_WHILE(qemu_get_aio_context(), reader_count() >= 1);
128         qatomic_set(&has_writer, 1);
129 
130         /*
131          * We want to only check reader_count() after has_writer = 1 is visible
132          * to other threads. That way no more readers can sneak in after we've
133          * determined reader_count() == 0.
134          */
135         smp_mb();
136     } while (reader_count() >= 1);
137 
     /* Matches bdrv_drain_all_begin_nopoll() above. */
138     bdrv_drain_all_end();
139 }
140 
/*
 * Release the graph write lock taken with bdrv_graph_wrlock().
 *
 * Must be called from global state code (see GLOBAL_STATE_CODE()) while
 * has_writer is set (asserted). Under aio_context_list_lock, has_writer is
 * cleared and every coroutine sleeping in reader_queue is entered again so
 * that it can retry bdrv_graph_co_rdlock().
 */
141 void bdrv_graph_wrunlock(void)
142 {
143     GLOBAL_STATE_CODE();
144     QEMU_LOCK_GUARD(&aio_context_list_lock);
145     assert(qatomic_read(&has_writer));
146 
147     /*
148      * No need for memory barriers, this works in pair with
149      * the slow path of rdlock() and both take the lock.
150      */
151     qatomic_store_release(&has_writer, 0);
152 
153     /* Wake up all coroutines that are waiting to read the graph */
154     qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
155 }
156 
/*
 * Take the graph lock for reading from a coroutine.
 *
 * In the main thread this is a no-op. Otherwise the reader counter of the
 * current AioContext is incremented and, after a full barrier, has_writer
 * is checked:
 *  - no writer: the lock is held, return (fast path);
 *  - writer present: under aio_context_list_lock, re-check has_writer; if it
 *    is still set, undo the increment, kick the writer and sleep in
 *    reader_queue until bdrv_graph_wrunlock() wakes us, then retry.
 * Paired with bdrv_graph_co_rdunlock().
 */
157 void coroutine_fn bdrv_graph_co_rdlock(void)
158 {
159     BdrvGraphRWlock *bdrv_graph;
160     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
161 
162     /* Do not lock if in main thread */
163     if (qemu_in_main_thread()) {
164         return;
165     }
166 
167     for (;;) {
168         qatomic_set(&bdrv_graph->reader_count,
169                     bdrv_graph->reader_count + 1);
170         /* make sure writer sees reader_count before we check has_writer */
171         smp_mb();
172 
173         /*
174          * has_writer == 0: this means writer will read reader_count as >= 1
175          * has_writer == 1: we don't know if writer read reader_count == 0
176          *                  or > 0, but we need to wait anyways because
177          *                  it will write.
178          */
179         if (!qatomic_read(&has_writer)) {
180             break;
181         }
182 
183         /*
184          * Synchronize access with reader_count() in bdrv_graph_wrlock().
185          * Case 1:
186          * If this critical section gets executed first, reader_count will
187          * decrease and the reader will go to sleep.
188          * Then the writer will read reader_count that does not take into
189          * account this reader, and if there's no other reader it will
190          * enter the write section.
191          * Case 2:
192          * If reader_count() critical section gets executed first,
193          * then writer will read reader_count >= 1.
194          * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
195          * we will enter this critical section and call aio_wait_kick().
196          */
197         WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
198             /*
199              * Additional check when we use the above lock to synchronize
200              * with bdrv_graph_wrunlock().
201              * Case 1:
202              * If this gets executed first, has_writer is still 1, so we reduce
203              * reader_count and go to sleep.
204              * Then the writer will set has_writer to 0 and wake up all readers,
205              * us included.
206              * Case 2:
207              * If bdrv_graph_wrunlock() critical section gets executed first,
208              * then it will set has_writer to 0 and wake up all other readers.
209              * Then we execute this critical section, and therefore must check
210              * again for has_writer, otherwise we sleep without any writer
211              * actually running.
212              */
213             if (!qatomic_read(&has_writer)) {
                 /* The increment done above is kept: we now hold the lock. */
214                 return;
215             }
216 
217             /* slow path where reader sleeps */
218             bdrv_graph->reader_count--;
             /* Let a writer polling in AIO_WAIT_WHILE() re-read the count. */
219             aio_wait_kick();
             /*
              * NOTE(review): presumably the lock is dropped while waiting and
              * re-taken on wakeup - confirm against the CoQueue API.
              */
220             qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
221         }
222     }
223 }
224 
/*
 * Release the graph read lock taken with bdrv_graph_co_rdlock().
 *
 * In the main thread this is a no-op. Otherwise the reader counter of the
 * current AioContext is decremented and, after a full barrier, a writer
 * that may be waiting for the readers to go away is kicked.
 */
225 void coroutine_fn bdrv_graph_co_rdunlock(void)
226 {
227     BdrvGraphRWlock *bdrv_graph;
228     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
229 
230     /* Do not lock if in main thread */
231     if (qemu_in_main_thread()) {
232         return;
233     }
234 
235     qatomic_store_release(&bdrv_graph->reader_count,
236                           bdrv_graph->reader_count - 1);
237     /* make sure writer sees reader_count before we check has_writer */
238     smp_mb();
239 
240     /*
241      * has_writer == 0: this means writer will read reader_count decreased
242      * has_writer == 1: we don't know if writer read reader_count old or
243      *                  new. Therefore, kick again so on next iteration
244      *                  writer will for sure read the updated value.
245      */
246     if (qatomic_read(&has_writer)) {
247         aio_wait_kick();
248     }
249 }
250 
/*
 * Reader-lock entry point for non-coroutine main loop code.
 *
 * No counter is touched and nothing is waited for: the function only checks
 * that it runs in global state code (see GLOBAL_STATE_CODE()) and outside
 * of coroutine context.
 * NOTE(review): presumably sufficient because writers also run only in
 * global state code, cf. bdrv_graph_wrlock() - confirm.
 */
251 void bdrv_graph_rdlock_main_loop(void)
252 {
253     GLOBAL_STATE_CODE();
254     assert(!qemu_in_coroutine());
255 }
256 
/*
 * Counterpart of bdrv_graph_rdlock_main_loop().
 *
 * Performs the same two checks (global state code, not in a coroutine) and
 * changes no state.
 */
257 void bdrv_graph_rdunlock_main_loop(void)
258 {
259     GLOBAL_STATE_CODE();
260     assert(!qemu_in_coroutine());
261 }
262