diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 429 |
1 files changed, 200 insertions, 229 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index cb60a42b9fdf..cc5cf2209fb0 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -16,6 +16,7 @@ #include <linux/slab.h> #include <linux/kthread.h> #include <linux/rculist_nulls.h> +#include <linux/fs_struct.h> #include "io-wq.h" @@ -59,6 +60,7 @@ struct io_worker { const struct cred *cur_creds; const struct cred *saved_creds; struct files_struct *restore_files; + struct fs_struct *restore_fs; }; #if BITS_PER_LONG == 64 @@ -67,6 +69,8 @@ struct io_worker { #define IO_WQ_HASH_ORDER 5 #endif +#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) + struct io_wqe_acct { unsigned nr_workers; unsigned max_workers; @@ -96,6 +100,7 @@ struct io_wqe { struct list_head all_list; struct io_wq *wq; + struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; }; /* @@ -105,8 +110,7 @@ struct io_wq { struct io_wqe **wqes; unsigned long state; - get_work_fn *get_work; - put_work_fn *put_work; + free_work_fn *free_work; struct task_struct *manager; struct user_struct *user; @@ -151,6 +155,9 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) task_unlock(current); } + if (current->fs != worker->restore_fs) + current->fs = worker->restore_fs; + /* * If we have an active mm, we need to drop the wq lock before unusing * it. If we do, return true and let the caller retry the idle loop. @@ -311,6 +318,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); worker->restore_files = current->files; + worker->restore_fs = current->fs; io_wqe_inc_running(wqe, worker); } @@ -370,26 +378,35 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) return __io_worker_unuse(wqe, worker); } -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) +static inline unsigned int io_get_work_hash(struct io_wq_work *work) +{ + return work->flags >> IO_WQ_HASH_SHIFT; +} + +static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; - struct io_wq_work *work; + struct io_wq_work *work, *tail; + unsigned int hash; wq_list_for_each(node, prev, &wqe->work_list) { work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ - if (!(work->flags & IO_WQ_WORK_HASHED)) { - wq_node_del(&wqe->work_list, node, prev); + if (!io_wq_is_hashed(work)) { + wq_list_del(&wqe->work_list, node, prev); return work; } /* hashed, can run if not already running */ - *hash = work->flags >> IO_WQ_HASH_SHIFT; - if (!(wqe->hash_map & BIT_ULL(*hash))) { - wqe->hash_map |= BIT_ULL(*hash); - wq_node_del(&wqe->work_list, node, prev); + hash = io_get_work_hash(work); + if (!(wqe->hash_map & BIT(hash))) { + wqe->hash_map |= BIT(hash); + /* all items with this hash lie in [work, tail] */ + tail = wqe->hash_tail[hash]; + wqe->hash_tail[hash] = NULL; + wq_list_cut(&wqe->work_list, &tail->list, prev); return work; } } @@ -434,16 +451,49 @@ static void io_wq_switch_creds(struct io_worker *worker, worker->saved_creds = old_creds; } +static void io_impersonate_work(struct io_worker *worker, + struct io_wq_work *work) +{ + if (work->files && current->files != work->files) { + task_lock(current); + current->files = work->files; + task_unlock(current); + } + if (work->fs && current->fs != work->fs) + current->fs = work->fs; + if (work->mm != worker->mm) + io_wq_switch_mm(worker, work); + if (worker->cur_creds != work->creds) + io_wq_switch_creds(worker, work); +} + +static void io_assign_current_work(struct io_worker *worker, + struct io_wq_work *work) +{ + if (work) { + /* flush pending signals before assigning new work */ + if (signal_pending(current)) + flush_signals(current); + cond_resched(); + } + + spin_lock_irq(&worker->lock); + worker->cur_work = work; + spin_unlock_irq(&worker->lock); +} + +static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); + static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { - struct io_wq_work *work, *old_work = NULL, *put_work = NULL; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; do { - unsigned hash = -1U; - + struct io_wq_work *work; + unsigned int hash; +get_next: /* * If we got some work, mark us as busy. If we didn't, but * the list isn't empty, it means we stalled on hashed work. @@ -451,118 +501,80 @@ static void io_worker_handle_work(struct io_worker *worker) * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(wqe, &hash); + work = io_get_next_work(wqe); if (work) __io_worker_busy(wqe, worker, work); else if (!wq_list_empty(&wqe->work_list)) wqe->flags |= IO_WQE_FLAG_STALLED; spin_unlock_irq(&wqe->lock); - if (put_work && wq->put_work) - wq->put_work(old_work); if (!work) break; -next: - /* flush any pending signals before assigning new work */ - if (signal_pending(current)) - flush_signals(current); - - cond_resched(); - - spin_lock_irq(&worker->lock); - worker->cur_work = work; - spin_unlock_irq(&worker->lock); - - if (work->flags & IO_WQ_WORK_CB) - work->func(&work); - - if (work->files && current->files != work->files) { - task_lock(current); - current->files = work->files; - task_unlock(current); - } - if (work->mm != worker->mm) - io_wq_switch_mm(worker, work); - if (worker->cur_creds != work->creds) - io_wq_switch_creds(worker, work); - /* - * OK to set IO_WQ_WORK_CANCEL even for uncancellable work, - * the worker function will do the right thing. - */ - if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) - work->flags |= IO_WQ_WORK_CANCEL; - if (worker->mm) - work->flags |= IO_WQ_WORK_HAS_MM; - - if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) { - put_work = work; - wq->get_work(work); - } - - old_work = work; - work->func(&work); - - spin_lock_irq(&worker->lock); - worker->cur_work = NULL; - spin_unlock_irq(&worker->lock); - - spin_lock_irq(&wqe->lock); - - if (hash != -1U) { - wqe->hash_map &= ~BIT_ULL(hash); - wqe->flags &= ~IO_WQE_FLAG_STALLED; - } - if (work && work != old_work) { - spin_unlock_irq(&wqe->lock); - - if (put_work && wq->put_work) { - wq->put_work(put_work); - put_work = NULL; + io_assign_current_work(worker, work); + + /* handle a whole dependent link */ + do { + struct io_wq_work *old_work, *next_hashed, *linked; + + next_hashed = wq_next_work(work); + io_impersonate_work(worker, work); + /* + * OK to set IO_WQ_WORK_CANCEL even for uncancellable + * work, the worker function will do the right thing. + */ + if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) + work->flags |= IO_WQ_WORK_CANCEL; + + hash = io_get_work_hash(work); + linked = old_work = work; + linked->func(&linked); + linked = (old_work == linked) ? NULL : linked; + + work = next_hashed; + if (!work && linked && !io_wq_is_hashed(linked)) { + work = linked; + linked = NULL; + } + io_assign_current_work(worker, work); + wq->free_work(old_work); + + if (linked) + io_wqe_enqueue(wqe, linked); + + if (hash != -1U && !next_hashed) { + spin_lock_irq(&wqe->lock); + wqe->hash_map &= ~BIT_ULL(hash); + wqe->flags &= ~IO_WQE_FLAG_STALLED; + /* dependent work is not hashed */ + hash = -1U; + /* skip unnecessary unlock-lock wqe->lock */ + if (!work) + goto get_next; + spin_unlock_irq(&wqe->lock); } + } while (work); - /* dependent work not hashed */ - hash = -1U; - goto next; - } + spin_lock_irq(&wqe->lock); } while (1); } -static inline void io_worker_spin_for_work(struct io_wqe *wqe) -{ - int i = 0; - - while (++i < 1000) { - if (io_wqe_run_queue(wqe)) - break; - if (need_resched()) - break; - cpu_relax(); - } -} - static int io_wqe_worker(void *data) { struct io_worker *worker = data; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; - bool did_work; io_worker_start(wqe, worker); - did_work = false; while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { set_current_state(TASK_INTERRUPTIBLE); loop: - if (did_work) - io_worker_spin_for_work(wqe); spin_lock_irq(&wqe->lock); if (io_wqe_run_queue(wqe)) { __set_current_state(TASK_RUNNING); io_worker_handle_work(worker); - did_work = true; goto loop; } - did_work = false; /* drops the lock on success, retry */ if (__io_worker_idle(wqe, worker)) { __release(&wqe->lock); @@ -691,11 +703,16 @@ static int io_wq_manager(void *data) /* create fixed workers */ refcount_set(&wq->refs, workers_to_create); for_each_node(node) { + if (!node_online(node)) + continue; if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND)) goto err; workers_to_create--; } + while (workers_to_create--) + refcount_dec(&wq->refs); + complete(&wq->done); while (!kthread_should_stop()) { @@ -703,6 +720,9 @@ static int io_wq_manager(void *data) struct io_wqe *wqe = wq->wqes[node]; bool fork_worker[2] = { false, false }; + if (!node_online(node)) + continue; + spin_lock_irq(&wqe->lock); if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND)) fork_worker[IO_WQ_ACCT_BOUND] = true; @@ -750,6 +770,40 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, return true; } +static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) +{ + struct io_wq *wq = wqe->wq; + + do { + struct io_wq_work *old_work = work; + + work->flags |= IO_WQ_WORK_CANCEL; + work->func(&work); + work = (work == old_work) ? NULL : work; + wq->free_work(old_work); + } while (work); +} + +static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) +{ + unsigned int hash; + struct io_wq_work *tail; + + if (!io_wq_is_hashed(work)) { +append: + wq_list_add_tail(&work->list, &wqe->work_list); + return; + } + + hash = io_get_work_hash(work); + tail = wqe->hash_tail[hash]; + wqe->hash_tail[hash] = work; + if (!tail) + goto append; + + wq_list_add_after(&work->list, &tail->list, &wqe->work_list); +} + static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); @@ -763,14 +817,13 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) * It's close enough to not be an issue, fork() has the same delay. */ if (unlikely(!io_wq_can_queue(wqe, acct, work))) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work, wqe); return; } work_flags = work->flags; spin_lock_irqsave(&wqe->lock, flags); - wq_list_add_tail(&work->list, &wqe->work_list); + io_wqe_insert_work(wqe, work); wqe->flags &= ~IO_WQE_FLAG_STALLED; spin_unlock_irqrestore(&wqe->lock, flags); @@ -787,19 +840,15 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) } /* - * Enqueue work, hashed by some key. Work items that hash to the same value - * will not be done in parallel. Used to limit concurrent writes, generally - * hashed by inode. + * Work items that hash to the same value will not be done in parallel. + * Used to limit concurrent writes, generally hashed by inode. */ -void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val) +void io_wq_hash_work(struct io_wq_work *work, void *val) { - struct io_wqe *wqe = wq->wqes[numa_node_id()]; - unsigned bit; - + unsigned int bit; bit = hash_ptr(val, IO_WQ_HASH_ORDER); work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); - io_wqe_enqueue(wqe, work); } static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data) @@ -821,7 +870,9 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe, list_for_each_entry_rcu(worker, &wqe->all_list, all_list) { if (io_worker_get(worker)) { - ret = func(worker, data); + /* no task if node is/was offline */ + if (worker->task) + ret = func(worker, data); io_worker_release(worker); if (ret) break; @@ -847,14 +898,13 @@ void io_wq_cancel_all(struct io_wq *wq) } struct io_cb_cancel_data { - struct io_wqe *wqe; - work_cancel_fn *cancel; - void *caller_data; + work_cancel_fn *fn; + void *data; }; -static bool io_work_cancel(struct io_worker *worker, void *cancel_data) +static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { - struct io_cb_cancel_data *data = cancel_data; + struct io_cb_cancel_data *match = data; unsigned long flags; bool ret = false; @@ -865,82 +915,7 @@ static bool io_work_cancel(struct io_worker *worker, void *cancel_data) spin_lock_irqsave(&worker->lock, flags); if (worker->cur_work && !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) && - data->cancel(worker->cur_work, data->caller_data)) { - send_sig(SIGINT, worker->task, 1); - ret = true; - } - spin_unlock_irqrestore(&worker->lock, flags); - - return ret; -} - -static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, - work_cancel_fn *cancel, - void *cancel_data) -{ - struct io_cb_cancel_data data = { - .wqe = wqe, - .cancel = cancel, - .caller_data = cancel_data, - }; - struct io_wq_work_node *node, *prev; - struct io_wq_work *work; - unsigned long flags; - bool found = false; - - spin_lock_irqsave(&wqe->lock, flags); - wq_list_for_each(node, prev, &wqe->work_list) { - work = container_of(node, struct io_wq_work, list); - - if (cancel(work, cancel_data)) { - wq_node_del(&wqe->work_list, node, prev); - found = true; - break; - } - } - spin_unlock_irqrestore(&wqe->lock, flags); - - if (found) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); - return IO_WQ_CANCEL_OK; - } - - rcu_read_lock(); - found = io_wq_for_each_worker(wqe, io_work_cancel, &data); - rcu_read_unlock(); - return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; -} - -enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, - void *data) -{ - enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int node; - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - ret = io_wqe_cancel_cb_work(wqe, cancel, data); - if (ret != IO_WQ_CANCEL_NOTFOUND) - break; - } - - return ret; -} - -static bool io_wq_worker_cancel(struct io_worker *worker, void *data) -{ - struct io_wq_work *work = data; - unsigned long flags; - bool ret = false; - - if (worker->cur_work != work) - return false; - - spin_lock_irqsave(&worker->lock, flags); - if (worker->cur_work == work && - !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) { + match->fn(worker->cur_work, match->data)) { send_sig(SIGINT, worker->task, 1); ret = true; } @@ -950,15 +925,13 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) } static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, - struct io_wq_work *cwork) + struct io_cb_cancel_data *match) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; unsigned long flags; bool found = false; - cwork->flags |= IO_WQ_WORK_CANCEL; - /* * First check pending list, if we're lucky we can just remove it * from there. CANCEL_OK means that the work is returned as-new, @@ -968,8 +941,8 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, wq_list_for_each(node, prev, &wqe->work_list) { work = container_of(node, struct io_wq_work, list); - if (work == cwork) { - wq_node_del(&wqe->work_list, node, prev); + if (match->fn(work, match->data)) { + wq_list_del(&wqe->work_list, node, prev); found = true; break; } @@ -977,8 +950,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work, wqe); return IO_WQ_CANCEL_OK; } @@ -989,20 +961,25 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, * completion will run normally in this case. */ rcu_read_lock(); - found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork); + found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match); rcu_read_unlock(); return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; } -enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) +enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, + void *data) { + struct io_cb_cancel_data match = { + .fn = cancel, + .data = data, + }; enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; int node; for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; - ret = io_wqe_cancel_work(wqe, cwork); + ret = io_wqe_cancel_work(wqe, &match); if (ret != IO_WQ_CANCEL_NOTFOUND) break; } @@ -1010,38 +987,28 @@ enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) return ret; } -struct io_wq_flush_data { - struct io_wq_work work; - struct completion done; -}; - -static void io_wq_flush_func(struct io_wq_work **workptr) +static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data) { - struct io_wq_work *work = *workptr; - struct io_wq_flush_data *data; + return work == data; +} - data = container_of(work, struct io_wq_flush_data, work); - complete(&data->done); +enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) +{ + return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork); } -/* - * Doesn't wait for previously queued work to finish. When this completes, - * it just means that previously queued work was started. - */ -void io_wq_flush(struct io_wq *wq) +static bool io_wq_pid_match(struct io_wq_work *work, void *data) { - struct io_wq_flush_data data; - int node; + pid_t pid = (pid_t) (unsigned long) data; - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; + return work->task_pid == pid; +} - init_completion(&data.done); - INIT_IO_WORK(&data.work, io_wq_flush_func); - data.work.flags |= IO_WQ_WORK_INTERNAL; - io_wqe_enqueue(wqe, &data.work); - wait_for_completion(&data.done); - } +enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid) +{ + void *data = (void *) (unsigned long) pid; + + return io_wq_cancel_cb(wq, io_wq_pid_match, data); } struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) @@ -1049,6 +1016,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) int ret = -ENOMEM, node; struct io_wq *wq; + if (WARN_ON_ONCE(!data->free_work)) + return ERR_PTR(-EINVAL); + wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); @@ -1059,20 +1029,22 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) return ERR_PTR(-ENOMEM); } - wq->get_work = data->get_work; - wq->put_work = data->put_work; + wq->free_work = data->free_work; /* caller must already hold a reference to this */ wq->user = data->user; for_each_node(node) { struct io_wqe *wqe; + int alloc_node = node; - wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node); + if (!node_online(alloc_node)) + alloc_node = NUMA_NO_NODE; + wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node); if (!wqe) goto err; wq->wqes[node] = wqe; - wqe->node = node; + wqe->node = alloc_node; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); if (wq->user) { @@ -1080,7 +1052,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) task_rlimit(current, RLIMIT_NPROC); } atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); - wqe->node = node; wqe->wq = wq; spin_lock_init(&wqe->lock); INIT_WQ_LIST(&wqe->work_list); @@ -1115,7 +1086,7 @@ err: bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) { - if (data->get_work != wq->get_work || data->put_work != wq->put_work) + if (data->free_work != wq->free_work) return false; return refcount_inc_not_zero(&wq->use_refs); |