1 /* 2 * Graph lock: rwlock to protect block layer graph manipulations (add/remove 3 * edges and nodes) 4 * 5 * Copyright (c) 2022 Red Hat 6 * 7 * This library is free software; you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public 9 * License as published by the Free Software Foundation; either 10 * version 2.1 of the License, or (at your option) any later version. 11 * 12 * This library is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 19 */ 20 21 #include "qemu/osdep.h" 22 #include "qemu/main-loop.h" 23 #include "block/graph-lock.h" 24 #include "block/block.h" 25 #include "block/block_int.h" 26 27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */ 28 BdrvGraphLock graph_lock; 29 30 /* Protects the list of aiocontext and orphaned_reader_count */ 31 static QemuMutex aio_context_list_lock; 32 33 /* Written and read with atomic operations. */ 34 static int has_writer; 35 36 /* 37 * Many write-locked sections are also drained sections. There is a convenience 38 * wrapper bdrv_graph_wrlock_drained() which begins a drained section before 39 * acquiring the lock. This variable here is used so bdrv_graph_wrunlock() knows 40 * if it also needs to end such a drained section. It needs to be a counter, 41 * because the aio_poll() call in bdrv_graph_wrlock() might re-enter 42 * bdrv_graph_wrlock_drained(). And note that aio_bh_poll() in 43 * bdrv_graph_wrunlock() might also re-enter a write-locked section. 44 */ 45 static int wrlock_quiesced_counter; 46 47 /* 48 * A reader coroutine could move from an AioContext to another. 49 * If this happens, there is no problem from the point of view of 50 * counters. The problem is that the total count becomes 51 * unbalanced if one of the two AioContexts gets deleted. 52 * The count of readers must remain correct, so the AioContext's 53 * balance is transferred to this glboal variable. 54 * Protected by aio_context_list_lock. 55 */ 56 static uint32_t orphaned_reader_count; 57 58 /* Queue of readers waiting for the writer to finish */ 59 static CoQueue reader_queue; 60 61 struct BdrvGraphRWlock { 62 /* How many readers are currently reading the graph. */ 63 uint32_t reader_count; 64 65 /* 66 * List of BdrvGraphRWlock kept in graph-lock.c 67 * Protected by aio_context_list_lock 68 */ 69 QTAILQ_ENTRY(BdrvGraphRWlock) next_aio; 70 }; 71 72 /* 73 * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock 74 * can safely modify only its own counter, avoid reading/writing 75 * others and thus improving performances by avoiding cacheline bounces. 76 */ 77 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list = 78 QTAILQ_HEAD_INITIALIZER(aio_context_list); 79 80 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void) 81 { 82 qemu_mutex_init(&aio_context_list_lock); 83 qemu_co_queue_init(&reader_queue); 84 } 85 86 void register_aiocontext(AioContext *ctx) 87 { 88 ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1); 89 QEMU_LOCK_GUARD(&aio_context_list_lock); 90 assert(ctx->bdrv_graph->reader_count == 0); 91 QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio); 92 } 93 94 void unregister_aiocontext(AioContext *ctx) 95 { 96 QEMU_LOCK_GUARD(&aio_context_list_lock); 97 orphaned_reader_count += ctx->bdrv_graph->reader_count; 98 QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio); 99 g_free(ctx->bdrv_graph); 100 } 101 102 static uint32_t reader_count(void) 103 { 104 BdrvGraphRWlock *brdv_graph; 105 uint32_t rd; 106 107 QEMU_LOCK_GUARD(&aio_context_list_lock); 108 109 /* rd can temporarily be negative, but the total will *always* be >= 0 */ 110 rd = orphaned_reader_count; 111 QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) { 112 rd += qatomic_read(&brdv_graph->reader_count); 113 } 114 115 /* shouldn't overflow unless there are 2^31 readers */ 116 assert((int32_t)rd >= 0); 117 return rd; 118 } 119 120 void no_coroutine_fn bdrv_graph_wrlock(void) 121 { 122 GLOBAL_STATE_CODE(); 123 assert(!qatomic_read(&has_writer)); 124 assert(!qemu_in_coroutine()); 125 126 bool need_drain = wrlock_quiesced_counter == 0; 127 128 if (need_drain) { 129 /* 130 * Make sure that constantly arriving new I/O doesn't cause starvation 131 */ 132 bdrv_drain_all_begin_nopoll(); 133 } 134 135 /* 136 * reader_count == 0: this means writer will read has_reader as 1 137 * reader_count >= 1: we don't know if writer read has_writer == 0 or 1, 138 * but we need to wait. 139 * Wait by allowing other coroutine (and possible readers) to continue. 140 */ 141 do { 142 /* 143 * has_writer must be 0 while polling, otherwise we get a deadlock if 144 * any callback involved during AIO_WAIT_WHILE() tries to acquire the 145 * reader lock. 146 */ 147 qatomic_set(&has_writer, 0); 148 AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1); 149 qatomic_set(&has_writer, 1); 150 151 /* 152 * We want to only check reader_count() after has_writer = 1 is visible 153 * to other threads. That way no more readers can sneak in after we've 154 * determined reader_count() == 0. 155 */ 156 smp_mb(); 157 } while (reader_count() >= 1); 158 159 if (need_drain) { 160 bdrv_drain_all_end(); 161 } 162 } 163 164 void no_coroutine_fn bdrv_graph_wrlock_drained(void) 165 { 166 GLOBAL_STATE_CODE(); 167 168 bdrv_drain_all_begin(); 169 wrlock_quiesced_counter++; 170 bdrv_graph_wrlock(); 171 } 172 173 void no_coroutine_fn bdrv_graph_wrunlock(void) 174 { 175 GLOBAL_STATE_CODE(); 176 assert(qatomic_read(&has_writer)); 177 178 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { 179 /* 180 * No need for memory barriers, this works in pair with 181 * the slow path of rdlock() and both take the lock. 182 */ 183 qatomic_store_release(&has_writer, 0); 184 185 /* Wake up all coroutines that are waiting to read the graph */ 186 qemu_co_enter_all(&reader_queue, &aio_context_list_lock); 187 } 188 189 /* 190 * Run any BHs that were scheduled during the wrlock section and that 191 * callers might expect to have finished (in particular, this is important 192 * for bdrv_schedule_unref()). 193 * 194 * Do this only after restarting coroutines so that nested event loops in 195 * BHs don't deadlock if their condition relies on the coroutine making 196 * progress. 197 */ 198 aio_bh_poll(qemu_get_aio_context()); 199 200 if (wrlock_quiesced_counter > 0) { 201 bdrv_drain_all_end(); 202 wrlock_quiesced_counter--; 203 } 204 205 } 206 207 void coroutine_fn bdrv_graph_co_rdlock(void) 208 { 209 BdrvGraphRWlock *bdrv_graph; 210 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 211 212 for (;;) { 213 qatomic_set(&bdrv_graph->reader_count, 214 bdrv_graph->reader_count + 1); 215 /* make sure writer sees reader_count before we check has_writer */ 216 smp_mb(); 217 218 /* 219 * has_writer == 0: this means writer will read reader_count as >= 1 220 * has_writer == 1: we don't know if writer read reader_count == 0 221 * or > 0, but we need to wait anyways because 222 * it will write. 223 */ 224 if (!qatomic_read(&has_writer)) { 225 break; 226 } 227 228 /* 229 * Synchronize access with reader_count() in bdrv_graph_wrlock(). 230 * Case 1: 231 * If this critical section gets executed first, reader_count will 232 * decrease and the reader will go to sleep. 233 * Then the writer will read reader_count that does not take into 234 * account this reader, and if there's no other reader it will 235 * enter the write section. 236 * Case 2: 237 * If reader_count() critical section gets executed first, 238 * then writer will read reader_count >= 1. 239 * It will wait in AIO_WAIT_WHILE(), but once it releases the lock 240 * we will enter this critical section and call aio_wait_kick(). 241 */ 242 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { 243 /* 244 * Additional check when we use the above lock to synchronize 245 * with bdrv_graph_wrunlock(). 246 * Case 1: 247 * If this gets executed first, has_writer is still 1, so we reduce 248 * reader_count and go to sleep. 249 * Then the writer will set has_writer to 0 and wake up all readers, 250 * us included. 251 * Case 2: 252 * If bdrv_graph_wrunlock() critical section gets executed first, 253 * then it will set has_writer to 0 and wake up all other readers. 254 * Then we execute this critical section, and therefore must check 255 * again for has_writer, otherwise we sleep without any writer 256 * actually running. 257 */ 258 if (!qatomic_read(&has_writer)) { 259 return; 260 } 261 262 /* slow path where reader sleeps */ 263 bdrv_graph->reader_count--; 264 aio_wait_kick(); 265 qemu_co_queue_wait(&reader_queue, &aio_context_list_lock); 266 } 267 } 268 } 269 270 void coroutine_fn bdrv_graph_co_rdunlock(void) 271 { 272 BdrvGraphRWlock *bdrv_graph; 273 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 274 275 qatomic_store_release(&bdrv_graph->reader_count, 276 bdrv_graph->reader_count - 1); 277 /* make sure writer sees reader_count before we check has_writer */ 278 smp_mb(); 279 280 /* 281 * has_writer == 0: this means reader will read reader_count decreased 282 * has_writer == 1: we don't know if writer read reader_count old or 283 * new. Therefore, kick again so on next iteration 284 * writer will for sure read the updated value. 285 */ 286 if (qatomic_read(&has_writer)) { 287 aio_wait_kick(); 288 } 289 } 290 291 void bdrv_graph_rdlock_main_loop(void) 292 { 293 GLOBAL_STATE_CODE(); 294 assert(!qemu_in_coroutine()); 295 } 296 297 void bdrv_graph_rdunlock_main_loop(void) 298 { 299 GLOBAL_STATE_CODE(); 300 assert(!qemu_in_coroutine()); 301 } 302 303 void assert_bdrv_graph_readable(void) 304 { 305 /* reader_count() is slow due to aio_context_list_lock lock contention */ 306 #ifdef CONFIG_DEBUG_GRAPH_LOCK 307 assert(qemu_in_main_thread() || reader_count()); 308 #endif 309 } 310 311 void assert_bdrv_graph_writable(void) 312 { 313 assert(qemu_in_main_thread()); 314 assert(qatomic_read(&has_writer)); 315 } 316