summaryrefslogtreecommitdiff
path: root/net/ceph/mon_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r--net/ceph/mon_client.c1406
1 files changed, 950 insertions, 456 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 1fe25cd29d0e..c227ececa925 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>
#include <linux/module.h>
@@ -6,6 +7,7 @@
#include <linux/random.h>
#include <linux/sched.h>
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
@@ -34,55 +36,122 @@ static const struct ceph_connection_operations mon_con_ops;
static int __validate_auth(struct ceph_mon_client *monc);
+static int decode_mon_info(void **p, void *end, bool msgr2,
+ struct ceph_entity_addr *addr)
+{
+ void *mon_info_end;
+ u32 struct_len;
+ u8 struct_v;
+ int ret;
+
+ ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
+ &struct_len);
+ if (ret)
+ return ret;
+
+ mon_info_end = *p + struct_len;
+ ceph_decode_skip_string(p, end, e_inval); /* skip mon name */
+ ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
+ if (ret)
+ return ret;
+
+ *p = mon_info_end;
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
/*
* Decode a monmap blob (e.g., during mount).
+ *
+ * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
*/
-struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
+static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
{
- struct ceph_monmap *m = NULL;
- int i, err = -EINVAL;
+ struct ceph_monmap *monmap = NULL;
struct ceph_fsid fsid;
- u32 epoch, num_mon;
- u16 version;
- u32 len;
+ u32 struct_len;
+ int blob_len;
+ int num_mon;
+ u8 struct_v;
+ u32 epoch;
+ int ret;
+ int i;
- ceph_decode_32_safe(&p, end, len, bad);
- ceph_decode_need(&p, end, len, bad);
+ ceph_decode_32_safe(p, end, blob_len, e_inval);
+ ceph_decode_need(p, end, blob_len, e_inval);
- dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
+ ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
+ if (ret)
+ goto fail;
- ceph_decode_16_safe(&p, end, version, bad);
+ dout("%s struct_v %d\n", __func__, struct_v);
+ ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
+ ceph_decode_32_safe(p, end, epoch, e_inval);
+ if (struct_v >= 6) {
+ u32 feat_struct_len;
+ u8 feat_struct_v;
- ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
- ceph_decode_copy(&p, &fsid, sizeof(fsid));
- epoch = ceph_decode_32(&p);
+ *p += sizeof(struct ceph_timespec); /* skip last_changed */
+ *p += sizeof(struct ceph_timespec); /* skip created */
- num_mon = ceph_decode_32(&p);
- ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
+ ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
+ &feat_struct_v, &feat_struct_len);
+ if (ret)
+ goto fail;
- if (num_mon >= CEPH_MAX_MON)
- goto bad;
- m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
- if (m == NULL)
- return ERR_PTR(-ENOMEM);
- m->fsid = fsid;
- m->epoch = epoch;
- m->num_mon = num_mon;
- ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
- for (i = 0; i < num_mon; i++)
- ceph_decode_addr(&m->mon_inst[i].addr);
-
- dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
- m->num_mon);
- for (i = 0; i < m->num_mon; i++)
- dout("monmap_decode mon%d is %s\n", i,
- ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
- return m;
+ *p += feat_struct_len; /* skip persistent_features */
-bad:
- dout("monmap_decode failed with %d\n", err);
- kfree(m);
- return ERR_PTR(err);
+ ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
+ &feat_struct_v, &feat_struct_len);
+ if (ret)
+ goto fail;
+
+ *p += feat_struct_len; /* skip optional_features */
+ }
+ ceph_decode_32_safe(p, end, num_mon, e_inval);
+
+ dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
+ num_mon);
+ if (num_mon > CEPH_MAX_MON)
+ goto e_inval;
+
+ monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
+ if (!monmap) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+ monmap->fsid = fsid;
+ monmap->epoch = epoch;
+ monmap->num_mon = num_mon;
+
+ /* legacy_mon_addr map or mon_info map */
+ for (i = 0; i < num_mon; i++) {
+ struct ceph_entity_inst *inst = &monmap->mon_inst[i];
+
+ ceph_decode_skip_string(p, end, e_inval); /* skip mon name */
+ inst->name.type = CEPH_ENTITY_TYPE_MON;
+ inst->name.num = cpu_to_le64(i);
+
+ if (struct_v >= 6)
+ ret = decode_mon_info(p, end, msgr2, &inst->addr);
+ else
+ ret = ceph_decode_entity_addr(p, end, &inst->addr);
+ if (ret)
+ goto fail;
+
+ dout("%s mon%d addr %s\n", __func__, i,
+ ceph_pr_addr(&inst->addr));
+ }
+
+ return monmap;
+
+e_inval:
+ ret = -EINVAL;
+fail:
+ kfree(monmap);
+ return ERR_PTR(ret);
}
/*
@@ -92,9 +161,11 @@ int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
int i;
- for (i = 0; i < m->num_mon; i++)
- if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
+ for (i = 0; i < m->num_mon; i++) {
+ if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
return 1;
+ }
+
return 0;
}
@@ -122,47 +193,112 @@ static void __close_session(struct ceph_mon_client *monc)
ceph_msg_revoke(monc->m_subscribe);
ceph_msg_revoke_incoming(monc->m_subscribe_ack);
ceph_con_close(&monc->con);
- monc->cur_mon = -1;
+
monc->pending_auth = 0;
ceph_auth_reset(monc->auth);
}
/*
- * Open a session with a (new) monitor.
+ * Pick a new monitor at random and set cur_mon. If we are repicking
+ * (i.e. cur_mon is already set), be sure to pick a different one.
*/
-static int __open_session(struct ceph_mon_client *monc)
+static void pick_new_mon(struct ceph_mon_client *monc)
{
- char r;
- int ret;
+ int old_mon = monc->cur_mon;
- if (monc->cur_mon < 0) {
- get_random_bytes(&r, 1);
- monc->cur_mon = r % monc->monmap->num_mon;
- dout("open_session num=%d r=%d -> mon%d\n",
- monc->monmap->num_mon, r, monc->cur_mon);
- monc->sub_sent = 0;
- monc->sub_renew_after = jiffies; /* i.e., expired */
- monc->want_next_osdmap = !!monc->want_next_osdmap;
-
- dout("open_session mon%d opening\n", monc->cur_mon);
- ceph_con_open(&monc->con,
- CEPH_ENTITY_TYPE_MON, monc->cur_mon,
- &monc->monmap->mon_inst[monc->cur_mon].addr);
-
- /* initiatiate authentication handshake */
- ret = ceph_auth_build_hello(monc->auth,
- monc->m_auth->front.iov_base,
- monc->m_auth->front_max);
- __send_prepared_auth_request(monc, ret);
+ BUG_ON(monc->monmap->num_mon < 1);
+
+ if (monc->monmap->num_mon == 1) {
+ monc->cur_mon = 0;
} else {
- dout("open_session mon%d already open\n", monc->cur_mon);
+ int max = monc->monmap->num_mon;
+ int o = -1;
+ int n;
+
+ if (monc->cur_mon >= 0) {
+ if (monc->cur_mon < monc->monmap->num_mon)
+ o = monc->cur_mon;
+ if (o >= 0)
+ max--;
+ }
+
+ n = get_random_u32_below(max);
+ if (o >= 0 && n >= o)
+ n++;
+
+ monc->cur_mon = n;
}
- return 0;
+
+ dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
+ monc->cur_mon, monc->monmap->num_mon);
+}
+
+/*
+ * Open a session with a new monitor.
+ */
+static void __open_session(struct ceph_mon_client *monc)
+{
+ int ret;
+
+ pick_new_mon(monc);
+
+ monc->hunting = true;
+ if (monc->had_a_connection) {
+ monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
+ if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
+ monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
+ }
+
+ monc->sub_renew_after = jiffies; /* i.e., expired */
+ monc->sub_renew_sent = 0;
+
+ dout("%s opening mon%d\n", __func__, monc->cur_mon);
+ ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
+ &monc->monmap->mon_inst[monc->cur_mon].addr);
+
+ /*
+ * Queue a keepalive to ensure that in case of an early fault
+ * the messenger doesn't put us into STANDBY state and instead
+ * retries. This also ensures that our timestamp is valid by
+ * the time we finish hunting and delayed_work() checks it.
+ */
+ ceph_con_keepalive(&monc->con);
+ if (ceph_msgr2(monc->client)) {
+ monc->pending_auth = 1;
+ return;
+ }
+
+ /* initiate authentication handshake */
+ ret = ceph_auth_build_hello(monc->auth,
+ monc->m_auth->front.iov_base,
+ monc->m_auth->front_alloc_len);
+ BUG_ON(ret <= 0);
+ __send_prepared_auth_request(monc, ret);
}
-static bool __sub_expired(struct ceph_mon_client *monc)
+static void reopen_session(struct ceph_mon_client *monc)
{
- return time_after_eq(jiffies, monc->sub_renew_after);
+ if (!monc->hunting)
+ pr_info("mon%d %s session lost, hunting for new mon\n",
+ monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
+
+ __close_session(monc);
+ __open_session(monc);
+}
+
+void ceph_monc_reopen_session(struct ceph_mon_client *monc)
+{
+ mutex_lock(&monc->mutex);
+ reopen_session(monc);
+ mutex_unlock(&monc->mutex);
+}
+
+static void un_backoff(struct ceph_mon_client *monc)
+{
+ monc->hunt_mult /= 2; /* reduce by 50% */
+ if (monc->hunt_mult < 1)
+ monc->hunt_mult = 1;
+ dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}
/*
@@ -170,69 +306,77 @@ static bool __sub_expired(struct ceph_mon_client *monc)
*/
static void __schedule_delayed(struct ceph_mon_client *monc)
{
- unsigned int delay;
+ unsigned long delay;
- if (monc->cur_mon < 0 || __sub_expired(monc))
- delay = 10 * HZ;
+ if (monc->hunting)
+ delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
else
- delay = 20 * HZ;
- dout("__schedule_delayed after %u\n", delay);
- schedule_delayed_work(&monc->delayed_work, delay);
+ delay = CEPH_MONC_PING_INTERVAL;
+
+ dout("__schedule_delayed after %lu\n", delay);
+ mod_delayed_work(system_percpu_wq, &monc->delayed_work,
+ round_jiffies_relative(delay));
}
+const char *ceph_sub_str[] = {
+ [CEPH_SUB_MONMAP] = "monmap",
+ [CEPH_SUB_OSDMAP] = "osdmap",
+ [CEPH_SUB_FSMAP] = "fsmap.user",
+ [CEPH_SUB_MDSMAP] = "mdsmap",
+};
+
/*
- * Send subscribe request for mdsmap and/or osdmap.
+ * Send subscribe request for one or more maps, according to
+ * monc->subs.
*/
static void __send_subscribe(struct ceph_mon_client *monc)
{
- dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
- (unsigned int)monc->sub_sent, __sub_expired(monc),
- monc->want_next_osdmap);
- if ((__sub_expired(monc) && !monc->sub_sent) ||
- monc->want_next_osdmap == 1) {
- struct ceph_msg *msg = monc->m_subscribe;
- struct ceph_mon_subscribe_item *i;
- void *p, *end;
- int num;
-
- p = msg->front.iov_base;
- end = p + msg->front_max;
-
- num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
- ceph_encode_32(&p, num);
-
- if (monc->want_next_osdmap) {
- dout("__send_subscribe to 'osdmap' %u\n",
- (unsigned int)monc->have_osdmap);
- ceph_encode_string(&p, end, "osdmap", 6);
- i = p;
- i->have = cpu_to_le64(monc->have_osdmap);
- i->onetime = 1;
- p += sizeof(*i);
- monc->want_next_osdmap = 2; /* requested */
- }
- if (monc->want_mdsmap) {
- dout("__send_subscribe to 'mdsmap' %u+\n",
- (unsigned int)monc->have_mdsmap);
- ceph_encode_string(&p, end, "mdsmap", 6);
- i = p;
- i->have = cpu_to_le64(monc->have_mdsmap);
- i->onetime = 0;
- p += sizeof(*i);
- }
- ceph_encode_string(&p, end, "monmap", 6);
- i = p;
- i->have = 0;
- i->onetime = 0;
- p += sizeof(*i);
-
- msg->front.iov_len = p - msg->front.iov_base;
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_msg_revoke(msg);
- ceph_con_send(&monc->con, ceph_msg_get(msg));
-
- monc->sub_sent = jiffies | 1; /* never 0 */
+ struct ceph_msg *msg = monc->m_subscribe;
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front_alloc_len;
+ int num = 0;
+ int i;
+
+ dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
+
+ BUG_ON(monc->cur_mon < 0);
+
+ if (!monc->sub_renew_sent)
+ monc->sub_renew_sent = jiffies | 1; /* never 0 */
+
+ msg->hdr.version = cpu_to_le16(2);
+
+ for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+ if (monc->subs[i].want)
+ num++;
+ }
+ BUG_ON(num < 1); /* monmap sub is always there */
+ ceph_encode_32(&p, num);
+ for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+ char buf[32];
+ int len;
+
+ if (!monc->subs[i].want)
+ continue;
+
+ len = sprintf(buf, "%s", ceph_sub_str[i]);
+ if (i == CEPH_SUB_MDSMAP &&
+ monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
+ len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
+
+ dout("%s %s start %llu flags 0x%x\n", __func__, buf,
+ le64_to_cpu(monc->subs[i].item.start),
+ monc->subs[i].item.flags);
+ ceph_encode_string(&p, end, buf, len);
+ memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
+ p += sizeof(monc->subs[i].item);
}
+
+ BUG_ON(p > end);
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ ceph_msg_revoke(msg);
+ ceph_con_send(&monc->con, ceph_msg_get(msg));
}
static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -246,15 +390,20 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
seconds = le32_to_cpu(h->duration);
mutex_lock(&monc->mutex);
- if (monc->hunting) {
- pr_info("mon%d %s session established\n",
- monc->cur_mon,
- ceph_pr_addr(&monc->con.peer_addr.in_addr));
- monc->hunting = false;
+ if (monc->sub_renew_sent) {
+ /*
+ * This is only needed for legacy (infernalis or older)
+ * MONs -- see delayed_work().
+ */
+ monc->sub_renew_after = monc->sub_renew_sent +
+ (seconds >> 1) * HZ - 1;
+ dout("%s sent %lu duration %d renew after %lu\n", __func__,
+ monc->sub_renew_sent, seconds, monc->sub_renew_after);
+ monc->sub_renew_sent = 0;
+ } else {
+ dout("%s sent %lu renew after %lu, ignoring\n", __func__,
+ monc->sub_renew_sent, monc->sub_renew_after);
}
- dout("handle_subscribe_ack after %d seconds\n", seconds);
- monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
- monc->sub_sent = 0;
mutex_unlock(&monc->mutex);
return;
bad:
@@ -263,46 +412,123 @@ bad:
}
/*
+ * Register interest in a map
+ *
+ * @sub: one of CEPH_SUB_*
+ * @epoch: X for "every map since X", or 0 for "just the latest"
+ */
+static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
+ u32 epoch, bool continuous)
+{
+ __le64 start = cpu_to_le64(epoch);
+ u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
+
+ dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
+ epoch, continuous);
+
+ if (monc->subs[sub].want &&
+ monc->subs[sub].item.start == start &&
+ monc->subs[sub].item.flags == flags)
+ return false;
+
+ monc->subs[sub].item.start = start;
+ monc->subs[sub].item.flags = flags;
+ monc->subs[sub].want = true;
+
+ return true;
+}
+
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+ bool continuous)
+{
+ bool need_request;
+
+ mutex_lock(&monc->mutex);
+ need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
+ mutex_unlock(&monc->mutex);
+
+ return need_request;
+}
+EXPORT_SYMBOL(ceph_monc_want_map);
+
+/*
* Keep track of which maps we have
+ *
+ * @sub: one of CEPH_SUB_*
*/
-int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
+ u32 epoch)
+{
+ dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
+
+ if (monc->subs[sub].want) {
+ if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
+ monc->subs[sub].want = false;
+ else
+ monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
+ }
+
+ monc->subs[sub].have = epoch;
+}
+
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
mutex_lock(&monc->mutex);
- monc->have_mdsmap = got;
+ __ceph_monc_got_map(monc, sub, epoch);
mutex_unlock(&monc->mutex);
- return 0;
}
-EXPORT_SYMBOL(ceph_monc_got_mdsmap);
+EXPORT_SYMBOL(ceph_monc_got_map);
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
mutex_lock(&monc->mutex);
- monc->have_osdmap = got;
- monc->want_next_osdmap = 0;
+ __send_subscribe(monc);
mutex_unlock(&monc->mutex);
- return 0;
}
+EXPORT_SYMBOL(ceph_monc_renew_subs);
/*
- * Register interest in the next osdmap
+ * Wait for an osdmap with a given epoch.
+ *
+ * @epoch: epoch to wait for
+ * @timeout: in jiffies, 0 means "wait forever"
*/
-void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
+int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+ unsigned long timeout)
{
- dout("request_next_osdmap have %u\n", monc->have_osdmap);
+ unsigned long started = jiffies;
+ long ret;
+
mutex_lock(&monc->mutex);
- if (!monc->want_next_osdmap)
- monc->want_next_osdmap = 1;
- if (monc->want_next_osdmap < 2)
- __send_subscribe(monc);
+ while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
+ mutex_unlock(&monc->mutex);
+
+ if (timeout && time_after_eq(jiffies, started + timeout))
+ return -ETIMEDOUT;
+
+ ret = wait_event_interruptible_timeout(monc->client->auth_wq,
+ monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
+ ceph_timeout_jiffies(timeout));
+ if (ret < 0)
+ return ret;
+
+ mutex_lock(&monc->mutex);
+ }
+
mutex_unlock(&monc->mutex);
+ return 0;
}
+EXPORT_SYMBOL(ceph_monc_wait_osdmap);
/*
- *
+ * Open a session with a random monitor. Request monmap and osdmap,
+ * which are waited upon in __ceph_open_session().
*/
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
mutex_lock(&monc->mutex);
+ __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
+ __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
__open_session(monc);
__schedule_delayed(monc);
mutex_unlock(&monc->mutex);
@@ -310,126 +536,57 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
}
EXPORT_SYMBOL(ceph_monc_open_session);
-/*
- * We require the fsid and global_id in order to initialize our
- * debugfs dir.
- */
-static bool have_debugfs_info(struct ceph_mon_client *monc)
-{
- dout("have_debugfs_info fsid %d globalid %lld\n",
- (int)monc->client->have_fsid, monc->auth->global_id);
- return monc->client->have_fsid && monc->auth->global_id > 0;
-}
-
-/*
- * The monitor responds with mount ack indicate mount success. The
- * included client ticket allows the client to talk to MDSs and OSDs.
- */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_client *client = monc->client;
- struct ceph_monmap *monmap = NULL, *old = monc->monmap;
+ struct ceph_monmap *monmap;
void *p, *end;
- int had_debugfs_info, init_debugfs = 0;
mutex_lock(&monc->mutex);
- had_debugfs_info = have_debugfs_info(monc);
-
dout("handle_monmap\n");
p = msg->front.iov_base;
end = p + msg->front.iov_len;
- monmap = ceph_monmap_decode(p, end);
+ monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client));
if (IS_ERR(monmap)) {
pr_err("problem decoding monmap, %d\n",
(int)PTR_ERR(monmap));
+ ceph_msg_dump(msg);
goto out;
}
- if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
+ if (ceph_check_fsid(client, &monmap->fsid) < 0) {
kfree(monmap);
goto out;
}
- client->monc.monmap = monmap;
- kfree(old);
-
- if (!client->have_fsid) {
- client->have_fsid = true;
- if (!had_debugfs_info && have_debugfs_info(monc)) {
- pr_info("client%lld fsid %pU\n",
- ceph_client_id(monc->client),
- &monc->client->fsid);
- init_debugfs = 1;
- }
- mutex_unlock(&monc->mutex);
+ kfree(monc->monmap);
+ monc->monmap = monmap;
- if (init_debugfs) {
- /*
- * do debugfs initialization without mutex to avoid
- * creating a locking dependency
- */
- ceph_debugfs_client_init(monc->client);
- }
+ __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
+ client->have_fsid = true;
- goto out_unlocked;
- }
out:
mutex_unlock(&monc->mutex);
-out_unlocked:
wake_up_all(&client->auth_wq);
}
/*
- * generic requests (e.g., statfs, poolop)
+ * generic requests (currently statfs, mon_get_version)
*/
-static struct ceph_mon_generic_request *__lookup_generic_req(
- struct ceph_mon_client *monc, u64 tid)
-{
- struct ceph_mon_generic_request *req;
- struct rb_node *n = monc->generic_request_tree.rb_node;
-
- while (n) {
- req = rb_entry(n, struct ceph_mon_generic_request, node);
- if (tid < req->tid)
- n = n->rb_left;
- else if (tid > req->tid)
- n = n->rb_right;
- else
- return req;
- }
- return NULL;
-}
-
-static void __insert_generic_request(struct ceph_mon_client *monc,
- struct ceph_mon_generic_request *new)
-{
- struct rb_node **p = &monc->generic_request_tree.rb_node;
- struct rb_node *parent = NULL;
- struct ceph_mon_generic_request *req = NULL;
-
- while (*p) {
- parent = *p;
- req = rb_entry(parent, struct ceph_mon_generic_request, node);
- if (new->tid < req->tid)
- p = &(*p)->rb_left;
- else if (new->tid > req->tid)
- p = &(*p)->rb_right;
- else
- BUG();
- }
-
- rb_link_node(&new->node, parent, p);
- rb_insert_color(&new->node, &monc->generic_request_tree);
-}
+DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
static void release_generic_request(struct kref *kref)
{
struct ceph_mon_generic_request *req =
container_of(kref, struct ceph_mon_generic_request, kref);
+ dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
+ req->reply);
+ WARN_ON(!RB_EMPTY_NODE(&req->node));
+
if (req->reply)
ceph_msg_put(req->reply);
if (req->request)
@@ -440,7 +597,8 @@ static void release_generic_request(struct kref *kref)
static void put_generic_request(struct ceph_mon_generic_request *req)
{
- kref_put(&req->kref, release_generic_request);
+ if (req)
+ kref_put(&req->kref, release_generic_request);
}
static void get_generic_request(struct ceph_mon_generic_request *req)
@@ -448,6 +606,103 @@ static void get_generic_request(struct ceph_mon_generic_request *req)
kref_get(&req->kref);
}
+static struct ceph_mon_generic_request *
+alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
+{
+ struct ceph_mon_generic_request *req;
+
+ req = kzalloc(sizeof(*req), gfp);
+ if (!req)
+ return NULL;
+
+ req->monc = monc;
+ kref_init(&req->kref);
+ RB_CLEAR_NODE(&req->node);
+ init_completion(&req->completion);
+
+ dout("%s greq %p\n", __func__, req);
+ return req;
+}
+
+static void register_generic_request(struct ceph_mon_generic_request *req)
+{
+ struct ceph_mon_client *monc = req->monc;
+
+ WARN_ON(req->tid);
+
+ get_generic_request(req);
+ req->tid = ++monc->last_tid;
+ insert_generic_request(&monc->generic_request_tree, req);
+}
+
+static void send_generic_request(struct ceph_mon_client *monc,
+ struct ceph_mon_generic_request *req)
+{
+ WARN_ON(!req->tid);
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+ req->request->hdr.tid = cpu_to_le64(req->tid);
+ ceph_con_send(&monc->con, ceph_msg_get(req->request));
+}
+
+static void __finish_generic_request(struct ceph_mon_generic_request *req)
+{
+ struct ceph_mon_client *monc = req->monc;
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+ erase_generic_request(&monc->generic_request_tree, req);
+
+ ceph_msg_revoke(req->request);
+ ceph_msg_revoke_incoming(req->reply);
+}
+
+static void finish_generic_request(struct ceph_mon_generic_request *req)
+{
+ __finish_generic_request(req);
+ put_generic_request(req);
+}
+
+static void complete_generic_request(struct ceph_mon_generic_request *req)
+{
+ if (req->complete_cb)
+ req->complete_cb(req);
+ else
+ complete_all(&req->completion);
+ put_generic_request(req);
+}
+
+static void cancel_generic_request(struct ceph_mon_generic_request *req)
+{
+ struct ceph_mon_client *monc = req->monc;
+ struct ceph_mon_generic_request *lookup_req;
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+
+ mutex_lock(&monc->mutex);
+ lookup_req = lookup_generic_request(&monc->generic_request_tree,
+ req->tid);
+ if (lookup_req) {
+ WARN_ON(lookup_req != req);
+ finish_generic_request(req);
+ }
+
+ mutex_unlock(&monc->mutex);
+}
+
+static int wait_generic_request(struct ceph_mon_generic_request *req)
+{
+ int ret;
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+ ret = wait_for_completion_interruptible(&req->completion);
+ if (ret)
+ cancel_generic_request(req);
+ else
+ ret = req->result; /* completed */
+
+ return ret;
+}
+
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
@@ -458,7 +713,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
struct ceph_msg *m;
mutex_lock(&monc->mutex);
- req = __lookup_generic_req(monc, tid);
+ req = lookup_generic_request(&monc->generic_request_tree, tid);
if (!req) {
dout("get_generic_reply %lld dne\n", tid);
*skip = 1;
@@ -477,32 +732,6 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}
-static int do_generic_request(struct ceph_mon_client *monc,
- struct ceph_mon_generic_request *req)
-{
- int err;
-
- /* register request */
- mutex_lock(&monc->mutex);
- req->tid = ++monc->last_tid;
- req->request->hdr.tid = cpu_to_le64(req->tid);
- __insert_generic_request(monc, req);
- monc->num_generic_requests++;
- ceph_con_send(&monc->con, ceph_msg_get(req->request));
- mutex_unlock(&monc->mutex);
-
- err = wait_for_completion_interruptible(&req->completion);
-
- mutex_lock(&monc->mutex);
- rb_erase(&req->node, &monc->generic_request_tree);
- monc->num_generic_requests--;
- mutex_unlock(&monc->mutex);
-
- if (!err)
- err = req->result;
- return err;
-}
-
/*
* statfs
*/
@@ -513,193 +742,322 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
u64 tid = le64_to_cpu(msg->hdr.tid);
+ dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
if (msg->front.iov_len != sizeof(*reply))
goto bad;
- dout("handle_statfs_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
- req = __lookup_generic_req(monc, tid);
- if (req) {
- *(struct ceph_statfs *)req->buf = reply->st;
- req->result = 0;
- get_generic_request(req);
+ req = lookup_generic_request(&monc->generic_request_tree, tid);
+ if (!req) {
+ mutex_unlock(&monc->mutex);
+ return;
}
+
+ req->result = 0;
+ *req->u.st = reply->st; /* struct */
+ __finish_generic_request(req);
mutex_unlock(&monc->mutex);
- if (req) {
- complete_all(&req->completion);
- put_generic_request(req);
- }
+
+ complete_generic_request(req);
return;
bad:
- pr_err("corrupt generic reply, tid %llu\n", tid);
+ pr_err("corrupt statfs reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
/*
* Do a synchronous statfs().
*/
-int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
+int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
+ struct ceph_statfs *buf)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_statfs *h;
- int err;
+ int ret = -ENOMEM;
- req = kzalloc(sizeof(*req), GFP_NOFS);
+ req = alloc_generic_request(monc, GFP_NOFS);
if (!req)
- return -ENOMEM;
-
- kref_init(&req->kref);
- req->buf = buf;
- req->buf_len = sizeof(*buf);
- init_completion(&req->completion);
+ goto out;
- err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
true);
if (!req->request)
goto out;
- req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
- true);
+
+ req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
if (!req->reply)
goto out;
+ req->u.st = buf;
+ req->request->hdr.version = cpu_to_le16(2);
+
+ mutex_lock(&monc->mutex);
+ register_generic_request(req);
/* fill out request */
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
+ h->contains_data_pool = (data_pool != CEPH_NOPOOL);
+ h->data_pool = cpu_to_le64(data_pool);
+ send_generic_request(monc, req);
+ mutex_unlock(&monc->mutex);
- err = do_generic_request(monc, req);
-
+ ret = wait_generic_request(req);
out:
- kref_put(&req->kref, release_generic_request);
- return err;
+ put_generic_request(req);
+ return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
+static void handle_get_version_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
+{
+ struct ceph_mon_generic_request *req;
+ u64 tid = le64_to_cpu(msg->hdr.tid);
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front_alloc_len;
+ u64 handle;
+
+ dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+ ceph_decode_need(&p, end, 2*sizeof(u64), bad);
+ handle = ceph_decode_64(&p);
+ if (tid != 0 && tid != handle)
+ goto bad;
+
+ mutex_lock(&monc->mutex);
+ req = lookup_generic_request(&monc->generic_request_tree, handle);
+ if (!req) {
+ mutex_unlock(&monc->mutex);
+ return;
+ }
+
+ req->result = 0;
+ req->u.newest = ceph_decode_64(&p);
+ __finish_generic_request(req);
+ mutex_unlock(&monc->mutex);
+
+ complete_generic_request(req);
+ return;
+
+bad:
+ pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
+ ceph_msg_dump(msg);
+}
+
+static struct ceph_mon_generic_request *
+__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
+ ceph_monc_callback_t cb, u64 private_data)
+{
+ struct ceph_mon_generic_request *req;
+
+ req = alloc_generic_request(monc, GFP_NOIO);
+ if (!req)
+ goto err_put_req;
+
+ req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
+ sizeof(u64) + sizeof(u32) + strlen(what),
+ GFP_NOIO, true);
+ if (!req->request)
+ goto err_put_req;
+
+ req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
+ true);
+ if (!req->reply)
+ goto err_put_req;
+
+ req->complete_cb = cb;
+ req->private_data = private_data;
+
+ mutex_lock(&monc->mutex);
+ register_generic_request(req);
+ {
+ void *p = req->request->front.iov_base;
+ void *const end = p + req->request->front_alloc_len;
+
+ ceph_encode_64(&p, req->tid); /* handle */
+ ceph_encode_string(&p, end, what, strlen(what));
+ WARN_ON(p != end);
+ }
+ send_generic_request(monc, req);
+ mutex_unlock(&monc->mutex);
+
+ return req;
+
+err_put_req:
+ put_generic_request(req);
+ return ERR_PTR(-ENOMEM);
+}
+
/*
- * pool ops
+ * Send MMonGetVersion and wait for the reply.
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
*/
-static int get_poolop_reply_buf(const char *src, size_t src_len,
- char *dst, size_t dst_len)
+int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
+ u64 *newest)
{
- u32 buf_len;
+ struct ceph_mon_generic_request *req;
+ int ret;
+
+ req = __ceph_monc_get_version(monc, what, NULL, 0);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ ret = wait_generic_request(req);
+ if (!ret)
+ *newest = req->u.newest;
- if (src_len != sizeof(u32) + dst_len)
- return -EINVAL;
+ put_generic_request(req);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_monc_get_version);
+
+/*
+ * Send MMonGetVersion,
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ */
+int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
+ ceph_monc_callback_t cb, u64 private_data)
+{
+ struct ceph_mon_generic_request *req;
- buf_len = le32_to_cpu(*(u32 *)src);
- if (buf_len != dst_len)
- return -EINVAL;
+ req = __ceph_monc_get_version(monc, what, cb, private_data);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
- memcpy(dst, src + sizeof(u32), dst_len);
+ put_generic_request(req);
return 0;
}
+EXPORT_SYMBOL(ceph_monc_get_version_async);
-static void handle_poolop_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
+static void handle_command_ack(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
- struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front_alloc_len;
u64 tid = le64_to_cpu(msg->hdr.tid);
- if (msg->front.iov_len < sizeof(*reply))
- goto bad;
- dout("handle_poolop_reply %p tid %llu\n", msg, tid);
+ dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+ ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
+ sizeof(u32), bad);
+ p += sizeof(struct ceph_mon_request_header);
mutex_lock(&monc->mutex);
- req = __lookup_generic_req(monc, tid);
- if (req) {
- if (req->buf_len &&
- get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
- msg->front.iov_len - sizeof(*reply),
- req->buf, req->buf_len) < 0) {
- mutex_unlock(&monc->mutex);
- goto bad;
- }
- req->result = le32_to_cpu(reply->reply_code);
- get_generic_request(req);
+ req = lookup_generic_request(&monc->generic_request_tree, tid);
+ if (!req) {
+ mutex_unlock(&monc->mutex);
+ return;
}
+
+ req->result = ceph_decode_32(&p);
+ __finish_generic_request(req);
mutex_unlock(&monc->mutex);
- if (req) {
- complete(&req->completion);
- put_generic_request(req);
- }
+
+ complete_generic_request(req);
return;
bad:
- pr_err("corrupt generic reply, tid %llu\n", tid);
+ pr_err("corrupt mon_command ack, tid %llu\n", tid);
ceph_msg_dump(msg);
}
-/*
- * Do a synchronous pool op.
- */
-static int do_poolop(struct ceph_mon_client *monc, u32 op,
- u32 pool, u64 snapid,
- char *buf, int len)
+static __printf(2, 0)
+int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
+ va_list ap)
{
struct ceph_mon_generic_request *req;
- struct ceph_mon_poolop *h;
- int err;
+ struct ceph_mon_command *h;
+ int ret = -ENOMEM;
+ int len;
- req = kzalloc(sizeof(*req), GFP_NOFS);
+ req = alloc_generic_request(monc, GFP_NOIO);
if (!req)
- return -ENOMEM;
-
- kref_init(&req->kref);
- req->buf = buf;
- req->buf_len = len;
- init_completion(&req->completion);
+ goto out;
- err = -ENOMEM;
- req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
- true);
+ req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
if (!req->request)
goto out;
- req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
+
+ req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
true);
if (!req->reply)
goto out;
- /* fill out request */
- req->request->hdr.version = cpu_to_le16(2);
+ mutex_lock(&monc->mutex);
+ register_generic_request(req);
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
- h->pool = cpu_to_le32(pool);
- h->op = cpu_to_le32(op);
- h->auid = 0;
- h->snapid = cpu_to_le64(snapid);
- h->name_len = 0;
-
- err = do_generic_request(monc, req);
+ h->num_strs = cpu_to_le32(1);
+ len = vsprintf(h->str, fmt, ap);
+ h->str_len = cpu_to_le32(len);
+ send_generic_request(monc, req);
+ mutex_unlock(&monc->mutex);
+ ret = wait_generic_request(req);
out:
- kref_put(&req->kref, release_generic_request);
- return err;
+ put_generic_request(req);
+ return ret;
}
-int ceph_monc_create_snapid(struct ceph_mon_client *monc,
- u32 pool, u64 *snapid)
+static __printf(2, 3)
+int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
{
- return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
- pool, 0, (char *)snapid, sizeof(*snapid));
+ va_list ap;
+ int ret;
+ va_start(ap, fmt);
+ ret = do_mon_command_vargs(monc, fmt, ap);
+ va_end(ap);
+ return ret;
}
-EXPORT_SYMBOL(ceph_monc_create_snapid);
-int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
- u32 pool, u64 snapid)
+int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
+ struct ceph_entity_addr *client_addr)
{
- return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
- pool, snapid, NULL, 0);
+ int ret;
+
+ ret = do_mon_command(monc,
+ "{ \"prefix\": \"osd blocklist\", \
+ \"blocklistop\": \"add\", \
+ \"addr\": \"%pISpc/%u\" }",
+ &client_addr->in_addr,
+ le32_to_cpu(client_addr->nonce));
+ if (ret == -EINVAL) {
+ /*
+ * The monitor returns EINVAL on an unrecognized command.
+ * Try the legacy command -- it is exactly the same except
+ * for the name.
+ */
+ ret = do_mon_command(monc,
+ "{ \"prefix\": \"osd blacklist\", \
+ \"blacklistop\": \"add\", \
+ \"addr\": \"%pISpc/%u\" }",
+ &client_addr->in_addr,
+ le32_to_cpu(client_addr->nonce));
+ }
+ if (ret)
+ return ret;
+ /*
+ * Make sure we have the osdmap that includes the blocklist
+ * entry. This is needed to ensure that the OSDs pick up the
+ * new blocklist before processing any future requests from
+ * this client.
+ */
+ return ceph_wait_for_latest_osdmap(monc->client, 0);
}
+EXPORT_SYMBOL(ceph_monc_blocklist_add);
/*
* Resend pending generic requests.
@@ -727,20 +1085,45 @@ static void delayed_work(struct work_struct *work)
struct ceph_mon_client *monc =
container_of(work, struct ceph_mon_client, delayed_work.work);
- dout("monc delayed_work\n");
mutex_lock(&monc->mutex);
+ dout("%s mon%d\n", __func__, monc->cur_mon);
+ if (monc->cur_mon < 0) {
+ goto out;
+ }
+
if (monc->hunting) {
- __close_session(monc);
- __open_session(monc); /* continue hunting */
+ dout("%s continuing hunt\n", __func__);
+ reopen_session(monc);
} else {
- ceph_con_keepalive(&monc->con);
+ int is_auth = ceph_auth_is_authenticated(monc->auth);
+
+ dout("%s is_authed %d\n", __func__, is_auth);
+ if (ceph_con_keepalive_expired(&monc->con,
+ CEPH_MONC_PING_TIMEOUT)) {
+ dout("monc keepalive timeout\n");
+ is_auth = 0;
+ reopen_session(monc);
+ }
+
+ if (!monc->hunting) {
+ ceph_con_keepalive(&monc->con);
+ __validate_auth(monc);
+ un_backoff(monc);
+ }
- __validate_auth(monc);
+ if (is_auth &&
+ !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
+ unsigned long now = jiffies;
- if (ceph_auth_is_authenticated(monc->auth))
- __send_subscribe(monc);
+ dout("%s renew subs? now %lu renew after %lu\n",
+ __func__, now, monc->sub_renew_after);
+ if (time_after_eq(now, monc->sub_renew_after))
+ __send_subscribe(monc);
+ }
}
__schedule_delayed(monc);
+
+out:
mutex_unlock(&monc->mutex);
}
@@ -750,36 +1133,39 @@ static void delayed_work(struct work_struct *work)
*/
static int build_initial_monmap(struct ceph_mon_client *monc)
{
+ __le32 my_type = ceph_msgr2(monc->client) ?
+ CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY;
struct ceph_options *opt = monc->client->options;
- struct ceph_entity_addr *mon_addr = opt->mon_addr;
int num_mon = opt->num_mon;
int i;
/* build initial monmap */
- monc->monmap = kzalloc(sizeof(*monc->monmap) +
- num_mon*sizeof(monc->monmap->mon_inst[0]),
+ monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
GFP_KERNEL);
if (!monc->monmap)
return -ENOMEM;
+ monc->monmap->num_mon = num_mon;
+
for (i = 0; i < num_mon; i++) {
- monc->monmap->mon_inst[i].addr = mon_addr[i];
- monc->monmap->mon_inst[i].addr.nonce = 0;
- monc->monmap->mon_inst[i].name.type =
- CEPH_ENTITY_TYPE_MON;
- monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
+ struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
+
+ memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr,
+ sizeof(inst->addr.in_addr));
+ inst->addr.type = my_type;
+ inst->addr.nonce = 0;
+ inst->name.type = CEPH_ENTITY_TYPE_MON;
+ inst->name.num = cpu_to_le64(i);
}
- monc->monmap->num_mon = num_mon;
return 0;
}
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
- int err = 0;
+ int err;
dout("init\n");
memset(monc, 0, sizeof(*monc));
monc->client = cl;
- monc->monmap = NULL;
mutex_init(&monc->mutex);
err = build_initial_monmap(monc);
@@ -788,8 +1174,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
/* connection */
/* authentication */
- monc->auth = ceph_auth_init(cl->options->name,
- cl->options->key);
+ monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
+ cl->options->con_modes);
if (IS_ERR(monc->auth)) {
err = PTR_ERR(monc->auth);
goto out_monmap;
@@ -802,21 +1188,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
err = -ENOMEM;
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
sizeof(struct ceph_mon_subscribe_ack),
- GFP_NOFS, true);
+ GFP_KERNEL, true);
if (!monc->m_subscribe_ack)
goto out_auth;
- monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
- true);
+ monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
+ GFP_KERNEL, true);
if (!monc->m_subscribe)
goto out_subscribe_ack;
- monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
- true);
+ monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
+ GFP_KERNEL, true);
if (!monc->m_auth_reply)
goto out_subscribe;
- monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
+ monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
monc->pending_auth = 0;
if (!monc->m_auth)
goto out_auth_reply;
@@ -825,18 +1211,15 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
&monc->client->msgr);
monc->cur_mon = -1;
- monc->hunting = true;
- monc->sub_renew_after = jiffies;
- monc->sub_sent = 0;
+ monc->had_a_connection = false;
+ monc->hunt_mult = 1;
INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
monc->generic_request_tree = RB_ROOT;
- monc->num_generic_requests = 0;
monc->last_tid = 0;
- monc->have_mdsmap = 0;
- monc->have_osdmap = 0;
- monc->want_next_osdmap = 1;
+ monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
+
return 0;
out_auth_reply:
@@ -857,13 +1240,15 @@ EXPORT_SYMBOL(ceph_monc_init);
void ceph_monc_stop(struct ceph_mon_client *monc)
{
dout("stop\n");
- cancel_delayed_work_sync(&monc->delayed_work);
mutex_lock(&monc->mutex);
__close_session(monc);
-
+ monc->hunting = false;
+ monc->cur_mon = -1;
mutex_unlock(&monc->mutex);
+ cancel_delayed_work_sync(&monc->delayed_work);
+
/*
* flush msgr queue before we destroy ourselves to ensure that:
* - any work that references our embedded con is finished.
@@ -874,6 +1259,8 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
ceph_auth_destroy(monc->auth);
+ WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
+
ceph_msg_put(monc->m_auth);
ceph_msg_put(monc->m_auth_reply);
ceph_msg_put(monc->m_subscribe);
@@ -883,28 +1270,33 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
}
EXPORT_SYMBOL(ceph_monc_stop);
-static void handle_auth_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
+static void finish_hunting(struct ceph_mon_client *monc)
{
- int ret;
- int was_auth = 0;
- int had_debugfs_info, init_debugfs = 0;
+ if (monc->hunting) {
+ dout("%s found mon%d\n", __func__, monc->cur_mon);
+ monc->hunting = false;
+ monc->had_a_connection = true;
+ un_backoff(monc);
+ __schedule_delayed(monc);
+ }
+}
+
+static void finish_auth(struct ceph_mon_client *monc, int auth_err,
+ bool was_authed)
+{
+ dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
+ WARN_ON(auth_err > 0);
- mutex_lock(&monc->mutex);
- had_debugfs_info = have_debugfs_info(monc);
- was_auth = ceph_auth_is_authenticated(monc->auth);
monc->pending_auth = 0;
- ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
- msg->front.iov_len,
- monc->m_auth->front.iov_base,
- monc->m_auth->front_max);
- if (ret < 0) {
- monc->client->auth_err = ret;
+ if (auth_err) {
+ monc->client->auth_err = auth_err;
wake_up_all(&monc->client->auth_wq);
- } else if (ret > 0) {
- __send_prepared_auth_request(monc, ret);
- } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
- dout("authenticated, starting session\n");
+ return;
+ }
+
+ if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
+ dout("%s authenticated, starting session global_id %llu\n",
+ __func__, monc->auth->global_id);
monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
monc->client->msgr.inst.name.num =
@@ -912,23 +1304,31 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
__send_subscribe(monc);
__resend_generic_request(monc);
- }
- if (!had_debugfs_info && have_debugfs_info(monc)) {
- pr_info("client%lld fsid %pU\n",
- ceph_client_id(monc->client),
- &monc->client->fsid);
- init_debugfs = 1;
+ pr_info("mon%d %s session established\n", monc->cur_mon,
+ ceph_pr_addr(&monc->con.peer_addr));
}
- mutex_unlock(&monc->mutex);
+}
- if (init_debugfs) {
- /*
- * do debugfs initialization without mutex to avoid
- * creating a locking dependency
- */
- ceph_debugfs_client_init(monc->client);
+static void handle_auth_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
+{
+ bool was_authed;
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ was_authed = ceph_auth_is_authenticated(monc->auth);
+ ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
+ msg->front.iov_len,
+ monc->m_auth->front.iov_base,
+ monc->m_auth->front_alloc_len);
+ if (ret > 0) {
+ __send_prepared_auth_request(monc, ret);
+ } else {
+ finish_auth(monc, ret, was_authed);
+ finish_hunting(monc);
}
+ mutex_unlock(&monc->mutex);
}
static int __validate_auth(struct ceph_mon_client *monc)
@@ -939,7 +1339,7 @@ static int __validate_auth(struct ceph_mon_client *monc)
return 0;
ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
- monc->m_auth->front_max);
+ monc->m_auth->front_alloc_len);
if (ret <= 0)
return ret; /* either an error, or no need to authenticate */
__send_prepared_auth_request(monc, ret);
@@ -957,17 +1357,96 @@ int ceph_monc_validate_auth(struct ceph_mon_client *monc)
}
EXPORT_SYMBOL(ceph_monc_validate_auth);
+static int mon_get_auth_request(struct ceph_connection *con,
+ void *buf, int *buf_len,
+ void **authorizer, int *authorizer_len)
+{
+ struct ceph_mon_client *monc = con->private;
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
+ mutex_unlock(&monc->mutex);
+ if (ret < 0)
+ return ret;
+
+ *buf_len = ret;
+ *authorizer = NULL;
+ *authorizer_len = 0;
+ return 0;
+}
+
+static int mon_handle_auth_reply_more(struct ceph_connection *con,
+ void *reply, int reply_len,
+ void *buf, int *buf_len,
+ void **authorizer, int *authorizer_len)
+{
+ struct ceph_mon_client *monc = con->private;
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
+ buf, *buf_len);
+ mutex_unlock(&monc->mutex);
+ if (ret < 0)
+ return ret;
+
+ *buf_len = ret;
+ *authorizer = NULL;
+ *authorizer_len = 0;
+ return 0;
+}
+
+static int mon_handle_auth_done(struct ceph_connection *con,
+ u64 global_id, void *reply, int reply_len,
+ u8 *session_key, int *session_key_len,
+ u8 *con_secret, int *con_secret_len)
+{
+ struct ceph_mon_client *monc = con->private;
+ bool was_authed;
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ WARN_ON(!monc->hunting);
+ was_authed = ceph_auth_is_authenticated(monc->auth);
+ ret = ceph_auth_handle_reply_done(monc->auth, global_id,
+ reply, reply_len,
+ session_key, session_key_len,
+ con_secret, con_secret_len);
+ finish_auth(monc, ret, was_authed);
+ if (!ret)
+ finish_hunting(monc);
+ mutex_unlock(&monc->mutex);
+ return 0;
+}
+
+static int mon_handle_auth_bad_method(struct ceph_connection *con,
+ int used_proto, int result,
+ const int *allowed_protos, int proto_cnt,
+ const int *allowed_modes, int mode_cnt)
+{
+ struct ceph_mon_client *monc = con->private;
+ bool was_authed;
+
+ mutex_lock(&monc->mutex);
+ WARN_ON(!monc->hunting);
+ was_authed = ceph_auth_is_authenticated(monc->auth);
+ ceph_auth_handle_bad_method(monc->auth, used_proto, result,
+ allowed_protos, proto_cnt,
+ allowed_modes, mode_cnt);
+ finish_auth(monc, -EACCES, was_authed);
+ mutex_unlock(&monc->mutex);
+ return 0;
+}
+
/*
* handle incoming message
*/
-static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(msg->hdr.type);
- if (!monc)
- return;
-
switch (type) {
case CEPH_MSG_AUTH_REPLY:
handle_auth_reply(monc, msg);
@@ -981,8 +1460,12 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg);
break;
- case CEPH_MSG_POOLOP_REPLY:
- handle_poolop_reply(monc, msg);
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ handle_get_version_reply(monc, msg);
+ break;
+
+ case CEPH_MSG_MON_COMMAND_ACK:
+ handle_command_ack(monc, msg);
break;
case CEPH_MSG_MON_MAP:
@@ -998,7 +1481,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
if (monc->client->extra_mon_dispatch &&
monc->client->extra_mon_dispatch(monc->client, msg) == 0)
break;
-
+
pr_err("received unknown message type %d %s\n", type,
ceph_msg_type_name(type));
}
@@ -1023,15 +1506,26 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack);
break;
- case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY:
+ case CEPH_MSG_MON_COMMAND_ACK:
return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY:
m = ceph_msg_get(monc->m_auth_reply);
break;
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ if (le64_to_cpu(hdr->tid) != 0)
+ return get_generic_reply(con, hdr, skip);
+
+ /*
+ * Older OSDs don't set reply tid even if the original
+ * request had a non-zero tid. Work around this weirdness
+ * by allocating a new message.
+ */
+ fallthrough;
case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP:
+ case CEPH_MSG_FS_MAP_USER:
m = ceph_msg_new(type, front_len, GFP_NOFS, false);
if (!m)
return NULL; /* ENOMEM--return skip == 0 */
@@ -1041,7 +1535,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
if (!m) {
pr_info("alloc_msg unknown type %d\n", type);
*skip = 1;
+ } else if (front_len > m->front_alloc_len) {
+ pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
+ front_len, m->front_alloc_len,
+ (unsigned int)con->peer_name.type,
+ le64_to_cpu(con->peer_name.num));
+ ceph_msg_put(m);
+ m = ceph_msg_new(type, front_len, GFP_NOFS, false);
}
+
return m;
}
@@ -1053,29 +1555,17 @@ static void mon_fault(struct ceph_connection *con)
{
struct ceph_mon_client *monc = con->private;
- if (!monc)
- return;
-
- dout("mon_fault\n");
mutex_lock(&monc->mutex);
- if (!con->private)
- goto out;
-
- if (!monc->hunting)
- pr_info("mon%d %s session lost, "
- "hunting for new mon\n", monc->cur_mon,
- ceph_pr_addr(&monc->con.peer_addr.in_addr));
-
- __close_session(monc);
- if (!monc->hunting) {
- /* start hunting */
- monc->hunting = true;
- __open_session(monc);
- } else {
- /* already hunting, let's wait a bit */
- __schedule_delayed(monc);
+ dout("%s mon%d\n", __func__, monc->cur_mon);
+ if (monc->cur_mon >= 0) {
+ if (!monc->hunting) {
+ dout("%s hunting for new mon\n", __func__);
+ reopen_session(monc);
+ __schedule_delayed(monc);
+ } else {
+ dout("%s already hunting\n", __func__);
+ }
}
-out:
mutex_unlock(&monc->mutex);
}
@@ -1084,19 +1574,23 @@ out:
* will come from the messenger workqueue, which is drained prior to
* mon_client destruction.
*/
-static struct ceph_connection *con_get(struct ceph_connection *con)
+static struct ceph_connection *mon_get_con(struct ceph_connection *con)
{
return con;
}
-static void con_put(struct ceph_connection *con)
+static void mon_put_con(struct ceph_connection *con)
{
}
static const struct ceph_connection_operations mon_con_ops = {
- .get = con_get,
- .put = con_put,
- .dispatch = dispatch,
- .fault = mon_fault,
+ .get = mon_get_con,
+ .put = mon_put_con,
.alloc_msg = mon_alloc_msg,
+ .dispatch = mon_dispatch,
+ .fault = mon_fault,
+ .get_auth_request = mon_get_auth_request,
+ .handle_auth_reply_more = mon_handle_auth_reply_more,
+ .handle_auth_done = mon_handle_auth_done,
+ .handle_auth_bad_method = mon_handle_auth_bad_method,
};