diff options
Diffstat (limited to 'fs/dlm/member.c')
| -rw-r--r-- | fs/dlm/member.c | 98 |
1 files changed, 70 insertions, 28 deletions
diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 923c01a8a0aa..c0f557a80a75 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -18,7 +18,7 @@ #include "midcomms.h" #include "lowcomms.h" -int dlm_slots_version(struct dlm_header *h) +int dlm_slots_version(const struct dlm_header *h) { if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS) return 0; @@ -307,6 +307,21 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) } } +static int add_remote_member(int nodeid) +{ + int error; + + if (nodeid == dlm_our_nodeid()) + return 0; + + error = dlm_lowcomms_connect_node(nodeid); + if (error < 0) + return error; + + dlm_midcomms_add_member(nodeid); + return 0; +} + static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node) { struct dlm_member *memb; @@ -316,16 +331,16 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node) if (!memb) return -ENOMEM; - error = dlm_lowcomms_connect_node(node->nodeid); + memb->nodeid = node->nodeid; + memb->weight = node->weight; + memb->comm_seq = node->comm_seq; + + error = add_remote_member(node->nodeid); if (error < 0) { kfree(memb); return error; } - memb->nodeid = node->nodeid; - memb->weight = node->weight; - memb->comm_seq = node->comm_seq; - dlm_midcomms_add_member(node->nodeid); add_ordered_member(ls, memb); ls->ls_num_nodes++; return 0; @@ -351,6 +366,8 @@ int dlm_is_member(struct dlm_ls *ls, int nodeid) int dlm_is_removed(struct dlm_ls *ls, int nodeid) { + WARN_ON_ONCE(!nodeid || nodeid == -1); + if (find_memb(&ls->ls_nodes_gone, nodeid)) return 1; return 0; @@ -370,14 +387,17 @@ static void clear_memb_list(struct list_head *head, } } -static void clear_members_cb(int nodeid) +static void remove_remote_member(int nodeid) { + if (nodeid == dlm_our_nodeid()) + return; + dlm_midcomms_remove_member(nodeid); } void dlm_clear_members(struct dlm_ls *ls) { - clear_memb_list(&ls->ls_nodes, clear_members_cb); + clear_memb_list(&ls->ls_nodes, remove_remote_member); ls->ls_num_nodes = 0; } @@ -431,7 +451,7 @@ static void make_member_array(struct dlm_ls *ls) /* send a status request to all members just to establish comms connections */ -static int ping_members(struct dlm_ls *ls) +static int ping_members(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; int error = 0; @@ -441,7 +461,7 @@ static int ping_members(struct dlm_ls *ls) error = -EINTR; break; } - error = dlm_rcom_status(ls, memb->nodeid, 0); + error = dlm_rcom_status(ls, memb->nodeid, 0, seq); if (error) break; } @@ -458,7 +478,8 @@ static void dlm_lsop_recover_prep(struct dlm_ls *ls) ls->ls_ops->recover_prep(ls->ls_ops_arg); } -static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb) +static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb, + unsigned int release_recover) { struct dlm_slot slot; uint32_t seq; @@ -473,9 +494,9 @@ static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb) we consider the node to have failed (versus being removed due to dlm_release_lockspace) */ - error = dlm_comm_seq(memb->nodeid, &seq); + error = dlm_comm_seq(memb->nodeid, &seq, false); - if (!error && seq == memb->comm_seq) + if (!release_recover && !error && seq == memb->comm_seq) return; slot.nodeid = memb->nodeid; @@ -532,6 +553,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) struct dlm_member *memb, *safe; struct dlm_config_node *node; int i, error, neg = 0, low = -1; + unsigned int release_recover; /* previously removed members that we've not finished removing need to * count as a negative change so the "neg" recovery steps will happen @@ -549,11 +571,21 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { node = find_config_node(rv, memb->nodeid); - if (node && !node->new) + if (!node) { + log_error(ls, "remove member %d invalid", + memb->nodeid); + return -EFAULT; + } + + if (!node->new && !node->gone) continue; - if (!node) { - log_rinfo(ls, "remove member %d", memb->nodeid); + release_recover = 0; + + if (node->gone) { + release_recover = node->release_recover; + log_rinfo(ls, "remove member %d%s", memb->nodeid, + release_recover ? " (release_recover)" : ""); } else { /* removed and re-added */ log_rinfo(ls, "remove member %d comm_seq %u %u", @@ -562,15 +594,18 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) neg++; list_move(&memb->list, &ls->ls_nodes_gone); - dlm_midcomms_remove_member(memb->nodeid); + remove_remote_member(memb->nodeid); ls->ls_num_nodes--; - dlm_lsop_recover_slot(ls, memb); + dlm_lsop_recover_slot(ls, memb, release_recover); } /* add new members to ls_nodes */ for (i = 0; i < rv->nodes_count; i++) { node = &rv->nodes[i]; + if (node->gone) + continue; + if (dlm_is_member(ls, node->nodeid)) continue; error = dlm_add_member(ls, node); @@ -589,7 +624,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) make_member_array(ls); *neg_out = neg; - error = ping_members(ls); + error = ping_members(ls, rv->seq); log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); return error; } @@ -612,7 +647,7 @@ int dlm_ls_stop(struct dlm_ls *ls) * message to the requestqueue without races. */ - down_write(&ls->ls_recv_active); + write_lock_bh(&ls->ls_recv_active); /* * Abort any recovery that's in progress (see RECOVER_STOP, @@ -620,18 +655,25 @@ int dlm_ls_stop(struct dlm_ls *ls) * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). */ - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); + if (new) + timer_delete_sync(&ls->ls_scan_timer); ls->ls_recover_seq++; - spin_unlock(&ls->ls_recover_lock); + + /* activate requestqueue and stop processing */ + write_lock_bh(&ls->ls_requestqueue_lock); + set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags); + write_unlock_bh(&ls->ls_requestqueue_lock); + spin_unlock_bh(&ls->ls_recover_lock); /* * Let dlm_recv run again, now any normal messages will be saved on the * requestqueue for later. */ - up_write(&ls->ls_recv_active); + write_unlock_bh(&ls->ls_recv_active); /* * This in_recovery lock does two things: @@ -656,13 +698,13 @@ int dlm_ls_stop(struct dlm_ls *ls) dlm_recoverd_suspend(ls); - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); kfree(ls->ls_slots); ls->ls_slots = NULL; ls->ls_num_slots = 0; ls->ls_slots_size = 0; ls->ls_recover_status = 0; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); dlm_recoverd_resume(ls); @@ -696,12 +738,12 @@ int dlm_ls_start(struct dlm_ls *ls) if (error < 0) goto fail_rv; - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); /* the lockspace needs to be stopped before it can be started */ if (!dlm_locking_stopped(ls)) { - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); log_error(ls, "start ignored: lockspace running"); error = -EINVAL; goto fail; @@ -712,7 +754,7 @@ int dlm_ls_start(struct dlm_ls *ls) rv->seq = ++ls->ls_recover_seq; rv_old = ls->ls_recover_args; ls->ls_recover_args = rv; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); if (rv_old) { log_error(ls, "unused recovery %llx %d", |
