/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext.  */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    bool stopping;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);

        qemu_bh_schedule(pool->completion_bh);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}
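
/*
 * Editor's illustrative sketch (not part of the original file): a minimal
 * ThreadPoolFunc, showing the contract that worker_thread() relies on.
 * The function runs in a detached worker thread, so it may block freely;
 * its int return value is stored in req->ret and later handed to the
 * completion callback.  By convention a negative errno signals failure.
 * The type and function names below are hypothetical.
 */
#if 0
#include <errno.h>
#include <unistd.h>

typedef struct ExampleReadWork {
    int fd;
    void *buf;
    size_t bytes;
} ExampleReadWork;

static int example_read_func(void *arg)
{
    ExampleReadWork *work = arg;
    ssize_t n;

    /* Blocking I/O is fine here; we are not in the main loop.  */
    n = read(work->fd, work->buf, work->bytes);
    return n < 0 ? -errno : (int)n;
}
#endif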
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);
            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem.  We can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }

    qemu_mutex_unlock(&pool->lock);
}
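
/*
 * Editor's illustrative sketch (not part of the original file): a matching
 * BlockCompletionFunc as thread_pool_completion_bh() would invoke it.  The
 * callback runs in the pool's AioContext (not in the worker thread), with
 * the AioContext lock released around the call; ret is the value returned
 * by the ThreadPoolFunc, or -ECANCELED for requests stolen by
 * thread_pool_cancel().  Reuses the hypothetical ExampleReadWork from the
 * sketch above.
 */
#if 0
static void example_read_cb(void *opaque, int ret)
{
    ExampleReadWork *work = opaque;

    if (ret < 0) {
        fprintf(stderr, "read failed: %s\n", strerror(-ret));
    }
    g_free(work);
}
#endif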
static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
                                   ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}
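
/*
 * Editor's illustrative sketch (not part of the original file): offloading
 * a blocking call from coroutine context.  thread_pool_submit_co() parks
 * the calling coroutine; thread_pool_co_cb() re-enters it via aio_co_wake()
 * once the work function has run.  Reuses the hypothetical names from the
 * sketches above.
 */
#if 0
static int coroutine_fn example_co_read(ThreadPool *pool,
                                        ExampleReadWork *work)
{
    /* Yields here; resumes with the ThreadPoolFunc's return value.  */
    return thread_pool_submit_co(pool, example_read_func, work);
}
#endif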
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
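
/*
 * Editor's illustrative sketch (not part of the original file): the
 * callback-style life cycle end to end.  A pool is bound to an AioContext
 * at creation time; thread_pool_free() must only run once all requests
 * have completed, since it asserts that pool->head is empty.  Names other
 * than the thread-pool API itself are hypothetical.
 */
#if 0
static void example_pool_usage(AioContext *ctx, ExampleReadWork *work)
{
    ThreadPool *pool = thread_pool_new(ctx);

    /* Returns immediately; example_read_cb() fires later in ctx.  */
    thread_pool_submit_aio(pool, example_read_func, work,
                           example_read_cb, work);

    /* ... run the main loop until the request completes ... */

    thread_pool_free(pool);
}
#endif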