summaryrefslogtreecommitdiff
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2015-02-19 14:14:42 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2015-02-19 14:14:42 -0800
commit4533f6e27a366ecc3da4876074ebfe0cc0ea4f0f (patch)
tree8b6f1aeeda991e6a1ce98702d7cc35d2d2a444b1 /fs/ceph
parent89d3fa45b4add00cd0056361a2498e978cb1e119 (diff)
parent0f5417cea6cfeafd5cdec4223df63ca79918fdea (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil: "On the RBD side, there is a conversion to blk-mq from Christoph, several long-standing bug fixes from Ilya, and some cleanup from Rickard Strandqvist. On the CephFS side there is a long list of fixes from Zheng, including improved session handling, a few IO path fixes, some dcache management correctness fixes, and several blocking while !TASK_RUNNING fixes. The core code gets a few cleanups and Chaitanya has added support for TCP_NODELAY (which has been used on the server side for ages but we somehow missed on the kernel client). There is also an update to MAINTAINERS to fix up some email addresses and reflect that Ilya and Zheng are doing most of the maintenance for RBD and CephFS these days. Do not be surprised to see a pull request come from one of them in the future if I am unavailable for some reason" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits) MAINTAINERS: update Ceph and RBD maintainers libceph: kfree() in put_osd() shouldn't depend on authorizer libceph: fix double __remove_osd() problem rbd: convert to blk-mq ceph: return error for traceless reply race ceph: fix dentry leaks ceph: re-send requests when MDS enters reconnecting stage ceph: show nocephx_require_signatures and notcp_nodelay options libceph: tcp_nodelay support rbd: do not treat standalone as flatten ceph: fix atomic_open snapdir ceph: properly mark empty directory as complete client: include kernel version in client metadata ceph: provide seperate {inode,file}_operations for snapdir ceph: fix request time stamp encoding ceph: fix reading inline data when i_size > PAGE_SIZE ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_close_sessions) ceph: avoid block operation when !TASK_RUNNING (ceph_get_caps) ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync) rbd: fix error paths in rbd_dev_refresh() ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/acl.c14
-rw-r--r--fs/ceph/addr.c19
-rw-r--r--fs/ceph/caps.c127
-rw-r--r--fs/ceph/dir.c33
-rw-r--r--fs/ceph/file.c37
-rw-r--r--fs/ceph/inode.c41
-rw-r--r--fs/ceph/mds_client.c127
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/snap.c54
-rw-r--r--fs/ceph/super.c4
-rw-r--r--fs/ceph/super.h5
11 files changed, 296 insertions, 167 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 5bd853ba44ff..64fa248343f6 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode,
spin_unlock(&ci->i_ceph_lock);
}
-static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
- int type)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct posix_acl *acl = ACL_NOT_CACHED;
-
- spin_lock(&ci->i_ceph_lock);
- if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
- acl = get_cached_acl(inode, type);
- spin_unlock(&ci->i_ceph_lock);
-
- return acl;
-}
-
struct posix_acl *ceph_get_acl(struct inode *inode, int type)
{
int size;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 24be059fd1f8..fd5599d32362 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page)
u64 len = PAGE_CACHE_SIZE;
if (off >= i_size_read(inode)) {
- zero_user_segment(page, err, PAGE_CACHE_SIZE);
+ zero_user_segment(page, 0, PAGE_CACHE_SIZE);
SetPageUptodate(page);
return 0;
}
- /*
- * Uptodate inline data should have been added into page cache
- * while getting Fcr caps.
- */
- if (ci->i_inline_version != CEPH_INLINE_NONE)
- return -EINVAL;
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ /*
+ * Uptodate inline data should have been added
+ * into page cache while getting Fcr caps.
+ */
+ if (off == 0)
+ return -EINVAL;
+ zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+ SetPageUptodate(page);
+ return 0;
+ }
err = ceph_readpage_from_fscache(inode, page);
if (err == 0)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b93c631c6c87..8172775428a0 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
realmino);
if (realm) {
- ceph_get_snap_realm(mdsc, realm);
spin_lock(&realm->inodes_with_caps_lock);
ci->i_snap_realm = realm;
list_add(&ci->i_snap_realm_item,
@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode,
spin_lock(&mdsc->cap_dirty_lock);
list_del_init(&ci->i_dirty_item);
- ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
if (list_empty(&ci->i_flushing_item)) {
+ ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
mdsc->num_cap_flushing++;
dout(" inode %p now flushing seq %lld\n", inode,
@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS.
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
- loff_t endoff, int *got, struct page **pinned_page,
- int *check_max, int *err)
+ loff_t endoff, int *got, int *check_max, int *err)
{
struct inode *inode = &ci->vfs_inode;
int ret = 0;
- int have, implemented, _got = 0;
+ int have, implemented;
int file_wanted;
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
-again:
+
spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
@@ -2138,50 +2136,34 @@ again:
inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking));
if ((revoking & not) == 0) {
- _got = need | (have & want);
- __take_cap_refs(ci, _got);
+ *got = need | (have & want);
+ __take_cap_refs(ci, *got);
ret = 1;
}
} else {
+ int session_readonly = false;
+ if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+ struct ceph_mds_session *s = ci->i_auth_cap->session;
+ spin_lock(&s->s_cap_lock);
+ session_readonly = s->s_readonly;
+ spin_unlock(&s->s_cap_lock);
+ }
+ if (session_readonly) {
+ dout("get_cap_refs %p needed %s but mds%d readonly\n",
+ inode, ceph_cap_string(need), ci->i_auth_cap->mds);
+ *err = -EROFS;
+ ret = 1;
+ goto out_unlock;
+ }
+
dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need));
}
out_unlock:
spin_unlock(&ci->i_ceph_lock);
- if (ci->i_inline_version != CEPH_INLINE_NONE &&
- (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
- i_size_read(inode) > 0) {
- int ret1;
- struct page *page = find_get_page(inode->i_mapping, 0);
- if (page) {
- if (PageUptodate(page)) {
- *pinned_page = page;
- goto out;
- }
- page_cache_release(page);
- }
- /*
- * drop cap refs first because getattr while holding
- * caps refs can cause deadlock.
- */
- ceph_put_cap_refs(ci, _got);
- _got = 0;
-
- /* getattr request will bring inline data into page cache */
- ret1 = __ceph_do_getattr(inode, NULL,
- CEPH_STAT_CAP_INLINE_DATA, true);
- if (ret1 >= 0) {
- ret = 0;
- goto again;
- }
- *err = ret1;
- ret = 1;
- }
-out:
dout("get_cap_refs %p ret %d got %s\n", inode,
- ret, ceph_cap_string(_got));
- *got = _got;
+ ret, ceph_cap_string(*got));
return ret;
}
@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff)
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
- int check_max, ret, err;
+ int _got, check_max, ret, err = 0;
retry:
if (endoff > 0)
check_max_size(&ci->vfs_inode, endoff);
+ _got = 0;
check_max = 0;
- err = 0;
ret = wait_event_interruptible(ci->i_cap_wq,
- try_get_cap_refs(ci, need, want, endoff,
- got, pinned_page,
- &check_max, &err));
+ try_get_cap_refs(ci, need, want, endoff,
+ &_got, &check_max, &err));
if (err)
ret = err;
+ if (ret < 0)
+ return ret;
+
if (check_max)
goto retry;
- return ret;
+
+ if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ i_size_read(&ci->vfs_inode) > 0) {
+ struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
+ if (page) {
+ if (PageUptodate(page)) {
+ *pinned_page = page;
+ goto out;
+ }
+ page_cache_release(page);
+ }
+ /*
+ * drop cap refs first because getattr while holding
+ * caps refs can cause deadlock.
+ */
+ ceph_put_cap_refs(ci, _got);
+ _got = 0;
+
+ /* getattr request will bring inline data into page cache */
+ ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+ CEPH_STAT_CAP_INLINE_DATA, true);
+ if (ret < 0)
+ return ret;
+ goto retry;
+ }
+out:
+ *got = _got;
+ return 0;
}
/*
@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode)
*/
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
- void *snaptrace, int snaptrace_len,
u64 inline_version,
void *inline_data, int inline_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued)
__releases(ci->i_ceph_lock)
+ __releases(mdsc->snap_rwsem)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
- down_write(&mdsc->snap_rwsem);
- ceph_update_snap_trace(mdsc, snaptrace,
- snaptrace + snaptrace_len, false);
- downgrade_write(&mdsc->snap_rwsem);
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
if (newcaps & ~issued)
@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_cap *cap;
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
+ struct ceph_snap_realm *realm;
int mds = session->s_mds;
int op, issued;
u32 seq, mseq;
@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
goto done_unlocked;
case CEPH_CAP_OP_IMPORT:
+ realm = NULL;
+ if (snaptrace_len) {
+ down_write(&mdsc->snap_rwsem);
+ ceph_update_snap_trace(mdsc, snaptrace,
+ snaptrace + snaptrace_len,
+ false, &realm);
+ downgrade_write(&mdsc->snap_rwsem);
+ } else {
+ down_read(&mdsc->snap_rwsem);
+ }
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
- handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+ handle_cap_grant(mdsc, inode, h,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
+ if (realm)
+ ceph_put_snap_realm(mdsc, realm);
goto done_unlocked;
}
@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
- handle_cap_grant(mdsc, inode, h, NULL, 0,
+ handle_cap_grant(mdsc, inode, h,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
goto done_unlocked;
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index c241603764fd..0411dbb15815 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -26,8 +26,6 @@
* point by name.
*/
-const struct inode_operations ceph_dir_iops;
-const struct file_operations ceph_dir_fops;
const struct dentry_operations ceph_dentry_ops;
/*
@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
/*
* We created the item, then did a lookup, and found
* it was already linked to another inode we already
- * had in our cache (and thus got spliced). Link our
- * dentry to that inode, but don't hash it, just in
- * case the VFS wants to dereference it.
+ * had in our cache (and thus got spliced). To not
+ * confuse VFS (especially when inode is a directory),
+ * we don't link our dentry to that inode, return an
+ * error instead.
+ *
+ * This event should be rare and it happens only when
+ * we talk to old MDS. Recent MDS does not send traceless
+ * reply for request that creates new inode.
*/
- BUG_ON(!result->d_inode);
- d_instantiate(dentry, result->d_inode);
- return 0;
+ d_drop(result);
+ return -ESTALE;
}
return PTR_ERR(result);
}
@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = {
.fsync = ceph_dir_fsync,
};
+const struct file_operations ceph_snapdir_fops = {
+ .iterate = ceph_readdir,
+ .llseek = ceph_dir_llseek,
+ .open = ceph_open,
+ .release = ceph_release,
+};
+
const struct inode_operations ceph_dir_iops = {
.lookup = ceph_lookup,
.permission = ceph_permission,
@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = {
.atomic_open = ceph_atomic_open,
};
+const struct inode_operations ceph_snapdir_iops = {
+ .lookup = ceph_lookup,
+ .permission = ceph_permission,
+ .getattr = ceph_getattr,
+ .mkdir = ceph_mkdir,
+ .rmdir = ceph_unlink,
+};
+
const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_release = ceph_d_release,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 905986dd4c3c..a3d774b35149 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
err = ceph_mdsc_do_request(mdsc,
(flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
req);
+ err = ceph_handle_snapdir(req, dentry, err);
if (err)
goto out_req;
- err = ceph_handle_snapdir(req, dentry, err);
if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
@@ -392,13 +392,14 @@ more:
if (ret >= 0) {
int didpages;
if (was_short && (pos + ret < inode->i_size)) {
- u64 tmp = min(this_len - ret,
- inode->i_size - pos - ret);
+ int zlen = min(this_len - ret,
+ inode->i_size - pos - ret);
+ int zoff = (o_direct ? buf_align : io_align) +
+ read + ret;
dout(" zero gap %llu to %llu\n",
- pos + ret, pos + ret + tmp);
- ceph_zero_page_vector_range(page_align + read + ret,
- tmp, pages);
- ret += tmp;
+ pos + ret, pos + ret + zlen);
+ ceph_zero_page_vector_range(zoff, zlen, pages);
+ ret += zlen;
}
didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
@@ -878,28 +879,34 @@ again:
i_size = i_size_read(inode);
if (retry_op == READ_INLINE) {
- /* does not support inline data > PAGE_SIZE */
- if (i_size > PAGE_CACHE_SIZE) {
- ret = -EIO;
- } else if (iocb->ki_pos < i_size) {
+ BUG_ON(ret > 0 || read > 0);
+ if (iocb->ki_pos < i_size &&
+ iocb->ki_pos < PAGE_CACHE_SIZE) {
loff_t end = min_t(loff_t, i_size,
iocb->ki_pos + len);
+ end = min_t(loff_t, end, PAGE_CACHE_SIZE);
if (statret < end)
zero_user_segment(page, statret, end);
ret = copy_page_to_iter(page,
iocb->ki_pos & ~PAGE_MASK,
end - iocb->ki_pos, to);
iocb->ki_pos += ret;
- } else {
- ret = 0;
+ read += ret;
+ }
+ if (iocb->ki_pos < i_size && read < len) {
+ size_t zlen = min_t(size_t, len - read,
+ i_size - iocb->ki_pos);
+ ret = iov_iter_zero(zlen, to);
+ iocb->ki_pos += ret;
+ read += ret;
}
__free_pages(page, 0);
- return ret;
+ return read;
}
/* hit EOF or hole? */
if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
- ret < len) {
+ ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos,
inode->i_size);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 6b5173605154..119c43c80638 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent)
inode->i_mode = parent->i_mode;
inode->i_uid = parent->i_uid;
inode->i_gid = parent->i_gid;
- inode->i_op = &ceph_dir_iops;
- inode->i_fop = &ceph_dir_fops;
+ inode->i_op = &ceph_snapdir_iops;
+ inode->i_fop = &ceph_snapdir_fops;
ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
ci->i_rbytes = 0;
return inode;
@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ceph_vinop(inode), inode->i_mode);
}
- /* set dir completion flag? */
- if (S_ISDIR(inode->i_mode) &&
- ci->i_files == 0 && ci->i_subdirs == 0 &&
- ceph_snap(inode) == CEPH_NOSNAP &&
- (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
- (issued & CEPH_CAP_FILE_EXCL) == 0 &&
- !__ceph_dir_is_complete(ci)) {
- dout(" marking %p complete (empty)\n", inode);
- __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
- ci->i_ordered_count);
- }
-
/* were we issued a capability? */
if (info->cap.caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) {
+ unsigned caps = le32_to_cpu(info->cap.caps);
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
- cap_fmode,
- le32_to_cpu(info->cap.caps),
+ cap_fmode, caps,
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
le64_to_cpu(info->cap.realm),
info->cap.flags, &new_cap);
+
+ /* set dir completion flag? */
+ if (S_ISDIR(inode->i_mode) &&
+ ci->i_files == 0 && ci->i_subdirs == 0 &&
+ (caps & CEPH_CAP_FILE_SHARED) &&
+ (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+ !__ceph_dir_is_complete(ci)) {
+ dout(" marking %p complete (empty)\n", inode);
+ __ceph_dir_set_complete(ci,
+ atomic_read(&ci->i_release_count),
+ ci->i_ordered_count);
+ }
+
wake = true;
} else {
dout(" %p got snap_caps %s\n", inode,
@@ -1446,12 +1447,14 @@ retry_lookup:
}
if (!dn->d_inode) {
- dn = splice_dentry(dn, in, NULL);
- if (IS_ERR(dn)) {
- err = PTR_ERR(dn);
+ struct dentry *realdn = splice_dentry(dn, in, NULL);
+ if (IS_ERR(realdn)) {
+ err = PTR_ERR(realdn);
+ d_drop(dn);
dn = NULL;
goto next_item;
}
+ dn = realdn;
}
di = dn->d_fsdata;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5f62fb7a5d0a..71c073f38e54 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
mdsc->max_sessions = newmax;
}
mdsc->sessions[mds] = s;
+ atomic_inc(&mdsc->num_sessions);
atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
mdsc->sessions[s->s_mds] = NULL;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
+ atomic_dec(&mdsc->num_sessions);
}
/*
@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
struct ceph_options *opt = mdsc->fsc->client->options;
void *p;
- const char* metadata[3][2] = {
+ const char* metadata[][2] = {
{"hostname", utsname()->nodename},
+ {"kernel_version", utsname()->release},
{"entity_id", opt->name ? opt->name : ""},
{NULL, NULL}
};
@@ -1464,19 +1467,33 @@ out_unlocked:
return err;
}
+static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int ret;
+ spin_lock(&ci->i_ceph_lock);
+ if (ci->i_flushing_caps)
+ ret = ci->i_cap_flush_seq >= want_flush_seq;
+ else
+ ret = 1;
+ spin_unlock(&ci->i_ceph_lock);
+ return ret;
+}
+
/*
* flush all dirty inode data to disk.
*
* returns true if we've flushed through want_flush_seq
*/
-static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{
- int mds, ret = 1;
+ int mds;
dout("check_cap_flush want %lld\n", want_flush_seq);
mutex_lock(&mdsc->mutex);
- for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
+ for (mds = 0; mds < mdsc->max_sessions; mds++) {
struct ceph_mds_session *session = mdsc->sessions[mds];
+ struct inode *inode = NULL;
if (!session)
continue;
@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
list_entry(session->s_cap_flushing.next,
struct ceph_inode_info,
i_flushing_item);
- struct inode *inode = &ci->vfs_inode;
- spin_lock(&ci->i_ceph_lock);
- if (ci->i_cap_flush_seq <= want_flush_seq) {
+ if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
dout("check_cap_flush still flushing %p "
- "seq %lld <= %lld to mds%d\n", inode,
- ci->i_cap_flush_seq, want_flush_seq,
- session->s_mds);
- ret = 0;
+ "seq %lld <= %lld to mds%d\n",
+ &ci->vfs_inode, ci->i_cap_flush_seq,
+ want_flush_seq, session->s_mds);
+ inode = igrab(&ci->vfs_inode);
}
- spin_unlock(&ci->i_ceph_lock);
}
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
- if (!ret)
- return ret;
+ if (inode) {
+ wait_event(mdsc->cap_flushing_wq,
+ check_cap_flush(inode, want_flush_seq));
+ iput(inode);
+ }
+
mutex_lock(&mdsc->mutex);
}
mutex_unlock(&mdsc->mutex);
dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
- return ret;
}
/*
@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->num_releases = cpu_to_le16(releases);
/* time stamp */
- ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+ {
+ struct ceph_timespec ts;
+ ceph_encode_timespec(&ts, &req->r_stamp);
+ ceph_encode_copy(&p, &ts, sizeof(ts));
+ }
BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
/* time stamp */
p = msg->front.iov_base + req->r_request_release_offset;
- ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+ {
+ struct ceph_timespec ts;
+ ceph_encode_timespec(&ts, &req->r_stamp);
+ ceph_encode_copy(&p, &ts, sizeof(ts));
+ }
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
p = rb_next(p);
if (req->r_got_unsafe)
continue;
+ if (req->r_attempts > 0)
+ continue; /* only new requests */
if (req->r_session &&
req->r_session->s_mds == mds) {
dout(" kicking tid %llu\n", req->r_tid);
@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
struct ceph_mds_request *req;
struct ceph_mds_reply_head *head = msg->front.iov_base;
struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
+ struct ceph_snap_realm *realm;
u64 tid;
int err, result;
int mds = session->s_mds;
@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
}
/* snap trace */
+ realm = NULL;
if (rinfo->snapblob_len) {
down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, rinfo->snapblob,
- rinfo->snapblob + rinfo->snapblob_len,
- le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+ rinfo->snapblob + rinfo->snapblob_len,
+ le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
+ &realm);
downgrade_write(&mdsc->snap_rwsem);
} else {
down_read(&mdsc->snap_rwsem);
@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&req->r_fill_mutex);
up_read(&mdsc->snap_rwsem);
+ if (realm)
+ ceph_put_snap_realm(mdsc, realm);
out_err:
mutex_lock(&mdsc->mutex);
if (!req->r_aborted) {
@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
BUG_ON(req->r_err);
BUG_ON(req->r_got_result);
+ req->r_attempts = 0;
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
put_request_session(req);
@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session,
send_flushmsg_ack(mdsc, session, seq);
break;
+ case CEPH_SESSION_FORCE_RO:
+ dout("force_session_readonly %p\n", session);
+ spin_lock(&session->s_cap_lock);
+ session->s_readonly = true;
+ spin_unlock(&session->s_cap_lock);
+ wake_up_session_caps(session, 0);
+ break;
+
default:
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
WARN_ON(1);
@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_mds_request *req, *nreq;
+ struct rb_node *p;
int err;
dout("replay_unsafe_requests mds%d\n", session->s_mds);
@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
ceph_con_send(&session->s_con, req->r_request);
}
}
+
+ /*
+ * also re-send old requests when MDS enters reconnect stage. So that MDS
+ * can process completed request in clientreplay stage.
+ */
+ p = rb_first(&mdsc->request_tree);
+ while (p) {
+ req = rb_entry(p, struct ceph_mds_request, r_node);
+ p = rb_next(p);
+ if (req->r_got_unsafe)
+ continue;
+ if (req->r_attempts == 0)
+ continue; /* only old requests */
+ if (req->r_session &&
+ req->r_session->s_mds == session->s_mds) {
+ err = __prepare_send_request(mdsc, req, session->s_mds);
+ if (!err) {
+ ceph_msg_get(req->r_request);
+ ceph_con_send(&session->s_con, req->r_request);
+ }
+ }
+ }
mutex_unlock(&mdsc->mutex);
}
@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
spin_unlock(&session->s_gen_ttl_lock);
spin_lock(&session->s_cap_lock);
+ /* don't know if session is readonly */
+ session->s_readonly = 0;
/*
* notify __ceph_remove_cap() that we are composing cap reconnect.
* If a cap get released before being added to the cap reconnect,
@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
mutex_unlock(&s->s_mutex);
s->s_state = CEPH_MDS_SESSION_RESTARTING;
}
-
- /* kick any requests waiting on the recovering mds */
- kick_requests(mdsc, i);
} else if (oldstate == newstate) {
continue; /* nothing new with this mds */
}
@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
mdsc->sessions = NULL;
+ atomic_set(&mdsc->num_sessions, 0);
mdsc->max_sessions = 0;
mdsc->stopping = 0;
init_rwsem(&mdsc->snap_rwsem);
@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
dout("sync\n");
mutex_lock(&mdsc->mutex);
want_tid = mdsc->last_tid;
- want_flush = mdsc->cap_flush_seq;
mutex_unlock(&mdsc->mutex);
- dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
ceph_flush_dirty_caps(mdsc);
+ spin_lock(&mdsc->cap_dirty_lock);
+ want_flush = mdsc->cap_flush_seq;
+ spin_unlock(&mdsc->cap_dirty_lock);
+
+ dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
wait_unsafe_requests(mdsc, want_tid);
- wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
+ wait_caps_flush(mdsc, want_flush);
}
/*
@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
*/
static bool done_closing_sessions(struct ceph_mds_client *mdsc)
{
- int i, n = 0;
-
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
return true;
-
- mutex_lock(&mdsc->mutex);
- for (i = 0; i < mdsc->max_sessions; i++)
- if (mdsc->sessions[i])
- n++;
- mutex_unlock(&mdsc->mutex);
- return n == 0;
+ return atomic_read(&mdsc->num_sessions) == 0;
}
/*
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e2817d00f7d9..1875b5d985c6 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -137,6 +137,7 @@ struct ceph_mds_session {
int s_nr_caps, s_trim_caps;
int s_num_cap_releases;
int s_cap_reconnect;
+ int s_readonly;
struct list_head s_cap_releases; /* waiting cap_release messages */
struct list_head s_cap_releases_done; /* ready to send */
struct ceph_cap *s_cap_iterator;
@@ -272,6 +273,7 @@ struct ceph_mds_client {
struct list_head waiting_for_map;
struct ceph_mds_session **sessions; /* NULL for mds if no session */
+ atomic_t num_sessions;
int max_sessions; /* len of s_mds_sessions */
int stopping; /* true if shutting down */
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index ce35fbd4ba5d..a97e39f09ba6 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
* safe. we do need to protect against concurrent empty list
* additions, however.
*/
- if (atomic_read(&realm->nref) == 0) {
+ if (atomic_inc_return(&realm->nref) == 1) {
spin_lock(&mdsc->snap_empty_lock);
list_del_init(&realm->empty_item);
spin_unlock(&mdsc->snap_empty_lock);
}
-
- atomic_inc(&realm->nref);
}
static void __insert_snap_realm(struct rb_root *root,
@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
if (!realm)
return ERR_PTR(-ENOMEM);
- atomic_set(&realm->nref, 0); /* tree does not take a ref */
+ atomic_set(&realm->nref, 1); /* for caller */
realm->ino = ino;
INIT_LIST_HEAD(&realm->children);
INIT_LIST_HEAD(&realm->child_item);
@@ -134,8 +132,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
*
* caller must hold snap_rwsem for write.
*/
-struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
- u64 ino)
+static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
+ u64 ino)
{
struct rb_node *n = mdsc->snap_realms.rb_node;
struct ceph_snap_realm *r;
@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
return NULL;
}
+struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
+ u64 ino)
+{
+ struct ceph_snap_realm *r;
+ r = __lookup_snap_realm(mdsc, ino);
+ if (r)
+ ceph_get_snap_realm(mdsc, r);
+ return r;
+}
+
static void __put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm);
@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
}
realm->parent_ino = parentino;
realm->parent = parent;
- ceph_get_snap_realm(mdsc, parent);
list_add(&realm->child_item, &parent->children);
return 1;
}
@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
* Caller must hold snap_rwsem for write.
*/
int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
- void *p, void *e, bool deletion)
+ void *p, void *e, bool deletion,
+ struct ceph_snap_realm **realm_ret)
{
struct ceph_mds_snap_realm *ri; /* encoded */
__le64 *snaps; /* encoded */
__le64 *prior_parent_snaps; /* encoded */
- struct ceph_snap_realm *realm;
+ struct ceph_snap_realm *realm = NULL;
+ struct ceph_snap_realm *first_realm = NULL;
int invalidate = 0;
int err = -ENOMEM;
LIST_HEAD(dirty_realms);
@@ -704,13 +713,18 @@ more:
dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
realm, invalidate, p, e);
- if (p < e)
- goto more;
-
/* invalidate when we reach the _end_ (root) of the trace */
- if (invalidate)
+ if (invalidate && p >= e)
rebuild_snap_realms(realm);
+ if (!first_realm)
+ first_realm = realm;
+ else
+ ceph_put_snap_realm(mdsc, realm);
+
+ if (p < e)
+ goto more;
+
/*
* queue cap snaps _after_ we've built the new snap contexts,
* so that i_head_snapc can be set appropriately.
@@ -721,12 +735,21 @@ more:
queue_realm_cap_snaps(realm);
}
+ if (realm_ret)
+ *realm_ret = first_realm;
+ else
+ ceph_put_snap_realm(mdsc, first_realm);
+
__cleanup_empty_realms(mdsc);
return 0;
bad:
err = -EINVAL;
fail:
+ if (realm && !IS_ERR(realm))
+ ceph_put_snap_realm(mdsc, realm);
+ if (first_realm)
+ ceph_put_snap_realm(mdsc, first_realm);
pr_err("update_snap_trace error %d\n", err);
return err;
}
@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
if (IS_ERR(realm))
goto out;
}
- ceph_get_snap_realm(mdsc, realm);
dout("splitting snap_realm %llx %p\n", realm->ino, realm);
for (i = 0; i < num_split_inos; i++) {
@@ -905,7 +927,7 @@ skip_inode:
/* we may have taken some of the old realm's children. */
for (i = 0; i < num_split_realms; i++) {
struct ceph_snap_realm *child =
- ceph_lookup_snap_realm(mdsc,
+ __lookup_snap_realm(mdsc,
le64_to_cpu(split_realms[i]));
if (!child)
continue;
@@ -918,7 +940,7 @@ skip_inode:
* snap, we can avoid queueing cap_snaps.
*/
ceph_update_snap_trace(mdsc, p, e,
- op == CEPH_SNAP_OP_DESTROY);
+ op == CEPH_SNAP_OP_DESTROY, NULL);
if (op == CEPH_SNAP_OP_SPLIT)
/* we took a reference when we created the realm, above */
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 5ae62587a71d..a63997b8bcff 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -414,6 +414,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noshare");
if (opt->flags & CEPH_OPT_NOCRC)
seq_puts(m, ",nocrc");
+ if (opt->flags & CEPH_OPT_NOMSGAUTH)
+ seq_puts(m, ",nocephx_require_signatures");
+ if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
+ seq_puts(m, ",notcp_nodelay");
if (opt->name)
seq_printf(m, ",name=%s", opt->name);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e1aa32d0759d..04c8124ed30e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm);
extern int ceph_update_snap_trace(struct ceph_mds_client *m,
- void *p, void *e, bool deletion);
+ void *p, void *e, bool deletion,
+ struct ceph_snap_realm **realm_ret);
extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg);
@@ -892,7 +893,9 @@ extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
int ceph_uninline_data(struct file *filp, struct page *locked_page);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
+extern const struct file_operations ceph_snapdir_fops;
extern const struct inode_operations ceph_dir_iops;
+extern const struct inode_operations ceph_snapdir_iops;
extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops;