Diffstat (limited to 'fs/ceph/mds_client.c')
 -rw-r--r--   fs/ceph/mds_client.c   2020
1 file changed, 1494 insertions, 526 deletions
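One recurring change in the hunks below is that the ceph_iterate_session_caps() callback signature drops the struct ceph_cap pointer and passes the MDS rank instead; callbacks such as remove_session_caps_cb(), wake_up_session_cb() and trim_caps_cb() now look the cap up themselves via __get_cap_for_mds() under i_ceph_lock. A minimal sketch of a callback written against the new convention follows; the function name example_session_cap_cb is hypothetical, while ceph_inode(), __get_cap_for_mds() and i_ceph_lock are the helpers the patched callbacks themselves use.

/*
 * Hypothetical callback for ceph_iterate_session_caps() using the new
 * (inode, mds, arg) signature introduced by this patch.  Assumes the
 * cephfs private headers ("super.h"/"mds_client.h") are in scope.
 */
static int example_session_cap_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;

	spin_lock(&ci->i_ceph_lock);
	/* the cap may have been removed since iteration started */
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		/* inspect or act on the cap while i_ceph_lock is held */
	}
	spin_unlock(&ci->i_ceph_lock);

	/* a negative return ends the walk early, as trim_caps_cb does */
	return 0;
}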
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 26a0a8b9975e..1740047aef0f 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -12,9 +12,11 @@ #include <linux/bits.h> #include <linux/ktime.h> #include <linux/bitmap.h> +#include <linux/mnt_idmapping.h> #include "super.h" #include "mds_client.h" +#include "crypto.h" #include <linux/ceph/ceph_features.h> #include <linux/ceph/messenger.h> @@ -184,8 +186,54 @@ static int parse_reply_info_in(void **p, void *end, info->rsnaps = 0; } + if (struct_v >= 5) { + u32 alen; + + ceph_decode_32_safe(p, end, alen, bad); + + while (alen--) { + u32 len; + + /* key */ + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_skip_n(p, end, len, bad); + /* value */ + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_skip_n(p, end, len, bad); + } + } + + /* fscrypt flag -- ignore */ + if (struct_v >= 6) + ceph_decode_skip_8(p, end, bad); + + info->fscrypt_auth = NULL; + info->fscrypt_auth_len = 0; + info->fscrypt_file = NULL; + info->fscrypt_file_len = 0; + if (struct_v >= 7) { + ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); + if (info->fscrypt_auth_len) { + info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, + GFP_KERNEL); + if (!info->fscrypt_auth) + return -ENOMEM; + ceph_decode_copy_safe(p, end, info->fscrypt_auth, + info->fscrypt_auth_len, bad); + } + ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); + if (info->fscrypt_file_len) { + info->fscrypt_file = kmalloc(info->fscrypt_file_len, + GFP_KERNEL); + if (!info->fscrypt_file) + return -ENOMEM; + ceph_decode_copy_safe(p, end, info->fscrypt_file, + info->fscrypt_file_len, bad); + } + } *p = end; } else { + /* legacy (unversioned) struct */ if (features & CEPH_FEATURE_MDS_INLINE_DATA) { ceph_decode_64_safe(p, end, info->inline_version, bad); ceph_decode_32_safe(p, end, info->inline_len, bad); @@ -263,27 +311,47 @@ bad: static int parse_reply_info_lease(void **p, void *end, struct ceph_mds_reply_lease **lease, - u64 features) + u64 features, u32 *altname_len, u8 **altname) { + u8 struct_v; + u32 struct_len; + void *lend; + if (features == (u64)-1) { - u8 struct_v, struct_compat; - u32 struct_len; + u8 struct_compat; + ceph_decode_8_safe(p, end, struct_v, bad); ceph_decode_8_safe(p, end, struct_compat, bad); + /* struct_v is expected to be >= 1. we only understand * encoding whose struct_compat == 1. 
*/ if (!struct_v || struct_compat != 1) goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); - ceph_decode_need(p, end, struct_len, bad); - end = *p + struct_len; + } else { + struct_len = sizeof(**lease); + *altname_len = 0; + *altname = NULL; } - ceph_decode_need(p, end, sizeof(**lease), bad); + lend = *p + struct_len; + ceph_decode_need(p, end, struct_len, bad); *lease = *p; *p += sizeof(**lease); - if (features == (u64)-1) - *p = end; + + if (features == (u64)-1) { + if (struct_v >= 2) { + ceph_decode_32_safe(p, end, *altname_len, bad); + ceph_decode_need(p, end, *altname_len, bad); + *altname = *p; + *p += *altname_len; + } else { + *altname = NULL; + *altname_len = 0; + } + } + *p = lend; return 0; bad: return -EIO; @@ -313,7 +381,8 @@ static int parse_reply_info_trace(void **p, void *end, info->dname = *p; *p += info->dname_len; - err = parse_reply_info_lease(p, end, &info->dlease, features); + err = parse_reply_info_lease(p, end, &info->dlease, features, + &info->altname_len, &info->altname); if (err < 0) goto out_bad; } @@ -339,9 +408,11 @@ out_bad: * parse readdir results */ static int parse_reply_info_readdir(void **p, void *end, - struct ceph_mds_reply_info_parsed *info, - u64 features) + struct ceph_mds_request *req, + u64 features) { + struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; + struct ceph_client *cl = req->r_mdsc->fsc->client; u32 num, i = 0; int err; @@ -364,25 +435,94 @@ static int parse_reply_info_readdir(void **p, void *end, BUG_ON(!info->dir_entries); if ((unsigned long)(info->dir_entries + num) > (unsigned long)info->dir_entries + info->dir_buf_size) { - pr_err("dir contents are larger than expected\n"); + pr_err_client(cl, "dir contents are larger than expected\n"); WARN_ON(1); goto bad; } info->dir_nr = num; while (num) { + struct inode *inode = d_inode(req->r_dentry); + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; + struct fscrypt_str tname = FSTR_INIT(NULL, 0); + struct fscrypt_str oname = FSTR_INIT(NULL, 0); + struct ceph_fname fname; + u32 altname_len, _name_len; + u8 *altname, *_name; + /* dentry */ - ceph_decode_32_safe(p, end, rde->name_len, bad); - ceph_decode_need(p, end, rde->name_len, bad); - rde->name = *p; - *p += rde->name_len; - dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); + ceph_decode_32_safe(p, end, _name_len, bad); + ceph_decode_need(p, end, _name_len, bad); + _name = *p; + *p += _name_len; + doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name); + + if (info->hash_order) + rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, + _name, _name_len); /* dentry lease */ - err = parse_reply_info_lease(p, end, &rde->lease, features); + err = parse_reply_info_lease(p, end, &rde->lease, features, + &altname_len, &altname); if (err) goto out_bad; + + /* + * Try to dencrypt the dentry names and update them + * in the ceph_mds_reply_dir_entry struct. + */ + fname.dir = inode; + fname.name = _name; + fname.name_len = _name_len; + fname.ctext = altname; + fname.ctext_len = altname_len; + /* + * The _name_len maybe larger than altname_len, such as + * when the human readable name length is in range of + * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), + * then the copy in ceph_fname_to_usr will corrupt the + * data if there has no encryption key. + * + * Just set the no_copy flag and then if there has no + * encryption key the oname.name will be assigned to + * _name always. 
+ */ + fname.no_copy = true; + if (altname_len == 0) { + /* + * Set tname to _name, and this will be used + * to do the base64_decode in-place. It's + * safe because the decoded string should + * always be shorter, which is 3/4 of origin + * string. + */ + tname.name = _name; + + /* + * Set oname to _name too, and this will be + * used to do the dencryption in-place. + */ + oname.name = _name; + oname.len = _name_len; + } else { + /* + * This will do the decryption only in-place + * from altname cryptext directly. + */ + oname.name = altname; + oname.len = altname_len; + } + rde->is_nokey = false; + err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); + if (err) { + pr_err_client(cl, "unable to decode %.*s, got %d\n", + _name_len, _name, err); + goto out_bad; + } + rde->name = oname.name; + rde->name_len = oname.len; + /* inode */ err = parse_reply_info_in(p, end, &rde->inode, features); if (err < 0) @@ -401,7 +541,7 @@ done: bad: err = -EIO; out_bad: - pr_err("problem parsing dir contents %d\n", err); + pr_err_client(cl, "problem parsing dir contents %d\n", err); return err; } @@ -432,10 +572,11 @@ bad: static int ceph_parse_deleg_inos(void **p, void *end, struct ceph_mds_session *s) { + struct ceph_client *cl = s->s_mdsc->fsc->client; u32 sets; ceph_decode_32_safe(p, end, sets, bad); - dout("got %u sets of delegated inodes\n", sets); + doutc(cl, "got %u sets of delegated inodes\n", sets); while (sets--) { u64 start, len; @@ -444,8 +585,9 @@ static int ceph_parse_deleg_inos(void **p, void *end, /* Don't accept a delegation of system inodes */ if (start < CEPH_INO_SYSTEM_BASE) { - pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", - start, len); + pr_warn_ratelimited_client(cl, + "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", + start, len); continue; } while (len--) { @@ -453,10 +595,10 @@ static int ceph_parse_deleg_inos(void **p, void *end, DELEGATED_INO_AVAILABLE, GFP_KERNEL); if (!err) { - dout("added delegated inode 0x%llx\n", - start - 1); + doutc(cl, "added delegated inode 0x%llx\n", start - 1); } else if (err == -EBUSY) { - pr_warn("MDS delegated inode 0x%llx more than once.\n", + pr_warn_client(cl, + "MDS delegated inode 0x%llx more than once.\n", start - 1); } else { return err; @@ -581,15 +723,16 @@ bad: * parse extra results */ static int parse_reply_info_extra(void **p, void *end, - struct ceph_mds_reply_info_parsed *info, + struct ceph_mds_request *req, u64 features, struct ceph_mds_session *s) { + struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; u32 op = le32_to_cpu(info->head->op); if (op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) - return parse_reply_info_readdir(p, end, info, features); + return parse_reply_info_readdir(p, end, req, features); else if (op == CEPH_MDS_OP_CREATE) return parse_reply_info_create(p, end, info, features, s); else if (op == CEPH_MDS_OP_GETVXATTR) @@ -602,9 +745,10 @@ static int parse_reply_info_extra(void **p, void *end, * parse entire mds reply */ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, - struct ceph_mds_reply_info_parsed *info, - u64 features) + struct ceph_mds_request *req, u64 features) { + struct ceph_mds_reply_info_parsed *info = &req->r_reply_info; + struct ceph_client *cl = s->s_mdsc->fsc->client; void *p, *end; u32 len; int err; @@ -626,7 +770,7 @@ static int parse_reply_info(struct ceph_mds_session 
*s, struct ceph_msg *msg, ceph_decode_32_safe(&p, end, len, bad); if (len > 0) { ceph_decode_need(&p, end, len, bad); - err = parse_reply_info_extra(&p, p+len, info, features, s); + err = parse_reply_info_extra(&p, p+len, req, features, s); if (err < 0) goto out_bad; } @@ -644,14 +788,28 @@ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, bad: err = -EIO; out_bad: - pr_err("mds parse_reply err %d\n", err); + pr_err_client(cl, "mds parse_reply err %d\n", err); + ceph_msg_dump(msg); return err; } static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) { + int i; + + kfree(info->diri.fscrypt_auth); + kfree(info->diri.fscrypt_file); + kfree(info->targeti.fscrypt_auth); + kfree(info->targeti.fscrypt_file); if (!info->dir_entries) return; + + for (i = 0; i < info->dir_nr; i++) { + struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; + + kfree(rde->inode.fscrypt_auth); + kfree(rde->inode.fscrypt_file); + } free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); } @@ -669,7 +827,7 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) * And the worst case is that for the none async openc request it will * successfully open the file if the CDentry hasn't been unlinked yet, * but later the previous delayed async unlink request will remove the - * CDenty. That means the just created file is possiblly deleted later + * CDentry. That means the just created file is possibly deleted later * by accident. * * We need to wait for the inflight async unlink requests to finish @@ -677,7 +835,8 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) */ int ceph_wait_on_conflict_unlink(struct dentry *dentry) { - struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); + struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb); + struct ceph_client *cl = fsc->client; struct dentry *pdentry = dentry->d_parent; struct dentry *udentry, *found = NULL; struct ceph_dentry_info *di; @@ -702,14 +861,14 @@ int ceph_wait_on_conflict_unlink(struct dentry *dentry) goto next; if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) - pr_warn("%s dentry %p:%pd async unlink bit is not set\n", - __func__, dentry, dentry); + pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n", + dentry, dentry); if (!d_same_name(udentry, pdentry, &dname)) goto next; + found = dget_dlock(udentry); spin_unlock(&udentry->d_lock); - found = dget(udentry); break; next: spin_unlock(&udentry->d_lock); @@ -719,8 +878,8 @@ next: if (likely(!found)) return 0; - dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__, - dentry, dentry, found, found); + doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry, + found, found); err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, TASK_KILLABLE); @@ -804,8 +963,12 @@ static int __verify_registered_session(struct ceph_mds_client *mdsc, static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, int mds) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *s; + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) + return ERR_PTR(-EIO); + if (mds >= mdsc->mdsmap->possible_max_rank) return ERR_PTR(-EINVAL); @@ -816,21 +979,22 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, if (mds >= mdsc->max_sessions) { int newmax = 1 << get_count_order(mds + 1); struct ceph_mds_session **sa; + size_t ptr_size = sizeof(struct ceph_mds_session *); - dout("%s: realloc to %d\n", __func__, 
newmax); - sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); + doutc(cl, "realloc to %d\n", newmax); + sa = kcalloc(newmax, ptr_size, GFP_NOFS); if (!sa) goto fail_realloc; if (mdsc->sessions) { memcpy(sa, mdsc->sessions, - mdsc->max_sessions * sizeof(void *)); + mdsc->max_sessions * ptr_size); kfree(mdsc->sessions); } mdsc->sessions = sa; mdsc->max_sessions = newmax; } - dout("%s: mds%d\n", __func__, mds); + doutc(cl, "mds%d\n", mds); s->s_mdsc = mdsc; s->s_mds = mds; s->s_state = CEPH_MDS_SESSION_NEW; @@ -873,7 +1037,7 @@ fail_realloc: static void __unregister_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *s) { - dout("__unregister_session mds%d %p\n", s->s_mds, s); + doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s); BUG_ON(mdsc->sessions[s->s_mds] != s); mdsc->sessions[s->s_mds] = NULL; ceph_con_close(&s->s_con); @@ -926,7 +1090,7 @@ void ceph_mdsc_release_request(struct kref *kref) struct ceph_mds_request *req = container_of(kref, struct ceph_mds_request, r_kref); - ceph_mdsc_release_dir_caps_no_check(req); + ceph_mdsc_release_dir_caps_async(req); destroy_reply_info(&req->r_reply_info); if (req->r_request) ceph_msg_put(req->r_request); @@ -941,6 +1105,7 @@ void ceph_mdsc_release_request(struct kref *kref) iput(req->r_parent); } iput(req->r_target_inode); + iput(req->r_new_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) @@ -959,8 +1124,12 @@ void ceph_mdsc_release_request(struct kref *kref) kfree(req->r_path1); kfree(req->r_path2); put_cred(req->r_cred); + if (req->r_mnt_idmap) + mnt_idmap_put(req->r_mnt_idmap); if (req->r_pagelist) ceph_pagelist_release(req->r_pagelist); + kfree(req->r_fscrypt_auth); + kfree(req->r_altname); put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); WARN_ON_ONCE(!list_empty(&req->r_wait)); @@ -996,6 +1165,7 @@ static void __register_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, struct inode *dir) { + struct ceph_client *cl = mdsc->fsc->client; int ret = 0; req->r_tid = ++mdsc->last_tid; @@ -1003,18 +1173,20 @@ static void __register_request(struct ceph_mds_client *mdsc, ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, req->r_num_caps); if (ret < 0) { - pr_err("__register_request %p " - "failed to reserve caps: %d\n", req, ret); + pr_err_client(cl, "%p failed to reserve caps: %d\n", + req, ret); /* set req->r_err to fail early from __do_request */ req->r_err = ret; return; } } - dout("__register_request %p tid %lld\n", req, req->r_tid); + doutc(cl, "%p tid %lld\n", req, req->r_tid); ceph_mdsc_get_request(req); insert_request(&mdsc->request_tree, req); req->r_cred = get_current_cred(); + if (!req->r_mnt_idmap) + req->r_mnt_idmap = &nop_mnt_idmap; if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) mdsc->oldest_tid = req->r_tid; @@ -1033,7 +1205,7 @@ static void __register_request(struct ceph_mds_client *mdsc, static void __unregister_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { - dout("__unregister_request %p tid %lld\n", req, req->r_tid); + doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid); /* Never leave an unregistered request on an unsafe list! 
*/ list_del_init(&req->r_unsafe_item); @@ -1119,6 +1291,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, int mds = -1; u32 hash = req->r_direct_hash; bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); + struct ceph_client *cl = mdsc->fsc->client; if (random) *random = false; @@ -1130,8 +1303,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, if (req->r_resend_mds >= 0 && (__have_session(mdsc, req->r_resend_mds) || ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { - dout("%s using resend_mds mds%d\n", __func__, - req->r_resend_mds); + doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); return req->r_resend_mds; } @@ -1148,7 +1320,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc, rcu_read_lock(); inode = get_nonsnap_parent(req->r_dentry); rcu_read_unlock(); - dout("%s using snapdir's parent %p\n", __func__, inode); + doutc(cl, "using snapdir's parent %p %llx.%llx\n", + inode, ceph_vinop(inode)); } } else if (req->r_dentry) { /* ignore race with rename; old or new d_parent is okay */ @@ -1168,7 +1341,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc, /* direct snapped/virtual snapdir requests * based on parent dir inode */ inode = get_nonsnap_parent(parent); - dout("%s using nonsnap parent %p\n", __func__, inode); + doutc(cl, "using nonsnap parent %p %llx.%llx\n", + inode, ceph_vinop(inode)); } else { /* dentry target */ inode = d_inode(req->r_dentry); @@ -1184,10 +1358,11 @@ static int __choose_mds(struct ceph_mds_client *mdsc, rcu_read_unlock(); } - dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, - hash, mode); if (!inode) goto random; + + doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode, + ceph_vinop(inode), (int)is_hash, hash, mode); ci = ceph_inode(inode); if (is_hash && S_ISDIR(inode->i_mode)) { @@ -1203,9 +1378,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc, get_random_bytes(&r, 1); r %= frag.ndist; mds = frag.dist[r]; - dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", - __func__, inode, ceph_vinop(inode), - frag.frag, mds, (int)r, frag.ndist); + doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n", + inode, ceph_vinop(inode), frag.frag, + mds, (int)r, frag.ndist); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= CEPH_MDS_STATE_ACTIVE && !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) @@ -1218,9 +1393,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc, if (frag.mds >= 0) { /* choose auth mds */ mds = frag.mds; - dout("%s %p %llx.%llx frag %u mds%d (auth)\n", - __func__, inode, ceph_vinop(inode), - frag.frag, mds); + doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n", + inode, ceph_vinop(inode), frag.frag, mds); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= CEPH_MDS_STATE_ACTIVE) { if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, @@ -1244,9 +1418,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc, goto random; } mds = cap->session->s_mds; - dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, - inode, ceph_vinop(inode), mds, - cap == ci->i_auth_cap ? "auth " : "", cap); + doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode, + ceph_vinop(inode), mds, + cap == ci->i_auth_cap ? 
"auth " : "", cap); spin_unlock(&ci->i_ceph_lock); out: iput(inode); @@ -1257,7 +1431,7 @@ random: *random = true; mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); - dout("%s chose random mds%d\n", __func__, mds); + doutc(cl, "chose random mds%d\n", mds); return mds; } @@ -1361,7 +1535,8 @@ static int encode_metric_spec(void **p, void *end) * session message, specialization for CEPH_SESSION_REQUEST_OPEN * to include additional client metadata fields. */ -static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) +static struct ceph_msg * +create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq) { struct ceph_msg *msg; struct ceph_mds_session_head *h; @@ -1370,6 +1545,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 int metadata_key_count = 0; struct ceph_options *opt = mdsc->fsc->client->options; struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; + struct ceph_client *cl = mdsc->fsc->client; size_t size, count; void *p, *end; int ret; @@ -1404,27 +1580,30 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 size = METRIC_BYTES(count); extra_bytes += 2 + 4 + 4 + size; + /* flags, mds auth caps and oldest_client_tid */ + extra_bytes += 4 + 4 + 8; + /* Allocate the message */ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, GFP_NOFS, false); if (!msg) { - pr_err("ENOMEM creating session open msg\n"); + pr_err_client(cl, "ENOMEM creating session open msg\n"); return ERR_PTR(-ENOMEM); } p = msg->front.iov_base; end = p + msg->front.iov_len; h = p; - h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); + h->op = cpu_to_le32(op); h->seq = cpu_to_le64(seq); /* * Serialize client metadata into waiting buffer space, using * the format that userspace expects for map<string, string> * - * ClientSession messages with metadata are v4 + * ClientSession messages with metadata are v7 */ - msg->hdr.version = cpu_to_le16(4); + msg->hdr.version = cpu_to_le16(7); msg->hdr.compat_version = cpu_to_le16(1); /* The write pointer, following the session_head structure */ @@ -1448,18 +1627,27 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 ret = encode_supported_features(&p, end); if (ret) { - pr_err("encode_supported_features failed!\n"); + pr_err_client(cl, "encode_supported_features failed!\n"); ceph_msg_put(msg); return ERR_PTR(ret); } ret = encode_metric_spec(&p, end); if (ret) { - pr_err("encode_metric_spec failed!\n"); + pr_err_client(cl, "encode_metric_spec failed!\n"); ceph_msg_put(msg); return ERR_PTR(ret); } + /* version == 5, flags */ + ceph_encode_32(&p, 0); + + /* version == 6, mds auth caps */ + ceph_encode_32(&p, 0); + + /* version == 7, oldest_client_tid */ + ceph_encode_64(&p, mdsc->oldest_tid); + msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); @@ -1478,15 +1666,19 @@ static int __open_session(struct ceph_mds_client *mdsc, int mstate; int mds = session->s_mds; + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) + return -EIO; + /* wait for mds to go active? 
*/ mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); - dout("open_session to mds%d (%s)\n", mds, - ceph_mds_state_name(mstate)); + doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds, + ceph_mds_state_name(mstate)); session->s_state = CEPH_MDS_SESSION_OPENING; session->s_renew_requested = jiffies; /* send connect message */ - msg = create_session_open_msg(mdsc, session->s_seq); + msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN, + session->s_seq); if (IS_ERR(msg)) return PTR_ERR(msg); ceph_con_send(&session->s_con, msg); @@ -1524,8 +1716,9 @@ struct ceph_mds_session * ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) { struct ceph_mds_session *session; + struct ceph_client *cl = mdsc->fsc->client; - dout("open_export_target_session to mds%d\n", target); + doutc(cl, "to mds%d\n", target); mutex_lock(&mdsc->mutex); session = __open_export_target_session(mdsc, target); @@ -1540,13 +1733,14 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_info *mi; struct ceph_mds_session *ts; int i, mds = session->s_mds; + struct ceph_client *cl = mdsc->fsc->client; if (mds >= mdsc->mdsmap->possible_max_rank) return; mi = &mdsc->mdsmap->m_info[mds]; - dout("open_export_target_sessions for mds%d (%d targets)\n", - session->s_mds, mi->num_export_targets); + doutc(cl, "for mds%d (%d targets)\n", session->s_mds, + mi->num_export_targets); for (i = 0; i < mi->num_export_targets; i++) { ts = __open_export_target_session(mdsc, mi->export_targets[i]); @@ -1554,14 +1748,6 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc, } } -void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - mutex_lock(&mdsc->mutex); - __open_export_target_sessions(mdsc, session); - mutex_unlock(&mdsc->mutex); -} - /* * session caps */ @@ -1569,11 +1755,13 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, static void detach_cap_releases(struct ceph_mds_session *session, struct list_head *target) { + struct ceph_client *cl = session->s_mdsc->fsc->client; + lockdep_assert_held(&session->s_cap_lock); list_splice_init(&session->s_cap_releases, target); session->s_num_cap_releases = 0; - dout("dispose_cap_releases mds%d\n", session->s_mds); + doutc(cl, "mds%d\n", session->s_mds); } static void dispose_cap_releases(struct ceph_mds_client *mdsc, @@ -1591,16 +1779,17 @@ static void dispose_cap_releases(struct ceph_mds_client *mdsc, static void cleanup_session_requests(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; struct rb_node *p; - dout("cleanup_session_requests mds%d\n", session->s_mds); + doutc(cl, "mds%d\n", session->s_mds); mutex_lock(&mdsc->mutex); while (!list_empty(&session->s_unsafe)) { req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item); - pr_warn_ratelimited(" dropping unsafe request %llu\n", - req->r_tid); + pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n", + req->r_tid); if (req->r_target_inode) mapping_set_error(req->r_target_inode->i_mapping, -EIO); if (req->r_unsafe_dir) @@ -1626,19 +1815,22 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc, * Caller must hold session s_mutex. 
*/ int ceph_iterate_session_caps(struct ceph_mds_session *session, - int (*cb)(struct inode *, struct ceph_cap *, - void *), void *arg) + int (*cb)(struct inode *, int mds, void *), + void *arg) { + struct ceph_client *cl = session->s_mdsc->fsc->client; struct list_head *p; struct ceph_cap *cap; struct inode *inode, *last_inode = NULL; struct ceph_cap *old_cap = NULL; int ret; - dout("iterate_session_caps %p mds%d\n", session, session->s_mds); + doutc(cl, "%p mds%d\n", session, session->s_mds); spin_lock(&session->s_cap_lock); p = session->s_caps.next; while (p != &session->s_caps) { + int mds; + cap = list_entry(p, struct ceph_cap, session_caps); inode = igrab(&cap->ci->netfs.inode); if (!inode) { @@ -1646,6 +1838,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, continue; } session->s_cap_iterator = cap; + mds = cap->mds; spin_unlock(&session->s_cap_lock); if (last_inode) { @@ -1657,14 +1850,13 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, old_cap = NULL; } - ret = cb(inode, cap, arg); + ret = cb(inode, mds, arg); last_inode = inode; spin_lock(&session->s_cap_lock); p = p->next; if (!cap->ci) { - dout("iterate_session_caps finishing cap %p removal\n", - cap); + doutc(cl, "finishing cap %p removal\n", cap); BUG_ON(cap->session != session); cap->session = NULL; list_del_init(&cap->session_caps); @@ -1690,20 +1882,26 @@ out: return ret; } -static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, - void *arg) +static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_client *cl = ceph_inode_to_client(inode); bool invalidate = false; - int iputs; + struct ceph_cap *cap; + int iputs = 0; - dout("removing cap %p, ci is %p, inode is %p\n", - cap, ci, &ci->netfs.inode); spin_lock(&ci->i_ceph_lock); - iputs = ceph_purge_inode_cap(inode, cap, &invalidate); + cap = __get_cap_for_mds(ci, mds); + if (cap) { + doutc(cl, " removing cap %p, ci is %p, inode is %p\n", + cap, ci, &ci->netfs.inode); + + iputs = ceph_purge_inode_cap(inode, cap, &invalidate); + } spin_unlock(&ci->i_ceph_lock); - wake_up_all(&ci->i_cap_wq); + if (cap) + wake_up_all(&ci->i_cap_wq); if (invalidate) ceph_queue_invalidate(inode); while (iputs--) @@ -1720,7 +1918,7 @@ static void remove_session_caps(struct ceph_mds_session *session) struct super_block *sb = fsc->sb; LIST_HEAD(dispose); - dout("remove_session_caps on %p\n", session); + doutc(fsc->client, "on %p\n", session); ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); wake_up_all(&fsc->mdsc->cap_flushing_wq); @@ -1774,8 +1972,7 @@ enum { * * caller must hold s_mutex. 
*/ -static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, - void *arg) +static int wake_up_session_cb(struct inode *inode, int mds, void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); unsigned long ev = (unsigned long)arg; @@ -1786,12 +1983,14 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, ci->i_requested_max_size = 0; spin_unlock(&ci->i_ceph_lock); } else if (ev == RENEWCAPS) { - if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) { - /* mds did not re-issue stale cap */ - spin_lock(&ci->i_ceph_lock); + struct ceph_cap *cap; + + spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + /* mds did not re-issue stale cap */ + if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) cap->issued = cap->implemented = CEPH_CAP_PIN; - spin_unlock(&ci->i_ceph_lock); - } + spin_unlock(&ci->i_ceph_lock); } else if (ev == FORCE_RO) { } wake_up_all(&ci->i_cap_wq); @@ -1800,7 +1999,9 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, static void wake_up_session_caps(struct ceph_mds_session *session, int ev) { - dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); + struct ceph_client *cl = session->s_mdsc->fsc->client; + + doutc(cl, "session %p mds%d\n", session, session->s_mds); ceph_iterate_session_caps(session, wake_up_session_cb, (void *)(unsigned long)ev); } @@ -1814,29 +2015,30 @@ static void wake_up_session_caps(struct ceph_mds_session *session, int ev) static int send_renew_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *msg; int state; if (time_after_eq(jiffies, session->s_cap_ttl) && time_after_eq(session->s_cap_ttl, session->s_renew_requested)) - pr_info("mds%d caps stale\n", session->s_mds); + pr_info_client(cl, "mds%d caps stale\n", session->s_mds); session->s_renew_requested = jiffies; /* do not try to renew caps until a recovering mds has reconnected * with its clients. 
*/ state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); if (state < CEPH_MDS_STATE_RECONNECT) { - dout("send_renew_caps ignoring mds%d (%s)\n", - session->s_mds, ceph_mds_state_name(state)); + doutc(cl, "ignoring mds%d (%s)\n", session->s_mds, + ceph_mds_state_name(state)); return 0; } - dout("send_renew_caps to mds%d (%s)\n", session->s_mds, - ceph_mds_state_name(state)); - msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, + doutc(cl, "to mds%d (%s)\n", session->s_mds, + ceph_mds_state_name(state)); + msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS, ++session->s_renew_seq); - if (!msg) - return -ENOMEM; + if (IS_ERR(msg)) + return PTR_ERR(msg); ceph_con_send(&session->s_con, msg); return 0; } @@ -1844,10 +2046,11 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, static int send_flushmsg_ack(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, u64 seq) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *msg; - dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", - session->s_mds, ceph_session_state_name(session->s_state), seq); + doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds, + ceph_session_state_name(session->s_state), seq); msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); if (!msg) return -ENOMEM; @@ -1864,6 +2067,7 @@ static int send_flushmsg_ack(struct ceph_mds_client *mdsc, static void renewed_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int is_renew) { + struct ceph_client *cl = mdsc->fsc->client; int was_stale; int wake = 0; @@ -1875,15 +2079,17 @@ static void renewed_caps(struct ceph_mds_client *mdsc, if (was_stale) { if (time_before(jiffies, session->s_cap_ttl)) { - pr_info("mds%d caps renewed\n", session->s_mds); + pr_info_client(cl, "mds%d caps renewed\n", + session->s_mds); wake = 1; } else { - pr_info("mds%d caps still stale\n", session->s_mds); + pr_info_client(cl, "mds%d caps still stale\n", + session->s_mds); } } - dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", - session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", - time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); + doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds, + session->s_cap_ttl, was_stale ? "stale" : "fresh", + time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); spin_unlock(&session->s_cap_lock); if (wake) @@ -1895,11 +2101,11 @@ static void renewed_caps(struct ceph_mds_client *mdsc, */ static int request_close_session(struct ceph_mds_session *session) { + struct ceph_client *cl = session->s_mdsc->fsc->client; struct ceph_msg *msg; - dout("request_close_session mds%d state %s seq %lld\n", - session->s_mds, ceph_session_state_name(session->s_state), - session->s_seq); + doutc(cl, "mds%d state %s seq %lld\n", session->s_mds, + ceph_session_state_name(session->s_state), session->s_seq); msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); if (!msg) @@ -1929,7 +2135,7 @@ static bool drop_negative_children(struct dentry *dentry) goto out; spin_lock(&dentry->d_lock); - list_for_each_entry(child, &dentry->d_subdirs, d_child) { + hlist_for_each_entry(child, &dentry->d_children, d_sib) { if (d_really_is_positive(child)) { all_negative = false; break; @@ -1953,24 +2159,33 @@ out: * Yes, this is a bit sloppy. Our only real goal here is to respond to * memory pressure from the MDS, though, so it needn't be perfect. 
*/ -static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) +static int trim_caps_cb(struct inode *inode, int mds, void *arg) { + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); + struct ceph_client *cl = mdsc->fsc->client; int *remaining = arg; struct ceph_inode_info *ci = ceph_inode(inode); int used, wanted, oissued, mine; + struct ceph_cap *cap; if (*remaining <= 0) return -1; spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap) { + spin_unlock(&ci->i_ceph_lock); + return 0; + } mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); wanted = __ceph_caps_file_wanted(ci); oissued = __ceph_caps_issued_other(ci, cap); - dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", - inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), - ceph_cap_string(used), ceph_cap_string(wanted)); + doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n", + inode, ceph_vinop(inode), cap, ceph_cap_string(mine), + ceph_cap_string(oissued), ceph_cap_string(used), + ceph_cap_string(wanted)); if (cap == ci->i_auth_cap) { if (ci->i_dirty_caps || ci->i_flushing_caps || !list_empty(&ci->i_cap_snaps)) @@ -1996,7 +2211,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) if (oissued) { /* we aren't the only cap.. just remove us */ - ceph_remove_cap(cap, true); + ceph_remove_cap(mdsc, cap, true); (*remaining)--; } else { struct dentry *dentry; @@ -2007,11 +2222,11 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) int count; dput(dentry); d_prune_aliases(inode); - count = atomic_read(&inode->i_count); + count = icount_read(inode); if (count == 1) (*remaining)--; - dout("trim_caps_cb %p cap %p pruned, count now %d\n", - inode, cap, count); + doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n", + inode, ceph_vinop(inode), cap, count); } else { dput(dentry); } @@ -2030,26 +2245,28 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int max_caps) { + struct ceph_client *cl = mdsc->fsc->client; int trim_caps = session->s_nr_caps - max_caps; - dout("trim_caps mds%d start: %d / %d, trim %d\n", - session->s_mds, session->s_nr_caps, max_caps, trim_caps); + doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds, + session->s_nr_caps, max_caps, trim_caps); if (trim_caps > 0) { int remaining = trim_caps; ceph_iterate_session_caps(session, trim_caps_cb, &remaining); - dout("trim_caps mds%d done: %d / %d, trimmed %d\n", - session->s_mds, session->s_nr_caps, max_caps, - trim_caps - remaining); + doutc(cl, "mds%d done: %d / %d, trimmed %d\n", + session->s_mds, session->s_nr_caps, max_caps, + trim_caps - remaining); } - ceph_flush_cap_releases(mdsc, session); + ceph_flush_session_cap_releases(mdsc, session); return 0; } static int check_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_tid) { + struct ceph_client *cl = mdsc->fsc->client; int ret = 1; spin_lock(&mdsc->cap_dirty_lock); @@ -2058,8 +2275,8 @@ static int check_caps_flush(struct ceph_mds_client *mdsc, list_first_entry(&mdsc->cap_flush_list, struct ceph_cap_flush, g_list); if (cf->tid <= want_flush_tid) { - dout("check_caps_flush still flushing tid " - "%llu <= %llu\n", cf->tid, want_flush_tid); + doutc(cl, "still flushing tid %llu <= %llu\n", + cf->tid, want_flush_tid); ret = 0; } } @@ -2075,12 +2292,14 @@ static int check_caps_flush(struct ceph_mds_client *mdsc, static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_tid) { - 
dout("check_caps_flush want %llu\n", want_flush_tid); + struct ceph_client *cl = mdsc->fsc->client; + + doutc(cl, "want %llu\n", want_flush_tid); wait_event(mdsc->cap_flushing_wq, check_caps_flush(mdsc, want_flush_tid)); - dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); + doutc(cl, "ok, flushed thru %llu\n", want_flush_tid); } /* @@ -2089,6 +2308,7 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *msg = NULL; struct ceph_mds_cap_release *head; struct ceph_mds_cap_item *item; @@ -2135,7 +2355,7 @@ again: item->ino = cpu_to_le64(cap->cap_ino); item->cap_id = cpu_to_le64(cap->cap_id); item->migrate_seq = cpu_to_le32(cap->mseq); - item->seq = cpu_to_le32(cap->issue_seq); + item->issue_seq = cpu_to_le32(cap->issue_seq); msg->front.iov_len += sizeof(*item); ceph_put_cap(mdsc, cap); @@ -2147,7 +2367,7 @@ again: msg->front.iov_len += sizeof(*cap_barrier); msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + doutc(cl, "mds%d %p\n", session->s_mds, msg); ceph_con_send(&session->s_con, msg); msg = NULL; } @@ -2167,13 +2387,13 @@ again: msg->front.iov_len += sizeof(*cap_barrier); msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + doutc(cl, "mds%d %p\n", session->s_mds, msg); ceph_con_send(&session->s_con, msg); } return; out_err: - pr_err("send_cap_releases mds%d, failed to allocate message\n", - session->s_mds); + pr_err_client(cl, "mds%d, failed to allocate message\n", + session->s_mds); spin_lock(&session->s_cap_lock); list_splice(&tmp_list, &session->s_cap_releases); session->s_num_cap_releases += num_cap_releases; @@ -2193,19 +2413,20 @@ static void ceph_cap_release_work(struct work_struct *work) ceph_put_mds_session(session); } -void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, +void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { + struct ceph_client *cl = mdsc->fsc->client; if (mdsc->stopping) return; ceph_get_mds_session(session); if (queue_work(mdsc->fsc->cap_wq, &session->s_cap_release_work)) { - dout("cap release work queued\n"); + doutc(cl, "cap release work queued\n"); } else { ceph_put_mds_session(session); - dout("failed to queue cap release work\n"); + doutc(cl, "failed to queue cap release work\n"); } } @@ -2219,7 +2440,7 @@ void __ceph_queue_cap_release(struct ceph_mds_session *session, session->s_num_cap_releases++; if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) - ceph_flush_cap_releases(session->s_mdsc, session); + ceph_flush_session_cap_releases(session->s_mdsc, session); } static void ceph_cap_reclaim_work(struct work_struct *work) @@ -2233,13 +2454,14 @@ static void ceph_cap_reclaim_work(struct work_struct *work) void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) { + struct ceph_client *cl = mdsc->fsc->client; if (mdsc->stopping) return; if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { - dout("caps reclaim work queued\n"); + doutc(cl, "caps reclaim work queued\n"); } else { - dout("failed to queue caps release work\n"); + doutc(cl, "failed to queue caps release work\n"); } } @@ -2255,6 +2477,50 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) } } +void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc) +{ + struct ceph_client *cl = 
mdsc->fsc->client; + if (mdsc->stopping) + return; + + if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) { + doutc(cl, "caps unlink work queued\n"); + } else { + doutc(cl, "failed to queue caps unlink work\n"); + } +} + +static void ceph_cap_unlink_work(struct work_struct *work) +{ + struct ceph_mds_client *mdsc = + container_of(work, struct ceph_mds_client, cap_unlink_work); + struct ceph_client *cl = mdsc->fsc->client; + + doutc(cl, "begin\n"); + spin_lock(&mdsc->cap_delay_lock); + while (!list_empty(&mdsc->cap_unlink_delay_list)) { + struct ceph_inode_info *ci; + struct inode *inode; + + ci = list_first_entry(&mdsc->cap_unlink_delay_list, + struct ceph_inode_info, + i_cap_delay_list); + list_del_init(&ci->i_cap_delay_list); + + inode = igrab(&ci->netfs.inode); + if (inode) { + spin_unlock(&mdsc->cap_delay_lock); + doutc(cl, "on %p %llx.%llx\n", inode, + ceph_vinop(inode)); + ceph_check_caps(ci, CHECK_CAPS_FLUSH); + iput(inode); + spin_lock(&mdsc->cap_delay_lock); + } + } + spin_unlock(&mdsc->cap_delay_lock); + doutc(cl, "done\n"); +} + /* * requests */ @@ -2267,6 +2533,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; size_t size = sizeof(struct ceph_mds_reply_dir_entry); unsigned int num_entries; + u64 bytes_count; int order; spin_lock(&ci->i_ceph_lock); @@ -2275,7 +2542,11 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, num_entries = max(num_entries, 1U); num_entries = min(num_entries, opt->max_readdir); - order = get_order(size * num_entries); + bytes_count = (u64)size * num_entries; + if (unlikely(bytes_count > ULONG_MAX)) + bytes_count = ULONG_MAX; + + order = get_order((unsigned long)bytes_count); while (order >= 0) { rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | __GFP_NOWARN | @@ -2285,7 +2556,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, break; order--; } - if (!rinfo->dir_entries) + if (!rinfo->dir_entries || unlikely(order < 0)) return -ENOMEM; num_entries = (PAGE_SIZE << order) / size; @@ -2351,20 +2622,94 @@ static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) return mdsc->oldest_tid; } -/* - * Build a dentry's path. Allocate on heap; caller must kfree. Based - * on build_path_from_dentry in fs/cifs/dir.c. +#if IS_ENABLED(CONFIG_FS_ENCRYPTION) +static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) +{ + struct inode *dir = req->r_parent; + struct dentry *dentry = req->r_dentry; + const struct qstr *name = req->r_dname; + u8 *cryptbuf = NULL; + u32 len = 0; + int ret = 0; + + /* only encode if we have parent and dentry */ + if (!dir || !dentry) + goto success; + + /* No-op unless this is encrypted */ + if (!IS_ENCRYPTED(dir)) + goto success; + + ret = ceph_fscrypt_prepare_readdir(dir); + if (ret < 0) + return ERR_PTR(ret); + + /* No key? Just ignore it. 
*/ + if (!fscrypt_has_encryption_key(dir)) + goto success; + + if (!name) + name = &dentry->d_name; + + if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) { + WARN_ON_ONCE(1); + return ERR_PTR(-ENAMETOOLONG); + } + + /* No need to append altname if name is short enough */ + if (len <= CEPH_NOHASH_NAME_MAX) { + len = 0; + goto success; + } + + cryptbuf = kmalloc(len, GFP_KERNEL); + if (!cryptbuf) + return ERR_PTR(-ENOMEM); + + ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len); + if (ret) { + kfree(cryptbuf); + return ERR_PTR(ret); + } +success: + *plen = len; + return cryptbuf; +} +#else +static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen) +{ + *plen = 0; + return NULL; +} +#endif + +/** + * ceph_mdsc_build_path - build a path string to a given dentry + * @mdsc: mds client + * @dentry: dentry to which path should be built + * @path_info: output path, length, base ino+snap, and freepath ownership flag + * @for_wire: is this path going to be sent to the MDS? + * + * Build a string that represents the path to the dentry. This is mostly called + * for two different purposes: + * + * 1) we need to build a path string to send to the MDS (for_wire == true) + * 2) we need a path string for local presentation (e.g. debugfs) + * (for_wire == false) * - * If @stop_on_nosnap, generate path relative to the first non-snapped - * inode. + * The path is built in reverse, starting with the dentry. Walk back up toward + * the root, building the path until the first non-snapped inode is reached + * (for_wire) or the root inode is reached (!for_wire). * * Encode hidden .snap dirs as a double /, i.e. * foo/.snap/bar -> foo//bar */ -char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, - int stop_on_nosnap) +char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, + struct ceph_path_info *path_info, int for_wire) { - struct dentry *temp; + struct ceph_client *cl = mdsc->fsc->client; + struct dentry *cur; + struct inode *inode; char *path; int pos; unsigned seq; @@ -2381,34 +2726,71 @@ retry: path[pos] = '\0'; seq = read_seqbegin(&rename_lock); - rcu_read_lock(); - temp = dentry; + cur = dget(dentry); for (;;) { - struct inode *inode; + struct dentry *parent; - spin_lock(&temp->d_lock); - inode = d_inode(temp); + spin_lock(&cur->d_lock); + inode = d_inode(cur); if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { - dout("build_path path+%d: %p SNAPDIR\n", - pos, temp); - } else if (stop_on_nosnap && inode && dentry != temp && + doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur); + spin_unlock(&cur->d_lock); + parent = dget_parent(cur); + } else if (for_wire && inode && dentry != cur && ceph_snap(inode) == CEPH_NOSNAP) { - spin_unlock(&temp->d_lock); + spin_unlock(&cur->d_lock); pos++; /* get rid of any prepended '/' */ break; + } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) { + pos -= cur->d_name.len; + if (pos < 0) { + spin_unlock(&cur->d_lock); + break; + } + memcpy(path + pos, cur->d_name.name, cur->d_name.len); + spin_unlock(&cur->d_lock); + parent = dget_parent(cur); } else { - pos -= temp->d_name.len; + int len, ret; + char buf[NAME_MAX]; + + /* + * Proactively copy name into buf, in case we need to + * present it as-is. 
+ */ + memcpy(buf, cur->d_name.name, cur->d_name.len); + len = cur->d_name.len; + spin_unlock(&cur->d_lock); + parent = dget_parent(cur); + + ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); + if (ret < 0) { + dput(parent); + dput(cur); + return ERR_PTR(ret); + } + + if (fscrypt_has_encryption_key(d_inode(parent))) { + len = ceph_encode_encrypted_dname(d_inode(parent), + buf, len); + if (len < 0) { + dput(parent); + dput(cur); + return ERR_PTR(len); + } + } + pos -= len; if (pos < 0) { - spin_unlock(&temp->d_lock); + dput(parent); break; } - memcpy(path + pos, temp->d_name.name, temp->d_name.len); + memcpy(path + pos, buf, len); } - spin_unlock(&temp->d_lock); - temp = READ_ONCE(temp->d_parent); + dput(cur); + cur = parent; /* Are we at the root? */ - if (IS_ROOT(temp)) + if (IS_ROOT(cur)) break; /* Are we out of buffer? */ @@ -2417,73 +2799,93 @@ retry: path[pos] = '/'; } - base = ceph_ino(d_inode(temp)); - rcu_read_unlock(); + inode = d_inode(cur); + base = inode ? ceph_ino(inode) : 0; + dput(cur); if (read_seqretry(&rename_lock, seq)) goto retry; if (pos < 0) { /* - * A rename didn't occur, but somehow we didn't end up where - * we thought we would. Throw a warning and try again. + * The path is longer than PATH_MAX and this function + * cannot ever succeed. Creating paths that long is + * possible with Ceph, but Linux cannot use them. */ - pr_warn("build_path did not end path lookup where " - "expected, pos is %d\n", pos); - goto retry; + return ERR_PTR(-ENAMETOOLONG); } - *pbase = base; - *plen = PATH_MAX - 1 - pos; - dout("build_path on %p %d built %llx '%.*s'\n", - dentry, d_count(dentry), base, *plen, path + pos); + /* Initialize the output structure */ + memset(path_info, 0, sizeof(*path_info)); + + path_info->vino.ino = base; + path_info->pathlen = PATH_MAX - 1 - pos; + path_info->path = path + pos; + path_info->freepath = true; + + /* Set snap from dentry if available */ + if (d_inode(dentry)) + path_info->vino.snap = ceph_snap(d_inode(dentry)); + else + path_info->vino.snap = CEPH_NOSNAP; + + doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry), + base, PATH_MAX - 1 - pos, path + pos); return path + pos; } -static int build_dentry_path(struct dentry *dentry, struct inode *dir, - const char **ppath, int *ppathlen, u64 *pino, - bool *pfreepath, bool parent_locked) +static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry, + struct inode *dir, struct ceph_path_info *path_info, + bool parent_locked) { char *path; rcu_read_lock(); if (!dir) dir = d_inode_rcu(dentry->d_parent); - if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { - *pino = ceph_ino(dir); + if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP && + !IS_ENCRYPTED(dir)) { + path_info->vino.ino = ceph_ino(dir); + path_info->vino.snap = ceph_snap(dir); rcu_read_unlock(); - *ppath = dentry->d_name.name; - *ppathlen = dentry->d_name.len; + path_info->path = dentry->d_name.name; + path_info->pathlen = dentry->d_name.len; + path_info->freepath = false; return 0; } rcu_read_unlock(); - path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); + path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); if (IS_ERR(path)) return PTR_ERR(path); - *ppath = path; - *pfreepath = true; + /* + * ceph_mdsc_build_path already fills path_info, including snap handling. 
+ */ return 0; } -static int build_inode_path(struct inode *inode, - const char **ppath, int *ppathlen, u64 *pino, - bool *pfreepath) +static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info) { + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct dentry *dentry; char *path; if (ceph_snap(inode) == CEPH_NOSNAP) { - *pino = ceph_ino(inode); - *ppathlen = 0; + path_info->vino.ino = ceph_ino(inode); + path_info->vino.snap = ceph_snap(inode); + path_info->pathlen = 0; + path_info->freepath = false; return 0; } dentry = d_find_alias(inode); - path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); + path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1); dput(dentry); if (IS_ERR(path)) return PTR_ERR(path); - *ppath = path; - *pfreepath = true; + /* + * ceph_mdsc_build_path already fills path_info, including snap from dentry. + * Override with inode's snap since that's what this function is for. + */ + path_info->vino.snap = ceph_snap(inode); return 0; } @@ -2491,34 +2893,41 @@ static int build_inode_path(struct inode *inode, * request arguments may be specified via an inode *, a dentry *, or * an explicit ino+path. */ -static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, - struct inode *rdiri, const char *rpath, - u64 rino, const char **ppath, int *pathlen, - u64 *ino, bool *freepath, bool parent_locked) +static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode, + struct dentry *rdentry, struct inode *rdiri, + const char *rpath, u64 rino, + struct ceph_path_info *path_info, + bool parent_locked) { + struct ceph_client *cl = mdsc->fsc->client; int r = 0; + /* Initialize the output structure */ + memset(path_info, 0, sizeof(*path_info)); + if (rinode) { - r = build_inode_path(rinode, ppath, pathlen, ino, freepath); - dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), - ceph_snap(rinode)); + r = build_inode_path(rinode, path_info); + doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode), + ceph_snap(rinode)); } else if (rdentry) { - r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, - freepath, parent_locked); - dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, - *ppath); + r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked); + doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino, + path_info->pathlen, path_info->path); } else if (rpath || rino) { - *ino = rino; - *ppath = rpath; - *pathlen = rpath ? strlen(rpath) : 0; - dout(" path %.*s\n", *pathlen, rpath); + path_info->vino.ino = rino; + path_info->vino.snap = CEPH_NOSNAP; + path_info->path = rpath; + path_info->pathlen = rpath ? 
strlen(rpath) : 0; + path_info->freepath = false; + + doutc(cl, " path %.*s\n", path_info->pathlen, rpath); } return r; } -static void encode_timestamp_and_gids(void **p, - const struct ceph_mds_request *req) +static void encode_mclientrequest_tail(void **p, + const struct ceph_mds_request *req) { struct ceph_timespec ts; int i; @@ -2526,11 +2935,54 @@ static void encode_timestamp_and_gids(void **p, ceph_encode_timespec64(&ts, &req->r_stamp); ceph_encode_copy(p, &ts, sizeof(ts)); - /* gid_list */ + /* v4: gid_list */ ceph_encode_32(p, req->r_cred->group_info->ngroups); for (i = 0; i < req->r_cred->group_info->ngroups; i++) ceph_encode_64(p, from_kgid(&init_user_ns, req->r_cred->group_info->gid[i])); + + /* v5: altname */ + ceph_encode_32(p, req->r_altname_len); + ceph_encode_copy(p, req->r_altname, req->r_altname_len); + + /* v6: fscrypt_auth and fscrypt_file */ + if (req->r_fscrypt_auth) { + u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); + + ceph_encode_32(p, authlen); + ceph_encode_copy(p, req->r_fscrypt_auth, authlen); + } else { + ceph_encode_32(p, 0); + } + if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) { + ceph_encode_32(p, sizeof(__le64)); + ceph_encode_64(p, req->r_fscrypt_file); + } else { + ceph_encode_32(p, 0); + } +} + +static inline u16 mds_supported_head_version(struct ceph_mds_session *session) +{ + if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features)) + return 1; + + if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) + return 2; + + return CEPH_MDS_REQUEST_HEAD_VERSION; +} + +static struct ceph_mds_request_head_legacy * +find_legacy_request_head(void *p, u64 features) +{ + bool legacy = !(features & CEPH_FEATURE_FS_BTIME); + struct ceph_mds_request_head *head; + + if (legacy) + return (struct ceph_mds_request_head_legacy *)p; + head = (struct ceph_mds_request_head *)p; + return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid; } /* @@ -2542,53 +2994,125 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, { int mds = session->s_mds; struct ceph_mds_client *mdsc = session->s_mdsc; + struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *msg; - struct ceph_mds_request_head_old *head; - const char *path1 = NULL; - const char *path2 = NULL; - u64 ino1 = 0, ino2 = 0; - int pathlen1 = 0, pathlen2 = 0; - bool freepath1 = false, freepath2 = false; + struct ceph_mds_request_head_legacy *lhead; + struct ceph_path_info path_info1 = {0}; + struct ceph_path_info path_info2 = {0}; + struct dentry *old_dentry = NULL; int len; u16 releases; void *p, *end; int ret; bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); - - ret = set_request_path_attr(req->r_inode, req->r_dentry, - req->r_parent, req->r_path1, req->r_ino1.ino, - &path1, &pathlen1, &ino1, &freepath1, - test_bit(CEPH_MDS_R_PARENT_LOCKED, - &req->r_req_flags)); + u16 request_head_version = mds_supported_head_version(session); + kuid_t caller_fsuid = req->r_cred->fsuid; + kgid_t caller_fsgid = req->r_cred->fsgid; + bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); + + ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry, + req->r_parent, req->r_path1, req->r_ino1.ino, + &path_info1, parent_locked); if (ret < 0) { msg = ERR_PTR(ret); goto out; } + /* + * When the parent directory's i_rwsem is *not* locked, req->r_parent may + * have become stale (e.g. after a concurrent rename) between the time the + * dentry was looked up and now. 
If we detect that the stored r_parent + * does not match the inode number we just encoded for the request, switch + * to the correct inode so that the MDS receives a valid parent reference. + */ + if (!parent_locked && req->r_parent && path_info1.vino.ino && + ceph_ino(req->r_parent) != path_info1.vino.ino) { + struct inode *old_parent = req->r_parent; + struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL); + if (!IS_ERR(correct_dir)) { + WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n", + ceph_ino(old_parent), path_info1.vino.ino); + /* + * Transfer CEPH_CAP_PIN from the old parent to the new one. + * The pin was taken earlier in ceph_mdsc_submit_request(). + */ + ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN); + iput(old_parent); + req->r_parent = correct_dir; + ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); + } + } + /* If r_old_dentry is set, then assume that its parent is locked */ - ret = set_request_path_attr(NULL, req->r_old_dentry, - req->r_old_dentry_dir, - req->r_path2, req->r_ino2.ino, - &path2, &pathlen2, &ino2, &freepath2, true); + if (req->r_old_dentry && + !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED)) + old_dentry = req->r_old_dentry; + ret = set_request_path_attr(mdsc, NULL, old_dentry, + req->r_old_dentry_dir, + req->r_path2, req->r_ino2.ino, + &path_info2, true); if (ret < 0) { msg = ERR_PTR(ret); goto out_free1; } - len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head); - len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + - sizeof(struct ceph_timespec); - len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); + req->r_altname = get_fscrypt_altname(req, &req->r_altname_len); + if (IS_ERR(req->r_altname)) { + msg = ERR_CAST(req->r_altname); + req->r_altname = NULL; + goto out_free2; + } + + /* + * For old cephs without supporting the 32bit retry/fwd feature + * it will copy the raw memories directly when decoding the + * requests. While new cephs will decode the head depending the + * version member, so we need to make sure it will be compatible + * with them both. 
+ */ + if (legacy) + len = sizeof(struct ceph_mds_request_head_legacy); + else if (request_head_version == 1) + len = offsetofend(struct ceph_mds_request_head, args); + else if (request_head_version == 2) + len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); + else + len = sizeof(struct ceph_mds_request_head); - /* calculate (max) length for cap releases */ + /* filepaths */ + len += 2 * (1 + sizeof(u32) + sizeof(u64)); + len += path_info1.pathlen + path_info2.pathlen; + + /* cap releases */ len += sizeof(struct ceph_mds_request_release) * (!!req->r_inode_drop + !!req->r_dentry_drop + !!req->r_old_inode_drop + !!req->r_old_dentry_drop); if (req->r_dentry_drop) - len += pathlen1; + len += path_info1.pathlen; if (req->r_old_dentry_drop) - len += pathlen2; + len += path_info2.pathlen; + + /* MClientRequest tail */ + + /* req->r_stamp */ + len += sizeof(struct ceph_timespec); + + /* gid list */ + len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); + + /* alternate name */ + len += sizeof(u32) + req->r_altname_len; + + /* fscrypt_auth */ + len += sizeof(u32); // fscrypt_auth + if (req->r_fscrypt_auth) + len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); + + /* fscrypt_file */ + len += sizeof(u32); + if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) + len += sizeof(__le64); msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); if (!msg) { @@ -2598,36 +3122,90 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, msg->hdr.tid = cpu_to_le64(req->r_tid); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); + + if ((req->r_mnt_idmap != &nop_mnt_idmap) && + !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) { + WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op)); + + if (enable_unsafe_idmap) { + pr_warn_once_client(cl, + "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" + " is not supported by MDS. UID/GID-based restrictions may" + " not work properly.\n"); + + caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, + VFSUIDT_INIT(req->r_cred->fsuid)); + caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, + VFSGIDT_INIT(req->r_cred->fsgid)); + } else { + pr_err_ratelimited_client(cl, + "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" + " is not supported by MDS. Fail request with -EIO.\n"); + + ret = -EIO; + goto out_err; + } + } + /* - * The old ceph_mds_request_head didn't contain a version field, and + * The ceph_mds_request_head_legacy didn't contain a version field, and * one was added when we moved the message version from 3->4. 
*/ if (legacy) { msg->hdr.version = cpu_to_le16(3); - head = msg->front.iov_base; - p = msg->front.iov_base + sizeof(*head); - } else { - struct ceph_mds_request_head *new_head = msg->front.iov_base; + p = msg->front.iov_base + sizeof(*lhead); + } else if (request_head_version == 1) { + struct ceph_mds_request_head *nhead = msg->front.iov_base; msg->hdr.version = cpu_to_le16(4); - new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); - head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; - p = msg->front.iov_base + sizeof(*new_head); + nhead->version = cpu_to_le16(1); + p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args); + } else if (request_head_version == 2) { + struct ceph_mds_request_head *nhead = msg->front.iov_base; + + msg->hdr.version = cpu_to_le16(6); + nhead->version = cpu_to_le16(2); + + p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd); + } else { + struct ceph_mds_request_head *nhead = msg->front.iov_base; + kuid_t owner_fsuid; + kgid_t owner_fsgid; + + msg->hdr.version = cpu_to_le16(6); + nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); + nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head)); + + if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) { + owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns, + VFSUIDT_INIT(req->r_cred->fsuid)); + owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns, + VFSGIDT_INIT(req->r_cred->fsgid)); + nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid)); + nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid)); + } else { + nhead->owner_uid = cpu_to_le32(-1); + nhead->owner_gid = cpu_to_le32(-1); + } + + p = msg->front.iov_base + sizeof(*nhead); } end = msg->front.iov_base + msg->front.iov_len; - head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); - head->op = cpu_to_le32(req->r_op); - head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, - req->r_cred->fsuid)); - head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, - req->r_cred->fsgid)); - head->ino = cpu_to_le64(req->r_deleg_ino); - head->args = req->r_args; + lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); + lhead->op = cpu_to_le32(req->r_op); + lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, + caller_fsuid)); + lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, + caller_fsgid)); + lhead->ino = cpu_to_le64(req->r_deleg_ino); + lhead->args = req->r_args; - ceph_encode_filepath(&p, end, ino1, path1); - ceph_encode_filepath(&p, end, ino2, path2); + ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path); + ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path); /* make note of release offset, in case we need to replay */ req->r_request_release_offset = p - msg->front.iov_base; @@ -2639,15 +3217,23 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), mds, req->r_inode_drop, req->r_inode_unless, req->r_op == CEPH_MDS_OP_READDIR); - if (req->r_dentry_drop) - releases += ceph_encode_dentry_release(&p, req->r_dentry, + if (req->r_dentry_drop) { + ret = ceph_encode_dentry_release(&p, req->r_dentry, req->r_parent, mds, req->r_dentry_drop, req->r_dentry_unless); - if (req->r_old_dentry_drop) - releases += ceph_encode_dentry_release(&p, req->r_old_dentry, + if (ret < 0) + goto out_err; + releases += ret; + } + if (req->r_old_dentry_drop) { + ret = ceph_encode_dentry_release(&p, req->r_old_dentry, req->r_old_dentry_dir, mds, req->r_old_dentry_drop, req->r_old_dentry_unless); + if (ret < 0) + goto out_err; + releases += ret; + } if (req->r_old_inode_drop) releases += ceph_encode_inode_release(&p, d_inode(req->r_old_dentry), @@ -2658,9 +3244,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, p = msg->front.iov_base + req->r_request_release_offset; } - head->num_releases = cpu_to_le16(releases); + lhead->num_releases = cpu_to_le16(releases); - encode_timestamp_and_gids(&p, req); + encode_mclientrequest_tail(&p, req); if (WARN_ON_ONCE(p > end)) { ceph_msg_put(msg); @@ -2682,13 +3268,15 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, msg->hdr.data_off = cpu_to_le16(0); out_free2: - if (freepath2) - ceph_mdsc_free_path((char *)path2, pathlen2); + ceph_mdsc_free_path_info(&path_info2); out_free1: - if (freepath1) - ceph_mdsc_free_path((char *)path1, pathlen1); + ceph_mdsc_free_path_info(&path_info1); out: return msg; +out_err: + ceph_msg_put(msg); + msg = ERR_PTR(ret); + goto out_free2; } /* @@ -2705,18 +3293,6 @@ static void complete_request(struct ceph_mds_client *mdsc, complete_all(&req->r_completion); } -static struct ceph_mds_request_head_old * -find_old_request_head(void *p, u64 features) -{ - bool legacy = !(features & CEPH_FEATURE_FS_BTIME); - struct ceph_mds_request_head *new_head; - - if (legacy) - return (struct ceph_mds_request_head_old *)p; - new_head = (struct ceph_mds_request_head *)p; - return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; -} - /* * called under mdsc->mutex */ @@ -2726,29 +3302,29 @@ static int __prepare_send_request(struct ceph_mds_session *session, { int mds = session->s_mds; struct ceph_mds_client *mdsc = session->s_mdsc; - struct ceph_mds_request_head_old *rhead; + struct ceph_client *cl = mdsc->fsc->client; + struct ceph_mds_request_head_legacy *lhead; + struct ceph_mds_request_head *nhead; struct ceph_msg *msg; - int flags = 0, max_retry; + int flags = 0, old_max_retry; + bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, + &session->s_features); /* - * The type of 'r_attempts' in kernel 'ceph_mds_request' - * is 'int', while in 'ceph_mds_request_head' the type of - * 'num_retry' is '__u8'. So in case the request retries - * exceeding 256 times, the MDS will receive a incorrect - * retry seq. - * - * In this case it's ususally a bug in MDS and continue - * retrying the request makes no sense. - * - * In future this could be fixed in ceph code, so avoid - * using the hardcode here. + * Avoid infinite retrying after overflow. The client will + * increase the retry count and if the MDS is old version, + * so we limit to retry at most 256 times. 
*/ - max_retry = sizeof_field(struct ceph_mds_request_head, num_retry); - max_retry = 1 << (max_retry * BITS_PER_BYTE); - if (req->r_attempts >= max_retry) { - pr_warn_ratelimited("%s request tid %llu seq overflow\n", - __func__, req->r_tid); - return -EMULTIHOP; + if (req->r_attempts) { + old_max_retry = sizeof_field(struct ceph_mds_request_head, + num_retry); + old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); + if ((old_version && req->r_attempts >= old_max_retry) || + ((uint32_t)req->r_attempts >= U32_MAX)) { + pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n", + req->r_tid); + return -EMULTIHOP; + } } req->r_attempts++; @@ -2761,8 +3337,8 @@ static int __prepare_send_request(struct ceph_mds_session *session, else req->r_sent_on_mseq = -1; } - dout("%s %p tid %lld %s (attempt %d)\n", __func__, req, - req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); + doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid, + ceph_mds_op_name(req->r_op), req->r_attempts); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { void *p; @@ -2774,23 +3350,27 @@ static int __prepare_send_request(struct ceph_mds_session *session, * d_move mangles the src name. */ msg = req->r_request; - rhead = find_old_request_head(msg->front.iov_base, - session->s_con.peer_features); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); - flags = le32_to_cpu(rhead->flags); + flags = le32_to_cpu(lhead->flags); flags |= CEPH_MDS_FLAG_REPLAY; - rhead->flags = cpu_to_le32(flags); + lhead->flags = cpu_to_le32(flags); if (req->r_target_inode) - rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); + lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); - rhead->num_retry = req->r_attempts - 1; + lhead->num_retry = req->r_attempts - 1; + if (!old_version) { + nhead = (struct ceph_mds_request_head*)msg->front.iov_base; + nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); + } /* remove cap/dentry releases from message */ - rhead->num_releases = 0; + lhead->num_releases = 0; p = msg->front.iov_base + req->r_request_release_offset; - encode_timestamp_and_gids(&p, req); + encode_mclientrequest_tail(&p, req); msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); @@ -2808,20 +3388,25 @@ static int __prepare_send_request(struct ceph_mds_session *session, } req->r_request = msg; - rhead = find_old_request_head(msg->front.iov_base, - session->s_con.peer_features); - rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); + lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_REPLAY; if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_ASYNC; if (req->r_parent) flags |= CEPH_MDS_FLAG_WANT_DENTRY; - rhead->flags = cpu_to_le32(flags); - rhead->num_fwd = req->r_num_fwd; - rhead->num_retry = req->r_attempts - 1; + lhead->flags = cpu_to_le32(flags); + lhead->num_fwd = req->r_num_fwd; + lhead->num_retry = req->r_attempts - 1; + if (!old_version) { + nhead = (struct ceph_mds_request_head*)msg->front.iov_base; + nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); + nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); + } - dout(" r_parent = %p\n", req->r_parent); + doutc(cl, " r_parent = %p\n", req->r_parent); return 0; } @@ -2849,6 +3434,7 @@ static int __send_request(struct ceph_mds_session *session, static void 
__do_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *session = NULL; int mds = -1; int err = 0; @@ -2860,25 +3446,30 @@ static void __do_request(struct ceph_mds_client *mdsc, return; } + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { + doutc(cl, "metadata corrupted\n"); + err = -EIO; + goto finish; + } if (req->r_timeout && time_after_eq(jiffies, req->r_started + req->r_timeout)) { - dout("do_request timed out\n"); + doutc(cl, "timed out\n"); err = -ETIMEDOUT; goto finish; } if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { - dout("do_request forced umount\n"); + doutc(cl, "forced umount\n"); err = -EIO; goto finish; } if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { if (mdsc->mdsmap_err) { err = mdsc->mdsmap_err; - dout("do_request mdsmap err %d\n", err); + doutc(cl, "mdsmap err %d\n", err); goto finish; } if (mdsc->mdsmap->m_epoch == 0) { - dout("do_request no mdsmap, waiting for map\n"); + doutc(cl, "no mdsmap, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); return; } @@ -2899,7 +3490,7 @@ static void __do_request(struct ceph_mds_client *mdsc, err = -EJUKEBOX; goto finish; } - dout("do_request no mds or not active, waiting for map\n"); + doutc(cl, "no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); return; } @@ -2915,8 +3506,8 @@ static void __do_request(struct ceph_mds_client *mdsc, } req->r_session = ceph_get_mds_session(session); - dout("do_request mds%d session %p state %s\n", mds, session, - ceph_session_state_name(session->s_state)); + doutc(cl, "mds%d session %p state %s\n", mds, session, + ceph_session_state_name(session->s_state)); /* * The old ceph will crash the MDSs when see unknown OPs @@ -2973,7 +3564,7 @@ static void __do_request(struct ceph_mds_client *mdsc, /* * For async create we will choose the auth MDS of frag in parent - * directory to send the request and ususally this works fine, but + * directory to send the request and usually this works fine, but * if the migrated the dirtory to another MDS before it could handle * it the request will be forwarded. 
* @@ -3007,8 +3598,8 @@ static void __do_request(struct ceph_mds_client *mdsc, spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) { - dout("do_request session changed for auth cap %d -> %d\n", - cap->session->s_mds, session->s_mds); + doutc(cl, "session changed for auth cap %d -> %d\n", + cap->session->s_mds, session->s_mds); /* Remove the auth cap from old session */ spin_lock(&cap->session->s_cap_lock); @@ -3035,7 +3626,7 @@ out_session: ceph_put_mds_session(session); finish: if (err) { - dout("__do_request early error %d\n", err); + doutc(cl, "early error %d\n", err); req->r_err = err; complete_request(mdsc, req); __unregister_request(mdsc, req); @@ -3049,6 +3640,7 @@ finish: static void __wake_requests(struct ceph_mds_client *mdsc, struct list_head *head) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; LIST_HEAD(tmp_list); @@ -3058,7 +3650,8 @@ static void __wake_requests(struct ceph_mds_client *mdsc, req = list_entry(tmp_list.next, struct ceph_mds_request, r_wait); list_del_init(&req->r_wait); - dout(" wake request %p tid %llu\n", req, req->r_tid); + doutc(cl, " wake request %p tid %llu\n", req, + req->r_tid); __do_request(mdsc, req); } } @@ -3069,10 +3662,11 @@ static void __wake_requests(struct ceph_mds_client *mdsc, */ static void kick_requests(struct ceph_mds_client *mdsc, int mds) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; struct rb_node *p = rb_first(&mdsc->request_tree); - dout("kick_requests mds%d\n", mds); + doutc(cl, "kick_requests mds%d\n", mds); while (p) { req = rb_entry(p, struct ceph_mds_request, r_node); p = rb_next(p); @@ -3082,7 +3676,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) continue; /* only new requests */ if (req->r_session && req->r_session->s_mds == mds) { - dout(" kicking tid %llu\n", req->r_tid); + doutc(cl, " kicking tid %llu\n", req->r_tid); list_del_init(&req->r_wait); __do_request(mdsc, req); } @@ -3092,6 +3686,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req) { + struct ceph_client *cl = mdsc->fsc->client; int err = 0; /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ @@ -3113,8 +3708,7 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, if (req->r_inode) { err = ceph_wait_on_async_create(req->r_inode); if (err) { - dout("%s: wait for async create returned: %d\n", - __func__, err); + doutc(cl, "wait for async create returned: %d\n", err); return err; } } @@ -3122,13 +3716,12 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, if (!err && req->r_old_inode) { err = ceph_wait_on_async_create(req->r_old_inode); if (err) { - dout("%s: wait for async create returned: %d\n", - __func__, err); + doutc(cl, "wait for async create returned: %d\n", err); return err; } } - dout("submit_request on %p for inode %p\n", req, dir); + doutc(cl, "submit_request on %p for inode %p\n", req, dir); mutex_lock(&mdsc->mutex); __register_request(mdsc, req, dir); __do_request(mdsc, req); @@ -3141,10 +3734,11 @@ int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, ceph_mds_request_wait_callback_t wait_func) { + struct ceph_client *cl = mdsc->fsc->client; int err; /* wait */ - dout("do_request waiting\n"); + doutc(cl, "do_request waiting\n"); if (wait_func) { err = wait_func(mdsc, req); } else { @@ 
-3158,14 +3752,14 @@ int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, else err = timeleft; /* killed */ } - dout("do_request waited, got %d\n", err); + doutc(cl, "do_request waited, got %d\n", err); mutex_lock(&mdsc->mutex); /* only abort if we didn't race with a real reply */ if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { err = le32_to_cpu(req->r_reply_info.head->result); } else if (err < 0) { - dout("aborted request %lld with %d\n", req->r_tid, err); + doutc(cl, "aborted request %lld with %d\n", req->r_tid, err); /* * ensure we aren't running concurrently with @@ -3196,15 +3790,16 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req) { + struct ceph_client *cl = mdsc->fsc->client; int err; - dout("do_request on %p\n", req); + doutc(cl, "do_request on %p\n", req); /* issue */ err = ceph_mdsc_submit_request(mdsc, dir, req); if (!err) err = ceph_mdsc_wait_request(mdsc, req, NULL); - dout("do_request %p done, result %d\n", req, err); + doutc(cl, "do_request %p done, result %d\n", req, err); return err; } @@ -3216,8 +3811,10 @@ void ceph_invalidate_dir_request(struct ceph_mds_request *req) { struct inode *dir = req->r_parent; struct inode *old_dir = req->r_old_dentry_dir; + struct ceph_client *cl = req->r_mdsc->fsc->client; - dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); + doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n", + dir, old_dir); ceph_dir_clear_complete(dir); if (old_dir) @@ -3238,6 +3835,7 @@ void ceph_invalidate_dir_request(struct ceph_mds_request *req) static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) { struct ceph_mds_client *mdsc = session->s_mdsc; + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; struct ceph_mds_reply_head *head = msg->front.iov_base; struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ @@ -3245,9 +3843,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) u64 tid; int err, result; int mds = session->s_mds; + bool close_sessions = false; if (msg->front.iov_len < sizeof(*head)) { - pr_err("mdsc_handle_reply got corrupt (short) reply\n"); + pr_err_client(cl, "got corrupt (short) reply\n"); ceph_msg_dump(msg); return; } @@ -3257,17 +3856,17 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_lock(&mdsc->mutex); req = lookup_get_request(mdsc, tid); if (!req) { - dout("handle_reply on unknown tid %llu\n", tid); + doutc(cl, "on unknown tid %llu\n", tid); mutex_unlock(&mdsc->mutex); return; } - dout("handle_reply %p\n", req); + doutc(cl, "handle_reply %p\n", req); /* correct session? */ if (req->r_session != session) { - pr_err("mdsc_handle_reply got %llu on session mds%d" - " not mds%d\n", tid, session->s_mds, - req->r_session ? req->r_session->s_mds : -1); + pr_err_client(cl, "got %llu on session mds%d not mds%d\n", + tid, session->s_mds, + req->r_session ? req->r_session->s_mds : -1); mutex_unlock(&mdsc->mutex); goto out; } @@ -3275,14 +3874,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) /* dup? */ if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { - pr_warn("got a dup %s reply on %llu from mds%d\n", - head->safe ? "safe" : "unsafe", tid, mds); + pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n", + head->safe ? 
"safe" : "unsafe", tid, mds); mutex_unlock(&mdsc->mutex); goto out; } if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { - pr_warn("got unsafe after safe on %llu from mds%d\n", - tid, mds); + pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n", + tid, mds); mutex_unlock(&mdsc->mutex); goto out; } @@ -3305,7 +3904,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) * response. And even if it did, there is nothing * useful we could do with a revised return value. */ - dout("got safe reply %llu, mds%d\n", tid, mds); + doutc(cl, "got safe reply %llu, mds%d\n", tid, mds); mutex_unlock(&mdsc->mutex); goto out; @@ -3315,23 +3914,36 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); } - dout("handle_reply tid %lld result %d\n", tid, result); - rinfo = &req->r_reply_info; + doutc(cl, "tid %lld result %d\n", tid, result); if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) - err = parse_reply_info(session, msg, rinfo, (u64)-1); + err = parse_reply_info(session, msg, req, (u64)-1); else - err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); + err = parse_reply_info(session, msg, req, + session->s_con.peer_features); mutex_unlock(&mdsc->mutex); /* Must find target inode outside of mutexes to avoid deadlocks */ + rinfo = &req->r_reply_info; if ((err >= 0) && rinfo->head->is_target) { - struct inode *in; + struct inode *in = xchg(&req->r_new_inode, NULL); struct ceph_vino tvino = { .ino = le64_to_cpu(rinfo->targeti.in->ino), .snap = le64_to_cpu(rinfo->targeti.in->snapid) }; - in = ceph_get_inode(mdsc->fsc->sb, tvino); + /* + * If we ended up opening an existing inode, discard + * r_new_inode + */ + if (req->r_op == CEPH_MDS_OP_CREATE && + !req->r_reply_info.has_create_ino) { + /* This should never happen on an async create */ + WARN_ON_ONCE(req->r_deleg_ino); + iput(in); + in = NULL; + } + + in = ceph_get_inode(mdsc->fsc->sb, tvino, in); if (IS_ERR(in)) { err = PTR_ERR(in); mutex_lock(&session->s_mutex); @@ -3342,7 +3954,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_lock(&session->s_mutex); if (err < 0) { - pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); + pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n", + mds, tid); ceph_msg_dump(msg); goto out_err; } @@ -3351,10 +3964,17 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) realm = NULL; if (rinfo->snapblob_len) { down_write(&mdsc->snap_rwsem); - ceph_update_snap_trace(mdsc, rinfo->snapblob, + err = ceph_update_snap_trace(mdsc, rinfo->snapblob, rinfo->snapblob + rinfo->snapblob_len, le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, &realm); + if (err) { + up_write(&mdsc->snap_rwsem); + close_sessions = true; + if (err == -EIO) + ceph_msg_dump(msg); + goto out_err; + } downgrade_write(&mdsc->snap_rwsem); } else { down_read(&mdsc->snap_rwsem); @@ -3367,7 +3987,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (err == 0) { if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || req->r_op == CEPH_MDS_OP_LSSNAP)) - ceph_readdir_prepopulate(req, req->r_session); + err = ceph_readdir_prepopulate(req, req->r_session); } current->journal_info = NULL; mutex_unlock(&req->r_fill_mutex); @@ -3399,7 +4019,7 @@ out_err: set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); } } else { - dout("reply arrived after request %lld was aborted\n", tid); 
+ doutc(cl, "reply arrived after request %lld was aborted\n", tid); } mutex_unlock(&mdsc->mutex); @@ -3412,6 +4032,10 @@ out_err: req->r_end_latency, err); out: ceph_mdsc_put_request(req); + + /* Defer closing the sessions after s_mutex lock being released */ + if (close_sessions) + ceph_mdsc_close_sessions(mdsc); return; } @@ -3424,6 +4048,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, struct ceph_msg *msg) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; u64 tid = le64_to_cpu(msg->hdr.tid); u32 next_mds; @@ -3441,43 +4066,32 @@ static void handle_forward(struct ceph_mds_client *mdsc, req = lookup_get_request(mdsc, tid); if (!req) { mutex_unlock(&mdsc->mutex); - dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); + doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds); return; /* dup reply? */ } if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { - dout("forward tid %llu aborted, unregistering\n", tid); + doutc(cl, "forward tid %llu aborted, unregistering\n", tid); __unregister_request(mdsc, req); - } else if (fwd_seq <= req->r_num_fwd) { + } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { /* - * The type of 'num_fwd' in ceph 'MClientRequestForward' - * is 'int32_t', while in 'ceph_mds_request_head' the - * type is '__u8'. So in case the request bounces between - * MDSes exceeding 256 times, the client will get stuck. - * - * In this case it's ususally a bug in MDS and continue - * bouncing the request makes no sense. + * Avoid infinite retrying after overflow. * - * In future this could be fixed in ceph code, so avoid - * using the hardcode here. + * The MDS will increase the fwd count and in client side + * if the num_fwd is less than the one saved in request + * that means the MDS is an old version and overflowed of + * 8 bits. */ - int max = sizeof_field(struct ceph_mds_request_head, num_fwd); - max = 1 << (max * BITS_PER_BYTE); - if (req->r_num_fwd >= max) { - mutex_lock(&req->r_fill_mutex); - req->r_err = -EMULTIHOP; - set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); - mutex_unlock(&req->r_fill_mutex); - aborted = true; - pr_warn_ratelimited("forward tid %llu seq overflow\n", - tid); - } else { - dout("forward tid %llu to mds%d - old seq %d <= %d\n", - tid, next_mds, req->r_num_fwd, fwd_seq); - } + mutex_lock(&req->r_fill_mutex); + req->r_err = -EMULTIHOP; + set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); + mutex_unlock(&req->r_fill_mutex); + aborted = true; + pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n", + tid); } else { /* resend. 
forward race not possible; mds would drop */ - dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); + doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds); BUG_ON(req->r_err); BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); req->r_attempts = 0; @@ -3495,7 +4109,8 @@ static void handle_forward(struct ceph_mds_client *mdsc, return; bad: - pr_err("mdsc_handle_forward decode error err=%d\n", err); + pr_err_client(cl, "decode error err=%d\n", err); + ceph_msg_dump(msg); } static int __decode_session_metadata(void **p, void *end, @@ -3533,15 +4148,19 @@ static void handle_session(struct ceph_mds_session *session, struct ceph_msg *msg) { struct ceph_mds_client *mdsc = session->s_mdsc; + struct ceph_client *cl = mdsc->fsc->client; int mds = session->s_mds; int msg_version = le16_to_cpu(msg->hdr.version); void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; struct ceph_mds_session_head *h; - u32 op; + struct ceph_mds_cap_auth *cap_auths = NULL; + u32 op, cap_auths_num = 0; u64 seq, features = 0; int wake = 0; bool blocklisted = false; + u32 i; + /* decode */ ceph_decode_need(&p, end, sizeof(*h), bad); @@ -3580,12 +4199,107 @@ static void handle_session(struct ceph_mds_session *session, /* version >= 5, flags */ ceph_decode_32_safe(&p, end, flags, bad); if (flags & CEPH_SESSION_BLOCKLISTED) { - pr_warn("mds%d session blocklisted\n", session->s_mds); + pr_warn_client(cl, "mds%d session blocklisted\n", + session->s_mds); blocklisted = true; } } + if (msg_version >= 6) { + ceph_decode_32_safe(&p, end, cap_auths_num, bad); + doutc(cl, "cap_auths_num %d\n", cap_auths_num); + + if (cap_auths_num && op != CEPH_SESSION_OPEN) { + WARN_ON_ONCE(op != CEPH_SESSION_OPEN); + goto skip_cap_auths; + } + + cap_auths = kcalloc(cap_auths_num, + sizeof(struct ceph_mds_cap_auth), + GFP_KERNEL); + if (!cap_auths) { + pr_err_client(cl, "No memory for cap_auths\n"); + return; + } + + for (i = 0; i < cap_auths_num; i++) { + u32 _len, j; + + /* struct_v, struct_compat, and struct_len in MDSCapAuth */ + ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); + + /* struct_v, struct_compat, and struct_len in MDSCapMatch */ + ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad); + ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad); + ceph_decode_32_safe(&p, end, _len, bad); + if (_len) { + cap_auths[i].match.gids = kcalloc(_len, sizeof(u32), + GFP_KERNEL); + if (!cap_auths[i].match.gids) { + pr_err_client(cl, "No memory for gids\n"); + goto fail; + } + + cap_auths[i].match.num_gids = _len; + for (j = 0; j < _len; j++) + ceph_decode_32_safe(&p, end, + cap_auths[i].match.gids[j], + bad); + } + + ceph_decode_32_safe(&p, end, _len, bad); + if (_len) { + cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char), + GFP_KERNEL); + if (!cap_auths[i].match.path) { + pr_err_client(cl, "No memory for path\n"); + goto fail; + } + ceph_decode_copy(&p, cap_auths[i].match.path, _len); + + /* Remove the tailing '/' */ + while (_len && cap_auths[i].match.path[_len - 1] == '/') { + cap_auths[i].match.path[_len - 1] = '\0'; + _len -= 1; + } + } + + ceph_decode_32_safe(&p, end, _len, bad); + if (_len) { + cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char), + GFP_KERNEL); + if (!cap_auths[i].match.fs_name) { + pr_err_client(cl, "No memory for fs_name\n"); + goto fail; + } + ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len); + } + + ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad); + ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad); + ceph_decode_8_safe(&p, end, 
cap_auths[i].writeable, bad); + doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n", + cap_auths[i].match.uid, cap_auths[i].match.num_gids, + cap_auths[i].match.path, cap_auths[i].match.fs_name, + cap_auths[i].match.root_squash, + cap_auths[i].readable, cap_auths[i].writeable); + } + } + +skip_cap_auths: mutex_lock(&mdsc->mutex); + if (op == CEPH_SESSION_OPEN) { + if (mdsc->s_cap_auths) { + for (i = 0; i < mdsc->s_cap_auths_num; i++) { + kfree(mdsc->s_cap_auths[i].match.gids); + kfree(mdsc->s_cap_auths[i].match.path); + kfree(mdsc->s_cap_auths[i].match.fs_name); + } + kfree(mdsc->s_cap_auths); + } + mdsc->s_cap_auths_num = cap_auths_num; + mdsc->s_cap_auths = cap_auths; + } if (op == CEPH_SESSION_CLOSE) { ceph_get_mds_session(session); __unregister_session(mdsc, session); @@ -3596,25 +4310,27 @@ static void handle_session(struct ceph_mds_session *session, mutex_lock(&session->s_mutex); - dout("handle_session mds%d %s %p state %s seq %llu\n", - mds, ceph_session_op_name(op), session, - ceph_session_state_name(session->s_state), seq); + doutc(cl, "mds%d %s %p state %s seq %llu\n", mds, + ceph_session_op_name(op), session, + ceph_session_state_name(session->s_state), seq); if (session->s_state == CEPH_MDS_SESSION_HUNG) { session->s_state = CEPH_MDS_SESSION_OPEN; - pr_info("mds%d came back\n", session->s_mds); + pr_info_client(cl, "mds%d came back\n", session->s_mds); } switch (op) { case CEPH_SESSION_OPEN: if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) - pr_info("mds%d reconnect success\n", session->s_mds); + pr_info_client(cl, "mds%d reconnect success\n", + session->s_mds); + session->s_features = features; if (session->s_state == CEPH_MDS_SESSION_OPEN) { - pr_notice("mds%d is already opened\n", session->s_mds); + pr_notice_client(cl, "mds%d is already opened\n", + session->s_mds); } else { session->s_state = CEPH_MDS_SESSION_OPEN; - session->s_features = features; renewed_caps(mdsc, session, 0); if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) @@ -3641,7 +4357,8 @@ static void handle_session(struct ceph_mds_session *session, case CEPH_SESSION_CLOSE: if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) - pr_info("mds%d reconnect denied\n", session->s_mds); + pr_info_client(cl, "mds%d reconnect denied\n", + session->s_mds); session->s_state = CEPH_MDS_SESSION_CLOSED; cleanup_session_requests(mdsc, session); remove_session_caps(session); @@ -3650,8 +4367,8 @@ static void handle_session(struct ceph_mds_session *session, break; case CEPH_SESSION_STALE: - pr_info("mds%d caps went stale, renewing\n", - session->s_mds); + pr_info_client(cl, "mds%d caps went stale, renewing\n", + session->s_mds); atomic_inc(&session->s_cap_gen); session->s_cap_ttl = jiffies - 1; send_renew_caps(mdsc, session); @@ -3662,11 +4379,17 @@ static void handle_session(struct ceph_mds_session *session, break; case CEPH_SESSION_FLUSHMSG: + /* flush cap releases */ + spin_lock(&session->s_cap_lock); + if (session->s_num_cap_releases) + ceph_flush_session_cap_releases(mdsc, session); + spin_unlock(&session->s_cap_lock); + send_flushmsg_ack(mdsc, session, seq); break; case CEPH_SESSION_FORCE_RO: - dout("force_session_readonly %p\n", session); + doutc(cl, "force_session_readonly %p\n", session); spin_lock(&session->s_cap_lock); session->s_readonly = true; spin_unlock(&session->s_cap_lock); @@ -3675,7 +4398,8 @@ static void handle_session(struct ceph_mds_session *session, case CEPH_SESSION_REJECT: WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); - 
pr_info("mds%d rejected session\n", session->s_mds); + pr_info_client(cl, "mds%d rejected session\n", + session->s_mds); session->s_state = CEPH_MDS_SESSION_REJECTED; cleanup_session_requests(mdsc, session); remove_session_caps(session); @@ -3685,7 +4409,7 @@ static void handle_session(struct ceph_mds_session *session, break; default: - pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); + pr_err_client(cl, "bad op %d mds%d\n", op, mds); WARN_ON(1); } @@ -3702,32 +4426,40 @@ static void handle_session(struct ceph_mds_session *session, return; bad: - pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, - (int)msg->front.iov_len); + pr_err_client(cl, "corrupt message mds%d len %d\n", mds, + (int)msg->front.iov_len); ceph_msg_dump(msg); +fail: + for (i = 0; i < cap_auths_num; i++) { + kfree(cap_auths[i].match.gids); + kfree(cap_auths[i].match.path); + kfree(cap_auths[i].match.fs_name); + } + kfree(cap_auths); return; } void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) { + struct ceph_client *cl = req->r_mdsc->fsc->client; int dcaps; dcaps = xchg(&req->r_dir_caps, 0); if (dcaps) { - dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); + doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); } } -void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) +void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req) { + struct ceph_client *cl = req->r_mdsc->fsc->client; int dcaps; dcaps = xchg(&req->r_dir_caps, 0); if (dcaps) { - dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); - ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), - dcaps); + doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); + ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps); } } @@ -3740,7 +4472,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, *nreq; struct rb_node *p; - dout("replay_unsafe_requests mds%d\n", session->s_mds); + doutc(mdsc->fsc->client, "mds%d\n", session->s_mds); mutex_lock(&mdsc->mutex); list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) @@ -3763,7 +4495,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, if (req->r_session->s_mds != session->s_mds) continue; - ceph_mdsc_release_dir_caps_no_check(req); + ceph_mdsc_release_dir_caps_async(req); __send_request(session, req, true); } @@ -3882,42 +4614,46 @@ out_unlock: /* * Encode information about a cap for a reconnect with the MDS. 
*/ -static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, - void *arg) +static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) { + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); + struct ceph_client *cl = ceph_inode_to_client(inode); union { struct ceph_mds_cap_reconnect v2; struct ceph_mds_cap_reconnect_v1 v1; } rec; - struct ceph_inode_info *ci = cap->ci; + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_reconnect_state *recon_state = arg; struct ceph_pagelist *pagelist = recon_state->pagelist; struct dentry *dentry; - char *path; - int pathlen = 0, err; - u64 pathbase; + struct ceph_cap *cap; + struct ceph_path_info path_info = {0}; + int err; u64 snap_follows; - dout(" adding %p ino %llx.%llx cap %p %lld %s\n", - inode, ceph_vinop(inode), cap, cap->cap_id, - ceph_cap_string(cap->issued)); - dentry = d_find_primary(inode); if (dentry) { /* set pathbase to parent dir when msg_version >= 2 */ - path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, + char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, recon_state->msg_version >= 2); dput(dentry); if (IS_ERR(path)) { err = PTR_ERR(path); goto out_err; } - } else { - path = NULL; - pathbase = 0; } spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap) { + spin_unlock(&ci->i_ceph_lock); + err = 0; + goto out_err; + } + doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode, + ceph_vinop(inode), cap, cap->cap_id, + ceph_cap_string(cap->issued)); + cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ cap->mseq = 0; /* and migrate_seq */ @@ -3937,18 +4673,22 @@ static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); rec.v2.issued = cpu_to_le32(cap->issued); rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); - rec.v2.pathbase = cpu_to_le64(pathbase); + rec.v2.pathbase = cpu_to_le64(path_info.vino.ino); rec.v2.flock_len = (__force __le32) ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 
0 : 1); } else { + struct timespec64 ts; + rec.v1.cap_id = cpu_to_le64(cap->cap_id); rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); rec.v1.issued = cpu_to_le32(cap->issued); rec.v1.size = cpu_to_le64(i_size_read(inode)); - ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); - ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); + ts = inode_get_mtime(inode); + ceph_encode_timespec64(&rec.v1.mtime, &ts); + ts = inode_get_atime(inode); + ceph_encode_timespec64(&rec.v1.atime, &ts); rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); - rec.v1.pathbase = cpu_to_le64(pathbase); + rec.v1.pathbase = cpu_to_le64(path_info.vino.ino); } if (list_empty(&ci->i_cap_snaps)) { @@ -4010,7 +4750,7 @@ encode_again: sizeof(struct ceph_filelock); rec.v2.flock_len = cpu_to_le32(struct_len); - struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); + struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2); if (struct_v >= 2) struct_len += sizeof(u64); /* snap_follows */ @@ -4034,7 +4774,7 @@ encode_again: ceph_pagelist_encode_8(pagelist, 1); ceph_pagelist_encode_32(pagelist, struct_len); } - ceph_pagelist_encode_string(pagelist, path, pathlen); + ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); ceph_locks_to_pagelist(flocks, pagelist, num_fcntl_locks, num_flock_locks); @@ -4045,17 +4785,17 @@ out_freeflocks: } else { err = ceph_pagelist_reserve(pagelist, sizeof(u64) + sizeof(u32) + - pathlen + sizeof(rec.v1)); + path_info.pathlen + sizeof(rec.v1)); if (err) goto out_err; ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); - ceph_pagelist_encode_string(pagelist, path, pathlen); + ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); } out_err: - ceph_mdsc_free_path(path, pathlen); + ceph_mdsc_free_path_info(&path_info); if (!err) recon_state->nr_caps++; return err; @@ -4066,6 +4806,7 @@ static int encode_snap_realms(struct ceph_mds_client *mdsc, { struct rb_node *p; struct ceph_pagelist *pagelist = recon_state->pagelist; + struct ceph_client *cl = mdsc->fsc->client; int err = 0; if (recon_state->msg_version >= 4) { @@ -4104,8 +4845,8 @@ static int encode_snap_realms(struct ceph_mds_client *mdsc, ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); } - dout(" adding snap realm %llx seq %lld parent %llx\n", - realm->ino, realm->seq, realm->parent_ino); + doutc(cl, " adding snap realm %llx seq %lld parent %llx\n", + realm->ino, realm->seq, realm->parent_ino); sr_rec.ino = cpu_to_le64(realm->ino); sr_rec.seq = cpu_to_le64(realm->seq); sr_rec.parent = cpu_to_le64(realm->parent_ino); @@ -4134,6 +4875,7 @@ fail: static void send_mds_reconnect(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *reply; int mds = session->s_mds; int err = -ENOMEM; @@ -4142,7 +4884,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, }; LIST_HEAD(dispose); - pr_info("mds%d reconnect start\n", mds); + pr_info_client(cl, "mds%d reconnect start\n", mds); recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!recon_state.pagelist) @@ -4158,8 +4900,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, session->s_state = CEPH_MDS_SESSION_RECONNECTING; session->s_seq = 0; - dout("session %p state %s\n", session, - ceph_session_state_name(session->s_state)); + doutc(cl, "session %p state %s\n", session, + ceph_session_state_name(session->s_state)); 
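
(For illustration, not part of the diff: the reconnect and request-building code above now passes a single struct ceph_path_info around instead of separate path/pathlen/pathbase/freepath variables, filling it via ceph_mdsc_build_path()/set_request_path_attr() and releasing it with ceph_mdsc_free_path_info(). The fields referenced in the diff (path, pathlen, vino.ino, freepath) suggest a grouping roughly like the hypothetical sketch below; the real definition lives in the ceph headers and may differ in detail.)

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* hypothetical analogue of the bundled path information */
struct example_path_info {
	char *path;                  /* path string to encode into the request */
	int pathlen;                 /* its length */
	unsigned long long base_ino; /* inode the path is expressed relative to */
	bool freepath;               /* true if 'path' was allocated and must be freed */
};

/* analogous in spirit to ceph_mdsc_free_path_info(): one release point */
static void example_free_path_info(struct example_path_info *info)
{
	if (info->freepath)
		free(info->path);
	memset(info, 0, sizeof(*info));
}

/* a builder fills the struct once; callers no longer juggle four variables */
static int example_build_path(const char *name, struct example_path_info *info)
{
	info->path = strdup(name);
	if (!info->path)
		return -1;
	info->pathlen = (int)strlen(info->path);
	info->base_ino = 1;          /* e.g. the root inode as the base */
	info->freepath = true;
	return 0;
}

int main(void)
{
	struct example_path_info info = {0};

	if (example_build_path("dir/file", &info) == 0)
		example_free_path_info(&info);
	return 0;
}
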
atomic_inc(&session->s_cap_gen); @@ -4206,7 +4948,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, } else { recon_state.msg_version = 2; } - /* trsaverse this session's caps */ + /* traverse this session's caps */ err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); spin_lock(&session->s_cap_lock); @@ -4293,7 +5035,8 @@ fail: fail_nomsg: ceph_pagelist_release(recon_state.pagelist); fail_nopagelist: - pr_err("error %d preparing reconnect for mds%d\n", err, mds); + pr_err_client(cl, "error %d preparing reconnect for mds%d\n", + err, mds); return; } @@ -4312,9 +5055,9 @@ static void check_new_map(struct ceph_mds_client *mdsc, int oldstate, newstate; struct ceph_mds_session *s; unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; + struct ceph_client *cl = mdsc->fsc->client; - dout("check_new_map new %u old %u\n", - newmap->m_epoch, oldmap->m_epoch); + doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); if (newmap->m_info) { for (i = 0; i < newmap->possible_max_rank; i++) { @@ -4330,12 +5073,12 @@ static void check_new_map(struct ceph_mds_client *mdsc, oldstate = ceph_mdsmap_get_state(oldmap, i); newstate = ceph_mdsmap_get_state(newmap, i); - dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", - i, ceph_mds_state_name(oldstate), - ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", - ceph_mds_state_name(newstate), - ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", - ceph_session_state_name(s->s_state)); + doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n", + i, ceph_mds_state_name(oldstate), + ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", + ceph_mds_state_name(newstate), + ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", + ceph_session_state_name(s->s_state)); if (i >= newmap->possible_max_rank) { /* force close session for stopped mds */ @@ -4388,7 +5131,8 @@ static void check_new_map(struct ceph_mds_client *mdsc, newstate >= CEPH_MDS_STATE_ACTIVE) { if (oldstate != CEPH_MDS_STATE_CREATING && oldstate != CEPH_MDS_STATE_STARTING) - pr_info("mds%d recovery completed\n", s->s_mds); + pr_info_client(cl, "mds%d recovery completed\n", + s->s_mds); kick_requests(mdsc, i); mutex_unlock(&mdsc->mutex); mutex_lock(&s->s_mutex); @@ -4432,12 +5176,13 @@ static void check_new_map(struct ceph_mds_client *mdsc, s = __open_export_target_session(mdsc, i); if (IS_ERR(s)) { err = PTR_ERR(s); - pr_err("failed to open export target session, err %d\n", - err); + pr_err_client(cl, + "failed to open export target session, err %d\n", + err); continue; } } - dout("send reconnect to export target mds.%d\n", i); + doutc(cl, "send reconnect to export target mds.%d\n", i); mutex_unlock(&mdsc->mutex); send_mds_reconnect(mdsc, s); ceph_put_mds_session(s); @@ -4453,8 +5198,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, if (s->s_state == CEPH_MDS_SESSION_OPEN || s->s_state == CEPH_MDS_SESSION_HUNG || s->s_state == CEPH_MDS_SESSION_CLOSING) { - dout(" connecting to export targets of laggy mds%d\n", - i); + doutc(cl, " connecting to export targets of laggy mds%d\n", i); __open_export_target_sessions(mdsc, s); } } @@ -4481,6 +5225,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, struct ceph_msg *msg) { + struct ceph_client *cl = mdsc->fsc->client; struct super_block *sb = mdsc->fsc->sb; struct inode *inode; struct dentry *parent, *dentry; @@ -4492,7 +5237,10 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct qstr dname; int release = 0; - dout("handle_lease from 
mds%d\n", mds); + doutc(cl, "from mds%d\n", mds); + + if (!ceph_inc_mds_stopping_blocker(mdsc, session)) + return; /* decode */ if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) @@ -4507,22 +5255,19 @@ static void handle_lease(struct ceph_mds_client *mdsc, /* lookup inode */ inode = ceph_find_inode(sb, vino); - dout("handle_lease %s, ino %llx %p %.*s\n", - ceph_lease_op_name(h->action), vino.ino, inode, - dname.len, dname.name); + doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), + vino.ino, inode, dname.len, dname.name); mutex_lock(&session->s_mutex); - inc_session_sequence(session); - if (!inode) { - dout("handle_lease no inode %llx\n", vino.ino); + doutc(cl, "no inode %llx\n", vino.ino); goto release; } /* dentry */ parent = d_find_alias(inode); if (!parent) { - dout("no parent dentry on inode %p\n", inode); + doutc(cl, "no parent dentry on inode %p\n", inode); WARN_ON(1); goto release; /* hrm... */ } @@ -4575,10 +5320,14 @@ release: out: mutex_unlock(&session->s_mutex); iput(inode); + + ceph_dec_mds_stopping_blocker(mdsc); return; bad: - pr_err("corrupt lease message\n"); + ceph_dec_mds_stopping_blocker(mdsc); + + pr_err_client(cl, "corrupt lease message\n"); ceph_msg_dump(msg); } @@ -4586,13 +5335,14 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, struct dentry *dentry, char action, u32 seq) { + struct ceph_client *cl = session->s_mdsc->fsc->client; struct ceph_msg *msg; struct ceph_mds_lease *lease; struct inode *dir; int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; - dout("lease_send_msg identry %p %s to mds%d\n", - dentry, ceph_lease_op_name(action), session->s_mds); + doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action), + session->s_mds); msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); if (!msg) @@ -4625,6 +5375,7 @@ static void lock_unlock_session(struct ceph_mds_session *s) static void maybe_recover_session(struct ceph_mds_client *mdsc) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_fs_client *fsc = mdsc->fsc; if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) @@ -4636,17 +5387,19 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc) if (!READ_ONCE(fsc->blocklisted)) return; - pr_info("auto reconnect after blocklisted\n"); + pr_info_client(cl, "auto reconnect after blocklisted\n"); ceph_force_reconnect(fsc->sb); } bool check_session_state(struct ceph_mds_session *s) { + struct ceph_client *cl = s->s_mdsc->fsc->client; + switch (s->s_state) { case CEPH_MDS_SESSION_OPEN: if (s->s_ttl && time_after(jiffies, s->s_ttl)) { s->s_state = CEPH_MDS_SESSION_HUNG; - pr_info("mds%d hung\n", s->s_mds); + pr_info_client(cl, "mds%d hung\n", s->s_mds); } break; case CEPH_MDS_SESSION_CLOSING: @@ -4666,6 +5419,8 @@ bool check_session_state(struct ceph_mds_session *s) */ void inc_session_sequence(struct ceph_mds_session *s) { + struct ceph_client *cl = s->s_mdsc->fsc->client; + lockdep_assert_held(&s->s_mutex); s->s_seq++; @@ -4673,11 +5428,11 @@ void inc_session_sequence(struct ceph_mds_session *s) if (s->s_state == CEPH_MDS_SESSION_CLOSING) { int ret; - dout("resending session close request for mds%d\n", s->s_mds); + doutc(cl, "resending session close request for mds%d\n", s->s_mds); ret = request_close_session(s); if (ret < 0) - pr_err("unable to close session to mds%d: %d\n", - s->s_mds, ret); + pr_err_client(cl, "unable to close session to mds%d: %d\n", + s->s_mds, ret); } } @@ -4706,9 +5461,9 @@ static void delayed_work(struct work_struct *work) int renew_caps; int i; - dout("mdsc 
delayed_work\n"); + doutc(mdsc->fsc->client, "mdsc delayed_work\n"); - if (mdsc->stopping) + if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) return; mutex_lock(&mdsc->mutex); @@ -4729,6 +5484,8 @@ static void delayed_work(struct work_struct *work) } mutex_unlock(&mdsc->mutex); + ceph_flush_session_cap_releases(mdsc, s); + mutex_lock(&s->s_mutex); if (renew_caps) send_renew_caps(mdsc, s); @@ -4773,6 +5530,11 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) } init_completion(&mdsc->safe_umount_waiters); + spin_lock_init(&mdsc->stopping_lock); + atomic_set(&mdsc->stopping_blockers, 0); + init_completion(&mdsc->stopping_waiter); + atomic64_set(&mdsc->dirty_folios, 0); + init_waitqueue_head(&mdsc->flush_end_wq); init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); mdsc->quotarealms_inodes = RB_ROOT; @@ -4785,8 +5547,11 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); mdsc->last_renew_caps = jiffies; INIT_LIST_HEAD(&mdsc->cap_delay_list); +#ifdef CONFIG_DEBUG_FS INIT_LIST_HEAD(&mdsc->cap_wait_list); +#endif spin_lock_init(&mdsc->cap_delay_lock); + INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list); INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); mdsc->last_cap_flush_tid = 1; @@ -4795,6 +5560,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->cap_dirty_lock); init_waitqueue_head(&mdsc->cap_flushing_wq); INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); + INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work); err = ceph_metric_init(&mdsc->metric); if (err) goto err_mdsmap; @@ -4832,6 +5598,7 @@ err_mdsc: */ static void wait_requests(struct ceph_mds_client *mdsc) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_request *req; @@ -4839,25 +5606,25 @@ static void wait_requests(struct ceph_mds_client *mdsc) if (__get_oldest_req(mdsc)) { mutex_unlock(&mdsc->mutex); - dout("wait_requests waiting for requests\n"); + doutc(cl, "waiting for requests\n"); wait_for_completion_timeout(&mdsc->safe_umount_waiters, ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining requests */ mutex_lock(&mdsc->mutex); while ((req = __get_oldest_req(mdsc))) { - dout("wait_requests timed out on tid %llu\n", - req->r_tid); + doutc(cl, "timed out on tid %llu\n", req->r_tid); list_del_init(&req->r_wait); __unregister_request(mdsc, req); } } mutex_unlock(&mdsc->mutex); - dout("wait_requests done\n"); + doutc(cl, "done\n"); } void send_flush_mdlog(struct ceph_mds_session *s) { + struct ceph_client *cl = s->s_mdsc->fsc->client; struct ceph_msg *msg; /* @@ -4867,27 +5634,202 @@ void send_flush_mdlog(struct ceph_mds_session *s) return; mutex_lock(&s->s_mutex); - dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds, - ceph_session_state_name(s->s_state), s->s_seq); + doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n", + s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, s->s_seq); if (!msg) { - pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n", - s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); + pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n", + s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); } else { ceph_con_send(&s->s_con, msg); } mutex_unlock(&s->s_mutex); } +static int ceph_mds_auth_match(struct ceph_mds_client *mdsc, + struct ceph_mds_cap_auth *auth, + 
const struct cred *cred, + char *tpath) +{ + u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); + u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); + struct ceph_client *cl = mdsc->fsc->client; + const char *fs_name = mdsc->fsc->mount_options->mds_namespace; + const char *spath = mdsc->fsc->mount_options->server_path; + bool gid_matched = false; + u32 gid, tlen, len; + int i, j; + + doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n", + fs_name, auth->match.fs_name ? auth->match.fs_name : ""); + if (auth->match.fs_name && strcmp(auth->match.fs_name, fs_name)) { + /* fsname mismatch, try next one */ + return 0; + } + + doutc(cl, "match.uid %lld\n", auth->match.uid); + if (auth->match.uid != MDS_AUTH_UID_ANY) { + if (auth->match.uid != caller_uid) + return 0; + if (auth->match.num_gids) { + for (i = 0; i < auth->match.num_gids; i++) { + if (caller_gid == auth->match.gids[i]) + gid_matched = true; + } + if (!gid_matched && cred->group_info->ngroups) { + for (i = 0; i < cred->group_info->ngroups; i++) { + gid = from_kgid(&init_user_ns, + cred->group_info->gid[i]); + for (j = 0; j < auth->match.num_gids; j++) { + if (gid == auth->match.gids[j]) { + gid_matched = true; + break; + } + } + if (gid_matched) + break; + } + } + if (!gid_matched) + return 0; + } + } + + /* path match */ + if (auth->match.path) { + if (!tpath) + return 0; + + tlen = strlen(tpath); + len = strlen(auth->match.path); + if (len) { + char *_tpath = tpath; + bool free_tpath = false; + int m, n; + + doutc(cl, "server path %s, tpath %s, match.path %s\n", + spath, tpath, auth->match.path); + if (spath && (m = strlen(spath)) != 1) { + /* mount path + '/' + tpath + an extra space */ + n = m + 1 + tlen + 1; + _tpath = kmalloc(n, GFP_NOFS); + if (!_tpath) + return -ENOMEM; + /* remove the leading '/' */ + snprintf(_tpath, n, "%s/%s", spath + 1, tpath); + free_tpath = true; + tlen = strlen(_tpath); + } + + /* + * Please note the tailing '/' for match.path has already + * been removed when parsing. + * + * Remove the tailing '/' for the target path. 
+ */ + while (tlen && _tpath[tlen - 1] == '/') { + _tpath[tlen - 1] = '\0'; + tlen -= 1; + } + doutc(cl, "_tpath %s\n", _tpath); + + /* + * In case first == _tpath && tlen == len: + * match.path=/foo --> /foo _path=/foo --> match + * match.path=/foo/ --> /foo _path=/foo --> match + * + * In case first == _tmatch.path && tlen > len: + * match.path=/foo/ --> /foo _path=/foo/ --> match + * match.path=/foo --> /foo _path=/foo/ --> match + * match.path=/foo/ --> /foo _path=/foo/d --> match + * match.path=/foo --> /foo _path=/food --> mismatch + * + * All the other cases --> mismatch + */ + bool path_matched = true; + char *first = strstr(_tpath, auth->match.path); + if (first != _tpath || + (tlen > len && _tpath[len] != '/')) { + path_matched = false; + } + + if (free_tpath) + kfree(_tpath); + + if (!path_matched) + return 0; + } + } + + doutc(cl, "matched\n"); + return 1; +} + +int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask) +{ + const struct cred *cred = get_current_cred(); + u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid); + u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid); + struct ceph_mds_cap_auth *rw_perms_s = NULL; + struct ceph_client *cl = mdsc->fsc->client; + bool root_squash_perms = true; + int i, err; + + doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n", + tpath, mask, caller_uid, caller_gid); + + for (i = 0; i < mdsc->s_cap_auths_num; i++) { + struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i]; + + err = ceph_mds_auth_match(mdsc, s, cred, tpath); + if (err < 0) { + put_cred(cred); + return err; + } else if (err > 0) { + /* always follow the last auth caps' permission */ + root_squash_perms = true; + rw_perms_s = NULL; + if ((mask & MAY_WRITE) && s->writeable && + s->match.root_squash && (!caller_uid || !caller_gid)) + root_squash_perms = false; + + if (((mask & MAY_WRITE) && !s->writeable) || + ((mask & MAY_READ) && !s->readable)) + rw_perms_s = s; + } + } + + put_cred(cred); + + doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms, + rw_perms_s); + if (root_squash_perms && rw_perms_s == NULL) { + doutc(cl, "access allowed\n"); + return 0; + } + + if (!root_squash_perms) { + doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write", + caller_uid, caller_gid); + } + if (rw_perms_s) { + doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d", + rw_perms_s->readable, rw_perms_s->writeable, + !!(mask & MAY_READ), !!(mask & MAY_WRITE)); + } + doutc(cl, "access denied\n"); + return -EACCES; +} + /* * called before mount is ro, and before dentries are torn down. * (hmm, does this still race with new lookups?) 
*/ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) { - dout("pre_umount\n"); - mdsc->stopping = 1; + doutc(mdsc->fsc->client, "begin\n"); + mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN; ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); @@ -4901,6 +5843,7 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) ceph_msgr_flush(); ceph_cleanup_quotarealms_inodes(mdsc); + doutc(mdsc->fsc->client, "done\n"); } /* @@ -4909,12 +5852,13 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) { + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req = NULL, *nextreq; struct ceph_mds_session *last_session = NULL; struct rb_node *n; mutex_lock(&mdsc->mutex); - dout("%s want %lld\n", __func__, want_tid); + doutc(cl, "want %lld\n", want_tid); restart: req = __get_oldest_req(mdsc); while (req && req->r_tid <= want_tid) { @@ -4948,8 +5892,8 @@ restart: } else { ceph_put_mds_session(s); } - dout("%s wait on %llu (want %llu)\n", __func__, - req->r_tid, want_tid); + doutc(cl, "wait on %llu (want %llu)\n", + req->r_tid, want_tid); wait_for_completion(&req->r_safe_completion); mutex_lock(&mdsc->mutex); @@ -4967,22 +5911,24 @@ restart: } mutex_unlock(&mdsc->mutex); ceph_put_mds_session(last_session); - dout("%s done\n", __func__); + doutc(cl, "done\n"); } void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { + struct ceph_client *cl = mdsc->fsc->client; u64 want_tid, want_flush; if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) return; - dout("sync\n"); + doutc(cl, "sync\n"); mutex_lock(&mdsc->mutex); want_tid = mdsc->last_tid; mutex_unlock(&mdsc->mutex); ceph_flush_dirty_caps(mdsc); + ceph_flush_cap_releases(mdsc); spin_lock(&mdsc->cap_dirty_lock); want_flush = mdsc->last_cap_flush_tid; if (!list_empty(&mdsc->cap_flush_list)) { @@ -4993,8 +5939,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) } spin_unlock(&mdsc->cap_dirty_lock); - dout("sync want tid %lld flush_seq %lld\n", - want_tid, want_flush); + doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush); flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); wait_caps_flush(mdsc, want_flush); @@ -5011,16 +5956,17 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) } /* - * called after sb is ro. + * called after sb is ro or when the metadata is corrupted.
*/ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) { struct ceph_options *opts = mdsc->fsc->client->options; + struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *session; int i; int skipped = 0; - dout("close_sessions\n"); + doutc(cl, "begin\n"); /* close sessions */ mutex_lock(&mdsc->mutex); @@ -5038,7 +5984,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) } mutex_unlock(&mdsc->mutex); - dout("waiting for sessions to close\n"); + doutc(cl, "waiting for sessions to close\n"); wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc, skipped), ceph_timeout_jiffies(opts->mount_timeout)); @@ -5064,9 +6010,10 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) ceph_cleanup_global_and_empty_realms(mdsc); cancel_work_sync(&mdsc->cap_reclaim_work); + cancel_work_sync(&mdsc->cap_unlink_work); cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ - dout("stopped\n"); + doutc(cl, "done\n"); } void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) @@ -5074,7 +6021,7 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) struct ceph_mds_session *session; int mds; - dout("force umount\n"); + doutc(mdsc->fsc->client, "force umount\n"); mutex_lock(&mdsc->mutex); for (mds = 0; mds < mdsc->max_sessions; mds++) { @@ -5105,7 +6052,7 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) { - dout("stop\n"); + doutc(mdsc->fsc->client, "stop\n"); /* * Make sure the delayed work stopped before releasing * the resources. @@ -5120,13 +6067,25 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); ceph_caps_finalize(mdsc); + + if (mdsc->s_cap_auths) { + int i; + + for (i = 0; i < mdsc->s_cap_auths_num; i++) { + kfree(mdsc->s_cap_auths[i].match.gids); + kfree(mdsc->s_cap_auths[i].match.path); + kfree(mdsc->s_cap_auths[i].match.fs_name); + } + kfree(mdsc->s_cap_auths); + } + ceph_pool_perm_destroy(mdsc); } void ceph_mdsc_destroy(struct ceph_fs_client *fsc) { struct ceph_mds_client *mdsc = fsc->mdsc; - dout("mdsc_destroy %p\n", mdsc); + doutc(fsc->client, "%p\n", mdsc); if (!mdsc) return; @@ -5140,12 +6099,13 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc) fsc->mdsc = NULL; kfree(mdsc); - dout("mdsc_destroy %p done\n", mdsc); + doutc(fsc->client, "%p done\n", mdsc); } void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { struct ceph_fs_client *fsc = mdsc->fsc; + struct ceph_client *cl = fsc->client; const char *mds_namespace = fsc->mount_options->mds_namespace; void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; @@ -5157,7 +6117,7 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) ceph_decode_need(&p, end, sizeof(u32), bad); epoch = ceph_decode_32(&p); - dout("handle_fsmap epoch %u\n", epoch); + doutc(cl, "epoch %u\n", epoch); /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); @@ -5202,8 +6162,10 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) return; bad: - pr_err("error decoding fsmap %d. Shutting down mount.\n", err); + pr_err_client(cl, "error decoding fsmap %d. 
Shutting down mount.\n", + err); ceph_umount_begin(mdsc->fsc->sb); + ceph_msg_dump(msg); err_out: mutex_lock(&mdsc->mutex); mdsc->mdsmap_err = err; @@ -5216,6 +6178,7 @@ err_out: */ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { + struct ceph_client *cl = mdsc->fsc->client; u32 epoch; u32 maplen; void *p = msg->front.iov_base; @@ -5230,18 +6193,17 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) return; epoch = ceph_decode_32(&p); maplen = ceph_decode_32(&p); - dout("handle_map epoch %u len %d\n", epoch, (int)maplen); + doutc(cl, "epoch %u len %d\n", epoch, (int)maplen); /* do we need it? */ mutex_lock(&mdsc->mutex); if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { - dout("handle_map epoch %u <= our %u\n", - epoch, mdsc->mdsmap->m_epoch); + doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch); mutex_unlock(&mdsc->mutex); return; } - newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); + newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client)); if (IS_ERR(newmap)) { err = PTR_ERR(newmap); goto bad_unlock; @@ -5270,8 +6232,10 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) bad_unlock: mutex_unlock(&mdsc->mutex); bad: - pr_err("error decoding mdsmap %d. Shutting down mount.\n", err); + pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n", + err); ceph_umount_begin(mdsc->fsc->sb); + ceph_msg_dump(msg); return; } @@ -5300,14 +6264,18 @@ static void mds_peer_reset(struct ceph_connection *con) struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; - pr_warn("mds%d closed our session\n", s->s_mds); - send_mds_reconnect(mdsc, s); + pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n", + s->s_mds); + if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO && + ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT) + send_mds_reconnect(mdsc, s); } static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) { struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; + struct ceph_client *cl = mdsc->fsc->client; int type = le16_to_cpu(msg->hdr.type); mutex_lock(&mdsc->mutex); @@ -5347,8 +6315,8 @@ static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) break; default: - pr_err("received unknown message type %d %s\n", type, - ceph_msg_type_name(type)); + pr_err_client(cl, "received unknown message type %d %s\n", + type, ceph_msg_type_name(type)); } out: ceph_msg_put(msg); |
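The path-matching rule added in ceph_mds_auth_match() above reduces to a component-wise prefix test: auth->match.path must be a leading prefix of the normalized target path, and the character right after that prefix must be either the end of the string or a '/'. A minimal userspace sketch of that rule, using a hypothetical helper name and standalone test values (not part of this diff):

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Illustrative only: mirrors the prefix rule used by ceph_mds_auth_match(). */
static bool path_prefix_match(const char *match_path, const char *tpath)
{
	size_t len = strlen(match_path);
	size_t tlen = strlen(tpath);

	if (tlen < len || strncmp(tpath, match_path, len))
		return false;
	/* "/foo" may match "/foo" and "/foo/d", but never "/food" */
	return tlen == len || tpath[len] == '/';
}

int main(void)
{
	printf("%d\n", path_prefix_match("/foo", "/foo"));   /* 1: exact match */
	printf("%d\n", path_prefix_match("/foo", "/foo/d")); /* 1: child of /foo */
	printf("%d\n", path_prefix_match("/foo", "/food"));  /* 0: only shares a byte prefix */
	return 0;
}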

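For context, a hedged sketch of how a caller might consult the new ceph_mds_check_access() before a write; the call site name below is hypothetical and not taken from this diff, which only adds the checker itself:

/*
 * Illustrative only -- not a hunk from this diff.  Shows the intended
 * usage pattern: ask the MDS auth caps checker first and fail early,
 * before building and sending an MDS request.
 */
static int example_write_path_check(struct ceph_mds_client *mdsc, char *tpath)
{
	int err = ceph_mds_check_access(mdsc, tpath, MAY_WRITE);

	if (err)
		return err;	/* -EACCES when denied, or -ENOMEM from the matcher */

	/* ... otherwise go on to build and send the MDS request ... */
	return 0;
}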