1 /* 2 * QEMU block layer thread pool 3 * 4 * Copyright IBM, Corp. 2008 5 * Copyright Red Hat, Inc. 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 #include "qemu/osdep.h" 18 #include "qemu/queue.h" 19 #include "qemu/thread.h" 20 #include "qemu/coroutine.h" 21 #include "trace.h" 22 #include "block/thread-pool.h" 23 #include "qemu/main-loop.h" 24 25 static void do_spawn_thread(ThreadPool *pool); 26 27 typedef struct ThreadPoolElement ThreadPoolElement; 28 29 enum ThreadState { 30 THREAD_QUEUED, 31 THREAD_ACTIVE, 32 THREAD_DONE, 33 }; 34 35 struct ThreadPoolElement { 36 BlockAIOCB common; 37 ThreadPool *pool; 38 ThreadPoolFunc *func; 39 void *arg; 40 41 /* Moving state out of THREAD_QUEUED is protected by lock. After 42 * that, only the worker thread can write to it. Reads and writes 43 * of state and ret are ordered with memory barriers. 44 */ 45 enum ThreadState state; 46 int ret; 47 48 /* Access to this list is protected by lock. */ 49 QTAILQ_ENTRY(ThreadPoolElement) reqs; 50 51 /* Access to this list is protected by the global mutex. */ 52 QLIST_ENTRY(ThreadPoolElement) all; 53 }; 54 55 struct ThreadPool { 56 AioContext *ctx; 57 QEMUBH *completion_bh; 58 QemuMutex lock; 59 QemuCond worker_stopped; 60 QemuSemaphore sem; 61 int max_threads; 62 QEMUBH *new_thread_bh; 63 64 /* The following variables are only accessed from one AioContext. */ 65 QLIST_HEAD(, ThreadPoolElement) head; 66 67 /* The following variables are protected by lock. */ 68 QTAILQ_HEAD(, ThreadPoolElement) request_list; 69 int cur_threads; 70 int idle_threads; 71 int new_threads; /* backlog of threads we need to create */ 72 int pending_threads; /* threads created but not running yet */ 73 bool stopping; 74 }; 75 76 static void *worker_thread(void *opaque) 77 { 78 ThreadPool *pool = opaque; 79 80 qemu_mutex_lock(&pool->lock); 81 pool->pending_threads--; 82 do_spawn_thread(pool); 83 84 while (!pool->stopping) { 85 ThreadPoolElement *req; 86 int ret; 87 88 do { 89 pool->idle_threads++; 90 qemu_mutex_unlock(&pool->lock); 91 ret = qemu_sem_timedwait(&pool->sem, 10000); 92 qemu_mutex_lock(&pool->lock); 93 pool->idle_threads--; 94 } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); 95 if (ret == -1 || pool->stopping) { 96 break; 97 } 98 99 req = QTAILQ_FIRST(&pool->request_list); 100 QTAILQ_REMOVE(&pool->request_list, req, reqs); 101 req->state = THREAD_ACTIVE; 102 qemu_mutex_unlock(&pool->lock); 103 104 ret = req->func(req->arg); 105 106 req->ret = ret; 107 /* Write ret before state. */ 108 smp_wmb(); 109 req->state = THREAD_DONE; 110 111 qemu_mutex_lock(&pool->lock); 112 113 qemu_bh_schedule(pool->completion_bh); 114 } 115 116 pool->cur_threads--; 117 qemu_cond_signal(&pool->worker_stopped); 118 qemu_mutex_unlock(&pool->lock); 119 return NULL; 120 } 121 122 static void do_spawn_thread(ThreadPool *pool) 123 { 124 QemuThread t; 125 126 /* Runs with lock taken. */ 127 if (!pool->new_threads) { 128 return; 129 } 130 131 pool->new_threads--; 132 pool->pending_threads++; 133 134 qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); 135 } 136 137 static void spawn_thread_bh_fn(void *opaque) 138 { 139 ThreadPool *pool = opaque; 140 141 qemu_mutex_lock(&pool->lock); 142 do_spawn_thread(pool); 143 qemu_mutex_unlock(&pool->lock); 144 } 145 146 static void spawn_thread(ThreadPool *pool) 147 { 148 pool->cur_threads++; 149 pool->new_threads++; 150 /* If there are threads being created, they will spawn new workers, so 151 * we don't spend time creating many threads in a loop holding a mutex or 152 * starving the current vcpu. 153 * 154 * If there are no idle threads, ask the main thread to create one, so we 155 * inherit the correct affinity instead of the vcpu affinity. 156 */ 157 if (!pool->pending_threads) { 158 qemu_bh_schedule(pool->new_thread_bh); 159 } 160 } 161 162 static void thread_pool_completion_bh(void *opaque) 163 { 164 ThreadPool *pool = opaque; 165 ThreadPoolElement *elem, *next; 166 167 aio_context_acquire(pool->ctx); 168 restart: 169 QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { 170 if (elem->state != THREAD_DONE) { 171 continue; 172 } 173 174 trace_thread_pool_complete(pool, elem, elem->common.opaque, 175 elem->ret); 176 QLIST_REMOVE(elem, all); 177 178 if (elem->common.cb) { 179 /* Read state before ret. */ 180 smp_rmb(); 181 182 /* Schedule ourselves in case elem->common.cb() calls aio_poll() to 183 * wait for another request that completed at the same time. 184 */ 185 qemu_bh_schedule(pool->completion_bh); 186 187 aio_context_release(pool->ctx); 188 elem->common.cb(elem->common.opaque, elem->ret); 189 aio_context_acquire(pool->ctx); 190 191 /* We can safely cancel the completion_bh here regardless of someone 192 * else having scheduled it meanwhile because we reenter the 193 * completion function anyway (goto restart). 194 */ 195 qemu_bh_cancel(pool->completion_bh); 196 197 qemu_aio_unref(elem); 198 goto restart; 199 } else { 200 qemu_aio_unref(elem); 201 } 202 } 203 aio_context_release(pool->ctx); 204 } 205 206 static void thread_pool_cancel(BlockAIOCB *acb) 207 { 208 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 209 ThreadPool *pool = elem->pool; 210 211 trace_thread_pool_cancel(elem, elem->common.opaque); 212 213 qemu_mutex_lock(&pool->lock); 214 if (elem->state == THREAD_QUEUED && 215 /* No thread has yet started working on elem. we can try to "steal" 216 * the item from the worker if we can get a signal from the 217 * semaphore. Because this is non-blocking, we can do it with 218 * the lock taken and ensure that elem will remain THREAD_QUEUED. 219 */ 220 qemu_sem_timedwait(&pool->sem, 0) == 0) { 221 QTAILQ_REMOVE(&pool->request_list, elem, reqs); 222 qemu_bh_schedule(pool->completion_bh); 223 224 elem->state = THREAD_DONE; 225 elem->ret = -ECANCELED; 226 } 227 228 qemu_mutex_unlock(&pool->lock); 229 } 230 231 static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb) 232 { 233 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 234 ThreadPool *pool = elem->pool; 235 return pool->ctx; 236 } 237 238 static const AIOCBInfo thread_pool_aiocb_info = { 239 .aiocb_size = sizeof(ThreadPoolElement), 240 .cancel_async = thread_pool_cancel, 241 .get_aio_context = thread_pool_get_aio_context, 242 }; 243 244 BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, 245 ThreadPoolFunc *func, void *arg, 246 BlockCompletionFunc *cb, void *opaque) 247 { 248 ThreadPoolElement *req; 249 250 req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); 251 req->func = func; 252 req->arg = arg; 253 req->state = THREAD_QUEUED; 254 req->pool = pool; 255 256 QLIST_INSERT_HEAD(&pool->head, req, all); 257 258 trace_thread_pool_submit(pool, req, arg); 259 260 qemu_mutex_lock(&pool->lock); 261 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { 262 spawn_thread(pool); 263 } 264 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); 265 qemu_mutex_unlock(&pool->lock); 266 qemu_sem_post(&pool->sem); 267 return &req->common; 268 } 269 270 typedef struct ThreadPoolCo { 271 Coroutine *co; 272 int ret; 273 } ThreadPoolCo; 274 275 static void thread_pool_co_cb(void *opaque, int ret) 276 { 277 ThreadPoolCo *co = opaque; 278 279 co->ret = ret; 280 aio_co_wake(co->co); 281 } 282 283 int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, 284 void *arg) 285 { 286 ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; 287 assert(qemu_in_coroutine()); 288 thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); 289 qemu_coroutine_yield(); 290 return tpc.ret; 291 } 292 293 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) 294 { 295 thread_pool_submit_aio(pool, func, arg, NULL, NULL); 296 } 297 298 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) 299 { 300 if (!ctx) { 301 ctx = qemu_get_aio_context(); 302 } 303 304 memset(pool, 0, sizeof(*pool)); 305 pool->ctx = ctx; 306 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); 307 qemu_mutex_init(&pool->lock); 308 qemu_cond_init(&pool->worker_stopped); 309 qemu_sem_init(&pool->sem, 0); 310 pool->max_threads = 64; 311 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); 312 313 QLIST_INIT(&pool->head); 314 QTAILQ_INIT(&pool->request_list); 315 } 316 317 ThreadPool *thread_pool_new(AioContext *ctx) 318 { 319 ThreadPool *pool = g_new(ThreadPool, 1); 320 thread_pool_init_one(pool, ctx); 321 return pool; 322 } 323 324 void thread_pool_free(ThreadPool *pool) 325 { 326 if (!pool) { 327 return; 328 } 329 330 assert(QLIST_EMPTY(&pool->head)); 331 332 qemu_mutex_lock(&pool->lock); 333 334 /* Stop new threads from spawning */ 335 qemu_bh_delete(pool->new_thread_bh); 336 pool->cur_threads -= pool->new_threads; 337 pool->new_threads = 0; 338 339 /* Wait for worker threads to terminate */ 340 pool->stopping = true; 341 while (pool->cur_threads > 0) { 342 qemu_sem_post(&pool->sem); 343 qemu_cond_wait(&pool->worker_stopped, &pool->lock); 344 } 345 346 qemu_mutex_unlock(&pool->lock); 347 348 qemu_bh_delete(pool->completion_bh); 349 qemu_sem_destroy(&pool->sem); 350 qemu_cond_destroy(&pool->worker_stopped); 351 qemu_mutex_destroy(&pool->lock); 352 g_free(pool); 353 } 354