Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--  drivers/block/rbd.c | 5905
1 file changed, 3414 insertions(+), 2491 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b008b6a98098..af0e21149dbc 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -32,8 +32,9 @@ #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> #include <linux/ceph/cls_lock_client.h> +#include <linux/ceph/striper.h> #include <linux/ceph/decode.h> -#include <linux/parser.h> +#include <linux/fs_parser.h> #include <linux/bsearch.h> #include <linux/kernel.h> @@ -51,15 +52,6 @@ #define RBD_DEBUG /* Activate rbd_assert() calls */ /* - * The basic unit of block I/O is a sector. It is interpreted in a - * number of contexts in Linux (blk, bio, genhd), but the default is - * universally 512 bytes. These symbols are just slightly more - * meaningful than the bare numbers they represent. - */ -#define SECTOR_SHIFT 9 -#define SECTOR_SIZE (1ULL << SECTOR_SHIFT) - -/* * Increment the given counter and return its updated value. * If the counter is already 0 it will not be incremented. * If the counter is already at its maximum value returns @@ -69,7 +61,7 @@ static int atomic_inc_return_safe(atomic_t *v) { unsigned int counter; - counter = (unsigned int)__atomic_add_unless(v, 1, 0); + counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0); if (counter <= (unsigned int)INT_MAX) return (int)counter; @@ -123,12 +115,20 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_LAYERING (1ULL<<0) #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) +#define RBD_FEATURE_OBJECT_MAP (1ULL<<3) +#define RBD_FEATURE_FAST_DIFF (1ULL<<4) +#define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) #define RBD_FEATURE_DATA_POOL (1ULL<<7) +#define RBD_FEATURE_OPERATIONS (1ULL<<8) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ RBD_FEATURE_EXCLUSIVE_LOCK | \ - RBD_FEATURE_DATA_POOL) + RBD_FEATURE_OBJECT_MAP | \ + RBD_FEATURE_FAST_DIFF | \ + RBD_FEATURE_DEEP_FLATTEN | \ + RBD_FEATURE_DATA_POOL | \ + RBD_FEATURE_OPERATIONS) /* Features supported by this (client software) implementation. */ @@ -187,6 +187,7 @@ struct rbd_image_header { struct rbd_spec { u64 pool_id; const char *pool_name; + const char *pool_ns; /* NULL if default, never "" */ const char *image_id; const char *image_name; @@ -206,126 +207,151 @@ struct rbd_client { struct list_head node; }; -struct rbd_img_request; -typedef void (*rbd_img_callback_t)(struct rbd_img_request *); - -#define BAD_WHICH U32_MAX /* Good which or bad which, which? 
*/ +struct pending_result { + int result; /* first nonzero result */ + int num_pending; +}; -struct rbd_obj_request; -typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *); +struct rbd_img_request; enum obj_request_type { - OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES + OBJ_REQUEST_NODATA = 1, + OBJ_REQUEST_BIO, /* pointer into provided bio (list) */ + OBJ_REQUEST_BVECS, /* pointer into provided bio_vec array */ + OBJ_REQUEST_OWN_BVECS, /* private bio_vec array, doesn't own pages */ }; enum obj_operation_type { + OBJ_OP_READ = 1, OBJ_OP_WRITE, - OBJ_OP_READ, OBJ_OP_DISCARD, + OBJ_OP_ZEROOUT, }; -enum obj_req_flags { - OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ - OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ - OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ - OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ +#define RBD_OBJ_FLAG_DELETION (1U << 0) +#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1) +#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2) +#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3) +#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4) + +enum rbd_obj_read_state { + RBD_OBJ_READ_START = 1, + RBD_OBJ_READ_OBJECT, + RBD_OBJ_READ_PARENT, }; -struct rbd_obj_request { - u64 object_no; - u64 offset; /* object start byte */ - u64 length; /* bytes from offset */ - unsigned long flags; +/* + * Writes go through the following state machine to deal with + * layering: + * + * . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . . + * . | . + * . v . + * . RBD_OBJ_WRITE_READ_FROM_PARENT. . . . + * . | . . + * . v v (deep-copyup . + * (image . RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC . not needed) . + * flattened) v | . . + * . v . . + * . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . . (copyup . + * | not needed) v + * v . + * done . . . . . . . . . . . . . . . . . . + * ^ + * | + * RBD_OBJ_WRITE_FLAT + * + * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether + * assert_exists guard is needed or not (in some cases it's not needed + * even if there is a parent). + */ +enum rbd_obj_write_state { + RBD_OBJ_WRITE_START = 1, + RBD_OBJ_WRITE_PRE_OBJECT_MAP, + RBD_OBJ_WRITE_OBJECT, + __RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_POST_OBJECT_MAP, +}; - /* - * An object request associated with an image will have its - * img_data flag set; a standalone object request will not. - * - * A standalone object request will have which == BAD_WHICH - * and a null obj_request pointer. - * - * An object request initiated in support of a layered image - * object (to check for its existence before a write) will - * have which == BAD_WHICH and a non-null obj_request pointer. - * - * Finally, an object request for rbd image data will have - * which != BAD_WHICH, and will have a non-null img_request - * pointer. The value of which will be in the range - * 0..(img_request->obj_request_count-1). 
- */ +enum rbd_obj_copyup_state { + RBD_OBJ_COPYUP_START = 1, + RBD_OBJ_COPYUP_READ_PARENT, + __RBD_OBJ_COPYUP_OBJECT_MAPS, + RBD_OBJ_COPYUP_OBJECT_MAPS, + __RBD_OBJ_COPYUP_WRITE_OBJECT, + RBD_OBJ_COPYUP_WRITE_OBJECT, +}; + +struct rbd_obj_request { + struct ceph_object_extent ex; + unsigned int flags; /* RBD_OBJ_FLAG_* */ union { - struct rbd_obj_request *obj_request; /* STAT op */ - struct { - struct rbd_img_request *img_request; - u64 img_offset; - /* links for img_request->obj_requests list */ - struct list_head links; - }; + enum rbd_obj_read_state read_state; /* for reads */ + enum rbd_obj_write_state write_state; /* for writes */ }; - u32 which; /* posn image request list */ - enum obj_request_type type; + struct rbd_img_request *img_request; + struct ceph_file_extent *img_extents; + u32 num_img_extents; + union { - struct bio *bio_list; + struct ceph_bio_iter bio_pos; struct { - struct page **pages; - u32 page_count; + struct ceph_bvec_iter bvec_pos; + u32 bvec_count; + u32 bvec_idx; }; }; - struct page **copyup_pages; - u32 copyup_page_count; - struct ceph_osd_request *osd_req; + enum rbd_obj_copyup_state copyup_state; + struct bio_vec *copyup_bvecs; + u32 copyup_bvec_count; - u64 xferred; /* bytes transferred */ - int result; - - rbd_obj_callback_t callback; - struct completion completion; + struct list_head osd_reqs; /* w/ r_private_item */ + struct mutex state_mutex; + struct pending_result pending; struct kref kref; }; enum img_req_flags { - IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ - IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */ +}; + +enum rbd_img_state { + RBD_IMG_START = 1, + RBD_IMG_EXCLUSIVE_LOCK, + __RBD_IMG_OBJECT_REQUESTS, + RBD_IMG_OBJECT_REQUESTS, }; struct rbd_img_request { struct rbd_device *rbd_dev; - u64 offset; /* starting image byte offset */ - u64 length; /* byte count from offset */ + enum obj_operation_type op_type; + enum obj_request_type data_type; unsigned long flags; + enum rbd_img_state state; union { u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ }; - union { - struct request *rq; /* block request */ - struct rbd_obj_request *obj_request; /* obj req initiator */ - }; - struct page **copyup_pages; - u32 copyup_page_count; - spinlock_t completion_lock;/* protects next_completion */ - u32 next_completion; - rbd_img_callback_t callback; - u64 xferred;/* aggregate bytes transferred */ - int result; /* first nonzero obj_request result */ + struct rbd_obj_request *obj_request; /* obj req initiator */ - u32 obj_request_count; - struct list_head obj_requests; /* rbd_obj_request structs */ + struct list_head lock_item; + struct list_head object_extents; /* obj_req.ex structs */ - struct kref kref; + struct mutex state_mutex; + struct pending_result pending; + struct work_struct work; + int work_result; }; #define for_each_obj_request(ireq, oreq) \ - list_for_each_entry(oreq, &(ireq)->obj_requests, links) -#define for_each_obj_request_from(ireq, oreq) \ - list_for_each_entry_from(oreq, &(ireq)->obj_requests, links) + list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item) #define for_each_obj_request_safe(ireq, oreq, n) \ - list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) + list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item) enum rbd_watch_state { RBD_WATCH_STATE_UNREGISTERED, @@ -336,7 +362,7 @@ enum rbd_watch_state { enum 
rbd_lock_state { RBD_LOCK_STATE_UNLOCKED, RBD_LOCK_STATE_LOCKED, - RBD_LOCK_STATE_RELEASING, + RBD_LOCK_STATE_QUIESCING, }; /* WatchNotify::ClientId */ @@ -347,8 +373,6 @@ struct rbd_client_id { struct rbd_mapping { u64 size; - u64 features; - bool read_only; }; /* @@ -393,7 +417,17 @@ struct rbd_device { struct work_struct released_lock_work; struct delayed_work lock_dwork; struct work_struct unlock_work; - wait_queue_head_t lock_waitq; + spinlock_t lock_lists_lock; + struct list_head acquiring_list; + struct list_head running_list; + struct completion acquire_wait; + int acquire_err; + struct completion quiescing_wait; + + spinlock_t object_map_lock; + u8 *object_map; + u64 object_map_size; /* in objects */ + u64 object_map_flags; struct workqueue_struct *task_wq; @@ -421,12 +455,11 @@ struct rbd_device { * Flag bits for rbd_dev->flags: * - REMOVING (which is coupled with rbd_dev->open_count) is protected * by rbd_dev->lock - * - BLACKLISTED is protected by rbd_dev->lock_rwsem */ enum rbd_dev_flags { - RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ + RBD_DEV_FLAG_EXISTS, /* rbd_dev_device_setup() ran */ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ - RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ + RBD_DEV_FLAG_READONLY, /* -o ro or snapshot */ }; static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ @@ -442,33 +475,30 @@ static DEFINE_SPINLOCK(rbd_client_list_lock); static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; -static struct bio_set *rbd_bio_clone; - static int rbd_major; static DEFINE_IDA(rbd_dev_id_ida); static struct workqueue_struct *rbd_wq; +static struct ceph_snap_context rbd_empty_snapc = { + .nref = REFCOUNT_INIT(1), +}; + /* - * Default to false for now, as single-major requires >= 0.75 version of - * userspace rbd utility. + * single-major requires >= 0.75 version of userspace rbd utility. 
*/ -static bool single_major = false; -module_param(single_major, bool, S_IRUGO); -MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)"); - -static int rbd_img_request_submit(struct rbd_img_request *img_request); - -static ssize_t rbd_add(struct bus_type *bus, const char *buf, - size_t count); -static ssize_t rbd_remove(struct bus_type *bus, const char *buf, - size_t count); -static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, - size_t count); -static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, - size_t count); +static bool single_major = true; +module_param(single_major, bool, 0444); +MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)"); + +static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count); +static ssize_t remove_store(const struct bus_type *bus, const char *buf, + size_t count); +static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf, + size_t count); +static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf, + size_t count); static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); -static void rbd_spec_put(struct rbd_spec *spec); static int rbd_dev_id_to_minor(int dev_id) { @@ -480,10 +510,22 @@ static int minor_to_rbd_dev_id(int minor) return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } +static bool rbd_is_ro(struct rbd_device *rbd_dev) +{ + return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags); +} + +static bool rbd_is_snap(struct rbd_device *rbd_dev) +{ + return rbd_dev->spec->snap_id != CEPH_NOSNAP; +} + static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) { + lockdep_assert_held(&rbd_dev->lock_rwsem); + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || - rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; + rbd_dev->lock_state == RBD_LOCK_STATE_QUIESCING; } static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) @@ -496,16 +538,16 @@ static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) return is_lock_owner; } -static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf) +static ssize_t supported_features_show(const struct bus_type *bus, char *buf) { return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED); } -static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); -static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); -static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); -static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major); -static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL); +static BUS_ATTR_WO(add); +static BUS_ATTR_WO(remove); +static BUS_ATTR_WO(add_single_major); +static BUS_ATTR_WO(remove_single_major); +static BUS_ATTR_RO(supported_features); static struct attribute *rbd_bus_attrs[] = { &bus_attr_add.attr, @@ -533,7 +575,7 @@ static const struct attribute_group rbd_bus_group = { }; __ATTRIBUTE_GROUPS(rbd_bus); -static struct bus_type rbd_bus_type = { +static const struct bus_type rbd_bus_type = { .name = "rbd", .bus_groups = rbd_bus_groups, }; @@ -587,29 +629,40 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 
# define rbd_assert(expr) ((void) 0) #endif /* !RBD_DEBUG */ -static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request); -static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); -static void rbd_img_parent_read(struct rbd_obj_request *obj_request); static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); static int rbd_dev_refresh(struct rbd_device *rbd_dev); -static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev); -static int rbd_dev_header_info(struct rbd_device *rbd_dev); -static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev); +static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev, + struct rbd_image_header *header); static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u64 snap_id); static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, u8 *order, u64 *snap_size); -static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, - u64 *snap_features); +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev); -static int rbd_open(struct block_device *bdev, fmode_t mode) +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result); +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result); + +/* + * Return true if nothing else is pending. + */ +static bool pending_result_dec(struct pending_result *pending, int *result) { - struct rbd_device *rbd_dev = bdev->bd_disk->private_data; - bool removing = false; + rbd_assert(pending->num_pending > 0); - if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) - return -EROFS; + if (*result && !pending->result) + pending->result = *result; + if (--pending->num_pending) + return false; + + *result = pending->result; + return true; +} + +static int rbd_open(struct gendisk *disk, blk_mode_t mode) +{ + struct rbd_device *rbd_dev = disk->private_data; + bool removing = false; spin_lock_irq(&rbd_dev->lock); if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) @@ -625,7 +678,7 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) return 0; } -static void rbd_release(struct gendisk *disk, fmode_t mode) +static void rbd_release(struct gendisk *disk) { struct rbd_device *rbd_dev = disk->private_data; unsigned long open_count_before; @@ -638,76 +691,10 @@ static void rbd_release(struct gendisk *disk, fmode_t mode) put_device(&rbd_dev->dev); } -static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg) -{ - int ret = 0; - int val; - bool ro; - bool ro_changed = false; - - /* get_user() may sleep, so call it before taking rbd_dev->lock */ - if (get_user(val, (int __user *)(arg))) - return -EFAULT; - - ro = val ? true : false; - /* Snapshot doesn't allow to write*/ - if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) - return -EROFS; - - spin_lock_irq(&rbd_dev->lock); - /* prevent others open this device */ - if (rbd_dev->open_count > 1) { - ret = -EBUSY; - goto out; - } - - if (rbd_dev->mapping.read_only != ro) { - rbd_dev->mapping.read_only = ro; - ro_changed = true; - } - -out: - spin_unlock_irq(&rbd_dev->lock); - /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */ - if (ret == 0 && ro_changed) - set_disk_ro(rbd_dev->disk, ro ? 
1 : 0); - - return ret; -} - -static int rbd_ioctl(struct block_device *bdev, fmode_t mode, - unsigned int cmd, unsigned long arg) -{ - struct rbd_device *rbd_dev = bdev->bd_disk->private_data; - int ret = 0; - - switch (cmd) { - case BLKROSET: - ret = rbd_ioctl_set_ro(rbd_dev, arg); - break; - default: - ret = -ENOTTY; - } - - return ret; -} - -#ifdef CONFIG_COMPAT -static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, - unsigned int cmd, unsigned long arg) -{ - return rbd_ioctl(bdev, mode, cmd, arg); -} -#endif /* CONFIG_COMPAT */ - static const struct block_device_operations rbd_bd_ops = { .owner = THIS_MODULE, .open = rbd_open, .release = rbd_release, - .ioctl = rbd_ioctl, -#ifdef CONFIG_COMPAT - .compat_ioctl = rbd_compat_ioctl, -#endif }; /* @@ -768,24 +755,23 @@ static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) */ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) { - struct rbd_client *client_node; - bool found = false; + struct rbd_client *rbdc = NULL, *iter; if (ceph_opts->flags & CEPH_OPT_NOSHARE) return NULL; spin_lock(&rbd_client_list_lock); - list_for_each_entry(client_node, &rbd_client_list, node) { - if (!ceph_compare_options(ceph_opts, client_node->client)) { - __rbd_get_client(client_node); + list_for_each_entry(iter, &rbd_client_list, node) { + if (!ceph_compare_options(ceph_opts, iter->client)) { + __rbd_get_client(iter); - found = true; + rbdc = iter; break; } } spin_unlock(&rbd_client_list_lock); - return found ? client_node : NULL; + return rbdc; } /* @@ -793,89 +779,74 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) */ enum { Opt_queue_depth, - Opt_last_int, + Opt_alloc_size, + Opt_lock_timeout, /* int args above */ - Opt_last_string, + Opt_pool_ns, + Opt_compression_hint, /* string args above */ Opt_read_only, Opt_read_write, Opt_lock_on_read, Opt_exclusive, - Opt_err + Opt_notrim, }; -static match_table_t rbd_opts_tokens = { - {Opt_queue_depth, "queue_depth=%d"}, - /* int args above */ - /* string args above */ - {Opt_read_only, "read_only"}, - {Opt_read_only, "ro"}, /* Alternate spelling */ - {Opt_read_write, "read_write"}, - {Opt_read_write, "rw"}, /* Alternate spelling */ - {Opt_lock_on_read, "lock_on_read"}, - {Opt_exclusive, "exclusive"}, - {Opt_err, NULL} +enum { + Opt_compression_hint_none, + Opt_compression_hint_compressible, + Opt_compression_hint_incompressible, +}; + +static const struct constant_table rbd_param_compression_hint[] = { + {"none", Opt_compression_hint_none}, + {"compressible", Opt_compression_hint_compressible}, + {"incompressible", Opt_compression_hint_incompressible}, + {} +}; + +static const struct fs_parameter_spec rbd_parameters[] = { + fsparam_u32 ("alloc_size", Opt_alloc_size), + fsparam_enum ("compression_hint", Opt_compression_hint, + rbd_param_compression_hint), + fsparam_flag ("exclusive", Opt_exclusive), + fsparam_flag ("lock_on_read", Opt_lock_on_read), + fsparam_u32 ("lock_timeout", Opt_lock_timeout), + fsparam_flag ("notrim", Opt_notrim), + fsparam_string ("_pool_ns", Opt_pool_ns), + fsparam_u32 ("queue_depth", Opt_queue_depth), + fsparam_flag ("read_only", Opt_read_only), + fsparam_flag ("read_write", Opt_read_write), + fsparam_flag ("ro", Opt_read_only), + fsparam_flag ("rw", Opt_read_write), + {} }; struct rbd_options { int queue_depth; + int alloc_size; + unsigned long lock_timeout; bool read_only; bool lock_on_read; bool exclusive; + bool trim; + + u32 alloc_hint_flags; /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */ }; -#define 
RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ +#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_DEFAULT_RQ +#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024) +#define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */ #define RBD_READ_ONLY_DEFAULT false #define RBD_LOCK_ON_READ_DEFAULT false #define RBD_EXCLUSIVE_DEFAULT false +#define RBD_TRIM_DEFAULT true -static int parse_rbd_opts_token(char *c, void *private) -{ - struct rbd_options *rbd_opts = private; - substring_t argstr[MAX_OPT_ARGS]; - int token, intval, ret; - - token = match_token(c, rbd_opts_tokens, argstr); - if (token < Opt_last_int) { - ret = match_int(&argstr[0], &intval); - if (ret < 0) { - pr_err("bad mount option arg (not int) at '%s'\n", c); - return ret; - } - dout("got int token %d val %d\n", token, intval); - } else if (token > Opt_last_int && token < Opt_last_string) { - dout("got string token %d val %s\n", token, argstr[0].from); - } else { - dout("got token %d\n", token); - } - - switch (token) { - case Opt_queue_depth: - if (intval < 1) { - pr_err("queue_depth out of range\n"); - return -EINVAL; - } - rbd_opts->queue_depth = intval; - break; - case Opt_read_only: - rbd_opts->read_only = true; - break; - case Opt_read_write: - rbd_opts->read_only = false; - break; - case Opt_lock_on_read: - rbd_opts->lock_on_read = true; - break; - case Opt_exclusive: - rbd_opts->exclusive = true; - break; - default: - /* libceph prints "bad option" msg */ - return -EINVAL; - } - - return 0; -} +struct rbd_parse_opts_ctx { + struct rbd_spec *spec; + struct ceph_options *copts; + struct rbd_options *opts; +}; static char* obj_op_name(enum obj_operation_type op_type) { @@ -886,32 +857,14 @@ static char* obj_op_name(enum obj_operation_type op_type) return "write"; case OBJ_OP_DISCARD: return "discard"; + case OBJ_OP_ZEROOUT: + return "zeroout"; default: return "???"; } } /* - * Get a ceph client with specific addr and configuration, if one does - * not exist create it. Either way, ceph_opts is consumed by this - * function. - */ -static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) -{ - struct rbd_client *rbdc; - - mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); - rbdc = rbd_client_find(ceph_opts); - if (rbdc) /* using an existing client */ - ceph_destroy_options(ceph_opts); - else - rbdc = rbd_client_create(ceph_opts); - mutex_unlock(&client_mutex); - - return rbdc; -} - -/* * Destroy ceph client * * Caller must hold rbd_client_list_lock. @@ -939,6 +892,40 @@ static void rbd_put_client(struct rbd_client *rbdc) kref_put(&rbdc->kref, rbd_client_release); } +/* + * Get a ceph client with specific addr and configuration, if one does + * not exist create it. Either way, ceph_opts is consumed by this + * function. + */ +static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) +{ + struct rbd_client *rbdc; + int ret; + + mutex_lock(&client_mutex); + rbdc = rbd_client_find(ceph_opts); + if (rbdc) { + ceph_destroy_options(ceph_opts); + + /* + * Using an existing client. Make sure ->pg_pools is up to + * date before we look up the pool id in do_rbd_add(). 
+ */ + ret = ceph_wait_for_latest_osdmap(rbdc->client, + rbdc->client->options->mount_timeout); + if (ret) { + rbd_warn(NULL, "failed to get latest osdmap: %d", ret); + rbd_put_client(rbdc); + rbdc = ERR_PTR(ret); + } + } else { + rbdc = rbd_client_create(ceph_opts); + } + mutex_unlock(&client_mutex); + + return rbdc; +} + static bool rbd_image_format_valid(u32 image_format) { return image_format == 1 || image_format == 2; @@ -1007,15 +994,24 @@ static void rbd_init_layout(struct rbd_device *rbd_dev) RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); } +static void rbd_image_header_cleanup(struct rbd_image_header *header) +{ + kfree(header->object_prefix); + ceph_put_snap_context(header->snapc); + kfree(header->snap_sizes); + kfree(header->snap_names); + + memset(header, 0, sizeof(*header)); +} + /* * Fill an rbd image header with information from the given format 1 * on-disk header. */ -static int rbd_header_from_disk(struct rbd_device *rbd_dev, - struct rbd_image_header_ondisk *ondisk) +static int rbd_header_from_disk(struct rbd_image_header *header, + struct rbd_image_header_ondisk *ondisk, + bool first_time) { - struct rbd_image_header *header = &rbd_dev->header; - bool first_time = header->object_prefix == NULL; struct ceph_snap_context *snapc; char *object_prefix = NULL; char *snap_names = NULL; @@ -1082,11 +1078,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, if (first_time) { header->object_prefix = object_prefix; header->obj_order = ondisk->options.order; - rbd_init_layout(rbd_dev); - } else { - ceph_put_snap_context(header->snapc); - kfree(header->snap_names); - kfree(header->snap_sizes); } /* The remaining fields always get updated (when we refresh) */ @@ -1211,1829 +1202,2413 @@ static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, return 0; } -static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, - u64 *snap_features) -{ - rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); - if (snap_id == CEPH_NOSNAP) { - *snap_features = rbd_dev->header.features; - } else if (rbd_dev->image_format == 1) { - *snap_features = 0; /* No features for format 1 */ - } else { - u64 features = 0; - int ret; - - ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); - if (ret) - return ret; - - *snap_features = features; - } - return 0; -} - static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) { u64 snap_id = rbd_dev->spec->snap_id; u64 size = 0; - u64 features = 0; int ret; ret = rbd_snap_size(rbd_dev, snap_id, &size); if (ret) return ret; - ret = rbd_snap_features(rbd_dev, snap_id, &features); - if (ret) - return ret; rbd_dev->mapping.size = size; - rbd_dev->mapping.features = features; - return 0; } static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) { rbd_dev->mapping.size = 0; - rbd_dev->mapping.features = 0; } -static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) +static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes) { - u64 segment_size = rbd_obj_bytes(&rbd_dev->header); + struct ceph_bio_iter it = *bio_pos; - return offset & (segment_size - 1); + ceph_bio_iter_advance(&it, off); + ceph_bio_iter_advance_step(&it, bytes, ({ + memzero_bvec(&bv); + })); } -static u64 rbd_segment_length(struct rbd_device *rbd_dev, - u64 offset, u64 length) +static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) { - u64 segment_size = rbd_obj_bytes(&rbd_dev->header); - - offset &= segment_size - 1; + struct ceph_bvec_iter it = *bvec_pos; - rbd_assert(length <= U64_MAX - offset); - if 
(offset + length > segment_size) - length = segment_size - offset; - - return length; + ceph_bvec_iter_advance(&it, off); + ceph_bvec_iter_advance_step(&it, bytes, ({ + memzero_bvec(&bv); + })); } /* - * bio helpers + * Zero a range in @obj_req data buffer defined by a bio (list) or + * (private) bio_vec array. + * + * @off is relative to the start of the data buffer. */ - -static void bio_chain_put(struct bio *chain) +static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, + u32 bytes) { - struct bio *tmp; + dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes); - while (chain) { - tmp = chain; - chain = chain->bi_next; - bio_put(tmp); + switch (obj_req->img_request->data_type) { + case OBJ_REQUEST_BIO: + zero_bios(&obj_req->bio_pos, off, bytes); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + zero_bvecs(&obj_req->bvec_pos, off, bytes); + break; + default: + BUG(); } } -/* - * zeros a bio chain, starting at specific offset - */ -static void zero_bio_chain(struct bio *chain, int start_ofs) -{ - struct bio_vec bv; - struct bvec_iter iter; - unsigned long flags; - void *buf; - int pos = 0; - - while (chain) { - bio_for_each_segment(bv, chain, iter) { - if (pos + bv.bv_len > start_ofs) { - int remainder = max(start_ofs - pos, 0); - buf = bvec_kmap_irq(&bv, &flags); - memset(buf + remainder, 0, - bv.bv_len - remainder); - flush_dcache_page(bv.bv_page); - bvec_kunmap_irq(buf, &flags); - } - pos += bv.bv_len; - } - - chain = chain->bi_next; - } +static void rbd_obj_request_destroy(struct kref *kref); +static void rbd_obj_request_put(struct rbd_obj_request *obj_request) +{ + rbd_assert(obj_request != NULL); + dout("%s: obj %p (was %d)\n", __func__, obj_request, + kref_read(&obj_request->kref)); + kref_put(&obj_request->kref, rbd_obj_request_destroy); } -/* - * similar to zero_bio_chain(), zeros data defined by a page array, - * starting at the given byte offset from the start of the array and - * continuing up to the given end offset. The pages array is - * assumed to be big enough to hold all bytes up to the end. - */ -static void zero_pages(struct page **pages, u64 offset, u64 end) +static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, + struct rbd_obj_request *obj_request) { - struct page **page = &pages[offset >> PAGE_SHIFT]; - - rbd_assert(end > offset); - rbd_assert(end - offset <= (u64)SIZE_MAX); - while (offset < end) { - size_t page_offset; - size_t length; - unsigned long flags; - void *kaddr; - - page_offset = offset & ~PAGE_MASK; - length = min_t(size_t, PAGE_SIZE - page_offset, end - offset); - local_irq_save(flags); - kaddr = kmap_atomic(*page); - memset(kaddr + page_offset, 0, length); - flush_dcache_page(*page); - kunmap_atomic(kaddr); - local_irq_restore(flags); + rbd_assert(obj_request->img_request == NULL); - offset += length; - page++; - } + /* Image request now owns object's original reference */ + obj_request->img_request = img_request; + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); } -/* - * Clone a portion of a bio, starting at the given byte offset - * and continuing for the number of bytes indicated. 
- */ -static struct bio *bio_clone_range(struct bio *bio_src, - unsigned int offset, - unsigned int len, - gfp_t gfpmask) +static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, + struct rbd_obj_request *obj_request) { - struct bio *bio; - - bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone); - if (!bio) - return NULL; /* ENOMEM */ + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + list_del(&obj_request->ex.oe_item); + rbd_assert(obj_request->img_request == img_request); + rbd_obj_request_put(obj_request); +} - bio_advance(bio, offset); - bio->bi_iter.bi_size = len; +static void rbd_osd_submit(struct ceph_osd_request *osd_req) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; - return bio; + dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n", + __func__, osd_req, obj_req, obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len); + ceph_osdc_start_request(osd_req->r_osdc, osd_req); } /* - * Clone a portion of a bio chain, starting at the given byte offset - * into the first bio in the source chain and continuing for the - * number of bytes indicated. The result is another bio chain of - * exactly the given length, or a null pointer on error. - * - * The bio_src and offset parameters are both in-out. On entry they - * refer to the first source bio and the offset into that bio where - * the start of data to be cloned is located. - * - * On return, bio_src is updated to refer to the bio in the source - * chain that contains first un-cloned byte, and *offset will - * contain the offset of that byte within that bio. + * The default/initial value for all image request flags is 0. Each + * is conditionally set to 1 at image request initialization time + * and currently never change thereafter. */ -static struct bio *bio_chain_clone_range(struct bio **bio_src, - unsigned int *offset, - unsigned int len, - gfp_t gfpmask) +static void img_request_layered_set(struct rbd_img_request *img_request) { - struct bio *bi = *bio_src; - unsigned int off = *offset; - struct bio *chain = NULL; - struct bio **end; - - /* Build up a chain of clone bios up to the limit */ + set_bit(IMG_REQ_LAYERED, &img_request->flags); +} - if (!bi || off >= bi->bi_iter.bi_size || !len) - return NULL; /* Nothing to clone */ +static bool img_request_layered_test(struct rbd_img_request *img_request) +{ + return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; +} - end = &chain; - while (len) { - unsigned int bi_size; - struct bio *bio; +static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; - if (!bi) { - rbd_warn(NULL, "bio_chain exhausted with %u left", len); - goto out_err; /* EINVAL; ran out of bio's */ - } - bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len); - bio = bio_clone_range(bi, off, bi_size, gfpmask); - if (!bio) - goto out_err; /* ENOMEM */ - - *end = bio; - end = &bio->bi_next; - - off += bi_size; - if (off == bi->bi_iter.bi_size) { - bi = bi->bi_next; - off = 0; - } - len -= bi_size; - } - *bio_src = bi; - *offset = off; + return !obj_req->ex.oe_off && + obj_req->ex.oe_len == rbd_dev->layout.object_size; +} - return chain; -out_err: - bio_chain_put(chain); +static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; - return NULL; + return obj_req->ex.oe_off + obj_req->ex.oe_len == + rbd_dev->layout.object_size; } /* - * The default/initial value for all object request flags is 0. 
For - * each flag, once its value is set to 1 it is never reset to 0 - * again. + * Must be called after rbd_obj_calc_img_extents(). */ -static void obj_request_img_data_set(struct rbd_obj_request *obj_request) +static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req) { - if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { - struct rbd_device *rbd_dev; + rbd_assert(obj_req->img_request->snapc); - rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked img_data", - obj_request); + if (obj_req->img_request->op_type == OBJ_OP_DISCARD) { + dout("%s %p objno %llu discard\n", __func__, obj_req, + obj_req->ex.oe_objno); + return; } -} - -static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; -} -static void obj_request_done_set(struct rbd_obj_request *obj_request) -{ - if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { - struct rbd_device *rbd_dev = NULL; + if (!obj_req->num_img_extents) { + dout("%s %p objno %llu not overlapping\n", __func__, obj_req, + obj_req->ex.oe_objno); + return; + } - if (obj_request_img_data_test(obj_request)) - rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked done", - obj_request); + if (rbd_obj_is_entire(obj_req) && + !obj_req->img_request->snapc->num_snaps) { + dout("%s %p objno %llu entire\n", __func__, obj_req, + obj_req->ex.oe_objno); + return; } -} -static bool obj_request_done_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; } -/* - * This sets the KNOWN flag after (possibly) setting the EXISTS - * flag. The latter is set based on the "exists" value provided. - * - * Note that for our purposes once an object exists it never goes - * away again. It's possible that the response from two existence - * checks are separated by the creation of the target object, and - * the first ("doesn't exist") response arrives *after* the second - * ("does exist"). In that case we ignore the second one. - */ -static void obj_request_existence_set(struct rbd_obj_request *obj_request, - bool exists) +static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) { - if (exists) - set_bit(OBJ_REQ_EXISTS, &obj_request->flags); - set_bit(OBJ_REQ_KNOWN, &obj_request->flags); - smp_mb(); + return ceph_file_extents_bytes(obj_req->img_extents, + obj_req->num_img_extents); } -static bool obj_request_known_test(struct rbd_obj_request *obj_request) +static bool rbd_img_is_write(struct rbd_img_request *img_req) { - smp_mb(); - return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; + switch (img_req->op_type) { + case OBJ_OP_READ: + return false; + case OBJ_OP_WRITE: + case OBJ_OP_DISCARD: + case OBJ_OP_ZEROOUT: + return true; + default: + BUG(); + } } -static bool obj_request_exists_test(struct rbd_obj_request *obj_request) +static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { - smp_mb(); - return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; + struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; + + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. 
+ */ + if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request)) + result = 0; + else + result = osd_req->r_result; + + rbd_obj_handle_request(obj_req, result); } -static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request) +static void rbd_osd_format_read(struct ceph_osd_request *osd_req) { + struct rbd_obj_request *obj_request = osd_req->r_priv; struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; + struct ceph_options *opt = rbd_dev->rbd_client->client->options; - return obj_request->img_offset < - round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header)); + osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica; + osd_req->r_snapid = obj_request->img_request->snap_id; } -static void rbd_obj_request_get(struct rbd_obj_request *obj_request) +static void rbd_osd_format_write(struct ceph_osd_request *osd_req) { - dout("%s: obj %p (was %d)\n", __func__, obj_request, - kref_read(&obj_request->kref)); - kref_get(&obj_request->kref); -} + struct rbd_obj_request *obj_request = osd_req->r_priv; -static void rbd_obj_request_destroy(struct kref *kref); -static void rbd_obj_request_put(struct rbd_obj_request *obj_request) -{ - rbd_assert(obj_request != NULL); - dout("%s: obj %p (was %d)\n", __func__, obj_request, - kref_read(&obj_request->kref)); - kref_put(&obj_request->kref, rbd_obj_request_destroy); + osd_req->r_flags = CEPH_OSD_FLAG_WRITE; + ktime_get_real_ts64(&osd_req->r_mtime); + osd_req->r_data_offset = obj_request->ex.oe_off; } -static void rbd_img_request_get(struct rbd_img_request *img_request) +static struct ceph_osd_request * +__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, + struct ceph_snap_context *snapc, int num_ops) { - dout("%s: img %p (was %d)\n", __func__, img_request, - kref_read(&img_request->kref)); - kref_get(&img_request->kref); + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + const char *name_format = rbd_dev->image_format == 1 ? + RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; + int ret; + + req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); + if (!req) + return ERR_PTR(-ENOMEM); + + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); + req->r_callback = rbd_osd_req_callback; + req->r_priv = obj_req; + + /* + * Data objects may be stored in a separate pool, but always in + * the same namespace in that pool as the header in its pool. 
+ */ + ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); + req->r_base_oloc.pool = rbd_dev->layout.pool_id; + + ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, + rbd_dev->header.object_prefix, + obj_req->ex.oe_objno); + if (ret) + return ERR_PTR(ret); + + return req; } -static bool img_request_child_test(struct rbd_img_request *img_request); -static void rbd_parent_request_destroy(struct kref *kref); -static void rbd_img_request_destroy(struct kref *kref); -static void rbd_img_request_put(struct rbd_img_request *img_request) +static struct ceph_osd_request * +rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops) { - rbd_assert(img_request != NULL); - dout("%s: img %p (was %d)\n", __func__, img_request, - kref_read(&img_request->kref)); - if (img_request_child_test(img_request)) - kref_put(&img_request->kref, rbd_parent_request_destroy); - else - kref_put(&img_request->kref, rbd_img_request_destroy); + rbd_assert(obj_req->img_request->snapc); + return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc, + num_ops); } -static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, - struct rbd_obj_request *obj_request) +static struct rbd_obj_request *rbd_obj_request_create(void) { - rbd_assert(obj_request->img_request == NULL); + struct rbd_obj_request *obj_request; - /* Image request now owns object's original reference */ - obj_request->img_request = img_request; - obj_request->which = img_request->obj_request_count; - rbd_assert(!obj_request_img_data_test(obj_request)); - obj_request_img_data_set(obj_request); - rbd_assert(obj_request->which != BAD_WHICH); - img_request->obj_request_count++; - list_add_tail(&obj_request->links, &img_request->obj_requests); - dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, - obj_request->which); -} + obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); + if (!obj_request) + return NULL; -static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, - struct rbd_obj_request *obj_request) -{ - rbd_assert(obj_request->which != BAD_WHICH); + ceph_object_extent_init(&obj_request->ex); + INIT_LIST_HEAD(&obj_request->osd_reqs); + mutex_init(&obj_request->state_mutex); + kref_init(&obj_request->kref); - dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, - obj_request->which); - list_del(&obj_request->links); - rbd_assert(img_request->obj_request_count > 0); - img_request->obj_request_count--; - rbd_assert(obj_request->which == img_request->obj_request_count); - obj_request->which = BAD_WHICH; - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request->img_request == img_request); - obj_request->img_request = NULL; - obj_request->callback = NULL; - rbd_obj_request_put(obj_request); + dout("%s %p\n", __func__, obj_request); + return obj_request; } -static bool obj_request_type_valid(enum obj_request_type type) +static void rbd_obj_request_destroy(struct kref *kref) { - switch (type) { + struct rbd_obj_request *obj_request; + struct ceph_osd_request *osd_req; + u32 i; + + obj_request = container_of(kref, struct rbd_obj_request, kref); + + dout("%s: obj %p\n", __func__, obj_request); + + while (!list_empty(&obj_request->osd_reqs)) { + osd_req = list_first_entry(&obj_request->osd_reqs, + struct ceph_osd_request, r_private_item); + list_del_init(&osd_req->r_private_item); + ceph_osdc_put_request(osd_req); + } + + switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: case OBJ_REQUEST_BIO: - case 
OBJ_REQUEST_PAGES: - return true; + case OBJ_REQUEST_BVECS: + break; /* Nothing to do */ + case OBJ_REQUEST_OWN_BVECS: + kfree(obj_request->bvec_pos.bvecs); + break; default: - return false; + BUG(); + } + + kfree(obj_request->img_extents); + if (obj_request->copyup_bvecs) { + for (i = 0; i < obj_request->copyup_bvec_count; i++) { + if (obj_request->copyup_bvecs[i].bv_page) + __free_page(obj_request->copyup_bvecs[i].bv_page); + } + kfree(obj_request->copyup_bvecs); } + + kmem_cache_free(rbd_obj_request_cache, obj_request); } -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); +/* It's OK to call this for a device with no parent */ -static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) +static void rbd_spec_put(struct rbd_spec *spec); +static void rbd_dev_unparent(struct rbd_device *rbd_dev) { - struct ceph_osd_request *osd_req = obj_request->osd_req; - - dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->object_no, obj_request->offset, - obj_request->length, osd_req); - if (obj_request_img_data_test(obj_request)) { - WARN_ON(obj_request->callback != rbd_img_obj_callback); - rbd_img_request_get(obj_request->img_request); - } - ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); + rbd_dev_remove_parent(rbd_dev); + rbd_spec_put(rbd_dev->parent_spec); + rbd_dev->parent_spec = NULL; + rbd_dev->parent_overlap = 0; } -static void rbd_img_request_complete(struct rbd_img_request *img_request) +/* + * Parent image reference counting is used to determine when an + * image's parent fields can be safely torn down--after there are no + * more in-flight requests to the parent image. When the last + * reference is dropped, cleaning them up is safe. + */ +static void rbd_dev_parent_put(struct rbd_device *rbd_dev) { + int counter; - dout("%s: img %p\n", __func__, img_request); + if (!rbd_dev->parent_spec) + return; - /* - * If no error occurred, compute the aggregate transfer - * count for the image request. We could instead use - * atomic64_cmpxchg() to update it as each object request - * completes; not clear which way is better off hand. - */ - if (!img_request->result) { - struct rbd_obj_request *obj_request; - u64 xferred = 0; + counter = atomic_dec_return_safe(&rbd_dev->parent_ref); + if (counter > 0) + return; - for_each_obj_request(img_request, obj_request) - xferred += obj_request->xferred; - img_request->xferred = xferred; - } + /* Last reference; clean up parent data structures */ - if (img_request->callback) - img_request->callback(img_request); + if (!counter) + rbd_dev_unparent(rbd_dev); else - rbd_img_request_put(img_request); + rbd_warn(rbd_dev, "parent reference underflow"); } /* - * The default/initial value for all image request flags is 0. Each - * is conditionally set to 1 at image request initialization time - * and currently never change thereafter. + * If an image has a non-zero parent overlap, get a reference to its + * parent. + * + * Returns true if the rbd device has a parent with a non-zero + * overlap and a reference for it was successfully taken, or + * false otherwise. 
*/ -static void img_request_write_set(struct rbd_img_request *img_request) +static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) { - set_bit(IMG_REQ_WRITE, &img_request->flags); - smp_mb(); + int counter = 0; + + if (!rbd_dev->parent_spec) + return false; + + if (rbd_dev->parent_overlap) + counter = atomic_inc_return_safe(&rbd_dev->parent_ref); + + if (counter < 0) + rbd_warn(rbd_dev, "parent reference overflow"); + + return counter > 0; } -static bool img_request_write_test(struct rbd_img_request *img_request) +static void rbd_img_request_init(struct rbd_img_request *img_request, + struct rbd_device *rbd_dev, + enum obj_operation_type op_type) { - smp_mb(); - return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; + memset(img_request, 0, sizeof(*img_request)); + + img_request->rbd_dev = rbd_dev; + img_request->op_type = op_type; + + INIT_LIST_HEAD(&img_request->lock_item); + INIT_LIST_HEAD(&img_request->object_extents); + mutex_init(&img_request->state_mutex); } /* - * Set the discard flag when the img_request is an discard request + * Only snap_id is captured here, for reads. For writes, snapshot + * context is captured in rbd_img_object_requests() after exclusive + * lock is ensured to be held. */ -static void img_request_discard_set(struct rbd_img_request *img_request) +static void rbd_img_capture_header(struct rbd_img_request *img_req) { - set_bit(IMG_REQ_DISCARD, &img_request->flags); - smp_mb(); + struct rbd_device *rbd_dev = img_req->rbd_dev; + + lockdep_assert_held(&rbd_dev->header_rwsem); + + if (!rbd_img_is_write(img_req)) + img_req->snap_id = rbd_dev->spec->snap_id; + + if (rbd_dev_parent_get(rbd_dev)) + img_request_layered_set(img_req); } -static bool img_request_discard_test(struct rbd_img_request *img_request) +static void rbd_img_request_destroy(struct rbd_img_request *img_request) { - smp_mb(); - return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0; + struct rbd_obj_request *obj_request; + struct rbd_obj_request *next_obj_request; + + dout("%s: img %p\n", __func__, img_request); + + WARN_ON(!list_empty(&img_request->lock_item)); + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_img_obj_request_del(img_request, obj_request); + + if (img_request_layered_test(img_request)) + rbd_dev_parent_put(img_request->rbd_dev); + + if (rbd_img_is_write(img_request)) + ceph_put_snap_context(img_request->snapc); + + if (test_bit(IMG_REQ_CHILD, &img_request->flags)) + kmem_cache_free(rbd_img_request_cache, img_request); } -static void img_request_child_set(struct rbd_img_request *img_request) +#define BITS_PER_OBJ 2 +#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ) +#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1) + +static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno, + u64 *index, u8 *shift) { - set_bit(IMG_REQ_CHILD, &img_request->flags); - smp_mb(); + u32 off; + + rbd_assert(objno < rbd_dev->object_map_size); + *index = div_u64_rem(objno, OBJS_PER_BYTE, &off); + *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ; } -static void img_request_child_clear(struct rbd_img_request *img_request) +static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) { - clear_bit(IMG_REQ_CHILD, &img_request->flags); - smp_mb(); + u64 index; + u8 shift; + + lockdep_assert_held(&rbd_dev->object_map_lock); + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + return (rbd_dev->object_map[index] >> shift) & OBJ_MASK; } -static bool img_request_child_test(struct rbd_img_request *img_request) +static void __rbd_object_map_set(struct 
rbd_device *rbd_dev, u64 objno, u8 val) { - smp_mb(); - return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; + u64 index; + u8 shift; + u8 *p; + + lockdep_assert_held(&rbd_dev->object_map_lock); + rbd_assert(!(val & ~OBJ_MASK)); + + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + p = &rbd_dev->object_map[index]; + *p = (*p & ~(OBJ_MASK << shift)) | (val << shift); } -static void img_request_layered_set(struct rbd_img_request *img_request) +static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) { - set_bit(IMG_REQ_LAYERED, &img_request->flags); - smp_mb(); + u8 state; + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + spin_unlock(&rbd_dev->object_map_lock); + return state; } -static void img_request_layered_clear(struct rbd_img_request *img_request) +static bool use_object_map(struct rbd_device *rbd_dev) { - clear_bit(IMG_REQ_LAYERED, &img_request->flags); - smp_mb(); + /* + * An image mapped read-only can't use the object map -- it isn't + * loaded because the header lock isn't acquired. Someone else can + * write to the image and update the object map behind our back. + * + * A snapshot can't be written to, so using the object map is always + * safe. + */ + if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev)) + return false; + + return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) && + !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)); } -static bool img_request_layered_test(struct rbd_img_request *img_request) +static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno) { - smp_mb(); - return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; + u8 state; + + /* fall back to default logic if object map is disabled or invalid */ + if (!use_object_map(rbd_dev)) + return true; + + state = rbd_object_map_get(rbd_dev, objno); + return state != OBJECT_NONEXISTENT; } -static enum obj_operation_type -rbd_img_request_op_type(struct rbd_img_request *img_request) +static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id, + struct ceph_object_id *oid) { - if (img_request_write_test(img_request)) - return OBJ_OP_WRITE; - else if (img_request_discard_test(img_request)) - return OBJ_OP_DISCARD; + if (snap_id == CEPH_NOSNAP) + ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id); else - return OBJ_OP_READ; + ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id, snap_id); } -static void -rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) +static int rbd_object_map_lock(struct rbd_device *rbd_dev) { - u64 xferred = obj_request->xferred; - u64 length = obj_request->length; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + u8 lock_type; + char *lock_tag; + struct ceph_locker *lockers; + u32 num_lockers; + bool broke_lock = false; + int ret; - dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, - obj_request, obj_request->img_request, obj_request->result, - xferred, length); - /* - * ENOENT means a hole in the image. We zero-fill the entire - * length of the request. A short read also implies zero-fill - * to the end of the request. An error requires the whole - * length of the request to be reported finished with an error - * to the block layer. In each case we update the xferred - * count to indicate the whole request was satisfied. 
- */ - rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); - if (obj_request->result == -ENOENT) { - if (obj_request->type == OBJ_REQUEST_BIO) - zero_bio_chain(obj_request->bio_list, 0); - else - zero_pages(obj_request->pages, 0, length); - obj_request->result = 0; - } else if (xferred < length && !obj_request->result) { - if (obj_request->type == OBJ_REQUEST_BIO) - zero_bio_chain(obj_request->bio_list, xferred); - else - zero_pages(obj_request->pages, xferred, length); + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + +again: + ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0); + if (ret != -EBUSY || broke_lock) { + if (ret == -EEXIST) + ret = 0; /* already locked by myself */ + if (ret) + rbd_warn(rbd_dev, "failed to lock object map: %d", ret); + return ret; } - obj_request->xferred = length; - obj_request_done_set(obj_request); -} -static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p cb %p\n", __func__, obj_request, - obj_request->callback); - if (obj_request->callback) - obj_request->callback(obj_request); - else - complete_all(&obj_request->completion); -} + ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, &lock_type, &lock_tag, + &lockers, &num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; -static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) -{ - obj_request->result = err; - obj_request->xferred = 0; - /* - * kludge - mirror rbd_obj_request_submit() to match a put in - * rbd_img_obj_callback() - */ - if (obj_request_img_data_test(obj_request)) { - WARN_ON(obj_request->callback != rbd_img_obj_callback); - rbd_img_request_get(obj_request->img_request); + rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret); + return ret; } - obj_request_done_set(obj_request); - rbd_obj_request_complete(obj_request); + + kfree(lock_tag); + if (num_lockers == 0) + goto again; + + rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, lockers[0].id.cookie, + &lockers[0].id.name); + ceph_free_lockers(lockers, num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to break object map lock: %d", ret); + return ret; + } + + broke_lock = true; + goto again; } -static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) +static void rbd_object_map_unlock(struct rbd_device *rbd_dev) { - struct rbd_img_request *img_request = NULL; - struct rbd_device *rbd_dev = NULL; - bool layered = false; - - if (obj_request_img_data_test(obj_request)) { - img_request = obj_request->img_request; - layered = img_request && img_request_layered_test(img_request); - rbd_dev = img_request->rbd_dev; - } - - dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, - obj_request, img_request, obj_request->result, - obj_request->xferred, obj_request->length); - if (layered && obj_request->result == -ENOENT && - obj_request->img_offset < rbd_dev->parent_overlap) - rbd_img_parent_read(obj_request); - else if (img_request) - rbd_img_obj_request_read_callback(obj_request); - else - obj_request_done_set(obj_request); + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + + ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + ""); + if (ret && ret != 
-ENOENT) + rbd_warn(rbd_dev, "failed to unlock object map: %d", ret); } -static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) +static int decode_object_map_header(void **p, void *end, u64 *object_map_size) { - dout("%s: obj %p result %d %llu\n", __func__, obj_request, - obj_request->result, obj_request->length); - /* - * There is no such thing as a successful short write. Set - * it to our originally-requested length. - */ - obj_request->xferred = obj_request->length; - obj_request_done_set(obj_request); + u8 struct_v; + u32 struct_len; + u32 header_len; + void *header_end; + int ret; + + ceph_decode_32_safe(p, end, header_len, e_inval); + header_end = *p + header_len; + + ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v, + &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, *object_map_size, e_inval); + + *p = header_end; + return 0; + +e_inval: + return -EINVAL; } -static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request) +static int __rbd_object_map_load(struct rbd_device *rbd_dev) { - dout("%s: obj %p result %d %llu\n", __func__, obj_request, - obj_request->result, obj_request->length); - /* - * There is no such thing as a successful short discard. Set - * it to our originally-requested length. - */ - obj_request->xferred = obj_request->length; - /* discarding a non-existent object is not a problem */ - if (obj_request->result == -ENOENT) - obj_request->result = 0; - obj_request_done_set(obj_request); + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + struct page **pages; + void *p, *end; + size_t reply_len; + u64 num_objects; + u64 object_map_bytes; + u64 object_map_size; + int num_pages; + int ret; + + rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size); + + num_objects = ceph_get_num_objects(&rbd_dev->layout, + rbd_dev->mapping.size); + object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, + BITS_PER_BYTE); + num_pages = calc_pages_for(0, object_map_bytes) + 1; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + reply_len = num_pages * PAGE_SIZE; + rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); + ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, + "rbd", "object_map_load", CEPH_OSD_FLAG_READ, + NULL, 0, pages, &reply_len); + if (ret) + goto out; + + p = page_address(pages[0]); + end = p + min(reply_len, (size_t)PAGE_SIZE); + ret = decode_object_map_header(&p, end, &object_map_size); + if (ret) + goto out; + + if (object_map_size != num_objects) { + rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", + object_map_size, num_objects); + ret = -EINVAL; + goto out; + } + + if (offset_in_page(p) + object_map_bytes > reply_len) { + ret = -EINVAL; + goto out; + } + + rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); + if (!rbd_dev->object_map) { + ret = -ENOMEM; + goto out; + } + + rbd_dev->object_map_size = object_map_size; + ceph_copy_from_page_vector(pages, rbd_dev->object_map, + offset_in_page(p), object_map_bytes); + +out: + ceph_release_page_vector(pages, num_pages); + return ret; } -/* - * For a simple stat call there's nothing to do. We'll do more if - * this is part of a write sequence for a layered image. 
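/*
 * A hedged sketch of the on-wire shape that decode_object_map_header()
 * above appears to expect: a 32-bit blob length, a versioned "BitVector
 * header" (assumed here to follow the usual ceph layout of struct_v,
 * struct_compat and a 32-bit struct_len), then a 64-bit object count.
 * Little-endian encoding and the exact field layout are assumptions.
 */
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

static uint32_t get_le32(const uint8_t *p)
{
	return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

static uint64_t get_le64(const uint8_t *p)
{
	return (uint64_t)get_le32(p) | (uint64_t)get_le32(p + 4) << 32;
}

/* returns 0 on success, -EINVAL if the buffer is short or malformed */
static int decode_header(const uint8_t *p, size_t len, uint64_t *num_objects)
{
	uint32_t header_len;

	if (len < 4)
		return -EINVAL;
	header_len = get_le32(p);	/* length of the header blob */
	p += 4;
	len -= 4;

	/* struct_v(1) + struct_compat(1) + struct_len(4) + size(8) */
	if (len < header_len || header_len < 14)
		return -EINVAL;
	if (get_le32(p + 2) < 8)	/* struct_len must cover the u64 */
		return -EINVAL;

	*num_objects = get_le64(p + 6);
	return 0;
}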
- */ -static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) +static void rbd_object_map_free(struct rbd_device *rbd_dev) { - dout("%s: obj %p\n", __func__, obj_request); - obj_request_done_set(obj_request); + kvfree(rbd_dev->object_map); + rbd_dev->object_map = NULL; + rbd_dev->object_map_size = 0; } -static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) +static int rbd_object_map_load(struct rbd_device *rbd_dev) { - dout("%s: obj %p\n", __func__, obj_request); + int ret; - if (obj_request_img_data_test(obj_request)) - rbd_osd_copyup_callback(obj_request); - else - obj_request_done_set(obj_request); + ret = __rbd_object_map_load(rbd_dev); + if (ret) + return ret; + + ret = rbd_dev_v2_get_flags(rbd_dev); + if (ret) { + rbd_object_map_free(rbd_dev); + return ret; + } + + if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID) + rbd_warn(rbd_dev, "object map is invalid"); + + return 0; } -static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) +static int rbd_object_map_open(struct rbd_device *rbd_dev) { - struct rbd_obj_request *obj_request = osd_req->r_priv; - u16 opcode; + int ret; - dout("%s: osd_req %p\n", __func__, osd_req); - rbd_assert(osd_req == obj_request->osd_req); - if (obj_request_img_data_test(obj_request)) { - rbd_assert(obj_request->img_request); - rbd_assert(obj_request->which != BAD_WHICH); - } else { - rbd_assert(obj_request->which == BAD_WHICH); + ret = rbd_object_map_lock(rbd_dev); + if (ret) + return ret; + + ret = rbd_object_map_load(rbd_dev); + if (ret) { + rbd_object_map_unlock(rbd_dev); + return ret; } - if (osd_req->r_result < 0) - obj_request->result = osd_req->r_result; + return 0; +} + +static void rbd_object_map_close(struct rbd_device *rbd_dev) +{ + rbd_object_map_free(rbd_dev); + rbd_object_map_unlock(rbd_dev); +} + +/* + * This function needs snap_id (or more precisely just something to + * distinguish between HEAD and snapshot object maps), new_state and + * current_state that were passed to rbd_object_map_update(). + * + * To avoid allocating and stashing a context we piggyback on the OSD + * request. A HEAD update has two ops (assert_locked). For new_state + * and current_state we decode our own object_map_update op, encoded in + * rbd_cls_object_map_update(). + */ +static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, + struct ceph_osd_request *osd_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_data *osd_data; + u64 objno; + u8 state, new_state, current_state; + bool has_current_state; + void *p; + + if (osd_req->r_result) + return osd_req->r_result; /* - * We support a 64-bit length, but ultimately it has to be - * passed to the block layer, which just supports a 32-bit - * length field. + * Nothing to do for a snapshot object map. 
*/ - obj_request->xferred = osd_req->r_ops[0].outdata_len; - rbd_assert(obj_request->xferred < (u64)UINT_MAX); + if (osd_req->r_num_ops == 1) + return 0; - opcode = osd_req->r_ops[0].op; - switch (opcode) { - case CEPH_OSD_OP_READ: - rbd_osd_read_callback(obj_request); - break; - case CEPH_OSD_OP_SETALLOCHINT: - rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || - osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); - /* fall through */ - case CEPH_OSD_OP_WRITE: - case CEPH_OSD_OP_WRITEFULL: - rbd_osd_write_callback(obj_request); - break; - case CEPH_OSD_OP_STAT: - rbd_osd_stat_callback(obj_request); - break; - case CEPH_OSD_OP_DELETE: - case CEPH_OSD_OP_TRUNCATE: - case CEPH_OSD_OP_ZERO: - rbd_osd_discard_callback(obj_request); - break; - case CEPH_OSD_OP_CALL: - rbd_osd_call_callback(obj_request); - break; - default: - rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", - obj_request->object_no, opcode); - break; - } + /* + * Update in-memory HEAD object map. + */ + rbd_assert(osd_req->r_num_ops == 2); + osd_data = osd_req_op_data(osd_req, 1, cls, request_data); + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); + + p = page_address(osd_data->pages[0]); + objno = ceph_decode_64(&p); + rbd_assert(objno == obj_req->ex.oe_objno); + rbd_assert(ceph_decode_64(&p) == objno + 1); + new_state = ceph_decode_8(&p); + has_current_state = ceph_decode_8(&p); + if (has_current_state) + current_state = ceph_decode_8(&p); + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + if (!has_current_state || current_state == state || + (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) + __rbd_object_map_set(rbd_dev, objno, new_state); + spin_unlock(&rbd_dev->object_map_lock); - if (obj_request_done_test(obj_request)) - rbd_obj_request_complete(obj_request); + return 0; } -static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +static void rbd_object_map_callback(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; - rbd_assert(obj_request_img_data_test(obj_request)); - osd_req->r_snapid = obj_request->img_request->snap_id; + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + + result = rbd_object_map_update_finish(obj_req, osd_req); + rbd_obj_handle_request(obj_req, result); } -static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + u8 state = rbd_object_map_get(rbd_dev, objno); + + if (state == new_state || + (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) || + (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) + return false; - ktime_get_real_ts(&osd_req->r_mtime); - osd_req->r_data_offset = obj_request->offset; + return true; } -static struct ceph_osd_request * -__rbd_osd_req_create(struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - int num_ops, unsigned int flags, - struct rbd_obj_request *obj_request) +static int rbd_cls_object_map_update(struct ceph_osd_request *req, + int which, u64 objno, u8 new_state, + const u8 *current_state) +{ + struct page **pages; + void *p, *start; + int ret; + + ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return 
PTR_ERR(pages); + + p = start = page_address(pages[0]); + ceph_encode_64(&p, objno); + ceph_encode_64(&p, objno + 1); + ceph_encode_8(&p, new_state); + if (current_state) { + ceph_encode_8(&p, 1); + ceph_encode_8(&p, *current_state); + } else { + ceph_encode_8(&p, 0); + } + + osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, + false, true); + return 0; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id, + u8 new_state, const u8 *current_state) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; - const char *name_format = rbd_dev->image_format == 1 ? - RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; + int num_ops = 1; + int which = 0; + int ret; - req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); + if (snap_id == CEPH_NOSNAP) { + if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state)) + return 1; + + num_ops++; /* assert_locked */ + } + + req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO); if (!req) - return NULL; + return -ENOMEM; - req->r_flags = flags; - req->r_callback = rbd_osd_req_callback; - req->r_priv = obj_request; + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); + req->r_callback = rbd_object_map_callback; + req->r_priv = obj_req; - req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, - rbd_dev->header.object_prefix, obj_request->object_no)) - goto err_req; + rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid); + ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + ktime_get_real_ts64(&req->r_mtime); - if (ceph_osdc_alloc_messages(req, GFP_NOIO)) - goto err_req; + if (snap_id == CEPH_NOSNAP) { + /* + * Protect against possible race conditions during lock + * ownership transitions. + */ + ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", ""); + if (ret) + return ret; + } - return req; + ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno, + new_state, current_state); + if (ret) + return ret; -err_req: - ceph_osdc_put_request(req); - return NULL; + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + return ret; + + ceph_osdc_start_request(osdc, req); + return 0; +} + +static void prune_extents(struct ceph_file_extent *img_extents, + u32 *num_img_extents, u64 overlap) +{ + u32 cnt = *num_img_extents; + + /* drop extents completely beyond the overlap */ + while (cnt && img_extents[cnt - 1].fe_off >= overlap) + cnt--; + + if (cnt) { + struct ceph_file_extent *ex = &img_extents[cnt - 1]; + + /* trim final overlapping extent */ + if (ex->fe_off + ex->fe_len > overlap) + ex->fe_len = overlap - ex->fe_off; + } + + *num_img_extents = cnt; } /* - * Create an osd request. A read request has one osd op (read). - * A write request has either one (watch) or two (hint+write) osd ops. - * (All rbd data writes are prefixed with an allocation hint op, but - * technically osd watch is a write request, hence this distinction.) + * Determine the byte range(s) covered by either just the object extent + * or the entire object in the parent image. 
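/*
 * A small sketch of the payload that rbd_cls_object_map_update() above
 * builds and rbd_object_map_update_finish() decodes back out of the OSD
 * request: a half-open object range, the new state, and an optional
 * "expected current state" byte.  The little-endian helper here is an
 * assumption used only for illustration.
 */
#include <stddef.h>
#include <stdint.h>

static size_t put_le64(uint8_t *p, uint64_t v)
{
	for (int i = 0; i < 8; i++)
		p[i] = v >> (8 * i);
	return 8;
}

/* returns the number of bytes written: 18, or 19 with a current state */
static size_t encode_object_map_update(uint8_t *buf, uint64_t objno,
				       uint8_t new_state,
				       const uint8_t *current_state)
{
	uint8_t *p = buf;

	p += put_le64(p, objno);		/* range start */
	p += put_le64(p, objno + 1);		/* range end, exclusive */
	*p++ = new_state;
	*p++ = current_state ? 1 : 0;		/* has_current_state */
	if (current_state)
		*p++ = *current_state;
	return (size_t)(p - buf);
}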
*/ -static struct ceph_osd_request *rbd_osd_req_create( - struct rbd_device *rbd_dev, - enum obj_operation_type op_type, - unsigned int num_ops, - struct rbd_obj_request *obj_request) +static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, + bool entire) { - struct ceph_snap_context *snapc = NULL; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - if (obj_request_img_data_test(obj_request) && - (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) { - struct rbd_img_request *img_request = obj_request->img_request; - if (op_type == OBJ_OP_WRITE) { - rbd_assert(img_request_write_test(img_request)); - } else { - rbd_assert(img_request_discard_test(img_request)); - } - snapc = img_request->snapc; + if (!rbd_dev->parent_overlap) + return 0; + + ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno, + entire ? 0 : obj_req->ex.oe_off, + entire ? rbd_dev->layout.object_size : + obj_req->ex.oe_len, + &obj_req->img_extents, + &obj_req->num_img_extents); + if (ret) + return ret; + + prune_extents(obj_req->img_extents, &obj_req->num_img_extents, + rbd_dev->parent_overlap); + return 0; +} + +static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + switch (obj_req->img_request->data_type) { + case OBJ_REQUEST_BIO: + osd_req_op_extent_osd_data_bio(osd_req, which, + &obj_req->bio_pos, + obj_req->ex.oe_len); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + rbd_assert(obj_req->bvec_pos.iter.bi_size == + obj_req->ex.oe_len); + rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); + osd_req_op_extent_osd_data_bvec_pos(osd_req, which, + &obj_req->bvec_pos); + break; + default: + BUG(); } +} - rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); +static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which) +{ + struct page **pages; - return __rbd_osd_req_create(rbd_dev, snapc, num_ops, - (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? - CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(osd_req, which, pages, + 8 + sizeof(struct ceph_timespec), + 0, false, true); + return 0; } -/* - * Create a copyup osd request based on the information in the object - * request supplied. A copyup request has two or three osd ops, a - * copyup method call, potentially a hint op, and a write or truncate - * or zero op. 
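/*
 * A worked sketch of what prune_extents() above does to a sorted extent
 * list at the parent overlap boundary, with concrete numbers.
 * struct file_extent is an illustrative stand-in for struct ceph_file_extent.
 */
#include <stdint.h>
#include <stdio.h>

struct file_extent { uint64_t off, len; };

static void prune(struct file_extent *ex, uint32_t *num, uint64_t overlap)
{
	uint32_t cnt = *num;

	while (cnt && ex[cnt - 1].off >= overlap)
		cnt--;				/* wholly beyond the overlap */
	if (cnt && ex[cnt - 1].off + ex[cnt - 1].len > overlap)
		ex[cnt - 1].len = overlap - ex[cnt - 1].off;	/* trim the tail */
	*num = cnt;
}

int main(void)
{
	struct file_extent ex[] = { { 0, 4096 }, { 8192, 4096 } };
	uint32_t num = 2;

	prune(ex, &num, 10240);		/* parent overlap ends at 10240 */
	/* prints: 2 extent(s) remain, last one is 2048 bytes */
	printf("%u extent(s) remain, last one is %llu bytes\n", num,
	       (unsigned long long)ex[num - 1].len);
	return 0;
}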
- */ -static struct ceph_osd_request * -rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) +static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which, + u32 bytes) { - struct rbd_img_request *img_request; - int num_osd_ops = 3; + struct rbd_obj_request *obj_req = osd_req->r_priv; + int ret; - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - rbd_assert(img_request); - rbd_assert(img_request_write_test(img_request) || - img_request_discard_test(img_request)); + ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup"); + if (ret) + return ret; - if (img_request_discard_test(img_request)) - num_osd_ops = 2; + osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs, + obj_req->copyup_bvec_count, bytes); + return 0; +} - return __rbd_osd_req_create(img_request->rbd_dev, - img_request->snapc, num_osd_ops, - CEPH_OSD_FLAG_WRITE, obj_request); +static int rbd_obj_init_read(struct rbd_obj_request *obj_req) +{ + obj_req->read_state = RBD_OBJ_READ_START; + return 0; } -static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) +static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) { - ceph_osdc_put_request(osd_req); + struct rbd_obj_request *obj_req = osd_req->r_priv; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u16 opcode; + + if (!use_object_map(rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) { + osd_req_op_alloc_hint_init(osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size, + rbd_dev->opts->alloc_hint_flags); + } + + if (rbd_obj_is_entire(obj_req)) + opcode = CEPH_OSD_OP_WRITEFULL; + else + opcode = CEPH_OSD_OP_WRITE; + + osd_req_op_extent_init(osd_req, which, opcode, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_setup_data(osd_req, which); } -static struct rbd_obj_request * -rbd_obj_request_create(enum obj_request_type type) +static int rbd_obj_init_write(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *obj_request; + int ret; - rbd_assert(obj_request_type_valid(type)); + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; - obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); - if (!obj_request) - return NULL; + obj_req->write_state = RBD_OBJ_WRITE_START; + return 0; +} - obj_request->which = BAD_WHICH; - obj_request->type = type; - INIT_LIST_HEAD(&obj_request->links); - init_completion(&obj_request->completion); - kref_init(&obj_request->kref); +static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req) +{ + return rbd_obj_is_tail(obj_req) ? 
CEPH_OSD_OP_TRUNCATE : + CEPH_OSD_OP_ZERO; +} - dout("%s %p\n", __func__, obj_request); - return obj_request; +static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0); + } else { + osd_req_op_extent_init(osd_req, which, + truncate_or_zero_opcode(obj_req), + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); + } } -static void rbd_obj_request_destroy(struct kref *kref) +static int rbd_obj_init_discard(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *obj_request; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u64 off, next_off; + int ret; - obj_request = container_of(kref, struct rbd_obj_request, kref); + /* + * Align the range to alloc_size boundary and punt on discards + * that are too small to free up any space. + * + * alloc_size == object_size && is_tail() is a special case for + * filestore with filestore_punch_hole = false, needed to allow + * truncate (in addition to delete). + */ + if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size || + !rbd_obj_is_tail(obj_req)) { + off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size); + next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len, + rbd_dev->opts->alloc_size); + if (off >= next_off) + return 1; + + dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, + obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, + off, next_off - off); + obj_req->ex.oe_off = off; + obj_req->ex.oe_len = next_off - off; + } + + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; - dout("%s: obj %p\n", __func__, obj_request); + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; - rbd_assert(obj_request->img_request == NULL); - rbd_assert(obj_request->which == BAD_WHICH); + obj_req->write_state = RBD_OBJ_WRITE_START; + return 0; +} - if (obj_request->osd_req) - rbd_osd_req_destroy(obj_request->osd_req); +static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + u16 opcode; - rbd_assert(obj_request_type_valid(obj_request->type)); - switch (obj_request->type) { - case OBJ_REQUEST_NODATA: - break; /* Nothing to do */ - case OBJ_REQUEST_BIO: - if (obj_request->bio_list) - bio_chain_put(obj_request->bio_list); - break; - case OBJ_REQUEST_PAGES: - /* img_data requests don't own their page array */ - if (obj_request->pages && - !obj_request_img_data_test(obj_request)) - ceph_release_page_vector(obj_request->pages, - obj_request->page_count); - break; + if (rbd_obj_is_entire(obj_req)) { + if (obj_req->num_img_extents) { + if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + osd_req_op_init(osd_req, which++, + CEPH_OSD_OP_CREATE, 0); + opcode = CEPH_OSD_OP_TRUNCATE; + } else { + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which++, + CEPH_OSD_OP_DELETE, 0); + opcode = 0; + } + } else { + opcode = truncate_or_zero_opcode(obj_req); } - kmem_cache_free(rbd_obj_request_cache, obj_request); + if (opcode) + osd_req_op_extent_init(osd_req, which, opcode, + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); } -/* It's OK to call this for a device with no parent */ +static int 
rbd_obj_init_zeroout(struct rbd_obj_request *obj_req) +{ + int ret; -static void rbd_spec_put(struct rbd_spec *spec); -static void rbd_dev_unparent(struct rbd_device *rbd_dev) + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; + + if (!obj_req->num_img_extents) { + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; + } + + obj_req->write_state = RBD_OBJ_WRITE_START; + return 0; +} + +static int count_write_ops(struct rbd_obj_request *obj_req) { - rbd_dev_remove_parent(rbd_dev); - rbd_spec_put(rbd_dev->parent_spec); - rbd_dev->parent_spec = NULL; - rbd_dev->parent_overlap = 0; + struct rbd_img_request *img_req = obj_req->img_request; + + switch (img_req->op_type) { + case OBJ_OP_WRITE: + if (!use_object_map(img_req->rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) + return 2; /* setallochint + write/writefull */ + + return 1; /* write/writefull */ + case OBJ_OP_DISCARD: + return 1; /* delete/truncate/zero */ + case OBJ_OP_ZEROOUT: + if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && + !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + return 2; /* create + truncate */ + + return 1; /* delete/truncate/zero */ + default: + BUG(); + } +} + +static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + switch (obj_req->img_request->op_type) { + case OBJ_OP_WRITE: + __rbd_osd_setup_write_ops(osd_req, which); + break; + case OBJ_OP_DISCARD: + __rbd_osd_setup_discard_ops(osd_req, which); + break; + case OBJ_OP_ZEROOUT: + __rbd_osd_setup_zeroout_ops(osd_req, which); + break; + default: + BUG(); + } } /* - * Parent image reference counting is used to determine when an - * image's parent fields can be safely torn down--after there are no - * more in-flight requests to the parent image. When the last - * reference is dropped, cleaning them up is safe. + * Prune the list of object requests (adjust offset and/or length, drop + * redundant requests). Prepare object request state machines and image + * request state machine for execution. 
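/*
 * A sketch of the alloc_size alignment that rbd_obj_init_discard() above
 * applies before deciding whether a discard is worth sending.  The 64K
 * alloc_size and the 70K~300K request are example values only.
 */
#include <stdint.h>
#include <stdio.h>

#define ALLOC_SIZE	(64 * 1024ULL)

static uint64_t round_up_u64(uint64_t x, uint64_t a)
{
	return (x + a - 1) / a * a;
}

static uint64_t round_down_u64(uint64_t x, uint64_t a)
{
	return x / a * a;
}

int main(void)
{
	uint64_t off = 70 * 1024, len = 300 * 1024;
	uint64_t start = round_up_u64(off, ALLOC_SIZE);		/* 128K */
	uint64_t end = round_down_u64(off + len, ALLOC_SIZE);	/* 320K */

	if (start >= end)
		printf("too small to free any space, drop the request\n");
	else
		printf("discard %llu~%llu\n", (unsigned long long)start,
		       (unsigned long long)(end - start));
	return 0;
}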
*/ -static void rbd_dev_parent_put(struct rbd_device *rbd_dev) +static int __rbd_img_fill_request(struct rbd_img_request *img_req) { - int counter; + struct rbd_obj_request *obj_req, *next_obj_req; + int ret; - if (!rbd_dev->parent_spec) - return; + for_each_obj_request_safe(img_req, obj_req, next_obj_req) { + switch (img_req->op_type) { + case OBJ_OP_READ: + ret = rbd_obj_init_read(obj_req); + break; + case OBJ_OP_WRITE: + ret = rbd_obj_init_write(obj_req); + break; + case OBJ_OP_DISCARD: + ret = rbd_obj_init_discard(obj_req); + break; + case OBJ_OP_ZEROOUT: + ret = rbd_obj_init_zeroout(obj_req); + break; + default: + BUG(); + } + if (ret < 0) + return ret; + if (ret > 0) { + rbd_img_obj_request_del(img_req, obj_req); + continue; + } + } - counter = atomic_dec_return_safe(&rbd_dev->parent_ref); - if (counter > 0) - return; + img_req->state = RBD_IMG_START; + return 0; +} - /* Last reference; clean up parent data structures */ +union rbd_img_fill_iter { + struct ceph_bio_iter bio_iter; + struct ceph_bvec_iter bvec_iter; +}; - if (!counter) - rbd_dev_unparent(rbd_dev); - else - rbd_warn(rbd_dev, "parent reference underflow"); +struct rbd_img_fill_ctx { + enum obj_request_type pos_type; + union rbd_img_fill_iter *pos; + union rbd_img_fill_iter iter; + ceph_object_extent_fn_t set_pos_fn; + ceph_object_extent_fn_t count_fn; + ceph_object_extent_fn_t copy_fn; +}; + +static struct ceph_object_extent *alloc_object_extent(void *arg) +{ + struct rbd_img_request *img_req = arg; + struct rbd_obj_request *obj_req; + + obj_req = rbd_obj_request_create(); + if (!obj_req) + return NULL; + + rbd_img_obj_request_add(img_req, obj_req); + return &obj_req->ex; } /* - * If an image has a non-zero parent overlap, get a reference to its - * parent. - * - * Returns true if the rbd device has a parent with a non-zero - * overlap and a reference for it was successfully taken, or - * false otherwise. + * While su != os && sc == 1 is technically not fancy (it's the same + * layout as su == os && sc == 1), we can't use the nocopy path for it + * because ->set_pos_fn() should be called only once per object. + * ceph_file_to_extents() invokes action_fn once per stripe unit, so + * treat su != os && sc == 1 as fancy. */ -static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) +static bool rbd_layout_is_fancy(struct ceph_file_layout *l) { - int counter = 0; + return l->stripe_unit != l->object_size; +} - if (!rbd_dev->parent_spec) - return false; +static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct rbd_img_fill_ctx *fctx) +{ + u32 i; + int ret; - down_read(&rbd_dev->header_rwsem); - if (rbd_dev->parent_overlap) - counter = atomic_inc_return_safe(&rbd_dev->parent_ref); - up_read(&rbd_dev->header_rwsem); + img_req->data_type = fctx->pos_type; - if (counter < 0) - rbd_warn(rbd_dev, "parent reference overflow"); + /* + * Create object requests and set each object request's starting + * position in the provided bio (list) or bio_vec array. 
+ */ + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_file_to_extents(&img_req->rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + alloc_object_extent, img_req, + fctx->set_pos_fn, &fctx->iter); + if (ret) + return ret; + } - return counter > 0; + return __rbd_img_fill_request(img_req); } /* - * Caller is responsible for filling in the list of object requests - * that comprises the image request, and the Linux request pointer - * (if there is one). + * Map a list of image extents to a list of object extents, create the + * corresponding object requests (normally each to a different object, + * but not always) and add them to @img_req. For each object request, + * set up its data descriptor to point to the corresponding chunk(s) of + * @fctx->pos data buffer. + * + * Because ceph_file_to_extents() will merge adjacent object extents + * together, each object request's data descriptor may point to multiple + * different chunks of @fctx->pos data buffer. + * + * @fctx->pos data buffer is assumed to be large enough. */ -static struct rbd_img_request *rbd_img_request_create( - struct rbd_device *rbd_dev, - u64 offset, u64 length, - enum obj_operation_type op_type, - struct ceph_snap_context *snapc) +static int rbd_img_fill_request(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct rbd_img_fill_ctx *fctx) { - struct rbd_img_request *img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; + struct rbd_obj_request *obj_req; + u32 i; + int ret; - img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); - if (!img_request) - return NULL; + if (fctx->pos_type == OBJ_REQUEST_NODATA || + !rbd_layout_is_fancy(&rbd_dev->layout)) + return rbd_img_fill_request_nocopy(img_req, img_extents, + num_img_extents, fctx); - img_request->rq = NULL; - img_request->rbd_dev = rbd_dev; - img_request->offset = offset; - img_request->length = length; - img_request->flags = 0; - if (op_type == OBJ_OP_DISCARD) { - img_request_discard_set(img_request); - img_request->snapc = snapc; - } else if (op_type == OBJ_OP_WRITE) { - img_request_write_set(img_request); - img_request->snapc = snapc; - } else { - img_request->snap_id = rbd_dev->spec->snap_id; + img_req->data_type = OBJ_REQUEST_OWN_BVECS; + + /* + * Create object requests and determine ->bvec_count for each object + * request. Note that ->bvec_count sum over all object requests may + * be greater than the number of bio_vecs in the provided bio (list) + * or bio_vec array because when mapped, those bio_vecs can straddle + * stripe unit boundaries. 
+ */ + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_file_to_extents(&rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + alloc_object_extent, img_req, + fctx->count_fn, &fctx->iter); + if (ret) + return ret; + } + + for_each_obj_request(img_req, obj_req) { + obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, + sizeof(*obj_req->bvec_pos.bvecs), + GFP_NOIO); + if (!obj_req->bvec_pos.bvecs) + return -ENOMEM; } - if (rbd_dev_parent_get(rbd_dev)) - img_request_layered_set(img_request); - spin_lock_init(&img_request->completion_lock); - img_request->next_completion = 0; - img_request->callback = NULL; - img_request->result = 0; - img_request->obj_request_count = 0; - INIT_LIST_HEAD(&img_request->obj_requests); - kref_init(&img_request->kref); - dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, - obj_op_name(op_type), offset, length, img_request); + /* + * Fill in each object request's private bio_vec array, splitting and + * rearranging the provided bio_vecs in stripe unit chunks as needed. + */ + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_iterate_extents(&rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + fctx->copy_fn, &fctx->iter); + if (ret) + return ret; + } - return img_request; + return __rbd_img_fill_request(img_req); } -static void rbd_img_request_destroy(struct kref *kref) +static int rbd_img_fill_nodata(struct rbd_img_request *img_req, + u64 off, u64 len) { - struct rbd_img_request *img_request; - struct rbd_obj_request *obj_request; - struct rbd_obj_request *next_obj_request; + struct ceph_file_extent ex = { off, len }; + union rbd_img_fill_iter dummy = {}; + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_NODATA, + .pos = &dummy, + }; - img_request = container_of(kref, struct rbd_img_request, kref); + return rbd_img_fill_request(img_req, &ex, 1, &fctx); +} - dout("%s: img %p\n", __func__, img_request); +static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_img_obj_request_del(img_request, obj_request); - rbd_assert(img_request->obj_request_count == 0); + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + obj_req->bio_pos = *it; + ceph_bio_iter_advance(it, bytes); +} - if (img_request_layered_test(img_request)) { - img_request_layered_clear(img_request); - rbd_dev_parent_put(img_request->rbd_dev); - } +static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - if (img_request_write_test(img_request) || - img_request_discard_test(img_request)) - ceph_put_snap_context(img_request->snapc); + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + ceph_bio_iter_advance_step(it, bytes, ({ + obj_req->bvec_count++; + })); - kmem_cache_free(rbd_img_request_cache, img_request); } -static struct rbd_img_request *rbd_parent_request_create( - struct rbd_obj_request *obj_request, - u64 img_offset, u64 length) +static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) { - struct rbd_img_request *parent_request; - struct rbd_device *rbd_dev; + struct rbd_obj_request *obj_req = + 
container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + ceph_bio_iter_advance_step(it, bytes, ({ + obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; + obj_req->bvec_pos.iter.bi_size += bv.bv_len; + })); +} - parent_request = rbd_img_request_create(rbd_dev->parent, img_offset, - length, OBJ_OP_READ, NULL); - if (!parent_request) - return NULL; +static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct ceph_bio_iter *bio_pos) +{ + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_BIO, + .pos = (union rbd_img_fill_iter *)bio_pos, + .set_pos_fn = set_bio_pos, + .count_fn = count_bio_bvecs, + .copy_fn = copy_bio_bvecs, + }; + + return rbd_img_fill_request(img_req, img_extents, num_img_extents, + &fctx); +} - img_request_child_set(parent_request); - rbd_obj_request_get(obj_request); - parent_request->obj_request = obj_request; +static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, + u64 off, u64 len, struct bio *bio) +{ + struct ceph_file_extent ex = { off, len }; + struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; - return parent_request; + return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); } -static void rbd_parent_request_destroy(struct kref *kref) +static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) { - struct rbd_img_request *parent_request; - struct rbd_obj_request *orig_request; + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - parent_request = container_of(kref, struct rbd_img_request, kref); - orig_request = parent_request->obj_request; + obj_req->bvec_pos = *it; + ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); + ceph_bvec_iter_advance(it, bytes); +} - parent_request->obj_request = NULL; - rbd_obj_request_put(orig_request); - img_request_child_clear(parent_request); +static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - rbd_img_request_destroy(kref); + ceph_bvec_iter_advance_step(it, bytes, ({ + obj_req->bvec_count++; + })); } -static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) +static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) { - struct rbd_img_request *img_request; - unsigned int xferred; - int result; - bool more; + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; + ceph_bvec_iter_advance_step(it, bytes, ({ + obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; + obj_req->bvec_pos.iter.bi_size += bv.bv_len; + })); +} - rbd_assert(obj_request->xferred <= (u64)UINT_MAX); - xferred = (unsigned int)obj_request->xferred; - result = obj_request->result; - if (result) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - enum obj_operation_type op_type; +static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct ceph_bvec_iter *bvec_pos) +{ + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_BVECS, + .pos = (union rbd_img_fill_iter *)bvec_pos, + 
.set_pos_fn = set_bvec_pos, + .count_fn = count_bvecs, + .copy_fn = copy_bvecs, + }; - if (img_request_discard_test(img_request)) - op_type = OBJ_OP_DISCARD; - else if (img_request_write_test(img_request)) - op_type = OBJ_OP_WRITE; - else - op_type = OBJ_OP_READ; - - rbd_warn(rbd_dev, "%s %llx at %llx (%llx)", - obj_op_name(op_type), obj_request->length, - obj_request->img_offset, obj_request->offset); - rbd_warn(rbd_dev, " result %d xferred %x", - result, xferred); - if (!img_request->result) - img_request->result = result; - /* - * Need to end I/O on the entire obj_request worth of - * bytes in case of error. - */ - xferred = obj_request->length; - } + return rbd_img_fill_request(img_req, img_extents, num_img_extents, + &fctx); +} - if (img_request_child_test(img_request)) { - rbd_assert(img_request->obj_request != NULL); - more = obj_request->which < img_request->obj_request_count - 1; - } else { - blk_status_t status = errno_to_blk_status(result); +static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct bio_vec *bvecs) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = ceph_file_extents_bytes(img_extents, + num_img_extents) }, + }; - rbd_assert(img_request->rq != NULL); + return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, + &it); +} - more = blk_update_request(img_request->rq, status, xferred); - if (!more) - __blk_mq_end_request(img_request->rq, status); - } +static void rbd_img_handle_request_work(struct work_struct *work) +{ + struct rbd_img_request *img_req = + container_of(work, struct rbd_img_request, work); - return more; + rbd_img_handle_request(img_req, img_req->work_result); } -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +static void rbd_img_schedule(struct rbd_img_request *img_req, int result) { - struct rbd_img_request *img_request; - u32 which = obj_request->which; - bool more = true; + INIT_WORK(&img_req->work, rbd_img_handle_request_work); + img_req->work_result = result; + queue_work(rbd_wq, &img_req->work); +} - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; +static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; - dout("%s: img %p obj %p\n", __func__, img_request, obj_request); - rbd_assert(img_request != NULL); - rbd_assert(img_request->obj_request_count > 0); - rbd_assert(which != BAD_WHICH); - rbd_assert(which < img_request->obj_request_count); + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) { + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + return true; + } - spin_lock_irq(&img_request->completion_lock); - if (which != img_request->next_completion) - goto out; + dout("%s %p objno %llu assuming dne\n", __func__, obj_req, + obj_req->ex.oe_objno); + return false; +} + +static int rbd_obj_read_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int ret; - for_each_obj_request_from(img_request, obj_request) { - rbd_assert(more); - rbd_assert(which < img_request->obj_request_count); + osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); - if (!obj_request_done_test(obj_request)) - break; - more = rbd_img_obj_end_request(obj_request); - which++; - } + osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_setup_data(osd_req, 0); + 
rbd_osd_format_read(osd_req); - rbd_assert(more ^ (which == img_request->obj_request_count)); - img_request->next_completion = which; -out: - spin_unlock_irq(&img_request->completion_lock); - rbd_img_request_put(img_request); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; - if (!more) - rbd_img_request_complete(img_request); + rbd_osd_submit(osd_req); + return 0; } -/* - * Add individual osd ops to the given ceph_osd_request and prepare - * them for submission. num_ops is the current number of - * osd operations already to the object request. - */ -static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, - struct ceph_osd_request *osd_request, - enum obj_operation_type op_type, - unsigned int num_ops) +static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev = img_request->rbd_dev; - u64 object_size = rbd_obj_bytes(&rbd_dev->header); - u64 offset = obj_request->offset; - u64 length = obj_request->length; - u64 img_end; - u16 opcode; + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *parent = img_req->rbd_dev->parent; + struct rbd_img_request *child_img_req; + int ret; - if (op_type == OBJ_OP_DISCARD) { - if (!offset && length == object_size && - (!img_request_layered_test(img_request) || - !obj_request_overlaps_parent(obj_request))) { - opcode = CEPH_OSD_OP_DELETE; - } else if ((offset + length == object_size)) { - opcode = CEPH_OSD_OP_TRUNCATE; - } else { - down_read(&rbd_dev->header_rwsem); - img_end = rbd_dev->header.image_size; - up_read(&rbd_dev->header_rwsem); + child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); + if (!child_img_req) + return -ENOMEM; - if (obj_request->img_offset + length == img_end) - opcode = CEPH_OSD_OP_TRUNCATE; - else - opcode = CEPH_OSD_OP_ZERO; + rbd_img_request_init(child_img_req, parent, OBJ_OP_READ); + __set_bit(IMG_REQ_CHILD, &child_img_req->flags); + child_img_req->obj_request = obj_req; + + down_read(&parent->header_rwsem); + rbd_img_capture_header(child_img_req); + up_read(&parent->header_rwsem); + + dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req, + obj_req); + + if (!rbd_img_is_write(img_req)) { + switch (img_req->data_type) { + case OBJ_REQUEST_BIO: + ret = __rbd_img_fill_from_bio(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + &obj_req->bio_pos); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + ret = __rbd_img_fill_from_bvecs(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + &obj_req->bvec_pos); + break; + default: + BUG(); } - } else if (op_type == OBJ_OP_WRITE) { - if (!offset && length == object_size) - opcode = CEPH_OSD_OP_WRITEFULL; - else - opcode = CEPH_OSD_OP_WRITE; - osd_req_op_alloc_hint_init(osd_request, num_ops, - object_size, object_size); - num_ops++; } else { - opcode = CEPH_OSD_OP_READ; + ret = rbd_img_fill_from_bvecs(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + obj_req->copyup_bvecs); + } + if (ret) { + rbd_img_request_destroy(child_img_req); + return ret; } - if (opcode == CEPH_OSD_OP_DELETE) - osd_req_op_init(osd_request, num_ops, opcode, 0); - else - osd_req_op_extent_init(osd_request, num_ops, opcode, - offset, length, 0, 0); - - if (obj_request->type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_request, num_ops, - obj_request->bio_list, length); - else if (obj_request->type == OBJ_REQUEST_PAGES) - 
osd_req_op_extent_osd_data_pages(osd_request, num_ops, - obj_request->pages, length, - offset & ~PAGE_MASK, false, false); - - /* Discards are also writes */ - if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) - rbd_osd_req_format_write(obj_request); - else - rbd_osd_req_format_read(obj_request); + /* avoid parent chain recursion */ + rbd_img_schedule(child_img_req, 0); + return 0; } -/* - * Split up an image request into one or more object requests, each - * to a different object. The "type" parameter indicates whether - * "data_desc" is the pointer to the head of a list of bio - * structures, or the base of a page array. In either case this - * function assumes data_desc describes memory sufficient to hold - * all data described by the image request. - */ -static int rbd_img_request_fill(struct rbd_img_request *img_request, - enum obj_request_type type, - void *data_desc) +static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - struct rbd_obj_request *obj_request = NULL; - struct rbd_obj_request *next_obj_request; - struct bio *bio_list = NULL; - unsigned int bio_offset = 0; - struct page **pages = NULL; - enum obj_operation_type op_type; - u64 img_offset; - u64 resid; - - dout("%s: img %p type %d data_desc %p\n", __func__, img_request, - (int)type, data_desc); - - img_offset = img_request->offset; - resid = img_request->length; - rbd_assert(resid > 0); - op_type = rbd_img_request_op_type(img_request); - - if (type == OBJ_REQUEST_BIO) { - bio_list = data_desc; - rbd_assert(img_offset == - bio_list->bi_iter.bi_sector << SECTOR_SHIFT); - } else if (type == OBJ_REQUEST_PAGES) { - pages = data_desc; - } + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - while (resid) { - struct ceph_osd_request *osd_req; - u64 object_no = img_offset >> rbd_dev->header.obj_order; - u64 offset = rbd_segment_offset(rbd_dev, img_offset); - u64 length = rbd_segment_length(rbd_dev, img_offset, resid); +again: + switch (obj_req->read_state) { + case RBD_OBJ_READ_START: + rbd_assert(!*result); - obj_request = rbd_obj_request_create(type); - if (!obj_request) - goto out_unwind; + if (!rbd_obj_may_exist(obj_req)) { + *result = -ENOENT; + obj_req->read_state = RBD_OBJ_READ_OBJECT; + goto again; + } - obj_request->object_no = object_no; - obj_request->offset = offset; - obj_request->length = length; + ret = rbd_obj_read_object(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->read_state = RBD_OBJ_READ_OBJECT; + return false; + case RBD_OBJ_READ_OBJECT: + if (*result == -ENOENT && rbd_dev->parent_overlap) { + /* reverse map this object extent onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, false); + if (ret) { + *result = ret; + return true; + } + if (obj_req->num_img_extents) { + ret = rbd_obj_read_from_parent(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->read_state = RBD_OBJ_READ_PARENT; + return false; + } + } /* - * set obj_request->img_request before creating the - * osd_request so that it gets the right snapc + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. 
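/*
 * A sketch of the zero-fill rules described in the comment above and
 * applied in the RBD_OBJ_READ_OBJECT case just below, restated for a
 * flat buffer: -ENOENT marks a hole, a short read zero-fills the tail,
 * and any other error is passed through unchanged.
 */
#include <errno.h>
#include <stdint.h>
#include <string.h>

/* returns 0 on success or a negative errno */
static int finish_object_read(uint8_t *buf, uint64_t len, int64_t result)
{
	if (result == -ENOENT) {
		memset(buf, 0, len);			/* hole in the image */
		return 0;
	}
	if (result < 0)
		return (int)result;			/* real error */
	if ((uint64_t)result < len)			/* short read */
		memset(buf + result, 0, len - result);
	return 0;
}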
*/ - rbd_img_obj_request_add(img_request, obj_request); - - if (type == OBJ_REQUEST_BIO) { - unsigned int clone_size; - - rbd_assert(length <= (u64)UINT_MAX); - clone_size = (unsigned int)length; - obj_request->bio_list = - bio_chain_clone_range(&bio_list, - &bio_offset, - clone_size, - GFP_NOIO); - if (!obj_request->bio_list) - goto out_unwind; - } else if (type == OBJ_REQUEST_PAGES) { - unsigned int page_count; - - obj_request->pages = pages; - page_count = (u32)calc_pages_for(offset, length); - obj_request->page_count = page_count; - if ((offset + length) & ~PAGE_MASK) - page_count--; /* more on last page */ - pages += page_count; + if (*result == -ENOENT) { + rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len); + *result = 0; + } else if (*result >= 0) { + if (*result < obj_req->ex.oe_len) + rbd_obj_zero_range(obj_req, *result, + obj_req->ex.oe_len - *result); + else + rbd_assert(*result == obj_req->ex.oe_len); + *result = 0; } + return true; + case RBD_OBJ_READ_PARENT: + /* + * The parent image is read only up to the overlap -- zero-fill + * from the overlap to the end of the request. + */ + if (!*result) { + u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req); - osd_req = rbd_osd_req_create(rbd_dev, op_type, - (op_type == OBJ_OP_WRITE) ? 2 : 1, - obj_request); - if (!osd_req) - goto out_unwind; + if (obj_overlap < obj_req->ex.oe_len) + rbd_obj_zero_range(obj_req, obj_overlap, + obj_req->ex.oe_len - obj_overlap); + } + return true; + default: + BUG(); + } +} - obj_request->osd_req = osd_req; - obj_request->callback = rbd_img_obj_callback; - obj_request->img_offset = img_offset; +static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; - rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; - img_offset += length; - resid -= length; + if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) && + (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) { + dout("%s %p noop for nonexistent\n", __func__, obj_req); + return true; } - return 0; + return false; +} -out_unwind: - for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_img_obj_request_del(img_request, obj_request); +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 new_state; - return -ENOMEM; + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + new_state = OBJECT_PENDING; + else + new_state = OBJECT_EXISTS; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL); } -static void -rbd_osd_copyup_callback(struct rbd_obj_request *obj_request) +static int rbd_obj_write_object(struct rbd_obj_request *obj_req) { - struct rbd_img_request *img_request; - struct rbd_device *rbd_dev; - struct page **pages; - u32 page_count; + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; + int ret; - dout("%s: obj %p\n", __func__, obj_request); + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) + num_ops++; /* stat */ - rbd_assert(obj_request->type == OBJ_REQUEST_BIO || - obj_request->type == OBJ_REQUEST_NODATA); - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - rbd_assert(img_request); + 
osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); - rbd_dev = img_request->rbd_dev; - rbd_assert(rbd_dev); + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + ret = rbd_osd_setup_stat(osd_req, which++); + if (ret) + return ret; + } - pages = obj_request->copyup_pages; - rbd_assert(pages != NULL); - obj_request->copyup_pages = NULL; - page_count = obj_request->copyup_page_count; - rbd_assert(page_count); - obj_request->copyup_page_count = 0; - ceph_release_page_vector(pages, page_count); + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); - /* - * We want the transfer count to reflect the size of the - * original write request. There is no such thing as a - * successful short write, so if the request was successful - * we can just set it to the originally-requested length. - */ - if (!obj_request->result) - obj_request->xferred = obj_request->length; + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; - obj_request_done_set(obj_request); + rbd_osd_submit(osd_req); + return 0; } -static void -rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +/* + * copyup_bvecs pages are never highmem pages + */ +static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = bytes }, + }; + + ceph_bvec_iter_advance_step(&it, bytes, ({ + if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len)) + return false; + })); + return true; +} + +#define MODS_ONLY U32_MAX + +static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req, + u32 bytes) { - struct rbd_obj_request *orig_request; struct ceph_osd_request *osd_req; - struct rbd_device *rbd_dev; - struct page **pages; - enum obj_operation_type op_type; - u32 page_count; - int img_result; - u64 parent_length; + int ret; - rbd_assert(img_request_child_test(img_request)); + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); + rbd_assert(bytes > 0 && bytes != MODS_ONLY); - /* First get what we need from the image request */ + osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); - pages = img_request->copyup_pages; - rbd_assert(pages != NULL); - img_request->copyup_pages = NULL; - page_count = img_request->copyup_page_count; - rbd_assert(page_count); - img_request->copyup_page_count = 0; + ret = rbd_osd_setup_copyup(osd_req, 0, bytes); + if (ret) + return ret; - orig_request = img_request->obj_request; - rbd_assert(orig_request != NULL); - rbd_assert(obj_request_type_valid(orig_request->type)); - img_result = img_request->result; - parent_length = img_request->length; - rbd_assert(img_result || parent_length == img_request->xferred); - rbd_img_request_put(img_request); + rbd_osd_format_write(osd_req); - rbd_assert(orig_request->img_request); - rbd_dev = orig_request->img_request->rbd_dev; - rbd_assert(rbd_dev); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; - /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to free the pages - * and re-submit the original write request. 
- */ - if (!rbd_dev->parent_overlap) { - ceph_release_page_vector(pages, page_count); - rbd_obj_request_submit(orig_request); - return; + rbd_osd_submit(osd_req); + return 0; +} + +static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req, + u32 bytes) +{ + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; + int ret; + + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); + + if (bytes != MODS_ONLY) + num_ops++; /* copyup */ + + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + if (bytes != MODS_ONLY) { + ret = rbd_osd_setup_copyup(osd_req, which++, bytes); + if (ret) + return ret; } - if (img_result) - goto out_err; + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); - /* - * The original osd request is of no use to use any more. - * We need a new one that can hold the three ops in a copyup - * request. Allocate the new copyup osd request for the - * original request, and release the old one. - */ - img_result = -ENOMEM; - osd_req = rbd_osd_req_create_copyup(orig_request); - if (!osd_req) - goto out_err; - rbd_osd_req_destroy(orig_request->osd_req); - orig_request->osd_req = osd_req; - orig_request->copyup_pages = pages; - orig_request->copyup_page_count = page_count; + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; - /* Initialize the copyup op */ + rbd_osd_submit(osd_req); + return 0; +} - osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); - osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, - false, false); +static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) +{ + u32 i; - /* Add the other op(s) */ + rbd_assert(!obj_req->copyup_bvecs); + obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); + obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, + sizeof(*obj_req->copyup_bvecs), + GFP_NOIO); + if (!obj_req->copyup_bvecs) + return -ENOMEM; - op_type = rbd_img_request_op_type(orig_request->img_request); - rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1); + for (i = 0; i < obj_req->copyup_bvec_count; i++) { + unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); + struct page *page = alloc_page(GFP_NOIO); - /* All set, send it off. */ + if (!page) + return -ENOMEM; - rbd_obj_request_submit(orig_request); - return; + bvec_set_page(&obj_req->copyup_bvecs[i], page, len, 0); + obj_overlap -= len; + } -out_err: - ceph_release_page_vector(pages, page_count); - rbd_obj_request_error(orig_request, img_result); + rbd_assert(!obj_overlap); + return 0; } /* - * Read from the parent image the range of data that covers the - * entire target of the given object request. This is used for - * satisfying a layered image write request when the target of an - * object request from the image request does not exist. - * - * A page array big enough to hold the returned data is allocated - * and supplied to rbd_img_request_fill() as the "data descriptor." - * When the read completes, this page array will be transferred to - * the original object request for the copyup operation. - * - * If an error occurs, it is recorded as the result of the original - * object request in rbd_img_obj_exists_callback(). + * The target object doesn't exist. Read the data for the entire + * target object up to the overlap point (if any) from the parent, + * so we can use it for a copyup. 
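/*
 * A sketch of the "is the copyup data all zeroes?" test behind
 * RBD_OBJ_FLAG_COPYUP_ZEROS.  is_zero_bvecs() above walks bio_vecs with
 * memchr_inv(); this stand-in checks a flat buffer using the
 * overlapping-memcmp idiom instead.
 */
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

static bool buf_is_zero(const unsigned char *buf, size_t len)
{
	return len == 0 ||
	       (buf[0] == 0 && memcmp(buf, buf + 1, len - 1) == 0);
}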
*/ -static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) +static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req) { - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - struct rbd_img_request *parent_request = NULL; - u64 img_offset; - u64 length; - struct page **pages = NULL; - u32 page_count; - int result; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - rbd_assert(rbd_dev->parent != NULL); + rbd_assert(obj_req->num_img_extents); + prune_extents(obj_req->img_extents, &obj_req->num_img_extents, + rbd_dev->parent_overlap); + if (!obj_req->num_img_extents) { + /* + * The overlap has become 0 (most likely because the + * image has been flattened). Re-submit the original write + * request -- pass MODS_ONLY since the copyup isn't needed + * anymore. + */ + return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY); + } - /* - * Determine the byte range covered by the object in the - * child image to which the original request was to be sent. - */ - img_offset = obj_request->img_offset - obj_request->offset; - length = rbd_obj_bytes(&rbd_dev->header); + ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); + if (ret) + return ret; - /* - * There is no defined parent data beyond the parent - * overlap, so limit what we read at that boundary if - * necessary. - */ - if (img_offset + length > rbd_dev->parent_overlap) { - rbd_assert(img_offset < rbd_dev->parent_overlap); - length = rbd_dev->parent_overlap - img_offset; - } + return rbd_obj_read_from_parent(obj_req); +} - /* - * Allocate a page array big enough to receive the data read - * from the parent. - */ - page_count = (u32)calc_pages_for(0, length); - pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) { - result = PTR_ERR(pages); - pages = NULL; - goto out_err; - } +static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_snap_context *snapc = obj_req->img_request->snapc; + u8 new_state; + u32 i; + int ret; - result = -ENOMEM; - parent_request = rbd_parent_request_create(obj_request, - img_offset, length); - if (!parent_request) - goto out_err; + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); - result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); - if (result) - goto out_err; + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return; - parent_request->copyup_pages = pages; - parent_request->copyup_page_count = page_count; - parent_request->callback = rbd_img_obj_parent_read_full_callback; + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + return; - result = rbd_img_request_submit(parent_request); - if (!result) - return 0; + for (i = 0; i < snapc->num_snaps; i++) { + if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) && + i + 1 < snapc->num_snaps) + new_state = OBJECT_EXISTS_CLEAN; + else + new_state = OBJECT_EXISTS; - parent_request->copyup_pages = NULL; - parent_request->copyup_page_count = 0; - parent_request->obj_request = NULL; - rbd_obj_request_put(obj_request); -out_err: - if (pages) - ceph_release_page_vector(pages, page_count); - if (parent_request) - rbd_img_request_put(parent_request); - return result; + ret = rbd_object_map_update(obj_req, snapc->snaps[i], + new_state, NULL); + if (ret < 0) { + obj_req->pending.result = ret; + return; + } + + rbd_assert(!ret); + obj_req->pending.num_pending++; + } } -static void rbd_img_obj_exists_callback(struct 
rbd_obj_request *obj_request) +static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *orig_request; - struct rbd_device *rbd_dev; - int result; + u32 bytes = rbd_obj_img_extents_bytes(obj_req); + int ret; - rbd_assert(!obj_request_img_data_test(obj_request)); + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); /* - * All we need from the object request is the original - * request and the result of the STAT op. Grab those, then - * we're done with the request. + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. */ - orig_request = obj_request->obj_request; - obj_request->obj_request = NULL; - rbd_obj_request_put(orig_request); - rbd_assert(orig_request); - rbd_assert(orig_request->img_request); - - result = obj_request->result; - obj_request->result = 0; - - dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, - obj_request, orig_request, result, - obj_request->xferred, obj_request->length); - rbd_obj_request_put(obj_request); + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + bytes = 0; - /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to re-submit the - * original request. - */ - rbd_dev = orig_request->img_request->rbd_dev; - if (!rbd_dev->parent_overlap) { - rbd_obj_request_submit(orig_request); - return; - } + if (obj_req->img_request->snapc->num_snaps && bytes > 0) { + /* + * Send a copyup request with an empty snapshot context to + * deep-copyup the object through all existing snapshots. + * A second request with the current snapshot context will be + * sent for the actual modification. + */ + ret = rbd_obj_copyup_empty_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } - /* - * Our only purpose here is to determine whether the object - * exists, and we don't want to treat the non-existence as - * an error. If something else comes back, transfer the - * error to the original request and complete it now. - */ - if (!result) { - obj_request_existence_set(orig_request, true); - } else if (result == -ENOENT) { - obj_request_existence_set(orig_request, false); - } else { - goto fail_orig_request; + obj_req->pending.num_pending++; + bytes = MODS_ONLY; } - /* - * Resubmit the original request now that we have recorded - * whether the target object exists. 
- */ - result = rbd_img_obj_request_submit(orig_request); - if (result) - goto fail_orig_request; - - return; + ret = rbd_obj_copyup_current_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } -fail_orig_request: - rbd_obj_request_error(orig_request, result); + obj_req->pending.num_pending++; } -static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result) { - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - struct rbd_obj_request *stat_request; - struct page **pages; - u32 page_count; - size_t size; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; - stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); - if (!stat_request) - return -ENOMEM; +again: + switch (obj_req->copyup_state) { + case RBD_OBJ_COPYUP_START: + rbd_assert(!*result); - stat_request->object_no = obj_request->object_no; + ret = rbd_obj_copyup_read_parent(obj_req); + if (ret) { + *result = ret; + return true; + } + if (obj_req->num_img_extents) + obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT; + else + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; + return false; + case RBD_OBJ_COPYUP_READ_PARENT: + if (*result) + return true; + + if (is_zero_bvecs(obj_req->copyup_bvecs, + rbd_obj_img_extents_bytes(obj_req))) { + dout("%s %p detected zeros\n", __func__, obj_req); + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS; + } - stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - stat_request); - if (!stat_request->osd_req) { - ret = -ENOMEM; - goto fail_stat_request; - } + rbd_obj_copyup_object_maps(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS; + return false; + case __RBD_OBJ_COPYUP_OBJECT_MAPS: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + fallthrough; + case RBD_OBJ_COPYUP_OBJECT_MAPS: + if (*result) { + rbd_warn(rbd_dev, "snap object map update failed: %d", + *result); + return true; + } - /* - * The response data for a STAT call consists of: - * le64 length; - * struct { - * le32 tv_sec; - * le32 tv_nsec; - * } mtime; - */ - size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); - page_count = (u32)calc_pages_for(0, size); - pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) { - ret = PTR_ERR(pages); - goto fail_stat_request; + rbd_obj_copyup_write_object(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT; + return false; + case __RBD_OBJ_COPYUP_WRITE_OBJECT: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + fallthrough; + case RBD_OBJ_COPYUP_WRITE_OBJECT: + return true; + default: + BUG(); } +} - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, - false, false); +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 current_state = OBJECT_PENDING; - rbd_obj_request_get(obj_request); - stat_request->obj_request = obj_request; - stat_request->pages = pages; - 
stat_request->page_count = page_count; - stat_request->callback = rbd_img_obj_exists_callback; + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; - rbd_obj_request_submit(stat_request); - return 0; + if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION)) + return 1; -fail_stat_request: - rbd_obj_request_put(stat_request); - return ret; + return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT, + &current_state); } -static bool img_obj_request_simple(struct rbd_obj_request *obj_request) +static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev = img_request->rbd_dev; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - /* Reads */ - if (!img_request_write_test(img_request) && - !img_request_discard_test(img_request)) - return true; +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_START: + rbd_assert(!*result); - /* Non-layered writes */ - if (!img_request_layered_test(img_request)) - return true; + rbd_obj_set_copyup_enabled(obj_req); + if (rbd_obj_write_is_noop(obj_req)) + return true; - /* - * Layered writes outside of the parent overlap range don't - * share any data with the parent. - */ - if (!obj_request_overlaps_parent(obj_request)) + ret = rbd_obj_write_pre_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_PRE_OBJECT_MAP: + if (*result) { + rbd_warn(rbd_dev, "pre object map update failed: %d", + *result); + return true; + } + ret = rbd_obj_write_object(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_OBJECT; + return false; + case RBD_OBJ_WRITE_OBJECT: + if (*result == -ENOENT) { + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + *result = 0; + obj_req->copyup_state = RBD_OBJ_COPYUP_START; + obj_req->write_state = __RBD_OBJ_WRITE_COPYUP; + goto again; + } + /* + * On a non-existent object: + * delete - -ENOENT, truncate/zero - 0 + */ + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + *result = 0; + } + if (*result) + return true; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + goto again; + case __RBD_OBJ_WRITE_COPYUP: + if (!rbd_obj_advance_copyup(obj_req, result)) + return false; + fallthrough; + case RBD_OBJ_WRITE_COPYUP: + if (*result) { + rbd_warn(rbd_dev, "copyup failed: %d", *result); + return true; + } + ret = rbd_obj_write_post_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_POST_OBJECT_MAP: + if (*result) + rbd_warn(rbd_dev, "post object map update failed: %d", + *result); return true; + default: + BUG(); + } +} - /* - * Entire-object layered writes - we will overwrite whatever - * parent data there is anyway. - */ - if (!obj_request->offset && - obj_request->length == rbd_obj_bytes(&rbd_dev->header)) - return true; +/* + * Return true if @obj_req is completed. + */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req, + int *result) +{ + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; - /* - * If the object is known to already exist, its parent data has - * already been copied.
- */ - if (obj_request_known_test(obj_request) && - obj_request_exists_test(obj_request)) - return true; + mutex_lock(&obj_req->state_mutex); + if (!rbd_img_is_write(img_req)) + done = rbd_obj_advance_read(obj_req, result); + else + done = rbd_obj_advance_write(obj_req, result); + mutex_unlock(&obj_req->state_mutex); - return false; + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d", + obj_op_name(img_req->op_type), obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len, *result); + } + return done; +} + +/* + * This is open-coded in rbd_img_handle_request() to avoid parent chain + * recursion. + */ +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result) +{ + if (__rbd_obj_handle_request(obj_req, &result)) + rbd_img_handle_request(obj_req->img_request, result); } -static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) +static bool need_exclusive_lock(struct rbd_img_request *img_req) { - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request_type_valid(obj_request->type)); - rbd_assert(obj_request->img_request); + struct rbd_device *rbd_dev = img_req->rbd_dev; - if (img_obj_request_simple(obj_request)) { - rbd_obj_request_submit(obj_request); - return 0; - } + if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) + return false; - /* - * It's a layered write. The target object might exist but - * we may not know that yet. If we know it doesn't exist, - * start by reading the data for the full target object from - * the parent so we can use it for a copyup to the target. - */ - if (obj_request_known_test(obj_request)) - return rbd_img_obj_parent_read_full(obj_request); + if (rbd_is_ro(rbd_dev)) + return false; - /* We don't know whether the target exists. Go find out. 
*/ + rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); + if (rbd_dev->opts->lock_on_read || + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return true; - return rbd_img_obj_exists_submit(obj_request); + return rbd_img_is_write(img_req); } -static int rbd_img_request_submit(struct rbd_img_request *img_request) +static bool rbd_lock_add_request(struct rbd_img_request *img_req) { - struct rbd_obj_request *obj_request; - struct rbd_obj_request *next_obj_request; - int ret = 0; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool locked; - dout("%s: img %p\n", __func__, img_request); + lockdep_assert_held(&rbd_dev->lock_rwsem); + locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED; + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(list_empty(&img_req->lock_item)); + if (!locked) + list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list); + else + list_add_tail(&img_req->lock_item, &rbd_dev->running_list); + spin_unlock(&rbd_dev->lock_lists_lock); + return locked; +} - rbd_img_request_get(img_request); - for_each_obj_request_safe(img_request, obj_request, next_obj_request) { - ret = rbd_img_obj_request_submit(obj_request); - if (ret) - goto out_put_ireq; - } +static void rbd_lock_del_request(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool need_wakeup = false; -out_put_ireq: - rbd_img_request_put(img_request); - return ret; + lockdep_assert_held(&rbd_dev->lock_rwsem); + spin_lock(&rbd_dev->lock_lists_lock); + if (!list_empty(&img_req->lock_item)) { + rbd_assert(!list_empty(&rbd_dev->running_list)); + list_del_init(&img_req->lock_item); + need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_QUIESCING && + list_empty(&rbd_dev->running_list)); + } + spin_unlock(&rbd_dev->lock_lists_lock); + if (need_wakeup) + complete(&rbd_dev->quiescing_wait); } -static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) +static int rbd_img_exclusive_lock(struct rbd_img_request *img_req) { - struct rbd_obj_request *obj_request; - struct rbd_device *rbd_dev; - u64 obj_end; - u64 img_xferred; - int img_result; + struct rbd_device *rbd_dev = img_req->rbd_dev; - rbd_assert(img_request_child_test(img_request)); - - /* First get what we need from the image request and release it */ + if (!need_exclusive_lock(img_req)) + return 1; - obj_request = img_request->obj_request; - img_xferred = img_request->xferred; - img_result = img_request->result; - rbd_img_request_put(img_request); + if (rbd_lock_add_request(img_req)) + return 1; /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to re-submit the - * original request. + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_lock_waiters(). 
*/ - rbd_assert(obj_request); - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; - if (!rbd_dev->parent_overlap) { - rbd_obj_request_submit(obj_request); - return; + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + return 0; +} + +static void rbd_img_object_requests(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + struct rbd_obj_request *obj_req; + + rbd_assert(!img_req->pending.result && !img_req->pending.num_pending); + rbd_assert(!need_exclusive_lock(img_req) || + __rbd_is_lock_owner(rbd_dev)); + + if (rbd_img_is_write(img_req)) { + rbd_assert(!img_req->snapc); + down_read(&rbd_dev->header_rwsem); + img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc); + up_read(&rbd_dev->header_rwsem); } - obj_request->result = img_result; - if (obj_request->result) - goto out; + for_each_obj_request(img_req, obj_req) { + int result = 0; - /* - * We need to zero anything beyond the parent overlap - * boundary. Since rbd_img_obj_request_read_callback() - * will zero anything beyond the end of a short read, an - * easy way to do this is to pretend the data from the - * parent came up short--ending at the overlap boundary. - */ - rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); - obj_end = obj_request->img_offset + obj_request->length; - if (obj_end > rbd_dev->parent_overlap) { - u64 xferred = 0; + if (__rbd_obj_handle_request(obj_req, &result)) { + if (result) { + img_req->pending.result = result; + return; + } + } else { + img_req->pending.num_pending++; + } + } +} - if (obj_request->img_offset < rbd_dev->parent_overlap) - xferred = rbd_dev->parent_overlap - - obj_request->img_offset; +static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) +{ + int ret; - obj_request->xferred = min(img_xferred, xferred); - } else { - obj_request->xferred = img_xferred; +again: + switch (img_req->state) { + case RBD_IMG_START: + rbd_assert(!*result); + + ret = rbd_img_exclusive_lock(img_req); + if (ret < 0) { + *result = ret; + return true; + } + img_req->state = RBD_IMG_EXCLUSIVE_LOCK; + if (ret > 0) + goto again; + return false; + case RBD_IMG_EXCLUSIVE_LOCK: + if (*result) + return true; + + rbd_img_object_requests(img_req); + if (!img_req->pending.num_pending) { + *result = img_req->pending.result; + img_req->state = RBD_IMG_OBJECT_REQUESTS; + goto again; + } + img_req->state = __RBD_IMG_OBJECT_REQUESTS; + return false; + case __RBD_IMG_OBJECT_REQUESTS: + if (!pending_result_dec(&img_req->pending, result)) + return false; + fallthrough; + case RBD_IMG_OBJECT_REQUESTS: + return true; + default: + BUG(); } -out: - rbd_img_obj_request_read_callback(obj_request); - rbd_obj_request_complete(obj_request); } -static void rbd_img_parent_read(struct rbd_obj_request *obj_request) +/* + * Return true if @img_req is completed. 
+ */ +static bool __rbd_img_handle_request(struct rbd_img_request *img_req, + int *result) { - struct rbd_img_request *img_request; - int result; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request->img_request != NULL); - rbd_assert(obj_request->result == (s32) -ENOENT); - rbd_assert(obj_request_type_valid(obj_request->type)); - - /* rbd_read_finish(obj_request, obj_request->length); */ - img_request = rbd_parent_request_create(obj_request, - obj_request->img_offset, - obj_request->length); - result = -ENOMEM; - if (!img_request) - goto out_err; + if (need_exclusive_lock(img_req)) { + down_read(&rbd_dev->lock_rwsem); + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + if (done) + rbd_lock_del_request(img_req); + mutex_unlock(&img_req->state_mutex); + up_read(&rbd_dev->lock_rwsem); + } else { + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + mutex_unlock(&img_req->state_mutex); + } - if (obj_request->type == OBJ_REQUEST_BIO) - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - obj_request->bio_list); - else - result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES, - obj_request->pages); - if (result) - goto out_err; + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s%s result %d", + test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "", + obj_op_name(img_req->op_type), *result); + } + return done; +} - img_request->callback = rbd_img_parent_read_callback; - result = rbd_img_request_submit(img_request); - if (result) - goto out_err; +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result) +{ +again: + if (!__rbd_img_handle_request(img_req, &result)) + return; - return; -out_err: - if (img_request) - rbd_img_request_put(img_request); - obj_request->result = result; - obj_request->xferred = 0; - obj_request_done_set(obj_request); + if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { + struct rbd_obj_request *obj_req = img_req->obj_request; + + rbd_img_request_destroy(img_req); + if (__rbd_obj_handle_request(obj_req, &result)) { + img_req = obj_req->img_request; + goto again; + } + } else { + struct request *rq = blk_mq_rq_from_pdu(img_req); + + rbd_img_request_destroy(img_req); + blk_mq_end_request(rq, errno_to_blk_status(result)); + } } static const struct rbd_client_id rbd_empty_cid; @@ -3074,13 +3649,22 @@ static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) mutex_unlock(&rbd_dev->watch_mutex); } +static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) +{ + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; + strcpy(rbd_dev->lock_cookie, cookie); + rbd_set_owner_cid(rbd_dev, &cid); + queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); +} + /* * lock_rwsem must be held for write */ static int rbd_lock(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - struct rbd_client_id cid = rbd_get_cid(rbd_dev); char cookie[32]; int ret; @@ -3091,13 +3675,10 @@ static int rbd_lock(struct rbd_device *rbd_dev) ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, RBD_LOCK_TAG, "", 0); - if (ret) + if (ret && ret != -EEXIST) return ret; - rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; - strcpy(rbd_dev->lock_cookie, cookie); - rbd_set_owner_cid(rbd_dev, &cid); - queue_work(rbd_dev->task_wq, 
&rbd_dev->acquired_lock_work); + __rbd_lock(rbd_dev, cookie); return 0; } @@ -3115,7 +3696,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev) ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, rbd_dev->lock_cookie); if (ret && ret != -ENOENT) - rbd_warn(rbd_dev, "failed to unlock: %d", ret); + rbd_warn(rbd_dev, "failed to unlock header: %d", ret); /* treat errors as the image is unlocked */ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; @@ -3131,8 +3712,8 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_client_id cid = rbd_get_cid(rbd_dev); - int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; - char buf[buf_size]; + char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; + int buf_size = sizeof(buf); void *p = buf; dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); @@ -3151,11 +3732,7 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, static void rbd_notify_op_lock(struct rbd_device *rbd_dev, enum rbd_notify_op notify_op) { - struct page **reply_pages; - size_t reply_len; - - __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); - ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); + __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL); } static void rbd_notify_acquired_lock(struct work_struct *work) @@ -3242,62 +3819,117 @@ e_inval: goto out; } -static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +/* + * Either image request state machine(s) or rbd_add_acquire_lock() + * (i.e. "rbd map"). + */ +static void wake_lock_waiters(struct rbd_device *rbd_dev, int result) { - dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + struct rbd_img_request *img_req; + + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); cancel_delayed_work(&rbd_dev->lock_dwork); - if (wake_all) - wake_up_all(&rbd_dev->lock_waitq); - else - wake_up(&rbd_dev->lock_waitq); + if (!completion_done(&rbd_dev->acquire_wait)) { + rbd_assert(list_empty(&rbd_dev->acquiring_list) && + list_empty(&rbd_dev->running_list)); + rbd_dev->acquire_err = result; + complete_all(&rbd_dev->acquire_wait); + return; + } + + while (!list_empty(&rbd_dev->acquiring_list)) { + img_req = list_first_entry(&rbd_dev->acquiring_list, + struct rbd_img_request, lock_item); + mutex_lock(&img_req->state_mutex); + rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK); + if (!result) + list_move_tail(&img_req->lock_item, + &rbd_dev->running_list); + else + list_del_init(&img_req->lock_item); + rbd_img_schedule(img_req, result); + mutex_unlock(&img_req->state_mutex); + } +} + +static bool locker_equal(const struct ceph_locker *lhs, + const struct ceph_locker *rhs) +{ + return lhs->id.name.type == rhs->id.name.type && + lhs->id.name.num == rhs->id.name.num && + !strcmp(lhs->id.cookie, rhs->id.cookie) && + ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr); +} + +static void free_locker(struct ceph_locker *locker) +{ + if (locker) + ceph_free_lockers(locker, 1); } -static int get_lock_owner_info(struct rbd_device *rbd_dev, - struct ceph_locker **lockers, u32 *num_lockers) +static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_locker *lockers; + u32 num_lockers; u8 lock_type; char *lock_tag; + u64 handle; int ret; - dout("%s rbd_dev %p\n", __func__, rbd_dev); - ret = 
ceph_cls_lock_info(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, - &lock_type, &lock_tag, lockers, num_lockers); - if (ret) - return ret; + &lock_type, &lock_tag, &lockers, &num_lockers); + if (ret) { + rbd_warn(rbd_dev, "failed to get header lockers: %d", ret); + return ERR_PTR(ret); + } - if (*num_lockers == 0) { + if (num_lockers == 0) { dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); + lockers = NULL; goto out; } if (strcmp(lock_tag, RBD_LOCK_TAG)) { rbd_warn(rbd_dev, "locked by external mechanism, tag %s", lock_tag); - ret = -EBUSY; - goto out; + goto err_busy; } - if (lock_type == CEPH_CLS_LOCK_SHARED) { - rbd_warn(rbd_dev, "shared lock type detected"); - ret = -EBUSY; - goto out; + if (lock_type != CEPH_CLS_LOCK_EXCLUSIVE) { + rbd_warn(rbd_dev, "incompatible lock type detected"); + goto err_busy; } - if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, - strlen(RBD_LOCK_COOKIE_PREFIX))) { + WARN_ON(num_lockers != 1); + ret = sscanf(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", + &handle); + if (ret != 1) { rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", - (*lockers)[0].id.cookie); - ret = -EBUSY; - goto out; + lockers[0].id.cookie); + goto err_busy; + } + if (ceph_addr_is_blank(&lockers[0].info.addr)) { + rbd_warn(rbd_dev, "locker has a blank address"); + goto err_busy; } + dout("%s rbd_dev %p got locker %s%llu@%pISpc/%u handle %llu\n", + __func__, rbd_dev, ENTITY_NAME(lockers[0].id.name), + &lockers[0].info.addr.in_addr, + le32_to_cpu(lockers[0].info.addr.nonce), handle); + out: kfree(lock_tag); - return ret; + return lockers; + +err_busy: + kfree(lock_tag); + ceph_free_lockers(lockers, num_lockers); + return ERR_PTR(-EBUSY); } static int find_watcher(struct rbd_device *rbd_dev, @@ -3313,13 +3945,19 @@ static int find_watcher(struct rbd_device *rbd_dev, ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, &watchers, &num_watchers); - if (ret) + if (ret) { + rbd_warn(rbd_dev, "failed to get watchers: %d", ret); return ret; + } sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); for (i = 0; i < num_watchers; i++) { - if (!memcmp(&watchers[i].addr, &locker->info.addr, - sizeof(locker->info.addr)) && + /* + * Ignore addr->type while comparing. This mimics + * entity_addr_t::get_legacy_str() + strcmp(). 
+ */ + if (ceph_addr_equal_no_type(&watchers[i].addr, + &locker->info.addr) && watchers[i].cookie == cookie) { struct rbd_client_id cid = { .gid = le64_to_cpu(watchers[i].name.num), @@ -3347,104 +3985,160 @@ out: static int rbd_try_lock(struct rbd_device *rbd_dev) { struct ceph_client *client = rbd_dev->rbd_client->client; - struct ceph_locker *lockers; - u32 num_lockers; + struct ceph_locker *locker, *refreshed_locker; int ret; for (;;) { + locker = refreshed_locker = NULL; + ret = rbd_lock(rbd_dev); - if (ret != -EBUSY) - return ret; + if (!ret) + goto out; + if (ret != -EBUSY) { + rbd_warn(rbd_dev, "failed to lock header: %d", ret); + goto out; + } /* determine if the current lock holder is still alive */ - ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); - if (ret) - return ret; - - if (num_lockers == 0) + locker = get_lock_owner_info(rbd_dev); + if (IS_ERR(locker)) { + ret = PTR_ERR(locker); + locker = NULL; + goto out; + } + if (!locker) goto again; - ret = find_watcher(rbd_dev, lockers); - if (ret) { - if (ret > 0) - ret = 0; /* have to request lock */ + ret = find_watcher(rbd_dev, locker); + if (ret) + goto out; /* request lock or error */ + + refreshed_locker = get_lock_owner_info(rbd_dev); + if (IS_ERR(refreshed_locker)) { + ret = PTR_ERR(refreshed_locker); + refreshed_locker = NULL; goto out; } + if (!refreshed_locker || + !locker_equal(locker, refreshed_locker)) + goto again; - rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", - ENTITY_NAME(lockers[0].id.name)); + rbd_warn(rbd_dev, "breaking header lock owned by %s%llu", + ENTITY_NAME(locker->id.name)); - ret = ceph_monc_blacklist_add(&client->monc, - &lockers[0].info.addr); + ret = ceph_monc_blocklist_add(&client->monc, + &locker->info.addr); if (ret) { - rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", - ENTITY_NAME(lockers[0].id.name), ret); + rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d", + ENTITY_NAME(locker->id.name), ret); goto out; } ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, - lockers[0].id.cookie, - &lockers[0].id.name); - if (ret && ret != -ENOENT) + locker->id.cookie, &locker->id.name); + if (ret && ret != -ENOENT) { + rbd_warn(rbd_dev, "failed to break header lock: %d", + ret); goto out; + } again: - ceph_free_lockers(lockers, num_lockers); + free_locker(refreshed_locker); + free_locker(locker); } out: - ceph_free_lockers(lockers, num_lockers); + free_locker(refreshed_locker); + free_locker(locker); return ret; } +static int rbd_post_acquire_action(struct rbd_device *rbd_dev) +{ + int ret; + + ret = rbd_dev_refresh(rbd_dev); + if (ret) + return ret; + + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) { + ret = rbd_object_map_open(rbd_dev); + if (ret) + return ret; + } + + return 0; +} + /* - * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + * Return: + * 0 - lock acquired + * 1 - caller should call rbd_request_lock() + * <0 - error */ -static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, - int *pret) +static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) { - enum rbd_lock_state lock_state; + int ret; down_read(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); if (__rbd_is_lock_owner(rbd_dev)) { - lock_state = rbd_dev->lock_state; up_read(&rbd_dev->lock_rwsem); - return lock_state; + return 0; } up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 
rbd_dev->lock_state); - if (!__rbd_is_lock_owner(rbd_dev)) { - *pret = rbd_try_lock(rbd_dev); - if (*pret) - rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + if (__rbd_is_lock_owner(rbd_dev)) { + up_write(&rbd_dev->lock_rwsem); + return 0; + } + + ret = rbd_try_lock(rbd_dev); + if (ret < 0) { + rbd_warn(rbd_dev, "failed to acquire lock: %d", ret); + goto out; + } + if (ret > 0) { + up_write(&rbd_dev->lock_rwsem); + return ret; } - lock_state = rbd_dev->lock_state; + rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); + rbd_assert(list_empty(&rbd_dev->running_list)); + + ret = rbd_post_acquire_action(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); + /* + * Can't stay in RBD_LOCK_STATE_LOCKED because + * rbd_lock_add_request() would let the request through, + * assuming that e.g. object map is locked and loaded. + */ + rbd_unlock(rbd_dev); + } + +out: + wake_lock_waiters(rbd_dev, ret); up_write(&rbd_dev->lock_rwsem); - return lock_state; + return ret; } static void rbd_acquire_lock(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, lock_dwork); - enum rbd_lock_state lock_state; int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); again: - lock_state = rbd_try_acquire_lock(rbd_dev, &ret); - if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { - if (lock_state == RBD_LOCK_STATE_LOCKED) - wake_requests(rbd_dev, true); - dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, - rbd_dev, lock_state, ret); + ret = rbd_try_acquire_lock(rbd_dev); + if (ret <= 0) { + dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret); return; } @@ -3453,16 +4147,9 @@ again: goto again; /* treat this as a dead client */ } else if (ret == -EROFS) { rbd_warn(rbd_dev, "peer will not release lock"); - /* - * If this is rbd_add_acquire_lock(), we want to fail - * immediately -- reuse BLACKLISTED flag. Otherwise we - * want to block. - */ - if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - /* wake "rbd map --exclusive" process */ - wake_requests(rbd_dev, false); - } + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); } else if (ret < 0) { rbd_warn(rbd_dev, "error requesting lock: %d", ret); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, @@ -3472,50 +4159,72 @@ again: * lock owner acked, but resend if we don't see them * release the lock */ - dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, + dout("%s rbd_dev %p requeuing lock_dwork\n", __func__, rbd_dev); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); } } -/* - * lock_rwsem must be held for write - */ -static bool rbd_release_lock(struct rbd_device *rbd_dev) +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) { - dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); + dout("%s rbd_dev %p\n", __func__, rbd_dev); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) return false; - rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; - downgrade_write(&rbd_dev->lock_rwsem); /* * Ensure that all in-flight IO is flushed. - * - * FIXME: ceph_osdc_sync() flushes the entire OSD client, which - * may be shared with other devices. 
*/ - ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); - up_read(&rbd_dev->lock_rwsem); + rbd_dev->lock_state = RBD_LOCK_STATE_QUIESCING; + rbd_assert(!completion_done(&rbd_dev->quiescing_wait)); + if (list_empty(&rbd_dev->running_list)) + return true; + + up_write(&rbd_dev->lock_rwsem); + wait_for_completion(&rbd_dev->quiescing_wait); down_write(&rbd_dev->lock_rwsem); - dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); - if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) + if (rbd_dev->lock_state != RBD_LOCK_STATE_QUIESCING) return false; + rbd_assert(list_empty(&rbd_dev->running_list)); + return true; +} + +static void rbd_pre_release_action(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) + rbd_object_map_close(rbd_dev); +} + +static void __rbd_release_lock(struct rbd_device *rbd_dev) +{ + rbd_assert(list_empty(&rbd_dev->running_list)); + + rbd_pre_release_action(rbd_dev); rbd_unlock(rbd_dev); +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_release_lock(struct rbd_device *rbd_dev) +{ + if (!rbd_quiesce_lock(rbd_dev)) + return; + + __rbd_release_lock(rbd_dev); + /* * Give others a chance to grab the lock - we would re-acquire - * almost immediately if we got new IO during ceph_osdc_sync() - * otherwise. We need to ack our own notifications, so this - * lock_dwork will be requeued from rbd_wait_state_locked() - * after wake_requests() in rbd_handle_released_lock(). + * almost immediately if we got new IO while draining the running + * list otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_handle_released_lock() by + * way of maybe_kick_acquire(). */ cancel_delayed_work(&rbd_dev->lock_dwork); - return true; } static void rbd_release_lock_work(struct work_struct *work) @@ -3528,6 +4237,23 @@ static void rbd_release_lock_work(struct work_struct *work) up_write(&rbd_dev->lock_rwsem); } +static void maybe_kick_acquire(struct rbd_device *rbd_dev) +{ + bool have_requests; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + if (__rbd_is_lock_owner(rbd_dev)) + return; + + spin_lock(&rbd_dev->lock_lists_lock); + have_requests = !list_empty(&rbd_dev->acquiring_list); + spin_unlock(&rbd_dev->lock_lists_lock); + if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) { + dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + } +} + static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, void **p) { @@ -3543,22 +4269,17 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { down_write(&rbd_dev->lock_rwsem); if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { - /* - * we already know that the remote client is - * the owner - */ - up_write(&rbd_dev->lock_rwsem); - return; + dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n", + __func__, rbd_dev, cid.gid, cid.handle); + } else { + rbd_set_owner_cid(rbd_dev, &cid); } - - rbd_set_owner_cid(rbd_dev, &cid); downgrade_write(&rbd_dev->lock_rwsem); } else { down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3577,21 +4298,18 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { down_write(&rbd_dev->lock_rwsem); if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { - 
dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", + dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n", __func__, rbd_dev, cid.gid, cid.handle, rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); - up_write(&rbd_dev->lock_rwsem); - return; + } else { + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); } - - rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); downgrade_write(&rbd_dev->lock_rwsem); } else { down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3650,8 +4368,8 @@ static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, u64 cookie, s32 *result) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; - char buf[buf_size]; + char buf[4 + CEPH_ENCODING_START_BLK_LEN]; + int buf_size = sizeof(buf); int ret; if (result) { @@ -3833,16 +4551,18 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) { dout("%s rbd_dev %p\n", __func__, rbd_dev); - cancel_delayed_work_sync(&rbd_dev->watch_dwork); cancel_work_sync(&rbd_dev->acquired_lock_work); cancel_work_sync(&rbd_dev->released_lock_work); cancel_delayed_work_sync(&rbd_dev->lock_dwork); cancel_work_sync(&rbd_dev->unlock_work); } +/* + * header_rwsem must not be held to avoid a deadlock with + * rbd_dev_refresh() when flushing notifies. + */ static void rbd_unregister_watch(struct rbd_device *rbd_dev) { - WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3851,6 +4571,7 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev) rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; mutex_unlock(&rbd_dev->watch_mutex); + cancel_delayed_work_sync(&rbd_dev->watch_dwork); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); } @@ -3863,7 +4584,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) char cookie[32]; int ret; - WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + if (!rbd_quiesce_lock(rbd_dev)) + return; format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, @@ -3875,15 +4597,19 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) rbd_warn(rbd_dev, "failed to update lock cookie: %d", ret); + if (rbd_dev->opts->exclusive) + rbd_warn(rbd_dev, + "temporarily releasing lock on exclusive mapping"); + /* * Lock cookie cannot be updated on older OSDs, so do * a manual release and queue an acquire. 
*/ - if (rbd_release_lock(rbd_dev)) - queue_delayed_work(rbd_dev->task_wq, - &rbd_dev->lock_dwork, 0); + __rbd_release_lock(rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); } else { - strcpy(rbd_dev->lock_cookie, cookie); + __rbd_lock(rbd_dev, cookie); + wake_lock_waiters(rbd_dev, 0); } } @@ -3904,15 +4630,18 @@ static void rbd_reregister_watch(struct work_struct *work) ret = __rbd_register_watch(rbd_dev); if (ret) { rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - if (ret == -EBLACKLISTED || ret == -ENOENT) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - wake_requests(rbd_dev, true); - } else { + if (ret != -EBLOCKLISTED && ret != -ENOENT) { queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, RBD_RETRY_DELAY); + mutex_unlock(&rbd_dev->watch_mutex); + return; } + mutex_unlock(&rbd_dev->watch_mutex); + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); return; } @@ -3927,7 +4656,7 @@ static void rbd_reregister_watch(struct work_struct *work) ret = rbd_dev_refresh(rbd_dev); if (ret) - rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret); } /* @@ -3975,7 +4704,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, CEPH_OSD_FLAG_READ, req_page, outbound_size, - reply_page, &inbound_size); + &reply_page, &inbound_size); if (!ret) { memcpy(inbound, page_address(reply_page), inbound_size); ret = inbound_size; @@ -3987,191 +4716,106 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, return ret; } -/* - * lock_rwsem must be held for read - */ -static void rbd_wait_state_locked(struct rbd_device *rbd_dev) -{ - DEFINE_WAIT(wait); - - do { - /* - * Note the use of mod_delayed_work() in rbd_acquire_lock() - * and cancel_delayed_work() in wake_requests(). 
- */ - dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); - queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); - prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, - TASK_UNINTERRUPTIBLE); - up_read(&rbd_dev->lock_rwsem); - schedule(); - down_read(&rbd_dev->lock_rwsem); - } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && - !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)); - - finish_wait(&rbd_dev->lock_waitq, &wait); -} - static void rbd_queue_workfn(struct work_struct *work) { - struct request *rq = blk_mq_rq_from_pdu(work); - struct rbd_device *rbd_dev = rq->q->queuedata; - struct rbd_img_request *img_request; - struct ceph_snap_context *snapc = NULL; + struct rbd_img_request *img_request = + container_of(work, struct rbd_img_request, work); + struct rbd_device *rbd_dev = img_request->rbd_dev; + enum obj_operation_type op_type = img_request->op_type; + struct request *rq = blk_mq_rq_from_pdu(img_request); u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; u64 length = blk_rq_bytes(rq); - enum obj_operation_type op_type; u64 mapping_size; - bool must_be_locked; int result; - switch (req_op(rq)) { - case REQ_OP_DISCARD: - case REQ_OP_WRITE_ZEROES: - op_type = OBJ_OP_DISCARD; - break; - case REQ_OP_WRITE: - op_type = OBJ_OP_WRITE; - break; - case REQ_OP_READ: - op_type = OBJ_OP_READ; - break; - default: - dout("%s: non-fs request type %d\n", __func__, req_op(rq)); - result = -EIO; - goto err; - } - /* Ignore/skip any zero-length requests */ - if (!length) { dout("%s: zero-length request\n", __func__); result = 0; - goto err_rq; - } - - /* Only reads are allowed to a read-only device */ - - if (op_type != OBJ_OP_READ) { - if (rbd_dev->mapping.read_only) { - result = -EROFS; - goto err_rq; - } - rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); - } - - /* - * Quit early if the mapped snapshot no longer exists. It's - * still possible the snapshot will have disappeared by the - * time our request arrives at the osd, but there's no sense in - * sending it if we already know. 
- */ - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { - dout("request for non-existent snapshot"); - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - result = -ENXIO; - goto err_rq; - } - - if (offset && length > U64_MAX - offset + 1) { - rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, - length); - result = -EINVAL; - goto err_rq; /* Shouldn't happen */ + goto err_img_request; } blk_mq_start_request(rq); down_read(&rbd_dev->header_rwsem); mapping_size = rbd_dev->mapping.size; - if (op_type != OBJ_OP_READ) { - snapc = rbd_dev->header.snapc; - ceph_get_snap_context(snapc); - } + rbd_img_capture_header(img_request); up_read(&rbd_dev->header_rwsem); if (offset + length > mapping_size) { rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, length, mapping_size); result = -EIO; - goto err_rq; - } - - must_be_locked = - (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && - (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); - if (must_be_locked) { - down_read(&rbd_dev->lock_rwsem); - if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && - !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - if (rbd_dev->opts->exclusive) { - rbd_warn(rbd_dev, "exclusive lock required"); - result = -EROFS; - goto err_unlock; - } - rbd_wait_state_locked(rbd_dev); - } - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - result = -EBLACKLISTED; - goto err_unlock; - } + goto err_img_request; } - img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, - snapc); - if (!img_request) { - result = -ENOMEM; - goto err_unlock; - } - img_request->rq = rq; - snapc = NULL; /* img_request consumes a ref */ + dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev, + img_request, obj_op_name(op_type), offset, length); - if (op_type == OBJ_OP_DISCARD) - result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA, - NULL); + if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT) + result = rbd_img_fill_nodata(img_request, offset, length); else - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - rq->bio); - if (result) - goto err_img_request; - - result = rbd_img_request_submit(img_request); + result = rbd_img_fill_from_bio(img_request, offset, length, + rq->bio); if (result) goto err_img_request; - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); + rbd_img_handle_request(img_request, 0); return; err_img_request: - rbd_img_request_put(img_request); -err_unlock: - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); -err_rq: + rbd_img_request_destroy(img_request); if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", obj_op_name(op_type), length, offset, result); - ceph_put_snap_context(snapc); -err: blk_mq_end_request(rq, errno_to_blk_status(result)); } static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, const struct blk_mq_queue_data *bd) { - struct request *rq = bd->rq; - struct work_struct *work = blk_mq_rq_to_pdu(rq); + struct rbd_device *rbd_dev = hctx->queue->queuedata; + struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq); + enum obj_operation_type op_type; - queue_work(rbd_wq, work); + switch (req_op(bd->rq)) { + case REQ_OP_DISCARD: + op_type = OBJ_OP_DISCARD; + break; + case REQ_OP_WRITE_ZEROES: + op_type = OBJ_OP_ZEROOUT; + break; + case REQ_OP_WRITE: + op_type = OBJ_OP_WRITE; + break; + case REQ_OP_READ: + op_type = OBJ_OP_READ; + break; + default: + rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq)); + return BLK_STS_IOERR; + } + + rbd_img_request_init(img_req, rbd_dev, op_type); + + if 
(rbd_img_is_write(img_req)) { + if (rbd_is_ro(rbd_dev)) { + rbd_warn(rbd_dev, "%s on read-only mapping", + obj_op_name(img_req->op_type)); + return BLK_STS_IOERR; + } + rbd_assert(!rbd_is_snap(rbd_dev)); + } + + INIT_WORK(&img_req->work, rbd_queue_workfn); + queue_work(rbd_wq, &img_req->work); return BLK_STS_OK; } static void rbd_free_disk(struct rbd_device *rbd_dev) { - blk_cleanup_queue(rbd_dev->disk->queue); - blk_mq_free_tag_set(&rbd_dev->tag_set); put_disk(rbd_dev->disk); + blk_mq_free_tag_set(&rbd_dev->tag_set); rbd_dev->disk = NULL; } @@ -4195,10 +4839,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); - if (ret) - goto out_req; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -4209,7 +4849,11 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true); - ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out_req; + + ceph_osdc_start_request(osdc, req); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) ceph_copy_from_page_vector(pages, buf, 0, ret); @@ -4224,7 +4868,9 @@ out_req: * return, the rbd_dev->header field will contain up-to-date * information about the image. */ -static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) +static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev, + struct rbd_image_header *header, + bool first_time) { struct rbd_image_header_ondisk *ondisk = NULL; u32 snap_count = 0; @@ -4272,32 +4918,13 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) snap_count = le32_to_cpu(ondisk->snap_count); } while (snap_count != want_count); - ret = rbd_header_from_disk(rbd_dev, ondisk); + ret = rbd_header_from_disk(header, ondisk, first_time); out: kfree(ondisk); return ret; } -/* - * Clear the rbd device's EXISTS flag if the snapshot it's mapped to - * has disappeared from the (just updated) snapshot context. - */ -static void rbd_exists_validate(struct rbd_device *rbd_dev) -{ - u64 snap_id; - - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) - return; - - snap_id = rbd_dev->spec->snap_id; - if (snap_id == CEPH_NOSNAP) - return; - - if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) - clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); -} - static void rbd_dev_update_size(struct rbd_device *rbd_dev) { sector_t size; @@ -4311,140 +4938,69 @@ static void rbd_dev_update_size(struct rbd_device *rbd_dev) !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) { size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; dout("setting size to %llu sectors", (unsigned long long)size); - set_capacity(rbd_dev->disk, size); - revalidate_disk(rbd_dev->disk); - } -} - -static int rbd_dev_refresh(struct rbd_device *rbd_dev) -{ - u64 mapping_size; - int ret; - - down_write(&rbd_dev->header_rwsem); - mapping_size = rbd_dev->mapping.size; - - ret = rbd_dev_header_info(rbd_dev); - if (ret) - goto out; - - /* - * If there is a parent, see if it has disappeared due to the - * mapped image getting flattened. 
- */ - if (rbd_dev->parent) { - ret = rbd_dev_v2_parent_info(rbd_dev); - if (ret) - goto out; + set_capacity_and_notify(rbd_dev->disk, size); } - - if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { - rbd_dev->mapping.size = rbd_dev->header.image_size; - } else { - /* validate mapped snapshot's EXISTS flag */ - rbd_exists_validate(rbd_dev); - } - -out: - up_write(&rbd_dev->header_rwsem); - if (!ret && mapping_size != rbd_dev->mapping.size) - rbd_dev_update_size(rbd_dev); - - return ret; -} - -static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq, - unsigned int hctx_idx, unsigned int numa_node) -{ - struct work_struct *work = blk_mq_rq_to_pdu(rq); - - INIT_WORK(work, rbd_queue_workfn); - return 0; } static const struct blk_mq_ops rbd_mq_ops = { .queue_rq = rbd_queue_rq, - .init_request = rbd_init_request, }; static int rbd_init_disk(struct rbd_device *rbd_dev) { struct gendisk *disk; - struct request_queue *q; - u64 segment_size; + unsigned int objset_bytes = + rbd_dev->layout.object_size * rbd_dev->layout.stripe_count; + struct queue_limits lim = { + .max_hw_sectors = objset_bytes >> SECTOR_SHIFT, + .io_opt = objset_bytes, + .io_min = rbd_dev->opts->alloc_size, + .max_segments = USHRT_MAX, + .max_segment_size = UINT_MAX, + }; int err; - /* create gendisk info */ - disk = alloc_disk(single_major ? - (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : - RBD_MINORS_PER_MAJOR); - if (!disk) - return -ENOMEM; - - snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", - rbd_dev->dev_id); - disk->major = rbd_dev->major; - disk->first_minor = rbd_dev->minor; - if (single_major) - disk->flags |= GENHD_FL_EXT_DEVT; - disk->fops = &rbd_bd_ops; - disk->private_data = rbd_dev; - memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); rbd_dev->tag_set.ops = &rbd_mq_ops; rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; rbd_dev->tag_set.numa_node = NUMA_NO_NODE; - rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; - rbd_dev->tag_set.nr_hw_queues = 1; - rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); + rbd_dev->tag_set.nr_hw_queues = num_present_cpus(); + rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request); err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); if (err) - goto out_disk; + return err; - q = blk_mq_init_queue(&rbd_dev->tag_set); - if (IS_ERR(q)) { - err = PTR_ERR(q); - goto out_tag_set; + if (rbd_dev->opts->trim) { + lim.discard_granularity = rbd_dev->opts->alloc_size; + lim.max_hw_discard_sectors = objset_bytes >> SECTOR_SHIFT; + lim.max_write_zeroes_sectors = objset_bytes >> SECTOR_SHIFT; } - queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q); - /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */ - - /* set io sizes to object size */ - segment_size = rbd_obj_bytes(&rbd_dev->header); - blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); - q->limits.max_sectors = queue_max_hw_sectors(q); - blk_queue_max_segments(q, segment_size / SECTOR_SIZE); - blk_queue_max_segment_size(q, segment_size); - blk_queue_io_min(q, segment_size); - blk_queue_io_opt(q, segment_size); - - /* enable the discard support */ - queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q); - q->limits.discard_granularity = segment_size; - q->limits.discard_alignment = segment_size; - blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE); - blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE); - if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) - q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; + lim.features |= BLK_FEAT_STABLE_WRITES; - /* - * 
disk_release() expects a queue ref from add_disk() and will - * put it. Hold an extra ref until add_disk() is called. - */ - WARN_ON(!blk_get_queue(q)); - disk->queue = q; - q->queuedata = rbd_dev; + disk = blk_mq_alloc_disk(&rbd_dev->tag_set, &lim, rbd_dev); + if (IS_ERR(disk)) { + err = PTR_ERR(disk); + goto out_tag_set; + } + snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", + rbd_dev->dev_id); + disk->major = rbd_dev->major; + disk->first_minor = rbd_dev->minor; + if (single_major) + disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT); + else + disk->minors = RBD_MINORS_PER_MAJOR; + disk->fops = &rbd_bd_ops; + disk->private_data = rbd_dev; rbd_dev->disk = disk; return 0; out_tag_set: blk_mq_free_tag_set(&rbd_dev->tag_set); -out_disk: - put_disk(disk); return err; } @@ -4466,17 +5022,12 @@ static ssize_t rbd_size_show(struct device *dev, (unsigned long long)rbd_dev->mapping.size); } -/* - * Note this shows the features for whatever's mapped, which is not - * necessarily the base image. - */ static ssize_t rbd_features_show(struct device *dev, struct device_attribute *attr, char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "0x%016llx\n", - (unsigned long long)rbd_dev->mapping.features); + return sprintf(buf, "0x%016llx\n", rbd_dev->header.features); } static ssize_t rbd_major_show(struct device *dev, @@ -4531,6 +5082,9 @@ static ssize_t rbd_config_info_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + if (!capable(CAP_SYS_ADMIN)) + return -EPERM; + return sprintf(buf, "%s\n", rbd_dev->config_info); } @@ -4551,6 +5105,14 @@ static ssize_t rbd_pool_id_show(struct device *dev, (unsigned long long) rbd_dev->spec->pool_id); } +static ssize_t rbd_pool_ns_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: ""); +} + static ssize_t rbd_name_show(struct device *dev, struct device_attribute *attr, char *buf) { @@ -4611,11 +5173,13 @@ static ssize_t rbd_parent_show(struct device *dev, count += sprintf(&buf[count], "%s" "pool_id %llu\npool_name %s\n" + "pool_ns %s\n" "image_id %s\nimage_name %s\n" "snap_id %llu\nsnap_name %s\n" "overlap %llu\n", !count ? "" : "\n", /* first? 
*/ spec->pool_id, spec->pool_name, + spec->pool_ns ?: "", spec->image_id, spec->image_name ?: "(unknown)", spec->snap_id, spec->snap_name, rbd_dev->parent_overlap); @@ -4632,6 +5196,9 @@ static ssize_t rbd_image_refresh(struct device *dev, struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); int ret; + if (!capable(CAP_SYS_ADMIN)) + return -EPERM; + ret = rbd_dev_refresh(rbd_dev); if (ret) return ret; @@ -4639,22 +5206,23 @@ static ssize_t rbd_image_refresh(struct device *dev, return size; } -static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); -static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); -static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); -static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); -static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); -static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); -static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); -static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL); -static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); -static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); -static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); -static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); -static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); -static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); -static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); -static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); +static DEVICE_ATTR(size, 0444, rbd_size_show, NULL); +static DEVICE_ATTR(features, 0444, rbd_features_show, NULL); +static DEVICE_ATTR(major, 0444, rbd_major_show, NULL); +static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL); +static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL); +static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL); +static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL); +static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL); +static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL); +static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL); +static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL); +static DEVICE_ATTR(name, 0444, rbd_name_show, NULL); +static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL); +static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh); +static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL); +static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL); +static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL); static struct attribute *rbd_attrs[] = { &dev_attr_size.attr, @@ -4667,6 +5235,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_config_info.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, + &dev_attr_pool_ns.attr, &dev_attr_name.attr, &dev_attr_image_id.attr, &dev_attr_current_snap.attr, @@ -4727,6 +5296,7 @@ static void rbd_spec_free(struct kref *kref) struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); kfree(spec->pool_name); + kfree(spec->pool_ns); kfree(spec->image_id); kfree(spec->image_name); kfree(spec->snap_name); @@ -4755,7 +5325,7 @@ static void rbd_dev_release(struct device *dev) if (need_put) { destroy_workqueue(rbd_dev->task_wq); - ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + ida_free(&rbd_dev_id_ida, rbd_dev->dev_id); } rbd_dev_free(rbd_dev); @@ -4769,8 +5339,7 @@ static void rbd_dev_release(struct device *dev) module_put(THIS_MODULE); } -static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, - struct rbd_spec *spec) +static struct 
rbd_device *__rbd_dev_create(struct rbd_spec *spec) { struct rbd_device *rbd_dev; @@ -4785,6 +5354,12 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, rbd_dev->header.data_pool_id = CEPH_NOPOOL; ceph_oid_init(&rbd_dev->header_oid); rbd_dev->header_oloc.pool = spec->pool_id; + if (spec->pool_ns) { + WARN_ON(!*spec->pool_ns); + rbd_dev->header_oloc.pool_ns = + ceph_find_or_create_string(spec->pool_ns, + strlen(spec->pool_ns)); + } mutex_init(&rbd_dev->watch_mutex); rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; @@ -4796,16 +5371,19 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); - init_waitqueue_head(&rbd_dev->lock_waitq); + spin_lock_init(&rbd_dev->lock_lists_lock); + INIT_LIST_HEAD(&rbd_dev->acquiring_list); + INIT_LIST_HEAD(&rbd_dev->running_list); + init_completion(&rbd_dev->acquire_wait); + init_completion(&rbd_dev->quiescing_wait); + + spin_lock_init(&rbd_dev->object_map_lock); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; device_initialize(&rbd_dev->dev); - rbd_dev->rbd_client = rbdc; - rbd_dev->spec = spec; - return rbd_dev; } @@ -4818,16 +5396,14 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, { struct rbd_device *rbd_dev; - rbd_dev = __rbd_dev_create(rbdc, spec); + rbd_dev = __rbd_dev_create(spec); if (!rbd_dev) return NULL; - rbd_dev->opts = opts; - /* get an id and fill in device name */ - rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, - minor_to_rbd_dev_id(1 << MINORBITS), - GFP_KERNEL); + rbd_dev->dev_id = ida_alloc_max(&rbd_dev_id_ida, + minor_to_rbd_dev_id(1 << MINORBITS) - 1, + GFP_KERNEL); if (rbd_dev->dev_id < 0) goto fail_rbd_dev; @@ -4840,11 +5416,15 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, /* we have a ref from do_rbd_add() */ __module_get(THIS_MODULE); + rbd_dev->rbd_client = rbdc; + rbd_dev->spec = spec; + rbd_dev->opts = opts; + dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); return rbd_dev; fail_dev_id: - ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + ida_free(&rbd_dev_id_ida, rbd_dev->dev_id); fail_rbd_dev: rbd_dev_free(rbd_dev); return NULL; @@ -4894,41 +5474,39 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, return 0; } -static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) -{ - return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, - &rbd_dev->header.obj_order, - &rbd_dev->header.image_size); -} - -static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) +static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev, + char **pobject_prefix) { + size_t size; void *reply_buf; + char *object_prefix; int ret; void *p; - reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); + /* Response will be an encoded string, which includes a length */ + size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX; + reply_buf = kzalloc(size, GFP_KERNEL); if (!reply_buf) return -ENOMEM; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, &rbd_dev->header_oloc, "get_object_prefix", - NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); + NULL, 0, reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; p = reply_buf; - rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, - p + ret, NULL, GFP_NOIO); + object_prefix = 
ceph_extract_encoded_string(&p, p + ret, NULL, + GFP_NOIO); + if (IS_ERR(object_prefix)) { + ret = PTR_ERR(object_prefix); + goto out; + } ret = 0; - if (IS_ERR(rbd_dev->header.object_prefix)) { - ret = PTR_ERR(rbd_dev->header.object_prefix); - rbd_dev->header.object_prefix = NULL; - } else { - dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); - } + *pobject_prefix = object_prefix; + dout(" object_prefix = %s\n", object_prefix); out: kfree(reply_buf); @@ -4936,9 +5514,12 @@ out: } static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, - u64 *snap_features) + bool read_only, u64 *snap_features) { - __le64 snapid = cpu_to_le64(snap_id); + struct { + __le64 snap_id; + u8 read_only; + } features_in; struct { __le64 features; __le64 incompat; @@ -4946,9 +5527,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 unsup; int ret; + features_in.snap_id = cpu_to_le64(snap_id); + features_in.read_only = read_only; + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, &rbd_dev->header_oloc, "get_features", - &snapid, sizeof(snapid), + &features_in, sizeof(features_in), &features_buf, sizeof(features_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) @@ -4973,141 +5557,265 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, return 0; } -static int rbd_dev_v2_features(struct rbd_device *rbd_dev) +/* + * These are generic image flags, but since they are used only for + * object map, store them in rbd_dev->object_map_flags. + * + * For the same reason, this function is called only on object map + * (re)load and not on header refresh. + */ +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev) { - return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, - &rbd_dev->header.features); + __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id); + __le64 flags; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_flags", + &snapid, sizeof(snapid), + &flags, sizeof(flags)); + if (ret < 0) + return ret; + if (ret < sizeof(flags)) + return -EBADMSG; + + rbd_dev->object_map_flags = le64_to_cpu(flags); + return 0; } -static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) +struct parent_image_info { + u64 pool_id; + const char *pool_ns; + const char *image_id; + u64 snap_id; + + bool has_overlap; + u64 overlap; +}; + +static void rbd_parent_info_cleanup(struct parent_image_info *pii) { - struct rbd_spec *parent_spec; - size_t size; - void *reply_buf = NULL; - __le64 snapid; + kfree(pii->pool_ns); + kfree(pii->image_id); + + memset(pii, 0, sizeof(*pii)); +} + +/* + * The caller is responsible for @pii. 
+ */ +static int decode_parent_image_spec(void **p, void *end, + struct parent_image_info *pii) +{ + u8 struct_v; + u32 struct_len; + int ret; + + ret = ceph_start_decoding(p, end, 1, "ParentImageSpec", + &struct_v, &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, pii->pool_id, e_inval); + pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); + if (IS_ERR(pii->pool_ns)) { + ret = PTR_ERR(pii->pool_ns); + pii->pool_ns = NULL; + return ret; + } + pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); + if (IS_ERR(pii->image_id)) { + ret = PTR_ERR(pii->image_id); + pii->image_id = NULL; + return ret; + } + ceph_decode_64_safe(p, end, pii->snap_id, e_inval); + return 0; + +e_inval: + return -EINVAL; +} + +static int __get_parent_info(struct rbd_device *rbd_dev, + struct page *req_page, + struct page *reply_page, + struct parent_image_info *pii) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + size_t reply_len = PAGE_SIZE; + void *p, *end; + int ret; + + ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + "rbd", "parent_get", CEPH_OSD_FLAG_READ, + req_page, sizeof(u64), &reply_page, &reply_len); + if (ret) + return ret == -EOPNOTSUPP ? 1 : ret; + + p = page_address(reply_page); + end = p + reply_len; + ret = decode_parent_image_spec(&p, end, pii); + if (ret) + return ret; + + ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, + req_page, sizeof(u64), &reply_page, &reply_len); + if (ret) + return ret; + + p = page_address(reply_page); + end = p + reply_len; + ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval); + if (pii->has_overlap) + ceph_decode_64_safe(&p, end, pii->overlap, e_inval); + + dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", + __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id, + pii->has_overlap, pii->overlap); + return 0; + +e_inval: + return -EINVAL; +} + +/* + * The caller is responsible for @pii. 
+ */ +static int __get_parent_info_legacy(struct rbd_device *rbd_dev, + struct page *req_page, + struct page *reply_page, + struct parent_image_info *pii) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + size_t reply_len = PAGE_SIZE; + void *p, *end; + int ret; + + ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + "rbd", "get_parent", CEPH_OSD_FLAG_READ, + req_page, sizeof(u64), &reply_page, &reply_len); + if (ret) + return ret; + + p = page_address(reply_page); + end = p + reply_len; + ceph_decode_64_safe(&p, end, pii->pool_id, e_inval); + pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); + if (IS_ERR(pii->image_id)) { + ret = PTR_ERR(pii->image_id); + pii->image_id = NULL; + return ret; + } + ceph_decode_64_safe(&p, end, pii->snap_id, e_inval); + pii->has_overlap = true; + ceph_decode_64_safe(&p, end, pii->overlap, e_inval); + + dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", + __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id, + pii->has_overlap, pii->overlap); + return 0; + +e_inval: + return -EINVAL; +} + +static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev, + struct parent_image_info *pii) +{ + struct page *req_page, *reply_page; void *p; - void *end; - u64 pool_id; - char *image_id; - u64 snap_id; - u64 overlap; int ret; - parent_spec = rbd_spec_alloc(); - if (!parent_spec) + req_page = alloc_page(GFP_KERNEL); + if (!req_page) return -ENOMEM; - size = sizeof (__le64) + /* pool_id */ - sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ - sizeof (__le64) + /* snap_id */ - sizeof (__le64); /* overlap */ - reply_buf = kmalloc(size, GFP_KERNEL); - if (!reply_buf) { - ret = -ENOMEM; - goto out_err; + reply_page = alloc_page(GFP_KERNEL); + if (!reply_page) { + __free_page(req_page); + return -ENOMEM; } - snapid = cpu_to_le64(rbd_dev->spec->snap_id); - ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, - &rbd_dev->header_oloc, "get_parent", - &snapid, sizeof(snapid), reply_buf, size); - dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); - if (ret < 0) - goto out_err; + p = page_address(req_page); + ceph_encode_64(&p, rbd_dev->spec->snap_id); + ret = __get_parent_info(rbd_dev, req_page, reply_page, pii); + if (ret > 0) + ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page, + pii); - p = reply_buf; - end = reply_buf + ret; - ret = -ERANGE; - ceph_decode_64_safe(&p, end, pool_id, out_err); - if (pool_id == CEPH_NOPOOL) { - /* - * Either the parent never existed, or we have - * record of it but the image got flattened so it no - * longer has a parent. When the parent of a - * layered image disappears we immediately set the - * overlap to 0. The effect of this is that all new - * requests will be treated as if the image had no - * parent. - */ - if (rbd_dev->parent_overlap) { - rbd_dev->parent_overlap = 0; - rbd_dev_parent_put(rbd_dev); - pr_info("%s: clone image has been flattened\n", - rbd_dev->disk->disk_name); - } + __free_page(req_page); + __free_page(reply_page); + return ret; +} +static int rbd_dev_setup_parent(struct rbd_device *rbd_dev) +{ + struct rbd_spec *parent_spec; + struct parent_image_info pii = { 0 }; + int ret; + + parent_spec = rbd_spec_alloc(); + if (!parent_spec) + return -ENOMEM; + + ret = rbd_dev_v2_parent_info(rbd_dev, &pii); + if (ret) + goto out_err; + + if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) goto out; /* No parent? No problem. 
*/ - } /* The ceph file layout needs to fit pool id in 32 bits */ ret = -EIO; - if (pool_id > (u64)U32_MAX) { + if (pii.pool_id > (u64)U32_MAX) { rbd_warn(NULL, "parent pool id too large (%llu > %u)", - (unsigned long long)pool_id, U32_MAX); + (unsigned long long)pii.pool_id, U32_MAX); goto out_err; } - image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); - if (IS_ERR(image_id)) { - ret = PTR_ERR(image_id); - goto out_err; - } - ceph_decode_64_safe(&p, end, snap_id, out_err); - ceph_decode_64_safe(&p, end, overlap, out_err); - /* - * The parent won't change (except when the clone is - * flattened, already handled that). So we only need to - * record the parent spec we have not already done so. + * The parent won't change except when the clone is flattened, + * so we only need to record the parent image spec once. */ - if (!rbd_dev->parent_spec) { - parent_spec->pool_id = pool_id; - parent_spec->image_id = image_id; - parent_spec->snap_id = snap_id; - rbd_dev->parent_spec = parent_spec; - parent_spec = NULL; /* rbd_dev now owns this */ - } else { - kfree(image_id); + parent_spec->pool_id = pii.pool_id; + if (pii.pool_ns && *pii.pool_ns) { + parent_spec->pool_ns = pii.pool_ns; + pii.pool_ns = NULL; } + parent_spec->image_id = pii.image_id; + pii.image_id = NULL; + parent_spec->snap_id = pii.snap_id; + + rbd_assert(!rbd_dev->parent_spec); + rbd_dev->parent_spec = parent_spec; + parent_spec = NULL; /* rbd_dev now owns this */ /* - * We always update the parent overlap. If it's zero we issue - * a warning, as we will proceed as if there was no parent. + * Record the parent overlap. If it's zero, issue a warning as + * we will proceed as if there is no parent. */ - if (!overlap) { - if (parent_spec) { - /* refresh, careful to warn just once */ - if (rbd_dev->parent_overlap) - rbd_warn(rbd_dev, - "clone now standalone (overlap became 0)"); - } else { - /* initial probe */ - rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); - } - } - rbd_dev->parent_overlap = overlap; + if (!pii.overlap) + rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); + rbd_dev->parent_overlap = pii.overlap; out: ret = 0; out_err: - kfree(reply_buf); + rbd_parent_info_cleanup(&pii); rbd_spec_put(parent_spec); - return ret; } -static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) +static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev, + u64 *stripe_unit, u64 *stripe_count) { struct { __le64 stripe_unit; __le64 stripe_count; } __attribute__ ((packed)) striping_info_buf = { 0 }; size_t size = sizeof (striping_info_buf); - void *p; - u64 obj_size; - u64 stripe_unit; - u64 stripe_count; int ret; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, @@ -5119,49 +5827,33 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) if (ret < size) return -ERANGE; - /* - * We don't actually support the "fancy striping" feature - * (STRIPINGV2) yet, but if the striping sizes are the - * defaults the behavior is the same as before. So find - * out, and only fail if the image has non-default values. 
- */ - ret = -EINVAL; - obj_size = rbd_obj_bytes(&rbd_dev->header); - p = &striping_info_buf; - stripe_unit = ceph_decode_64(&p); - if (stripe_unit != obj_size) { - rbd_warn(rbd_dev, "unsupported stripe unit " - "(got %llu want %llu)", - stripe_unit, obj_size); - return -EINVAL; - } - stripe_count = ceph_decode_64(&p); - if (stripe_count != 1) { - rbd_warn(rbd_dev, "unsupported stripe count " - "(got %llu want 1)", stripe_count); - return -EINVAL; - } - rbd_dev->header.stripe_unit = stripe_unit; - rbd_dev->header.stripe_count = stripe_count; + *stripe_unit = le64_to_cpu(striping_info_buf.stripe_unit); + *stripe_count = le64_to_cpu(striping_info_buf.stripe_count); + dout(" stripe_unit = %llu stripe_count = %llu\n", *stripe_unit, + *stripe_count); return 0; } -static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) +static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev, s64 *data_pool_id) { - __le64 data_pool_id; + __le64 data_pool_buf; int ret; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, &rbd_dev->header_oloc, "get_data_pool", - NULL, 0, &data_pool_id, sizeof(data_pool_id)); + NULL, 0, &data_pool_buf, + sizeof(data_pool_buf)); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; - if (ret < sizeof(data_pool_id)) + if (ret < sizeof(data_pool_buf)) return -EBADMSG; - rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); - WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); + *data_pool_id = le64_to_cpu(data_pool_buf); + dout(" data_pool_id = %lld\n", *data_pool_id); + WARN_ON(*data_pool_id == CEPH_NOPOOL); + return 0; } @@ -5353,7 +6045,8 @@ out_err: return ret; } -static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) +static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, + struct ceph_snap_context **psnapc) { size_t size; int ret; @@ -5414,9 +6107,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) for (i = 0; i < snap_count; i++) snapc->snaps[i] = ceph_decode_64(&p); - ceph_put_snap_context(rbd_dev->header.snapc); - rbd_dev->header.snapc = snapc; - + *psnapc = snapc; dout(" snap context seq = %llu, snap_count = %u\n", (unsigned long long)seq, (unsigned int)snap_count); out: @@ -5465,38 +6156,42 @@ out: return snap_name; } -static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) +static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev, + struct rbd_image_header *header, + bool first_time) { - bool first_time = rbd_dev->header.object_prefix == NULL; int ret; - ret = rbd_dev_v2_image_size(rbd_dev); + ret = _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, + first_time ? 
&header->obj_order : NULL, + &header->image_size); if (ret) return ret; if (first_time) { - ret = rbd_dev_v2_header_onetime(rbd_dev); + ret = rbd_dev_v2_header_onetime(rbd_dev, header); if (ret) return ret; } - ret = rbd_dev_v2_snap_context(rbd_dev); - if (ret && first_time) { - kfree(rbd_dev->header.object_prefix); - rbd_dev->header.object_prefix = NULL; - } + ret = rbd_dev_v2_snap_context(rbd_dev, &header->snapc); + if (ret) + return ret; - return ret; + return 0; } -static int rbd_dev_header_info(struct rbd_device *rbd_dev) +static int rbd_dev_header_info(struct rbd_device *rbd_dev, + struct rbd_image_header *header, + bool first_time) { rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + rbd_assert(!header->object_prefix && !header->snapc); if (rbd_dev->image_format == 1) - return rbd_dev_v1_header_info(rbd_dev); + return rbd_dev_v1_header_info(rbd_dev, header, first_time); - return rbd_dev_v2_header_info(rbd_dev); + return rbd_dev_v2_header_info(rbd_dev, header, first_time); } /* @@ -5511,7 +6206,7 @@ static inline size_t next_token(const char **buf) * These are the characters that produce nonzero for * isspace() in the "C" and "POSIX" locales. */ - const char *spaces = " \f\n\r\t\v"; + static const char spaces[] = " \f\n\r\t\v"; *buf += strspn(*buf, spaces); /* Find start of token */ @@ -5552,6 +6247,141 @@ static inline char *dup_token(const char **buf, size_t *lenp) return dup; } +static int rbd_parse_param(struct fs_parameter *param, + struct rbd_parse_opts_ctx *pctx) +{ + struct rbd_options *opt = pctx->opts; + struct fs_parse_result result; + struct p_log log = {.prefix = "rbd"}; + int token, ret; + + ret = ceph_parse_param(param, pctx->copts, NULL); + if (ret != -ENOPARAM) + return ret; + + token = __fs_parse(&log, rbd_parameters, param, &result); + dout("%s fs_parse '%s' token %d\n", __func__, param->key, token); + if (token < 0) { + if (token == -ENOPARAM) + return inval_plog(&log, "Unknown parameter '%s'", + param->key); + return token; + } + + switch (token) { + case Opt_queue_depth: + if (result.uint_32 < 1) + goto out_of_range; + opt->queue_depth = result.uint_32; + break; + case Opt_alloc_size: + if (result.uint_32 < SECTOR_SIZE) + goto out_of_range; + if (!is_power_of_2(result.uint_32)) + return inval_plog(&log, "alloc_size must be a power of 2"); + opt->alloc_size = result.uint_32; + break; + case Opt_lock_timeout: + /* 0 is "wait forever" (i.e. 
infinite timeout) */ + if (result.uint_32 > INT_MAX / 1000) + goto out_of_range; + opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000); + break; + case Opt_pool_ns: + kfree(pctx->spec->pool_ns); + pctx->spec->pool_ns = param->string; + param->string = NULL; + break; + case Opt_compression_hint: + switch (result.uint_32) { + case Opt_compression_hint_none: + opt->alloc_hint_flags &= + ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE | + CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE); + break; + case Opt_compression_hint_compressible: + opt->alloc_hint_flags |= + CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; + opt->alloc_hint_flags &= + ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; + break; + case Opt_compression_hint_incompressible: + opt->alloc_hint_flags |= + CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; + opt->alloc_hint_flags &= + ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; + break; + default: + BUG(); + } + break; + case Opt_read_only: + opt->read_only = true; + break; + case Opt_read_write: + opt->read_only = false; + break; + case Opt_lock_on_read: + opt->lock_on_read = true; + break; + case Opt_exclusive: + opt->exclusive = true; + break; + case Opt_notrim: + opt->trim = false; + break; + default: + BUG(); + } + + return 0; + +out_of_range: + return inval_plog(&log, "%s out of range", param->key); +} + +/* + * This duplicates most of generic_parse_monolithic(), untying it from + * fs_context and skipping standard superblock and security options. + */ +static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx) +{ + char *key; + int ret = 0; + + dout("%s '%s'\n", __func__, options); + while ((key = strsep(&options, ",")) != NULL) { + if (*key) { + struct fs_parameter param = { + .key = key, + .type = fs_value_is_flag, + }; + char *value = strchr(key, '='); + size_t v_len = 0; + + if (value) { + if (value == key) + continue; + *value++ = 0; + v_len = strlen(value); + param.string = kmemdup_nul(value, v_len, + GFP_KERNEL); + if (!param.string) + return -ENOMEM; + param.type = fs_value_is_string; + } + param.size = v_len; + + ret = rbd_parse_param(¶m, pctx); + kfree(param.string); + if (ret) + break; + } + } + + return ret; +} + /* * Parse the options provided for an "rbd add" (i.e., rbd image * mapping) request. 
These arrive via a write to /sys/bus/rbd/add, @@ -5603,9 +6433,7 @@ static int rbd_add_parse_args(const char *buf, const char *mon_addrs; char *snap_name; size_t mon_addrs_size; - struct rbd_spec *spec = NULL; - struct rbd_options *rbd_opts = NULL; - struct ceph_options *copts; + struct rbd_parse_opts_ctx pctx = { 0 }; int ret; /* The first four tokens are required */ @@ -5616,7 +6444,7 @@ static int rbd_add_parse_args(const char *buf, return -EINVAL; } mon_addrs = buf; - mon_addrs_size = len + 1; + mon_addrs_size = len; buf += len; ret = -EINVAL; @@ -5628,22 +6456,22 @@ static int rbd_add_parse_args(const char *buf, goto out_err; } - spec = rbd_spec_alloc(); - if (!spec) + pctx.spec = rbd_spec_alloc(); + if (!pctx.spec) goto out_mem; - spec->pool_name = dup_token(&buf, NULL); - if (!spec->pool_name) + pctx.spec->pool_name = dup_token(&buf, NULL); + if (!pctx.spec->pool_name) goto out_mem; - if (!*spec->pool_name) { + if (!*pctx.spec->pool_name) { rbd_warn(NULL, "no pool name provided"); goto out_err; } - spec->image_name = dup_token(&buf, NULL); - if (!spec->image_name) + pctx.spec->image_name = dup_token(&buf, NULL); + if (!pctx.spec->image_name) goto out_mem; - if (!*spec->image_name) { + if (!*pctx.spec->image_name) { rbd_warn(NULL, "no image name provided"); goto out_err; } @@ -5664,73 +6492,48 @@ static int rbd_add_parse_args(const char *buf, if (!snap_name) goto out_mem; *(snap_name + len) = '\0'; - spec->snap_name = snap_name; + pctx.spec->snap_name = snap_name; + + pctx.copts = ceph_alloc_options(); + if (!pctx.copts) + goto out_mem; /* Initialize all rbd options to the defaults */ - rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); - if (!rbd_opts) + pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL); + if (!pctx.opts) goto out_mem; - rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; - rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; - rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; - rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT; + pctx.opts->read_only = RBD_READ_ONLY_DEFAULT; + pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; + pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT; + pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT; + pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; + pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT; + pctx.opts->trim = RBD_TRIM_DEFAULT; - copts = ceph_parse_options(options, mon_addrs, - mon_addrs + mon_addrs_size - 1, - parse_rbd_opts_token, rbd_opts); - if (IS_ERR(copts)) { - ret = PTR_ERR(copts); + ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL, + ','); + if (ret) goto out_err; - } - kfree(options); - *ceph_opts = copts; - *opts = rbd_opts; - *rbd_spec = spec; + ret = rbd_parse_options(options, &pctx); + if (ret) + goto out_err; + *ceph_opts = pctx.copts; + *opts = pctx.opts; + *rbd_spec = pctx.spec; + kfree(options); return 0; + out_mem: ret = -ENOMEM; out_err: - kfree(rbd_opts); - rbd_spec_put(spec); + kfree(pctx.opts); + ceph_destroy_options(pctx.copts); + rbd_spec_put(pctx.spec); kfree(options); - - return ret; -} - -/* - * Return pool id (>= 0) or a negative error code. 
- */ -static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) -{ - struct ceph_options *opts = rbdc->client->options; - u64 newest_epoch; - int tries = 0; - int ret; - -again: - ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); - if (ret == -ENOENT && tries++ < 1) { - ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap", - &newest_epoch); - if (ret < 0) - return ret; - - if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { - ceph_osdc_maybe_request_map(&rbdc->client->osdc); - (void) ceph_monc_wait_osdmap(&rbdc->client->monc, - newest_epoch, - opts->mount_timeout); - goto again; - } else { - /* the osdmap we have is new enough */ - return -ENOENT; - } - } - return ret; } @@ -5738,25 +6541,45 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) { down_write(&rbd_dev->lock_rwsem); if (__rbd_is_lock_owner(rbd_dev)) - rbd_unlock(rbd_dev); + __rbd_release_lock(rbd_dev); up_write(&rbd_dev->lock_rwsem); } +/* + * If the wait is interrupted, an error is returned even if the lock + * was successfully acquired. rbd_dev_image_unlock() will release it + * if needed. + */ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) { + long ret; + if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { + if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read) + return 0; + rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); return -EINVAL; } - /* FIXME: "rbd map --exclusive" should be in interruptible */ - down_read(&rbd_dev->lock_rwsem); - rbd_wait_state_locked(rbd_dev); - up_read(&rbd_dev->lock_rwsem); - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - rbd_warn(rbd_dev, "failed to acquire exclusive lock"); - return -EROFS; + if (rbd_is_ro(rbd_dev)) + return 0; + + rbd_assert(!rbd_is_lock_owner(rbd_dev)); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait, + ceph_timeout_jiffies(rbd_dev->opts->lock_timeout)); + if (ret > 0) { + ret = rbd_dev->acquire_err; + } else { + cancel_delayed_work_sync(&rbd_dev->lock_dwork); + if (!ret) + ret = -ETIMEDOUT; + + rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret); } + if (ret) + return ret; return 0; } @@ -5807,7 +6630,6 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) dout("rbd id object name is %s\n", oid.name); /* Response will be an encoded string, which includes a length */ - size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; response = kzalloc(size, GFP_NOIO); if (!response) { @@ -5819,7 +6641,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, "get_id", NULL, 0, - response, RBD_IMAGE_ID_LEN_MAX); + response, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret == -ENOENT) { image_id = kstrdup("", GFP_KERNEL); @@ -5852,58 +6674,49 @@ out: */ static void rbd_dev_unprobe(struct rbd_device *rbd_dev) { - struct rbd_image_header *header; - rbd_dev_parent_put(rbd_dev); + rbd_object_map_free(rbd_dev); + rbd_dev_mapping_clear(rbd_dev); /* Free dynamic fields from the header, then zero it out */ - header = &rbd_dev->header; - ceph_put_snap_context(header->snapc); - kfree(header->snap_sizes); - kfree(header->snap_names); - kfree(header->object_prefix); - memset(header, 0, sizeof (*header)); + rbd_image_header_cleanup(&rbd_dev->header); } -static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) +static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev, + struct rbd_image_header 
*header) { int ret; - ret = rbd_dev_v2_object_prefix(rbd_dev); + ret = rbd_dev_v2_object_prefix(rbd_dev, &header->object_prefix); if (ret) - goto out_err; + return ret; /* * Get the and check features for the image. Currently the * features are assumed to never change. */ - ret = rbd_dev_v2_features(rbd_dev); + ret = _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, + rbd_is_ro(rbd_dev), &header->features); if (ret) - goto out_err; + return ret; /* If the image supports fancy striping, get its parameters */ - if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { - ret = rbd_dev_v2_striping_info(rbd_dev); - if (ret < 0) - goto out_err; + if (header->features & RBD_FEATURE_STRIPINGV2) { + ret = rbd_dev_v2_striping_info(rbd_dev, &header->stripe_unit, + &header->stripe_count); + if (ret) + return ret; } - if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { - ret = rbd_dev_v2_data_pool(rbd_dev); + if (header->features & RBD_FEATURE_DATA_POOL) { + ret = rbd_dev_v2_data_pool(rbd_dev, &header->data_pool_id); if (ret) - goto out_err; + return ret; } - rbd_init_layout(rbd_dev); return 0; - -out_err: - rbd_dev->header.features = 0; - kfree(rbd_dev->header.object_prefix); - rbd_dev->header.object_prefix = NULL; - return ret; } /* @@ -5925,7 +6738,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) goto out_err; } - parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); + parent = __rbd_dev_create(rbd_dev->parent_spec); if (!parent) { ret = -ENOMEM; goto out_err; @@ -5935,8 +6748,10 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) * Images related by parent/child relationships always share * rbd_client and spec/parent_spec, so bump their refcounts. */ - __rbd_get_client(rbd_dev->rbd_client); - rbd_spec_get(rbd_dev->parent_spec); + parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client); + parent->spec = rbd_spec_get(rbd_dev->parent_spec); + + __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags); ret = rbd_dev_image_probe(parent, depth); if (ret < 0) @@ -5955,7 +6770,6 @@ out_err: static void rbd_dev_device_release(struct rbd_device *rbd_dev) { clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); - rbd_dev_mapping_clear(rbd_dev); rbd_free_disk(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); @@ -5989,23 +6803,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) if (ret) goto err_out_blkdev; - ret = rbd_dev_mapping_set(rbd_dev); - if (ret) - goto err_out_disk; - set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); - set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); + set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev)); ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); if (ret) - goto err_out_mapping; + goto err_out_disk; set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); return 0; -err_out_mapping: - rbd_dev_mapping_clear(rbd_dev); err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: @@ -6034,11 +6842,30 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) return ret; } +static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap) +{ + if (!is_snap) { + pr_info("image %s/%s%s%s does not exist\n", + rbd_dev->spec->pool_name, + rbd_dev->spec->pool_ns ?: "", + rbd_dev->spec->pool_ns ? "/" : "", + rbd_dev->spec->image_name); + } else { + pr_info("snap %s/%s%s%s@%s does not exist\n", + rbd_dev->spec->pool_name, + rbd_dev->spec->pool_ns ?: "", + rbd_dev->spec->pool_ns ? 
"/" : "", + rbd_dev->spec->image_name, + rbd_dev->spec->snap_name); + } +} + static void rbd_dev_image_release(struct rbd_device *rbd_dev) { - rbd_dev_unprobe(rbd_dev); - if (rbd_dev->opts) + if (!rbd_is_ro(rbd_dev)) rbd_unregister_watch(rbd_dev); + + rbd_dev_unprobe(rbd_dev); rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); rbd_dev->spec->image_id = NULL; @@ -6049,9 +6876,13 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev) * device. If this image is the one being mapped (i.e., not a * parent), initiate a watch on its header object before using that * object to get detailed information about the rbd image. + * + * On success, returns with header_rwsem held for write if called + * with @depth == 0. */ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) { + bool need_watch = !rbd_is_ro(rbd_dev); int ret; /* @@ -6068,20 +6899,26 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) if (ret) goto err_out_format; - if (!depth) { + if (need_watch) { ret = rbd_register_watch(rbd_dev); if (ret) { if (ret == -ENOENT) - pr_info("image %s/%s does not exist\n", - rbd_dev->spec->pool_name, - rbd_dev->spec->image_name); + rbd_print_dne(rbd_dev, false); goto err_out_format; } } - ret = rbd_dev_header_info(rbd_dev); - if (ret) - goto err_out_watch; + if (!depth) + down_write(&rbd_dev->header_rwsem); + + ret = rbd_dev_header_info(rbd_dev, &rbd_dev->header, true); + if (ret) { + if (ret == -ENOENT && !need_watch) + rbd_print_dne(rbd_dev, false); + goto err_out_probe; + } + + rbd_init_layout(rbd_dev); /* * If this image is the one being mapped, we have pool name and @@ -6095,25 +6932,25 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) ret = rbd_spec_fill_names(rbd_dev); if (ret) { if (ret == -ENOENT) - pr_info("snap %s/%s@%s does not exist\n", - rbd_dev->spec->pool_name, - rbd_dev->spec->image_name, - rbd_dev->spec->snap_name); + rbd_print_dne(rbd_dev, true); goto err_out_probe; } - if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { - ret = rbd_dev_v2_parent_info(rbd_dev); + ret = rbd_dev_mapping_set(rbd_dev); + if (ret) + goto err_out_probe; + + if (rbd_is_snap(rbd_dev) && + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) { + ret = rbd_object_map_load(rbd_dev); if (ret) goto err_out_probe; + } - /* - * Need to warn users if this image is the one being - * mapped and has a parent. 
- */ - if (!depth && rbd_dev->parent_spec) - rbd_warn(rbd_dev, - "WARNING: kernel layering is EXPERIMENTAL!"); + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { + ret = rbd_dev_setup_parent(rbd_dev); + if (ret) + goto err_out_probe; } ret = rbd_dev_probe_parent(rbd_dev, depth); @@ -6125,10 +6962,11 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) return 0; err_out_probe: - rbd_dev_unprobe(rbd_dev); -err_out_watch: if (!depth) + up_write(&rbd_dev->header_rwsem); + if (need_watch) rbd_unregister_watch(rbd_dev); + rbd_dev_unprobe(rbd_dev); err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); @@ -6136,18 +6974,119 @@ err_out_format: return ret; } -static ssize_t do_rbd_add(struct bus_type *bus, - const char *buf, - size_t count) +static void rbd_dev_update_header(struct rbd_device *rbd_dev, + struct rbd_image_header *header) +{ + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + rbd_assert(rbd_dev->header.object_prefix); /* !first_time */ + + if (rbd_dev->header.image_size != header->image_size) { + rbd_dev->header.image_size = header->image_size; + + if (!rbd_is_snap(rbd_dev)) { + rbd_dev->mapping.size = header->image_size; + rbd_dev_update_size(rbd_dev); + } + } + + ceph_put_snap_context(rbd_dev->header.snapc); + rbd_dev->header.snapc = header->snapc; + header->snapc = NULL; + + if (rbd_dev->image_format == 1) { + kfree(rbd_dev->header.snap_names); + rbd_dev->header.snap_names = header->snap_names; + header->snap_names = NULL; + + kfree(rbd_dev->header.snap_sizes); + rbd_dev->header.snap_sizes = header->snap_sizes; + header->snap_sizes = NULL; + } +} + +static void rbd_dev_update_parent(struct rbd_device *rbd_dev, + struct parent_image_info *pii) +{ + if (pii->pool_id == CEPH_NOPOOL || !pii->has_overlap) { + /* + * Either the parent never existed, or we have + * record of it but the image got flattened so it no + * longer has a parent. When the parent of a + * layered image disappears we immediately set the + * overlap to 0. The effect of this is that all new + * requests will be treated as if the image had no + * parent. + * + * If !pii.has_overlap, the parent image spec is not + * applicable. It's there to avoid duplication in each + * snapshot record. + */ + if (rbd_dev->parent_overlap) { + rbd_dev->parent_overlap = 0; + rbd_dev_parent_put(rbd_dev); + pr_info("%s: clone has been flattened\n", + rbd_dev->disk->disk_name); + } + } else { + rbd_assert(rbd_dev->parent_spec); + + /* + * Update the parent overlap. If it became zero, issue + * a warning as we will proceed as if there is no parent. + */ + if (!pii->overlap && rbd_dev->parent_overlap) + rbd_warn(rbd_dev, + "clone has become standalone (overlap 0)"); + rbd_dev->parent_overlap = pii->overlap; + } +} + +static int rbd_dev_refresh(struct rbd_device *rbd_dev) +{ + struct rbd_image_header header = { 0 }; + struct parent_image_info pii = { 0 }; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = rbd_dev_header_info(rbd_dev, &header, false); + if (ret) + goto out; + + /* + * If there is a parent, see if it has disappeared due to the + * mapped image getting flattened. 
+ */ + if (rbd_dev->parent) { + ret = rbd_dev_v2_parent_info(rbd_dev, &pii); + if (ret) + goto out; + } + + down_write(&rbd_dev->header_rwsem); + rbd_dev_update_header(rbd_dev, &header); + if (rbd_dev->parent) + rbd_dev_update_parent(rbd_dev, &pii); + up_write(&rbd_dev->header_rwsem); + +out: + rbd_parent_info_cleanup(&pii); + rbd_image_header_cleanup(&header); + return ret; +} + +static ssize_t do_rbd_add(const char *buf, size_t count) { struct rbd_device *rbd_dev = NULL; struct ceph_options *ceph_opts = NULL; struct rbd_options *rbd_opts = NULL; struct rbd_spec *spec = NULL; struct rbd_client *rbdc; - bool read_only; int rc; + if (!capable(CAP_SYS_ADMIN)) + return -EPERM; + if (!try_module_get(THIS_MODULE)) return -ENODEV; @@ -6163,7 +7102,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, } /* pick the pool */ - rc = rbd_add_get_pool_id(rbdc, spec->pool_name); + rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name); if (rc < 0) { if (rc == -ENOENT) pr_info("pool %s does not exist\n", spec->pool_name); @@ -6180,35 +7119,34 @@ static ssize_t do_rbd_add(struct bus_type *bus, spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ + /* if we are mapping a snapshot it will be a read-only mapping */ + if (rbd_dev->opts->read_only || + strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) + __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags); + rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); if (!rbd_dev->config_info) { rc = -ENOMEM; goto err_out_rbd_dev; } - down_write(&rbd_dev->header_rwsem); rc = rbd_dev_image_probe(rbd_dev, 0); - if (rc < 0) { - up_write(&rbd_dev->header_rwsem); + if (rc < 0) goto err_out_rbd_dev; - } - - /* If we are mapping a snapshot it must be marked read-only */ - read_only = rbd_dev->opts->read_only; - if (rbd_dev->spec->snap_id != CEPH_NOSNAP) - read_only = true; - rbd_dev->mapping.read_only = read_only; + if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) { + rbd_warn(rbd_dev, "alloc_size adjusted to %u", + rbd_dev->layout.object_size); + rbd_dev->opts->alloc_size = rbd_dev->layout.object_size; + } rc = rbd_dev_device_setup(rbd_dev); if (rc) goto err_out_image_probe; - if (rbd_dev->opts->exclusive) { - rc = rbd_add_acquire_lock(rbd_dev); - if (rc) - goto err_out_device_setup; - } + rc = rbd_add_acquire_lock(rbd_dev); + if (rc) + goto err_out_image_lock; /* Everything's ready. Announce the disk to the world. 
*/ @@ -6216,9 +7154,9 @@ static ssize_t do_rbd_add(struct bus_type *bus, if (rc) goto err_out_image_lock; - add_disk(rbd_dev->disk); - /* see rbd_init_disk() */ - blk_put_queue(rbd_dev->disk->queue); + rc = device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL); + if (rc) + goto err_out_cleanup_disk; spin_lock(&rbd_dev_list_lock); list_add_tail(&rbd_dev->node, &rbd_dev_list); @@ -6232,9 +7170,10 @@ out: module_put(THIS_MODULE); return rc; +err_out_cleanup_disk: + rbd_free_disk(rbd_dev); err_out_image_lock: rbd_dev_image_unlock(rbd_dev); -err_out_device_setup: rbd_dev_device_release(rbd_dev); err_out_image_probe: rbd_dev_image_release(rbd_dev); @@ -6248,21 +7187,18 @@ err_out_args: goto out; } -static ssize_t rbd_add(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count) { if (single_major) return -EINVAL; - return do_rbd_add(bus, buf, count); + return do_rbd_add(buf, count); } -static ssize_t rbd_add_single_major(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf, + size_t count) { - return do_rbd_add(bus, buf, count); + return do_rbd_add(buf, count); } static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) @@ -6292,18 +7228,17 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) } } -static ssize_t do_rbd_remove(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t do_rbd_remove(const char *buf, size_t count) { struct rbd_device *rbd_dev = NULL; - struct list_head *tmp; int dev_id; char opt_buf[6]; - bool already = false; bool force = false; int ret; + if (!capable(CAP_SYS_ADMIN)) + return -EPERM; + dev_id = -1; opt_buf[0] = '\0'; sscanf(buf, "%d %5s", &dev_id, opt_buf); @@ -6322,8 +7257,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus, ret = -ENOENT; spin_lock(&rbd_dev_list_lock); - list_for_each(tmp, &rbd_dev_list) { - rbd_dev = list_entry(tmp, struct rbd_device, node); + list_for_each_entry(rbd_dev, &rbd_dev_list, node) { if (rbd_dev->dev_id == dev_id) { ret = 0; break; @@ -6333,13 +7267,13 @@ static ssize_t do_rbd_remove(struct bus_type *bus, spin_lock_irq(&rbd_dev->lock); if (rbd_dev->open_count && !force) ret = -EBUSY; - else - already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, - &rbd_dev->flags); + else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING, + &rbd_dev->flags)) + ret = -EINPROGRESS; spin_unlock_irq(&rbd_dev->lock); } spin_unlock(&rbd_dev_list_lock); - if (ret < 0 || already) + if (ret) return ret; if (force) { @@ -6347,8 +7281,10 @@ static ssize_t do_rbd_remove(struct bus_type *bus, * Prevent new IO from being queued and wait for existing * IO to complete/fail. 
*/ - blk_mq_freeze_queue(rbd_dev->disk->queue); - blk_set_queue_dying(rbd_dev->disk->queue); + unsigned int memflags = blk_mq_freeze_queue(rbd_dev->disk->queue); + + blk_mark_disk_dead(rbd_dev->disk); + blk_mq_unfreeze_queue(rbd_dev->disk->queue, memflags); } del_gendisk(rbd_dev->disk); @@ -6364,34 +7300,33 @@ static ssize_t do_rbd_remove(struct bus_type *bus, return count; } -static ssize_t rbd_remove(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t remove_store(const struct bus_type *bus, const char *buf, size_t count) { if (single_major) return -EINVAL; - return do_rbd_remove(bus, buf, count); + return do_rbd_remove(buf, count); } -static ssize_t rbd_remove_single_major(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf, + size_t count) { - return do_rbd_remove(bus, buf, count); + return do_rbd_remove(buf, count); } /* * create control files in sysfs * /sys/bus/rbd/... */ -static int rbd_sysfs_init(void) +static int __init rbd_sysfs_init(void) { int ret; ret = device_register(&rbd_root_dev); - if (ret < 0) + if (ret < 0) { + put_device(&rbd_root_dev); return ret; + } ret = bus_register(&rbd_bus_type); if (ret < 0) @@ -6400,13 +7335,13 @@ static int rbd_sysfs_init(void) return ret; } -static void rbd_sysfs_cleanup(void) +static void __exit rbd_sysfs_cleanup(void) { bus_unregister(&rbd_bus_type); device_unregister(&rbd_root_dev); } -static int rbd_slab_init(void) +static int __init rbd_slab_init(void) { rbd_assert(!rbd_img_request_cache); rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); @@ -6418,16 +7353,8 @@ static int rbd_slab_init(void) if (!rbd_obj_request_cache) goto out_err; - rbd_assert(!rbd_bio_clone); - rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0); - if (!rbd_bio_clone) - goto out_err_clone; - return 0; -out_err_clone: - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; out_err: kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; @@ -6443,10 +7370,6 @@ static void rbd_slab_exit(void) rbd_assert(rbd_img_request_cache); kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; - - rbd_assert(rbd_bio_clone); - bioset_free(rbd_bio_clone); - rbd_bio_clone = NULL; } static int __init rbd_init(void) @@ -6466,7 +7389,7 @@ static int __init rbd_init(void) * The number of active work items is limited by the number of * rbd devices * queue depth, so leave @max_active at default. */ - rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0); + rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM | WQ_PERCPU, 0); if (!rbd_wq) { rc = -ENOMEM; goto err_out_slab; |