From 29de5f6a350778a621a748cecc7efbb8f0cfa5a7 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 20 Feb 2020 09:56:08 -0700 Subject: io_uring: consider any io_read/write -EAGAIN as final If the -EAGAIN happens because of a static condition, then a poll or later retry won't fix it. We must call it again from blocking condition. Play it safe and ensure that any -EAGAIN condition from read or write must retry from async context. Signed-off-by: Jens Axboe --- fs/io_uring.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 6a595c13e108..64b4519aabf8 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2234,7 +2234,7 @@ static int io_read(struct io_kiocb *req, struct io_kiocb **nxt, /* Ensure we clear previously set non-block flag */ if (!force_nonblock) - req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT; + kiocb->ki_flags &= ~IOCB_NOWAIT; req->result = 0; io_size = ret; @@ -2245,10 +2245,8 @@ static int io_read(struct io_kiocb *req, struct io_kiocb **nxt, * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so * we know to async punt it even if it was opened O_NONBLOCK */ - if (force_nonblock && !io_file_supports_async(req->file)) { - req->flags |= REQ_F_MUST_PUNT; + if (force_nonblock && !io_file_supports_async(req->file)) goto copy_iov; - } iov_count = iov_iter_count(&iter); ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count); @@ -2269,6 +2267,9 @@ copy_iov: inline_vecs, &iter); if (ret) goto out_free; + /* any defer here is final, must blocking retry */ + if (!(req->flags & REQ_F_NOWAIT)) + req->flags |= REQ_F_MUST_PUNT; return -EAGAIN; } } @@ -2334,10 +2335,8 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt, * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so * we know to async punt it even if it was opened O_NONBLOCK */ - if (force_nonblock && !io_file_supports_async(req->file)) { - req->flags |= REQ_F_MUST_PUNT; + if (force_nonblock && !io_file_supports_async(req->file)) goto copy_iov; - } /* file path doesn't support NOWAIT for non-direct_IO */ if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) && @@ -2382,6 +2381,8 @@ copy_iov: inline_vecs, &iter); if (ret) goto out_free; + /* any defer here is final, must blocking retry */ + req->flags |= REQ_F_MUST_PUNT; return -EAGAIN; } } -- cgit From e441d1cf20e1b9fc443e6130488d41e1941aae82 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 20 Feb 2020 09:59:02 -0700 Subject: io_uring: io_accept() should hold on to submit reference on retry Don't drop an early reference, hang on to it and let the caller drop it. This makes it behave more like "regular" requests. Signed-off-by: Jens Axboe --- fs/io_uring.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 64b4519aabf8..2bf954a42586 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3354,6 +3354,8 @@ static void io_accept_finish(struct io_wq_work **workptr) struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); struct io_kiocb *nxt = NULL; + io_put_req(req); + if (io_req_cancelled(req)) return; __io_accept(req, &nxt, false); @@ -3371,7 +3373,6 @@ static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt, ret = __io_accept(req, nxt, force_nonblock); if (ret == -EAGAIN && force_nonblock) { req->work.func = io_accept_finish; - io_put_req(req); return -EAGAIN; } return 0; -- cgit From 5ea62161167eb8297249d3f4dc63741016f01413 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 24 Feb 2020 11:30:16 +0300 Subject: io_uring: don't call work.func from sync ctx Many operations define custom work.func before getting into an io-wq. There are several points against: - it calls io_wq_assign_next() from outside io-wq, that may be confusing - sync context would go unnecessary through io_req_cancelled() - prototypes are quite different, so work!=old_work looks strange - makes async/sync responsibilities fuzzy - adds extra overhead Don't call generic path and io-wq handlers from each other, but use helpers instead Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 76 +++++++++++++++++++++++++++++------------------------------ 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 2bf954a42586..83ae190a3d31 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2462,23 +2462,28 @@ static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) } } -static void io_fsync_finish(struct io_wq_work **workptr) +static void __io_fsync(struct io_kiocb *req, struct io_kiocb **nxt) { - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); loff_t end = req->sync.off + req->sync.len; - struct io_kiocb *nxt = NULL; int ret; - if (io_req_cancelled(req)) - return; - ret = vfs_fsync_range(req->file, req->sync.off, end > 0 ? end : LLONG_MAX, req->sync.flags & IORING_FSYNC_DATASYNC); if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, &nxt); + io_put_req_find_next(req, nxt); +} + +static void io_fsync_finish(struct io_wq_work **workptr) +{ + struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct io_kiocb *nxt = NULL; + + if (io_req_cancelled(req)) + return; + __io_fsync(req, &nxt); if (nxt) io_wq_assign_next(workptr, nxt); } @@ -2486,26 +2491,18 @@ static void io_fsync_finish(struct io_wq_work **workptr) static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt, bool force_nonblock) { - struct io_wq_work *work, *old_work; - /* fsync always requires a blocking context */ if (force_nonblock) { io_put_req(req); req->work.func = io_fsync_finish; return -EAGAIN; } - - work = old_work = &req->work; - io_fsync_finish(&work); - if (work && work != old_work) - *nxt = container_of(work, struct io_kiocb, work); + __io_fsync(req, nxt); return 0; } -static void io_fallocate_finish(struct io_wq_work **workptr) +static void __io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt) { - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; int ret; if (io_req_cancelled(req)) @@ -2516,7 +2513,15 @@ static void io_fallocate_finish(struct io_wq_work **workptr) if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, &nxt); + io_put_req_find_next(req, nxt); +} + +static void io_fallocate_finish(struct io_wq_work **workptr) +{ + struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct io_kiocb *nxt = NULL; + + __io_fallocate(req, &nxt); if (nxt) io_wq_assign_next(workptr, nxt); } @@ -2536,8 +2541,6 @@ static int io_fallocate_prep(struct io_kiocb *req, static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt, bool force_nonblock) { - struct io_wq_work *work, *old_work; - /* fallocate always requiring blocking context */ if (force_nonblock) { io_put_req(req); @@ -2545,11 +2548,7 @@ static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt, return -EAGAIN; } - work = old_work = &req->work; - io_fallocate_finish(&work); - if (work && work != old_work) - *nxt = container_of(work, struct io_kiocb, work); - + __io_fallocate(req, nxt); return 0; } @@ -2953,21 +2952,27 @@ static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; } -static void io_sync_file_range_finish(struct io_wq_work **workptr) +static void __io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt) { - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; int ret; - if (io_req_cancelled(req)) - return; - ret = sync_file_range(req->file, req->sync.off, req->sync.len, req->sync.flags); if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, &nxt); + io_put_req_find_next(req, nxt); +} + + +static void io_sync_file_range_finish(struct io_wq_work **workptr) +{ + struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct io_kiocb *nxt = NULL; + + if (io_req_cancelled(req)) + return; + __io_sync_file_range(req, &nxt); if (nxt) io_wq_assign_next(workptr, nxt); } @@ -2975,8 +2980,6 @@ static void io_sync_file_range_finish(struct io_wq_work **workptr) static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt, bool force_nonblock) { - struct io_wq_work *work, *old_work; - /* sync_file_range always requires a blocking context */ if (force_nonblock) { io_put_req(req); @@ -2984,10 +2987,7 @@ static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt, return -EAGAIN; } - work = old_work = &req->work; - io_sync_file_range_finish(&work); - if (work && work != old_work) - *nxt = container_of(work, struct io_kiocb, work); + __io_sync_file_range(req, nxt); return 0; } -- cgit From deb6dc0544884067b93bbf9a4716be323103b911 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 24 Feb 2020 11:30:17 +0300 Subject: io_uring: don't do full *prep_worker() from io-wq io_prep_async_worker() called io_wq_assign_next() do many useless checks: io_req_work_grab_env() was already called during prep, and @do_hashed is not ever used. Add io_prep_next_work() -- simplified version, that can be called io-wq. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 83ae190a3d31..6f085215be13 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -950,6 +950,17 @@ static inline void io_req_work_drop_env(struct io_kiocb *req) } } +static inline void io_prep_next_work(struct io_kiocb *req, + struct io_kiocb **link) +{ + const struct io_op_def *def = &io_op_defs[req->opcode]; + + if (!(req->flags & REQ_F_ISREG) && def->unbound_nonreg_file) + req->work.flags |= IO_WQ_WORK_UNBOUND; + + *link = io_prep_linked_timeout(req); +} + static inline bool io_prep_async_work(struct io_kiocb *req, struct io_kiocb **link) { @@ -2453,7 +2464,7 @@ static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) { struct io_kiocb *link; - io_prep_async_work(nxt, &link); + io_prep_next_work(nxt, &link); *workptr = &nxt->work; if (link) { nxt->work.flags |= IO_WQ_WORK_CB; -- cgit From bcaec089c5b64953f96a59089598643911765a43 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 24 Feb 2020 11:30:18 +0300 Subject: io_uring: remove req->in_async req->in_async is not really needed, it only prevents propagation of @nxt for fast not-blocked submissions. Remove it. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 6f085215be13..5f2c0afefae1 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -551,7 +551,6 @@ struct io_kiocb { * llist_node is only used for poll deferred completions */ struct llist_node llist_node; - bool in_async; bool needs_fixed_file; u8 opcode; @@ -1973,14 +1972,13 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) } } -static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt, - bool in_async) +static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); if (req->flags & REQ_F_CUR_POS) req->file->f_pos = kiocb->ki_pos; - if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw) + if (ret >= 0 && kiocb->ki_complete == io_complete_rw) *nxt = __io_complete_rw(kiocb, ret); else io_rw_done(kiocb, ret); @@ -2271,7 +2269,7 @@ static int io_read(struct io_kiocb *req, struct io_kiocb **nxt, /* Catch -EAGAIN return for forced non-blocking submission */ if (!force_nonblock || ret2 != -EAGAIN) { - kiocb_done(kiocb, ret2, nxt, req->in_async); + kiocb_done(kiocb, ret2, nxt); } else { copy_iov: ret = io_setup_async_rw(req, io_size, iovec, @@ -2385,7 +2383,7 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt, if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT)) ret2 = -EAGAIN; if (!force_nonblock || ret2 != -EAGAIN) { - kiocb_done(kiocb, ret2, nxt, req->in_async); + kiocb_done(kiocb, ret2, nxt); } else { copy_iov: ret = io_setup_async_rw(req, io_size, iovec, @@ -4535,7 +4533,6 @@ static void io_wq_submit_work(struct io_wq_work **workptr) } if (!ret) { - req->in_async = true; do { ret = io_issue_sqe(req, NULL, &nxt, false); /* @@ -5077,7 +5074,6 @@ fail_req: *mm = ctx->sqo_mm; } - req->in_async = async; req->needs_fixed_file = async; trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data, true, async); -- cgit From 444ebb5768c5c43aadfc60111fecd6c4f946e77b Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 24 Feb 2020 11:32:43 +0300 Subject: splice: make do_splice public Make do_splice(), so other kernel parts can reuse it Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/splice.c | 6 +++--- include/linux/splice.h | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fs/splice.c b/fs/splice.c index d671936d0aad..4735defc46ee 100644 --- a/fs/splice.c +++ b/fs/splice.c @@ -1109,9 +1109,9 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe, /* * Determine where to splice to/from. */ -static long do_splice(struct file *in, loff_t __user *off_in, - struct file *out, loff_t __user *off_out, - size_t len, unsigned int flags) +long do_splice(struct file *in, loff_t __user *off_in, + struct file *out, loff_t __user *off_out, + size_t len, unsigned int flags) { struct pipe_inode_info *ipipe; struct pipe_inode_info *opipe; diff --git a/include/linux/splice.h b/include/linux/splice.h index 74b4911ac16d..ebbbfea48aa0 100644 --- a/include/linux/splice.h +++ b/include/linux/splice.h @@ -78,6 +78,9 @@ extern ssize_t add_to_pipe(struct pipe_inode_info *, struct pipe_buffer *); extern ssize_t splice_direct_to_actor(struct file *, struct splice_desc *, splice_direct_actor *); +extern long do_splice(struct file *in, loff_t __user *off_in, + struct file *out, loff_t __user *off_out, + size_t len, unsigned int flags); /* * for dynamic pipe sizing -- cgit From 8da11c19940ddbc22fc835bce3f361f4d2417fb0 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 24 Feb 2020 11:32:44 +0300 Subject: io_uring: add interface for getting files Preparation without functional changes. Adds io_get_file(), that allows to grab files not only into req->file. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 72 +++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 5f2c0afefae1..1a3de7337274 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1253,6 +1253,15 @@ fallback: return NULL; } +static inline void io_put_file(struct io_kiocb *req, struct file *file, + bool fixed) +{ + if (fixed) + percpu_ref_put(&req->ctx->file_data->refs); + else + fput(file); +} + static void __io_req_do_free(struct io_kiocb *req) { if (likely(!io_is_fallback_req(req))) @@ -1263,18 +1272,12 @@ static void __io_req_do_free(struct io_kiocb *req) static void __io_req_aux_free(struct io_kiocb *req) { - struct io_ring_ctx *ctx = req->ctx; - if (req->flags & REQ_F_NEED_CLEANUP) io_cleanup_req(req); kfree(req->io); - if (req->file) { - if (req->flags & REQ_F_FIXED_FILE) - percpu_ref_put(&ctx->file_data->refs); - else - fput(req->file); - } + if (req->file) + io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE)); io_req_work_drop_env(req); } @@ -1848,7 +1851,7 @@ static void io_file_put(struct io_submit_state *state) * assuming most submissions are for one file, or at least that each file * has more than one submission. */ -static struct file *io_file_get(struct io_submit_state *state, int fd) +static struct file *__io_file_get(struct io_submit_state *state, int fd) { if (!state) return fget(fd); @@ -4578,41 +4581,52 @@ static inline struct file *io_file_from_index(struct io_ring_ctx *ctx, return table->files[index & IORING_FILE_TABLE_MASK];; } -static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req, - const struct io_uring_sqe *sqe) +static int io_file_get(struct io_submit_state *state, struct io_kiocb *req, + int fd, struct file **out_file, bool fixed) { struct io_ring_ctx *ctx = req->ctx; - unsigned flags; - int fd; - - flags = READ_ONCE(sqe->flags); - fd = READ_ONCE(sqe->fd); - - if (!io_req_needs_file(req, fd)) - return 0; + struct file *file; - if (flags & IOSQE_FIXED_FILE) { + if (fixed) { if (unlikely(!ctx->file_data || (unsigned) fd >= ctx->nr_user_files)) return -EBADF; fd = array_index_nospec(fd, ctx->nr_user_files); - req->file = io_file_from_index(ctx, fd); - if (!req->file) + file = io_file_from_index(ctx, fd); + if (!file) return -EBADF; - req->flags |= REQ_F_FIXED_FILE; percpu_ref_get(&ctx->file_data->refs); } else { - if (req->needs_fixed_file) - return -EBADF; trace_io_uring_file_get(ctx, fd); - req->file = io_file_get(state, fd); - if (unlikely(!req->file)) + file = __io_file_get(state, fd); + if (unlikely(!file)) return -EBADF; } + *out_file = file; return 0; } +static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + unsigned flags; + int fd; + bool fixed; + + flags = READ_ONCE(sqe->flags); + fd = READ_ONCE(sqe->fd); + + if (!io_req_needs_file(req, fd)) + return 0; + + fixed = (flags & IOSQE_FIXED_FILE); + if (unlikely(!fixed && req->needs_fixed_file)) + return -EBADF; + + return io_file_get(state, req, fd, &req->file, fixed); +} + static int io_grab_files(struct io_kiocb *req) { int ret = -EBADF; @@ -4857,8 +4871,8 @@ static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, } /* same numerical values with corresponding REQ_F_*, safe to copy */ - req->flags |= sqe_flags & (IOSQE_IO_DRAIN|IOSQE_IO_HARDLINK| - IOSQE_ASYNC); + req->flags |= sqe_flags & (IOSQE_IO_DRAIN | IOSQE_IO_HARDLINK | + IOSQE_ASYNC | IOSQE_FIXED_FILE); ret = io_req_set_file(state, req, sqe); if (unlikely(ret)) { -- cgit From 7d67af2c013402537385dae343a2d0f6a4cb3bfd Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 24 Feb 2020 11:32:45 +0300 Subject: io_uring: add splice(2) support Add support for splice(2). - output file is specified as sqe->fd, so it's handled by generic code - hash_reg_file handled by generic code as well - len is 32bit, but should be fine - the fd_in is registered file, when SPLICE_F_FD_IN_FIXED is set, which is a splice flag (i.e. sqe->splice_flags). Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 109 ++++++++++++++++++++++++++++++++++++++++++ include/uapi/linux/io_uring.h | 14 +++++- 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 1a3de7337274..1ef20a2af10b 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -76,6 +76,7 @@ #include #include #include +#include #define CREATE_TRACE_POINTS #include @@ -428,6 +429,15 @@ struct io_epoll { struct epoll_event event; }; +struct io_splice { + struct file *file_out; + struct file *file_in; + loff_t off_out; + loff_t off_in; + u64 len; + unsigned int flags; +}; + struct io_async_connect { struct sockaddr_storage address; }; @@ -544,6 +554,7 @@ struct io_kiocb { struct io_fadvise fadvise; struct io_madvise madvise; struct io_epoll epoll; + struct io_splice splice; }; struct io_async_ctx *io; @@ -744,6 +755,11 @@ static const struct io_op_def io_op_defs[] = { .unbound_nonreg_file = 1, .file_table = 1, }, + [IORING_OP_SPLICE] = { + .needs_file = 1, + .hash_reg_file = 1, + .unbound_nonreg_file = 1, + } }; static void io_wq_submit_work(struct io_wq_work **workptr); @@ -758,6 +774,10 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, static int io_grab_files(struct io_kiocb *req); static void io_ring_file_ref_flush(struct fixed_file_data *data); static void io_cleanup_req(struct io_kiocb *req); +static int io_file_get(struct io_submit_state *state, + struct io_kiocb *req, + int fd, struct file **out_file, + bool fixed); static struct kmem_cache *req_cachep; @@ -2404,6 +2424,77 @@ out_free: return ret; } +static int io_splice_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_splice* sp = &req->splice; + unsigned int valid_flags = SPLICE_F_FD_IN_FIXED | SPLICE_F_ALL; + int ret; + + if (req->flags & REQ_F_NEED_CLEANUP) + return 0; + + sp->file_in = NULL; + sp->off_in = READ_ONCE(sqe->splice_off_in); + sp->off_out = READ_ONCE(sqe->off); + sp->len = READ_ONCE(sqe->len); + sp->flags = READ_ONCE(sqe->splice_flags); + + if (unlikely(sp->flags & ~valid_flags)) + return -EINVAL; + + ret = io_file_get(NULL, req, READ_ONCE(sqe->splice_fd_in), &sp->file_in, + (sp->flags & SPLICE_F_FD_IN_FIXED)); + if (ret) + return ret; + req->flags |= REQ_F_NEED_CLEANUP; + + if (!S_ISREG(file_inode(sp->file_in)->i_mode)) + req->work.flags |= IO_WQ_WORK_UNBOUND; + + return 0; +} + +static bool io_splice_punt(struct file *file) +{ + if (get_pipe_info(file)) + return false; + if (!io_file_supports_async(file)) + return true; + return !(file->f_mode & O_NONBLOCK); +} + +static int io_splice(struct io_kiocb *req, struct io_kiocb **nxt, + bool force_nonblock) +{ + struct io_splice *sp = &req->splice; + struct file *in = sp->file_in; + struct file *out = sp->file_out; + unsigned int flags = sp->flags & ~SPLICE_F_FD_IN_FIXED; + loff_t *poff_in, *poff_out; + long ret; + + if (force_nonblock) { + if (io_splice_punt(in) || io_splice_punt(out)) + return -EAGAIN; + flags |= SPLICE_F_NONBLOCK; + } + + poff_in = (sp->off_in == -1) ? NULL : &sp->off_in; + poff_out = (sp->off_out == -1) ? NULL : &sp->off_out; + ret = do_splice(in, poff_in, out, poff_out, sp->len, flags); + if (force_nonblock && ret == -EAGAIN) + return -EAGAIN; + + io_put_file(req, in, (sp->flags & SPLICE_F_FD_IN_FIXED)); + req->flags &= ~REQ_F_NEED_CLEANUP; + + io_cqring_add_event(req, ret); + if (ret != sp->len) + req_set_fail_links(req); + io_put_req_find_next(req, nxt); + return 0; +} + /* * IORING_OP_NOP just posts a completion event, nothing else. */ @@ -4230,6 +4321,9 @@ static int io_req_defer_prep(struct io_kiocb *req, case IORING_OP_EPOLL_CTL: ret = io_epoll_ctl_prep(req, sqe); break; + case IORING_OP_SPLICE: + ret = io_splice_prep(req, sqe); + break; default: printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n", req->opcode); @@ -4292,6 +4386,10 @@ static void io_cleanup_req(struct io_kiocb *req) case IORING_OP_STATX: putname(req->open.filename); break; + case IORING_OP_SPLICE: + io_put_file(req, req->splice.file_in, + (req->splice.flags & SPLICE_F_FD_IN_FIXED)); + break; } req->flags &= ~REQ_F_NEED_CLEANUP; @@ -4495,6 +4593,14 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, } ret = io_epoll_ctl(req, nxt, force_nonblock); break; + case IORING_OP_SPLICE: + if (sqe) { + ret = io_splice_prep(req, sqe); + if (ret < 0) + break; + } + ret = io_splice(req, nxt, force_nonblock); + break; default: ret = -EINVAL; break; @@ -7230,6 +7336,7 @@ static int __init io_uring_init(void) BUILD_BUG_SQE_ELEM(8, __u64, off); BUILD_BUG_SQE_ELEM(8, __u64, addr2); BUILD_BUG_SQE_ELEM(16, __u64, addr); + BUILD_BUG_SQE_ELEM(16, __u64, splice_off_in); BUILD_BUG_SQE_ELEM(24, __u32, len); BUILD_BUG_SQE_ELEM(28, __kernel_rwf_t, rw_flags); BUILD_BUG_SQE_ELEM(28, /* compat */ int, rw_flags); @@ -7244,9 +7351,11 @@ static int __init io_uring_init(void) BUILD_BUG_SQE_ELEM(28, __u32, open_flags); BUILD_BUG_SQE_ELEM(28, __u32, statx_flags); BUILD_BUG_SQE_ELEM(28, __u32, fadvise_advice); + BUILD_BUG_SQE_ELEM(28, __u32, splice_flags); BUILD_BUG_SQE_ELEM(32, __u64, user_data); BUILD_BUG_SQE_ELEM(40, __u16, buf_index); BUILD_BUG_SQE_ELEM(42, __u16, personality); + BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in); BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST); req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC); diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 3f7961c1c243..08891cc1c1e7 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -23,7 +23,10 @@ struct io_uring_sqe { __u64 off; /* offset into file */ __u64 addr2; }; - __u64 addr; /* pointer to buffer or iovecs */ + union { + __u64 addr; /* pointer to buffer or iovecs */ + __u64 splice_off_in; + }; __u32 len; /* buffer size or number of iovecs */ union { __kernel_rwf_t rw_flags; @@ -37,6 +40,7 @@ struct io_uring_sqe { __u32 open_flags; __u32 statx_flags; __u32 fadvise_advice; + __u32 splice_flags; }; __u64 user_data; /* data to be passed back at completion time */ union { @@ -45,6 +49,7 @@ struct io_uring_sqe { __u16 buf_index; /* personality to use, if used */ __u16 personality; + __s32 splice_fd_in; }; __u64 __pad2[3]; }; @@ -113,6 +118,7 @@ enum { IORING_OP_RECV, IORING_OP_OPENAT2, IORING_OP_EPOLL_CTL, + IORING_OP_SPLICE, /* this goes last, obviously */ IORING_OP_LAST, @@ -128,6 +134,12 @@ enum { */ #define IORING_TIMEOUT_ABS (1U << 0) +/* + * sqe->splice_flags + * extends splice(2) flags + */ +#define SPLICE_F_FD_IN_FIXED (1U << 31) /* the last bit of __u32 */ + /* * IO completion data structure (Completion Queue Entry) */ -- cgit From b0a20349f212dc725f5ddfd060e426fe6181d9c5 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Fri, 28 Feb 2020 10:36:35 +0300 Subject: io_uring: clean io_poll_complete Deduplicate call to io_cqring_fill_event(), plain and easy Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 1ef20a2af10b..f4c6661b33bc 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3641,10 +3641,7 @@ static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) struct io_ring_ctx *ctx = req->ctx; req->poll.done = true; - if (error) - io_cqring_fill_event(req, error); - else - io_cqring_fill_event(req, mangle_poll(mask)); + io_cqring_fill_event(req, error ? error : mangle_poll(mask)); io_commit_cqring(ctx); } -- cgit From 02d27d895323c4baa3234e4bed015eb3a196e1dd Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Fri, 28 Feb 2020 10:36:36 +0300 Subject: io_uring: extract kmsg copy helper io_recvmsg() and io_sendmsg() duplicate nonblock -EAGAIN finilising part, so add helper for that. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 43 +++++++++++++++++++------------------------ 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index f4c6661b33bc..2a8d88c9bcab 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3094,6 +3094,21 @@ static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt, return 0; } +static int io_setup_async_msg(struct io_kiocb *req, + struct io_async_msghdr *kmsg) +{ + if (req->io) + return -EAGAIN; + if (io_alloc_async_ctx(req)) { + if (kmsg->iov != kmsg->fast_iov) + kfree(kmsg->iov); + return -ENOMEM; + } + req->flags |= REQ_F_NEED_CLEANUP; + memcpy(&req->io->msg, kmsg, sizeof(*kmsg)); + return -EAGAIN; +} + static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { #if defined(CONFIG_NET) @@ -3170,18 +3185,8 @@ static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt, flags |= MSG_DONTWAIT; ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags); - if (force_nonblock && ret == -EAGAIN) { - if (req->io) - return -EAGAIN; - if (io_alloc_async_ctx(req)) { - if (kmsg->iov != kmsg->fast_iov) - kfree(kmsg->iov); - return -ENOMEM; - } - req->flags |= REQ_F_NEED_CLEANUP; - memcpy(&req->io->msg, &io.msg, sizeof(io.msg)); - return -EAGAIN; - } + if (force_nonblock && ret == -EAGAIN) + return io_setup_async_msg(req, kmsg); if (ret == -ERESTARTSYS) ret = -EINTR; } @@ -3329,18 +3334,8 @@ static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt, ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg, kmsg->uaddr, flags); - if (force_nonblock && ret == -EAGAIN) { - if (req->io) - return -EAGAIN; - if (io_alloc_async_ctx(req)) { - if (kmsg->iov != kmsg->fast_iov) - kfree(kmsg->iov); - return -ENOMEM; - } - memcpy(&req->io->msg, &io.msg, sizeof(io.msg)); - req->flags |= REQ_F_NEED_CLEANUP; - return -EAGAIN; - } + if (force_nonblock && ret == -EAGAIN) + return io_setup_async_msg(req, kmsg); if (ret == -ERESTARTSYS) ret = -EINTR; } -- cgit From e85530ddda4f08d4f9ed6506d4a1f42e086e3b21 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Fri, 28 Feb 2020 10:36:37 +0300 Subject: io-wq: remove unused IO_WQ_WORK_HAS_MM IO_WQ_WORK_HAS_MM is set but never used, remove it. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 2 -- fs/io-wq.h | 1 - 2 files changed, 3 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 5cef075c0b37..39ed8751ea31 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -499,8 +499,6 @@ next: */ if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) work->flags |= IO_WQ_WORK_CANCEL; - if (worker->mm) - work->flags |= IO_WQ_WORK_HAS_MM; if (wq->get_work) { put_work = work; diff --git a/fs/io-wq.h b/fs/io-wq.h index e5e15f2c93ec..d500d88ab84e 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -5,7 +5,6 @@ struct io_wq; enum { IO_WQ_WORK_CANCEL = 1, - IO_WQ_WORK_HAS_MM = 2, IO_WQ_WORK_HASHED = 4, IO_WQ_WORK_UNBOUND = 32, IO_WQ_WORK_CB = 128, -- cgit From 5eae8619907a1389dbd1b4a1049caf52782c0916 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Fri, 28 Feb 2020 10:36:38 +0300 Subject: io_uring: remove IO_WQ_WORK_CB IO_WQ_WORK_CB is used only for linked timeouts, which will be armed before the work setup (i.e. mm, override creds, etc). The setup shouldn't take long, so it's ok to arm it a bit later and get rid of IO_WQ_WORK_CB. Make io-wq call work->func() only once, callbacks will handle the rest. i.e. the linked timeout handler will do the actual issue. And as a bonus, it removes an extra indirect call. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 3 --- fs/io-wq.h | 1 - fs/io_uring.c | 3 +-- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 39ed8751ea31..a1a42ead3b5a 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -479,9 +479,6 @@ next: worker->cur_work = work; spin_unlock_irq(&worker->lock); - if (work->flags & IO_WQ_WORK_CB) - work->func(&work); - if (work->files && current->files != work->files) { task_lock(current); current->files = work->files; diff --git a/fs/io-wq.h b/fs/io-wq.h index d500d88ab84e..a0978d6958f0 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -7,7 +7,6 @@ enum { IO_WQ_WORK_CANCEL = 1, IO_WQ_WORK_HASHED = 4, IO_WQ_WORK_UNBOUND = 32, - IO_WQ_WORK_CB = 128, IO_WQ_WORK_NO_CANCEL = 256, IO_WQ_WORK_CONCURRENT = 512, diff --git a/fs/io_uring.c b/fs/io_uring.c index 2a8d88c9bcab..f999503854b7 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2549,7 +2549,7 @@ static void io_link_work_cb(struct io_wq_work **workptr) struct io_kiocb *link = work->data; io_queue_linked_timeout(link); - work->func = io_wq_submit_work; + io_wq_submit_work(workptr); } static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) @@ -2559,7 +2559,6 @@ static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) io_prep_next_work(nxt, &link); *workptr = &nxt->work; if (link) { - nxt->work.flags |= IO_WQ_WORK_CB; nxt->work.func = io_link_work_cb; nxt->work.data = link; } -- cgit From 3684f24653534c71c7dc9f44d7281a838f4e4979 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Fri, 28 Feb 2020 10:36:39 +0300 Subject: io-wq: use BIT for ulong hash @hash_map is unsigned long, but BIT_ULL() is used for manipulations. BIT() is a better match as it returns exactly unsigned long value. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index a1a42ead3b5a..042c7e2057ef 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -393,8 +393,8 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) /* hashed, can run if not already running */ *hash = work->flags >> IO_WQ_HASH_SHIFT; - if (!(wqe->hash_map & BIT_ULL(*hash))) { - wqe->hash_map |= BIT_ULL(*hash); + if (!(wqe->hash_map & BIT(*hash))) { + wqe->hash_map |= BIT(*hash); wq_node_del(&wqe->work_list, node, prev); return work; } @@ -512,7 +512,7 @@ next: spin_lock_irq(&wqe->lock); if (hash != -1U) { - wqe->hash_map &= ~BIT_ULL(hash); + wqe->hash_map &= ~BIT(hash); wqe->flags &= ~IO_WQE_FLAG_STALLED; } if (work && work != old_work) { -- cgit From 6fb614920b38bbf3c1c7fcd944c6d9b5d746103d Mon Sep 17 00:00:00 2001 From: Oleg Nesterov Date: Tue, 18 Feb 2020 16:50:18 +0100 Subject: task_work_run: don't take ->pi_lock unconditionally As Peter pointed out, task_work() can avoid ->pi_lock and cmpxchg() if task->task_works == NULL && !PF_EXITING. And in fact the only reason why task_work_run() needs ->pi_lock is the possible race with task_work_cancel(), we can optimize this code and make the locking more clear. Signed-off-by: Oleg Nesterov Signed-off-by: Jens Axboe --- kernel/task_work.c | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/kernel/task_work.c b/kernel/task_work.c index 0fef395662a6..825f28259a19 100644 --- a/kernel/task_work.c +++ b/kernel/task_work.c @@ -97,16 +97,26 @@ void task_work_run(void) * work->func() can do task_work_add(), do not set * work_exited unless the list is empty. */ - raw_spin_lock_irq(&task->pi_lock); do { + head = NULL; work = READ_ONCE(task->task_works); - head = !work && (task->flags & PF_EXITING) ? - &work_exited : NULL; + if (!work) { + if (task->flags & PF_EXITING) + head = &work_exited; + else + break; + } } while (cmpxchg(&task->task_works, work, head) != work); - raw_spin_unlock_irq(&task->pi_lock); if (!work) break; + /* + * Synchronize with task_work_cancel(). It can not remove + * the first entry == work, cmpxchg(task_works) must fail. + * But it can remove another entry from the ->next list. + */ + raw_spin_lock_irq(&task->pi_lock); + raw_spin_unlock_irq(&task->pi_lock); do { next = work->next; -- cgit From c2f2eb7d2c1cdc37fa9633bae96f381d33ee7a14 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 10 Feb 2020 09:07:05 -0700 Subject: io_uring: store io_kiocb in wait->private Store the io_kiocb in the private field instead of the poll entry, this is in preparation for allowing multiple waitqueues. No functional changes in this patch. Signed-off-by: Jens Axboe --- fs/io_uring.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index f999503854b7..7a97a6c1c09e 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3730,8 +3730,8 @@ static void io_poll_trigger_evfd(struct io_wq_work **workptr) static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { - struct io_poll_iocb *poll = wait->private; - struct io_kiocb *req = container_of(poll, struct io_kiocb, poll); + struct io_kiocb *req = wait->private; + struct io_poll_iocb *poll = &req->poll; struct io_ring_ctx *ctx = req->ctx; __poll_t mask = key_to_poll(key); @@ -3854,7 +3854,7 @@ static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt) /* initialized the list so that we can do list_empty checks */ INIT_LIST_HEAD(&poll->wait.entry); init_waitqueue_func_entry(&poll->wait, io_poll_wake); - poll->wait.private = poll; + poll->wait.private = req; INIT_LIST_HEAD(&req->list); -- cgit From b41e98524e424d104aa7851d54fd65820759875a Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 17 Feb 2020 09:52:41 -0700 Subject: io_uring: add per-task callback handler For poll requests, it's not uncommon to link a read (or write) after the poll to execute immediately after the file is marked as ready. Since the poll completion is called inside the waitqueue wake up handler, we have to punt that linked request to async context. This slows down the processing, and actually means it's faster to not use a link for this use case. We also run into problems if the completion_lock is contended, as we're doing a different lock ordering than the issue side is. Hence we have to do trylock for completion, and if that fails, go async. Poll removal needs to go async as well, for the same reason. eventfd notification needs special case as well, to avoid stack blowing recursion or deadlocks. These are all deficiencies that were inherited from the aio poll implementation, but I think we can do better. When a poll completes, simply queue it up in the task poll list. When the task completes the list, we can run dependent links inline as well. This means we never have to go async, and we can remove a bunch of code associated with that, and optimizations to try and make that run faster. The diffstat speaks for itself. Signed-off-by: Jens Axboe --- fs/io_uring.c | 218 ++++++++++++++++++++-------------------------------------- 1 file changed, 76 insertions(+), 142 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 7a97a6c1c09e..a16b5632ce6f 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -77,6 +77,7 @@ #include #include #include +#include #define CREATE_TRACE_POINTS #include @@ -291,7 +292,6 @@ struct io_ring_ctx { struct { spinlock_t completion_lock; - struct llist_head poll_llist; /* * ->poll_list is protected by the ctx->uring_lock for @@ -558,10 +558,6 @@ struct io_kiocb { }; struct io_async_ctx *io; - /* - * llist_node is only used for poll deferred completions - */ - struct llist_node llist_node; bool needs_fixed_file; u8 opcode; @@ -579,7 +575,17 @@ struct io_kiocb { struct list_head inflight_entry; - struct io_wq_work work; + union { + /* + * Only commands that never go async can use the below fields, + * obviously. Right now only IORING_OP_POLL_ADD uses them. + */ + struct { + struct task_struct *task; + struct callback_head task_work; + }; + struct io_wq_work work; + }; }; #define IO_PLUG_THRESHOLD 2 @@ -774,10 +780,10 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, static int io_grab_files(struct io_kiocb *req); static void io_ring_file_ref_flush(struct fixed_file_data *data); static void io_cleanup_req(struct io_kiocb *req); -static int io_file_get(struct io_submit_state *state, - struct io_kiocb *req, - int fd, struct file **out_file, - bool fixed); +static int io_file_get(struct io_submit_state *state, struct io_kiocb *req, + int fd, struct file **out_file, bool fixed); +static void __io_queue_sqe(struct io_kiocb *req, + const struct io_uring_sqe *sqe); static struct kmem_cache *req_cachep; @@ -848,7 +854,6 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); spin_lock_init(&ctx->completion_lock); - init_llist_head(&ctx->poll_llist); INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); @@ -1081,24 +1086,19 @@ static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx) return false; if (!ctx->eventfd_async) return true; - return io_wq_current_is_worker() || in_interrupt(); + return io_wq_current_is_worker(); } -static void __io_cqring_ev_posted(struct io_ring_ctx *ctx, bool trigger_ev) +static void io_cqring_ev_posted(struct io_ring_ctx *ctx) { if (waitqueue_active(&ctx->wait)) wake_up(&ctx->wait); if (waitqueue_active(&ctx->sqo_wait)) wake_up(&ctx->sqo_wait); - if (trigger_ev) + if (io_should_trigger_evfd(ctx)) eventfd_signal(ctx->cq_ev_fd, 1); } -static void io_cqring_ev_posted(struct io_ring_ctx *ctx) -{ - __io_cqring_ev_posted(ctx, io_should_trigger_evfd(ctx)); -} - /* Returns true if there are no backlogged entries after the flush */ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) { @@ -3548,18 +3548,27 @@ out: #endif } -static void io_poll_remove_one(struct io_kiocb *req) +static bool io_poll_remove_one(struct io_kiocb *req) { struct io_poll_iocb *poll = &req->poll; + bool do_complete = false; spin_lock(&poll->head->lock); WRITE_ONCE(poll->canceled, true); if (!list_empty(&poll->wait.entry)) { list_del_init(&poll->wait.entry); - io_queue_async_work(req); + do_complete = true; } spin_unlock(&poll->head->lock); hash_del(&req->hash_node); + if (do_complete) { + io_cqring_fill_event(req, -ECANCELED); + io_commit_cqring(req->ctx); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req(req); + } + + return do_complete; } static void io_poll_remove_all(struct io_ring_ctx *ctx) @@ -3577,6 +3586,8 @@ static void io_poll_remove_all(struct io_ring_ctx *ctx) io_poll_remove_one(req); } spin_unlock_irq(&ctx->completion_lock); + + io_cqring_ev_posted(ctx); } static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr) @@ -3586,10 +3597,11 @@ static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr) list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)]; hlist_for_each_entry(req, list, hash_node) { - if (sqe_addr == req->user_data) { - io_poll_remove_one(req); + if (sqe_addr != req->user_data) + continue; + if (io_poll_remove_one(req)) return 0; - } + return -EALREADY; } return -ENOENT; @@ -3639,92 +3651,28 @@ static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) io_commit_cqring(ctx); } -static void io_poll_complete_work(struct io_wq_work **workptr) +static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt) { - struct io_wq_work *work = *workptr; - struct io_kiocb *req = container_of(work, struct io_kiocb, work); - struct io_poll_iocb *poll = &req->poll; - struct poll_table_struct pt = { ._key = poll->events }; struct io_ring_ctx *ctx = req->ctx; - struct io_kiocb *nxt = NULL; - __poll_t mask = 0; - int ret = 0; - - if (work->flags & IO_WQ_WORK_CANCEL) { - WRITE_ONCE(poll->canceled, true); - ret = -ECANCELED; - } else if (READ_ONCE(poll->canceled)) { - ret = -ECANCELED; - } - - if (ret != -ECANCELED) - mask = vfs_poll(poll->file, &pt) & poll->events; - /* - * Note that ->ki_cancel callers also delete iocb from active_reqs after - * calling ->ki_cancel. We need the ctx_lock roundtrip here to - * synchronize with them. In the cancellation case the list_del_init - * itself is not actually needed, but harmless so we keep it in to - * avoid further branches in the fast path. - */ spin_lock_irq(&ctx->completion_lock); - if (!mask && ret != -ECANCELED) { - add_wait_queue(poll->head, &poll->wait); - spin_unlock_irq(&ctx->completion_lock); - return; - } hash_del(&req->hash_node); - io_poll_complete(req, mask, ret); - spin_unlock_irq(&ctx->completion_lock); - - io_cqring_ev_posted(ctx); - - if (ret < 0) - req_set_fail_links(req); - io_put_req_find_next(req, &nxt); - if (nxt) - io_wq_assign_next(workptr, nxt); -} - -static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes) -{ - struct io_kiocb *req, *tmp; - struct req_batch rb; - - rb.to_free = rb.need_iter = 0; - spin_lock_irq(&ctx->completion_lock); - llist_for_each_entry_safe(req, tmp, nodes, llist_node) { - hash_del(&req->hash_node); - io_poll_complete(req, req->result, 0); - - if (refcount_dec_and_test(&req->refs) && - !io_req_multi_free(&rb, req)) { - req->flags |= REQ_F_COMP_LOCKED; - io_free_req(req); - } - } + io_poll_complete(req, req->result, 0); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req_find_next(req, nxt); spin_unlock_irq(&ctx->completion_lock); io_cqring_ev_posted(ctx); - io_free_req_many(ctx, &rb); -} - -static void io_poll_flush(struct io_wq_work **workptr) -{ - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct llist_node *nodes; - - nodes = llist_del_all(&req->ctx->poll_llist); - if (nodes) - __io_poll_flush(req->ctx, nodes); } -static void io_poll_trigger_evfd(struct io_wq_work **workptr) +static void io_poll_task_func(struct callback_head *cb) { - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_kiocb *nxt = NULL; - eventfd_signal(req->ctx->cq_ev_fd, 1); - io_put_req(req); + io_poll_task_handler(req, &nxt); + if (nxt) + __io_queue_sqe(nxt, NULL); } static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, @@ -3732,8 +3680,8 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, { struct io_kiocb *req = wait->private; struct io_poll_iocb *poll = &req->poll; - struct io_ring_ctx *ctx = req->ctx; __poll_t mask = key_to_poll(key); + struct task_struct *tsk; /* for instances that support it check for an event match first: */ if (mask && !(mask & poll->events)) @@ -3741,46 +3689,11 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, list_del_init(&poll->wait.entry); - /* - * Run completion inline if we can. We're using trylock here because - * we are violating the completion_lock -> poll wq lock ordering. - * If we have a link timeout we're going to need the completion_lock - * for finalizing the request, mark us as having grabbed that already. - */ - if (mask) { - unsigned long flags; - - if (llist_empty(&ctx->poll_llist) && - spin_trylock_irqsave(&ctx->completion_lock, flags)) { - bool trigger_ev; - - hash_del(&req->hash_node); - io_poll_complete(req, mask, 0); - - trigger_ev = io_should_trigger_evfd(ctx); - if (trigger_ev && eventfd_signal_count()) { - trigger_ev = false; - req->work.func = io_poll_trigger_evfd; - } else { - req->flags |= REQ_F_COMP_LOCKED; - io_put_req(req); - req = NULL; - } - spin_unlock_irqrestore(&ctx->completion_lock, flags); - __io_cqring_ev_posted(ctx, trigger_ev); - } else { - req->result = mask; - req->llist_node.next = NULL; - /* if the list wasn't empty, we're done */ - if (!llist_add(&req->llist_node, &ctx->poll_llist)) - req = NULL; - else - req->work.func = io_poll_flush; - } - } - if (req) - io_queue_async_work(req); - + tsk = req->task; + req->result = mask; + init_task_work(&req->task_work, io_poll_task_func); + task_work_add(tsk, &req->task_work, true); + wake_up_process(tsk); return 1; } @@ -3828,6 +3741,9 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe events = READ_ONCE(sqe->poll_events); poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; + + /* task will wait for requests on exit, don't need a ref */ + req->task = current; return 0; } @@ -3839,7 +3755,6 @@ static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt) bool cancel = false; __poll_t mask; - INIT_IO_WORK(&req->work, io_poll_complete_work); INIT_HLIST_NODE(&req->hash_node); poll->head = NULL; @@ -5268,6 +5183,8 @@ static int io_sq_thread(void *data) if (!list_empty(&ctx->poll_list) || (!time_after(jiffies, timeout) && ret != -EBUSY && !percpu_ref_is_dying(&ctx->refs))) { + if (current->task_works) + task_work_run(); cond_resched(); continue; } @@ -5299,6 +5216,10 @@ static int io_sq_thread(void *data) finish_wait(&ctx->sqo_wait, &wait); break; } + if (current->task_works) { + task_work_run(); + continue; + } if (signal_pending(current)) flush_signals(current); schedule(); @@ -5318,6 +5239,9 @@ static int io_sq_thread(void *data) timeout = jiffies + ctx->sq_thread_idle; } + if (current->task_works) + task_work_run(); + set_fs(old_fs); if (cur_mm) { unuse_mm(cur_mm); @@ -5382,8 +5306,13 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, struct io_rings *rings = ctx->rings; int ret = 0; - if (io_cqring_events(ctx, false) >= min_events) - return 0; + do { + if (io_cqring_events(ctx, false) >= min_events) + return 0; + if (!current->task_works) + break; + task_work_run(); + } while (1); if (sig) { #ifdef CONFIG_COMPAT @@ -5403,6 +5332,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, do { prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE); + if (current->task_works) + task_work_run(); if (io_should_wake(&iowq, false)) break; schedule(); @@ -6711,6 +6642,9 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, int submitted = 0; struct fd f; + if (current->task_works) + task_work_run(); + if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP)) return -EINVAL; -- cgit From 8a72758c51f8a5501a0e01ea95069630edb9ca07 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 20 Feb 2020 09:59:44 -0700 Subject: io_uring: mark requests that we can do poll async in io_op_defs Add a pollin/pollout field to the request table, and have commands that we can safely poll for properly marked. Signed-off-by: Jens Axboe --- fs/io_uring.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/fs/io_uring.c b/fs/io_uring.c index a16b5632ce6f..0d973de75127 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -629,6 +629,9 @@ struct io_op_def { unsigned file_table : 1; /* needs ->fs */ unsigned needs_fs : 1; + /* set if opcode supports polled "wait" */ + unsigned pollin : 1; + unsigned pollout : 1; }; static const struct io_op_def io_op_defs[] = { @@ -638,6 +641,7 @@ static const struct io_op_def io_op_defs[] = { .needs_mm = 1, .needs_file = 1, .unbound_nonreg_file = 1, + .pollin = 1, }, [IORING_OP_WRITEV] = { .async_ctx = 1, @@ -645,6 +649,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .hash_reg_file = 1, .unbound_nonreg_file = 1, + .pollout = 1, }, [IORING_OP_FSYNC] = { .needs_file = 1, @@ -652,11 +657,13 @@ static const struct io_op_def io_op_defs[] = { [IORING_OP_READ_FIXED] = { .needs_file = 1, .unbound_nonreg_file = 1, + .pollin = 1, }, [IORING_OP_WRITE_FIXED] = { .needs_file = 1, .hash_reg_file = 1, .unbound_nonreg_file = 1, + .pollout = 1, }, [IORING_OP_POLL_ADD] = { .needs_file = 1, @@ -672,6 +679,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .unbound_nonreg_file = 1, .needs_fs = 1, + .pollout = 1, }, [IORING_OP_RECVMSG] = { .async_ctx = 1, @@ -679,6 +687,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .unbound_nonreg_file = 1, .needs_fs = 1, + .pollin = 1, }, [IORING_OP_TIMEOUT] = { .async_ctx = 1, @@ -690,6 +699,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .unbound_nonreg_file = 1, .file_table = 1, + .pollin = 1, }, [IORING_OP_ASYNC_CANCEL] = {}, [IORING_OP_LINK_TIMEOUT] = { @@ -701,6 +711,7 @@ static const struct io_op_def io_op_defs[] = { .needs_mm = 1, .needs_file = 1, .unbound_nonreg_file = 1, + .pollout = 1, }, [IORING_OP_FALLOCATE] = { .needs_file = 1, @@ -729,11 +740,13 @@ static const struct io_op_def io_op_defs[] = { .needs_mm = 1, .needs_file = 1, .unbound_nonreg_file = 1, + .pollin = 1, }, [IORING_OP_WRITE] = { .needs_mm = 1, .needs_file = 1, .unbound_nonreg_file = 1, + .pollout = 1, }, [IORING_OP_FADVISE] = { .needs_file = 1, @@ -745,11 +758,13 @@ static const struct io_op_def io_op_defs[] = { .needs_mm = 1, .needs_file = 1, .unbound_nonreg_file = 1, + .pollout = 1, }, [IORING_OP_RECV] = { .needs_mm = 1, .needs_file = 1, .unbound_nonreg_file = 1, + .pollin = 1, }, [IORING_OP_OPENAT2] = { .needs_file = 1, -- cgit From d7718a9d25a61442da8ee8aeeff6a0097f0ccfd6 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 14 Feb 2020 22:23:12 -0700 Subject: io_uring: use poll driven retry for files that support it Currently io_uring tries any request in a non-blocking manner, if it can, and then retries from a worker thread if we get -EAGAIN. Now that we have a new and fancy poll based retry backend, use that to retry requests if the file supports it. This means that, for example, an IORING_OP_RECVMSG on a socket no longer requires an async thread to complete the IO. If we get -EAGAIN reading from the socket in a non-blocking manner, we arm a poll handler for notification on when the socket becomes readable. When it does, the pending read is executed directly by the task again, through the io_uring task work handlers. Not only is this faster and more efficient, it also means we're not generating potentially tons of async threads that just sit and block, waiting for the IO to complete. The feature is marked with IORING_FEAT_FAST_POLL, meaning that async pollable IO is fast, and that pollother_op is fast as well. Signed-off-by: Jens Axboe --- fs/io_uring.c | 354 ++++++++++++++++++++++++++++++---------- include/trace/events/io_uring.h | 103 ++++++++++++ include/uapi/linux/io_uring.h | 1 + 3 files changed, 375 insertions(+), 83 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 0d973de75127..8c976fde40bd 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -487,6 +487,7 @@ enum { REQ_F_COMP_LOCKED_BIT, REQ_F_NEED_CLEANUP_BIT, REQ_F_OVERFLOW_BIT, + REQ_F_POLLED_BIT, }; enum { @@ -529,6 +530,13 @@ enum { REQ_F_NEED_CLEANUP = BIT(REQ_F_NEED_CLEANUP_BIT), /* in overflow list */ REQ_F_OVERFLOW = BIT(REQ_F_OVERFLOW_BIT), + /* already went through poll handler */ + REQ_F_POLLED = BIT(REQ_F_POLLED_BIT), +}; + +struct async_poll { + struct io_poll_iocb poll; + struct io_wq_work work; }; /* @@ -562,27 +570,29 @@ struct io_kiocb { u8 opcode; struct io_ring_ctx *ctx; - union { - struct list_head list; - struct hlist_node hash_node; - }; - struct list_head link_list; + struct list_head list; unsigned int flags; refcount_t refs; + struct task_struct *task; u64 user_data; u32 result; u32 sequence; + struct list_head link_list; + struct list_head inflight_entry; union { /* * Only commands that never go async can use the below fields, - * obviously. Right now only IORING_OP_POLL_ADD uses them. + * obviously. Right now only IORING_OP_POLL_ADD uses them, and + * async armed poll handlers for regular commands. The latter + * restore the work, if needed. */ struct { - struct task_struct *task; struct callback_head task_work; + struct hlist_node hash_node; + struct async_poll *apoll; }; struct io_wq_work work; }; @@ -3563,9 +3573,209 @@ out: #endif } -static bool io_poll_remove_one(struct io_kiocb *req) +struct io_poll_table { + struct poll_table_struct pt; + struct io_kiocb *req; + int error; +}; + +static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt, + struct wait_queue_head *head) +{ + if (unlikely(poll->head)) { + pt->error = -EINVAL; + return; + } + + pt->error = 0; + poll->head = head; + add_wait_queue(head, &poll->wait); +} + +static void io_async_queue_proc(struct file *file, struct wait_queue_head *head, + struct poll_table_struct *p) +{ + struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); + + __io_queue_proc(&pt->req->apoll->poll, pt, head); +} + +static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll, + __poll_t mask, task_work_func_t func) +{ + struct task_struct *tsk; + + /* for instances that support it check for an event match first: */ + if (mask && !(mask & poll->events)) + return 0; + + trace_io_uring_task_add(req->ctx, req->opcode, req->user_data, mask); + + list_del_init(&poll->wait.entry); + + tsk = req->task; + req->result = mask; + init_task_work(&req->task_work, func); + /* + * If this fails, then the task is exiting. If that is the case, then + * the exit check will ultimately cancel these work items. Hence we + * don't need to check here and handle it specifically. + */ + task_work_add(tsk, &req->task_work, true); + wake_up_process(tsk); + return 1; +} + +static void io_async_task_func(struct callback_head *cb) +{ + struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct async_poll *apoll = req->apoll; + struct io_ring_ctx *ctx = req->ctx; + + trace_io_uring_task_run(req->ctx, req->opcode, req->user_data); + + WARN_ON_ONCE(!list_empty(&req->apoll->poll.wait.entry)); + + if (hash_hashed(&req->hash_node)) { + spin_lock_irq(&ctx->completion_lock); + hash_del(&req->hash_node); + spin_unlock_irq(&ctx->completion_lock); + } + + /* restore ->work in case we need to retry again */ + memcpy(&req->work, &apoll->work, sizeof(req->work)); + + __set_current_state(TASK_RUNNING); + mutex_lock(&ctx->uring_lock); + __io_queue_sqe(req, NULL); + mutex_unlock(&ctx->uring_lock); + + kfree(apoll); +} + +static int io_async_wake(struct wait_queue_entry *wait, unsigned mode, int sync, + void *key) +{ + struct io_kiocb *req = wait->private; + struct io_poll_iocb *poll = &req->apoll->poll; + + trace_io_uring_poll_wake(req->ctx, req->opcode, req->user_data, + key_to_poll(key)); + + return __io_async_wake(req, poll, key_to_poll(key), io_async_task_func); +} + +static void io_poll_req_insert(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + struct hlist_head *list; + + list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)]; + hlist_add_head(&req->hash_node, list); +} + +static __poll_t __io_arm_poll_handler(struct io_kiocb *req, + struct io_poll_iocb *poll, + struct io_poll_table *ipt, __poll_t mask, + wait_queue_func_t wake_func) + __acquires(&ctx->completion_lock) +{ + struct io_ring_ctx *ctx = req->ctx; + bool cancel = false; + + poll->file = req->file; + poll->head = NULL; + poll->done = poll->canceled = false; + poll->events = mask; + + ipt->pt._key = mask; + ipt->req = req; + ipt->error = -EINVAL; + + INIT_LIST_HEAD(&poll->wait.entry); + init_waitqueue_func_entry(&poll->wait, wake_func); + poll->wait.private = req; + + mask = vfs_poll(req->file, &ipt->pt) & poll->events; + + spin_lock_irq(&ctx->completion_lock); + if (likely(poll->head)) { + spin_lock(&poll->head->lock); + if (unlikely(list_empty(&poll->wait.entry))) { + if (ipt->error) + cancel = true; + ipt->error = 0; + mask = 0; + } + if (mask || ipt->error) + list_del_init(&poll->wait.entry); + else if (cancel) + WRITE_ONCE(poll->canceled, true); + else if (!poll->done) /* actually waiting for an event */ + io_poll_req_insert(req); + spin_unlock(&poll->head->lock); + } + + return mask; +} + +static bool io_arm_poll_handler(struct io_kiocb *req) +{ + const struct io_op_def *def = &io_op_defs[req->opcode]; + struct io_ring_ctx *ctx = req->ctx; + struct async_poll *apoll; + struct io_poll_table ipt; + __poll_t mask, ret; + + if (!req->file || !file_can_poll(req->file)) + return false; + if (req->flags & (REQ_F_MUST_PUNT | REQ_F_POLLED)) + return false; + if (!def->pollin && !def->pollout) + return false; + + apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC); + if (unlikely(!apoll)) + return false; + + req->flags |= REQ_F_POLLED; + memcpy(&apoll->work, &req->work, sizeof(req->work)); + + /* + * Don't need a reference here, as we're adding it to the task + * task_works list. If the task exits, the list is pruned. + */ + req->task = current; + req->apoll = apoll; + INIT_HLIST_NODE(&req->hash_node); + + if (def->pollin) + mask = POLLIN | POLLRDNORM; + if (def->pollout) + mask |= POLLOUT | POLLWRNORM; + mask |= POLLERR | POLLPRI; + + ipt.pt._qproc = io_async_queue_proc; + + ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask, + io_async_wake); + if (ret) { + ipt.error = 0; + apoll->poll.done = true; + spin_unlock_irq(&ctx->completion_lock); + memcpy(&req->work, &apoll->work, sizeof(req->work)); + kfree(apoll); + return false; + } + spin_unlock_irq(&ctx->completion_lock); + trace_io_uring_poll_arm(ctx, req->opcode, req->user_data, mask, + apoll->poll.events); + return true; +} + +static bool __io_poll_remove_one(struct io_kiocb *req, + struct io_poll_iocb *poll) { - struct io_poll_iocb *poll = &req->poll; bool do_complete = false; spin_lock(&poll->head->lock); @@ -3575,7 +3785,24 @@ static bool io_poll_remove_one(struct io_kiocb *req) do_complete = true; } spin_unlock(&poll->head->lock); + return do_complete; +} + +static bool io_poll_remove_one(struct io_kiocb *req) +{ + bool do_complete; + + if (req->opcode == IORING_OP_POLL_ADD) { + do_complete = __io_poll_remove_one(req, &req->poll); + } else { + /* non-poll requests have submit ref still */ + do_complete = __io_poll_remove_one(req, &req->apoll->poll); + if (do_complete) + io_put_req(req); + } + hash_del(&req->hash_node); + if (do_complete) { io_cqring_fill_event(req, -ECANCELED); io_commit_cqring(req->ctx); @@ -3686,8 +3913,13 @@ static void io_poll_task_func(struct callback_head *cb) struct io_kiocb *nxt = NULL; io_poll_task_handler(req, &nxt); - if (nxt) + if (nxt) { + struct io_ring_ctx *ctx = nxt->ctx; + + mutex_lock(&ctx->uring_lock); __io_queue_sqe(nxt, NULL); + mutex_unlock(&ctx->uring_lock); + } } static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, @@ -3695,51 +3927,16 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, { struct io_kiocb *req = wait->private; struct io_poll_iocb *poll = &req->poll; - __poll_t mask = key_to_poll(key); - struct task_struct *tsk; - /* for instances that support it check for an event match first: */ - if (mask && !(mask & poll->events)) - return 0; - - list_del_init(&poll->wait.entry); - - tsk = req->task; - req->result = mask; - init_task_work(&req->task_work, io_poll_task_func); - task_work_add(tsk, &req->task_work, true); - wake_up_process(tsk); - return 1; + return __io_async_wake(req, poll, key_to_poll(key), io_poll_task_func); } -struct io_poll_table { - struct poll_table_struct pt; - struct io_kiocb *req; - int error; -}; - static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, struct poll_table_struct *p) { struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); - if (unlikely(pt->req->poll.head)) { - pt->error = -EINVAL; - return; - } - - pt->error = 0; - pt->req->poll.head = head; - add_wait_queue(head, &pt->req->poll.wait); -} - -static void io_poll_req_insert(struct io_kiocb *req) -{ - struct io_ring_ctx *ctx = req->ctx; - struct hlist_head *list; - - list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)]; - hlist_add_head(&req->hash_node, list); + __io_queue_proc(&pt->req->poll, pt, head); } static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) @@ -3757,7 +3954,10 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe events = READ_ONCE(sqe->poll_events); poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; - /* task will wait for requests on exit, don't need a ref */ + /* + * Don't need a reference here, as we're adding it to the task + * task_works list. If the task exits, the list is pruned. + */ req->task = current; return 0; } @@ -3767,46 +3967,15 @@ static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt) struct io_poll_iocb *poll = &req->poll; struct io_ring_ctx *ctx = req->ctx; struct io_poll_table ipt; - bool cancel = false; __poll_t mask; INIT_HLIST_NODE(&req->hash_node); - - poll->head = NULL; - poll->done = false; - poll->canceled = false; - - ipt.pt._qproc = io_poll_queue_proc; - ipt.pt._key = poll->events; - ipt.req = req; - ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */ - - /* initialized the list so that we can do list_empty checks */ - INIT_LIST_HEAD(&poll->wait.entry); - init_waitqueue_func_entry(&poll->wait, io_poll_wake); - poll->wait.private = req; - INIT_LIST_HEAD(&req->list); + ipt.pt._qproc = io_poll_queue_proc; - mask = vfs_poll(poll->file, &ipt.pt) & poll->events; + mask = __io_arm_poll_handler(req, &req->poll, &ipt, poll->events, + io_poll_wake); - spin_lock_irq(&ctx->completion_lock); - if (likely(poll->head)) { - spin_lock(&poll->head->lock); - if (unlikely(list_empty(&poll->wait.entry))) { - if (ipt.error) - cancel = true; - ipt.error = 0; - mask = 0; - } - if (mask || ipt.error) - list_del_init(&poll->wait.entry); - else if (cancel) - WRITE_ONCE(poll->canceled, true); - else if (!poll->done) /* actually waiting for an event */ - io_poll_req_insert(req); - spin_unlock(&poll->head->lock); - } if (mask) { /* no async, we'd stolen it */ ipt.error = 0; io_poll_complete(req, mask, 0); @@ -4751,6 +4920,9 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) if (!(req->flags & REQ_F_LINK)) return NULL; + /* for polled retry, if flag is set, we already went through here */ + if (req->flags & REQ_F_POLLED) + return NULL; nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, link_list); @@ -4788,6 +4960,11 @@ again: */ if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) || (req->flags & REQ_F_MUST_PUNT))) { + if (io_arm_poll_handler(req)) { + if (linked_timeout) + io_queue_linked_timeout(linked_timeout); + goto done_req; + } punt: if (io_op_defs[req->opcode].file_table) { ret = io_grab_files(req); @@ -6782,6 +6959,17 @@ static void __io_uring_show_fdinfo(struct io_ring_ctx *ctx, struct seq_file *m) seq_printf(m, "Personalities:\n"); idr_for_each(&ctx->personality_idr, io_uring_show_cred, m); } + seq_printf(m, "PollList:\n"); + spin_lock_irq(&ctx->completion_lock); + for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) { + struct hlist_head *list = &ctx->cancel_hash[i]; + struct io_kiocb *req; + + hlist_for_each_entry(req, list, hash_node) + seq_printf(m, " op=%d, task_works=%d\n", req->opcode, + req->task->task_works != NULL); + } + spin_unlock_irq(&ctx->completion_lock); mutex_unlock(&ctx->uring_lock); } @@ -6998,7 +7186,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p) p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP | IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS | - IORING_FEAT_CUR_PERSONALITY; + IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL; trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags); return ret; err: diff --git a/include/trace/events/io_uring.h b/include/trace/events/io_uring.h index 27bd9e4f927b..9f0d3b7d56b0 100644 --- a/include/trace/events/io_uring.h +++ b/include/trace/events/io_uring.h @@ -357,6 +357,109 @@ TRACE_EVENT(io_uring_submit_sqe, __entry->force_nonblock, __entry->sq_thread) ); +TRACE_EVENT(io_uring_poll_arm, + + TP_PROTO(void *ctx, u8 opcode, u64 user_data, int mask, int events), + + TP_ARGS(ctx, opcode, user_data, mask, events), + + TP_STRUCT__entry ( + __field( void *, ctx ) + __field( u8, opcode ) + __field( u64, user_data ) + __field( int, mask ) + __field( int, events ) + ), + + TP_fast_assign( + __entry->ctx = ctx; + __entry->opcode = opcode; + __entry->user_data = user_data; + __entry->mask = mask; + __entry->events = events; + ), + + TP_printk("ring %p, op %d, data 0x%llx, mask 0x%x, events 0x%x", + __entry->ctx, __entry->opcode, + (unsigned long long) __entry->user_data, + __entry->mask, __entry->events) +); + +TRACE_EVENT(io_uring_poll_wake, + + TP_PROTO(void *ctx, u8 opcode, u64 user_data, int mask), + + TP_ARGS(ctx, opcode, user_data, mask), + + TP_STRUCT__entry ( + __field( void *, ctx ) + __field( u8, opcode ) + __field( u64, user_data ) + __field( int, mask ) + ), + + TP_fast_assign( + __entry->ctx = ctx; + __entry->opcode = opcode; + __entry->user_data = user_data; + __entry->mask = mask; + ), + + TP_printk("ring %p, op %d, data 0x%llx, mask 0x%x", + __entry->ctx, __entry->opcode, + (unsigned long long) __entry->user_data, + __entry->mask) +); + +TRACE_EVENT(io_uring_task_add, + + TP_PROTO(void *ctx, u8 opcode, u64 user_data, int mask), + + TP_ARGS(ctx, opcode, user_data, mask), + + TP_STRUCT__entry ( + __field( void *, ctx ) + __field( u8, opcode ) + __field( u64, user_data ) + __field( int, mask ) + ), + + TP_fast_assign( + __entry->ctx = ctx; + __entry->opcode = opcode; + __entry->user_data = user_data; + __entry->mask = mask; + ), + + TP_printk("ring %p, op %d, data 0x%llx, mask %x", + __entry->ctx, __entry->opcode, + (unsigned long long) __entry->user_data, + __entry->mask) +); + +TRACE_EVENT(io_uring_task_run, + + TP_PROTO(void *ctx, u8 opcode, u64 user_data), + + TP_ARGS(ctx, opcode, user_data), + + TP_STRUCT__entry ( + __field( void *, ctx ) + __field( u8, opcode ) + __field( u64, user_data ) + ), + + TP_fast_assign( + __entry->ctx = ctx; + __entry->opcode = opcode; + __entry->user_data = user_data; + ), + + TP_printk("ring %p, op %d, data 0x%llx", + __entry->ctx, __entry->opcode, + (unsigned long long) __entry->user_data) +); + #endif /* _TRACE_IO_URING_H */ /* This part must be outside protection */ diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 08891cc1c1e7..53b36311cdac 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -216,6 +216,7 @@ struct io_uring_params { #define IORING_FEAT_SUBMIT_STABLE (1U << 2) #define IORING_FEAT_RW_CUR_POS (1U << 3) #define IORING_FEAT_CUR_PERSONALITY (1U << 4) +#define IORING_FEAT_FAST_POLL (1U << 5) /* * io_uring_register(2) opcodes and arguments -- cgit From 4bc4494ec7c97ee38e2aa3d1cd76e289c49ac083 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sat, 29 Feb 2020 22:48:24 +0300 Subject: io_uring: remove extra nxt check after punt After __io_queue_sqe() ended up in io_queue_async_work(), it's already known that there is no @nxt req, so skip the check and return from the function. Also, @nxt initialisation now can be done just before io_put_req_find_next(), as there is no jumping until it's checked. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 8c976fde40bd..d70bc7747e84 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -4936,7 +4936,7 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_kiocb *linked_timeout; - struct io_kiocb *nxt = NULL; + struct io_kiocb *nxt; const struct cred *old_creds = NULL; int ret; @@ -4963,7 +4963,7 @@ again: if (io_arm_poll_handler(req)) { if (linked_timeout) io_queue_linked_timeout(linked_timeout); - goto done_req; + goto exit; } punt: if (io_op_defs[req->opcode].file_table) { @@ -4977,10 +4977,11 @@ punt: * submit reference when the iocb is actually submitted. */ io_queue_async_work(req); - goto done_req; + goto exit; } err: + nxt = NULL; /* drop submission reference */ io_put_req_find_next(req, &nxt); @@ -4997,15 +4998,14 @@ err: req_set_fail_links(req); io_put_req(req); } -done_req: if (nxt) { req = nxt; - nxt = NULL; if (req->flags & REQ_F_FORCE_ASYNC) goto punt; goto again; } +exit: if (old_creds) revert_creds(old_creds); } -- cgit From 3b17cf5a58f2a38e23ee980b5dece717d0464fb7 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sat, 29 Feb 2020 22:56:10 +0300 Subject: io_uring: remove io_prep_next_work() io-wq cares about IO_WQ_WORK_UNBOUND flag only while enqueueing, so it's useless setting it for a next req of a link. Thus, removed it from io_prep_linked_timeout(), and inline the function. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index d70bc7747e84..fb8fe0bd5e18 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -999,17 +999,6 @@ static inline void io_req_work_drop_env(struct io_kiocb *req) } } -static inline void io_prep_next_work(struct io_kiocb *req, - struct io_kiocb **link) -{ - const struct io_op_def *def = &io_op_defs[req->opcode]; - - if (!(req->flags & REQ_F_ISREG) && def->unbound_nonreg_file) - req->work.flags |= IO_WQ_WORK_UNBOUND; - - *link = io_prep_linked_timeout(req); -} - static inline bool io_prep_async_work(struct io_kiocb *req, struct io_kiocb **link) { @@ -2581,8 +2570,8 @@ static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) { struct io_kiocb *link; - io_prep_next_work(nxt, &link); *workptr = &nxt->work; + link = io_prep_linked_timeout(nxt); if (link) { nxt->work.func = io_link_work_cb; nxt->work.data = link; -- cgit From 8755d97a09fed0de206772bcad1838301293c4d8 Mon Sep 17 00:00:00 2001 From: Nathan Chancellor Date: Mon, 2 Mar 2020 16:01:19 -0700 Subject: io_uring: Ensure mask is initialized in io_arm_poll_handler Clang warns: fs/io_uring.c:4178:6: warning: variable 'mask' is used uninitialized whenever 'if' condition is false [-Wsometimes-uninitialized] if (def->pollin) ^~~~~~~~~~~ fs/io_uring.c:4182:2: note: uninitialized use occurs here mask |= POLLERR | POLLPRI; ^~~~ fs/io_uring.c:4178:2: note: remove the 'if' if its condition is always true if (def->pollin) ^~~~~~~~~~~~~~~~ fs/io_uring.c:4154:15: note: initialize the variable 'mask' to silence this warning __poll_t mask, ret; ^ = 0 1 warning generated. io_op_defs has many definitions where pollin is not set so mask indeed might be uninitialized. Initialize it to zero and change the next assignment to |=, in case further masks are added in the future to avoid missing changing the assignment then. Fixes: d7718a9d25a6 ("io_uring: use poll driven retry for files that support it") Link: https://github.com/ClangBuiltLinux/linux/issues/916 Signed-off-by: Nathan Chancellor Signed-off-by: Jens Axboe --- fs/io_uring.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index fb8fe0bd5e18..e92b88455e5e 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3738,8 +3738,9 @@ static bool io_arm_poll_handler(struct io_kiocb *req) req->apoll = apoll; INIT_HLIST_NODE(&req->hash_node); + mask = 0; if (def->pollin) - mask = POLLIN | POLLRDNORM; + mask |= POLLIN | POLLRDNORM; if (def->pollout) mask |= POLLOUT | POLLWRNORM; mask |= POLLERR | POLLPRI; -- cgit From a2100672f3b2afdd55ccc2e640d1a8bd99ff6338 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 2 Mar 2020 23:45:16 +0300 Subject: io_uring: clean up io_close Don't abuse labels for plain and straightworward code. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index e92b88455e5e..950b7b3396ad 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3030,8 +3030,16 @@ static int io_close(struct io_kiocb *req, struct io_kiocb **nxt, return ret; /* if the file has a flush method, be safe and punt to async */ - if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) - goto eagain; + if (req->close.put_file->f_op->flush && force_nonblock) { + req->work.func = io_close_finish; + /* + * Do manual async queue here to avoid grabbing files - we don't + * need the files, and it'll cause io_close_finish() to close + * the file again and cause a double CQE entry for this request + */ + io_queue_async_work(req); + return 0; + } /* * No ->flush(), safely close from here and just punt the @@ -3039,15 +3047,6 @@ static int io_close(struct io_kiocb *req, struct io_kiocb **nxt, */ __io_close_finish(req, nxt); return 0; -eagain: - req->work.func = io_close_finish; - /* - * Do manual async queue here to avoid grabbing files - we don't - * need the files, and it'll cause io_close_finish() to close - * the file again and cause a double CQE entry for this request - */ - io_queue_async_work(req); - return 0; } static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) -- cgit From 594506fec5faec2b1ec82ad6fb0c8132512fc459 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Tue, 3 Mar 2020 21:33:11 +0300 Subject: io_uring: make submission ref putting consistent The rule is simple, any async handler gets a submission ref and should put it at the end. Make them all follow it, and so more consistent. This is a preparation patch, and as io_wq_assign_next() currently won't ever work, this doesn't care to use io_put_req_find_next() instead of io_put_req(). Signed-off-by: Pavel Begunkov refcount_inc_not_zero() -> refcount_inc() fix. Signed-off-by: Jens Axboe --- fs/io_uring.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 950b7b3396ad..0e935d77d8aa 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2550,7 +2550,7 @@ static bool io_req_cancelled(struct io_kiocb *req) if (req->work.flags & IO_WQ_WORK_CANCEL) { req_set_fail_links(req); io_cqring_add_event(req, -ECANCELED); - io_put_req(req); + io_double_put_req(req); return true; } @@ -2600,6 +2600,7 @@ static void io_fsync_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_fsync(req, &nxt); + io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } @@ -2609,7 +2610,6 @@ static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt, { /* fsync always requires a blocking context */ if (force_nonblock) { - io_put_req(req); req->work.func = io_fsync_finish; return -EAGAIN; } @@ -2621,9 +2621,6 @@ static void __io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt) { int ret; - if (io_req_cancelled(req)) - return; - ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off, req->sync.len); if (ret < 0) @@ -2637,7 +2634,10 @@ static void io_fallocate_finish(struct io_wq_work **workptr) struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); struct io_kiocb *nxt = NULL; + if (io_req_cancelled(req)) + return; __io_fallocate(req, &nxt); + io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } @@ -2659,7 +2659,6 @@ static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt, { /* fallocate always requiring blocking context */ if (force_nonblock) { - io_put_req(req); req->work.func = io_fallocate_finish; return -EAGAIN; } @@ -3015,6 +3014,7 @@ static void io_close_finish(struct io_wq_work **workptr) /* not cancellable, don't do io_req_cancelled() */ __io_close_finish(req, &nxt); + io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } @@ -3031,6 +3031,9 @@ static int io_close(struct io_kiocb *req, struct io_kiocb **nxt, /* if the file has a flush method, be safe and punt to async */ if (req->close.put_file->f_op->flush && force_nonblock) { + /* submission ref will be dropped, take it for async */ + refcount_inc(&req->refs); + req->work.func = io_close_finish; /* * Do manual async queue here to avoid grabbing files - we don't @@ -3088,6 +3091,7 @@ static void io_sync_file_range_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_sync_file_range(req, &nxt); + io_put_req(req); /* put submission ref */ if (nxt) io_wq_assign_next(workptr, nxt); } @@ -3097,7 +3101,6 @@ static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt, { /* sync_file_range always requires a blocking context */ if (force_nonblock) { - io_put_req(req); req->work.func = io_sync_file_range_finish; return -EAGAIN; } @@ -3464,11 +3467,10 @@ static void io_accept_finish(struct io_wq_work **workptr) struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); struct io_kiocb *nxt = NULL; - io_put_req(req); - if (io_req_cancelled(req)) return; __io_accept(req, &nxt, false); + io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } @@ -4734,17 +4736,14 @@ static void io_wq_submit_work(struct io_wq_work **workptr) } while (1); } - /* drop submission reference */ - io_put_req(req); - if (ret) { req_set_fail_links(req); io_cqring_add_event(req, ret); io_put_req(req); } - /* if a dependent link is ready, pass it back */ - if (!ret && nxt) + io_put_req(req); /* drop submission reference */ + if (nxt) io_wq_assign_next(workptr, nxt); } -- cgit From 014db0073cc6a12e1f421b9231d6f3aa35735823 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Tue, 3 Mar 2020 21:33:12 +0300 Subject: io_uring: remove @nxt from handlers There will be no use for @nxt in the handlers, and it's doesn't work anyway, so purge it Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 204 +++++++++++++++++++++++++--------------------------------- 1 file changed, 86 insertions(+), 118 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 0e935d77d8aa..f4faaa2a9a3f 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1804,17 +1804,6 @@ static void io_complete_rw(struct kiocb *kiocb, long res, long res2) io_put_req(req); } -static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res) -{ - struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); - struct io_kiocb *nxt = NULL; - - io_complete_rw_common(kiocb, res); - io_put_req_find_next(req, &nxt); - - return nxt; -} - static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); @@ -2009,14 +1998,14 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) } } -static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt) +static void kiocb_done(struct kiocb *kiocb, ssize_t ret) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); if (req->flags & REQ_F_CUR_POS) req->file->f_pos = kiocb->ki_pos; if (ret >= 0 && kiocb->ki_complete == io_complete_rw) - *nxt = __io_complete_rw(kiocb, ret); + io_complete_rw(kiocb, ret, 0); else io_rw_done(kiocb, ret); } @@ -2265,8 +2254,7 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, return 0; } -static int io_read(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_read(struct io_kiocb *req, bool force_nonblock) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; @@ -2306,7 +2294,7 @@ static int io_read(struct io_kiocb *req, struct io_kiocb **nxt, /* Catch -EAGAIN return for forced non-blocking submission */ if (!force_nonblock || ret2 != -EAGAIN) { - kiocb_done(kiocb, ret2, nxt); + kiocb_done(kiocb, ret2); } else { copy_iov: ret = io_setup_async_rw(req, io_size, iovec, @@ -2355,8 +2343,7 @@ static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, return 0; } -static int io_write(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_write(struct io_kiocb *req, bool force_nonblock) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; @@ -2420,7 +2407,7 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt, if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT)) ret2 = -EAGAIN; if (!force_nonblock || ret2 != -EAGAIN) { - kiocb_done(kiocb, ret2, nxt); + kiocb_done(kiocb, ret2); } else { copy_iov: ret = io_setup_async_rw(req, io_size, iovec, @@ -2477,8 +2464,7 @@ static bool io_splice_punt(struct file *file) return !(file->f_mode & O_NONBLOCK); } -static int io_splice(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_splice(struct io_kiocb *req, bool force_nonblock) { struct io_splice *sp = &req->splice; struct file *in = sp->file_in; @@ -2505,7 +2491,7 @@ static int io_splice(struct io_kiocb *req, struct io_kiocb **nxt, io_cqring_add_event(req, ret); if (ret != sp->len) req_set_fail_links(req); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; } @@ -2578,7 +2564,7 @@ static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) } } -static void __io_fsync(struct io_kiocb *req, struct io_kiocb **nxt) +static void __io_fsync(struct io_kiocb *req) { loff_t end = req->sync.off + req->sync.len; int ret; @@ -2589,7 +2575,7 @@ static void __io_fsync(struct io_kiocb *req, struct io_kiocb **nxt) if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); } static void io_fsync_finish(struct io_wq_work **workptr) @@ -2599,25 +2585,24 @@ static void io_fsync_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; - __io_fsync(req, &nxt); + __io_fsync(req); io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } -static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_fsync(struct io_kiocb *req, bool force_nonblock) { /* fsync always requires a blocking context */ if (force_nonblock) { req->work.func = io_fsync_finish; return -EAGAIN; } - __io_fsync(req, nxt); + __io_fsync(req); return 0; } -static void __io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt) +static void __io_fallocate(struct io_kiocb *req) { int ret; @@ -2626,7 +2611,7 @@ static void __io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt) if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); } static void io_fallocate_finish(struct io_wq_work **workptr) @@ -2636,7 +2621,7 @@ static void io_fallocate_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; - __io_fallocate(req, &nxt); + __io_fallocate(req); io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); @@ -2654,8 +2639,7 @@ static int io_fallocate_prep(struct io_kiocb *req, return 0; } -static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_fallocate(struct io_kiocb *req, bool force_nonblock) { /* fallocate always requiring blocking context */ if (force_nonblock) { @@ -2663,7 +2647,7 @@ static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt, return -EAGAIN; } - __io_fallocate(req, nxt); + __io_fallocate(req); return 0; } @@ -2736,8 +2720,7 @@ static int io_openat2_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; } -static int io_openat2(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_openat2(struct io_kiocb *req, bool force_nonblock) { struct open_flags op; struct file *file; @@ -2768,15 +2751,14 @@ err: if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; } -static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_openat(struct io_kiocb *req, bool force_nonblock) { req->open.how = build_open_how(req->open.how.flags, req->open.how.mode); - return io_openat2(req, nxt, force_nonblock); + return io_openat2(req, force_nonblock); } static int io_epoll_ctl_prep(struct io_kiocb *req, @@ -2804,8 +2786,7 @@ static int io_epoll_ctl_prep(struct io_kiocb *req, #endif } -static int io_epoll_ctl(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_epoll_ctl(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_EPOLL) struct io_epoll *ie = &req->epoll; @@ -2818,7 +2799,7 @@ static int io_epoll_ctl(struct io_kiocb *req, struct io_kiocb **nxt, if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; @@ -2840,8 +2821,7 @@ static int io_madvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) #endif } -static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_madvise(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU) struct io_madvise *ma = &req->madvise; @@ -2854,7 +2834,7 @@ static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt, if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; @@ -2872,8 +2852,7 @@ static int io_fadvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; } -static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_fadvise(struct io_kiocb *req, bool force_nonblock) { struct io_fadvise *fa = &req->fadvise; int ret; @@ -2893,7 +2872,7 @@ static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt, if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; } @@ -2930,8 +2909,7 @@ static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; } -static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_statx(struct io_kiocb *req, bool force_nonblock) { struct io_open *ctx = &req->open; unsigned lookup_flags; @@ -2968,7 +2946,7 @@ err: if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; } @@ -2995,7 +2973,7 @@ static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) } /* only called when __close_fd_get_file() is done */ -static void __io_close_finish(struct io_kiocb *req, struct io_kiocb **nxt) +static void __io_close_finish(struct io_kiocb *req) { int ret; @@ -3004,7 +2982,7 @@ static void __io_close_finish(struct io_kiocb *req, struct io_kiocb **nxt) req_set_fail_links(req); io_cqring_add_event(req, ret); fput(req->close.put_file); - io_put_req_find_next(req, nxt); + io_put_req(req); } static void io_close_finish(struct io_wq_work **workptr) @@ -3013,14 +2991,13 @@ static void io_close_finish(struct io_wq_work **workptr) struct io_kiocb *nxt = NULL; /* not cancellable, don't do io_req_cancelled() */ - __io_close_finish(req, &nxt); + __io_close_finish(req); io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } -static int io_close(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_close(struct io_kiocb *req, bool force_nonblock) { int ret; @@ -3048,7 +3025,7 @@ static int io_close(struct io_kiocb *req, struct io_kiocb **nxt, * No ->flush(), safely close from here and just punt the * fput() to async context. */ - __io_close_finish(req, nxt); + __io_close_finish(req); return 0; } @@ -3070,7 +3047,7 @@ static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; } -static void __io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt) +static void __io_sync_file_range(struct io_kiocb *req) { int ret; @@ -3079,7 +3056,7 @@ static void __io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt) if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); } @@ -3090,14 +3067,13 @@ static void io_sync_file_range_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; - __io_sync_file_range(req, &nxt); + __io_sync_file_range(req); io_put_req(req); /* put submission ref */ if (nxt) io_wq_assign_next(workptr, nxt); } -static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_sync_file_range(struct io_kiocb *req, bool force_nonblock) { /* sync_file_range always requires a blocking context */ if (force_nonblock) { @@ -3105,7 +3081,7 @@ static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt, return -EAGAIN; } - __io_sync_file_range(req, nxt); + __io_sync_file_range(req); return 0; } @@ -3157,8 +3133,7 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) #endif } -static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_sendmsg(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) struct io_async_msghdr *kmsg = NULL; @@ -3212,15 +3187,14 @@ static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt, io_cqring_add_event(req, ret); if (ret < 0) req_set_fail_links(req); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; #endif } -static int io_send(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_send(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) struct socket *sock; @@ -3263,7 +3237,7 @@ static int io_send(struct io_kiocb *req, struct io_kiocb **nxt, io_cqring_add_event(req, ret); if (ret < 0) req_set_fail_links(req); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; @@ -3304,8 +3278,7 @@ static int io_recvmsg_prep(struct io_kiocb *req, #endif } -static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) struct io_async_msghdr *kmsg = NULL; @@ -3361,15 +3334,14 @@ static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt, io_cqring_add_event(req, ret); if (ret < 0) req_set_fail_links(req); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; #endif } -static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_recv(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) struct socket *sock; @@ -3413,7 +3385,7 @@ static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt, io_cqring_add_event(req, ret); if (ret < 0) req_set_fail_links(req); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; @@ -3441,8 +3413,7 @@ static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) } #if defined(CONFIG_NET) -static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int __io_accept(struct io_kiocb *req, bool force_nonblock) { struct io_accept *accept = &req->accept; unsigned file_flags; @@ -3458,7 +3429,7 @@ static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt, if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; } @@ -3469,20 +3440,19 @@ static void io_accept_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; - __io_accept(req, &nxt, false); + __io_accept(req, false); io_put_req(req); /* drop submission reference */ if (nxt) io_wq_assign_next(workptr, nxt); } #endif -static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_accept(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) int ret; - ret = __io_accept(req, nxt, force_nonblock); + ret = __io_accept(req, force_nonblock); if (ret == -EAGAIN && force_nonblock) { req->work.func = io_accept_finish; return -EAGAIN; @@ -3517,8 +3487,7 @@ static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) #endif } -static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt, - bool force_nonblock) +static int io_connect(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) struct io_async_ctx __io, *io; @@ -3556,7 +3525,7 @@ out: if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); - io_put_req_find_next(req, nxt); + io_put_req(req); return 0; #else return -EOPNOTSUPP; @@ -3953,7 +3922,7 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe return 0; } -static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt) +static int io_poll_add(struct io_kiocb *req) { struct io_poll_iocb *poll = &req->poll; struct io_ring_ctx *ctx = req->ctx; @@ -3975,7 +3944,7 @@ static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt) if (mask) { io_cqring_ev_posted(ctx); - io_put_req_find_next(req, nxt); + io_put_req(req); } return ipt.error; } @@ -4224,7 +4193,7 @@ static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr) static void io_async_find_and_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req, __u64 sqe_addr, - struct io_kiocb **nxt, int success_ret) + int success_ret) { unsigned long flags; int ret; @@ -4250,7 +4219,7 @@ done: if (ret < 0) req_set_fail_links(req); - io_put_req_find_next(req, nxt); + io_put_req(req); } static int io_async_cancel_prep(struct io_kiocb *req, @@ -4266,11 +4235,11 @@ static int io_async_cancel_prep(struct io_kiocb *req, return 0; } -static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt) +static int io_async_cancel(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0); + io_async_find_and_cancel(ctx, req, req->cancel.addr, 0); return 0; } @@ -4477,7 +4446,7 @@ static void io_cleanup_req(struct io_kiocb *req) } static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, - struct io_kiocb **nxt, bool force_nonblock) + bool force_nonblock) { struct io_ring_ctx *ctx = req->ctx; int ret; @@ -4494,7 +4463,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret < 0) break; } - ret = io_read(req, nxt, force_nonblock); + ret = io_read(req, force_nonblock); break; case IORING_OP_WRITEV: case IORING_OP_WRITE_FIXED: @@ -4504,7 +4473,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret < 0) break; } - ret = io_write(req, nxt, force_nonblock); + ret = io_write(req, force_nonblock); break; case IORING_OP_FSYNC: if (sqe) { @@ -4512,7 +4481,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret < 0) break; } - ret = io_fsync(req, nxt, force_nonblock); + ret = io_fsync(req, force_nonblock); break; case IORING_OP_POLL_ADD: if (sqe) { @@ -4520,7 +4489,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_poll_add(req, nxt); + ret = io_poll_add(req); break; case IORING_OP_POLL_REMOVE: if (sqe) { @@ -4536,7 +4505,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret < 0) break; } - ret = io_sync_file_range(req, nxt, force_nonblock); + ret = io_sync_file_range(req, force_nonblock); break; case IORING_OP_SENDMSG: case IORING_OP_SEND: @@ -4546,9 +4515,9 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, break; } if (req->opcode == IORING_OP_SENDMSG) - ret = io_sendmsg(req, nxt, force_nonblock); + ret = io_sendmsg(req, force_nonblock); else - ret = io_send(req, nxt, force_nonblock); + ret = io_send(req, force_nonblock); break; case IORING_OP_RECVMSG: case IORING_OP_RECV: @@ -4558,9 +4527,9 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, break; } if (req->opcode == IORING_OP_RECVMSG) - ret = io_recvmsg(req, nxt, force_nonblock); + ret = io_recvmsg(req, force_nonblock); else - ret = io_recv(req, nxt, force_nonblock); + ret = io_recv(req, force_nonblock); break; case IORING_OP_TIMEOUT: if (sqe) { @@ -4584,7 +4553,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_accept(req, nxt, force_nonblock); + ret = io_accept(req, force_nonblock); break; case IORING_OP_CONNECT: if (sqe) { @@ -4592,7 +4561,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_connect(req, nxt, force_nonblock); + ret = io_connect(req, force_nonblock); break; case IORING_OP_ASYNC_CANCEL: if (sqe) { @@ -4600,7 +4569,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_async_cancel(req, nxt); + ret = io_async_cancel(req); break; case IORING_OP_FALLOCATE: if (sqe) { @@ -4608,7 +4577,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_fallocate(req, nxt, force_nonblock); + ret = io_fallocate(req, force_nonblock); break; case IORING_OP_OPENAT: if (sqe) { @@ -4616,7 +4585,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_openat(req, nxt, force_nonblock); + ret = io_openat(req, force_nonblock); break; case IORING_OP_CLOSE: if (sqe) { @@ -4624,7 +4593,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_close(req, nxt, force_nonblock); + ret = io_close(req, force_nonblock); break; case IORING_OP_FILES_UPDATE: if (sqe) { @@ -4640,7 +4609,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_statx(req, nxt, force_nonblock); + ret = io_statx(req, force_nonblock); break; case IORING_OP_FADVISE: if (sqe) { @@ -4648,7 +4617,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_fadvise(req, nxt, force_nonblock); + ret = io_fadvise(req, force_nonblock); break; case IORING_OP_MADVISE: if (sqe) { @@ -4656,7 +4625,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_madvise(req, nxt, force_nonblock); + ret = io_madvise(req, force_nonblock); break; case IORING_OP_OPENAT2: if (sqe) { @@ -4664,7 +4633,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_openat2(req, nxt, force_nonblock); + ret = io_openat2(req, force_nonblock); break; case IORING_OP_EPOLL_CTL: if (sqe) { @@ -4672,7 +4641,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret) break; } - ret = io_epoll_ctl(req, nxt, force_nonblock); + ret = io_epoll_ctl(req, force_nonblock); break; case IORING_OP_SPLICE: if (sqe) { @@ -4680,7 +4649,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret < 0) break; } - ret = io_splice(req, nxt, force_nonblock); + ret = io_splice(req, force_nonblock); break; default: ret = -EINVAL; @@ -4724,7 +4693,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr) if (!ret) { do { - ret = io_issue_sqe(req, NULL, &nxt, false); + ret = io_issue_sqe(req, NULL, false); /* * We can get EAGAIN for polled IO even though we're * forcing a sync submission from here, since we can't @@ -4870,8 +4839,7 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer) if (prev) { req_set_fail_links(prev); - io_async_find_and_cancel(ctx, req, prev->user_data, NULL, - -ETIME); + io_async_find_and_cancel(ctx, req, prev->user_data, -ETIME); io_put_req(prev); } else { io_cqring_add_event(req, -ETIME); @@ -4940,7 +4908,7 @@ again: old_creds = override_creds(req->work.creds); } - ret = io_issue_sqe(req, sqe, &nxt, true); + ret = io_issue_sqe(req, sqe, true); /* * We async punt it if the file wasn't marked NOWAIT, or if the file -- cgit From 7a743e225b2a9da772b28a50031e1ccd8a8ce404 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Tue, 3 Mar 2020 21:33:13 +0300 Subject: io_uring: get next work with submission ref drop If after dropping the submission reference req->refs == 1, the request is done, because this one is for io_put_work() and will be dropped synchronously shortly after. In this case it's safe to steal a next work from the request. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io_uring.c | 89 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index f4faaa2a9a3f..40ca9e6a5ace 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1518,6 +1518,27 @@ static void io_free_req(struct io_kiocb *req) io_queue_async_work(nxt); } +static void io_link_work_cb(struct io_wq_work **workptr) +{ + struct io_wq_work *work = *workptr; + struct io_kiocb *link = work->data; + + io_queue_linked_timeout(link); + io_wq_submit_work(workptr); +} + +static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) +{ + struct io_kiocb *link; + + *workptr = &nxt->work; + link = io_prep_linked_timeout(nxt); + if (link) { + nxt->work.func = io_link_work_cb; + nxt->work.data = link; + } +} + /* * Drop reference to request, return next in chain (if there is one) if this * was the last reference to this request. @@ -1537,6 +1558,27 @@ static void io_put_req(struct io_kiocb *req) io_free_req(req); } +static void io_put_req_async_completion(struct io_kiocb *req, + struct io_wq_work **workptr) +{ + /* + * It's in an io-wq worker, so there always should be at least + * one reference, which will be dropped in io_put_work() just + * after the current handler returns. + * + * It also means, that if the counter dropped to 1, then there is + * no asynchronous users left, so it's safe to steal the next work. + */ + refcount_dec(&req->refs); + if (refcount_read(&req->refs) == 1) { + struct io_kiocb *nxt = NULL; + + io_req_find_next(req, &nxt); + if (nxt) + io_wq_assign_next(workptr, nxt); + } +} + /* * Must only be used if we don't need to care about links, usually from * within the completion handling itself. @@ -2543,27 +2585,6 @@ static bool io_req_cancelled(struct io_kiocb *req) return false; } -static void io_link_work_cb(struct io_wq_work **workptr) -{ - struct io_wq_work *work = *workptr; - struct io_kiocb *link = work->data; - - io_queue_linked_timeout(link); - io_wq_submit_work(workptr); -} - -static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) -{ - struct io_kiocb *link; - - *workptr = &nxt->work; - link = io_prep_linked_timeout(nxt); - if (link) { - nxt->work.func = io_link_work_cb; - nxt->work.data = link; - } -} - static void __io_fsync(struct io_kiocb *req) { loff_t end = req->sync.off + req->sync.len; @@ -2581,14 +2602,11 @@ static void __io_fsync(struct io_kiocb *req) static void io_fsync_finish(struct io_wq_work **workptr) { struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; if (io_req_cancelled(req)) return; __io_fsync(req); - io_put_req(req); /* drop submission reference */ - if (nxt) - io_wq_assign_next(workptr, nxt); + io_put_req_async_completion(req, workptr); } static int io_fsync(struct io_kiocb *req, bool force_nonblock) @@ -2617,14 +2635,11 @@ static void __io_fallocate(struct io_kiocb *req) static void io_fallocate_finish(struct io_wq_work **workptr) { struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; if (io_req_cancelled(req)) return; __io_fallocate(req); - io_put_req(req); /* drop submission reference */ - if (nxt) - io_wq_assign_next(workptr, nxt); + io_put_req_async_completion(req, workptr); } static int io_fallocate_prep(struct io_kiocb *req, @@ -2988,13 +3003,10 @@ static void __io_close_finish(struct io_kiocb *req) static void io_close_finish(struct io_wq_work **workptr) { struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; /* not cancellable, don't do io_req_cancelled() */ __io_close_finish(req); - io_put_req(req); /* drop submission reference */ - if (nxt) - io_wq_assign_next(workptr, nxt); + io_put_req_async_completion(req, workptr); } static int io_close(struct io_kiocb *req, bool force_nonblock) @@ -3436,14 +3448,11 @@ static int __io_accept(struct io_kiocb *req, bool force_nonblock) static void io_accept_finish(struct io_wq_work **workptr) { struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; if (io_req_cancelled(req)) return; __io_accept(req, false); - io_put_req(req); /* drop submission reference */ - if (nxt) - io_wq_assign_next(workptr, nxt); + io_put_req_async_completion(req, workptr); } #endif @@ -4682,7 +4691,6 @@ static void io_wq_submit_work(struct io_wq_work **workptr) { struct io_wq_work *work = *workptr; struct io_kiocb *req = container_of(work, struct io_kiocb, work); - struct io_kiocb *nxt = NULL; int ret = 0; /* if NO_CANCEL is set, we must still run the work */ @@ -4711,9 +4719,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr) io_put_req(req); } - io_put_req(req); /* drop submission reference */ - if (nxt) - io_wq_assign_next(workptr, nxt); + io_put_req_async_completion(req, workptr); } static int io_req_needs_file(struct io_kiocb *req, int fd) @@ -6103,6 +6109,7 @@ static void io_put_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); + /* Consider that io_put_req_async_completion() relies on this ref */ io_put_req(req); } -- cgit From dc026a73c7221b4d9d146ed0bde69ff578ebe8dc Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 4 Mar 2020 16:14:09 +0300 Subject: io-wq: shuffle io_worker_handle_work() code This is a preparation patch, it adds some helpers and makes the next patches cleaner. - extract io_impersonate_work() and io_assign_current_work() - replace @next label with nested do-while - move put_work() right after NULL'ing cur_work. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 123 ++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 64 insertions(+), 59 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 042c7e2057ef..e438dc4d7cb3 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -440,14 +440,43 @@ static void io_wq_switch_creds(struct io_worker *worker, worker->saved_creds = old_creds; } +static void io_impersonate_work(struct io_worker *worker, + struct io_wq_work *work) +{ + if (work->files && current->files != work->files) { + task_lock(current); + current->files = work->files; + task_unlock(current); + } + if (work->fs && current->fs != work->fs) + current->fs = work->fs; + if (work->mm != worker->mm) + io_wq_switch_mm(worker, work); + if (worker->cur_creds != work->creds) + io_wq_switch_creds(worker, work); +} + +static void io_assign_current_work(struct io_worker *worker, + struct io_wq_work *work) +{ + /* flush pending signals before assigning new work */ + if (signal_pending(current)) + flush_signals(current); + cond_resched(); + + spin_lock_irq(&worker->lock); + worker->cur_work = work; + spin_unlock_irq(&worker->lock); +} + static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { - struct io_wq_work *work, *old_work = NULL, *put_work = NULL; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; do { + struct io_wq_work *work, *old_work; unsigned hash = -1U; /* @@ -464,69 +493,45 @@ static void io_worker_handle_work(struct io_worker *worker) wqe->flags |= IO_WQE_FLAG_STALLED; spin_unlock_irq(&wqe->lock); - if (put_work && wq->put_work) - wq->put_work(old_work); if (!work) break; -next: - /* flush any pending signals before assigning new work */ - if (signal_pending(current)) - flush_signals(current); - - cond_resched(); - spin_lock_irq(&worker->lock); - worker->cur_work = work; - spin_unlock_irq(&worker->lock); - - if (work->files && current->files != work->files) { - task_lock(current); - current->files = work->files; - task_unlock(current); - } - if (work->fs && current->fs != work->fs) - current->fs = work->fs; - if (work->mm != worker->mm) - io_wq_switch_mm(worker, work); - if (worker->cur_creds != work->creds) - io_wq_switch_creds(worker, work); - /* - * OK to set IO_WQ_WORK_CANCEL even for uncancellable work, - * the worker function will do the right thing. - */ - if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) - work->flags |= IO_WQ_WORK_CANCEL; - - if (wq->get_work) { - put_work = work; - wq->get_work(work); - } - - old_work = work; - work->func(&work); - - spin_lock_irq(&worker->lock); - worker->cur_work = NULL; - spin_unlock_irq(&worker->lock); - - spin_lock_irq(&wqe->lock); - - if (hash != -1U) { - wqe->hash_map &= ~BIT(hash); - wqe->flags &= ~IO_WQE_FLAG_STALLED; - } - if (work && work != old_work) { - spin_unlock_irq(&wqe->lock); - - if (put_work && wq->put_work) { - wq->put_work(put_work); - put_work = NULL; + /* handle a whole dependent link */ + do { + io_assign_current_work(worker, work); + io_impersonate_work(worker, work); + + /* + * OK to set IO_WQ_WORK_CANCEL even for uncancellable + * work, the worker function will do the right thing. + */ + if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) + work->flags |= IO_WQ_WORK_CANCEL; + + if (wq->get_work) + wq->get_work(work); + + old_work = work; + work->func(&work); + + spin_lock_irq(&worker->lock); + worker->cur_work = NULL; + spin_unlock_irq(&worker->lock); + + if (wq->put_work) + wq->put_work(old_work); + + if (hash != -1U) { + spin_lock_irq(&wqe->lock); + wqe->hash_map &= ~BIT_ULL(hash); + wqe->flags &= ~IO_WQE_FLAG_STALLED; + spin_unlock_irq(&wqe->lock); + /* dependent work is not hashed */ + hash = -1U; } + } while (work && work != old_work); - /* dependent work not hashed */ - hash = -1U; - goto next; - } + spin_lock_irq(&wqe->lock); } while (1); } -- cgit From 58e3931987377d3f4ec7bbc13e4ea0aab52dc6b0 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 4 Mar 2020 16:14:10 +0300 Subject: io-wq: optimise locking in io_worker_handle_work() There are 2 optimisations: - Now, io_worker_handler_work() do io_assign_current_work() twice per request, and each one adds lock/unlock(worker->lock) pair. The first is to reset worker->cur_work to NULL, and the second to set a real work shortly after. If there is a dependant work, set it immediately, that effectively removes the extra NULL'ing. - And there is no use in taking wqe->lock for linked works, as they are not hashed now. Optimise it out. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index e438dc4d7cb3..473af080470a 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -476,7 +476,7 @@ static void io_worker_handle_work(struct io_worker *worker) struct io_wq *wq = wqe->wq; do { - struct io_wq_work *work, *old_work; + struct io_wq_work *work; unsigned hash = -1U; /* @@ -495,12 +495,13 @@ static void io_worker_handle_work(struct io_worker *worker) spin_unlock_irq(&wqe->lock); if (!work) break; + io_assign_current_work(worker, work); /* handle a whole dependent link */ do { - io_assign_current_work(worker, work); - io_impersonate_work(worker, work); + struct io_wq_work *old_work; + io_impersonate_work(worker, work); /* * OK to set IO_WQ_WORK_CANCEL even for uncancellable * work, the worker function will do the right thing. @@ -513,10 +514,8 @@ static void io_worker_handle_work(struct io_worker *worker) old_work = work; work->func(&work); - - spin_lock_irq(&worker->lock); - worker->cur_work = NULL; - spin_unlock_irq(&worker->lock); + work = (old_work == work) ? NULL : work; + io_assign_current_work(worker, work); if (wq->put_work) wq->put_work(old_work); @@ -529,7 +528,7 @@ static void io_worker_handle_work(struct io_worker *worker) /* dependent work is not hashed */ hash = -1U; } - } while (work && work != old_work); + } while (work); spin_lock_irq(&wqe->lock); } while (1); -- cgit From f462fd36fc43662eeb42c95a9b8da8659af6d75e Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 4 Mar 2020 16:14:11 +0300 Subject: io-wq: optimise out *next_work() double lock When executing non-linked hashed work, io_worker_handle_work() will lock-unlock wqe->lock to update hash, and then immediately lock-unlock to get next work. Optimise this case and do lock/unlock only once. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 473af080470a..82e76011d409 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -474,11 +474,11 @@ static void io_worker_handle_work(struct io_worker *worker) { struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + unsigned hash = -1U; do { struct io_wq_work *work; - unsigned hash = -1U; - +get_next: /* * If we got some work, mark us as busy. If we didn't, but * the list isn't empty, it means we stalled on hashed work. @@ -524,9 +524,12 @@ static void io_worker_handle_work(struct io_worker *worker) spin_lock_irq(&wqe->lock); wqe->hash_map &= ~BIT_ULL(hash); wqe->flags &= ~IO_WQE_FLAG_STALLED; - spin_unlock_irq(&wqe->lock); /* dependent work is not hashed */ hash = -1U; + /* skip unnecessary unlock-lock wqe->lock */ + if (!work) + goto get_next; + spin_unlock_irq(&wqe->lock); } } while (work); -- cgit From e9fd939654f17651ff65e7e55aa6934d29eb4335 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 4 Mar 2020 16:14:12 +0300 Subject: io_uring/io-wq: forward submission ref to async First it changes io-wq interfaces. It replaces {get,put}_work() with free_work(), which guaranteed to be called exactly once. It also enforces free_work() callback to be non-NULL. io_uring follows the changes and instead of putting a submission reference in io_put_req_async_completion(), it will be done in io_free_work(). As removes io_get_work() with corresponding refcount_inc(), the ref balance is maintained. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 29 ++++++++++++++--------------- fs/io-wq.h | 6 ++---- fs/io_uring.c | 31 +++++++++++-------------------- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 82e76011d409..eda36f997dea 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -107,8 +107,7 @@ struct io_wq { struct io_wqe **wqes; unsigned long state; - get_work_fn *get_work; - put_work_fn *put_work; + free_work_fn *free_work; struct task_struct *manager; struct user_struct *user; @@ -509,16 +508,11 @@ get_next: if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) work->flags |= IO_WQ_WORK_CANCEL; - if (wq->get_work) - wq->get_work(work); - old_work = work; work->func(&work); work = (old_work == work) ? NULL : work; io_assign_current_work(worker, work); - - if (wq->put_work) - wq->put_work(old_work); + wq->free_work(old_work); if (hash != -1U) { spin_lock_irq(&wqe->lock); @@ -749,14 +743,17 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, return true; } -static void io_run_cancel(struct io_wq_work *work) +static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) { + struct io_wq *wq = wqe->wq; + do { struct io_wq_work *old_work = work; work->flags |= IO_WQ_WORK_CANCEL; work->func(&work); work = (work == old_work) ? NULL : work; + wq->free_work(old_work); } while (work); } @@ -773,7 +770,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) * It's close enough to not be an issue, fork() has the same delay. */ if (unlikely(!io_wq_can_queue(wqe, acct, work))) { - io_run_cancel(work); + io_run_cancel(work, wqe); return; } @@ -912,7 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - io_run_cancel(work); + io_run_cancel(work, wqe); return IO_WQ_CANCEL_OK; } @@ -987,7 +984,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - io_run_cancel(work); + io_run_cancel(work, wqe); return IO_WQ_CANCEL_OK; } @@ -1064,6 +1061,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) int ret = -ENOMEM, node; struct io_wq *wq; + if (WARN_ON_ONCE(!data->free_work)) + return ERR_PTR(-EINVAL); + wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); @@ -1074,8 +1074,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) return ERR_PTR(-ENOMEM); } - wq->get_work = data->get_work; - wq->put_work = data->put_work; + wq->free_work = data->free_work; /* caller must already hold a reference to this */ wq->user = data->user; @@ -1132,7 +1131,7 @@ err: bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) { - if (data->get_work != wq->get_work || data->put_work != wq->put_work) + if (data->free_work != wq->free_work) return false; return refcount_inc_not_zero(&wq->use_refs); diff --git a/fs/io-wq.h b/fs/io-wq.h index a0978d6958f0..2117b9a4f161 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -81,14 +81,12 @@ struct io_wq_work { *(work) = (struct io_wq_work){ .func = _func }; \ } while (0) \ -typedef void (get_work_fn)(struct io_wq_work *); -typedef void (put_work_fn)(struct io_wq_work *); +typedef void (free_work_fn)(struct io_wq_work *); struct io_wq_data { struct user_struct *user; - get_work_fn *get_work; - put_work_fn *put_work; + free_work_fn *free_work; }; struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); diff --git a/fs/io_uring.c b/fs/io_uring.c index 40ca9e6a5ace..0d6f4b3b8f13 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1558,8 +1558,8 @@ static void io_put_req(struct io_kiocb *req) io_free_req(req); } -static void io_put_req_async_completion(struct io_kiocb *req, - struct io_wq_work **workptr) +static void io_steal_work(struct io_kiocb *req, + struct io_wq_work **workptr) { /* * It's in an io-wq worker, so there always should be at least @@ -1569,7 +1569,6 @@ static void io_put_req_async_completion(struct io_kiocb *req, * It also means, that if the counter dropped to 1, then there is * no asynchronous users left, so it's safe to steal the next work. */ - refcount_dec(&req->refs); if (refcount_read(&req->refs) == 1) { struct io_kiocb *nxt = NULL; @@ -2578,7 +2577,7 @@ static bool io_req_cancelled(struct io_kiocb *req) if (req->work.flags & IO_WQ_WORK_CANCEL) { req_set_fail_links(req); io_cqring_add_event(req, -ECANCELED); - io_double_put_req(req); + io_put_req(req); return true; } @@ -2606,7 +2605,7 @@ static void io_fsync_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_fsync(req); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_fsync(struct io_kiocb *req, bool force_nonblock) @@ -2639,7 +2638,7 @@ static void io_fallocate_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_fallocate(req); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_fallocate_prep(struct io_kiocb *req, @@ -3006,7 +3005,7 @@ static void io_close_finish(struct io_wq_work **workptr) /* not cancellable, don't do io_req_cancelled() */ __io_close_finish(req); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_close(struct io_kiocb *req, bool force_nonblock) @@ -3452,7 +3451,7 @@ static void io_accept_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_accept(req, false); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } #endif @@ -4719,7 +4718,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr) io_put_req(req); } - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_req_needs_file(struct io_kiocb *req, int fd) @@ -6105,21 +6104,14 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg, return __io_sqe_files_update(ctx, &up, nr_args); } -static void io_put_work(struct io_wq_work *work) +static void io_free_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); - /* Consider that io_put_req_async_completion() relies on this ref */ + /* Consider that io_steal_work() relies on this ref */ io_put_req(req); } -static void io_get_work(struct io_wq_work *work) -{ - struct io_kiocb *req = container_of(work, struct io_kiocb, work); - - refcount_inc(&req->refs); -} - static int io_init_wq_offload(struct io_ring_ctx *ctx, struct io_uring_params *p) { @@ -6130,8 +6122,7 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx, int ret = 0; data.user = ctx->user; - data.get_work = io_get_work; - data.put_work = io_put_work; + data.free_work = io_free_work; if (!(p->flags & IORING_SETUP_ATTACH_WQ)) { /* Do QD, or 4 * CPUS, whatever is smallest */ -- cgit From 5a2e745d4d430c4dbeeeb448c3d5c0c3109e511e Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Sun, 23 Feb 2020 16:23:11 -0700 Subject: io_uring: buffer registration infrastructure This just prepares the ring for having lists of buffers associated with it, that the application can provide for SQEs to consume instead of providing their own. The buffers are organized by group ID. Signed-off-by: Jens Axboe --- fs/io_uring.c | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/fs/io_uring.c b/fs/io_uring.c index 0d6f4b3b8f13..1f3ae208f6a6 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -195,6 +195,13 @@ struct fixed_file_data { struct completion done; }; +struct io_buffer { + struct list_head list; + __u64 addr; + __s32 len; + __u16 bid; +}; + struct io_ring_ctx { struct { struct percpu_ref refs; @@ -272,6 +279,8 @@ struct io_ring_ctx { struct socket *ring_sock; #endif + struct idr io_buffer_idr; + struct idr personality_idr; struct { @@ -875,6 +884,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->cq_overflow_list); init_completion(&ctx->completions[0]); init_completion(&ctx->completions[1]); + idr_init(&ctx->io_buffer_idr); idr_init(&ctx->personality_idr); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); @@ -6524,6 +6534,30 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx) return -ENXIO; } +static int __io_destroy_buffers(int id, void *p, void *data) +{ + struct io_ring_ctx *ctx = data; + struct io_buffer *buf = p; + + /* the head kbuf is the list itself */ + while (!list_empty(&buf->list)) { + struct io_buffer *nxt; + + nxt = list_first_entry(&buf->list, struct io_buffer, list); + list_del(&nxt->list); + kfree(nxt); + } + kfree(buf); + idr_remove(&ctx->io_buffer_idr, id); + return 0; +} + +static void io_destroy_buffers(struct io_ring_ctx *ctx) +{ + idr_for_each(&ctx->io_buffer_idr, __io_destroy_buffers, ctx); + idr_destroy(&ctx->io_buffer_idr); +} + static void io_ring_ctx_free(struct io_ring_ctx *ctx) { io_finish_async(ctx); @@ -6534,6 +6568,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) io_sqe_buffer_unregister(ctx); io_sqe_files_unregister(ctx); io_eventfd_unregister(ctx); + io_destroy_buffers(ctx); idr_destroy(&ctx->personality_idr); #if defined(CONFIG_UNIX) -- cgit From ddf0322db79c5984dc1a1db890f946dd19b7d6d9 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Sun, 23 Feb 2020 16:41:33 -0700 Subject: io_uring: add IORING_OP_PROVIDE_BUFFERS IORING_OP_PROVIDE_BUFFERS uses the buffer registration infrastructure to support passing in an addr/len that is associated with a buffer ID and buffer group ID. The group ID is used to index and lookup the buffers, while the buffer ID can be used to notify the application which buffer in the group was used. The addr passed in is the starting buffer address, and length is each buffer length. A number of buffers to add with can be specified, in which case addr is incremented by length for each addition, and each buffer increments the buffer ID specified. No validation is done of the buffer ID. If the application provides buffers within the same group with identical buffer IDs, then it'll have a hard time telling which buffer ID was used. The only restriction is that the buffer ID can be a max of 16-bits in size, so USHRT_MAX is the maximum ID that can be used. Signed-off-by: Jens Axboe --- fs/io_uring.c | 138 +++++++++++++++++++++++++++++++++++++++++- include/uapi/linux/io_uring.h | 10 ++- 2 files changed, 145 insertions(+), 3 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 1f3ae208f6a6..1a58f2042815 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -447,6 +447,15 @@ struct io_splice { unsigned int flags; }; +struct io_provide_buf { + struct file *file; + __u64 addr; + __s32 len; + __u32 bgid; + __u16 nbufs; + __u16 bid; +}; + struct io_async_connect { struct sockaddr_storage address; }; @@ -572,6 +581,7 @@ struct io_kiocb { struct io_madvise madvise; struct io_epoll epoll; struct io_splice splice; + struct io_provide_buf pbuf; }; struct io_async_ctx *io; @@ -799,7 +809,8 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .hash_reg_file = 1, .unbound_nonreg_file = 1, - } + }, + [IORING_OP_PROVIDE_BUFFERS] = {}, }; static void io_wq_submit_work(struct io_wq_work **workptr); @@ -2785,6 +2796,120 @@ static int io_openat(struct io_kiocb *req, bool force_nonblock) return io_openat2(req, force_nonblock); } +static int io_provide_buffers_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + struct io_provide_buf *p = &req->pbuf; + u64 tmp; + + if (sqe->ioprio || sqe->rw_flags) + return -EINVAL; + + tmp = READ_ONCE(sqe->fd); + if (!tmp || tmp > USHRT_MAX) + return -E2BIG; + p->nbufs = tmp; + p->addr = READ_ONCE(sqe->addr); + p->len = READ_ONCE(sqe->len); + + if (!access_ok(u64_to_user_ptr(p->addr), p->len)) + return -EFAULT; + + p->bgid = READ_ONCE(sqe->buf_group); + tmp = READ_ONCE(sqe->off); + if (tmp > USHRT_MAX) + return -E2BIG; + p->bid = tmp; + return 0; +} + +static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head) +{ + struct io_buffer *buf; + u64 addr = pbuf->addr; + int i, bid = pbuf->bid; + + for (i = 0; i < pbuf->nbufs; i++) { + buf = kmalloc(sizeof(*buf), GFP_KERNEL); + if (!buf) + break; + + buf->addr = addr; + buf->len = pbuf->len; + buf->bid = bid; + addr += pbuf->len; + bid++; + if (!*head) { + INIT_LIST_HEAD(&buf->list); + *head = buf; + } else { + list_add_tail(&buf->list, &(*head)->list); + } + } + + return i ? i : -ENOMEM; +} + +static void io_ring_submit_unlock(struct io_ring_ctx *ctx, bool needs_lock) +{ + if (needs_lock) + mutex_unlock(&ctx->uring_lock); +} + +static void io_ring_submit_lock(struct io_ring_ctx *ctx, bool needs_lock) +{ + /* + * "Normal" inline submissions always hold the uring_lock, since we + * grab it from the system call. Same is true for the SQPOLL offload. + * The only exception is when we've detached the request and issue it + * from an async worker thread, grab the lock for that case. + */ + if (needs_lock) + mutex_lock(&ctx->uring_lock); +} + +static int io_provide_buffers(struct io_kiocb *req, bool force_nonblock) +{ + struct io_provide_buf *p = &req->pbuf; + struct io_ring_ctx *ctx = req->ctx; + struct io_buffer *head, *list; + int ret = 0; + + io_ring_submit_lock(ctx, !force_nonblock); + + lockdep_assert_held(&ctx->uring_lock); + + list = head = idr_find(&ctx->io_buffer_idr, p->bgid); + + ret = io_add_buffers(p, &head); + if (ret < 0) + goto out; + + if (!list) { + ret = idr_alloc(&ctx->io_buffer_idr, head, p->bgid, p->bgid + 1, + GFP_KERNEL); + if (ret < 0) { + while (!list_empty(&head->list)) { + struct io_buffer *buf; + + buf = list_first_entry(&head->list, + struct io_buffer, list); + list_del(&buf->list); + kfree(buf); + } + kfree(head); + goto out; + } + } +out: + io_ring_submit_unlock(ctx, !force_nonblock); + if (ret < 0) + req_set_fail_links(req); + io_cqring_add_event(req, ret); + io_put_req(req); + return 0; +} + static int io_epoll_ctl_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { @@ -4392,6 +4517,9 @@ static int io_req_defer_prep(struct io_kiocb *req, case IORING_OP_SPLICE: ret = io_splice_prep(req, sqe); break; + case IORING_OP_PROVIDE_BUFFERS: + ret = io_provide_buffers_prep(req, sqe); + break; default: printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n", req->opcode); @@ -4669,6 +4797,14 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, } ret = io_splice(req, force_nonblock); break; + case IORING_OP_PROVIDE_BUFFERS: + if (sqe) { + ret = io_provide_buffers_prep(req, sqe); + if (ret) + break; + } + ret = io_provide_buffers(req, force_nonblock); + break; default: ret = -EINVAL; break; diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 53b36311cdac..bc34a57a660b 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -45,8 +45,13 @@ struct io_uring_sqe { __u64 user_data; /* data to be passed back at completion time */ union { struct { - /* index into fixed buffers, if used */ - __u16 buf_index; + /* pack this to avoid bogus arm OABI complaints */ + union { + /* index into fixed buffers, if used */ + __u16 buf_index; + /* for grouped buffer selection */ + __u16 buf_group; + } __attribute__((packed)); /* personality to use, if used */ __u16 personality; __s32 splice_fd_in; @@ -119,6 +124,7 @@ enum { IORING_OP_OPENAT2, IORING_OP_EPOLL_CTL, IORING_OP_SPLICE, + IORING_OP_PROVIDE_BUFFERS, /* this goes last, obviously */ IORING_OP_LAST, -- cgit From bcda7baaa3f15c7a95db3c024bb046d6e298f76b Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Sun, 23 Feb 2020 16:42:51 -0700 Subject: io_uring: support buffer selection for OP_READ and OP_RECV If a server process has tons of pending socket connections, generally it uses epoll to wait for activity. When the socket is ready for reading (or writing), the task can select a buffer and issue a recv/send on the given fd. Now that we have fast (non-async thread) support, a task can have tons of pending reads or writes pending. But that means they need buffers to back that data, and if the number of connections is high enough, having them preallocated for all possible connections is unfeasible. With IORING_OP_PROVIDE_BUFFERS, an application can register buffers to use for any request. The request then sets IOSQE_BUFFER_SELECT in the sqe, and a given group ID in sqe->buf_group. When the fd becomes ready, a free buffer from the specified group is selected. If none are available, the request is terminated with -ENOBUFS. If successful, the CQE on completion will contain the buffer ID chosen in the cqe->flags member, encoded as: (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER; Once a buffer has been consumed by a request, it is no longer available and must be registered again with IORING_OP_PROVIDE_BUFFERS. Requests need to support this feature. For now, IORING_OP_READ and IORING_OP_RECV support it. This is checked on SQE submission, a CQE with res == -EOPNOTSUPP will be posted if attempted on unsupported requests. Signed-off-by: Jens Axboe --- fs/io_uring.c | 224 ++++++++++++++++++++++++++++++++++-------- include/uapi/linux/io_uring.h | 14 +++ 2 files changed, 199 insertions(+), 39 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 1a58f2042815..a80b5c189c14 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -395,7 +395,9 @@ struct io_sr_msg { void __user *buf; }; int msg_flags; + int bgid; size_t len; + struct io_buffer *kbuf; }; struct io_open { @@ -490,6 +492,7 @@ enum { REQ_F_LINK_BIT = IOSQE_IO_LINK_BIT, REQ_F_HARDLINK_BIT = IOSQE_IO_HARDLINK_BIT, REQ_F_FORCE_ASYNC_BIT = IOSQE_ASYNC_BIT, + REQ_F_BUFFER_SELECT_BIT = IOSQE_BUFFER_SELECT_BIT, REQ_F_LINK_NEXT_BIT, REQ_F_FAIL_LINK_BIT, @@ -506,6 +509,7 @@ enum { REQ_F_NEED_CLEANUP_BIT, REQ_F_OVERFLOW_BIT, REQ_F_POLLED_BIT, + REQ_F_BUFFER_SELECTED_BIT, }; enum { @@ -519,6 +523,8 @@ enum { REQ_F_HARDLINK = BIT(REQ_F_HARDLINK_BIT), /* IOSQE_ASYNC */ REQ_F_FORCE_ASYNC = BIT(REQ_F_FORCE_ASYNC_BIT), + /* IOSQE_BUFFER_SELECT */ + REQ_F_BUFFER_SELECT = BIT(REQ_F_BUFFER_SELECT_BIT), /* already grabbed next link */ REQ_F_LINK_NEXT = BIT(REQ_F_LINK_NEXT_BIT), @@ -550,6 +556,8 @@ enum { REQ_F_OVERFLOW = BIT(REQ_F_OVERFLOW_BIT), /* already went through poll handler */ REQ_F_POLLED = BIT(REQ_F_POLLED_BIT), + /* buffer already selected */ + REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT), }; struct async_poll { @@ -612,6 +620,7 @@ struct io_kiocb { struct callback_head task_work; struct hlist_node hash_node; struct async_poll *apoll; + int cflags; }; struct io_wq_work work; }; @@ -661,6 +670,8 @@ struct io_op_def { /* set if opcode supports polled "wait" */ unsigned pollin : 1; unsigned pollout : 1; + /* op supports buffer selection */ + unsigned buffer_select : 1; }; static const struct io_op_def io_op_defs[] = { @@ -770,6 +781,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .unbound_nonreg_file = 1, .pollin = 1, + .buffer_select = 1, }, [IORING_OP_WRITE] = { .needs_mm = 1, @@ -794,6 +806,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .unbound_nonreg_file = 1, .pollin = 1, + .buffer_select = 1, }, [IORING_OP_OPENAT2] = { .needs_file = 1, @@ -1170,7 +1183,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) if (cqe) { WRITE_ONCE(cqe->user_data, req->user_data); WRITE_ONCE(cqe->res, req->result); - WRITE_ONCE(cqe->flags, 0); + WRITE_ONCE(cqe->flags, req->cflags); } else { WRITE_ONCE(ctx->rings->cq_overflow, atomic_inc_return(&ctx->cached_cq_overflow)); @@ -1194,7 +1207,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) return cqe != NULL; } -static void io_cqring_fill_event(struct io_kiocb *req, long res) +static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags) { struct io_ring_ctx *ctx = req->ctx; struct io_uring_cqe *cqe; @@ -1210,7 +1223,7 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res) if (likely(cqe)) { WRITE_ONCE(cqe->user_data, req->user_data); WRITE_ONCE(cqe->res, res); - WRITE_ONCE(cqe->flags, 0); + WRITE_ONCE(cqe->flags, cflags); } else if (ctx->cq_overflow_flushed) { WRITE_ONCE(ctx->rings->cq_overflow, atomic_inc_return(&ctx->cached_cq_overflow)); @@ -1222,23 +1235,34 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res) req->flags |= REQ_F_OVERFLOW; refcount_inc(&req->refs); req->result = res; + req->cflags = cflags; list_add_tail(&req->list, &ctx->cq_overflow_list); } } -static void io_cqring_add_event(struct io_kiocb *req, long res) +static void io_cqring_fill_event(struct io_kiocb *req, long res) +{ + __io_cqring_fill_event(req, res, 0); +} + +static void __io_cqring_add_event(struct io_kiocb *req, long res, long cflags) { struct io_ring_ctx *ctx = req->ctx; unsigned long flags; spin_lock_irqsave(&ctx->completion_lock, flags); - io_cqring_fill_event(req, res); + __io_cqring_fill_event(req, res, cflags); io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); io_cqring_ev_posted(ctx); } +static void io_cqring_add_event(struct io_kiocb *req, long res) +{ + __io_cqring_add_event(req, res, 0); +} + static inline bool io_is_fallback_req(struct io_kiocb *req) { return req == (struct io_kiocb *) @@ -1660,6 +1684,18 @@ static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req) return true; } +static int io_put_kbuf(struct io_kiocb *req) +{ + struct io_buffer *kbuf = (struct io_buffer *) req->rw.addr; + int cflags; + + cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT; + cflags |= IORING_CQE_F_BUFFER; + req->rw.addr = 0; + kfree(kbuf); + return cflags; +} + /* * Find and free completed poll iocbs */ @@ -1671,10 +1707,15 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, rb.to_free = rb.need_iter = 0; while (!list_empty(done)) { + int cflags = 0; + req = list_first_entry(done, struct io_kiocb, list); list_del(&req->list); - io_cqring_fill_event(req, req->result); + if (req->flags & REQ_F_BUFFER_SELECTED) + cflags = io_put_kbuf(req); + + __io_cqring_fill_event(req, req->result, cflags); (*nr_events)++; if (refcount_dec_and_test(&req->refs) && @@ -1849,13 +1890,16 @@ static inline void req_set_fail_links(struct io_kiocb *req) static void io_complete_rw_common(struct kiocb *kiocb, long res) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); + int cflags = 0; if (kiocb->ki_flags & IOCB_WRITE) kiocb_end_write(req); if (res != req->result) req_set_fail_links(req); - io_cqring_add_event(req, res); + if (req->flags & REQ_F_BUFFER_SELECTED) + cflags = io_put_kbuf(req); + __io_cqring_add_event(req, res, cflags); } static void io_complete_rw(struct kiocb *kiocb, long res, long res2) @@ -2033,7 +2077,7 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe, req->rw.addr = READ_ONCE(sqe->addr); req->rw.len = READ_ONCE(sqe->len); - /* we own ->private, reuse it for the buffer index */ + /* we own ->private, reuse it for the buffer index / buffer ID */ req->rw.kiocb.private = (void *) (unsigned long) READ_ONCE(sqe->buf_index); return 0; @@ -2146,8 +2190,61 @@ static ssize_t io_import_fixed(struct io_kiocb *req, int rw, return len; } +static void io_ring_submit_unlock(struct io_ring_ctx *ctx, bool needs_lock) +{ + if (needs_lock) + mutex_unlock(&ctx->uring_lock); +} + +static void io_ring_submit_lock(struct io_ring_ctx *ctx, bool needs_lock) +{ + /* + * "Normal" inline submissions always hold the uring_lock, since we + * grab it from the system call. Same is true for the SQPOLL offload. + * The only exception is when we've detached the request and issue it + * from an async worker thread, grab the lock for that case. + */ + if (needs_lock) + mutex_lock(&ctx->uring_lock); +} + +static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len, + int bgid, struct io_buffer *kbuf, + bool needs_lock) +{ + struct io_buffer *head; + + if (req->flags & REQ_F_BUFFER_SELECTED) + return kbuf; + + io_ring_submit_lock(req->ctx, needs_lock); + + lockdep_assert_held(&req->ctx->uring_lock); + + head = idr_find(&req->ctx->io_buffer_idr, bgid); + if (head) { + if (!list_empty(&head->list)) { + kbuf = list_last_entry(&head->list, struct io_buffer, + list); + list_del(&kbuf->list); + } else { + kbuf = head; + idr_remove(&req->ctx->io_buffer_idr, bgid); + } + if (*len > kbuf->len) + *len = kbuf->len; + } else { + kbuf = ERR_PTR(-ENOBUFS); + } + + io_ring_submit_unlock(req->ctx, needs_lock); + + return kbuf; +} + static ssize_t io_import_iovec(int rw, struct io_kiocb *req, - struct iovec **iovec, struct iov_iter *iter) + struct iovec **iovec, struct iov_iter *iter, + bool needs_lock) { void __user *buf = u64_to_user_ptr(req->rw.addr); size_t sqe_len = req->rw.len; @@ -2159,12 +2256,29 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, return io_import_fixed(req, rw, iter); } - /* buffer index only valid with fixed read/write */ - if (req->rw.kiocb.private) + /* buffer index only valid with fixed read/write, or buffer select */ + if (req->rw.kiocb.private && !(req->flags & REQ_F_BUFFER_SELECT)) return -EINVAL; if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) { ssize_t ret; + + if (req->flags & REQ_F_BUFFER_SELECT) { + struct io_buffer *kbuf = (struct io_buffer *) req->rw.addr; + int bgid; + + bgid = (int) (unsigned long) req->rw.kiocb.private; + kbuf = io_buffer_select(req, &sqe_len, bgid, kbuf, + needs_lock); + if (IS_ERR(kbuf)) { + *iovec = NULL; + return PTR_ERR(kbuf); + } + req->rw.addr = (u64) kbuf; + req->flags |= REQ_F_BUFFER_SELECTED; + buf = u64_to_user_ptr(kbuf->addr); + } + ret = import_single_range(rw, buf, sqe_len, *iovec, iter); *iovec = NULL; return ret < 0 ? ret : sqe_len; @@ -2307,7 +2421,7 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, io = req->io; io->rw.iov = io->rw.fast_iov; req->io = NULL; - ret = io_import_iovec(READ, req, &io->rw.iov, &iter); + ret = io_import_iovec(READ, req, &io->rw.iov, &iter, !force_nonblock); req->io = io; if (ret < 0) return ret; @@ -2324,7 +2438,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock) size_t iov_count; ssize_t io_size, ret; - ret = io_import_iovec(READ, req, &iovec, &iter); + ret = io_import_iovec(READ, req, &iovec, &iter, !force_nonblock); if (ret < 0) return ret; @@ -2396,7 +2510,7 @@ static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, io = req->io; io->rw.iov = io->rw.fast_iov; req->io = NULL; - ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter); + ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter, !force_nonblock); req->io = io; if (ret < 0) return ret; @@ -2413,7 +2527,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock) size_t iov_count; ssize_t ret, io_size; - ret = io_import_iovec(WRITE, req, &iovec, &iter); + ret = io_import_iovec(WRITE, req, &iovec, &iter, !force_nonblock); if (ret < 0) return ret; @@ -2850,24 +2964,6 @@ static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head) return i ? i : -ENOMEM; } -static void io_ring_submit_unlock(struct io_ring_ctx *ctx, bool needs_lock) -{ - if (needs_lock) - mutex_unlock(&ctx->uring_lock); -} - -static void io_ring_submit_lock(struct io_ring_ctx *ctx, bool needs_lock) -{ - /* - * "Normal" inline submissions always hold the uring_lock, since we - * grab it from the system call. Same is true for the SQPOLL offload. - * The only exception is when we've detached the request and issue it - * from an async worker thread, grab the lock for that case. - */ - if (needs_lock) - mutex_lock(&ctx->uring_lock); -} - static int io_provide_buffers(struct io_kiocb *req, bool force_nonblock) { struct io_provide_buf *p = &req->pbuf; @@ -3390,6 +3486,27 @@ static int io_send(struct io_kiocb *req, bool force_nonblock) #endif } +static struct io_buffer *io_recv_buffer_select(struct io_kiocb *req, + int *cflags, bool needs_lock) +{ + struct io_sr_msg *sr = &req->sr_msg; + struct io_buffer *kbuf; + + if (!(req->flags & REQ_F_BUFFER_SELECT)) + return NULL; + + kbuf = io_buffer_select(req, &sr->len, sr->bgid, sr->kbuf, needs_lock); + if (IS_ERR(kbuf)) + return kbuf; + + sr->kbuf = kbuf; + req->flags |= REQ_F_BUFFER_SELECTED; + + *cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT; + *cflags |= IORING_CQE_F_BUFFER; + return kbuf; +} + static int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { @@ -3401,6 +3518,7 @@ static int io_recvmsg_prep(struct io_kiocb *req, sr->msg_flags = READ_ONCE(sqe->msg_flags); sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr)); sr->len = READ_ONCE(sqe->len); + sr->bgid = READ_ONCE(sqe->buf_group); #ifdef CONFIG_COMPAT if (req->ctx->compat) @@ -3490,8 +3608,9 @@ static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) static int io_recv(struct io_kiocb *req, bool force_nonblock) { #if defined(CONFIG_NET) + struct io_buffer *kbuf = NULL; struct socket *sock; - int ret; + int ret, cflags = 0; if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; @@ -3499,15 +3618,25 @@ static int io_recv(struct io_kiocb *req, bool force_nonblock) sock = sock_from_file(req->file, &ret); if (sock) { struct io_sr_msg *sr = &req->sr_msg; + void __user *buf = sr->buf; struct msghdr msg; struct iovec iov; unsigned flags; - ret = import_single_range(READ, sr->buf, sr->len, &iov, + kbuf = io_recv_buffer_select(req, &cflags, !force_nonblock); + if (IS_ERR(kbuf)) + return PTR_ERR(kbuf); + else if (kbuf) + buf = u64_to_user_ptr(kbuf->addr); + + ret = import_single_range(READ, buf, sr->len, &iov, &msg.msg_iter); - if (ret) + if (ret) { + kfree(kbuf); return ret; + } + req->flags |= REQ_F_NEED_CLEANUP; msg.msg_name = NULL; msg.msg_control = NULL; msg.msg_controllen = 0; @@ -3528,7 +3657,9 @@ static int io_recv(struct io_kiocb *req, bool force_nonblock) ret = -EINTR; } - io_cqring_add_event(req, ret); + kfree(kbuf); + req->flags &= ~REQ_F_NEED_CLEANUP; + __io_cqring_add_event(req, ret, cflags); if (ret < 0) req_set_fail_links(req); io_put_req(req); @@ -4566,6 +4697,9 @@ static void io_cleanup_req(struct io_kiocb *req) case IORING_OP_READV: case IORING_OP_READ_FIXED: case IORING_OP_READ: + if (req->flags & REQ_F_BUFFER_SELECTED) + kfree((void *)(unsigned long)req->rw.addr); + /* fallthrough */ case IORING_OP_WRITEV: case IORING_OP_WRITE_FIXED: case IORING_OP_WRITE: @@ -4577,6 +4711,10 @@ static void io_cleanup_req(struct io_kiocb *req) if (io->msg.iov != io->msg.fast_iov) kfree(io->msg.iov); break; + case IORING_OP_RECV: + if (req->flags & REQ_F_BUFFER_SELECTED) + kfree(req->sr_msg.kbuf); + break; case IORING_OP_OPENAT: case IORING_OP_OPENAT2: case IORING_OP_STATX: @@ -5154,7 +5292,8 @@ static inline void io_queue_link_head(struct io_kiocb *req) } #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \ - IOSQE_IO_HARDLINK | IOSQE_ASYNC) + IOSQE_IO_HARDLINK | IOSQE_ASYNC | \ + IOSQE_BUFFER_SELECT) static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, struct io_submit_state *state, struct io_kiocb **link) @@ -5171,6 +5310,12 @@ static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, goto err_req; } + if ((sqe_flags & IOSQE_BUFFER_SELECT) && + !io_op_defs[req->opcode].buffer_select) { + ret = -EOPNOTSUPP; + goto err_req; + } + id = READ_ONCE(sqe->personality); if (id) { req->work.creds = idr_find(&ctx->personality_idr, id); @@ -5183,7 +5328,8 @@ static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, /* same numerical values with corresponding REQ_F_*, safe to copy */ req->flags |= sqe_flags & (IOSQE_IO_DRAIN | IOSQE_IO_HARDLINK | - IOSQE_ASYNC | IOSQE_FIXED_FILE); + IOSQE_ASYNC | IOSQE_FIXED_FILE | + IOSQE_BUFFER_SELECT); ret = io_req_set_file(state, req, sqe); if (unlikely(ret)) { diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index bc34a57a660b..9b263d9b24e6 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -66,6 +66,7 @@ enum { IOSQE_IO_LINK_BIT, IOSQE_IO_HARDLINK_BIT, IOSQE_ASYNC_BIT, + IOSQE_BUFFER_SELECT_BIT, }; /* @@ -81,6 +82,8 @@ enum { #define IOSQE_IO_HARDLINK (1U << IOSQE_IO_HARDLINK_BIT) /* always go async */ #define IOSQE_ASYNC (1U << IOSQE_ASYNC_BIT) +/* select buffer from sqe->buf_group */ +#define IOSQE_BUFFER_SELECT (1U << IOSQE_BUFFER_SELECT_BIT) /* * io_uring_setup() flags @@ -155,6 +158,17 @@ struct io_uring_cqe { __u32 flags; }; +/* + * cqe->flags + * + * IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID + */ +#define IORING_CQE_F_BUFFER (1U << 0) + +enum { + IORING_CQE_BUFFER_SHIFT = 16, +}; + /* * Magic offsets for the application to mmap the data it needs */ -- cgit From 4d954c258a0c365a85a2d1b1cccf63aec38fca4c Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 27 Feb 2020 07:31:19 -0700 Subject: io_uring: add IOSQE_BUFFER_SELECT support for IORING_OP_READV This adds support for the vectored read. This is limited to supporting just 1 segment in the iov, and is provided just for convenience for applications that use IORING_OP_READV already. The iov helpers will be used for IORING_OP_RECVMSG as well. Signed-off-by: Jens Axboe --- fs/io_uring.c | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 97 insertions(+), 14 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index a80b5c189c14..7c855a038a1b 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -682,6 +682,7 @@ static const struct io_op_def io_op_defs[] = { .needs_file = 1, .unbound_nonreg_file = 1, .pollin = 1, + .buffer_select = 1, }, [IORING_OP_WRITEV] = { .async_ctx = 1, @@ -1686,9 +1687,10 @@ static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req) static int io_put_kbuf(struct io_kiocb *req) { - struct io_buffer *kbuf = (struct io_buffer *) req->rw.addr; + struct io_buffer *kbuf; int cflags; + kbuf = (struct io_buffer *) (unsigned long) req->rw.addr; cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT; cflags |= IORING_CQE_F_BUFFER; req->rw.addr = 0; @@ -2242,12 +2244,95 @@ static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len, return kbuf; } +static void __user *io_rw_buffer_select(struct io_kiocb *req, size_t *len, + bool needs_lock) +{ + struct io_buffer *kbuf; + int bgid; + + kbuf = (struct io_buffer *) (unsigned long) req->rw.addr; + bgid = (int) (unsigned long) req->rw.kiocb.private; + kbuf = io_buffer_select(req, len, bgid, kbuf, needs_lock); + if (IS_ERR(kbuf)) + return kbuf; + req->rw.addr = (u64) (unsigned long) kbuf; + req->flags |= REQ_F_BUFFER_SELECTED; + return u64_to_user_ptr(kbuf->addr); +} + +#ifdef CONFIG_COMPAT +static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov, + bool needs_lock) +{ + struct compat_iovec __user *uiov; + compat_ssize_t clen; + void __user *buf; + ssize_t len; + + uiov = u64_to_user_ptr(req->rw.addr); + if (!access_ok(uiov, sizeof(*uiov))) + return -EFAULT; + if (__get_user(clen, &uiov->iov_len)) + return -EFAULT; + if (clen < 0) + return -EINVAL; + + len = clen; + buf = io_rw_buffer_select(req, &len, needs_lock); + if (IS_ERR(buf)) + return PTR_ERR(buf); + iov[0].iov_base = buf; + iov[0].iov_len = (compat_size_t) len; + return 0; +} +#endif + +static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov, + bool needs_lock) +{ + struct iovec __user *uiov = u64_to_user_ptr(req->rw.addr); + void __user *buf; + ssize_t len; + + if (copy_from_user(iov, uiov, sizeof(*uiov))) + return -EFAULT; + + len = iov[0].iov_len; + if (len < 0) + return -EINVAL; + buf = io_rw_buffer_select(req, &len, needs_lock); + if (IS_ERR(buf)) + return PTR_ERR(buf); + iov[0].iov_base = buf; + iov[0].iov_len = len; + return 0; +} + +static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov, + bool needs_lock) +{ + if (req->flags & REQ_F_BUFFER_SELECTED) + return 0; + if (!req->rw.len) + return 0; + else if (req->rw.len > 1) + return -EINVAL; + +#ifdef CONFIG_COMPAT + if (req->ctx->compat) + return io_compat_import(req, iov, needs_lock); +#endif + + return __io_iov_buffer_select(req, iov, needs_lock); +} + static ssize_t io_import_iovec(int rw, struct io_kiocb *req, struct iovec **iovec, struct iov_iter *iter, bool needs_lock) { void __user *buf = u64_to_user_ptr(req->rw.addr); size_t sqe_len = req->rw.len; + ssize_t ret; u8 opcode; opcode = req->opcode; @@ -2261,22 +2346,12 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, return -EINVAL; if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) { - ssize_t ret; - if (req->flags & REQ_F_BUFFER_SELECT) { - struct io_buffer *kbuf = (struct io_buffer *) req->rw.addr; - int bgid; - - bgid = (int) (unsigned long) req->rw.kiocb.private; - kbuf = io_buffer_select(req, &sqe_len, bgid, kbuf, - needs_lock); - if (IS_ERR(kbuf)) { + buf = io_rw_buffer_select(req, &sqe_len, needs_lock); + if (IS_ERR(buf)) { *iovec = NULL; - return PTR_ERR(kbuf); + return PTR_ERR(buf); } - req->rw.addr = (u64) kbuf; - req->flags |= REQ_F_BUFFER_SELECTED; - buf = u64_to_user_ptr(kbuf->addr); } ret = import_single_range(rw, buf, sqe_len, *iovec, iter); @@ -2294,6 +2369,14 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, return iorw->size; } + if (req->flags & REQ_F_BUFFER_SELECT) { + ret = io_iov_buffer_select(req, *iovec, needs_lock); + if (!ret) + iov_iter_init(iter, rw, *iovec, 1, (*iovec)->iov_len); + *iovec = NULL; + return ret; + } + #ifdef CONFIG_COMPAT if (req->ctx->compat) return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV, -- cgit From 0a384abfae66651b28e4bbe16883b1ff046ba3b3 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 27 Feb 2020 08:11:20 -0700 Subject: net: abstract out normal and compat msghdr import This splits it into two parts, one that imports the message, and one that imports the iovec. This allows a caller to only do the first part, and import the iovec manually afterwards. No functional changes in this patch. Acked-by: David Miller Signed-off-by: Jens Axboe --- include/linux/socket.h | 4 ++++ include/net/compat.h | 3 +++ net/compat.c | 30 +++++++++++++++++++++++------- net/socket.c | 25 +++++++++++++++++++++---- 4 files changed, 51 insertions(+), 11 deletions(-) diff --git a/include/linux/socket.h b/include/linux/socket.h index 2d2313403101..fc59ac825561 100644 --- a/include/linux/socket.h +++ b/include/linux/socket.h @@ -391,6 +391,10 @@ extern int recvmsg_copy_msghdr(struct msghdr *msg, struct user_msghdr __user *umsg, unsigned flags, struct sockaddr __user **uaddr, struct iovec **iov); +extern int __copy_msghdr_from_user(struct msghdr *kmsg, + struct user_msghdr __user *umsg, + struct sockaddr __user **save_addr, + struct iovec __user **uiov, size_t *nsegs); /* helpers which do the actual work for syscalls */ extern int __sys_recvfrom(int fd, void __user *ubuf, size_t size, diff --git a/include/net/compat.h b/include/net/compat.h index f277653c7e17..e341260642fe 100644 --- a/include/net/compat.h +++ b/include/net/compat.h @@ -38,6 +38,9 @@ struct compat_cmsghdr { #define compat_mmsghdr mmsghdr #endif /* defined(CONFIG_COMPAT) */ +int __get_compat_msghdr(struct msghdr *kmsg, struct compat_msghdr __user *umsg, + struct sockaddr __user **save_addr, compat_uptr_t *ptr, + compat_size_t *len); int get_compat_msghdr(struct msghdr *, struct compat_msghdr __user *, struct sockaddr __user **, struct iovec **); struct sock_fprog __user *get_compat_bpf_fprog(char __user *optval); diff --git a/net/compat.c b/net/compat.c index 47d99c784947..4bed96e84d9a 100644 --- a/net/compat.c +++ b/net/compat.c @@ -33,10 +33,10 @@ #include #include -int get_compat_msghdr(struct msghdr *kmsg, - struct compat_msghdr __user *umsg, - struct sockaddr __user **save_addr, - struct iovec **iov) +int __get_compat_msghdr(struct msghdr *kmsg, + struct compat_msghdr __user *umsg, + struct sockaddr __user **save_addr, + compat_uptr_t *ptr, compat_size_t *len) { struct compat_msghdr msg; ssize_t err; @@ -79,10 +79,26 @@ int get_compat_msghdr(struct msghdr *kmsg, return -EMSGSIZE; kmsg->msg_iocb = NULL; + *ptr = msg.msg_iov; + *len = msg.msg_iovlen; + return 0; +} + +int get_compat_msghdr(struct msghdr *kmsg, + struct compat_msghdr __user *umsg, + struct sockaddr __user **save_addr, + struct iovec **iov) +{ + compat_uptr_t ptr; + compat_size_t len; + ssize_t err; + + err = __get_compat_msghdr(kmsg, umsg, save_addr, &ptr, &len); + if (err) + return err; - err = compat_import_iovec(save_addr ? READ : WRITE, - compat_ptr(msg.msg_iov), msg.msg_iovlen, - UIO_FASTIOV, iov, &kmsg->msg_iter); + err = compat_import_iovec(save_addr ? READ : WRITE, compat_ptr(ptr), + len, UIO_FASTIOV, iov, &kmsg->msg_iter); return err < 0 ? err : 0; } diff --git a/net/socket.c b/net/socket.c index b79a05de7c6e..70ede74ab24b 100644 --- a/net/socket.c +++ b/net/socket.c @@ -2226,10 +2226,10 @@ struct used_address { unsigned int name_len; }; -static int copy_msghdr_from_user(struct msghdr *kmsg, - struct user_msghdr __user *umsg, - struct sockaddr __user **save_addr, - struct iovec **iov) +int __copy_msghdr_from_user(struct msghdr *kmsg, + struct user_msghdr __user *umsg, + struct sockaddr __user **save_addr, + struct iovec __user **uiov, size_t *nsegs) { struct user_msghdr msg; ssize_t err; @@ -2271,6 +2271,23 @@ static int copy_msghdr_from_user(struct msghdr *kmsg, return -EMSGSIZE; kmsg->msg_iocb = NULL; + *uiov = msg.msg_iov; + *nsegs = msg.msg_iovlen; + return 0; +} + +static int copy_msghdr_from_user(struct msghdr *kmsg, + struct user_msghdr __user *umsg, + struct sockaddr __user **save_addr, + struct iovec **iov) +{ + struct user_msghdr msg; + ssize_t err; + + err = __copy_msghdr_from_user(kmsg, umsg, save_addr, &msg.msg_iov, + &msg.msg_iovlen); + if (err) + return err; err = import_iovec(save_addr ? READ : WRITE, msg.msg_iov, msg.msg_iovlen, -- cgit From 52de1fe122408d7a62b6cff9ed3895ebb882d71f Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 27 Feb 2020 10:15:42 -0700 Subject: io_uring: add IOSQE_BUFFER_SELECT support for IORING_OP_RECVMSG Like IORING_OP_READV, this is limited to supporting just a single segment in the iovec passed in. Signed-off-by: Jens Axboe --- fs/io_uring.c | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 106 insertions(+), 12 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 7c855a038a1b..455d53fd840f 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -729,6 +730,7 @@ static const struct io_op_def io_op_defs[] = { .unbound_nonreg_file = 1, .needs_fs = 1, .pollin = 1, + .buffer_select = 1, }, [IORING_OP_TIMEOUT] = { .async_ctx = 1, @@ -3569,6 +3571,92 @@ static int io_send(struct io_kiocb *req, bool force_nonblock) #endif } +static int __io_recvmsg_copy_hdr(struct io_kiocb *req, struct io_async_ctx *io) +{ + struct io_sr_msg *sr = &req->sr_msg; + struct iovec __user *uiov; + size_t iov_len; + int ret; + + ret = __copy_msghdr_from_user(&io->msg.msg, sr->msg, &io->msg.uaddr, + &uiov, &iov_len); + if (ret) + return ret; + + if (req->flags & REQ_F_BUFFER_SELECT) { + if (iov_len > 1) + return -EINVAL; + if (copy_from_user(io->msg.iov, uiov, sizeof(*uiov))) + return -EFAULT; + sr->len = io->msg.iov[0].iov_len; + iov_iter_init(&io->msg.msg.msg_iter, READ, io->msg.iov, 1, + sr->len); + io->msg.iov = NULL; + } else { + ret = import_iovec(READ, uiov, iov_len, UIO_FASTIOV, + &io->msg.iov, &io->msg.msg.msg_iter); + if (ret > 0) + ret = 0; + } + + return ret; +} + +#ifdef CONFIG_COMPAT +static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req, + struct io_async_ctx *io) +{ + struct compat_msghdr __user *msg_compat; + struct io_sr_msg *sr = &req->sr_msg; + struct compat_iovec __user *uiov; + compat_uptr_t ptr; + compat_size_t len; + int ret; + + msg_compat = (struct compat_msghdr __user *) sr->msg; + ret = __get_compat_msghdr(&io->msg.msg, msg_compat, &io->msg.uaddr, + &ptr, &len); + if (ret) + return ret; + + uiov = compat_ptr(ptr); + if (req->flags & REQ_F_BUFFER_SELECT) { + compat_ssize_t clen; + + if (len > 1) + return -EINVAL; + if (!access_ok(uiov, sizeof(*uiov))) + return -EFAULT; + if (__get_user(clen, &uiov->iov_len)) + return -EFAULT; + if (clen < 0) + return -EINVAL; + sr->len = io->msg.iov[0].iov_len; + io->msg.iov = NULL; + } else { + ret = compat_import_iovec(READ, uiov, len, UIO_FASTIOV, + &io->msg.iov, + &io->msg.msg.msg_iter); + if (ret < 0) + return ret; + } + + return 0; +} +#endif + +static int io_recvmsg_copy_hdr(struct io_kiocb *req, struct io_async_ctx *io) +{ + io->msg.iov = io->msg.fast_iov; + +#ifdef CONFIG_COMPAT + if (req->ctx->compat) + return __io_compat_recvmsg_copy_hdr(req, io); +#endif + + return __io_recvmsg_copy_hdr(req, io); +} + static struct io_buffer *io_recv_buffer_select(struct io_kiocb *req, int *cflags, bool needs_lock) { @@ -3614,9 +3702,7 @@ static int io_recvmsg_prep(struct io_kiocb *req, if (req->flags & REQ_F_NEED_CLEANUP) return 0; - io->msg.iov = io->msg.fast_iov; - ret = recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags, - &io->msg.uaddr, &io->msg.iov); + ret = io_recvmsg_copy_hdr(req, io); if (!ret) req->flags |= REQ_F_NEED_CLEANUP; return ret; @@ -3630,13 +3716,14 @@ static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) #if defined(CONFIG_NET) struct io_async_msghdr *kmsg = NULL; struct socket *sock; - int ret; + int ret, cflags = 0; if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; sock = sock_from_file(req->file, &ret); if (sock) { + struct io_buffer *kbuf; struct io_async_ctx io; unsigned flags; @@ -3648,19 +3735,23 @@ static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) kmsg->iov = kmsg->fast_iov; kmsg->msg.msg_iter.iov = kmsg->iov; } else { - struct io_sr_msg *sr = &req->sr_msg; - kmsg = &io.msg; kmsg->msg.msg_name = &io.msg.addr; - io.msg.iov = io.msg.fast_iov; - ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg, - sr->msg_flags, &io.msg.uaddr, - &io.msg.iov); + ret = io_recvmsg_copy_hdr(req, &io); if (ret) return ret; } + kbuf = io_recv_buffer_select(req, &cflags, !force_nonblock); + if (IS_ERR(kbuf)) { + return PTR_ERR(kbuf); + } else if (kbuf) { + kmsg->fast_iov[0].iov_base = u64_to_user_ptr(kbuf->addr); + iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->iov, + 1, req->sr_msg.len); + } + flags = req->sr_msg.msg_flags; if (flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; @@ -3678,7 +3769,7 @@ static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) if (kmsg && kmsg->iov != kmsg->fast_iov) kfree(kmsg->iov); req->flags &= ~REQ_F_NEED_CLEANUP; - io_cqring_add_event(req, ret); + __io_cqring_add_event(req, ret, cflags); if (ret < 0) req_set_fail_links(req); io_put_req(req); @@ -4789,8 +4880,11 @@ static void io_cleanup_req(struct io_kiocb *req) if (io->rw.iov != io->rw.fast_iov) kfree(io->rw.iov); break; - case IORING_OP_SENDMSG: case IORING_OP_RECVMSG: + if (req->flags & REQ_F_BUFFER_SELECTED) + kfree(req->sr_msg.kbuf); + /* fallthrough */ + case IORING_OP_SENDMSG: if (io->msg.iov != io->msg.fast_iov) kfree(io->msg.iov); break; -- cgit From 067524e914cb23e20d59480b318fe2625eaee7c8 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 2 Mar 2020 16:32:28 -0700 Subject: io_uring: provide means of removing buffers We have IORING_OP_PROVIDE_BUFFERS, but the only way to remove buffers is to trigger IO on them. The usual case of shrinking a buffer pool would be to just not replenish the buffers when IO completes, and instead just free it. But it may be nice to have a way to manually remove a number of buffers from a given group, and IORING_OP_REMOVE_BUFFERS provides that functionality. Signed-off-by: Jens Axboe --- fs/io_uring.c | 102 ++++++++++++++++++++++++++++++++++-------- include/uapi/linux/io_uring.h | 1 + 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 455d53fd840f..f131105fa12a 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -827,6 +827,7 @@ static const struct io_op_def io_op_defs[] = { .unbound_nonreg_file = 1, }, [IORING_OP_PROVIDE_BUFFERS] = {}, + [IORING_OP_REMOVE_BUFFERS] = {}, }; static void io_wq_submit_work(struct io_wq_work **workptr); @@ -2995,6 +2996,75 @@ static int io_openat(struct io_kiocb *req, bool force_nonblock) return io_openat2(req, force_nonblock); } +static int io_remove_buffers_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + struct io_provide_buf *p = &req->pbuf; + u64 tmp; + + if (sqe->ioprio || sqe->rw_flags || sqe->addr || sqe->len || sqe->off) + return -EINVAL; + + tmp = READ_ONCE(sqe->fd); + if (!tmp || tmp > USHRT_MAX) + return -EINVAL; + + memset(p, 0, sizeof(*p)); + p->nbufs = tmp; + p->bgid = READ_ONCE(sqe->buf_group); + return 0; +} + +static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf, + int bgid, unsigned nbufs) +{ + unsigned i = 0; + + /* shouldn't happen */ + if (!nbufs) + return 0; + + /* the head kbuf is the list itself */ + while (!list_empty(&buf->list)) { + struct io_buffer *nxt; + + nxt = list_first_entry(&buf->list, struct io_buffer, list); + list_del(&nxt->list); + kfree(nxt); + if (++i == nbufs) + return i; + } + i++; + kfree(buf); + idr_remove(&ctx->io_buffer_idr, bgid); + + return i; +} + +static int io_remove_buffers(struct io_kiocb *req, bool force_nonblock) +{ + struct io_provide_buf *p = &req->pbuf; + struct io_ring_ctx *ctx = req->ctx; + struct io_buffer *head; + int ret = 0; + + io_ring_submit_lock(ctx, !force_nonblock); + + lockdep_assert_held(&ctx->uring_lock); + + ret = -ENOENT; + head = idr_find(&ctx->io_buffer_idr, p->bgid); + if (head) + ret = __io_remove_buffers(ctx, head, p->bgid, p->nbufs); + + io_ring_submit_lock(ctx, !force_nonblock); + if (ret < 0) + req_set_fail_links(req); + io_cqring_add_event(req, ret); + io_put_req(req); + return 0; +} + static int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { @@ -3070,15 +3140,7 @@ static int io_provide_buffers(struct io_kiocb *req, bool force_nonblock) ret = idr_alloc(&ctx->io_buffer_idr, head, p->bgid, p->bgid + 1, GFP_KERNEL); if (ret < 0) { - while (!list_empty(&head->list)) { - struct io_buffer *buf; - - buf = list_first_entry(&head->list, - struct io_buffer, list); - list_del(&buf->list); - kfree(buf); - } - kfree(head); + __io_remove_buffers(ctx, head, p->bgid, -1U); goto out; } } @@ -4825,6 +4887,9 @@ static int io_req_defer_prep(struct io_kiocb *req, case IORING_OP_PROVIDE_BUFFERS: ret = io_provide_buffers_prep(req, sqe); break; + case IORING_OP_REMOVE_BUFFERS: + ret = io_remove_buffers_prep(req, sqe); + break; default: printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n", req->opcode); @@ -5120,6 +5185,14 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, } ret = io_provide_buffers(req, force_nonblock); break; + case IORING_OP_REMOVE_BUFFERS: + if (sqe) { + ret = io_remove_buffers_prep(req, sqe); + if (ret) + break; + } + ret = io_remove_buffers(req, force_nonblock); + break; default: ret = -EINVAL; break; @@ -6998,16 +7071,7 @@ static int __io_destroy_buffers(int id, void *p, void *data) struct io_ring_ctx *ctx = data; struct io_buffer *buf = p; - /* the head kbuf is the list itself */ - while (!list_empty(&buf->list)) { - struct io_buffer *nxt; - - nxt = list_first_entry(&buf->list, struct io_buffer, list); - list_del(&nxt->list); - kfree(nxt); - } - kfree(buf); - idr_remove(&ctx->io_buffer_idr, id); + __io_remove_buffers(ctx, buf, id, -1U); return 0; } diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 9b263d9b24e6..cef4c0c0f26b 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -128,6 +128,7 @@ enum { IORING_OP_EPOLL_CTL, IORING_OP_SPLICE, IORING_OP_PROVIDE_BUFFERS, + IORING_OP_REMOVE_BUFFERS, /* this goes last, obviously */ IORING_OP_LAST, -- cgit From 84557871f2ff332edd445d70349c8724c313c683 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 3 Mar 2020 15:28:17 -0700 Subject: io_uring: add end-of-bits marker and build time verify it Not easy to tell if we're going over the size of bits we can shove in req->flags, so add an end-of-bits marker and a BUILD_BUG_ON() check for it. Signed-off-by: Jens Axboe --- fs/io_uring.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fs/io_uring.c b/fs/io_uring.c index f131105fa12a..4bbad2ed4ae3 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -511,6 +511,9 @@ enum { REQ_F_OVERFLOW_BIT, REQ_F_POLLED_BIT, REQ_F_BUFFER_SELECTED_BIT, + + /* not a real bit, just to check we're not overflowing the space */ + __REQ_F_LAST_BIT, }; enum { @@ -7998,6 +8001,7 @@ static int __init io_uring_init(void) BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in); BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST); + BUILD_BUG_ON(__REQ_F_LAST_BIT >= 8 * sizeof(int)); req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC); return 0; }; -- cgit From 469956e853ccdba72bb82ad2eea6e8ab6b15791f Mon Sep 17 00:00:00 2001 From: YueHaibing Date: Wed, 4 Mar 2020 15:53:52 +0800 Subject: io_uring: Fix unused function warnings If CONFIG_NET is not set, gcc warns: fs/io_uring.c:3110:12: warning: io_setup_async_msg defined but not used [-Wunused-function] static int io_setup_async_msg(struct io_kiocb *req, ^~~~~~~~~~~~~~~~~~ There are many funcions wraped by CONFIG_NET, move them together to simplify code, also fix this warning. Reported-by: Hulk Robot Signed-off-by: YueHaibing Minor tweaks. Signed-off-by: Jens Axboe --- fs/io_uring.c | 94 +++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 4bbad2ed4ae3..eedcf9aaee3c 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3477,6 +3477,7 @@ static int io_sync_file_range(struct io_kiocb *req, bool force_nonblock) return 0; } +#if defined(CONFIG_NET) static int io_setup_async_msg(struct io_kiocb *req, struct io_async_msghdr *kmsg) { @@ -3494,7 +3495,6 @@ static int io_setup_async_msg(struct io_kiocb *req, static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { -#if defined(CONFIG_NET) struct io_sr_msg *sr = &req->sr_msg; struct io_async_ctx *io = req->io; int ret; @@ -3520,14 +3520,10 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) if (!ret) req->flags |= REQ_F_NEED_CLEANUP; return ret; -#else - return -EOPNOTSUPP; -#endif } static int io_sendmsg(struct io_kiocb *req, bool force_nonblock) { -#if defined(CONFIG_NET) struct io_async_msghdr *kmsg = NULL; struct socket *sock; int ret; @@ -3581,14 +3577,10 @@ static int io_sendmsg(struct io_kiocb *req, bool force_nonblock) req_set_fail_links(req); io_put_req(req); return 0; -#else - return -EOPNOTSUPP; -#endif } static int io_send(struct io_kiocb *req, bool force_nonblock) { -#if defined(CONFIG_NET) struct socket *sock; int ret; @@ -3631,9 +3623,6 @@ static int io_send(struct io_kiocb *req, bool force_nonblock) req_set_fail_links(req); io_put_req(req); return 0; -#else - return -EOPNOTSUPP; -#endif } static int __io_recvmsg_copy_hdr(struct io_kiocb *req, struct io_async_ctx *io) @@ -3746,7 +3735,6 @@ static struct io_buffer *io_recv_buffer_select(struct io_kiocb *req, static int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { -#if defined(CONFIG_NET) struct io_sr_msg *sr = &req->sr_msg; struct io_async_ctx *io = req->io; int ret; @@ -3771,14 +3759,10 @@ static int io_recvmsg_prep(struct io_kiocb *req, if (!ret) req->flags |= REQ_F_NEED_CLEANUP; return ret; -#else - return -EOPNOTSUPP; -#endif } static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) { -#if defined(CONFIG_NET) struct io_async_msghdr *kmsg = NULL; struct socket *sock; int ret, cflags = 0; @@ -3839,14 +3823,10 @@ static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) req_set_fail_links(req); io_put_req(req); return 0; -#else - return -EOPNOTSUPP; -#endif } static int io_recv(struct io_kiocb *req, bool force_nonblock) { -#if defined(CONFIG_NET) struct io_buffer *kbuf = NULL; struct socket *sock; int ret, cflags = 0; @@ -3903,15 +3883,10 @@ static int io_recv(struct io_kiocb *req, bool force_nonblock) req_set_fail_links(req); io_put_req(req); return 0; -#else - return -EOPNOTSUPP; -#endif } - static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { -#if defined(CONFIG_NET) struct io_accept *accept = &req->accept; if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL))) @@ -3923,12 +3898,8 @@ static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2)); accept->flags = READ_ONCE(sqe->accept_flags); return 0; -#else - return -EOPNOTSUPP; -#endif } -#if defined(CONFIG_NET) static int __io_accept(struct io_kiocb *req, bool force_nonblock) { struct io_accept *accept = &req->accept; @@ -3958,11 +3929,9 @@ static void io_accept_finish(struct io_wq_work **workptr) __io_accept(req, false); io_steal_work(req, workptr); } -#endif static int io_accept(struct io_kiocb *req, bool force_nonblock) { -#if defined(CONFIG_NET) int ret; ret = __io_accept(req, force_nonblock); @@ -3971,14 +3940,10 @@ static int io_accept(struct io_kiocb *req, bool force_nonblock) return -EAGAIN; } return 0; -#else - return -EOPNOTSUPP; -#endif } static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { -#if defined(CONFIG_NET) struct io_connect *conn = &req->connect; struct io_async_ctx *io = req->io; @@ -3995,14 +3960,10 @@ static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return move_addr_to_kernel(conn->addr, conn->addr_len, &io->connect.address); -#else - return -EOPNOTSUPP; -#endif } static int io_connect(struct io_kiocb *req, bool force_nonblock) { -#if defined(CONFIG_NET) struct io_async_ctx __io, *io; unsigned file_flags; int ret; @@ -4040,10 +4001,59 @@ out: io_cqring_add_event(req, ret); io_put_req(req); return 0; -#else +} +#else /* !CONFIG_NET */ +static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + return -EOPNOTSUPP; +} + +static int io_sendmsg(struct io_kiocb *req, bool force_nonblock) +{ + return -EOPNOTSUPP; +} + +static int io_send(struct io_kiocb *req, bool force_nonblock) +{ + return -EOPNOTSUPP; +} + +static int io_recvmsg_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + return -EOPNOTSUPP; +} + +static int io_recvmsg(struct io_kiocb *req, bool force_nonblock) +{ + return -EOPNOTSUPP; +} + +static int io_recv(struct io_kiocb *req, bool force_nonblock) +{ + return -EOPNOTSUPP; +} + +static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + return -EOPNOTSUPP; +} + +static int io_accept(struct io_kiocb *req, bool force_nonblock) +{ + return -EOPNOTSUPP; +} + +static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + return -EOPNOTSUPP; +} + +static int io_connect(struct io_kiocb *req, bool force_nonblock) +{ return -EOPNOTSUPP; -#endif } +#endif /* CONFIG_NET */ struct io_poll_table { struct poll_table_struct pt; -- cgit From 32b2244a840a90ea94ba42392de5c48d53f521f5 Mon Sep 17 00:00:00 2001 From: Xiaoguang Wang Date: Wed, 11 Mar 2020 09:26:09 +0800 Subject: io_uring: io_uring_enter(2) don't poll while SETUP_IOPOLL|SETUP_SQPOLL enabled When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, applications don't need to do io completion events polling again, they can rely on io_sq_thread to do polling work, which can reduce cpu usage and uring_lock contention. I modify fio io_uring engine codes a bit to evaluate the performance: static int fio_ioring_getevents(struct thread_data *td, unsigned int min, continue; } - if (!o->sqpoll_thread) { + if (o->sqpoll_thread && o->hipri) { r = io_uring_enter(ld, 0, actual_min, IORING_ENTER_GETEVENTS); if (r < 0) { and use "fio -name=fiotest -filename=/dev/nvme0n1 -iodepth=$depth -thread -rw=read -ioengine=io_uring -hipri=1 -sqthread_poll=1 -direct=1 -bs=4k -size=10G -numjobs=1 -time_based -runtime=120" original codes -------------------------------------------------------------------- iodepth | 4 | 8 | 16 | 32 | 64 bw | 1133MB/s | 1519MB/s | 2090MB/s | 2710MB/s | 3012MB/s fio cpu usage | 100% | 100% | 100% | 100% | 100% -------------------------------------------------------------------- with patch -------------------------------------------------------------------- iodepth | 4 | 8 | 16 | 32 | 64 bw | 1196MB/s | 1721MB/s | 2351MB/s | 2977MB/s | 3357MB/s fio cpu usage | 63.8% | 74.4%% | 81.1% | 83.7% | 82.4% -------------------------------------------------------------------- bw improve | 5.5% | 13.2% | 12.3% | 9.8% | 11.5% -------------------------------------------------------------------- From above test results, we can see that bw has above 5.5%~13% improvement, and fio process's cpu usage also drops much. Note this won't improve io_sq_thread's cpu usage when SETUP_IOPOLL|SETUP_SQPOLL are both enabled, in this case, io_sq_thread always has 100% cpu usage. I think this patch will be friendly to applications which will often use io_uring_wait_cqe() or similar from liburing. Signed-off-by: Xiaoguang Wang Signed-off-by: Jens Axboe --- fs/io_uring.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index eedcf9aaee3c..9f1a462eb780 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1732,6 +1732,8 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, } io_commit_cqring(ctx); + if (ctx->flags & IORING_SETUP_SQPOLL) + io_cqring_ev_posted(ctx); io_free_req_many(ctx, &rb); } @@ -7408,7 +7410,14 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, min_complete = min(min_complete, ctx->cq_entries); - if (ctx->flags & IORING_SETUP_IOPOLL) { + /* + * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user + * space applications don't need to do io completion events + * polling again, they can rely on io_sq_thread to do polling + * work, which can reduce cpu usage and uring_lock contention. + */ + if (ctx->flags & IORING_SETUP_IOPOLL && + !(ctx->flags & IORING_SETUP_SQPOLL)) { ret = io_iopoll_check(ctx, &nr_events, min_complete); } else { ret = io_cqring_wait(ctx, min_complete, sig, sigsz); -- cgit From bbbdeb4720a0759ec90e3bcb20ad28d19e531346 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 11 Mar 2020 07:45:46 -0600 Subject: io_uring: dual license io_uring.h uapi header This just syncs the header it with the liburing version, so there's no confusion on the license of the header parts. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index cef4c0c0f26b..6d9d2b1cc523 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -1,4 +1,4 @@ -/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note OR MIT */ /* * Header file for the io_uring interface. * -- cgit From 3f9d64415fdaa73017fcb168930006648617b488 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 11 Mar 2020 12:27:04 -0600 Subject: io_uring: fix truncated async read/readv and write/writev retry Ensure we keep the truncated value, if we did truncate it. If not, we might read/write more than the registered buffer size. Also for retry, ensure that we return the truncated mapped value for the vectorized versions of the read/write commands. Signed-off-by: Jens Axboe --- fs/io_uring.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 9f1a462eb780..55afae6f0cf4 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2360,6 +2360,7 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, *iovec = NULL; return PTR_ERR(buf); } + req->rw.len = sqe_len; } ret = import_single_range(rw, buf, sqe_len, *iovec, iter); @@ -2379,8 +2380,10 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, if (req->flags & REQ_F_BUFFER_SELECT) { ret = io_iov_buffer_select(req, *iovec, needs_lock); - if (!ret) - iov_iter_init(iter, rw, *iovec, 1, (*iovec)->iov_len); + if (!ret) { + ret = (*iovec)->iov_len; + iov_iter_init(iter, rw, *iovec, 1, ret); + } *iovec = NULL; return ret; } -- cgit From 2293b4195800f88de2c454a24b25874be56d87f3 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sat, 7 Mar 2020 01:15:39 +0300 Subject: io-wq: remove duplicated cancel code Deduplicate cancellation parts, as many of them looks the same, as do e.g. - io_wqe_cancel_cb_work() and io_wqe_cancel_work() - io_wq_worker_cancel() and io_work_cancel() Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 136 +++++++++++-------------------------------------------------- 1 file changed, 24 insertions(+), 112 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index eda36f997dea..0e7c6277afcb 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -855,14 +855,13 @@ void io_wq_cancel_all(struct io_wq *wq) } struct io_cb_cancel_data { - struct io_wqe *wqe; - work_cancel_fn *cancel; - void *caller_data; + work_cancel_fn *fn; + void *data; }; -static bool io_work_cancel(struct io_worker *worker, void *cancel_data) +static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { - struct io_cb_cancel_data *data = cancel_data; + struct io_cb_cancel_data *match = data; unsigned long flags; bool ret = false; @@ -873,83 +872,7 @@ static bool io_work_cancel(struct io_worker *worker, void *cancel_data) spin_lock_irqsave(&worker->lock, flags); if (worker->cur_work && !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) && - data->cancel(worker->cur_work, data->caller_data)) { - send_sig(SIGINT, worker->task, 1); - ret = true; - } - spin_unlock_irqrestore(&worker->lock, flags); - - return ret; -} - -static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, - work_cancel_fn *cancel, - void *cancel_data) -{ - struct io_cb_cancel_data data = { - .wqe = wqe, - .cancel = cancel, - .caller_data = cancel_data, - }; - struct io_wq_work_node *node, *prev; - struct io_wq_work *work; - unsigned long flags; - bool found = false; - - spin_lock_irqsave(&wqe->lock, flags); - wq_list_for_each(node, prev, &wqe->work_list) { - work = container_of(node, struct io_wq_work, list); - - if (cancel(work, cancel_data)) { - wq_node_del(&wqe->work_list, node, prev); - found = true; - break; - } - } - spin_unlock_irqrestore(&wqe->lock, flags); - - if (found) { - io_run_cancel(work, wqe); - return IO_WQ_CANCEL_OK; - } - - rcu_read_lock(); - found = io_wq_for_each_worker(wqe, io_work_cancel, &data); - rcu_read_unlock(); - return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; -} - -enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, - void *data) -{ - enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int node; - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - ret = io_wqe_cancel_cb_work(wqe, cancel, data); - if (ret != IO_WQ_CANCEL_NOTFOUND) - break; - } - - return ret; -} - -struct work_match { - bool (*fn)(struct io_wq_work *, void *data); - void *data; -}; - -static bool io_wq_worker_cancel(struct io_worker *worker, void *data) -{ - struct work_match *match = data; - unsigned long flags; - bool ret = false; - - spin_lock_irqsave(&worker->lock, flags); - if (match->fn(worker->cur_work, match->data) && - !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) { + match->fn(worker->cur_work, match->data)) { send_sig(SIGINT, worker->task, 1); ret = true; } @@ -959,7 +882,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) } static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, - struct work_match *match) + struct io_cb_cancel_data *match) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; @@ -1000,22 +923,16 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; } -static bool io_wq_work_match(struct io_wq_work *work, void *data) -{ - return work == data; -} - -enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) +enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, + void *data) { - struct work_match match = { - .fn = io_wq_work_match, - .data = cwork + struct io_cb_cancel_data match = { + .fn = cancel, + .data = data, }; enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; int node; - cwork->flags |= IO_WQ_WORK_CANCEL; - for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; @@ -1027,33 +944,28 @@ enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) return ret; } +static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data) +{ + return work == data; +} + +enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) +{ + return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork); +} + static bool io_wq_pid_match(struct io_wq_work *work, void *data) { pid_t pid = (pid_t) (unsigned long) data; - if (work) - return work->task_pid == pid; - return false; + return work->task_pid == pid; } enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid) { - struct work_match match = { - .fn = io_wq_pid_match, - .data = (void *) (unsigned long) pid - }; - enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int node; - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; + void *data = (void *) (unsigned long) pid; - ret = io_wqe_cancel_work(wqe, &match); - if (ret != IO_WQ_CANCEL_NOTFOUND) - break; - } - - return ret; + return io_wq_cancel_cb(wq, io_wq_pid_match, data); } struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) -- cgit From d78298e73a3443a3c1766fa89f5370f52a4efd94 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sat, 14 Mar 2020 00:31:03 +0300 Subject: io-wq: don't resched if there is no work This little tweak restores the behaviour that was before the recent io_worker_handle_work() optimisation patches. It makes the function do cond_resched() and flush_signals() only if there is an actual work to execute. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 0e7c6277afcb..8afe5565f57a 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -458,10 +458,12 @@ static void io_impersonate_work(struct io_worker *worker, static void io_assign_current_work(struct io_worker *worker, struct io_wq_work *work) { - /* flush pending signals before assigning new work */ - if (signal_pending(current)) - flush_signals(current); - cond_resched(); + if (work) { + /* flush pending signals before assigning new work */ + if (signal_pending(current)) + flush_signals(current); + cond_resched(); + } spin_lock_irq(&worker->lock); worker->cur_work = work; -- cgit From 8766dd516c535abf04491dca674d0ef6c95d814f Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sat, 14 Mar 2020 00:31:04 +0300 Subject: io-wq: split hashing and enqueueing It's a preparation patch removing io_wq_enqueue_hashed(), which now should be done by io_wq_hash_work() + io_wq_enqueue(). Also, set hash value for dependant works, and do it as late as possible, because req->file can be unavailable before. This hash will be ignored by io-wq. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 14 +++++--------- fs/io-wq.h | 7 ++++++- fs/io_uring.c | 24 ++++++++++-------------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 8afe5565f57a..e26ceef53cbd 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -385,7 +385,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ - if (!(work->flags & IO_WQ_WORK_HASHED)) { + if (!io_wq_is_hashed(work)) { wq_node_del(&wqe->work_list, node, prev); return work; } @@ -795,19 +795,15 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) } /* - * Enqueue work, hashed by some key. Work items that hash to the same value - * will not be done in parallel. Used to limit concurrent writes, generally - * hashed by inode. + * Work items that hash to the same value will not be done in parallel. + * Used to limit concurrent writes, generally hashed by inode. */ -void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val) +void io_wq_hash_work(struct io_wq_work *work, void *val) { - struct io_wqe *wqe = wq->wqes[numa_node_id()]; - unsigned bit; - + unsigned int bit; bit = hash_ptr(val, IO_WQ_HASH_ORDER); work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); - io_wqe_enqueue(wqe, work); } static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data) diff --git a/fs/io-wq.h b/fs/io-wq.h index 2117b9a4f161..298b21f4a4d2 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -94,7 +94,12 @@ bool io_wq_get(struct io_wq *wq, struct io_wq_data *data); void io_wq_destroy(struct io_wq *wq); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); -void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val); +void io_wq_hash_work(struct io_wq_work *work, void *val); + +static inline bool io_wq_is_hashed(struct io_wq_work *work) +{ + return work->flags & IO_WQ_WORK_HASHED; +} void io_wq_cancel_all(struct io_wq *wq); enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork); diff --git a/fs/io_uring.c b/fs/io_uring.c index 55afae6f0cf4..dfe40bf80adc 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1040,15 +1040,14 @@ static inline void io_req_work_drop_env(struct io_kiocb *req) } } -static inline bool io_prep_async_work(struct io_kiocb *req, +static inline void io_prep_async_work(struct io_kiocb *req, struct io_kiocb **link) { const struct io_op_def *def = &io_op_defs[req->opcode]; - bool do_hashed = false; if (req->flags & REQ_F_ISREG) { if (def->hash_reg_file) - do_hashed = true; + io_wq_hash_work(&req->work, file_inode(req->file)); } else { if (def->unbound_nonreg_file) req->work.flags |= IO_WQ_WORK_UNBOUND; @@ -1057,25 +1056,18 @@ static inline bool io_prep_async_work(struct io_kiocb *req, io_req_work_grab_env(req, def); *link = io_prep_linked_timeout(req); - return do_hashed; } static inline void io_queue_async_work(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *link; - bool do_hashed; - do_hashed = io_prep_async_work(req, &link); + io_prep_async_work(req, &link); - trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work, - req->flags); - if (!do_hashed) { - io_wq_enqueue(ctx->io_wq, &req->work); - } else { - io_wq_enqueue_hashed(ctx->io_wq, &req->work, - file_inode(req->file)); - } + trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, + &req->work, req->flags); + io_wq_enqueue(ctx->io_wq, &req->work); if (link) io_queue_linked_timeout(link); @@ -1582,6 +1574,10 @@ static void io_link_work_cb(struct io_wq_work **workptr) static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) { struct io_kiocb *link; + const struct io_op_def *def = &io_op_defs[nxt->opcode]; + + if ((nxt->flags & REQ_F_ISREG) && def->hash_reg_file) + io_wq_hash_work(&nxt->work, file_inode(nxt->file)); *workptr = &nxt->work; link = io_prep_linked_timeout(nxt); -- cgit From 60cf46ae605446feb0c43c472c0fd1af4cd96231 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sat, 14 Mar 2020 00:31:05 +0300 Subject: io-wq: hash dependent work Enable io-wq hashing stuff for dependent works simply by re-enqueueing such requests. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index e26ceef53cbd..9541df2729de 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -375,11 +375,17 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) return __io_worker_unuse(wqe, worker); } -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) +static inline unsigned int io_get_work_hash(struct io_wq_work *work) +{ + return work->flags >> IO_WQ_HASH_SHIFT; +} + +static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; + unsigned int hash; wq_list_for_each(node, prev, &wqe->work_list) { work = container_of(node, struct io_wq_work, list); @@ -391,9 +397,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) } /* hashed, can run if not already running */ - *hash = work->flags >> IO_WQ_HASH_SHIFT; - if (!(wqe->hash_map & BIT(*hash))) { - wqe->hash_map |= BIT(*hash); + hash = io_get_work_hash(work); + if (!(wqe->hash_map & BIT(hash))) { + wqe->hash_map |= BIT(hash); wq_node_del(&wqe->work_list, node, prev); return work; } @@ -470,15 +476,17 @@ static void io_assign_current_work(struct io_worker *worker, spin_unlock_irq(&worker->lock); } +static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); + static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; - unsigned hash = -1U; do { struct io_wq_work *work; + unsigned int hash; get_next: /* * If we got some work, mark us as busy. If we didn't, but @@ -487,7 +495,7 @@ get_next: * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(wqe, &hash); + work = io_get_next_work(wqe); if (work) __io_worker_busy(wqe, worker, work); else if (!wq_list_empty(&wqe->work_list)) @@ -511,11 +519,16 @@ get_next: work->flags |= IO_WQ_WORK_CANCEL; old_work = work; + hash = io_get_work_hash(work); work->func(&work); work = (old_work == work) ? NULL : work; io_assign_current_work(worker, work); wq->free_work(old_work); + if (work && io_wq_is_hashed(work)) { + io_wqe_enqueue(wqe, work); + work = NULL; + } if (hash != -1U) { spin_lock_irq(&wqe->lock); wqe->hash_map &= ~BIT_ULL(hash); -- cgit From 4ed734b0d0913e566a9d871e15d24eb240f269f7 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 20 Mar 2020 11:23:41 -0600 Subject: io_uring: honor original task RLIMIT_FSIZE With the previous fixes for number of files open checking, I added some debug code to see if we had other spots where we're checking rlimit() against the async io-wq workers. The only one I found was file size checking, which we should also honor. During write and fallocate prep, store the max file size and override that for the current ask if we're in io-wq worker context. Cc: stable@vger.kernel.org # 5.1+ Signed-off-by: Jens Axboe --- fs/io_uring.c | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index dfe40bf80adc..05260ed485ad 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -604,7 +604,10 @@ struct io_kiocb { struct list_head list; unsigned int flags; refcount_t refs; - struct task_struct *task; + union { + struct task_struct *task; + unsigned long fsize; + }; u64 user_data; u32 result; u32 sequence; @@ -2593,6 +2596,8 @@ static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (unlikely(!(req->file->f_mode & FMODE_WRITE))) return -EBADF; + req->fsize = rlimit(RLIMIT_FSIZE); + /* either don't need iovec imported or already have it */ if (!req->io || req->flags & REQ_F_NEED_CLEANUP) return 0; @@ -2662,10 +2667,17 @@ static int io_write(struct io_kiocb *req, bool force_nonblock) } kiocb->ki_flags |= IOCB_WRITE; + if (!force_nonblock) + current->signal->rlim[RLIMIT_FSIZE].rlim_cur = req->fsize; + if (req->file->f_op->write_iter) ret2 = call_write_iter(req->file, kiocb, &iter); else ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter); + + if (!force_nonblock) + current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; + /* * Raw bdev writes will -EOPNOTSUPP for IOCB_NOWAIT. Just * retry them without IOCB_NOWAIT. @@ -2848,8 +2860,10 @@ static void __io_fallocate(struct io_kiocb *req) { int ret; + current->signal->rlim[RLIMIT_FSIZE].rlim_cur = req->fsize; ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off, req->sync.len); + current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); @@ -2875,6 +2889,7 @@ static int io_fallocate_prep(struct io_kiocb *req, req->sync.off = READ_ONCE(sqe->off); req->sync.len = READ_ONCE(sqe->addr); req->sync.mode = READ_ONCE(sqe->len); + req->fsize = rlimit(RLIMIT_FSIZE); return 0; } -- cgit From 9f5834c868e901b00f1bfe4d0052b5906b4a2b7f Mon Sep 17 00:00:00 2001 From: Lukas Bulwahn Date: Sat, 21 Mar 2020 12:19:07 +0100 Subject: io_uring: make spdxcheck.py happy Commit bbbdeb4720a0 ("io_uring: dual license io_uring.h uapi header") uses a nested SPDX-License-Identifier to dual license the header. Since then, ./scripts/spdxcheck.py complains: include/uapi/linux/io_uring.h: 1:60 Missing parentheses: OR Add parentheses to make spdxcheck.py happy. Signed-off-by: Lukas Bulwahn Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 6d9d2b1cc523..e48d746b8e2a 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -1,4 +1,4 @@ -/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note OR MIT */ +/* SPDX-License-Identifier: (GPL-2.0 WITH Linux-syscall-note) OR MIT */ /* * Header file for the io_uring interface. * -- cgit From f2cf11492b8b30d89b2fbf525c9ea5e8c4ccc842 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Sun, 22 Mar 2020 19:14:26 +0300 Subject: io-wq: close cancel gap for hashed linked work After io_assign_current_work() of a linked work, it can be decided to offloaded to another thread so doing io_wqe_enqueue(). However, until next io_assign_current_work() it can be cancelled, that isn't handled. Don't assign it, if it's not going to be executed. Fixes: 60cf46ae6054 ("io-wq: hash dependent work") Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 9541df2729de..b3fb61ec0870 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -485,7 +485,7 @@ static void io_worker_handle_work(struct io_worker *worker) struct io_wq *wq = wqe->wq; do { - struct io_wq_work *work; + struct io_wq_work *work, *assign_work; unsigned int hash; get_next: /* @@ -522,10 +522,14 @@ get_next: hash = io_get_work_hash(work); work->func(&work); work = (old_work == work) ? NULL : work; - io_assign_current_work(worker, work); + + assign_work = work; + if (work && io_wq_is_hashed(work)) + assign_work = NULL; + io_assign_current_work(worker, assign_work); wq->free_work(old_work); - if (work && io_wq_is_hashed(work)) { + if (work && !assign_work) { io_wqe_enqueue(wqe, work); work = NULL; } -- cgit From 18a542ff19ad149fac9e5a36a4012e3cac7b3b3b Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 23 Mar 2020 00:23:29 +0300 Subject: io_uring: Fix ->data corruption on re-enqueue work->data and work->list are shared in union. io_wq_assign_next() sets ->data if a req having a linked_timeout, but then io-wq may want to use work->list, e.g. to do re-enqueue of a request, so corrupting ->data. ->data is not necessary, just remove it and extract linked_timeout through @link_list. Fixes: 60cf46ae6054 ("io-wq: hash dependent work") Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.h | 5 +---- fs/io_uring.c | 9 ++++----- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/fs/io-wq.h b/fs/io-wq.h index 298b21f4a4d2..d2a5684bf673 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -63,10 +63,7 @@ static inline void wq_node_del(struct io_wq_work_list *list, } while (0) struct io_wq_work { - union { - struct io_wq_work_node list; - void *data; - }; + struct io_wq_work_node list; void (*func)(struct io_wq_work **); struct files_struct *files; struct mm_struct *mm; diff --git a/fs/io_uring.c b/fs/io_uring.c index 05260ed485ad..1f61ea9c87fd 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1567,9 +1567,10 @@ static void io_free_req(struct io_kiocb *req) static void io_link_work_cb(struct io_wq_work **workptr) { - struct io_wq_work *work = *workptr; - struct io_kiocb *link = work->data; + struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct io_kiocb *link; + link = list_first_entry(&req->link_list, struct io_kiocb, link_list); io_queue_linked_timeout(link); io_wq_submit_work(workptr); } @@ -1584,10 +1585,8 @@ static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt) *workptr = &nxt->work; link = io_prep_linked_timeout(nxt); - if (link) { + if (link) nxt->work.func = io_link_work_cb; - nxt->work.data = link; - } } /* -- cgit From 4afdb733b1606c6cb86e7833f9335f4870cf7ddd Mon Sep 17 00:00:00 2001 From: Hillf Danton Date: Mon, 23 Mar 2020 17:42:35 +0800 Subject: io-uring: drop completion when removing file A case of task hung was reported by syzbot, INFO: task syz-executor975:9880 blocked for more than 143 seconds. Not tainted 5.6.0-rc6-syzkaller #0 "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message. syz-executor975 D27576 9880 9878 0x80004000 Call Trace: schedule+0xd0/0x2a0 kernel/sched/core.c:4154 schedule_timeout+0x6db/0xba0 kernel/time/timer.c:1871 do_wait_for_common kernel/sched/completion.c:83 [inline] __wait_for_common kernel/sched/completion.c:104 [inline] wait_for_common kernel/sched/completion.c:115 [inline] wait_for_completion+0x26a/0x3c0 kernel/sched/completion.c:136 io_queue_file_removal+0x1af/0x1e0 fs/io_uring.c:5826 __io_sqe_files_update.isra.0+0x3a1/0xb00 fs/io_uring.c:5867 io_sqe_files_update fs/io_uring.c:5918 [inline] __io_uring_register+0x377/0x2c00 fs/io_uring.c:7131 __do_sys_io_uring_register fs/io_uring.c:7202 [inline] __se_sys_io_uring_register fs/io_uring.c:7184 [inline] __x64_sys_io_uring_register+0x192/0x560 fs/io_uring.c:7184 do_syscall_64+0xf6/0x7d0 arch/x86/entry/common.c:294 entry_SYSCALL_64_after_hwframe+0x49/0xbe and bisect pointed to 05f3fb3c5397 ("io_uring: avoid ring quiesce for fixed file set unregister and update"). It is down to the order that we wait for work done before flushing it while nobody is likely going to wake us up. We can drop that completion on stack as flushing work itself is a sync operation we need and no more is left behind it. To that end, io_file_put::done is re-used for indicating if it can be freed in the workqueue worker context. Reported-and-Inspired-by: syzbot Signed-off-by: Hillf Danton Rename ->done to ->free_pfile Signed-off-by: Jens Axboe --- fs/io_uring.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 1f61ea9c87fd..c2dbef1e3272 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -6349,7 +6349,7 @@ static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file) struct io_file_put { struct llist_node llist; struct file *file; - struct completion *done; + bool free_pfile; }; static void io_ring_file_ref_flush(struct fixed_file_data *data) @@ -6360,9 +6360,7 @@ static void io_ring_file_ref_flush(struct fixed_file_data *data) while ((node = llist_del_all(&data->put_llist)) != NULL) { llist_for_each_entry_safe(pfile, tmp, node, llist) { io_ring_file_put(data->ctx, pfile->file); - if (pfile->done) - complete(pfile->done); - else + if (pfile->free_pfile) kfree(pfile); } } @@ -6562,7 +6560,6 @@ static bool io_queue_file_removal(struct fixed_file_data *data, struct file *file) { struct io_file_put *pfile, pfile_stack; - DECLARE_COMPLETION_ONSTACK(done); /* * If we fail allocating the struct we need for doing async reomval @@ -6571,15 +6568,15 @@ static bool io_queue_file_removal(struct fixed_file_data *data, pfile = kzalloc(sizeof(*pfile), GFP_KERNEL); if (!pfile) { pfile = &pfile_stack; - pfile->done = &done; - } + pfile->free_pfile = false; + } else + pfile->free_pfile = true; pfile->file = file; llist_add(&pfile->llist, &data->put_llist); if (pfile == &pfile_stack) { percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch); - wait_for_completion(&done); flush_work(&data->ref_work); return false; } -- cgit From a5318d3cdffbecf075928363d7e4becfeddabfcb Mon Sep 17 00:00:00 2001 From: Hillf Danton Date: Mon, 23 Mar 2020 17:47:15 +0800 Subject: io-uring: drop 'free_pfile' in struct io_file_put Sync removal of file is only used in case of a GFP_KERNEL kmalloc failure at the cost of io_file_put::done and work flush, while a glich like it can be handled at the call site without too much pain. That said, what is proposed is to drop sync removing of file, and the kink in neck as well. Signed-off-by: Hillf Danton Signed-off-by: Jens Axboe --- fs/io_uring.c | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index c2dbef1e3272..635902122c29 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -6349,7 +6349,6 @@ static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file) struct io_file_put { struct llist_node llist; struct file *file; - bool free_pfile; }; static void io_ring_file_ref_flush(struct fixed_file_data *data) @@ -6360,8 +6359,7 @@ static void io_ring_file_ref_flush(struct fixed_file_data *data) while ((node = llist_del_all(&data->put_llist)) != NULL) { llist_for_each_entry_safe(pfile, tmp, node, llist) { io_ring_file_put(data->ctx, pfile->file); - if (pfile->free_pfile) - kfree(pfile); + kfree(pfile); } } } @@ -6556,32 +6554,18 @@ static void io_atomic_switch(struct percpu_ref *ref) percpu_ref_get(&data->refs); } -static bool io_queue_file_removal(struct fixed_file_data *data, +static int io_queue_file_removal(struct fixed_file_data *data, struct file *file) { - struct io_file_put *pfile, pfile_stack; + struct io_file_put *pfile; - /* - * If we fail allocating the struct we need for doing async reomval - * of this file, just punt to sync and wait for it. - */ pfile = kzalloc(sizeof(*pfile), GFP_KERNEL); - if (!pfile) { - pfile = &pfile_stack; - pfile->free_pfile = false; - } else - pfile->free_pfile = true; + if (!pfile) + return -ENOMEM; pfile->file = file; llist_add(&pfile->llist, &data->put_llist); - - if (pfile == &pfile_stack) { - percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch); - flush_work(&data->ref_work); - return false; - } - - return true; + return 0; } static int __io_sqe_files_update(struct io_ring_ctx *ctx, @@ -6616,9 +6600,11 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, index = i & IORING_FILE_TABLE_MASK; if (table->files[index]) { file = io_file_from_index(ctx, index); + err = io_queue_file_removal(data, file); + if (err) + break; table->files[index] = NULL; - if (io_queue_file_removal(data, file)) - ref_switch = true; + ref_switch = true; } if (fd != -1) { file = fget(fd); -- cgit From 86f3cd1b589a10dbdca98c52cc0cd0f56523c9b3 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Mon, 23 Mar 2020 22:57:22 +0300 Subject: io-wq: handle hashed writes in chains We always punt async buffered writes to an io-wq helper, as the core kernel does not have IOCB_NOWAIT support for that. Most buffered async writes complete very quickly, as it's just a copy operation. This means that doing multiple locking roundtrips on the shared wqe lock for each buffered write is wasteful. Additionally, buffered writes are hashed work items, which means that any buffered write to a given file is serialized. Keep identicaly hashed work items contiguously in @wqe->work_list, and track a tail for each hash bucket. On dequeue of a hashed item, splice all of the same hash in one go using the tracked tail. Until the batch is done, the caller doesn't have to synchronize with the wqe or worker locks again. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 68 ++++++++++++++++++++++++++++++++++++++++++++------------------ fs/io-wq.h | 45 +++++++++++++++++++++++++++++++++-------- 2 files changed, 85 insertions(+), 28 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index b3fb61ec0870..cc5cf2209fb0 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -69,6 +69,8 @@ struct io_worker { #define IO_WQ_HASH_ORDER 5 #endif +#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) + struct io_wqe_acct { unsigned nr_workers; unsigned max_workers; @@ -98,6 +100,7 @@ struct io_wqe { struct list_head all_list; struct io_wq *wq; + struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; }; /* @@ -384,7 +387,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; - struct io_wq_work *work; + struct io_wq_work *work, *tail; unsigned int hash; wq_list_for_each(node, prev, &wqe->work_list) { @@ -392,7 +395,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) /* not hashed, can run anytime */ if (!io_wq_is_hashed(work)) { - wq_node_del(&wqe->work_list, node, prev); + wq_list_del(&wqe->work_list, node, prev); return work; } @@ -400,7 +403,10 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) hash = io_get_work_hash(work); if (!(wqe->hash_map & BIT(hash))) { wqe->hash_map |= BIT(hash); - wq_node_del(&wqe->work_list, node, prev); + /* all items with this hash lie in [work, tail] */ + tail = wqe->hash_tail[hash]; + wqe->hash_tail[hash] = NULL; + wq_list_cut(&wqe->work_list, &tail->list, prev); return work; } } @@ -485,7 +491,7 @@ static void io_worker_handle_work(struct io_worker *worker) struct io_wq *wq = wqe->wq; do { - struct io_wq_work *work, *assign_work; + struct io_wq_work *work; unsigned int hash; get_next: /* @@ -508,8 +514,9 @@ get_next: /* handle a whole dependent link */ do { - struct io_wq_work *old_work; + struct io_wq_work *old_work, *next_hashed, *linked; + next_hashed = wq_next_work(work); io_impersonate_work(worker, work); /* * OK to set IO_WQ_WORK_CANCEL even for uncancellable @@ -518,22 +525,23 @@ get_next: if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) work->flags |= IO_WQ_WORK_CANCEL; - old_work = work; hash = io_get_work_hash(work); - work->func(&work); - work = (old_work == work) ? NULL : work; - - assign_work = work; - if (work && io_wq_is_hashed(work)) - assign_work = NULL; - io_assign_current_work(worker, assign_work); + linked = old_work = work; + linked->func(&linked); + linked = (old_work == linked) ? NULL : linked; + + work = next_hashed; + if (!work && linked && !io_wq_is_hashed(linked)) { + work = linked; + linked = NULL; + } + io_assign_current_work(worker, work); wq->free_work(old_work); - if (work && !assign_work) { - io_wqe_enqueue(wqe, work); - work = NULL; - } - if (hash != -1U) { + if (linked) + io_wqe_enqueue(wqe, linked); + + if (hash != -1U && !next_hashed) { spin_lock_irq(&wqe->lock); wqe->hash_map &= ~BIT_ULL(hash); wqe->flags &= ~IO_WQE_FLAG_STALLED; @@ -776,6 +784,26 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) } while (work); } +static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) +{ + unsigned int hash; + struct io_wq_work *tail; + + if (!io_wq_is_hashed(work)) { +append: + wq_list_add_tail(&work->list, &wqe->work_list); + return; + } + + hash = io_get_work_hash(work); + tail = wqe->hash_tail[hash]; + wqe->hash_tail[hash] = work; + if (!tail) + goto append; + + wq_list_add_after(&work->list, &tail->list, &wqe->work_list); +} + static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); @@ -795,7 +823,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) work_flags = work->flags; spin_lock_irqsave(&wqe->lock, flags); - wq_list_add_tail(&work->list, &wqe->work_list); + io_wqe_insert_work(wqe, work); wqe->flags &= ~IO_WQE_FLAG_STALLED; spin_unlock_irqrestore(&wqe->lock, flags); @@ -914,7 +942,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, work = container_of(node, struct io_wq_work, list); if (match->fn(work, match->data)) { - wq_node_del(&wqe->work_list, node, prev); + wq_list_del(&wqe->work_list, node, prev); found = true; break; } diff --git a/fs/io-wq.h b/fs/io-wq.h index d2a5684bf673..3ee7356d6be5 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -28,6 +28,18 @@ struct io_wq_work_list { struct io_wq_work_node *last; }; +static inline void wq_list_add_after(struct io_wq_work_node *node, + struct io_wq_work_node *pos, + struct io_wq_work_list *list) +{ + struct io_wq_work_node *next = pos->next; + + pos->next = node; + node->next = next; + if (!next) + list->last = node; +} + static inline void wq_list_add_tail(struct io_wq_work_node *node, struct io_wq_work_list *list) { @@ -40,17 +52,26 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node, } } -static inline void wq_node_del(struct io_wq_work_list *list, - struct io_wq_work_node *node, +static inline void wq_list_cut(struct io_wq_work_list *list, + struct io_wq_work_node *last, struct io_wq_work_node *prev) { - if (node == list->first) - WRITE_ONCE(list->first, node->next); - if (node == list->last) + /* first in the list, if prev==NULL */ + if (!prev) + WRITE_ONCE(list->first, last->next); + else + prev->next = last->next; + + if (last == list->last) list->last = prev; - if (prev) - prev->next = node->next; - node->next = NULL; + last->next = NULL; +} + +static inline void wq_list_del(struct io_wq_work_list *list, + struct io_wq_work_node *node, + struct io_wq_work_node *prev) +{ + wq_list_cut(list, node, prev); } #define wq_list_for_each(pos, prv, head) \ @@ -78,6 +99,14 @@ struct io_wq_work { *(work) = (struct io_wq_work){ .func = _func }; \ } while (0) \ +static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) +{ + if (!work->list.next) + return NULL; + + return container_of(work->list.next, struct io_wq_work, list); +} + typedef void (free_work_fn)(struct io_wq_work *); struct io_wq_data { -- cgit From bff6035d0c40fa1dd195aa41f61814d622883420 Mon Sep 17 00:00:00 2001 From: Chucheng Luo Date: Wed, 25 Mar 2020 11:31:38 +0800 Subject: io_uring: fix missing 'return' in comment The missing 'return' work may make it hard for other developers to understand it. Signed-off-by: Chucheng Luo Signed-off-by: Jens Axboe --- fs/io_uring.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 635902122c29..487e2742a9e8 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2678,7 +2678,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock) current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; /* - * Raw bdev writes will -EOPNOTSUPP for IOCB_NOWAIT. Just + * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just * retry them without IOCB_NOWAIT. */ if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT)) -- cgit From 3d9932a8b240c9019f48358e8a6928c53c2c7f6b Mon Sep 17 00:00:00 2001 From: Xiaoguang Wang Date: Fri, 27 Mar 2020 15:36:52 +0800 Subject: io_uring: cleanup io_alloc_async_ctx() Cleanup io_alloc_async_ctx() a bit, add a new __io_alloc_async_ctx(), so io_setup_async_rw() won't need to check whether async_ctx is true or false again. Reviewed-by: Stefano Garzarella Signed-off-by: Xiaoguang Wang Signed-off-by: Jens Axboe --- fs/io_uring.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 487e2742a9e8..b12d33b12bc7 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2469,12 +2469,18 @@ static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size, } } +static inline int __io_alloc_async_ctx(struct io_kiocb *req) +{ + req->io = kmalloc(sizeof(*req->io), GFP_KERNEL); + return req->io == NULL; +} + static int io_alloc_async_ctx(struct io_kiocb *req) { if (!io_op_defs[req->opcode].async_ctx) return 0; - req->io = kmalloc(sizeof(*req->io), GFP_KERNEL); - return req->io == NULL; + + return __io_alloc_async_ctx(req); } static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, @@ -2484,7 +2490,7 @@ static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, if (!io_op_defs[req->opcode].async_ctx) return 0; if (!req->io) { - if (io_alloc_async_ctx(req)) + if (__io_alloc_async_ctx(req)) return -ENOMEM; io_req_map_rw(req, io_size, iovec, fast_iov, iter); -- cgit