diff options
author | Sean Paul <seanpaul@chromium.org> | 2019-05-22 16:08:21 -0400 |
---|---|---|
committer | Sean Paul <seanpaul@chromium.org> | 2019-05-22 16:08:21 -0400 |
commit | 374ed5429346a021c8e2d26fafce14c5b15dedd0 (patch) | |
tree | 70739e93443494993197cc11f41c0fd0a0f3aac0 /fs/io_uring.c | |
parent | 270afb37ae34fc1499d166f6edf4bc472f529d96 (diff) | |
parent | a188339ca5a396acc588e5851ed7e19f66b0ebd9 (diff) |
Merge drm/drm-next into drm-misc-next
Backmerging 5.2-rc1 to -misc-next for robher
Signed-off-by: Sean Paul <seanpaul@chromium.org>
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 591 |
1 files changed, 422 insertions, 169 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index 89aa8412b5f5..310f8d17c53e 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -4,15 +4,28 @@ * supporting fast/efficient IO. * * A note on the read/write ordering memory barriers that are matched between - * the application and kernel side. When the application reads the CQ ring - * tail, it must use an appropriate smp_rmb() to order with the smp_wmb() - * the kernel uses after writing the tail. Failure to do so could cause a - * delay in when the application notices that completion events available. - * This isn't a fatal condition. Likewise, the application must use an - * appropriate smp_wmb() both before writing the SQ tail, and after writing - * the SQ tail. The first one orders the sqe writes with the tail write, and - * the latter is paired with the smp_rmb() the kernel will issue before - * reading the SQ tail on submission. + * the application and kernel side. + * + * After the application reads the CQ ring tail, it must use an + * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses + * before writing the tail (using smp_load_acquire to read the tail will + * do). It also needs a smp_mb() before updating CQ head (ordering the + * entry load(s) with the head store), pairing with an implicit barrier + * through a control-dependency in io_get_cqring (smp_store_release to + * store head will do). Failure to do so could lead to reading invalid + * CQ entries. + * + * Likewise, the application must use an appropriate smp_wmb() before + * writing the SQ tail (ordering SQ entry stores with the tail store), + * which pairs with smp_load_acquire in io_get_sqring (smp_store_release + * to store the tail will do). And it needs a barrier ordering the SQ + * head load before writing new SQ entries (smp_load_acquire to read + * head will do). + * + * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application + * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after* + * updating the SQ tail; a full memory barrier smp_mb() is needed + * between. * * Also see the examples in the liburing library: * @@ -70,20 +83,108 @@ struct io_uring { u32 tail ____cacheline_aligned_in_smp; }; +/* + * This data is shared with the application through the mmap at offset + * IORING_OFF_SQ_RING. + * + * The offsets to the member fields are published through struct + * io_sqring_offsets when calling io_uring_setup. + */ struct io_sq_ring { + /* + * Head and tail offsets into the ring; the offsets need to be + * masked to get valid indices. + * + * The kernel controls head and the application controls tail. + */ struct io_uring r; + /* + * Bitmask to apply to head and tail offsets (constant, equals + * ring_entries - 1) + */ u32 ring_mask; + /* Ring size (constant, power of 2) */ u32 ring_entries; + /* + * Number of invalid entries dropped by the kernel due to + * invalid index stored in array + * + * Written by the kernel, shouldn't be modified by the + * application (i.e. get number of "new events" by comparing to + * cached value). + * + * After a new SQ head value was read by the application this + * counter includes all submissions that were dropped reaching + * the new SQ head (and possibly more). + */ u32 dropped; + /* + * Runtime flags + * + * Written by the kernel, shouldn't be modified by the + * application. + * + * The application needs a full memory barrier before checking + * for IORING_SQ_NEED_WAKEUP after updating the sq tail. + */ u32 flags; + /* + * Ring buffer of indices into array of io_uring_sqe, which is + * mmapped by the application using the IORING_OFF_SQES offset. + * + * This indirection could e.g. be used to assign fixed + * io_uring_sqe entries to operations and only submit them to + * the queue when needed. + * + * The kernel modifies neither the indices array nor the entries + * array. + */ u32 array[]; }; +/* + * This data is shared with the application through the mmap at offset + * IORING_OFF_CQ_RING. + * + * The offsets to the member fields are published through struct + * io_cqring_offsets when calling io_uring_setup. + */ struct io_cq_ring { + /* + * Head and tail offsets into the ring; the offsets need to be + * masked to get valid indices. + * + * The application controls head and the kernel tail. + */ struct io_uring r; + /* + * Bitmask to apply to head and tail offsets (constant, equals + * ring_entries - 1) + */ u32 ring_mask; + /* Ring size (constant, power of 2) */ u32 ring_entries; + /* + * Number of completion events lost because the queue was full; + * this should be avoided by the application by making sure + * there are not more requests pending thatn there is space in + * the completion queue. + * + * Written by the kernel, shouldn't be modified by the + * application (i.e. get number of "new events" by comparing to + * cached value). + * + * As completion events come in out of order this counter is not + * ordered with any other data. + */ u32 overflow; + /* + * Ring buffer of completion events. + * + * The kernel writes completion events fresh every time they are + * produced, so the application is allowed to modify pending + * entries. + */ struct io_uring_cqe cqes[]; }; @@ -121,6 +222,8 @@ struct io_ring_ctx { unsigned sq_mask; unsigned sq_thread_idle; struct io_uring_sqe *sq_sqes; + + struct list_head defer_list; } ____cacheline_aligned_in_smp; /* IO offload */ @@ -128,7 +231,6 @@ struct io_ring_ctx { struct task_struct *sqo_thread; /* if using sq thread polling */ struct mm_struct *sqo_mm; wait_queue_head_t sqo_wait; - unsigned sqo_stop; struct { /* CQ ring */ @@ -138,6 +240,7 @@ struct io_ring_ctx { unsigned cq_mask; struct wait_queue_head cq_wait; struct fasync_struct *cq_fasync; + struct eventfd_ctx *cq_ev_fd; } ____cacheline_aligned_in_smp; /* @@ -221,13 +324,15 @@ struct io_kiocb { struct list_head list; unsigned int flags; refcount_t refs; -#define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */ +#define REQ_F_NOWAIT 1 /* must not punt to workers */ #define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */ #define REQ_F_FIXED_FILE 4 /* ctx owns file */ #define REQ_F_SEQ_PREV 8 /* sequential with previous */ -#define REQ_F_PREPPED 16 /* prep already done */ +#define REQ_F_IO_DRAIN 16 /* drain existing IO first */ +#define REQ_F_IO_DRAINED 32 /* drain done */ u64 user_data; - u64 error; + u32 error; /* iopoll result from callback */ + u32 sequence; struct work_struct work; }; @@ -255,6 +360,8 @@ struct io_submit_state { unsigned int ios_left; }; +static void io_sq_wq_submit_work(struct work_struct *work); + static struct kmem_cache *req_cachep; static const struct file_operations io_uring_fops; @@ -306,10 +413,36 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) spin_lock_init(&ctx->completion_lock); INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->cancel_list); + INIT_LIST_HEAD(&ctx->defer_list); return ctx; } -static void io_commit_cqring(struct io_ring_ctx *ctx) +static inline bool io_sequence_defer(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) + return false; + + return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped; +} + +static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) +{ + struct io_kiocb *req; + + if (list_empty(&ctx->defer_list)) + return NULL; + + req = list_first_entry(&ctx->defer_list, struct io_kiocb, list); + if (!io_sequence_defer(ctx, req)) { + list_del_init(&req->list); + return req; + } + + return NULL; +} + +static void __io_commit_cqring(struct io_ring_ctx *ctx) { struct io_cq_ring *ring = ctx->cq_ring; @@ -317,12 +450,6 @@ static void io_commit_cqring(struct io_ring_ctx *ctx) /* order cqe stores with ring update */ smp_store_release(&ring->r.tail, ctx->cached_cq_tail); - /* - * Write sider barrier of tail update, app has read side. See - * comment at the top of this file. - */ - smp_wmb(); - if (wq_has_sleeper(&ctx->cq_wait)) { wake_up_interruptible(&ctx->cq_wait); kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN); @@ -330,15 +457,30 @@ static void io_commit_cqring(struct io_ring_ctx *ctx) } } +static void io_commit_cqring(struct io_ring_ctx *ctx) +{ + struct io_kiocb *req; + + __io_commit_cqring(ctx); + + while ((req = io_get_deferred_req(ctx)) != NULL) { + req->flags |= REQ_F_IO_DRAINED; + queue_work(ctx->sqo_wq, &req->work); + } +} + static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) { struct io_cq_ring *ring = ctx->cq_ring; unsigned tail; tail = ctx->cached_cq_tail; - /* See comment at the top of the file */ - smp_rmb(); - if (tail + 1 == READ_ONCE(ring->r.head)) + /* + * writes to the cq entry need to come after reading head; the + * control dependency is enough as we're using WRITE_ONCE to + * fill the cq entry + */ + if (tail - READ_ONCE(ring->r.head) == ring->ring_entries) return NULL; ctx->cached_cq_tail++; @@ -346,7 +488,7 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) } static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, - long res, unsigned ev_flags) + long res) { struct io_uring_cqe *cqe; @@ -359,7 +501,7 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, if (cqe) { WRITE_ONCE(cqe->user_data, ki_user_data); WRITE_ONCE(cqe->res, res); - WRITE_ONCE(cqe->flags, ev_flags); + WRITE_ONCE(cqe->flags, 0); } else { unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); @@ -373,15 +515,17 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx) wake_up(&ctx->wait); if (waitqueue_active(&ctx->sqo_wait)) wake_up(&ctx->sqo_wait); + if (ctx->cq_ev_fd) + eventfd_signal(ctx->cq_ev_fd, 1); } static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data, - long res, unsigned ev_flags) + long res) { unsigned long flags; spin_lock_irqsave(&ctx->completion_lock, flags); - io_cqring_fill_event(ctx, user_data, res, ev_flags); + io_cqring_fill_event(ctx, user_data, res); io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); @@ -483,7 +627,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, req = list_first_entry(done, struct io_kiocb, list); list_del(&req->list); - io_cqring_fill_event(ctx, req->user_data, req->error, 0); + io_cqring_fill_event(ctx, req->user_data, req->error); (*nr_events)++; if (refcount_dec_and_test(&req->refs)) { @@ -631,7 +775,7 @@ static void io_complete_rw(struct kiocb *kiocb, long res, long res2) kiocb_end_write(kiocb); - io_cqring_add_event(req->ctx, req->user_data, res, 0); + io_cqring_add_event(req->ctx, req->user_data, res); io_put_req(req); } @@ -682,11 +826,9 @@ static void io_iopoll_req_issued(struct io_kiocb *req) list_add_tail(&req->list, &ctx->poll_list); } -static void io_file_put(struct io_submit_state *state, struct file *file) +static void io_file_put(struct io_submit_state *state) { - if (!state) { - fput(file); - } else if (state->file) { + if (state->file) { int diff = state->has_refs - state->used_refs; if (diff) @@ -711,7 +853,7 @@ static struct file *io_file_get(struct io_submit_state *state, int fd) state->ios_left--; return state->file; } - io_file_put(state, NULL); + io_file_put(state); } state->file = fget_many(fd, state->ios_left); if (!state->file) @@ -742,7 +884,7 @@ static bool io_file_supports_async(struct file *file) } static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, - bool force_nonblock, struct io_submit_state *state) + bool force_nonblock) { const struct io_uring_sqe *sqe = s->sqe; struct io_ring_ctx *ctx = req->ctx; @@ -752,9 +894,6 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, if (!req->file) return -EBADF; - /* For -EAGAIN retry, everything is already prepped */ - if (req->flags & REQ_F_PREPPED) - return 0; if (force_nonblock && !io_file_supports_async(req->file)) force_nonblock = false; @@ -776,10 +915,14 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags)); if (unlikely(ret)) return ret; - if (force_nonblock) { + + /* don't allow async punt if RWF_NOWAIT was requested */ + if (kiocb->ki_flags & IOCB_NOWAIT) + req->flags |= REQ_F_NOWAIT; + + if (force_nonblock) kiocb->ki_flags |= IOCB_NOWAIT; - req->flags |= REQ_F_FORCE_NONBLOCK; - } + if (ctx->flags & IORING_SETUP_IOPOLL) { if (!(kiocb->ki_flags & IOCB_DIRECT) || !kiocb->ki_filp->f_op->iopoll) @@ -793,7 +936,6 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, return -EINVAL; kiocb->ki_complete = io_complete_rw; } - req->flags |= REQ_F_PREPPED; return 0; } @@ -940,7 +1082,7 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) } static int io_read(struct io_kiocb *req, const struct sqe_submit *s, - bool force_nonblock, struct io_submit_state *state) + bool force_nonblock) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw; @@ -949,7 +1091,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s, size_t iov_count; int ret; - ret = io_prep_rw(req, s, force_nonblock, state); + ret = io_prep_rw(req, s, force_nonblock); if (ret) return ret; file = kiocb->ki_filp; @@ -987,7 +1129,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s, } static int io_write(struct io_kiocb *req, const struct sqe_submit *s, - bool force_nonblock, struct io_submit_state *state) + bool force_nonblock) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw; @@ -996,7 +1138,7 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, size_t iov_count; int ret; - ret = io_prep_rw(req, s, force_nonblock, state); + ret = io_prep_rw(req, s, force_nonblock); if (ret) return ret; @@ -1068,7 +1210,7 @@ static int io_nop(struct io_kiocb *req, u64 user_data) if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; - io_cqring_add_event(ctx, user_data, err, 0); + io_cqring_add_event(ctx, user_data, err); io_put_req(req); return 0; } @@ -1079,16 +1221,12 @@ static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) if (!req->file) return -EBADF; - /* Prep already done (EAGAIN retry) */ - if (req->flags & REQ_F_PREPPED) - return 0; if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) return -EINVAL; - req->flags |= REQ_F_PREPPED; return 0; } @@ -1117,7 +1255,51 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, end > 0 ? end : LLONG_MAX, fsync_flags & IORING_FSYNC_DATASYNC); - io_cqring_add_event(req->ctx, sqe->user_data, ret, 0); + io_cqring_add_event(req->ctx, sqe->user_data, ret); + io_put_req(req); + return 0; +} + +static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_ring_ctx *ctx = req->ctx; + int ret = 0; + + if (!req->file) + return -EBADF; + + if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) + return -EINVAL; + + return ret; +} + +static int io_sync_file_range(struct io_kiocb *req, + const struct io_uring_sqe *sqe, + bool force_nonblock) +{ + loff_t sqe_off; + loff_t sqe_len; + unsigned flags; + int ret; + + ret = io_prep_sfr(req, sqe); + if (ret) + return ret; + + /* sync_file_range always requires a blocking context */ + if (force_nonblock) + return -EAGAIN; + + sqe_off = READ_ONCE(sqe->off); + sqe_len = READ_ONCE(sqe->len); + flags = READ_ONCE(sqe->sync_range_flags); + + ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags); + + io_cqring_add_event(req->ctx, sqe->user_data, ret); io_put_req(req); return 0; } @@ -1175,7 +1357,7 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) } spin_unlock_irq(&ctx->completion_lock); - io_cqring_add_event(req->ctx, sqe->user_data, ret, 0); + io_cqring_add_event(req->ctx, sqe->user_data, ret); io_put_req(req); return 0; } @@ -1184,7 +1366,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req, __poll_t mask) { req->poll.done = true; - io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask), 0); + io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask)); io_commit_cqring(ctx); } @@ -1324,7 +1506,6 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) spin_unlock(&poll->head->lock); } if (mask) { /* no async, we'd stolen it */ - req->error = mangle_poll(mask); ipt.error = 0; io_poll_complete(ctx, req, mask); } @@ -1337,9 +1518,36 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) return ipt.error; } +static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + struct io_uring_sqe *sqe_copy; + + if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) + return 0; + + sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); + if (!sqe_copy) + return -EAGAIN; + + spin_lock_irq(&ctx->completion_lock); + if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) { + spin_unlock_irq(&ctx->completion_lock); + kfree(sqe_copy); + return 0; + } + + memcpy(sqe_copy, sqe, sizeof(*sqe_copy)); + req->submit.sqe = sqe_copy; + + INIT_WORK(&req->work, io_sq_wq_submit_work); + list_add_tail(&req->list, &ctx->defer_list); + spin_unlock_irq(&ctx->completion_lock); + return -EIOCBQUEUED; +} + static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, - const struct sqe_submit *s, bool force_nonblock, - struct io_submit_state *state) + const struct sqe_submit *s, bool force_nonblock) { int ret, opcode; @@ -1355,18 +1563,18 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, case IORING_OP_READV: if (unlikely(s->sqe->buf_index)) return -EINVAL; - ret = io_read(req, s, force_nonblock, state); + ret = io_read(req, s, force_nonblock); break; case IORING_OP_WRITEV: if (unlikely(s->sqe->buf_index)) return -EINVAL; - ret = io_write(req, s, force_nonblock, state); + ret = io_write(req, s, force_nonblock); break; case IORING_OP_READ_FIXED: - ret = io_read(req, s, force_nonblock, state); + ret = io_read(req, s, force_nonblock); break; case IORING_OP_WRITE_FIXED: - ret = io_write(req, s, force_nonblock, state); + ret = io_write(req, s, force_nonblock); break; case IORING_OP_FSYNC: ret = io_fsync(req, s->sqe, force_nonblock); @@ -1377,6 +1585,9 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, case IORING_OP_POLL_REMOVE: ret = io_poll_remove(req, s->sqe); break; + case IORING_OP_SYNC_FILE_RANGE: + ret = io_sync_file_range(req, s->sqe, force_nonblock); + break; default: ret = -EINVAL; break; @@ -1439,8 +1650,7 @@ restart: struct sqe_submit *s = &req->submit; const struct io_uring_sqe *sqe = s->sqe; - /* Ensure we clear previously set forced non-block flag */ - req->flags &= ~REQ_F_FORCE_NONBLOCK; + /* Ensure we clear previously set non-block flag */ req->rw.ki_flags &= ~IOCB_NOWAIT; ret = 0; @@ -1459,7 +1669,7 @@ restart: s->has_user = cur_mm != NULL; s->needs_lock = true; do { - ret = __io_submit_sqe(ctx, req, s, false, NULL); + ret = __io_submit_sqe(ctx, req, s, false); /* * We can get EAGAIN for polled IO even though * we're forcing a sync submission from here, @@ -1470,12 +1680,13 @@ restart: break; cond_resched(); } while (1); - - /* drop submission reference */ - io_put_req(req); } + + /* drop submission reference */ + io_put_req(req); + if (ret) { - io_cqring_add_event(ctx, sqe->user_data, ret, 0); + io_cqring_add_event(ctx, sqe->user_data, ret); io_put_req(req); } @@ -1585,6 +1796,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, flags = READ_ONCE(s->sqe->flags); fd = READ_ONCE(s->sqe->fd); + if (flags & IOSQE_IO_DRAIN) { + req->flags |= REQ_F_IO_DRAIN; + req->sequence = ctx->cached_sq_head - 1; + } + if (!io_op_needs_file(s->sqe)) { req->file = NULL; return 0; @@ -1614,7 +1830,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, int ret; /* enforce forwards compatibility on users */ - if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE)) + if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN))) return -EINVAL; req = io_get_req(ctx, state); @@ -1625,8 +1841,15 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, if (unlikely(ret)) goto out; - ret = __io_submit_sqe(ctx, req, s, true, state); - if (ret == -EAGAIN) { + ret = io_req_defer(ctx, req, s->sqe); + if (ret) { + if (ret == -EIOCBQUEUED) + ret = 0; + return ret; + } + + ret = __io_submit_sqe(ctx, req, s, true); + if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { struct io_uring_sqe *sqe_copy; sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); @@ -1671,7 +1894,7 @@ out: static void io_submit_state_end(struct io_submit_state *state) { blk_finish_plug(&state->plug); - io_file_put(state, NULL); + io_file_put(state); if (state->free_reqs) kmem_cache_free_bulk(req_cachep, state->free_reqs, &state->reqs[state->cur_req]); @@ -1700,24 +1923,10 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) * write new data to them. */ smp_store_release(&ring->r.head, ctx->cached_sq_head); - - /* - * write side barrier of head update, app has read side. See - * comment at the top of this file - */ - smp_wmb(); } } /* - * Undo last io_get_sqring() - */ -static void io_drop_sqring(struct io_ring_ctx *ctx) -{ - ctx->cached_sq_head--; -} - -/* * Fetch an sqe, if one is available. Note that s->sqe will point to memory * that is mapped by userspace. This means that care needs to be taken to * ensure that reads are stable, as we cannot rely on userspace always @@ -1739,9 +1948,8 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) * though the application is the one updating it. */ head = ctx->cached_sq_head; - /* See comment at the top of this file */ - smp_rmb(); - if (head == READ_ONCE(ring->r.tail)) + /* make sure SQ entry isn't read before tail */ + if (head == smp_load_acquire(&ring->r.tail)) return false; head = READ_ONCE(ring->array[head & ctx->sq_mask]); @@ -1755,8 +1963,6 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) /* drop invalid entries */ ctx->cached_sq_head++; ring->dropped++; - /* See comment at the top of this file */ - smp_wmb(); return false; } @@ -1785,7 +1991,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, continue; } - io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0); + io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret); } if (statep) @@ -1808,7 +2014,7 @@ static int io_sq_thread(void *data) set_fs(USER_DS); timeout = inflight = 0; - while (!kthread_should_stop() && !ctx->sqo_stop) { + while (!kthread_should_park()) { bool all_fixed, mm_fault = false; int i; @@ -1866,10 +2072,11 @@ static int io_sq_thread(void *data) /* Tell userspace we may need a wakeup call */ ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; - smp_wmb(); + /* make sure to read SQ tail after writing flags */ + smp_mb(); if (!io_get_sqring(ctx, &sqes[0])) { - if (kthread_should_stop()) { + if (kthread_should_park()) { finish_wait(&ctx->sqo_wait, &wait); break; } @@ -1879,13 +2086,11 @@ static int io_sq_thread(void *data) finish_wait(&ctx->sqo_wait, &wait); ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; - smp_wmb(); continue; } finish_wait(&ctx->sqo_wait, &wait); ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; - smp_wmb(); } i = 0; @@ -1920,13 +2125,16 @@ static int io_sq_thread(void *data) unuse_mm(cur_mm); mmput(cur_mm); } + + kthread_parkme(); + return 0; } static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) { struct io_submit_state state, *statep = NULL; - int i, ret = 0, submit = 0; + int i, submit = 0; if (to_submit > IO_PLUG_THRESHOLD) { io_submit_state_start(&state, ctx, to_submit); @@ -1935,6 +2143,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) for (i = 0; i < to_submit; i++) { struct sqe_submit s; + int ret; if (!io_get_sqring(ctx, &s)) break; @@ -1942,25 +2151,24 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) s.has_user = true; s.needs_lock = false; s.needs_fixed_file = false; + submit++; ret = io_submit_sqe(ctx, &s, statep); - if (ret) { - io_drop_sqring(ctx); - break; - } - - submit++; + if (ret) + io_cqring_add_event(ctx, s.sqe->user_data, ret); } io_commit_sqring(ctx); if (statep) io_submit_state_end(statep); - return submit ? submit : ret; + return submit; } static unsigned io_cqring_events(struct io_cq_ring *ring) { + /* See comment at the top of this file */ + smp_rmb(); return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); } @@ -1973,11 +2181,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, { struct io_cq_ring *ring = ctx->cq_ring; sigset_t ksigmask, sigsaved; - DEFINE_WAIT(wait); int ret; - /* See comment at the top of this file */ - smp_rmb(); if (io_cqring_events(ring) >= min_events) return 0; @@ -1995,23 +2200,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; } - do { - prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE); - - ret = 0; - /* See comment at the top of this file */ - smp_rmb(); - if (io_cqring_events(ring) >= min_events) - break; - - schedule(); - + ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events); + if (ret == -ERESTARTSYS) ret = -EINTR; - if (signal_pending(current)) - break; - } while (1); - - finish_wait(&ctx->wait, &wait); if (sig) restore_user_sigmask(sig, &sigsaved); @@ -2052,8 +2243,12 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) static void io_sq_thread_stop(struct io_ring_ctx *ctx) { if (ctx->sqo_thread) { - ctx->sqo_stop = 1; - mb(); + /* + * The park is a bit of a work-around, without it we get + * warning spews on shutdown with SQPOLL set and affinity + * set to a single CPU. + */ + kthread_park(ctx->sqo_thread); kthread_stop(ctx->sqo_thread); ctx->sqo_thread = NULL; } @@ -2141,7 +2336,6 @@ static int io_sqe_files_scm(struct io_ring_ctx *ctx) left = ctx->nr_user_files; while (left) { unsigned this_files = min_t(unsigned, left, SCM_MAX_FD); - int ret; ret = __io_sqe_files_scm(ctx, this_files, total); if (ret) @@ -2236,23 +2430,24 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx, mmgrab(current->mm); ctx->sqo_mm = current->mm; - ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); - if (!ctx->sq_thread_idle) - ctx->sq_thread_idle = HZ; - - ret = -EINVAL; - if (!cpu_possible(p->sq_thread_cpu)) - goto err; - if (ctx->flags & IORING_SETUP_SQPOLL) { ret = -EPERM; if (!capable(CAP_SYS_ADMIN)) goto err; + ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); + if (!ctx->sq_thread_idle) + ctx->sq_thread_idle = HZ; + if (p->flags & IORING_SETUP_SQ_AFF) { - int cpu; + int cpu = p->sq_thread_cpu; + + ret = -EINVAL; + if (cpu >= nr_cpu_ids) + goto err; + if (!cpu_online(cpu)) + goto err; - cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS); ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread, ctx, cpu, "io_uring-sq"); @@ -2313,8 +2508,12 @@ static int io_account_mem(struct user_struct *user, unsigned long nr_pages) static void io_mem_free(void *ptr) { - struct page *page = virt_to_head_page(ptr); + struct page *page; + if (!ptr) + return; + + page = virt_to_head_page(ptr); if (put_page_testzero(page)) free_compound_page(page); } @@ -2355,7 +2554,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) if (ctx->account_mem) io_unaccount_mem(ctx->user, imu->nr_bvecs); - kfree(imu->bvec); + kvfree(imu->bvec); imu->nr_bvecs = 0; } @@ -2447,9 +2646,9 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, if (!pages || nr_pages > got_pages) { kfree(vmas); kfree(pages); - pages = kmalloc_array(nr_pages, sizeof(struct page *), + pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL); - vmas = kmalloc_array(nr_pages, + vmas = kvmalloc_array(nr_pages, sizeof(struct vm_area_struct *), GFP_KERNEL); if (!pages || !vmas) { @@ -2461,7 +2660,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, got_pages = nr_pages; } - imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec), + imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec), GFP_KERNEL); ret = -ENOMEM; if (!imu->bvec) { @@ -2472,8 +2671,9 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, ret = 0; down_read(¤t->mm->mmap_sem); - pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE, - pages, vmas); + pret = get_user_pages(ubuf, nr_pages, + FOLL_WRITE | FOLL_LONGTERM, + pages, vmas); if (pret == nr_pages) { /* don't support file backed memory */ for (j = 0; j < nr_pages; j++) { @@ -2500,6 +2700,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, } if (ctx->account_mem) io_unaccount_mem(ctx->user, nr_pages); + kvfree(imu->bvec); goto err; } @@ -2522,16 +2723,48 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, ctx->nr_user_bufs++; } - kfree(pages); - kfree(vmas); + kvfree(pages); + kvfree(vmas); return 0; err: - kfree(pages); - kfree(vmas); + kvfree(pages); + kvfree(vmas); io_sqe_buffer_unregister(ctx); return ret; } +static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg) +{ + __s32 __user *fds = arg; + int fd; + + if (ctx->cq_ev_fd) + return -EBUSY; + + if (copy_from_user(&fd, fds, sizeof(*fds))) + return -EFAULT; + + ctx->cq_ev_fd = eventfd_ctx_fdget(fd); + if (IS_ERR(ctx->cq_ev_fd)) { + int ret = PTR_ERR(ctx->cq_ev_fd); + ctx->cq_ev_fd = NULL; + return ret; + } + + return 0; +} + +static int io_eventfd_unregister(struct io_ring_ctx *ctx) +{ + if (ctx->cq_ev_fd) { + eventfd_ctx_put(ctx->cq_ev_fd); + ctx->cq_ev_fd = NULL; + return 0; + } + + return -ENXIO; +} + static void io_ring_ctx_free(struct io_ring_ctx *ctx) { io_finish_async(ctx); @@ -2541,6 +2774,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) io_iopoll_reap_events(ctx); io_sqe_buffer_unregister(ctx); io_sqe_files_unregister(ctx); + io_eventfd_unregister(ctx); #if defined(CONFIG_UNIX) if (ctx->ring_sock) @@ -2565,9 +2799,13 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) __poll_t mask = 0; poll_wait(file, &ctx->cq_wait, wait); - /* See comment at the top of this file */ + /* + * synchronizes with barrier from wq_has_sleeper call in + * io_commit_cqring + */ smp_rmb(); - if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head) + if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head != + ctx->sq_ring->ring_entries) mask |= EPOLLOUT | EPOLLWRNORM; if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) mask |= EPOLLIN | EPOLLRDNORM; @@ -2678,24 +2916,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, mutex_lock(&ctx->uring_lock); submitted = io_ring_submit(ctx, to_submit); mutex_unlock(&ctx->uring_lock); - - if (submitted < 0) - goto out_ctx; } if (flags & IORING_ENTER_GETEVENTS) { unsigned nr_events = 0; min_complete = min(min_complete, ctx->cq_entries); - /* - * The application could have included the 'to_submit' count - * in how many events it wanted to wait for. If we failed to - * submit the desired count, we may need to adjust the number - * of events to poll/wait for. - */ - if (submitted < to_submit) - min_complete = min_t(unsigned, submitted, min_complete); - if (ctx->flags & IORING_SETUP_IOPOLL) { mutex_lock(&ctx->uring_lock); ret = io_iopoll_check(ctx, &nr_events, min_complete); @@ -2741,17 +2967,12 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, return -EOVERFLOW; ctx->sq_sqes = io_mem_alloc(size); - if (!ctx->sq_sqes) { - io_mem_free(ctx->sq_ring); + if (!ctx->sq_sqes) return -ENOMEM; - } cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); - if (!cq_ring) { - io_mem_free(ctx->sq_ring); - io_mem_free(ctx->sq_sqes); + if (!cq_ring) return -ENOMEM; - } ctx->cq_ring = cq_ring; cq_ring->ring_mask = p->cq_entries - 1; @@ -2922,11 +3143,31 @@ SYSCALL_DEFINE2(io_uring_setup, u32, entries, static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, void __user *arg, unsigned nr_args) + __releases(ctx->uring_lock) + __acquires(ctx->uring_lock) { int ret; + /* + * We're inside the ring mutex, if the ref is already dying, then + * someone else killed the ctx or is already going through + * io_uring_register(). + */ + if (percpu_ref_is_dying(&ctx->refs)) + return -ENXIO; + percpu_ref_kill(&ctx->refs); + + /* + * Drop uring mutex before waiting for references to exit. If another + * thread is currently inside io_uring_enter() it might need to grab + * the uring_lock to make progress. If we hold it here across the drain + * wait, then we can deadlock. It's safe to drop the mutex here, since + * no new references will come in after we've killed the percpu ref. + */ + mutex_unlock(&ctx->uring_lock); wait_for_completion(&ctx->ctx_done); + mutex_lock(&ctx->uring_lock); switch (opcode) { case IORING_REGISTER_BUFFERS: @@ -2947,6 +3188,18 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_sqe_files_unregister(ctx); break; + case IORING_REGISTER_EVENTFD: + ret = -EINVAL; + if (nr_args != 1) + break; + ret = io_eventfd_register(ctx, arg); + break; + case IORING_UNREGISTER_EVENTFD: + ret = -EINVAL; + if (arg || nr_args) + break; + ret = io_eventfd_unregister(ctx); + break; default: ret = -EINVAL; break; |