summaryrefslogtreecommitdiff
path: root/fs/dlm/recoverd.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/recoverd.c')
-rw-r--r--fs/dlm/recoverd.c204
1 files changed, 159 insertions, 45 deletions
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 6f4e1d42d733..12272a8f6d75 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -1,12 +1,10 @@
+// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
-** This copyrighted material is made available to anyone wishing to use,
-** modify, copy, or redistribute it subject to the terms and conditions
-** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
@@ -22,6 +20,67 @@
#include "requestqueue.h"
#include "recoverd.h"
+static int dlm_create_masters_list(struct dlm_ls *ls)
+{
+ struct dlm_rsb *r;
+ int error = 0;
+
+ write_lock_bh(&ls->ls_masters_lock);
+ if (!list_empty(&ls->ls_masters_list)) {
+ log_error(ls, "root list not empty");
+ error = -EINVAL;
+ goto out;
+ }
+
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
+ if (r->res_nodeid)
+ continue;
+
+ list_add(&r->res_masters_list, &ls->ls_masters_list);
+ dlm_hold_rsb(r);
+ }
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
+ write_unlock_bh(&ls->ls_masters_lock);
+ return error;
+}
+
+static void dlm_release_masters_list(struct dlm_ls *ls)
+{
+ struct dlm_rsb *r, *safe;
+
+ write_lock_bh(&ls->ls_masters_lock);
+ list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
+ list_del_init(&r->res_masters_list);
+ dlm_put_rsb(r);
+ }
+ write_unlock_bh(&ls->ls_masters_lock);
+}
+
+static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
+{
+ struct dlm_rsb *r;
+
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
+ list_add(&r->res_root_list, root_list);
+ dlm_hold_rsb(r);
+ }
+
+ WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive));
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+}
+
+static void dlm_release_root_list(struct list_head *root_list)
+{
+ struct dlm_rsb *r, *safe;
+
+ list_for_each_entry_safe(r, safe, root_list, res_root_list) {
+ list_del_init(&r->res_root_list);
+ dlm_put_rsb(r);
+ }
+}
/* If the start for which we're re-enabling locking (seq) has been superseded
by a newer stop (ls_recover_seq), we need to leave locking disabled.
@@ -34,24 +93,35 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
int error = -EINTR;
- down_write(&ls->ls_recv_active);
+ write_lock_bh(&ls->ls_recv_active);
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags);
+ /* Schedule next timer if recovery put something on inactive.
+ *
+ * The rsbs that was queued while recovery on toss hasn't
+ * started yet because LSFL_RUNNING was set everything
+ * else recovery hasn't started as well because ls_in_recovery
+ * is still hold. So we should not run into the case that
+ * resume_scan_timer() queues a timer that can occur in
+ * a no op.
+ */
+ resume_scan_timer(ls);
/* unblocks processes waiting to enter the dlm */
up_write(&ls->ls_in_recovery);
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
error = 0;
}
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
- up_write(&ls->ls_recv_active);
+ write_unlock_bh(&ls->ls_recv_active);
return error;
}
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
+ LIST_HEAD(root_list);
unsigned long start;
int error, neg = 0;
@@ -61,37 +131,57 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_callback_suspend(ls);
- dlm_clear_toss(ls);
+ dlm_clear_inactive(ls);
/*
* This list of root rsb's will be the basis of most of the recovery
* routines.
*/
- dlm_create_root_list(ls);
+ dlm_create_root_list(ls, &root_list);
/*
* Add or remove nodes from the lockspace's ls_nodes list.
+ *
+ * Due to the fact that we must report all membership changes to lsops
+ * or midcomms layer, it is not permitted to abort ls_recover() until
+ * this is done.
*/
error = dlm_recover_members(ls, rv, &neg);
if (error) {
log_rinfo(ls, "dlm_recover_members error %d", error);
- goto fail;
+ goto fail_root_list;
}
- dlm_recover_dir_nodeid(ls);
+ dlm_recover_dir_nodeid(ls, &root_list);
+
+ /* Create a snapshot of all active rsbs were we are the master of.
+ * During the barrier between dlm_recover_members_wait() and
+ * dlm_recover_directory() other nodes can dump their necessary
+ * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom
+ * communication dlm_copy_master_names() handling.
+ *
+ * TODO We should create a per lockspace list that contains rsbs
+ * that we are the master of. Instead of creating this list while
+ * recovery we keep track of those rsbs while locking handling and
+ * recovery can use it when necessary.
+ */
+ error = dlm_create_masters_list(ls);
+ if (error) {
+ log_rinfo(ls, "dlm_create_masters_list error %d", error);
+ goto fail_root_list;
+ }
- ls->ls_recover_dir_sent_res = 0;
- ls->ls_recover_dir_sent_msg = 0;
ls->ls_recover_locks_in = 0;
dlm_set_recover_status(ls, DLM_RS_NODES);
- error = dlm_recover_members_wait(ls);
+ error = dlm_recover_members_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_members_wait error %d", error);
- goto fail;
+ dlm_release_masters_list(ls);
+ goto fail_root_list;
}
start = jiffies;
@@ -101,22 +191,23 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* nodes their master rsb names that hash to us.
*/
- error = dlm_recover_directory(ls);
+ error = dlm_recover_directory(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory error %d", error);
- goto fail;
+ dlm_release_masters_list(ls);
+ goto fail_root_list;
}
dlm_set_recover_status(ls, DLM_RS_DIR);
- error = dlm_recover_directory_wait(ls);
+ error = dlm_recover_directory_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
- goto fail;
+ dlm_release_masters_list(ls);
+ goto fail_root_list;
}
- log_rinfo(ls, "dlm_recover_directory %u out %u messages",
- ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
+ dlm_release_masters_list(ls);
/*
* We may have outstanding operations that are waiting for a reply from
@@ -126,44 +217,45 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_recover_waiters_pre(ls);
- error = dlm_recovery_stopped(ls);
- if (error)
- goto fail;
+ if (dlm_recovery_stopped(ls)) {
+ error = -EINTR;
+ goto fail_root_list;
+ }
if (neg || dlm_no_directory(ls)) {
/*
* Clear lkb's for departed nodes.
*/
- dlm_recover_purge(ls);
+ dlm_recover_purge(ls, &root_list);
/*
* Get new master nodeid's for rsb's that were mastered on
* departed nodes.
*/
- error = dlm_recover_masters(ls);
+ error = dlm_recover_masters(ls, rv->seq, &root_list);
if (error) {
log_rinfo(ls, "dlm_recover_masters error %d", error);
- goto fail;
+ goto fail_root_list;
}
/*
* Send our locks on remastered rsb's to the new masters.
*/
- error = dlm_recover_locks(ls);
+ error = dlm_recover_locks(ls, rv->seq, &root_list);
if (error) {
log_rinfo(ls, "dlm_recover_locks error %d", error);
- goto fail;
+ goto fail_root_list;
}
dlm_set_recover_status(ls, DLM_RS_LOCKS);
- error = dlm_recover_locks_wait(ls);
+ error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
- goto fail;
+ goto fail_root_list;
}
log_rinfo(ls, "dlm_recover_locks %u in",
@@ -175,7 +267,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* settings.
*/
- dlm_recover_rsbs(ls);
+ dlm_recover_rsbs(ls, &root_list);
} else {
/*
* Other lockspace members may be going through the "neg" steps
@@ -184,14 +276,14 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
*/
dlm_set_recover_status(ls, DLM_RS_LOCKS);
- error = dlm_recover_locks_wait(ls);
+ error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
- goto fail;
+ goto fail_root_list;
}
}
- dlm_release_root_list(ls);
+ dlm_release_root_list(&root_list);
/*
* Purge directory-related requests that are saved in requestqueue.
@@ -203,7 +295,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DONE);
- error = dlm_recover_done_wait(ls);
+ error = dlm_recover_done_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_done_wait error %d", error);
goto fail;
@@ -211,8 +303,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_clear_members_gone(ls);
- dlm_adjust_timeouts(ls);
-
dlm_callback_resume(ls);
error = enable_locking(ls, rv->seq);
@@ -240,14 +330,13 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
jiffies_to_msecs(jiffies - start));
mutex_unlock(&ls->ls_recoverd_active);
- dlm_lsop_recover_done(ls);
return 0;
+ fail_root_list:
+ dlm_release_root_list(&root_list);
fail:
- dlm_release_root_list(ls);
- log_rinfo(ls, "dlm_recover %llu error %d",
- (unsigned long long)rv->seq, error);
mutex_unlock(&ls->ls_recoverd_active);
+
return error;
}
@@ -258,16 +347,41 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
static void do_ls_recovery(struct dlm_ls *ls)
{
struct dlm_recover *rv = NULL;
+ int error;
- spin_lock(&ls->ls_recover_lock);
+ spin_lock_bh(&ls->ls_recover_lock);
rv = ls->ls_recover_args;
ls->ls_recover_args = NULL;
if (rv && ls->ls_recover_seq == rv->seq)
clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
- spin_unlock(&ls->ls_recover_lock);
+ spin_unlock_bh(&ls->ls_recover_lock);
if (rv) {
- ls_recover(ls, rv);
+ error = ls_recover(ls, rv);
+ switch (error) {
+ case 0:
+ ls->ls_recovery_result = 0;
+ complete(&ls->ls_recovery_done);
+
+ dlm_lsop_recover_done(ls);
+ break;
+ case -EINTR:
+ /* if recovery was interrupted -EINTR we wait for the next
+ * ls_recover() iteration until it hopefully succeeds.
+ */
+ log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
+ __func__, (unsigned long long)rv->seq);
+ break;
+ default:
+ log_rinfo(ls, "%s %llu error %d", __func__,
+ (unsigned long long)rv->seq, error);
+
+ /* let new_lockspace() get aware of critical error */
+ ls->ls_recovery_result = error;
+ complete(&ls->ls_recovery_done);
+ break;
+ }
+
kfree(rv->nodes);
kfree(rv);
}