1 /* 2 * Graph lock: rwlock to protect block layer graph manipulations (add/remove 3 * edges and nodes) 4 * 5 * Copyright (c) 2022 Red Hat 6 * 7 * This library is free software; you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public 9 * License as published by the Free Software Foundation; either 10 * version 2.1 of the License, or (at your option) any later version. 11 * 12 * This library is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 19 */ 20 21 #include "qemu/osdep.h" 22 #include "qemu/main-loop.h" 23 #include "block/graph-lock.h" 24 #include "block/block.h" 25 #include "block/block_int.h" 26 27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */ 28 BdrvGraphLock graph_lock; 29 30 /* Protects the list of aiocontext and orphaned_reader_count */ 31 static QemuMutex aio_context_list_lock; 32 33 /* Written and read with atomic operations. */ 34 static int has_writer; 35 36 /* 37 * A reader coroutine could move from an AioContext to another. 38 * If this happens, there is no problem from the point of view of 39 * counters. The problem is that the total count becomes 40 * unbalanced if one of the two AioContexts gets deleted. 41 * The count of readers must remain correct, so the AioContext's 42 * balance is transferred to this glboal variable. 43 * Protected by aio_context_list_lock. 44 */ 45 static uint32_t orphaned_reader_count; 46 47 /* Queue of readers waiting for the writer to finish */ 48 static CoQueue reader_queue; 49 50 struct BdrvGraphRWlock { 51 /* How many readers are currently reading the graph. */ 52 uint32_t reader_count; 53 54 /* 55 * List of BdrvGraphRWlock kept in graph-lock.c 56 * Protected by aio_context_list_lock 57 */ 58 QTAILQ_ENTRY(BdrvGraphRWlock) next_aio; 59 }; 60 61 /* 62 * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock 63 * can safely modify only its own counter, avoid reading/writing 64 * others and thus improving performances by avoiding cacheline bounces. 65 */ 66 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list = 67 QTAILQ_HEAD_INITIALIZER(aio_context_list); 68 69 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void) 70 { 71 qemu_mutex_init(&aio_context_list_lock); 72 qemu_co_queue_init(&reader_queue); 73 } 74 75 void register_aiocontext(AioContext *ctx) 76 { 77 ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1); 78 QEMU_LOCK_GUARD(&aio_context_list_lock); 79 assert(ctx->bdrv_graph->reader_count == 0); 80 QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio); 81 } 82 83 void unregister_aiocontext(AioContext *ctx) 84 { 85 QEMU_LOCK_GUARD(&aio_context_list_lock); 86 orphaned_reader_count += ctx->bdrv_graph->reader_count; 87 QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio); 88 g_free(ctx->bdrv_graph); 89 } 90 91 static uint32_t reader_count(void) 92 { 93 BdrvGraphRWlock *brdv_graph; 94 uint32_t rd; 95 96 QEMU_LOCK_GUARD(&aio_context_list_lock); 97 98 /* rd can temporarily be negative, but the total will *always* be >= 0 */ 99 rd = orphaned_reader_count; 100 QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) { 101 rd += qatomic_read(&brdv_graph->reader_count); 102 } 103 104 /* shouldn't overflow unless there are 2^31 readers */ 105 assert((int32_t)rd >= 0); 106 return rd; 107 } 108 109 void no_coroutine_fn bdrv_graph_wrlock(BlockDriverState *bs) 110 { 111 AioContext *ctx = NULL; 112 113 GLOBAL_STATE_CODE(); 114 assert(!qatomic_read(&has_writer)); 115 assert(!qemu_in_coroutine()); 116 117 /* 118 * Release only non-mainloop AioContext. The mainloop often relies on the 119 * BQL and doesn't lock the main AioContext before doing things. 120 */ 121 if (bs) { 122 ctx = bdrv_get_aio_context(bs); 123 if (ctx != qemu_get_aio_context()) { 124 aio_context_release(ctx); 125 } else { 126 ctx = NULL; 127 } 128 } 129 130 /* Make sure that constantly arriving new I/O doesn't cause starvation */ 131 bdrv_drain_all_begin_nopoll(); 132 133 /* 134 * reader_count == 0: this means writer will read has_reader as 1 135 * reader_count >= 1: we don't know if writer read has_writer == 0 or 1, 136 * but we need to wait. 137 * Wait by allowing other coroutine (and possible readers) to continue. 138 */ 139 do { 140 /* 141 * has_writer must be 0 while polling, otherwise we get a deadlock if 142 * any callback involved during AIO_WAIT_WHILE() tries to acquire the 143 * reader lock. 144 */ 145 qatomic_set(&has_writer, 0); 146 AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1); 147 qatomic_set(&has_writer, 1); 148 149 /* 150 * We want to only check reader_count() after has_writer = 1 is visible 151 * to other threads. That way no more readers can sneak in after we've 152 * determined reader_count() == 0. 153 */ 154 smp_mb(); 155 } while (reader_count() >= 1); 156 157 bdrv_drain_all_end(); 158 159 if (ctx) { 160 aio_context_acquire(bdrv_get_aio_context(bs)); 161 } 162 } 163 164 void no_coroutine_fn bdrv_graph_wrunlock_ctx(AioContext *ctx) 165 { 166 GLOBAL_STATE_CODE(); 167 assert(qatomic_read(&has_writer)); 168 169 /* 170 * Release only non-mainloop AioContext. The mainloop often relies on the 171 * BQL and doesn't lock the main AioContext before doing things. 172 */ 173 if (ctx && ctx != qemu_get_aio_context()) { 174 aio_context_release(ctx); 175 } else { 176 ctx = NULL; 177 } 178 179 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { 180 /* 181 * No need for memory barriers, this works in pair with 182 * the slow path of rdlock() and both take the lock. 183 */ 184 qatomic_store_release(&has_writer, 0); 185 186 /* Wake up all coroutines that are waiting to read the graph */ 187 qemu_co_enter_all(&reader_queue, &aio_context_list_lock); 188 } 189 190 /* 191 * Run any BHs that were scheduled during the wrlock section and that 192 * callers might expect to have finished (in particular, this is important 193 * for bdrv_schedule_unref()). 194 * 195 * Do this only after restarting coroutines so that nested event loops in 196 * BHs don't deadlock if their condition relies on the coroutine making 197 * progress. 198 */ 199 aio_bh_poll(qemu_get_aio_context()); 200 201 if (ctx) { 202 aio_context_acquire(ctx); 203 } 204 } 205 206 void no_coroutine_fn bdrv_graph_wrunlock(BlockDriverState *bs) 207 { 208 AioContext *ctx = bs ? bdrv_get_aio_context(bs) : NULL; 209 210 bdrv_graph_wrunlock_ctx(ctx); 211 } 212 213 void coroutine_fn bdrv_graph_co_rdlock(void) 214 { 215 BdrvGraphRWlock *bdrv_graph; 216 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 217 218 for (;;) { 219 qatomic_set(&bdrv_graph->reader_count, 220 bdrv_graph->reader_count + 1); 221 /* make sure writer sees reader_count before we check has_writer */ 222 smp_mb(); 223 224 /* 225 * has_writer == 0: this means writer will read reader_count as >= 1 226 * has_writer == 1: we don't know if writer read reader_count == 0 227 * or > 0, but we need to wait anyways because 228 * it will write. 229 */ 230 if (!qatomic_read(&has_writer)) { 231 break; 232 } 233 234 /* 235 * Synchronize access with reader_count() in bdrv_graph_wrlock(). 236 * Case 1: 237 * If this critical section gets executed first, reader_count will 238 * decrease and the reader will go to sleep. 239 * Then the writer will read reader_count that does not take into 240 * account this reader, and if there's no other reader it will 241 * enter the write section. 242 * Case 2: 243 * If reader_count() critical section gets executed first, 244 * then writer will read reader_count >= 1. 245 * It will wait in AIO_WAIT_WHILE(), but once it releases the lock 246 * we will enter this critical section and call aio_wait_kick(). 247 */ 248 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { 249 /* 250 * Additional check when we use the above lock to synchronize 251 * with bdrv_graph_wrunlock(). 252 * Case 1: 253 * If this gets executed first, has_writer is still 1, so we reduce 254 * reader_count and go to sleep. 255 * Then the writer will set has_writer to 0 and wake up all readers, 256 * us included. 257 * Case 2: 258 * If bdrv_graph_wrunlock() critical section gets executed first, 259 * then it will set has_writer to 0 and wake up all other readers. 260 * Then we execute this critical section, and therefore must check 261 * again for has_writer, otherwise we sleep without any writer 262 * actually running. 263 */ 264 if (!qatomic_read(&has_writer)) { 265 return; 266 } 267 268 /* slow path where reader sleeps */ 269 bdrv_graph->reader_count--; 270 aio_wait_kick(); 271 qemu_co_queue_wait(&reader_queue, &aio_context_list_lock); 272 } 273 } 274 } 275 276 void coroutine_fn bdrv_graph_co_rdunlock(void) 277 { 278 BdrvGraphRWlock *bdrv_graph; 279 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 280 281 qatomic_store_release(&bdrv_graph->reader_count, 282 bdrv_graph->reader_count - 1); 283 /* make sure writer sees reader_count before we check has_writer */ 284 smp_mb(); 285 286 /* 287 * has_writer == 0: this means reader will read reader_count decreased 288 * has_writer == 1: we don't know if writer read reader_count old or 289 * new. Therefore, kick again so on next iteration 290 * writer will for sure read the updated value. 291 */ 292 if (qatomic_read(&has_writer)) { 293 aio_wait_kick(); 294 } 295 } 296 297 void bdrv_graph_rdlock_main_loop(void) 298 { 299 GLOBAL_STATE_CODE(); 300 assert(!qemu_in_coroutine()); 301 } 302 303 void bdrv_graph_rdunlock_main_loop(void) 304 { 305 GLOBAL_STATE_CODE(); 306 assert(!qemu_in_coroutine()); 307 } 308 309 void assert_bdrv_graph_readable(void) 310 { 311 /* reader_count() is slow due to aio_context_list_lock lock contention */ 312 #ifdef CONFIG_DEBUG_GRAPH_LOCK 313 assert(qemu_in_main_thread() || reader_count()); 314 #endif 315 } 316 317 void assert_bdrv_graph_writable(void) 318 { 319 assert(qemu_in_main_thread()); 320 assert(qatomic_read(&has_writer)); 321 } 322