diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2025-01-29 09:40:23 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2025-01-29 09:40:23 -0800 |
commit | 92cc9acff7194b1b9db078901f2a83182bb73202 (patch) | |
tree | 281de6ae3baa472e4d031d55de77e9c2a42d0423 /fs | |
parent | 27560b371ab82c1894d048aef0d113acb093f67f (diff) | |
parent | 2d4fde59fd502a65c1698b61ad4d0f10a9ab665a (diff) |
Merge tag 'fuse-update-6.14' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/fuse
Pull fuse updates from Miklos Szeredi:
"Add support for io-uring communication between kernel and userspace
using IORING_OP_URING_CMD (Bernd Schubert). Following features enable
gains in performance compared to the regular interface:
- Allow processing multiple requests with less syscall overhead
- Combine commit of old and fetch of new fuse request
- CPU/NUMA affinity of queues
Patches were reviewed by several people, including Pavel Begunkov,
io-uring co-maintainer"
* tag 'fuse-update-6.14' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/fuse:
fuse: prevent disabling io-uring on active connections
fuse: enable fuse-over-io-uring
fuse: block request allocation until io-uring init is complete
fuse: {io-uring} Prevent mount point hang on fuse-server termination
fuse: Allow to queue bg requests through io-uring
fuse: Allow to queue fg requests through io-uring
fuse: {io-uring} Make fuse_dev_queue_{interrupt,forget} non-static
fuse: {io-uring} Handle teardown of ring entries
fuse: Add io-uring sqe commit and fetch support
fuse: {io-uring} Make hash-list req unique finding functions non-static
fuse: Add fuse-io-uring handling into fuse_copy
fuse: Make fuse_copy non static
fuse: {io-uring} Handle SQEs - register commands
fuse: make args->in_args[0] to be always the header
fuse: Add fuse-io-uring design documentation
fuse: Move request bits
fuse: Move fuse_get_dev to header file
fuse: rename to fuse_dev_end_requests and make non-static
Diffstat (limited to 'fs')
-rw-r--r-- | fs/fuse/Kconfig | 12 | ||||
-rw-r--r-- | fs/fuse/Makefile | 1 | ||||
-rw-r--r-- | fs/fuse/dax.c | 11 | ||||
-rw-r--r-- | fs/fuse/dev.c | 127 | ||||
-rw-r--r-- | fs/fuse/dev_uring.c | 1319 | ||||
-rw-r--r-- | fs/fuse/dev_uring_i.h | 205 | ||||
-rw-r--r-- | fs/fuse/dir.c | 32 | ||||
-rw-r--r-- | fs/fuse/fuse_dev_i.h | 66 | ||||
-rw-r--r-- | fs/fuse/fuse_i.h | 32 | ||||
-rw-r--r-- | fs/fuse/inode.c | 14 | ||||
-rw-r--r-- | fs/fuse/xattr.c | 7 |
11 files changed, 1749 insertions, 77 deletions
diff --git a/fs/fuse/Kconfig b/fs/fuse/Kconfig index 8674dbfbe59d..ca215a3cba3e 100644 --- a/fs/fuse/Kconfig +++ b/fs/fuse/Kconfig @@ -63,3 +63,15 @@ config FUSE_PASSTHROUGH to be performed directly on a backing file. If you want to allow passthrough operations, answer Y. + +config FUSE_IO_URING + bool "FUSE communication over io-uring" + default y + depends on FUSE_FS + depends on IO_URING + help + This allows sending FUSE requests over the io-uring interface and + also adds request core affinity. + + If you want to allow fuse server/client communication through io-uring, + answer Y diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile index 2c372180d631..3f0f312a31c1 100644 --- a/fs/fuse/Makefile +++ b/fs/fuse/Makefile @@ -15,5 +15,6 @@ fuse-y += iomode.o fuse-$(CONFIG_FUSE_DAX) += dax.o fuse-$(CONFIG_FUSE_PASSTHROUGH) += passthrough.o fuse-$(CONFIG_SYSCTL) += sysctl.o +fuse-$(CONFIG_FUSE_IO_URING) += dev_uring.o virtiofs-y := virtio_fs.o diff --git a/fs/fuse/dax.c b/fs/fuse/dax.c index 9abbc2f2894f..0b6ee6dd1fd6 100644 --- a/fs/fuse/dax.c +++ b/fs/fuse/dax.c @@ -240,11 +240,12 @@ static int fuse_send_removemapping(struct inode *inode, args.opcode = FUSE_REMOVEMAPPING; args.nodeid = fi->nodeid; - args.in_numargs = 2; - args.in_args[0].size = sizeof(*inargp); - args.in_args[0].value = inargp; - args.in_args[1].size = inargp->count * sizeof(*remove_one); - args.in_args[1].value = remove_one; + args.in_numargs = 3; + fuse_set_zero_arg0(&args); + args.in_args[1].size = sizeof(*inargp); + args.in_args[1].value = inargp; + args.in_args[2].size = inargp->count * sizeof(*remove_one); + args.in_args[2].value = remove_one; return fuse_simple_request(fm, &args); } diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index 27ccae63495d..5b5f789b37eb 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -6,7 +6,9 @@ See the file COPYING. */ +#include "dev_uring_i.h" #include "fuse_i.h" +#include "fuse_dev_i.h" #include <linux/init.h> #include <linux/module.h> @@ -28,23 +30,8 @@ MODULE_ALIAS_MISCDEV(FUSE_MINOR); MODULE_ALIAS("devname:fuse"); -/* Ordinary requests have even IDs, while interrupts IDs are odd */ -#define FUSE_INT_REQ_BIT (1ULL << 0) -#define FUSE_REQ_ID_STEP (1ULL << 1) - static struct kmem_cache *fuse_req_cachep; -static void end_requests(struct list_head *head); - -static struct fuse_dev *fuse_get_dev(struct file *file) -{ - /* - * Lockless access is OK, because file->private data is set - * once during mount and is valid until the file is released. - */ - return READ_ONCE(file->private_data); -} - static void fuse_request_init(struct fuse_mount *fm, struct fuse_req *req) { INIT_LIST_HEAD(&req->list); @@ -89,7 +76,8 @@ void fuse_set_initialized(struct fuse_conn *fc) static bool fuse_block_alloc(struct fuse_conn *fc, bool for_background) { - return !fc->initialized || (for_background && fc->blocked); + return !fc->initialized || (for_background && fc->blocked) || + (fc->io_uring && !fuse_uring_ready(fc)); } static void fuse_drop_waiting(struct fuse_conn *fc) @@ -234,7 +222,7 @@ u64 fuse_get_unique(struct fuse_iqueue *fiq) } EXPORT_SYMBOL_GPL(fuse_get_unique); -static unsigned int fuse_req_hash(u64 unique) +unsigned int fuse_req_hash(u64 unique) { return hash_long(unique & ~FUSE_INT_REQ_BIT, FUSE_PQ_HASH_BITS); } @@ -250,7 +238,8 @@ __releases(fiq->lock) spin_unlock(&fiq->lock); } -static void fuse_dev_queue_forget(struct fuse_iqueue *fiq, struct fuse_forget_link *forget) +void fuse_dev_queue_forget(struct fuse_iqueue *fiq, + struct fuse_forget_link *forget) { spin_lock(&fiq->lock); if (fiq->connected) { @@ -263,7 +252,7 @@ static void fuse_dev_queue_forget(struct fuse_iqueue *fiq, struct fuse_forget_li } } -static void fuse_dev_queue_interrupt(struct fuse_iqueue *fiq, struct fuse_req *req) +void fuse_dev_queue_interrupt(struct fuse_iqueue *fiq, struct fuse_req *req) { spin_lock(&fiq->lock); if (list_empty(&req->intr_entry)) { @@ -580,7 +569,25 @@ ssize_t __fuse_simple_request(struct mnt_idmap *idmap, return ret; } -static bool fuse_request_queue_background(struct fuse_req *req) +#ifdef CONFIG_FUSE_IO_URING +static bool fuse_request_queue_background_uring(struct fuse_conn *fc, + struct fuse_req *req) +{ + struct fuse_iqueue *fiq = &fc->iq; + + req->in.h.unique = fuse_get_unique(fiq); + req->in.h.len = sizeof(struct fuse_in_header) + + fuse_len_args(req->args->in_numargs, + (struct fuse_arg *) req->args->in_args); + + return fuse_uring_queue_bq_req(req); +} +#endif + +/* + * @return true if queued + */ +static int fuse_request_queue_background(struct fuse_req *req) { struct fuse_mount *fm = req->fm; struct fuse_conn *fc = fm->fc; @@ -592,6 +599,12 @@ static bool fuse_request_queue_background(struct fuse_req *req) atomic_inc(&fc->num_waiting); } __set_bit(FR_ISREPLY, &req->flags); + +#ifdef CONFIG_FUSE_IO_URING + if (fuse_uring_ready(fc)) + return fuse_request_queue_background_uring(fc, req); +#endif + spin_lock(&fc->bg_lock); if (likely(fc->connected)) { fc->num_background++; @@ -692,22 +705,8 @@ static int unlock_request(struct fuse_req *req) return err; } -struct fuse_copy_state { - int write; - struct fuse_req *req; - struct iov_iter *iter; - struct pipe_buffer *pipebufs; - struct pipe_buffer *currbuf; - struct pipe_inode_info *pipe; - unsigned long nr_segs; - struct page *pg; - unsigned len; - unsigned offset; - unsigned move_pages:1; -}; - -static void fuse_copy_init(struct fuse_copy_state *cs, int write, - struct iov_iter *iter) +void fuse_copy_init(struct fuse_copy_state *cs, int write, + struct iov_iter *iter) { memset(cs, 0, sizeof(*cs)); cs->write = write; @@ -814,6 +813,9 @@ static int fuse_copy_do(struct fuse_copy_state *cs, void **val, unsigned *size) *size -= ncpy; cs->len -= ncpy; cs->offset += ncpy; + if (cs->is_uring) + cs->ring.copied_sz += ncpy; + return ncpy; } @@ -1068,9 +1070,9 @@ static int fuse_copy_one(struct fuse_copy_state *cs, void *val, unsigned size) } /* Copy request arguments to/from userspace buffer */ -static int fuse_copy_args(struct fuse_copy_state *cs, unsigned numargs, - unsigned argpages, struct fuse_arg *args, - int zeroing) +int fuse_copy_args(struct fuse_copy_state *cs, unsigned numargs, + unsigned argpages, struct fuse_arg *args, + int zeroing) { int err = 0; unsigned i; @@ -1760,7 +1762,7 @@ static int fuse_retrieve(struct fuse_mount *fm, struct inode *inode, args = &ap->args; args->nodeid = outarg->nodeid; args->opcode = FUSE_NOTIFY_REPLY; - args->in_numargs = 2; + args->in_numargs = 3; args->in_pages = true; args->end = fuse_retrieve_end; @@ -1788,9 +1790,10 @@ static int fuse_retrieve(struct fuse_mount *fm, struct inode *inode, } ra->inarg.offset = outarg->offset; ra->inarg.size = total_len; - args->in_args[0].size = sizeof(ra->inarg); - args->in_args[0].value = &ra->inarg; - args->in_args[1].size = total_len; + fuse_set_zero_arg0(args); + args->in_args[1].size = sizeof(ra->inarg); + args->in_args[1].value = &ra->inarg; + args->in_args[2].size = total_len; err = fuse_simple_notify_reply(fm, args, outarg->notify_unique); if (err) @@ -1885,7 +1888,7 @@ static void fuse_resend(struct fuse_conn *fc) spin_unlock(&fiq->lock); list_for_each_entry(req, &to_queue, list) clear_bit(FR_PENDING, &req->flags); - end_requests(&to_queue); + fuse_dev_end_requests(&to_queue); return; } /* iq and pq requests are both oldest to newest */ @@ -1934,7 +1937,7 @@ static int fuse_notify(struct fuse_conn *fc, enum fuse_notify_code code, } /* Look up request on processing list by unique ID */ -static struct fuse_req *request_find(struct fuse_pqueue *fpq, u64 unique) +struct fuse_req *fuse_request_find(struct fuse_pqueue *fpq, u64 unique) { unsigned int hash = fuse_req_hash(unique); struct fuse_req *req; @@ -1946,10 +1949,17 @@ static struct fuse_req *request_find(struct fuse_pqueue *fpq, u64 unique) return NULL; } -static int copy_out_args(struct fuse_copy_state *cs, struct fuse_args *args, - unsigned nbytes) +int fuse_copy_out_args(struct fuse_copy_state *cs, struct fuse_args *args, + unsigned nbytes) { - unsigned reqsize = sizeof(struct fuse_out_header); + + unsigned int reqsize = 0; + + /* + * Uring has all headers separated from args - args is payload only + */ + if (!cs->is_uring) + reqsize = sizeof(struct fuse_out_header); reqsize += fuse_len_args(args->out_numargs, args->out_args); @@ -2011,7 +2021,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud, spin_lock(&fpq->lock); req = NULL; if (fpq->connected) - req = request_find(fpq, oh.unique & ~FUSE_INT_REQ_BIT); + req = fuse_request_find(fpq, oh.unique & ~FUSE_INT_REQ_BIT); err = -ENOENT; if (!req) { @@ -2049,7 +2059,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud, if (oh.error) err = nbytes != sizeof(oh) ? -EINVAL : 0; else - err = copy_out_args(cs, req->args, nbytes); + err = fuse_copy_out_args(cs, req->args, nbytes); fuse_copy_finish(cs); spin_lock(&fpq->lock); @@ -2204,7 +2214,7 @@ static __poll_t fuse_dev_poll(struct file *file, poll_table *wait) } /* Abort all requests on the given list (pending or processing) */ -static void end_requests(struct list_head *head) +void fuse_dev_end_requests(struct list_head *head) { while (!list_empty(head)) { struct fuse_req *req; @@ -2307,7 +2317,13 @@ void fuse_abort_conn(struct fuse_conn *fc) wake_up_all(&fc->blocked_waitq); spin_unlock(&fc->lock); - end_requests(&to_end); + fuse_dev_end_requests(&to_end); + + /* + * fc->lock must not be taken to avoid conflicts with io-uring + * locks + */ + fuse_uring_abort(fc); } else { spin_unlock(&fc->lock); } @@ -2319,6 +2335,8 @@ void fuse_wait_aborted(struct fuse_conn *fc) /* matches implicit memory barrier in fuse_drop_waiting() */ smp_mb(); wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0); + + fuse_uring_wait_stopped_queues(fc); } int fuse_dev_release(struct inode *inode, struct file *file) @@ -2337,7 +2355,7 @@ int fuse_dev_release(struct inode *inode, struct file *file) list_splice_init(&fpq->processing[i], &to_end); spin_unlock(&fpq->lock); - end_requests(&to_end); + fuse_dev_end_requests(&to_end); /* Are we the last open device? */ if (atomic_dec_and_test(&fc->dev_count)) { @@ -2475,6 +2493,9 @@ const struct file_operations fuse_dev_operations = { .fasync = fuse_dev_fasync, .unlocked_ioctl = fuse_dev_ioctl, .compat_ioctl = compat_ptr_ioctl, +#ifdef CONFIG_FUSE_IO_URING + .uring_cmd = fuse_uring_cmd, +#endif }; EXPORT_SYMBOL_GPL(fuse_dev_operations); diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c new file mode 100644 index 000000000000..ebd2931b4f2a --- /dev/null +++ b/fs/fuse/dev_uring.c @@ -0,0 +1,1319 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * FUSE: Filesystem in Userspace + * Copyright (c) 2023-2024 DataDirect Networks. + */ + +#include "fuse_i.h" +#include "dev_uring_i.h" +#include "fuse_dev_i.h" + +#include <linux/fs.h> +#include <linux/io_uring/cmd.h> + +static bool __read_mostly enable_uring; +module_param(enable_uring, bool, 0644); +MODULE_PARM_DESC(enable_uring, + "Enable userspace communication through io-uring"); + +#define FUSE_URING_IOV_SEGS 2 /* header and payload */ + + +bool fuse_uring_enabled(void) +{ + return enable_uring; +} + +struct fuse_uring_pdu { + struct fuse_ring_ent *ent; +}; + +static const struct fuse_iqueue_ops fuse_io_uring_ops; + +static void uring_cmd_set_ring_ent(struct io_uring_cmd *cmd, + struct fuse_ring_ent *ring_ent) +{ + struct fuse_uring_pdu *pdu = + io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu); + + pdu->ent = ring_ent; +} + +static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd) +{ + struct fuse_uring_pdu *pdu = + io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu); + + return pdu->ent; +} + +static void fuse_uring_flush_bg(struct fuse_ring_queue *queue) +{ + struct fuse_ring *ring = queue->ring; + struct fuse_conn *fc = ring->fc; + + lockdep_assert_held(&queue->lock); + lockdep_assert_held(&fc->bg_lock); + + /* + * Allow one bg request per queue, ignoring global fc limits. + * This prevents a single queue from consuming all resources and + * eliminates the need for remote queue wake-ups when global + * limits are met but this queue has no more waiting requests. + */ + while ((fc->active_background < fc->max_background || + !queue->active_background) && + (!list_empty(&queue->fuse_req_bg_queue))) { + struct fuse_req *req; + + req = list_first_entry(&queue->fuse_req_bg_queue, + struct fuse_req, list); + fc->active_background++; + queue->active_background++; + + list_move_tail(&req->list, &queue->fuse_req_queue); + } +} + +static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req, + int error) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + struct fuse_conn *fc = ring->fc; + + lockdep_assert_not_held(&queue->lock); + spin_lock(&queue->lock); + ent->fuse_req = NULL; + if (test_bit(FR_BACKGROUND, &req->flags)) { + queue->active_background--; + spin_lock(&fc->bg_lock); + fuse_uring_flush_bg(queue); + spin_unlock(&fc->bg_lock); + } + + spin_unlock(&queue->lock); + + if (error) + req->out.h.error = error; + + clear_bit(FR_SENT, &req->flags); + fuse_request_end(req); +} + +/* Abort all list queued request on the given ring queue */ +static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue) +{ + struct fuse_req *req; + LIST_HEAD(req_list); + + spin_lock(&queue->lock); + list_for_each_entry(req, &queue->fuse_req_queue, list) + clear_bit(FR_PENDING, &req->flags); + list_splice_init(&queue->fuse_req_queue, &req_list); + spin_unlock(&queue->lock); + + /* must not hold queue lock to avoid order issues with fi->lock */ + fuse_dev_end_requests(&req_list); +} + +void fuse_uring_abort_end_requests(struct fuse_ring *ring) +{ + int qid; + struct fuse_ring_queue *queue; + struct fuse_conn *fc = ring->fc; + + for (qid = 0; qid < ring->nr_queues; qid++) { + queue = READ_ONCE(ring->queues[qid]); + if (!queue) + continue; + + queue->stopped = true; + + WARN_ON_ONCE(ring->fc->max_background != UINT_MAX); + spin_lock(&queue->lock); + spin_lock(&fc->bg_lock); + fuse_uring_flush_bg(queue); + spin_unlock(&fc->bg_lock); + spin_unlock(&queue->lock); + fuse_uring_abort_end_queue_requests(queue); + } +} + +void fuse_uring_destruct(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + int qid; + + if (!ring) + return; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = ring->queues[qid]; + struct fuse_ring_ent *ent, *next; + + if (!queue) + continue; + + WARN_ON(!list_empty(&queue->ent_avail_queue)); + WARN_ON(!list_empty(&queue->ent_w_req_queue)); + WARN_ON(!list_empty(&queue->ent_commit_queue)); + WARN_ON(!list_empty(&queue->ent_in_userspace)); + + list_for_each_entry_safe(ent, next, &queue->ent_released, + list) { + list_del_init(&ent->list); + kfree(ent); + } + + kfree(queue->fpq.processing); + kfree(queue); + ring->queues[qid] = NULL; + } + + kfree(ring->queues); + kfree(ring); + fc->ring = NULL; +} + +/* + * Basic ring setup for this connection based on the provided configuration + */ +static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc) +{ + struct fuse_ring *ring; + size_t nr_queues = num_possible_cpus(); + struct fuse_ring *res = NULL; + size_t max_payload_size; + + ring = kzalloc(sizeof(*fc->ring), GFP_KERNEL_ACCOUNT); + if (!ring) + return NULL; + + ring->queues = kcalloc(nr_queues, sizeof(struct fuse_ring_queue *), + GFP_KERNEL_ACCOUNT); + if (!ring->queues) + goto out_err; + + max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write); + max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE); + + spin_lock(&fc->lock); + if (fc->ring) { + /* race, another thread created the ring in the meantime */ + spin_unlock(&fc->lock); + res = fc->ring; + goto out_err; + } + + init_waitqueue_head(&ring->stop_waitq); + + fc->ring = ring; + ring->nr_queues = nr_queues; + ring->fc = fc; + ring->max_payload_sz = max_payload_size; + atomic_set(&ring->queue_refs, 0); + + spin_unlock(&fc->lock); + return ring; + +out_err: + kfree(ring->queues); + kfree(ring); + return res; +} + +static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, + int qid) +{ + struct fuse_conn *fc = ring->fc; + struct fuse_ring_queue *queue; + struct list_head *pq; + + queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); + if (!queue) + return NULL; + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); + if (!pq) { + kfree(queue); + return NULL; + } + + queue->qid = qid; + queue->ring = ring; + spin_lock_init(&queue->lock); + + INIT_LIST_HEAD(&queue->ent_avail_queue); + INIT_LIST_HEAD(&queue->ent_commit_queue); + INIT_LIST_HEAD(&queue->ent_w_req_queue); + INIT_LIST_HEAD(&queue->ent_in_userspace); + INIT_LIST_HEAD(&queue->fuse_req_queue); + INIT_LIST_HEAD(&queue->fuse_req_bg_queue); + INIT_LIST_HEAD(&queue->ent_released); + + queue->fpq.processing = pq; + fuse_pqueue_init(&queue->fpq); + + spin_lock(&fc->lock); + if (ring->queues[qid]) { + spin_unlock(&fc->lock); + kfree(queue->fpq.processing); + kfree(queue); + return ring->queues[qid]; + } + + /* + * write_once and lock as the caller mostly doesn't take the lock at all + */ + WRITE_ONCE(ring->queues[qid], queue); + spin_unlock(&fc->lock); + + return queue; +} + +static void fuse_uring_stop_fuse_req_end(struct fuse_req *req) +{ + clear_bit(FR_SENT, &req->flags); + req->out.h.error = -ECONNABORTED; + fuse_request_end(req); +} + +/* + * Release a request/entry on connection tear down + */ +static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent) +{ + struct fuse_req *req; + struct io_uring_cmd *cmd; + + struct fuse_ring_queue *queue = ent->queue; + + spin_lock(&queue->lock); + cmd = ent->cmd; + ent->cmd = NULL; + req = ent->fuse_req; + ent->fuse_req = NULL; + if (req) { + /* remove entry from queue->fpq->processing */ + list_del_init(&req->list); + } + + /* + * The entry must not be freed immediately, due to access of direct + * pointer access of entries through IO_URING_F_CANCEL - there is a risk + * of race between daemon termination (which triggers IO_URING_F_CANCEL + * and accesses entries without checking the list state first + */ + list_move(&ent->list, &queue->ent_released); + ent->state = FRRS_RELEASED; + spin_unlock(&queue->lock); + + if (cmd) + io_uring_cmd_done(cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED); + + if (req) + fuse_uring_stop_fuse_req_end(req); +} + +static void fuse_uring_stop_list_entries(struct list_head *head, + struct fuse_ring_queue *queue, + enum fuse_ring_req_state exp_state) +{ + struct fuse_ring *ring = queue->ring; + struct fuse_ring_ent *ent, *next; + ssize_t queue_refs = SSIZE_MAX; + LIST_HEAD(to_teardown); + + spin_lock(&queue->lock); + list_for_each_entry_safe(ent, next, head, list) { + if (ent->state != exp_state) { + pr_warn("entry teardown qid=%d state=%d expected=%d", + queue->qid, ent->state, exp_state); + continue; + } + + ent->state = FRRS_TEARDOWN; + list_move(&ent->list, &to_teardown); + } + spin_unlock(&queue->lock); + + /* no queue lock to avoid lock order issues */ + list_for_each_entry_safe(ent, next, &to_teardown, list) { + fuse_uring_entry_teardown(ent); + queue_refs = atomic_dec_return(&ring->queue_refs); + WARN_ON_ONCE(queue_refs < 0); + } +} + +static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue) +{ + fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, + FRRS_USERSPACE); + fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue, + FRRS_AVAILABLE); +} + +/* + * Log state debug info + */ +static void fuse_uring_log_ent_state(struct fuse_ring *ring) +{ + int qid; + struct fuse_ring_ent *ent; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = ring->queues[qid]; + + if (!queue) + continue; + + spin_lock(&queue->lock); + /* + * Log entries from the intermediate queue, the other queues + * should be empty + */ + list_for_each_entry(ent, &queue->ent_w_req_queue, list) { + pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n", + ring, qid, ent, ent->state); + } + list_for_each_entry(ent, &queue->ent_commit_queue, list) { + pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n", + ring, qid, ent, ent->state); + } + spin_unlock(&queue->lock); + } + ring->stop_debug_log = 1; +} + +static void fuse_uring_async_stop_queues(struct work_struct *work) +{ + int qid; + struct fuse_ring *ring = + container_of(work, struct fuse_ring, async_teardown_work.work); + + /* XXX code dup */ + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]); + + if (!queue) + continue; + + fuse_uring_teardown_entries(queue); + } + + /* + * Some ring entries might be in the middle of IO operations, + * i.e. in process to get handled by file_operations::uring_cmd + * or on the way to userspace - we could handle that with conditions in + * run time code, but easier/cleaner to have an async tear down handler + * If there are still queue references left + */ + if (atomic_read(&ring->queue_refs) > 0) { + if (time_after(jiffies, + ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT)) + fuse_uring_log_ent_state(ring); + + schedule_delayed_work(&ring->async_teardown_work, + FUSE_URING_TEARDOWN_INTERVAL); + } else { + wake_up_all(&ring->stop_waitq); + } +} + +/* + * Stop the ring queues + */ +void fuse_uring_stop_queues(struct fuse_ring *ring) +{ + int qid; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]); + + if (!queue) + continue; + + fuse_uring_teardown_entries(queue); + } + + if (atomic_read(&ring->queue_refs) > 0) { + ring->teardown_time = jiffies; + INIT_DELAYED_WORK(&ring->async_teardown_work, + fuse_uring_async_stop_queues); + schedule_delayed_work(&ring->async_teardown_work, + FUSE_URING_TEARDOWN_INTERVAL); + } else { + wake_up_all(&ring->stop_waitq); + } +} + +/* + * Handle IO_URING_F_CANCEL, typically should come on daemon termination. + * + * Releasing the last entry should trigger fuse_dev_release() if + * the daemon was terminated + */ +static void fuse_uring_cancel(struct io_uring_cmd *cmd, + unsigned int issue_flags) +{ + struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd); + struct fuse_ring_queue *queue; + bool need_cmd_done = false; + + /* + * direct access on ent - it must not be destructed as long as + * IO_URING_F_CANCEL might come up + */ + queue = ent->queue; + spin_lock(&queue->lock); + if (ent->state == FRRS_AVAILABLE) { + ent->state = FRRS_USERSPACE; + list_move(&ent->list, &queue->ent_in_userspace); + need_cmd_done = true; + ent->cmd = NULL; + } + spin_unlock(&queue->lock); + + if (need_cmd_done) { + /* no queue lock to avoid lock order issues */ + io_uring_cmd_done(cmd, -ENOTCONN, 0, issue_flags); + } +} + +static void fuse_uring_prepare_cancel(struct io_uring_cmd *cmd, int issue_flags, + struct fuse_ring_ent *ring_ent) +{ + uring_cmd_set_ring_ent(cmd, ring_ent); + io_uring_cmd_mark_cancelable(cmd, issue_flags); +} + +/* + * Checks for errors and stores it into the request + */ +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, + struct fuse_req *req, + struct fuse_conn *fc) +{ + int err; + + err = -EINVAL; + if (oh->unique == 0) { + /* Not supported through io-uring yet */ + pr_warn_once("notify through fuse-io-uring not supported\n"); + goto err; + } + + if (oh->error <= -ERESTARTSYS || oh->error > 0) + goto err; + + if (oh->error) { + err = oh->error; + goto err; + } + + err = -ENOENT; + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", + req->in.h.unique, + oh->unique & ~FUSE_INT_REQ_BIT); + goto err; + } + + /* + * Is it an interrupt reply ID? + * XXX: Not supported through fuse-io-uring yet, it should not even + * find the request - should not happen. + */ + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); + + err = 0; +err: + return err; +} + +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, + struct fuse_req *req, + struct fuse_ring_ent *ent) +{ + struct fuse_copy_state cs; + struct fuse_args *args = req->args; + struct iov_iter iter; + int err; + struct fuse_uring_ent_in_out ring_in_out; + + err = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, + sizeof(ring_in_out)); + if (err) + return -EFAULT; + + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, + &iter); + if (err) + return err; + + fuse_copy_init(&cs, 0, &iter); + cs.is_uring = 1; + cs.req = req; + + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); +} + + /* + * Copy data from the req to the ring buffer + */ +static int fuse_uring_args_to_ring(struct fuse_ring *ring, struct fuse_req *req, + struct fuse_ring_ent *ent) +{ + struct fuse_copy_state cs; + struct fuse_args *args = req->args; + struct fuse_in_arg *in_args = args->in_args; + int num_args = args->in_numargs; + int err; + struct iov_iter iter; + struct fuse_uring_ent_in_out ent_in_out = { + .flags = 0, + .commit_id = req->in.h.unique, + }; + + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); + if (err) { + pr_info_ratelimited("fuse: Import of user buffer failed\n"); + return err; + } + + fuse_copy_init(&cs, 1, &iter); + cs.is_uring = 1; + cs.req = req; + + if (num_args > 0) { + /* + * Expectation is that the first argument is the per op header. + * Some op code have that as zero size. + */ + if (args->in_args[0].size > 0) { + err = copy_to_user(&ent->headers->op_in, in_args->value, + in_args->size); + if (err) { + pr_info_ratelimited( + "Copying the header failed.\n"); + return -EFAULT; + } + } + in_args++; + num_args--; + } + + /* copy the payload */ + err = fuse_copy_args(&cs, num_args, args->in_pages, + (struct fuse_arg *)in_args, 0); + if (err) { + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); + return err; + } + + ent_in_out.payload_sz = cs.ring.copied_sz; + err = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, + sizeof(ent_in_out)); + return err ? -EFAULT : 0; +} + +static int fuse_uring_copy_to_ring(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + int err; + + err = -EIO; + if (WARN_ON(ent->state != FRRS_FUSE_REQ)) { + pr_err("qid=%d ring-req=%p invalid state %d on send\n", + queue->qid, ent, ent->state); + return err; + } + + err = -EINVAL; + if (WARN_ON(req->in.h.unique == 0)) + return err; + + /* copy the request */ + err = fuse_uring_args_to_ring(ring, req, ent); + if (unlikely(err)) { + pr_info_ratelimited("Copy to ring failed: %d\n", err); + return err; + } + + /* copy fuse_in_header */ + err = copy_to_user(&ent->headers->in_out, &req->in.h, + sizeof(req->in.h)); + if (err) { + err = -EFAULT; + return err; + } + + return 0; +} + +static int fuse_uring_prepare_send(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + int err; + + err = fuse_uring_copy_to_ring(ent, req); + if (!err) + set_bit(FR_SENT, &req->flags); + else + fuse_uring_req_end(ent, req, err); + + return err; +} + +/* + * Write data to the ring buffer and send the request to userspace, + * userspace will read it + * This is comparable with classical read(/dev/fuse) + */ +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent, + struct fuse_req *req, + unsigned int issue_flags) +{ + struct fuse_ring_queue *queue = ent->queue; + int err; + struct io_uring_cmd *cmd; + + err = fuse_uring_prepare_send(ent, req); + if (err) + return err; + + spin_lock(&queue->lock); + cmd = ent->cmd; + ent->cmd = NULL; + ent->state = FRRS_USERSPACE; + list_move(&ent->list, &queue->ent_in_userspace); + spin_unlock(&queue->lock); + + io_uring_cmd_done(cmd, 0, 0, issue_flags); + return 0; +} + +/* + * Make a ring entry available for fuse_req assignment + */ +static void fuse_uring_ent_avail(struct fuse_ring_ent *ent, + struct fuse_ring_queue *queue) +{ + WARN_ON_ONCE(!ent->cmd); + list_move(&ent->list, &queue->ent_avail_queue); + ent->state = FRRS_AVAILABLE; +} + +/* Used to find the request on SQE commit */ +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_pqueue *fpq = &queue->fpq; + unsigned int hash; + + req->ring_entry = ent; + hash = fuse_req_hash(req->in.h.unique); + list_move_tail(&req->list, &fpq->processing[hash]); +} + +/* + * Assign a fuse queue entry to the given entry + */ +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_conn *fc = req->fm->fc; + struct fuse_iqueue *fiq = &fc->iq; + + lockdep_assert_held(&queue->lock); + + if (WARN_ON_ONCE(ent->state != FRRS_AVAILABLE && + ent->state != FRRS_COMMIT)) { + pr_warn("%s qid=%d state=%d\n", __func__, ent->queue->qid, + ent->state); + } + + spin_lock(&fiq->lock); + clear_bit(FR_PENDING, &req->flags); + spin_unlock(&fiq->lock); + ent->fuse_req = req; + ent->state = FRRS_FUSE_REQ; + list_move(&ent->list, &queue->ent_w_req_queue); + fuse_uring_add_to_pq(ent, req); +} + +/* Fetch the next fuse request if available */ +static struct fuse_req *fuse_uring_ent_assign_req(struct fuse_ring_ent *ent) + __must_hold(&queue->lock) +{ + struct fuse_req *req; + struct fuse_ring_queue *queue = ent->queue; + struct list_head *req_queue = &queue->fuse_req_queue; + + lockdep_assert_held(&queue->lock); + + /* get and assign the next entry while it is still holding the lock */ + req = list_first_entry_or_null(req_queue, struct fuse_req, list); + if (req) + fuse_uring_add_req_to_ring_ent(ent, req); + + return req; +} + +/* + * Read data from the ring buffer, which user space has written to + * This is comparible with handling of classical write(/dev/fuse). + * Also make the ring request available again for new fuse requests. + */ +static void fuse_uring_commit(struct fuse_ring_ent *ent, struct fuse_req *req, + unsigned int issue_flags) +{ + struct fuse_ring *ring = ent->queue->ring; + struct fuse_conn *fc = ring->fc; + ssize_t err = 0; + + err = copy_from_user(&req->out.h, &ent->headers->in_out, + sizeof(req->out.h)); + if (err) { + req->out.h.error = -EFAULT; + goto out; + } + + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); + if (err) { + /* req->out.h.error already set */ + goto out; + } + + err = fuse_uring_copy_from_ring(ring, req, ent); +out: + fuse_uring_req_end(ent, req, err); +} + +/* + * Get the next fuse req and send it + */ +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ent, + struct fuse_ring_queue *queue, + unsigned int issue_flags) +{ + int err; + struct fuse_req *req; + +retry: + spin_lock(&queue->lock); + fuse_uring_ent_avail(ent, queue); + req = fuse_uring_ent_assign_req(ent); + spin_unlock(&queue->lock); + + if (req) { + err = fuse_uring_send_next_to_ring(ent, req, issue_flags); + if (err) + goto retry; + } +} + +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) +{ + struct fuse_ring_queue *queue = ent->queue; + + lockdep_assert_held(&queue->lock); + + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) + return -EIO; + + ent->state = FRRS_COMMIT; + list_move(&ent->list, &queue->ent_commit_queue); + + return 0; +} + +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, + struct fuse_conn *fc) +{ + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); + struct fuse_ring_ent *ent; + int err; + struct fuse_ring *ring = fc->ring; + struct fuse_ring_queue *queue; + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); + unsigned int qid = READ_ONCE(cmd_req->qid); + struct fuse_pqueue *fpq; + struct fuse_req *req; + + err = -ENOTCONN; + if (!ring) + return err; + + if (qid >= ring->nr_queues) + return -EINVAL; + + queue = ring->queues[qid]; + if (!queue) + return err; + fpq = &queue->fpq; + + if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped)) + return err; + + spin_lock(&queue->lock); + /* Find a request based on the unique ID of the fuse request + * This should get revised, as it needs a hash calculation and list + * search. And full struct fuse_pqueue is needed (memory overhead). + * As well as the link from req to ring_ent. + */ + req = fuse_request_find(fpq, commit_id); + err = -ENOENT; + if (!req) { + pr_info("qid=%d commit_id %llu not found\n", queue->qid, + commit_id); + spin_unlock(&queue->lock); + return err; + } + list_del_init(&req->list); + ent = req->ring_entry; + req->ring_entry = NULL; + + err = fuse_ring_ent_set_commit(ent); + if (err != 0) { + pr_info_ratelimited("qid=%d commit_id %llu state %d", + queue->qid, commit_id, ent->state); + spin_unlock(&queue->lock); + req->out.h.error = err; + clear_bit(FR_SENT, &req->flags); + fuse_request_end(req); + return err; + } + + ent->cmd = cmd; + spin_unlock(&queue->lock); + + /* without the queue lock, as other locks are taken */ + fuse_uring_prepare_cancel(cmd, issue_flags, ent); + fuse_uring_commit(ent, req, issue_flags); + + /* + * Fetching the next request is absolutely required as queued + * fuse requests would otherwise not get processed - committing + * and fetching is done in one step vs legacy fuse, which has separated + * read (fetch request) and write (commit result). + */ + fuse_uring_next_fuse_req(ent, queue, issue_flags); + return 0; +} + +static bool is_ring_ready(struct fuse_ring *ring, int current_qid) +{ + int qid; + struct fuse_ring_queue *queue; + bool ready = true; + + for (qid = 0; qid < ring->nr_queues && ready; qid++) { + if (current_qid == qid) + continue; + + queue = ring->queues[qid]; + if (!queue) { + ready = false; + break; + } + + spin_lock(&queue->lock); + if (list_empty(&queue->ent_avail_queue)) + ready = false; + spin_unlock(&queue->lock); + } + + return ready; +} + +/* + * fuse_uring_req_fetch command handling + */ +static void fuse_uring_do_register(struct fuse_ring_ent *ent, + struct io_uring_cmd *cmd, + unsigned int issue_flags) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + struct fuse_conn *fc = ring->fc; + struct fuse_iqueue *fiq = &fc->iq; + + fuse_uring_prepare_cancel(cmd, issue_flags, ent); + + spin_lock(&queue->lock); + ent->cmd = cmd; + fuse_uring_ent_avail(ent, queue); + spin_unlock(&queue->lock); + + if (!ring->ready) { + bool ready = is_ring_ready(ring, queue->qid); + + if (ready) { + WRITE_ONCE(fiq->ops, &fuse_io_uring_ops); + WRITE_ONCE(ring->ready, true); + wake_up_all(&fc->blocked_waitq); + } + } +} + +/* + * sqe->addr is a ptr to an iovec array, iov[0] has the headers, iov[1] + * the payload + */ +static int fuse_uring_get_iovec_from_sqe(const struct io_uring_sqe *sqe, + struct iovec iov[FUSE_URING_IOV_SEGS]) +{ + struct iovec __user *uiov = u64_to_user_ptr(READ_ONCE(sqe->addr)); + struct iov_iter iter; + ssize_t ret; + + if (sqe->len != FUSE_URING_IOV_SEGS) + return -EINVAL; + + /* + * Direction for buffer access will actually be READ and WRITE, + * using write for the import should include READ access as well. + */ + ret = import_iovec(WRITE, uiov, FUSE_URING_IOV_SEGS, + FUSE_URING_IOV_SEGS, &iov, &iter); + if (ret < 0) + return ret; + + return 0; +} + +static struct fuse_ring_ent * +fuse_uring_create_ring_ent(struct io_uring_cmd *cmd, + struct fuse_ring_queue *queue) +{ + struct fuse_ring *ring = queue->ring; + struct fuse_ring_ent *ent; + size_t payload_size; + struct iovec iov[FUSE_URING_IOV_SEGS]; + int err; + + err = fuse_uring_get_iovec_from_sqe(cmd->sqe, iov); + if (err) { + pr_info_ratelimited("Failed to get iovec from sqe, err=%d\n", + err); + return ERR_PTR(err); + } + + err = -EINVAL; + if (iov[0].iov_len < sizeof(struct fuse_uring_req_header)) { + pr_info_ratelimited("Invalid header len %zu\n", iov[0].iov_len); + return ERR_PTR(err); + } + + payload_size = iov[1].iov_len; + if (payload_size < ring->max_payload_sz) { + pr_info_ratelimited("Invalid req payload len %zu\n", + payload_size); + return ERR_PTR(err); + } + + err = -ENOMEM; + ent = kzalloc(sizeof(*ent), GFP_KERNEL_ACCOUNT); + if (!ent) + return ERR_PTR(err); + + INIT_LIST_HEAD(&ent->list); + + ent->queue = queue; + ent->headers = iov[0].iov_base; + ent->payload = iov[1].iov_base; + + atomic_inc(&ring->queue_refs); + return ent; +} + +/* + * Register header and payload buffer with the kernel and puts the + * entry as "ready to get fuse requests" on the queue + */ +static int fuse_uring_register(struct io_uring_cmd *cmd, + unsigned int issue_flags, struct fuse_conn *fc) +{ + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); + struct fuse_ring *ring = fc->ring; + struct fuse_ring_queue *queue; + struct fuse_ring_ent *ent; + int err; + unsigned int qid = READ_ONCE(cmd_req->qid); + + err = -ENOMEM; + if (!ring) { + ring = fuse_uring_create(fc); + if (!ring) + return err; + } + + if (qid >= ring->nr_queues) { + pr_info_ratelimited("fuse: Invalid ring qid %u\n", qid); + return -EINVAL; + } + + queue = ring->queues[qid]; + if (!queue) { + queue = fuse_uring_create_queue(ring, qid); + if (!queue) + return err; + } + + /* + * The created queue above does not need to be destructed in + * case of entry errors below, will be done at ring destruction time. + */ + + ent = fuse_uring_create_ring_ent(cmd, queue); + if (IS_ERR(ent)) + return PTR_ERR(ent); + + fuse_uring_do_register(ent, cmd, issue_flags); + + return 0; +} + +/* + * Entry function from io_uring to handle the given passthrough command + * (op code IORING_OP_URING_CMD) + */ +int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) +{ + struct fuse_dev *fud; + struct fuse_conn *fc; + u32 cmd_op = cmd->cmd_op; + int err; + + if ((unlikely(issue_flags & IO_URING_F_CANCEL))) { + fuse_uring_cancel(cmd, issue_flags); + return 0; + } + + /* This extra SQE size holds struct fuse_uring_cmd_req */ + if (!(issue_flags & IO_URING_F_SQE128)) + return -EINVAL; + + fud = fuse_get_dev(cmd->file); + if (!fud) { + pr_info_ratelimited("No fuse device found\n"); + return -ENOTCONN; + } + fc = fud->fc; + + /* Once a connection has io-uring enabled on it, it can't be disabled */ + if (!enable_uring && !fc->io_uring) { + pr_info_ratelimited("fuse-io-uring is disabled\n"); + return -EOPNOTSUPP; + } + + if (fc->aborted) + return -ECONNABORTED; + if (!fc->connected) + return -ENOTCONN; + + /* + * fuse_uring_register() needs the ring to be initialized, + * we need to know the max payload size + */ + if (!fc->initialized) + return -EAGAIN; + + switch (cmd_op) { + case FUSE_IO_URING_CMD_REGISTER: + err = fuse_uring_register(cmd, issue_flags, fc); + if (err) { + pr_info_once("FUSE_IO_URING_CMD_REGISTER failed err=%d\n", + err); + fc->io_uring = 0; + wake_up_all(&fc->blocked_waitq); + return err; + } + break; + case FUSE_IO_URING_CMD_COMMIT_AND_FETCH: + err = fuse_uring_commit_fetch(cmd, issue_flags, fc); + if (err) { + pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n", + err); + return err; + } + break; + default: + return -EINVAL; + } + + return -EIOCBQUEUED; +} + +static void fuse_uring_send(struct fuse_ring_ent *ent, struct io_uring_cmd *cmd, + ssize_t ret, unsigned int issue_flags) +{ + struct fuse_ring_queue *queue = ent->queue; + + spin_lock(&queue->lock); + ent->state = FRRS_USERSPACE; + list_move(&ent->list, &queue->ent_in_userspace); + ent->cmd = NULL; + spin_unlock(&queue->lock); + + io_uring_cmd_done(cmd, ret, 0, issue_flags); +} + +/* + * This prepares and sends the ring request in fuse-uring task context. + * User buffers are not mapped yet - the application does not have permission + * to write to it - this has to be executed in ring task context. + */ +static void fuse_uring_send_in_task(struct io_uring_cmd *cmd, + unsigned int issue_flags) +{ + struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd); + struct fuse_ring_queue *queue = ent->queue; + int err; + + if (!(issue_flags & IO_URING_F_TASK_DEAD)) { + err = fuse_uring_prepare_send(ent, ent->fuse_req); + if (err) { + fuse_uring_next_fuse_req(ent, queue, issue_flags); + return; + } + } else { + err = -ECANCELED; + } + + fuse_uring_send(ent, cmd, err, issue_flags); +} + +static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring) +{ + unsigned int qid; + struct fuse_ring_queue *queue; + + qid = task_cpu(current); + + if (WARN_ONCE(qid >= ring->nr_queues, + "Core number (%u) exceeds nr queues (%zu)\n", qid, + ring->nr_queues)) + qid = 0; + + queue = ring->queues[qid]; + WARN_ONCE(!queue, "Missing queue for qid %d\n", qid); + + return queue; +} + +static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent) +{ + struct io_uring_cmd *cmd = ent->cmd; + + uring_cmd_set_ring_ent(cmd, ent); + io_uring_cmd_complete_in_task(cmd, fuse_uring_send_in_task); +} + +/* queue a fuse request and send it if a ring entry is available */ +void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req) +{ + struct fuse_conn *fc = req->fm->fc; + struct fuse_ring *ring = fc->ring; + struct fuse_ring_queue *queue; + struct fuse_ring_ent *ent = NULL; + int err; + + err = -EINVAL; + queue = fuse_uring_task_to_queue(ring); + if (!queue) + goto err; + + if (req->in.h.opcode != FUSE_NOTIFY_REPLY) + req->in.h.unique = fuse_get_unique(fiq); + + spin_lock(&queue->lock); + err = -ENOTCONN; + if (unlikely(queue->stopped)) + goto err_unlock; + + ent = list_first_entry_or_null(&queue->ent_avail_queue, + struct fuse_ring_ent, list); + if (ent) + fuse_uring_add_req_to_ring_ent(ent, req); + else + list_add_tail(&req->list, &queue->fuse_req_queue); + spin_unlock(&queue->lock); + + if (ent) + fuse_uring_dispatch_ent(ent); + + return; + +err_unlock: + spin_unlock(&queue->lock); +err: + req->out.h.error = err; + clear_bit(FR_PENDING, &req->flags); + fuse_request_end(req); +} + +bool fuse_uring_queue_bq_req(struct fuse_req *req) +{ + struct fuse_conn *fc = req->fm->fc; + struct fuse_ring *ring = fc->ring; + struct fuse_ring_queue *queue; + struct fuse_ring_ent *ent = NULL; + + queue = fuse_uring_task_to_queue(ring); + if (!queue) + return false; + + spin_lock(&queue->lock); + if (unlikely(queue->stopped)) { + spin_unlock(&queue->lock); + return false; + } + + list_add_tail(&req->list, &queue->fuse_req_bg_queue); + + ent = list_first_entry_or_null(&queue->ent_avail_queue, + struct fuse_ring_ent, list); + spin_lock(&fc->bg_lock); + fc->num_background++; + if (fc->num_background == fc->max_background) + fc->blocked = 1; + fuse_uring_flush_bg(queue); + spin_unlock(&fc->bg_lock); + + /* + * Due to bg_queue flush limits there might be other bg requests + * in the queue that need to be handled first. Or no further req + * might be available. + */ + req = list_first_entry_or_null(&queue->fuse_req_queue, struct fuse_req, + list); + if (ent && req) { + fuse_uring_add_req_to_ring_ent(ent, req); + spin_unlock(&queue->lock); + + fuse_uring_dispatch_ent(ent); + } else { + spin_unlock(&queue->lock); + } + + return true; +} + +static const struct fuse_iqueue_ops fuse_io_uring_ops = { + /* should be send over io-uring as enhancement */ + .send_forget = fuse_dev_queue_forget, + + /* + * could be send over io-uring, but interrupts should be rare, + * no need to make the code complex + */ + .send_interrupt = fuse_dev_queue_interrupt, + .send_req = fuse_uring_queue_fuse_req, +}; diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h new file mode 100644 index 000000000000..2102b3d0c1ae --- /dev/null +++ b/fs/fuse/dev_uring_i.h @@ -0,0 +1,205 @@ +/* SPDX-License-Identifier: GPL-2.0 + * + * FUSE: Filesystem in Userspace + * Copyright (c) 2023-2024 DataDirect Networks. + */ + +#ifndef _FS_FUSE_DEV_URING_I_H +#define _FS_FUSE_DEV_URING_I_H + +#include "fuse_i.h" + +#ifdef CONFIG_FUSE_IO_URING + +#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ) +#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20) + +enum fuse_ring_req_state { + FRRS_INVALID = 0, + + /* The ring entry received from userspace and it is being processed */ + FRRS_COMMIT, + + /* The ring entry is waiting for new fuse requests */ + FRRS_AVAILABLE, + + /* The ring entry got assigned a fuse req */ + FRRS_FUSE_REQ, + + /* The ring entry is in or on the way to user space */ + FRRS_USERSPACE, + + /* The ring entry is in teardown */ + FRRS_TEARDOWN, + + /* The ring entry is released, but not freed yet */ + FRRS_RELEASED, +}; + +/** A fuse ring entry, part of the ring queue */ +struct fuse_ring_ent { + /* userspace buffer */ + struct fuse_uring_req_header __user *headers; + void __user *payload; + + /* the ring queue that owns the request */ + struct fuse_ring_queue *queue; + + /* fields below are protected by queue->lock */ + + struct io_uring_cmd *cmd; + + struct list_head list; + + enum fuse_ring_req_state state; + + struct fuse_req *fuse_req; +}; + +struct fuse_ring_queue { + /* + * back pointer to the main fuse uring structure that holds this + * queue + */ + struct fuse_ring *ring; + + /* queue id, corresponds to the cpu core */ + unsigned int qid; + + /* + * queue lock, taken when any value in the queue changes _and_ also + * a ring entry state changes. + */ + spinlock_t lock; + + /* available ring entries (struct fuse_ring_ent) */ + struct list_head ent_avail_queue; + + /* + * entries in the process of being committed or in the process + * to be sent to userspace + */ + struct list_head ent_w_req_queue; + struct list_head ent_commit_queue; + + /* entries in userspace */ + struct list_head ent_in_userspace; + + /* entries that are released */ + struct list_head ent_released; + + /* fuse requests waiting for an entry slot */ + struct list_head fuse_req_queue; + + /* background fuse requests */ + struct list_head fuse_req_bg_queue; + + struct fuse_pqueue fpq; + + unsigned int active_background; + + bool stopped; +}; + +/** + * Describes if uring is for communication and holds alls the data needed + * for uring communication + */ +struct fuse_ring { + /* back pointer */ + struct fuse_conn *fc; + + /* number of ring queues */ + size_t nr_queues; + + /* maximum payload/arg size */ + size_t max_payload_sz; + + struct fuse_ring_queue **queues; + + /* + * Log ring entry states on stop when entries cannot be released + */ + unsigned int stop_debug_log : 1; + + wait_queue_head_t stop_waitq; + + /* async tear down */ + struct delayed_work async_teardown_work; + + /* log */ + unsigned long teardown_time; + + atomic_t queue_refs; + + bool ready; +}; + +bool fuse_uring_enabled(void); +void fuse_uring_destruct(struct fuse_conn *fc); +void fuse_uring_stop_queues(struct fuse_ring *ring); +void fuse_uring_abort_end_requests(struct fuse_ring *ring); +int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags); +void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req); +bool fuse_uring_queue_bq_req(struct fuse_req *req); + +static inline void fuse_uring_abort(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + + if (ring == NULL) + return; + + if (atomic_read(&ring->queue_refs) > 0) { + fuse_uring_abort_end_requests(ring); + fuse_uring_stop_queues(ring); + } +} + +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + + if (ring) + wait_event(ring->stop_waitq, + atomic_read(&ring->queue_refs) == 0); +} + +static inline bool fuse_uring_ready(struct fuse_conn *fc) +{ + return fc->ring && fc->ring->ready; +} + +#else /* CONFIG_FUSE_IO_URING */ + +struct fuse_ring; + +static inline void fuse_uring_create(struct fuse_conn *fc) +{ +} + +static inline void fuse_uring_destruct(struct fuse_conn *fc) +{ +} + +static inline bool fuse_uring_enabled(void) +{ + return false; +} + +static inline void fuse_uring_abort(struct fuse_conn *fc) +{ +} + +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) +{ +} + +static inline bool fuse_uring_ready(struct fuse_conn *fc) +{ + return false; +} + +#endif /* CONFIG_FUSE_IO_URING */ + +#endif /* _FS_FUSE_DEV_URING_I_H */ diff --git a/fs/fuse/dir.c b/fs/fuse/dir.c index bf057cf7098d..be693a8a1010 100644 --- a/fs/fuse/dir.c +++ b/fs/fuse/dir.c @@ -175,9 +175,10 @@ static void fuse_lookup_init(struct fuse_conn *fc, struct fuse_args *args, memset(outarg, 0, sizeof(struct fuse_entry_out)); args->opcode = FUSE_LOOKUP; args->nodeid = nodeid; - args->in_numargs = 1; - args->in_args[0].size = name->len + 1; - args->in_args[0].value = name->name; + args->in_numargs = 2; + fuse_set_zero_arg0(args); + args->in_args[1].size = name->len + 1; + args->in_args[1].value = name->name; args->out_numargs = 1; args->out_args[0].size = sizeof(struct fuse_entry_out); args->out_args[0].value = outarg; @@ -929,11 +930,12 @@ static int fuse_symlink(struct mnt_idmap *idmap, struct inode *dir, FUSE_ARGS(args); args.opcode = FUSE_SYMLINK; - args.in_numargs = 2; - args.in_args[0].size = entry->d_name.len + 1; - args.in_args[0].value = entry->d_name.name; - args.in_args[1].size = len; - args.in_args[1].value = link; + args.in_numargs = 3; + fuse_set_zero_arg0(&args); + args.in_args[1].size = entry->d_name.len + 1; + args.in_args[1].value = entry->d_name.name; + args.in_args[2].size = len; + args.in_args[2].value = link; return create_new_entry(idmap, fm, &args, dir, entry, S_IFLNK); } @@ -993,9 +995,10 @@ static int fuse_unlink(struct inode *dir, struct dentry *entry) args.opcode = FUSE_UNLINK; args.nodeid = get_node_id(dir); - args.in_numargs = 1; - args.in_args[0].size = entry->d_name.len + 1; - args.in_args[0].value = entry->d_name.name; + args.in_numargs = 2; + fuse_set_zero_arg0(&args); + args.in_args[1].size = entry->d_name.len + 1; + args.in_args[1].value = entry->d_name.name; err = fuse_simple_request(fm, &args); if (!err) { fuse_dir_changed(dir); @@ -1016,9 +1019,10 @@ static int fuse_rmdir(struct inode *dir, struct dentry *entry) args.opcode = FUSE_RMDIR; args.nodeid = get_node_id(dir); - args.in_numargs = 1; - args.in_args[0].size = entry->d_name.len + 1; - args.in_args[0].value = entry->d_name.name; + args.in_numargs = 2; + fuse_set_zero_arg0(&args); + args.in_args[1].size = entry->d_name.len + 1; + args.in_args[1].value = entry->d_name.name; err = fuse_simple_request(fm, &args); if (!err) { fuse_dir_changed(dir); diff --git a/fs/fuse/fuse_dev_i.h b/fs/fuse/fuse_dev_i.h new file mode 100644 index 000000000000..3b2bfe1248d3 --- /dev/null +++ b/fs/fuse/fuse_dev_i.h @@ -0,0 +1,66 @@ +/* SPDX-License-Identifier: GPL-2.0 + * + * FUSE: Filesystem in Userspace + * Copyright (C) 2001-2008 Miklos Szeredi <miklos@szeredi.hu> + */ +#ifndef _FS_FUSE_DEV_I_H +#define _FS_FUSE_DEV_I_H + +#include <linux/types.h> + +/* Ordinary requests have even IDs, while interrupts IDs are odd */ +#define FUSE_INT_REQ_BIT (1ULL << 0) +#define FUSE_REQ_ID_STEP (1ULL << 1) + +struct fuse_arg; +struct fuse_args; +struct fuse_pqueue; +struct fuse_req; +struct fuse_iqueue; +struct fuse_forget_link; + +struct fuse_copy_state { + int write; + struct fuse_req *req; + struct iov_iter *iter; + struct pipe_buffer *pipebufs; + struct pipe_buffer *currbuf; + struct pipe_inode_info *pipe; + unsigned long nr_segs; + struct page *pg; + unsigned int len; + unsigned int offset; + unsigned int move_pages:1; + unsigned int is_uring:1; + struct { + unsigned int copied_sz; /* copied size into the user buffer */ + } ring; +}; + +static inline struct fuse_dev *fuse_get_dev(struct file *file) +{ + /* + * Lockless access is OK, because file->private data is set + * once during mount and is valid until the file is released. + */ + return READ_ONCE(file->private_data); +} + +unsigned int fuse_req_hash(u64 unique); +struct fuse_req *fuse_request_find(struct fuse_pqueue *fpq, u64 unique); + +void fuse_dev_end_requests(struct list_head *head); + +void fuse_copy_init(struct fuse_copy_state *cs, int write, + struct iov_iter *iter); +int fuse_copy_args(struct fuse_copy_state *cs, unsigned int numargs, + unsigned int argpages, struct fuse_arg *args, + int zeroing); +int fuse_copy_out_args(struct fuse_copy_state *cs, struct fuse_args *args, + unsigned int nbytes); +void fuse_dev_queue_forget(struct fuse_iqueue *fiq, + struct fuse_forget_link *forget); +void fuse_dev_queue_interrupt(struct fuse_iqueue *fiq, struct fuse_req *req); + +#endif + diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h index 74744c6f2860..fee96fe7887b 100644 --- a/fs/fuse/fuse_i.h +++ b/fs/fuse/fuse_i.h @@ -310,7 +310,7 @@ struct fuse_args { bool is_ext:1; bool is_pinned:1; bool invalidate_vmap:1; - struct fuse_in_arg in_args[3]; + struct fuse_in_arg in_args[4]; struct fuse_arg out_args[2]; void (*end)(struct fuse_mount *fm, struct fuse_args *args, int error); /* Used for kvec iter backed by vmalloc address */ @@ -438,6 +438,10 @@ struct fuse_req { /** fuse_mount this request belongs to */ struct fuse_mount *fm; + +#ifdef CONFIG_FUSE_IO_URING + void *ring_entry; +#endif }; struct fuse_iqueue; @@ -863,6 +867,9 @@ struct fuse_conn { /* Use pages instead of pointer for kernel I/O */ unsigned int use_pages_for_kvec_io:1; + /* Use io_uring for communication */ + unsigned int io_uring; + /** Maximum stack depth for passthrough backing files */ int max_stack_depth; @@ -923,6 +930,11 @@ struct fuse_conn { /** IDR for backing files ids */ struct idr backing_files_map; #endif + +#ifdef CONFIG_FUSE_IO_URING + /** uring connection information*/ + struct fuse_ring *ring; +#endif }; /* @@ -947,6 +959,19 @@ struct fuse_mount { struct rcu_head rcu; }; +/* + * Empty header for FUSE opcodes without specific header needs. + * Used as a placeholder in args->in_args[0] for consistency + * across all FUSE operations, simplifying request handling. + */ +struct fuse_zero_header {}; + +static inline void fuse_set_zero_arg0(struct fuse_args *args) +{ + args->in_args[0].size = sizeof(struct fuse_zero_header); + args->in_args[0].value = NULL; +} + static inline struct fuse_mount *get_fuse_mount_super(struct super_block *sb) { return sb->s_fs_info; @@ -1220,6 +1245,11 @@ void fuse_change_entry_timeout(struct dentry *entry, struct fuse_entry_out *o); struct fuse_conn *fuse_conn_get(struct fuse_conn *fc); /** + * Initialize the fuse processing queue + */ +void fuse_pqueue_init(struct fuse_pqueue *fpq); + +/** * Initialize fuse_conn */ void fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm, diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c index 3ce4f4e81d09..e9db2cb8c150 100644 --- a/fs/fuse/inode.c +++ b/fs/fuse/inode.c @@ -7,6 +7,7 @@ */ #include "fuse_i.h" +#include "dev_uring_i.h" #include <linux/pagemap.h> #include <linux/slab.h> @@ -937,7 +938,7 @@ static void fuse_iqueue_init(struct fuse_iqueue *fiq, fiq->priv = priv; } -static void fuse_pqueue_init(struct fuse_pqueue *fpq) +void fuse_pqueue_init(struct fuse_pqueue *fpq) { unsigned int i; @@ -992,6 +993,8 @@ static void delayed_release(struct rcu_head *p) { struct fuse_conn *fc = container_of(p, struct fuse_conn, rcu); + fuse_uring_destruct(fc); + put_user_ns(fc->user_ns); fc->release(fc); } @@ -1387,6 +1390,8 @@ static void process_init_reply(struct fuse_mount *fm, struct fuse_args *args, else ok = false; } + if (flags & FUSE_OVER_IO_URING && fuse_uring_enabled()) + fc->io_uring = 1; } else { ra_pages = fc->max_read / PAGE_SIZE; fc->no_lock = 1; @@ -1446,6 +1451,13 @@ void fuse_send_init(struct fuse_mount *fm) if (IS_ENABLED(CONFIG_FUSE_PASSTHROUGH)) flags |= FUSE_PASSTHROUGH; + /* + * This is just an information flag for fuse server. No need to check + * the reply - server is either sending IORING_OP_URING_CMD or not. + */ + if (fuse_uring_enabled()) + flags |= FUSE_OVER_IO_URING; + ia->in.flags = flags; ia->in.flags2 = flags >> 32; diff --git a/fs/fuse/xattr.c b/fs/fuse/xattr.c index 9f568d345c51..93dfb06b6cea 100644 --- a/fs/fuse/xattr.c +++ b/fs/fuse/xattr.c @@ -164,9 +164,10 @@ int fuse_removexattr(struct inode *inode, const char *name) args.opcode = FUSE_REMOVEXATTR; args.nodeid = get_node_id(inode); - args.in_numargs = 1; - args.in_args[0].size = strlen(name) + 1; - args.in_args[0].value = name; + args.in_numargs = 2; + fuse_set_zero_arg0(&args); + args.in_args[1].size = strlen(name) + 1; + args.in_args[1].value = name; err = fuse_simple_request(fm, &args); if (err == -ENOSYS) { fm->fc->no_removexattr = 1; |