1 /* 2 * Graph lock: rwlock to protect block layer graph manipulations (add/remove 3 * edges and nodes) 4 * 5 * Copyright (c) 2022 Red Hat 6 * 7 * This library is free software; you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public 9 * License as published by the Free Software Foundation; either 10 * version 2.1 of the License, or (at your option) any later version. 11 * 12 * This library is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 19 */ 20 21 #include "qemu/osdep.h" 22 #include "qemu/main-loop.h" 23 #include "block/graph-lock.h" 24 #include "block/block.h" 25 #include "block/block_int.h" 26 27 /* Dummy lock object to use for Thread Safety Analysis (TSA) */ 28 BdrvGraphLock graph_lock; 29 30 /* Protects the list of aiocontext and orphaned_reader_count */ 31 static QemuMutex aio_context_list_lock; 32 33 #if 0 34 /* Written and read with atomic operations. */ 35 static int has_writer; 36 #endif 37 38 /* 39 * A reader coroutine could move from an AioContext to another. 40 * If this happens, there is no problem from the point of view of 41 * counters. The problem is that the total count becomes 42 * unbalanced if one of the two AioContexts gets deleted. 43 * The count of readers must remain correct, so the AioContext's 44 * balance is transferred to this glboal variable. 45 * Protected by aio_context_list_lock. 46 */ 47 static uint32_t orphaned_reader_count; 48 49 /* Queue of readers waiting for the writer to finish */ 50 static CoQueue reader_queue; 51 52 struct BdrvGraphRWlock { 53 /* How many readers are currently reading the graph. */ 54 uint32_t reader_count; 55 56 /* 57 * List of BdrvGraphRWlock kept in graph-lock.c 58 * Protected by aio_context_list_lock 59 */ 60 QTAILQ_ENTRY(BdrvGraphRWlock) next_aio; 61 }; 62 63 /* 64 * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock 65 * can safely modify only its own counter, avoid reading/writing 66 * others and thus improving performances by avoiding cacheline bounces. 67 */ 68 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list = 69 QTAILQ_HEAD_INITIALIZER(aio_context_list); 70 71 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void) 72 { 73 qemu_mutex_init(&aio_context_list_lock); 74 qemu_co_queue_init(&reader_queue); 75 } 76 77 void register_aiocontext(AioContext *ctx) 78 { 79 ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1); 80 QEMU_LOCK_GUARD(&aio_context_list_lock); 81 assert(ctx->bdrv_graph->reader_count == 0); 82 QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio); 83 } 84 85 void unregister_aiocontext(AioContext *ctx) 86 { 87 QEMU_LOCK_GUARD(&aio_context_list_lock); 88 orphaned_reader_count += ctx->bdrv_graph->reader_count; 89 QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio); 90 g_free(ctx->bdrv_graph); 91 } 92 93 #if 0 94 static uint32_t reader_count(void) 95 { 96 BdrvGraphRWlock *brdv_graph; 97 uint32_t rd; 98 99 QEMU_LOCK_GUARD(&aio_context_list_lock); 100 101 /* rd can temporarly be negative, but the total will *always* be >= 0 */ 102 rd = orphaned_reader_count; 103 QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) { 104 rd += qatomic_read(&brdv_graph->reader_count); 105 } 106 107 /* shouldn't overflow unless there are 2^31 readers */ 108 assert((int32_t)rd >= 0); 109 return rd; 110 } 111 #endif 112 113 void bdrv_graph_wrlock(void) 114 { 115 GLOBAL_STATE_CODE(); 116 /* 117 * TODO Some callers hold an AioContext lock when this is called, which 118 * causes deadlocks. Reenable once the AioContext locking is cleaned up (or 119 * AioContext locks are gone). 120 */ 121 #if 0 122 assert(!qatomic_read(&has_writer)); 123 124 /* Make sure that constantly arriving new I/O doesn't cause starvation */ 125 bdrv_drain_all_begin_nopoll(); 126 127 /* 128 * reader_count == 0: this means writer will read has_reader as 1 129 * reader_count >= 1: we don't know if writer read has_writer == 0 or 1, 130 * but we need to wait. 131 * Wait by allowing other coroutine (and possible readers) to continue. 132 */ 133 do { 134 /* 135 * has_writer must be 0 while polling, otherwise we get a deadlock if 136 * any callback involved during AIO_WAIT_WHILE() tries to acquire the 137 * reader lock. 138 */ 139 qatomic_set(&has_writer, 0); 140 AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1); 141 qatomic_set(&has_writer, 1); 142 143 /* 144 * We want to only check reader_count() after has_writer = 1 is visible 145 * to other threads. That way no more readers can sneak in after we've 146 * determined reader_count() == 0. 147 */ 148 smp_mb(); 149 } while (reader_count() >= 1); 150 151 bdrv_drain_all_end(); 152 #endif 153 } 154 155 void bdrv_graph_wrunlock(void) 156 { 157 GLOBAL_STATE_CODE(); 158 #if 0 159 QEMU_LOCK_GUARD(&aio_context_list_lock); 160 assert(qatomic_read(&has_writer)); 161 162 /* 163 * No need for memory barriers, this works in pair with 164 * the slow path of rdlock() and both take the lock. 165 */ 166 qatomic_store_release(&has_writer, 0); 167 168 /* Wake up all coroutine that are waiting to read the graph */ 169 qemu_co_enter_all(&reader_queue, &aio_context_list_lock); 170 #endif 171 } 172 173 void coroutine_fn bdrv_graph_co_rdlock(void) 174 { 175 /* TODO Reenable when wrlock is reenabled */ 176 #if 0 177 BdrvGraphRWlock *bdrv_graph; 178 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 179 180 for (;;) { 181 qatomic_set(&bdrv_graph->reader_count, 182 bdrv_graph->reader_count + 1); 183 /* make sure writer sees reader_count before we check has_writer */ 184 smp_mb(); 185 186 /* 187 * has_writer == 0: this means writer will read reader_count as >= 1 188 * has_writer == 1: we don't know if writer read reader_count == 0 189 * or > 0, but we need to wait anyways because 190 * it will write. 191 */ 192 if (!qatomic_read(&has_writer)) { 193 break; 194 } 195 196 /* 197 * Synchronize access with reader_count() in bdrv_graph_wrlock(). 198 * Case 1: 199 * If this critical section gets executed first, reader_count will 200 * decrease and the reader will go to sleep. 201 * Then the writer will read reader_count that does not take into 202 * account this reader, and if there's no other reader it will 203 * enter the write section. 204 * Case 2: 205 * If reader_count() critical section gets executed first, 206 * then writer will read reader_count >= 1. 207 * It will wait in AIO_WAIT_WHILE(), but once it releases the lock 208 * we will enter this critical section and call aio_wait_kick(). 209 */ 210 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { 211 /* 212 * Additional check when we use the above lock to synchronize 213 * with bdrv_graph_wrunlock(). 214 * Case 1: 215 * If this gets executed first, has_writer is still 1, so we reduce 216 * reader_count and go to sleep. 217 * Then the writer will set has_writer to 0 and wake up all readers, 218 * us included. 219 * Case 2: 220 * If bdrv_graph_wrunlock() critical section gets executed first, 221 * then it will set has_writer to 0 and wake up all other readers. 222 * Then we execute this critical section, and therefore must check 223 * again for has_writer, otherwise we sleep without any writer 224 * actually running. 225 */ 226 if (!qatomic_read(&has_writer)) { 227 return; 228 } 229 230 /* slow path where reader sleeps */ 231 bdrv_graph->reader_count--; 232 aio_wait_kick(); 233 qemu_co_queue_wait(&reader_queue, &aio_context_list_lock); 234 } 235 } 236 #endif 237 } 238 239 void coroutine_fn bdrv_graph_co_rdunlock(void) 240 { 241 #if 0 242 BdrvGraphRWlock *bdrv_graph; 243 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 244 245 qatomic_store_release(&bdrv_graph->reader_count, 246 bdrv_graph->reader_count - 1); 247 /* make sure writer sees reader_count before we check has_writer */ 248 smp_mb(); 249 250 /* 251 * has_writer == 0: this means reader will read reader_count decreased 252 * has_writer == 1: we don't know if writer read reader_count old or 253 * new. Therefore, kick again so on next iteration 254 * writer will for sure read the updated value. 255 */ 256 if (qatomic_read(&has_writer)) { 257 aio_wait_kick(); 258 } 259 #endif 260 } 261 262 void bdrv_graph_rdlock_main_loop(void) 263 { 264 GLOBAL_STATE_CODE(); 265 assert(!qemu_in_coroutine()); 266 } 267 268 void bdrv_graph_rdunlock_main_loop(void) 269 { 270 GLOBAL_STATE_CODE(); 271 assert(!qemu_in_coroutine()); 272 } 273 274 void assert_bdrv_graph_readable(void) 275 { 276 /* reader_count() is slow due to aio_context_list_lock lock contention */ 277 /* TODO Reenable when wrlock is reenabled */ 278 #if 0 279 #ifdef CONFIG_DEBUG_GRAPH_LOCK 280 assert(qemu_in_main_thread() || reader_count()); 281 #endif 282 #endif 283 } 284 285 void assert_bdrv_graph_writable(void) 286 { 287 assert(qemu_in_main_thread()); 288 /* TODO Reenable when wrlock is reenabled */ 289 #if 0 290 assert(qatomic_read(&has_writer)); 291 #endif 292 } 293