summaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c270
1 files changed, 189 insertions, 81 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 84edfc60d87a..5ece2e6ad154 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>
#include <linux/fs.h>
@@ -99,6 +100,26 @@ static int parse_reply_info_in(void **p, void *end,
} else
info->inline_version = CEPH_INLINE_NONE;
+ if (features & CEPH_FEATURE_MDS_QUOTA) {
+ u8 struct_v, struct_compat;
+ u32 struct_len;
+
+ /*
+ * both struct_v and struct_compat are expected to be >= 1
+ */
+ ceph_decode_8_safe(p, end, struct_v, bad);
+ ceph_decode_8_safe(p, end, struct_compat, bad);
+ if (!struct_v || !struct_compat)
+ goto bad;
+ ceph_decode_32_safe(p, end, struct_len, bad);
+ ceph_decode_need(p, end, struct_len, bad);
+ ceph_decode_64_safe(p, end, info->max_bytes, bad);
+ ceph_decode_64_safe(p, end, info->max_files, bad);
+ } else {
+ info->max_bytes = 0;
+ info->max_files = 0;
+ }
+
info->pool_ns_len = 0;
info->pool_ns_data = NULL;
if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
@@ -383,7 +404,7 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
return s;
} else {
- dout("mdsc get_session %p 0 -- FAIL", s);
+ dout("mdsc get_session %p 0 -- FAIL\n", s);
return NULL;
}
}
@@ -418,9 +439,10 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
- if (mds >= mdsc->max_sessions)
+ if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
return false;
- return mdsc->sessions[mds];
+ else
+ return true;
}
static int __verify_registered_session(struct ceph_mds_client *mdsc,
@@ -447,6 +469,25 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s = kzalloc(sizeof(*s), GFP_NOFS);
if (!s)
return ERR_PTR(-ENOMEM);
+
+ if (mds >= mdsc->max_sessions) {
+ int newmax = 1 << get_count_order(mds + 1);
+ struct ceph_mds_session **sa;
+
+ dout("%s: realloc to %d\n", __func__, newmax);
+ sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
+ if (!sa)
+ goto fail_realloc;
+ if (mdsc->sessions) {
+ memcpy(sa, mdsc->sessions,
+ mdsc->max_sessions * sizeof(void *));
+ kfree(mdsc->sessions);
+ }
+ mdsc->sessions = sa;
+ mdsc->max_sessions = newmax;
+ }
+
+ dout("%s: mds%d\n", __func__, mds);
s->s_mdsc = mdsc;
s->s_mds = mds;
s->s_state = CEPH_MDS_SESSION_NEW;
@@ -475,23 +516,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_flushing);
- dout("register_session mds%d\n", mds);
- if (mds >= mdsc->max_sessions) {
- int newmax = 1 << get_count_order(mds+1);
- struct ceph_mds_session **sa;
-
- dout("register_session realloc to %d\n", newmax);
- sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
- if (!sa)
- goto fail_realloc;
- if (mdsc->sessions) {
- memcpy(sa, mdsc->sessions,
- mdsc->max_sessions * sizeof(void *));
- kfree(mdsc->sessions);
- }
- mdsc->sessions = sa;
- mdsc->max_sessions = newmax;
- }
mdsc->sessions[mds] = s;
atomic_inc(&mdsc->num_sessions);
refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
@@ -603,10 +627,20 @@ static void __register_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req,
struct inode *dir)
{
+ int ret = 0;
+
req->r_tid = ++mdsc->last_tid;
- if (req->r_num_caps)
- ceph_reserve_caps(mdsc, &req->r_caps_reservation,
- req->r_num_caps);
+ if (req->r_num_caps) {
+ ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+ req->r_num_caps);
+ if (ret < 0) {
+ pr_err("__register_request %p "
+ "failed to reserve caps: %d\n", req, ret);
+ /* set req->r_err to fail early from __do_request */
+ req->r_err = ret;
+ return;
+ }
+ }
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
insert_request(&mdsc->request_tree, req);
@@ -734,12 +768,13 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
inode = req->r_inode;
ihold(inode);
} else {
- /* req->r_dentry is non-null for LSSNAP request.
- * fall-thru */
- WARN_ON_ONCE(!req->r_dentry);
+ /* req->r_dentry is non-null for LSSNAP request */
+ rcu_read_lock();
+ inode = get_nonsnap_parent(req->r_dentry);
+ rcu_read_unlock();
+ dout("__choose_mds using snapdir's parent %p\n", inode);
}
- }
- if (!inode && req->r_dentry) {
+ } else if (req->r_dentry) {
/* ignore race with rename; old or new d_parent is okay */
struct dentry *parent;
struct inode *dir;
@@ -1037,22 +1072,23 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
* session caps
*/
-/* caller holds s_cap_lock, we drop it */
-static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
- __releases(session->s_cap_lock)
+static void detach_cap_releases(struct ceph_mds_session *session,
+ struct list_head *target)
{
- LIST_HEAD(tmp_list);
- list_splice_init(&session->s_cap_releases, &tmp_list);
+ lockdep_assert_held(&session->s_cap_lock);
+
+ list_splice_init(&session->s_cap_releases, target);
session->s_num_cap_releases = 0;
- spin_unlock(&session->s_cap_lock);
+ dout("dispose_cap_releases mds%d\n", session->s_mds);
+}
- dout("cleanup_cap_releases mds%d\n", session->s_mds);
- while (!list_empty(&tmp_list)) {
+static void dispose_cap_releases(struct ceph_mds_client *mdsc,
+ struct list_head *dispose)
+{
+ while (!list_empty(dispose)) {
struct ceph_cap *cap;
/* zero out the in-progress message */
- cap = list_first_entry(&tmp_list,
- struct ceph_cap, session_caps);
+ cap = list_first_entry(dispose, struct ceph_cap, session_caps);
list_del(&cap->session_caps);
ceph_put_cap(mdsc, cap);
}
@@ -1213,6 +1249,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
}
spin_unlock(&mdsc->cap_dirty_lock);
+ if (atomic_read(&ci->i_filelock_ref) > 0) {
+ /* make further file lock syscall return -EIO */
+ ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
+ pr_warn_ratelimited(" dropping file locks for %p %lld\n",
+ inode, ceph_ino(inode));
+ }
+
if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
@@ -1242,6 +1285,8 @@ static void remove_session_caps(struct ceph_mds_session *session)
{
struct ceph_fs_client *fsc = session->s_mdsc->fsc;
struct super_block *sb = fsc->sb;
+ LIST_HEAD(dispose);
+
dout("remove_session_caps on %p\n", session);
iterate_session_caps(session, remove_session_caps_cb, fsc);
@@ -1276,10 +1321,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
}
// drop cap expires and unlock s_cap_lock
- cleanup_cap_releases(session->s_mdsc, session);
+ detach_cap_releases(session, &dispose);
BUG_ON(session->s_nr_caps > 0);
BUG_ON(!list_empty(&session->s_cap_flushing));
+ spin_unlock(&session->s_cap_lock);
+ dispose_cap_releases(session->s_mdsc, &dispose);
}
/*
@@ -1426,6 +1473,29 @@ static int __close_session(struct ceph_mds_client *mdsc,
return request_close_session(mdsc, session);
}
+static bool drop_negative_children(struct dentry *dentry)
+{
+ struct dentry *child;
+ bool all_negative = true;
+
+ if (!d_is_dir(dentry))
+ goto out;
+
+ spin_lock(&dentry->d_lock);
+ list_for_each_entry(child, &dentry->d_subdirs, d_child) {
+ if (d_really_is_positive(child)) {
+ all_negative = false;
+ break;
+ }
+ }
+ spin_unlock(&dentry->d_lock);
+
+ if (all_negative)
+ shrink_dcache_parent(dentry);
+out:
+ return all_negative;
+}
+
/*
* Trim old(er) caps.
*
@@ -1460,6 +1530,11 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
goto out;
if ((used | wanted) & CEPH_CAP_ANY_WR)
goto out;
+ /* Note: it's possible that i_filelock_ref becomes non-zero
+ * after dropping auth caps. It doesn't hurt because reply
+ * of lock mds request will re-add auth caps. */
+ if (atomic_read(&ci->i_filelock_ref) > 0)
+ goto out;
}
/* The inode has cached pages, but it's no longer used.
* we can safely drop it */
@@ -1471,16 +1546,27 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
if ((used | wanted) & ~oissued & mine)
goto out; /* we need these caps */
- session->s_trim_caps--;
if (oissued) {
/* we aren't the only cap.. just remove us */
__ceph_remove_cap(cap, true);
+ session->s_trim_caps--;
} else {
+ struct dentry *dentry;
/* try dropping referring dentries */
spin_unlock(&ci->i_ceph_lock);
- d_prune_aliases(inode);
- dout("trim_caps_cb %p cap %p pruned, count now %d\n",
- inode, cap, atomic_read(&inode->i_count));
+ dentry = d_find_any_alias(inode);
+ if (dentry && drop_negative_children(dentry)) {
+ int count;
+ dput(dentry);
+ d_prune_aliases(inode);
+ count = atomic_read(&inode->i_count);
+ if (count == 1)
+ session->s_trim_caps--;
+ dout("trim_caps_cb %p cap %p pruned, count now %d\n",
+ inode, cap, count);
+ } else {
+ dput(dentry);
+ }
return 0;
}
@@ -1492,9 +1578,9 @@ out:
/*
* Trim session cap count down to some max number.
*/
-static int trim_caps(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- int max_caps)
+int ceph_trim_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ int max_caps)
{
int trim_caps = session->s_nr_caps - max_caps;
@@ -2385,11 +2471,14 @@ out:
*/
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
- struct inode *inode = req->r_parent;
+ struct inode *dir = req->r_parent;
+ struct inode *old_dir = req->r_old_dentry_dir;
- dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
+ dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
- ceph_dir_clear_complete(inode);
+ ceph_dir_clear_complete(dir);
+ if (old_dir)
+ ceph_dir_clear_complete(old_dir);
if (req->r_dentry)
ceph_invalidate_dentry_lease(req->r_dentry);
if (req->r_old_dentry)
@@ -2465,10 +2554,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
* Otherwise we just have to return an ESTALE
*/
if (result == -ESTALE) {
- dout("got ESTALE on request %llu", req->r_tid);
+ dout("got ESTALE on request %llu\n", req->r_tid);
req->r_resend_mds = -1;
if (req->r_direct_mode != USE_AUTH_MDS) {
- dout("not using auth, setting for that now");
+ dout("not using auth, setting for that now\n");
req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
@@ -2476,13 +2565,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
} else {
int mds = __choose_mds(mdsc, req);
if (mds >= 0 && mds != req->r_session->s_mds) {
- dout("but auth changed, so resending");
+ dout("but auth changed, so resending\n");
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
}
}
- dout("have to return ESTALE on request %llu", req->r_tid);
+ dout("have to return ESTALE on request %llu\n", req->r_tid);
}
@@ -2720,7 +2809,7 @@ static void handle_session(struct ceph_mds_session *session,
break;
case CEPH_SESSION_RECALL_STATE:
- trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
+ ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
break;
case CEPH_SESSION_FLUSHMSG:
@@ -2825,7 +2914,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_mds_cap_reconnect v2;
struct ceph_mds_cap_reconnect_v1 v1;
} rec;
- struct ceph_inode_info *ci;
+ struct ceph_inode_info *ci = cap->ci;
struct ceph_reconnect_state *recon_state = arg;
struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
@@ -2834,8 +2923,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
u64 snap_follows;
struct dentry *dentry;
- ci = cap->ci;
-
dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued));
@@ -2868,7 +2955,8 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
rec.v2.issued = cpu_to_le32(cap->issued);
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v2.pathbase = cpu_to_le64(pathbase);
- rec.v2.flock_len = 0;
+ rec.v2.flock_len = (__force __le32)
+ ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
} else {
rec.v1.cap_id = cpu_to_le64(cap->cap_id);
rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -2892,26 +2980,37 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (recon_state->msg_version >= 2) {
int num_fcntl_locks, num_flock_locks;
- struct ceph_filelock *flocks;
+ struct ceph_filelock *flocks = NULL;
size_t struct_len, total_len = 0;
u8 struct_v = 0;
encode_again:
- ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
- flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
- sizeof(struct ceph_filelock), GFP_NOFS);
- if (!flocks) {
- err = -ENOMEM;
- goto out_free;
+ if (rec.v2.flock_len) {
+ ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+ } else {
+ num_fcntl_locks = 0;
+ num_flock_locks = 0;
}
- err = ceph_encode_locks_to_buffer(inode, flocks,
- num_fcntl_locks,
- num_flock_locks);
- if (err) {
+ if (num_fcntl_locks + num_flock_locks > 0) {
+ flocks = kmalloc((num_fcntl_locks + num_flock_locks) *
+ sizeof(struct ceph_filelock), GFP_NOFS);
+ if (!flocks) {
+ err = -ENOMEM;
+ goto out_free;
+ }
+ err = ceph_encode_locks_to_buffer(inode, flocks,
+ num_fcntl_locks,
+ num_flock_locks);
+ if (err) {
+ kfree(flocks);
+ flocks = NULL;
+ if (err == -ENOSPC)
+ goto encode_again;
+ goto out_free;
+ }
+ } else {
kfree(flocks);
- if (err == -ENOSPC)
- goto encode_again;
- goto out_free;
+ flocks = NULL;
}
if (recon_state->msg_version >= 3) {
@@ -2991,6 +3090,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
int s_nr_caps;
struct ceph_pagelist *pagelist;
struct ceph_reconnect_state recon_state;
+ LIST_HEAD(dispose);
pr_info("mds%d reconnect start\n", mds);
@@ -3024,7 +3124,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
*/
session->s_cap_reconnect = 1;
/* drop old cap expires; we're about to reestablish that state */
- cleanup_cap_releases(mdsc, session);
+ detach_cap_releases(session, &dispose);
+ spin_unlock(&session->s_cap_lock);
+ dispose_cap_releases(mdsc, &dispose);
/* trim unused caps to reduce MDS's cache rejoin time */
if (mdsc->fsc->sb->s_root)
@@ -3391,13 +3493,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
}
/*
- * drop all leases (and dentry refs) in preparation for umount
+ * lock unlock sessions, to wait ongoing session activities
*/
-static void drop_leases(struct ceph_mds_client *mdsc)
+static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
{
int i;
- dout("drop_leases\n");
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
@@ -3493,7 +3594,6 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
if (!mdsc)
return -ENOMEM;
mdsc->fsc = fsc;
- fsc->mdsc = mdsc;
mutex_init(&mdsc->mutex);
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
if (!mdsc->mdsmap) {
@@ -3501,6 +3601,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
return -ENOMEM;
}
+ fsc->mdsc = mdsc;
init_completion(&mdsc->safe_umount_waiters);
init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -3508,6 +3609,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
atomic_set(&mdsc->num_sessions, 0);
mdsc->max_sessions = 0;
mdsc->stopping = 0;
+ atomic64_set(&mdsc->quotarealms_count, 0);
mdsc->last_snap_seq = 0;
init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT;
@@ -3581,7 +3683,7 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
dout("pre_umount\n");
mdsc->stopping = 1;
- drop_leases(mdsc);
+ lock_unlock_sessions(mdsc);
ceph_flush_dirty_caps(mdsc);
wait_requests(mdsc);
@@ -3779,6 +3881,9 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
struct ceph_mds_client *mdsc = fsc->mdsc;
dout("mdsc_destroy %p\n", mdsc);
+ if (!mdsc)
+ return;
+
/* flush out any connection work with references to us */
ceph_msgr_flush();
@@ -3855,14 +3960,14 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
goto err_out;
}
return;
+
bad:
pr_err("error decoding fsmap\n");
err_out:
mutex_lock(&mdsc->mutex);
- mdsc->mdsmap_err = -ENOENT;
+ mdsc->mdsmap_err = err;
__wake_requests(mdsc, &mdsc->waiting_for_map);
mutex_unlock(&mdsc->mutex);
- return;
}
/*
@@ -3998,6 +4103,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
case CEPH_MSG_CLIENT_LEASE:
handle_lease(mdsc, s, msg);
break;
+ case CEPH_MSG_CLIENT_QUOTA:
+ ceph_handle_quota(mdsc, s, msg);
+ break;
default:
pr_err("received unknown message type %d %s\n", type,