1 | #include "qemu/osdep.h" | |
2 | #include <glib.h> | |
3 | #include "qemu-common.h" | |
4 | #include "block/aio.h" | |
5 | #include "block/thread-pool.h" | |
6 | #include "block/block.h" | |
7 | #include "qemu/timer.h" | |
8 | #include "qemu/error-report.h" | |
9 | ||
10 | static AioContext *ctx; | |
11 | static ThreadPool *pool; | |
12 | static int active; | |
13 | ||
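/* Bookkeeping shared between the submitter, the worker function and the
 * completion callback. */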
typedef struct {
    BlockAIOCB *aiocb;
    int n;
    int ret;
} WorkerTestData;

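/* Atomically increment data->n and return its previous value, so a work
 * item's first (and only) execution returns 0. */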
static int worker_cb(void *opaque)
{
    WorkerTestData *data = opaque;
    return atomic_fetch_inc(&data->n);
}

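/* A slow work item: n goes 0 -> 1 when the job starts and 1 -> 2 when it
 * finishes.  The two-second sleep in between lets the cancel tests catch
 * jobs in flight. */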
static int long_cb(void *opaque)
{
    WorkerTestData *data = opaque;
    atomic_inc(&data->n);
    g_usleep(2000000);
    atomic_inc(&data->n);
    return 0;
}

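/* Completion callback: record the result, drop the AIOCB and mark one
 * fewer request as active. */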
static void done_cb(void *opaque, int ret)
{
    WorkerTestData *data = opaque;
    g_assert(data->ret == -EINPROGRESS || data->ret == -ECANCELED);
    data->ret = ret;
    data->aiocb = NULL;

    /* Callbacks are serialized, so no need to use atomic ops. */
    active--;
}

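/* Fire-and-forget submission; there is no completion callback, so just
 * poll until the worker has run once. */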
static void test_submit(void)
{
    WorkerTestData data = { .n = 0 };
    thread_pool_submit(pool, worker_cb, &data);
    while (data.n == 0) {
        aio_poll(ctx, true);
    }
    g_assert_cmpint(data.n, ==, 1);
}

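/* AIO-style submission: done_cb must fire exactly once, from within
 * aio_poll, with the worker's return value. */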
static void test_submit_aio(void)
{
    WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
                                        done_cb, &data);

    /* The callbacks are not called until after the first wait. */
    active = 1;
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);
    while (data.ret == -EINPROGRESS) {
        aio_poll(ctx, true);
    }
    g_assert_cmpint(active, ==, 0);
    g_assert_cmpint(data.n, ==, 1);
    g_assert_cmpint(data.ret, ==, 0);
}

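/* Coroutine half of the submit-co test.  thread_pool_submit_co yields
 * until the worker finishes, so control bounces between this function
 * and test_submit_co below. */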
static void coroutine_fn co_test_cb(void *opaque)
{
    WorkerTestData *data = opaque;

    active = 1;
    data->n = 0;
    data->ret = -EINPROGRESS;
    thread_pool_submit_co(pool, worker_cb, data);

    /* The test continues in test_submit_co, after qemu_coroutine_enter... */

    g_assert_cmpint(data->n, ==, 1);
    data->ret = 0;
    active--;

    /* The test continues in test_submit_co, after aio_poll... */
}

static void test_submit_co(void)
{
    WorkerTestData data;
    Coroutine *co = qemu_coroutine_create(co_test_cb);

    qemu_coroutine_enter(co, &data);

    /* Back here once the worker has started. */

    g_assert_cmpint(active, ==, 1);
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);

    /* aio_poll will execute the rest of the coroutine. */

    while (data.ret == -EINPROGRESS) {
        aio_poll(ctx, true);
    }

    /* Back here after the coroutine has finished. */

    g_assert_cmpint(active, ==, 0);
    g_assert_cmpint(data.ret, ==, 0);
}

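/* Queue many more work items than the pool has threads, then poll until
 * every completion callback has run. */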
static void test_submit_many(void)
{
    WorkerTestData data[100];
    int i;

    /* Start more work items than there will be threads. */
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
    }

    active = 100;
    while (active > 0) {
        aio_poll(ctx, true);
    }
    for (i = 0; i < 100; i++) {
        g_assert_cmpint(data[i].n, ==, 1);
        g_assert_cmpint(data[i].ret, ==, 0);
    }
}

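/* Shared body of the sync and async cancel tests.  Each job's n field
 * encodes its state: 0 = still queued, 1 = running, 2 = finished, and 3
 * is claimed below (via cmpxchg from 0) for "canceled before it ran". */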
static void do_test_cancel(bool sync)
{
    WorkerTestData data[100];
    int num_canceled;
    int i;

    /* Start more work items than there will be threads, to ensure
     * the pool is full.
     */
    test_submit_many();

    /* Start long running jobs, to ensure we can cancel some. */
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
                                               done_cb, &data[i]);
    }

    /* Starting the threads may be left to a bottom half.  Let it
     * run, but do not waste too much time...
     */
    active = 100;
    aio_notify(ctx);
    aio_poll(ctx, false);

    /* Wait some time for the threads to start, with some sanity
     * testing on the behavior of the scheduler...
     */
    g_assert_cmpint(active, ==, 100);
    g_usleep(1000000);
    g_assert_cmpint(active, >, 50);

    /* Cancel the jobs that haven't been started yet. */
    num_canceled = 0;
    for (i = 0; i < 100; i++) {
        if (atomic_cmpxchg(&data[i].n, 0, 3) == 0) {
            data[i].ret = -ECANCELED;
            if (sync) {
                bdrv_aio_cancel(data[i].aiocb);
            } else {
                bdrv_aio_cancel_async(data[i].aiocb);
            }
            num_canceled++;
        }
    }
    g_assert_cmpint(active, >, 0);
    g_assert_cmpint(num_canceled, <, 100);

    for (i = 0; i < 100; i++) {
        if (data[i].aiocb && data[i].n != 3) {
            if (sync) {
                /* Canceling the others will be a blocking operation. */
                bdrv_aio_cancel(data[i].aiocb);
            } else {
                bdrv_aio_cancel_async(data[i].aiocb);
            }
        }
    }

    /* Finish execution and execute any remaining callbacks. */
    while (active > 0) {
        aio_poll(ctx, true);
    }
    g_assert_cmpint(active, ==, 0);
    for (i = 0; i < 100; i++) {
        if (data[i].n == 3) {
            g_assert_cmpint(data[i].ret, ==, -ECANCELED);
            g_assert(data[i].aiocb == NULL);
        } else {
            g_assert_cmpint(data[i].n, ==, 2);
            g_assert(data[i].ret == 0 || data[i].ret == -ECANCELED);
            g_assert(data[i].aiocb == NULL);
        }
    }
}

static void test_cancel(void)
{
    do_test_cancel(true);
}

static void test_cancel_async(void)
{
    do_test_cancel(false);
}

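/* Set up a standalone AioContext and its thread pool before registering
 * the test cases; the tests drive completions by polling ctx directly. */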
int main(int argc, char **argv)
{
    int ret;
    Error *local_error = NULL;

    init_clocks();

    ctx = aio_context_new(&local_error);
    if (!ctx) {
        error_reportf_err(local_error, "Failed to create AIO Context: ");
        exit(1);
    }
    pool = aio_get_thread_pool(ctx);

    g_test_init(&argc, &argv, NULL);
    g_test_add_func("/thread-pool/submit", test_submit);
    g_test_add_func("/thread-pool/submit-aio", test_submit_aio);
    g_test_add_func("/thread-pool/submit-co", test_submit_co);
    g_test_add_func("/thread-pool/submit-many", test_submit_many);
    g_test_add_func("/thread-pool/cancel", test_cancel);
    g_test_add_func("/thread-pool/cancel-async", test_cancel_async);

    ret = g_test_run();

    aio_context_unref(ctx);
    return ret;
}