	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
-	if (refcount_dec_and_test(&wqe->wq->refs))
-		complete(&wqe->wq->done);
+	io_wq_put(wqe->wq);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
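
Running theme of the patch: each open-coded `refcount_dec_and_test(&wq->refs)` / `complete(&wq->done)` pair becomes a call to the new io_wq_put() helper (added at the bottom of this patch), so dropping the last reference destroys the workqueue directly instead of waking a waiter on wq->done. A minimal userspace sketch of the same put-on-last-ref pattern, with hypothetical names and C11 atomics standing in for the kernel's refcount_t:

    #include <stdatomic.h>
    #include <stdlib.h>

    struct wq {
    	atomic_int refs;	/* stands in for refcount_t wq->refs */
    	/* ... queue state ... */
    };

    static void wq_destroy(struct wq *wq)
    {
    	free(wq);		/* full teardown, runs exactly once */
    }

    static void wq_put(struct wq *wq)
    {
    	/* atomic_fetch_sub() returns the old value, so seeing 1 means
    	 * this caller just dropped the last reference */
    	if (atomic_fetch_sub(&wq->refs, 1) == 1)
    		wq_destroy(wq);
    }
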
	else
		pid = io_wq_fork_thread(task_thread_unbound, worker);
	if (pid < 0) {
-		if (refcount_dec_and_test(&wq->refs))
-			complete(&wq->done);
+		io_wq_put(wq);
		kfree(worker);
		return false;
	}
	io_wq_check_workers(wq);
-	if (refcount_dec_and_test(&wq->refs)) {
-		wq->manager = NULL;
-		complete(&wq->done);
-		do_exit(0);
-	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
+	wq->manager = NULL;
+	io_wq_put(wq);
	do_exit(0);
}
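
Worth noting on the manager's exit path: wq->manager is now cleared unconditionally before the reference is dropped, where the old code only cleared it when the manager happened to hold the last reference. Presumably this is so the io_wq_fork_manager() helper added below sees a NULL manager and can fork a fresh one the next time work is queued.
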
		wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

+static int io_wq_fork_manager(struct io_wq *wq)
+{
+	int ret;
+
+	if (wq->manager)
+		return 0;
+
+	clear_bit(IO_WQ_BIT_EXIT, &wq->state);
+	refcount_inc(&wq->refs);
+	current->flags |= PF_IO_WORKER;
+	ret = io_wq_fork_thread(io_wq_manager, wq);
+	current->flags &= ~PF_IO_WORKER;
+	if (ret >= 0) {
+		wait_for_completion(&wq->done);
+		return 0;
+	}
+
+	io_wq_put(wq);
+	return ret;
+}
+
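
The new helper is idempotent (a no-op when wq->manager is already set) and careful with its reference: wq->refs is bumped before the fork so the manager thread always owns a reference it can later put, and the helper puts that reference itself when the fork fails. On success it blocks on wq->done until the manager signals startup, the same wait_for_completion() the old io_wq_create() path used. A loose userspace analogue with hypothetical names (pthreads and a mutex in place of io_wq_fork_thread() and the PF_IO_WORKER flag dance):

    #include <pthread.h>
    #include <stdbool.h>

    struct mgr {
    	pthread_mutex_t lock;
    	bool running;		/* stands in for wq->manager */
    	pthread_t thread;
    };

    static struct mgr global_mgr = { .lock = PTHREAD_MUTEX_INITIALIZER };

    static void *manager_fn(void *arg)
    {
    	/* ... supervise workers until told to exit ... */
    	return NULL;
    }

    /* Start the manager on demand; a no-op if it is already running. */
    static int ensure_manager(struct mgr *m)
    {
    	int ret = 0;

    	pthread_mutex_lock(&m->lock);
    	if (!m->running) {
    		ret = pthread_create(&m->thread, NULL, manager_fn, m);
    		if (ret == 0)
    			m->running = true;
    	}
    	pthread_mutex_unlock(&m->lock);
    	return ret;	/* 0 on success, errno-style code on failure */
    }
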
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

+	/* Can only happen if manager creation fails after exec */
+	if (unlikely(io_wq_fork_manager(wqe->wq))) {
+		work->flags |= IO_WQ_WORK_CANCEL;
+		wqe->wq->do_work(work);
+		return;
+	}
+
	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
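
If the manager cannot be forked (per the comment, only possible after exec), the work item is not dropped on the floor: it is flagged IO_WQ_WORK_CANCEL and its handler runs synchronously, so the submitter still sees the request complete, just as an error. A generic sketch of that cancel-inline fallback, all names hypothetical:

    #define WORK_CANCEL	(1u << 0)	/* stands in for IO_WQ_WORK_CANCEL */

    struct work {
    	unsigned int flags;
    	void (*fn)(struct work *);	/* stands in for wq->do_work */
    };

    static void enqueue_or_cancel(struct work *w, int workers_ok)
    {
    	if (!workers_ok) {
    		w->flags |= WORK_CANCEL;	/* handler must check this */
    		w->fn(w);			/* complete inline, as an error */
    		return;
    	}
    	/* ... normal path: link into the work list, wake a worker ... */
    }
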
	init_completion(&wq->done);
	refcount_set(&wq->refs, 1);

-	current->flags |= PF_IO_WORKER;
-	ret = io_wq_fork_thread(io_wq_manager, wq);
-	current->flags &= ~PF_IO_WORKER;
-	if (ret >= 0) {
-		wait_for_completion(&wq->done);
+	ret = io_wq_fork_manager(wq);
+	if (!ret)
		return wq;
-	}

-	if (refcount_dec_and_test(&wq->refs))
-		complete(&wq->done);
+	io_wq_put(wq);
	io_wq_put_hash(data->hash);
err:
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	return ERR_PTR(ret);
}
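
With the helper in place, io_wq_create() no longer open-codes the PF_IO_WORKER dance or the wait_for_completion(); it collapses to a single io_wq_fork_manager() call, and its error path is a plain io_wq_put() that shares the teardown with every other reference drop. The refcount_set(&wq->refs, 1) above still gives the creator the initial reference; io_wq_fork_manager() then takes an extra one on the manager's behalf.
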
-void io_wq_destroy(struct io_wq *wq)
+static void io_wq_destroy(struct io_wq *wq)
{
	int node;

		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

-	wait_for_completion(&wq->done);
-
	spin_lock_irq(&wq->hash->wait.lock);
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

	io_wq_put_hash(wq->hash);
	kfree(wq->wqes);
	kfree(wq);
}

+void io_wq_put(struct io_wq *wq)
+{
+	if (refcount_dec_and_test(&wq->refs))
+		io_wq_destroy(wq);
+}
+
static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
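
Finally, io_wq_destroy() becomes static, so the exported surface is io_wq_put() alone: external users drop their reference and the last put performs the teardown. A sketch of the caller-side contract, assuming the io_wq_create() / io_wq_enqueue() declarations from this era's fs/io-wq.h and eliding the io_wq_data setup; example_user() itself is illustrative, not part of the patch:

    static int example_user(struct io_wq_data *data, struct io_wq_work *work)
    {
    	struct io_wq *wq = io_wq_create(8, data);	/* 8 bounded workers */

    	if (IS_ERR(wq))
    		return PTR_ERR(wq);
    	io_wq_enqueue(wq, work);
    	/* ... */
    	io_wq_put(wq);	/* was io_wq_destroy(); final put runs the teardown */
    	return 0;
    }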