Diffstat (limited to 'io_uring/io_uring.c')
 io_uring/io_uring.c | 249 ++++++++++++++++++++++++++++-------------------------
 1 file changed, 124 insertions(+), 125 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index cd9a137ad6ce..cf348c33f485 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -59,7 +59,6 @@
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
-#include <net/af_unix.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
@@ -95,6 +94,7 @@
#include "notif.h"
#include "waitid.h"
#include "futex.h"
+#include "napi.h"
#include "timeout.h"
#include "poll.h"
@@ -122,11 +122,6 @@
#define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8
-enum {
- IO_CHECK_CQ_OVERFLOW_BIT,
- IO_CHECK_CQ_DROPPED_BIT,
-};
-
struct io_defer_entry {
struct list_head list;
struct io_kiocb *req;
@@ -349,6 +344,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
+ io_napi_init(ctx);
+
return ctx;
err:
kfree(ctx->cancel_table.hbs);
@@ -463,7 +460,6 @@ static void io_prep_async_work(struct io_kiocb *req)
req->work.list.next = NULL;
req->work.flags = 0;
- req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
if (req->flags & REQ_F_FORCE_ASYNC)
req->work.flags |= IO_WQ_WORK_CONCURRENT;
@@ -670,7 +666,6 @@ static void io_cq_unlock_post(struct io_ring_ctx *ctx)
io_commit_cqring_flush(ctx);
}
-/* Returns true if there are no backlogged entries after the flush */
static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
struct io_overflow_cqe *ocqe;
@@ -949,6 +944,8 @@ bool io_fill_cqe_req_aux(struct io_kiocb *req, bool defer, s32 res, u32 cflags)
u64 user_data = req->cqe.user_data;
struct io_uring_cqe *cqe;
+ lockdep_assert(!io_wq_current_is_worker());
+
if (!defer)
return __io_post_aux_cqe(ctx, user_data, res, cflags, false);
@@ -1025,15 +1022,15 @@ static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
- if (req->ctx->task_complete && req->ctx->submitter_task != current) {
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (ctx->task_complete && ctx->submitter_task != current) {
req->io_task_work.func = io_req_task_complete;
io_req_task_work_add(req);
} else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
- !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
+ !(ctx->flags & IORING_SETUP_IOPOLL)) {
__io_req_complete_post(req, issue_flags);
} else {
- struct io_ring_ctx *ctx = req->ctx;
-
mutex_lock(&ctx->uring_lock);
__io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
mutex_unlock(&ctx->uring_lock);
@@ -1174,40 +1171,44 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
percpu_ref_put(&ctx->refs);
}
-static unsigned int handle_tw_list(struct llist_node *node,
- struct io_ring_ctx **ctx,
- struct io_tw_state *ts,
- struct llist_node *last)
+/*
+ * Run queued task_work, returning the number of entries processed in *count.
+ * If more entries than max_entries are available, stop processing once this
+ * is reached and return the rest of the list.
+ */
+struct llist_node *io_handle_tw_list(struct llist_node *node,
+ unsigned int *count,
+ unsigned int max_entries)
{
- unsigned int count = 0;
+ struct io_ring_ctx *ctx = NULL;
+ struct io_tw_state ts = { };
- while (node && node != last) {
+ do {
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
- prefetch(container_of(next, struct io_kiocb, io_task_work.node));
-
- if (req->ctx != *ctx) {
- ctx_flush_and_put(*ctx, ts);
- *ctx = req->ctx;
+ if (req->ctx != ctx) {
+ ctx_flush_and_put(ctx, &ts);
+ ctx = req->ctx;
/* if not contended, grab and improve batching */
- ts->locked = mutex_trylock(&(*ctx)->uring_lock);
- percpu_ref_get(&(*ctx)->refs);
+ ts.locked = mutex_trylock(&ctx->uring_lock);
+ percpu_ref_get(&ctx->refs);
}
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
- req, ts);
+ req, &ts);
node = next;
- count++;
+ (*count)++;
if (unlikely(need_resched())) {
- ctx_flush_and_put(*ctx, ts);
- *ctx = NULL;
+ ctx_flush_and_put(ctx, &ts);
+ ctx = NULL;
cond_resched();
}
- }
+ } while (node && *count < max_entries);
- return count;
+ ctx_flush_and_put(ctx, &ts);
+ return node;
}
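
As the comment above the function states, io_handle_tw_list() now runs at most max_entries items and hands the untouched remainder back. A minimal, hedged sketch of the calling convention (the helper name is illustrative, not from this patch): entries are pushed onto the llist LIFO, so the caller reverses the list before processing, exactly as tctx_task_work_run() below does.

/* Illustrative caller, not part of the patch: run up to 'budget' entries */
static struct llist_node *example_run_some_tw(struct io_uring_task *tctx,
					      unsigned int budget,
					      unsigned int *done)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	if (!node)
		return NULL;
	/* task_list is filled LIFO; restore submission (FIFO) order */
	node = llist_reverse_order(node);
	/* returns the untouched tail once 'budget' entries have been run */
	return io_handle_tw_list(node, done, budget);
}
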
/**
@@ -1224,22 +1225,6 @@ static inline struct llist_node *io_llist_xchg(struct llist_head *head,
return xchg(&head->first, new);
}
-/**
- * io_llist_cmpxchg - possibly swap all entries in a lock-less list
- * @head: the head of lock-less list to delete all entries
- * @old: expected old value of the first entry of the list
- * @new: new entry as the head of the list
- *
- * perform a cmpxchg on the first entry of the list.
- */
-
-static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head,
- struct llist_node *old,
- struct llist_node *new)
-{
- return cmpxchg(&head->first, old, new);
-}
-
static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
struct llist_node *node = llist_del_all(&tctx->task_list);
@@ -1268,45 +1253,41 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
}
}
-void tctx_task_work(struct callback_head *cb)
+struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries,
+ unsigned int *count)
{
- struct io_tw_state ts = {};
- struct io_ring_ctx *ctx = NULL;
- struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
- task_work);
- struct llist_node fake = {};
struct llist_node *node;
- unsigned int loops = 0;
- unsigned int count = 0;
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx, true);
- return;
+ return NULL;
}
- do {
- loops++;
- node = io_llist_xchg(&tctx->task_list, &fake);
- count += handle_tw_list(node, &ctx, &ts, &fake);
-
- /* skip expensive cmpxchg if there are items in the list */
- if (READ_ONCE(tctx->task_list.first) != &fake)
- continue;
- if (ts.locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
- io_submit_flush_completions(ctx);
- if (READ_ONCE(tctx->task_list.first) != &fake)
- continue;
- }
- node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
- } while (node != &fake);
-
- ctx_flush_and_put(ctx, &ts);
+ node = llist_del_all(&tctx->task_list);
+ if (node) {
+ node = llist_reverse_order(node);
+ node = io_handle_tw_list(node, count, max_entries);
+ }
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
io_uring_drop_tctx_refs(current);
- trace_io_uring_task_work_run(tctx, count, loops);
+ trace_io_uring_task_work_run(tctx, *count);
+ return node;
+}
+
+void tctx_task_work(struct callback_head *cb)
+{
+ struct io_uring_task *tctx;
+ struct llist_node *ret;
+ unsigned int count = 0;
+
+ tctx = container_of(cb, struct io_uring_task, task_work);
+ ret = tctx_task_work_run(tctx, UINT_MAX, &count);
+ /* can't happen */
+ WARN_ON_ONCE(ret);
}
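
tctx_task_work() itself still drains everything (UINT_MAX), but splitting out tctx_task_work_run() lets other callers cap the batch. A hedged sketch of how a polling consumer, for example an SQPOLL-style loop, could use it; the helper name and the retry-list bookkeeping here are illustrative, not taken from this patch:

/*
 * Illustrative only: run at most 'cap' task_work items per pass, finishing
 * the previous pass' leftovers first, and remember what is still pending.
 */
static unsigned int example_sq_run_tw(struct llist_node **retry_list,
				      struct io_uring_task *tctx,
				      unsigned int cap)
{
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, cap);
		if (count >= cap)
			return count;
	}
	*retry_list = tctx_task_work_run(tctx, cap - count, &count);
	return count;
}
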
static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
@@ -1389,6 +1370,15 @@ static void io_req_normal_work_add(struct io_kiocb *req)
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ /* SQPOLL doesn't need the task_work added, it'll run it itself */
+ if (ctx->flags & IORING_SETUP_SQPOLL) {
+ struct io_sq_data *sqd = ctx->sq_data;
+
+ if (wq_has_sleeper(&sqd->wait))
+ wake_up(&sqd->wait);
+ return;
+ }
+
if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
return;
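
The early return above relies on the usual wq_has_sleeper()/wake_up() pairing: wq_has_sleeper() only sees a waiter that has already added itself to sqd->wait, so the SQPOLL side is expected to register on the waitqueue before re-checking for pending task_work. An illustrative waiter shape, not taken from this patch:

/* Illustrative: register on sqd->wait before the final emptiness check */
static void example_sqd_wait_for_tw(struct io_sq_data *sqd,
				    struct io_uring_task *tctx)
{
	DEFINE_WAIT(wait);

	prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
	if (llist_empty(&tctx->task_list))
		schedule();
	finish_wait(&sqd->wait, &wait);
}
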
@@ -1420,7 +1410,20 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
}
}
-static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
+static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
+ int min_events)
+{
+ if (llist_empty(&ctx->work_llist))
+ return false;
+ if (events < min_events)
+ return true;
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ return false;
+}
+
+static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
+ int min_events)
{
struct llist_node *node;
unsigned int loops = 0;
@@ -1440,7 +1443,6 @@ again:
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
- prefetch(container_of(next, struct io_kiocb, io_task_work.node));
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
req, ts);
@@ -1449,18 +1451,20 @@ again:
}
loops++;
- if (!llist_empty(&ctx->work_llist))
+ if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
if (ts->locked) {
io_submit_flush_completions(ctx);
- if (!llist_empty(&ctx->work_llist))
+ if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
}
+
trace_io_uring_local_work_run(ctx, ret, loops);
return ret;
}
-static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
+static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
+ int min_events)
{
struct io_tw_state ts = { .locked = true, };
int ret;
@@ -1468,20 +1472,20 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
if (llist_empty(&ctx->work_llist))
return 0;
- ret = __io_run_local_work(ctx, &ts);
+ ret = __io_run_local_work(ctx, &ts, min_events);
/* shouldn't happen! */
if (WARN_ON_ONCE(!ts.locked))
mutex_lock(&ctx->uring_lock);
return ret;
}
-static int io_run_local_work(struct io_ring_ctx *ctx)
+static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
{
struct io_tw_state ts = {};
int ret;
ts.locked = mutex_trylock(&ctx->uring_lock);
- ret = __io_run_local_work(ctx, &ts);
+ ret = __io_run_local_work(ctx, &ts, min_events);
if (ts.locked)
mutex_unlock(&ctx->uring_lock);
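
With the min_events plumbing, local task_work no longer has to be drained completely: io_run_local_work_continue() keeps looping only while fewer than min_events items have run, and raises IORING_SQ_TASKRUN if work is still pending when it stops. A trivial, hedged example of a caller that only needs enough work to satisfy a waiter (the function name is illustrative):

/* Illustrative: process roughly enough deferred work to satisfy 'want' */
static int example_reap_for_waiter(struct io_ring_ctx *ctx, int want)
{
	if (llist_empty(&ctx->work_llist))
		return 0;
	/* trylocks ctx->uring_lock internally to batch completions */
	return io_run_local_work(ctx, want);
}
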
@@ -1677,7 +1681,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
io_task_work_pending(ctx)) {
u32 tail = ctx->cached_cq_tail;
- (void) io_run_local_work_locked(ctx);
+ (void) io_run_local_work_locked(ctx, min);
if (task_work_pending(current) ||
wq_list_empty(&ctx->iopoll_list)) {
@@ -1768,9 +1772,9 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
}
}
-unsigned int io_file_get_flags(struct file *file)
+io_req_flags_t io_file_get_flags(struct file *file)
{
- unsigned int res = 0;
+ io_req_flags_t res = 0;
if (S_ISREG(file_inode(file)->i_mode))
res |= REQ_F_ISREG;
@@ -1966,10 +1970,28 @@ fail:
goto fail;
}
+ /*
+ * If DEFER_TASKRUN is set, it's only allowed to post CQEs from the
+ * submitter task context. Final request completions are handed to the
+ * right context; however, this is not the case for auxiliary CQEs,
+ * which are the main means of operation for multishot requests.
+ * Don't allow any multishot execution from io-wq. It's more restrictive
+ * than necessary and also cleaner.
+ */
+ if (req->flags & REQ_F_APOLL_MULTISHOT) {
+ err = -EBADFD;
+ if (!io_file_can_poll(req))
+ goto fail;
+ err = -ECANCELED;
+ if (io_arm_poll_handler(req, issue_flags) != IO_APOLL_OK)
+ goto fail;
+ return;
+ }
+
if (req->flags & REQ_F_FORCE_ASYNC) {
bool opcode_poll = def->pollin || def->pollout;
- if (opcode_poll && file_can_poll(req->file)) {
+ if (opcode_poll && io_file_can_poll(req)) {
needs_poll = true;
issue_flags |= IO_URING_F_NONBLOCK;
}
@@ -2171,7 +2193,8 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
/* req is partially pre-initialised, see io_preinit_req() */
req->opcode = opcode = READ_ONCE(sqe->opcode);
/* same numerical values with corresponding REQ_F_*, safe to copy */
- req->flags = sqe_flags = READ_ONCE(sqe->flags);
+ sqe_flags = READ_ONCE(sqe->flags);
+ req->flags = (io_req_flags_t) sqe_flags;
req->cqe.user_data = READ_ONCE(sqe->user_data);
req->file = NULL;
req->rsrc_node = NULL;
@@ -2475,33 +2498,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
return ret;
}
-struct io_wait_queue {
- struct wait_queue_entry wq;
- struct io_ring_ctx *ctx;
- unsigned cq_tail;
- unsigned nr_timeouts;
- ktime_t timeout;
-};
-
-static inline bool io_has_work(struct io_ring_ctx *ctx)
-{
- return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
- !llist_empty(&ctx->work_llist);
-}
-
-static inline bool io_should_wake(struct io_wait_queue *iowq)
-{
- struct io_ring_ctx *ctx = iowq->ctx;
- int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
-
- /*
- * Wake up if we have enough events, or if a timeout occurred since we
- * started waiting. For timeouts, we always want to return to userspace,
- * regardless of event count.
- */
- return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
-}
-
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
int wake_flags, void *key)
{
@@ -2520,7 +2516,7 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
if (!llist_empty(&ctx->work_llist)) {
__set_current_state(TASK_RUNNING);
- if (io_run_local_work(ctx) > 0)
+ if (io_run_local_work(ctx, INT_MAX) > 0)
return 0;
}
if (io_run_task_work() > 0)
@@ -2588,7 +2584,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
if (!io_allowed_run_tw(ctx))
return -EEXIST;
if (!llist_empty(&ctx->work_llist))
- io_run_local_work(ctx);
+ io_run_local_work(ctx, min_events);
io_run_task_work();
io_cqring_overflow_flush(ctx);
/* if user messes with these they will just get an early return */
@@ -2621,16 +2617,19 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
if (get_timespec64(&ts, uts))
return -EFAULT;
+
iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
+ io_napi_adjust_timeout(ctx, &iowq, &ts);
}
+ io_napi_busy_loop(ctx, &iowq);
+
trace_io_uring_cqring_wait(ctx, min_events);
do {
+ int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
unsigned long check_cq;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
-
atomic_set(&ctx->cq_wait_nr, nr_wait);
set_current_state(TASK_INTERRUPTIBLE);
} else {
@@ -2649,7 +2648,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
*/
io_run_task_work();
if (!llist_empty(&ctx->work_llist))
- io_run_local_work(ctx);
+ io_run_local_work(ctx, nr_wait);
/*
* Non-local task_work will be run on exit to userspace, but
@@ -2917,6 +2916,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_req_caches_free(ctx);
if (ctx->hash_map)
io_wq_put_hash(ctx->hash_map);
+ io_napi_free(ctx);
kfree(ctx->cancel_table.hbs);
kfree(ctx->cancel_table_locked.hbs);
kfree(ctx->io_bl);
@@ -3304,7 +3304,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
io_allowed_defer_tw_run(ctx))
- ret |= io_run_local_work(ctx) > 0;
+ ret |= io_run_local_work(ctx, INT_MAX) > 0;
ret |= io_cancel_defer_files(ctx, task, cancel_all);
mutex_lock(&ctx->uring_lock);
ret |= io_poll_remove_all(ctx, task, cancel_all);
@@ -3666,7 +3666,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
* it should handle ownership problems if any.
*/
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
- (void)io_run_local_work_locked(ctx);
+ (void)io_run_local_work_locked(ctx, min_complete);
}
mutex_unlock(&ctx->uring_lock);
}
@@ -4153,7 +4153,7 @@ static int __init io_uring_init(void)
BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
- BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
+ BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof_field(struct io_kiocb, flags));
BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
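
The sizeof_field() form of this assert pairs with req->flags changing to a dedicated io_req_flags_t (see the io_file_get_flags() and io_init_req() hunks above). The typedef itself lives in the companion header change, not in this file; the shape assumed here is roughly:

/* Assumed from the companion io_uring_types.h change (not in this diff):
 * a fixed-width bitwise type so the REQ_F_* space can grow past 32 bits. */
typedef u64 __bitwise io_req_flags_t;
#define IO_REQ_FLAG(bitno)	((__force io_req_flags_t) (1ULL << (bitno)))
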
@@ -4175,9 +4175,8 @@ static int __init io_uring_init(void)
SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU,
offsetof(struct io_kiocb, cmd.data),
sizeof_field(struct io_kiocb, cmd.data), NULL);
- io_buf_cachep = kmem_cache_create("io_buffer", sizeof(struct io_buffer), 0,
- SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT,
- NULL);
+ io_buf_cachep = KMEM_CACHE(io_buffer,
+ SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT);
#ifdef CONFIG_SYSCTL
register_sysctl_init("kernel", kernel_io_uring_disabled_table);
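
For reference, the KMEM_CACHE() helper used above is shorthand for the open-coded kmem_cache_create() call it replaces; per include/linux/slab.h it is roughly equivalent to:

/* Roughly what KMEM_CACHE(io_buffer, flags) expands to: the struct name is
 * reused as the cache name and natural alignment is requested. */
#define KMEM_CACHE(__struct, __flags)					\
	kmem_cache_create(#__struct, sizeof(struct __struct),		\
			  __alignof__(struct __struct), (__flags), NULL)
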