/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2. See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock. After
     * that, only the worker thread can write to it. Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock. */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex. */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuCond request_cond;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock. */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int min_threads;
    int max_threads;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (pool->cur_threads <= pool->max_threads) {
        ThreadPoolElement *req;
        int ret;

        if (QTAILQ_EMPTY(&pool->request_list)) {
            pool->idle_threads++;
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
            pool->idle_threads--;
            if (ret == 0 &&
                QTAILQ_EMPTY(&pool->request_list) &&
                pool->cur_threads > pool->min_threads) {
                /* Timed out + no work to do + no need for warm threads = exit. */
                break;
            }
            /*
             * Even if there was some work to do, check if there aren't
             * too many worker threads before picking it up.
             */
            continue;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state. */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_bh_schedule(pool->completion_bh);
        qemu_mutex_lock(&pool->lock);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);

    /*
     * Wake up another thread, in case we got a wakeup but decided
     * to exit due to pool->cur_threads > pool->max_threads.
     */
    qemu_cond_signal(&pool->request_cond);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken. */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret. */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size      = sizeof(ThreadPoolElement),
    .cancel_async    = thread_pool_cancel,
    .get_aio_context = thread_pool_get_aio_context,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
                                   ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_cond_signal(&pool->request_cond);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /*
     * We either have to:
     *  - Increase the number of available threads until over the min_threads
     *    threshold.
     *  - Bump the worker threads so that they exit, until under the max_threads
     *    threshold.
     *  - Do nothing. The current number of threads falls between the min and
     *    max thresholds. We'll let the pool manage itself.
     */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
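
/*
 * Editorial usage sketch (not part of the original file): a minimal example,
 * assuming only the public API declared in block/thread-pool.h above, of how
 * a caller might run a blocking function on the pool, both callback-style and
 * coroutine-style.  The example_* names are hypothetical and exist only for
 * illustration.
 */

/* Runs in a worker thread; its return value becomes the request's ret. */
static int example_blocking_fn(void *opaque)
{
    int *value = opaque;
    return *value > 0 ? 0 : -EINVAL;
}

/* Runs in the pool's AioContext once the completion bottom half fires. */
static void example_done_cb(void *opaque, int ret)
{
    int *value = opaque;
    *value = ret;
}

/* Callback style: returns immediately, completion reported via the cb. */
static void example_submit(ThreadPool *pool, int *value)
{
    thread_pool_submit_aio(pool, example_blocking_fn, value,
                           example_done_cb, value);
}

/* Coroutine style: yields here until the worker thread has finished. */
static int coroutine_fn example_submit_co(ThreadPool *pool, int *value)
{
    return thread_pool_submit_co(pool, example_blocking_fn, value);
}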