1 /* 2 * Graph lock: rwlock to protect block layer graph manipulations (add/remove 3 * edges and nodes) 4 * 5 * Copyright (c) 2022 Red Hat 6 * 7 * This library is free software; you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public 9 * License as published by the Free Software Foundation; either 10 * version 2.1 of the License, or (at your option) any later version. 11 * 12 * This library is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 19 */ 20 21 #include "qemu/osdep.h" 22 #include "qemu/main-loop.h" 23 #include "block/graph-lock.h" 24 #include "block/block.h" 25 #include "block/block_int.h" 26 27 /* Protects the list of aiocontext and orphaned_reader_count */ 28 static QemuMutex aio_context_list_lock; 29 30 /* Written and read with atomic operations. */ 31 static int has_writer; 32 33 /* 34 * A reader coroutine could move from an AioContext to another. 35 * If this happens, there is no problem from the point of view of 36 * counters. The problem is that the total count becomes 37 * unbalanced if one of the two AioContexts gets deleted. 38 * The count of readers must remain correct, so the AioContext's 39 * balance is transferred to this glboal variable. 40 * Protected by aio_context_list_lock. 41 */ 42 static uint32_t orphaned_reader_count; 43 44 /* Queue of readers waiting for the writer to finish */ 45 static CoQueue reader_queue; 46 47 struct BdrvGraphRWlock { 48 /* How many readers are currently reading the graph. */ 49 uint32_t reader_count; 50 51 /* 52 * List of BdrvGraphRWlock kept in graph-lock.c 53 * Protected by aio_context_list_lock 54 */ 55 QTAILQ_ENTRY(BdrvGraphRWlock) next_aio; 56 }; 57 58 /* 59 * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock 60 * can safely modify only its own counter, avoid reading/writing 61 * others and thus improving performances by avoiding cacheline bounces. 62 */ 63 static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list = 64 QTAILQ_HEAD_INITIALIZER(aio_context_list); 65 66 static void __attribute__((__constructor__)) bdrv_init_graph_lock(void) 67 { 68 qemu_mutex_init(&aio_context_list_lock); 69 qemu_co_queue_init(&reader_queue); 70 } 71 72 void register_aiocontext(AioContext *ctx) 73 { 74 ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1); 75 QEMU_LOCK_GUARD(&aio_context_list_lock); 76 assert(ctx->bdrv_graph->reader_count == 0); 77 QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio); 78 } 79 80 void unregister_aiocontext(AioContext *ctx) 81 { 82 QEMU_LOCK_GUARD(&aio_context_list_lock); 83 orphaned_reader_count += ctx->bdrv_graph->reader_count; 84 QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio); 85 g_free(ctx->bdrv_graph); 86 } 87 88 static uint32_t reader_count(void) 89 { 90 BdrvGraphRWlock *brdv_graph; 91 uint32_t rd; 92 93 QEMU_LOCK_GUARD(&aio_context_list_lock); 94 95 /* rd can temporarly be negative, but the total will *always* be >= 0 */ 96 rd = orphaned_reader_count; 97 QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) { 98 rd += qatomic_read(&brdv_graph->reader_count); 99 } 100 101 /* shouldn't overflow unless there are 2^31 readers */ 102 assert((int32_t)rd >= 0); 103 return rd; 104 } 105 106 void bdrv_graph_wrlock(void) 107 { 108 GLOBAL_STATE_CODE(); 109 assert(!qatomic_read(&has_writer)); 110 111 /* Make sure that constantly arriving new I/O doesn't cause starvation */ 112 bdrv_drain_all_begin_nopoll(); 113 114 /* 115 * reader_count == 0: this means writer will read has_reader as 1 116 * reader_count >= 1: we don't know if writer read has_writer == 0 or 1, 117 * but we need to wait. 118 * Wait by allowing other coroutine (and possible readers) to continue. 119 */ 120 do { 121 /* 122 * has_writer must be 0 while polling, otherwise we get a deadlock if 123 * any callback involved during AIO_WAIT_WHILE() tries to acquire the 124 * reader lock. 125 */ 126 qatomic_set(&has_writer, 0); 127 AIO_WAIT_WHILE(qemu_get_aio_context(), reader_count() >= 1); 128 qatomic_set(&has_writer, 1); 129 130 /* 131 * We want to only check reader_count() after has_writer = 1 is visible 132 * to other threads. That way no more readers can sneak in after we've 133 * determined reader_count() == 0. 134 */ 135 smp_mb(); 136 } while (reader_count() >= 1); 137 138 bdrv_drain_all_end(); 139 } 140 141 void bdrv_graph_wrunlock(void) 142 { 143 GLOBAL_STATE_CODE(); 144 QEMU_LOCK_GUARD(&aio_context_list_lock); 145 assert(qatomic_read(&has_writer)); 146 147 /* 148 * No need for memory barriers, this works in pair with 149 * the slow path of rdlock() and both take the lock. 150 */ 151 qatomic_store_release(&has_writer, 0); 152 153 /* Wake up all coroutine that are waiting to read the graph */ 154 qemu_co_enter_all(&reader_queue, &aio_context_list_lock); 155 } 156 157 void coroutine_fn bdrv_graph_co_rdlock(void) 158 { 159 BdrvGraphRWlock *bdrv_graph; 160 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 161 162 /* Do not lock if in main thread */ 163 if (qemu_in_main_thread()) { 164 return; 165 } 166 167 for (;;) { 168 qatomic_set(&bdrv_graph->reader_count, 169 bdrv_graph->reader_count + 1); 170 /* make sure writer sees reader_count before we check has_writer */ 171 smp_mb(); 172 173 /* 174 * has_writer == 0: this means writer will read reader_count as >= 1 175 * has_writer == 1: we don't know if writer read reader_count == 0 176 * or > 0, but we need to wait anyways because 177 * it will write. 178 */ 179 if (!qatomic_read(&has_writer)) { 180 break; 181 } 182 183 /* 184 * Synchronize access with reader_count() in bdrv_graph_wrlock(). 185 * Case 1: 186 * If this critical section gets executed first, reader_count will 187 * decrease and the reader will go to sleep. 188 * Then the writer will read reader_count that does not take into 189 * account this reader, and if there's no other reader it will 190 * enter the write section. 191 * Case 2: 192 * If reader_count() critical section gets executed first, 193 * then writer will read reader_count >= 1. 194 * It will wait in AIO_WAIT_WHILE(), but once it releases the lock 195 * we will enter this critical section and call aio_wait_kick(). 196 */ 197 WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) { 198 /* 199 * Additional check when we use the above lock to synchronize 200 * with bdrv_graph_wrunlock(). 201 * Case 1: 202 * If this gets executed first, has_writer is still 1, so we reduce 203 * reader_count and go to sleep. 204 * Then the writer will set has_writer to 0 and wake up all readers, 205 * us included. 206 * Case 2: 207 * If bdrv_graph_wrunlock() critical section gets executed first, 208 * then it will set has_writer to 0 and wake up all other readers. 209 * Then we execute this critical section, and therefore must check 210 * again for has_writer, otherwise we sleep without any writer 211 * actually running. 212 */ 213 if (!qatomic_read(&has_writer)) { 214 return; 215 } 216 217 /* slow path where reader sleeps */ 218 bdrv_graph->reader_count--; 219 aio_wait_kick(); 220 qemu_co_queue_wait(&reader_queue, &aio_context_list_lock); 221 } 222 } 223 } 224 225 void coroutine_fn bdrv_graph_co_rdunlock(void) 226 { 227 BdrvGraphRWlock *bdrv_graph; 228 bdrv_graph = qemu_get_current_aio_context()->bdrv_graph; 229 230 /* Do not lock if in main thread */ 231 if (qemu_in_main_thread()) { 232 return; 233 } 234 235 qatomic_store_release(&bdrv_graph->reader_count, 236 bdrv_graph->reader_count - 1); 237 /* make sure writer sees reader_count before we check has_writer */ 238 smp_mb(); 239 240 /* 241 * has_writer == 0: this means reader will read reader_count decreased 242 * has_writer == 1: we don't know if writer read reader_count old or 243 * new. Therefore, kick again so on next iteration 244 * writer will for sure read the updated value. 245 */ 246 if (qatomic_read(&has_writer)) { 247 aio_wait_kick(); 248 } 249 } 250 251 void bdrv_graph_rdlock_main_loop(void) 252 { 253 GLOBAL_STATE_CODE(); 254 assert(!qemu_in_coroutine()); 255 } 256 257 void bdrv_graph_rdunlock_main_loop(void) 258 { 259 GLOBAL_STATE_CODE(); 260 assert(!qemu_in_coroutine()); 261 } 262