xref: /openbmc/qemu/block/graph-lock.c (revision c1774bdb)
1 /*
2  * Graph lock: rwlock to protect block layer graph manipulations (add/remove
3  * edges and nodes)
4  *
5  *  Copyright (c) 2022 Red Hat
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "qemu/osdep.h"
22 #include "qemu/main-loop.h"
23 #include "block/graph-lock.h"
24 #include "block/block.h"
25 #include "block/block_int.h"
26 
27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */
28 BdrvGraphLock graph_lock;
29 
30 /* Protects the list of aiocontext and orphaned_reader_count */
31 static QemuMutex aio_context_list_lock;
32 
33 /* Written and read with atomic operations. */
34 static int has_writer;
35 
36 /*
37  * A reader coroutine could move from an AioContext to another.
38  * If this happens, there is no problem from the point of view of
39  * counters. The problem is that the total count becomes
40  * unbalanced if one of the two AioContexts gets deleted.
41  * The count of readers must remain correct, so the AioContext's
42  * balance is transferred to this global variable.
43  * Protected by aio_context_list_lock.
44  */
45 static uint32_t orphaned_reader_count;
46 
47 /* Queue of readers waiting for the writer to finish */
48 static CoQueue reader_queue;
49 
50 struct BdrvGraphRWlock {
51     /* How many readers are currently reading the graph. */
52     uint32_t reader_count;
53 
54     /*
55      * List of BdrvGraphRWlock kept in graph-lock.c
56      * Protected by aio_context_list_lock
57      */
58     QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
59 };
60 
61 /*
62  * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
63  * can safely modify only its own counter, without reading or writing the
64  * others, which improves performance by avoiding cacheline bounces.
65  */
66 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
67     QTAILQ_HEAD_INITIALIZER(aio_context_list);
68 
69 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
70 {
71     qemu_mutex_init(&aio_context_list_lock);
72     qemu_co_queue_init(&reader_queue);
73 }
74 
75 void register_aiocontext(AioContext *ctx)
76 {
77     ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
78     QEMU_LOCK_GUARD(&aio_context_list_lock);
79     assert(ctx->bdrv_graph->reader_count == 0);
80     QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
81 }
82 
83 void unregister_aiocontext(AioContext *ctx)
84 {
85     QEMU_LOCK_GUARD(&aio_context_list_lock);
86     orphaned_reader_count += ctx->bdrv_graph->reader_count;
87     QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
88     g_free(ctx->bdrv_graph);
89 }
90 
91 static uint32_t reader_count(void)
92 {
93     BdrvGraphRWlock *brdv_graph;
94     uint32_t rd;
95 
96     QEMU_LOCK_GUARD(&aio_context_list_lock);
97 
98     /* rd can temporarily be negative, but the total will *always* be >= 0 */
99     rd = orphaned_reader_count;
100     QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) {
101         rd += qatomic_read(&brdv_graph->reader_count);
102     }
103 
104     /* shouldn't overflow unless there are 2^31 readers */
105     assert((int32_t)rd >= 0);
106     return rd;
107 }
108 
109 void bdrv_graph_wrlock(BlockDriverState *bs)
110 {
111     AioContext *ctx = NULL;
112 
113     GLOBAL_STATE_CODE();
114     assert(!qatomic_read(&has_writer));
115 
116     /*
117      * Release only non-mainloop AioContext. The mainloop often relies on the
118      * BQL and doesn't lock the main AioContext before doing things.
119      */
120     if (bs) {
121         ctx = bdrv_get_aio_context(bs);
122         if (ctx != qemu_get_aio_context()) {
123             aio_context_release(ctx);
124         } else {
125             ctx = NULL;
126         }
127     }
128 
129     /* Make sure that constantly arriving new I/O doesn't cause starvation */
130     bdrv_drain_all_begin_nopoll();
131 
132     /*
133      * reader_count == 0: this means writer will read has_reader as 1
134      * reader_count >= 1: we don't know if writer read has_writer == 0 or 1,
135      *                    but we need to wait.
136      * Wait by allowing other coroutine (and possible readers) to continue.
137      */
138     do {
139         /*
140          * has_writer must be 0 while polling, otherwise we get a deadlock if
141          * any callback involved during AIO_WAIT_WHILE() tries to acquire the
142          * reader lock.
143          */
144         qatomic_set(&has_writer, 0);
145         AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
146         qatomic_set(&has_writer, 1);
147 
148         /*
149          * We want to only check reader_count() after has_writer = 1 is visible
150          * to other threads. That way no more readers can sneak in after we've
151          * determined reader_count() == 0.
152          */
153         smp_mb();
154     } while (reader_count() >= 1);
155 
156     bdrv_drain_all_end();
157 
158     if (ctx) {
159         aio_context_acquire(bdrv_get_aio_context(bs));
160     }
161 }
162 
163 void bdrv_graph_wrunlock(void)
164 {
165     GLOBAL_STATE_CODE();
166     assert(qatomic_read(&has_writer));
167 
168     WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
169         /*
170          * No need for memory barriers, this works in pair with
171          * the slow path of rdlock() and both take the lock.
172          */
173         qatomic_store_release(&has_writer, 0);
174 
175         /* Wake up all coroutines that are waiting to read the graph */
176         qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
177     }
178 
179     /*
180      * Run any BHs that were scheduled during the wrlock section and that
181      * callers might expect to have finished (in particular, this is important
182      * for bdrv_schedule_unref()).
183      *
184      * Do this only after restarting coroutines so that nested event loops in
185      * BHs don't deadlock if their condition relies on the coroutine making
186      * progress.
187      */
188     aio_bh_poll(qemu_get_aio_context());
189 }
190 
191 void coroutine_fn bdrv_graph_co_rdlock(void)
192 {
193     BdrvGraphRWlock *bdrv_graph;
194     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
195 
196     for (;;) {
197         qatomic_set(&bdrv_graph->reader_count,
198                     bdrv_graph->reader_count + 1);
199         /* make sure writer sees reader_count before we check has_writer */
200         smp_mb();
201 
202         /*
203          * has_writer == 0: this means writer will read reader_count as >= 1
204          * has_writer == 1: we don't know if writer read reader_count == 0
205          *                  or > 0, but we need to wait anyways because
206          *                  it will write.
207          */
208         if (!qatomic_read(&has_writer)) {
209             break;
210         }
211 
212         /*
213          * Synchronize access with reader_count() in bdrv_graph_wrlock().
214          * Case 1:
215          * If this critical section gets executed first, reader_count will
216          * decrease and the reader will go to sleep.
217          * Then the writer will read reader_count that does not take into
218          * account this reader, and if there's no other reader it will
219          * enter the write section.
220          * Case 2:
221          * If reader_count() critical section gets executed first,
222          * then writer will read reader_count >= 1.
223          * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
224          * we will enter this critical section and call aio_wait_kick().
225          */
226         WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
227             /*
228              * Additional check when we use the above lock to synchronize
229              * with bdrv_graph_wrunlock().
230              * Case 1:
231              * If this gets executed first, has_writer is still 1, so we reduce
232              * reader_count and go to sleep.
233              * Then the writer will set has_writer to 0 and wake up all readers,
234              * us included.
235              * Case 2:
236              * If bdrv_graph_wrunlock() critical section gets executed first,
237              * then it will set has_writer to 0 and wake up all other readers.
238              * Then we execute this critical section, and therefore must check
239              * again for has_writer, otherwise we sleep without any writer
240              * actually running.
241              */
242             if (!qatomic_read(&has_writer)) {
243                 return;
244             }
245 
246             /* slow path where reader sleeps */
247             bdrv_graph->reader_count--;
248             aio_wait_kick();
249             qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
250         }
251     }
252 }
253 
254 void coroutine_fn bdrv_graph_co_rdunlock(void)
255 {
256     BdrvGraphRWlock *bdrv_graph;
257     bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
258 
259     qatomic_store_release(&bdrv_graph->reader_count,
260                           bdrv_graph->reader_count - 1);
261     /* make sure writer sees reader_count before we check has_writer */
262     smp_mb();
263 
264     /*
265      * has_writer == 0: this means reader will read reader_count decreased
266      * has_writer == 1: we don't know if writer read reader_count old or
267      *                  new. Therefore, kick again so on next iteration
268      *                  writer will for sure read the updated value.
269      */
270     if (qatomic_read(&has_writer)) {
271         aio_wait_kick();
272     }
273 }
274 
/*
 * Reader "lock" for main loop code outside of coroutines. It touches no
 * counter; it only checks that it is called from global state code and not
 * from a coroutine.
 */
void bdrv_graph_rdlock_main_loop(void)
{
    GLOBAL_STATE_CODE();
    assert(!qemu_in_coroutine());
}
280 
/*
 * Reader "unlock" for main loop code outside of coroutines. It touches no
 * counter; it only checks that it is called from global state code and not
 * from a coroutine.
 */
void bdrv_graph_rdunlock_main_loop(void)
{
    GLOBAL_STATE_CODE();
    assert(!qemu_in_coroutine());
}
286 
/*
 * Debug check that the graph may be read: either we are in the main thread
 * or at least one reader is accounted. Compiled to nothing unless
 * CONFIG_DEBUG_GRAPH_LOCK is defined.
 */
void assert_bdrv_graph_readable(void)
{
    /*
     * Only enabled in debug builds: reader_count() is slow because of
     * contention on aio_context_list_lock.
     */
#ifdef CONFIG_DEBUG_GRAPH_LOCK
    assert(qemu_in_main_thread() || reader_count());
#endif
}
294 
295 void assert_bdrv_graph_writable(void)
296 {
297     assert(qemu_in_main_thread());
298     assert(qatomic_read(&has_writer));
299 }
300