summaryrefslogtreecommitdiff
path: root/fs/dlm/member.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/member.c')
-rw-r--r--fs/dlm/member.c98
1 files changed, 70 insertions, 28 deletions
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 923c01a8a0aa..c0f557a80a75 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -18,7 +18,7 @@
#include "midcomms.h"
#include "lowcomms.h"
-int dlm_slots_version(struct dlm_header *h)
+int dlm_slots_version(const struct dlm_header *h)
{
if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
return 0;
@@ -307,6 +307,21 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
}
}
+static int add_remote_member(int nodeid)
+{
+ int error;
+
+ if (nodeid == dlm_our_nodeid())
+ return 0;
+
+ error = dlm_lowcomms_connect_node(nodeid);
+ if (error < 0)
+ return error;
+
+ dlm_midcomms_add_member(nodeid);
+ return 0;
+}
+
static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
struct dlm_member *memb;
@@ -316,16 +331,16 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
if (!memb)
return -ENOMEM;
- error = dlm_lowcomms_connect_node(node->nodeid);
+ memb->nodeid = node->nodeid;
+ memb->weight = node->weight;
+ memb->comm_seq = node->comm_seq;
+
+ error = add_remote_member(node->nodeid);
if (error < 0) {
kfree(memb);
return error;
}
- memb->nodeid = node->nodeid;
- memb->weight = node->weight;
- memb->comm_seq = node->comm_seq;
- dlm_midcomms_add_member(node->nodeid);
add_ordered_member(ls, memb);
ls->ls_num_nodes++;
return 0;
@@ -351,6 +366,8 @@ int dlm_is_member(struct dlm_ls *ls, int nodeid)
int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
+ WARN_ON_ONCE(!nodeid || nodeid == -1);
+
if (find_memb(&ls->ls_nodes_gone, nodeid))
return 1;
return 0;
@@ -370,14 +387,17 @@ static void clear_memb_list(struct list_head *head,
}
}
-static void clear_members_cb(int nodeid)
+static void remove_remote_member(int nodeid)
{
+ if (nodeid == dlm_our_nodeid())
+ return;
+
dlm_midcomms_remove_member(nodeid);
}
void dlm_clear_members(struct dlm_ls *ls)
{
- clear_memb_list(&ls->ls_nodes, clear_members_cb);
+ clear_memb_list(&ls->ls_nodes, remove_remote_member);
ls->ls_num_nodes = 0;
}
@@ -431,7 +451,7 @@ static void make_member_array(struct dlm_ls *ls)
/* send a status request to all members just to establish comms connections */
-static int ping_members(struct dlm_ls *ls)
+static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
int error = 0;
@@ -441,7 +461,7 @@ static int ping_members(struct dlm_ls *ls)
error = -EINTR;
break;
}
- error = dlm_rcom_status(ls, memb->nodeid, 0);
+ error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error)
break;
}
@@ -458,7 +478,8 @@ static void dlm_lsop_recover_prep(struct dlm_ls *ls)
ls->ls_ops->recover_prep(ls->ls_ops_arg);
}
-static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
+static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb,
+ unsigned int release_recover)
{
struct dlm_slot slot;
uint32_t seq;
@@ -473,9 +494,9 @@ static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
we consider the node to have failed (versus
being removed due to dlm_release_lockspace) */
- error = dlm_comm_seq(memb->nodeid, &seq);
+ error = dlm_comm_seq(memb->nodeid, &seq, false);
- if (!error && seq == memb->comm_seq)
+ if (!release_recover && !error && seq == memb->comm_seq)
return;
slot.nodeid = memb->nodeid;
@@ -532,6 +553,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
struct dlm_member *memb, *safe;
struct dlm_config_node *node;
int i, error, neg = 0, low = -1;
+ unsigned int release_recover;
/* previously removed members that we've not finished removing need to
* count as a negative change so the "neg" recovery steps will happen
@@ -549,11 +571,21 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
node = find_config_node(rv, memb->nodeid);
- if (node && !node->new)
+ if (!node) {
+ log_error(ls, "remove member %d invalid",
+ memb->nodeid);
+ return -EFAULT;
+ }
+
+ if (!node->new && !node->gone)
continue;
- if (!node) {
- log_rinfo(ls, "remove member %d", memb->nodeid);
+ release_recover = 0;
+
+ if (node->gone) {
+ release_recover = node->release_recover;
+ log_rinfo(ls, "remove member %d%s", memb->nodeid,
+ release_recover ? " (release_recover)" : "");
} else {
/* removed and re-added */
log_rinfo(ls, "remove member %d comm_seq %u %u",
@@ -562,15 +594,18 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
neg++;
list_move(&memb->list, &ls->ls_nodes_gone);
- dlm_midcomms_remove_member(memb->nodeid);
+ remove_remote_member(memb->nodeid);
ls->ls_num_nodes--;
- dlm_lsop_recover_slot(ls, memb);
+ dlm_lsop_recover_slot(ls, memb, release_recover);
}
/* add new members to ls_nodes */
for (i = 0; i < rv->nodes_count; i++) {
node = &rv->nodes[i];
+ if (node->gone)
+ continue;
+
if (dlm_is_member(ls, node->nodeid))
continue;
error = dlm_add_member(ls, node);
@@ -589,7 +624,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
make_member_array(ls);
*neg_out = neg;
- error = ping_members(ls);
+ error = ping_members(ls, rv->seq);
log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
return error;
}
@@ -612,7 +647,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* message to the requestqueue without races.
*/
- down_write(&ls->ls_recv_active);
+ write_lock_bh(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVER_STOP,
@@ -620,18 +655,25 @@ int dlm_ls_stop(struct dlm_ls *ls)
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
+ if (new)
+ timer_delete_sync(&ls->ls_scan_timer);
ls->ls_recover_seq++;
- spin_unlock(&ls->ls_recover_lock);
+
+ /* activate requestqueue and stop processing */
+ write_lock_bh(&ls->ls_requestqueue_lock);
+ set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/
- up_write(&ls->ls_recv_active);
+ write_unlock_bh(&ls->ls_recv_active);
/*
* This in_recovery lock does two things:
@@ -656,13 +698,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
dlm_recoverd_suspend(ls);
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
kfree(ls->ls_slots);
ls->ls_slots = NULL;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_recover_status = 0;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
dlm_recoverd_resume(ls);
@@ -696,12 +738,12 @@ int dlm_ls_start(struct dlm_ls *ls)
if (error < 0)
goto fail_rv;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
/* the lockspace needs to be stopped before it can be started */
if (!dlm_locking_stopped(ls)) {
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
error = -EINVAL;
goto fail;
@@ -712,7 +754,7 @@ int dlm_ls_start(struct dlm_ls *ls)
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (rv_old) {
log_error(ls, "unused recovery %llx %d",