xref: /openbmc/qemu/block/graph-lock.c (revision 68ff2eeb299d562e437b49e9bb98f9d6f62fbf06)
1 /*
2  * Graph lock: rwlock to protect block layer graph manipulations (add/remove
3  * edges and nodes)
4  *
5  *  Copyright (c) 2022 Red Hat
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "qemu/osdep.h"
22 #include "qemu/main-loop.h"
23 #include "block/graph-lock.h"
24 #include "block/block.h"
25 #include "block/block_int.h"
26 
27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */
28 BdrvGraphLock graph_lock;
29 
30 /* Protects the list of aiocontext and orphaned_reader_count */
31 static QemuMutex aio_context_list_lock;
32 
33 /* Written and read with atomic operations. */
34 static int has_writer;
35 
36 /*
37  * Many write-locked sections are also drained sections. There is a convenience
38  * wrapper bdrv_graph_wrlock_drained() which begins a drained section before
39  * acquiring the lock. This variable here is used so bdrv_graph_wrunlock() knows
40  * if it also needs to end such a drained section. It needs to be a counter,
41  * because the aio_poll() call in bdrv_graph_wrlock() might re-enter
42  * bdrv_graph_wrlock_drained(). And note that aio_bh_poll() in
43  * bdrv_graph_wrunlock() might also re-enter a write-locked section.
44  */
45 static int wrlock_quiesced_counter;
46 
47 /*
48  * A reader coroutine could move from an AioContext to another.
49  * If this happens, there is no problem from the point of view of
50  * counters. The problem is that the total count becomes
51  * unbalanced if one of the two AioContexts gets deleted.
52  * The count of readers must remain correct, so the AioContext's
53  * balance is transferred to this glboal variable.
54  * Protected by aio_context_list_lock.
55  */
56 static uint32_t orphaned_reader_count;
57 
58 /* Queue of readers waiting for the writer to finish */
59 static CoQueue reader_queue;
60 
61 struct BdrvGraphRWlock {
62     /* How many readers are currently reading the graph. */
63     uint32_t reader_count;
64 
65     /*
66      * List of BdrvGraphRWlock kept in graph-lock.c
67      * Protected by aio_context_list_lock
68      */
69     QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
70 };
71 
72 /*
73  * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
74  * can safely modify only its own counter, avoid reading/writing
75  * others and thus improving performances by avoiding cacheline bounces.
76  */
77 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
78     QTAILQ_HEAD_INITIALIZER(aio_context_list);
79 
bdrv_init_graph_lock(void)80 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
81 {
82     qemu_mutex_init(&aio_context_list_lock);
83     qemu_co_queue_init(&reader_queue);
84 }
85 
register_aiocontext(AioContext * ctx)86 void register_aiocontext(AioContext *ctx)
87 {
88     ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
89     QEMU_LOCK_GUARD(&aio_context_list_lock);
90     assert(ctx->bdrv_graph->reader_count == 0);
91     QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
92 }
93 
unregister_aiocontext(AioContext * ctx)94 void unregister_aiocontext(AioContext *ctx)
95 {
96     QEMU_LOCK_GUARD(&aio_context_list_lock);
97     orphaned_reader_count += ctx->bdrv_graph->reader_count;
98     QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
99     g_free(ctx->bdrv_graph);
100 }
101 
reader_count(void)102 static uint32_t reader_count(void)
103 {
104     BdrvGraphRWlock *brdv_graph;
105     uint32_t rd;
106 
107     QEMU_LOCK_GUARD(&aio_context_list_lock);
108 
109     /* rd can temporarily be negative, but the total will *always* be >= 0 */
110     rd = orphaned_reader_count;
111     QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) {
112         rd += qatomic_read(&brdv_graph->reader_count);
113     }
114 
115     /* shouldn't overflow unless there are 2^31 readers */
116     assert((int32_t)rd >= 0);
117     return rd;
118 }
119 
bdrv_graph_wrlock(void)120 void no_coroutine_fn bdrv_graph_wrlock(void)
121 {
122     GLOBAL_STATE_CODE();
123     assert(!qatomic_read(&has_writer));
124     assert(!qemu_in_coroutine());
125 
126     bool need_drain = wrlock_quiesced_counter == 0;
127 
128     if (need_drain) {
129         /*
130          * Make sure that constantly arriving new I/O doesn't cause starvation
131          */
132         bdrv_drain_all_begin_nopoll();
133     }
134 
135     /*
136      * reader_count == 0: this means writer will read has_reader as 1
137      * reader_count >= 1: we don't know if writer read has_writer == 0 or 1,
138      *                    but we need to wait.
139      * Wait by allowing other coroutine (and possible readers) to continue.
140      */
141     do {
142         /*
143          * has_writer must be 0 while polling, otherwise we get a deadlock if
144          * any callback involved during AIO_WAIT_WHILE() tries to acquire the
145          * reader lock.
146          */
147         qatomic_set(&has_writer, 0);
148         AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
149         qatomic_set(&has_writer, 1);
150 
151         /*
152          * We want to only check reader_count() after has_writer = 1 is visible
153          * to other threads. That way no more readers can sneak in after we've
154          * determined reader_count() == 0.
155          */
156         smp_mb();
157     } while (reader_count() >= 1);
158 
159     if (need_drain) {
160         bdrv_drain_all_end();
161     }
162 }
163 
bdrv_graph_wrlock_drained(void)164 void no_coroutine_fn bdrv_graph_wrlock_drained(void)
165 {
166     GLOBAL_STATE_CODE();
167 
168     bdrv_drain_all_begin();
169     wrlock_quiesced_counter++;
170     bdrv_graph_wrlock();
171 }
172 
bdrv_graph_wrunlock(void)173 void no_coroutine_fn bdrv_graph_wrunlock(void)
174 {
175     GLOBAL_STATE_CODE();
176     assert(qatomic_read(&has_writer));
177 
178     WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
179         /*
180          * No need for memory barriers, this works in pair with
181          * the slow path of rdlock() and both take the lock.
182          */
183         qatomic_store_release(&has_writer, 0);
184 
185         /* Wake up all coroutines that are waiting to read the graph */
186         qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
187     }
188 
189     /*
190      * Run any BHs that were scheduled during the wrlock section and that
191      * callers might expect to have finished (in particular, this is important
192      * for bdrv_schedule_unref()).
193      *
194      * Do this only after restarting coroutines so that nested event loops in
195      * BHs don't deadlock if their condition relies on the coroutine making
196      * progress.
197      */
198     aio_bh_poll(qemu_get_aio_context());
199 
200     if (wrlock_quiesced_counter > 0) {
201         bdrv_drain_all_end();
202         wrlock_quiesced_counter--;
203     }
204 
205 }
206 
bdrv_graph_co_rdlock(void)207 void coroutine_fn bdrv_graph_co_rdlock(void)
208 {
209     BdrvGraphRWlock *bdrv_graph;
210     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
211 
212     for (;;) {
213         qatomic_set(&bdrv_graph->reader_count,
214                     bdrv_graph->reader_count + 1);
215         /* make sure writer sees reader_count before we check has_writer */
216         smp_mb();
217 
218         /*
219          * has_writer == 0: this means writer will read reader_count as >= 1
220          * has_writer == 1: we don't know if writer read reader_count == 0
221          *                  or > 0, but we need to wait anyways because
222          *                  it will write.
223          */
224         if (!qatomic_read(&has_writer)) {
225             break;
226         }
227 
228         /*
229          * Synchronize access with reader_count() in bdrv_graph_wrlock().
230          * Case 1:
231          * If this critical section gets executed first, reader_count will
232          * decrease and the reader will go to sleep.
233          * Then the writer will read reader_count that does not take into
234          * account this reader, and if there's no other reader it will
235          * enter the write section.
236          * Case 2:
237          * If reader_count() critical section gets executed first,
238          * then writer will read reader_count >= 1.
239          * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
240          * we will enter this critical section and call aio_wait_kick().
241          */
242         WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
243             /*
244              * Additional check when we use the above lock to synchronize
245              * with bdrv_graph_wrunlock().
246              * Case 1:
247              * If this gets executed first, has_writer is still 1, so we reduce
248              * reader_count and go to sleep.
249              * Then the writer will set has_writer to 0 and wake up all readers,
250              * us included.
251              * Case 2:
252              * If bdrv_graph_wrunlock() critical section gets executed first,
253              * then it will set has_writer to 0 and wake up all other readers.
254              * Then we execute this critical section, and therefore must check
255              * again for has_writer, otherwise we sleep without any writer
256              * actually running.
257              */
258             if (!qatomic_read(&has_writer)) {
259                 return;
260             }
261 
262             /* slow path where reader sleeps */
263             bdrv_graph->reader_count--;
264             aio_wait_kick();
265             qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
266         }
267     }
268 }
269 
bdrv_graph_co_rdunlock(void)270 void coroutine_fn bdrv_graph_co_rdunlock(void)
271 {
272     BdrvGraphRWlock *bdrv_graph;
273     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
274 
275     qatomic_store_release(&bdrv_graph->reader_count,
276                           bdrv_graph->reader_count - 1);
277     /* make sure writer sees reader_count before we check has_writer */
278     smp_mb();
279 
280     /*
281      * has_writer == 0: this means reader will read reader_count decreased
282      * has_writer == 1: we don't know if writer read reader_count old or
283      *                  new. Therefore, kick again so on next iteration
284      *                  writer will for sure read the updated value.
285      */
286     if (qatomic_read(&has_writer)) {
287         aio_wait_kick();
288     }
289 }
290 
bdrv_graph_rdlock_main_loop(void)291 void bdrv_graph_rdlock_main_loop(void)
292 {
293     GLOBAL_STATE_CODE();
294     assert(!qemu_in_coroutine());
295 }
296 
bdrv_graph_rdunlock_main_loop(void)297 void bdrv_graph_rdunlock_main_loop(void)
298 {
299     GLOBAL_STATE_CODE();
300     assert(!qemu_in_coroutine());
301 }
302 
assert_bdrv_graph_readable(void)303 void assert_bdrv_graph_readable(void)
304 {
305     /* reader_count() is slow due to aio_context_list_lock lock contention */
306 #ifdef CONFIG_DEBUG_GRAPH_LOCK
307     assert(qemu_in_main_thread() || reader_count());
308 #endif
309 }
310 
assert_bdrv_graph_writable(void)311 void assert_bdrv_graph_writable(void)
312 {
313     assert(qemu_in_main_thread());
314     assert(qatomic_read(&has_writer));
315 }
316