diff options
Diffstat (limited to 'fs/dlm/recover.c')
| -rw-r--r-- | fs/dlm/recover.c | 122 |
1 files changed, 67 insertions, 55 deletions
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index f493d5f30c58..3ac020fb8139 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -293,73 +293,78 @@ static void recover_list_clear(struct dlm_ls *ls) spin_unlock_bh(&ls->ls_recover_list_lock); } -static int recover_idr_empty(struct dlm_ls *ls) +static int recover_xa_empty(struct dlm_ls *ls) { int empty = 1; - spin_lock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); if (ls->ls_recover_list_count) empty = 0; - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); return empty; } -static int recover_idr_add(struct dlm_rsb *r) +static int recover_xa_add(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; + struct xa_limit limit = { + .min = 1, + .max = UINT_MAX, + }; + uint32_t id; int rv; - spin_lock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); if (r->res_id) { rv = -1; goto out_unlock; } - rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); + rv = xa_alloc(&ls->ls_recover_xa, &id, r, limit, GFP_ATOMIC); if (rv < 0) goto out_unlock; - r->res_id = rv; + r->res_id = id; ls->ls_recover_list_count++; dlm_hold_rsb(r); rv = 0; out_unlock: - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); return rv; } -static void recover_idr_del(struct dlm_rsb *r) +static void recover_xa_del(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - spin_lock_bh(&ls->ls_recover_idr_lock); - idr_remove(&ls->ls_recover_idr, r->res_id); + spin_lock_bh(&ls->ls_recover_xa_lock); + xa_erase_bh(&ls->ls_recover_xa, r->res_id); r->res_id = 0; ls->ls_recover_list_count--; - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); dlm_put_rsb(r); } -static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) +static struct dlm_rsb *recover_xa_find(struct dlm_ls *ls, uint64_t id) { struct dlm_rsb *r; - spin_lock_bh(&ls->ls_recover_idr_lock); - r = idr_find(&ls->ls_recover_idr, (int)id); - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); + r = xa_load(&ls->ls_recover_xa, (int)id); + spin_unlock_bh(&ls->ls_recover_xa_lock); return r; } -static void recover_idr_clear(struct dlm_ls *ls) +static void recover_xa_clear(struct dlm_ls *ls) { struct dlm_rsb *r; - int id; + unsigned long id; - spin_lock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); - idr_for_each_entry(&ls->ls_recover_idr, r, id) { - idr_remove(&ls->ls_recover_idr, id); + xa_for_each(&ls->ls_recover_xa, id, r) { + xa_erase_bh(&ls->ls_recover_xa, id); r->res_id = 0; r->res_recover_locks_count = 0; ls->ls_recover_list_count--; @@ -372,7 +377,7 @@ static void recover_idr_clear(struct dlm_ls *ls) ls->ls_recover_list_count); ls->ls_recover_list_count = 0; } - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); } @@ -447,10 +452,11 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) int is_removed = 0; int error; - if (is_master(r)) + if (r->res_nodeid != -1 && is_master(r)) return 0; - is_removed = dlm_is_removed(ls, r->res_nodeid); + if (r->res_nodeid != -1) + is_removed = dlm_is_removed(ls, r->res_nodeid); if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER)) return 0; @@ -470,7 +476,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) set_new_master(r); error = 0; } else { - recover_idr_add(r); + recover_xa_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid, seq); } @@ -551,10 +557,10 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq, log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); - error = dlm_wait_function(ls, &recover_idr_empty); + error = dlm_wait_function(ls, &recover_xa_empty); out: if (error) - recover_idr_clear(ls); + recover_xa_clear(ls); return error; } @@ -563,7 +569,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) struct dlm_rsb *r; int ret_nodeid, new_master; - r = recover_idr_find(ls, le64_to_cpu(rc->rc_id)); + r = recover_xa_find(ls, le64_to_cpu(rc->rc_id)); if (!r) { log_error(ls, "dlm_recover_master_reply no id %llx", (unsigned long long)le64_to_cpu(rc->rc_id)); @@ -582,9 +588,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) r->res_nodeid = new_master; set_new_master(r); unlock_rsb(r); - recover_idr_del(r); + recover_xa_del(r); - if (recover_idr_empty(ls)) + if (recover_xa_empty(ls)) wake_up(&ls->ls_wait_general); out: return 0; @@ -659,7 +665,7 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, int error, count = 0; list_for_each_entry(r, root_list, res_root_list) { - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { rsb_clear_flag(r, RSB_NEW_MASTER); continue; } @@ -805,33 +811,42 @@ static void recover_lvb(struct dlm_rsb *r) } /* All master rsb's flagged RECOVER_CONVERT need to be looked at. The locks - converting PR->CW or CW->PR need to have their lkb_grmode set. */ + * converting PR->CW or CW->PR may need to have their lkb_grmode changed. + */ static void recover_conversion(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; + uint32_t other_lkid = 0; + int other_grmode = -1; struct dlm_lkb *lkb; - int grmode = -1; list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { if (lkb->lkb_grmode == DLM_LOCK_PR || lkb->lkb_grmode == DLM_LOCK_CW) { - grmode = lkb->lkb_grmode; + other_grmode = lkb->lkb_grmode; + other_lkid = lkb->lkb_id; break; } } + if (other_grmode == -1) + return; + list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { - if (lkb->lkb_grmode != DLM_LOCK_IV) - continue; - if (grmode == -1) { - log_debug(ls, "recover_conversion %x set gr to rq %d", - lkb->lkb_id, lkb->lkb_rqmode); - lkb->lkb_grmode = lkb->lkb_rqmode; - } else { - log_debug(ls, "recover_conversion %x set gr %d", - lkb->lkb_id, grmode); - lkb->lkb_grmode = grmode; + /* Lock recovery created incompatible granted modes, so + * change the granted mode of the converting lock to + * NL. The rqmode of the converting lock should be CW, + * which means the converting lock should be granted at + * the end of recovery. + */ + if (((lkb->lkb_grmode == DLM_LOCK_PR) && (other_grmode == DLM_LOCK_CW)) || + ((lkb->lkb_grmode == DLM_LOCK_CW) && (other_grmode == DLM_LOCK_PR))) { + log_rinfo(ls, "%s %x gr %d rq %d, remote %d %x, other_lkid %u, other gr %d, set gr=NL", + __func__, lkb->lkb_id, lkb->lkb_grmode, + lkb->lkb_rqmode, lkb->lkb_nodeid, + lkb->lkb_remid, other_lkid, other_grmode); + lkb->lkb_grmode = DLM_LOCK_NL; } } } @@ -853,7 +868,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list) list_for_each_entry(r, root_list, res_root_list) { lock_rsb(r); - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); @@ -877,29 +892,26 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list) log_rinfo(ls, "dlm_recover_rsbs %d done", count); } -/* Create a single list of all root rsb's to be used during recovery */ - -void dlm_clear_toss(struct dlm_ls *ls) +void dlm_clear_inactive(struct dlm_ls *ls) { struct dlm_rsb *r, *safe; unsigned int count = 0; write_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) { - list_del(&r->res_rsbs_list); + list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) { + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); - /* remove it from the toss queue if its part of it */ - if (!list_empty(&r->res_toss_q_list)) - list_del_init(&r->res_toss_q_list); + if (!list_empty(&r->res_scan_list)) + list_del_init(&r->res_scan_list); - free_toss_rsb(r); + free_inactive_rsb(r); count++; } write_unlock_bh(&ls->ls_rsbtbl_lock); if (count) - log_rinfo(ls, "dlm_clear_toss %u done", count); + log_rinfo(ls, "dlm_clear_inactive %u done", count); } |
