Diffstat (limited to 'fs')
-rw-r--r--	fs/io-wq.c	30
-rw-r--r--	fs/io-wq.h	3
-rw-r--r--	fs/io_uring.c	116
3 files changed, 141 insertions, 8 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 37863879e987..253c04a40db5 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -52,6 +52,7 @@ struct io_worker {
struct rcu_head rcu;
struct mm_struct *mm;
+ struct files_struct *restore_files;
};
struct io_wq_nulls_list {
@@ -126,22 +127,36 @@ static void io_worker_release(struct io_worker *worker)
*/
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
+ bool dropped_lock = false;
+
+ if (current->files != worker->restore_files) {
+ __acquire(&wqe->lock);
+ spin_unlock_irq(&wqe->lock);
+ dropped_lock = true;
+
+ task_lock(current);
+ current->files = worker->restore_files;
+ task_unlock(current);
+ }
+
/*
* If we have an active mm, we need to drop the wq lock before unusing
* it. If we do, return true and let the caller retry the idle loop.
*/
if (worker->mm) {
- __acquire(&wqe->lock);
- spin_unlock_irq(&wqe->lock);
+ if (!dropped_lock) {
+ __acquire(&wqe->lock);
+ spin_unlock_irq(&wqe->lock);
+ dropped_lock = true;
+ }
__set_current_state(TASK_RUNNING);
set_fs(KERNEL_DS);
unuse_mm(worker->mm);
mmput(worker->mm);
worker->mm = NULL;
- return true;
}
- return false;
+ return dropped_lock;
}
static void io_worker_exit(struct io_worker *worker)
@@ -189,6 +204,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
current->flags |= PF_IO_WORKER;
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
+ worker->restore_files = current->files;
atomic_inc(&wqe->nr_running);
}
@@ -291,6 +307,12 @@ static void io_worker_handle_work(struct io_worker *worker)
if (!work)
break;
next:
+ if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
+ current->files != work->files) {
+ task_lock(current);
+ current->files = work->files;
+ task_unlock(current);
+ }
if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
wq->mm && mmget_not_zero(wq->mm)) {
use_mm(wq->mm);
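The io-wq side of the change is an adopt/restore dance: io_worker_start() records the worker's own file table in worker->restore_files, io_worker_handle_work() installs work->files as current->files before running any work item flagged IO_WQ_WORK_NEEDS_FILES, and __io_worker_unuse() puts the original table back when the worker goes idle or exits, dropping wqe->lock first exactly as it already does for a borrowed mm (the function now returns true whenever it had to drop the lock, so the caller retries its idle loop). A minimal sketch of the switch itself; the helper name io_wq_switch_files() is hypothetical, not something this patch adds:

	/*
	 * Hypothetical consolidation of the files switch done in
	 * io_worker_handle_work() and __io_worker_unuse(); illustrative only.
	 */
	static void io_wq_switch_files(struct files_struct *files)
	{
		if (current->files == files)
			return;

		task_lock(current);		/* ->files updates are published under task_lock */
		current->files = files;		/* borrow the submitter's table, or restore our own */
		task_unlock(current);
	}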
diff --git a/fs/io-wq.h b/fs/io-wq.h
index be8f22c8937b..e93f764b1fa4 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -8,6 +8,7 @@ enum {
IO_WQ_WORK_HAS_MM = 2,
IO_WQ_WORK_HASHED = 4,
IO_WQ_WORK_NEEDS_USER = 8,
+ IO_WQ_WORK_NEEDS_FILES = 16,
IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */
};
@@ -22,12 +23,14 @@ struct io_wq_work {
struct list_head list;
void (*func)(struct io_wq_work **);
unsigned flags;
+ struct files_struct *files;
};
#define INIT_IO_WORK(work, _func) \
do { \
(work)->func = _func; \
(work)->flags = 0; \
+ (work)->files = NULL; \
} while (0) \
struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm);
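On the io-wq API side, the new IO_WQ_WORK_NEEDS_FILES flag and the per-work files pointer are pure plumbing: INIT_IO_WORK() clears the pointer along with the flags, and nothing in the hunks shown here sets the flag yet; opcodes whose async handlers must resolve file descriptors are expected to set it at prep time. Roughly, such an opcode would mark its work as below (the handler name io_wq_submit_work is assumed for illustration and is not defined in this diff):

	/* Illustrative only: flag a request whose async work must run with
	 * the submitting task's file table. */
	INIT_IO_WORK(&req->work, io_wq_submit_work);
	req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
	/* req->work.files is filled in later, by io_grab_files() at queue time */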
diff --git a/fs/io_uring.c b/fs/io_uring.c
index d94bd4e3a60e..6e1523567920 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -196,6 +196,8 @@ struct io_ring_ctx {
struct list_head defer_list;
struct list_head timeout_list;
+
+ wait_queue_head_t inflight_wait;
} ____cacheline_aligned_in_smp;
/* IO offload */
@@ -250,6 +252,9 @@ struct io_ring_ctx {
*/
struct list_head poll_list;
struct list_head cancel_list;
+
+ spinlock_t inflight_lock;
+ struct list_head inflight_list;
} ____cacheline_aligned_in_smp;
#if defined(CONFIG_UNIX)
@@ -259,6 +264,8 @@ struct io_ring_ctx {
struct sqe_submit {
const struct io_uring_sqe *sqe;
+ struct file *ring_file;
+ int ring_fd;
u32 sequence;
bool has_user;
bool in_async;
@@ -317,10 +324,13 @@ struct io_kiocb {
#define REQ_F_TIMEOUT 1024 /* timeout request */
#define REQ_F_ISREG 2048 /* regular file */
#define REQ_F_MUST_PUNT 4096 /* must be punted even for NONBLOCK */
+#define REQ_F_INFLIGHT 8192 /* on inflight list */
u64 user_data;
u32 result;
u32 sequence;
+ struct list_head inflight_entry;
+
struct io_wq_work work;
};
@@ -401,6 +411,9 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->cancel_list);
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
+ init_waitqueue_head(&ctx->inflight_wait);
+ spin_lock_init(&ctx->inflight_lock);
+ INIT_LIST_HEAD(&ctx->inflight_list);
return ctx;
}
@@ -670,9 +683,20 @@ static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
static void __io_free_req(struct io_kiocb *req)
{
+ struct io_ring_ctx *ctx = req->ctx;
+
if (req->file && !(req->flags & REQ_F_FIXED_FILE))
fput(req->file);
- percpu_ref_put(&req->ctx->refs);
+ if (req->flags & REQ_F_INFLIGHT) {
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->inflight_lock, flags);
+ list_del(&req->inflight_entry);
+ if (waitqueue_active(&ctx->inflight_wait))
+ wake_up(&ctx->inflight_wait);
+ spin_unlock_irqrestore(&ctx->inflight_lock, flags);
+ }
+ percpu_ref_put(&ctx->refs);
kmem_cache_free(req_cachep, req);
}
@@ -2276,6 +2300,30 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
return 0;
}
+static int io_grab_files(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+ int ret = -EBADF;
+
+ rcu_read_lock();
+ spin_lock_irq(&ctx->inflight_lock);
+ /*
+ * We use the f_ops->flush() handler to ensure that we can flush
+ * out work accessing these files if the fd is closed. Check if
+ * the fd has changed since we started down this path, and disallow
+ * this operation if it has.
+ */
+ if (fcheck(req->submit.ring_fd) == req->submit.ring_file) {
+ list_add(&req->inflight_entry, &ctx->inflight_list);
+ req->flags |= REQ_F_INFLIGHT;
+ req->work.files = current->files;
+ ret = 0;
+ }
+ spin_unlock_irq(&ctx->inflight_lock);
+ rcu_read_unlock();
+
+ return ret;
+}
+
static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
struct sqe_submit *s)
{
@@ -2295,17 +2343,25 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
if (sqe_copy) {
s->sqe = sqe_copy;
memcpy(&req->submit, s, sizeof(*s));
- io_queue_async_work(ctx, req);
+ if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
+ ret = io_grab_files(ctx, req);
+ if (ret) {
+ kfree(sqe_copy);
+ goto err;
+ }
+ }
/*
* Queued up for async execution, worker will release
* submit reference when the iocb is actually submitted.
*/
+ io_queue_async_work(ctx, req);
return 0;
}
}
/* drop submission reference */
+err:
io_put_req(req, NULL);
/* and drop final reference, if we failed */
@@ -2509,6 +2565,7 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
head = READ_ONCE(sq_array[head & ctx->sq_mask]);
if (head < ctx->sq_entries) {
+ s->ring_file = NULL;
s->sqe = &ctx->sq_sqes[head];
s->sequence = ctx->cached_sq_head;
ctx->cached_sq_head++;
@@ -2708,7 +2765,8 @@ static int io_sq_thread(void *data)
return 0;
}
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+ struct file *ring_file, int ring_fd)
{
struct io_submit_state state, *statep = NULL;
struct io_kiocb *link = NULL;
@@ -2750,9 +2808,11 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
}
out:
+ s.ring_file = ring_file;
s.has_user = true;
s.in_async = false;
s.needs_fixed_file = false;
+ s.ring_fd = ring_fd;
submit++;
trace_io_uring_submit_sqe(ctx, true, false);
io_submit_sqe(ctx, &s, statep, &link);
@@ -3714,6 +3774,53 @@ static int io_uring_release(struct inode *inode, struct file *file)
return 0;
}
+static void io_uring_cancel_files(struct io_ring_ctx *ctx,
+ struct files_struct *files)
+{
+ struct io_kiocb *req;
+ DEFINE_WAIT(wait);
+
+ while (!list_empty_careful(&ctx->inflight_list)) {
+ enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
+
+ spin_lock_irq(&ctx->inflight_lock);
+ list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
+ if (req->work.files == files) {
+ ret = io_wq_cancel_work(ctx->io_wq, &req->work);
+ break;
+ }
+ }
+ if (ret == IO_WQ_CANCEL_RUNNING)
+ prepare_to_wait(&ctx->inflight_wait, &wait,
+ TASK_UNINTERRUPTIBLE);
+
+ spin_unlock_irq(&ctx->inflight_lock);
+
+ /*
+ * We need to keep going until we get NOTFOUND. We only cancel
+ * one work at a time.
+ *
+ * If we get CANCEL_RUNNING, then wait for a work to complete
+ * before continuing.
+ */
+ if (ret == IO_WQ_CANCEL_OK)
+ continue;
+ else if (ret != IO_WQ_CANCEL_RUNNING)
+ break;
+ schedule();
+ }
+}
+
+static int io_uring_flush(struct file *file, void *data)
+{
+ struct io_ring_ctx *ctx = file->private_data;
+
+ io_uring_cancel_files(ctx, data);
+ if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
+ io_wq_cancel_all(ctx->io_wq);
+ return 0;
+}
+
static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
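The ->flush() hook is what makes the borrowed file tables safe to tear down: every close() of the ring fd calls io_uring_flush() with the closing task's files_struct, and io_uring_cancel_files() then walks inflight_list one entry at a time, cancelling each matching work item (IO_WQ_CANCEL_OK), sleeping on inflight_wait when the item is already running (IO_WQ_CANCEL_RUNNING; __io_free_req() issues the wake-up when it unlinks a request), and stopping once no matching entry remains (IO_WQ_CANCEL_NOTFOUND). On task exit or a fatal signal, io_uring_flush() additionally cancels all pending io-wq work. The sleep uses the standard wait-queue idiom; its general shape, sketched outside this patch with generic names, is:

	DEFINE_WAIT(wait);

	/* arm the wait entry while the protected state is still stable */
	prepare_to_wait(&wq_head, &wait, TASK_UNINTERRUPTIBLE);
	if (!condition)
		schedule();		/* sleep until someone wakes wq_head */
	finish_wait(&wq_head, &wait);	/* restore task state, unlink the entry */

Here the condition is a matching request leaving inflight_list, and the wake-up comes from __io_free_req() whenever it removes an inflight request.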
@@ -3782,7 +3889,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
to_submit = min(to_submit, ctx->sq_entries);
mutex_lock(&ctx->uring_lock);
- submitted = io_ring_submit(ctx, to_submit);
+ submitted = io_ring_submit(ctx, to_submit, f.file, fd);
mutex_unlock(&ctx->uring_lock);
}
if (flags & IORING_ENTER_GETEVENTS) {
@@ -3805,6 +3912,7 @@ out_fput:
static const struct file_operations io_uring_fops = {
.release = io_uring_release,
+ .flush = io_uring_flush,
.mmap = io_uring_mmap,
.poll = io_uring_poll,
.fasync = io_uring_fasync,