1 #include "qemu/osdep.h" 2 #include "block/aio.h" 3 #include "block/thread-pool.h" 4 #include "block/block.h" 5 #include "qapi/error.h" 6 #include "qemu/timer.h" 7 #include "qemu/error-report.h" 8 #include "qemu/main-loop.h" 9 10 static AioContext *ctx; 11 static ThreadPool *pool; 12 static int active; 13 14 typedef struct { 15 BlockAIOCB *aiocb; 16 int n; 17 int ret; 18 } WorkerTestData; 19 20 static int worker_cb(void *opaque) 21 { 22 WorkerTestData *data = opaque; 23 return qatomic_fetch_inc(&data->n); 24 } 25 26 static int long_cb(void *opaque) 27 { 28 WorkerTestData *data = opaque; 29 if (qatomic_cmpxchg(&data->n, 0, 1) == 0) { 30 g_usleep(2000000); 31 qatomic_or(&data->n, 2); 32 } 33 return 0; 34 } 35 36 static void done_cb(void *opaque, int ret) 37 { 38 WorkerTestData *data = opaque; 39 g_assert(data->ret == -EINPROGRESS || data->ret == -ECANCELED); 40 data->ret = ret; 41 data->aiocb = NULL; 42 43 /* Callbacks are serialized, so no need to use atomic ops. */ 44 active--; 45 } 46 47 static void test_submit(void) 48 { 49 WorkerTestData data = { .n = 0 }; 50 thread_pool_submit(pool, worker_cb, &data); 51 while (data.n == 0) { 52 aio_poll(ctx, true); 53 } 54 g_assert_cmpint(data.n, ==, 1); 55 } 56 57 static void test_submit_aio(void) 58 { 59 WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; 60 data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, 61 done_cb, &data); 62 63 /* The callbacks are not called until after the first wait. */ 64 active = 1; 65 g_assert_cmpint(data.ret, ==, -EINPROGRESS); 66 while (data.ret == -EINPROGRESS) { 67 aio_poll(ctx, true); 68 } 69 g_assert_cmpint(active, ==, 0); 70 g_assert_cmpint(data.n, ==, 1); 71 g_assert_cmpint(data.ret, ==, 0); 72 } 73 74 static void co_test_cb(void *opaque) 75 { 76 WorkerTestData *data = opaque; 77 78 active = 1; 79 data->n = 0; 80 data->ret = -EINPROGRESS; 81 thread_pool_submit_co(pool, worker_cb, data); 82 83 /* The test continues in test_submit_co, after qemu_coroutine_enter... */ 84 85 g_assert_cmpint(data->n, ==, 1); 86 data->ret = 0; 87 active--; 88 89 /* The test continues in test_submit_co, after aio_poll... */ 90 } 91 92 static void test_submit_co(void) 93 { 94 WorkerTestData data; 95 Coroutine *co = qemu_coroutine_create(co_test_cb, &data); 96 97 qemu_coroutine_enter(co); 98 99 /* Back here once the worker has started. */ 100 101 g_assert_cmpint(active, ==, 1); 102 g_assert_cmpint(data.ret, ==, -EINPROGRESS); 103 104 /* aio_poll will execute the rest of the coroutine. */ 105 106 while (data.ret == -EINPROGRESS) { 107 aio_poll(ctx, true); 108 } 109 110 /* Back here after the coroutine has finished. */ 111 112 g_assert_cmpint(active, ==, 0); 113 g_assert_cmpint(data.ret, ==, 0); 114 } 115 116 static void test_submit_many(void) 117 { 118 WorkerTestData data[100]; 119 int i; 120 121 /* Start more work items than there will be threads. */ 122 for (i = 0; i < 100; i++) { 123 data[i].n = 0; 124 data[i].ret = -EINPROGRESS; 125 thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); 126 } 127 128 active = 100; 129 while (active > 0) { 130 aio_poll(ctx, true); 131 } 132 for (i = 0; i < 100; i++) { 133 g_assert_cmpint(data[i].n, ==, 1); 134 g_assert_cmpint(data[i].ret, ==, 0); 135 } 136 } 137 138 static void do_test_cancel(bool sync) 139 { 140 WorkerTestData data[100]; 141 int num_canceled; 142 int i; 143 144 /* Start more work items than there will be threads, to ensure 145 * the pool is full. 146 */ 147 test_submit_many(); 148 149 /* Start long running jobs, to ensure we can cancel some. */ 150 for (i = 0; i < 100; i++) { 151 data[i].n = 0; 152 data[i].ret = -EINPROGRESS; 153 data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], 154 done_cb, &data[i]); 155 } 156 157 /* Starting the threads may be left to a bottom half. Let it 158 * run, but do not waste too much time... 159 */ 160 active = 100; 161 aio_notify(ctx); 162 aio_poll(ctx, false); 163 164 /* Wait some time for the threads to start, with some sanity 165 * testing on the behavior of the scheduler... 166 */ 167 g_assert_cmpint(active, ==, 100); 168 g_usleep(1000000); 169 g_assert_cmpint(active, >, 50); 170 171 /* Cancel the jobs that haven't been started yet. */ 172 num_canceled = 0; 173 for (i = 0; i < 100; i++) { 174 if (qatomic_cmpxchg(&data[i].n, 0, 4) == 0) { 175 data[i].ret = -ECANCELED; 176 if (sync) { 177 bdrv_aio_cancel(data[i].aiocb); 178 } else { 179 bdrv_aio_cancel_async(data[i].aiocb); 180 } 181 num_canceled++; 182 } 183 } 184 g_assert_cmpint(active, >, 0); 185 g_assert_cmpint(num_canceled, <, 100); 186 187 for (i = 0; i < 100; i++) { 188 if (data[i].aiocb && qatomic_read(&data[i].n) < 4) { 189 if (sync) { 190 /* Canceling the others will be a blocking operation. */ 191 bdrv_aio_cancel(data[i].aiocb); 192 } else { 193 bdrv_aio_cancel_async(data[i].aiocb); 194 } 195 } 196 } 197 198 /* Finish execution and execute any remaining callbacks. */ 199 while (active > 0) { 200 aio_poll(ctx, true); 201 } 202 g_assert_cmpint(active, ==, 0); 203 for (i = 0; i < 100; i++) { 204 g_assert(data[i].aiocb == NULL); 205 switch (data[i].n) { 206 case 0: 207 fprintf(stderr, "Callback not canceled but never started?\n"); 208 abort(); 209 case 3: 210 /* Couldn't be canceled asynchronously, must have completed. */ 211 g_assert_cmpint(data[i].ret, ==, 0); 212 break; 213 case 4: 214 /* Could be canceled asynchronously, never started. */ 215 g_assert_cmpint(data[i].ret, ==, -ECANCELED); 216 break; 217 default: 218 fprintf(stderr, "Callback aborted while running?\n"); 219 abort(); 220 } 221 } 222 } 223 224 static void test_cancel(void) 225 { 226 do_test_cancel(true); 227 } 228 229 static void test_cancel_async(void) 230 { 231 do_test_cancel(false); 232 } 233 234 int main(int argc, char **argv) 235 { 236 qemu_init_main_loop(&error_abort); 237 ctx = qemu_get_current_aio_context(); 238 pool = aio_get_thread_pool(ctx); 239 240 g_test_init(&argc, &argv, NULL); 241 g_test_add_func("/thread-pool/submit", test_submit); 242 g_test_add_func("/thread-pool/submit-aio", test_submit_aio); 243 g_test_add_func("/thread-pool/submit-co", test_submit_co); 244 g_test_add_func("/thread-pool/submit-many", test_submit_many); 245 g_test_add_func("/thread-pool/cancel", test_cancel); 246 g_test_add_func("/thread-pool/cancel-async", test_cancel_async); 247 248 return g_test_run(); 249 } 250