diff options
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 371 |
1 files changed, 216 insertions, 155 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index a4bce17af506..65a17d560a73 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -78,7 +78,6 @@ #include <linux/task_work.h> #include <linux/pagemap.h> #include <linux/io_uring.h> -#include <linux/freezer.h> #define CREATE_TRACE_POINTS #include <trace/events/io_uring.h> @@ -258,7 +257,8 @@ enum { struct io_sq_data { refcount_t refs; - struct rw_semaphore rw_lock; + atomic_t park_pending; + struct mutex lock; /* ctx's that are using this sqd */ struct list_head ctx_list; @@ -273,6 +273,7 @@ struct io_sq_data { unsigned long state; struct completion exited; + struct callback_head *park_task_work; }; #define IO_IOPOLL_BATCH 8 @@ -402,7 +403,7 @@ struct io_ring_ctx { struct socket *ring_sock; #endif - struct idr io_buffer_idr; + struct xarray io_buffers; struct xarray personalities; u32 pers_next; @@ -454,6 +455,22 @@ struct io_ring_ctx { struct list_head tctx_list; }; +struct io_uring_task { + /* submission side */ + struct xarray xa; + struct wait_queue_head wait; + const struct io_ring_ctx *last; + struct io_wq *io_wq; + struct percpu_counter inflight; + atomic_t in_idle; + bool sqpoll; + + spinlock_t task_lock; + struct io_wq_work_list task_list; + unsigned long task_state; + struct callback_head task_work; +}; + /* * First field must be the file pointer in all the * iocb unions! See also 'struct kiocb' in <linux/fs.h> @@ -680,6 +697,7 @@ enum { REQ_F_NO_FILE_TABLE_BIT, REQ_F_LTIMEOUT_ACTIVE_BIT, REQ_F_COMPLETE_INLINE_BIT, + REQ_F_REISSUE_BIT, /* not a real bit, just to check we're not overflowing the space */ __REQ_F_LAST_BIT, @@ -723,6 +741,8 @@ enum { REQ_F_LTIMEOUT_ACTIVE = BIT(REQ_F_LTIMEOUT_ACTIVE_BIT), /* completion is deferred through io_comp_state */ REQ_F_COMPLETE_INLINE = BIT(REQ_F_COMPLETE_INLINE_BIT), + /* caller should reissue async */ + REQ_F_REISSUE = BIT(REQ_F_REISSUE_BIT), }; struct async_poll { @@ -1077,8 +1097,6 @@ static bool io_match_task(struct io_kiocb *head, io_for_each_link(req, head) { if (req->flags & REQ_F_INFLIGHT) return true; - if (req->task->files == files) - return true; } return false; } @@ -1135,7 +1153,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->cq_wait); INIT_LIST_HEAD(&ctx->cq_overflow_list); init_completion(&ctx->ref_comp); - idr_init(&ctx->io_buffer_idr); + xa_init_flags(&ctx->io_buffers, XA_FLAGS_ALLOC1); xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); @@ -1198,7 +1216,7 @@ static void io_prep_async_work(struct io_kiocb *req) if (req->flags & REQ_F_ISREG) { if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL)) io_wq_hash_work(&req->work, file_inode(req->file)); - } else { + } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) { if (def->unbound_nonreg_file) req->work.flags |= IO_WQ_WORK_UNBOUND; } @@ -1221,16 +1239,16 @@ static void io_queue_async_work(struct io_kiocb *req) BUG_ON(!tctx); BUG_ON(!tctx->io_wq); - trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, - &req->work, req->flags); /* init ->work of the whole link before punting */ io_prep_async_link(req); + trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, + &req->work, req->flags); io_wq_enqueue(tctx->io_wq, &req->work); if (link) io_queue_linked_timeout(link); } -static void io_kill_timeout(struct io_kiocb *req) +static void io_kill_timeout(struct io_kiocb *req, int status) { struct io_timeout_data *io = req->async_data; int ret; @@ -1240,31 +1258,11 @@ static void io_kill_timeout(struct io_kiocb *req) atomic_set(&req->ctx->cq_timeouts, atomic_read(&req->ctx->cq_timeouts) + 1); list_del_init(&req->timeout.list); - io_cqring_fill_event(req, 0); + io_cqring_fill_event(req, status); io_put_req_deferred(req, 1); } } -/* - * Returns true if we found and killed one or more timeouts - */ -static bool io_kill_timeouts(struct io_ring_ctx *ctx, struct task_struct *tsk, - struct files_struct *files) -{ - struct io_kiocb *req, *tmp; - int canceled = 0; - - spin_lock_irq(&ctx->completion_lock); - list_for_each_entry_safe(req, tmp, &ctx->timeout_list, timeout.list) { - if (io_match_task(req, tsk, files)) { - io_kill_timeout(req); - canceled++; - } - } - spin_unlock_irq(&ctx->completion_lock); - return canceled != 0; -} - static void __io_queue_deferred(struct io_ring_ctx *ctx) { do { @@ -1309,7 +1307,7 @@ static void io_flush_timeouts(struct io_ring_ctx *ctx) break; list_del_init(&req->timeout.list); - io_kill_timeout(req); + io_kill_timeout(req, 0); } while (!list_empty(&ctx->timeout_list)); ctx->cq_last_tm_flush = seq; @@ -1550,14 +1548,17 @@ static void io_req_complete_post(struct io_kiocb *req, long res, io_put_task(req->task, 1); list_add(&req->compl.list, &cs->locked_free_list); cs->locked_free_nr++; - } else - req = NULL; + } else { + if (!percpu_ref_tryget(&ctx->refs)) + req = NULL; + } io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); - io_cqring_ev_posted(ctx); - if (req) + if (req) { + io_cqring_ev_posted(ctx); percpu_ref_put(&ctx->refs); + } } static void io_req_complete_state(struct io_kiocb *req, long res, @@ -1925,17 +1926,44 @@ static int io_req_task_work_add(struct io_kiocb *req) return ret; } -static void io_req_task_work_add_fallback(struct io_kiocb *req, - task_work_func_t cb) +static bool io_run_task_work_head(struct callback_head **work_head) +{ + struct callback_head *work, *next; + bool executed = false; + + do { + work = xchg(work_head, NULL); + if (!work) + break; + + do { + next = work->next; + work->func(work); + work = next; + cond_resched(); + } while (work); + executed = true; + } while (1); + + return executed; +} + +static void io_task_work_add_head(struct callback_head **work_head, + struct callback_head *task_work) { - struct io_ring_ctx *ctx = req->ctx; struct callback_head *head; - init_task_work(&req->task_work, cb); do { - head = READ_ONCE(ctx->exit_task_work); - req->task_work.next = head; - } while (cmpxchg(&ctx->exit_task_work, head, &req->task_work) != head); + head = READ_ONCE(*work_head); + task_work->next = head; + } while (cmpxchg(work_head, head, task_work) != head); +} + +static void io_req_task_work_add_fallback(struct io_kiocb *req, + task_work_func_t cb) +{ + init_task_work(&req->task_work, cb); + io_task_work_add_head(&req->ctx->exit_task_work, &req->task_work); } static void __io_req_task_cancel(struct io_kiocb *req, int error) @@ -2451,6 +2479,11 @@ static bool io_rw_should_reissue(struct io_kiocb *req) return false; return true; } +#else +static bool io_rw_should_reissue(struct io_kiocb *req) +{ + return false; +} #endif static bool io_rw_reissue(struct io_kiocb *req) @@ -2476,13 +2509,14 @@ static void __io_complete_rw(struct io_kiocb *req, long res, long res2, { int cflags = 0; - if ((res == -EAGAIN || res == -EOPNOTSUPP) && io_rw_reissue(req)) + if (req->rw.kiocb.ki_flags & IOCB_WRITE) + kiocb_end_write(req); + if ((res == -EAGAIN || res == -EOPNOTSUPP) && io_rw_should_reissue(req)) { + req->flags |= REQ_F_REISSUE; return; + } if (res != req->result) req_set_fail_links(req); - - if (req->rw.kiocb.ki_flags & IOCB_WRITE) - kiocb_end_write(req); if (req->flags & REQ_F_BUFFER_SELECTED) cflags = io_put_rw_kbuf(req); __io_req_complete(req, issue_flags, res, cflags); @@ -2843,7 +2877,7 @@ static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len, lockdep_assert_held(&req->ctx->uring_lock); - head = idr_find(&req->ctx->io_buffer_idr, bgid); + head = xa_load(&req->ctx->io_buffers, bgid); if (head) { if (!list_empty(&head->list)) { kbuf = list_last_entry(&head->list, struct io_buffer, @@ -2851,7 +2885,7 @@ static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len, list_del(&kbuf->list); } else { kbuf = head; - idr_remove(&req->ctx->io_buffer_idr, bgid); + xa_erase(&req->ctx->io_buffers, bgid); } if (*len > kbuf->len) *len = kbuf->len; @@ -3259,11 +3293,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) ret = io_iter_do_read(req, iter); - if (ret == -EIOCBQUEUED) { - if (req->async_data) - iov_iter_revert(iter, io_size - iov_iter_count(iter)); - goto out_free; - } else if (ret == -EAGAIN) { + if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) { /* IOPOLL retry should happen for io-wq threads */ if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL)) goto done; @@ -3273,6 +3303,8 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) /* some cases will consume bytes even on error returns */ iov_iter_revert(iter, io_size - iov_iter_count(iter)); ret = 0; + } else if (ret == -EIOCBQUEUED) { + goto out_free; } else if (ret <= 0 || ret == io_size || !force_nonblock || (req->flags & REQ_F_NOWAIT) || !(req->flags & REQ_F_ISREG)) { /* read all, failed, already did sync or don't want to retry */ @@ -3385,6 +3417,9 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags) else ret2 = -EINVAL; + if (req->flags & REQ_F_REISSUE) + ret2 = -EAGAIN; + /* * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just * retry them without IOCB_NOWAIT. @@ -3394,8 +3429,6 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags) /* no retry on NONBLOCK nor RWF_NOWAIT */ if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT)) goto done; - if (ret2 == -EIOCBQUEUED && req->async_data) - iov_iter_revert(iter, io_size - iov_iter_count(iter)); if (!force_nonblock || ret2 != -EAGAIN) { /* IOPOLL retry should happen for io-wq threads */ if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN) @@ -3892,7 +3925,7 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf, } i++; kfree(buf); - idr_remove(&ctx->io_buffer_idr, bgid); + xa_erase(&ctx->io_buffers, bgid); return i; } @@ -3910,7 +3943,7 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) lockdep_assert_held(&ctx->uring_lock); ret = -ENOENT; - head = idr_find(&ctx->io_buffer_idr, p->bgid); + head = xa_load(&ctx->io_buffers, p->bgid); if (head) ret = __io_remove_buffers(ctx, head, p->bgid, p->nbufs); if (ret < 0) @@ -3930,6 +3963,7 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) static int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { + unsigned long size; struct io_provide_buf *p = &req->pbuf; u64 tmp; @@ -3943,7 +3977,8 @@ static int io_provide_buffers_prep(struct io_kiocb *req, p->addr = READ_ONCE(sqe->addr); p->len = READ_ONCE(sqe->len); - if (!access_ok(u64_to_user_ptr(p->addr), (p->len * p->nbufs))) + size = (unsigned long)p->len * p->nbufs; + if (!access_ok(u64_to_user_ptr(p->addr), size)) return -EFAULT; p->bgid = READ_ONCE(sqe->buf_group); @@ -3993,21 +4028,14 @@ static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) lockdep_assert_held(&ctx->uring_lock); - list = head = idr_find(&ctx->io_buffer_idr, p->bgid); + list = head = xa_load(&ctx->io_buffers, p->bgid); ret = io_add_buffers(p, &head); - if (ret < 0) - goto out; - - if (!list) { - ret = idr_alloc(&ctx->io_buffer_idr, head, p->bgid, p->bgid + 1, - GFP_KERNEL); - if (ret < 0) { + if (ret >= 0 && !list) { + ret = xa_insert(&ctx->io_buffers, p->bgid, head, GFP_KERNEL); + if (ret < 0) __io_remove_buffers(ctx, head, p->bgid, -1U); - goto out; - } } -out: if (ret < 0) req_set_fail_links(req); @@ -4345,6 +4373,7 @@ static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags) struct io_async_msghdr iomsg, *kmsg; struct socket *sock; unsigned flags; + int min_ret = 0; int ret; sock = sock_from_file(req->file); @@ -4359,12 +4388,15 @@ static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags) kmsg = &iomsg; } - flags = req->sr_msg.msg_flags; + flags = req->sr_msg.msg_flags | MSG_NOSIGNAL; if (flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; else if (issue_flags & IO_URING_F_NONBLOCK) flags |= MSG_DONTWAIT; + if (flags & MSG_WAITALL) + min_ret = iov_iter_count(&kmsg->msg.msg_iter); + ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags); if ((issue_flags & IO_URING_F_NONBLOCK) && ret == -EAGAIN) return io_setup_async_msg(req, kmsg); @@ -4375,7 +4407,7 @@ static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags) if (kmsg->free_iov) kfree(kmsg->free_iov); req->flags &= ~REQ_F_NEED_CLEANUP; - if (ret < 0) + if (ret < min_ret) req_set_fail_links(req); __io_req_complete(req, issue_flags, ret, 0); return 0; @@ -4388,6 +4420,7 @@ static int io_send(struct io_kiocb *req, unsigned int issue_flags) struct iovec iov; struct socket *sock; unsigned flags; + int min_ret = 0; int ret; sock = sock_from_file(req->file); @@ -4403,12 +4436,15 @@ static int io_send(struct io_kiocb *req, unsigned int issue_flags) msg.msg_controllen = 0; msg.msg_namelen = 0; - flags = req->sr_msg.msg_flags; + flags = req->sr_msg.msg_flags | MSG_NOSIGNAL; if (flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; else if (issue_flags & IO_URING_F_NONBLOCK) flags |= MSG_DONTWAIT; + if (flags & MSG_WAITALL) + min_ret = iov_iter_count(&msg.msg_iter); + msg.msg_flags = flags; ret = sock_sendmsg(sock, &msg); if ((issue_flags & IO_URING_F_NONBLOCK) && ret == -EAGAIN) @@ -4416,7 +4452,7 @@ static int io_send(struct io_kiocb *req, unsigned int issue_flags) if (ret == -ERESTARTSYS) ret = -EINTR; - if (ret < 0) + if (ret < min_ret) req_set_fail_links(req); __io_req_complete(req, issue_flags, ret, 0); return 0; @@ -4568,6 +4604,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) struct socket *sock; struct io_buffer *kbuf; unsigned flags; + int min_ret = 0; int ret, cflags = 0; bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK; @@ -4593,12 +4630,15 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) 1, req->sr_msg.len); } - flags = req->sr_msg.msg_flags; + flags = req->sr_msg.msg_flags | MSG_NOSIGNAL; if (flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; else if (force_nonblock) flags |= MSG_DONTWAIT; + if (flags & MSG_WAITALL) + min_ret = iov_iter_count(&kmsg->msg.msg_iter); + ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.umsg, kmsg->uaddr, flags); if (force_nonblock && ret == -EAGAIN) @@ -4612,7 +4652,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) if (kmsg->free_iov) kfree(kmsg->free_iov); req->flags &= ~REQ_F_NEED_CLEANUP; - if (ret < 0) + if (ret < min_ret || ((flags & MSG_WAITALL) && (kmsg->msg.msg_flags & (MSG_TRUNC | MSG_CTRUNC)))) req_set_fail_links(req); __io_req_complete(req, issue_flags, ret, cflags); return 0; @@ -4627,6 +4667,7 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags) struct socket *sock; struct iovec iov; unsigned flags; + int min_ret = 0; int ret, cflags = 0; bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK; @@ -4652,12 +4693,15 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags) msg.msg_iocb = NULL; msg.msg_flags = 0; - flags = req->sr_msg.msg_flags; + flags = req->sr_msg.msg_flags | MSG_NOSIGNAL; if (flags & MSG_DONTWAIT) req->flags |= REQ_F_NOWAIT; else if (force_nonblock) flags |= MSG_DONTWAIT; + if (flags & MSG_WAITALL) + min_ret = iov_iter_count(&msg.msg_iter); + ret = sock_recvmsg(sock, &msg, flags); if (force_nonblock && ret == -EAGAIN) return -EAGAIN; @@ -4666,7 +4710,7 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags) out_free: if (req->flags & REQ_F_BUFFER_SELECTED) cflags = io_put_recv_kbuf(req); - if (ret < 0) + if (ret < min_ret || ((flags & MSG_WAITALL) && (msg.msg_flags & (MSG_TRUNC | MSG_CTRUNC)))) req_set_fail_links(req); __io_req_complete(req, issue_flags, ret, cflags); return 0; @@ -4763,7 +4807,6 @@ static int io_connect(struct io_kiocb *req, unsigned int issue_flags) ret = -ENOMEM; goto out; } - io = req->async_data; memcpy(req->async_data, &__io, sizeof(__io)); return -EAGAIN; } @@ -5526,7 +5569,8 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, data->mode = io_translate_timeout_mode(flags); hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode); - io_req_track_inflight(req); + if (is_timeout_link) + io_req_track_inflight(req); return 0; } @@ -6129,6 +6173,7 @@ static void io_wq_submit_work(struct io_wq_work *work) ret = -ECANCELED; if (!ret) { + req->flags &= ~REQ_F_REISSUE; do { ret = io_issue_sqe(req, 0); /* @@ -6204,7 +6249,6 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer) spin_unlock_irqrestore(&ctx->completion_lock, flags); if (prev) { - req_set_fail_links(prev); io_async_find_and_cancel(ctx, req, prev->user_data, -ETIME); io_put_req_deferred(prev, 1); } else { @@ -6423,8 +6467,6 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, ret = io_init_req(ctx, req, sqe); if (unlikely(ret)) { fail_req: - io_put_req(req); - io_req_complete(req, ret); if (link->head) { /* fail even hard links since we don't submit */ link->head->flags |= REQ_F_FAIL_LINK; @@ -6432,6 +6474,8 @@ fail_req: io_req_complete(link->head, -ECANCELED); link->head = NULL; } + io_put_req(req); + io_req_complete(req, ret); return ret; } ret = io_req_prep(req, sqe); @@ -6684,7 +6728,7 @@ static int io_sq_thread(void *data) char buf[TASK_COMM_LEN]; DEFINE_WAIT(wait); - sprintf(buf, "iou-sqp-%d", sqd->task_pid); + snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid); set_task_comm(current, buf); current->pf_io_worker = NULL; @@ -6694,22 +6738,30 @@ static int io_sq_thread(void *data) set_cpus_allowed_ptr(current, cpu_online_mask); current->flags |= PF_NO_SETAFFINITY; - down_read(&sqd->rw_lock); - + mutex_lock(&sqd->lock); while (!test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state)) { int ret; bool cap_entries, sqt_spin, needs_sched; - if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state)) { - up_read(&sqd->rw_lock); + if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) || + signal_pending(current)) { + bool did_sig = false; + + mutex_unlock(&sqd->lock); + if (signal_pending(current)) { + struct ksignal ksig; + + did_sig = get_signal(&ksig); + } cond_resched(); - down_read(&sqd->rw_lock); + mutex_lock(&sqd->lock); + if (did_sig) + break; io_run_task_work(); + io_run_task_work_head(&sqd->park_task_work); timeout = jiffies + sqd->sq_thread_idle; continue; } - if (fatal_signal_pending(current)) - break; sqt_spin = false; cap_entries = !list_is_singular(&sqd->ctx_list); list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { @@ -6750,32 +6802,27 @@ static int io_sq_thread(void *data) list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) io_ring_set_wakeup_flag(ctx); - up_read(&sqd->rw_lock); + mutex_unlock(&sqd->lock); schedule(); - down_read(&sqd->rw_lock); + mutex_lock(&sqd->lock); list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) io_ring_clear_wakeup_flag(ctx); } finish_wait(&sqd->wait, &wait); + io_run_task_work_head(&sqd->park_task_work); timeout = jiffies + sqd->sq_thread_idle; } - up_read(&sqd->rw_lock); - down_write(&sqd->rw_lock); - /* - * someone may have parked and added a cancellation task_work, run - * it first because we don't want it in io_uring_cancel_sqpoll() - */ - io_run_task_work(); list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) io_uring_cancel_sqpoll(ctx); sqd->thread = NULL; list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) io_ring_set_wakeup_flag(ctx); - up_write(&sqd->rw_lock); + mutex_unlock(&sqd->lock); io_run_task_work(); + io_run_task_work_head(&sqd->park_task_work); complete(&sqd->exited); do_exit(0); } @@ -6821,7 +6868,7 @@ static int io_run_task_work_sig(void) return 1; if (!signal_pending(current)) return 0; - if (test_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL)) + if (test_thread_flag(TIF_NOTIFY_SIGNAL)) return -ERESTARTSYS; return -EINTR; } @@ -7075,23 +7122,28 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) } static void io_sq_thread_unpark(struct io_sq_data *sqd) - __releases(&sqd->rw_lock) + __releases(&sqd->lock) { WARN_ON_ONCE(sqd->thread == current); + /* + * Do the dance but not conditional clear_bit() because it'd race with + * other threads incrementing park_pending and setting the bit. + */ clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); - up_write(&sqd->rw_lock); + if (atomic_dec_return(&sqd->park_pending)) + set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); + mutex_unlock(&sqd->lock); } static void io_sq_thread_park(struct io_sq_data *sqd) - __acquires(&sqd->rw_lock) + __acquires(&sqd->lock) { WARN_ON_ONCE(sqd->thread == current); + atomic_inc(&sqd->park_pending); set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); - down_write(&sqd->rw_lock); - /* set again for consistency, in case concurrent parks are happening */ - set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); + mutex_lock(&sqd->lock); if (sqd->thread) wake_up_process(sqd->thread); } @@ -7100,17 +7152,19 @@ static void io_sq_thread_stop(struct io_sq_data *sqd) { WARN_ON_ONCE(sqd->thread == current); - down_write(&sqd->rw_lock); + mutex_lock(&sqd->lock); set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); if (sqd->thread) wake_up_process(sqd->thread); - up_write(&sqd->rw_lock); + mutex_unlock(&sqd->lock); wait_for_completion(&sqd->exited); } static void io_put_sq_data(struct io_sq_data *sqd) { if (refcount_dec_and_test(&sqd->refs)) { + WARN_ON_ONCE(atomic_read(&sqd->park_pending)); + io_sq_thread_stop(sqd); kfree(sqd); } @@ -7184,9 +7238,10 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p, if (!sqd) return ERR_PTR(-ENOMEM); + atomic_set(&sqd->park_pending, 0); refcount_set(&sqd->refs, 1); INIT_LIST_HEAD(&sqd->ctx_list); - init_rwsem(&sqd->rw_lock); + mutex_init(&sqd->lock); init_waitqueue_head(&sqd->wait); init_completion(&sqd->exited); return sqd; @@ -7866,22 +7921,17 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, ret = 0; io_sq_thread_park(sqd); + list_add(&ctx->sqd_list, &sqd->ctx_list); + io_sqd_update_thread_idle(sqd); /* don't attach to a dying SQPOLL thread, would be racy */ - if (attached && !sqd->thread) { + if (attached && !sqd->thread) ret = -ENXIO; - } else { - list_add(&ctx->sqd_list, &sqd->ctx_list); - io_sqd_update_thread_idle(sqd); - } io_sq_thread_unpark(sqd); - if (ret < 0) { - io_put_sq_data(sqd); - ctx->sq_data = NULL; - return ret; - } else if (attached) { + if (ret < 0) + goto err; + if (attached) return 0; - } if (p->flags & IORING_SETUP_SQ_AFF) { int cpu = p->sq_thread_cpu; @@ -8332,19 +8382,13 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx) return -ENXIO; } -static int __io_destroy_buffers(int id, void *p, void *data) -{ - struct io_ring_ctx *ctx = data; - struct io_buffer *buf = p; - - __io_remove_buffers(ctx, buf, id, -1U); - return 0; -} - static void io_destroy_buffers(struct io_ring_ctx *ctx) { - idr_for_each(&ctx->io_buffer_idr, __io_destroy_buffers, ctx); - idr_destroy(&ctx->io_buffer_idr); + struct io_buffer *buf; + unsigned long index; + + xa_for_each(&ctx->io_buffers, index, buf) + __io_remove_buffers(ctx, buf, index, -1U); } static void io_req_cache_free(struct list_head *list, struct task_struct *tsk) @@ -8386,11 +8430,13 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) { /* * Some may use context even when all refs and requests have been put, - * and they are free to do so while still holding uring_lock, see - * __io_req_task_submit(). Wait for them to finish. + * and they are free to do so while still holding uring_lock or + * completion_lock, see __io_req_task_submit(). Wait for them to finish. */ mutex_lock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock); + spin_lock_irq(&ctx->completion_lock); + spin_unlock_irq(&ctx->completion_lock); io_sq_thread_finish(ctx); io_sqe_buffers_unregister(ctx); @@ -8478,26 +8524,9 @@ static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id) return -EINVAL; } -static bool io_run_ctx_fallback(struct io_ring_ctx *ctx) +static inline bool io_run_ctx_fallback(struct io_ring_ctx *ctx) { - struct callback_head *work, *next; - bool executed = false; - - do { - work = xchg(&ctx->exit_task_work, NULL); - if (!work) - break; - - do { - next = work->next; - work->func(work); - work = next; - cond_resched(); - } while (work); - executed = true; - } while (1); - - return executed; + return io_run_task_work_head(&ctx->exit_task_work); } struct io_tctx_exit { @@ -8529,6 +8558,14 @@ static void io_ring_exit_work(struct work_struct *work) struct io_tctx_node *node; int ret; + /* prevent SQPOLL from submitting new requests */ + if (ctx->sq_data) { + io_sq_thread_park(ctx->sq_data); + list_del_init(&ctx->sqd_list); + io_sqd_update_thread_idle(ctx->sq_data); + io_sq_thread_unpark(ctx->sq_data); + } + /* * If we're doing polled IO and end up having requests being * submitted async (out-of-line), then completions can come in while @@ -8565,6 +8602,28 @@ static void io_ring_exit_work(struct work_struct *work) io_ring_ctx_free(ctx); } +/* Returns true if we found and killed one or more timeouts */ +static bool io_kill_timeouts(struct io_ring_ctx *ctx, struct task_struct *tsk, + struct files_struct *files) +{ + struct io_kiocb *req, *tmp; + int canceled = 0; + + spin_lock_irq(&ctx->completion_lock); + list_for_each_entry_safe(req, tmp, &ctx->timeout_list, timeout.list) { + if (io_match_task(req, tsk, files)) { + io_kill_timeout(req, -ECANCELED); + canceled++; + } + } + if (canceled != 0) + io_commit_cqring(ctx); + spin_unlock_irq(&ctx->completion_lock); + if (canceled != 0) + io_cqring_ev_posted(ctx); + return canceled != 0; +} + static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) { unsigned long index; @@ -8879,7 +8938,7 @@ static void io_sqpoll_cancel_sync(struct io_ring_ctx *ctx) if (task) { init_completion(&work.completion); init_task_work(&work.task_work, io_sqpoll_cancel_cb); - WARN_ON_ONCE(task_work_add(task, &work.task_work, TWA_SIGNAL)); + io_task_work_add_head(&sqd->park_task_work, &work.task_work); wake_up_process(task); } io_sq_thread_unpark(sqd); @@ -8956,6 +9015,8 @@ void __io_uring_task_cancel(void) /* make sure overflow events are dropped */ atomic_inc(&tctx->in_idle); + __io_uring_files_cancel(NULL); + do { /* read completions before cancelations */ inflight = tctx_inflight(tctx); |