Diffstat (limited to 'fs/dlm/lock.c')
| -rw-r--r-- | fs/dlm/lock.c | 3037 |
1 file changed, 1544 insertions, 1493 deletions
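
Reader note: this patch replaces the per-bucket rb-tree "keep"/"toss" tables with a single rhashtable keyed on the resource name, plus RSB_HASHED/RSB_INACTIVE flags and a timer-driven scan list for inactive rsbs. The actual dlm_rhash_rsb_params definition is outside this hunk; as a hedged sketch, the zero-padded DLM_RESNAME_MAXLEN key built in dlm_search_rsb_tree() and the res_node linkage used by rsb_insert() suggest parameters along these lines (illustrative names, not the in-tree definition):

	#include <linux/rhashtable.h>
	#include "dlm_internal.h"	/* struct dlm_rsb, DLM_RESNAME_MAXLEN */

	/* Illustrative only: the real dlm_rhash_rsb_params lives outside this
	 * hunk.  Fixed-length hashing over the zero-padded resource name is
	 * implied by the key handling in dlm_search_rsb_tree().
	 */
	static const struct rhashtable_params example_rsb_params = {
		.key_len	= DLM_RESNAME_MAXLEN,	/* fixed-length, zero-padded key */
		.key_offset	= offsetof(struct dlm_rsb, res_name),
		.head_offset	= offsetof(struct dlm_rsb, res_node),
		.automatic_shrinking = true,		/* shrink as inactive rsbs are freed */
	};
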
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 6df332296c66..be938fdf17d9 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,11 +1,9 @@ +// SPDX-License-Identifier: GPL-2.0-only /****************************************************************************** ******************************************************************************* ** ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. ** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. ** ******************************************************************************* ******************************************************************************/ @@ -55,13 +53,15 @@ R: do_xxxx() L: receive_xxxx_reply() <- R: send_xxxx_reply() */ +#include <trace/events/dlm.h> + #include <linux/types.h> #include <linux/rbtree.h> #include <linux/slab.h> #include "dlm_internal.h" #include <linux/dlm_device.h> #include "memory.h" -#include "lowcomms.h" +#include "midcomms.h" #include "requestqueue.h" #include "util.h" #include "dir.h" @@ -86,11 +86,10 @@ static int send_remove(struct dlm_rsb *r); static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms); -static int receive_extralen(struct dlm_message *ms); + const struct dlm_message *ms, bool local); +static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void del_timeout(struct dlm_lkb *lkb); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -164,7 +163,7 @@ void dlm_print_lkb(struct dlm_lkb *lkb) printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, - lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, + dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode, lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, (unsigned long long)lkb->lkb_recover_seq); } @@ -202,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r) /* Threads cannot use the lockspace while it's being recovered */ -static inline void dlm_lock_recovery(struct dlm_ls *ls) +void dlm_lock_recovery(struct dlm_ls *ls) { down_read(&ls->ls_in_recovery); } @@ -229,12 +228,12 @@ static inline int force_blocking_asts(struct dlm_lkb *lkb) static inline int is_demoted(struct dlm_lkb *lkb) { - return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); + return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags); } static inline int is_altmode(struct dlm_lkb *lkb) { - return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); + return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags); } static inline int is_granted(struct dlm_lkb *lkb) @@ -250,12 +249,13 @@ static inline int is_remote(struct dlm_rsb *r) static inline int is_process_copy(struct dlm_lkb *lkb) { - return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); + return lkb->lkb_nodeid && + !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); } static inline int is_master_copy(struct dlm_lkb *lkb) { - return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 
1 : 0; + return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); } static inline int middle_conversion(struct dlm_lkb *lkb) @@ -273,18 +273,18 @@ static inline int down_conversion(struct dlm_lkb *lkb) static inline int is_overlap_unlock(struct dlm_lkb *lkb) { - return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; + return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); } static inline int is_overlap_cancel(struct dlm_lkb *lkb) { - return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; + return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); } static inline int is_overlap(struct dlm_lkb *lkb) { - return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | - DLM_IFL_OVERLAP_CANCEL)); + return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) || + test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); } static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) @@ -292,23 +292,13 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) if (is_master_copy(lkb)) return; - del_timeout(lkb); - DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); - /* if the operation was a cancel, then return -DLM_ECANCEL, if a - timeout caused the cancel then return -ETIMEDOUT */ - if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { - lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; - rv = -ETIMEDOUT; - } - - if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { - lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; + if (rv == -DLM_ECANCEL && + test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags)) rv = -EDEADLK; - } - dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); + dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb)); } static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -330,11 +320,18 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ +static inline unsigned long rsb_toss_jiffies(void) +{ + return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ); +} + /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ static inline void hold_rsb(struct dlm_rsb *r) { + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -343,17 +340,45 @@ void dlm_hold_rsb(struct dlm_rsb *r) hold_rsb(r); } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. 
*/ +/* TODO move this to lib/refcount.c */ +static __must_check bool +dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock) +__cond_acquires(lock) +{ + if (refcount_dec_not_one(r)) + return false; + + write_lock_bh(lock); + if (!refcount_dec_and_test(r)) { + write_unlock_bh(lock); + return false; + } + + return true; +} + +/* TODO move this to include/linux/kref.h */ +static inline int dlm_kref_put_write_lock_bh(struct kref *kref, + void (*release)(struct kref *kref), + rwlock_t *lock) +{ + if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) { + release(kref); + return 1; + } + + return 0; +} static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - uint32_t bucket = r->res_bucket; + int rv; - spin_lock(&ls->ls_rsbtbl[bucket].lock); - kref_put(&r->res_ref, toss_rsb); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, + &ls->ls_rsbtbl_lock); + if (rv) + write_unlock_bh(&ls->ls_rsbtbl_lock); } void dlm_put_rsb(struct dlm_rsb *r) @@ -361,141 +386,265 @@ void dlm_put_rsb(struct dlm_rsb *r) put_rsb(r); } -static int pre_rsb_struct(struct dlm_ls *ls) +/* connected with timer_delete_sync() in dlm_ls_stop() to stop + * new timers when recovery is triggered and don't run them + * again until a resume_scan_timer() tries it again. + */ +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { - struct dlm_rsb *r1, *r2; - int count = 0; + if (!dlm_locking_stopped(ls)) + mod_timer(&ls->ls_scan_timer, jiffies); +} - spin_lock(&ls->ls_new_rsb_spin); - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { - spin_unlock(&ls->ls_new_rsb_spin); - return 0; - } - spin_unlock(&ls->ls_new_rsb_spin); +/* This function tries to resume the timer callback if a rsb + * is on the scan list and no timer is pending. It might that + * the first entry is on currently executed as timer callback + * but we don't care if a timer queued up again and does + * nothing. Should be a rare case. + */ +void resume_scan_timer(struct dlm_ls *ls) +{ + struct dlm_rsb *r; - r1 = dlm_allocate_rsb(ls); - r2 = dlm_allocate_rsb(ls); + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); +} + +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) +{ + struct dlm_rsb *first; - spin_lock(&ls->ls_new_rsb_spin); - if (r1) { - list_add(&r1->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; + /* active rsbs should never be on the scan list */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + spin_lock_bh(&ls->ls_scan_lock); + r->res_toss_time = 0; + + /* if the rsb is not queued do nothing */ + if (list_empty(&r->res_scan_list)) + goto out; + + /* get the first element before delete */ + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); + /* check if the first element was the rsb we deleted */ + if (first == r) { + /* try to get the new first element, if the list + * is empty now try to delete the timer, if we are + * too late we don't care. + * + * if the list isn't empty and a new first element got + * in place, set the new timer expire time. 
+ */ + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (!first) + timer_delete(&ls->ls_scan_timer); + else + enable_scan_timer(ls, first->res_toss_time); } - if (r2) { - list_add(&r2->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; + +out: + spin_unlock_bh(&ls->ls_scan_lock); +} + +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) +{ + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *first; + + /* A dir record for a remote master rsb should never be on the scan list. */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* An active rsb should never be on the scan list. */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + /* An rsb should not already be on the scan list. */ + WARN_ON(!list_empty(&r->res_scan_list)); + + spin_lock_bh(&ls->ls_scan_lock); + /* set the new rsb absolute expire time in the rsb */ + r->res_toss_time = rsb_toss_jiffies(); + if (list_empty(&ls->ls_scan_list)) { + /* if the queue is empty add the element and it's + * our new expire time + */ + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); + } else { + /* try to get the maybe new first element and then add + * to this rsb with the oldest expire time to the end + * of the queue. If the list was empty before this + * rsb expire time is our next expiration if it wasn't + * the now new first elemet is our new expiration time + */ + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + if (!first) + enable_scan_timer(ls, r->res_toss_time); + else + enable_scan_timer(ls, first->res_toss_time); } - count = ls->ls_new_rsb_count; - spin_unlock(&ls->ls_new_rsb_spin); + spin_unlock_bh(&ls->ls_scan_lock); +} - if (!count) - return -ENOMEM; - return 0; +/* if we hit contention we do in 250 ms a retry to trylock. + * if there is any other mod_timer in between we don't care + * about that it expires earlier again this is only for the + * unlikely case nothing happened in this time. + */ +#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) + +/* Called by lockspace scan_timer to free unused rsb's. */ + +void dlm_rsb_scan(struct timer_list *timer) +{ + struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer); + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *r; + int rv; + + while (1) { + /* interrupting point to leave iteration when + * recovery waits for timer_delete_sync(), recovery + * will take care to delete everything in scan list. + */ + if (dlm_locking_stopped(ls)) + break; + + rv = spin_trylock(&ls->ls_scan_lock); + if (!rv) { + /* rearm again try timer */ + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (!r) { + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); + break; + } + + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. + */ + if (time_before(jiffies, r->res_toss_time)) { + /* rearm with the next rsb to expire in the future */ + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); + break; + } + + /* in find_rsb_dir/nodir there is a reverse order of this + * lock, however this is only a trylock if we hit some + * possible contention we try it again. 
+ */ + rv = write_trylock(&ls->ls_rsbtbl_lock); + if (!rv) { + spin_unlock(&ls->ls_scan_lock); + /* rearm again try timer */ + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + list_del(&r->res_slow_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); + + /* ls_rsbtbl_lock is not needed when calling send_remove() */ + write_unlock(&ls->ls_rsbtbl_lock); + + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); + + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. + */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* We're the master of this rsb but we're not + * the directory record, so we need to tell the + * dir node to remove the dir record + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) + send_remove(r); + + free_inactive_rsb(r); + } } /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can unlock any spinlocks, go back and call pre_rsb_struct again. Otherwise, take an rsb off the list and return it. */ -static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, +static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, struct dlm_rsb **r_ret) { struct dlm_rsb *r; - int count; - spin_lock(&ls->ls_new_rsb_spin); - if (list_empty(&ls->ls_new_rsb)) { - count = ls->ls_new_rsb_count; - spin_unlock(&ls->ls_new_rsb_spin); - log_debug(ls, "find_rsb retry %d %d %s", - count, dlm_config.ci_new_rsb_count, name); - return -EAGAIN; - } - - r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); - list_del(&r->res_hashchain); - /* Convert the empty list_head to a NULL rb_node for tree usage: */ - memset(&r->res_hashnode, 0, sizeof(struct rb_node)); - ls->ls_new_rsb_count--; - spin_unlock(&ls->ls_new_rsb_spin); + r = dlm_allocate_rsb(); + if (!r) + return -ENOMEM; r->res_ls = ls; r->res_length = len; memcpy(r->res_name, name, len); - mutex_init(&r->res_mutex); + spin_lock_init(&r->res_lock); INIT_LIST_HEAD(&r->res_lookup); INIT_LIST_HEAD(&r->res_grantqueue); INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); + INIT_LIST_HEAD(&r->res_scan_list); INIT_LIST_HEAD(&r->res_recover_list); + INIT_LIST_HEAD(&r->res_masters_list); *r_ret = r; return 0; } -static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) +int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, + struct dlm_rsb **r_ret) { - char maxname[DLM_RESNAME_MAXLEN]; + char key[DLM_RESNAME_MAXLEN] = {}; - memset(maxname, 0, DLM_RESNAME_MAXLEN); - memcpy(maxname, name, nlen); - return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); -} + memcpy(key, name, len); + *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params); + if (*r_ret) + return 0; -int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, - struct dlm_rsb **r_ret) -{ - struct rb_node *node = tree->rb_node; - struct dlm_rsb *r; - int rc; - - while (node) { - r = rb_entry(node, struct dlm_rsb, res_hashnode); - rc = rsb_cmp(r, name, len); - if (rc < 0) - node = node->rb_left; - else if (rc > 0) - node = node->rb_right; - else - goto found; - } - *r_ret = NULL; return -EBADR; - - found: - *r_ret = r; - return 0; } -static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) +static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) 
{ - struct rb_node **newn = &tree->rb_node; - struct rb_node *parent = NULL; - int rc; - - while (*newn) { - struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, - res_hashnode); + int rv; - parent = *newn; - rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); - if (rc < 0) - newn = &parent->rb_left; - else if (rc > 0) - newn = &parent->rb_right; - else { - log_print("rsb_insert match"); - dlm_dump_rsb(rsb); - dlm_dump_rsb(cur); - return -EEXIST; - } - } + rv = rhashtable_insert_fast(rhash, &rsb->res_node, + dlm_rhash_rsb_params); + if (!rv) + rsb_set_flag(rsb, RSB_HASHED); - rb_link_node(&rsb->res_hashnode, parent, newn); - rb_insert_color(&rsb->res_hashnode, tree); - return 0; + return rv; } /* @@ -525,7 +674,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * - * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * @@ -542,9 +691,8 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) * while that rsb has a potentially stale master.) */ -static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, - uint32_t hash, uint32_t b, - int dir_nodeid, int from_nodeid, +static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, + uint32_t hash, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; @@ -574,9 +722,9 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if - * someone sends us a request after we have removed/freed an rsb - * from our toss list. (They sent a request instead of lookup - * because they are using an rsb from their toss list.) + * someone sends us a request after we have removed/freed an rsb. + * (They sent a request instead of lookup because they are using + * an rsb taken from their scan list.) */ if (from_local || from_dir || @@ -585,52 +733,83 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, } retry: - if (create) { - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - } - - spin_lock(&ls->ls_rsbtbl[b].lock); - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) - goto do_toss; + goto do_new; + + /* check if the rsb is active under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; + goto do_new; + } /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } + kref_get(&r->res_ref); - error = 0; - goto out_unlock; + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto out; - do_toss: - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + do_inactive: + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* + * The expectation here is that the rsb will have HASHED and + * INACTIVE flags set, and that the rsb can be moved from + * inactive back to active again. However, between releasing + * the read lock and acquiring the write lock, this rsb could + * have been removed from rsbtbl, and had HASHED cleared, to + * be freed. 
To deal with this case, we would normally need + * to repeat dlm_search_rsb_tree while holding the write lock, + * but rcu allows us to simply check the HASHED flag, because + * the rcu read lock means the rsb will not be freed yet. + * If the HASHED flag is not set, then the rsb is being freed, + * so we add a new rsb struct. If the HASHED flag is set, + * and INACTIVE is not set, it means another thread has + * made the rsb active, as we're expecting to do here, and + * we just repeat the lookup (this will be very unlikely.) + */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; goto do_new; + } /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread - * is using this rsb because it's on the toss list, so we can + * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb. */ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); + write_unlock_bh(&ls->ls_rsbtbl_lock); error = -ENOTBLK; - goto out_unlock; + goto out; } if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ - log_error(ls, "find_rsb toss from_dir %d master %d", + log_error(ls, "find_rsb inactive from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ @@ -647,9 +826,18 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, r->res_first_lkid = 0; } - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - goto out_unlock; + /* we always deactivate scan timer for the rsb, when + * we move it out of the inactive state as rsb state + * can be changed and scan timers are only for inactive + * rsbs. 
+ */ + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); /* ref is now used in active state */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + + goto out; do_new: @@ -658,18 +846,13 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, */ if (error == -EBADR && !create) - goto out_unlock; + goto out; error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; kref_init(&r->res_ref); @@ -689,7 +872,7 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, dlm_free_rsb(r); r = NULL; error = -ENOTBLK; - goto out_unlock; + goto out; } if (from_other) { @@ -709,9 +892,20 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, } out_add: - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (!error) { + list_add(&r->res_slow_list, &ls->ls_slow_active); + } + write_unlock_bh(&ls->ls_rsbtbl_lock); out: *r_ret = r; return error; @@ -721,9 +915,8 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, dlm_recover_locks) before we've made ourself master (in dlm_recover_masters). */ -static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, - uint32_t hash, uint32_t b, - int dir_nodeid, int from_nodeid, +static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, + uint32_t hash, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; @@ -732,59 +925,82 @@ static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, int error; retry: - error = pre_rsb_struct(ls); - if (error < 0) - goto out; + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto do_new; - spin_lock(&ls->ls_rsbtbl[b].lock); + /* check if the rsb is in active state under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_new; + } - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (error) - goto do_toss; + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ kref_get(&r->res_ref); - goto out_unlock; + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto out; - do_toss: - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + + do_inactive: + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* See comment in find_rsb_dir. */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new; + } + /* * rsb found inactive. No other thread is using this rsb because - * it's on the toss list, so we can look at or update - * res_master_nodeid without lock_rsb. + * it's inactive, so we can look at or update res_master_nodeid + * without lock_rsb. 
*/ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a request; this should never happen */ - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", from_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); + write_unlock_bh(&ls->ls_rsbtbl_lock); error = -ENOTBLK; - goto out_unlock; + goto out; } if (!recover && (r->res_master_nodeid != our_nodeid) && (dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it; this should never happen */ - log_error(ls, "find_rsb toss our %d master %d dir %d", + log_error(ls, "find_rsb inactive our %d master %d dir %d", our_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; } - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - goto out_unlock; + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); + write_unlock_bh(&ls->ls_rsbtbl_lock); + + goto out; do_new: @@ -793,48 +1009,98 @@ static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, */ error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; r->res_master_nodeid = dir_nodeid; r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; kref_init(&r->res_ref); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (!error) { + list_add(&r->res_slow_list, &ls->ls_slow_active); + } + write_unlock_bh(&ls->ls_rsbtbl_lock); + out: *r_ret = r; return error; } -static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid, - unsigned int flags, struct dlm_rsb **r_ret) +/* + * rsb rcu usage + * + * While rcu read lock is held, the rsb cannot be freed, + * which allows a lookup optimization. + * + * Two threads are accessing the same rsb concurrently, + * the first (A) is trying to use the rsb, the second (B) + * is trying to free the rsb. + * + * thread A thread B + * (trying to use rsb) (trying to free rsb) + * + * A1. rcu read lock + * A2. rsbtbl read lock + * A3. look up rsb in rsbtbl + * A4. rsbtbl read unlock + * B1. rsbtbl write lock + * B2. look up rsb in rsbtbl + * B3. remove rsb from rsbtbl + * B4. clear rsb HASHED flag + * B5. rsbtbl write unlock + * B6. begin freeing rsb using rcu... + * + * (rsb is inactive, so try to make it active again) + * A5. read rsb HASHED flag (safe because rsb is not freed yet) + * A6. the rsb HASHED flag is not set, which it means the rsb + * is being removed from rsbtbl and freed, so don't use it. + * A7. rcu read unlock + * + * B7. ...finish freeing rsb using rcu + * A8. create a new rsb + * + * Without the rcu optimization, steps A5-8 would need to do + * an extra rsbtbl lookup: + * A5. rsbtbl write lock + * A6. look up rsb in rsbtbl, not found + * A7. rsbtbl write unlock + * A8. 
create a new rsb + */ + +static int find_rsb(struct dlm_ls *ls, const void *name, int len, + int from_nodeid, unsigned int flags, + struct dlm_rsb **r_ret) { - uint32_t hash, b; int dir_nodeid; + uint32_t hash; + int rv; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - dir_nodeid = dlm_hash2nodeid(ls, hash); + rcu_read_lock(); if (dlm_no_directory(ls)) - return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); else - return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, - from_nodeid, flags, r_ret); + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, + from_nodeid, flags, r_ret); + rcu_read_unlock(); + return rv; } /* we have received a request and found that res_master_nodeid != our_nodeid, @@ -880,6 +1146,88 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } } +static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, + int from_nodeid, bool is_inactive, unsigned int flags, + int *r_nodeid, int *result) +{ + int fix_master = (flags & DLM_LU_RECOVER_MASTER); + int from_master = (flags & DLM_LU_RECOVER_DIR); + + if (r->res_dir_nodeid != our_nodeid) { + /* should not happen, but may as well fix it and carry on */ + log_error(ls, "%s res_dir %d our %d %s", __func__, + r->res_dir_nodeid, our_nodeid, r->res_name); + r->res_dir_nodeid = our_nodeid; + } + + if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) { + /* Recovery uses this function to set a new master when + * the previous master failed. Setting NEW_MASTER will + * force dlm_recover_masters to call recover_master on this + * rsb even though the res_nodeid is no longer removed. + */ + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + + if (is_inactive) { + /* I don't think we should ever find it inactive. 
*/ + log_error(ls, "%s fix_master inactive", __func__); + dlm_dump_rsb(r); + } + } + + if (from_master && (r->res_master_nodeid != from_nodeid)) { + /* this will happen if from_nodeid became master during + * a previous recovery cycle, and we aborted the previous + * cycle before recovering this master value + */ + + log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s", + __func__, from_nodeid, r->res_master_nodeid, + r->res_nodeid, r->res_first_lkid, r->res_name); + + if (r->res_master_nodeid == our_nodeid) { + log_error(ls, "from_master %d our_master", from_nodeid); + dlm_dump_rsb(r); + goto ret_assign; + } + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + } + + if (!r->res_master_nodeid) { + /* this will happen if recovery happens while we're looking + * up the master for this rsb + */ + + log_debug(ls, "%s master 0 to %d first %x %s", __func__, + from_nodeid, r->res_first_lkid, r->res_name); + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + } + + if (!from_master && !fix_master && + (r->res_master_nodeid == from_nodeid)) { + /* this can happen when the master sends remove, the dir node + * finds the rsb on the active list and ignores the remove, + * and the former master sends a lookup + */ + + log_limit(ls, "%s from master %d flags %x first %x %s", + __func__, from_nodeid, flags, r->res_first_lkid, + r->res_name); + } + + ret_assign: + *r_nodeid = r->res_master_nodeid; + if (result) + *result = DLM_LU_MATCH; +} + /* * We're the dir node for this res and another node wants to know the * master nodeid. During normal operation (non recovery) this is only @@ -909,15 +1257,13 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, * . 
dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, - unsigned int flags, int *r_nodeid, int *result) +static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; - uint32_t hash, b; - int from_master = (flags & DLM_LU_RECOVER_DIR); - int fix_master = (flags & DLM_LU_RECOVER_MASTER); + uint32_t hash; int our_nodeid = dlm_our_nodeid(); - int dir_nodeid, error, toss_list = 0; + int dir_nodeid, error; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; @@ -929,8 +1275,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, } hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - dir_nodeid = dlm_hash2nodeid(ls, hash); if (dir_nodeid != our_nodeid) { log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", @@ -941,227 +1285,199 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, } retry: - error = pre_rsb_struct(ls); - if (error < 0) - return error; - - spin_lock(&ls->ls_rsbtbl[b].lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (!error) { - /* because the rsb is active, we need to lock_rsb before - checking/changing re_master_nodeid */ - - hold_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - lock_rsb(r); - goto found; - } - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) goto not_found; - /* because the rsb is inactive (on toss list), it's not refcounted - and lock_rsb is not used, but is protected by the rsbtbl lock */ + /* check if the rsb is active under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto not_found; + } - toss_list = 1; - found: - if (r->res_dir_nodeid != our_nodeid) { - /* should not happen, but may as well fix it and carry on */ - log_error(ls, "dlm_master_lookup res_dir %d our %d %s", - r->res_dir_nodeid, our_nodeid, r->res_name); - r->res_dir_nodeid = our_nodeid; + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; } - if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { - /* Recovery uses this function to set a new master when - the previous master failed. Setting NEW_MASTER will - force dlm_recover_masters to call recover_master on this - rsb even though the res_nodeid is no longer removed. */ + /* because the rsb is active, we need to lock_rsb before + * checking/changing re_master_nodeid + */ - r->res_master_nodeid = from_nodeid; - r->res_nodeid = from_nodeid; - rsb_set_flag(r, RSB_NEW_MASTER); + hold_rsb(r); + read_unlock_bh(&ls->ls_rsbtbl_lock); + lock_rsb(r); - if (toss_list) { - /* I don't think we should ever find it on toss list. 
*/ - log_error(ls, "dlm_master_lookup fix_master on toss"); - dlm_dump_rsb(r); - } - } + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, + flags, r_nodeid, result); - if (from_master && (r->res_master_nodeid != from_nodeid)) { - /* this will happen if from_nodeid became master during - a previous recovery cycle, and we aborted the previous - cycle before recovering this master value */ + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); - log_limit(ls, "dlm_master_lookup from_master %d " - "master_nodeid %d res_nodeid %d first %x %s", - from_nodeid, r->res_master_nodeid, r->res_nodeid, - r->res_first_lkid, r->res_name); + return 0; - if (r->res_master_nodeid == our_nodeid) { - log_error(ls, "from_master %d our_master", from_nodeid); - dlm_dump_rsb(r); - dlm_send_rcom_lookup_dump(r, from_nodeid); - goto out_found; + do_inactive: + /* unlikely path - check if still part of ls_rsbtbl */ + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* see comment in find_rsb_dir */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* something as changed, very unlikely but + * try again + */ + goto retry; } - - r->res_master_nodeid = from_nodeid; - r->res_nodeid = from_nodeid; - rsb_set_flag(r, RSB_NEW_MASTER); + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto not_found; } - if (!r->res_master_nodeid) { - /* this will happen if recovery happens while we're looking - up the master for this rsb */ + /* because the rsb is inactive, it's not refcounted and lock_rsb + is not used, but is protected by the rsbtbl lock */ - log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s", - from_nodeid, r->res_first_lkid, r->res_name); - r->res_master_nodeid = from_nodeid; - r->res_nodeid = from_nodeid; - } + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, + r_nodeid, result); - if (!from_master && !fix_master && - (r->res_master_nodeid == from_nodeid)) { - /* this can happen when the master sends remove, the dir node - finds the rsb on the keep list and ignores the remove, - and the former master sends a lookup */ - - log_limit(ls, "dlm_master_lookup from master %d flags %x " - "first %x %s", from_nodeid, flags, - r->res_first_lkid, r->res_name); - } + /* A dir record rsb should never be on scan list. + * Except when we are the dir and master node. + * This function should only be called by the dir + * node. 
+ */ + WARN_ON(!list_empty(&r->res_scan_list) && + r->res_master_nodeid != our_nodeid); - out_found: - *r_nodeid = r->res_master_nodeid; - if (result) - *result = DLM_LU_MATCH; + write_unlock_bh(&ls->ls_rsbtbl_lock); - if (toss_list) { - r->res_toss_time = jiffies; - /* the rsb was inactive (on toss list) */ - spin_unlock(&ls->ls_rsbtbl[b].lock); - } else { - /* the rsb was active */ - unlock_rsb(r); - put_rsb(r); - } return 0; not_found: error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - kref_init(&r->res_ref); - r->res_toss_time = jiffies; - - error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); - if (error) { + rsb_set_flag(r, RSB_INACTIVE); + + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (error) { + write_unlock_bh(&ls->ls_rsbtbl_lock); /* should never happen */ dlm_free_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); goto retry; } + list_add(&r->res_slow_list, &ls->ls_slow_inactive); + write_unlock_bh(&ls->ls_rsbtbl_lock); + if (result) *result = DLM_LU_ADD; *r_nodeid = from_nodeid; - error = 0; - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + out: return error; } +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) +{ + int rv; + rcu_read_lock(); + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); + rcu_read_unlock(); + return rv; +} + static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { - struct rb_node *n; struct dlm_rsb *r; - int i; - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - spin_lock(&ls->ls_rsbtbl[i].lock); - for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (r->res_hash == hash) - dlm_dump_rsb(r); - } - spin_unlock(&ls->ls_rsbtbl[i].lock); + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { + if (r->res_hash == hash) + dlm_dump_rsb(r); } + read_unlock_bh(&ls->ls_rsbtbl_lock); } -void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) +void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) { struct dlm_rsb *r = NULL; - uint32_t hash, b; int error; - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - - spin_lock(&ls->ls_rsbtbl[b].lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + rcu_read_lock(); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) - goto out_dump; - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) goto out; - out_dump: + dlm_dump_rsb(r); out: - spin_unlock(&ls->ls_rsbtbl[b].lock); + rcu_read_unlock(); } -static void toss_rsb(struct kref *kref) +static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; + int our_nodeid = dlm_our_nodeid(); DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); - kref_init(&r->res_ref); - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); - rsb_insert(r, 
&ls->ls_rsbtbl[r->res_bucket].toss); - r->res_toss_time = jiffies; - ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; + rsb_set_flag(r, RSB_INACTIVE); + list_move(&r->res_slow_list, &ls->ls_slow_inactive); + + /* + * When the rsb becomes unused, there are two possibilities: + * 1. Leave the inactive rsb in place (don't remove it). + * 2. Add it to the scan list to be removed. + * + * 1 is done when the rsb is acting as the dir record + * for a remotely mastered rsb. The rsb must be left + * in place as an inactive rsb to act as the dir record. + * + * 2 is done when a) the rsb is not the master and not the + * dir record, b) when the rsb is both the master and the + * dir record, c) when the rsb is master but not dir record. + * + * (If no directory is used, the rsb can always be removed.) + */ + if (dlm_no_directory(ls) || + (r->res_master_nodeid == our_nodeid || + dlm_dir_nodeid(r) != our_nodeid)) + add_scan(ls, r); + if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } -/* See comment for unhold_lkb */ - -static void unhold_rsb(struct dlm_rsb *r) -{ - int rv; - rv = kref_put(&r->res_ref, toss_rsb); - DLM_ASSERT(!rv, dlm_dump_rsb(r);); -} - -static void kill_rsb(struct kref *kref) +void free_inactive_rsb(struct dlm_rsb *r) { - struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); - - /* All work is done after the return from kref_put() so we - can release the write_lock before the remove and free. */ + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); + + dlm_free_rsb(r); } /* Attaching/detaching lkb's from rsb's is for rsb reference counting. 
@@ -1181,35 +1497,36 @@ static void detach_lkb(struct dlm_lkb *lkb) } } -static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) +static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, + unsigned long start, unsigned long end) { + struct xa_limit limit; struct dlm_lkb *lkb; int rv; - lkb = dlm_allocate_lkb(ls); + limit.max = end; + limit.min = start; + + lkb = dlm_allocate_lkb(); if (!lkb) return -ENOMEM; + lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV; + lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV; + lkb->lkb_last_cb_mode = DLM_LOCK_IV; lkb->lkb_nodeid = -1; lkb->lkb_grmode = DLM_LOCK_IV; kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); - INIT_LIST_HEAD(&lkb->lkb_time_list); - INIT_LIST_HEAD(&lkb->lkb_cb_list); - mutex_init(&lkb->lkb_cb_mutex); - INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - - idr_preload(GFP_NOFS); - spin_lock(&ls->ls_lkbidr_spin); - rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT); - if (rv >= 0) - lkb->lkb_id = rv; - spin_unlock(&ls->ls_lkbidr_spin); - idr_preload_end(); + + write_lock_bh(&ls->ls_lkbxa_lock); + rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC); + write_unlock_bh(&ls->ls_lkbxa_lock); if (rv < 0) { - log_error(ls, "create_lkb idr error %d", rv); + log_error(ls, "create_lkb xa error %d", rv); + dlm_free_lkb(lkb); return rv; } @@ -1217,15 +1534,30 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) return 0; } +static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) +{ + return _create_lkb(ls, lkb_ret, 1, ULONG_MAX); +} + static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - spin_lock(&ls->ls_lkbidr_spin); - lkb = idr_find(&ls->ls_lkbidr, lkid); - if (lkb) - kref_get(&lkb->lkb_ref); - spin_unlock(&ls->ls_lkbidr_spin); + rcu_read_lock(); + lkb = xa_load(&ls->ls_lkbxa, lkid); + if (lkb) { + /* check if lkb is still part of lkbxa under lkbxa_lock as + * the lkb_ref is tight to the lkbxa data structure, see + * __put_lkb(). + */ + read_lock_bh(&ls->ls_lkbxa_lock); + if (kref_read(&lkb->lkb_ref)) + kref_get(&lkb->lkb_ref); + else + lkb = NULL; + read_unlock_bh(&ls->ls_lkbxa_lock); + } + rcu_read_unlock(); *lkb_ret = lkb; return lkb ? 0 : -ENOENT; @@ -1247,11 +1579,13 @@ static void kill_lkb(struct kref *kref) static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) { uint32_t lkid = lkb->lkb_id; + int rv; - spin_lock(&ls->ls_lkbidr_spin); - if (kref_put(&lkb->lkb_ref, kill_lkb)) { - idr_remove(&ls->ls_lkbidr, lkid); - spin_unlock(&ls->ls_lkbidr_spin); + rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, + &ls->ls_lkbxa_lock); + if (rv) { + xa_erase(&ls->ls_lkbxa, lkid); + write_unlock_bh(&ls->ls_lkbxa_lock); detach_lkb(lkb); @@ -1259,11 +1593,9 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) if (lkb->lkb_lvbptr && is_master_copy(lkb)) dlm_free_lvb(lkb->lkb_lvbptr); dlm_free_lkb(lkb); - return 1; - } else { - spin_unlock(&ls->ls_lkbidr_spin); - return 0; } + + return rv; } int dlm_put_lkb(struct dlm_lkb *lkb) @@ -1285,6 +1617,13 @@ static inline void hold_lkb(struct dlm_lkb *lkb) kref_get(&lkb->lkb_ref); } +static void unhold_lkb_assert(struct kref *kref) +{ + struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); + + DLM_ASSERT(false, dlm_print_lkb(lkb);); +} + /* This is called when we need to remove a reference and are certain it's not the last ref. e.g. 
del_lkb is always called between a find_lkb/put_lkb and is always the inverse of a previous add_lkb. @@ -1292,21 +1631,23 @@ static inline void hold_lkb(struct dlm_lkb *lkb) static inline void unhold_lkb(struct dlm_lkb *lkb) { - int rv; - rv = kref_put(&lkb->lkb_ref, kill_lkb); - DLM_ASSERT(!rv, dlm_print_lkb(lkb);); + kref_put(&lkb->lkb_ref, unhold_lkb_assert); } static void lkb_add_ordered(struct list_head *new, struct list_head *head, int mode) { - struct dlm_lkb *lkb = NULL; + struct dlm_lkb *lkb = NULL, *iter; - list_for_each_entry(lkb, head, lkb_statequeue) - if (lkb->lkb_rqmode < mode) + list_for_each_entry(iter, head, lkb_statequeue) + if (iter->lkb_rqmode < mode) { + lkb = iter; + list_add_tail(new, &iter->lkb_statequeue); break; + } - __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue); + if (!lkb) + list_add_tail(new, head); } /* add/remove lkb to rsb's grant/convert/wait queue */ @@ -1354,10 +1695,8 @@ static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) { - hold_lkb(lkb); del_lkb(r, lkb); add_lkb(r, lkb, sts); - unhold_lkb(lkb); } static int msg_reply_type(int mstype) @@ -1377,101 +1716,28 @@ static int msg_reply_type(int mstype) return -1; } -static int nodeid_warned(int nodeid, int num_nodes, int *warned) -{ - int i; - - for (i = 0; i < num_nodes; i++) { - if (!warned[i]) { - warned[i] = nodeid; - return 0; - } - if (warned[i] == nodeid) - return 1; - } - return 0; -} - -void dlm_scan_waiters(struct dlm_ls *ls) -{ - struct dlm_lkb *lkb; - s64 us; - s64 debug_maxus = 0; - u32 debug_scanned = 0; - u32 debug_expired = 0; - int num_nodes = 0; - int *warned = NULL; - - if (!dlm_config.ci_waitwarn_us) - return; - - mutex_lock(&ls->ls_waiters_mutex); - - list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { - if (!lkb->lkb_wait_time) - continue; - - debug_scanned++; - - us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time)); - - if (us < dlm_config.ci_waitwarn_us) - continue; - - lkb->lkb_wait_time = 0; - - debug_expired++; - if (us > debug_maxus) - debug_maxus = us; - - if (!num_nodes) { - num_nodes = ls->ls_num_nodes; - warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL); - } - if (!warned) - continue; - if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) - continue; - - log_error(ls, "waitwarn %x %lld %d us check connection to " - "node %d", lkb->lkb_id, (long long)us, - dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid); - } - mutex_unlock(&ls->ls_waiters_mutex); - kfree(warned); - - if (debug_expired) - log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us", - debug_scanned, debug_expired, - dlm_config.ci_waitwarn_us, (long long)debug_maxus); -} - /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) +static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; - int error = 0; - - mutex_lock(&ls->ls_waiters_mutex); - - if (is_overlap_unlock(lkb) || - (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { - error = -EINVAL; - goto out; - } + spin_lock_bh(&ls->ls_waiters_lock); if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { switch (mstype) { case DLM_MSG_UNLOCK: - lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); break; case DLM_MSG_CANCEL: - lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, 
&lkb->lkb_iflags); break; default: - error = -EBUSY; + /* should never happen as validate_lock_args() checks + * on lkb_wait_type and validate_unlock_args() only + * creates UNLOCK or CANCEL messages. + */ + WARN_ON_ONCE(1); goto out; } lkb->lkb_wait_count++; @@ -1479,7 +1745,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", lkb->lkb_id, lkb->lkb_wait_type, mstype, - lkb->lkb_wait_count, lkb->lkb_flags); + lkb->lkb_wait_count, dlm_iflags_val(lkb)); goto out; } @@ -1489,17 +1755,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; - lkb->lkb_wait_time = ktime_get(); lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: - if (error) - log_error(ls, "addwait error %x %d flags %x %d %d %s", - lkb->lkb_id, error, lkb->lkb_flags, mstype, - lkb->lkb_wait_type, lkb->lkb_resource->res_name); - mutex_unlock(&ls->ls_waiters_mutex); - return error; + spin_unlock_bh(&ls->ls_waiters_lock); } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -1508,21 +1768,21 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) set RESEND and dlm_recover_waiters_post() */ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, - struct dlm_message *ms) + const struct dlm_message *ms) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int overlap_done = 0; - if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { + if (mstype == DLM_MSG_UNLOCK_REPLY && + test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) { log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; overlap_done = 1; goto out_del; } - if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { + if (mstype == DLM_MSG_CANCEL_REPLY && + test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) { log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; overlap_done = 1; goto out_del; } @@ -1546,13 +1806,13 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, lingering state of the cancel and fail with -EBUSY. */ if ((mstype == DLM_MSG_CONVERT_REPLY) && - (lkb->lkb_wait_type == DLM_MSG_CONVERT) && - is_overlap_cancel(lkb) && ms && !ms->m_result) { + (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result && + test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) { log_debug(ls, "remwait %x convert_reply zap overlap_cancel", lkb->lkb_id); lkb->lkb_wait_type = 0; - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; lkb->lkb_wait_count--; + unhold_lkb(lkb); goto out_del; } @@ -1565,8 +1825,8 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, } log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", - lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid, - mstype, lkb->lkb_flags); + lkb->lkb_id, ms ? 
le32_to_cpu(ms->m_header.h_nodeid) : 0, + lkb->lkb_remid, mstype, dlm_iflags_val(lkb)); return -1; out_del: @@ -1579,12 +1839,13 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, log_error(ls, "remwait error %x reply %d wait_type %d overlap", lkb->lkb_id, mstype, lkb->lkb_wait_type); lkb->lkb_wait_count--; + unhold_lkb(lkb); lkb->lkb_wait_type = 0; } DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); - lkb->lkb_flags &= ~DLM_IFL_RESEND; + clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); lkb->lkb_wait_count--; if (!lkb->lkb_wait_count) list_del_init(&lkb->lkb_wait_reply); @@ -1597,349 +1858,34 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); error = _remove_from_waiters(lkb, mstype, NULL); - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return error; } -/* Handles situations where we might be processing a "fake" or "stub" reply in - which we can't try to take waiters_mutex again. */ +/* Handles situations where we might be processing a "fake" or "local" reply in + * the recovery context which stops any locking activity. Only debugfs might + * change the lockspace waiters but they will held the recovery lock to ensure + * remove_from_waiters_ms() in local case will be the only user manipulating the + * lockspace waiters in recovery context. + */ -static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) +static int remove_from_waiters_ms(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; - if (ms->m_flags != DLM_IFL_STUB_MS) - mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb, ms->m_type, ms); - if (ms->m_flags != DLM_IFL_STUB_MS) - mutex_unlock(&ls->ls_waiters_mutex); - return error; -} - -/* If there's an rsb for the same resource being removed, ensure - that the remove message is sent before the new lookup message. - It should be rare to need a delay here, but if not, then it may - be worthwhile to add a proper wait mechanism rather than a delay. */ - -static void wait_pending_remove(struct dlm_rsb *r) -{ - struct dlm_ls *ls = r->res_ls; - restart: - spin_lock(&ls->ls_remove_spin); - if (ls->ls_remove_len && - !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { - log_debug(ls, "delay lookup for remove dir %d %s", - r->res_dir_nodeid, r->res_name); - spin_unlock(&ls->ls_remove_spin); - msleep(1); - goto restart; - } - spin_unlock(&ls->ls_remove_spin); -} - -/* - * ls_remove_spin protects ls_remove_name and ls_remove_len which are - * read by other threads in wait_pending_remove. ls_remove_names - * and ls_remove_lens are only used by the scan thread, so they do - * not need protection. 
- */ - -static void shrink_bucket(struct dlm_ls *ls, int b) -{ - struct rb_node *n, *next; - struct dlm_rsb *r; - char *name; - int our_nodeid = dlm_our_nodeid(); - int remote_count = 0; - int need_shrink = 0; - int i, len, rv; - - memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - - spin_lock(&ls->ls_rsbtbl[b].lock); - - if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - return; - } - - for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { - next = rb_next(n); - r = rb_entry(n, struct dlm_rsb, res_hashnode); - - /* If we're the directory record for this rsb, and - we're not the master of it, then we need to wait - for the master node to send us a dir remove for - before removing the dir record. */ - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - continue; - } - - need_shrink = 1; - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - continue; - } - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid == our_nodeid) && - (dlm_dir_nodeid(r) != our_nodeid)) { - - /* We're the master of this rsb but we're not - the directory record, so we need to tell the - dir node to remove the dir record. */ - - ls->ls_remove_lens[remote_count] = r->res_length; - memcpy(ls->ls_remove_names[remote_count], r->res_name, - DLM_RESNAME_MAXLEN); - remote_count++; - - if (remote_count >= DLM_REMOVE_NAMES_MAX) - break; - continue; - } - - if (!kref_put(&r->res_ref, kill_rsb)) { - log_error(ls, "tossed rsb in use %s", r->res_name); - continue; - } - - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - dlm_free_rsb(r); - } - - if (need_shrink) - ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; + if (!local) + spin_lock_bh(&ls->ls_waiters_lock); else - ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; - spin_unlock(&ls->ls_rsbtbl[b].lock); - - /* - * While searching for rsb's to free, we found some that require - * remote removal. We leave them in place and find them again here - * so there is a very small gap between removing them from the toss - * list and sending the removal. Keeping this gap small is - * important to keep us (the master node) from being out of sync - * with the remote dir node for very long. - * - * From the time the rsb is removed from toss until just after - * send_remove, the rsb name is saved in ls_remove_name. A new - * lookup checks this to ensure that a new lookup message for the - * same resource name is not sent just before the remove message. 
- */ - - for (i = 0; i < remote_count; i++) { - name = ls->ls_remove_names[i]; - len = ls->ls_remove_lens[i]; - - spin_lock(&ls->ls_rsbtbl[b].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (rv) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name not toss %s", name); - continue; - } - - if (r->res_master_nodeid != our_nodeid) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name master %d dir %d our %d %s", - r->res_master_nodeid, r->res_dir_nodeid, - our_nodeid, name); - continue; - } - - if (r->res_dir_nodeid == our_nodeid) { - /* should never happen */ - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "remove_name dir %d master %d our %d %s", - r->res_dir_nodeid, r->res_master_nodeid, - our_nodeid, name); - continue; - } - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name toss_time %lu now %lu %s", - r->res_toss_time, jiffies, name); - continue; - } - - if (!kref_put(&r->res_ref, kill_rsb)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "remove_name in use %s", name); - continue; - } - - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - - /* block lookup of same name until we've sent remove */ - spin_lock(&ls->ls_remove_spin); - ls->ls_remove_len = len; - memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); - spin_unlock(&ls->ls_remove_spin); - spin_unlock(&ls->ls_rsbtbl[b].lock); - - send_remove(r); - - /* allow lookup of name again */ - spin_lock(&ls->ls_remove_spin); - ls->ls_remove_len = 0; - memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); - spin_unlock(&ls->ls_remove_spin); - - dlm_free_rsb(r); - } -} - -void dlm_scan_rsbs(struct dlm_ls *ls) -{ - int i; - - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - shrink_bucket(ls, i); - if (dlm_locking_stopped(ls)) - break; - cond_resched(); - } -} - -static void add_timeout(struct dlm_lkb *lkb) -{ - struct dlm_ls *ls = lkb->lkb_resource->res_ls; - - if (is_master_copy(lkb)) - return; - - if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && - !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { - lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; - goto add_it; - } - if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) - goto add_it; - return; - - add_it: - DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb);); - mutex_lock(&ls->ls_timeout_mutex); - hold_lkb(lkb); - list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout); - mutex_unlock(&ls->ls_timeout_mutex); -} - -static void del_timeout(struct dlm_lkb *lkb) -{ - struct dlm_ls *ls = lkb->lkb_resource->res_ls; - - mutex_lock(&ls->ls_timeout_mutex); - if (!list_empty(&lkb->lkb_time_list)) { - list_del_init(&lkb->lkb_time_list); - unhold_lkb(lkb); - } - mutex_unlock(&ls->ls_timeout_mutex); -} - -/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and - lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex - and then lock rsb because of lock ordering in add_timeout. We may need - to specify some special timeout-related bits in the lkb that are just to - be accessed under the timeout_mutex. 
*/ - -void dlm_scan_timeout(struct dlm_ls *ls) -{ - struct dlm_rsb *r; - struct dlm_lkb *lkb; - int do_cancel, do_warn; - s64 wait_us; - - for (;;) { - if (dlm_locking_stopped(ls)) - break; - - do_cancel = 0; - do_warn = 0; - mutex_lock(&ls->ls_timeout_mutex); - list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) { - - wait_us = ktime_to_us(ktime_sub(ktime_get(), - lkb->lkb_timestamp)); - - if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) && - wait_us >= (lkb->lkb_timeout_cs * 10000)) - do_cancel = 1; - - if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && - wait_us >= dlm_config.ci_timewarn_cs * 10000) - do_warn = 1; - - if (!do_cancel && !do_warn) - continue; - hold_lkb(lkb); - break; - } - mutex_unlock(&ls->ls_timeout_mutex); - - if (!do_cancel && !do_warn) - break; - - r = lkb->lkb_resource; - hold_rsb(r); - lock_rsb(r); - - if (do_warn) { - /* clear flag so we only warn once */ - lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; - if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT)) - del_timeout(lkb); - dlm_timeout_warn(lkb); - } - - if (do_cancel) { - log_debug(ls, "timeout cancel %x node %d %s", - lkb->lkb_id, lkb->lkb_nodeid, r->res_name); - lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; - lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; - del_timeout(lkb); - _cancel_lock(r, lkb); - } - - unlock_rsb(r); - unhold_rsb(r); - dlm_put_lkb(lkb); - } -} - -/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping - dlm_recoverd before checking/setting ls_recover_begin. */ - -void dlm_adjust_timeouts(struct dlm_ls *ls) -{ - struct dlm_lkb *lkb; - u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin); - - ls->ls_recover_begin = 0; - mutex_lock(&ls->ls_timeout_mutex); - list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) - lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); - mutex_unlock(&ls->ls_timeout_mutex); - - if (!dlm_config.ci_waitwarn_us) - return; - - mutex_lock(&ls->ls_waiters_mutex); - list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { - if (ktime_to_us(lkb->lkb_wait_time)) - lkb->lkb_wait_time = ktime_get(); - } - mutex_unlock(&ls->ls_waiters_mutex); + WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) || + !dlm_locking_stopped(ls)); + error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); + if (!local) + spin_unlock_bh(&ls->ls_waiters_lock); + return error; } /* lkb is master or local copy */ @@ -1992,7 +1938,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } if (rsb_flag(r, RSB_VALNOTVALID)) - lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID; + set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags); } static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -2025,7 +1971,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) /* lkb is process copy (pc) */ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { int b; @@ -2041,7 +1987,7 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, if (len > r->res_ls->ls_lvblen) len = r->res_ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); - lkb->lkb_lvbseq = ms->m_lvbseq; + lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); } } @@ -2132,7 +2078,7 @@ static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { set_lvb_lock_pc(r, lkb, ms); _grant_lock(r, lkb); @@ -2170,12 +2116,12 @@ static void munge_demoted(struct dlm_lkb *lkb) lkb->lkb_grmode = 
DLM_LOCK_NL; } -static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) +static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms) { - if (ms->m_type != DLM_MSG_REQUEST_REPLY && - ms->m_type != DLM_MSG_GRANT) { + if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && + ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { log_print("munge_altmode %x invalid reply type %d", - lkb->lkb_id, ms->m_type); + lkb->lkb_id, le32_to_cpu(ms->m_type)); return; } @@ -2464,15 +2410,13 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, conversion_deadlock_detect(r, lkb)) { if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { lkb->lkb_grmode = DLM_LOCK_NL; - lkb->lkb_sbflags |= DLM_SBF_DEMOTED; - } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { - if (err) - *err = -EDEADLK; - else { - log_print("can_be_granted deadlock %x now %d", - lkb->lkb_id, now); - dlm_dump_rsb(r); - } + set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags); + } else if (err) { + *err = -EDEADLK; + } else { + log_print("can_be_granted deadlock %x now %d", + lkb->lkb_id, now); + dlm_dump_rsb(r); } goto out; } @@ -2493,7 +2437,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, lkb->lkb_rqmode = alt; rv = _can_be_granted(r, lkb, now, 0); if (rv) - lkb->lkb_sbflags |= DLM_SBF_ALTMODE; + set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags); else lkb->lkb_rqmode = rqmode; } @@ -2501,13 +2445,6 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, return rv; } -/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock - for locks pending on the convert list. Once verified (watch for these - log_prints), we should be able to just call _can_be_granted() and not - bother with the demote/deadlk cases here (and there's no easy way to deal - with a deadlk here, we'd have to generate something like grant_lock with - the deadlk error.) */ - /* Returns the highest requested mode of all blocked conversions; sets cw if there's a blocked conversion to DLM_LOCK_CW. */ @@ -2545,9 +2482,22 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, } if (deadlk) { - log_print("WARN: pending deadlock %x node %d %s", - lkb->lkb_id, lkb->lkb_nodeid, r->res_name); - dlm_dump_rsb(r); + /* + * If DLM_LKB_NODLKWT flag is set and conversion + * deadlock is detected, we request blocking AST and + * down (or cancel) conversion. 
+ */ + if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) { + if (lkb->lkb_highbast < lkb->lkb_rqmode) { + queue_bast(r, lkb, lkb->lkb_rqmode); + lkb->lkb_highbast = lkb->lkb_rqmode; + } + } else { + log_print("WARN: pending deadlock %x node %d %s", + lkb->lkb_id, lkb->lkb_nodeid, + r->res_name); + dlm_dump_rsb(r); + } continue; } @@ -2744,8 +2694,6 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 0; } - wait_pending_remove(r); - r->res_first_lkid = lkb->lkb_id; send_lookup(r, lkb); return 1; @@ -2758,7 +2706,6 @@ static void process_lookup_list(struct dlm_rsb *r) list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { list_del_init(&lkb->lkb_rsb_lookup); _request_lock(r, lkb); - schedule(); } } @@ -2802,10 +2749,9 @@ static void confirm_master(struct dlm_rsb *r, int error) } static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, - int namelen, unsigned long timeout_cs, - void (*ast) (void *astparam), + int namelen, void (*ast)(void *astparam), void *astparam, - void (*bast) (void *astparam, int mode), + void (*bast)(void *astparam, int mode), struct dlm_args *args) { int rv = -EINVAL; @@ -2859,7 +2805,6 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, args->astfn = ast; args->astparam = astparam; args->bastfn = bast; - args->timeout = timeout_cs; args->mode = mode; args->lksb = lksb; rv = 0; @@ -2884,29 +2829,30 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_args *args) { - int rv = -EINVAL; + int rv = -EBUSY; if (args->flags & DLM_LKF_CONVERT) { - if (lkb->lkb_flags & DLM_IFL_MSTCPY) + if (lkb->lkb_status != DLM_LKSTS_GRANTED) goto out; - if (args->flags & DLM_LKF_QUECVT && - !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) + /* lock not allowed if there's any op in progress */ + if (lkb->lkb_wait_type || lkb->lkb_wait_count) goto out; - rv = -EBUSY; - if (lkb->lkb_status != DLM_LKSTS_GRANTED) + if (is_overlap(lkb)) goto out; - if (lkb->lkb_wait_type) + rv = -EINVAL; + if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) goto out; - if (is_overlap(lkb)) + if (args->flags & DLM_LKF_QUECVT && + !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) goto out; } lkb->lkb_exflags = args->flags; - lkb->lkb_sbflags = 0; + dlm_set_sbflags_val(lkb, 0); lkb->lkb_astfn = args->astfn; lkb->lkb_astparam = args->astparam; lkb->lkb_bastfn = args->bastfn; @@ -2914,14 +2860,25 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_lksb = args->lksb; lkb->lkb_lvbptr = args->lksb->sb_lvbptr; lkb->lkb_ownpid = (int) current->pid; - lkb->lkb_timeout_cs = args->timeout; rv = 0; out: - if (rv) - log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s", - rv, lkb->lkb_id, lkb->lkb_flags, args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + switch (rv) { + case 0: + break; + case -EINVAL: + /* annoy the user because dlm usage is wrong */ + WARN_ON(1); + log_error(ls, "%s %d %x %x %x %d %d", __func__, + rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, + lkb->lkb_status, lkb->lkb_wait_type); + break; + default: + log_debug(ls, "%s %d %x %x %x %d %d", __func__, + rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, + lkb->lkb_status, lkb->lkb_wait_type); + break; + } + return rv; } @@ -2935,23 +2892,12 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) { struct 
dlm_ls *ls = lkb->lkb_resource->res_ls; - int rv = -EINVAL; - - if (lkb->lkb_flags & DLM_IFL_MSTCPY) { - log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); - dlm_print_lkb(lkb); - goto out; - } - - /* an lkb may still exist even though the lock is EOL'ed due to a - cancel, unlock or failed noqueue request; an app can't use these - locks; return same error as if the lkid had not been found at all */ + int rv = -EBUSY; - if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { - log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); - rv = -ENOENT; + /* normal unlock not allowed if there's any op in progress */ + if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && + (lkb->lkb_wait_type || lkb->lkb_wait_count)) goto out; - } /* an lkb may be waiting for an rsb lookup to complete where the lookup was initiated by another lock */ @@ -2966,24 +2912,41 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) unhold_lkb(lkb); /* undoes create_lkb() */ } /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ - rv = -EBUSY; goto out; } + rv = -EINVAL; + if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) { + log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); + dlm_print_lkb(lkb); + goto out; + } + + /* an lkb may still exist even though the lock is EOL'ed due to a + * cancel, unlock or failed noqueue request; an app can't use these + * locks; return same error as if the lkid had not been found at all + */ + + if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) { + log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); + rv = -ENOENT; + goto out; + } + + if (is_overlap_unlock(lkb)) + goto out; + /* cancel not allowed with another cancel/unlock in progress */ if (args->flags & DLM_LKF_CANCEL) { if (lkb->lkb_exflags & DLM_LKF_CANCEL) goto out; - if (is_overlap(lkb)) + if (is_overlap_cancel(lkb)) goto out; - /* don't let scand try to do a cancel */ - del_timeout(lkb); - - if (lkb->lkb_flags & DLM_IFL_RESEND) { - lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { + set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); rv = -EBUSY; goto out; } @@ -2998,7 +2961,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) switch (lkb->lkb_wait_type) { case DLM_MSG_LOOKUP: case DLM_MSG_REQUEST: - lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); rv = -EBUSY; goto out; case DLM_MSG_UNLOCK: @@ -3017,14 +2980,8 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) goto out; - if (is_overlap_unlock(lkb)) - goto out; - - /* don't let scand try to do a cancel */ - del_timeout(lkb); - - if (lkb->lkb_flags & DLM_IFL_RESEND) { - lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { + set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); rv = -EBUSY; goto out; } @@ -3032,33 +2989,41 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) switch (lkb->lkb_wait_type) { case DLM_MSG_LOOKUP: case DLM_MSG_REQUEST: - lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); rv = -EBUSY; goto out; case DLM_MSG_UNLOCK: goto out; } /* add_to_waiters() will set OVERLAP_UNLOCK */ - goto out_ok; } - /* normal unlock not allowed if there's any op in progress */ - rv = -EBUSY; - if (lkb->lkb_wait_type || lkb->lkb_wait_count) - goto out; - out_ok: /* an overlapping op shouldn't blow away exflags from other op */ 
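The hunks above replace open-coded tests and masks on lkb->lkb_flags with atomic bit operations on lkb->lkb_iflags (set_bit, test_bit, test_and_clear_bit on DLM_IFL_*_BIT). A minimal, self-contained sketch of that pattern follows; the struct and bit names below are placeholders, not the dlm definitions, and it only illustrates why test_and_clear_bit() lets the overlap check and the flag clear happen as a single atomic step:

#include <linux/bitops.h>
#include <linux/types.h>

/* illustrative only: stand-ins for the dlm lkb overlap flag bits */
#define EX_IFL_OVERLAP_UNLOCK_BIT	0
#define EX_IFL_OVERLAP_CANCEL_BIT	1

struct example_lkb {
	unsigned long iflags;	/* manipulated only with atomic bitops */
};

static void mark_overlap_unlock(struct example_lkb *lkb)
{
	/* stands in for "lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK" */
	set_bit(EX_IFL_OVERLAP_UNLOCK_BIT, &lkb->iflags);
}

static bool finish_overlap_unlock(struct example_lkb *lkb)
{
	/* test and clear in one atomic step, so a concurrent reader
	 * never observes the flag still set after the overlap has
	 * already been consumed by another path
	 */
	return test_and_clear_bit(EX_IFL_OVERLAP_UNLOCK_BIT, &lkb->iflags);
}

Where the old code paired a flag test with a separate clear, the atomic helper collapses the two, which is what allows several hunks in this section to drop explicit mask-and-assign sequences.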
lkb->lkb_exflags |= args->flags; - lkb->lkb_sbflags = 0; + dlm_set_sbflags_val(lkb, 0); lkb->lkb_astparam = args->astparam; rv = 0; out: - if (rv) - log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, - lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, + switch (rv) { + case 0: + break; + case -EINVAL: + /* annoy the user because dlm usage is wrong */ + WARN_ON(1); + log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv, + lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags, args->flags, lkb->lkb_wait_type, lkb->lkb_resource->res_name); + break; + default: + log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv, + lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags, + args->flags, lkb->lkb_wait_type, + lkb->lkb_resource->res_name); + break; + } + return rv; } @@ -3082,7 +3047,6 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) if (can_be_queued(lkb)) { error = -EINPROGRESS; add_lkb(r, lkb, DLM_LKSTS_WAITING); - add_timeout(lkb); goto out; } @@ -3123,7 +3087,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) deadlock, so we leave it on the granted queue and return EDEADLK in the ast for the convert. */ - if (deadlk) { + if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { /* it's left on the granted queue */ revert_lock(r, lkb); queue_cast(r, lkb, -EDEADLK); @@ -3151,7 +3115,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) error = -EINPROGRESS; del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); - add_timeout(lkb); goto out; } @@ -3309,8 +3272,9 @@ static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) * request_lock(), convert_lock(), unlock_lock(), cancel_lock() */ -static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, - int len, struct dlm_args *args) +static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, + const void *name, int len, + struct dlm_args *args) { struct dlm_rsb *r; int error; @@ -3409,7 +3373,7 @@ int dlm_lock(dlm_lockspace_t *lockspace, int mode, struct dlm_lksb *lksb, uint32_t flags, - void *name, + const void *name, unsigned int namelen, uint32_t parent_lkid, void (*ast) (void *astarg), @@ -3435,8 +3399,10 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (error) goto out; - error = set_lock_args(mode, lksb, flags, namelen, 0, ast, - astarg, bast, &args); + trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); + + error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast, + &args); if (error) goto out_put; @@ -3448,6 +3414,8 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (error == -EINPROGRESS) error = 0; out_put: + trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true); + if (convert || error) __put_lkb(ls, lkb); if (error == -EAGAIN || error == -EDEADLK) @@ -3479,6 +3447,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error) goto out; + trace_dlm_unlock_start(ls, lkb, flags); + error = set_unlock_args(flags, astarg, &args); if (error) goto out_put; @@ -3493,6 +3463,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) error = 0; out_put: + trace_dlm_unlock_end(ls, lkb, flags, error); + dlm_put_lkb(lkb); out: dlm_unlock_recovery(ls); @@ -3532,24 +3504,22 @@ static int _create_message(struct dlm_ls *ls, int mb_len, char *mb; /* get_buffer gives us a message handle (mh) that we need to - pass into lowcomms_commit and a message buffer (mb) that we + pass into midcomms_commit and a message buffer (mb) that we write our data into */ - mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, 
GFP_NOFS, &mb); + mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb); if (!mh) return -ENOBUFS; - memset(mb, 0, mb_len); - ms = (struct dlm_message *) mb; - ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); - ms->m_header.h_lockspace = ls->ls_global_id; - ms->m_header.h_nodeid = dlm_our_nodeid(); - ms->m_header.h_length = mb_len; + ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); + ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); + ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); + ms->m_header.h_length = cpu_to_le16(mb_len); ms->m_header.h_cmd = DLM_MSG; - ms->m_type = mstype; + ms->m_type = cpu_to_le32(mstype); *mh_ret = mh; *ms_ret = ms; @@ -3574,7 +3544,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, case DLM_MSG_REQUEST_REPLY: case DLM_MSG_CONVERT_REPLY: case DLM_MSG_GRANT: - if (lkb && lkb->lkb_lvbptr) + if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK)) mb_len += r->res_ls->ls_lvblen; break; } @@ -3586,51 +3556,51 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, /* further lowcomms enhancements or alternate implementations may make the return value from this function useful at some point */ -static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) +static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms, + const void *name, int namelen) { - dlm_message_out(ms); - dlm_lowcomms_commit_buffer(mh); + dlm_midcomms_commit_mhandle(mh, name, namelen); return 0; } static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms) { - ms->m_nodeid = lkb->lkb_nodeid; - ms->m_pid = lkb->lkb_ownpid; - ms->m_lkid = lkb->lkb_id; - ms->m_remid = lkb->lkb_remid; - ms->m_exflags = lkb->lkb_exflags; - ms->m_sbflags = lkb->lkb_sbflags; - ms->m_flags = lkb->lkb_flags; - ms->m_lvbseq = lkb->lkb_lvbseq; - ms->m_status = lkb->lkb_status; - ms->m_grmode = lkb->lkb_grmode; - ms->m_rqmode = lkb->lkb_rqmode; - ms->m_hash = r->res_hash; + ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid); + ms->m_pid = cpu_to_le32(lkb->lkb_ownpid); + ms->m_lkid = cpu_to_le32(lkb->lkb_id); + ms->m_remid = cpu_to_le32(lkb->lkb_remid); + ms->m_exflags = cpu_to_le32(lkb->lkb_exflags); + ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb)); + ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb)); + ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); + ms->m_status = cpu_to_le32(lkb->lkb_status); + ms->m_grmode = cpu_to_le32(lkb->lkb_grmode); + ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode); + ms->m_hash = cpu_to_le32(r->res_hash); /* m_result and m_bastmode are set from function args, not from lkb fields */ if (lkb->lkb_bastfn) - ms->m_asts |= DLM_CB_BAST; + ms->m_asts |= cpu_to_le32(DLM_CB_BAST); if (lkb->lkb_astfn) - ms->m_asts |= DLM_CB_CAST; + ms->m_asts |= cpu_to_le32(DLM_CB_CAST); /* compare with switch in create_message; send_remove() doesn't use send_args() */ switch (ms->m_type) { - case DLM_MSG_REQUEST: - case DLM_MSG_LOOKUP: + case cpu_to_le32(DLM_MSG_REQUEST): + case cpu_to_le32(DLM_MSG_LOOKUP): memcpy(ms->m_extra, r->res_name, r->res_length); break; - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_REQUEST_REPLY: - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_GRANT: - if (!lkb->lkb_lvbptr) + case cpu_to_le32(DLM_MSG_CONVERT): + case cpu_to_le32(DLM_MSG_UNLOCK): + case cpu_to_le32(DLM_MSG_REQUEST_REPLY): + case cpu_to_le32(DLM_MSG_CONVERT_REPLY): + case cpu_to_le32(DLM_MSG_GRANT): + if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK)) break; memcpy(ms->m_extra, 
lkb->lkb_lvbptr, r->res_ls->ls_lvblen); break; @@ -3645,17 +3615,14 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) to_nodeid = r->res_nodeid; - error = add_to_waiters(lkb, mstype, to_nodeid); - if (error) - return error; - + add_to_waiters(lkb, mstype, to_nodeid); error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto fail; send_args(r, lkb, ms); - error = send_message(mh, ms); + error = send_message(mh, ms, r->res_name, r->res_length); if (error) goto fail; return 0; @@ -3679,10 +3646,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* down conversions go without a reply from the master */ if (!error && down_conversion(lkb)) { remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); - r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS; - r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; - r->res_ls->ls_stub_ms.m_result = 0; - __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); + r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); + r->res_ls->ls_local_ms.m_result = 0; + __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true); } return error; @@ -3718,7 +3684,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) ms->m_result = 0; - error = send_message(mh, ms); + error = send_message(mh, ms, r->res_name, r->res_length); out: return error; } @@ -3737,9 +3703,9 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) send_args(r, lkb, ms); - ms->m_bastmode = mode; + ms->m_bastmode = cpu_to_le32(mode); - error = send_message(mh, ms); + error = send_message(mh, ms, r->res_name, r->res_length); out: return error; } @@ -3752,17 +3718,14 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) to_nodeid = dlm_dir_nodeid(r); - error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); - if (error) - return error; - + add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); if (error) goto fail; send_args(r, lkb, ms); - error = send_message(mh, ms); + error = send_message(mh, ms, r->res_name, r->res_length); if (error) goto fail; return 0; @@ -3785,9 +3748,9 @@ static int send_remove(struct dlm_rsb *r) goto out; memcpy(ms->m_extra, r->res_name, r->res_length); - ms->m_hash = r->res_hash; + ms->m_hash = cpu_to_le32(r->res_hash); - error = send_message(mh, ms); + error = send_message(mh, ms, r->res_name, r->res_length); out: return error; } @@ -3807,9 +3770,9 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, send_args(r, lkb, ms); - ms->m_result = rv; + ms->m_result = cpu_to_le32(to_dlm_errno(rv)); - error = send_message(mh, ms); + error = send_message(mh, ms, r->res_name, r->res_length); out: return error; } @@ -3834,23 +3797,24 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); } -static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, - int ret_nodeid, int rv) +static int send_lookup_reply(struct dlm_ls *ls, + const struct dlm_message *ms_in, int ret_nodeid, + int rv) { - struct dlm_rsb *r = &ls->ls_stub_rsb; + struct dlm_rsb *r = &ls->ls_local_rsb; struct dlm_message *ms; struct dlm_mhandle *mh; - int error, nodeid = ms_in->m_header.h_nodeid; + int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); if (error) goto out; ms->m_lkid = ms_in->m_lkid; - ms->m_result = rv; - ms->m_nodeid = ret_nodeid; + 
ms->m_result = cpu_to_le32(to_dlm_errno(rv)); + ms->m_nodeid = cpu_to_le32(ret_nodeid); - error = send_message(mh, ms); + error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in)); out: return error; } @@ -3859,31 +3823,32 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, of message, unlike the send side where we can safely send everything about the lkb for any type of message */ -static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) +static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms) { - lkb->lkb_exflags = ms->m_exflags; - lkb->lkb_sbflags = ms->m_sbflags; - lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | - (ms->m_flags & 0x0000FFFF); + lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); + dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); + dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); } -static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) +static void receive_flags_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, + bool local) { - if (ms->m_flags == DLM_IFL_STUB_MS) + if (local) return; - lkb->lkb_sbflags = ms->m_sbflags; - lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | - (ms->m_flags & 0x0000FFFF); + dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); + dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); } -static int receive_extralen(struct dlm_message *ms) +static int receive_extralen(const struct dlm_message *ms) { - return (ms->m_header.h_length - sizeof(struct dlm_message)); + return (le16_to_cpu(ms->m_header.h_length) - + sizeof(struct dlm_message)); } static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { int len; @@ -3911,16 +3876,16 @@ static void fake_astfn(void *astparam) } static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { - lkb->lkb_nodeid = ms->m_header.h_nodeid; - lkb->lkb_ownpid = ms->m_pid; - lkb->lkb_remid = ms->m_lkid; + lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); + lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); + lkb->lkb_remid = le32_to_cpu(ms->m_lkid); lkb->lkb_grmode = DLM_LOCK_IV; - lkb->lkb_rqmode = ms->m_rqmode; + lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); - lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; - lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL; + lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? 
&fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ @@ -3933,7 +3898,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, } static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -3941,56 +3906,65 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, if (receive_lvb(ls, lkb, ms)) return -ENOMEM; - lkb->lkb_rqmode = ms->m_rqmode; - lkb->lkb_lvbseq = ms->m_lvbseq; + lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); + lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); return 0; } static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { if (receive_lvb(ls, lkb, ms)) return -ENOMEM; return 0; } -/* We fill in the stub-lkb fields with the info that send_xxxx_reply() +/* We fill in the local-lkb fields with the info that send_xxxx_reply() uses to send a reply and that the remote end uses to process the reply. */ -static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) +static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms) { - struct dlm_lkb *lkb = &ls->ls_stub_lkb; - lkb->lkb_nodeid = ms->m_header.h_nodeid; - lkb->lkb_remid = ms->m_lkid; + struct dlm_lkb *lkb = &ls->ls_local_lkb; + lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); + lkb->lkb_remid = le32_to_cpu(ms->m_lkid); } /* This is called after the rsb is locked so that we can safely inspect fields in the lkb. */ -static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms) { - int from = ms->m_header.h_nodeid; + int from = le32_to_cpu(ms->m_header.h_nodeid); int error = 0; + /* currently mixing of user/kernel locks are not supported */ + if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) && + !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { + log_error(lkb->lkb_resource->res_ls, + "got user dlm message for a kernel lock"); + error = -EINVAL; + goto out; + } + switch (ms->m_type) { - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_CANCEL: + case cpu_to_le32(DLM_MSG_CONVERT): + case cpu_to_le32(DLM_MSG_UNLOCK): + case cpu_to_le32(DLM_MSG_CANCEL): if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) error = -EINVAL; break; - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_UNLOCK_REPLY: - case DLM_MSG_CANCEL_REPLY: - case DLM_MSG_GRANT: - case DLM_MSG_BAST: + case cpu_to_le32(DLM_MSG_CONVERT_REPLY): + case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): + case cpu_to_le32(DLM_MSG_CANCEL_REPLY): + case cpu_to_le32(DLM_MSG_GRANT): + case cpu_to_le32(DLM_MSG_BAST): if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) error = -EINVAL; break; - case DLM_MSG_REQUEST_REPLY: + case cpu_to_le32(DLM_MSG_REQUEST_REPLY): if (!is_process_copy(lkb)) error = -EINVAL; else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) @@ -4001,87 +3975,31 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) error = -EINVAL; } +out: if (error) log_error(lkb->lkb_resource->res_ls, "ignore invalid message %d from %d %x %x %x %d", - ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, - lkb->lkb_flags, lkb->lkb_nodeid); + le32_to_cpu(ms->m_type), from, lkb->lkb_id, + lkb->lkb_remid, dlm_iflags_val(lkb), + lkb->lkb_nodeid); return error; } -static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) -{ - char 
name[DLM_RESNAME_MAXLEN + 1]; - struct dlm_message *ms; - struct dlm_mhandle *mh; - struct dlm_rsb *r; - uint32_t hash, b; - int rv, dir_nodeid; - - memset(name, 0, sizeof(name)); - memcpy(name, ms_name, len); - - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - - dir_nodeid = dlm_hash2nodeid(ls, hash); - - log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name); - - spin_lock(&ls->ls_rsbtbl[b].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (!rv) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "repeat_remove on keep %s", name); - return; - } - - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (!rv) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "repeat_remove on toss %s", name); - return; - } - - /* use ls->remove_name2 to avoid conflict with shrink? */ - - spin_lock(&ls->ls_remove_spin); - ls->ls_remove_len = len; - memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); - spin_unlock(&ls->ls_remove_spin); - spin_unlock(&ls->ls_rsbtbl[b].lock); - - rv = _create_message(ls, sizeof(struct dlm_message) + len, - dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); - if (rv) - return; - - memcpy(ms->m_extra, name, len); - ms->m_hash = hash; - - send_message(mh, ms); - - spin_lock(&ls->ls_remove_spin); - ls->ls_remove_len = 0; - memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); - spin_unlock(&ls->ls_remove_spin); -} - -static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int from_nodeid; int error, namelen = 0; - from_nodeid = ms->m_header.h_nodeid; + from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); error = create_lkb(ls, &lkb); if (error) goto fail; receive_flags(lkb, ms); - lkb->lkb_flags |= DLM_IFL_MSTCPY; + set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); error = receive_request_args(ls, lkb, ms); if (error) { __put_lkb(ls, lkb); @@ -4136,46 +4054,34 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) ENOTBLK request failures when the lookup reply designating us as master is delayed. */ - /* We could repeatedly return -EBADR here if our send_remove() is - delayed in being sent/arriving/being processed on the dir node. - Another node would repeatedly lookup up the master, and the dir - node would continue returning our nodeid until our send_remove - took effect. - - We send another remove message in case our previous send_remove - was lost/ignored/missed somehow. 
*/ - if (error != -ENOTBLK) { log_limit(ls, "receive_request %x from %d %d", - ms->m_lkid, from_nodeid, error); - } - - if (namelen && error == -EBADR) { - send_repeat_remove(ls, ms->m_extra, namelen); - msleep(1000); + le32_to_cpu(ms->m_lkid), from_nodeid, error); } - setup_stub_lkb(ls, ms); - send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + setup_local_lkb(ls, ms); + send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); return error; } -static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error, reply = 1; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) goto fail; - if (lkb->lkb_remid != ms->m_lkid) { + if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { log_error(ls, "receive_convert %x remid %x recover_seq %llu " "remote %d %x", lkb->lkb_id, lkb->lkb_remid, (unsigned long long)lkb->lkb_recover_seq, - ms->m_header.h_nodeid, ms->m_lkid); + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid)); error = -ENOENT; + dlm_put_lkb(lkb); goto fail; } @@ -4209,26 +4115,28 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) return 0; fail: - setup_stub_lkb(ls, ms); - send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + setup_local_lkb(ls, ms); + send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); return error; } -static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) goto fail; - if (lkb->lkb_remid != ms->m_lkid) { + if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { log_error(ls, "receive_unlock %x remid %x remote %d %x", lkb->lkb_id, lkb->lkb_remid, - ms->m_header.h_nodeid, ms->m_lkid); + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid)); error = -ENOENT; + dlm_put_lkb(lkb); goto fail; } @@ -4259,18 +4167,18 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) return 0; fail: - setup_stub_lkb(ls, ms); - send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + setup_local_lkb(ls, ms); + send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); return error; } -static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) goto fail; @@ -4295,18 +4203,18 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) return 0; fail: - setup_stub_lkb(ls, ms); - send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + setup_local_lkb(ls, ms); + send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); return error; } -static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4319,7 +4227,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto out; - receive_flags_reply(lkb, ms); 
+ receive_flags_reply(lkb, ms, false); if (is_altmode(lkb)) munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); @@ -4331,13 +4239,13 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4350,8 +4258,8 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto out; - queue_bast(r, lkb, ms->m_bastmode); - lkb->lkb_highbast = ms->m_bastmode; + queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode)); + lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode); out: unlock_rsb(r); put_rsb(r); @@ -4359,11 +4267,11 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms) { int len, error, ret_nodeid, from_nodeid, our_nodeid; - from_nodeid = ms->m_header.h_nodeid; + from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); our_nodeid = dlm_our_nodeid(); len = receive_extralen(ms); @@ -4379,14 +4287,13 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) send_lookup_reply(ls, ms, ret_nodeid, error); } -static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) { char name[DLM_RESNAME_MAXLEN+1]; struct dlm_rsb *r; - uint32_t hash, b; int rv, len, dir_nodeid, from_nodeid; - from_nodeid = ms->m_header.h_nodeid; + from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); len = receive_extralen(ms); @@ -4396,90 +4303,99 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) return; } - dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); + dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash)); if (dir_nodeid != dlm_our_nodeid()) { log_error(ls, "receive_remove from %d bad nodeid %d", from_nodeid, dir_nodeid); return; } - /* Look for name on rsbtbl.toss, if it's there, kill it. - If it's on rsbtbl.keep, it's being used, and we should ignore this - message. This is an expected race between the dir node sending a - request to the master node at the same time as the master node sends - a remove to the dir node. The resolution to that race is for the - dir node to ignore the remove message, and the master node to - recreate the master rsb when it gets a request from the dir node for - an rsb it doesn't have. */ + /* + * Look for inactive rsb, if it's there, free it. + * If the rsb is active, it's being used, and we should ignore this + * message. This is an expected race between the dir node sending a + * request to the master node at the same time as the master node sends + * a remove to the dir node. The resolution to that race is for the + * dir node to ignore the remove message, and the master node to + * recreate the master rsb when it gets a request from the dir node for + * an rsb it doesn't have. 
+ */ memset(name, 0, sizeof(name)); memcpy(name, ms->m_extra, len); - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); + rcu_read_lock(); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (rv) { + rcu_read_unlock(); + /* should not happen */ + log_error(ls, "%s from %d not found %s", __func__, + from_nodeid, name); + return; + } - spin_lock(&ls->ls_rsbtbl[b].lock); + write_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + rcu_read_unlock(); + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* should not happen */ + log_error(ls, "%s from %d got removed during removal %s", + __func__, from_nodeid, name); + return; + } + /* at this stage the rsb can only being freed here */ + rcu_read_unlock(); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (rv) { - /* verify the rsb is on keep list per comment above */ - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (rv) { - /* should not happen */ - log_error(ls, "receive_remove from %d not found %s", - from_nodeid, name); - spin_unlock(&ls->ls_rsbtbl[b].lock); - return; - } + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ - log_error(ls, "receive_remove keep from %d master %d", + log_error(ls, "receive_remove on active rsb from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + /* Ignore the remove message, see race comment above. */ + log_debug(ls, "receive_remove from %d master %d first %x %s", from_nodeid, r->res_master_nodeid, r->res_first_lkid, name); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } if (r->res_master_nodeid != from_nodeid) { - log_error(ls, "receive_remove toss from %d master %d", + log_error(ls, "receive_remove inactive from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } - if (kref_put(&r->res_ref, kill_rsb)) { - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - spin_unlock(&ls->ls_rsbtbl[b].lock); - dlm_free_rsb(r); - } else { - log_error(ls, "receive_remove from %d rsb ref error", - from_nodeid); - dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - } + list_del(&r->res_slow_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); + write_unlock_bh(&ls->ls_rsbtbl_lock); + + free_inactive_rsb(r); } -static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) { - do_purge(ls, ms->m_nodeid, ms->m_pid); + do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); } -static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error, mstype, result; - int from_nodeid = ms->m_header.h_nodeid; + int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4495,7 +4411,8 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) { log_error(ls, "receive_request_reply %x remote %d %x result %d", - lkb->lkb_id, from_nodeid, ms->m_lkid, 
ms->m_result); + lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid), + from_dlm_errno(le32_to_cpu(ms->m_result))); dlm_dump_rsb(r); goto out; } @@ -4509,7 +4426,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } /* this is the value returned from do_request() on the master */ - result = ms->m_result; + result = from_dlm_errno(le32_to_cpu(ms->m_result)); switch (result) { case -EAGAIN: @@ -4522,13 +4439,12 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) case -EINPROGRESS: case 0: /* request was queued or granted on remote master */ - receive_flags_reply(lkb, ms); - lkb->lkb_remid = ms->m_lkid; + receive_flags_reply(lkb, ms, false); + lkb->lkb_remid = le32_to_cpu(ms->m_lkid); if (is_altmode(lkb)) munge_altmode(lkb, ms); if (result) { add_lkb(r, lkb, DLM_LKSTS_WAITING); - add_timeout(lkb); } else { grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); @@ -4570,20 +4486,21 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_id, result); } - if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { + if ((result == 0 || result == -EINPROGRESS) && + test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) { log_debug(ls, "receive_request_reply %x result %d unlock", lkb->lkb_id, result); - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); send_unlock(r, lkb); - } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { + } else if ((result == -EINPROGRESS) && + test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, + &lkb->lkb_iflags)) { log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); send_cancel(r, lkb); } else { - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); + clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); } out: unlock_rsb(r); @@ -4593,34 +4510,33 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms, bool local) { /* this is the value returned from do_convert() on the master */ - switch (ms->m_result) { + switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { case -EAGAIN: /* convert would block (be queued) on remote master */ queue_cast(r, lkb, -EAGAIN); break; case -EDEADLK: - receive_flags_reply(lkb, ms); + receive_flags_reply(lkb, ms, local); revert_lock_pc(r, lkb); queue_cast(r, lkb, -EDEADLK); break; case -EINPROGRESS: /* convert was queued on remote master */ - receive_flags_reply(lkb, ms); + receive_flags_reply(lkb, ms, local); if (is_demoted(lkb)) munge_demoted(lkb); del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); - add_timeout(lkb); break; case 0: /* convert was granted on remote master */ - receive_flags_reply(lkb, ms); + receive_flags_reply(lkb, ms, local); if (is_demoted(lkb)) munge_demoted(lkb); grant_lock_pc(r, lkb, ms); @@ -4629,14 +4545,16 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, default: log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", - lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, - ms->m_result); + lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), + 
from_dlm_errno(le32_to_cpu(ms->m_result))); dlm_print_rsb(r); dlm_print_lkb(lkb); } } -static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) +static void _receive_convert_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4648,32 +4566,33 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) if (error) goto out; - /* stub reply can happen with waiters_mutex held */ - error = remove_from_waiters_ms(lkb, ms); + error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; - __receive_convert_reply(r, lkb, ms); + __receive_convert_reply(r, lkb, ms, local); out: unlock_rsb(r); put_rsb(r); } -static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; - _receive_convert_reply(lkb, ms); + _receive_convert_reply(lkb, ms, false); dlm_put_lkb(lkb); return 0; } -static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) +static void _receive_unlock_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4685,16 +4604,15 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) if (error) goto out; - /* stub reply can happen with waiters_mutex held */ - error = remove_from_waiters_ms(lkb, ms); + error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; /* this is the value returned from do_unlock() on the master */ - switch (ms->m_result) { + switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { case -DLM_EUNLOCK: - receive_flags_reply(lkb, ms); + receive_flags_reply(lkb, ms, local); remove_lock_pc(r, lkb); queue_cast(r, lkb, -DLM_EUNLOCK); break; @@ -4702,28 +4620,30 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) break; default: log_error(r->res_ls, "receive_unlock_reply %x error %d", - lkb->lkb_id, ms->m_result); + lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result))); } out: unlock_rsb(r); put_rsb(r); } -static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; - _receive_unlock_reply(lkb, ms); + _receive_unlock_reply(lkb, ms, false); dlm_put_lkb(lkb); return 0; } -static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) +static void _receive_cancel_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4735,16 +4655,15 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) if (error) goto out; - /* stub reply can happen with waiters_mutex held */ - error = remove_from_waiters_ms(lkb, ms); + error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; /* this is the value returned from do_cancel() on the master */ - switch (ms->m_result) { + switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { case -DLM_ECANCEL: - receive_flags_reply(lkb, ms); + receive_flags_reply(lkb, ms, local); revert_lock_pc(r, lkb); queue_cast(r, lkb, -DLM_ECANCEL); break; @@ -4752,37 +4671,41 @@ 
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) break; default: log_error(r->res_ls, "receive_cancel_reply %x error %d", - lkb->lkb_id, ms->m_result); + lkb->lkb_id, + from_dlm_errno(le32_to_cpu(ms->m_result))); } out: unlock_rsb(r); put_rsb(r); } -static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; - _receive_cancel_reply(lkb, ms); + _receive_cancel_reply(lkb, ms, false); dlm_put_lkb(lkb); return 0; } -static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_lookup_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error, ret_nodeid; int do_lookup_list = 0; - error = find_lkb(ls, ms->m_lkid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb); if (error) { - log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid); + log_error(ls, "%s no lkid %x", __func__, + le32_to_cpu(ms->m_lkid)); return; } @@ -4797,7 +4720,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto out; - ret_nodeid = ms->m_nodeid; + ret_nodeid = le32_to_cpu(ms->m_nodeid); /* We sometimes receive a request from the dir node for this rsb before we've received the dir node's loookup_reply for it. @@ -4809,8 +4732,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) /* This should never happen */ log_error(ls, "receive_lookup_reply %x from %d ret %d " "master %d dir %d our %d first %x %s", - lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid, - r->res_master_nodeid, r->res_dir_nodeid, + lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), + ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid, dlm_our_nodeid(), r->res_first_lkid, r->res_name); } @@ -4822,7 +4745,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) } else if (ret_nodeid == -1) { /* the remote node doesn't believe it's the dir node */ log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", - lkb->lkb_id, ms->m_header.h_nodeid); + lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid)); r->res_master_nodeid = 0; r->res_nodeid = -1; lkb->lkb_nodeid = -1; @@ -4834,7 +4757,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) if (is_overlap(lkb)) { log_debug(ls, "receive_lookup_reply %x unlock %x", - lkb->lkb_id, lkb->lkb_flags); + lkb->lkb_id, dlm_iflags_val(lkb)); queue_cast_overlap(r, lkb); unhold_lkb(lkb); /* undoes create_lkb() */ goto out_list; @@ -4851,15 +4774,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, +static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq) { int error = 0, noent = 0; - if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { + if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) { log_limit(ls, "receive %d from non-member %d %x %x %d", - ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, - ms->m_remid, ms->m_result); + le32_to_cpu(ms->m_type), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), + from_dlm_errno(le32_to_cpu(ms->m_result))); return; } @@ -4867,77 +4792,78 @@ static void _receive_message(struct 
dlm_ls *ls, struct dlm_message *ms, /* messages sent to a master node */ - case DLM_MSG_REQUEST: + case cpu_to_le32(DLM_MSG_REQUEST): error = receive_request(ls, ms); break; - case DLM_MSG_CONVERT: + case cpu_to_le32(DLM_MSG_CONVERT): error = receive_convert(ls, ms); break; - case DLM_MSG_UNLOCK: + case cpu_to_le32(DLM_MSG_UNLOCK): error = receive_unlock(ls, ms); break; - case DLM_MSG_CANCEL: + case cpu_to_le32(DLM_MSG_CANCEL): noent = 1; error = receive_cancel(ls, ms); break; /* messages sent from a master node (replies to above) */ - case DLM_MSG_REQUEST_REPLY: + case cpu_to_le32(DLM_MSG_REQUEST_REPLY): error = receive_request_reply(ls, ms); break; - case DLM_MSG_CONVERT_REPLY: + case cpu_to_le32(DLM_MSG_CONVERT_REPLY): error = receive_convert_reply(ls, ms); break; - case DLM_MSG_UNLOCK_REPLY: + case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): error = receive_unlock_reply(ls, ms); break; - case DLM_MSG_CANCEL_REPLY: + case cpu_to_le32(DLM_MSG_CANCEL_REPLY): error = receive_cancel_reply(ls, ms); break; /* messages sent from a master node (only two types of async msg) */ - case DLM_MSG_GRANT: + case cpu_to_le32(DLM_MSG_GRANT): noent = 1; error = receive_grant(ls, ms); break; - case DLM_MSG_BAST: + case cpu_to_le32(DLM_MSG_BAST): noent = 1; error = receive_bast(ls, ms); break; /* messages sent to a dir node */ - case DLM_MSG_LOOKUP: + case cpu_to_le32(DLM_MSG_LOOKUP): receive_lookup(ls, ms); break; - case DLM_MSG_REMOVE: + case cpu_to_le32(DLM_MSG_REMOVE): receive_remove(ls, ms); break; /* messages sent from a dir node (remove has no reply) */ - case DLM_MSG_LOOKUP_REPLY: + case cpu_to_le32(DLM_MSG_LOOKUP_REPLY): receive_lookup_reply(ls, ms); break; /* other messages */ - case DLM_MSG_PURGE: + case cpu_to_le32(DLM_MSG_PURGE): receive_purge(ls, ms); break; default: - log_error(ls, "unknown message type %d", ms->m_type); + log_error(ls, "unknown message type %d", + le32_to_cpu(ms->m_type)); } /* @@ -4953,22 +4879,26 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, if (error == -ENOENT && noent) { log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", - ms->m_type, ms->m_remid, ms->m_header.h_nodeid, - ms->m_lkid, saved_seq); + le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), saved_seq); } else if (error == -ENOENT) { log_error(ls, "receive %d no %x remote %d %x saved_seq %u", - ms->m_type, ms->m_remid, ms->m_header.h_nodeid, - ms->m_lkid, saved_seq); + le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), saved_seq); - if (ms->m_type == DLM_MSG_CONVERT) - dlm_dump_rsb_hash(ls, ms->m_hash); + if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT)) + dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash)); } if (error == -EINVAL) { log_error(ls, "receive %d inval from %d lkid %x remid %x " "saved_seq %u", - ms->m_type, ms->m_header.h_nodeid, - ms->m_lkid, ms->m_remid, saved_seq); + le32_to_cpu(ms->m_type), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), + saved_seq); } } @@ -4980,30 +4910,42 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, requestqueue, to processing all the saved messages, to processing new messages as they arrive. 
*/ -static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, +static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms, int nodeid) { - if (dlm_locking_stopped(ls)) { +try_again: + read_lock_bh(&ls->ls_requestqueue_lock); + if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { /* If we were a member of this lockspace, left, and rejoined, other nodes may still be sending us messages from the lockspace generation before we left. */ - if (!ls->ls_generation) { + if (WARN_ON_ONCE(!ls->ls_generation)) { + read_unlock_bh(&ls->ls_requestqueue_lock); log_limit(ls, "receive %d from %d ignore old gen", - ms->m_type, nodeid); + le32_to_cpu(ms->m_type), nodeid); return; } + read_unlock_bh(&ls->ls_requestqueue_lock); + write_lock_bh(&ls->ls_requestqueue_lock); + /* recheck because we hold writelock now */ + if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { + write_unlock_bh(&ls->ls_requestqueue_lock); + goto try_again; + } + dlm_add_requestqueue(ls, nodeid, ms); + write_unlock_bh(&ls->ls_requestqueue_lock); } else { - dlm_wait_requestqueue(ls); _receive_message(ls, ms, 0); + read_unlock_bh(&ls->ls_requestqueue_lock); } } /* This is called by dlm_recoverd to process messages that were saved on the requestqueue. */ -void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, +void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq) { _receive_message(ls, ms, saved_seq); @@ -5014,38 +4956,38 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, standard locking activity) or an RCOM (recovery message sent as part of lockspace recovery). */ -void dlm_receive_buffer(union dlm_packet *p, int nodeid) +void dlm_receive_buffer(const union dlm_packet *p, int nodeid) { - struct dlm_header *hd = &p->header; + const struct dlm_header *hd = &p->header; struct dlm_ls *ls; int type = 0; switch (hd->h_cmd) { case DLM_MSG: - dlm_message_in(&p->message); - type = p->message.m_type; + type = le32_to_cpu(p->message.m_type); break; case DLM_RCOM: - dlm_rcom_in(&p->rcom); - type = p->rcom.rc_type; + type = le32_to_cpu(p->rcom.rc_type); break; default: log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); return; } - if (hd->h_nodeid != nodeid) { + if (le32_to_cpu(hd->h_nodeid) != nodeid) { log_print("invalid h_nodeid %d from %d lockspace %x", - hd->h_nodeid, nodeid, hd->h_lockspace); + le32_to_cpu(hd->h_nodeid), nodeid, + le32_to_cpu(hd->u.h_lockspace)); return; } - ls = dlm_find_lockspace_global(hd->h_lockspace); + ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace)); if (!ls) { if (dlm_config.ci_log_debug) { printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " "%u from %d cmd %d type %d\n", - hd->h_lockspace, nodeid, hd->h_cmd, type); + le32_to_cpu(hd->u.h_lockspace), nodeid, + hd->h_cmd, type); } if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) @@ -5056,35 +4998,40 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to be inactive (in this ls) before transitioning to recovery mode */ - down_read(&ls->ls_recv_active); + read_lock_bh(&ls->ls_recv_active); if (hd->h_cmd == DLM_MSG) dlm_receive_message(ls, &p->message, nodeid); - else + else if (hd->h_cmd == DLM_RCOM) dlm_receive_rcom(ls, &p->rcom, nodeid); - up_read(&ls->ls_recv_active); + else + log_error(ls, "invalid h_cmd %d from %d lockspace %x", + hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace)); + read_unlock_bh(&ls->ls_recv_active); 
dlm_put_lockspace(ls); } static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms_stub) + struct dlm_message *ms_local) { if (middle_conversion(lkb)) { + log_rinfo(ls, "%s %x middle convert in progress", __func__, + lkb->lkb_id); + + /* We sent this lock to the new master. The new master will + * tell us when it's granted. We no longer need a reply, so + * use a fake reply to put the lkb into the right state. + */ hold_lkb(lkb); - memset(ms_stub, 0, sizeof(struct dlm_message)); - ms_stub->m_flags = DLM_IFL_STUB_MS; - ms_stub->m_type = DLM_MSG_CONVERT_REPLY; - ms_stub->m_result = -EINPROGRESS; - ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; - _receive_convert_reply(lkb, ms_stub); - - /* Same special case as in receive_rcom_lock_args() */ - lkb->lkb_grmode = DLM_LOCK_IV; - rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); + memset(ms_local, 0, sizeof(struct dlm_message)); + ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); + ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); + ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); + _receive_convert_reply(lkb, ms_local, true); unhold_lkb(lkb); } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { - lkb->lkb_flags |= DLM_IFL_RESEND; + set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); } /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down @@ -5115,17 +5062,13 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, void dlm_recover_waiters_pre(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; - struct dlm_message *ms_stub; - int wait_type, stub_unlock_result, stub_cancel_result; + struct dlm_message *ms_local; + int wait_type, local_unlock_result, local_cancel_result; int dir_nodeid; - ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL); - if (!ms_stub) { - log_error(ls, "dlm_recover_waiters_pre no mem"); + ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL); + if (!ms_local) return; - } - - mutex_lock(&ls->ls_waiters_mutex); list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { @@ -5150,7 +5093,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) resent after recovery is done */ if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { - lkb->lkb_flags |= DLM_IFL_RESEND; + set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); continue; } @@ -5158,8 +5101,8 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) continue; wait_type = lkb->lkb_wait_type; - stub_unlock_result = -DLM_EUNLOCK; - stub_cancel_result = -DLM_ECANCEL; + local_unlock_result = -DLM_EUNLOCK; + local_cancel_result = -DLM_ECANCEL; /* Main reply may have been received leaving a zero wait_type, but a reply for the overlapping op may not have been @@ -5170,48 +5113,46 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (is_overlap_cancel(lkb)) { wait_type = DLM_MSG_CANCEL; if (lkb->lkb_grmode == DLM_LOCK_IV) - stub_cancel_result = 0; + local_cancel_result = 0; } if (is_overlap_unlock(lkb)) { wait_type = DLM_MSG_UNLOCK; if (lkb->lkb_grmode == DLM_LOCK_IV) - stub_unlock_result = -ENOENT; + local_unlock_result = -ENOENT; } log_debug(ls, "rwpre overlap %x %x %d %d %d", - lkb->lkb_id, lkb->lkb_flags, wait_type, - stub_cancel_result, stub_unlock_result); + lkb->lkb_id, dlm_iflags_val(lkb), wait_type, + local_cancel_result, local_unlock_result); } switch (wait_type) { case DLM_MSG_REQUEST: - lkb->lkb_flags |= DLM_IFL_RESEND; + set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); break; case DLM_MSG_CONVERT: - recover_convert_waiter(ls, lkb, ms_stub); + recover_convert_waiter(ls, lkb, ms_local); break; 
case DLM_MSG_UNLOCK: hold_lkb(lkb); - memset(ms_stub, 0, sizeof(struct dlm_message)); - ms_stub->m_flags = DLM_IFL_STUB_MS; - ms_stub->m_type = DLM_MSG_UNLOCK_REPLY; - ms_stub->m_result = stub_unlock_result; - ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; - _receive_unlock_reply(lkb, ms_stub); + memset(ms_local, 0, sizeof(struct dlm_message)); + ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); + ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result)); + ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); + _receive_unlock_reply(lkb, ms_local, true); dlm_put_lkb(lkb); break; case DLM_MSG_CANCEL: hold_lkb(lkb); - memset(ms_stub, 0, sizeof(struct dlm_message)); - ms_stub->m_flags = DLM_IFL_STUB_MS; - ms_stub->m_type = DLM_MSG_CANCEL_REPLY; - ms_stub->m_result = stub_cancel_result; - ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; - _receive_cancel_reply(lkb, ms_stub); + memset(ms_local, 0, sizeof(struct dlm_message)); + ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); + ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result)); + ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); + _receive_cancel_reply(lkb, ms_local, true); dlm_put_lkb(lkb); break; @@ -5221,45 +5162,52 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) } schedule(); } - mutex_unlock(&ls->ls_waiters_mutex); - kfree(ms_stub); + kfree(ms_local); } static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) { - struct dlm_lkb *lkb; - int found = 0; + struct dlm_lkb *lkb = NULL, *iter; - mutex_lock(&ls->ls_waiters_mutex); - list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { - if (lkb->lkb_flags & DLM_IFL_RESEND) { - hold_lkb(lkb); - found = 1; + spin_lock_bh(&ls->ls_waiters_lock); + list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { + if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) { + hold_lkb(iter); + lkb = iter; break; } } - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); - if (!found) - lkb = NULL; return lkb; } -/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the - master or dir-node for r. Processing the lkb may result in it being placed - back on waiters. */ - -/* We do this after normal locking has been enabled and any saved messages - (in requestqueue) have been processed. We should be confident that at - this point we won't get or process a reply to any of these waiting - operations. But, new ops may be coming in on the rsbs/locks here from - userspace or remotely. */ - -/* there may have been an overlap unlock/cancel prior to recovery or after - recovery. if before, the lkb may still have a pos wait_count; if after, the - overlap flag would just have been set and nothing new sent. we can be - confident here than any replies to either the initial op or overlap ops - prior to recovery have been received. */ +/* + * Forced state reset for locks that were in the middle of remote operations + * when recovery happened (i.e. lkbs that were on the waiters list, waiting + * for a reply from a remote operation.) The lkbs remaining on the waiters + * list need to be reevaluated; some may need resending to a different node + * than previously, and some may now need local handling rather than remote. + * + * First, the lkb state for the voided remote operation is forcibly reset, + * equivalent to what remove_from_waiters() would normally do: + * . lkb removed from ls_waiters list + * . lkb wait_type cleared + * . lkb waiters_count cleared + * . 
lkb ref count decremented for each waiters_count (almost always 1, + * but possibly 2 in case of cancel/unlock overlapping, which means + * two remote replies were being expected for the lkb.) + * + * Second, the lkb is reprocessed like an original operation would be, + * by passing it to _request_lock or _convert_lock, which will either + * process the lkb operation locally, or send it to a remote node again + * and put the lkb back onto the waiters list. + * + * When reprocessing the lkb, we may find that it's flagged for an overlapping + * force-unlock or cancel, either from before recovery began, or after recovery + * finished. If this is the case, the unlock/cancel is done directly, and the + * original operation is not initiated again (no _request_lock/_convert_lock.) + */ int dlm_recover_waiters_post(struct dlm_ls *ls) { @@ -5274,6 +5222,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) break; } + /* + * Find an lkb from the waiters list that's been affected by + * recovery node changes, and needs to be reprocessed. Does + * hold_lkb(), adding a refcount. + */ lkb = find_resend_waiter(ls); if (!lkb) break; @@ -5282,9 +5235,16 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) hold_rsb(r); lock_rsb(r); + /* + * If the lkb has been flagged for a force unlock or cancel, + * then the reprocessing below will be replaced by just doing + * the unlock/cancel directly. + */ mstype = lkb->lkb_wait_type; - oc = is_overlap_cancel(lkb); - ou = is_overlap_unlock(lkb); + oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, + &lkb->lkb_iflags); + ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, + &lkb->lkb_iflags); err = 0; log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " @@ -5293,19 +5253,39 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, dlm_dir_nodeid(r), oc, ou); - /* At this point we assume that we won't get a reply to any - previous op or overlap op on this lock. First, do a big - remove_from_waiters() for all previous ops. */ + /* + * No reply to the pre-recovery operation will now be received, + * so a forced equivalent of remove_from_waiters() is needed to + * reset the waiters state that was in place before recovery. + */ + + clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); - lkb->lkb_flags &= ~DLM_IFL_RESEND; - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; - lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + /* Forcibly clear wait_type */ lkb->lkb_wait_type = 0; - lkb->lkb_wait_count = 0; - mutex_lock(&ls->ls_waiters_mutex); + + /* + * Forcibly reset wait_count and associated refcount. The + * wait_count will almost always be 1, but in case of an + * overlapping unlock/cancel it could be 2: see where + * add_to_waiters() finds the lkb is already on the waiters + * list and does lkb_wait_count++; hold_lkb(). + */ + while (lkb->lkb_wait_count) { + lkb->lkb_wait_count--; + unhold_lkb(lkb); + } + + /* Forcibly remove from waiters list */ + spin_lock_bh(&ls->ls_waiters_lock); list_del_init(&lkb->lkb_wait_reply); - mutex_unlock(&ls->ls_waiters_mutex); - unhold_lkb(lkb); /* for waiters list */ + spin_unlock_bh(&ls->ls_waiters_lock); + + /* + * The lkb is now clear of all prior waiters state and can be + * processed locally, or sent to remote node again, or directly + * cancelled/unlocked. 
+ */ if (oc || ou) { /* do an unlock or cancel instead of resending */ @@ -5332,7 +5312,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) case DLM_MSG_LOOKUP: case DLM_MSG_REQUEST: _request_lock(r, lkb); - if (is_master(r)) + if (r->res_nodeid != -1 && is_master(r)) confirm_master(r, 0); break; case DLM_MSG_CONVERT: @@ -5424,7 +5404,7 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, /* Get rid of locks held by nodes that are gone. */ -void dlm_recover_purge(struct dlm_ls *ls) +void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list) { struct dlm_rsb *r; struct dlm_member *memb; @@ -5443,11 +5423,9 @@ void dlm_recover_purge(struct dlm_ls *ls) if (!nodes_count) return; - down_write(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { - hold_rsb(r); + list_for_each_entry(r, root_list, res_root_list) { lock_rsb(r); - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { purge_dead_list(ls, r, &r->res_grantqueue, nodeid_gone, &lkb_count); purge_dead_list(ls, r, &r->res_convertqueue, @@ -5456,25 +5434,21 @@ void dlm_recover_purge(struct dlm_ls *ls) nodeid_gone, &lkb_count); } unlock_rsb(r); - unhold_rsb(r); + cond_resched(); } - up_write(&ls->ls_root_sem); if (lkb_count) log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", lkb_count, nodes_count); } -static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) +static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls) { - struct rb_node *n; struct dlm_rsb *r; - spin_lock(&ls->ls_rsbtbl[bucket].lock); - for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (!rsb_flag(r, RSB_RECOVER_GRANT)) continue; if (!is_master(r)) { @@ -5482,10 +5456,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) continue; } hold_rsb(r); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + read_unlock_bh(&ls->ls_rsbtbl_lock); return r; } - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + read_unlock_bh(&ls->ls_rsbtbl_lock); return NULL; } @@ -5509,19 +5483,15 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) void dlm_recover_grant(struct dlm_ls *ls) { struct dlm_rsb *r; - int bucket = 0; unsigned int count = 0; unsigned int rsb_count = 0; unsigned int lkb_count = 0; while (1) { - r = find_grant_rsb(ls, bucket); - if (!r) { - if (bucket == ls->ls_rsbtbl_size - 1) - break; - bucket++; - continue; - } + r = find_grant_rsb(ls); + if (!r) + break; + rsb_count++; count = 0; lock_rsb(r); @@ -5571,16 +5541,16 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, /* needs at least dlm_rcom + rcom_lock */ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_rsb *r, struct dlm_rcom *rc) + struct dlm_rsb *r, const struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; - lkb->lkb_nodeid = rc->rc_header.h_nodeid; + lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); - lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; - lkb->lkb_flags |= DLM_IFL_MSTCPY; + dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags)); + set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); lkb->lkb_rqmode = rl->rl_rqmode; lkb->lkb_grmode = rl->rl_grmode; @@ -5590,8 
+5560,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { - int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - - sizeof(struct rcom_lock); + int lvblen = le16_to_cpu(rc->rc_header.h_length) - + sizeof(struct dlm_rcom) - sizeof(struct rcom_lock); if (lvblen > ls->ls_lvblen) return -EINVAL; lkb->lkb_lvbptr = dlm_allocate_lvb(ls); @@ -5604,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, The real granted mode of these converting locks cannot be determined until all locks have been rebuilt on the rsb (recover_conversion) */ - if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && - middle_conversion(lkb)) { - rl->rl_status = DLM_LKSTS_CONVERT; - lkb->lkb_grmode = DLM_LOCK_IV; + if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) { + /* We may need to adjust grmode depending on other granted locks. */ + log_rinfo(ls, "%s %x middle convert gr %d rq %d remote %d %x", + __func__, lkb->lkb_id, lkb->lkb_grmode, + lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid); rsb_set_flag(r, RSB_RECOVER_CONVERT); } @@ -5621,15 +5592,19 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, back the rcom_lock struct we got but with the remid field filled in. */ /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, + __le32 *rl_remid, __le32 *rl_result) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; struct dlm_lkb *lkb; uint32_t remid = 0; - int from_nodeid = rc->rc_header.h_nodeid; + int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); int error; + /* init rl_remid with rcom lock rl_remid */ + *rl_remid = rl->rl_remid; + if (rl->rl_parent_lkid) { error = -EOPNOTSUPP; goto out; @@ -5677,7 +5652,6 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) attach_lkb(r, lkb); add_lkb(r, lkb, rl->rl_status); - error = 0; ls->ls_recover_locks_in++; if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) @@ -5686,7 +5660,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ - rl->rl_remid = cpu_to_le32(lkb->lkb_id); + *rl_remid = cpu_to_le32(lkb->lkb_id); lkb->lkb_recover_seq = ls->ls_recover_seq; @@ -5697,12 +5671,13 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) if (error && error != -EEXIST) log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", from_nodeid, remid, error); - rl->rl_result = cpu_to_le32(error); + *rl_result = cpu_to_le32(error); return error; } /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, + uint64_t seq) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; @@ -5717,7 +5692,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) error = find_lkb(ls, lkid, &lkb); if (error) { log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); return error; } @@ -5727,7 +5703,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) if 
(!is_process_copy(lkb)) { log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); dlm_dump_rsb(r); unlock_rsb(r); put_rsb(r); @@ -5742,9 +5719,10 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) a barrier between recover_masters and recover_locks. */ log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); - dlm_send_rcom_lock(r, lkb); + dlm_send_rcom_lock(r, lkb, seq); goto out; case -EEXIST: case 0: @@ -5752,7 +5730,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) break; default: log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); } /* an ack for dlm_recover_locks() which waits for replies from @@ -5767,11 +5746,11 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) } int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, - int mode, uint32_t flags, void *name, unsigned int namelen, - unsigned long timeout_cs) + int mode, uint32_t flags, void *name, unsigned int namelen) { struct dlm_lkb *lkb; struct dlm_args args; + bool do_put = true; int error; dlm_lock_recovery(ls); @@ -5782,29 +5761,29 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, goto out; } + trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); + if (flags & DLM_LKF_VALBLK) { ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); if (!ua->lksb.sb_lvbptr) { kfree(ua); - __put_lkb(ls, lkb); error = -ENOMEM; - goto out; + goto out_put; } } - - /* After ua is attached to lkb it will be freed by dlm_free_lkb(). - When DLM_IFL_USER is set, the dlm knows that this is a userspace - lock and that lkb_astparam is the dlm_user_args structure. */ - - error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, - fake_astfn, ua, fake_bastfn, &args); - lkb->lkb_flags |= DLM_IFL_USER; - + error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua, + fake_bastfn, &args); if (error) { - __put_lkb(ls, lkb); - goto out; + kfree(ua->lksb.sb_lvbptr); + ua->lksb.sb_lvbptr = NULL; + kfree(ua); + goto out_put; } + /* After ua is attached to lkb it will be freed by dlm_free_lkb(). + When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace + lock and that lkb_astparam is the dlm_user_args structure. 
*/ + set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags); error = request_lock(ls, lkb, name, namelen, &args); switch (error) { @@ -5815,25 +5794,28 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, break; case -EAGAIN: error = 0; - /* fall through */ + fallthrough; default: - __put_lkb(ls, lkb); - goto out; + goto out_put; } /* add this new lkb to the per-process list of locks */ - spin_lock(&ua->proc->locks_spin); + spin_lock_bh(&ua->proc->locks_spin); hold_lkb(lkb); list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); - spin_unlock(&ua->proc->locks_spin); + spin_unlock_bh(&ua->proc->locks_spin); + do_put = false; + out_put: + trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false); + if (do_put) + __put_lkb(ls, lkb); out: dlm_unlock_recovery(ls); return error; } int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, - int mode, uint32_t flags, uint32_t lkid, char *lvb_in, - unsigned long timeout_cs) + int mode, uint32_t flags, uint32_t lkid, char *lvb_in) { struct dlm_lkb *lkb; struct dlm_args args; @@ -5846,6 +5828,8 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out; + trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags); + /* user can change the params on its lock when it converts it, or add an lvb that didn't exist before */ @@ -5868,8 +5852,8 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, ua->bastaddr = ua_tmp->bastaddr; ua->user_lksb = ua_tmp->user_lksb; - error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, - fake_astfn, ua, fake_bastfn, &args); + error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua, + fake_bastfn, &args); if (error) goto out_put; @@ -5878,6 +5862,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) error = 0; out_put: + trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false); dlm_put_lkb(lkb); out: dlm_unlock_recovery(ls); @@ -5893,39 +5878,38 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, int mode, uint32_t flags, void *name, unsigned int namelen, - unsigned long timeout_cs, uint32_t *lkid) + uint32_t *lkid) { - struct dlm_lkb *lkb; + struct dlm_lkb *lkb = NULL, *iter; struct dlm_user_args *ua; int found_other_mode = 0; - int found = 0; int rv = 0; - mutex_lock(&ls->ls_orphans_mutex); - list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) { - if (lkb->lkb_resource->res_length != namelen) + spin_lock_bh(&ls->ls_orphans_lock); + list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) { + if (iter->lkb_resource->res_length != namelen) continue; - if (memcmp(lkb->lkb_resource->res_name, name, namelen)) + if (memcmp(iter->lkb_resource->res_name, name, namelen)) continue; - if (lkb->lkb_grmode != mode) { + if (iter->lkb_grmode != mode) { found_other_mode = 1; continue; } - found = 1; - list_del_init(&lkb->lkb_ownqueue); - lkb->lkb_flags &= ~DLM_IFL_ORPHAN; - *lkid = lkb->lkb_id; + lkb = iter; + list_del_init(&iter->lkb_ownqueue); + clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags); + *lkid = iter->lkb_id; break; } - mutex_unlock(&ls->ls_orphans_mutex); + spin_unlock_bh(&ls->ls_orphans_lock); - if (!found && found_other_mode) { + if (!lkb && found_other_mode) { rv = -EAGAIN; goto out; } - if (!found) { + if (!lkb) { rv = -ENOENT; goto out; } @@ -5949,9 +5933,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, * for the 
proc locks list. */ - spin_lock(&ua->proc->locks_spin); + spin_lock_bh(&ua->proc->locks_spin); list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); - spin_unlock(&ua->proc->locks_spin); + spin_unlock_bh(&ua->proc->locks_spin); out: kfree(ua_tmp); return rv; @@ -5971,6 +5955,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out; + trace_dlm_unlock_start(ls, lkb, flags); + ua = lkb->lkb_ua; if (lvb_in && ua->lksb.sb_lvbptr) @@ -5993,12 +5979,13 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out_put; - spin_lock(&ua->proc->locks_spin); + spin_lock_bh(&ua->proc->locks_spin); /* dlm_user_add_cb() may have already taken lkb off the proc list */ if (!list_empty(&lkb->lkb_ownqueue)) list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); - spin_unlock(&ua->proc->locks_spin); + spin_unlock_bh(&ua->proc->locks_spin); out_put: + trace_dlm_unlock_end(ls, lkb, flags, error); dlm_put_lkb(lkb); out: dlm_unlock_recovery(ls); @@ -6020,6 +6007,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out; + trace_dlm_unlock_start(ls, lkb, flags); + ua = lkb->lkb_ua; if (ua_tmp->castparam) ua->castparam = ua_tmp->castparam; @@ -6037,6 +6026,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error == -EBUSY) error = 0; out_put: + trace_dlm_unlock_end(ls, lkb, flags, error); dlm_put_lkb(lkb); out: dlm_unlock_recovery(ls); @@ -6058,6 +6048,8 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) if (error) goto out; + trace_dlm_unlock_start(ls, lkb, flags); + ua = lkb->lkb_ua; error = set_unlock_args(flags, ua, &args); @@ -6073,7 +6065,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) error = validate_unlock_args(lkb, &args); if (error) goto out_r; - lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; + set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags); error = _cancel_lock(r, lkb); out_r: @@ -6086,6 +6078,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) if (error == -EBUSY) error = 0; out_put: + trace_dlm_unlock_end(ls, lkb, flags, error); dlm_put_lkb(lkb); out: dlm_unlock_recovery(ls); @@ -6101,9 +6094,9 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) int error; hold_lkb(lkb); /* reference for the ls_orphans list */ - mutex_lock(&ls->ls_orphans_mutex); + spin_lock_bh(&ls->ls_orphans_lock); list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); - mutex_unlock(&ls->ls_orphans_mutex); + spin_unlock_bh(&ls->ls_orphans_lock); set_unlock_args(0, lkb->lkb_ua, &args); @@ -6141,7 +6134,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, { struct dlm_lkb *lkb = NULL; - mutex_lock(&ls->ls_clear_proc_locks); + spin_lock_bh(&ls->ls_clear_proc_locks); if (list_empty(&proc->locks)) goto out; @@ -6149,11 +6142,11 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, list_del_init(&lkb->lkb_ownqueue); if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) - lkb->lkb_flags |= DLM_IFL_ORPHAN; + set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags); else - lkb->lkb_flags |= DLM_IFL_DEAD; + set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags); out: - mutex_unlock(&ls->ls_clear_proc_locks); + spin_unlock_bh(&ls->ls_clear_proc_locks); return lkb; } @@ -6169,6 +6162,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { + struct dlm_callback *cb, *cb_safe; struct dlm_lkb *lkb, *safe; dlm_lock_recovery(ls); @@ -6177,7 +6171,6 @@ void 
dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) lkb = del_proc_lock(ls, proc); if (!lkb) break; - del_timeout(lkb); if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) orphan_proc_lock(ls, lkb); else @@ -6190,64 +6183,61 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } - mutex_lock(&ls->ls_clear_proc_locks); + spin_lock_bh(&ls->ls_clear_proc_locks); /* in-progress unlocks */ list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { list_del_init(&lkb->lkb_ownqueue); - lkb->lkb_flags |= DLM_IFL_DEAD; + set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags); dlm_put_lkb(lkb); } - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { - memset(&lkb->lkb_callbacks, 0, - sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); - list_del_init(&lkb->lkb_cb_list); - dlm_put_lkb(lkb); + list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) { + list_del(&cb->list); + dlm_free_cb(cb); } - mutex_unlock(&ls->ls_clear_proc_locks); + spin_unlock_bh(&ls->ls_clear_proc_locks); dlm_unlock_recovery(ls); } static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { + struct dlm_callback *cb, *cb_safe; struct dlm_lkb *lkb, *safe; while (1) { lkb = NULL; - spin_lock(&proc->locks_spin); + spin_lock_bh(&proc->locks_spin); if (!list_empty(&proc->locks)) { lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); list_del_init(&lkb->lkb_ownqueue); } - spin_unlock(&proc->locks_spin); + spin_unlock_bh(&proc->locks_spin); if (!lkb) break; - lkb->lkb_flags |= DLM_IFL_DEAD; + set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags); unlock_proc_lock(ls, lkb); dlm_put_lkb(lkb); /* ref from proc->locks list */ } - spin_lock(&proc->locks_spin); + spin_lock_bh(&proc->locks_spin); list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { list_del_init(&lkb->lkb_ownqueue); - lkb->lkb_flags |= DLM_IFL_DEAD; + set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags); dlm_put_lkb(lkb); } - spin_unlock(&proc->locks_spin); + spin_unlock_bh(&proc->locks_spin); - spin_lock(&proc->asts_spin); - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { - memset(&lkb->lkb_callbacks, 0, - sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); - list_del_init(&lkb->lkb_cb_list); - dlm_put_lkb(lkb); + spin_lock_bh(&proc->asts_spin); + list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) { + list_del(&cb->list); + dlm_free_cb(cb); } - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); } /* pid of 0 means purge all orphans */ @@ -6256,7 +6246,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid) { struct dlm_lkb *lkb, *safe; - mutex_lock(&ls->ls_orphans_mutex); + spin_lock_bh(&ls->ls_orphans_lock); list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) { if (pid && lkb->lkb_ownpid != pid) continue; @@ -6264,7 +6254,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid) list_del_init(&lkb->lkb_ownqueue); dlm_put_lkb(lkb); } - mutex_unlock(&ls->ls_orphans_mutex); + spin_unlock_bh(&ls->ls_orphans_lock); } static int send_purge(struct dlm_ls *ls, int nodeid, int pid) @@ -6277,10 +6267,10 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid) DLM_MSG_PURGE, &ms, &mh); if (error) return error; - ms->m_nodeid = nodeid; - ms->m_pid = pid; + ms->m_nodeid = cpu_to_le32(nodeid); + ms->m_pid = cpu_to_le32(pid); - return send_message(mh, ms); + return send_message(mh, ms, NULL, 0); } int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, @@ -6301,3 +6291,64 @@ int dlm_user_purge(struct dlm_ls *ls, 
struct dlm_user_proc *proc, return error; } +/* debug functionality */ +int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len, + int lkb_nodeid, unsigned int lkb_dflags, int lkb_status) +{ + struct dlm_lksb *lksb; + struct dlm_lkb *lkb; + struct dlm_rsb *r; + int error; + + /* we currently can't set a valid user lock */ + if (lkb_dflags & BIT(DLM_DFL_USER_BIT)) + return -EOPNOTSUPP; + + lksb = kzalloc(sizeof(*lksb), GFP_NOFS); + if (!lksb) + return -ENOMEM; + + error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1); + if (error) { + kfree(lksb); + return error; + } + + dlm_set_dflags_val(lkb, lkb_dflags); + lkb->lkb_nodeid = lkb_nodeid; + lkb->lkb_lksb = lksb; + /* user specific pointer, just don't have it NULL for kernel locks */ + if (~lkb_dflags & BIT(DLM_DFL_USER_BIT)) + lkb->lkb_astparam = (void *)0xDEADBEEF; + + error = find_rsb(ls, name, len, 0, R_REQUEST, &r); + if (error) { + kfree(lksb); + __put_lkb(ls, lkb); + return error; + } + + lock_rsb(r); + attach_lkb(r, lkb); + add_lkb(r, lkb, lkb_status); + unlock_rsb(r); + put_rsb(r); + + return 0; +} + +int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, + int mstype, int to_nodeid) +{ + struct dlm_lkb *lkb; + int error; + + error = find_lkb(ls, lkb_id, &lkb); + if (error) + return error; + + add_to_waiters(lkb, mstype, to_nodeid); + dlm_put_lkb(lkb); + return 0; +} + |
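
The reworked dlm_receive_message() above replaces the old "locking stopped" check with a retry loop on ls_requestqueue_lock: the common path runs under the read lock, and only when LSFL_RECV_MSG_BLOCKED is set does it drop the read lock, take the write lock, recheck the flag (it may have been cleared in the window where no lock was held), and then either queue the message or start over. What follows is a minimal userspace sketch of that idiom using POSIX rwlocks; the queue, the flag and the function names are illustrative stand-ins, not the DLM code itself.

/* Sketch of the "read-lock, then retry under the write lock" idiom:
 * readers normally handle work under the read lock, but queuing needs
 * the write lock, and the blocked flag must be rechecked after the
 * lock switch because it can change while no lock is held. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct msg { int data; struct msg *next; };

static pthread_rwlock_t queue_lock = PTHREAD_RWLOCK_INITIALIZER;
static bool recv_blocked;		/* plays the role of LSFL_RECV_MSG_BLOCKED */
static struct msg *saved_head;		/* plays the role of the requestqueue */

static void process_now(struct msg *m) { printf("process %d\n", m->data); }

static void save_for_later(struct msg *m)
{
	m->next = saved_head;
	saved_head = m;
}

static void receive(struct msg *m)
{
try_again:
	pthread_rwlock_rdlock(&queue_lock);
	if (!recv_blocked) {
		/* fast path: handle the message under the read lock */
		process_now(m);
		pthread_rwlock_unlock(&queue_lock);
		return;
	}
	/* cannot queue under a read lock; drop it and take the write lock */
	pthread_rwlock_unlock(&queue_lock);
	pthread_rwlock_wrlock(&queue_lock);
	/* recheck: the flag may have been cleared while we held no lock */
	if (!recv_blocked) {
		pthread_rwlock_unlock(&queue_lock);
		goto try_again;
	}
	save_for_later(m);
	pthread_rwlock_unlock(&queue_lock);
}

int main(void)
{
	struct msg m1 = { .data = 1 };
	struct msg m2 = { .data = 2 };

	receive(&m1);			/* processed immediately */
	recv_blocked = true;
	receive(&m2);			/* ends up on saved_head */
	printf("saved: %d\n", saved_head ? saved_head->data : -1);
	return 0;
}

Rechecking after reacquisition is what makes the switch from read to write lock safe without a true upgrade primitive: whatever state the flag is in once the write lock is held decides whether the message is queued or handed back to the fast path.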

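A recurring change throughout the diff is the move from open-coded mask tests on the old lkb_flags word to atomic bit helpers (set_bit(), test_bit(), test_and_clear_bit()) on lkb_iflags/lkb_dflags, for example the oc/ou overlap flags consumed in dlm_recover_waiters_post() and the DLM_IFL_RESEND_BIT scan in find_resend_waiter(). Below is a minimal sketch of that style with the kernel helpers emulated via compiler atomics so it compiles in userspace; the bit names and the struct are assumptions for illustration only.

/* Per-bit atomic flag style: each flag is a numbered bit in an
 * unsigned long, tested and updated with atomic helpers instead of
 * read-modify-write mask arithmetic on a plain integer field. */
#include <stdbool.h>
#include <stdio.h>

#define OVERLAP_UNLOCK_BIT	0	/* illustrative bit numbers */
#define OVERLAP_CANCEL_BIT	1
#define RESEND_BIT		2

static inline void set_bit_ul(int nr, unsigned long *addr)
{
	__atomic_fetch_or(addr, 1UL << nr, __ATOMIC_RELAXED);
}

static inline bool test_bit_ul(int nr, const unsigned long *addr)
{
	return __atomic_load_n(addr, __ATOMIC_RELAXED) & (1UL << nr);
}

static inline bool test_and_clear_bit_ul(int nr, unsigned long *addr)
{
	unsigned long old = __atomic_fetch_and(addr, ~(1UL << nr),
					       __ATOMIC_RELAXED);
	return old & (1UL << nr);
}

struct lock_stub { unsigned long iflags; };	/* stand-in for an lkb */

int main(void)
{
	struct lock_stub lkb = { 0 };
	bool oc, ou;

	set_bit_ul(OVERLAP_CANCEL_BIT, &lkb.iflags);
	set_bit_ul(RESEND_BIT, &lkb.iflags);

	/* mirrors the oc/ou handling in dlm_recover_waiters_post():
	 * read and clear each overlap bit in one atomic step */
	oc = test_and_clear_bit_ul(OVERLAP_CANCEL_BIT, &lkb.iflags);
	ou = test_and_clear_bit_ul(OVERLAP_UNLOCK_BIT, &lkb.iflags);

	printf("oc=%d ou=%d resend=%d\n", oc, ou,
	       test_bit_ul(RESEND_BIT, &lkb.iflags));
	return 0;
}

test_and_clear_bit() reads and clears the flag in a single atomic step, which is why the overlap handling can consume the flag and branch on its prior value without taking an extra lock around the flag word.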