summaryrefslogtreecommitdiff
path: root/fs/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r--fs/io_uring.c291
1 files changed, 184 insertions, 107 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c
index f65f85d89217..84efb8956734 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -4,15 +4,28 @@
* supporting fast/efficient IO.
*
* A note on the read/write ordering memory barriers that are matched between
- * the application and kernel side. When the application reads the CQ ring
- * tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
- * the kernel uses after writing the tail. Failure to do so could cause a
- * delay in when the application notices that completion events available.
- * This isn't a fatal condition. Likewise, the application must use an
- * appropriate smp_wmb() both before writing the SQ tail, and after writing
- * the SQ tail. The first one orders the sqe writes with the tail write, and
- * the latter is paired with the smp_rmb() the kernel will issue before
- * reading the SQ tail on submission.
+ * the application and kernel side.
+ *
+ * After the application reads the CQ ring tail, it must use an
+ * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
+ * before writing the tail (using smp_load_acquire to read the tail will
+ * do). It also needs a smp_mb() before updating CQ head (ordering the
+ * entry load(s) with the head store), pairing with an implicit barrier
+ * through a control-dependency in io_get_cqring (smp_store_release to
+ * store head will do). Failure to do so could lead to reading invalid
+ * CQ entries.
+ *
+ * Likewise, the application must use an appropriate smp_wmb() before
+ * writing the SQ tail (ordering SQ entry stores with the tail store),
+ * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
+ * to store the tail will do). And it needs a barrier ordering the SQ
+ * head load before writing new SQ entries (smp_load_acquire to read
+ * head will do).
+ *
+ * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
+ * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
+ * updating the SQ tail; a full memory barrier smp_mb() is needed
+ * between.
*
* Also see the examples in the liburing library:
*
@@ -70,20 +83,108 @@ struct io_uring {
u32 tail ____cacheline_aligned_in_smp;
};
+/*
+ * This data is shared with the application through the mmap at offset
+ * IORING_OFF_SQ_RING.
+ *
+ * The offsets to the member fields are published through struct
+ * io_sqring_offsets when calling io_uring_setup.
+ */
struct io_sq_ring {
+ /*
+ * Head and tail offsets into the ring; the offsets need to be
+ * masked to get valid indices.
+ *
+ * The kernel controls head and the application controls tail.
+ */
struct io_uring r;
+ /*
+ * Bitmask to apply to head and tail offsets (constant, equals
+ * ring_entries - 1)
+ */
u32 ring_mask;
+ /* Ring size (constant, power of 2) */
u32 ring_entries;
+ /*
+ * Number of invalid entries dropped by the kernel due to
+ * invalid index stored in array
+ *
+ * Written by the kernel, shouldn't be modified by the
+ * application (i.e. get number of "new events" by comparing to
+ * cached value).
+ *
+ * After a new SQ head value was read by the application this
+ * counter includes all submissions that were dropped reaching
+ * the new SQ head (and possibly more).
+ */
u32 dropped;
+ /*
+ * Runtime flags
+ *
+ * Written by the kernel, shouldn't be modified by the
+ * application.
+ *
+ * The application needs a full memory barrier before checking
+ * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
+ */
u32 flags;
+ /*
+ * Ring buffer of indices into array of io_uring_sqe, which is
+ * mmapped by the application using the IORING_OFF_SQES offset.
+ *
+ * This indirection could e.g. be used to assign fixed
+ * io_uring_sqe entries to operations and only submit them to
+ * the queue when needed.
+ *
+ * The kernel modifies neither the indices array nor the entries
+ * array.
+ */
u32 array[];
};
+/*
+ * This data is shared with the application through the mmap at offset
+ * IORING_OFF_CQ_RING.
+ *
+ * The offsets to the member fields are published through struct
+ * io_cqring_offsets when calling io_uring_setup.
+ */
struct io_cq_ring {
+ /*
+ * Head and tail offsets into the ring; the offsets need to be
+ * masked to get valid indices.
+ *
+ * The application controls head and the kernel tail.
+ */
struct io_uring r;
+ /*
+ * Bitmask to apply to head and tail offsets (constant, equals
+ * ring_entries - 1)
+ */
u32 ring_mask;
+ /* Ring size (constant, power of 2) */
u32 ring_entries;
+ /*
+ * Number of completion events lost because the queue was full;
+ * this should be avoided by the application by making sure
+ * there are not more requests pending thatn there is space in
+ * the completion queue.
+ *
+ * Written by the kernel, shouldn't be modified by the
+ * application (i.e. get number of "new events" by comparing to
+ * cached value).
+ *
+ * As completion events come in out of order this counter is not
+ * ordered with any other data.
+ */
u32 overflow;
+ /*
+ * Ring buffer of completion events.
+ *
+ * The kernel writes completion events fresh every time they are
+ * produced, so the application is allowed to modify pending
+ * entries.
+ */
struct io_uring_cqe cqes[];
};
@@ -221,7 +322,7 @@ struct io_kiocb {
struct list_head list;
unsigned int flags;
refcount_t refs;
-#define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */
+#define REQ_F_NOWAIT 1 /* must not punt to workers */
#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */
#define REQ_F_FIXED_FILE 4 /* ctx owns file */
#define REQ_F_SEQ_PREV 8 /* sequential with previous */
@@ -317,12 +418,6 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
/* order cqe stores with ring update */
smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
- /*
- * Write sider barrier of tail update, app has read side. See
- * comment at the top of this file.
- */
- smp_wmb();
-
if (wq_has_sleeper(&ctx->cq_wait)) {
wake_up_interruptible(&ctx->cq_wait);
kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
@@ -336,8 +431,11 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
unsigned tail;
tail = ctx->cached_cq_tail;
- /* See comment at the top of the file */
- smp_rmb();
+ /*
+ * writes to the cq entry need to come after reading head; the
+ * control dependency is enough as we're using WRITE_ONCE to
+ * fill the cq entry
+ */
if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
return NULL;
@@ -740,7 +838,7 @@ static bool io_file_supports_async(struct file *file)
}
static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
- bool force_nonblock, struct io_submit_state *state)
+ bool force_nonblock)
{
const struct io_uring_sqe *sqe = s->sqe;
struct io_ring_ctx *ctx = req->ctx;
@@ -774,10 +872,14 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
if (unlikely(ret))
return ret;
- if (force_nonblock) {
+
+ /* don't allow async punt if RWF_NOWAIT was requested */
+ if (kiocb->ki_flags & IOCB_NOWAIT)
+ req->flags |= REQ_F_NOWAIT;
+
+ if (force_nonblock)
kiocb->ki_flags |= IOCB_NOWAIT;
- req->flags |= REQ_F_FORCE_NONBLOCK;
- }
+
if (ctx->flags & IORING_SETUP_IOPOLL) {
if (!(kiocb->ki_flags & IOCB_DIRECT) ||
!kiocb->ki_filp->f_op->iopoll)
@@ -938,7 +1040,7 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
}
static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
- bool force_nonblock, struct io_submit_state *state)
+ bool force_nonblock)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw;
@@ -947,7 +1049,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
size_t iov_count;
int ret;
- ret = io_prep_rw(req, s, force_nonblock, state);
+ ret = io_prep_rw(req, s, force_nonblock);
if (ret)
return ret;
file = kiocb->ki_filp;
@@ -985,7 +1087,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
}
static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
- bool force_nonblock, struct io_submit_state *state)
+ bool force_nonblock)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw;
@@ -994,7 +1096,7 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
size_t iov_count;
int ret;
- ret = io_prep_rw(req, s, force_nonblock, state);
+ ret = io_prep_rw(req, s, force_nonblock);
if (ret)
return ret;
@@ -1336,8 +1438,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
}
static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
- const struct sqe_submit *s, bool force_nonblock,
- struct io_submit_state *state)
+ const struct sqe_submit *s, bool force_nonblock)
{
int ret, opcode;
@@ -1353,18 +1454,18 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
case IORING_OP_READV:
if (unlikely(s->sqe->buf_index))
return -EINVAL;
- ret = io_read(req, s, force_nonblock, state);
+ ret = io_read(req, s, force_nonblock);
break;
case IORING_OP_WRITEV:
if (unlikely(s->sqe->buf_index))
return -EINVAL;
- ret = io_write(req, s, force_nonblock, state);
+ ret = io_write(req, s, force_nonblock);
break;
case IORING_OP_READ_FIXED:
- ret = io_read(req, s, force_nonblock, state);
+ ret = io_read(req, s, force_nonblock);
break;
case IORING_OP_WRITE_FIXED:
- ret = io_write(req, s, force_nonblock, state);
+ ret = io_write(req, s, force_nonblock);
break;
case IORING_OP_FSYNC:
ret = io_fsync(req, s->sqe, force_nonblock);
@@ -1437,8 +1538,7 @@ restart:
struct sqe_submit *s = &req->submit;
const struct io_uring_sqe *sqe = s->sqe;
- /* Ensure we clear previously set forced non-block flag */
- req->flags &= ~REQ_F_FORCE_NONBLOCK;
+ /* Ensure we clear previously set non-block flag */
req->rw.ki_flags &= ~IOCB_NOWAIT;
ret = 0;
@@ -1457,7 +1557,7 @@ restart:
s->has_user = cur_mm != NULL;
s->needs_lock = true;
do {
- ret = __io_submit_sqe(ctx, req, s, false, NULL);
+ ret = __io_submit_sqe(ctx, req, s, false);
/*
* We can get EAGAIN for polled IO even though
* we're forcing a sync submission from here,
@@ -1468,10 +1568,11 @@ restart:
break;
cond_resched();
} while (1);
-
- /* drop submission reference */
- io_put_req(req);
}
+
+ /* drop submission reference */
+ io_put_req(req);
+
if (ret) {
io_cqring_add_event(ctx, sqe->user_data, ret, 0);
io_put_req(req);
@@ -1623,8 +1724,8 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
if (unlikely(ret))
goto out;
- ret = __io_submit_sqe(ctx, req, s, true, state);
- if (ret == -EAGAIN) {
+ ret = __io_submit_sqe(ctx, req, s, true);
+ if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
struct io_uring_sqe *sqe_copy;
sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
@@ -1698,24 +1799,10 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
* write new data to them.
*/
smp_store_release(&ring->r.head, ctx->cached_sq_head);
-
- /*
- * write side barrier of head update, app has read side. See
- * comment at the top of this file
- */
- smp_wmb();
}
}
/*
- * Undo last io_get_sqring()
- */
-static void io_drop_sqring(struct io_ring_ctx *ctx)
-{
- ctx->cached_sq_head--;
-}
-
-/*
* Fetch an sqe, if one is available. Note that s->sqe will point to memory
* that is mapped by userspace. This means that care needs to be taken to
* ensure that reads are stable, as we cannot rely on userspace always
@@ -1737,9 +1824,8 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
* though the application is the one updating it.
*/
head = ctx->cached_sq_head;
- /* See comment at the top of this file */
- smp_rmb();
- if (head == READ_ONCE(ring->r.tail))
+ /* make sure SQ entry isn't read before tail */
+ if (head == smp_load_acquire(&ring->r.tail))
return false;
head = READ_ONCE(ring->array[head & ctx->sq_mask]);
@@ -1753,8 +1839,6 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
/* drop invalid entries */
ctx->cached_sq_head++;
ring->dropped++;
- /* See comment at the top of this file */
- smp_wmb();
return false;
}
@@ -1864,7 +1948,8 @@ static int io_sq_thread(void *data)
/* Tell userspace we may need a wakeup call */
ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
- smp_wmb();
+ /* make sure to read SQ tail after writing flags */
+ smp_mb();
if (!io_get_sqring(ctx, &sqes[0])) {
if (kthread_should_stop()) {
@@ -1877,13 +1962,11 @@ static int io_sq_thread(void *data)
finish_wait(&ctx->sqo_wait, &wait);
ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
- smp_wmb();
continue;
}
finish_wait(&ctx->sqo_wait, &wait);
ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
- smp_wmb();
}
i = 0;
@@ -1928,7 +2011,7 @@ static int io_sq_thread(void *data)
static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
{
struct io_submit_state state, *statep = NULL;
- int i, ret = 0, submit = 0;
+ int i, submit = 0;
if (to_submit > IO_PLUG_THRESHOLD) {
io_submit_state_start(&state, ctx, to_submit);
@@ -1937,6 +2020,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
for (i = 0; i < to_submit; i++) {
struct sqe_submit s;
+ int ret;
if (!io_get_sqring(ctx, &s))
break;
@@ -1944,21 +2028,18 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
s.has_user = true;
s.needs_lock = false;
s.needs_fixed_file = false;
+ submit++;
ret = io_submit_sqe(ctx, &s, statep);
- if (ret) {
- io_drop_sqring(ctx);
- break;
- }
-
- submit++;
+ if (ret)
+ io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
}
io_commit_sqring(ctx);
if (statep)
io_submit_state_end(statep);
- return submit ? submit : ret;
+ return submit;
}
static unsigned io_cqring_events(struct io_cq_ring *ring)
@@ -2239,10 +2320,6 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
mmgrab(current->mm);
ctx->sqo_mm = current->mm;
- ret = -EINVAL;
- if (!cpu_possible(p->sq_thread_cpu))
- goto err;
-
if (ctx->flags & IORING_SETUP_SQPOLL) {
ret = -EPERM;
if (!capable(CAP_SYS_ADMIN))
@@ -2253,11 +2330,11 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
ctx->sq_thread_idle = HZ;
if (p->flags & IORING_SETUP_SQ_AFF) {
- int cpu;
+ int cpu = array_index_nospec(p->sq_thread_cpu,
+ nr_cpu_ids);
- cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
ret = -EINVAL;
- if (!cpu_possible(p->sq_thread_cpu))
+ if (!cpu_possible(cpu))
goto err;
ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
@@ -2320,8 +2397,12 @@ static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
static void io_mem_free(void *ptr)
{
- struct page *page = virt_to_head_page(ptr);
+ struct page *page;
+
+ if (!ptr)
+ return;
+ page = virt_to_head_page(ptr);
if (put_page_testzero(page))
free_compound_page(page);
}
@@ -2362,7 +2443,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
if (ctx->account_mem)
io_unaccount_mem(ctx->user, imu->nr_bvecs);
- kfree(imu->bvec);
+ kvfree(imu->bvec);
imu->nr_bvecs = 0;
}
@@ -2454,9 +2535,9 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
if (!pages || nr_pages > got_pages) {
kfree(vmas);
kfree(pages);
- pages = kmalloc_array(nr_pages, sizeof(struct page *),
+ pages = kvmalloc_array(nr_pages, sizeof(struct page *),
GFP_KERNEL);
- vmas = kmalloc_array(nr_pages,
+ vmas = kvmalloc_array(nr_pages,
sizeof(struct vm_area_struct *),
GFP_KERNEL);
if (!pages || !vmas) {
@@ -2468,7 +2549,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
got_pages = nr_pages;
}
- imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec),
+ imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
GFP_KERNEL);
ret = -ENOMEM;
if (!imu->bvec) {
@@ -2507,6 +2588,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
}
if (ctx->account_mem)
io_unaccount_mem(ctx->user, nr_pages);
+ kvfree(imu->bvec);
goto err;
}
@@ -2529,12 +2611,12 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
ctx->nr_user_bufs++;
}
- kfree(pages);
- kfree(vmas);
+ kvfree(pages);
+ kvfree(vmas);
return 0;
err:
- kfree(pages);
- kfree(vmas);
+ kvfree(pages);
+ kvfree(vmas);
io_sqe_buffer_unregister(ctx);
return ret;
}
@@ -2572,9 +2654,13 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
__poll_t mask = 0;
poll_wait(file, &ctx->cq_wait, wait);
- /* See comment at the top of this file */
+ /*
+ * synchronizes with barrier from wq_has_sleeper call in
+ * io_commit_cqring
+ */
smp_rmb();
- if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head)
+ if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
+ ctx->sq_ring->ring_entries)
mask |= EPOLLOUT | EPOLLWRNORM;
if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
mask |= EPOLLIN | EPOLLRDNORM;
@@ -2685,24 +2771,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
mutex_lock(&ctx->uring_lock);
submitted = io_ring_submit(ctx, to_submit);
mutex_unlock(&ctx->uring_lock);
-
- if (submitted < 0)
- goto out_ctx;
}
if (flags & IORING_ENTER_GETEVENTS) {
unsigned nr_events = 0;
min_complete = min(min_complete, ctx->cq_entries);
- /*
- * The application could have included the 'to_submit' count
- * in how many events it wanted to wait for. If we failed to
- * submit the desired count, we may need to adjust the number
- * of events to poll/wait for.
- */
- if (submitted < to_submit)
- min_complete = min_t(unsigned, submitted, min_complete);
-
if (ctx->flags & IORING_SETUP_IOPOLL) {
mutex_lock(&ctx->uring_lock);
ret = io_iopoll_check(ctx, &nr_events, min_complete);
@@ -2748,17 +2822,12 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
return -EOVERFLOW;
ctx->sq_sqes = io_mem_alloc(size);
- if (!ctx->sq_sqes) {
- io_mem_free(ctx->sq_ring);
+ if (!ctx->sq_sqes)
return -ENOMEM;
- }
cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
- if (!cq_ring) {
- io_mem_free(ctx->sq_ring);
- io_mem_free(ctx->sq_sqes);
+ if (!cq_ring)
return -ENOMEM;
- }
ctx->cq_ring = cq_ring;
cq_ring->ring_mask = p->cq_entries - 1;
@@ -2934,6 +3003,14 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
{
int ret;
+ /*
+ * We're inside the ring mutex, if the ref is already dying, then
+ * someone else killed the ctx or is already going through
+ * io_uring_register().
+ */
+ if (percpu_ref_is_dying(&ctx->refs))
+ return -ENXIO;
+
percpu_ref_kill(&ctx->refs);
/*