io_uring/sqpoll.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core logic for submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE     8

enum {
        IO_SQ_THREAD_SHOULD_STOP = 0,
        IO_SQ_THREAD_SHOULD_PARK,
};

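/*
 * Drop a park reference, release sqd->lock and wake the SQPOLL thread so
 * it can resume running.
 */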
void io_sq_thread_unpark(struct io_sq_data *sqd)
        __releases(&sqd->lock)
{
        WARN_ON_ONCE(sqd->thread == current);

        /*
         * Do the clear/re-set dance rather than a conditional clear_bit(),
         * because that would race with other threads incrementing
         * park_pending and setting the bit.
         */
        clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        if (atomic_dec_return(&sqd->park_pending))
                set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        mutex_unlock(&sqd->lock);
        wake_up(&sqd->wait);
}

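/*
 * Ask the SQPOLL thread to park and take sqd->lock. The thread is woken so
 * it can notice IO_SQ_THREAD_SHOULD_PARK and go to sleep until the caller
 * releases it with io_sq_thread_unpark().
 */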
void io_sq_thread_park(struct io_sq_data *sqd)
        __acquires(&sqd->lock)
{
        WARN_ON_ONCE(data_race(sqd->thread) == current);

        atomic_inc(&sqd->park_pending);
        set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        mutex_lock(&sqd->lock);
        if (sqd->thread)
                wake_up_process(sqd->thread);
}

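/*
 * Tell the SQPOLL thread to exit and wait until it has done so.
 */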
void io_sq_thread_stop(struct io_sq_data *sqd)
{
        WARN_ON_ONCE(sqd->thread == current);
        WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

        set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
        mutex_lock(&sqd->lock);
        if (sqd->thread)
                wake_up_process(sqd->thread);
        mutex_unlock(&sqd->lock);
        wait_for_completion(&sqd->exited);
}

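/*
 * Drop a reference to the shared SQPOLL data, stopping the thread and
 * freeing the structure on the final put.
 */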
void io_put_sq_data(struct io_sq_data *sqd)
{
        if (refcount_dec_and_test(&sqd->refs)) {
                WARN_ON_ONCE(atomic_read(&sqd->park_pending));

                io_sq_thread_stop(sqd);
                kfree(sqd);
        }
}

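/*
 * Recompute the idle timeout as the maximum sq_thread_idle across all
 * rings attached to this SQPOLL thread.
 */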
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
        struct io_ring_ctx *ctx;
        unsigned sq_thread_idle = 0;

        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
        sqd->sq_thread_idle = sq_thread_idle;
}

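/*
 * Detach a ring from its SQPOLL thread: remove it from the ctx list while
 * the thread is parked, refresh the idle timeout and drop the ring's
 * reference to the shared data.
 */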
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
        struct io_sq_data *sqd = ctx->sq_data;

        if (sqd) {
                io_sq_thread_park(sqd);
                list_del_init(&ctx->sqd_list);
                io_sqd_update_thread_idle(sqd);
                io_sq_thread_unpark(sqd);

                io_put_sq_data(sqd);
                ctx->sq_data = NULL;
        }
}

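/*
 * IORING_SETUP_ATTACH_WQ: look up the ring behind p->wq_fd and take a
 * reference on its io_sq_data so both rings share one SQPOLL thread.
 * Only rings owned by the same thread group may attach.
 */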
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
        struct io_ring_ctx *ctx_attach;
        struct io_sq_data *sqd;
        CLASS(fd, f)(p->wq_fd);

        if (fd_empty(f))
                return ERR_PTR(-ENXIO);
        if (!io_is_uring_fops(fd_file(f)))
                return ERR_PTR(-EINVAL);

        ctx_attach = fd_file(f)->private_data;
        sqd = ctx_attach->sq_data;
        if (!sqd)
                return ERR_PTR(-EINVAL);
        if (sqd->task_tgid != current->tgid)
                return ERR_PTR(-EPERM);

        refcount_inc(&sqd->refs);
        return sqd;
}

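/*
 * Find or allocate the io_sq_data for a new ring. Attaching to an existing
 * SQPOLL thread is attempted first when IORING_SETUP_ATTACH_WQ is set; an
 * -EPERM result falls through to setting up a fresh instance.
 */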
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
                                         bool *attached)
{
        struct io_sq_data *sqd;

        *attached = false;
        if (p->flags & IORING_SETUP_ATTACH_WQ) {
                sqd = io_attach_sq_data(p);
                if (!IS_ERR(sqd)) {
                        *attached = true;
                        return sqd;
                }
                /* fall through for the EPERM case, set up a new sqd/task */
                if (PTR_ERR(sqd) != -EPERM)
                        return sqd;
        }

        sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
        if (!sqd)
                return ERR_PTR(-ENOMEM);

        atomic_set(&sqd->park_pending, 0);
        refcount_set(&sqd->refs, 1);
        INIT_LIST_HEAD(&sqd->ctx_list);
        mutex_init(&sqd->lock);
        init_waitqueue_head(&sqd->wait);
        init_completion(&sqd->exited);
        return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
        return READ_ONCE(sqd->state);
}

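/*
 * Do one round of work for a single ring: reap IOPOLL completions and
 * submit pending SQ entries on the application's behalf. Returns the
 * number of SQEs submitted, which the main loop uses to decide whether
 * it is still busy.
 */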
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
        unsigned int to_submit;
        int ret = 0;

        to_submit = io_sqring_entries(ctx);
        /* if we're handling multiple rings, cap submit size for fairness */
        if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
                to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

        if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
                const struct cred *creds = NULL;

                if (ctx->sq_creds != current_cred())
                        creds = override_creds(ctx->sq_creds);

                mutex_lock(&ctx->uring_lock);
                if (!wq_list_empty(&ctx->iopoll_list))
                        io_do_iopoll(ctx, true);

                /*
                 * Don't submit if refs are dying. That is useful for
                 * io_uring_register(), and io_ring_exit_work() relies on it.
                 */
                if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
                    !(ctx->flags & IORING_SETUP_R_DISABLED))
                        ret = io_submit_sqes(ctx, to_submit);
                mutex_unlock(&ctx->uring_lock);

                if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
                        wake_up(&ctx->sqo_sq_wait);
                if (creds)
                        revert_creds(creds);
        }

        return ret;
}

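/*
 * Handle park and stop requests as well as pending signals. Called with
 * sqd->lock held; the lock is dropped while the thread sits parked.
 * Returns true if the thread should exit.
 */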
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
        bool did_sig = false;
        struct ksignal ksig;

        if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
            signal_pending(current)) {
                mutex_unlock(&sqd->lock);
                if (signal_pending(current))
                        did_sig = get_signal(&ksig);
                wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
                mutex_lock(&sqd->lock);
                sqd->sq_cpu = raw_smp_processor_id();
        }
        return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
        struct io_uring_task *tctx = current->io_uring;
        unsigned int count = 0;

        if (*retry_list) {
                *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
                if (count >= max_entries)
                        goto out;
                max_entries -= count;
        }
        *retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
        if (task_work_pending(current))
                task_work_run();
        return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
        struct io_uring_task *tctx = current->io_uring;

        return retry_list || !llist_empty(&tctx->task_list);
}

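/*
 * Add the system time consumed since *start to sqd->work_time, kept in
 * microseconds.
 */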
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
        struct rusage end;

        getrusage(current, RUSAGE_SELF, &end);
        end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
        end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

        sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}

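/*
 * Main loop of the SQPOLL kernel thread. Submits SQEs for every attached
 * ring, runs task_work and NAPI busy polling, and once the idle timeout
 * expires with no work pending it sets IORING_SQ_NEED_WAKEUP so userspace
 * knows it must call io_uring_enter() to wake it, then sleeps.
 */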
static int io_sq_thread(void *data)
{
        struct llist_node *retry_list = NULL;
        struct io_sq_data *sqd = data;
        struct io_ring_ctx *ctx;
        struct rusage start;
        unsigned long timeout = 0;
        char buf[TASK_COMM_LEN];
        DEFINE_WAIT(wait);

        /* offload context creation failed, just exit */
        if (!current->io_uring)
                goto err_out;

        snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
        set_task_comm(current, buf);

        /* reset to our pid after we've set task_comm, for fdinfo */
        sqd->task_pid = current->pid;

        if (sqd->sq_cpu != -1) {
                set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
        } else {
                set_cpus_allowed_ptr(current, cpu_online_mask);
                sqd->sq_cpu = raw_smp_processor_id();
        }

        /*
         * Force the audit context to get set up, in case we do prep side async
         * operations that would trigger an audit call before any issue side
         * audit has been done.
         */
        audit_uring_entry(IORING_OP_NOP);
        audit_uring_exit(true, 0);

        mutex_lock(&sqd->lock);
        while (1) {
                bool cap_entries, sqt_spin = false;

                if (io_sqd_events_pending(sqd) || signal_pending(current)) {
                        if (io_sqd_handle_event(sqd))
                                break;
                        timeout = jiffies + sqd->sq_thread_idle;
                }

                cap_entries = !list_is_singular(&sqd->ctx_list);
                getrusage(current, RUSAGE_SELF, &start);
                list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                        int ret = __io_sq_thread(ctx, cap_entries);

                        if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
                                sqt_spin = true;
                }
                if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
                        sqt_spin = true;

                list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                        if (io_napi(ctx))
                                io_napi_sqpoll_busy_poll(ctx);

                if (sqt_spin || !time_after(jiffies, timeout)) {
                        if (sqt_spin) {
                                io_sq_update_worktime(sqd, &start);
                                timeout = jiffies + sqd->sq_thread_idle;
                        }
                        if (unlikely(need_resched())) {
                                mutex_unlock(&sqd->lock);
                                cond_resched();
                                mutex_lock(&sqd->lock);
                                sqd->sq_cpu = raw_smp_processor_id();
                        }
                        continue;
                }

                prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
                if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
                        bool needs_sched = true;

                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                                atomic_or(IORING_SQ_NEED_WAKEUP,
                                                &ctx->rings->sq_flags);
                                if ((ctx->flags & IORING_SETUP_IOPOLL) &&
                                    !wq_list_empty(&ctx->iopoll_list)) {
                                        needs_sched = false;
                                        break;
                                }

                                /*
                                 * Ensure the store of the wakeup flag is not
                                 * reordered with the load of the SQ tail
                                 */
                                smp_mb__after_atomic();

                                if (io_sqring_entries(ctx)) {
                                        needs_sched = false;
                                        break;
                                }
                        }

                        if (needs_sched) {
                                mutex_unlock(&sqd->lock);
                                schedule();
                                mutex_lock(&sqd->lock);
                                sqd->sq_cpu = raw_smp_processor_id();
                        }
                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                                atomic_andnot(IORING_SQ_NEED_WAKEUP,
                                                &ctx->rings->sq_flags);
                }

                finish_wait(&sqd->wait, &wait);
                timeout = jiffies + sqd->sq_thread_idle;
        }

        if (retry_list)
                io_sq_tw(&retry_list, UINT_MAX);

        io_uring_cancel_generic(true, sqd);
        sqd->thread = NULL;
        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
        io_run_task_work();
        mutex_unlock(&sqd->lock);
err_out:
        complete(&sqd->exited);
        do_exit(0);
}

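/*
 * Wait until the SQ ring has free space again, or until a signal is
 * pending. Used when the application wants to block on a full SQ ring.
 */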
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
        DEFINE_WAIT(wait);

        do {
                if (!io_sqring_full(ctx))
                        break;
                prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

                if (!io_sqring_full(ctx))
                        break;
                schedule();
        } while (!signal_pending(current));

        finish_wait(&ctx->sqo_sq_wait, &wait);
}

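/*
 * Set up SQPOLL for a new ring: validate any attach request, find or
 * allocate the shared io_sq_data, configure the idle timeout and CPU
 * affinity, and create the "iou-sqp" kernel thread unless the ring is
 * attaching to an existing one.
 */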
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
                                struct io_uring_params *p)
{
        struct task_struct *task_to_put = NULL;
        int ret;

        /* Retain compatibility with failing for an invalid attach attempt */
        if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
                                IORING_SETUP_ATTACH_WQ) {
                CLASS(fd, f)(p->wq_fd);
                if (fd_empty(f))
                        return -ENXIO;
                if (!io_is_uring_fops(fd_file(f)))
                        return -EINVAL;
        }
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                struct task_struct *tsk;
                struct io_sq_data *sqd;
                bool attached;

                ret = security_uring_sqpoll();
                if (ret)
                        return ret;

                sqd = io_get_sq_data(p, &attached);
                if (IS_ERR(sqd)) {
                        ret = PTR_ERR(sqd);
                        goto err;
                }

                ctx->sq_creds = get_current_cred();
                ctx->sq_data = sqd;
                ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
                if (!ctx->sq_thread_idle)
                        ctx->sq_thread_idle = HZ;

                io_sq_thread_park(sqd);
                list_add(&ctx->sqd_list, &sqd->ctx_list);
                io_sqd_update_thread_idle(sqd);
                /* don't attach to a dying SQPOLL thread, would be racy */
                ret = (attached && !sqd->thread) ? -ENXIO : 0;
                io_sq_thread_unpark(sqd);

                if (ret < 0)
                        goto err;
                if (attached)
                        return 0;

                if (p->flags & IORING_SETUP_SQ_AFF) {
                        cpumask_var_t allowed_mask;
                        int cpu = p->sq_thread_cpu;

                        ret = -EINVAL;
                        if (cpu >= nr_cpu_ids || !cpu_online(cpu))
                                goto err_sqpoll;
                        ret = -ENOMEM;
                        if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
                                goto err_sqpoll;
                        ret = -EINVAL;
                        cpuset_cpus_allowed(current, allowed_mask);
                        if (!cpumask_test_cpu(cpu, allowed_mask)) {
                                free_cpumask_var(allowed_mask);
                                goto err_sqpoll;
                        }
                        free_cpumask_var(allowed_mask);
                        sqd->sq_cpu = cpu;
                } else {
                        sqd->sq_cpu = -1;
                }

                sqd->task_pid = current->pid;
                sqd->task_tgid = current->tgid;
                tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
                if (IS_ERR(tsk)) {
                        ret = PTR_ERR(tsk);
                        goto err_sqpoll;
                }

                sqd->thread = tsk;
                task_to_put = get_task_struct(tsk);
                ret = io_uring_alloc_task_context(tsk, ctx);
                wake_up_new_task(tsk);
                if (ret)
                        goto err;
        } else if (p->flags & IORING_SETUP_SQ_AFF) {
                /* Can't have SQ_AFF without SQPOLL */
                ret = -EINVAL;
                goto err;
        }

        if (task_to_put)
                put_task_struct(task_to_put);
        return 0;
err_sqpoll:
        complete(&ctx->sq_data->exited);
err:
        io_sq_thread_finish(ctx);
        if (task_to_put)
                put_task_struct(task_to_put);
        return ret;
}

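/*
 * Apply a CPU affinity mask to the io-wq workers of the SQPOLL thread's
 * task context, with the thread parked so sqd->thread stays stable.
 */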
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
                                     cpumask_var_t mask)
{
        struct io_sq_data *sqd = ctx->sq_data;
        int ret = -EINVAL;

        if (sqd) {
                io_sq_thread_park(sqd);
                /* Don't set affinity for a dying thread */
                if (sqd->thread)
                        ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
                io_sq_thread_unpark(sqd);
        }

        return ret;
}