Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r-- | fs/dlm/lock.c | 1401 |
1 file changed, 780 insertions, 621 deletions
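Context for the hunks below: this patch replaces the per-bucket rsb hash buckets and their "toss" lists with a single rhashtable (ls_rsbtbl) guarded by one rwlock, an RSB_INACTIVE flag, and a scan timer. The final reference drop on an rsb must be atomic with respect to table lookups, which is what the new dlm_kref_put_write_lock_bh() helper provides. Below is a minimal usage sketch of that dec-and-lock pattern, assuming the helper were visible outside lock.c; struct foo, foo_table_lock, foo_release() and foo_put() are illustrative names, not part of the patch.

#include <linux/kref.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

static DEFINE_RWLOCK(foo_table_lock);	/* protects foo_list lookups */
static LIST_HEAD(foo_list);

struct foo {
	struct kref ref;
	struct list_head node;
};

/* Runs with foo_table_lock write-held; the helper takes the lock only
 * when this was the final reference, so no lookup can take a new
 * reference while the object is being unlinked. */
static void foo_release(struct kref *kref)
{
	struct foo *f = container_of(kref, struct foo, ref);

	list_del(&f->node);
}

static void foo_put(struct foo *f)
{
	/* Common case: a lock-free refcount decrement. Only the last
	 * put takes the write lock; as in the patch's put_rsb(), the
	 * caller drops the lock after release ran. */
	if (dlm_kref_put_write_lock_bh(&f->ref, foo_release, &foo_table_lock)) {
		write_unlock_bh(&foo_table_lock);
		kfree(f);
	}
}

The split between foo_release() unlinking and foo_put() unlocking mirrors put_rsb()/deactivate_rsb() below: the helper returns with the lock still held, so the caller controls when lookups may proceed again.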
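The "rsb rcu usage" comment further down documents the other half of the scheme: lookups run under rcu_read_lock(), so a just-removed object is still valid memory and can be revalidated by rechecking its HASHED flag after taking the table lock, instead of repeating the hash lookup. A condensed sketch of that pattern follows; obj, OBJ_HASHED, obj_rhash_params and obj_table_lock are illustrative names, entries are assumed to be freed via RCU, and the patch's separate kref_read()/kref_get() step is folded into kref_get_unless_zero() for brevity.

#include <linux/kref.h>
#include <linux/rhashtable.h>
#include <linux/spinlock.h>

#define OBJ_HASHED 0

static DEFINE_RWLOCK(obj_table_lock);

struct obj {
	struct rhash_head node;
	unsigned long flags;	/* OBJ_HASHED cleared on removal */
	struct kref ref;
	char key[16];
};

static const struct rhashtable_params obj_rhash_params = {
	.head_offset = offsetof(struct obj, node),
	.key_offset = offsetof(struct obj, key),
	.key_len = sizeof(((struct obj *)NULL)->key),
	.automatic_shrinking = true,
};

/* Returns a referenced, still-hashed obj, or NULL if the caller
 * should allocate a fresh one. */
static struct obj *obj_find(struct rhashtable *tbl, const void *key)
{
	struct obj *o;

	rcu_read_lock();
	o = rhashtable_lookup_fast(tbl, key, obj_rhash_params);
	if (o) {
		read_lock_bh(&obj_table_lock);
		if (test_bit(OBJ_HASHED, &o->flags) &&
		    kref_get_unless_zero(&o->ref)) {
			read_unlock_bh(&obj_table_lock);
			rcu_read_unlock();
			return o;
		}
		/* Removed between lookup and lock: the memory is still
		 * valid under RCU, but the object is on its way to
		 * being freed, so don't resurrect it. */
		read_unlock_bh(&obj_table_lock);
	}
	rcu_read_unlock();
	return NULL;
}

The read lock is enough here because removal (clearing OBJ_HASHED and erasing the table entry) always happens under the write lock, so the flag cannot change while a reader holds the lock.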
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index fd752dd03896..e01d5f29f4d2 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, const struct dlm_message *ms, bool local); static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r) /* Threads cannot use the lockspace while it's being recovered */ -static inline void dlm_lock_recovery(struct dlm_ls *ls) +void dlm_lock_recovery(struct dlm_ls *ls) { down_read(&ls->ls_in_recovery); } @@ -320,11 +320,18 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ +static inline unsigned long rsb_toss_jiffies(void) +{ + return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ); +} + /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ static inline void hold_rsb(struct dlm_rsb *r) { + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -333,19 +340,45 @@ void dlm_hold_rsb(struct dlm_rsb *r) hold_rsb(r); } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. */ +/* TODO move this to lib/refcount.c */ +static __must_check bool +dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock) +__cond_acquires(lock) +{ + if (refcount_dec_not_one(r)) + return false; + + write_lock_bh(lock); + if (!refcount_dec_and_test(r)) { + write_unlock_bh(lock); + return false; + } + + return true; +} + +/* TODO move this to include/linux/kref.h */ +static inline int dlm_kref_put_write_lock_bh(struct kref *kref, + void (*release)(struct kref *kref), + rwlock_t *lock) +{ + if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) { + release(kref); + return 1; + } + + return 0; +} static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - uint32_t bucket = r->res_bucket; int rv; - rv = kref_put_lock(&r->res_ref, toss_rsb, - &ls->ls_rsbtbl[bucket].lock); + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, + &ls->ls_rsbtbl_lock); if (rv) - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); } void dlm_put_rsb(struct dlm_rsb *r) @@ -353,36 +386,209 @@ void dlm_put_rsb(struct dlm_rsb *r) put_rsb(r); } -static int pre_rsb_struct(struct dlm_ls *ls) +/* Paired with timer_delete_sync() in dlm_ls_stop(): stop arming + * new timers when recovery is triggered, and don't run them + * again until resume_scan_timer() rearms one. + */ +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { - struct dlm_rsb *r1, *r2; - int count = 0; + if (!dlm_locking_stopped(ls)) + mod_timer(&ls->ls_scan_timer, jiffies); +} - spin_lock(&ls->ls_new_rsb_spin); - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { - spin_unlock(&ls->ls_new_rsb_spin); - return 0; - } - spin_unlock(&ls->ls_new_rsb_spin); +/* This function tries to resume the timer callback if an rsb + * is on the scan list and no timer is pending. The first entry + * might currently be executing as the timer callback, but we + * don't care if the timer is queued up again and then does + * nothing. Should be a rare case. 
+ */ +void resume_scan_timer(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); +} + +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) +{ + struct dlm_rsb *first; + + /* active rsbs should never be on the scan list */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); - r1 = dlm_allocate_rsb(ls); - r2 = dlm_allocate_rsb(ls); + spin_lock_bh(&ls->ls_scan_lock); + r->res_toss_time = 0; + + /* if the rsb is not queued do nothing */ + if (list_empty(&r->res_scan_list)) + goto out; - spin_lock(&ls->ls_new_rsb_spin); - if (r1) { - list_add(&r1->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; + /* get the first element before delete */ + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); + /* check if the first element was the rsb we deleted */ + if (first == r) { + /* try to get the new first element; if the list + * is now empty, try to delete the timer (if we are + * too late we don't care). + * + * if the list isn't empty and a new first element got + * in place, set the new timer expire time. + */ + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (!first) + timer_delete(&ls->ls_scan_timer); + else + enable_scan_timer(ls, first->res_toss_time); } - if (r2) { - list_add(&r2->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; + +out: + spin_unlock_bh(&ls->ls_scan_lock); +} + +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) +{ + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *first; + + /* A dir record for a remote master rsb should never be on the scan list. */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* An active rsb should never be on the scan list. */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + /* An rsb should not already be on the scan list. */ + WARN_ON(!list_empty(&r->res_scan_list)); + + spin_lock_bh(&ls->ls_scan_lock); + /* set the new rsb absolute expire time in the rsb */ + r->res_toss_time = rsb_toss_jiffies(); + if (list_empty(&ls->ls_scan_list)) { + /* if the queue is empty, add the element and its + * expire time is our new expiration + */ + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); + } else { + /* the queue is not empty: add this rsb, which has the + * newest expire time, to the tail of the queue, then + * arm the timer with the expire time of the (possibly + * new) first element, the oldest one. If no first + * element exists, this rsb's expire time is the next + * expiration. + */ + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + if (!first) + enable_scan_timer(ls, r->res_toss_time); + else + enable_scan_timer(ls, first->res_toss_time); } - count = ls->ls_new_rsb_count; - spin_unlock(&ls->ls_new_rsb_spin); + spin_unlock_bh(&ls->ls_scan_lock); } - if (!count) - return -ENOMEM; - return 0; +/* If we hit contention, we retry the trylock in 250 ms. + * If any other mod_timer() in between makes the timer expire + * earlier again we don't care; this is only for the + * unlikely case that nothing happened in this time. 
+ */ +#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) + +/* Called by lockspace scan_timer to free unused rsb's. */ + +void dlm_rsb_scan(struct timer_list *timer) +{ + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *r; + int rv; + + while (1) { + /* interruption point to leave the iteration when + * recovery waits for timer_delete_sync(); recovery + * will take care of deleting everything on the scan list. + */ + if (dlm_locking_stopped(ls)) + break; + + rv = spin_trylock(&ls->ls_scan_lock); + if (!rv) { + /* rearm the retry timer */ + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (!r) { + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); + break; + } + + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. + */ + if (time_before(jiffies, r->res_toss_time)) { + /* rearm with the next rsb to expire in the future */ + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); + break; + } + + /* find_rsb_dir/nodir take these locks in the reverse + * order; this is only a trylock, so on possible + * contention we just try it again. + */ + rv = write_trylock(&ls->ls_rsbtbl_lock); + if (!rv) { + spin_unlock(&ls->ls_scan_lock); + /* rearm the retry timer */ + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + list_del(&r->res_slow_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); + + /* ls_rsbtbl_lock is not needed when calling send_remove() */ + write_unlock(&ls->ls_rsbtbl_lock); + + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); + + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. 
+ */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* We're the master of this rsb but we're not + * the directory record, so we need to tell the + * dir node to remove the dir record + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) + send_remove(r); + + free_inactive_rsb(r); + } } /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can @@ -393,102 +599,52 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, struct dlm_rsb **r_ret) { struct dlm_rsb *r; - int count; - spin_lock(&ls->ls_new_rsb_spin); - if (list_empty(&ls->ls_new_rsb)) { - count = ls->ls_new_rsb_count; - spin_unlock(&ls->ls_new_rsb_spin); - log_debug(ls, "find_rsb retry %d %d %s", - count, dlm_config.ci_new_rsb_count, - (const char *)name); - return -EAGAIN; - } - - r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); - list_del(&r->res_hashchain); - /* Convert the empty list_head to a NULL rb_node for tree usage: */ - memset(&r->res_hashnode, 0, sizeof(struct rb_node)); - ls->ls_new_rsb_count--; - spin_unlock(&ls->ls_new_rsb_spin); + r = dlm_allocate_rsb(); + if (!r) + return -ENOMEM; r->res_ls = ls; r->res_length = len; memcpy(r->res_name, name, len); - mutex_init(&r->res_mutex); + spin_lock_init(&r->res_lock); INIT_LIST_HEAD(&r->res_lookup); INIT_LIST_HEAD(&r->res_grantqueue); INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); + INIT_LIST_HEAD(&r->res_scan_list); INIT_LIST_HEAD(&r->res_recover_list); + INIT_LIST_HEAD(&r->res_masters_list); *r_ret = r; return 0; } -static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) +int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, + struct dlm_rsb **r_ret) { - char maxname[DLM_RESNAME_MAXLEN]; + char key[DLM_RESNAME_MAXLEN] = {}; - memset(maxname, 0, DLM_RESNAME_MAXLEN); - memcpy(maxname, name, nlen); - return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); -} + memcpy(key, name, len); + *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params); + if (*r_ret) + return 0; -int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, - struct dlm_rsb **r_ret) -{ - struct rb_node *node = tree->rb_node; - struct dlm_rsb *r; - int rc; - - while (node) { - r = rb_entry(node, struct dlm_rsb, res_hashnode); - rc = rsb_cmp(r, name, len); - if (rc < 0) - node = node->rb_left; - else if (rc > 0) - node = node->rb_right; - else - goto found; - } - *r_ret = NULL; return -EBADR; - - found: - *r_ret = r; - return 0; } -static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) +static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) { - struct rb_node **newn = &tree->rb_node; - struct rb_node *parent = NULL; - int rc; - - while (*newn) { - struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, - res_hashnode); + int rv; - parent = *newn; - rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); - if (rc < 0) - newn = &parent->rb_left; - else if (rc > 0) - newn = &parent->rb_right; - else { - log_print("rsb_insert match"); - dlm_dump_rsb(rsb); - dlm_dump_rsb(cur); - return -EEXIST; - } - } + rv = rhashtable_insert_fast(rhash, &rsb->res_node, + dlm_rhash_rsb_params); + if (!rv) + rsb_set_flag(rsb, RSB_HASHED); - rb_link_node(&rsb->res_hashnode, parent, newn); - rb_insert_color(&rsb->res_hashnode, tree); - return 0; + return rv; } /* @@ -518,7 +674,7 @@ static int rsb_insert(struct dlm_rsb 
*rsb, struct rb_root *tree) * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * - * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * @@ -536,8 +692,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) */ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, - uint32_t hash, uint32_t b, - int dir_nodeid, int from_nodeid, + uint32_t hash, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; @@ -567,9 +722,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if - * someone sends us a request after we have removed/freed an rsb - * from our toss list. (They sent a request instead of lookup - * because they are using an rsb from their toss list.) + * someone sends us a request after we have removed/freed an rsb. + * (They sent a request instead of lookup because they are using + * an rsb taken from their scan list.) */ if (from_local || from_dir || @@ -578,51 +733,83 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } retry: - if (create) { - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - } - - spin_lock(&ls->ls_rsbtbl[b].lock); - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) - goto do_toss; + goto do_new; + + /* check if the rsb is active under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; + goto do_new; + } /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } + kref_get(&r->res_ref); - goto out_unlock; + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto out; - do_toss: - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + do_inactive: + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* + * The expectation here is that the rsb will have HASHED and + * INACTIVE flags set, and that the rsb can be moved from + * inactive back to active again. However, between releasing + * the read lock and acquiring the write lock, this rsb could + * have been removed from rsbtbl, and had HASHED cleared, to + * be freed. To deal with this case, we would normally need + * to repeat dlm_search_rsb_tree while holding the write lock, + * but rcu allows us to simply check the HASHED flag, because + * the rcu read lock means the rsb will not be freed yet. + * If the HASHED flag is not set, then the rsb is being freed, + * so we add a new rsb struct. If the HASHED flag is set, + * and INACTIVE is not set, it means another thread has + * made the rsb active, as we're expecting to do here, and + * we just repeat the lookup (this will be very unlikely.) 
+ */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; goto do_new; + } /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread - * is using this rsb because it's on the toss list, so we can + * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb. */ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); + write_unlock_bh(&ls->ls_rsbtbl_lock); error = -ENOTBLK; - goto out_unlock; + goto out; } if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ - log_error(ls, "find_rsb toss from_dir %d master %d", + log_error(ls, "find_rsb inactive from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ @@ -639,9 +826,18 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - goto out_unlock; + /* we always deactivate scan timer for the rsb, when + * we move it out of the inactive state as rsb state + * can be changed and scan timers are only for inactive + * rsbs. + */ + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); /* ref is now used in active state */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + + goto out; do_new: @@ -650,18 +846,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, */ if (error == -EBADR && !create) - goto out_unlock; + goto out; error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; kref_init(&r->res_ref); @@ -681,7 +872,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, dlm_free_rsb(r); r = NULL; error = -ENOTBLK; - goto out_unlock; + goto out; } if (from_other) { @@ -701,9 +892,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } out_add: - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (!error) { + list_add(&r->res_slow_list, &ls->ls_slow_active); + } + write_unlock_bh(&ls->ls_rsbtbl_lock); out: *r_ret = r; return error; @@ -714,8 +916,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, dlm_recover_masters). 
*/ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, - uint32_t hash, uint32_t b, - int dir_nodeid, int from_nodeid, + uint32_t hash, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; @@ -724,59 +925,82 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, int error; retry: - error = pre_rsb_struct(ls); - if (error < 0) - goto out; + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto do_new; - spin_lock(&ls->ls_rsbtbl[b].lock); + /* check if the rsb is in active state under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_new; + } - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (error) - goto do_toss; + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ kref_get(&r->res_ref); - goto out_unlock; + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto out; - do_toss: - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + + do_inactive: + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* See comment in find_rsb_dir. */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new; + } + /* * rsb found inactive. No other thread is using this rsb because - * it's on the toss list, so we can look at or update - * res_master_nodeid without lock_rsb. + * it's inactive, so we can look at or update res_master_nodeid + * without lock_rsb. */ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a request; this should never happen */ - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", from_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); + write_unlock_bh(&ls->ls_rsbtbl_lock); error = -ENOTBLK; - goto out_unlock; + goto out; } if (!recover && (r->res_master_nodeid != our_nodeid) && (dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it; this should never happen */ - log_error(ls, "find_rsb toss our %d master %d dir %d", + log_error(ls, "find_rsb inactive our %d master %d dir %d", our_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; } - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - goto out_unlock; + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); + write_unlock_bh(&ls->ls_rsbtbl_lock); + + goto out; do_new: @@ -785,49 +1009,98 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, */ error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; r->res_master_nodeid = dir_nodeid; r->res_nodeid = (dir_nodeid == our_nodeid) ? 
0 : dir_nodeid; kref_init(&r->res_ref); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (!error) { + list_add(&r->res_slow_list, &ls->ls_slow_active); + } + write_unlock_bh(&ls->ls_rsbtbl_lock); + out: *r_ret = r; return error; } +/* + * rsb rcu usage + * + * While rcu read lock is held, the rsb cannot be freed, + * which allows a lookup optimization. + * + * Two threads are accessing the same rsb concurrently, + * the first (A) is trying to use the rsb, the second (B) + * is trying to free the rsb. + * + * thread A thread B + * (trying to use rsb) (trying to free rsb) + * + * A1. rcu read lock + * A2. rsbtbl read lock + * A3. look up rsb in rsbtbl + * A4. rsbtbl read unlock + * B1. rsbtbl write lock + * B2. look up rsb in rsbtbl + * B3. remove rsb from rsbtbl + * B4. clear rsb HASHED flag + * B5. rsbtbl write unlock + * B6. begin freeing rsb using rcu... + * + * (rsb is inactive, so try to make it active again) + * A5. read rsb HASHED flag (safe because rsb is not freed yet) + * A6. the rsb HASHED flag is not set, which it means the rsb + * is being removed from rsbtbl and freed, so don't use it. + * A7. rcu read unlock + * + * B7. ...finish freeing rsb using rcu + * A8. create a new rsb + * + * Without the rcu optimization, steps A5-8 would need to do + * an extra rsbtbl lookup: + * A5. rsbtbl write lock + * A6. look up rsb in rsbtbl, not found + * A7. rsbtbl write unlock + * A8. create a new rsb + */ + static int find_rsb(struct dlm_ls *ls, const void *name, int len, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { - uint32_t hash, b; int dir_nodeid; + uint32_t hash; + int rv; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - dir_nodeid = dlm_hash2nodeid(ls, hash); + rcu_read_lock(); if (dlm_no_directory(ls)) - return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); else - return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, - from_nodeid, flags, r_ret); + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, + from_nodeid, flags, r_ret); + rcu_read_unlock(); + return rv; } /* we have received a request and found that res_master_nodeid != our_nodeid, @@ -874,7 +1147,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, - int from_nodeid, bool toss_list, unsigned int flags, + int from_nodeid, bool is_inactive, unsigned int flags, int *r_nodeid, int *result) { int fix_master = (flags & DLM_LU_RECOVER_MASTER); @@ -887,7 +1160,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_dir_nodeid = our_nodeid; } - if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { + if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) { /* Recovery uses this function to set a new master when * the previous master failed. 
Setting NEW_MASTER will * force dlm_recover_masters to call recover_master on this @@ -898,9 +1171,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_nodeid = from_nodeid; rsb_set_flag(r, RSB_NEW_MASTER); - if (toss_list) { - /* I don't think we should ever find it on toss list. */ - log_error(ls, "%s fix_master on toss", __func__); + if (is_inactive) { + /* I don't think we should ever find it inactive. */ + log_error(ls, "%s fix_master inactive", __func__); dlm_dump_rsb(r); } } @@ -940,7 +1213,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no if (!from_master && !fix_master && (r->res_master_nodeid == from_nodeid)) { /* this can happen when the master sends remove, the dir node - * finds the rsb on the keep list and ignores the remove, + * finds the rsb on the active list and ignores the remove, * and the former master sends a lookup */ @@ -984,11 +1257,11 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, - int len, unsigned int flags, int *r_nodeid, int *result) +static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; - uint32_t hash, b; + uint32_t hash; int our_nodeid = dlm_our_nodeid(); int dir_nodeid, error; @@ -1002,8 +1275,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - dir_nodeid = dlm_hash2nodeid(ls, hash); if (dir_nodeid != our_nodeid) { log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", @@ -1014,160 +1285,199 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } retry: - error = pre_rsb_struct(ls); - if (error < 0) - return error; + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto not_found; - spin_lock(&ls->ls_rsbtbl[b].lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (!error) { - /* because the rsb is active, we need to lock_rsb before - * checking/changing re_master_nodeid - */ + /* check if the rsb is active under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto not_found; + } - hold_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - lock_rsb(r); + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } - __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, - flags, r_nodeid, result); + /* because the rsb is active, we need to lock_rsb before + * checking/changing re_master_nodeid + */ - /* the rsb was active */ - unlock_rsb(r); - put_rsb(r); + hold_rsb(r); + read_unlock_bh(&ls->ls_rsbtbl_lock); + lock_rsb(r); - return 0; - } + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, + flags, r_nodeid, result); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); + + return 0; + + do_inactive: + /* unlikely path - check if still part of ls_rsbtbl */ + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* see comment in find_rsb_dir */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* something as changed, very unlikely but + 
* try again + */ + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); goto not_found; + } - /* because the rsb is inactive (on toss list), it's not refcounted - * and lock_rsb is not used, but is protected by the rsbtbl lock - */ + /* because the rsb is inactive, it's not refcounted and lock_rsb + is not used, but is protected by the rsbtbl lock */ __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - r->res_toss_time = jiffies; - /* the rsb was inactive (on toss list) */ - spin_unlock(&ls->ls_rsbtbl[b].lock); + /* A dir record rsb should never be on scan list. + * Except when we are the dir and master node. + * This function should only be called by the dir + * node. + */ + WARN_ON(!list_empty(&r->res_scan_list) && + r->res_master_nodeid != our_nodeid); + + write_unlock_bh(&ls->ls_rsbtbl_lock); return 0; not_found: error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - kref_init(&r->res_ref); - r->res_toss_time = jiffies; + rsb_set_flag(r, RSB_INACTIVE); - error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); - if (error) { + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (error) { + write_unlock_bh(&ls->ls_rsbtbl_lock); /* should never happen */ dlm_free_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); goto retry; } + list_add(&r->res_slow_list, &ls->ls_slow_inactive); + write_unlock_bh(&ls->ls_rsbtbl_lock); + if (result) *result = DLM_LU_ADD; *r_nodeid = from_nodeid; - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + out: return error; } +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) +{ + int rv; + rcu_read_lock(); + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); + rcu_read_unlock(); + return rv; +} + static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { - struct rb_node *n; struct dlm_rsb *r; - int i; - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - spin_lock(&ls->ls_rsbtbl[i].lock); - for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (r->res_hash == hash) - dlm_dump_rsb(r); - } - spin_unlock(&ls->ls_rsbtbl[i].lock); + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { + if (r->res_hash == hash) + dlm_dump_rsb(r); } + read_unlock_bh(&ls->ls_rsbtbl_lock); } void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) { struct dlm_rsb *r = NULL; - uint32_t hash, b; int error; - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - - spin_lock(&ls->ls_rsbtbl[b].lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + rcu_read_lock(); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) - goto out_dump; - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) goto out; - out_dump: + dlm_dump_rsb(r); out: - spin_unlock(&ls->ls_rsbtbl[b].lock); + rcu_read_unlock(); } -static void toss_rsb(struct kref *kref) 
+static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; + int our_nodeid = dlm_our_nodeid(); DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); - kref_init(&r->res_ref); - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); - rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); - r->res_toss_time = jiffies; - set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags); + rsb_set_flag(r, RSB_INACTIVE); + list_move(&r->res_slow_list, &ls->ls_slow_inactive); + + /* + * When the rsb becomes unused, there are two possibilities: + * 1. Leave the inactive rsb in place (don't remove it). + * 2. Add it to the scan list to be removed. + * + * 1 is done when the rsb is acting as the dir record + * for a remotely mastered rsb. The rsb must be left + * in place as an inactive rsb to act as the dir record. + * + * 2 is done when a) the rsb is not the master and not the + * dir record, b) when the rsb is both the master and the + * dir record, c) when the rsb is master but not dir record. + * + * (If no directory is used, the rsb can always be removed.) + */ + if (dlm_no_directory(ls) || + (r->res_master_nodeid == our_nodeid || + dlm_dir_nodeid(r) != our_nodeid)) + add_scan(ls, r); + if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } -/* See comment for unhold_lkb */ - -static void unhold_rsb(struct dlm_rsb *r) +void free_inactive_rsb(struct dlm_rsb *r) { - int rv; - rv = kref_put(&r->res_ref, toss_rsb); - DLM_ASSERT(!rv, dlm_dump_rsb(r);); -} - -static void kill_rsb(struct kref *kref) -{ - struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); - - /* All work is done after the return from kref_put() so we - can release the write_lock before the remove and free. */ + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); + + dlm_free_rsb(r); } /* Attaching/detaching lkb's from rsb's is for rsb reference counting. 
@@ -1188,36 +1498,34 @@ static void detach_lkb(struct dlm_lkb *lkb) } static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, - int start, int end) + unsigned long start, unsigned long end) { + struct xa_limit limit; struct dlm_lkb *lkb; int rv; - lkb = dlm_allocate_lkb(ls); + limit.max = end; + limit.min = start; + + lkb = dlm_allocate_lkb(); if (!lkb) return -ENOMEM; - lkb->lkb_last_bast_mode = -1; + lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV; + lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV; + lkb->lkb_last_cb_mode = DLM_LOCK_IV; lkb->lkb_nodeid = -1; lkb->lkb_grmode = DLM_LOCK_IV; kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); - INIT_LIST_HEAD(&lkb->lkb_cb_list); - INIT_LIST_HEAD(&lkb->lkb_callbacks); - spin_lock_init(&lkb->lkb_cb_lock); - INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - - idr_preload(GFP_NOFS); - spin_lock(&ls->ls_lkbidr_spin); - rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); - if (rv >= 0) - lkb->lkb_id = rv; - spin_unlock(&ls->ls_lkbidr_spin); - idr_preload_end(); + + write_lock_bh(&ls->ls_lkbxa_lock); + rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC); + write_unlock_bh(&ls->ls_lkbxa_lock); if (rv < 0) { - log_error(ls, "create_lkb idr error %d", rv); + log_error(ls, "create_lkb xa error %d", rv); dlm_free_lkb(lkb); return rv; } @@ -1228,18 +1536,28 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { - return _create_lkb(ls, lkb_ret, 1, 0); + return _create_lkb(ls, lkb_ret, 1, ULONG_MAX); } static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - spin_lock(&ls->ls_lkbidr_spin); - lkb = idr_find(&ls->ls_lkbidr, lkid); - if (lkb) - kref_get(&lkb->lkb_ref); - spin_unlock(&ls->ls_lkbidr_spin); + rcu_read_lock(); + lkb = xa_load(&ls->ls_lkbxa, lkid); + if (lkb) { + /* check under lkbxa_lock that the lkb is still part of + * lkbxa, as lkb_ref is tied to the lkbxa data structure, + * see __put_lkb(). + */ + read_lock_bh(&ls->ls_lkbxa_lock); + if (kref_read(&lkb->lkb_ref)) + kref_get(&lkb->lkb_ref); + else + lkb = NULL; + read_unlock_bh(&ls->ls_lkbxa_lock); + } + rcu_read_unlock(); *lkb_ret = lkb; return lkb ? 
0 : -ENOENT; @@ -1263,11 +1581,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) uint32_t lkid = lkb->lkb_id; int rv; - rv = kref_put_lock(&lkb->lkb_ref, kill_lkb, - &ls->ls_lkbidr_spin); + rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, + &ls->ls_lkbxa_lock); if (rv) { - idr_remove(&ls->ls_lkbidr, lkid); - spin_unlock(&ls->ls_lkbidr_spin); + xa_erase(&ls->ls_lkbxa, lkid); + write_unlock_bh(&ls->ls_lkbxa_lock); detach_lkb(lkb); @@ -1377,10 +1695,8 @@ static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) { - hold_lkb(lkb); del_lkb(r, lkb); add_lkb(r, lkb, sts); - unhold_lkb(lkb); } static int msg_reply_type(int mstype) @@ -1403,19 +1719,11 @@ static int msg_reply_type(int mstype) /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) +static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; - int error = 0; - - mutex_lock(&ls->ls_waiters_mutex); - - if (is_overlap_unlock(lkb) || - (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { - error = -EINVAL; - goto out; - } + spin_lock_bh(&ls->ls_waiters_lock); if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { switch (mstype) { case DLM_MSG_UNLOCK: @@ -1425,7 +1733,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); break; default: - error = -EBUSY; + /* should never happen as validate_lock_args() checks + * on lkb_wait_type and validate_unlock_args() only + * creates UNLOCK or CANCEL messages. + */ + WARN_ON_ONCE(1); goto out; } lkb->lkb_wait_count++; @@ -1447,12 +1759,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: - if (error) - log_error(ls, "addwait error %x %d flags %x %d %d %s", - lkb->lkb_id, error, dlm_iflags_val(lkb), mstype, - lkb->lkb_wait_type, lkb->lkb_resource->res_name); - mutex_unlock(&ls->ls_waiters_mutex); - return error; + spin_unlock_bh(&ls->ls_waiters_lock); } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -1551,14 +1858,18 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); error = _remove_from_waiters(lkb, mstype, NULL); - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return error; } /* Handles situations where we might be processing a "fake" or "local" reply in - which we can't try to take waiters_mutex again. */ + * the recovery context, which stops any locking activity. Only debugfs might + * change the lockspace waiters, but it holds the recovery lock to ensure that + * remove_from_waiters_ms() in the local case is the only user manipulating the + * lockspace waiters in recovery context. 
+ */ static int remove_from_waiters_ms(struct dlm_lkb *lkb, const struct dlm_message *ms, bool local) @@ -1567,159 +1878,16 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, int error; if (!local) - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); + else + WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) || + !dlm_locking_stopped(ls)); error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); if (!local) - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return error; } -static void shrink_bucket(struct dlm_ls *ls, int b) -{ - struct rb_node *n, *next; - struct dlm_rsb *r; - char *name; - int our_nodeid = dlm_our_nodeid(); - int remote_count = 0; - int need_shrink = 0; - int i, len, rv; - - memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - - spin_lock(&ls->ls_rsbtbl[b].lock); - - if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - return; - } - - for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { - next = rb_next(n); - r = rb_entry(n, struct dlm_rsb, res_hashnode); - - /* If we're the directory record for this rsb, and - we're not the master of it, then we need to wait - for the master node to send us a dir remove for - before removing the dir record. */ - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - continue; - } - - need_shrink = 1; - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - continue; - } - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid == our_nodeid) && - (dlm_dir_nodeid(r) != our_nodeid)) { - - /* We're the master of this rsb but we're not - the directory record, so we need to tell the - dir node to remove the dir record. */ - - ls->ls_remove_lens[remote_count] = r->res_length; - memcpy(ls->ls_remove_names[remote_count], r->res_name, - DLM_RESNAME_MAXLEN); - remote_count++; - - if (remote_count >= DLM_REMOVE_NAMES_MAX) - break; - continue; - } - - if (!kref_put(&r->res_ref, kill_rsb)) { - log_error(ls, "tossed rsb in use %s", r->res_name); - continue; - } - - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - dlm_free_rsb(r); - } - - if (need_shrink) - set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); - else - clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); - spin_unlock(&ls->ls_rsbtbl[b].lock); - - /* - * While searching for rsb's to free, we found some that require - * remote removal. We leave them in place and find them again here - * so there is a very small gap between removing them from the toss - * list and sending the removal. Keeping this gap small is - * important to keep us (the master node) from being out of sync - * with the remote dir node for very long. 
- */ - - for (i = 0; i < remote_count; i++) { - name = ls->ls_remove_names[i]; - len = ls->ls_remove_lens[i]; - - spin_lock(&ls->ls_rsbtbl[b].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (rv) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name not toss %s", name); - continue; - } - - if (r->res_master_nodeid != our_nodeid) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name master %d dir %d our %d %s", - r->res_master_nodeid, r->res_dir_nodeid, - our_nodeid, name); - continue; - } - - if (r->res_dir_nodeid == our_nodeid) { - /* should never happen */ - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "remove_name dir %d master %d our %d %s", - r->res_dir_nodeid, r->res_master_nodeid, - our_nodeid, name); - continue; - } - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name toss_time %lu now %lu %s", - r->res_toss_time, jiffies, name); - continue; - } - - if (!kref_put(&r->res_ref, kill_rsb)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "remove_name in use %s", name); - continue; - } - - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - send_remove(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - - dlm_free_rsb(r); - } -} - -void dlm_scan_rsbs(struct dlm_ls *ls) -{ - int i; - - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - shrink_bucket(ls, i); - if (dlm_locking_stopped(ls)) - break; - cond_resched(); - } -} - /* lkb is master or local copy */ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -2538,7 +2706,6 @@ static void process_lookup_list(struct dlm_rsb *r) list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { list_del_init(&lkb->lkb_rsb_lookup); _request_lock(r, lkb); - schedule(); } } @@ -2701,16 +2868,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, case -EINVAL: /* annoy the user because dlm usage is wrong */ WARN_ON(1); - log_error(ls, "%s %d %x %x %x %d %d %s", __func__, + log_error(ls, "%s %d %x %x %x %d %d", __func__, rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + lkb->lkb_status, lkb->lkb_wait_type); break; default: - log_debug(ls, "%s %d %x %x %x %d %d %s", __func__, + log_debug(ls, "%s %d %x %x %x %d %d", __func__, rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + lkb->lkb_status, lkb->lkb_wait_type); break; } @@ -2768,13 +2933,16 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) goto out; } + if (is_overlap_unlock(lkb)) + goto out; + /* cancel not allowed with another cancel/unlock in progress */ if (args->flags & DLM_LKF_CANCEL) { if (lkb->lkb_exflags & DLM_LKF_CANCEL) goto out; - if (is_overlap(lkb)) + if (is_overlap_cancel(lkb)) goto out; if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { @@ -2812,9 +2980,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) goto out; - if (is_overlap_unlock(lkb)) - goto out; - if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); rv = -EBUSY; @@ -3332,8 +3497,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace, static int _create_message(struct dlm_ls *ls, int mb_len, int to_nodeid, int mstype, struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret, - gfp_t allocation) + struct dlm_mhandle 
**mh_ret) { struct dlm_message *ms; struct dlm_mhandle *mh; @@ -3343,7 +3507,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len, pass into midcomms_commit and a message buffer (mb) that we write our data into */ - mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb); + mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb); if (!mh) return -ENOBUFS; @@ -3365,8 +3529,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len, static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, int to_nodeid, int mstype, struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret, - gfp_t allocation) + struct dlm_mhandle **mh_ret) { int mb_len = sizeof(struct dlm_message); @@ -3387,7 +3550,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, } return _create_message(r->res_ls, mb_len, to_nodeid, mstype, - ms_ret, mh_ret, allocation); + ms_ret, mh_ret); } /* further lowcomms enhancements or alternate implementations may make @@ -3452,11 +3615,8 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) to_nodeid = r->res_nodeid; - error = add_to_waiters(lkb, mstype, to_nodeid); - if (error) - return error; - - error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS); + add_to_waiters(lkb, mstype, to_nodeid); + error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto fail; @@ -3516,8 +3676,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) to_nodeid = lkb->lkb_nodeid; - error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh, - GFP_NOFS); + error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); if (error) goto out; @@ -3538,8 +3697,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) to_nodeid = lkb->lkb_nodeid; - error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh, - GFP_NOFS); + error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); if (error) goto out; @@ -3560,12 +3718,8 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) to_nodeid = dlm_dir_nodeid(r); - error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); - if (error) - return error; - - error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh, - GFP_NOFS); + add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); + error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); if (error) goto fail; @@ -3589,8 +3743,7 @@ static int send_remove(struct dlm_rsb *r) to_nodeid = dlm_dir_nodeid(r); - error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh, - GFP_ATOMIC); + error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); if (error) goto out; @@ -3611,7 +3764,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, to_nodeid = lkb->lkb_nodeid; - error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS); + error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto out; @@ -3653,8 +3806,7 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_mhandle *mh; int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); - error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh, - GFP_NOFS); + error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); if (error) goto out; @@ -4139,7 +4291,6 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) { char name[DLM_RESNAME_MAXLEN+1]; struct dlm_rsb *r; - uint32_t hash, b; int rv, len, dir_nodeid, from_nodeid; from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); @@ 
-4159,68 +4310,76 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - /* Look for name on rsbtbl.toss, if it's there, kill it. - If it's on rsbtbl.keep, it's being used, and we should ignore this - message. This is an expected race between the dir node sending a - request to the master node at the same time as the master node sends - a remove to the dir node. The resolution to that race is for the - dir node to ignore the remove message, and the master node to - recreate the master rsb when it gets a request from the dir node for - an rsb it doesn't have. */ + /* + * Look for inactive rsb, if it's there, free it. + * If the rsb is active, it's being used, and we should ignore this + * message. This is an expected race between the dir node sending a + * request to the master node at the same time as the master node sends + * a remove to the dir node. The resolution to that race is for the + * dir node to ignore the remove message, and the master node to + * recreate the master rsb when it gets a request from the dir node for + * an rsb it doesn't have. + */ memset(name, 0, sizeof(name)); memcpy(name, ms->m_extra, len); - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); + rcu_read_lock(); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (rv) { + rcu_read_unlock(); + /* should not happen */ + log_error(ls, "%s from %d not found %s", __func__, + from_nodeid, name); + return; + } - spin_lock(&ls->ls_rsbtbl[b].lock); + write_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + rcu_read_unlock(); + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* should not happen */ + log_error(ls, "%s from %d got removed during removal %s", + __func__, from_nodeid, name); + return; + } + /* at this stage the rsb can only being freed here */ + rcu_read_unlock(); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (rv) { - /* verify the rsb is on keep list per comment above */ - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (rv) { - /* should not happen */ - log_error(ls, "receive_remove from %d not found %s", - from_nodeid, name); - spin_unlock(&ls->ls_rsbtbl[b].lock); - return; - } + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ - log_error(ls, "receive_remove keep from %d master %d", + log_error(ls, "receive_remove on active rsb from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + /* Ignore the remove message, see race comment above. 
*/ + log_debug(ls, "receive_remove from %d master %d first %x %s", from_nodeid, r->res_master_nodeid, r->res_first_lkid, name); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } if (r->res_master_nodeid != from_nodeid) { - log_error(ls, "receive_remove toss from %d master %d", + log_error(ls, "receive_remove inactive from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } - if (kref_put(&r->res_ref, kill_rsb)) { - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - spin_unlock(&ls->ls_rsbtbl[b].lock); - dlm_free_rsb(r); - } else { - log_error(ls, "receive_remove from %d rsb ref error", - from_nodeid); - dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - } + list_del(&r->res_slow_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); + write_unlock_bh(&ls->ls_rsbtbl_lock); + + free_inactive_rsb(r); } static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) @@ -4407,7 +4566,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, if (error) goto out; - /* local reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; @@ -4446,7 +4604,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, if (error) goto out; - /* local reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; @@ -4498,7 +4655,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, if (error) goto out; - /* local reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; @@ -4757,20 +4913,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms, static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms, int nodeid) { - if (dlm_locking_stopped(ls)) { +try_again: + read_lock_bh(&ls->ls_requestqueue_lock); + if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { /* If we were a member of this lockspace, left, and rejoined, other nodes may still be sending us messages from the lockspace generation before we left. 
*/ if (WARN_ON_ONCE(!ls->ls_generation)) { + read_unlock_bh(&ls->ls_requestqueue_lock); log_limit(ls, "receive %d from %d ignore old gen", le32_to_cpu(ms->m_type), nodeid); return; } + read_unlock_bh(&ls->ls_requestqueue_lock); + write_lock_bh(&ls->ls_requestqueue_lock); + /* recheck because we hold writelock now */ + if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { + write_unlock_bh(&ls->ls_requestqueue_lock); + goto try_again; + } + dlm_add_requestqueue(ls, nodeid, ms); + write_unlock_bh(&ls->ls_requestqueue_lock); } else { - dlm_wait_requestqueue(ls); _receive_message(ls, ms, 0); + read_unlock_bh(&ls->ls_requestqueue_lock); } } @@ -4830,7 +4998,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid) /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to be inactive (in this ls) before transitioning to recovery mode */ - down_read(&ls->ls_recv_active); + read_lock_bh(&ls->ls_recv_active); if (hd->h_cmd == DLM_MSG) dlm_receive_message(ls, &p->message, nodeid); else if (hd->h_cmd == DLM_RCOM) @@ -4838,7 +5006,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid) else log_error(ls, "invalid h_cmd %d from %d lockspace %x", hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace)); - up_read(&ls->ls_recv_active); + read_unlock_bh(&ls->ls_recv_active); dlm_put_lockspace(ls); } @@ -4847,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms_local) { if (middle_conversion(lkb)) { + log_rinfo(ls, "%s %x middle convert in progress", __func__, + lkb->lkb_id); + + /* We sent this lock to the new master. The new master will + * tell us when it's granted. We no longer need a reply, so + * use a fake reply to put the lkb into the right state. + */ hold_lkb(lkb); memset(ms_local, 0, sizeof(struct dlm_message)); ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); _receive_convert_reply(lkb, ms_local, true); - - /* Same special case as in receive_rcom_lock_args() */ - lkb->lkb_grmode = DLM_LOCK_IV; - rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); unhold_lkb(lkb); } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { @@ -4899,8 +5070,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (!ms_local) return; - mutex_lock(&ls->ls_waiters_mutex); - list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); @@ -4993,7 +5162,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) } schedule(); } - mutex_unlock(&ls->ls_waiters_mutex); kfree(ms_local); } @@ -5001,7 +5169,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) { struct dlm_lkb *lkb = NULL, *iter; - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) { hold_lkb(iter); @@ -5009,7 +5177,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) break; } } - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return lkb; } @@ -5109,9 +5277,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) } /* Forcibly remove from waiters list */ - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); list_del_init(&lkb->lkb_wait_reply); - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); /* * The lkb is now clear of all prior waiters state and can be @@ -5144,7 
@@ -4830,7 +4998,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	/* this rwlock allows dlm_ls_stop() to wait for all dlm_recv
 	   threads to be inactive (in this ls) before transitioning
 	   to recovery mode */
-	down_read(&ls->ls_recv_active);
+	read_lock_bh(&ls->ls_recv_active);
 	if (hd->h_cmd == DLM_MSG)
 		dlm_receive_message(ls, &p->message, nodeid);
 	else if (hd->h_cmd == DLM_RCOM)
@@ -4838,7 +5006,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
 	else
 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
-	up_read(&ls->ls_recv_active);
+	read_unlock_bh(&ls->ls_recv_active);
 
 	dlm_put_lockspace(ls);
 }
@@ -4847,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 				   struct dlm_message *ms_local)
 {
 	if (middle_conversion(lkb)) {
+		log_rinfo(ls, "%s %x middle convert in progress", __func__,
+			  lkb->lkb_id);
+
+		/* We sent this lock to the new master. The new master will
+		 * tell us when it's granted. We no longer need a reply, so
+		 * use a fake reply to put the lkb into the right state.
+		 */
 		hold_lkb(lkb);
 		memset(ms_local, 0, sizeof(struct dlm_message));
 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
 		_receive_convert_reply(lkb, ms_local, true);
-
-		/* Same special case as in receive_rcom_lock_args() */
-		lkb->lkb_grmode = DLM_LOCK_IV;
-		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
 		unhold_lkb(lkb);
 
 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -4899,8 +5070,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 	if (!ms_local)
 		return;
 
-	mutex_lock(&ls->ls_waiters_mutex);
-
 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
@@ -4993,7 +5162,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		}
 		schedule();
 	}
 
-	mutex_unlock(&ls->ls_waiters_mutex);
 	kfree(ms_local);
 }
@@ -5001,7 +5169,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb = NULL, *iter;
 
-	mutex_lock(&ls->ls_waiters_mutex);
+	spin_lock_bh(&ls->ls_waiters_lock);
 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
 			hold_lkb(iter);
@@ -5009,7 +5177,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 			break;
 		}
 	}
-	mutex_unlock(&ls->ls_waiters_mutex);
+	spin_unlock_bh(&ls->ls_waiters_lock);
 
 	return lkb;
 }
@@ -5109,9 +5277,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		}
 
 		/* Forcibly remove from waiters list */
-		mutex_lock(&ls->ls_waiters_mutex);
+		spin_lock_bh(&ls->ls_waiters_lock);
 		list_del_init(&lkb->lkb_wait_reply);
-		mutex_unlock(&ls->ls_waiters_mutex);
+		spin_unlock_bh(&ls->ls_waiters_lock);
 
 		/*
 		 * The lkb is now clear of all prior waiters state and can be
@@ -5144,7 +5312,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		case DLM_MSG_LOOKUP:
 		case DLM_MSG_REQUEST:
 			_request_lock(r, lkb);
-			if (is_master(r))
+			if (r->res_nodeid != -1 && is_master(r))
 				confirm_master(r, 0);
 			break;
 		case DLM_MSG_CONVERT:
@@ -5236,7 +5404,7 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
 
 /* Get rid of locks held by nodes that are gone. */
 
-void dlm_recover_purge(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
 {
 	struct dlm_rsb *r;
 	struct dlm_member *memb;
@@ -5255,11 +5423,9 @@ void dlm_recover_purge(struct dlm_ls *ls)
 	if (!nodes_count)
 		return;
 
-	down_write(&ls->ls_root_sem);
-	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
-		hold_rsb(r);
+	list_for_each_entry(r, root_list, res_root_list) {
 		lock_rsb(r);
-		if (is_master(r)) {
+		if (r->res_nodeid != -1 && is_master(r)) {
 			purge_dead_list(ls, r, &r->res_grantqueue,
 					nodeid_gone, &lkb_count);
 			purge_dead_list(ls, r, &r->res_convertqueue,
@@ -5268,25 +5434,21 @@ void dlm_recover_purge(struct dlm_ls *ls)
 					nodeid_gone, &lkb_count);
 		}
 		unlock_rsb(r);
-		unhold_rsb(r);
+		cond_resched();
 	}
-	up_write(&ls->ls_root_sem);
 
 	if (lkb_count)
 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
 			  lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
-		r = rb_entry(n, struct dlm_rsb, res_hashnode);
-
+	read_lock_bh(&ls->ls_rsbtbl_lock);
+	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
 		if (!is_master(r)) {
@@ -5294,10 +5456,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		return r;
 	}
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 	return NULL;
 }
@@ -5321,19 +5483,15 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 void dlm_recover_grant(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int bucket = 0;
 	unsigned int count = 0;
 	unsigned int rsb_count = 0;
 	unsigned int lkb_count = 0;
 
 	while (1) {
-		r = find_grant_rsb(ls, bucket);
-		if (!r) {
-			if (bucket == ls->ls_rsbtbl_size - 1)
-				break;
-			bucket++;
-			continue;
-		}
+		r = find_grant_rsb(ls);
+		if (!r)
+			break;
+
 		rsb_count++;
 		count = 0;
 		lock_rsb(r);
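find_grant_rsb() and dlm_recover_grant() above replace the bucket scan with another recurring pattern: walk a shared list under a read lock, pin the first matching entry with a reference, drop the lock, and let the caller process that entry before rescanning from the head. Progress depends on the processing step clearing the match condition (here RSB_RECOVER_GRANT), otherwise the rescan loops forever. A sketch under those assumptions, again with hypothetical demo_* names:

	#include <linux/bitops.h>
	#include <linux/kref.h>
	#include <linux/list.h>
	#include <linux/slab.h>
	#include <linux/spinlock.h>

	#define DEMO_WANTED	0	/* bit number in demo_rsb.flags */

	struct demo_rsb {
		struct kref ref;
		unsigned long flags;
		struct list_head list;
	};

	static void demo_release(struct kref *kref)
	{
		kfree(container_of(kref, struct demo_rsb, ref));
	}

	static void demo_process(struct demo_rsb *r)
	{
		clear_bit(DEMO_WANTED, &r->flags);	/* guarantees forward progress */
	}

	static struct demo_rsb *demo_next(rwlock_t *lock, struct list_head *head)
	{
		struct demo_rsb *r;

		read_lock_bh(lock);
		list_for_each_entry(r, head, list) {
			if (!test_bit(DEMO_WANTED, &r->flags))
				continue;
			kref_get(&r->ref);	/* pin before dropping the lock */
			read_unlock_bh(lock);
			return r;
		}
		read_unlock_bh(lock);
		return NULL;
	}

	static void demo_scan_all(rwlock_t *lock, struct list_head *head)
	{
		struct demo_rsb *r;

		/* each pass rescans from the head, which is safe because
		 * demo_process() clears DEMO_WANTED on the entry it handled */
		while ((r = demo_next(lock, head)) != NULL) {
			demo_process(r);
			kref_put(&r->ref, demo_release);
		}
	}

The rescan is quadratic in the worst case, but it tolerates the list changing while the lock is dropped, which is exactly the property recovery needs here.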
@@ -5416,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 	   The real granted mode of these converting locks cannot be
 	   determined until all locks have been rebuilt on the rsb
 	   (recover_conversion) */
-	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
-	    middle_conversion(lkb)) {
-		rl->rl_status = DLM_LKSTS_CONVERT;
-		lkb->lkb_grmode = DLM_LOCK_IV;
+	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
+		/* We may need to adjust grmode depending on other granted locks. */
+		log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
+			  __func__, lkb->lkb_id, lkb->lkb_grmode,
+			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
 	}
@@ -5641,10 +5800,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
 	}
 
 	/* add this new lkb to the per-process list of locks */
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	hold_lkb(lkb);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
 	do_put = false;
  out_put:
 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
@@ -5726,7 +5885,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	int found_other_mode = 0;
 	int rv = 0;
 
-	mutex_lock(&ls->ls_orphans_mutex);
+	spin_lock_bh(&ls->ls_orphans_lock);
 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
 		if (iter->lkb_resource->res_length != namelen)
 			continue;
@@ -5743,7 +5902,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 		*lkid = iter->lkb_id;
 		break;
 	}
-	mutex_unlock(&ls->ls_orphans_mutex);
+	spin_unlock_bh(&ls->ls_orphans_lock);
 
 	if (!lkb && found_other_mode) {
 		rv = -EAGAIN;
@@ -5774,9 +5933,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	 * for the proc locks list.
 	 */
 
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
  out:
 	kfree(ua_tmp);
 	return rv;
@@ -5820,11 +5979,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	if (error)
 		goto out_put;
 
-	spin_lock(&ua->proc->locks_spin);
+	spin_lock_bh(&ua->proc->locks_spin);
 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
 	if (!list_empty(&lkb->lkb_ownqueue))
 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-	spin_unlock(&ua->proc->locks_spin);
+	spin_unlock_bh(&ua->proc->locks_spin);
  out_put:
 	trace_dlm_unlock_end(ls, lkb, flags, error);
 	dlm_put_lkb(lkb);
@@ -5935,9 +6094,9 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	int error;
 
 	hold_lkb(lkb); /* reference for the ls_orphans list */
-	mutex_lock(&ls->ls_orphans_mutex);
+	spin_lock_bh(&ls->ls_orphans_lock);
 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
-	mutex_unlock(&ls->ls_orphans_mutex);
+	spin_unlock_bh(&ls->ls_orphans_lock);
 
 	set_unlock_args(0, lkb->lkb_ua, &args);
@@ -5975,7 +6134,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 {
 	struct dlm_lkb *lkb = NULL;
 
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 	if (list_empty(&proc->locks))
 		goto out;
@@ -5987,7 +6146,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 	else
 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
  out:
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 	return lkb;
 }
@@ -6003,6 +6162,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 
 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 {
+	struct dlm_callback *cb, *cb_safe;
 	struct dlm_lkb *lkb, *safe;
 
 	dlm_lock_recovery(ls);
@@ -6023,7 +6183,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
-	spin_lock(&ls->ls_clear_proc_locks);
+	spin_lock_bh(&ls->ls_clear_proc_locks);
 
 	/* in-progress unlocks */
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
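Every mutex to spin_lock_bh and spin_lock to spin_lock_bh conversion above has the same motivation: these lists are now also manipulated from the message-receive path, which runs in softirq context. A mutex may sleep, which is forbidden there, and a plain spinlock taken in process context can deadlock if a softirq on the same CPU interrupts the holder and tries to take the same lock. Disabling bottom halves on the process-context side closes that window. A minimal sketch of the two sides (demo_* names are illustrative):

	#include <linux/list.h>
	#include <linux/spinlock.h>

	static DEFINE_SPINLOCK(demo_lock);
	static LIST_HEAD(demo_list);

	/* process context: the _bh variant keeps a softirq on this CPU
	 * from interrupting us while we hold demo_lock and then spinning
	 * on it forever */
	static void demo_add_from_syscall(struct list_head *e)
	{
		spin_lock_bh(&demo_lock);
		list_add_tail(e, &demo_list);
		spin_unlock_bh(&demo_lock);
	}

	/* softirq context, e.g. the network receive path: bottom halves
	 * are already disabled here, so the plain variant suffices */
	static void demo_del_from_softirq(struct list_head *e)
	{
		spin_lock(&demo_lock);
		list_del_init(e);
		spin_unlock(&demo_lock);
	}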
@@ -6032,29 +6192,29 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb);
 	}
 
-	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
-		dlm_purge_lkb_callbacks(lkb);
-		list_del_init(&lkb->lkb_cb_list);
-		dlm_put_lkb(lkb);
+	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
+		list_del(&cb->list);
+		dlm_free_cb(cb);
 	}
 
-	spin_unlock(&ls->ls_clear_proc_locks);
+	spin_unlock_bh(&ls->ls_clear_proc_locks);
 
 	dlm_unlock_recovery(ls);
 }
 
 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 {
+	struct dlm_callback *cb, *cb_safe;
 	struct dlm_lkb *lkb, *safe;
 
 	while (1) {
 		lkb = NULL;
-		spin_lock(&proc->locks_spin);
+		spin_lock_bh(&proc->locks_spin);
 		if (!list_empty(&proc->locks)) {
 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
 					 lkb_ownqueue);
 			list_del_init(&lkb->lkb_ownqueue);
 		}
-		spin_unlock(&proc->locks_spin);
+		spin_unlock_bh(&proc->locks_spin);
 
 		if (!lkb)
 			break;
@@ -6064,21 +6224,20 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 		dlm_put_lkb(lkb); /* ref from proc->locks list */
 	}
 
-	spin_lock(&proc->locks_spin);
+	spin_lock_bh(&proc->locks_spin);
 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
 		list_del_init(&lkb->lkb_ownqueue);
 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
 		dlm_put_lkb(lkb);
 	}
-	spin_unlock(&proc->locks_spin);
+	spin_unlock_bh(&proc->locks_spin);
 
-	spin_lock(&proc->asts_spin);
-	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
-		dlm_purge_lkb_callbacks(lkb);
-		list_del_init(&lkb->lkb_cb_list);
-		dlm_put_lkb(lkb);
+	spin_lock_bh(&proc->asts_spin);
+	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
+		list_del(&cb->list);
+		dlm_free_cb(cb);
 	}
-	spin_unlock(&proc->asts_spin);
+	spin_unlock_bh(&proc->asts_spin);
 }
 
 /* pid of 0 means purge all orphans */
@@ -6087,7 +6246,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
 {
 	struct dlm_lkb *lkb, *safe;
 
-	mutex_lock(&ls->ls_orphans_mutex);
+	spin_lock_bh(&ls->ls_orphans_lock);
 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
 		if (pid && lkb->lkb_ownpid != pid)
 			continue;
@@ -6095,7 +6254,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
 		list_del_init(&lkb->lkb_ownqueue);
 		dlm_put_lkb(lkb);
 	}
-	mutex_unlock(&ls->ls_orphans_mutex);
+	spin_unlock_bh(&ls->ls_orphans_lock);
 }
 
 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
@@ -6105,7 +6264,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
 	int error;
 
 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
-				DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
+				DLM_MSG_PURGE, &ms, &mh);
 	if (error)
 		return error;
 	ms->m_nodeid = cpu_to_le32(nodeid);
@@ -6188,8 +6347,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
 	if (error)
 		return error;
 
-	error = add_to_waiters(lkb, mstype, to_nodeid);
+	add_to_waiters(lkb, mstype, to_nodeid);
 	dlm_put_lkb(lkb);
-	return error;
+	return 0;
 }
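Finally, do_purge() above is a standard delete-while-walking idiom: because entries are unlinked mid-iteration, the _safe variant of the list walker is required, and pid == 0 acts as a wildcard. A small sketch of the same idiom, with a hypothetical demo_put() standing in for the reference drop:

	#include <linux/list.h>
	#include <linux/slab.h>
	#include <linux/spinlock.h>

	struct demo_lkb {
		int ownpid;
		struct list_head ownqueue;
	};

	static void demo_put(struct demo_lkb *lkb)
	{
		kfree(lkb);	/* stand-in for dropping the list's reference */
	}

	/* pid of 0 means purge everything, as in do_purge(); the _safe
	 * walker is mandatory because entries are unlinked mid-iteration */
	static void demo_purge(spinlock_t *lock, struct list_head *orphans, int pid)
	{
		struct demo_lkb *lkb, *safe;

		spin_lock_bh(lock);
		list_for_each_entry_safe(lkb, safe, orphans, ownqueue) {
			if (pid && lkb->ownpid != pid)
				continue;
			list_del_init(&lkb->ownqueue);
			demo_put(lkb);
		}
		spin_unlock_bh(lock);
	}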