1 #include "qemu/osdep.h" 2 #include "block/aio.h" 3 #include "block/thread-pool.h" 4 #include "block/block.h" 5 #include "qapi/error.h" 6 #include "qemu/timer.h" 7 #include "qemu/error-report.h" 8 #include "qemu/main-loop.h" 9 10 static AioContext *ctx; 11 static int active; 12 13 typedef struct { 14 BlockAIOCB *aiocb; 15 int n; 16 int ret; 17 } WorkerTestData; 18 19 static int worker_cb(void *opaque) 20 { 21 WorkerTestData *data = opaque; 22 return qatomic_fetch_inc(&data->n); 23 } 24 25 static int long_cb(void *opaque) 26 { 27 WorkerTestData *data = opaque; 28 if (qatomic_cmpxchg(&data->n, 0, 1) == 0) { 29 g_usleep(2000000); 30 qatomic_or(&data->n, 2); 31 } 32 return 0; 33 } 34 35 static void done_cb(void *opaque, int ret) 36 { 37 WorkerTestData *data = opaque; 38 g_assert(data->ret == -EINPROGRESS || data->ret == -ECANCELED); 39 data->ret = ret; 40 data->aiocb = NULL; 41 42 /* Callbacks are serialized, so no need to use atomic ops. */ 43 active--; 44 } 45 46 static void test_submit(void) 47 { 48 WorkerTestData data = { .n = 0 }; 49 thread_pool_submit(worker_cb, &data); 50 while (data.n == 0) { 51 aio_poll(ctx, true); 52 } 53 g_assert_cmpint(data.n, ==, 1); 54 } 55 56 static void test_submit_aio(void) 57 { 58 WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; 59 data.aiocb = thread_pool_submit_aio(worker_cb, &data, 60 done_cb, &data); 61 62 /* The callbacks are not called until after the first wait. */ 63 active = 1; 64 g_assert_cmpint(data.ret, ==, -EINPROGRESS); 65 while (data.ret == -EINPROGRESS) { 66 aio_poll(ctx, true); 67 } 68 g_assert_cmpint(active, ==, 0); 69 g_assert_cmpint(data.n, ==, 1); 70 g_assert_cmpint(data.ret, ==, 0); 71 } 72 73 static void coroutine_fn co_test_cb(void *opaque) 74 { 75 WorkerTestData *data = opaque; 76 77 active = 1; 78 data->n = 0; 79 data->ret = -EINPROGRESS; 80 thread_pool_submit_co(worker_cb, data); 81 82 /* The test continues in test_submit_co, after qemu_coroutine_enter... */ 83 84 g_assert_cmpint(data->n, ==, 1); 85 data->ret = 0; 86 active--; 87 88 /* The test continues in test_submit_co, after aio_poll... */ 89 } 90 91 static void test_submit_co(void) 92 { 93 WorkerTestData data; 94 Coroutine *co = qemu_coroutine_create(co_test_cb, &data); 95 96 qemu_coroutine_enter(co); 97 98 /* Back here once the worker has started. */ 99 100 g_assert_cmpint(active, ==, 1); 101 g_assert_cmpint(data.ret, ==, -EINPROGRESS); 102 103 /* aio_poll will execute the rest of the coroutine. */ 104 105 while (data.ret == -EINPROGRESS) { 106 aio_poll(ctx, true); 107 } 108 109 /* Back here after the coroutine has finished. */ 110 111 g_assert_cmpint(active, ==, 0); 112 g_assert_cmpint(data.ret, ==, 0); 113 } 114 115 static void test_submit_many(void) 116 { 117 WorkerTestData data[100]; 118 int i; 119 120 /* Start more work items than there will be threads. */ 121 for (i = 0; i < 100; i++) { 122 data[i].n = 0; 123 data[i].ret = -EINPROGRESS; 124 thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); 125 } 126 127 active = 100; 128 while (active > 0) { 129 aio_poll(ctx, true); 130 } 131 for (i = 0; i < 100; i++) { 132 g_assert_cmpint(data[i].n, ==, 1); 133 g_assert_cmpint(data[i].ret, ==, 0); 134 } 135 } 136 137 static void do_test_cancel(bool sync) 138 { 139 WorkerTestData data[100]; 140 int num_canceled; 141 int i; 142 143 /* Start more work items than there will be threads, to ensure 144 * the pool is full. 145 */ 146 test_submit_many(); 147 148 /* Start long running jobs, to ensure we can cancel some. */ 149 for (i = 0; i < 100; i++) { 150 data[i].n = 0; 151 data[i].ret = -EINPROGRESS; 152 data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], 153 done_cb, &data[i]); 154 } 155 156 /* Starting the threads may be left to a bottom half. Let it 157 * run, but do not waste too much time... 158 */ 159 active = 100; 160 aio_notify(ctx); 161 aio_poll(ctx, false); 162 163 /* Wait some time for the threads to start, with some sanity 164 * testing on the behavior of the scheduler... 165 */ 166 g_assert_cmpint(active, ==, 100); 167 g_usleep(1000000); 168 g_assert_cmpint(active, >, 50); 169 170 /* Cancel the jobs that haven't been started yet. */ 171 num_canceled = 0; 172 for (i = 0; i < 100; i++) { 173 if (qatomic_cmpxchg(&data[i].n, 0, 4) == 0) { 174 data[i].ret = -ECANCELED; 175 if (sync) { 176 bdrv_aio_cancel(data[i].aiocb); 177 } else { 178 bdrv_aio_cancel_async(data[i].aiocb); 179 } 180 num_canceled++; 181 } 182 } 183 g_assert_cmpint(active, >, 0); 184 g_assert_cmpint(num_canceled, <, 100); 185 186 for (i = 0; i < 100; i++) { 187 if (data[i].aiocb && qatomic_read(&data[i].n) < 4) { 188 if (sync) { 189 /* Canceling the others will be a blocking operation. */ 190 bdrv_aio_cancel(data[i].aiocb); 191 } else { 192 bdrv_aio_cancel_async(data[i].aiocb); 193 } 194 } 195 } 196 197 /* Finish execution and execute any remaining callbacks. */ 198 while (active > 0) { 199 aio_poll(ctx, true); 200 } 201 g_assert_cmpint(active, ==, 0); 202 for (i = 0; i < 100; i++) { 203 g_assert(data[i].aiocb == NULL); 204 switch (data[i].n) { 205 case 0: 206 fprintf(stderr, "Callback not canceled but never started?\n"); 207 abort(); 208 case 3: 209 /* Couldn't be canceled asynchronously, must have completed. */ 210 g_assert_cmpint(data[i].ret, ==, 0); 211 break; 212 case 4: 213 /* Could be canceled asynchronously, never started. */ 214 g_assert_cmpint(data[i].ret, ==, -ECANCELED); 215 break; 216 default: 217 fprintf(stderr, "Callback aborted while running?\n"); 218 abort(); 219 } 220 } 221 } 222 223 static void test_cancel(void) 224 { 225 do_test_cancel(true); 226 } 227 228 static void test_cancel_async(void) 229 { 230 do_test_cancel(false); 231 } 232 233 int main(int argc, char **argv) 234 { 235 qemu_init_main_loop(&error_abort); 236 ctx = qemu_get_current_aio_context(); 237 238 g_test_init(&argc, &argv, NULL); 239 g_test_add_func("/thread-pool/submit", test_submit); 240 g_test_add_func("/thread-pool/submit-aio", test_submit_aio); 241 g_test_add_func("/thread-pool/submit-co", test_submit_co); 242 g_test_add_func("/thread-pool/submit-many", test_submit_many); 243 g_test_add_func("/thread-pool/cancel", test_cancel); 244 g_test_add_func("/thread-pool/cancel-async", test_cancel_async); 245 246 return g_test_run(); 247 } 248