/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    bool stopping;
    int min_threads;
    int max_threads;
};

static inline bool back_to_sleep(ThreadPool *pool, int ret)
{
    /*
     * The semaphore timed out, we should exit the loop except when:
     *  - There is work to do, we raced with the signal.
     *  - The max threads threshold just changed, we raced with the signal.
     *  - The thread pool forces a minimum number of readily available threads.
     */
    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
        pool->cur_threads > pool->max_threads ||
        pool->cur_threads <= pool->min_threads)) {
        return true;
    }

    return false;
}

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (back_to_sleep(pool, ret));
        if (ret == -1 || pool->stopping ||
            pool->cur_threads > pool->max_threads) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
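        /* Pairs with the smp_rmb() in thread_pool_completion_bh(), which
         * reads state before ret.
         */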
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);

        qemu_bh_schedule(pool->completion_bh);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool,
                       QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem. We can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
238 */ 239 qemu_sem_timedwait(&pool->sem, 0) == 0) { 240 QTAILQ_REMOVE(&pool->request_list, elem, reqs); 241 qemu_bh_schedule(pool->completion_bh); 242 243 elem->state = THREAD_DONE; 244 elem->ret = -ECANCELED; 245 } 246 247 } 248 249 static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb) 250 { 251 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 252 ThreadPool *pool = elem->pool; 253 return pool->ctx; 254 } 255 256 static const AIOCBInfo thread_pool_aiocb_info = { 257 .aiocb_size = sizeof(ThreadPoolElement), 258 .cancel_async = thread_pool_cancel, 259 .get_aio_context = thread_pool_get_aio_context, 260 }; 261 262 BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, 263 ThreadPoolFunc *func, void *arg, 264 BlockCompletionFunc *cb, void *opaque) 265 { 266 ThreadPoolElement *req; 267 268 req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); 269 req->func = func; 270 req->arg = arg; 271 req->state = THREAD_QUEUED; 272 req->pool = pool; 273 274 QLIST_INSERT_HEAD(&pool->head, req, all); 275 276 trace_thread_pool_submit(pool, req, arg); 277 278 qemu_mutex_lock(&pool->lock); 279 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { 280 spawn_thread(pool); 281 } 282 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); 283 qemu_mutex_unlock(&pool->lock); 284 qemu_sem_post(&pool->sem); 285 return &req->common; 286 } 287 288 typedef struct ThreadPoolCo { 289 Coroutine *co; 290 int ret; 291 } ThreadPoolCo; 292 293 static void thread_pool_co_cb(void *opaque, int ret) 294 { 295 ThreadPoolCo *co = opaque; 296 297 co->ret = ret; 298 aio_co_wake(co->co); 299 } 300 301 int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, 302 void *arg) 303 { 304 ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; 305 assert(qemu_in_coroutine()); 306 thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); 307 qemu_coroutine_yield(); 308 return tpc.ret; 309 } 310 311 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) 312 { 313 thread_pool_submit_aio(pool, func, arg, NULL, NULL); 314 } 315 316 void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) 317 { 318 qemu_mutex_lock(&pool->lock); 319 320 pool->min_threads = ctx->thread_pool_min; 321 pool->max_threads = ctx->thread_pool_max; 322 323 /* 324 * We either have to: 325 * - Increase the number available of threads until over the min_threads 326 * threshold. 327 * - Decrease the number of available threads until under the max_threads 328 * threshold. 329 * - Do nothing. The current number of threads fall in between the min and 330 * max thresholds. We'll let the pool manage itself. 
331 */ 332 for (int i = pool->cur_threads; i < pool->min_threads; i++) { 333 spawn_thread(pool); 334 } 335 336 for (int i = pool->cur_threads; i > pool->max_threads; i--) { 337 qemu_sem_post(&pool->sem); 338 } 339 340 qemu_mutex_unlock(&pool->lock); 341 } 342 343 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) 344 { 345 if (!ctx) { 346 ctx = qemu_get_aio_context(); 347 } 348 349 memset(pool, 0, sizeof(*pool)); 350 pool->ctx = ctx; 351 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); 352 qemu_mutex_init(&pool->lock); 353 qemu_cond_init(&pool->worker_stopped); 354 qemu_sem_init(&pool->sem, 0); 355 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); 356 357 QLIST_INIT(&pool->head); 358 QTAILQ_INIT(&pool->request_list); 359 360 thread_pool_update_params(pool, ctx); 361 } 362 363 ThreadPool *thread_pool_new(AioContext *ctx) 364 { 365 ThreadPool *pool = g_new(ThreadPool, 1); 366 thread_pool_init_one(pool, ctx); 367 return pool; 368 } 369 370 void thread_pool_free(ThreadPool *pool) 371 { 372 if (!pool) { 373 return; 374 } 375 376 assert(QLIST_EMPTY(&pool->head)); 377 378 qemu_mutex_lock(&pool->lock); 379 380 /* Stop new threads from spawning */ 381 qemu_bh_delete(pool->new_thread_bh); 382 pool->cur_threads -= pool->new_threads; 383 pool->new_threads = 0; 384 385 /* Wait for worker threads to terminate */ 386 pool->stopping = true; 387 while (pool->cur_threads > 0) { 388 qemu_sem_post(&pool->sem); 389 qemu_cond_wait(&pool->worker_stopped, &pool->lock); 390 } 391 392 qemu_mutex_unlock(&pool->lock); 393 394 qemu_bh_delete(pool->completion_bh); 395 qemu_sem_destroy(&pool->sem); 396 qemu_cond_destroy(&pool->worker_stopped); 397 qemu_mutex_destroy(&pool->lock); 398 g_free(pool); 399 } 400