Diffstat (limited to 'fs/dlm')
-rw-r--r--  fs/dlm/Kconfig          |    1
-rw-r--r--  fs/dlm/ast.c            |  284
-rw-r--r--  fs/dlm/ast.h            |   18
-rw-r--r--  fs/dlm/config.c         |  187
-rw-r--r--  fs/dlm/config.h         |   30
-rw-r--r--  fs/dlm/debug_fs.c       |  328
-rw-r--r--  fs/dlm/dir.c            |  157
-rw-r--r--  fs/dlm/dir.h            |    3
-rw-r--r--  fs/dlm/dlm_internal.h   |  160
-rw-r--r--  fs/dlm/lock.c           | 1401
-rw-r--r--  fs/dlm/lock.h           |   15
-rw-r--r--  fs/dlm/lockspace.c      |  380
-rw-r--r--  fs/dlm/lowcomms.c       |  128
-rw-r--r--  fs/dlm/lowcomms.h       |    7
-rw-r--r--  fs/dlm/main.c           |   12
-rw-r--r--  fs/dlm/member.c         |   29
-rw-r--r--  fs/dlm/memory.c         |   48
-rw-r--r--  fs/dlm/memory.h         |    8
-rw-r--r--  fs/dlm/midcomms.c       |   71
-rw-r--r--  fs/dlm/midcomms.h       |    5
-rw-r--r--  fs/dlm/rcom.c           |   33
-rw-r--r--  fs/dlm/recover.c        |  237
-rw-r--r--  fs/dlm/recover.h        |   12
-rw-r--r--  fs/dlm/recoverd.c       |  146
-rw-r--r--  fs/dlm/requestqueue.c   |   43
-rw-r--r--  fs/dlm/user.c           |  153
26 files changed, 1906 insertions, 1990 deletions
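
The largest structural change in this series is the replacement of the per-bucket rbtree resource table (the old ls_rsbtbl[bucket].keep/.toss trees) with a single rhashtable keyed on the zero-padded resource name; see dlm_rhash_rsb_params in config.c and the rewritten dlm_search_rsb_tree() in lock.c in the diff below. The minimal sketch that follows illustrates only that fixed-length-key scheme; the identifiers used (example_rsb, EXAMPLE_NAME_LEN, example_params, example_lookup) are illustrative stand-ins and are not code from the patch.

/*
 * Minimal sketch (not part of the patch): fixed-length-name keying into an
 * rhashtable, mirroring dlm_rhash_rsb_params and dlm_search_rsb_tree() from
 * this series. All identifiers below are illustrative stand-ins.
 */
#include <linux/rhashtable.h>
#include <linux/stddef.h>
#include <linux/string.h>

#define EXAMPLE_NAME_LEN 64			/* stands in for DLM_RESNAME_MAXLEN */

struct example_rsb {
	char name[EXAMPLE_NAME_LEN];		/* lookup key, zero-padded */
	struct rhash_head node;			/* hash table linkage */
};

static const struct rhashtable_params example_params = {
	.key_len = EXAMPLE_NAME_LEN,
	.key_offset = offsetof(struct example_rsb, name),
	.head_offset = offsetof(struct example_rsb, node),
	.automatic_shrinking = true,
};

/* Pad the caller's name out to the fixed key length before the lookup,
 * as the patched dlm_search_rsb_tree() does, so shorter names hash and
 * compare consistently against the full-length key.
 */
static struct example_rsb *example_lookup(struct rhashtable *ht,
					  const void *name, int len)
{
	char key[EXAMPLE_NAME_LEN] = {};

	memcpy(key, name, len);
	return rhashtable_lookup_fast(ht, &key, example_params);
}

As in the patch, the lookup copies the caller's name into a zero-initialized buffer of the full key length rather than hashing the variable-length name directly.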
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index f82a4952769d..b46165df5a91 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -3,7 +3,6 @@ menuconfig DLM tristate "Distributed Lock Manager (DLM)" depends on INET depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n) - select IP_SCTP help A general purpose distributed lock manager for kernel or userspace applications. diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 1f2f70a1b824..0fe8d80ce5e8 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -12,48 +12,68 @@ #include <trace/events/dlm.h> #include "dlm_internal.h" +#include "lvb_table.h" #include "memory.h" #include "lock.h" #include "user.h" #include "ast.h" -void dlm_release_callback(struct kref *ref) +static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode, + uint32_t flags, uint8_t sb_flags, int sb_status, + struct dlm_lksb *lksb, + void (*astfn)(void *astparam), + void (*bastfn)(void *astparam, int mode), + void *astparam, const char *res_name, + size_t res_length) { - struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref); + if (flags & DLM_CB_BAST) { + trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length); + bastfn(astparam, mode); + } else if (flags & DLM_CB_CAST) { + trace_dlm_ast(ls_id, lkb_id, sb_flags, sb_status, res_name, + res_length); + lksb->sb_status = sb_status; + lksb->sb_flags = sb_flags; + astfn(astparam); + } +} +static void dlm_do_callback(struct dlm_callback *cb) +{ + dlm_run_callback(cb->ls_id, cb->lkb_id, cb->mode, cb->flags, + cb->sb_flags, cb->sb_status, cb->lkb_lksb, + cb->astfn, cb->bastfn, cb->astparam, + cb->res_name, cb->res_length); dlm_free_cb(cb); } -void dlm_callback_set_last_ptr(struct dlm_callback **from, - struct dlm_callback *to) +static void dlm_callback_work(struct work_struct *work) { - if (*from) - kref_put(&(*from)->ref, dlm_release_callback); - - if (to) - kref_get(&to->ref); + struct dlm_callback *cb = container_of(work, struct dlm_callback, work); - *from = to; + dlm_do_callback(cb); } -int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags) +bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, int *copy_lvb) { - struct dlm_ls *ls = lkb->lkb_resource->res_ls; - int rv = DLM_ENQUEUE_CALLBACK_SUCCESS; - struct dlm_callback *cb; + struct dlm_rsb *rsb = lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; int prev_mode; + if (copy_lvb) + *copy_lvb = 0; + if (flags & DLM_CB_BAST) { /* if cb is a bast, it should be skipped if the blocking mode is * compatible with the last granted mode */ - if (lkb->lkb_last_cast) { - if (dlm_modes_compat(mode, lkb->lkb_last_cast->mode)) { + if (lkb->lkb_last_cast_cb_mode != -1) { + if (dlm_modes_compat(mode, lkb->lkb_last_cast_cb_mode)) { log_debug(ls, "skip %x bast mode %d for cast mode %d", lkb->lkb_id, mode, - lkb->lkb_last_cast->mode); - goto out; + lkb->lkb_last_cast_cb_mode); + return true; } } @@ -63,152 +83,130 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, * is a bast for the same mode or a more restrictive mode. 
* (the addional > PR check is needed for PR/CW inversion) */ - if (lkb->lkb_last_cb && lkb->lkb_last_cb->flags & DLM_CB_BAST) { - prev_mode = lkb->lkb_last_cb->mode; + if (lkb->lkb_last_cb_mode != -1 && + lkb->lkb_last_cb_flags & DLM_CB_BAST) { + prev_mode = lkb->lkb_last_cb_mode; if ((prev_mode == mode) || (prev_mode > mode && prev_mode > DLM_LOCK_PR)) { log_debug(ls, "skip %x add bast mode %d for bast mode %d", lkb->lkb_id, mode, prev_mode); - goto out; + return true; } } - } - - cb = dlm_allocate_cb(); - if (!cb) { - rv = DLM_ENQUEUE_CALLBACK_FAILURE; - goto out; - } - cb->flags = flags; - cb->mode = mode; - cb->sb_status = status; - cb->sb_flags = (sbflags & 0x000000FF); - kref_init(&cb->ref); - if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags)) - rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED; + lkb->lkb_last_bast_time = ktime_get(); + lkb->lkb_last_bast_cb_mode = mode; + } else if (flags & DLM_CB_CAST) { + if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { + prev_mode = lkb->lkb_last_cast_cb_mode; - list_add_tail(&cb->list, &lkb->lkb_callbacks); + if (!status && lkb->lkb_lksb->sb_lvbptr && + dlm_lvb_operations[prev_mode + 1][mode + 1]) { + if (copy_lvb) + *copy_lvb = 1; + } + } - if (flags & DLM_CB_CAST) - dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb); + lkb->lkb_last_cast_cb_mode = mode; + lkb->lkb_last_cast_time = ktime_get(); + } - dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb); + lkb->lkb_last_cb_mode = mode; + lkb->lkb_last_cb_flags = flags; - out: - return rv; + return false; } -int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb) +int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) { - /* oldest undelivered cb is callbacks first entry */ - *cb = list_first_entry_or_null(&lkb->lkb_callbacks, - struct dlm_callback, list); - if (!*cb) - return DLM_DEQUEUE_CALLBACK_EMPTY; - - /* remove it from callbacks so shift others down */ - list_del(&(*cb)->list); - if (list_empty(&lkb->lkb_callbacks)) - return DLM_DEQUEUE_CALLBACK_LAST; - - return DLM_DEQUEUE_CALLBACK_SUCCESS; + struct dlm_rsb *rsb = lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; + + *cb = dlm_allocate_cb(); + if (WARN_ON_ONCE(!*cb)) + return -ENOMEM; + + /* for tracing */ + (*cb)->lkb_id = lkb->lkb_id; + (*cb)->ls_id = ls->ls_global_id; + memcpy((*cb)->res_name, rsb->res_name, rsb->res_length); + (*cb)->res_length = rsb->res_length; + + (*cb)->flags = flags; + (*cb)->mode = mode; + (*cb)->sb_status = status; + (*cb)->sb_flags = (sbflags & 0x000000FF); + (*cb)->lkb_lksb = lkb->lkb_lksb; + + return 0; } -void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, - uint32_t sbflags) +static int dlm_get_queue_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) { - struct dlm_ls *ls = lkb->lkb_resource->res_ls; int rv; - if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { - dlm_user_add_ast(lkb, flags, mode, status, sbflags); - return; - } + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, cb); + if (rv) + return rv; - spin_lock(&lkb->lkb_cb_lock); - rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags); - switch (rv) { - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - kref_get(&lkb->lkb_ref); + (*cb)->astfn = lkb->lkb_astfn; + (*cb)->bastfn = lkb->lkb_bastfn; + (*cb)->astparam = lkb->lkb_astparam; + INIT_WORK(&(*cb)->work, dlm_callback_work); - spin_lock(&ls->ls_cb_lock); - if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { - 
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay); - } else { - queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); - } - spin_unlock(&ls->ls_cb_lock); - break; - case DLM_ENQUEUE_CALLBACK_FAILURE: - WARN_ON_ONCE(1); - break; - case DLM_ENQUEUE_CALLBACK_SUCCESS: - break; - default: - WARN_ON_ONCE(1); - break; - } - spin_unlock(&lkb->lkb_cb_lock); + return 0; } -void dlm_callback_work(struct work_struct *work) +void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, + uint32_t sbflags) { - struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work); - struct dlm_ls *ls = lkb->lkb_resource->res_ls; - void (*castfn) (void *astparam); - void (*bastfn) (void *astparam, int mode); + struct dlm_rsb *rsb = lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; struct dlm_callback *cb; int rv; - spin_lock(&lkb->lkb_cb_lock); - rv = dlm_dequeue_lkb_callback(lkb, &cb); - if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) { - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - spin_unlock(&lkb->lkb_cb_lock); - goto out; + if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { + dlm_user_add_ast(lkb, flags, mode, status, sbflags); + return; } - spin_unlock(&lkb->lkb_cb_lock); - - for (;;) { - castfn = lkb->lkb_astfn; - bastfn = lkb->lkb_bastfn; - - if (cb->flags & DLM_CB_BAST) { - trace_dlm_bast(ls, lkb, cb->mode); - lkb->lkb_last_bast_time = ktime_get(); - lkb->lkb_last_bast_mode = cb->mode; - bastfn(lkb->lkb_astparam, cb->mode); - } else if (cb->flags & DLM_CB_CAST) { - lkb->lkb_lksb->sb_status = cb->sb_status; - lkb->lkb_lksb->sb_flags = cb->sb_flags; - trace_dlm_ast(ls, lkb); - lkb->lkb_last_cast_time = ktime_get(); - castfn(lkb->lkb_astparam); - } - kref_put(&cb->ref, dlm_release_callback); + if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL)) + return; - spin_lock(&lkb->lkb_cb_lock); - rv = dlm_dequeue_lkb_callback(lkb, &cb); - if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) { - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - spin_unlock(&lkb->lkb_cb_lock); - break; + spin_lock_bh(&ls->ls_cb_lock); + if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) + list_add(&cb->list, &ls->ls_cb_delay); + } else { + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) { + dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags, + sbflags, status, lkb->lkb_lksb, + lkb->lkb_astfn, lkb->lkb_bastfn, + lkb->lkb_astparam, rsb->res_name, + rsb->res_length); + } else { + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) + queue_work(ls->ls_callback_wq, &cb->work); } - spin_unlock(&lkb->lkb_cb_lock); } - -out: - /* undo kref_get from dlm_add_callback, may cause lkb to be freed */ - dlm_put_lkb(lkb); + spin_unlock_bh(&ls->ls_cb_lock); } int dlm_callback_start(struct dlm_ls *ls) { - ls->ls_callback_wq = alloc_workqueue("dlm_callback", - WQ_HIGHPRI | WQ_MEM_RECLAIM, 0); + if (!test_bit(LSFL_FS, &ls->ls_flags) || + test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) + return 0; + + ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback", + WQ_HIGHPRI | WQ_MEM_RECLAIM); if (!ls->ls_callback_wq) { log_print("can't start dlm_callback workqueue"); return -ENOMEM; @@ -224,31 +222,37 @@ void dlm_callback_stop(struct dlm_ls *ls) void dlm_callback_suspend(struct dlm_ls *ls) { - if (ls->ls_callback_wq) { - spin_lock(&ls->ls_cb_lock); - set_bit(LSFL_CB_DELAY, &ls->ls_flags); - spin_unlock(&ls->ls_cb_lock); + if (!test_bit(LSFL_FS, &ls->ls_flags)) + return; + + spin_lock_bh(&ls->ls_cb_lock); + set_bit(LSFL_CB_DELAY, 
&ls->ls_flags); + spin_unlock_bh(&ls->ls_cb_lock); + if (ls->ls_callback_wq) flush_workqueue(ls->ls_callback_wq); - } } #define MAX_CB_QUEUE 25 void dlm_callback_resume(struct dlm_ls *ls) { - struct dlm_lkb *lkb, *safe; + struct dlm_callback *cb, *safe; int count = 0, sum = 0; bool empty; - if (!ls->ls_callback_wq) + if (!test_bit(LSFL_FS, &ls->ls_flags)) return; more: - spin_lock(&ls->ls_cb_lock); - list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) { - list_del_init(&lkb->lkb_cb_list); - queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); + spin_lock_bh(&ls->ls_cb_lock); + list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) { + list_del(&cb->list); + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) + dlm_do_callback(cb); + else + queue_work(ls->ls_callback_wq, &cb->work); + count++; if (count == MAX_CB_QUEUE) break; @@ -256,7 +260,7 @@ more: empty = list_empty(&ls->ls_cb_delay); if (empty) clear_bit(LSFL_CB_DELAY, &ls->ls_flags); - spin_unlock(&ls->ls_cb_lock); + spin_unlock_bh(&ls->ls_cb_lock); sum += count; if (!empty) { diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h index ce007892dc2d..e2b86845d331 100644 --- a/fs/dlm/ast.h +++ b/fs/dlm/ast.h @@ -11,22 +11,14 @@ #ifndef __ASTD_DOT_H__ #define __ASTD_DOT_H__ -#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1 -#define DLM_ENQUEUE_CALLBACK_SUCCESS 0 -#define DLM_ENQUEUE_CALLBACK_FAILURE -1 -int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags); -#define DLM_DEQUEUE_CALLBACK_EMPTY 2 -#define DLM_DEQUEUE_CALLBACK_LAST 1 -#define DLM_DEQUEUE_CALLBACK_SUCCESS 0 -int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb); +bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, int *copy_lvb); +int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb); void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags); -void dlm_callback_set_last_ptr(struct dlm_callback **from, - struct dlm_callback *to); -void dlm_release_callback(struct kref *ref); -void dlm_callback_work(struct work_struct *work); int dlm_callback_start(struct dlm_ls *ls); void dlm_callback_stop(struct dlm_ls *ls); void dlm_callback_suspend(struct dlm_ls *ls); diff --git a/fs/dlm/config.c b/fs/dlm/config.c index e55e0a2cd2e8..a23fd524a6ee 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -24,9 +24,9 @@ #include "lowcomms.h" /* - * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/nodeid + * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/nodeid (refers to <node>) * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight - * /config/dlm/<cluster>/comms/<comm>/nodeid + * /config/dlm/<cluster>/comms/<comm>/nodeid (refers to <comm>) * /config/dlm/<cluster>/comms/<comm>/local * /config/dlm/<cluster>/comms/<comm>/addr (write only) * /config/dlm/<cluster>/comms/<comm>/addr_list (read only) @@ -63,22 +63,16 @@ static void release_node(struct config_item *); static struct configfs_attribute *comm_attrs[]; static struct configfs_attribute *node_attrs[]; +const struct rhashtable_params dlm_rhash_rsb_params = { + .nelem_hint = 3, /* start small */ + .key_len = DLM_RESNAME_MAXLEN, + .key_offset = offsetof(struct dlm_rsb, res_name), + .head_offset = offsetof(struct dlm_rsb, res_node), + .automatic_shrinking = true, +}; + struct dlm_cluster { struct config_group group; - unsigned int cl_tcp_port; - unsigned int cl_buffer_size; - unsigned int cl_rsbtbl_size; - unsigned int 
cl_recover_timer; - unsigned int cl_toss_secs; - unsigned int cl_scan_secs; - unsigned int cl_log_debug; - unsigned int cl_log_info; - unsigned int cl_protocol; - unsigned int cl_mark; - unsigned int cl_new_rsb_count; - unsigned int cl_recover_callbacks; - char cl_cluster_name[DLM_LOCKSPACE_LEN]; - struct dlm_spaces *sps; struct dlm_comms *cms; }; @@ -107,25 +101,60 @@ enum { static ssize_t cluster_cluster_name_show(struct config_item *item, char *buf) { - struct dlm_cluster *cl = config_item_to_cluster(item); - return sprintf(buf, "%s\n", cl->cl_cluster_name); + return sprintf(buf, "%s\n", dlm_config.ci_cluster_name); } static ssize_t cluster_cluster_name_store(struct config_item *item, const char *buf, size_t len) { - struct dlm_cluster *cl = config_item_to_cluster(item); - strscpy(dlm_config.ci_cluster_name, buf, - sizeof(dlm_config.ci_cluster_name)); - strscpy(cl->cl_cluster_name, buf, sizeof(cl->cl_cluster_name)); + sizeof(dlm_config.ci_cluster_name)); return len; } CONFIGFS_ATTR(cluster_, cluster_name); -static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, - int *info_field, int (*check_cb)(unsigned int x), +static ssize_t cluster_tcp_port_show(struct config_item *item, char *buf) +{ + return sprintf(buf, "%u\n", be16_to_cpu(dlm_config.ci_tcp_port)); +} + +static int dlm_check_zero_and_dlm_running(unsigned int x) +{ + if (!x) + return -EINVAL; + + if (dlm_lowcomms_is_running()) + return -EBUSY; + + return 0; +} + +static ssize_t cluster_tcp_port_store(struct config_item *item, + const char *buf, size_t len) +{ + int rc; + u16 x; + + if (!capable(CAP_SYS_ADMIN)) + return -EPERM; + + rc = kstrtou16(buf, 0, &x); + if (rc) + return rc; + + rc = dlm_check_zero_and_dlm_running(x); + if (rc) + return rc; + + dlm_config.ci_tcp_port = cpu_to_be16(x); + return len; +} + +CONFIGFS_ATTR(cluster_, tcp_port); + +static ssize_t cluster_set(unsigned int *info_field, + int (*check_cb)(unsigned int x), const char *buf, size_t len) { unsigned int x; @@ -143,7 +172,6 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, return rc; } - *cl_field = x; *info_field = x; return len; @@ -153,14 +181,11 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, static ssize_t cluster_##name##_store(struct config_item *item, \ const char *buf, size_t len) \ { \ - struct dlm_cluster *cl = config_item_to_cluster(item); \ - return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \ - check_cb, buf, len); \ + return cluster_set(&dlm_config.ci_##name, check_cb, buf, len); \ } \ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \ { \ - struct dlm_cluster *cl = config_item_to_cluster(item); \ - return snprintf(buf, PAGE_SIZE, "%u\n", cl->cl_##name); \ + return snprintf(buf, PAGE_SIZE, "%u\n", dlm_config.ci_##name); \ } \ CONFIGFS_ATTR(cluster_, name); @@ -172,6 +197,9 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x) break; case 1: /* SCTP */ + if (!IS_ENABLED(CONFIG_IP_SCTP)) + return -EOPNOTSUPP; + break; default: return -EINVAL; @@ -183,17 +211,6 @@ static int dlm_check_protocol_and_dlm_running(unsigned int x) return 0; } -static int dlm_check_zero_and_dlm_running(unsigned int x) -{ - if (!x) - return -EINVAL; - - if (dlm_lowcomms_is_running()) - return -EBUSY; - - return 0; -} - static int dlm_check_zero(unsigned int x) { if (!x) @@ -210,7 +227,6 @@ static int dlm_check_buffer_size(unsigned int x) return 0; } -CLUSTER_ATTR(tcp_port, dlm_check_zero_and_dlm_running); CLUSTER_ATTR(buffer_size, 
dlm_check_buffer_size); CLUSTER_ATTR(rsbtbl_size, dlm_check_zero); CLUSTER_ATTR(recover_timer, dlm_check_zero); @@ -415,20 +431,6 @@ static struct config_group *make_cluster(struct config_group *g, configfs_add_default_group(&sps->ss_group, &cl->group); configfs_add_default_group(&cms->cs_group, &cl->group); - cl->cl_tcp_port = dlm_config.ci_tcp_port; - cl->cl_buffer_size = dlm_config.ci_buffer_size; - cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; - cl->cl_recover_timer = dlm_config.ci_recover_timer; - cl->cl_toss_secs = dlm_config.ci_toss_secs; - cl->cl_scan_secs = dlm_config.ci_scan_secs; - cl->cl_log_debug = dlm_config.ci_log_debug; - cl->cl_log_info = dlm_config.ci_log_info; - cl->cl_protocol = dlm_config.ci_protocol; - cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count; - cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks; - memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name, - DLM_LOCKSPACE_LEN); - space_list = &sps->ss_group; comm_list = &cms->cs_group; return &cl->group; @@ -509,6 +511,12 @@ static void release_space(struct config_item *i) static struct config_item *make_comm(struct config_group *g, const char *name) { struct dlm_comm *cm; + unsigned int nodeid; + int rv; + + rv = kstrtouint(name, 0, &nodeid); + if (rv) + return ERR_PTR(rv); cm = kzalloc(sizeof(struct dlm_comm), GFP_NOFS); if (!cm) @@ -520,7 +528,7 @@ static struct config_item *make_comm(struct config_group *g, const char *name) if (!cm->seq) cm->seq = dlm_comm_count++; - cm->nodeid = -1; + cm->nodeid = nodeid; cm->local = 0; cm->addr_count = 0; cm->mark = 0; @@ -547,16 +555,25 @@ static void release_comm(struct config_item *i) static struct config_item *make_node(struct config_group *g, const char *name) { struct dlm_space *sp = config_item_to_space(g->cg_item.ci_parent); + unsigned int nodeid; struct dlm_node *nd; + uint32_t seq = 0; + int rv; + + rv = kstrtouint(name, 0, &nodeid); + if (rv) + return ERR_PTR(rv); nd = kzalloc(sizeof(struct dlm_node), GFP_NOFS); if (!nd) return ERR_PTR(-ENOMEM); config_item_init_type_name(&nd->item, name, &node_type); - nd->nodeid = -1; + nd->nodeid = nodeid; nd->weight = 1; /* default weight of 1 if none is set */ nd->new = 1; /* set to 0 once it's been read by dlm_nodeid_list() */ + dlm_comm_seq(nodeid, &seq, true); + nd->comm_seq = seq; mutex_lock(&sp->members_lock); list_add(&nd->list, &sp->members); @@ -614,16 +631,19 @@ void dlm_config_exit(void) static ssize_t comm_nodeid_show(struct config_item *item, char *buf) { - return sprintf(buf, "%d\n", config_item_to_comm(item)->nodeid); + unsigned int nodeid; + int rv; + + rv = kstrtouint(config_item_name(item), 0, &nodeid); + if (WARN_ON(rv)) + return rv; + + return sprintf(buf, "%u\n", nodeid); } static ssize_t comm_nodeid_store(struct config_item *item, const char *buf, size_t len) { - int rc = kstrtoint(buf, 0, &config_item_to_comm(item)->nodeid); - - if (rc) - return rc; return len; } @@ -664,7 +684,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf, memcpy(addr, buf, len); - rv = dlm_midcomms_addr(cm->nodeid, addr, len); + rv = dlm_midcomms_addr(cm->nodeid, addr); if (rv) { kfree(addr); return rv; @@ -764,20 +784,19 @@ static struct configfs_attribute *comm_attrs[] = { static ssize_t node_nodeid_show(struct config_item *item, char *buf) { - return sprintf(buf, "%d\n", config_item_to_node(item)->nodeid); + unsigned int nodeid; + int rv; + + rv = kstrtouint(config_item_name(item), 0, &nodeid); + if (WARN_ON(rv)) + return rv; + + return sprintf(buf, "%u\n", nodeid); } static ssize_t 
node_nodeid_store(struct config_item *item, const char *buf, size_t len) { - struct dlm_node *nd = config_item_to_node(item); - uint32_t seq = 0; - int rc = kstrtoint(buf, 0, &nd->nodeid); - - if (rc) - return rc; - dlm_comm_seq(nd->nodeid, &seq); - nd->comm_seq = seq; return len; } @@ -837,7 +856,7 @@ static struct dlm_comm *get_comm(int nodeid) if (!comm_list) return NULL; - mutex_lock(&clusters_root.subsys.su_mutex); + WARN_ON_ONCE(!mutex_is_locked(&clusters_root.subsys.su_mutex)); list_for_each_entry(i, &comm_list->cg_children, ci_entry) { cm = config_item_to_comm(i); @@ -848,7 +867,6 @@ static struct dlm_comm *get_comm(int nodeid) config_item_get(i); break; } - mutex_unlock(&clusters_root.subsys.su_mutex); if (!found) cm = NULL; @@ -908,11 +926,20 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, return rv; } -int dlm_comm_seq(int nodeid, uint32_t *seq) +int dlm_comm_seq(int nodeid, uint32_t *seq, bool locked) { - struct dlm_comm *cm = get_comm(nodeid); + struct dlm_comm *cm; + + if (locked) { + cm = get_comm(nodeid); + } else { + mutex_lock(&clusters_root.subsys.su_mutex); + cm = get_comm(nodeid); + mutex_unlock(&clusters_root.subsys.su_mutex); + } if (!cm) - return -EEXIST; + return -ENOENT; + *seq = cm->seq; put_comm(cm); return 0; @@ -920,7 +947,7 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) int dlm_our_nodeid(void) { - return local_comm ? local_comm->nodeid : 0; + return local_comm->nodeid; } /* num 0 is first addr, num 1 is second addr */ @@ -949,7 +976,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_CLUSTER_NAME "" struct dlm_config_info dlm_config = { - .ci_tcp_port = DEFAULT_TCP_PORT, + .ci_tcp_port = cpu_to_be16(DEFAULT_TCP_PORT), .ci_buffer_size = DLM_MAX_SOCKET_BUFSIZE, .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, .ci_recover_timer = DEFAULT_RECOVER_TIMER, diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 4c91fcca0fd4..13a3d0b26194 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -21,24 +21,26 @@ struct dlm_config_node { uint32_t comm_seq; }; -#define DLM_MAX_ADDR_COUNT 3 +extern const struct rhashtable_params dlm_rhash_rsb_params; + +#define DLM_MAX_ADDR_COUNT 8 #define DLM_PROTO_TCP 0 #define DLM_PROTO_SCTP 1 struct dlm_config_info { - int ci_tcp_port; - int ci_buffer_size; - int ci_rsbtbl_size; - int ci_recover_timer; - int ci_toss_secs; - int ci_scan_secs; - int ci_log_debug; - int ci_log_info; - int ci_protocol; - int ci_mark; - int ci_new_rsb_count; - int ci_recover_callbacks; + __be16 ci_tcp_port; + unsigned int ci_buffer_size; + unsigned int ci_rsbtbl_size; + unsigned int ci_recover_timer; + unsigned int ci_toss_secs; + unsigned int ci_scan_secs; + unsigned int ci_log_debug; + unsigned int ci_log_info; + unsigned int ci_protocol; + unsigned int ci_mark; + unsigned int ci_new_rsb_count; + unsigned int ci_recover_callbacks; char ci_cluster_name[DLM_LOCKSPACE_LEN]; }; @@ -48,7 +50,7 @@ int dlm_config_init(void); void dlm_config_exit(void); int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int *count_out); -int dlm_comm_seq(int nodeid, uint32_t *seq); +int dlm_comm_seq(int nodeid, uint32_t *seq, bool locked); int dlm_our_nodeid(void); int dlm_our_addr(struct sockaddr_storage *addr, int num); diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 4fa11d9ddbb6..700a0cbb2f14 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -247,7 +247,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb, lkb->lkb_status, lkb->lkb_grmode, lkb->lkb_rqmode, - 
lkb->lkb_last_bast_mode, + lkb->lkb_last_bast_cb_mode, rsb_lookup, lkb->lkb_wait_type, lkb->lkb_lvbseq, @@ -366,58 +366,10 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s) unlock_rsb(r); } -static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb) -{ - struct dlm_callback *cb; - - /* lkb_id lkb_flags mode flags sb_status sb_flags */ - - spin_lock(&lkb->lkb_cb_lock); - list_for_each_entry(cb, &lkb->lkb_callbacks, list) { - seq_printf(s, "%x %x %d %x %d %x\n", - lkb->lkb_id, - dlm_iflags_val(lkb), - cb->mode, - cb->flags, - cb->sb_status, - cb->sb_flags); - } - spin_unlock(&lkb->lkb_cb_lock); -} - -static void print_format5(struct dlm_rsb *r, struct seq_file *s) -{ - struct dlm_lkb *lkb; - - lock_rsb(r); - - list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { - print_format5_lock(s, lkb); - if (seq_has_overflowed(s)) - goto out; - } - - list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { - print_format5_lock(s, lkb); - if (seq_has_overflowed(s)) - goto out; - } - - list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) { - print_format5_lock(s, lkb); - if (seq_has_overflowed(s)) - goto out; - } - out: - unlock_rsb(r); -} - -struct rsbtbl_iter { - struct dlm_rsb *rsb; - unsigned bucket; - int format; - int header; -}; +static const struct seq_operations format1_seq_ops; +static const struct seq_operations format2_seq_ops; +static const struct seq_operations format3_seq_ops; +static const struct seq_operations format4_seq_ops; /* * If the buffer is full, seq_printf can be called again, but it @@ -428,207 +380,61 @@ struct rsbtbl_iter { static int table_seq_show(struct seq_file *seq, void *iter_ptr) { - struct rsbtbl_iter *ri = iter_ptr; - - switch (ri->format) { - case 1: - print_format1(ri->rsb, seq); - break; - case 2: - if (ri->header) { - seq_puts(seq, "id nodeid remid pid xid exflags flags sts grmode rqmode time_ms r_nodeid r_len r_name\n"); - ri->header = 0; - } - print_format2(ri->rsb, seq); - break; - case 3: - if (ri->header) { - seq_puts(seq, "rsb ptr nodeid first_lkid flags !root_list_empty !recover_list_empty recover_locks_count len\n"); - ri->header = 0; - } - print_format3(ri->rsb, seq); - break; - case 4: - if (ri->header) { - seq_puts(seq, "rsb ptr nodeid master_nodeid dir_nodeid our_nodeid toss_time flags len str|hex name\n"); - ri->header = 0; - } - print_format4(ri->rsb, seq); - break; - case 5: - if (ri->header) { - seq_puts(seq, "lkb_id lkb_flags mode flags sb_status sb_flags\n"); - ri->header = 0; - } - print_format5(ri->rsb, seq); - break; - } + struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_slow_list); + + if (seq->op == &format1_seq_ops) + print_format1(rsb, seq); + else if (seq->op == &format2_seq_ops) + print_format2(rsb, seq); + else if (seq->op == &format3_seq_ops) + print_format3(rsb, seq); + else if (seq->op == &format4_seq_ops) + print_format4(rsb, seq); return 0; } -static const struct seq_operations format1_seq_ops; -static const struct seq_operations format2_seq_ops; -static const struct seq_operations format3_seq_ops; -static const struct seq_operations format4_seq_ops; -static const struct seq_operations format5_seq_ops; - static void *table_seq_start(struct seq_file *seq, loff_t *pos) { - struct rb_root *tree; - struct rb_node *node; struct dlm_ls *ls = seq->private; - struct rsbtbl_iter *ri; - struct dlm_rsb *r; - loff_t n = *pos; - unsigned bucket, entry; - int toss = (seq->op == &format4_seq_ops); - - bucket = n >> 32; - entry = n & ((1LL << 32) - 1); - - if (bucket >= 
ls->ls_rsbtbl_size) - return NULL; - - ri = kzalloc(sizeof(*ri), GFP_NOFS); - if (!ri) - return NULL; - if (n == 0) - ri->header = 1; - if (seq->op == &format1_seq_ops) - ri->format = 1; - if (seq->op == &format2_seq_ops) - ri->format = 2; - if (seq->op == &format3_seq_ops) - ri->format = 3; - if (seq->op == &format4_seq_ops) - ri->format = 4; - if (seq->op == &format5_seq_ops) - ri->format = 5; - - tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!RB_EMPTY_ROOT(tree)) { - for (node = rb_first(tree); node; node = rb_next(node)) { - r = rb_entry(node, struct dlm_rsb, res_hashnode); - if (!entry--) { - dlm_hold_rsb(r); - ri->rsb = r; - ri->bucket = bucket; - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - return ri; - } - } - } - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - - /* - * move to the first rsb in the next non-empty bucket - */ + struct list_head *list; - /* zero the entry */ - n &= ~((1LL << 32) - 1); + if (!*pos) { + if (seq->op == &format2_seq_ops) + seq_puts(seq, "id nodeid remid pid xid exflags flags sts grmode rqmode time_ms r_nodeid r_len r_name\n"); + else if (seq->op == &format3_seq_ops) + seq_puts(seq, "rsb ptr nodeid first_lkid flags !root_list_empty !recover_list_empty recover_locks_count len\n"); + else if (seq->op == &format4_seq_ops) + seq_puts(seq, "rsb ptr nodeid master_nodeid dir_nodeid our_nodeid toss_time flags len str|hex name\n"); + } - while (1) { - bucket++; - n += 1LL << 32; + if (seq->op == &format4_seq_ops) + list = &ls->ls_slow_inactive; + else + list = &ls->ls_slow_active; - if (bucket >= ls->ls_rsbtbl_size) { - kfree(ri); - return NULL; - } - tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!RB_EMPTY_ROOT(tree)) { - node = rb_first(tree); - r = rb_entry(node, struct dlm_rsb, res_hashnode); - dlm_hold_rsb(r); - ri->rsb = r; - ri->bucket = bucket; - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - *pos = n; - return ri; - } - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - } + read_lock_bh(&ls->ls_rsbtbl_lock); + return seq_list_start(list, *pos); } static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) { struct dlm_ls *ls = seq->private; - struct rsbtbl_iter *ri = iter_ptr; - struct rb_root *tree; - struct rb_node *next; - struct dlm_rsb *r, *rp; - loff_t n = *pos; - unsigned bucket; - int toss = (seq->op == &format4_seq_ops); - - bucket = n >> 32; - - /* - * move to the next rsb in the same bucket - */ - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - rp = ri->rsb; - next = rb_next(&rp->res_hashnode); - - if (next) { - r = rb_entry(next, struct dlm_rsb, res_hashnode); - dlm_hold_rsb(r); - ri->rsb = r; - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - dlm_put_rsb(rp); - ++*pos; - return ri; - } - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - dlm_put_rsb(rp); + struct list_head *list; - /* - * move to the first rsb in the next non-empty bucket - */ - - /* zero the entry */ - n &= ~((1LL << 32) - 1); - - while (1) { - bucket++; - n += 1LL << 32; + if (seq->op == &format4_seq_ops) + list = &ls->ls_slow_inactive; + else + list = &ls->ls_slow_active; - if (bucket >= ls->ls_rsbtbl_size) { - kfree(ri); - ++*pos; - return NULL; - } - tree = toss ? 
&ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!RB_EMPTY_ROOT(tree)) { - next = rb_first(tree); - r = rb_entry(next, struct dlm_rsb, res_hashnode); - dlm_hold_rsb(r); - ri->rsb = r; - ri->bucket = bucket; - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - *pos = n; - return ri; - } - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - } + return seq_list_next(iter_ptr, list, pos); } static void table_seq_stop(struct seq_file *seq, void *iter_ptr) { - struct rsbtbl_iter *ri = iter_ptr; + struct dlm_ls *ls = seq->private; - if (ri) { - dlm_put_rsb(ri->rsb); - kfree(ri); - } + read_unlock_bh(&ls->ls_rsbtbl_lock); } static const struct seq_operations format1_seq_ops = { @@ -659,18 +465,10 @@ static const struct seq_operations format4_seq_ops = { .show = table_seq_show, }; -static const struct seq_operations format5_seq_ops = { - .start = table_seq_start, - .next = table_seq_next, - .stop = table_seq_stop, - .show = table_seq_show, -}; - static const struct file_operations format1_fops; static const struct file_operations format2_fops; static const struct file_operations format3_fops; static const struct file_operations format4_fops; -static const struct file_operations format5_fops; static int table_open1(struct inode *inode, struct file *file) { @@ -757,20 +555,6 @@ static int table_open4(struct inode *inode, struct file *file) return 0; } -static int table_open5(struct inode *inode, struct file *file) -{ - struct seq_file *seq; - int ret; - - ret = seq_open(file, &format5_seq_ops); - if (ret) - return ret; - - seq = file->private_data; - seq->private = inode->i_private; /* the dlm_ls */ - return 0; -} - static const struct file_operations format1_fops = { .owner = THIS_MODULE, .open = table_open1, @@ -804,14 +588,6 @@ static const struct file_operations format4_fops = { .release = seq_release }; -static const struct file_operations format5_fops = { - .owner = THIS_MODULE, - .open = table_open5, - .read = seq_read, - .llseek = seq_lseek, - .release = seq_release -}; - /* * dump lkb's on the ls_waiters list */ @@ -823,7 +599,13 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf, size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv; mutex_lock(&debug_buf_lock); - mutex_lock(&ls->ls_waiters_mutex); + ret = dlm_lock_recovery_try(ls); + if (!ret) { + rv = -EAGAIN; + goto out; + } + + spin_lock_bh(&ls->ls_waiters_lock); memset(debug_buf, 0, sizeof(debug_buf)); list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { @@ -834,9 +616,11 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf, break; pos += ret; } - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); + dlm_unlock_recovery(ls); rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos); +out: mutex_unlock(&debug_buf_lock); return rv; } @@ -858,7 +642,12 @@ static ssize_t waiters_write(struct file *file, const char __user *user_buf, if (n != 3) return -EINVAL; + error = dlm_lock_recovery_try(ls); + if (!error) + return -EAGAIN; + error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid); + dlm_unlock_recovery(ls); if (error) return error; @@ -944,7 +733,6 @@ out: static const struct file_operations dlm_rawmsg_fops = { .open = simple_open, .write = dlm_rawmsg_write, - .llseek = no_llseek, }; void *dlm_create_debug_comms_file(int nodeid, void *data) @@ -1021,16 +809,6 @@ void dlm_create_debug_file(struct dlm_ls *ls) dlm_root, ls, &waiters_fops); - - /* format 5 */ - - snprintf(name, sizeof(name), "%s_queued_asts", 
ls->ls_name); - - ls->ls_debug_queued_asts_dentry = debugfs_create_file(name, - 0644, - dlm_root, - ls, - &format5_fops); } void __init dlm_register_debugfs(void) diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index f6acba4310a7..b1ab0adbd9d0 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -47,15 +47,13 @@ int dlm_dir_nodeid(struct dlm_rsb *r) return r->res_dir_nodeid; } -void dlm_recover_dir_nodeid(struct dlm_ls *ls) +void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list) { struct dlm_rsb *r; - down_read(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + list_for_each_entry(r, root_list, res_root_list) { r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); } - up_read(&ls->ls_root_sem); } int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq) @@ -200,35 +198,98 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name, int len) { struct dlm_rsb *r; - uint32_t hash, bucket; int rv; - hash = jhash(name, len, 0); - bucket = hash & (ls->ls_rsbtbl_size - 1); - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); - if (rv) - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, - name, len, &r); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - + read_lock_bh(&ls->ls_rsbtbl_lock); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + read_unlock_bh(&ls->ls_rsbtbl_lock); if (!rv) return r; - down_read(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) { if (len == r->res_length && !memcmp(name, r->res_name, len)) { - up_read(&ls->ls_root_sem); log_debug(ls, "find_rsb_root revert to root_list %s", r->res_name); return r; } } - up_read(&ls->ls_root_sem); return NULL; } +struct dlm_dir_dump { + /* init values to match if whole + * dump fits to one seq. Sanity check only. + */ + uint64_t seq_init; + uint64_t nodeid_init; + /* compare local pointer with last lookup, + * just a sanity check. 
+ */ + struct list_head *last; + + unsigned int sent_res; /* for log info */ + unsigned int sent_msg; /* for log info */ + + struct list_head list; +}; + +static void drop_dir_ctx(struct dlm_ls *ls, int nodeid) +{ + struct dlm_dir_dump *dd, *safe; + + write_lock_bh(&ls->ls_dir_dump_lock); + list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) { + if (dd->nodeid_init == nodeid) { + log_error(ls, "drop dump seq %llu", + (unsigned long long)dd->seq_init); + list_del(&dd->list); + kfree(dd); + } + } + write_unlock_bh(&ls->ls_dir_dump_lock); +} + +static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid) +{ + struct dlm_dir_dump *iter, *dd = NULL; + + read_lock_bh(&ls->ls_dir_dump_lock); + list_for_each_entry(iter, &ls->ls_dir_dump_list, list) { + if (iter->nodeid_init == nodeid) { + dd = iter; + break; + } + } + read_unlock_bh(&ls->ls_dir_dump_lock); + + return dd; +} + +static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid) +{ + struct dlm_dir_dump *dd; + + dd = lookup_dir_dump(ls, nodeid); + if (dd) { + log_error(ls, "found ongoing dir dump for node %d, will drop it", + nodeid); + drop_dir_ctx(ls, nodeid); + } + + dd = kzalloc(sizeof(*dd), GFP_ATOMIC); + if (!dd) + return NULL; + + dd->seq_init = ls->ls_recover_seq; + dd->nodeid_init = nodeid; + + write_lock_bh(&ls->ls_dir_dump_lock); + list_add(&dd->list, &ls->ls_dir_dump_list); + write_unlock_bh(&ls->ls_dir_dump_lock); + + return dd; +} + /* Find the rsb where we left off (or start again), then send rsb names for rsb's we're master of and whose directory node matches the requesting node. inbuf is the rsb name last sent, inlen is the name's length */ @@ -239,27 +300,50 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, struct list_head *list; struct dlm_rsb *r; int offset = 0, dir_nodeid; + struct dlm_dir_dump *dd; __be16 be_namelen; - down_read(&ls->ls_root_sem); + read_lock_bh(&ls->ls_masters_lock); if (inlen > 1) { + dd = lookup_dir_dump(ls, nodeid); + if (!dd) { + log_error(ls, "failed to lookup dir dump context nodeid: %d", + nodeid); + goto out; + } + + /* next chunk in dump */ r = find_rsb_root(ls, inbuf, inlen); if (!r) { log_error(ls, "copy_master_names from %d start %d %.*s", nodeid, inlen, inlen, inbuf); goto out; } - list = r->res_root_list.next; + list = r->res_masters_list.next; + + /* sanity checks */ + if (dd->last != &r->res_masters_list || + dd->seq_init != ls->ls_recover_seq) { + log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu", + (unsigned long long)dd->seq_init, + (unsigned long long)ls->ls_recover_seq); + goto out; + } } else { - list = ls->ls_root_list.next; - } + dd = init_dir_dump(ls, nodeid); + if (!dd) { + log_error(ls, "failed to allocate dir dump context"); + goto out; + } - for (offset = 0; list != &ls->ls_root_list; list = list->next) { - r = list_entry(list, struct dlm_rsb, res_root_list); - if (r->res_nodeid) - continue; + /* start dump */ + list = ls->ls_masters_list.next; + dd->last = list; + } + for (offset = 0; list != &ls->ls_masters_list; list = list->next) { + r = list_entry(list, struct dlm_rsb, res_masters_list); dir_nodeid = dlm_dir_nodeid(r); if (dir_nodeid != nodeid) continue; @@ -277,7 +361,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, be_namelen = cpu_to_be16(0); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); offset += sizeof(__be16); - ls->ls_recover_dir_sent_msg++; + dd->sent_msg++; goto out; } @@ -286,7 +370,8 @@ void dlm_copy_master_names(struct dlm_ls 
*ls, const char *inbuf, int inlen, offset += sizeof(__be16); memcpy(outbuf + offset, r->res_name, r->res_length); offset += r->res_length; - ls->ls_recover_dir_sent_res++; + dd->sent_res++; + dd->last = list; } /* @@ -294,14 +379,22 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, * terminating record. */ - if ((list == &ls->ls_root_list) && + if ((list == &ls->ls_masters_list) && (offset + sizeof(uint16_t) <= outlen)) { + /* end dump */ be_namelen = cpu_to_be16(0xFFFF); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); offset += sizeof(__be16); - ls->ls_recover_dir_sent_msg++; + dd->sent_msg++; + log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages", + nodeid, dd->sent_res, dd->sent_msg); + + write_lock_bh(&ls->ls_dir_dump_lock); + list_del_init(&dd->list); + write_unlock_bh(&ls->ls_dir_dump_lock); + kfree(dd); } out: - up_read(&ls->ls_root_sem); + read_unlock_bh(&ls->ls_masters_lock); } diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h index 39ecb69d7ef3..5b2a7ee3762d 100644 --- a/fs/dlm/dir.h +++ b/fs/dlm/dir.h @@ -14,7 +14,8 @@ int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); -void dlm_recover_dir_nodeid(struct dlm_ls *ls); +void dlm_recover_dir_nodeid(struct dlm_ls *ls, + const struct list_head *root_list); int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq); void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, char *outbuf, int outlen, int nodeid); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 3b4dbce849f0..d534a4bc162b 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -16,6 +16,7 @@ * This is the main header file to be included in each DLM source file. */ +#include <uapi/linux/dlm_device.h> #include <linux/slab.h> #include <linux/sched.h> #include <linux/types.h> @@ -33,8 +34,9 @@ #include <linux/kernel.h> #include <linux/jhash.h> #include <linux/miscdevice.h> +#include <linux/rhashtable.h> #include <linux/mutex.h> -#include <linux/idr.h> +#include <linux/xarray.h> #include <linux/ratelimit.h> #include <linux/uaccess.h> @@ -98,17 +100,6 @@ do { \ } \ } - -#define DLM_RTF_SHRINK_BIT 0 - -struct dlm_rsbtable { - struct rb_root keep; - struct rb_root toss; - spinlock_t lock; - unsigned long flags; -}; - - /* * Lockspace member (per node in a ls) */ @@ -204,8 +195,7 @@ struct dlm_args { #define DLM_IFL_OVERLAP_CANCEL_BIT 20 #define DLM_IFL_ENDOFLIFE_BIT 21 #define DLM_IFL_DEADLOCK_CANCEL_BIT 24 -#define DLM_IFL_CB_PENDING_BIT 25 -#define __DLM_IFL_MAX_BIT DLM_IFL_CB_PENDING_BIT +#define __DLM_IFL_MAX_BIT DLM_IFL_DEADLOCK_CANCEL_BIT /* lkb_dflags */ @@ -217,14 +207,47 @@ struct dlm_args { #define DLM_CB_CAST 0x00000001 #define DLM_CB_BAST 0x00000002 +/* much of this is just saving user space pointers associated with the + * lock that we pass back to the user lib with an ast + */ + +struct dlm_user_args { + struct dlm_user_proc *proc; /* each process that opens the lockspace + * device has private data + * (dlm_user_proc) on the struct file, + * the process's locks point back to it + */ + struct dlm_lksb lksb; + struct dlm_lksb __user *user_lksb; + void __user *castparam; + void __user *castaddr; + void __user *bastparam; + void __user *bastaddr; + uint64_t xid; +}; + struct dlm_callback { uint32_t flags; /* DLM_CBF_ */ int sb_status; /* copy to lksb status */ uint8_t sb_flags; /* copy to lksb flags */ int8_t mode; /* rq mode of bast, gr mode of cast */ + bool copy_lvb; + struct dlm_lksb *lkb_lksb; + unsigned char 
lvbptr[DLM_USER_LVB_LEN]; + + union { + void *astparam; /* caller's ast arg */ + struct dlm_user_args ua; + }; + struct work_struct work; + void (*bastfn)(void *astparam, int mode); + void (*astfn)(void *astparam); + char res_name[DLM_RESNAME_MAXLEN]; + size_t res_length; + uint32_t ls_id; + uint32_t lkb_id; struct list_head list; - struct kref ref; }; struct dlm_lkb { @@ -255,13 +278,10 @@ struct dlm_lkb { struct list_head lkb_ownqueue; /* list of locks for a process */ ktime_t lkb_timestamp; - spinlock_t lkb_cb_lock; - struct work_struct lkb_cb_work; - struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */ - struct list_head lkb_callbacks; - struct dlm_callback *lkb_last_cast; - struct dlm_callback *lkb_last_cb; - int lkb_last_bast_mode; + int8_t lkb_last_cast_cb_mode; + int8_t lkb_last_bast_cb_mode; + int8_t lkb_last_cb_mode; + uint8_t lkb_last_cb_flags; ktime_t lkb_last_cast_time; /* for debugging */ ktime_t lkb_last_bast_time; /* for debugging */ @@ -275,6 +295,7 @@ struct dlm_lkb { void *lkb_astparam; /* caller's ast arg */ struct dlm_user_args *lkb_ua; }; + struct rcu_head rcu; }; /* @@ -290,30 +311,30 @@ struct dlm_lkb { struct dlm_rsb { struct dlm_ls *res_ls; /* the lockspace */ struct kref res_ref; - struct mutex res_mutex; + spinlock_t res_lock; unsigned long res_flags; int res_length; /* length of rsb name */ int res_nodeid; int res_master_nodeid; int res_dir_nodeid; - int res_id; /* for ls_recover_idr */ + unsigned long res_id; /* for ls_recover_xa */ uint32_t res_lvbseq; uint32_t res_hash; - uint32_t res_bucket; /* rsbtbl */ unsigned long res_toss_time; uint32_t res_first_lkid; struct list_head res_lookup; /* lkbs waiting on first */ - union { - struct list_head res_hashchain; - struct rb_node res_hashnode; /* rsbtbl */ - }; + struct rhash_head res_node; /* rsbtbl */ struct list_head res_grantqueue; struct list_head res_convertqueue; struct list_head res_waitqueue; + struct list_head res_slow_list; /* ls_slow_* */ + struct list_head res_scan_list; struct list_head res_root_list; /* used for recovery */ + struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ int res_recover_locks_count; + struct rcu_head rcu; char *res_lvbptr; char res_name[DLM_RESNAME_MAXLEN+1]; @@ -346,6 +367,8 @@ enum rsb_flags { RSB_RECOVER_CONVERT, RSB_RECOVER_GRANT, RSB_RECOVER_LVB_INVAL, + RSB_INACTIVE, + RSB_HASHED, /* set while rsb is on ls_rsbtbl */ }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -536,16 +559,8 @@ struct rcom_lock { char rl_lvb[]; }; -/* - * The max number of resources per rsbtbl bucket that shrink will attempt - * to remove in each iteration. 
- */ - -#define DLM_REMOVE_NAMES_MAX 8 - struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ - dlm_lockspace_t *ls_local_handle; uint32_t ls_global_id; /* global unique lockspace ID */ uint32_t ls_generation; uint32_t ls_exflags; @@ -555,28 +570,28 @@ struct dlm_ls { wait_queue_head_t ls_count_wait; int ls_create_count; /* create/release refcount */ unsigned long ls_flags; /* LSFL_ */ - unsigned long ls_scan_time; struct kobject ls_kobj; - struct idr ls_lkbidr; - spinlock_t ls_lkbidr_spin; + struct xarray ls_lkbxa; + rwlock_t ls_lkbxa_lock; + + /* an rsb is on rsbtl for primary locking functions, + and on a slow list for recovery/dump iteration */ + struct rhashtable ls_rsbtbl; + rwlock_t ls_rsbtbl_lock; /* for ls_rsbtbl and ls_slow */ + struct list_head ls_slow_inactive; /* to iterate rsbtbl */ + struct list_head ls_slow_active; /* to iterate rsbtbl */ - struct dlm_rsbtable *ls_rsbtbl; - uint32_t ls_rsbtbl_size; + struct timer_list ls_scan_timer; /* based on first scan_list rsb toss_time */ + struct list_head ls_scan_list; /* rsbs ordered by res_toss_time */ + spinlock_t ls_scan_lock; - struct mutex ls_waiters_mutex; + spinlock_t ls_waiters_lock; struct list_head ls_waiters; /* lkbs needing a reply */ - struct mutex ls_orphans_mutex; + spinlock_t ls_orphans_lock; struct list_head ls_orphans; - spinlock_t ls_new_rsb_spin; - int ls_new_rsb_count; - struct list_head ls_new_rsb; /* new rsb structs */ - - char *ls_remove_names[DLM_REMOVE_NAMES_MAX]; - int ls_remove_lens[DLM_REMOVE_NAMES_MAX]; - struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ @@ -613,7 +628,6 @@ struct dlm_ls { spinlock_t ls_cb_lock; struct list_head ls_cb_delay; /* save for queue_work later */ - struct timer_list ls_timer; struct task_struct *ls_recoverd_task; struct mutex ls_recoverd_active; spinlock_t ls_recover_lock; @@ -622,33 +636,33 @@ struct dlm_ls { uint64_t ls_recover_seq; struct dlm_recover *ls_recover_args; struct rw_semaphore ls_in_recovery; /* block local requests */ - struct rw_semaphore ls_recv_active; /* block dlm_recv */ + rwlock_t ls_recv_active; /* block dlm_recv */ struct list_head ls_requestqueue;/* queue remote requests */ - atomic_t ls_requestqueue_cnt; - wait_queue_head_t ls_requestqueue_wait; - struct mutex ls_requestqueue_mutex; + rwlock_t ls_requestqueue_lock; struct dlm_rcom *ls_recover_buf; int ls_recover_nodeid; /* for debugging */ - unsigned int ls_recover_dir_sent_res; /* for log info */ - unsigned int ls_recover_dir_sent_msg; /* for log info */ unsigned int ls_recover_locks_in; /* for log info */ uint64_t ls_rcom_seq; spinlock_t ls_rcom_spin; struct list_head ls_recover_list; spinlock_t ls_recover_list_lock; int ls_recover_list_count; - struct idr ls_recover_idr; - spinlock_t ls_recover_idr_lock; + struct xarray ls_recover_xa; + spinlock_t ls_recover_xa_lock; wait_queue_head_t ls_wait_general; wait_queue_head_t ls_recover_lock_wait; spinlock_t ls_clear_proc_locks; - struct list_head ls_root_list; /* root resources */ - struct rw_semaphore ls_root_sem; /* protect root_list */ + struct list_head ls_masters_list; /* root resources */ + rwlock_t ls_masters_lock; /* protect root_list */ + struct list_head ls_dir_dump_list; /* root resources */ + rwlock_t ls_dir_dump_lock; /* protect root_list */ const struct dlm_lockspace_ops *ls_ops; void *ls_ops_arg; + struct work_struct ls_free_work; + int ls_namelen; char ls_name[DLM_LOCKSPACE_LEN + 1]; }; @@ -686,23 +700,9 @@ struct 
dlm_ls { #define LSFL_UEVENT_WAIT 7 #define LSFL_CB_DELAY 9 #define LSFL_NODIR 10 - -/* much of this is just saving user space pointers associated with the - lock that we pass back to the user lib with an ast */ - -struct dlm_user_args { - struct dlm_user_proc *proc; /* each process that opens the lockspace - device has private data - (dlm_user_proc) on the struct file, - the process's locks point back to it*/ - struct dlm_lksb lksb; - struct dlm_lksb __user *user_lksb; - void __user *castparam; - void __user *castaddr; - void __user *bastparam; - void __user *bastaddr; - uint64_t xid; -}; +#define LSFL_RECV_MSG_BLOCKED 11 +#define LSFL_FS 12 +#define LSFL_SOFTIRQ 13 #define DLM_PROC_FLAGS_CLOSING 1 #define DLM_PROC_FLAGS_COMPAT 2 @@ -806,6 +806,8 @@ static inline void dlm_set_sbflags_val(struct dlm_lkb *lkb, uint32_t val) __DLM_SBF_MAX_BIT); } +extern struct workqueue_struct *dlm_wq; + int dlm_plock_init(void); void dlm_plock_exit(void); diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index fd752dd03896..e01d5f29f4d2 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, const struct dlm_message *ms, bool local); static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r) /* Threads cannot use the lockspace while it's being recovered */ -static inline void dlm_lock_recovery(struct dlm_ls *ls) +void dlm_lock_recovery(struct dlm_ls *ls) { down_read(&ls->ls_in_recovery); } @@ -320,11 +320,18 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ +static inline unsigned long rsb_toss_jiffies(void) +{ + return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ); +} + /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ static inline void hold_rsb(struct dlm_rsb *r) { + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -333,19 +340,45 @@ void dlm_hold_rsb(struct dlm_rsb *r) hold_rsb(r); } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. 
*/ +/* TODO move this to lib/refcount.c */ +static __must_check bool +dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock) +__cond_acquires(lock) +{ + if (refcount_dec_not_one(r)) + return false; + + write_lock_bh(lock); + if (!refcount_dec_and_test(r)) { + write_unlock_bh(lock); + return false; + } + + return true; +} + +/* TODO move this to include/linux/kref.h */ +static inline int dlm_kref_put_write_lock_bh(struct kref *kref, + void (*release)(struct kref *kref), + rwlock_t *lock) +{ + if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) { + release(kref); + return 1; + } + + return 0; +} static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - uint32_t bucket = r->res_bucket; int rv; - rv = kref_put_lock(&r->res_ref, toss_rsb, - &ls->ls_rsbtbl[bucket].lock); + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, + &ls->ls_rsbtbl_lock); if (rv) - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); } void dlm_put_rsb(struct dlm_rsb *r) @@ -353,36 +386,209 @@ void dlm_put_rsb(struct dlm_rsb *r) put_rsb(r); } -static int pre_rsb_struct(struct dlm_ls *ls) +/* connected with timer_delete_sync() in dlm_ls_stop() to stop + * new timers when recovery is triggered and don't run them + * again until a resume_scan_timer() tries it again. + */ +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { - struct dlm_rsb *r1, *r2; - int count = 0; + if (!dlm_locking_stopped(ls)) + mod_timer(&ls->ls_scan_timer, jiffies); +} - spin_lock(&ls->ls_new_rsb_spin); - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { - spin_unlock(&ls->ls_new_rsb_spin); - return 0; - } - spin_unlock(&ls->ls_new_rsb_spin); +/* This function tries to resume the timer callback if a rsb + * is on the scan list and no timer is pending. It might that + * the first entry is on currently executed as timer callback + * but we don't care if a timer queued up again and does + * nothing. Should be a rare case. + */ +void resume_scan_timer(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); +} + +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) +{ + struct dlm_rsb *first; + + /* active rsbs should never be on the scan list */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); - r1 = dlm_allocate_rsb(ls); - r2 = dlm_allocate_rsb(ls); + spin_lock_bh(&ls->ls_scan_lock); + r->res_toss_time = 0; + + /* if the rsb is not queued do nothing */ + if (list_empty(&r->res_scan_list)) + goto out; - spin_lock(&ls->ls_new_rsb_spin); - if (r1) { - list_add(&r1->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; + /* get the first element before delete */ + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); + /* check if the first element was the rsb we deleted */ + if (first == r) { + /* try to get the new first element, if the list + * is empty now try to delete the timer, if we are + * too late we don't care. + * + * if the list isn't empty and a new first element got + * in place, set the new timer expire time. 
+ */ + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (!first) + timer_delete(&ls->ls_scan_timer); + else + enable_scan_timer(ls, first->res_toss_time); } - if (r2) { - list_add(&r2->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; + +out: + spin_unlock_bh(&ls->ls_scan_lock); +} + +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) +{ + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *first; + + /* A dir record for a remote master rsb should never be on the scan list. */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* An active rsb should never be on the scan list. */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + /* An rsb should not already be on the scan list. */ + WARN_ON(!list_empty(&r->res_scan_list)); + + spin_lock_bh(&ls->ls_scan_lock); + /* set the new rsb absolute expire time in the rsb */ + r->res_toss_time = rsb_toss_jiffies(); + if (list_empty(&ls->ls_scan_list)) { + /* if the queue is empty add the element and it's + * our new expire time + */ + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); + } else { + /* try to get the maybe new first element and then add + * to this rsb with the oldest expire time to the end + * of the queue. If the list was empty before this + * rsb expire time is our next expiration if it wasn't + * the now new first elemet is our new expiration time + */ + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + if (!first) + enable_scan_timer(ls, r->res_toss_time); + else + enable_scan_timer(ls, first->res_toss_time); } - count = ls->ls_new_rsb_count; - spin_unlock(&ls->ls_new_rsb_spin); + spin_unlock_bh(&ls->ls_scan_lock); +} - if (!count) - return -ENOMEM; - return 0; +/* if we hit contention we do in 250 ms a retry to trylock. + * if there is any other mod_timer in between we don't care + * about that it expires earlier again this is only for the + * unlikely case nothing happened in this time. + */ +#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) + +/* Called by lockspace scan_timer to free unused rsb's. */ + +void dlm_rsb_scan(struct timer_list *timer) +{ + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *r; + int rv; + + while (1) { + /* interrupting point to leave iteration when + * recovery waits for timer_delete_sync(), recovery + * will take care to delete everything in scan list. + */ + if (dlm_locking_stopped(ls)) + break; + + rv = spin_trylock(&ls->ls_scan_lock); + if (!rv) { + /* rearm again try timer */ + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (!r) { + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); + break; + } + + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. + */ + if (time_before(jiffies, r->res_toss_time)) { + /* rearm with the next rsb to expire in the future */ + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); + break; + } + + /* in find_rsb_dir/nodir there is a reverse order of this + * lock, however this is only a trylock if we hit some + * possible contention we try it again. 
+ */ + rv = write_trylock(&ls->ls_rsbtbl_lock); + if (!rv) { + spin_unlock(&ls->ls_scan_lock); + /* rearm again try timer */ + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + list_del(&r->res_slow_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); + + /* ls_rsbtbl_lock is not needed when calling send_remove() */ + write_unlock(&ls->ls_rsbtbl_lock); + + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); + + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. + */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* We're the master of this rsb but we're not + * the directory record, so we need to tell the + * dir node to remove the dir record + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) + send_remove(r); + + free_inactive_rsb(r); + } } /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can @@ -393,102 +599,52 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, struct dlm_rsb **r_ret) { struct dlm_rsb *r; - int count; - spin_lock(&ls->ls_new_rsb_spin); - if (list_empty(&ls->ls_new_rsb)) { - count = ls->ls_new_rsb_count; - spin_unlock(&ls->ls_new_rsb_spin); - log_debug(ls, "find_rsb retry %d %d %s", - count, dlm_config.ci_new_rsb_count, - (const char *)name); - return -EAGAIN; - } - - r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); - list_del(&r->res_hashchain); - /* Convert the empty list_head to a NULL rb_node for tree usage: */ - memset(&r->res_hashnode, 0, sizeof(struct rb_node)); - ls->ls_new_rsb_count--; - spin_unlock(&ls->ls_new_rsb_spin); + r = dlm_allocate_rsb(); + if (!r) + return -ENOMEM; r->res_ls = ls; r->res_length = len; memcpy(r->res_name, name, len); - mutex_init(&r->res_mutex); + spin_lock_init(&r->res_lock); INIT_LIST_HEAD(&r->res_lookup); INIT_LIST_HEAD(&r->res_grantqueue); INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); + INIT_LIST_HEAD(&r->res_scan_list); INIT_LIST_HEAD(&r->res_recover_list); + INIT_LIST_HEAD(&r->res_masters_list); *r_ret = r; return 0; } -static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) +int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, + struct dlm_rsb **r_ret) { - char maxname[DLM_RESNAME_MAXLEN]; + char key[DLM_RESNAME_MAXLEN] = {}; - memset(maxname, 0, DLM_RESNAME_MAXLEN); - memcpy(maxname, name, nlen); - return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); -} + memcpy(key, name, len); + *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params); + if (*r_ret) + return 0; -int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, - struct dlm_rsb **r_ret) -{ - struct rb_node *node = tree->rb_node; - struct dlm_rsb *r; - int rc; - - while (node) { - r = rb_entry(node, struct dlm_rsb, res_hashnode); - rc = rsb_cmp(r, name, len); - if (rc < 0) - node = node->rb_left; - else if (rc > 0) - node = node->rb_right; - else - goto found; - } - *r_ret = NULL; return -EBADR; - - found: - *r_ret = r; - return 0; } -static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) +static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) { - struct rb_node **newn = &tree->rb_node; - struct rb_node *parent = NULL; - int rc; - - while (*newn) { - struct dlm_rsb *cur = 
rb_entry(*newn, struct dlm_rsb, - res_hashnode); + int rv; - parent = *newn; - rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); - if (rc < 0) - newn = &parent->rb_left; - else if (rc > 0) - newn = &parent->rb_right; - else { - log_print("rsb_insert match"); - dlm_dump_rsb(rsb); - dlm_dump_rsb(cur); - return -EEXIST; - } - } + rv = rhashtable_insert_fast(rhash, &rsb->res_node, + dlm_rhash_rsb_params); + if (!rv) + rsb_set_flag(rsb, RSB_HASHED); - rb_link_node(&rsb->res_hashnode, parent, newn); - rb_insert_color(&rsb->res_hashnode, tree); - return 0; + return rv; } /* @@ -518,7 +674,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * - * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * @@ -536,8 +692,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) */ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, - uint32_t hash, uint32_t b, - int dir_nodeid, int from_nodeid, + uint32_t hash, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; @@ -567,9 +722,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if - * someone sends us a request after we have removed/freed an rsb - * from our toss list. (They sent a request instead of lookup - * because they are using an rsb from their toss list.) + * someone sends us a request after we have removed/freed an rsb. + * (They sent a request instead of lookup because they are using + * an rsb taken from their scan list.) */ if (from_local || from_dir || @@ -578,51 +733,83 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } retry: - if (create) { - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - } - - spin_lock(&ls->ls_rsbtbl[b].lock); - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) - goto do_toss; + goto do_new; + + /* check if the rsb is active under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; + goto do_new; + } /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } + kref_get(&r->res_ref); - goto out_unlock; + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto out; - do_toss: - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + do_inactive: + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* + * The expectation here is that the rsb will have HASHED and + * INACTIVE flags set, and that the rsb can be moved from + * inactive back to active again. However, between releasing + * the read lock and acquiring the write lock, this rsb could + * have been removed from rsbtbl, and had HASHED cleared, to + * be freed. To deal with this case, we would normally need + * to repeat dlm_search_rsb_tree while holding the write lock, + * but rcu allows us to simply check the HASHED flag, because + * the rcu read lock means the rsb will not be freed yet. 
+ * If the HASHED flag is not set, then the rsb is being freed, + * so we add a new rsb struct. If the HASHED flag is set, + * and INACTIVE is not set, it means another thread has + * made the rsb active, as we're expecting to do here, and + * we just repeat the lookup (this will be very unlikely.) + */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); + error = -EBADR; goto do_new; + } /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread - * is using this rsb because it's on the toss list, so we can + * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb. */ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); + write_unlock_bh(&ls->ls_rsbtbl_lock); error = -ENOTBLK; - goto out_unlock; + goto out; } if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ - log_error(ls, "find_rsb toss from_dir %d master %d", + log_error(ls, "find_rsb inactive from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ @@ -639,9 +826,18 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - goto out_unlock; + /* we always deactivate scan timer for the rsb, when + * we move it out of the inactive state as rsb state + * can be changed and scan timers are only for inactive + * rsbs. 
+ */ + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); /* ref is now used in active state */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + + goto out; do_new: @@ -650,18 +846,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, */ if (error == -EBADR && !create) - goto out_unlock; + goto out; error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; kref_init(&r->res_ref); @@ -681,7 +872,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, dlm_free_rsb(r); r = NULL; error = -ENOTBLK; - goto out_unlock; + goto out; } if (from_other) { @@ -701,9 +892,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } out_add: - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (!error) { + list_add(&r->res_slow_list, &ls->ls_slow_active); + } + write_unlock_bh(&ls->ls_rsbtbl_lock); out: *r_ret = r; return error; @@ -714,8 +916,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, dlm_recover_masters). */ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, - uint32_t hash, uint32_t b, - int dir_nodeid, int from_nodeid, + uint32_t hash, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; @@ -724,59 +925,82 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, int error; retry: - error = pre_rsb_struct(ls); - if (error < 0) - goto out; + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto do_new; - spin_lock(&ls->ls_rsbtbl[b].lock); + /* check if the rsb is in active state under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_new; + } - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (error) - goto do_toss; + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ kref_get(&r->res_ref); - goto out_unlock; + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto out; - do_toss: - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + + do_inactive: + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* See comment in find_rsb_dir. */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new; + } + /* * rsb found inactive. No other thread is using this rsb because - * it's on the toss list, so we can look at or update - * res_master_nodeid without lock_rsb. + * it's inactive, so we can look at or update res_master_nodeid + * without lock_rsb. 
*/ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a request; this should never happen */ - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", from_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); + write_unlock_bh(&ls->ls_rsbtbl_lock); error = -ENOTBLK; - goto out_unlock; + goto out; } if (!recover && (r->res_master_nodeid != our_nodeid) && (dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it; this should never happen */ - log_error(ls, "find_rsb toss our %d master %d dir %d", + log_error(ls, "find_rsb inactive our %d master %d dir %d", our_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; } - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - goto out_unlock; + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); + write_unlock_bh(&ls->ls_rsbtbl_lock); + + goto out; do_new: @@ -785,49 +1009,98 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, */ error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; r->res_master_nodeid = dir_nodeid; r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; kref_init(&r->res_ref); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (!error) { + list_add(&r->res_slow_list, &ls->ls_slow_active); + } + write_unlock_bh(&ls->ls_rsbtbl_lock); + out: *r_ret = r; return error; } +/* + * rsb rcu usage + * + * While rcu read lock is held, the rsb cannot be freed, + * which allows a lookup optimization. + * + * Two threads are accessing the same rsb concurrently, + * the first (A) is trying to use the rsb, the second (B) + * is trying to free the rsb. + * + * thread A thread B + * (trying to use rsb) (trying to free rsb) + * + * A1. rcu read lock + * A2. rsbtbl read lock + * A3. look up rsb in rsbtbl + * A4. rsbtbl read unlock + * B1. rsbtbl write lock + * B2. look up rsb in rsbtbl + * B3. remove rsb from rsbtbl + * B4. clear rsb HASHED flag + * B5. rsbtbl write unlock + * B6. begin freeing rsb using rcu... + * + * (rsb is inactive, so try to make it active again) + * A5. read rsb HASHED flag (safe because rsb is not freed yet) + * A6. the rsb HASHED flag is not set, which it means the rsb + * is being removed from rsbtbl and freed, so don't use it. + * A7. rcu read unlock + * + * B7. ...finish freeing rsb using rcu + * A8. create a new rsb + * + * Without the rcu optimization, steps A5-8 would need to do + * an extra rsbtbl lookup: + * A5. rsbtbl write lock + * A6. look up rsb in rsbtbl, not found + * A7. rsbtbl write unlock + * A8. 
create a new rsb + */ + static int find_rsb(struct dlm_ls *ls, const void *name, int len, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { - uint32_t hash, b; int dir_nodeid; + uint32_t hash; + int rv; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - dir_nodeid = dlm_hash2nodeid(ls, hash); + rcu_read_lock(); if (dlm_no_directory(ls)) - return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); else - return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, - from_nodeid, flags, r_ret); + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, + from_nodeid, flags, r_ret); + rcu_read_unlock(); + return rv; } /* we have received a request and found that res_master_nodeid != our_nodeid, @@ -874,7 +1147,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, - int from_nodeid, bool toss_list, unsigned int flags, + int from_nodeid, bool is_inactive, unsigned int flags, int *r_nodeid, int *result) { int fix_master = (flags & DLM_LU_RECOVER_MASTER); @@ -887,7 +1160,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_dir_nodeid = our_nodeid; } - if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { + if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) { /* Recovery uses this function to set a new master when * the previous master failed. Setting NEW_MASTER will * force dlm_recover_masters to call recover_master on this @@ -898,9 +1171,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_nodeid = from_nodeid; rsb_set_flag(r, RSB_NEW_MASTER); - if (toss_list) { - /* I don't think we should ever find it on toss list. */ - log_error(ls, "%s fix_master on toss", __func__); + if (is_inactive) { + /* I don't think we should ever find it inactive. */ + log_error(ls, "%s fix_master inactive", __func__); dlm_dump_rsb(r); } } @@ -940,7 +1213,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no if (!from_master && !fix_master && (r->res_master_nodeid == from_nodeid)) { /* this can happen when the master sends remove, the dir node - * finds the rsb on the keep list and ignores the remove, + * finds the rsb on the active list and ignores the remove, * and the former master sends a lookup */ @@ -984,11 +1257,11 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . 
dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, - int len, unsigned int flags, int *r_nodeid, int *result) +static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; - uint32_t hash, b; + uint32_t hash; int our_nodeid = dlm_our_nodeid(); int dir_nodeid, error; @@ -1002,8 +1275,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - dir_nodeid = dlm_hash2nodeid(ls, hash); if (dir_nodeid != our_nodeid) { log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", @@ -1014,160 +1285,199 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } retry: - error = pre_rsb_struct(ls); - if (error < 0) - return error; + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto not_found; - spin_lock(&ls->ls_rsbtbl[b].lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (!error) { - /* because the rsb is active, we need to lock_rsb before - * checking/changing re_master_nodeid - */ + /* check if the rsb is active under read lock - likely path */ + read_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto not_found; + } - hold_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - lock_rsb(r); + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } - __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, - flags, r_nodeid, result); + /* because the rsb is active, we need to lock_rsb before + * checking/changing re_master_nodeid + */ - /* the rsb was active */ - unlock_rsb(r); - put_rsb(r); + hold_rsb(r); + read_unlock_bh(&ls->ls_rsbtbl_lock); + lock_rsb(r); - return 0; - } + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, + flags, r_nodeid, result); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); + + return 0; + + do_inactive: + /* unlikely path - check if still part of ls_rsbtbl */ + write_lock_bh(&ls->ls_rsbtbl_lock); + + /* see comment in find_rsb_dir */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* something as changed, very unlikely but + * try again + */ + goto retry; + } + } else { + write_unlock_bh(&ls->ls_rsbtbl_lock); goto not_found; + } - /* because the rsb is inactive (on toss list), it's not refcounted - * and lock_rsb is not used, but is protected by the rsbtbl lock - */ + /* because the rsb is inactive, it's not refcounted and lock_rsb + is not used, but is protected by the rsbtbl lock */ __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - r->res_toss_time = jiffies; - /* the rsb was inactive (on toss list) */ - spin_unlock(&ls->ls_rsbtbl[b].lock); + /* A dir record rsb should never be on scan list. + * Except when we are the dir and master node. + * This function should only be called by the dir + * node. 
+ */ + WARN_ON(!list_empty(&r->res_scan_list) && + r->res_master_nodeid != our_nodeid); + + write_unlock_bh(&ls->ls_rsbtbl_lock); return 0; not_found: error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - goto retry; - } - if (error) - goto out_unlock; + if (WARN_ON_ONCE(error)) + goto out; r->res_hash = hash; - r->res_bucket = b; r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - kref_init(&r->res_ref); - r->res_toss_time = jiffies; + rsb_set_flag(r, RSB_INACTIVE); - error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); - if (error) { + write_lock_bh(&ls->ls_rsbtbl_lock); + error = rsb_insert(r, &ls->ls_rsbtbl); + if (error == -EEXIST) { + /* somebody else was faster and it seems the + * rsb exists now, we do a whole relookup + */ + write_unlock_bh(&ls->ls_rsbtbl_lock); + dlm_free_rsb(r); + goto retry; + } else if (error) { + write_unlock_bh(&ls->ls_rsbtbl_lock); /* should never happen */ dlm_free_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); goto retry; } + list_add(&r->res_slow_list, &ls->ls_slow_inactive); + write_unlock_bh(&ls->ls_rsbtbl_lock); + if (result) *result = DLM_LU_ADD; *r_nodeid = from_nodeid; - out_unlock: - spin_unlock(&ls->ls_rsbtbl[b].lock); + out: return error; } +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) +{ + int rv; + rcu_read_lock(); + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); + rcu_read_unlock(); + return rv; +} + static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { - struct rb_node *n; struct dlm_rsb *r; - int i; - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - spin_lock(&ls->ls_rsbtbl[i].lock); - for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (r->res_hash == hash) - dlm_dump_rsb(r); - } - spin_unlock(&ls->ls_rsbtbl[i].lock); + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { + if (r->res_hash == hash) + dlm_dump_rsb(r); } + read_unlock_bh(&ls->ls_rsbtbl_lock); } void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) { struct dlm_rsb *r = NULL; - uint32_t hash, b; int error; - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); - - spin_lock(&ls->ls_rsbtbl[b].lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + rcu_read_lock(); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) - goto out_dump; - - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (error) goto out; - out_dump: + dlm_dump_rsb(r); out: - spin_unlock(&ls->ls_rsbtbl[b].lock); + rcu_read_unlock(); } -static void toss_rsb(struct kref *kref) +static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; + int our_nodeid = dlm_our_nodeid(); DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); - kref_init(&r->res_ref); - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); - rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); - r->res_toss_time = jiffies; - set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags); + rsb_set_flag(r, RSB_INACTIVE); + list_move(&r->res_slow_list, &ls->ls_slow_inactive); + + /* + * When the rsb becomes unused, there are two possibilities: + * 1. Leave the inactive rsb in place (don't remove it). + * 2. 
Add it to the scan list to be removed. + * + * 1 is done when the rsb is acting as the dir record + * for a remotely mastered rsb. The rsb must be left + * in place as an inactive rsb to act as the dir record. + * + * 2 is done when a) the rsb is not the master and not the + * dir record, b) when the rsb is both the master and the + * dir record, c) when the rsb is master but not dir record. + * + * (If no directory is used, the rsb can always be removed.) + */ + if (dlm_no_directory(ls) || + (r->res_master_nodeid == our_nodeid || + dlm_dir_nodeid(r) != our_nodeid)) + add_scan(ls, r); + if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } -/* See comment for unhold_lkb */ - -static void unhold_rsb(struct dlm_rsb *r) +void free_inactive_rsb(struct dlm_rsb *r) { - int rv; - rv = kref_put(&r->res_ref, toss_rsb); - DLM_ASSERT(!rv, dlm_dump_rsb(r);); -} - -static void kill_rsb(struct kref *kref) -{ - struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); - - /* All work is done after the return from kref_put() so we - can release the write_lock before the remove and free. */ + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); + + dlm_free_rsb(r); } /* Attaching/detaching lkb's from rsb's is for rsb reference counting. @@ -1188,36 +1498,34 @@ static void detach_lkb(struct dlm_lkb *lkb) } static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, - int start, int end) + unsigned long start, unsigned long end) { + struct xa_limit limit; struct dlm_lkb *lkb; int rv; - lkb = dlm_allocate_lkb(ls); + limit.max = end; + limit.min = start; + + lkb = dlm_allocate_lkb(); if (!lkb) return -ENOMEM; - lkb->lkb_last_bast_mode = -1; + lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV; + lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV; + lkb->lkb_last_cb_mode = DLM_LOCK_IV; lkb->lkb_nodeid = -1; lkb->lkb_grmode = DLM_LOCK_IV; kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); - INIT_LIST_HEAD(&lkb->lkb_cb_list); - INIT_LIST_HEAD(&lkb->lkb_callbacks); - spin_lock_init(&lkb->lkb_cb_lock); - INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - - idr_preload(GFP_NOFS); - spin_lock(&ls->ls_lkbidr_spin); - rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); - if (rv >= 0) - lkb->lkb_id = rv; - spin_unlock(&ls->ls_lkbidr_spin); - idr_preload_end(); + + write_lock_bh(&ls->ls_lkbxa_lock); + rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC); + write_unlock_bh(&ls->ls_lkbxa_lock); if (rv < 0) { - log_error(ls, "create_lkb idr error %d", rv); + log_error(ls, "create_lkb xa error %d", rv); dlm_free_lkb(lkb); return rv; } @@ -1228,18 +1536,28 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { - return _create_lkb(ls, lkb_ret, 1, 0); + return _create_lkb(ls, lkb_ret, 1, ULONG_MAX); } static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - spin_lock(&ls->ls_lkbidr_spin); - lkb = idr_find(&ls->ls_lkbidr, lkid); - 
if (lkb) - kref_get(&lkb->lkb_ref); - spin_unlock(&ls->ls_lkbidr_spin); + rcu_read_lock(); + lkb = xa_load(&ls->ls_lkbxa, lkid); + if (lkb) { + /* check if lkb is still part of lkbxa under lkbxa_lock as + * the lkb_ref is tight to the lkbxa data structure, see + * __put_lkb(). + */ + read_lock_bh(&ls->ls_lkbxa_lock); + if (kref_read(&lkb->lkb_ref)) + kref_get(&lkb->lkb_ref); + else + lkb = NULL; + read_unlock_bh(&ls->ls_lkbxa_lock); + } + rcu_read_unlock(); *lkb_ret = lkb; return lkb ? 0 : -ENOENT; @@ -1263,11 +1581,11 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) uint32_t lkid = lkb->lkb_id; int rv; - rv = kref_put_lock(&lkb->lkb_ref, kill_lkb, - &ls->ls_lkbidr_spin); + rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, + &ls->ls_lkbxa_lock); if (rv) { - idr_remove(&ls->ls_lkbidr, lkid); - spin_unlock(&ls->ls_lkbidr_spin); + xa_erase(&ls->ls_lkbxa, lkid); + write_unlock_bh(&ls->ls_lkbxa_lock); detach_lkb(lkb); @@ -1377,10 +1695,8 @@ static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) { - hold_lkb(lkb); del_lkb(r, lkb); add_lkb(r, lkb, sts); - unhold_lkb(lkb); } static int msg_reply_type(int mstype) @@ -1403,19 +1719,11 @@ static int msg_reply_type(int mstype) /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) +static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; - int error = 0; - - mutex_lock(&ls->ls_waiters_mutex); - - if (is_overlap_unlock(lkb) || - (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { - error = -EINVAL; - goto out; - } + spin_lock_bh(&ls->ls_waiters_lock); if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { switch (mstype) { case DLM_MSG_UNLOCK: @@ -1425,7 +1733,11 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); break; default: - error = -EBUSY; + /* should never happen as validate_lock_args() checks + * on lkb_wait_type and validate_unlock_args() only + * creates UNLOCK or CANCEL messages. + */ + WARN_ON_ONCE(1); goto out; } lkb->lkb_wait_count++; @@ -1447,12 +1759,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: - if (error) - log_error(ls, "addwait error %x %d flags %x %d %d %s", - lkb->lkb_id, error, dlm_iflags_val(lkb), mstype, - lkb->lkb_wait_type, lkb->lkb_resource->res_name); - mutex_unlock(&ls->ls_waiters_mutex); - return error; + spin_unlock_bh(&ls->ls_waiters_lock); } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -1551,14 +1858,18 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); error = _remove_from_waiters(lkb, mstype, NULL); - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return error; } /* Handles situations where we might be processing a "fake" or "local" reply in - which we can't try to take waiters_mutex again. */ + * the recovery context which stops any locking activity. 
Only debugfs might + * change the lockspace waiters but they will held the recovery lock to ensure + * remove_from_waiters_ms() in local case will be the only user manipulating the + * lockspace waiters in recovery context. + */ static int remove_from_waiters_ms(struct dlm_lkb *lkb, const struct dlm_message *ms, bool local) @@ -1567,159 +1878,16 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, int error; if (!local) - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); + else + WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) || + !dlm_locking_stopped(ls)); error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); if (!local) - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return error; } -static void shrink_bucket(struct dlm_ls *ls, int b) -{ - struct rb_node *n, *next; - struct dlm_rsb *r; - char *name; - int our_nodeid = dlm_our_nodeid(); - int remote_count = 0; - int need_shrink = 0; - int i, len, rv; - - memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - - spin_lock(&ls->ls_rsbtbl[b].lock); - - if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - return; - } - - for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { - next = rb_next(n); - r = rb_entry(n, struct dlm_rsb, res_hashnode); - - /* If we're the directory record for this rsb, and - we're not the master of it, then we need to wait - for the master node to send us a dir remove for - before removing the dir record. */ - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - continue; - } - - need_shrink = 1; - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - continue; - } - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid == our_nodeid) && - (dlm_dir_nodeid(r) != our_nodeid)) { - - /* We're the master of this rsb but we're not - the directory record, so we need to tell the - dir node to remove the dir record. */ - - ls->ls_remove_lens[remote_count] = r->res_length; - memcpy(ls->ls_remove_names[remote_count], r->res_name, - DLM_RESNAME_MAXLEN); - remote_count++; - - if (remote_count >= DLM_REMOVE_NAMES_MAX) - break; - continue; - } - - if (!kref_put(&r->res_ref, kill_rsb)) { - log_error(ls, "tossed rsb in use %s", r->res_name); - continue; - } - - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - dlm_free_rsb(r); - } - - if (need_shrink) - set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); - else - clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags); - spin_unlock(&ls->ls_rsbtbl[b].lock); - - /* - * While searching for rsb's to free, we found some that require - * remote removal. We leave them in place and find them again here - * so there is a very small gap between removing them from the toss - * list and sending the removal. Keeping this gap small is - * important to keep us (the master node) from being out of sync - * with the remote dir node for very long. 
- */ - - for (i = 0; i < remote_count; i++) { - name = ls->ls_remove_names[i]; - len = ls->ls_remove_lens[i]; - - spin_lock(&ls->ls_rsbtbl[b].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (rv) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name not toss %s", name); - continue; - } - - if (r->res_master_nodeid != our_nodeid) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name master %d dir %d our %d %s", - r->res_master_nodeid, r->res_dir_nodeid, - our_nodeid, name); - continue; - } - - if (r->res_dir_nodeid == our_nodeid) { - /* should never happen */ - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "remove_name dir %d master %d our %d %s", - r->res_dir_nodeid, r->res_master_nodeid, - our_nodeid, name); - continue; - } - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_debug(ls, "remove_name toss_time %lu now %lu %s", - r->res_toss_time, jiffies, name); - continue; - } - - if (!kref_put(&r->res_ref, kill_rsb)) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "remove_name in use %s", name); - continue; - } - - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - send_remove(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - - dlm_free_rsb(r); - } -} - -void dlm_scan_rsbs(struct dlm_ls *ls) -{ - int i; - - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - shrink_bucket(ls, i); - if (dlm_locking_stopped(ls)) - break; - cond_resched(); - } -} - /* lkb is master or local copy */ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -2538,7 +2706,6 @@ static void process_lookup_list(struct dlm_rsb *r) list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { list_del_init(&lkb->lkb_rsb_lookup); _request_lock(r, lkb); - schedule(); } } @@ -2701,16 +2868,14 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, case -EINVAL: /* annoy the user because dlm usage is wrong */ WARN_ON(1); - log_error(ls, "%s %d %x %x %x %d %d %s", __func__, + log_error(ls, "%s %d %x %x %x %d %d", __func__, rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + lkb->lkb_status, lkb->lkb_wait_type); break; default: - log_debug(ls, "%s %d %x %x %x %d %d %s", __func__, + log_debug(ls, "%s %d %x %x %x %d %d", __func__, rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, - lkb->lkb_status, lkb->lkb_wait_type, - lkb->lkb_resource->res_name); + lkb->lkb_status, lkb->lkb_wait_type); break; } @@ -2768,13 +2933,16 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) goto out; } + if (is_overlap_unlock(lkb)) + goto out; + /* cancel not allowed with another cancel/unlock in progress */ if (args->flags & DLM_LKF_CANCEL) { if (lkb->lkb_exflags & DLM_LKF_CANCEL) goto out; - if (is_overlap(lkb)) + if (is_overlap_cancel(lkb)) goto out; if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { @@ -2812,9 +2980,6 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) goto out; - if (is_overlap_unlock(lkb)) - goto out; - if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); rv = -EBUSY; @@ -3332,8 +3497,7 @@ int dlm_unlock(dlm_lockspace_t *lockspace, static int _create_message(struct dlm_ls *ls, int mb_len, int to_nodeid, int mstype, struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret, - gfp_t allocation) + struct dlm_mhandle 
**mh_ret) { struct dlm_message *ms; struct dlm_mhandle *mh; @@ -3343,7 +3507,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len, pass into midcomms_commit and a message buffer (mb) that we write our data into */ - mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb); + mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb); if (!mh) return -ENOBUFS; @@ -3365,8 +3529,7 @@ static int _create_message(struct dlm_ls *ls, int mb_len, static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, int to_nodeid, int mstype, struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret, - gfp_t allocation) + struct dlm_mhandle **mh_ret) { int mb_len = sizeof(struct dlm_message); @@ -3387,7 +3550,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, } return _create_message(r->res_ls, mb_len, to_nodeid, mstype, - ms_ret, mh_ret, allocation); + ms_ret, mh_ret); } /* further lowcomms enhancements or alternate implementations may make @@ -3452,11 +3615,8 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) to_nodeid = r->res_nodeid; - error = add_to_waiters(lkb, mstype, to_nodeid); - if (error) - return error; - - error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS); + add_to_waiters(lkb, mstype, to_nodeid); + error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto fail; @@ -3516,8 +3676,7 @@ static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) to_nodeid = lkb->lkb_nodeid; - error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh, - GFP_NOFS); + error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); if (error) goto out; @@ -3538,8 +3697,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) to_nodeid = lkb->lkb_nodeid; - error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh, - GFP_NOFS); + error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); if (error) goto out; @@ -3560,12 +3718,8 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) to_nodeid = dlm_dir_nodeid(r); - error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); - if (error) - return error; - - error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh, - GFP_NOFS); + add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); + error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); if (error) goto fail; @@ -3589,8 +3743,7 @@ static int send_remove(struct dlm_rsb *r) to_nodeid = dlm_dir_nodeid(r); - error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh, - GFP_ATOMIC); + error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); if (error) goto out; @@ -3611,7 +3764,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, to_nodeid = lkb->lkb_nodeid; - error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS); + error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto out; @@ -3653,8 +3806,7 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_mhandle *mh; int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); - error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh, - GFP_NOFS); + error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); if (error) goto out; @@ -4139,7 +4291,6 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) { char name[DLM_RESNAME_MAXLEN+1]; struct dlm_rsb *r; - uint32_t hash, b; int rv, len, dir_nodeid, from_nodeid; from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); @@ 
-4159,68 +4310,76 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - /* Look for name on rsbtbl.toss, if it's there, kill it. - If it's on rsbtbl.keep, it's being used, and we should ignore this - message. This is an expected race between the dir node sending a - request to the master node at the same time as the master node sends - a remove to the dir node. The resolution to that race is for the - dir node to ignore the remove message, and the master node to - recreate the master rsb when it gets a request from the dir node for - an rsb it doesn't have. */ + /* + * Look for inactive rsb, if it's there, free it. + * If the rsb is active, it's being used, and we should ignore this + * message. This is an expected race between the dir node sending a + * request to the master node at the same time as the master node sends + * a remove to the dir node. The resolution to that race is for the + * dir node to ignore the remove message, and the master node to + * recreate the master rsb when it gets a request from the dir node for + * an rsb it doesn't have. + */ memset(name, 0, sizeof(name)); memcpy(name, ms->m_extra, len); - hash = jhash(name, len, 0); - b = hash & (ls->ls_rsbtbl_size - 1); + rcu_read_lock(); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (rv) { + rcu_read_unlock(); + /* should not happen */ + log_error(ls, "%s from %d not found %s", __func__, + from_nodeid, name); + return; + } - spin_lock(&ls->ls_rsbtbl[b].lock); + write_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + rcu_read_unlock(); + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* should not happen */ + log_error(ls, "%s from %d got removed during removal %s", + __func__, from_nodeid, name); + return; + } + /* at this stage the rsb can only being freed here */ + rcu_read_unlock(); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); - if (rv) { - /* verify the rsb is on keep list per comment above */ - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); - if (rv) { - /* should not happen */ - log_error(ls, "receive_remove from %d not found %s", - from_nodeid, name); - spin_unlock(&ls->ls_rsbtbl[b].lock); - return; - } + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ - log_error(ls, "receive_remove keep from %d master %d", + log_error(ls, "receive_remove on active rsb from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + /* Ignore the remove message, see race comment above. 
*/ + log_debug(ls, "receive_remove from %d master %d first %x %s", from_nodeid, r->res_master_nodeid, r->res_first_lkid, name); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } if (r->res_master_nodeid != from_nodeid) { - log_error(ls, "receive_remove toss from %d master %d", + log_error(ls, "receive_remove inactive from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); + write_unlock_bh(&ls->ls_rsbtbl_lock); return; } - if (kref_put(&r->res_ref, kill_rsb)) { - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - spin_unlock(&ls->ls_rsbtbl[b].lock); - dlm_free_rsb(r); - } else { - log_error(ls, "receive_remove from %d rsb ref error", - from_nodeid); - dlm_print_rsb(r); - spin_unlock(&ls->ls_rsbtbl[b].lock); - } + list_del(&r->res_slow_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); + write_unlock_bh(&ls->ls_rsbtbl_lock); + + free_inactive_rsb(r); } static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) @@ -4407,7 +4566,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, if (error) goto out; - /* local reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; @@ -4446,7 +4604,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, if (error) goto out; - /* local reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; @@ -4498,7 +4655,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, if (error) goto out; - /* local reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms, local); if (error) goto out; @@ -4757,20 +4913,32 @@ static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms, static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms, int nodeid) { - if (dlm_locking_stopped(ls)) { +try_again: + read_lock_bh(&ls->ls_requestqueue_lock); + if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { /* If we were a member of this lockspace, left, and rejoined, other nodes may still be sending us messages from the lockspace generation before we left. 
*/ if (WARN_ON_ONCE(!ls->ls_generation)) { + read_unlock_bh(&ls->ls_requestqueue_lock); log_limit(ls, "receive %d from %d ignore old gen", le32_to_cpu(ms->m_type), nodeid); return; } + read_unlock_bh(&ls->ls_requestqueue_lock); + write_lock_bh(&ls->ls_requestqueue_lock); + /* recheck because we hold writelock now */ + if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { + write_unlock_bh(&ls->ls_requestqueue_lock); + goto try_again; + } + dlm_add_requestqueue(ls, nodeid, ms); + write_unlock_bh(&ls->ls_requestqueue_lock); } else { - dlm_wait_requestqueue(ls); _receive_message(ls, ms, 0); + read_unlock_bh(&ls->ls_requestqueue_lock); } } @@ -4830,7 +4998,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid) /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to be inactive (in this ls) before transitioning to recovery mode */ - down_read(&ls->ls_recv_active); + read_lock_bh(&ls->ls_recv_active); if (hd->h_cmd == DLM_MSG) dlm_receive_message(ls, &p->message, nodeid); else if (hd->h_cmd == DLM_RCOM) @@ -4838,7 +5006,7 @@ void dlm_receive_buffer(const union dlm_packet *p, int nodeid) else log_error(ls, "invalid h_cmd %d from %d lockspace %x", hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace)); - up_read(&ls->ls_recv_active); + read_unlock_bh(&ls->ls_recv_active); dlm_put_lockspace(ls); } @@ -4847,16 +5015,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms_local) { if (middle_conversion(lkb)) { + log_rinfo(ls, "%s %x middle convert in progress", __func__, + lkb->lkb_id); + + /* We sent this lock to the new master. The new master will + * tell us when it's granted. We no longer need a reply, so + * use a fake reply to put the lkb into the right state. + */ hold_lkb(lkb); memset(ms_local, 0, sizeof(struct dlm_message)); ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); _receive_convert_reply(lkb, ms_local, true); - - /* Same special case as in receive_rcom_lock_args() */ - lkb->lkb_grmode = DLM_LOCK_IV; - rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); unhold_lkb(lkb); } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { @@ -4899,8 +5070,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (!ms_local) return; - mutex_lock(&ls->ls_waiters_mutex); - list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); @@ -4993,7 +5162,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) } schedule(); } - mutex_unlock(&ls->ls_waiters_mutex); kfree(ms_local); } @@ -5001,7 +5169,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) { struct dlm_lkb *lkb = NULL, *iter; - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) { hold_lkb(iter); @@ -5009,7 +5177,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) break; } } - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); return lkb; } @@ -5109,9 +5277,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) } /* Forcibly remove from waiters list */ - mutex_lock(&ls->ls_waiters_mutex); + spin_lock_bh(&ls->ls_waiters_lock); list_del_init(&lkb->lkb_wait_reply); - mutex_unlock(&ls->ls_waiters_mutex); + spin_unlock_bh(&ls->ls_waiters_lock); /* * The lkb is now clear of all prior waiters state and can be @@ -5144,7 
+5312,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) case DLM_MSG_LOOKUP: case DLM_MSG_REQUEST: _request_lock(r, lkb); - if (is_master(r)) + if (r->res_nodeid != -1 && is_master(r)) confirm_master(r, 0); break; case DLM_MSG_CONVERT: @@ -5236,7 +5404,7 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, /* Get rid of locks held by nodes that are gone. */ -void dlm_recover_purge(struct dlm_ls *ls) +void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list) { struct dlm_rsb *r; struct dlm_member *memb; @@ -5255,11 +5423,9 @@ void dlm_recover_purge(struct dlm_ls *ls) if (!nodes_count) return; - down_write(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { - hold_rsb(r); + list_for_each_entry(r, root_list, res_root_list) { lock_rsb(r); - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { purge_dead_list(ls, r, &r->res_grantqueue, nodeid_gone, &lkb_count); purge_dead_list(ls, r, &r->res_convertqueue, @@ -5268,25 +5434,21 @@ void dlm_recover_purge(struct dlm_ls *ls) nodeid_gone, &lkb_count); } unlock_rsb(r); - unhold_rsb(r); + cond_resched(); } - up_write(&ls->ls_root_sem); if (lkb_count) log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", lkb_count, nodes_count); } -static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) +static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls) { - struct rb_node *n; struct dlm_rsb *r; - spin_lock(&ls->ls_rsbtbl[bucket].lock); - for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (!rsb_flag(r, RSB_RECOVER_GRANT)) continue; if (!is_master(r)) { @@ -5294,10 +5456,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) continue; } hold_rsb(r); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + read_unlock_bh(&ls->ls_rsbtbl_lock); return r; } - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + read_unlock_bh(&ls->ls_rsbtbl_lock); return NULL; } @@ -5321,19 +5483,15 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) void dlm_recover_grant(struct dlm_ls *ls) { struct dlm_rsb *r; - int bucket = 0; unsigned int count = 0; unsigned int rsb_count = 0; unsigned int lkb_count = 0; while (1) { - r = find_grant_rsb(ls, bucket); - if (!r) { - if (bucket == ls->ls_rsbtbl_size - 1) - break; - bucket++; - continue; - } + r = find_grant_rsb(ls); + if (!r) + break; + rsb_count++; count = 0; lock_rsb(r); @@ -5416,10 +5574,11 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, The real granted mode of these converting locks cannot be determined until all locks have been rebuilt on the rsb (recover_conversion) */ - if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && - middle_conversion(lkb)) { - rl->rl_status = DLM_LKSTS_CONVERT; - lkb->lkb_grmode = DLM_LOCK_IV; + if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) { + /* We may need to adjust grmode depending on other granted locks. 
*/ + log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x", + __func__, lkb->lkb_id, lkb->lkb_grmode, + lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid); rsb_set_flag(r, RSB_RECOVER_CONVERT); } @@ -5641,10 +5800,10 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } /* add this new lkb to the per-process list of locks */ - spin_lock(&ua->proc->locks_spin); + spin_lock_bh(&ua->proc->locks_spin); hold_lkb(lkb); list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); - spin_unlock(&ua->proc->locks_spin); + spin_unlock_bh(&ua->proc->locks_spin); do_put = false; out_put: trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false); @@ -5726,7 +5885,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, int found_other_mode = 0; int rv = 0; - mutex_lock(&ls->ls_orphans_mutex); + spin_lock_bh(&ls->ls_orphans_lock); list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) { if (iter->lkb_resource->res_length != namelen) continue; @@ -5743,7 +5902,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, *lkid = iter->lkb_id; break; } - mutex_unlock(&ls->ls_orphans_mutex); + spin_unlock_bh(&ls->ls_orphans_lock); if (!lkb && found_other_mode) { rv = -EAGAIN; @@ -5774,9 +5933,9 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, * for the proc locks list. */ - spin_lock(&ua->proc->locks_spin); + spin_lock_bh(&ua->proc->locks_spin); list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); - spin_unlock(&ua->proc->locks_spin); + spin_unlock_bh(&ua->proc->locks_spin); out: kfree(ua_tmp); return rv; @@ -5820,11 +5979,11 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, if (error) goto out_put; - spin_lock(&ua->proc->locks_spin); + spin_lock_bh(&ua->proc->locks_spin); /* dlm_user_add_cb() may have already taken lkb off the proc list */ if (!list_empty(&lkb->lkb_ownqueue)) list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); - spin_unlock(&ua->proc->locks_spin); + spin_unlock_bh(&ua->proc->locks_spin); out_put: trace_dlm_unlock_end(ls, lkb, flags, error); dlm_put_lkb(lkb); @@ -5935,9 +6094,9 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) int error; hold_lkb(lkb); /* reference for the ls_orphans list */ - mutex_lock(&ls->ls_orphans_mutex); + spin_lock_bh(&ls->ls_orphans_lock); list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); - mutex_unlock(&ls->ls_orphans_mutex); + spin_unlock_bh(&ls->ls_orphans_lock); set_unlock_args(0, lkb->lkb_ua, &args); @@ -5975,7 +6134,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, { struct dlm_lkb *lkb = NULL; - spin_lock(&ls->ls_clear_proc_locks); + spin_lock_bh(&ls->ls_clear_proc_locks); if (list_empty(&proc->locks)) goto out; @@ -5987,7 +6146,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, else set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags); out: - spin_unlock(&ls->ls_clear_proc_locks); + spin_unlock_bh(&ls->ls_clear_proc_locks); return lkb; } @@ -6003,6 +6162,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { + struct dlm_callback *cb, *cb_safe; struct dlm_lkb *lkb, *safe; dlm_lock_recovery(ls); @@ -6023,7 +6183,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } - spin_lock(&ls->ls_clear_proc_locks); + spin_lock_bh(&ls->ls_clear_proc_locks); /* in-progress unlocks */ list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { @@ -6032,29 +6192,29 @@ void 
dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { - dlm_purge_lkb_callbacks(lkb); - list_del_init(&lkb->lkb_cb_list); - dlm_put_lkb(lkb); + list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) { + list_del(&cb->list); + dlm_free_cb(cb); } - spin_unlock(&ls->ls_clear_proc_locks); + spin_unlock_bh(&ls->ls_clear_proc_locks); dlm_unlock_recovery(ls); } static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { + struct dlm_callback *cb, *cb_safe; struct dlm_lkb *lkb, *safe; while (1) { lkb = NULL; - spin_lock(&proc->locks_spin); + spin_lock_bh(&proc->locks_spin); if (!list_empty(&proc->locks)) { lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); list_del_init(&lkb->lkb_ownqueue); } - spin_unlock(&proc->locks_spin); + spin_unlock_bh(&proc->locks_spin); if (!lkb) break; @@ -6064,21 +6224,20 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); /* ref from proc->locks list */ } - spin_lock(&proc->locks_spin); + spin_lock_bh(&proc->locks_spin); list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { list_del_init(&lkb->lkb_ownqueue); set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags); dlm_put_lkb(lkb); } - spin_unlock(&proc->locks_spin); + spin_unlock_bh(&proc->locks_spin); - spin_lock(&proc->asts_spin); - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { - dlm_purge_lkb_callbacks(lkb); - list_del_init(&lkb->lkb_cb_list); - dlm_put_lkb(lkb); + spin_lock_bh(&proc->asts_spin); + list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) { + list_del(&cb->list); + dlm_free_cb(cb); } - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); } /* pid of 0 means purge all orphans */ @@ -6087,7 +6246,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid) { struct dlm_lkb *lkb, *safe; - mutex_lock(&ls->ls_orphans_mutex); + spin_lock_bh(&ls->ls_orphans_lock); list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) { if (pid && lkb->lkb_ownpid != pid) continue; @@ -6095,7 +6254,7 @@ static void do_purge(struct dlm_ls *ls, int nodeid, int pid) list_del_init(&lkb->lkb_ownqueue); dlm_put_lkb(lkb); } - mutex_unlock(&ls->ls_orphans_mutex); + spin_unlock_bh(&ls->ls_orphans_lock); } static int send_purge(struct dlm_ls *ls, int nodeid, int pid) @@ -6105,7 +6264,7 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid) int error; error = _create_message(ls, sizeof(struct dlm_message), nodeid, - DLM_MSG_PURGE, &ms, &mh, GFP_NOFS); + DLM_MSG_PURGE, &ms, &mh); if (error) return error; ms->m_nodeid = cpu_to_le32(nodeid); @@ -6188,8 +6347,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, if (error) return error; - error = add_to_waiters(lkb, mstype, to_nodeid); + add_to_waiters(lkb, mstype, to_nodeid); dlm_put_lkb(lkb); - return error; + return 0; } diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index b54e2cbbe6e2..b23d7b854ed4 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -18,20 +18,23 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq); void dlm_receive_buffer(const union dlm_packet *p, int nodeid); int dlm_modes_compat(int mode1, int mode2); +void free_inactive_rsb(struct dlm_rsb *r); void dlm_put_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r); int dlm_put_lkb(struct dlm_lkb *lkb); -void dlm_scan_rsbs(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls); +void 
dlm_lock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls); +void dlm_rsb_scan(struct timer_list *timer); +void resume_scan_timer(struct dlm_ls *ls); int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, int len, unsigned int flags, int *r_nodeid, int *result); -int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, +int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, struct dlm_rsb **r_ret); -void dlm_recover_purge(struct dlm_ls *ls); +void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list); void dlm_purge_mstcpy_locks(struct dlm_rsb *r); void dlm_recover_grant(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls); @@ -63,17 +66,19 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, static inline int is_master(struct dlm_rsb *r) { + WARN_ON_ONCE(r->res_nodeid == -1); + return !r->res_nodeid; } static inline void lock_rsb(struct dlm_rsb *r) { - mutex_lock(&r->res_mutex); + spin_lock_bh(&r->res_lock); } static inline void unlock_rsb(struct dlm_rsb *r) { - mutex_unlock(&r->res_mutex); + spin_unlock_bh(&r->res_lock); } #endif diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 0455dddb0797..1929327ffbe1 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -29,8 +29,6 @@ static int ls_count; static struct mutex ls_lock; static struct list_head lslist; static spinlock_t lslist_lock; -static struct task_struct * scand_task; - static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) { @@ -40,7 +38,7 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) if (rc) return rc; - ls = dlm_find_lockspace_local(ls->ls_local_handle); + ls = dlm_find_lockspace_local(ls); if (!ls) return -EINVAL; @@ -176,12 +174,6 @@ static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, return a->store ? 
a->store(ls, buf, len) : len; } -static void lockspace_kobj_release(struct kobject *k) -{ - struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); - kfree(ls); -} - static const struct sysfs_ops dlm_attr_ops = { .show = dlm_attr_show, .store = dlm_attr_store, @@ -190,7 +182,6 @@ static const struct sysfs_ops dlm_attr_ops = { static struct kobj_type dlm_ktype = { .default_groups = dlm_groups, .sysfs_ops = &dlm_attr_ops, - .release = lockspace_kobj_release, }; static struct kset *dlm_kset; @@ -247,66 +238,11 @@ void dlm_lockspace_exit(void) kset_unregister(dlm_kset); } -static struct dlm_ls *find_ls_to_scan(void) -{ - struct dlm_ls *ls; - - spin_lock(&lslist_lock); - list_for_each_entry(ls, &lslist, ls_list) { - if (time_after_eq(jiffies, ls->ls_scan_time + - dlm_config.ci_scan_secs * HZ)) { - spin_unlock(&lslist_lock); - return ls; - } - } - spin_unlock(&lslist_lock); - return NULL; -} - -static int dlm_scand(void *data) -{ - struct dlm_ls *ls; - - while (!kthread_should_stop()) { - ls = find_ls_to_scan(); - if (ls) { - if (dlm_lock_recovery_try(ls)) { - ls->ls_scan_time = jiffies; - dlm_scan_rsbs(ls); - dlm_unlock_recovery(ls); - } else { - ls->ls_scan_time += HZ; - } - continue; - } - schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); - } - return 0; -} - -static int dlm_scand_start(void) -{ - struct task_struct *p; - int error = 0; - - p = kthread_run(dlm_scand, NULL, "dlm_scand"); - if (IS_ERR(p)) - error = PTR_ERR(p); - else - scand_task = p; - return error; -} - -static void dlm_scand_stop(void) -{ - kthread_stop(scand_task); -} - struct dlm_ls *dlm_find_lockspace_global(uint32_t id) { struct dlm_ls *ls; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_global_id == id) { @@ -316,24 +252,15 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id) } ls = NULL; out: - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); return ls; } struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) { - struct dlm_ls *ls; + struct dlm_ls *ls = lockspace; - spin_lock(&lslist_lock); - list_for_each_entry(ls, &lslist, ls_list) { - if (ls->ls_local_handle == lockspace) { - atomic_inc(&ls->ls_count); - goto out; - } - } - ls = NULL; - out: - spin_unlock(&lslist_lock); + atomic_inc(&ls->ls_count); return ls; } @@ -341,7 +268,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor) { struct dlm_ls *ls; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_device.minor == minor) { atomic_inc(&ls->ls_count); @@ -350,7 +277,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor) } ls = NULL; out: - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); return ls; } @@ -365,15 +292,15 @@ static void remove_lockspace(struct dlm_ls *ls) retry: wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); if (atomic_read(&ls->ls_count) != 0) { - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); goto retry; } WARN_ON(ls->ls_create_count != 0); list_del(&ls->ls_list); - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); } static int threads_start(void) @@ -382,23 +309,48 @@ static int threads_start(void) /* Thread for sending/receiving messages for all lockspace's */ error = dlm_midcomms_start(); - if (error) { + if (error) log_print("cannot start dlm midcomms %d", error); - goto fail; - } - error = dlm_scand_start(); - if (error) { - log_print("cannot start dlm_scand thread %d", 
error); - goto midcomms_fail; - } + return error; +} +static int lkb_idr_free(struct dlm_lkb *lkb) +{ + if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) + dlm_free_lvb(lkb->lkb_lvbptr); + + dlm_free_lkb(lkb); return 0; +} - midcomms_fail: - dlm_midcomms_stop(); - fail: - return error; +static void rhash_free_rsb(void *ptr, void *arg) +{ + struct dlm_rsb *rsb = ptr; + + dlm_free_rsb(rsb); +} + +static void free_lockspace(struct work_struct *work) +{ + struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work); + struct dlm_lkb *lkb; + unsigned long id; + + /* + * Free all lkb's in xa + */ + xa_for_each(&ls->ls_lkbxa, id, lkb) { + lkb_idr_free(lkb); + } + xa_destroy(&ls->ls_lkbxa); + + /* + * Free all rsb's on rsbtbl + */ + rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); + + kfree(ls); } static int new_lockspace(const char *name, const char *cluster, @@ -407,9 +359,8 @@ static int new_lockspace(const char *name, const char *cluster, int *ops_result, dlm_lockspace_t **lockspace) { struct dlm_ls *ls; - int i, size, error; - int do_unreg = 0; int namelen = strlen(name); + int error; if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) return -EINVAL; @@ -448,7 +399,7 @@ static int new_lockspace(const char *name, const char *cluster, error = 0; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { WARN_ON(ls->ls_create_count <= 0); if (ls->ls_namelen != namelen) @@ -464,7 +415,7 @@ static int new_lockspace(const char *name, const char *cluster, error = 1; break; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); if (error) goto out; @@ -480,47 +431,36 @@ static int new_lockspace(const char *name, const char *cluster, atomic_set(&ls->ls_count, 0); init_waitqueue_head(&ls->ls_count_wait); ls->ls_flags = 0; - ls->ls_scan_time = jiffies; if (ops && dlm_config.ci_recover_callbacks) { ls->ls_ops = ops; ls->ls_ops_arg = ops_arg; } + if (flags & DLM_LSFL_SOFTIRQ) + set_bit(LSFL_SOFTIRQ, &ls->ls_flags); + /* ls_exflags are forced to match among nodes, and we don't * need to require all nodes to have some flags set */ - ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); + ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL | + DLM_LSFL_SOFTIRQ)); - size = READ_ONCE(dlm_config.ci_rsbtbl_size); - ls->ls_rsbtbl_size = size; + INIT_LIST_HEAD(&ls->ls_slow_inactive); + INIT_LIST_HEAD(&ls->ls_slow_active); + rwlock_init(&ls->ls_rsbtbl_lock); - ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable))); - if (!ls->ls_rsbtbl) + error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); + if (error) goto out_lsfree; - for (i = 0; i < size; i++) { - ls->ls_rsbtbl[i].keep.rb_node = NULL; - ls->ls_rsbtbl[i].toss.rb_node = NULL; - spin_lock_init(&ls->ls_rsbtbl[i].lock); - } - - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { - ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, - GFP_KERNEL); - if (!ls->ls_remove_names[i]) - goto out_rsbtbl; - } - idr_init(&ls->ls_lkbidr); - spin_lock_init(&ls->ls_lkbidr_spin); + xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); + rwlock_init(&ls->ls_lkbxa_lock); INIT_LIST_HEAD(&ls->ls_waiters); - mutex_init(&ls->ls_waiters_mutex); + spin_lock_init(&ls->ls_waiters_lock); INIT_LIST_HEAD(&ls->ls_orphans); - mutex_init(&ls->ls_orphans_mutex); - - INIT_LIST_HEAD(&ls->ls_new_rsb); - spin_lock_init(&ls->ls_new_rsb_spin); + spin_lock_init(&ls->ls_orphans_lock); INIT_LIST_HEAD(&ls->ls_nodes); INIT_LIST_HEAD(&ls->ls_nodes_gone); @@ -543,6 
+483,8 @@ static int new_lockspace(const char *name, const char *cluster, spin_lock_init(&ls->ls_cb_lock); INIT_LIST_HEAD(&ls->ls_cb_delay); + INIT_WORK(&ls->ls_free_work, free_lockspace); + ls->ls_recoverd_task = NULL; mutex_init(&ls->ls_recoverd_active); spin_lock_init(&ls->ls_recover_lock); @@ -552,11 +494,9 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_recover_seq = get_random_u64(); ls->ls_recover_args = NULL; init_rwsem(&ls->ls_in_recovery); - init_rwsem(&ls->ls_recv_active); + rwlock_init(&ls->ls_recv_active); INIT_LIST_HEAD(&ls->ls_requestqueue); - atomic_set(&ls->ls_requestqueue_cnt, 0); - init_waitqueue_head(&ls->ls_requestqueue_wait); - mutex_init(&ls->ls_requestqueue_mutex); + rwlock_init(&ls->ls_requestqueue_lock); spin_lock_init(&ls->ls_clear_proc_locks); /* Due backwards compatibility with 3.1 we need to use maximum @@ -565,8 +505,10 @@ static int new_lockspace(const char *name, const char *cluster, * might send less. */ ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); - if (!ls->ls_recover_buf) - goto out_lkbidr; + if (!ls->ls_recover_buf) { + error = -ENOMEM; + goto out_lkbxa; + } ls->ls_slot = 0; ls->ls_num_slots = 0; @@ -575,25 +517,31 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_recover_list); spin_lock_init(&ls->ls_recover_list_lock); - idr_init(&ls->ls_recover_idr); - spin_lock_init(&ls->ls_recover_idr_lock); + xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); + spin_lock_init(&ls->ls_recover_xa_lock); ls->ls_recover_list_count = 0; - ls->ls_local_handle = ls; init_waitqueue_head(&ls->ls_wait_general); - INIT_LIST_HEAD(&ls->ls_root_list); - init_rwsem(&ls->ls_root_sem); + INIT_LIST_HEAD(&ls->ls_masters_list); + rwlock_init(&ls->ls_masters_lock); + INIT_LIST_HEAD(&ls->ls_dir_dump_list); + rwlock_init(&ls->ls_dir_dump_lock); + + INIT_LIST_HEAD(&ls->ls_scan_list); + spin_lock_init(&ls->ls_scan_lock); + timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE); - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); ls->ls_create_count = 1; list_add(&ls->ls_list, &lslist); - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); - if (flags & DLM_LSFL_FS) { - error = dlm_callback_start(ls); - if (error) { - log_error(ls, "can't start dlm_callback %d", error); - goto out_delist; - } + if (flags & DLM_LSFL_FS) + set_bit(LSFL_FS, &ls->ls_flags); + + error = dlm_callback_start(ls); + if (error) { + log_error(ls, "can't start dlm_callback %d", error); + goto out_delist; } init_waitqueue_head(&ls->ls_recover_lock_wait); @@ -614,9 +562,6 @@ static int new_lockspace(const char *name, const char *cluster, wait_event(ls->ls_recover_lock_wait, test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); - /* let kobject handle freeing of ls if there's an error */ - do_unreg = 1; - ls->ls_kobj.kset = dlm_kset; error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, "%s", ls->ls_name); @@ -631,7 +576,7 @@ static int new_lockspace(const char *name, const char *cluster, lockspace to start running (via sysfs) in dlm_ls_start(). 
*/ error = do_uevent(ls, 1); - if (error) + if (error < 0) goto out_recoverd; /* wait until recovery is successful or failed */ @@ -655,22 +600,17 @@ static int new_lockspace(const char *name, const char *cluster, out_callback: dlm_callback_stop(ls); out_delist: - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_del(&ls->ls_list); - spin_unlock(&lslist_lock); - idr_destroy(&ls->ls_recover_idr); + spin_unlock_bh(&lslist_lock); + xa_destroy(&ls->ls_recover_xa); kfree(ls->ls_recover_buf); - out_lkbidr: - idr_destroy(&ls->ls_lkbidr); - out_rsbtbl: - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) - kfree(ls->ls_remove_names[i]); - vfree(ls->ls_rsbtbl); + out_lkbxa: + xa_destroy(&ls->ls_lkbxa); + rhashtable_destroy(&ls->ls_rsbtbl); out_lsfree: - if (do_unreg) - kobject_put(&ls->ls_kobj); - else - kfree(ls); + kobject_put(&ls->ls_kobj); + kfree(ls); out: module_put(THIS_MODULE); return error; @@ -697,7 +637,6 @@ static int __dlm_new_lockspace(const char *name, const char *cluster, if (error > 0) error = 0; if (!ls_count) { - dlm_scand_stop(); dlm_midcomms_shutdown(); dlm_midcomms_stop(); } @@ -721,62 +660,51 @@ int dlm_new_user_lockspace(const char *name, const char *cluster, void *ops_arg, int *ops_result, dlm_lockspace_t **lockspace) { + if (flags & DLM_LSFL_SOFTIRQ) + return -EINVAL; + return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, ops_result, lockspace); } -static int lkb_idr_is_local(int id, void *p, void *data) -{ - struct dlm_lkb *lkb = p; - - return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; -} - -static int lkb_idr_is_any(int id, void *p, void *data) -{ - return 1; -} - -static int lkb_idr_free(int id, void *p, void *data) -{ - struct dlm_lkb *lkb = p; - - if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) - dlm_free_lvb(lkb->lkb_lvbptr); - - dlm_free_lkb(lkb); - return 0; -} - -/* NOTE: We check the lkbidr here rather than the resource table. +/* NOTE: We check the lkbxa here rather than the resource table. 
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */
 
 static int lockspace_busy(struct dlm_ls *ls, int force)
 {
-	int rv;
+	struct dlm_lkb *lkb;
+	unsigned long id;
+	int rv = 0;
 
-	spin_lock(&ls->ls_lkbidr_spin);
+	read_lock_bh(&ls->ls_lkbxa_lock);
 	if (force == 0) {
-		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
+		xa_for_each(&ls->ls_lkbxa, id, lkb) {
+			rv = 1;
+			break;
+		}
 	} else if (force == 1) {
-		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
+		xa_for_each(&ls->ls_lkbxa, id, lkb) {
+			if (lkb->lkb_nodeid == 0 &&
+			    lkb->lkb_grmode != DLM_LOCK_IV) {
+				rv = 1;
+				break;
+			}
+		}
 	} else {
 		rv = 0;
 	}
-	spin_unlock(&ls->ls_lkbidr_spin);
+	read_unlock_bh(&ls->ls_lkbxa_lock);
 	return rv;
 }
 
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
-	struct dlm_rsb *rsb;
-	struct rb_node *n;
-	int i, busy, rv;
+	int busy, rv;
 
 	busy = lockspace_busy(ls, force);
 
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	if (ls->ls_create_count == 1) {
 		if (busy) {
 			rv = -EBUSY;
@@ -790,7 +718,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	} else {
 		rv = -EINVAL;
 	}
-	spin_unlock(&lslist_lock);
+	spin_unlock_bh(&lslist_lock);
 
 	if (rv) {
 		log_debug(ls, "release_lockspace no remove %d", rv);
@@ -807,8 +735,13 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	dlm_recoverd_stop(ls);
 
+	/* clear the LSFL_RUNNING flag to speed up
+	 * timer_shutdown_sync(); we don't care anymore
+	 */
+	clear_bit(LSFL_RUNNING, &ls->ls_flags);
+	timer_shutdown_sync(&ls->ls_scan_timer);
+
 	if (ls_count == 1) {
-		dlm_scand_stop();
 		dlm_clear_members(ls);
 		dlm_midcomms_shutdown();
 	}
@@ -819,45 +752,10 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	dlm_delete_debug_file(ls);
 
-	idr_destroy(&ls->ls_recover_idr);
-	kfree(ls->ls_recover_buf);
-
-	/*
-	 * Free all lkb's in idr
-	 */
-
-	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
-	idr_destroy(&ls->ls_lkbidr);
-
-	/*
-	 * Free all rsb's on rsbtbl[] lists
-	 */
-
-	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].keep);
-			dlm_free_rsb(rsb);
-		}
-
-		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].toss);
-			dlm_free_rsb(rsb);
-		}
-	}
-
-	vfree(ls->ls_rsbtbl);
-
-	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
-		kfree(ls->ls_remove_names[i]);
+	kobject_put(&ls->ls_kobj);
 
-	while (!list_empty(&ls->ls_new_rsb)) {
-		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
-				       res_hashchain);
-		list_del(&rsb->res_hashchain);
-		dlm_free_rsb(rsb);
-	}
+	xa_destroy(&ls->ls_recover_xa);
+	kfree(ls->ls_recover_buf);
 
 	/*
 	 * Free structures on any other lists
@@ -868,10 +766,11 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 	dlm_clear_members(ls);
 	dlm_clear_members_gone(ls);
 	kfree(ls->ls_node_array);
-	log_rinfo(ls, "release_lockspace final free");
-	kobject_put(&ls->ls_kobj);
-	/* The ls structure will be freed when the kobject is done with */
+	log_rinfo(ls, "%s final free", __func__);
+
+	/* delayed free of data structures, see free_lockspace() */
+	queue_work(dlm_wq, &ls->ls_free_work);
 	module_put(THIS_MODULE);
 	return 0;
 }
@@ -918,20 +817,19 @@ void dlm_stop_lockspaces(void)
 
  restart:
 	count = 0;
-	spin_lock(&lslist_lock);
+	spin_lock_bh(&lslist_lock);
 	list_for_each_entry(ls, &lslist, ls_list) {
 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
count++; continue; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); log_error(ls, "no userland control daemon, stopping lockspace"); dlm_ls_stop(ls); goto restart; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); if (count) log_print("dlm user daemon left %d lockspaces", count); } - diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 6296c62c10fa..e4373bce1bc2 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -160,9 +160,8 @@ struct dlm_proto_ops { bool try_new_addr; const char *name; int proto; + int how; - int (*connect)(struct connection *con, struct socket *sock, - struct sockaddr *addr, int addr_len); void (*sockopts)(struct socket *sock); int (*bind)(struct socket *sock); int (*listen_validate)(void); @@ -204,6 +203,7 @@ static void process_dlm_messages(struct work_struct *work); static DECLARE_WORK(process_work, process_dlm_messages); static DEFINE_SPINLOCK(processqueue_lock); static bool process_dlm_messages_pending; +static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq); static atomic_t processqueue_count; static LIST_HEAD(processqueue); @@ -248,7 +248,7 @@ struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void) struct kmem_cache *dlm_lowcomms_msg_cache_create(void) { - return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL); + return KMEM_CACHE(dlm_msg, 0); } /* need to held writequeue_lock */ @@ -460,10 +460,11 @@ static bool dlm_lowcomms_con_has_addr(const struct connection *con, return false; } -int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr) { struct connection *con; - bool ret, idx; + bool ret; + int idx; idx = srcu_read_lock(&connections_srcu); con = nodeid2con(nodeid, GFP_NOFS); @@ -533,7 +534,7 @@ static void lowcomms_state_change(struct sock *sk) /* SCTP layer is not calling sk_data_ready when the connection * is done, so we catch the signal through here. 
*/ - if (sk->sk_shutdown == RCV_SHUTDOWN) + if (sk->sk_shutdown & RCV_SHUTDOWN) lowcomms_data_ready(sk); } @@ -661,18 +662,18 @@ static void add_sock(struct socket *sock, struct connection *con) /* Add the port number to an IPv6 or 4 sockaddr and return the address length */ -static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, +static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port, int *addr_len) { saddr->ss_family = dlm_local_addr[0].ss_family; if (saddr->ss_family == AF_INET) { struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; - in4_addr->sin_port = cpu_to_be16(port); + in4_addr->sin_port = port; *addr_len = sizeof(struct sockaddr_in); memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); } else { struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; - in6_addr->sin6_port = cpu_to_be16(port); + in6_addr->sin6_port = port; *addr_len = sizeof(struct sockaddr_in6); } memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); @@ -810,7 +811,7 @@ static void shutdown_connection(struct connection *con, bool and_other) return; } - ret = kernel_sock_shutdown(con->sock, SHUT_WR); + ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how); up_read(&con->sock_lock); if (ret) { log_print("Connection %p failed to shutdown: %d will force close", @@ -857,46 +858,42 @@ static void free_processqueue_entry(struct processqueue_entry *pentry) kfree(pentry); } -struct dlm_processed_nodes { - int nodeid; - - struct list_head list; -}; - static void process_dlm_messages(struct work_struct *work) { struct processqueue_entry *pentry; - spin_lock(&processqueue_lock); + spin_lock_bh(&processqueue_lock); pentry = list_first_entry_or_null(&processqueue, struct processqueue_entry, list); if (WARN_ON_ONCE(!pentry)) { process_dlm_messages_pending = false; - spin_unlock(&processqueue_lock); + spin_unlock_bh(&processqueue_lock); return; } list_del(&pentry->list); - atomic_dec(&processqueue_count); - spin_unlock(&processqueue_lock); + if (atomic_dec_and_test(&processqueue_count)) + wake_up(&processqueue_wq); + spin_unlock_bh(&processqueue_lock); for (;;) { dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, pentry->buflen); free_processqueue_entry(pentry); - spin_lock(&processqueue_lock); + spin_lock_bh(&processqueue_lock); pentry = list_first_entry_or_null(&processqueue, struct processqueue_entry, list); if (!pentry) { process_dlm_messages_pending = false; - spin_unlock(&processqueue_lock); + spin_unlock_bh(&processqueue_lock); break; } list_del(&pentry->list); - atomic_dec(&processqueue_count); - spin_unlock(&processqueue_lock); + if (atomic_dec_and_test(&processqueue_count)) + wake_up(&processqueue_wq); + spin_unlock_bh(&processqueue_lock); } } @@ -966,14 +963,14 @@ again: memmove(con->rx_leftover_buf, pentry->buf + ret, con->rx_leftover); - spin_lock(&processqueue_lock); + spin_lock_bh(&processqueue_lock); ret = atomic_inc_return(&processqueue_count); list_add_tail(&pentry->list, &processqueue); if (!process_dlm_messages_pending) { process_dlm_messages_pending = true; queue_work(process_workqueue, &process_work); } - spin_unlock(&processqueue_lock); + spin_unlock_bh(&processqueue_lock); if (ret > DLM_MAX_PROCESS_BUFFERS) return DLM_IO_FLUSH; @@ -1126,7 +1123,7 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed) /* * sctp_bind_addrs - bind a SCTP socket to all our addresses */ -static int sctp_bind_addrs(struct socket *sock, uint16_t port) +static int sctp_bind_addrs(struct socket *sock, __be16 
port)
 {
 	struct sockaddr_storage localaddr;
 	struct sockaddr *addr = (struct sockaddr *)&localaddr;
@@ -1229,14 +1226,13 @@ out:
 };
 
 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
-						gfp_t allocation, char **ppc,
-						void (*cb)(void *data),
+						char **ppc, void (*cb)(void *data),
 						void *data)
 {
 	struct writequeue_entry *e;
 	struct dlm_msg *msg;
 
-	msg = dlm_allocate_msg(allocation);
+	msg = dlm_allocate_msg();
 	if (!msg)
 		return NULL;
 
@@ -1261,9 +1257,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
  * dlm_lowcomms_commit_msg which is a must call if success
  */
 #ifndef __CHECKER__
-struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
-				     char **ppc, void (*cb)(void *data),
-				     void *data)
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+				     void (*cb)(void *data), void *data)
 {
 	struct connection *con;
 	struct dlm_msg *msg;
@@ -1284,7 +1279,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 		return NULL;
 	}
 
-	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
+	msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
 	if (!msg) {
 		srcu_read_unlock(&connections_srcu, idx);
 		return NULL;
 	}
@@ -1348,8 +1343,8 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 	if (msg->retransmit)
 		return 1;
 
-	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
-					      GFP_ATOMIC, &ppc, NULL, NULL);
+	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
+					      NULL, NULL);
 	if (!msg_resend)
 		return -ENOMEM;
 
@@ -1513,7 +1508,20 @@ static void process_recv_sockets(struct work_struct *work)
 		/* CF_RECV_PENDING cleared */
 		break;
 	case DLM_IO_FLUSH:
-		flush_workqueue(process_workqueue);
+		/* we can't flush the process_workqueue here because a
+		 * WQ_MEM_RECLAIM workqueue flushing a non WQ_MEM_RECLAIM
+		 * workqueue such as process_workqueue can cause a deadlock.
+		 * Instead we have a waitqueue to wait until all messages are
+		 * processed.
+		 *
+		 * This handling is only necessary to back off the sender and
+		 * avoid queueing all messages from the socket layer into the
+		 * DLM processqueue. When DLM is capable of parsing multiple
+		 * messages on e.g. a per-socket basis, this handling might be
+		 * removed. Especially in a message burst we are too slow to
+		 * process messages and the queue will fill up memory.
+ */ + wait_event(processqueue_wq, !atomic_read(&processqueue_count)); fallthrough; case DLM_IO_RESCHED: cond_resched(); @@ -1591,8 +1599,7 @@ static int dlm_connect(struct connection *con) log_print_ratelimited("connecting to %d", con->nodeid); make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); - result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, - addr_len); + result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0); switch (result) { case -EINPROGRESS: /* not an error */ @@ -1626,13 +1633,6 @@ static void process_send_sockets(struct work_struct *work) switch (ret) { case 0: break; - case -EINPROGRESS: - /* avoid spamming resched on connection - * we might can switch to a state_change - * event based mechanism if established - */ - msleep(100); - break; default: /* CF_SEND_PENDING not cleared */ up_write(&con->sock_lock); @@ -1703,11 +1703,7 @@ static int work_start(void) return -ENOMEM; } - /* ordered dlm message process queue, - * should be converted to a tasklet - */ - process_workqueue = alloc_ordered_workqueue("dlm_process", - WQ_HIGHPRI | WQ_MEM_RECLAIM); + process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0); if (!process_workqueue) { log_print("can't start dlm_process"); destroy_workqueue(io_workqueue); @@ -1827,18 +1823,12 @@ static int dlm_tcp_bind(struct socket *sock) return 0; } -static int dlm_tcp_connect(struct connection *con, struct socket *sock, - struct sockaddr *addr, int addr_len) -{ - return kernel_connect(sock, addr, addr_len, O_NONBLOCK); -} - static int dlm_tcp_listen_validate(void) { /* We don't support multi-homed hosts */ if (dlm_local_count > 1) { - log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); - return -EINVAL; + log_print("Detect multi-homed hosts but use only the first IP address."); + log_print("Try SCTP, if you want to enable multi-link."); } return 0; @@ -1869,7 +1859,7 @@ static int dlm_tcp_listen_bind(struct socket *sock) static const struct dlm_proto_ops dlm_tcp_ops = { .name = "TCP", .proto = IPPROTO_TCP, - .connect = dlm_tcp_connect, + .how = SHUT_WR, .sockopts = dlm_tcp_sockopts, .bind = dlm_tcp_bind, .listen_validate = dlm_tcp_listen_validate, @@ -1882,22 +1872,6 @@ static int dlm_sctp_bind(struct socket *sock) return sctp_bind_addrs(sock, 0); } -static int dlm_sctp_connect(struct connection *con, struct socket *sock, - struct sockaddr *addr, int addr_len) -{ - int ret; - - /* - * Make kernel_connect() function return in specified time, - * since O_NONBLOCK argument in connect() function does not work here, - * then, we should restore the default value of this attribute. 
- */ - sock_set_sndtimeo(sock->sk, 5); - ret = kernel_connect(sock, addr, addr_len, 0); - sock_set_sndtimeo(sock->sk, 0); - return ret; -} - static int dlm_sctp_listen_validate(void) { if (!IS_ENABLED(CONFIG_IP_SCTP)) { @@ -1924,8 +1898,8 @@ static void dlm_sctp_sockopts(struct socket *sock) static const struct dlm_proto_ops dlm_sctp_ops = { .name = "SCTP", .proto = IPPROTO_SCTP, + .how = SHUT_RDWR, .try_new_addr = true, - .connect = dlm_sctp_connect, .sockopts = dlm_sctp_sockopts, .bind = dlm_sctp_bind, .listen_validate = dlm_sctp_listen_validate, diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index 3e8dca66183b..fd0df604eb93 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -39,15 +39,14 @@ void dlm_lowcomms_stop(void); void dlm_lowcomms_init(void); void dlm_lowcomms_exit(void); int dlm_lowcomms_close(int nodeid); -struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, - char **ppc, void (*cb)(void *data), - void *data); +struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc, + void (*cb)(void *data), void *data); void dlm_lowcomms_commit_msg(struct dlm_msg *msg); void dlm_lowcomms_put_msg(struct dlm_msg *msg); int dlm_lowcomms_resend_msg(struct dlm_msg *msg); int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); -int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr); void dlm_midcomms_receive_done(int nodeid); struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void); struct kmem_cache *dlm_lowcomms_msg_cache_create(void); diff --git a/fs/dlm/main.c b/fs/dlm/main.c index 6ca28299c9db..4887c8a05318 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -22,6 +22,8 @@ #define CREATE_TRACE_POINTS #include <trace/events/dlm.h> +struct workqueue_struct *dlm_wq; + static int __init init_dlm(void) { int error; @@ -50,10 +52,18 @@ static int __init init_dlm(void) if (error) goto out_user; + dlm_wq = alloc_workqueue("dlm_wq", 0, 0); + if (!dlm_wq) { + error = -ENOMEM; + goto out_plock; + } + printk("DLM installed\n"); return 0; + out_plock: + dlm_plock_exit(); out_user: dlm_user_exit(); out_debug: @@ -70,6 +80,8 @@ static int __init init_dlm(void) static void __exit exit_dlm(void) { + /* be sure every pending work e.g. freeing is done */ + destroy_workqueue(dlm_wq); dlm_plock_exit(); dlm_user_exit(); dlm_config_exit(); diff --git a/fs/dlm/member.c b/fs/dlm/member.c index be7909ead71b..b0864c93230f 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -366,6 +366,8 @@ int dlm_is_member(struct dlm_ls *ls, int nodeid) int dlm_is_removed(struct dlm_ls *ls, int nodeid) { + WARN_ON_ONCE(!nodeid || nodeid == -1); + if (find_memb(&ls->ls_nodes_gone, nodeid)) return 1; return 0; @@ -491,7 +493,7 @@ static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb) we consider the node to have failed (versus being removed due to dlm_release_lockspace) */ - error = dlm_comm_seq(memb->nodeid, &seq); + error = dlm_comm_seq(memb->nodeid, &seq, false); if (!error && seq == memb->comm_seq) return; @@ -630,7 +632,7 @@ int dlm_ls_stop(struct dlm_ls *ls) * message to the requestqueue without races. */ - down_write(&ls->ls_recv_active); + write_lock_bh(&ls->ls_recv_active); /* * Abort any recovery that's in progress (see RECOVER_STOP, @@ -638,18 +640,25 @@ int dlm_ls_stop(struct dlm_ls *ls) * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). 
*/ - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); + if (new) + timer_delete_sync(&ls->ls_scan_timer); ls->ls_recover_seq++; - spin_unlock(&ls->ls_recover_lock); + + /* activate requestqueue and stop processing */ + write_lock_bh(&ls->ls_requestqueue_lock); + set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags); + write_unlock_bh(&ls->ls_requestqueue_lock); + spin_unlock_bh(&ls->ls_recover_lock); /* * Let dlm_recv run again, now any normal messages will be saved on the * requestqueue for later. */ - up_write(&ls->ls_recv_active); + write_unlock_bh(&ls->ls_recv_active); /* * This in_recovery lock does two things: @@ -674,13 +683,13 @@ int dlm_ls_stop(struct dlm_ls *ls) dlm_recoverd_suspend(ls); - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); kfree(ls->ls_slots); ls->ls_slots = NULL; ls->ls_num_slots = 0; ls->ls_slots_size = 0; ls->ls_recover_status = 0; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); dlm_recoverd_resume(ls); @@ -714,12 +723,12 @@ int dlm_ls_start(struct dlm_ls *ls) if (error < 0) goto fail_rv; - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); /* the lockspace needs to be stopped before it can be started */ if (!dlm_locking_stopped(ls)) { - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); log_error(ls, "start ignored: lockspace running"); error = -EINVAL; goto fail; @@ -730,7 +739,7 @@ int dlm_ls_start(struct dlm_ls *ls) rv->seq = ++ls->ls_recover_seq; rv_old = ls->ls_recover_args; ls->ls_recover_args = rv; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); if (rv_old) { log_error(ls, "unused recovery %llx %d", diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index 64f212a066cf..5c35cc67aca4 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -72,6 +72,8 @@ out: void dlm_memory_exit(void) { + rcu_barrier(); + kmem_cache_destroy(writequeue_cache); kmem_cache_destroy(mhandle_cache); kmem_cache_destroy(msg_cache); @@ -82,10 +84,7 @@ void dlm_memory_exit(void) char *dlm_allocate_lvb(struct dlm_ls *ls) { - char *p; - - p = kzalloc(ls->ls_lvblen, GFP_NOFS); - return p; + return kzalloc(ls->ls_lvblen, GFP_ATOMIC); } void dlm_free_lvb(char *p) @@ -93,31 +92,33 @@ void dlm_free_lvb(char *p) kfree(p); } -struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls) +struct dlm_rsb *dlm_allocate_rsb(void) { - struct dlm_rsb *r; - - r = kmem_cache_zalloc(rsb_cache, GFP_NOFS); - return r; + return kmem_cache_zalloc(rsb_cache, GFP_ATOMIC); } -void dlm_free_rsb(struct dlm_rsb *r) +static void __free_rsb_rcu(struct rcu_head *rcu) { + struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu); if (r->res_lvbptr) dlm_free_lvb(r->res_lvbptr); kmem_cache_free(rsb_cache, r); } -struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls) +void dlm_free_rsb(struct dlm_rsb *r) { - struct dlm_lkb *lkb; + call_rcu(&r->rcu, __free_rsb_rcu); +} - lkb = kmem_cache_zalloc(lkb_cache, GFP_NOFS); - return lkb; +struct dlm_lkb *dlm_allocate_lkb(void) +{ + return kmem_cache_zalloc(lkb_cache, GFP_ATOMIC); } -void dlm_free_lkb(struct dlm_lkb *lkb) +static void __free_lkb_rcu(struct rcu_head *rcu) { + struct dlm_lkb *lkb = container_of(rcu, struct dlm_lkb, rcu); + if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { struct dlm_user_args *ua; ua = lkb->lkb_ua; @@ -127,16 +128,17 @@ void dlm_free_lkb(struct dlm_lkb *lkb) } } - /* drop references if they are set */ - 
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL); - dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL); - kmem_cache_free(lkb_cache, lkb); } -struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation) +void dlm_free_lkb(struct dlm_lkb *lkb) +{ + call_rcu(&lkb->rcu, __free_lkb_rcu); +} + +struct dlm_mhandle *dlm_allocate_mhandle(void) { - return kmem_cache_alloc(mhandle_cache, allocation); + return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC); } void dlm_free_mhandle(struct dlm_mhandle *mhandle) @@ -154,9 +156,9 @@ void dlm_free_writequeue(struct writequeue_entry *writequeue) kmem_cache_free(writequeue_cache, writequeue); } -struct dlm_msg *dlm_allocate_msg(gfp_t allocation) +struct dlm_msg *dlm_allocate_msg(void) { - return kmem_cache_alloc(msg_cache, allocation); + return kmem_cache_alloc(msg_cache, GFP_ATOMIC); } void dlm_free_msg(struct dlm_msg *msg) diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h index 6b29563d24f7..551b6b788489 100644 --- a/fs/dlm/memory.h +++ b/fs/dlm/memory.h @@ -14,17 +14,17 @@ int dlm_memory_init(void); void dlm_memory_exit(void); -struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls); +struct dlm_rsb *dlm_allocate_rsb(void); void dlm_free_rsb(struct dlm_rsb *r); -struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls); +struct dlm_lkb *dlm_allocate_lkb(void); void dlm_free_lkb(struct dlm_lkb *l); char *dlm_allocate_lvb(struct dlm_ls *ls); void dlm_free_lvb(char *l); -struct dlm_mhandle *dlm_allocate_mhandle(gfp_t allocation); +struct dlm_mhandle *dlm_allocate_mhandle(void); void dlm_free_mhandle(struct dlm_mhandle *mhandle); struct writequeue_entry *dlm_allocate_writequeue(void); void dlm_free_writequeue(struct writequeue_entry *writequeue); -struct dlm_msg *dlm_allocate_msg(gfp_t allocation); +struct dlm_msg *dlm_allocate_msg(void); void dlm_free_msg(struct dlm_msg *msg); struct dlm_callback *dlm_allocate_cb(void); void dlm_free_cb(struct dlm_callback *cb); diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 2247ebb61be1..2c101bbe261a 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -226,8 +226,7 @@ static DEFINE_MUTEX(close_lock); struct kmem_cache *dlm_midcomms_cache_create(void) { - return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle), - 0, 0, NULL); + return KMEM_CACHE(dlm_mhandle, 0); } static inline const char *dlm_state_str(int state) @@ -335,12 +334,12 @@ static struct midcomms_node *nodeid2node(int nodeid) return __find_node(nodeid, nodeid_hash(nodeid)); } -int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr) { int ret, idx, r = nodeid_hash(nodeid); struct midcomms_node *node; - ret = dlm_lowcomms_addr(nodeid, addr, len); + ret = dlm_lowcomms_addr(nodeid, addr); if (ret) return ret; @@ -365,9 +364,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) node->users = 0; midcomms_node_reset(node); - spin_lock(&nodes_lock); + spin_lock_bh(&nodes_lock); hlist_add_head_rcu(&node->hlist, &node_hash[r]); - spin_unlock(&nodes_lock); + spin_unlock_bh(&nodes_lock); node->debugfs = dlm_create_debug_comms_file(nodeid, node); return 0; @@ -380,8 +379,7 @@ static int dlm_send_ack(int nodeid, uint32_t seq) struct dlm_msg *msg; char *ppc; - msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc, - NULL, NULL); + msg = dlm_lowcomms_new_msg(nodeid, mb_len, &ppc, NULL, NULL); if (!msg) return -ENOMEM; @@ -429,7 +427,7 @@ static int dlm_send_fin(struct midcomms_node *node, struct dlm_mhandle *mh; char *ppc; - mh = 
dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc); + mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, &ppc); if (!mh) return -ENOMEM; @@ -479,7 +477,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq) static void dlm_pas_fin_ack_rcv(struct midcomms_node *node) { - spin_lock(&node->state_lock); + spin_lock_bh(&node->state_lock); pr_debug("receive passive fin ack from node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -493,13 +491,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node) wake_up(&node->shutdown_wait); break; default: - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); log_print("%s: unexpected state: %d", __func__, node->state); WARN_ON_ONCE(1); return; } - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); } static void dlm_receive_buffer_3_2_trace(uint32_t seq, @@ -536,7 +534,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p, if (is_expected_seq) { switch (p->header.h_cmd) { case DLM_FIN: - spin_lock(&node->state_lock); + spin_lock_bh(&node->state_lock); pr_debug("receive fin msg from node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -577,13 +575,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p, /* probably remove_member caught it, do nothing */ break; default: - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); log_print("%s: unexpected state: %d", __func__, node->state); WARN_ON_ONCE(1); return; } - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); break; default: WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); @@ -977,13 +975,13 @@ static void midcomms_new_msg_cb(void *data) } static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid, - int len, gfp_t allocation, char **ppc) + int len, char **ppc) { struct dlm_opts *opts; struct dlm_msg *msg; msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN, - allocation, ppc, midcomms_new_msg_cb, mh); + ppc, midcomms_new_msg_cb, mh); if (!msg) return NULL; @@ -1002,8 +1000,7 @@ static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int node * dlm_midcomms_commit_mhandle which is a must call if success */ #ifndef __CHECKER__ -struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, - gfp_t allocation, char **ppc) +struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc) { struct midcomms_node *node; struct dlm_mhandle *mh; @@ -1018,7 +1015,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, /* this is a bug, however we going on and hope it will be resolved */ WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags)); - mh = dlm_allocate_mhandle(allocation); + mh = dlm_allocate_mhandle(); if (!mh) goto err; @@ -1029,8 +1026,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, switch (node->version) { case DLM_VERSION_3_1: - msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc, - NULL, NULL); + msg = dlm_lowcomms_new_msg(nodeid, len, ppc, NULL, NULL); if (!msg) { dlm_free_mhandle(mh); goto err; @@ -1041,8 +1037,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, /* send ack back if necessary */ dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD); - msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation, - ppc); + msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, ppc); if (!msg) { dlm_free_mhandle(mh); goto err; @@ -1187,7 +1182,7 @@ void dlm_midcomms_exit(void) 
static void dlm_act_fin_ack_rcv(struct midcomms_node *node) { - spin_lock(&node->state_lock); + spin_lock_bh(&node->state_lock); pr_debug("receive active fin ack from node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -1207,13 +1202,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node) wake_up(&node->shutdown_wait); break; default: - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); log_print("%s: unexpected state: %d", __func__, node->state); WARN_ON_ONCE(1); return; } - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); } void dlm_midcomms_add_member(int nodeid) @@ -1228,7 +1223,7 @@ void dlm_midcomms_add_member(int nodeid) return; } - spin_lock(&node->state_lock); + spin_lock_bh(&node->state_lock); if (!node->users) { pr_debug("receive add member from node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -1256,7 +1251,7 @@ void dlm_midcomms_add_member(int nodeid) node->users++; pr_debug("node %d users inc count %d\n", nodeid, node->users); - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); srcu_read_unlock(&nodes_srcu, idx); } @@ -1274,13 +1269,13 @@ void dlm_midcomms_remove_member(int nodeid) return; } - spin_lock(&node->state_lock); + spin_lock_bh(&node->state_lock); /* case of dlm_midcomms_addr() created node but * was not added before because dlm_midcomms_close() * removed the node */ if (!node->users) { - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1318,7 +1313,7 @@ void dlm_midcomms_remove_member(int nodeid) break; } } - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); srcu_read_unlock(&nodes_srcu, idx); } @@ -1356,7 +1351,7 @@ static void midcomms_shutdown(struct midcomms_node *node) return; } - spin_lock(&node->state_lock); + spin_lock_bh(&node->state_lock); pr_debug("receive active shutdown for node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); switch (node->state) { @@ -1375,7 +1370,7 @@ static void midcomms_shutdown(struct midcomms_node *node) */ break; } - spin_unlock(&node->state_lock); + spin_unlock_bh(&node->state_lock); if (DLM_DEBUG_FENCE_TERMINATION) msleep(5000); @@ -1446,9 +1441,9 @@ int dlm_midcomms_close(int nodeid) ret = dlm_lowcomms_close(nodeid); dlm_delete_debug_comms_file(node->debugfs); - spin_lock(&nodes_lock); + spin_lock_bh(&nodes_lock); hlist_del_rcu(&node->hlist); - spin_unlock(&nodes_lock); + spin_unlock_bh(&nodes_lock); srcu_read_unlock(&nodes_srcu, idx); /* wait that all readers left until flush send queue */ @@ -1502,8 +1497,8 @@ int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf, rd.node = node; rd.buf = buf; - msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS, - &msgbuf, midcomms_new_rawmsg_cb, &rd); + msg = dlm_lowcomms_new_msg(node->nodeid, buflen, &msgbuf, + midcomms_new_rawmsg_cb, &rd); if (!msg) return -ENOMEM; diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h index e7246fb3ef57..7fad1d170bba 100644 --- a/fs/dlm/midcomms.h +++ b/fs/dlm/midcomms.h @@ -16,11 +16,10 @@ struct midcomms_node; int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len); int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen); -struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, - gfp_t allocation, char **ppc); +struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc); void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name, int namelen); 
-int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr); void dlm_midcomms_version_wait(void); int dlm_midcomms_close(int nodeid); int dlm_midcomms_start(void); diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 3b734aed26b5..be1a71a6303a 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -55,7 +55,7 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, struct dlm_mhandle *mh; char *mb; - mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb); + mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb); if (!mh) { log_print("%s to %d type %d len %d ENOBUFS", __func__, to_nodeid, type, len); @@ -75,8 +75,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, struct dlm_msg *msg; char *mb; - msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb, - NULL, NULL); + msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL); if (!msg) { log_print("create_rcom to %d type %d len %d ENOBUFS", to_nodeid, type, len); @@ -144,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq) { - spin_lock(&ls->ls_rcom_spin); + spin_lock_bh(&ls->ls_rcom_spin); *new_seq = cpu_to_le64(++ls->ls_rcom_seq); set_bit(LSFL_RCOM_WAIT, &ls->ls_flags); - spin_unlock(&ls->ls_rcom_spin); + spin_unlock_bh(&ls->ls_rcom_spin); } static void disallow_sync_reply(struct dlm_ls *ls) { - spin_lock(&ls->ls_rcom_spin); + spin_lock_bh(&ls->ls_rcom_spin); clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); clear_bit(LSFL_RCOM_READY, &ls->ls_flags); - spin_unlock(&ls->ls_rcom_spin); + spin_unlock_bh(&ls->ls_rcom_spin); } /* @@ -246,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls, goto do_create; } - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); status = ls->ls_recover_status; num_slots = ls->ls_num_slots; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); len += num_slots * sizeof(struct rcom_slot); do_create: @@ -267,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls, if (!num_slots) goto do_send; - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_num_slots != num_slots) { - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); log_debug(ls, "receive_rcom_status num_slots %d to %d", num_slots, ls->ls_num_slots); rc->rc_result = 0; @@ -278,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls, } dlm_slots_copy_out(ls, rc); - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); do_send: send_rcom_stateless(msg, rc); @@ -286,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls, static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in) { - spin_lock(&ls->ls_rcom_spin); + spin_lock_bh(&ls->ls_rcom_spin); if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) { log_debug(ls, "reject reply %d from %d seq %llx expect %llx", @@ -302,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in) clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); wake_up(&ls->ls_wait_general); out: - spin_unlock(&ls->ls_rcom_spin); + spin_unlock_bh(&ls->ls_rcom_spin); } int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, @@ -510,7 +509,7 @@ int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in) char *mb; int mb_len = sizeof(struct dlm_rcom) + 
sizeof(struct rcom_config); - mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb); + mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb); if (!mh) return -ENOBUFS; @@ -614,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid) break; } - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); status = ls->ls_recover_status; stop = dlm_recovery_stopped(ls); seq = ls->ls_recover_seq; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS))) goto ignore; diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 53917c0aa3c0..be4240f09abd 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)) uint32_t dlm_recover_status(struct dlm_ls *ls) { uint32_t status; - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); status = ls->ls_recover_status; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); return status; } @@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status) void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) { - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); _set_recover_status(ls, status); - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); } static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, @@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq) rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen); if (!rv) { - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); _set_recover_status(ls, DLM_RS_NODES_ALL); ls->ls_num_slots = num_slots; ls->ls_slots_size = slots_size; ls->ls_slots = slots; ls->ls_generation = gen; - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); } else { dlm_set_recover_status(ls, DLM_RS_NODES_ALL); } @@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls) { int empty; - spin_lock(&ls->ls_recover_list_lock); + spin_lock_bh(&ls->ls_recover_list_lock); empty = list_empty(&ls->ls_recover_list); - spin_unlock(&ls->ls_recover_list_lock); + spin_unlock_bh(&ls->ls_recover_list_lock); return empty; } @@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - spin_lock(&ls->ls_recover_list_lock); + spin_lock_bh(&ls->ls_recover_list_lock); if (list_empty(&r->res_recover_list)) { list_add_tail(&r->res_recover_list, &ls->ls_recover_list); ls->ls_recover_list_count++; dlm_hold_rsb(r); } - spin_unlock(&ls->ls_recover_list_lock); + spin_unlock_bh(&ls->ls_recover_list_lock); } static void recover_list_del(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - spin_lock(&ls->ls_recover_list_lock); + spin_lock_bh(&ls->ls_recover_list_lock); list_del_init(&r->res_recover_list); ls->ls_recover_list_count--; - spin_unlock(&ls->ls_recover_list_lock); + spin_unlock_bh(&ls->ls_recover_list_lock); dlm_put_rsb(r); } @@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls) { struct dlm_rsb *r, *s; - spin_lock(&ls->ls_recover_list_lock); + spin_lock_bh(&ls->ls_recover_list_lock); list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) { list_del_init(&r->res_recover_list); r->res_recover_locks_count = 0; @@ -290,78 +290,81 @@ static void recover_list_clear(struct dlm_ls *ls) ls->ls_recover_list_count); ls->ls_recover_list_count = 0; } - 
spin_unlock(&ls->ls_recover_list_lock); + spin_unlock_bh(&ls->ls_recover_list_lock); } -static int recover_idr_empty(struct dlm_ls *ls) +static int recover_xa_empty(struct dlm_ls *ls) { int empty = 1; - spin_lock(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); if (ls->ls_recover_list_count) empty = 0; - spin_unlock(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); return empty; } -static int recover_idr_add(struct dlm_rsb *r) +static int recover_xa_add(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; + struct xa_limit limit = { + .min = 1, + .max = UINT_MAX, + }; + uint32_t id; int rv; - idr_preload(GFP_NOFS); - spin_lock(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); if (r->res_id) { rv = -1; goto out_unlock; } - rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); + rv = xa_alloc(&ls->ls_recover_xa, &id, r, limit, GFP_ATOMIC); if (rv < 0) goto out_unlock; - r->res_id = rv; + r->res_id = id; ls->ls_recover_list_count++; dlm_hold_rsb(r); rv = 0; out_unlock: - spin_unlock(&ls->ls_recover_idr_lock); - idr_preload_end(); + spin_unlock_bh(&ls->ls_recover_xa_lock); return rv; } -static void recover_idr_del(struct dlm_rsb *r) +static void recover_xa_del(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - spin_lock(&ls->ls_recover_idr_lock); - idr_remove(&ls->ls_recover_idr, r->res_id); + spin_lock_bh(&ls->ls_recover_xa_lock); + xa_erase_bh(&ls->ls_recover_xa, r->res_id); r->res_id = 0; ls->ls_recover_list_count--; - spin_unlock(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); dlm_put_rsb(r); } -static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) +static struct dlm_rsb *recover_xa_find(struct dlm_ls *ls, uint64_t id) { struct dlm_rsb *r; - spin_lock(&ls->ls_recover_idr_lock); - r = idr_find(&ls->ls_recover_idr, (int)id); - spin_unlock(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); + r = xa_load(&ls->ls_recover_xa, (int)id); + spin_unlock_bh(&ls->ls_recover_xa_lock); return r; } -static void recover_idr_clear(struct dlm_ls *ls) +static void recover_xa_clear(struct dlm_ls *ls) { struct dlm_rsb *r; - int id; + unsigned long id; - spin_lock(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); - idr_for_each_entry(&ls->ls_recover_idr, r, id) { - idr_remove(&ls->ls_recover_idr, id); + xa_for_each(&ls->ls_recover_xa, id, r) { + xa_erase_bh(&ls->ls_recover_xa, id); r->res_id = 0; r->res_recover_locks_count = 0; ls->ls_recover_list_count--; @@ -374,7 +377,7 @@ static void recover_idr_clear(struct dlm_ls *ls) ls->ls_recover_list_count); ls->ls_recover_list_count = 0; } - spin_unlock(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); } @@ -449,10 +452,11 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) int is_removed = 0; int error; - if (is_master(r)) + if (r->res_nodeid != -1 && is_master(r)) return 0; - is_removed = dlm_is_removed(ls, r->res_nodeid); + if (r->res_nodeid != -1) + is_removed = dlm_is_removed(ls, r->res_nodeid); if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER)) return 0; @@ -472,7 +476,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) set_new_master(r); error = 0; } else { - recover_idr_add(r); + recover_xa_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid, seq); } @@ -521,7 +525,8 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count) * the correct dir node. 
*/ -int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq) +int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq, + const struct list_head *root_list) { struct dlm_rsb *r; unsigned int total = 0; @@ -531,10 +536,8 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq) log_rinfo(ls, "dlm_recover_masters"); - down_read(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + list_for_each_entry(r, root_list, res_root_list) { if (dlm_recovery_stopped(ls)) { - up_read(&ls->ls_root_sem); error = -EINTR; goto out; } @@ -548,19 +551,16 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq) cond_resched(); total++; - if (error) { - up_read(&ls->ls_root_sem); + if (error) goto out; - } } - up_read(&ls->ls_root_sem); log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); - error = dlm_wait_function(ls, &recover_idr_empty); + error = dlm_wait_function(ls, &recover_xa_empty); out: if (error) - recover_idr_clear(ls); + recover_xa_clear(ls); return error; } @@ -569,7 +569,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) struct dlm_rsb *r; int ret_nodeid, new_master; - r = recover_idr_find(ls, le64_to_cpu(rc->rc_id)); + r = recover_xa_find(ls, le64_to_cpu(rc->rc_id)); if (!r) { log_error(ls, "dlm_recover_master_reply no id %llx", (unsigned long long)le64_to_cpu(rc->rc_id)); @@ -588,9 +588,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) r->res_nodeid = new_master; set_new_master(r); unlock_rsb(r); - recover_idr_del(r); + recover_xa_del(r); - if (recover_idr_empty(ls)) + if (recover_xa_empty(ls)) wake_up(&ls->ls_wait_general); out: return 0; @@ -658,14 +658,14 @@ static int recover_locks(struct dlm_rsb *r, uint64_t seq) return error; } -int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq) +int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, + const struct list_head *root_list) { struct dlm_rsb *r; int error, count = 0; - down_read(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { - if (is_master(r)) { + list_for_each_entry(r, root_list, res_root_list) { + if (r->res_nodeid != -1 && is_master(r)) { rsb_clear_flag(r, RSB_NEW_MASTER); continue; } @@ -675,19 +675,15 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq) if (dlm_recovery_stopped(ls)) { error = -EINTR; - up_read(&ls->ls_root_sem); goto out; } error = recover_locks(r, seq); - if (error) { - up_read(&ls->ls_root_sem); + if (error) goto out; - } count += r->res_recover_locks_count; } - up_read(&ls->ls_root_sem); log_rinfo(ls, "dlm_recover_locks %d out", count); @@ -815,33 +811,42 @@ static void recover_lvb(struct dlm_rsb *r) } /* All master rsb's flagged RECOVER_CONVERT need to be looked at. The locks - converting PR->CW or CW->PR need to have their lkb_grmode set. */ + * converting PR->CW or CW->PR may need to have their lkb_grmode changed. 
+ */ static void recover_conversion(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; + uint32_t other_lkid = 0; + int other_grmode = -1; struct dlm_lkb *lkb; - int grmode = -1; list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { if (lkb->lkb_grmode == DLM_LOCK_PR || lkb->lkb_grmode == DLM_LOCK_CW) { - grmode = lkb->lkb_grmode; + other_grmode = lkb->lkb_grmode; + other_lkid = lkb->lkb_id; break; } } + if (other_grmode == -1) + return; + list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { - if (lkb->lkb_grmode != DLM_LOCK_IV) - continue; - if (grmode == -1) { - log_debug(ls, "recover_conversion %x set gr to rq %d", - lkb->lkb_id, lkb->lkb_rqmode); - lkb->lkb_grmode = lkb->lkb_rqmode; - } else { - log_debug(ls, "recover_conversion %x set gr %d", - lkb->lkb_id, grmode); - lkb->lkb_grmode = grmode; + /* Lock recovery created incompatible granted modes, so + * change the granted mode of the converting lock to + * NL. The rqmode of the converting lock should be CW, + * which means the converting lock should be granted at + * the end of recovery. + */ + if (((lkb->lkb_grmode == DLM_LOCK_PR) && (other_grmode == DLM_LOCK_CW)) || + ((lkb->lkb_grmode == DLM_LOCK_CW) && (other_grmode == DLM_LOCK_PR))) { + log_limit(ls, "%s %x gr %d rq %d, remote %d %x, other_lkid %u, other gr %d, set gr=NL", + __func__, lkb->lkb_id, lkb->lkb_grmode, + lkb->lkb_rqmode, lkb->lkb_nodeid, + lkb->lkb_remid, other_lkid, other_grmode); + lkb->lkb_grmode = DLM_LOCK_NL; } } } @@ -856,15 +861,14 @@ static void recover_grant(struct dlm_rsb *r) rsb_set_flag(r, RSB_RECOVER_GRANT); } -void dlm_recover_rsbs(struct dlm_ls *ls) +void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list) { struct dlm_rsb *r; unsigned int count = 0; - down_read(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + list_for_each_entry(r, root_list, res_root_list) { lock_rsb(r); - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); @@ -883,76 +887,31 @@ void dlm_recover_rsbs(struct dlm_ls *ls) rsb_clear_flag(r, RSB_NEW_MASTER2); unlock_rsb(r); } - up_read(&ls->ls_root_sem); if (count) log_rinfo(ls, "dlm_recover_rsbs %d done", count); } -/* Create a single list of all root rsb's to be used during recovery */ - -int dlm_create_root_list(struct dlm_ls *ls) -{ - struct rb_node *n; - struct dlm_rsb *r; - int i, error = 0; - - down_write(&ls->ls_root_sem); - if (!list_empty(&ls->ls_root_list)) { - log_error(ls, "root list not empty"); - error = -EINVAL; - goto out; - } - - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - spin_lock(&ls->ls_rsbtbl[i].lock); - for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - list_add(&r->res_root_list, &ls->ls_root_list); - dlm_hold_rsb(r); - } - - if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss)) - log_error(ls, "dlm_create_root_list toss not empty"); - spin_unlock(&ls->ls_rsbtbl[i].lock); - } - out: - up_write(&ls->ls_root_sem); - return error; -} - -void dlm_release_root_list(struct dlm_ls *ls) +void dlm_clear_inactive(struct dlm_ls *ls) { struct dlm_rsb *r, *safe; + unsigned int count = 0; - down_write(&ls->ls_root_sem); - list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) { - list_del_init(&r->res_root_list); - dlm_put_rsb(r); - } - up_write(&ls->ls_root_sem); -} + write_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) { + list_del(&r->res_slow_list); + 
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); -void dlm_clear_toss(struct dlm_ls *ls) -{ - struct rb_node *n, *next; - struct dlm_rsb *r; - unsigned int count = 0; - int i; - - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - spin_lock(&ls->ls_rsbtbl[i].lock); - for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { - next = rb_next(n); - r = rb_entry(n, struct dlm_rsb, res_hashnode); - rb_erase(n, &ls->ls_rsbtbl[i].toss); - dlm_free_rsb(r); - count++; - } - spin_unlock(&ls->ls_rsbtbl[i].lock); + if (!list_empty(&r->res_scan_list)) + list_del_init(&r->res_scan_list); + + free_inactive_rsb(r); + count++; } + write_unlock_bh(&ls->ls_rsbtbl_lock); if (count) - log_rinfo(ls, "dlm_clear_toss %u done", count); + log_rinfo(ls, "dlm_clear_inactive %u done", count); } diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index dbc51013ecad..ec69896462fb 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -19,14 +19,14 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq); int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq); int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq); int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq); -int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq, + const struct list_head *root_list); int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc); -int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, + const struct list_head *root_list); void dlm_recovered_lock(struct dlm_rsb *r); -int dlm_create_root_list(struct dlm_ls *ls); -void dlm_release_root_list(struct dlm_ls *ls); -void dlm_clear_toss(struct dlm_ls *ls); -void dlm_recover_rsbs(struct dlm_ls *ls); +void dlm_clear_inactive(struct dlm_ls *ls); +void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list); #endif /* __RECOVER_DOT_H__ */ diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 4d17491dea2f..12272a8f6d75 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -20,6 +20,67 @@ #include "requestqueue.h" #include "recoverd.h" +static int dlm_create_masters_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + int error = 0; + + write_lock_bh(&ls->ls_masters_lock); + if (!list_empty(&ls->ls_masters_list)) { + log_error(ls, "root list not empty"); + error = -EINVAL; + goto out; + } + + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { + if (r->res_nodeid) + continue; + + list_add(&r->res_masters_list, &ls->ls_masters_list); + dlm_hold_rsb(r); + } + read_unlock_bh(&ls->ls_rsbtbl_lock); + out: + write_unlock_bh(&ls->ls_masters_lock); + return error; +} + +static void dlm_release_masters_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r, *safe; + + write_lock_bh(&ls->ls_masters_lock); + list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) { + list_del_init(&r->res_masters_list); + dlm_put_rsb(r); + } + write_unlock_bh(&ls->ls_masters_lock); +} + +static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list) +{ + struct dlm_rsb *r; + + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { + list_add(&r->res_root_list, root_list); + dlm_hold_rsb(r); + } + + WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive)); + read_unlock_bh(&ls->ls_rsbtbl_lock); +} + +static void dlm_release_root_list(struct list_head *root_list) +{ + struct dlm_rsb *r, *safe; + + 
list_for_each_entry_safe(r, safe, root_list, res_root_list) { + list_del_init(&r->res_root_list); + dlm_put_rsb(r); + } +} /* If the start for which we're re-enabling locking (seq) has been superseded by a newer stop (ls_recover_seq), we need to leave locking disabled. @@ -32,24 +93,35 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) { int error = -EINTR; - down_write(&ls->ls_recv_active); + write_lock_bh(&ls->ls_recv_active); - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) { set_bit(LSFL_RUNNING, &ls->ls_flags); + /* Schedule the next timer if recovery put something on the inactive list. + * + * The scan timer has not been started for rsbs that were queued + * on the inactive (toss) list while LSFL_RUNNING was cleared + * during recovery, and nothing else in recovery has started yet + * either, because ls_in_recovery is still held. So we should not + * run into the case where resume_scan_timer() queues a timer + * that turns out to be a no-op. + */ + resume_scan_timer(ls); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); error = 0; } - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); - up_write(&ls->ls_recv_active); + write_unlock_bh(&ls->ls_recv_active); return error; } static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) { + LIST_HEAD(root_list); unsigned long start; int error, neg = 0; @@ -59,14 +131,14 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_callback_suspend(ls); - dlm_clear_toss(ls); + dlm_clear_inactive(ls); /* * This list of root rsb's will be the basis of most of the recovery * routines. */ - dlm_create_root_list(ls); + dlm_create_root_list(ls, &root_list); /* * Add or remove nodes from the lockspace's ls_nodes list. @@ -79,13 +151,28 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members(ls, rv, &neg); if (error) { log_rinfo(ls, "dlm_recover_members error %d", error); - goto fail; + goto fail_root_list; } - dlm_recover_dir_nodeid(ls); + dlm_recover_dir_nodeid(ls, &root_list); + + /* Create a snapshot of all active rsbs for which we are the master. + * During the barrier between dlm_recover_members_wait() and + * dlm_recover_directory(), other nodes can dump the directory + * dlm_rsbs they need (r->res_dir_nodeid == nodeid) via the rcom + * communication handled in dlm_copy_master_names(). + * + * TODO: We should keep a per-lockspace list of the rsbs that we + * are the master of. Instead of creating this list during + * recovery, we would maintain it during normal locking operation, + * so recovery can use it when necessary.
+ */ + error = dlm_create_masters_list(ls); + if (error) { + log_rinfo(ls, "dlm_create_masters_list error %d", error); + goto fail_root_list; + } - ls->ls_recover_dir_sent_res = 0; - ls->ls_recover_dir_sent_msg = 0; ls->ls_recover_locks_in = 0; dlm_set_recover_status(ls, DLM_RS_NODES); @@ -93,7 +180,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_members_wait error %d", error); - goto fail; + dlm_release_masters_list(ls); + goto fail_root_list; } start = jiffies; @@ -106,7 +194,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory error %d", error); - goto fail; + dlm_release_masters_list(ls); + goto fail_root_list; } dlm_set_recover_status(ls, DLM_RS_DIR); @@ -114,11 +203,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory_wait error %d", error); - goto fail; + dlm_release_masters_list(ls); + goto fail_root_list; } - log_rinfo(ls, "dlm_recover_directory %u out %u messages", - ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg); + dlm_release_masters_list(ls); /* * We may have outstanding operations that are waiting for a reply from @@ -130,7 +219,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) if (dlm_recovery_stopped(ls)) { error = -EINTR; - goto fail; + goto fail_root_list; } if (neg || dlm_no_directory(ls)) { @@ -138,27 +227,27 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * Clear lkb's for departed nodes. */ - dlm_recover_purge(ls); + dlm_recover_purge(ls, &root_list); /* * Get new master nodeid's for rsb's that were mastered on * departed nodes. */ - error = dlm_recover_masters(ls, rv->seq); + error = dlm_recover_masters(ls, rv->seq, &root_list); if (error) { log_rinfo(ls, "dlm_recover_masters error %d", error); - goto fail; + goto fail_root_list; } /* * Send our locks on remastered rsb's to the new masters. */ - error = dlm_recover_locks(ls, rv->seq); + error = dlm_recover_locks(ls, rv->seq, &root_list); if (error) { log_rinfo(ls, "dlm_recover_locks error %d", error); - goto fail; + goto fail_root_list; } dlm_set_recover_status(ls, DLM_RS_LOCKS); @@ -166,7 +255,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); - goto fail; + goto fail_root_list; } log_rinfo(ls, "dlm_recover_locks %u in", @@ -178,7 +267,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * settings. */ - dlm_recover_rsbs(ls); + dlm_recover_rsbs(ls, &root_list); } else { /* * Other lockspace members may be going through the "neg" steps @@ -190,11 +279,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); - goto fail; + goto fail_root_list; } } - dlm_release_root_list(ls); + dlm_release_root_list(&root_list); /* * Purge directory-related requests that are saved in requestqueue. 
@@ -243,8 +332,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) return 0; + fail_root_list: + dlm_release_root_list(&root_list); fail: - dlm_release_root_list(ls); mutex_unlock(&ls->ls_recoverd_active); return error; @@ -259,12 +349,12 @@ static void do_ls_recovery(struct dlm_ls *ls) struct dlm_recover *rv = NULL; int error; - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); rv = ls->ls_recover_args; ls->ls_recover_args = NULL; if (rv && ls->ls_recover_seq == rv->seq) clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags); - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); if (rv) { error = ls_recover(ls, rv); diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c index 892d6ca21e74..719a5243a069 100644 --- a/fs/dlm/requestqueue.c +++ b/fs/dlm/requestqueue.c @@ -37,7 +37,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, int length = le16_to_cpu(ms->m_header.h_length) - sizeof(struct dlm_message); - e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS); + e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC); if (!e) { log_print("dlm_add_requestqueue: out of memory len %d", length); return; @@ -48,10 +48,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, memcpy(&e->request, ms, sizeof(*ms)); memcpy(&e->request.m_extra, ms->m_extra, length); - atomic_inc(&ls->ls_requestqueue_cnt); - mutex_lock(&ls->ls_requestqueue_mutex); list_add_tail(&e->list, &ls->ls_requestqueue); - mutex_unlock(&ls->ls_requestqueue_mutex); } /* @@ -71,16 +68,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls) struct dlm_message *ms; int error = 0; - mutex_lock(&ls->ls_requestqueue_mutex); - + write_lock_bh(&ls->ls_requestqueue_lock); for (;;) { if (list_empty(&ls->ls_requestqueue)) { - mutex_unlock(&ls->ls_requestqueue_mutex); + clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags); error = 0; break; } - e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); - mutex_unlock(&ls->ls_requestqueue_mutex); + e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list); ms = &e->request; @@ -93,41 +88,23 @@ int dlm_process_requestqueue(struct dlm_ls *ls) e->recover_seq); dlm_receive_message_saved(ls, &e->request, e->recover_seq); - - mutex_lock(&ls->ls_requestqueue_mutex); list_del(&e->list); - if (atomic_dec_and_test(&ls->ls_requestqueue_cnt)) - wake_up(&ls->ls_requestqueue_wait); kfree(e); if (dlm_locking_stopped(ls)) { log_debug(ls, "process_requestqueue abort running"); - mutex_unlock(&ls->ls_requestqueue_mutex); error = -EINTR; break; } + write_unlock_bh(&ls->ls_requestqueue_lock); schedule(); + write_lock_bh(&ls->ls_requestqueue_lock); } + write_unlock_bh(&ls->ls_requestqueue_lock); return error; } -/* - * After recovery is done, locking is resumed and dlm_recoverd takes all the - * saved requests and processes them as they would have been by dlm_recv. At - * the same time, dlm_recv will start receiving new requests from remote nodes. - * We want to delay dlm_recv processing new requests until dlm_recoverd has - * finished processing the old saved requests. We don't check for locking - * stopped here because dlm_ls_stop won't stop locking until it's suspended us - * (dlm_recv). 
- */ - -void dlm_wait_requestqueue(struct dlm_ls *ls) -{ - wait_event(ls->ls_requestqueue_wait, - atomic_read(&ls->ls_requestqueue_cnt) == 0); -} - static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) { __le32 type = ms->m_type; @@ -158,17 +135,15 @@ void dlm_purge_requestqueue(struct dlm_ls *ls) struct dlm_message *ms; struct rq_entry *e, *safe; - mutex_lock(&ls->ls_requestqueue_mutex); + write_lock_bh(&ls->ls_requestqueue_lock); list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) { ms = &e->request; if (purge_request(ls, ms, e->nodeid)) { list_del(&e->list); - if (atomic_dec_and_test(&ls->ls_requestqueue_cnt)) - wake_up(&ls->ls_requestqueue_wait); kfree(e); } } - mutex_unlock(&ls->ls_requestqueue_mutex); + write_unlock_bh(&ls->ls_requestqueue_lock); } diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 9f9b68448830..5cb3896be826 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -145,24 +145,6 @@ static void compat_output(struct dlm_lock_result *res, } #endif -/* should held proc->asts_spin lock */ -void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb) -{ - struct dlm_callback *cb, *safe; - - list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) { - list_del(&cb->list); - kref_put(&cb->ref, dlm_release_callback); - } - - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - - /* invalidate */ - dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL); - dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL); - lkb->lkb_last_bast_mode = -1; -} - /* Figure out if this lock is at the end of its life and no longer available for the application to use. The lkb still exists until the final ast is read. A lock becomes EOL in three situations: @@ -199,14 +181,15 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, struct dlm_ls *ls; struct dlm_user_args *ua; struct dlm_user_proc *proc; - int rv; + struct dlm_callback *cb; + int rv, copy_lvb; if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) || test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags)) return; ls = lkb->lkb_resource->res_ls; - spin_lock(&ls->ls_clear_proc_locks); + spin_lock_bh(&ls->ls_clear_proc_locks); /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed @@ -228,38 +211,38 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status)) set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags); - spin_lock(&proc->asts_spin); - - rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags); - switch (rv) { - case DLM_ENQUEUE_CALLBACK_FAILURE: - spin_unlock(&proc->asts_spin); - WARN_ON_ONCE(1); - goto out; - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - kref_get(&lkb->lkb_ref); - list_add_tail(&lkb->lkb_cb_list, &proc->asts); - wake_up_interruptible(&proc->wait); - break; - case DLM_ENQUEUE_CALLBACK_SUCCESS: - break; - default: - WARN_ON_ONCE(1); - break; + spin_lock_bh(&proc->asts_spin); + + if (!dlm_may_skip_callback(lkb, flags, mode, status, sbflags, + ©_lvb)) { + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) { + cb->copy_lvb = copy_lvb; + cb->ua = *ua; + cb->lkb_lksb = &cb->ua.lksb; + if (copy_lvb) { + memcpy(cb->lvbptr, ua->lksb.sb_lvbptr, + DLM_USER_LVB_LEN); + cb->lkb_lksb->sb_lvbptr = cb->lvbptr; + } + + list_add_tail(&cb->list, &proc->asts); + wake_up_interruptible(&proc->wait); + } } - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) { /* N.B. 
spin_lock locks_spin, not asts_spin */ - spin_lock(&proc->locks_spin); + spin_lock_bh(&proc->locks_spin); if (!list_empty(&lkb->lkb_ownqueue)) { list_del_init(&lkb->lkb_ownqueue); dlm_put_lkb(lkb); } - spin_unlock(&proc->locks_spin); + spin_unlock_bh(&proc->locks_spin); } out: - spin_unlock(&ls->ls_clear_proc_locks); + spin_unlock_bh(&ls->ls_clear_proc_locks); } static int device_user_lock(struct dlm_user_proc *proc, @@ -465,7 +448,7 @@ static int device_remove_lockspace(struct dlm_lspace_params *params) if (params->flags & DLM_USER_LSFLG_FORCEFREE) force = 2; - lockspace = ls->ls_local_handle; + lockspace = ls; dlm_put_lockspace(ls); /* The final dlm_release_lockspace waits for references to go to @@ -668,7 +651,7 @@ static int device_open(struct inode *inode, struct file *file) return -ENOMEM; } - proc->lockspace = ls->ls_local_handle; + proc->lockspace = ls; INIT_LIST_HEAD(&proc->asts); INIT_LIST_HEAD(&proc->locks); INIT_LIST_HEAD(&proc->unlocking); @@ -803,11 +786,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) { struct dlm_user_proc *proc = file->private_data; - struct dlm_lkb *lkb; DECLARE_WAITQUEUE(wait, current); struct dlm_callback *cb; - int rv, ret, copy_lvb = 0; - int old_mode, new_mode; + int rv, ret; if (count == sizeof(struct dlm_device_version)) { rv = copy_version_to_user(buf, count); @@ -826,16 +807,14 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, #endif return -EINVAL; - try_another: - /* do we really need this? can a read happen after a close? */ if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags)) return -EINVAL; - spin_lock(&proc->asts_spin); + spin_lock_bh(&proc->asts_spin); if (list_empty(&proc->asts)) { if (file->f_flags & O_NONBLOCK) { - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); return -EAGAIN; } @@ -844,16 +823,16 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, repeat: set_current_state(TASK_INTERRUPTIBLE); if (list_empty(&proc->asts) && !signal_pending(current)) { - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); schedule(); - spin_lock(&proc->asts_spin); + spin_lock_bh(&proc->asts_spin); goto repeat; } set_current_state(TASK_RUNNING); remove_wait_queue(&proc->wait, &wait); if (signal_pending(current)) { - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); return -ERESTARTSYS; } } @@ -862,60 +841,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, without removing lkb_cb_list; so empty lkb_cb_list is always consistent with empty lkb_callbacks */ - lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list); - - /* rem_lkb_callback sets a new lkb_last_cast */ - old_mode = lkb->lkb_last_cast->mode; - - rv = dlm_dequeue_lkb_callback(lkb, &cb); - switch (rv) { - case DLM_DEQUEUE_CALLBACK_EMPTY: - /* this shouldn't happen; lkb should have been removed from - * list when last item was dequeued - */ - log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id); - list_del_init(&lkb->lkb_cb_list); - spin_unlock(&proc->asts_spin); - /* removes ref for proc->asts, may cause lkb to be freed */ - dlm_put_lkb(lkb); - WARN_ON_ONCE(1); - goto try_another; - case DLM_DEQUEUE_CALLBACK_LAST: - list_del_init(&lkb->lkb_cb_list); - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - break; - case DLM_DEQUEUE_CALLBACK_SUCCESS: - break; - default: - WARN_ON_ONCE(1); - break; - } - spin_unlock(&proc->asts_spin); + cb = list_first_entry(&proc->asts, struct dlm_callback, list); + 
list_del(&cb->list); + spin_unlock_bh(&proc->asts_spin); if (cb->flags & DLM_CB_BAST) { - trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode); + trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name, + cb->res_length); } else if (cb->flags & DLM_CB_CAST) { - new_mode = cb->mode; - - if (!cb->sb_status && lkb->lkb_lksb->sb_lvbptr && - dlm_lvb_operations[old_mode + 1][new_mode + 1]) - copy_lvb = 1; - - lkb->lkb_lksb->sb_status = cb->sb_status; - lkb->lkb_lksb->sb_flags = cb->sb_flags; - trace_dlm_ast(lkb->lkb_resource->res_ls, lkb); + cb->lkb_lksb->sb_status = cb->sb_status; + cb->lkb_lksb->sb_flags = cb->sb_flags; + trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status, + cb->sb_flags, cb->res_name, cb->res_length); } - ret = copy_result_to_user(lkb->lkb_ua, + ret = copy_result_to_user(&cb->ua, test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags), - cb->flags, cb->mode, copy_lvb, buf, count); - - kref_put(&cb->ref, dlm_release_callback); - - /* removes ref for proc->asts, may cause lkb to be freed */ - if (rv == DLM_DEQUEUE_CALLBACK_LAST) - dlm_put_lkb(lkb); - + cb->flags, cb->mode, cb->copy_lvb, buf, count); + dlm_free_cb(cb); return ret; } @@ -925,12 +868,12 @@ static __poll_t device_poll(struct file *file, poll_table *wait) poll_wait(file, &proc->wait, wait); - spin_lock(&proc->asts_spin); + spin_lock_bh(&proc->asts_spin); if (!list_empty(&proc->asts)) { - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); return EPOLLIN | EPOLLRDNORM; } - spin_unlock(&proc->asts_spin); + spin_unlock_bh(&proc->asts_spin); return 0; } |
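
The recover.c hunks above replace the recovery IDR (recover_idr_*) with an xarray (recover_xa_*). For readers unfamiliar with the xarray allocation API, here is a minimal, self-contained sketch of the same ID-assignment pattern; the demo_* names are illustrative only, not part of the DLM code, and the locking and GFP details are simplified.

#include <linux/xarray.h>
#include <linux/limits.h>

struct demo_obj {
	u32 id;		/* 0 means "not tracked" */
	int data;
};

static DEFINE_XARRAY_ALLOC(demo_xa);	/* xarray prepared for xa_alloc() */

/* Assign a non-zero ID to @obj, mirroring recover_xa_add(). */
static int demo_track(struct demo_obj *obj)
{
	struct xa_limit limit = XA_LIMIT(1, UINT_MAX);
	u32 id;
	int rv;

	/* GFP_KERNEL here; the DLM code uses GFP_ATOMIC because it
	 * allocates under a bh-disabled spinlock. */
	rv = xa_alloc(&demo_xa, &id, obj, limit, GFP_KERNEL);
	if (rv < 0)
		return rv;

	obj->id = id;
	return 0;
}

/* Look up and drop an entry, mirroring recover_xa_find()/recover_xa_del(). */
static struct demo_obj *demo_untrack(u32 id)
{
	struct demo_obj *obj = xa_load(&demo_xa, id);

	if (obj) {
		xa_erase(&demo_xa, id);
		obj->id = 0;
	}
	return obj;
}

With this limit xa_alloc() never hands out index 0, so a zero res_id can keep meaning "not on the recovery list", which is why the patch sets .min = 1.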
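
Many hunks above convert plain spin_lock()/spin_unlock() (and some mutexes and rwsems) to _bh-aware primitives. A minimal sketch of why the _bh variants matter when the same data is touched from both process context and softirq context; the demo_* names are hypothetical:

#include <linux/spinlock.h>
#include <linux/timer.h>
#include <linux/list.h>

static DEFINE_SPINLOCK(demo_lock);
static LIST_HEAD(demo_queue);

struct demo_entry {
	struct list_head list;
};

/* Runs in softirq context: plain spin_lock() is fine here. */
static void demo_timer_fn(struct timer_list *t)
{
	spin_lock(&demo_lock);
	/* ... consume entries from demo_queue ... */
	spin_unlock(&demo_lock);
}

static DEFINE_TIMER(demo_timer, demo_timer_fn);

/* Runs in process context: bottom halves must be disabled while the
 * lock is held, otherwise the timer softirq could interrupt this
 * section on the same CPU and deadlock taking demo_lock again.
 */
static void demo_queue_entry(struct demo_entry *e)
{
	spin_lock_bh(&demo_lock);
	list_add_tail(&e->list, &demo_queue);
	spin_unlock_bh(&demo_lock);
}

The same constraint is presumably why dlm_add_requestqueue() above switches its allocation from GFP_NOFS to GFP_ATOMIC: with bottom halves in the picture, the caller can no longer sleep.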
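
dlm_create_root_list() and dlm_create_masters_list() above share one pattern: walk a shared list under a read lock, take a reference on every rsb, and collect the entries on a private list that recovery can then traverse without the table lock held. A generic sketch of that pattern under assumed demo_* names (the real code uses dlm_hold_rsb()/dlm_put_rsb() and ls_rsbtbl_lock):

#include <linux/kernel.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/kref.h>
#include <linux/slab.h>

struct demo_res {
	struct kref ref;
	struct list_head global_list;	/* on the shared table list */
	struct list_head snap_list;	/* on a caller-private snapshot */
};

static DEFINE_RWLOCK(demo_table_lock);
static LIST_HEAD(demo_table);

static void demo_res_release(struct kref *ref)
{
	kfree(container_of(ref, struct demo_res, ref));
}

/* Snapshot the table: hold a reference so entries cannot go away while
 * the caller walks the private list without demo_table_lock held.
 */
static void demo_snapshot(struct list_head *snap)
{
	struct demo_res *r;

	read_lock_bh(&demo_table_lock);
	list_for_each_entry(r, &demo_table, global_list) {
		kref_get(&r->ref);
		list_add(&r->snap_list, snap);
	}
	read_unlock_bh(&demo_table_lock);
}

/* Drop the snapshot and the references it holds. */
static void demo_release_snapshot(struct list_head *snap)
{
	struct demo_res *r, *safe;

	list_for_each_entry_safe(r, safe, snap, snap_list) {
		list_del_init(&r->snap_list);
		kref_put(&r->ref, demo_res_release);
	}
}

Usage mirrors ls_recover() above: declare LIST_HEAD(snap) on the stack, fill it with demo_snapshot(&snap), and pair it with demo_release_snapshot(&snap) on every exit path, which is what the new fail_root_list label provides.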
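
The user.c changes above make each queued struct dlm_callback self-contained: dlm_user_add_ast() copies the dlm_user_args (and the LVB when copy_lvb is set) into the callback, so device_read() can take one entry off proc->asts, copy it to userspace and free it without dereferencing the lkb again. A reduced sketch of that copy-into-the-queued-item pattern, with hypothetical demo_* names and a made-up 64-byte LVB size:

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/errno.h>

#define DEMO_LVB_LEN 64

struct demo_cb {
	struct list_head list;
	int status;
	char lvb[DEMO_LVB_LEN];	/* private copy, stable after the lock state changes */
};

static DEFINE_SPINLOCK(demo_ast_lock);
static LIST_HEAD(demo_asts);

/* Producer: snapshot everything the consumer will need. */
static int demo_queue_cb(int status, const char *lvb)
{
	struct demo_cb *cb = kmalloc(sizeof(*cb), GFP_ATOMIC);

	if (!cb)
		return -ENOMEM;

	cb->status = status;
	memcpy(cb->lvb, lvb, DEMO_LVB_LEN);

	spin_lock_bh(&demo_ast_lock);
	list_add_tail(&cb->list, &demo_asts);
	spin_unlock_bh(&demo_ast_lock);
	return 0;
}

/* Consumer: dequeue one entry; nothing else needs to be locked or
 * dereferenced once the entry is off the list.
 */
static struct demo_cb *demo_dequeue_cb(void)
{
	struct demo_cb *cb = NULL;

	spin_lock_bh(&demo_ast_lock);
	if (!list_empty(&demo_asts)) {
		cb = list_first_entry(&demo_asts, struct demo_cb, list);
		list_del(&cb->list);
	}
	spin_unlock_bh(&demo_ast_lock);
	return cb;	/* caller consumes it and kfree()s it */
}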
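
dlm_clear_inactive() above unlinks every inactive rsb from the lockspace rhashtable with rhashtable_remove_fast() before freeing it. For context, a minimal sketch of the rhashtable calls that pattern relies on, keyed here by a plain u32 for simplicity (the real table hashes the resource name via dlm_rhash_rsb_params):

#include <linux/rhashtable.h>
#include <linux/slab.h>

struct demo_item {
	u32 key;
	struct rhash_head node;
};

static const struct rhashtable_params demo_params = {
	.key_len	= sizeof(u32),
	.key_offset	= offsetof(struct demo_item, key),
	.head_offset	= offsetof(struct demo_item, node),
	.automatic_shrinking = true,
};

static int demo_insert(struct rhashtable *ht, struct demo_item *item)
{
	return rhashtable_insert_fast(ht, &item->node, demo_params);
}

static void demo_remove(struct rhashtable *ht, u32 key)
{
	struct demo_item *item = rhashtable_lookup_fast(ht, &key, demo_params);

	if (item) {
		rhashtable_remove_fast(ht, &item->node, demo_params);
		kfree(item);
	}
}

rhashtable_init(ht, &demo_params) must have run beforehand, and something external has to keep the object alive between lookup and removal; in the dlm code that is ls_rsbtbl_lock.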