1 /* 2 * QEMU block layer thread pool 3 * 4 * Copyright IBM, Corp. 2008 5 * Copyright Red Hat, Inc. 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 #include "qemu/osdep.h" 18 #include "qemu/queue.h" 19 #include "qemu/thread.h" 20 #include "qemu/coroutine.h" 21 #include "trace.h" 22 #include "block/thread-pool.h" 23 #include "qemu/main-loop.h" 24 25 static void do_spawn_thread(ThreadPool *pool); 26 27 typedef struct ThreadPoolElement ThreadPoolElement; 28 29 enum ThreadState { 30 THREAD_QUEUED, 31 THREAD_ACTIVE, 32 THREAD_DONE, 33 }; 34 35 struct ThreadPoolElement { 36 BlockAIOCB common; 37 ThreadPool *pool; 38 ThreadPoolFunc *func; 39 void *arg; 40 41 /* Moving state out of THREAD_QUEUED is protected by lock. After 42 * that, only the worker thread can write to it. Reads and writes 43 * of state and ret are ordered with memory barriers. 44 */ 45 enum ThreadState state; 46 int ret; 47 48 /* Access to this list is protected by lock. */ 49 QTAILQ_ENTRY(ThreadPoolElement) reqs; 50 51 /* This list is only written by the thread pool's mother thread. */ 52 QLIST_ENTRY(ThreadPoolElement) all; 53 }; 54 55 struct ThreadPool { 56 AioContext *ctx; 57 QEMUBH *completion_bh; 58 QemuMutex lock; 59 QemuCond worker_stopped; 60 QemuCond request_cond; 61 QEMUBH *new_thread_bh; 62 63 /* The following variables are only accessed from one AioContext. */ 64 QLIST_HEAD(, ThreadPoolElement) head; 65 66 /* The following variables are protected by lock. */ 67 QTAILQ_HEAD(, ThreadPoolElement) request_list; 68 int cur_threads; 69 int idle_threads; 70 int new_threads; /* backlog of threads we need to create */ 71 int pending_threads; /* threads created but not running yet */ 72 int min_threads; 73 int max_threads; 74 }; 75 76 static void *worker_thread(void *opaque) 77 { 78 ThreadPool *pool = opaque; 79 80 qemu_mutex_lock(&pool->lock); 81 pool->pending_threads--; 82 do_spawn_thread(pool); 83 84 while (pool->cur_threads <= pool->max_threads) { 85 ThreadPoolElement *req; 86 int ret; 87 88 if (QTAILQ_EMPTY(&pool->request_list)) { 89 pool->idle_threads++; 90 ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000); 91 pool->idle_threads--; 92 if (ret == 0 && 93 QTAILQ_EMPTY(&pool->request_list) && 94 pool->cur_threads > pool->min_threads) { 95 /* Timed out + no work to do + no need for warm threads = exit. */ 96 break; 97 } 98 /* 99 * Even if there was some work to do, check if there aren't 100 * too many worker threads before picking it up. 101 */ 102 continue; 103 } 104 105 req = QTAILQ_FIRST(&pool->request_list); 106 QTAILQ_REMOVE(&pool->request_list, req, reqs); 107 req->state = THREAD_ACTIVE; 108 qemu_mutex_unlock(&pool->lock); 109 110 ret = req->func(req->arg); 111 112 req->ret = ret; 113 /* Write ret before state. */ 114 smp_wmb(); 115 req->state = THREAD_DONE; 116 117 qemu_bh_schedule(pool->completion_bh); 118 qemu_mutex_lock(&pool->lock); 119 } 120 121 pool->cur_threads--; 122 qemu_cond_signal(&pool->worker_stopped); 123 qemu_mutex_unlock(&pool->lock); 124 125 /* 126 * Wake up another thread, in case we got a wakeup but decided 127 * to exit due to pool->cur_threads > pool->max_threads. 128 */ 129 qemu_cond_signal(&pool->request_cond); 130 return NULL; 131 } 132 133 static void do_spawn_thread(ThreadPool *pool) 134 { 135 QemuThread t; 136 137 /* Runs with lock taken. */ 138 if (!pool->new_threads) { 139 return; 140 } 141 142 pool->new_threads--; 143 pool->pending_threads++; 144 145 qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); 146 } 147 148 static void spawn_thread_bh_fn(void *opaque) 149 { 150 ThreadPool *pool = opaque; 151 152 qemu_mutex_lock(&pool->lock); 153 do_spawn_thread(pool); 154 qemu_mutex_unlock(&pool->lock); 155 } 156 157 static void spawn_thread(ThreadPool *pool) 158 { 159 pool->cur_threads++; 160 pool->new_threads++; 161 /* If there are threads being created, they will spawn new workers, so 162 * we don't spend time creating many threads in a loop holding a mutex or 163 * starving the current vcpu. 164 * 165 * If there are no idle threads, ask the main thread to create one, so we 166 * inherit the correct affinity instead of the vcpu affinity. 167 */ 168 if (!pool->pending_threads) { 169 qemu_bh_schedule(pool->new_thread_bh); 170 } 171 } 172 173 static void thread_pool_completion_bh(void *opaque) 174 { 175 ThreadPool *pool = opaque; 176 ThreadPoolElement *elem, *next; 177 178 restart: 179 QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { 180 if (elem->state != THREAD_DONE) { 181 continue; 182 } 183 184 trace_thread_pool_complete(pool, elem, elem->common.opaque, 185 elem->ret); 186 QLIST_REMOVE(elem, all); 187 188 if (elem->common.cb) { 189 /* Read state before ret. */ 190 smp_rmb(); 191 192 /* Schedule ourselves in case elem->common.cb() calls aio_poll() to 193 * wait for another request that completed at the same time. 194 */ 195 qemu_bh_schedule(pool->completion_bh); 196 197 elem->common.cb(elem->common.opaque, elem->ret); 198 199 /* We can safely cancel the completion_bh here regardless of someone 200 * else having scheduled it meanwhile because we reenter the 201 * completion function anyway (goto restart). 202 */ 203 qemu_bh_cancel(pool->completion_bh); 204 205 qemu_aio_unref(elem); 206 goto restart; 207 } else { 208 qemu_aio_unref(elem); 209 } 210 } 211 } 212 213 static void thread_pool_cancel(BlockAIOCB *acb) 214 { 215 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 216 ThreadPool *pool = elem->pool; 217 218 trace_thread_pool_cancel(elem, elem->common.opaque); 219 220 QEMU_LOCK_GUARD(&pool->lock); 221 if (elem->state == THREAD_QUEUED) { 222 QTAILQ_REMOVE(&pool->request_list, elem, reqs); 223 qemu_bh_schedule(pool->completion_bh); 224 225 elem->state = THREAD_DONE; 226 elem->ret = -ECANCELED; 227 } 228 229 } 230 231 static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb) 232 { 233 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 234 ThreadPool *pool = elem->pool; 235 return pool->ctx; 236 } 237 238 static const AIOCBInfo thread_pool_aiocb_info = { 239 .aiocb_size = sizeof(ThreadPoolElement), 240 .cancel_async = thread_pool_cancel, 241 .get_aio_context = thread_pool_get_aio_context, 242 }; 243 244 BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, 245 BlockCompletionFunc *cb, void *opaque) 246 { 247 ThreadPoolElement *req; 248 AioContext *ctx = qemu_get_current_aio_context(); 249 ThreadPool *pool = aio_get_thread_pool(ctx); 250 251 /* Assert that the thread submitting work is the same running the pool */ 252 assert(pool->ctx == qemu_get_current_aio_context()); 253 254 req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); 255 req->func = func; 256 req->arg = arg; 257 req->state = THREAD_QUEUED; 258 req->pool = pool; 259 260 QLIST_INSERT_HEAD(&pool->head, req, all); 261 262 trace_thread_pool_submit(pool, req, arg); 263 264 qemu_mutex_lock(&pool->lock); 265 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { 266 spawn_thread(pool); 267 } 268 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); 269 qemu_mutex_unlock(&pool->lock); 270 qemu_cond_signal(&pool->request_cond); 271 return &req->common; 272 } 273 274 typedef struct ThreadPoolCo { 275 Coroutine *co; 276 int ret; 277 } ThreadPoolCo; 278 279 static void thread_pool_co_cb(void *opaque, int ret) 280 { 281 ThreadPoolCo *co = opaque; 282 283 co->ret = ret; 284 aio_co_wake(co->co); 285 } 286 287 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) 288 { 289 ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; 290 assert(qemu_in_coroutine()); 291 thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); 292 qemu_coroutine_yield(); 293 return tpc.ret; 294 } 295 296 void thread_pool_submit(ThreadPoolFunc *func, void *arg) 297 { 298 thread_pool_submit_aio(func, arg, NULL, NULL); 299 } 300 301 void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) 302 { 303 qemu_mutex_lock(&pool->lock); 304 305 pool->min_threads = ctx->thread_pool_min; 306 pool->max_threads = ctx->thread_pool_max; 307 308 /* 309 * We either have to: 310 * - Increase the number available of threads until over the min_threads 311 * threshold. 312 * - Bump the worker threads so that they exit, until under the max_threads 313 * threshold. 314 * - Do nothing. The current number of threads fall in between the min and 315 * max thresholds. We'll let the pool manage itself. 316 */ 317 for (int i = pool->cur_threads; i < pool->min_threads; i++) { 318 spawn_thread(pool); 319 } 320 321 for (int i = pool->cur_threads; i > pool->max_threads; i--) { 322 qemu_cond_signal(&pool->request_cond); 323 } 324 325 qemu_mutex_unlock(&pool->lock); 326 } 327 328 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) 329 { 330 if (!ctx) { 331 ctx = qemu_get_aio_context(); 332 } 333 334 memset(pool, 0, sizeof(*pool)); 335 pool->ctx = ctx; 336 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); 337 qemu_mutex_init(&pool->lock); 338 qemu_cond_init(&pool->worker_stopped); 339 qemu_cond_init(&pool->request_cond); 340 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); 341 342 QLIST_INIT(&pool->head); 343 QTAILQ_INIT(&pool->request_list); 344 345 thread_pool_update_params(pool, ctx); 346 } 347 348 ThreadPool *thread_pool_new(AioContext *ctx) 349 { 350 ThreadPool *pool = g_new(ThreadPool, 1); 351 thread_pool_init_one(pool, ctx); 352 return pool; 353 } 354 355 void thread_pool_free(ThreadPool *pool) 356 { 357 if (!pool) { 358 return; 359 } 360 361 assert(QLIST_EMPTY(&pool->head)); 362 363 qemu_mutex_lock(&pool->lock); 364 365 /* Stop new threads from spawning */ 366 qemu_bh_delete(pool->new_thread_bh); 367 pool->cur_threads -= pool->new_threads; 368 pool->new_threads = 0; 369 370 /* Wait for worker threads to terminate */ 371 pool->max_threads = 0; 372 qemu_cond_broadcast(&pool->request_cond); 373 while (pool->cur_threads > 0) { 374 qemu_cond_wait(&pool->worker_stopped, &pool->lock); 375 } 376 377 qemu_mutex_unlock(&pool->lock); 378 379 qemu_bh_delete(pool->completion_bh); 380 qemu_cond_destroy(&pool->request_cond); 381 qemu_cond_destroy(&pool->worker_stopped); 382 qemu_mutex_destroy(&pool->lock); 383 g_free(pool); 384 } 385