]> Git Repo - qemu.git/blob - block/throttle-groups.c
block: move restarting of throttled reqs to block/throttle-groups.c
[qemu.git] / block / throttle-groups.c
1 /*
2  * QEMU block throttling group infrastructure
3  *
4  * Copyright (C) Nodalink, EURL. 2014
5  * Copyright (C) Igalia, S.L. 2015
6  *
7  * Authors:
8  *   BenoĆ®t Canet <[email protected]>
9  *   Alberto Garcia <[email protected]>
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License as
13  * published by the Free Software Foundation; either version 2 or
14  * (at your option) version 3 of the License.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, see <http://www.gnu.org/licenses/>.
23  */
24
25 #include "qemu/osdep.h"
26 #include "block/throttle-groups.h"
27 #include "qemu/queue.h"
28 #include "qemu/thread.h"
29 #include "sysemu/qtest.h"
30
31 /* The ThrottleGroup structure (with its ThrottleState) is shared
32  * among different BlockDriverState and it's independent from
33  * AioContext, so in order to use it from different threads it needs
34  * its own locking.
35  *
36  * This locking is however handled internally in this file, so it's
37  * transparent to outside users.
38  *
39  * The whole ThrottleGroup structure is private and invisible to
40  * outside users, that only use it through its ThrottleState.
41  *
42  * In addition to the ThrottleGroup structure, BlockDriverState has
43  * fields that need to be accessed by other members of the group and
44  * therefore also need to be protected by this lock. Once a BDS is
45  * registered in a group those fields can be accessed by other threads
46  * any time.
47  *
48  * Again, all this is handled internally and is mostly transparent to
49  * the outside. The 'throttle_timers' field however has an additional
50  * constraint because it may be temporarily invalid (see for example
51  * bdrv_set_aio_context()). Therefore in this file a thread will
52  * access some other BDS's timers only after verifying that that BDS
53  * has throttled requests in the queue.
54  */
55 typedef struct ThrottleGroup {
56     char *name; /* This is constant during the lifetime of the group */
57
58     QemuMutex lock; /* This lock protects the following four fields */
59     ThrottleState ts;
60     QLIST_HEAD(, BlockDriverState) head;
61     BlockDriverState *tokens[2];
62     bool any_timer_armed[2];
63
64     /* These two are protected by the global throttle_groups_lock */
65     unsigned refcount;
66     QTAILQ_ENTRY(ThrottleGroup) list;
67 } ThrottleGroup;
68
69 static QemuMutex throttle_groups_lock;
70 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
71     QTAILQ_HEAD_INITIALIZER(throttle_groups);
72
73 /* Increments the reference count of a ThrottleGroup given its name.
74  *
75  * If no ThrottleGroup is found with the given name a new one is
76  * created.
77  *
78  * @name: the name of the ThrottleGroup
79  * @ret:  the ThrottleState member of the ThrottleGroup
80  */
81 ThrottleState *throttle_group_incref(const char *name)
82 {
83     ThrottleGroup *tg = NULL;
84     ThrottleGroup *iter;
85
86     qemu_mutex_lock(&throttle_groups_lock);
87
88     /* Look for an existing group with that name */
89     QTAILQ_FOREACH(iter, &throttle_groups, list) {
90         if (!strcmp(name, iter->name)) {
91             tg = iter;
92             break;
93         }
94     }
95
96     /* Create a new one if not found */
97     if (!tg) {
98         tg = g_new0(ThrottleGroup, 1);
99         tg->name = g_strdup(name);
100         qemu_mutex_init(&tg->lock);
101         throttle_init(&tg->ts);
102         QLIST_INIT(&tg->head);
103
104         QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
105     }
106
107     tg->refcount++;
108
109     qemu_mutex_unlock(&throttle_groups_lock);
110
111     return &tg->ts;
112 }
113
114 /* Decrease the reference count of a ThrottleGroup.
115  *
116  * When the reference count reaches zero the ThrottleGroup is
117  * destroyed.
118  *
119  * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
120  */
121 void throttle_group_unref(ThrottleState *ts)
122 {
123     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
124
125     qemu_mutex_lock(&throttle_groups_lock);
126     if (--tg->refcount == 0) {
127         QTAILQ_REMOVE(&throttle_groups, tg, list);
128         qemu_mutex_destroy(&tg->lock);
129         g_free(tg->name);
130         g_free(tg);
131     }
132     qemu_mutex_unlock(&throttle_groups_lock);
133 }
134
135 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and
136  * the pointer) is guaranteed to remain constant during the lifetime
137  * of the group.
138  *
139  * @bs:   a BlockDriverState that is member of a throttling group
140  * @ret:  the name of the group.
141  */
142 const char *throttle_group_get_name(BlockDriverState *bs)
143 {
144     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
145     return tg->name;
146 }
147
148 /* Return the next BlockDriverState in the round-robin sequence,
149  * simulating a circular list.
150  *
151  * This assumes that tg->lock is held.
152  *
153  * @bs:  the current BlockDriverState
154  * @ret: the next BlockDriverState in the sequence
155  */
156 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
157 {
158     ThrottleState *ts = bs->throttle_state;
159     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
160     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
161
162     if (!next) {
163         return QLIST_FIRST(&tg->head);
164     }
165
166     return next;
167 }
168
169 /* Return the next BlockDriverState in the round-robin sequence with
170  * pending I/O requests.
171  *
172  * This assumes that tg->lock is held.
173  *
174  * @bs:        the current BlockDriverState
175  * @is_write:  the type of operation (read/write)
176  * @ret:       the next BlockDriverState with pending requests, or bs
177  *             if there is none.
178  */
179 static BlockDriverState *next_throttle_token(BlockDriverState *bs,
180                                              bool is_write)
181 {
182     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
183     BlockDriverState *token, *start;
184
185     start = token = tg->tokens[is_write];
186
187     /* get next bs round in round robin style */
188     token = throttle_group_next_bs(token);
189     while (token != start && !token->pending_reqs[is_write]) {
190         token = throttle_group_next_bs(token);
191     }
192
193     /* If no IO are queued for scheduling on the next round robin token
194      * then decide the token is the current bs because chances are
195      * the current bs get the current request queued.
196      */
197     if (token == start && !token->pending_reqs[is_write]) {
198         token = bs;
199     }
200
201     return token;
202 }
203
204 /* Check if the next I/O request for a BlockDriverState needs to be
205  * throttled or not. If there's no timer set in this group, set one
206  * and update the token accordingly.
207  *
208  * This assumes that tg->lock is held.
209  *
210  * @bs:         the current BlockDriverState
211  * @is_write:   the type of operation (read/write)
212  * @ret:        whether the I/O request needs to be throttled or not
213  */
214 static bool throttle_group_schedule_timer(BlockDriverState *bs,
215                                           bool is_write)
216 {
217     ThrottleState *ts = bs->throttle_state;
218     ThrottleTimers *tt = &bs->throttle_timers;
219     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
220     bool must_wait;
221
222     /* Check if any of the timers in this group is already armed */
223     if (tg->any_timer_armed[is_write]) {
224         return true;
225     }
226
227     must_wait = throttle_schedule_timer(ts, tt, is_write);
228
229     /* If a timer just got armed, set bs as the current token */
230     if (must_wait) {
231         tg->tokens[is_write] = bs;
232         tg->any_timer_armed[is_write] = true;
233     }
234
235     return must_wait;
236 }
237
238 /* Look for the next pending I/O request and schedule it.
239  *
240  * This assumes that tg->lock is held.
241  *
242  * @bs:        the current BlockDriverState
243  * @is_write:  the type of operation (read/write)
244  */
245 static void schedule_next_request(BlockDriverState *bs, bool is_write)
246 {
247     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
248     bool must_wait;
249     BlockDriverState *token;
250
251     /* Check if there's any pending request to schedule next */
252     token = next_throttle_token(bs, is_write);
253     if (!token->pending_reqs[is_write]) {
254         return;
255     }
256
257     /* Set a timer for the request if it needs to be throttled */
258     must_wait = throttle_group_schedule_timer(token, is_write);
259
260     /* If it doesn't have to wait, queue it for immediate execution */
261     if (!must_wait) {
262         /* Give preference to requests from the current bs */
263         if (qemu_in_coroutine() &&
264             qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
265             token = bs;
266         } else {
267             ThrottleTimers *tt = &token->throttle_timers;
268             int64_t now = qemu_clock_get_ns(tt->clock_type);
269             timer_mod(tt->timers[is_write], now + 1);
270             tg->any_timer_armed[is_write] = true;
271         }
272         tg->tokens[is_write] = token;
273     }
274 }
275
276 /* Check if an I/O request needs to be throttled, wait and set a timer
277  * if necessary, and schedule the next request using a round robin
278  * algorithm.
279  *
280  * @bs:        the current BlockDriverState
281  * @bytes:     the number of bytes for this I/O
282  * @is_write:  the type of operation (read/write)
283  */
284 void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
285                                                         unsigned int bytes,
286                                                         bool is_write)
287 {
288     bool must_wait;
289     BlockDriverState *token;
290
291     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
292     qemu_mutex_lock(&tg->lock);
293
294     /* First we check if this I/O has to be throttled. */
295     token = next_throttle_token(bs, is_write);
296     must_wait = throttle_group_schedule_timer(token, is_write);
297
298     /* Wait if there's a timer set or queued requests of this type */
299     if (must_wait || bs->pending_reqs[is_write]) {
300         bs->pending_reqs[is_write]++;
301         qemu_mutex_unlock(&tg->lock);
302         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
303         qemu_mutex_lock(&tg->lock);
304         bs->pending_reqs[is_write]--;
305     }
306
307     /* The I/O will be executed, so do the accounting */
308     throttle_account(bs->throttle_state, is_write, bytes);
309
310     /* Schedule the next request */
311     schedule_next_request(bs, is_write);
312
313     qemu_mutex_unlock(&tg->lock);
314 }
315
316 void throttle_group_restart_bs(BlockDriverState *bs)
317 {
318     int i;
319
320     for (i = 0; i < 2; i++) {
321         while (qemu_co_enter_next(&bs->throttled_reqs[i])) {
322             ;
323         }
324     }
325 }
326
327 /* Update the throttle configuration for a particular group. Similar
328  * to throttle_config(), but guarantees atomicity within the
329  * throttling group.
330  *
331  * @bs:  a BlockDriverState that is member of the group
332  * @cfg: the configuration to set
333  */
334 void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg)
335 {
336     ThrottleTimers *tt = &bs->throttle_timers;
337     ThrottleState *ts = bs->throttle_state;
338     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
339     qemu_mutex_lock(&tg->lock);
340     /* throttle_config() cancels the timers */
341     if (timer_pending(tt->timers[0])) {
342         tg->any_timer_armed[0] = false;
343     }
344     if (timer_pending(tt->timers[1])) {
345         tg->any_timer_armed[1] = false;
346     }
347     throttle_config(ts, tt, cfg);
348     qemu_mutex_unlock(&tg->lock);
349
350     qemu_co_enter_next(&bs->throttled_reqs[0]);
351     qemu_co_enter_next(&bs->throttled_reqs[1]);
352 }
353
354 /* Get the throttle configuration from a particular group. Similar to
355  * throttle_get_config(), but guarantees atomicity within the
356  * throttling group.
357  *
358  * @bs:  a BlockDriverState that is member of the group
359  * @cfg: the configuration will be written here
360  */
361 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg)
362 {
363     ThrottleState *ts = bs->throttle_state;
364     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
365     qemu_mutex_lock(&tg->lock);
366     throttle_get_config(ts, cfg);
367     qemu_mutex_unlock(&tg->lock);
368 }
369
370 /* ThrottleTimers callback. This wakes up a request that was waiting
371  * because it had been throttled.
372  *
373  * @bs:        the BlockDriverState whose request had been throttled
374  * @is_write:  the type of operation (read/write)
375  */
376 static void timer_cb(BlockDriverState *bs, bool is_write)
377 {
378     ThrottleState *ts = bs->throttle_state;
379     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
380     bool empty_queue;
381
382     /* The timer has just been fired, so we can update the flag */
383     qemu_mutex_lock(&tg->lock);
384     tg->any_timer_armed[is_write] = false;
385     qemu_mutex_unlock(&tg->lock);
386
387     /* Run the request that was waiting for this timer */
388     empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
389
390     /* If the request queue was empty then we have to take care of
391      * scheduling the next one */
392     if (empty_queue) {
393         qemu_mutex_lock(&tg->lock);
394         schedule_next_request(bs, is_write);
395         qemu_mutex_unlock(&tg->lock);
396     }
397 }
398
399 static void read_timer_cb(void *opaque)
400 {
401     timer_cb(opaque, false);
402 }
403
404 static void write_timer_cb(void *opaque)
405 {
406     timer_cb(opaque, true);
407 }
408
409 /* Register a BlockDriverState in the throttling group, also
410  * initializing its timers and updating its throttle_state pointer to
411  * point to it. If a throttling group with that name does not exist
412  * yet, it will be created.
413  *
414  * @bs:        the BlockDriverState to insert
415  * @groupname: the name of the group
416  */
417 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname)
418 {
419     int i;
420     ThrottleState *ts = throttle_group_incref(groupname);
421     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
422     int clock_type = QEMU_CLOCK_REALTIME;
423
424     if (qtest_enabled()) {
425         /* For testing block IO throttling only */
426         clock_type = QEMU_CLOCK_VIRTUAL;
427     }
428
429     bs->throttle_state = ts;
430
431     qemu_mutex_lock(&tg->lock);
432     /* If the ThrottleGroup is new set this BlockDriverState as the token */
433     for (i = 0; i < 2; i++) {
434         if (!tg->tokens[i]) {
435             tg->tokens[i] = bs;
436         }
437     }
438
439     QLIST_INSERT_HEAD(&tg->head, bs, round_robin);
440
441     throttle_timers_init(&bs->throttle_timers,
442                          bdrv_get_aio_context(bs),
443                          clock_type,
444                          read_timer_cb,
445                          write_timer_cb,
446                          bs);
447
448     qemu_mutex_unlock(&tg->lock);
449 }
450
451 /* Unregister a BlockDriverState from its group, removing it from the
452  * list, destroying the timers and setting the throttle_state pointer
453  * to NULL.
454  *
455  * The BlockDriverState must not have pending throttled requests, so
456  * the caller has to drain them first.
457  *
458  * The group will be destroyed if it's empty after this operation.
459  *
460  * @bs: the BlockDriverState to remove
461  */
462 void throttle_group_unregister_bs(BlockDriverState *bs)
463 {
464     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
465     int i;
466
467     assert(bs->pending_reqs[0] == 0 && bs->pending_reqs[1] == 0);
468     assert(qemu_co_queue_empty(&bs->throttled_reqs[0]));
469     assert(qemu_co_queue_empty(&bs->throttled_reqs[1]));
470
471     qemu_mutex_lock(&tg->lock);
472     for (i = 0; i < 2; i++) {
473         if (tg->tokens[i] == bs) {
474             BlockDriverState *token = throttle_group_next_bs(bs);
475             /* Take care of the case where this is the last bs in the group */
476             if (token == bs) {
477                 token = NULL;
478             }
479             tg->tokens[i] = token;
480         }
481     }
482
483     /* remove the current bs from the list */
484     QLIST_REMOVE(bs, round_robin);
485     throttle_timers_destroy(&bs->throttle_timers);
486     qemu_mutex_unlock(&tg->lock);
487
488     throttle_group_unref(&tg->ts);
489     bs->throttle_state = NULL;
490 }
491
492 static void throttle_groups_init(void)
493 {
494     qemu_mutex_init(&throttle_groups_lock);
495 }
496
497 block_init(throttle_groups_init);
This page took 0.050976 seconds and 4 git commands to generate.