/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/defer-call.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock. */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* This list is only written by the thread pool's mother thread. */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuCond request_cond;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock. */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int min_threads;
    int max_threads;
};

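/*
 * Life cycle of a request (summary of the code below):
 *
 *  1. thread_pool_submit_aio() allocates a ThreadPoolElement in state
 *     THREAD_QUEUED and appends it to request_list under lock.
 *  2. A worker dequeues it, marks it THREAD_ACTIVE and runs func(arg)
 *     outside the lock.
 *  3. The worker stores the return value in ret, issues smp_wmb(), sets
 *     state to THREAD_DONE and schedules completion_bh.
 *  4. thread_pool_completion_bh() runs in pool->ctx; its smp_rmb() pairs
 *     with the worker's smp_wmb(), so ret is valid once state reads as
 *     THREAD_DONE.  It then invokes the callback and releases the element.
 */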
static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (pool->cur_threads <= pool->max_threads) {
        ThreadPoolElement *req;
        int ret;

        if (QTAILQ_EMPTY(&pool->request_list)) {
            pool->idle_threads++;
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
            pool->idle_threads--;
            if (ret == 0 &&
                QTAILQ_EMPTY(&pool->request_list) &&
                pool->cur_threads > pool->min_threads) {
                /* Timed out + no work to do + no need for warm threads = exit. */
                break;
            }
            /*
             * Even if there was some work to do, check if there aren't
             * too many worker threads before picking it up.
             */
            continue;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state. */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_bh_schedule(pool->completion_bh);
        qemu_mutex_lock(&pool->lock);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);

    /*
     * Wake up another thread, in case we got a wakeup but decided
     * to exit due to pool->cur_threads > pool->max_threads.
     */
    qemu_cond_signal(&pool->request_cond);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken. */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool,
                       QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    defer_call_begin(); /* cb() may use defer_call() to coalesce work */

restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret. */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            elem->common.cb(elem->common.opaque, elem->ret);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }

    defer_call_end();
}

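/*
 * Cancellation only affects requests that are still THREAD_QUEUED: they are
 * unlinked from request_list and completed with -ECANCELED via the
 * completion BH.  Once a worker has set THREAD_ACTIVE, the request runs to
 * completion and the callback sees its real return value.
 */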
static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;
    AioContext *ctx = qemu_get_current_aio_context();
    ThreadPool *pool = aio_get_thread_pool(ctx);

    /* Assert that the thread submitting work is the same one running the pool */
    assert(pool->ctx == qemu_get_current_aio_context());

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_cond_signal(&pool->request_cond);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

void thread_pool_submit(ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(func, arg, NULL, NULL);
}

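/*
 * Illustrative usage sketch (not part of this file's API): a coroutine
 * offloads a blocking call to the pool and yields until the worker finishes.
 * The names ExampleReq, example_worker_fn, example_do_blocking_io and
 * example_co_io are hypothetical.
 *
 *     static int example_worker_fn(void *opaque)
 *     {
 *         ExampleReq *req = opaque;
 *         // Runs in a pool worker thread; may block.  Return 0 or -errno.
 *         return example_do_blocking_io(req);
 *     }
 *
 *     static int coroutine_fn example_co_io(ExampleReq *req)
 *     {
 *         // Yields here; thread_pool_co_cb() wakes us with the result.
 *         return thread_pool_submit_co(example_worker_fn, req);
 *     }
 */
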
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /*
     * We either have to:
     *  - Increase the number of available threads until we are over the
     *    min_threads threshold.
     *  - Wake up worker threads so that they exit, until we are under the
     *    max_threads threshold.
     *  - Do nothing.  The current number of threads falls between the min
     *    and max thresholds.  We'll let the pool manage itself.
     */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
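
/*
 * Standalone lifecycle sketch (illustrative; assumes the caller owns
 * AioContext *ctx and has completed or cancelled every request before
 * teardown, since thread_pool_free() asserts that the pool is empty):
 *
 *     ThreadPool *pool = thread_pool_new(ctx);
 *     ...
 *     thread_pool_update_params(pool, ctx);  // after ctx limits change
 *     ...
 *     thread_pool_free(pool);                // waits for workers to exit
 */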