summaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c1401
1 files changed, 780 insertions, 621 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index fd752dd03896..e01d5f29f4d2 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local);
static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
-static void toss_rsb(struct kref *kref);
+static void deactivate_rsb(struct kref *kref);
/*
* Lock compatibilty matrix - thanks Steve
@@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r)
/* Threads cannot use the lockspace while it's being recovered */
-static inline void dlm_lock_recovery(struct dlm_ls *ls)
+void dlm_lock_recovery(struct dlm_ls *ls)
{
down_read(&ls->ls_in_recovery);
}
@@ -320,11 +320,18 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
* Basic operations on rsb's and lkb's
*/
+static inline unsigned long rsb_toss_jiffies(void)
+{
+ return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
+}
+
/* This is only called to add a reference when the code already holds
a valid reference to the rsb, so there's no need for locking. */
static inline void hold_rsb(struct dlm_rsb *r)
{
+ /* inactive rsbs are not ref counted */
+ WARN_ON(rsb_flag(r, RSB_INACTIVE));
kref_get(&r->res_ref);
}
@@ -333,19 +340,45 @@ void dlm_hold_rsb(struct dlm_rsb *r)
hold_rsb(r);
}
-/* When all references to the rsb are gone it's transferred to
- the tossed list for later disposal. */
+/* TODO move this to lib/refcount.c */
+static __must_check bool
+dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
+__cond_acquires(lock)
+{
+ if (refcount_dec_not_one(r))
+ return false;
+
+ write_lock_bh(lock);
+ if (!refcount_dec_and_test(r)) {
+ write_unlock_bh(lock);
+ return false;
+ }
+
+ return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
+ void (*release)(struct kref *kref),
+ rwlock_t *lock)
+{
+ if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
+ release(kref);
+ return 1;
+ }
+
+ return 0;
+}
static void put_rsb(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- uint32_t bucket = r->res_bucket;
int rv;
- rv = kref_put_lock(&r->res_ref, toss_rsb,
- &ls->ls_rsbtbl[bucket].lock);
+ rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
+ &ls->ls_rsbtbl_lock);
if (rv)
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
}
void dlm_put_rsb(struct dlm_rsb *r)
@@ -353,36 +386,209 @@ void dlm_put_rsb(struct dlm_rsb *r)
put_rsb(r);
}
-static int pre_rsb_struct(struct dlm_ls *ls)
+/* connected with timer_delete_sync() in dlm_ls_stop() to stop
+ * new timers when recovery is triggered and don't run them
+ * again until a resume_scan_timer() tries it again.
+ */
+static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
{
- struct dlm_rsb *r1, *r2;
- int count = 0;
+ if (!dlm_locking_stopped(ls))
+ mod_timer(&ls->ls_scan_timer, jiffies);
+}
- spin_lock(&ls->ls_new_rsb_spin);
- if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
- spin_unlock(&ls->ls_new_rsb_spin);
- return 0;
- }
- spin_unlock(&ls->ls_new_rsb_spin);
+/* This function tries to resume the timer callback if a rsb
+ * is on the scan list and no timer is pending. It might that
+ * the first entry is on currently executed as timer callback
+ * but we don't care if a timer queued up again and does
+ * nothing. Should be a rare case.
+ */
+void resume_scan_timer(struct dlm_ls *ls)
+{
+ struct dlm_rsb *r;
+
+ spin_lock_bh(&ls->ls_scan_lock);
+ r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ if (r && !timer_pending(&ls->ls_scan_timer))
+ enable_scan_timer(ls, r->res_toss_time);
+ spin_unlock_bh(&ls->ls_scan_lock);
+}
+
+/* ls_rsbtbl_lock must be held */
+
+static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+ struct dlm_rsb *first;
+
+ /* active rsbs should never be on the scan list */
+ WARN_ON(!rsb_flag(r, RSB_INACTIVE));
- r1 = dlm_allocate_rsb(ls);
- r2 = dlm_allocate_rsb(ls);
+ spin_lock_bh(&ls->ls_scan_lock);
+ r->res_toss_time = 0;
+
+ /* if the rsb is not queued do nothing */
+ if (list_empty(&r->res_scan_list))
+ goto out;
- spin_lock(&ls->ls_new_rsb_spin);
- if (r1) {
- list_add(&r1->res_hashchain, &ls->ls_new_rsb);
- ls->ls_new_rsb_count++;
+ /* get the first element before delete */
+ first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ list_del_init(&r->res_scan_list);
+ /* check if the first element was the rsb we deleted */
+ if (first == r) {
+ /* try to get the new first element, if the list
+ * is empty now try to delete the timer, if we are
+ * too late we don't care.
+ *
+ * if the list isn't empty and a new first element got
+ * in place, set the new timer expire time.
+ */
+ first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ if (!first)
+ timer_delete(&ls->ls_scan_timer);
+ else
+ enable_scan_timer(ls, first->res_toss_time);
}
- if (r2) {
- list_add(&r2->res_hashchain, &ls->ls_new_rsb);
- ls->ls_new_rsb_count++;
+
+out:
+ spin_unlock_bh(&ls->ls_scan_lock);
+}
+
+static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+ int our_nodeid = dlm_our_nodeid();
+ struct dlm_rsb *first;
+
+ /* A dir record for a remote master rsb should never be on the scan list. */
+ WARN_ON(!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid));
+
+ /* An active rsb should never be on the scan list. */
+ WARN_ON(!rsb_flag(r, RSB_INACTIVE));
+
+ /* An rsb should not already be on the scan list. */
+ WARN_ON(!list_empty(&r->res_scan_list));
+
+ spin_lock_bh(&ls->ls_scan_lock);
+ /* set the new rsb absolute expire time in the rsb */
+ r->res_toss_time = rsb_toss_jiffies();
+ if (list_empty(&ls->ls_scan_list)) {
+ /* if the queue is empty add the element and it's
+ * our new expire time
+ */
+ list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
+ enable_scan_timer(ls, r->res_toss_time);
+ } else {
+ /* try to get the maybe new first element and then add
+ * to this rsb with the oldest expire time to the end
+ * of the queue. If the list was empty before this
+ * rsb expire time is our next expiration if it wasn't
+ * the now new first elemet is our new expiration time
+ */
+ first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
+ if (!first)
+ enable_scan_timer(ls, r->res_toss_time);
+ else
+ enable_scan_timer(ls, first->res_toss_time);
}
- count = ls->ls_new_rsb_count;
- spin_unlock(&ls->ls_new_rsb_spin);
+ spin_unlock_bh(&ls->ls_scan_lock);
+}
- if (!count)
- return -ENOMEM;
- return 0;
+/* if we hit contention we do in 250 ms a retry to trylock.
+ * if there is any other mod_timer in between we don't care
+ * about that it expires earlier again this is only for the
+ * unlikely case nothing happened in this time.
+ */
+#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
+
+/* Called by lockspace scan_timer to free unused rsb's. */
+
+void dlm_rsb_scan(struct timer_list *timer)
+{
+ struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
+ int our_nodeid = dlm_our_nodeid();
+ struct dlm_rsb *r;
+ int rv;
+
+ while (1) {
+ /* interrupting point to leave iteration when
+ * recovery waits for timer_delete_sync(), recovery
+ * will take care to delete everything in scan list.
+ */
+ if (dlm_locking_stopped(ls))
+ break;
+
+ rv = spin_trylock(&ls->ls_scan_lock);
+ if (!rv) {
+ /* rearm again try timer */
+ enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
+ break;
+ }
+
+ r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ if (!r) {
+ /* the next add_scan will enable the timer again */
+ spin_unlock(&ls->ls_scan_lock);
+ break;
+ }
+
+ /*
+ * If the first rsb is not yet expired, then stop because the
+ * list is sorted with nearest expiration first.
+ */
+ if (time_before(jiffies, r->res_toss_time)) {
+ /* rearm with the next rsb to expire in the future */
+ enable_scan_timer(ls, r->res_toss_time);
+ spin_unlock(&ls->ls_scan_lock);
+ break;
+ }
+
+ /* in find_rsb_dir/nodir there is a reverse order of this
+ * lock, however this is only a trylock if we hit some
+ * possible contention we try it again.
+ */
+ rv = write_trylock(&ls->ls_rsbtbl_lock);
+ if (!rv) {
+ spin_unlock(&ls->ls_scan_lock);
+ /* rearm again try timer */
+ enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
+ break;
+ }
+
+ list_del(&r->res_slow_list);
+ rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+ dlm_rhash_rsb_params);
+ rsb_clear_flag(r, RSB_HASHED);
+
+ /* ls_rsbtbl_lock is not needed when calling send_remove() */
+ write_unlock(&ls->ls_rsbtbl_lock);
+
+ list_del_init(&r->res_scan_list);
+ spin_unlock(&ls->ls_scan_lock);
+
+ /* An rsb that is a dir record for a remote master rsb
+ * cannot be removed, and should not have a timer enabled.
+ */
+ WARN_ON(!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid));
+
+ /* We're the master of this rsb but we're not
+ * the directory record, so we need to tell the
+ * dir node to remove the dir record
+ */
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid == our_nodeid) &&
+ (dlm_dir_nodeid(r) != our_nodeid))
+ send_remove(r);
+
+ free_inactive_rsb(r);
+ }
}
/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
@@ -393,102 +599,52 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
- int count;
- spin_lock(&ls->ls_new_rsb_spin);
- if (list_empty(&ls->ls_new_rsb)) {
- count = ls->ls_new_rsb_count;
- spin_unlock(&ls->ls_new_rsb_spin);
- log_debug(ls, "find_rsb retry %d %d %s",
- count, dlm_config.ci_new_rsb_count,
- (const char *)name);
- return -EAGAIN;
- }
-
- r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
- list_del(&r->res_hashchain);
- /* Convert the empty list_head to a NULL rb_node for tree usage: */
- memset(&r->res_hashnode, 0, sizeof(struct rb_node));
- ls->ls_new_rsb_count--;
- spin_unlock(&ls->ls_new_rsb_spin);
+ r = dlm_allocate_rsb();
+ if (!r)
+ return -ENOMEM;
r->res_ls = ls;
r->res_length = len;
memcpy(r->res_name, name, len);
- mutex_init(&r->res_mutex);
+ spin_lock_init(&r->res_lock);
INIT_LIST_HEAD(&r->res_lookup);
INIT_LIST_HEAD(&r->res_grantqueue);
INIT_LIST_HEAD(&r->res_convertqueue);
INIT_LIST_HEAD(&r->res_waitqueue);
INIT_LIST_HEAD(&r->res_root_list);
+ INIT_LIST_HEAD(&r->res_scan_list);
INIT_LIST_HEAD(&r->res_recover_list);
+ INIT_LIST_HEAD(&r->res_masters_list);
*r_ret = r;
return 0;
}
-static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
+ struct dlm_rsb **r_ret)
{
- char maxname[DLM_RESNAME_MAXLEN];
+ char key[DLM_RESNAME_MAXLEN] = {};
- memset(maxname, 0, DLM_RESNAME_MAXLEN);
- memcpy(maxname, name, nlen);
- return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
-}
+ memcpy(key, name, len);
+ *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
+ if (*r_ret)
+ return 0;
-int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
- struct dlm_rsb **r_ret)
-{
- struct rb_node *node = tree->rb_node;
- struct dlm_rsb *r;
- int rc;
-
- while (node) {
- r = rb_entry(node, struct dlm_rsb, res_hashnode);
- rc = rsb_cmp(r, name, len);
- if (rc < 0)
- node = node->rb_left;
- else if (rc > 0)
- node = node->rb_right;
- else
- goto found;
- }
- *r_ret = NULL;
return -EBADR;
-
- found:
- *r_ret = r;
- return 0;
}
-static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
- struct rb_node **newn = &tree->rb_node;
- struct rb_node *parent = NULL;
- int rc;
-
- while (*newn) {
- struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
- res_hashnode);
+ int rv;
- parent = *newn;
- rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
- if (rc < 0)
- newn = &parent->rb_left;
- else if (rc > 0)
- newn = &parent->rb_right;
- else {
- log_print("rsb_insert match");
- dlm_dump_rsb(rsb);
- dlm_dump_rsb(cur);
- return -EEXIST;
- }
- }
+ rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+ dlm_rhash_rsb_params);
+ if (!rv)
+ rsb_set_flag(rsb, RSB_HASHED);
- rb_link_node(&rsb->res_hashnode, parent, newn);
- rb_insert_color(&rsb->res_hashnode, tree);
- return 0;
+ return rv;
}
/*
@@ -518,7 +674,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
* So, if the given rsb is on the toss list, it is moved to the keep list
* before being returned.
*
- * toss_rsb() happens when all local usage of the rsb is done, i.e. no
+ * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
* more refcounts exist, so the rsb is moved from the keep list to the
* toss list.
*
@@ -536,8 +692,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
*/
static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
- uint32_t hash, uint32_t b,
- int dir_nodeid, int from_nodeid,
+ uint32_t hash, int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r = NULL;
@@ -567,9 +722,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
*
* If someone sends us a request, we are the dir node, and we do
* not find the rsb anywhere, then recreate it. This happens if
- * someone sends us a request after we have removed/freed an rsb
- * from our toss list. (They sent a request instead of lookup
- * because they are using an rsb from their toss list.)
+ * someone sends us a request after we have removed/freed an rsb.
+ * (They sent a request instead of lookup because they are using
+ * an rsb taken from their scan list.)
*/
if (from_local || from_dir ||
@@ -578,51 +733,83 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
retry:
- if (create) {
- error = pre_rsb_struct(ls);
- if (error < 0)
- goto out;
- }
-
- spin_lock(&ls->ls_rsbtbl[b].lock);
-
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
- goto do_toss;
+ goto do_new;
+
+ /* check if the rsb is active under read lock - likely path */
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ if (!rsb_flag(r, RSB_HASHED)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ error = -EBADR;
+ goto do_new;
+ }
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
+ if (rsb_flag(r, RSB_INACTIVE)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto do_inactive;
+ }
+
kref_get(&r->res_ref);
- goto out_unlock;
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto out;
- do_toss:
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
- if (error)
+ do_inactive:
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+
+ /*
+ * The expectation here is that the rsb will have HASHED and
+ * INACTIVE flags set, and that the rsb can be moved from
+ * inactive back to active again. However, between releasing
+ * the read lock and acquiring the write lock, this rsb could
+ * have been removed from rsbtbl, and had HASHED cleared, to
+ * be freed. To deal with this case, we would normally need
+ * to repeat dlm_search_rsb_tree while holding the write lock,
+ * but rcu allows us to simply check the HASHED flag, because
+ * the rcu read lock means the rsb will not be freed yet.
+ * If the HASHED flag is not set, then the rsb is being freed,
+ * so we add a new rsb struct. If the HASHED flag is set,
+ * and INACTIVE is not set, it means another thread has
+ * made the rsb active, as we're expecting to do here, and
+ * we just repeat the lookup (this will be very unlikely.)
+ */
+ if (rsb_flag(r, RSB_HASHED)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto retry;
+ }
+ } else {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ error = -EBADR;
goto do_new;
+ }
/*
* rsb found inactive (master_nodeid may be out of date unless
* we are the dir_nodeid or were the master) No other thread
- * is using this rsb because it's on the toss list, so we can
+ * is using this rsb because it's inactive, so we can
* look at or update res_master_nodeid without lock_rsb.
*/
if ((r->res_master_nodeid != our_nodeid) && from_other) {
/* our rsb was not master, and another node (not the dir node)
has sent us a request */
- log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
+ log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
from_nodeid, r->res_master_nodeid, dir_nodeid,
r->res_name);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -ENOTBLK;
- goto out_unlock;
+ goto out;
}
if ((r->res_master_nodeid != our_nodeid) && from_dir) {
/* don't think this should ever happen */
- log_error(ls, "find_rsb toss from_dir %d master %d",
+ log_error(ls, "find_rsb inactive from_dir %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
/* fix it and go on */
@@ -639,9 +826,18 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
r->res_first_lkid = 0;
}
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
- goto out_unlock;
+ /* we always deactivate scan timer for the rsb, when
+ * we move it out of the inactive state as rsb state
+ * can be changed and scan timers are only for inactive
+ * rsbs.
+ */
+ del_scan(ls, r);
+ list_move(&r->res_slow_list, &ls->ls_slow_active);
+ rsb_clear_flag(r, RSB_INACTIVE);
+ kref_init(&r->res_ref); /* ref is now used in active state */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+
+ goto out;
do_new:
@@ -650,18 +846,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
*/
if (error == -EBADR && !create)
- goto out_unlock;
+ goto out;
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- goto retry;
- }
- if (error)
- goto out_unlock;
+ if (WARN_ON_ONCE(error))
+ goto out;
r->res_hash = hash;
- r->res_bucket = b;
r->res_dir_nodeid = dir_nodeid;
kref_init(&r->res_ref);
@@ -681,7 +872,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
dlm_free_rsb(r);
r = NULL;
error = -ENOTBLK;
- goto out_unlock;
+ goto out;
}
if (from_other) {
@@ -701,9 +892,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
out_add:
- error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
- out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+ error = rsb_insert(r, &ls->ls_rsbtbl);
+ if (error == -EEXIST) {
+ /* somebody else was faster and it seems the
+ * rsb exists now, we do a whole relookup
+ */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ dlm_free_rsb(r);
+ goto retry;
+ } else if (!error) {
+ list_add(&r->res_slow_list, &ls->ls_slow_active);
+ }
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
out:
*r_ret = r;
return error;
@@ -714,8 +916,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
dlm_recover_masters). */
static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
- uint32_t hash, uint32_t b,
- int dir_nodeid, int from_nodeid,
+ uint32_t hash, int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r = NULL;
@@ -724,59 +925,82 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
int error;
retry:
- error = pre_rsb_struct(ls);
- if (error < 0)
- goto out;
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ if (error)
+ goto do_new;
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ /* check if the rsb is in active state under read lock - likely path */
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ if (!rsb_flag(r, RSB_HASHED)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto do_new;
+ }
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
- if (error)
- goto do_toss;
+ if (rsb_flag(r, RSB_INACTIVE)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto do_inactive;
+ }
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
kref_get(&r->res_ref);
- goto out_unlock;
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto out;
- do_toss:
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
- if (error)
+
+ do_inactive:
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+
+ /* See comment in find_rsb_dir. */
+ if (rsb_flag(r, RSB_HASHED)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto retry;
+ }
+ } else {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
+ }
+
/*
* rsb found inactive. No other thread is using this rsb because
- * it's on the toss list, so we can look at or update
- * res_master_nodeid without lock_rsb.
+ * it's inactive, so we can look at or update res_master_nodeid
+ * without lock_rsb.
*/
if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
/* our rsb is not master, and another node has sent us a
request; this should never happen */
- log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
+ log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -ENOTBLK;
- goto out_unlock;
+ goto out;
}
if (!recover && (r->res_master_nodeid != our_nodeid) &&
(dir_nodeid == our_nodeid)) {
/* our rsb is not master, and we are dir; may as well fix it;
this should never happen */
- log_error(ls, "find_rsb toss our %d master %d dir %d",
+ log_error(ls, "find_rsb inactive our %d master %d dir %d",
our_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
}
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
- goto out_unlock;
+ del_scan(ls, r);
+ list_move(&r->res_slow_list, &ls->ls_slow_active);
+ rsb_clear_flag(r, RSB_INACTIVE);
+ kref_init(&r->res_ref);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+
+ goto out;
do_new:
@@ -785,49 +1009,98 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
*/
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- goto retry;
- }
- if (error)
- goto out_unlock;
+ if (WARN_ON_ONCE(error))
+ goto out;
r->res_hash = hash;
- r->res_bucket = b;
r->res_dir_nodeid = dir_nodeid;
r->res_master_nodeid = dir_nodeid;
r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
kref_init(&r->res_ref);
- error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
- out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+ error = rsb_insert(r, &ls->ls_rsbtbl);
+ if (error == -EEXIST) {
+ /* somebody else was faster and it seems the
+ * rsb exists now, we do a whole relookup
+ */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ dlm_free_rsb(r);
+ goto retry;
+ } else if (!error) {
+ list_add(&r->res_slow_list, &ls->ls_slow_active);
+ }
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+
out:
*r_ret = r;
return error;
}
+/*
+ * rsb rcu usage
+ *
+ * While rcu read lock is held, the rsb cannot be freed,
+ * which allows a lookup optimization.
+ *
+ * Two threads are accessing the same rsb concurrently,
+ * the first (A) is trying to use the rsb, the second (B)
+ * is trying to free the rsb.
+ *
+ * thread A thread B
+ * (trying to use rsb) (trying to free rsb)
+ *
+ * A1. rcu read lock
+ * A2. rsbtbl read lock
+ * A3. look up rsb in rsbtbl
+ * A4. rsbtbl read unlock
+ * B1. rsbtbl write lock
+ * B2. look up rsb in rsbtbl
+ * B3. remove rsb from rsbtbl
+ * B4. clear rsb HASHED flag
+ * B5. rsbtbl write unlock
+ * B6. begin freeing rsb using rcu...
+ *
+ * (rsb is inactive, so try to make it active again)
+ * A5. read rsb HASHED flag (safe because rsb is not freed yet)
+ * A6. the rsb HASHED flag is not set, which it means the rsb
+ * is being removed from rsbtbl and freed, so don't use it.
+ * A7. rcu read unlock
+ *
+ * B7. ...finish freeing rsb using rcu
+ * A8. create a new rsb
+ *
+ * Without the rcu optimization, steps A5-8 would need to do
+ * an extra rsbtbl lookup:
+ * A5. rsbtbl write lock
+ * A6. look up rsb in rsbtbl, not found
+ * A7. rsbtbl write unlock
+ * A8. create a new rsb
+ */
+
static int find_rsb(struct dlm_ls *ls, const void *name, int len,
int from_nodeid, unsigned int flags,
struct dlm_rsb **r_ret)
{
- uint32_t hash, b;
int dir_nodeid;
+ uint32_t hash;
+ int rv;
if (len > DLM_RESNAME_MAXLEN)
return -EINVAL;
hash = jhash(name, len, 0);
- b = hash & (ls->ls_rsbtbl_size - 1);
-
dir_nodeid = dlm_hash2nodeid(ls, hash);
+ rcu_read_lock();
if (dlm_no_directory(ls))
- return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
+ rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
from_nodeid, flags, r_ret);
else
- return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
- from_nodeid, flags, r_ret);
+ rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
+ from_nodeid, flags, r_ret);
+ rcu_read_unlock();
+ return rv;
}
/* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -874,7 +1147,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
}
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
- int from_nodeid, bool toss_list, unsigned int flags,
+ int from_nodeid, bool is_inactive, unsigned int flags,
int *r_nodeid, int *result)
{
int fix_master = (flags & DLM_LU_RECOVER_MASTER);
@@ -887,7 +1160,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
r->res_dir_nodeid = our_nodeid;
}
- if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
+ if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
/* Recovery uses this function to set a new master when
* the previous master failed. Setting NEW_MASTER will
* force dlm_recover_masters to call recover_master on this
@@ -898,9 +1171,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER);
- if (toss_list) {
- /* I don't think we should ever find it on toss list. */
- log_error(ls, "%s fix_master on toss", __func__);
+ if (is_inactive) {
+ /* I don't think we should ever find it inactive. */
+ log_error(ls, "%s fix_master inactive", __func__);
dlm_dump_rsb(r);
}
}
@@ -940,7 +1213,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
if (!from_master && !fix_master &&
(r->res_master_nodeid == from_nodeid)) {
/* this can happen when the master sends remove, the dir node
- * finds the rsb on the keep list and ignores the remove,
+ * finds the rsb on the active list and ignores the remove,
* and the former master sends a lookup
*/
@@ -984,11 +1257,11 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
* . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
*/
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
- int len, unsigned int flags, int *r_nodeid, int *result)
+static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+ int len, unsigned int flags, int *r_nodeid, int *result)
{
struct dlm_rsb *r = NULL;
- uint32_t hash, b;
+ uint32_t hash;
int our_nodeid = dlm_our_nodeid();
int dir_nodeid, error;
@@ -1002,8 +1275,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
}
hash = jhash(name, len, 0);
- b = hash & (ls->ls_rsbtbl_size - 1);
-
dir_nodeid = dlm_hash2nodeid(ls, hash);
if (dir_nodeid != our_nodeid) {
log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
@@ -1014,160 +1285,199 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
}
retry:
- error = pre_rsb_struct(ls);
- if (error < 0)
- return error;
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ if (error)
+ goto not_found;
- spin_lock(&ls->ls_rsbtbl[b].lock);
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
- if (!error) {
- /* because the rsb is active, we need to lock_rsb before
- * checking/changing re_master_nodeid
- */
+ /* check if the rsb is active under read lock - likely path */
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ if (!rsb_flag(r, RSB_HASHED)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto not_found;
+ }
- hold_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- lock_rsb(r);
+ if (rsb_flag(r, RSB_INACTIVE)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto do_inactive;
+ }
- __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
- flags, r_nodeid, result);
+ /* because the rsb is active, we need to lock_rsb before
+ * checking/changing re_master_nodeid
+ */
- /* the rsb was active */
- unlock_rsb(r);
- put_rsb(r);
+ hold_rsb(r);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ lock_rsb(r);
- return 0;
- }
+ __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
+ flags, r_nodeid, result);
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
- if (error)
+ /* the rsb was active */
+ unlock_rsb(r);
+ put_rsb(r);
+
+ return 0;
+
+ do_inactive:
+ /* unlikely path - check if still part of ls_rsbtbl */
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+
+ /* see comment in find_rsb_dir */
+ if (rsb_flag(r, RSB_HASHED)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ /* something as changed, very unlikely but
+ * try again
+ */
+ goto retry;
+ }
+ } else {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
goto not_found;
+ }
- /* because the rsb is inactive (on toss list), it's not refcounted
- * and lock_rsb is not used, but is protected by the rsbtbl lock
- */
+ /* because the rsb is inactive, it's not refcounted and lock_rsb
+ is not used, but is protected by the rsbtbl lock */
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
r_nodeid, result);
- r->res_toss_time = jiffies;
- /* the rsb was inactive (on toss list) */
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ /* A dir record rsb should never be on scan list.
+ * Except when we are the dir and master node.
+ * This function should only be called by the dir
+ * node.
+ */
+ WARN_ON(!list_empty(&r->res_scan_list) &&
+ r->res_master_nodeid != our_nodeid);
+
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return 0;
not_found:
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- goto retry;
- }
- if (error)
- goto out_unlock;
+ if (WARN_ON_ONCE(error))
+ goto out;
r->res_hash = hash;
- r->res_bucket = b;
r->res_dir_nodeid = our_nodeid;
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
- kref_init(&r->res_ref);
- r->res_toss_time = jiffies;
+ rsb_set_flag(r, RSB_INACTIVE);
- error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
- if (error) {
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+ error = rsb_insert(r, &ls->ls_rsbtbl);
+ if (error == -EEXIST) {
+ /* somebody else was faster and it seems the
+ * rsb exists now, we do a whole relookup
+ */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ dlm_free_rsb(r);
+ goto retry;
+ } else if (error) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
/* should never happen */
dlm_free_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
goto retry;
}
+ list_add(&r->res_slow_list, &ls->ls_slow_inactive);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+
if (result)
*result = DLM_LU_ADD;
*r_nodeid = from_nodeid;
- out_unlock:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ out:
return error;
}
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+ int len, unsigned int flags, int *r_nodeid, int *result)
+{
+ int rv;
+ rcu_read_lock();
+ rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
+ rcu_read_unlock();
+ return rv;
+}
+
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
- struct rb_node *n;
struct dlm_rsb *r;
- int i;
- for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- spin_lock(&ls->ls_rsbtbl[i].lock);
- for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
- r = rb_entry(n, struct dlm_rsb, res_hashnode);
- if (r->res_hash == hash)
- dlm_dump_rsb(r);
- }
- spin_unlock(&ls->ls_rsbtbl[i].lock);
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
+ if (r->res_hash == hash)
+ dlm_dump_rsb(r);
}
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
}
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
{
struct dlm_rsb *r = NULL;
- uint32_t hash, b;
int error;
- hash = jhash(name, len, 0);
- b = hash & (ls->ls_rsbtbl_size - 1);
-
- spin_lock(&ls->ls_rsbtbl[b].lock);
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+ rcu_read_lock();
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error)
- goto out_dump;
-
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
- if (error)
goto out;
- out_dump:
+
dlm_dump_rsb(r);
out:
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ rcu_read_unlock();
}
-static void toss_rsb(struct kref *kref)
+static void deactivate_rsb(struct kref *kref)
{
struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
struct dlm_ls *ls = r->res_ls;
+ int our_nodeid = dlm_our_nodeid();
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
- kref_init(&r->res_ref);
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
- rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
- r->res_toss_time = jiffies;
- set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
+ rsb_set_flag(r, RSB_INACTIVE);
+ list_move(&r->res_slow_list, &ls->ls_slow_inactive);
+
+ /*
+ * When the rsb becomes unused, there are two possibilities:
+ * 1. Leave the inactive rsb in place (don't remove it).
+ * 2. Add it to the scan list to be removed.
+ *
+ * 1 is done when the rsb is acting as the dir record
+ * for a remotely mastered rsb. The rsb must be left
+ * in place as an inactive rsb to act as the dir record.
+ *
+ * 2 is done when a) the rsb is not the master and not the
+ * dir record, b) when the rsb is both the master and the
+ * dir record, c) when the rsb is master but not dir record.
+ *
+ * (If no directory is used, the rsb can always be removed.)
+ */
+ if (dlm_no_directory(ls) ||
+ (r->res_master_nodeid == our_nodeid ||
+ dlm_dir_nodeid(r) != our_nodeid))
+ add_scan(ls, r);
+
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
r->res_lvbptr = NULL;
}
}
-/* See comment for unhold_lkb */
-
-static void unhold_rsb(struct dlm_rsb *r)
+void free_inactive_rsb(struct dlm_rsb *r)
{
- int rv;
- rv = kref_put(&r->res_ref, toss_rsb);
- DLM_ASSERT(!rv, dlm_dump_rsb(r););
-}
-
-static void kill_rsb(struct kref *kref)
-{
- struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
-
- /* All work is done after the return from kref_put() so we
- can release the write_lock before the remove and free. */
+ WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
+
+ dlm_free_rsb(r);
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
@@ -1188,36 +1498,34 @@ static void detach_lkb(struct dlm_lkb *lkb)
}
static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
- int start, int end)
+ unsigned long start, unsigned long end)
{
+ struct xa_limit limit;
struct dlm_lkb *lkb;
int rv;
- lkb = dlm_allocate_lkb(ls);
+ limit.max = end;
+ limit.min = start;
+
+ lkb = dlm_allocate_lkb();
if (!lkb)
return -ENOMEM;
- lkb->lkb_last_bast_mode = -1;
+ lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
+ lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
+ lkb->lkb_last_cb_mode = DLM_LOCK_IV;
lkb->lkb_nodeid = -1;
lkb->lkb_grmode = DLM_LOCK_IV;
kref_init(&lkb->lkb_ref);
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
- INIT_LIST_HEAD(&lkb->lkb_cb_list);
- INIT_LIST_HEAD(&lkb->lkb_callbacks);
- spin_lock_init(&lkb->lkb_cb_lock);
- INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
-
- idr_preload(GFP_NOFS);
- spin_lock(&ls->ls_lkbidr_spin);
- rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
- if (rv >= 0)
- lkb->lkb_id = rv;
- spin_unlock(&ls->ls_lkbidr_spin);
- idr_preload_end();
+
+ write_lock_bh(&ls->ls_lkbxa_lock);
+ rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
+ write_unlock_bh(&ls->ls_lkbxa_lock);
if (rv < 0) {
- log_error(ls, "create_lkb idr error %d", rv);
+ log_error(ls, "create_lkb xa error %d", rv);
dlm_free_lkb(lkb);
return rv;
}
@@ -1228,18 +1536,28 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
- return _create_lkb(ls, lkb_ret, 1, 0);
+ return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
- spin_lock(&ls->ls_lkbidr_spin);
- lkb = idr_find(&ls->ls_lkbidr, lkid);
- if (lkb)
- kref_get(&lkb->lkb_ref);
- spin_unlock(&ls->ls_lkbidr_spin);
+ rcu_read_lock();
+ lkb = xa_load(&ls->ls_lkbxa, lkid);
+ if (lkb) {
+ /* check if lkb is still part of lkbxa under lkbxa_lock as
+ * the lkb_ref is tight to the lkbxa data structure, see
+ * __put_lkb().
+ */
+ read_lock_bh(&ls->ls_lkbxa_lock);
+ if (kref_read(&lkb->lkb_ref))
+ kref_get(&lkb->lkb_ref);
+ else
+ lkb = NULL;
+ read_unlock_bh(&ls->ls_lkbxa_lock);
+ }
+ rcu_read_unlock();
*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
@@ -1263,11 +1581,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
uint32_t lkid = lkb->lkb_id;
int rv;
- rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
- &ls->ls_lkbidr_spin);
+ rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
+ &ls->ls_lkbxa_lock);
if (rv) {
- idr_remove(&ls->ls_lkbidr, lkid);
- spin_unlock(&ls->ls_lkbidr_spin);
+ xa_erase(&ls->ls_lkbxa, lkid);
+ write_unlock_bh(&ls->ls_lkbxa_lock);
detach_lkb(lkb);
@@ -1377,10 +1695,8 @@ static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
- hold_lkb(lkb);
del_lkb(r, lkb);
add_lkb(r, lkb, sts);
- unhold_lkb(lkb);
}
static int msg_reply_type(int mstype)
@@ -1403,19 +1719,11 @@ static int msg_reply_type(int mstype)
/* add/remove lkb from global waiters list of lkb's waiting for
a reply from a remote node */
-static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
+static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
- int error = 0;
-
- mutex_lock(&ls->ls_waiters_mutex);
-
- if (is_overlap_unlock(lkb) ||
- (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
- error = -EINVAL;
- goto out;
- }
+ spin_lock_bh(&ls->ls_waiters_lock);
if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
switch (mstype) {
case DLM_MSG_UNLOCK:
@@ -1425,7 +1733,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
break;
default:
- error = -EBUSY;
+ /* should never happen as validate_lock_args() checks
+ * on lkb_wait_type and validate_unlock_args() only
+ * creates UNLOCK or CANCEL messages.
+ */
+ WARN_ON_ONCE(1);
goto out;
}
lkb->lkb_wait_count++;
@@ -1447,12 +1759,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
hold_lkb(lkb);
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
out:
- if (error)
- log_error(ls, "addwait error %x %d flags %x %d %d %s",
- lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
- lkb->lkb_wait_type, lkb->lkb_resource->res_name);
- mutex_unlock(&ls->ls_waiters_mutex);
- return error;
+ spin_unlock_bh(&ls->ls_waiters_lock);
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters
@@ -1551,14 +1858,18 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock_bh(&ls->ls_waiters_lock);
error = _remove_from_waiters(lkb, mstype, NULL);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
/* Handles situations where we might be processing a "fake" or "local" reply in
- which we can't try to take waiters_mutex again. */
+ * the recovery context which stops any locking activity. Only debugfs might
+ * change the lockspace waiters but they will held the recovery lock to ensure
+ * remove_from_waiters_ms() in local case will be the only user manipulating the
+ * lockspace waiters in recovery context.
+ */
static int remove_from_waiters_ms(struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local)
@@ -1567,159 +1878,16 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
int error;
if (!local)
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock_bh(&ls->ls_waiters_lock);
+ else
+ WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
+ !dlm_locking_stopped(ls));
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (!local)
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return error;
}
-static void shrink_bucket(struct dlm_ls *ls, int b)
-{
- struct rb_node *n, *next;
- struct dlm_rsb *r;
- char *name;
- int our_nodeid = dlm_our_nodeid();
- int remote_count = 0;
- int need_shrink = 0;
- int i, len, rv;
-
- memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
-
- spin_lock(&ls->ls_rsbtbl[b].lock);
-
- if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- return;
- }
-
- for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
- next = rb_next(n);
- r = rb_entry(n, struct dlm_rsb, res_hashnode);
-
- /* If we're the directory record for this rsb, and
- we're not the master of it, then we need to wait
- for the master node to send us a dir remove for
- before removing the dir record. */
-
- if (!dlm_no_directory(ls) &&
- (r->res_master_nodeid != our_nodeid) &&
- (dlm_dir_nodeid(r) == our_nodeid)) {
- continue;
- }
-
- need_shrink = 1;
-
- if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.ci_toss_secs * HZ)) {
- continue;
- }
-
- if (!dlm_no_directory(ls) &&
- (r->res_master_nodeid == our_nodeid) &&
- (dlm_dir_nodeid(r) != our_nodeid)) {
-
- /* We're the master of this rsb but we're not
- the directory record, so we need to tell the
- dir node to remove the dir record. */
-
- ls->ls_remove_lens[remote_count] = r->res_length;
- memcpy(ls->ls_remove_names[remote_count], r->res_name,
- DLM_RESNAME_MAXLEN);
- remote_count++;
-
- if (remote_count >= DLM_REMOVE_NAMES_MAX)
- break;
- continue;
- }
-
- if (!kref_put(&r->res_ref, kill_rsb)) {
- log_error(ls, "tossed rsb in use %s", r->res_name);
- continue;
- }
-
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- dlm_free_rsb(r);
- }
-
- if (need_shrink)
- set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
- else
- clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
-
- /*
- * While searching for rsb's to free, we found some that require
- * remote removal. We leave them in place and find them again here
- * so there is a very small gap between removing them from the toss
- * list and sending the removal. Keeping this gap small is
- * important to keep us (the master node) from being out of sync
- * with the remote dir node for very long.
- */
-
- for (i = 0; i < remote_count; i++) {
- name = ls->ls_remove_names[i];
- len = ls->ls_remove_lens[i];
-
- spin_lock(&ls->ls_rsbtbl[b].lock);
- rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
- if (rv) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- log_debug(ls, "remove_name not toss %s", name);
- continue;
- }
-
- if (r->res_master_nodeid != our_nodeid) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- log_debug(ls, "remove_name master %d dir %d our %d %s",
- r->res_master_nodeid, r->res_dir_nodeid,
- our_nodeid, name);
- continue;
- }
-
- if (r->res_dir_nodeid == our_nodeid) {
- /* should never happen */
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- log_error(ls, "remove_name dir %d master %d our %d %s",
- r->res_dir_nodeid, r->res_master_nodeid,
- our_nodeid, name);
- continue;
- }
-
- if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.ci_toss_secs * HZ)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- log_debug(ls, "remove_name toss_time %lu now %lu %s",
- r->res_toss_time, jiffies, name);
- continue;
- }
-
- if (!kref_put(&r->res_ref, kill_rsb)) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- log_error(ls, "remove_name in use %s", name);
- continue;
- }
-
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- send_remove(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
-
- dlm_free_rsb(r);
- }
-}
-
-void dlm_scan_rsbs(struct dlm_ls *ls)
-{
- int i;
-
- for (i = 0; i < ls->ls_rsbtbl_size; i++) {
- shrink_bucket(ls, i);
- if (dlm_locking_stopped(ls))
- break;
- cond_resched();
- }
-}
-
/* lkb is master or local copy */
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -2538,7 +2706,6 @@ static void process_lookup_list(struct dlm_rsb *r)
list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
list_del_init(&lkb->lkb_rsb_lookup);
_request_lock(r, lkb);
- schedule();
}
}
@@ -2701,16 +2868,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
case -EINVAL:
/* annoy the user because dlm usage is wrong */
WARN_ON(1);
- log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
+ log_error(ls, "%s %d %x %x %x %d %d", __func__,
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
- lkb->lkb_status, lkb->lkb_wait_type,
- lkb->lkb_resource->res_name);
+ lkb->lkb_status, lkb->lkb_wait_type);
break;
default:
- log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
+ log_debug(ls, "%s %d %x %x %x %d %d", __func__,
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
- lkb->lkb_status, lkb->lkb_wait_type,
- lkb->lkb_resource->res_name);
+ lkb->lkb_status, lkb->lkb_wait_type);
break;
}
@@ -2768,13 +2933,16 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
goto out;
}
+ if (is_overlap_unlock(lkb))
+ goto out;
+
/* cancel not allowed with another cancel/unlock in progress */
if (args->flags & DLM_LKF_CANCEL) {
if (lkb->lkb_exflags & DLM_LKF_CANCEL)
goto out;
- if (is_overlap(lkb))
+ if (is_overlap_cancel(lkb))
goto out;
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
@@ -2812,9 +2980,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
goto out;
- if (is_overlap_unlock(lkb))
- goto out;
-
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
rv = -EBUSY;
@@ -3332,8 +3497,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
static int _create_message(struct dlm_ls *ls, int mb_len,
int to_nodeid, int mstype,
struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret,
- gfp_t allocation)
+ struct dlm_mhandle **mh_ret)
{
struct dlm_message *ms;
struct dlm_mhandle *mh;
@@ -3343,7 +3507,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
pass into midcomms_commit and a message buffer (mb) that we
write our data into */
- mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
+ mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
if (!mh)
return -ENOBUFS;
@@ -3365,8 +3529,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
int to_nodeid, int mstype,
struct dlm_message **ms_ret,
- struct dlm_mhandle **mh_ret,
- gfp_t allocation)
+ struct dlm_mhandle **mh_ret)
{
int mb_len = sizeof(struct dlm_message);
@@ -3387,7 +3550,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
- ms_ret, mh_ret, allocation);
+ ms_ret, mh_ret);
}
/* further lowcomms enhancements or alternate implementations may make
@@ -3452,11 +3615,8 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
to_nodeid = r->res_nodeid;
- error = add_to_waiters(lkb, mstype, to_nodeid);
- if (error)
- return error;
-
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+ add_to_waiters(lkb, mstype, to_nodeid);
+ error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
goto fail;
@@ -3516,8 +3676,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
if (error)
goto out;
@@ -3538,8 +3697,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
if (error)
goto out;
@@ -3560,12 +3718,8 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
to_nodeid = dlm_dir_nodeid(r);
- error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
- if (error)
- return error;
-
- error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
- GFP_NOFS);
+ add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
+ error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
if (error)
goto fail;
@@ -3589,8 +3743,7 @@ static int send_remove(struct dlm_rsb *r)
to_nodeid = dlm_dir_nodeid(r);
- error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
- GFP_ATOMIC);
+ error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
if (error)
goto out;
@@ -3611,7 +3764,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
to_nodeid = lkb->lkb_nodeid;
- error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
+ error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
goto out;
@@ -3653,8 +3806,7 @@ static int send_lookup_reply(struct dlm_ls *ls,
struct dlm_mhandle *mh;
int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
- error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
- GFP_NOFS);
+ error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
if (error)
goto out;
@@ -4139,7 +4291,6 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
{
char name[DLM_RESNAME_MAXLEN+1];
struct dlm_rsb *r;
- uint32_t hash, b;
int rv, len, dir_nodeid, from_nodeid;
from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
@@ -4159,68 +4310,76 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return;
}
- /* Look for name on rsbtbl.toss, if it's there, kill it.
- If it's on rsbtbl.keep, it's being used, and we should ignore this
- message. This is an expected race between the dir node sending a
- request to the master node at the same time as the master node sends
- a remove to the dir node. The resolution to that race is for the
- dir node to ignore the remove message, and the master node to
- recreate the master rsb when it gets a request from the dir node for
- an rsb it doesn't have. */
+ /*
+ * Look for inactive rsb, if it's there, free it.
+ * If the rsb is active, it's being used, and we should ignore this
+ * message. This is an expected race between the dir node sending a
+ * request to the master node at the same time as the master node sends
+ * a remove to the dir node. The resolution to that race is for the
+ * dir node to ignore the remove message, and the master node to
+ * recreate the master rsb when it gets a request from the dir node for
+ * an rsb it doesn't have.
+ */
memset(name, 0, sizeof(name));
memcpy(name, ms->m_extra, len);
- hash = jhash(name, len, 0);
- b = hash & (ls->ls_rsbtbl_size - 1);
+ rcu_read_lock();
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ if (rv) {
+ rcu_read_unlock();
+ /* should not happen */
+ log_error(ls, "%s from %d not found %s", __func__,
+ from_nodeid, name);
+ return;
+ }
- spin_lock(&ls->ls_rsbtbl[b].lock);
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+ if (!rsb_flag(r, RSB_HASHED)) {
+ rcu_read_unlock();
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ /* should not happen */
+ log_error(ls, "%s from %d got removed during removal %s",
+ __func__, from_nodeid, name);
+ return;
+ }
+ /* at this stage the rsb can only being freed here */
+ rcu_read_unlock();
- rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
- if (rv) {
- /* verify the rsb is on keep list per comment above */
- rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
- if (rv) {
- /* should not happen */
- log_error(ls, "receive_remove from %d not found %s",
- from_nodeid, name);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- return;
- }
+ if (!rsb_flag(r, RSB_INACTIVE)) {
if (r->res_master_nodeid != from_nodeid) {
/* should not happen */
- log_error(ls, "receive_remove keep from %d master %d",
+ log_error(ls, "receive_remove on active rsb from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
+ /* Ignore the remove message, see race comment above. */
+
log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
if (r->res_master_nodeid != from_nodeid) {
- log_error(ls, "receive_remove toss from %d master %d",
+ log_error(ls, "receive_remove inactive from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
- if (kref_put(&r->res_ref, kill_rsb)) {
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- dlm_free_rsb(r);
- } else {
- log_error(ls, "receive_remove from %d rsb ref error",
- from_nodeid);
- dlm_print_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- }
+ list_del(&r->res_slow_list);
+ rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+ dlm_rhash_rsb_params);
+ rsb_clear_flag(r, RSB_HASHED);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+
+ free_inactive_rsb(r);
}
static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
@@ -4407,7 +4566,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb,
if (error)
goto out;
- /* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
@@ -4446,7 +4604,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb,
if (error)
goto out;
- /* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
@@ -4498,7 +4655,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb,
if (error)
goto out;
- /* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
@@ -4757,20 +4913,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
int nodeid)
{
- if (dlm_locking_stopped(ls)) {
+try_again:
+ read_lock_bh(&ls->ls_requestqueue_lock);
+ if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
if (WARN_ON_ONCE(!ls->ls_generation)) {
+ read_unlock_bh(&ls->ls_requestqueue_lock);
log_limit(ls, "receive %d from %d ignore old gen",
le32_to_cpu(ms->m_type), nodeid);
return;
}
+ read_unlock_bh(&ls->ls_requestqueue_lock);
+ write_lock_bh(&ls->ls_requestqueue_lock);
+ /* recheck because we hold writelock now */
+ if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
+ write_unlock_bh(&ls->ls_requestqueue_lock);
+ goto try_again;
+ }
+
dlm_add_requestqueue(ls, nodeid, ms);
+ write_unlock_bh(&ls->ls_requestqueue_lock);
} else {
- dlm_wait_requestqueue(ls);
_receive_message(ls, ms, 0);
+ read_unlock_bh(&ls->ls_requestqueue_lock);
}
}
@@ -4830,7 +4998,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */
- down_read(&ls->ls_recv_active);
+ read_lock_bh(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, &p->message, nodeid);
else if (hd->h_cmd == DLM_RCOM)
@@ -4838,7 +5006,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
else
log_error(ls, "invalid h_cmd %d from %d lockspace %x",
hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
- up_read(&ls->ls_recv_active);
+ read_unlock_bh(&ls->ls_recv_active);
dlm_put_lockspace(ls);
}
@@ -4847,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms_local)
{
if (middle_conversion(lkb)) {
+ log_rinfo(ls, "%s %x middle convert in progress", __func__,
+ lkb->lkb_id);
+
+ /* We sent this lock to the new master. The new master will
+ * tell us when it's granted. We no longer need a reply, so
+ * use a fake reply to put the lkb into the right state.
+ */
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_convert_reply(lkb, ms_local, true);
-
- /* Same special case as in receive_rcom_lock_args() */
- lkb->lkb_grmode = DLM_LOCK_IV;
- rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
unhold_lkb(lkb);
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
@@ -4899,8 +5070,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (!ms_local)
return;
- mutex_lock(&ls->ls_waiters_mutex);
-
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
@@ -4993,7 +5162,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
}
schedule();
}
- mutex_unlock(&ls->ls_waiters_mutex);
kfree(ms_local);
}
@@ -5001,7 +5169,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
struct dlm_lkb *lkb = NULL, *iter;
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
hold_lkb(iter);
@@ -5009,7 +5177,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
break;
}
}
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock_bh(&ls->ls_waiters_lock);
return lkb;
}
@@ -5109,9 +5277,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
}
/* Forcibly remove from waiters list */
- mutex_lock(&ls->ls_waiters_mutex);
+ spin_lock_bh(&ls->ls_waiters_lock);
list_del_init(&lkb->lkb_wait_reply);
- mutex_unlock(&ls->ls_waiters_mutex);
+ spin_unlock_bh(&ls->ls_waiters_lock);
/*
* The lkb is now clear of all prior waiters state and can be
@@ -5144,7 +5312,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
case DLM_MSG_LOOKUP:
case DLM_MSG_REQUEST:
_request_lock(r, lkb);
- if (is_master(r))
+ if (r->res_nodeid != -1 && is_master(r))
confirm_master(r, 0);
break;
case DLM_MSG_CONVERT:
@@ -5236,7 +5404,7 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
/* Get rid of locks held by nodes that are gone. */
-void dlm_recover_purge(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
{
struct dlm_rsb *r;
struct dlm_member *memb;
@@ -5255,11 +5423,9 @@ void dlm_recover_purge(struct dlm_ls *ls)
if (!nodes_count)
return;
- down_write(&ls->ls_root_sem);
- list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
- hold_rsb(r);
+ list_for_each_entry(r, root_list, res_root_list) {
lock_rsb(r);
- if (is_master(r)) {
+ if (r->res_nodeid != -1 && is_master(r)) {
purge_dead_list(ls, r, &r->res_grantqueue,
nodeid_gone, &lkb_count);
purge_dead_list(ls, r, &r->res_convertqueue,
@@ -5268,25 +5434,21 @@ void dlm_recover_purge(struct dlm_ls *ls)
nodeid_gone, &lkb_count);
}
unlock_rsb(r);
- unhold_rsb(r);
+
cond_resched();
}
- up_write(&ls->ls_root_sem);
if (lkb_count)
log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
lkb_count, nodes_count);
}
-static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
{
- struct rb_node *n;
struct dlm_rsb *r;
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
- for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
- r = rb_entry(n, struct dlm_rsb, res_hashnode);
-
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
if (!is_master(r)) {
@@ -5294,10 +5456,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
continue;
}
hold_rsb(r);
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
return r;
}
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
return NULL;
}
@@ -5321,19 +5483,15 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
void dlm_recover_grant(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- int bucket = 0;
unsigned int count = 0;
unsigned int rsb_count = 0;
unsigned int lkb_count = 0;
while (1) {
- r = find_grant_rsb(ls, bucket);
- if (!r) {
- if (bucket == ls->ls_rsbtbl_size - 1)
- break;
- bucket++;
- continue;
- }
+ r = find_grant_rsb(ls);
+ if (!r)
+ break;
+
rsb_count++;
count = 0;
lock_rsb(r);
@@ -5416,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
The real granted mode of these converting locks cannot be determined
until all locks have been rebuilt on the rsb (recover_conversion) */
- if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
- middle_conversion(lkb)) {
- rl->rl_status = DLM_LKSTS_CONVERT;
- lkb->lkb_grmode = DLM_LOCK_IV;
+ if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
+ /* We may need to adjust grmode depending on other granted locks. */
+ log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
+ __func__, lkb->lkb_id, lkb->lkb_grmode,
+ lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
rsb_set_flag(r, RSB_RECOVER_CONVERT);
}
@@ -5641,10 +5800,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
}
/* add this new lkb to the per-process list of locks */
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
hold_lkb(lkb);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
do_put = false;
out_put:
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
@@ -5726,7 +5885,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
int found_other_mode = 0;
int rv = 0;
- mutex_lock(&ls->ls_orphans_mutex);
+ spin_lock_bh(&ls->ls_orphans_lock);
list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
if (iter->lkb_resource->res_length != namelen)
continue;
@@ -5743,7 +5902,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
*lkid = iter->lkb_id;
break;
}
- mutex_unlock(&ls->ls_orphans_mutex);
+ spin_unlock_bh(&ls->ls_orphans_lock);
if (!lkb && found_other_mode) {
rv = -EAGAIN;
@@ -5774,9 +5933,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
* for the proc locks list.
*/
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
out:
kfree(ua_tmp);
return rv;
@@ -5820,11 +5979,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
if (error)
goto out_put;
- spin_lock(&ua->proc->locks_spin);
+ spin_lock_bh(&ua->proc->locks_spin);
/* dlm_user_add_cb() may have already taken lkb off the proc list */
if (!list_empty(&lkb->lkb_ownqueue))
list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
- spin_unlock(&ua->proc->locks_spin);
+ spin_unlock_bh(&ua->proc->locks_spin);
out_put:
trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
@@ -5935,9 +6094,9 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
int error;
hold_lkb(lkb); /* reference for the ls_orphans list */
- mutex_lock(&ls->ls_orphans_mutex);
+ spin_lock_bh(&ls->ls_orphans_lock);
list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
- mutex_unlock(&ls->ls_orphans_mutex);
+ spin_unlock_bh(&ls->ls_orphans_lock);
set_unlock_args(0, lkb->lkb_ua, &args);
@@ -5975,7 +6134,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
{
struct dlm_lkb *lkb = NULL;
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
if (list_empty(&proc->locks))
goto out;
@@ -5987,7 +6146,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
else
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
out:
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
return lkb;
}
@@ -6003,6 +6162,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
+ struct dlm_callback *cb, *cb_safe;
struct dlm_lkb *lkb, *safe;
dlm_lock_recovery(ls);
@@ -6023,7 +6183,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- spin_lock(&ls->ls_clear_proc_locks);
+ spin_lock_bh(&ls->ls_clear_proc_locks);
/* in-progress unlocks */
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
@@ -6032,29 +6192,29 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb);
}
- list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
- dlm_purge_lkb_callbacks(lkb);
- list_del_init(&lkb->lkb_cb_list);
- dlm_put_lkb(lkb);
+ list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
+ list_del(&cb->list);
+ dlm_free_cb(cb);
}
- spin_unlock(&ls->ls_clear_proc_locks);
+ spin_unlock_bh(&ls->ls_clear_proc_locks);
dlm_unlock_recovery(ls);
}
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
+ struct dlm_callback *cb, *cb_safe;
struct dlm_lkb *lkb, *safe;
while (1) {
lkb = NULL;
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
if (!list_empty(&proc->locks)) {
lkb = list_entry(proc->locks.next, struct dlm_lkb,
lkb_ownqueue);
list_del_init(&lkb->lkb_ownqueue);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
if (!lkb)
break;
@@ -6064,21 +6224,20 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb); /* ref from proc->locks list */
}
- spin_lock(&proc->locks_spin);
+ spin_lock_bh(&proc->locks_spin);
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
list_del_init(&lkb->lkb_ownqueue);
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
dlm_put_lkb(lkb);
}
- spin_unlock(&proc->locks_spin);
+ spin_unlock_bh(&proc->locks_spin);
- spin_lock(&proc->asts_spin);
- list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
- dlm_purge_lkb_callbacks(lkb);
- list_del_init(&lkb->lkb_cb_list);
- dlm_put_lkb(lkb);
+ spin_lock_bh(&proc->asts_spin);
+ list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
+ list_del(&cb->list);
+ dlm_free_cb(cb);
}
- spin_unlock(&proc->asts_spin);
+ spin_unlock_bh(&proc->asts_spin);
}
/* pid of 0 means purge all orphans */
@@ -6087,7 +6246,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
struct dlm_lkb *lkb, *safe;
- mutex_lock(&ls->ls_orphans_mutex);
+ spin_lock_bh(&ls->ls_orphans_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
if (pid && lkb->lkb_ownpid != pid)
continue;
@@ -6095,7 +6254,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
list_del_init(&lkb->lkb_ownqueue);
dlm_put_lkb(lkb);
}
- mutex_unlock(&ls->ls_orphans_mutex);
+ spin_unlock_bh(&ls->ls_orphans_lock);
}
static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
@@ -6105,7 +6264,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
int error;
error = _create_message(ls, sizeof(struct dlm_message), nodeid,
- DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
+ DLM_MSG_PURGE, &ms, &mh);
if (error)
return error;
ms->m_nodeid = cpu_to_le32(nodeid);
@@ -6188,8 +6347,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
if (error)
return error;
- error = add_to_waiters(lkb, mstype, to_nodeid);
+ add_to_waiters(lkb, mstype, to_nodeid);
dlm_put_lkb(lkb);
- return error;
+ return 0;
}