From 7b7a8665edd8db733980389b098530f9e4f630b2 Mon Sep 17 00:00:00 2001 From: Christoph Hellwig Date: Wed, 4 Sep 2013 15:04:39 +0200 Subject: direct-io: Implement generic deferred AIO completions Add support to the core direct-io code to defer AIO completions to user context using a workqueue. This replaces opencoded and less efficient code in XFS and ext4 (we save a memory allocation for each direct IO) and will be needed to properly support O_(D)SYNC for AIO. The communication between the filesystem and the direct I/O code requires a new buffer head flag, which is a bit ugly but not avoidable until the direct I/O code stops abusing the buffer_head structure for communicating with the filesystems. Currently this creates a per-superblock unbound workqueue for these completions, which is taken from an earlier patch by Jan Kara. I'm not really convinced about this use and would prefer a "normal" global workqueue with a high concurrency limit, but this needs further discussion. JK: Fixed ext4 part, dynamic allocation of the workqueue. Signed-off-by: Christoph Hellwig Signed-off-by: Jan Kara Signed-off-by: Al Viro --- fs/direct-io.c | 85 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 16 deletions(-) (limited to 'fs/direct-io.c') diff --git a/fs/direct-io.c b/fs/direct-io.c index 7ab90f5081ee..8b31b9f449f4 100644 --- a/fs/direct-io.c +++ b/fs/direct-io.c @@ -127,6 +127,7 @@ struct dio { spinlock_t bio_lock; /* protects BIO fields below */ int page_errors; /* errno from get_user_pages() */ int is_async; /* is IO async ? */ + bool defer_completion; /* defer AIO completion to workqueue? */ int io_error; /* IO error in completion path */ unsigned long refcount; /* direct_io_worker() and bios */ struct bio *bio_list; /* singly linked via bi_private */ @@ -141,7 +142,10 @@ struct dio { * allocation time. Don't add new fields after pages[] unless you * wish that they not be zeroed. */ - struct page *pages[DIO_PAGES]; /* page buffer */ + union { + struct page *pages[DIO_PAGES]; /* page buffer */ + struct work_struct complete_work;/* deferred AIO completion */ + }; } ____cacheline_aligned_in_smp; static struct kmem_cache *dio_cache __read_mostly; @@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio, * dio_complete() - called when all DIO BIO I/O has been completed * @offset: the byte offset in the file of the completed operation * - * This releases locks as dictated by the locking type, lets interested parties - * know that a DIO operation has completed, and calculates the resulting return - * code for the operation. + * This drops i_dio_count, lets interested parties know that a DIO operation + * has completed, and calculates the resulting return code for the operation. * * It lets the filesystem know if it registered an interest earlier via * get_block. Pass the private field of the map buffer_head so that * filesystems can use it to hold additional state between get_block calls and * dio_complete. */ -static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async) +static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, + bool is_async) { ssize_t transferred = 0; @@ -258,19 +262,26 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is if (ret == 0) ret = transferred; - if (dio->end_io && dio->result) { - dio->end_io(dio->iocb, offset, transferred, - dio->private, ret, is_async); - } else { - inode_dio_done(dio->inode); - if (is_async) - aio_complete(dio->iocb, ret, 0); - } + if (dio->end_io && dio->result) + dio->end_io(dio->iocb, offset, transferred, dio->private); + + inode_dio_done(dio->inode); + if (is_async) + aio_complete(dio->iocb, ret, 0); + kmem_cache_free(dio_cache, dio); return ret; } +static void dio_aio_complete_work(struct work_struct *work) +{ + struct dio *dio = container_of(work, struct dio, complete_work); + + dio_complete(dio, dio->iocb->ki_pos, 0, true); +} + static int dio_bio_complete(struct dio *dio, struct bio *bio); + /* * Asynchronous IO callback. */ @@ -290,8 +301,13 @@ static void dio_bio_end_aio(struct bio *bio, int error) spin_unlock_irqrestore(&dio->bio_lock, flags); if (remaining == 0) { - dio_complete(dio, dio->iocb->ki_pos, 0, true); - kmem_cache_free(dio_cache, dio); + if (dio->result && dio->defer_completion) { + INIT_WORK(&dio->complete_work, dio_aio_complete_work); + queue_work(dio->inode->i_sb->s_dio_done_wq, + &dio->complete_work); + } else { + dio_complete(dio, dio->iocb->ki_pos, 0, true); + } } } @@ -510,6 +526,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio) return ret; } +/* + * Create workqueue for deferred direct IO completions. We allocate the + * workqueue when it's first needed. This avoids creating workqueue for + * filesystems that don't need it and also allows us to create the workqueue + * late enough so the we can include s_id in the name of the workqueue. + */ +static int sb_init_dio_done_wq(struct super_block *sb) +{ + struct workqueue_struct *wq = alloc_workqueue("dio/%s", + WQ_MEM_RECLAIM, 0, + sb->s_id); + if (!wq) + return -ENOMEM; + /* + * This has to be atomic as more DIOs can race to create the workqueue + */ + cmpxchg(&sb->s_dio_done_wq, NULL, wq); + /* Someone created workqueue before us? Free ours... */ + if (wq != sb->s_dio_done_wq) + destroy_workqueue(wq); + return 0; +} + +static int dio_set_defer_completion(struct dio *dio) +{ + struct super_block *sb = dio->inode->i_sb; + + if (dio->defer_completion) + return 0; + dio->defer_completion = true; + if (!sb->s_dio_done_wq) + return sb_init_dio_done_wq(sb); + return 0; +} + /* * Call into the fs to map some more disk blocks. We record the current number * of available blocks at sdio->blocks_available. These are in units of the @@ -581,6 +632,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio, /* Store for completion */ dio->private = map_bh->b_private; + + if (ret == 0 && buffer_defer_completion(map_bh)) + ret = dio_set_defer_completion(dio); } return ret; } @@ -1269,7 +1323,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode, if (drop_refcount(dio) == 0) { retval = dio_complete(dio, offset, retval, false); - kmem_cache_free(dio_cache, dio); } else BUG_ON(retval != -EIOCBQUEUED); -- cgit