diff options
Diffstat (limited to 'fs/gfs2/glock.c')
| -rw-r--r-- | fs/gfs2/glock.c | 1044 |
1 files changed, 486 insertions, 558 deletions
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 1438e7465e30..92e029104d8a 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -34,8 +34,8 @@ #include <linux/lockref.h> #include <linux/rhashtable.h> #include <linux/pid_namespace.h> -#include <linux/fdtable.h> #include <linux/file.h> +#include <linux/random.h> #include "gfs2.h" #include "incore.h" @@ -61,12 +61,10 @@ struct gfs2_glock_iter { typedef void (*glock_examiner) (struct gfs2_glock * gl); static void do_xmote(struct gfs2_glock *gl, struct gfs2_holder *gh, unsigned int target); -static void __gfs2_glock_dq(struct gfs2_holder *gh); -static void handle_callback(struct gfs2_glock *gl, unsigned int state, - unsigned long delay, bool remote); +static void request_demote(struct gfs2_glock *gl, unsigned int state, + unsigned long delay, bool remote); static struct dentry *gfs2_root; -static struct workqueue_struct *glock_workqueue; static LIST_HEAD(lru_list); static atomic_t lru_count = ATOMIC_INIT(0); static DEFINE_SPINLOCK(lru_lock); @@ -139,44 +137,43 @@ static void gfs2_glock_dealloc(struct rcu_head *rcu) kmem_cache_free(gfs2_glock_cachep, gl); } -/** - * glock_blocked_by_withdraw - determine if we can still use a glock - * @gl: the glock - * - * We need to allow some glocks to be enqueued, dequeued, promoted, and demoted - * when we're withdrawn. For example, to maintain metadata integrity, we should - * disallow the use of inode and rgrp glocks when withdrawn. Other glocks like - * the iopen or freeze glock may be safely used because none of their - * metadata goes through the journal. So in general, we should disallow all - * glocks that are journaled, and allow all the others. One exception is: - * we need to allow our active journal to be promoted and demoted so others - * may recover it and we can reacquire it when they're done. - */ -static bool glock_blocked_by_withdraw(struct gfs2_glock *gl) +static void __gfs2_glock_free(struct gfs2_glock *gl) { + rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms); + smp_mb(); + wake_up_glock(gl); + call_rcu(&gl->gl_rcu, gfs2_glock_dealloc); +} + +void gfs2_glock_free(struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - if (likely(!gfs2_withdrawn(sdp))) - return false; - if (gl->gl_ops->go_flags & GLOF_NONDISK) - return false; - if (!sdp->sd_jdesc || - gl->gl_name.ln_number == sdp->sd_jdesc->jd_no_addr) - return false; - return true; + __gfs2_glock_free(gl); + if (atomic_dec_and_test(&sdp->sd_glock_disposal)) + wake_up(&sdp->sd_kill_wait); } -void gfs2_glock_free(struct gfs2_glock *gl) -{ +void gfs2_glock_free_later(struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - gfs2_glock_assert_withdraw(gl, atomic_read(&gl->gl_revokes) == 0); - rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms); - smp_mb(); - wake_up_glock(gl); - call_rcu(&gl->gl_rcu, gfs2_glock_dealloc); + spin_lock(&lru_lock); + list_add(&gl->gl_lru, &sdp->sd_dead_glocks); + spin_unlock(&lru_lock); if (atomic_dec_and_test(&sdp->sd_glock_disposal)) - wake_up(&sdp->sd_glock_wait); + wake_up(&sdp->sd_kill_wait); +} + +static void gfs2_free_dead_glocks(struct gfs2_sbd *sdp) +{ + struct list_head *list = &sdp->sd_dead_glocks; + + while(!list_empty(list)) { + struct gfs2_glock *gl; + + gl = list_first_entry(list, struct gfs2_glock, gl_lru); + list_del_init(&gl->gl_lru); + __gfs2_glock_free(gl); + } } /** @@ -192,34 +189,9 @@ struct gfs2_glock *gfs2_glock_hold(struct gfs2_glock *gl) return gl; } -/** - * demote_ok - Check to see if it's ok to unlock a glock - * @gl: the glock - * - * Returns: 1 if it's ok - */ - -static int demote_ok(const struct gfs2_glock *gl) -{ - const struct gfs2_glock_operations *glops = gl->gl_ops; - - if (gl->gl_state == LM_ST_UNLOCKED) - return 0; - if (!list_empty(&gl->gl_holders)) - return 0; - if (glops->go_demote_ok) - return glops->go_demote_ok(gl); - return 1; -} - - -void gfs2_glock_add_to_lru(struct gfs2_glock *gl) +static void gfs2_glock_add_to_lru(struct gfs2_glock *gl) { - if (!(gl->gl_ops->go_flags & GLOF_LRU)) - return; - spin_lock(&lru_lock); - list_move_tail(&gl->gl_lru, &lru_list); if (!test_bit(GLF_LRU, &gl->gl_flags)) { @@ -232,9 +204,6 @@ void gfs2_glock_add_to_lru(struct gfs2_glock *gl) static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) { - if (!(gl->gl_ops->go_flags & GLOF_LRU)) - return; - spin_lock(&lru_lock); if (test_bit(GLF_LRU, &gl->gl_flags)) { list_del_init(&gl->gl_lru); @@ -248,8 +217,10 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) * Enqueue the glock on the work queue. Passes one glock reference on to the * work queue. */ -static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { - if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) { +static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + if (!queue_delayed_work(sdp->sd_glock_wq, &gl->gl_work, delay)) { /* * We are holding the lockref spinlock, and the work was still * queued above. The queued work (glock_work_func) takes that @@ -261,12 +232,6 @@ static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) } } -static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { - spin_lock(&gl->gl_lockref.lock); - __gfs2_glock_queue_work(gl, delay); - spin_unlock(&gl->gl_lockref.lock); -} - static void __gfs2_glock_put(struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; @@ -285,12 +250,18 @@ static void __gfs2_glock_put(struct gfs2_glock *gl) sdp->sd_lockstruct.ls_ops->lm_put_lock(gl); } -/* - * Cause the glock to be put in work queue context. - */ -void gfs2_glock_queue_put(struct gfs2_glock *gl) +static bool __gfs2_glock_put_or_lock(struct gfs2_glock *gl) { - gfs2_glock_queue_work(gl, 0); + if (lockref_put_or_lock(&gl->gl_lockref)) + return true; + GLOCK_BUG_ON(gl, gl->gl_lockref.count != 1); + if (gl->gl_state != LM_ST_UNLOCKED) { + gl->gl_lockref.count--; + gfs2_glock_add_to_lru(gl); + spin_unlock(&gl->gl_lockref.lock); + return true; + } + return false; } /** @@ -301,12 +272,28 @@ void gfs2_glock_queue_put(struct gfs2_glock *gl) void gfs2_glock_put(struct gfs2_glock *gl) { - if (lockref_put_or_lock(&gl->gl_lockref)) + if (__gfs2_glock_put_or_lock(gl)) return; __gfs2_glock_put(gl); } +/* + * gfs2_glock_put_async - Decrement reference count without sleeping + * @gl: The glock to put + * + * Decrement the reference count on glock immediately unless it is the last + * reference. Defer putting the last reference to work queue context. + */ +void gfs2_glock_put_async(struct gfs2_glock *gl) +{ + if (__gfs2_glock_put_or_lock(gl)) + return; + + gfs2_glock_queue_work(gl, 0); + spin_unlock(&gl->gl_lockref.lock); +} + /** * may_grant - check if it's ok to grant a new lock * @gl: The glock @@ -467,14 +454,18 @@ done: /** * do_promote - promote as many requests as possible on the current queue * @gl: The glock - * - * Returns: 1 if there is a blocked holder at the head of the list */ -static int do_promote(struct gfs2_glock *gl) +static void do_promote(struct gfs2_glock *gl) { + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; struct gfs2_holder *gh, *current_gh; + if (gfs2_withdrawn(sdp)) { + do_error(gl, LM_OUT_ERROR); + return; + } + current_gh = find_first_holder(gl); list_for_each_entry(gh, &gl->gl_holders, gh_list) { if (test_bit(HIF_HOLDER, &gh->gh_iflags)) @@ -482,13 +473,10 @@ static int do_promote(struct gfs2_glock *gl) if (!may_grant(gl, current_gh, gh)) { /* * If we get here, it means we may not grant this - * holder for some reason. If this holder is at the - * head of the list, it means we have a blocked holder - * at the head, so return 1. + * holder for some reason. */ - if (list_is_first(&gh->gh_list, &gl->gl_holders)) - return 1; - do_error(gl, 0); + if (current_gh) + do_error(gl, 0); /* Fail queued try locks */ break; } set_bit(HIF_HOLDER, &gh->gh_iflags); @@ -497,7 +485,6 @@ static int do_promote(struct gfs2_glock *gl) if (!current_gh) current_gh = gh; } - return 0; } /** @@ -517,6 +504,23 @@ static inline struct gfs2_holder *find_first_waiter(const struct gfs2_glock *gl) } /** + * find_last_waiter - find the last gh that's waiting for the glock + * @gl: the glock + * + * This also is a fast way of finding out if there are any waiters. + */ + +static inline struct gfs2_holder *find_last_waiter(const struct gfs2_glock *gl) +{ + struct gfs2_holder *gh; + + if (list_empty(&gl->gl_holders)) + return NULL; + gh = list_last_entry(&gl->gl_holders, struct gfs2_holder, gh_list); + return test_bit(HIF_HOLDER, &gh->gh_iflags) ? NULL : gh; +} + +/** * state_change - record that the glock is now in a different state * @gl: the glock * @new_state: the new state @@ -524,18 +528,6 @@ static inline struct gfs2_holder *find_first_waiter(const struct gfs2_glock *gl) static void state_change(struct gfs2_glock *gl, unsigned int new_state) { - int held1, held2; - - held1 = (gl->gl_state != LM_ST_UNLOCKED); - held2 = (new_state != LM_ST_UNLOCKED); - - if (held1 != held2) { - GLOCK_BUG_ON(gl, __lockref_is_dead(&gl->gl_lockref)); - if (held2) - gl->gl_lockref.count++; - else - gl->gl_lockref.count--; - } if (new_state != gl->gl_target) /* shorten our minimum hold time */ gl->gl_hold_time = max(gl->gl_hold_time - GL_GLOCK_HOLD_DECR, @@ -544,11 +536,11 @@ static void state_change(struct gfs2_glock *gl, unsigned int new_state) gl->gl_tchange = jiffies; } -static void gfs2_set_demote(struct gfs2_glock *gl) +static void gfs2_set_demote(int nr, struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - set_bit(GLF_DEMOTE, &gl->gl_flags); + set_bit(nr, &gl->gl_flags); smp_mb(); wake_up(&sdp->sd_async_glock_wait); } @@ -571,31 +563,31 @@ static void gfs2_demote_wake(struct gfs2_glock *gl) static void finish_xmote(struct gfs2_glock *gl, unsigned int ret) { const struct gfs2_glock_operations *glops = gl->gl_ops; - struct gfs2_holder *gh; - unsigned state = ret & LM_OUT_ST_MASK; - spin_lock(&gl->gl_lockref.lock); - trace_gfs2_glock_state_change(gl, state); - state_change(gl, state); - gh = find_first_waiter(gl); + if (!(ret & ~LM_OUT_ST_MASK)) { + unsigned state = ret & LM_OUT_ST_MASK; + + trace_gfs2_glock_state_change(gl, state); + state_change(gl, state); + } /* Demote to UN request arrived during demote to SH or DF */ if (test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags) && - state != LM_ST_UNLOCKED && gl->gl_demote_state == LM_ST_UNLOCKED) + gl->gl_state != LM_ST_UNLOCKED && + gl->gl_demote_state == LM_ST_UNLOCKED) gl->gl_target = LM_ST_UNLOCKED; /* Check for state != intended state */ - if (unlikely(state != gl->gl_target)) { - if (gh && (ret & LM_OUT_CANCELED)) - gfs2_holder_wake(gh); + if (unlikely(gl->gl_state != gl->gl_target)) { + struct gfs2_holder *gh = find_first_waiter(gl); + if (gh && !test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags)) { - /* move to back of queue and try next entry */ if (ret & LM_OUT_CANCELED) { - if ((gh->gh_flags & LM_FLAG_PRIORITY) == 0) - list_move_tail(&gh->gh_list, &gl->gl_holders); - gh = find_first_waiter(gl); - gl->gl_target = gh->gh_state; - goto retry; + list_del_init(&gh->gh_list); + trace_gfs2_glock_queue(gh, 0); + gfs2_holder_wake(gh); + gl->gl_target = gl->gl_state; + goto out; } /* Some error or failed "try lock" - report it */ if ((ret & LM_OUT_ERROR) || @@ -605,10 +597,9 @@ static void finish_xmote(struct gfs2_glock *gl, unsigned int ret) goto out; } } - switch(state) { + switch(gl->gl_state) { /* Unlocked due to conversion deadlock, try again */ case LM_ST_UNLOCKED: -retry: do_xmote(gl, gh, gl->gl_target); break; /* Conversion fails, unlock and try again */ @@ -617,18 +608,21 @@ retry: do_xmote(gl, gh, LM_ST_UNLOCKED); break; default: /* Everything else */ - fs_err(gl->gl_name.ln_sbd, "wanted %u got %u\n", - gl->gl_target, state); + fs_err(gl->gl_name.ln_sbd, + "glock %u:%llu requested=%u ret=%u\n", + gl->gl_name.ln_type, gl->gl_name.ln_number, + gl->gl_req, ret); GLOCK_BUG_ON(gl, 1); } - spin_unlock(&gl->gl_lockref.lock); return; } /* Fast path - we got what we asked for */ - if (test_and_clear_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags)) + if (test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags)) { + clear_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags); gfs2_demote_wake(gl); - if (state != LM_ST_UNLOCKED) { + } + if (gl->gl_state != LM_ST_UNLOCKED) { if (glops->go_xmote_bh) { int rv; @@ -643,18 +637,8 @@ retry: do_promote(gl); } out: - clear_bit(GLF_LOCK, &gl->gl_flags); - spin_unlock(&gl->gl_lockref.lock); -} - -static bool is_system_glock(struct gfs2_glock *gl) -{ - struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - struct gfs2_inode *m_ip = GFS2_I(sdp->sd_statfs_inode); - - if (gl == m_ip->i_gl) - return true; - return false; + if (!test_bit(GLF_CANCELING, &gl->gl_flags)) + clear_bit(GLF_LOCK, &gl->gl_flags); } /** @@ -672,136 +656,86 @@ __acquires(&gl->gl_lockref.lock) { const struct gfs2_glock_operations *glops = gl->gl_ops; struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - unsigned int lck_flags = (unsigned int)(gh ? gh->gh_flags : 0); + struct lm_lockstruct *ls = &sdp->sd_lockstruct; int ret; - if (target != LM_ST_UNLOCKED && glock_blocked_by_withdraw(gl) && - gh && !(gh->gh_flags & LM_FLAG_NOEXP)) - goto skip_inval; + /* + * When a filesystem is withdrawing, the remaining cluster nodes will + * take care of recovering the withdrawing node's journal. We only + * need to make sure that once we trigger remote recovery, we won't + * write to the shared block device anymore. This means that here, + * + * - no new writes to the filesystem must be triggered (->go_sync()). + * + * - any cached data should be discarded by calling ->go_inval(), dirty + * or not and journaled or unjournaled. + * + * - no more dlm locking operations should be issued (->lm_lock()). + */ - lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP | - LM_FLAG_PRIORITY); GLOCK_BUG_ON(gl, gl->gl_state == target); GLOCK_BUG_ON(gl, gl->gl_state == gl->gl_target); - if ((target == LM_ST_UNLOCKED || target == LM_ST_DEFERRED) && - glops->go_inval) { - /* - * If another process is already doing the invalidate, let that - * finish first. The glock state machine will get back to this - * holder again later. - */ - if (test_and_set_bit(GLF_INVALIDATE_IN_PROGRESS, - &gl->gl_flags)) - return; - do_error(gl, 0); /* Fail queued try locks */ - } - gl->gl_req = target; - set_bit(GLF_BLOCKING, &gl->gl_flags); - if ((gl->gl_req == LM_ST_UNLOCKED) || - (gl->gl_state == LM_ST_EXCLUSIVE) || - (lck_flags & (LM_FLAG_TRY|LM_FLAG_TRY_1CB))) - clear_bit(GLF_BLOCKING, &gl->gl_flags); + + if (!glops->go_inval || !glops->go_sync) + goto skip_inval; + spin_unlock(&gl->gl_lockref.lock); - if (glops->go_sync) { + if (!gfs2_withdrawn(sdp)) { ret = glops->go_sync(gl); - /* If we had a problem syncing (due to io errors or whatever, - * we should not invalidate the metadata or tell dlm to - * release the glock to other nodes. - */ if (ret) { if (cmpxchg(&sdp->sd_log_error, 0, ret)) { - fs_err(sdp, "Error %d syncing glock \n", ret); + fs_err(sdp, "Error %d syncing glock\n", ret); gfs2_dump_glock(NULL, gl, true); + gfs2_withdraw(sdp); } - goto skip_inval; } } - if (test_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags)) { - /* - * The call to go_sync should have cleared out the ail list. - * If there are still items, we have a problem. We ought to - * withdraw, but we can't because the withdraw code also uses - * glocks. Warn about the error, dump the glock, then fall - * through and wait for logd to do the withdraw for us. - */ - if ((atomic_read(&gl->gl_ail_count) != 0) && - (!cmpxchg(&sdp->sd_log_error, 0, -EIO))) { - gfs2_glock_assert_warn(gl, - !atomic_read(&gl->gl_ail_count)); - gfs2_dump_glock(NULL, gl, true); - } + + if (target == LM_ST_UNLOCKED || target == LM_ST_DEFERRED) glops->go_inval(gl, target == LM_ST_DEFERRED ? 0 : DIO_METADATA); - clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags); - } + spin_lock(&gl->gl_lockref.lock); skip_inval: - gfs2_glock_hold(gl); - /* - * Check for an error encountered since we called go_sync and go_inval. - * If so, we can't withdraw from the glock code because the withdraw - * code itself uses glocks (see function signal_our_withdraw) to - * change the mount to read-only. Most importantly, we must not call - * dlm to unlock the glock until the journal is in a known good state - * (after journal replay) otherwise other nodes may use the object - * (rgrp or dinode) and then later, journal replay will corrupt the - * file system. The best we can do here is wait for the logd daemon - * to see sd_log_error and withdraw, and in the meantime, requeue the - * work for later. - * - * We make a special exception for some system glocks, such as the - * system statfs inode glock, which needs to be granted before the - * gfs2_quotad daemon can exit, and that exit needs to finish before - * we can unmount the withdrawn file system. - * - * However, if we're just unlocking the lock (say, for unmount, when - * gfs2_gl_hash_clear calls clear_glock) and recovery is complete - * then it's okay to tell dlm to unlock it. - */ - if (unlikely(sdp->sd_log_error && !gfs2_withdrawn(sdp))) - gfs2_withdraw_delayed(sdp); - if (glock_blocked_by_withdraw(gl) && - (target != LM_ST_UNLOCKED || - test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags))) { - if (!is_system_glock(gl)) { - handle_callback(gl, LM_ST_UNLOCKED, 0, false); /* sets demote */ - /* - * Ordinarily, we would call dlm and its callback would call - * finish_xmote, which would call state_change() to the new state. - * Since we withdrew, we won't call dlm, so call state_change - * manually, but to the UNLOCKED state we desire. - */ - state_change(gl, LM_ST_UNLOCKED); + if (gfs2_withdrawn(sdp)) { + if (target != LM_ST_UNLOCKED) + target = LM_OUT_ERROR; + goto out; + } + + if (ls->ls_ops->lm_lock) { + set_bit(GLF_PENDING_REPLY, &gl->gl_flags); + spin_unlock(&gl->gl_lockref.lock); + ret = ls->ls_ops->lm_lock(gl, target, gh ? gh->gh_flags : 0); + spin_lock(&gl->gl_lockref.lock); + + if (!ret) { + /* The operation will be completed asynchronously. */ + gl->gl_lockref.count++; + return; + } + clear_bit(GLF_PENDING_REPLY, &gl->gl_flags); + + if (ret == -ENODEV) { /* - * We skip telling dlm to do the locking, so we won't get a - * reply that would otherwise clear GLF_LOCK. So we clear it here. + * The lockspace has been released and the lock has + * been unlocked implicitly. */ - clear_bit(GLF_LOCK, &gl->gl_flags); - clear_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags); - gfs2_glock_queue_work(gl, GL_GLOCK_DFT_HOLD); - goto out; + if (target != LM_ST_UNLOCKED) { + target = LM_OUT_ERROR; + goto out; + } } else { - clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags); - } - } - - if (sdp->sd_lockstruct.ls_ops->lm_lock) { - /* lock_dlm */ - ret = sdp->sd_lockstruct.ls_ops->lm_lock(gl, target, lck_flags); - if (ret == -EINVAL && gl->gl_target == LM_ST_UNLOCKED && - target == LM_ST_UNLOCKED && - test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) { - finish_xmote(gl, target); - gfs2_glock_queue_work(gl, 0); - } else if (ret) { fs_err(sdp, "lm_lock ret %d\n", ret); GLOCK_BUG_ON(gl, !gfs2_withdrawn(sdp)); + return; } - } else { /* lock_nolock */ - finish_xmote(gl, target); - gfs2_glock_queue_work(gl, 0); } + out: - spin_lock(&gl->gl_lockref.lock); + /* Complete the operation now. */ + finish_xmote(gl, target); + gl->gl_lockref.count++; + gfs2_glock_queue_work(gl, 0); } /** @@ -815,15 +749,26 @@ static void run_queue(struct gfs2_glock *gl, const int nonblock) __releases(&gl->gl_lockref.lock) __acquires(&gl->gl_lockref.lock) { - struct gfs2_holder *gh = NULL; + struct gfs2_holder *gh; - if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) + if (test_bit(GLF_LOCK, &gl->gl_flags)) return; + set_bit(GLF_LOCK, &gl->gl_flags); + /* + * The GLF_DEMOTE_IN_PROGRESS flag is only set intermittently during + * locking operations. We have just started a locking operation by + * setting the GLF_LOCK flag, so the GLF_DEMOTE_IN_PROGRESS flag must + * be cleared. + */ GLOCK_BUG_ON(gl, test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags)); - if (test_bit(GLF_DEMOTE, &gl->gl_flags) && - gl->gl_demote_state != gl->gl_state) { + if (test_bit(GLF_DEMOTE, &gl->gl_flags)) { + if (gl->gl_demote_state == gl->gl_state) { + gfs2_demote_wake(gl); + goto promote; + } + if (find_first_holder(gl)) goto out_unlock; if (nonblock) @@ -831,30 +776,33 @@ __acquires(&gl->gl_lockref.lock) set_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags); GLOCK_BUG_ON(gl, gl->gl_demote_state == LM_ST_EXCLUSIVE); gl->gl_target = gl->gl_demote_state; - } else { - if (test_bit(GLF_DEMOTE, &gl->gl_flags)) - gfs2_demote_wake(gl); - if (do_promote(gl) == 0) - goto out_unlock; - gh = find_first_waiter(gl); - gl->gl_target = gh->gh_state; - if (!(gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB))) - do_error(gl, 0); /* Fail queued try locks */ + do_xmote(gl, NULL, gl->gl_target); + return; } + +promote: + do_promote(gl); + if (find_first_holder(gl)) + goto out_unlock; + gh = find_first_waiter(gl); + if (!gh) + goto out_unlock; + if (nonblock) + goto out_sched; + gl->gl_target = gh->gh_state; + if (!(gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB))) + do_error(gl, 0); /* Fail queued try locks */ do_xmote(gl, gh, gl->gl_target); return; out_sched: clear_bit(GLF_LOCK, &gl->gl_flags); - smp_mb__after_atomic(); gl->gl_lockref.count++; - __gfs2_glock_queue_work(gl, 0); + gfs2_glock_queue_work(gl, 0); return; out_unlock: clear_bit(GLF_LOCK, &gl->gl_flags); - smp_mb__after_atomic(); - return; } /** @@ -870,12 +818,8 @@ void glock_set_object(struct gfs2_glock *gl, void *object) prev_object = gl->gl_object; gl->gl_object = object; spin_unlock(&gl->gl_lockref.lock); - if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == NULL)) { - pr_warn("glock=%u/%llx\n", - gl->gl_name.ln_type, - (unsigned long long)gl->gl_name.ln_number); + if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == NULL)) gfs2_dump_glock(NULL, gl, true); - } } /** @@ -891,12 +835,8 @@ void glock_clear_object(struct gfs2_glock *gl, void *object) prev_object = gl->gl_object; gl->gl_object = NULL; spin_unlock(&gl->gl_lockref.lock); - if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == object)) { - pr_warn("glock=%u/%llx\n", - gl->gl_name.ln_type, - (unsigned long long)gl->gl_name.ln_number); + if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == object)) gfs2_dump_glock(NULL, gl, true); - } } void gfs2_inode_remember_delete(struct gfs2_glock *gl, u64 generation) @@ -931,48 +871,56 @@ static void gfs2_glock_poke(struct gfs2_glock *gl) gfs2_holder_uninit(&gh); } -static bool gfs2_try_evict(struct gfs2_glock *gl) +static struct gfs2_inode *gfs2_grab_existing_inode(struct gfs2_glock *gl) +{ + struct gfs2_inode *ip; + + spin_lock(&gl->gl_lockref.lock); + ip = gl->gl_object; + if (ip && !igrab(&ip->i_inode)) + ip = NULL; + spin_unlock(&gl->gl_lockref.lock); + if (ip) { + wait_on_new_inode(&ip->i_inode); + if (is_bad_inode(&ip->i_inode)) { + iput(&ip->i_inode); + ip = NULL; + } + } + return ip; +} + +static void gfs2_try_to_evict(struct gfs2_glock *gl) { struct gfs2_inode *ip; - bool evicted = false; /* * If there is contention on the iopen glock and we have an inode, try - * to grab and release the inode so that it can be evicted. This will - * allow the remote node to go ahead and delete the inode without us - * having to do it, which will avoid rgrp glock thrashing. + * to grab and release the inode so that it can be evicted. The + * GLF_DEFER_DELETE flag indicates to gfs2_evict_inode() that the inode + * should not be deleted locally. This will allow the remote node to + * go ahead and delete the inode without us having to do it, which will + * avoid rgrp glock thrashing. * * The remote node is likely still holding the corresponding inode * glock, so it will run before we get to verify that the delete has - * happened below. + * happened below. (Verification is triggered by the call to + * gfs2_queue_verify_delete() in gfs2_evict_inode().) */ - spin_lock(&gl->gl_lockref.lock); - ip = gl->gl_object; - if (ip && !igrab(&ip->i_inode)) - ip = NULL; - spin_unlock(&gl->gl_lockref.lock); + ip = gfs2_grab_existing_inode(gl); if (ip) { - gl->gl_no_formal_ino = ip->i_no_formal_ino; - set_bit(GIF_DEFERRED_DELETE, &ip->i_flags); + set_bit(GLF_DEFER_DELETE, &gl->gl_flags); d_prune_aliases(&ip->i_inode); iput(&ip->i_inode); + clear_bit(GLF_DEFER_DELETE, &gl->gl_flags); /* If the inode was evicted, gl->gl_object will now be NULL. */ - spin_lock(&gl->gl_lockref.lock); - ip = gl->gl_object; - if (ip) { - clear_bit(GIF_DEFERRED_DELETE, &ip->i_flags); - if (!igrab(&ip->i_inode)) - ip = NULL; - } - spin_unlock(&gl->gl_lockref.lock); + ip = gfs2_grab_existing_inode(gl); if (ip) { gfs2_glock_poke(ip->i_gl); iput(&ip->i_inode); } - evicted = !ip; } - return evicted; } bool gfs2_queue_try_to_evict(struct gfs2_glock *gl) @@ -981,18 +929,18 @@ bool gfs2_queue_try_to_evict(struct gfs2_glock *gl) if (test_and_set_bit(GLF_TRY_TO_EVICT, &gl->gl_flags)) return false; - return queue_delayed_work(sdp->sd_delete_wq, - &gl->gl_delete, 0); + return !mod_delayed_work(sdp->sd_delete_wq, &gl->gl_delete, 0); } -static bool gfs2_queue_verify_evict(struct gfs2_glock *gl) +bool gfs2_queue_verify_delete(struct gfs2_glock *gl, bool later) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + unsigned long delay; - if (test_and_set_bit(GLF_VERIFY_EVICT, &gl->gl_flags)) + if (test_and_set_bit(GLF_VERIFY_DELETE, &gl->gl_flags)) return false; - return queue_delayed_work(sdp->sd_delete_wq, - &gl->gl_delete, 5 * HZ); + delay = later ? HZ + get_random_long() % (HZ * 9) : 0; + return queue_delayed_work(sdp->sd_delete_wq, &gl->gl_delete, delay); } static void delete_work_func(struct work_struct *work) @@ -1000,43 +948,27 @@ static void delete_work_func(struct work_struct *work) struct delayed_work *dwork = to_delayed_work(work); struct gfs2_glock *gl = container_of(dwork, struct gfs2_glock, gl_delete); struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - struct inode *inode; - u64 no_addr = gl->gl_name.ln_number; + bool verify_delete = test_and_clear_bit(GLF_VERIFY_DELETE, &gl->gl_flags); - if (test_and_clear_bit(GLF_TRY_TO_EVICT, &gl->gl_flags)) { - /* - * If we can evict the inode, give the remote node trying to - * delete the inode some time before verifying that the delete - * has happened. Otherwise, if we cause contention on the inode glock - * immediately, the remote node will think that we still have - * the inode in use, and so it will give up waiting. - * - * If we can't evict the inode, signal to the remote node that - * the inode is still in use. We'll later try to delete the - * inode locally in gfs2_evict_inode. - * - * FIXME: We only need to verify that the remote node has - * deleted the inode because nodes before this remote delete - * rework won't cooperate. At a later time, when we no longer - * care about compatibility with such nodes, we can skip this - * step entirely. - */ - if (gfs2_try_evict(gl)) { - if (test_bit(SDF_DEACTIVATING, &sdp->sd_flags)) - goto out; - if (gfs2_queue_verify_evict(gl)) - return; - } - goto out; - } + /* + * Check for the GLF_VERIFY_DELETE above: this ensures that we won't + * immediately process GLF_VERIFY_DELETE work that the below call to + * gfs2_try_to_evict() queues. + */ + + if (test_and_clear_bit(GLF_TRY_TO_EVICT, &gl->gl_flags)) + gfs2_try_to_evict(gl); + + if (verify_delete) { + u64 no_addr = gl->gl_name.ln_number; + struct inode *inode; - if (test_and_clear_bit(GLF_VERIFY_EVICT, &gl->gl_flags)) { inode = gfs2_lookup_by_inum(sdp, no_addr, gl->gl_no_formal_ino, GFS2_BLKST_UNLINKED); if (IS_ERR(inode)) { if (PTR_ERR(inode) == -EAGAIN && - !test_bit(SDF_DEACTIVATING, &sdp->sd_flags) && - gfs2_queue_verify_evict(gl)) + !test_bit(SDF_KILL, &sdp->sd_flags) && + gfs2_queue_verify_delete(gl, true)) return; } else { d_prune_aliases(inode); @@ -1044,7 +976,6 @@ static void delete_work_func(struct work_struct *work) } } -out: gfs2_glock_put(gl); } @@ -1054,43 +985,44 @@ static void glock_work_func(struct work_struct *work) struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work); unsigned int drop_refs = 1; - if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) { + spin_lock(&gl->gl_lockref.lock); + if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags)) { + clear_bit(GLF_HAVE_REPLY, &gl->gl_flags); finish_xmote(gl, gl->gl_reply); drop_refs++; } - spin_lock(&gl->gl_lockref.lock); if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && gl->gl_state != LM_ST_UNLOCKED && gl->gl_demote_state != LM_ST_EXCLUSIVE) { - unsigned long holdtime, now = jiffies; + if (gl->gl_name.ln_type == LM_TYPE_INODE) { + unsigned long holdtime, now = jiffies; - holdtime = gl->gl_tchange + gl->gl_hold_time; - if (time_before(now, holdtime)) - delay = holdtime - now; + holdtime = gl->gl_tchange + gl->gl_hold_time; + if (time_before(now, holdtime)) + delay = holdtime - now; + } if (!delay) { clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags); - gfs2_set_demote(gl); + gfs2_set_demote(GLF_DEMOTE, gl); } } run_queue(gl, 0); if (delay) { /* Keep one glock reference for the work we requeue. */ drop_refs--; - if (gl->gl_name.ln_type != LM_TYPE_INODE) - delay = 0; - __gfs2_glock_queue_work(gl, delay); + gfs2_glock_queue_work(gl, delay); } - /* - * Drop the remaining glock references manually here. (Mind that - * __gfs2_glock_queue_work depends on the lockref spinlock begin held - * here as well.) - */ + /* Drop the remaining glock references manually. */ + GLOCK_BUG_ON(gl, gl->gl_lockref.count < drop_refs); gl->gl_lockref.count -= drop_refs; if (!gl->gl_lockref.count) { - __gfs2_glock_put(gl); - return; + if (gl->gl_state == LM_ST_UNLOCKED) { + __gfs2_glock_put(gl); + return; + } + gfs2_glock_add_to_lru(gl); } spin_unlock(&gl->gl_lockref.lock); } @@ -1126,6 +1058,8 @@ again: out: rcu_read_unlock(); finish_wait(wq, &wait.wait); + if (gl) + gfs2_glock_remove_from_lru(gl); return gl; } @@ -1146,19 +1080,15 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, const struct gfs2_glock_operations *glops, int create, struct gfs2_glock **glp) { - struct super_block *s = sdp->sd_vfs; struct lm_lockname name = { .ln_number = number, .ln_type = glops->go_type, .ln_sbd = sdp }; struct gfs2_glock *gl, *tmp; struct address_space *mapping; - int ret = 0; gl = find_insert_glock(&name, NULL); - if (gl) { - *glp = gl; - return 0; - } + if (gl) + goto found; if (!create) return -ENOENT; @@ -1186,10 +1116,12 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, atomic_inc(&sdp->sd_glock_disposal); gl->gl_node.next = NULL; - gl->gl_flags = glops->go_instantiate ? BIT(GLF_INSTANTIATE_NEEDED) : 0; + gl->gl_flags = BIT(GLF_INITIAL); + if (glops->go_instantiate) + gl->gl_flags |= BIT(GLF_INSTANTIATE_NEEDED); gl->gl_name = name; + lockref_init(&gl->gl_lockref); lockdep_set_subclass(&gl->gl_lockref.lock, glops->go_subclass); - gl->gl_lockref.count = 1; gl->gl_state = LM_ST_UNLOCKED; gl->gl_target = LM_ST_UNLOCKED; gl->gl_demote_state = LM_ST_EXCLUSIVE; @@ -1209,32 +1141,31 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, mapping = gfs2_glock2aspace(gl); if (mapping) { + gfp_t gfp_mask; + mapping->a_ops = &gfs2_meta_aops; - mapping->host = s->s_bdev->bd_inode; + mapping->host = sdp->sd_inode; mapping->flags = 0; - mapping_set_gfp_mask(mapping, GFP_NOFS); - mapping->private_data = NULL; + gfp_mask = mapping_gfp_mask(sdp->sd_inode->i_mapping); + mapping_set_gfp_mask(mapping, gfp_mask); + mapping->i_private_data = NULL; mapping->writeback_index = 0; } tmp = find_insert_glock(&name, gl); - if (!tmp) { - *glp = gl; - goto out; - } - if (IS_ERR(tmp)) { - ret = PTR_ERR(tmp); - goto out_free; - } - *glp = tmp; + if (tmp) { + gfs2_glock_dealloc(&gl->gl_rcu); + if (atomic_dec_and_test(&sdp->sd_glock_disposal)) + wake_up(&sdp->sd_kill_wait); -out_free: - gfs2_glock_dealloc(&gl->gl_rcu); - if (atomic_dec_and_test(&sdp->sd_glock_disposal)) - wake_up(&sdp->sd_glock_wait); + if (IS_ERR(tmp)) + return PTR_ERR(tmp); + gl = tmp; + } -out: - return ret; +found: + *glp = gl; + return 0; } /** @@ -1243,7 +1174,7 @@ out: * @state: the state we're requesting * @flags: the modifier flags * @gh: the holder structure - * + * @ip: caller's return address for debugging */ void __gfs2_holder_init(struct gfs2_glock *gl, unsigned int state, u16 flags, @@ -1404,7 +1335,7 @@ out: } /** - * handle_callback - process a demote request + * request_demote - process a demote request * @gl: the glock * @state: the state the caller wants us to change to * @delay: zero to demote immediately; otherwise pending demote @@ -1414,13 +1345,10 @@ out: * practise: LM_ST_SHARED and LM_ST_UNLOCKED */ -static void handle_callback(struct gfs2_glock *gl, unsigned int state, - unsigned long delay, bool remote) +static void request_demote(struct gfs2_glock *gl, unsigned int state, + unsigned long delay, bool remote) { - if (delay) - set_bit(GLF_PENDING_DEMOTE, &gl->gl_flags); - else - gfs2_set_demote(gl); + gfs2_set_demote(delay ? GLF_PENDING_DEMOTE : GLF_DEMOTE, gl); if (gl->gl_demote_state == LM_ST_EXCLUSIVE) { gl->gl_demote_state = state; gl->gl_demote_time = jiffies; @@ -1452,13 +1380,29 @@ void gfs2_print_dbg(struct seq_file *seq, const char *fmt, ...) va_end(args); } +static bool gfs2_should_queue_trylock(struct gfs2_glock *gl, + struct gfs2_holder *gh) +{ + struct gfs2_holder *current_gh, *gh2; + + current_gh = find_first_holder(gl); + if (current_gh && !may_grant(gl, current_gh, gh)) + return false; + + list_for_each_entry(gh2, &gl->gl_holders, gh_list) { + if (test_bit(HIF_HOLDER, &gh2->gh_iflags)) + continue; + if (!(gh2->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB))) + return false; + } + return true; +} + static inline bool pid_is_meaningful(const struct gfs2_holder *gh) { if (!(gh->gh_flags & GL_NOPID)) return true; - if (gh->gh_state == LM_ST_UNLOCKED) - return true; - return false; + return !test_bit(HIF_HOLDER, &gh->gh_iflags); } /** @@ -1472,28 +1416,20 @@ static inline bool pid_is_meaningful(const struct gfs2_holder *gh) */ static inline void add_to_queue(struct gfs2_holder *gh) -__releases(&gl->gl_lockref.lock) -__acquires(&gl->gl_lockref.lock) { struct gfs2_glock *gl = gh->gh_gl; struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - struct list_head *insert_pt = NULL; struct gfs2_holder *gh2; - int try_futile = 0; GLOCK_BUG_ON(gl, gh->gh_owner_pid == NULL); if (test_and_set_bit(HIF_WAIT, &gh->gh_iflags)) GLOCK_BUG_ON(gl, true); - if (gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) { - if (test_bit(GLF_LOCK, &gl->gl_flags)) { - struct gfs2_holder *current_gh; - - current_gh = find_first_holder(gl); - try_futile = !may_grant(gl, current_gh, gh); - } - if (test_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags)) - goto fail; + if ((gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) && + !gfs2_should_queue_trylock(gl, gh)) { + gh->gh_error = GLR_TRYFAILED; + gfs2_holder_wake(gh); + return; } list_for_each_entry(gh2, &gl->gl_holders, gh_list) { @@ -1505,37 +1441,10 @@ __acquires(&gl->gl_lockref.lock) continue; goto trap_recursive; } - list_for_each_entry(gh2, &gl->gl_holders, gh_list) { - if (try_futile && - !(gh2->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB))) { -fail: - gh->gh_error = GLR_TRYFAILED; - gfs2_holder_wake(gh); - return; - } - if (test_bit(HIF_HOLDER, &gh2->gh_iflags)) - continue; - if (unlikely((gh->gh_flags & LM_FLAG_PRIORITY) && !insert_pt)) - insert_pt = &gh2->gh_list; - } trace_gfs2_glock_queue(gh, 1); gfs2_glstats_inc(gl, GFS2_LKS_QCOUNT); gfs2_sbstats_inc(gl, GFS2_LKS_QCOUNT); - if (likely(insert_pt == NULL)) { - list_add_tail(&gh->gh_list, &gl->gl_holders); - if (unlikely(gh->gh_flags & LM_FLAG_PRIORITY)) - goto do_cancel; - return; - } - list_add_tail(&gh->gh_list, insert_pt); -do_cancel: - gh = list_first_entry(&gl->gl_holders, struct gfs2_holder, gh_list); - if (!(gh->gh_flags & LM_FLAG_PRIORITY)) { - spin_unlock(&gl->gl_lockref.lock); - if (sdp->sd_lockstruct.ls_ops->lm_cancel) - sdp->sd_lockstruct.ls_ops->lm_cancel(gl); - spin_lock(&gl->gl_lockref.lock); - } + list_add_tail(&gh->gh_list, &gl->gl_holders); return; trap_recursive: @@ -1563,26 +1472,44 @@ trap_recursive: int gfs2_glock_nq(struct gfs2_holder *gh) { struct gfs2_glock *gl = gh->gh_gl; - int error = 0; + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + int error; - if (glock_blocked_by_withdraw(gl) && !(gh->gh_flags & LM_FLAG_NOEXP)) + if (gfs2_withdrawn(sdp)) return -EIO; - if (test_bit(GLF_LRU, &gl->gl_flags)) - gfs2_glock_remove_from_lru(gl); + if (gh->gh_flags & GL_NOBLOCK) { + struct gfs2_holder *current_gh; + + error = -ECHILD; + spin_lock(&gl->gl_lockref.lock); + if (find_last_waiter(gl)) + goto unlock; + current_gh = find_first_holder(gl); + if (!may_grant(gl, current_gh, gh)) + goto unlock; + set_bit(HIF_HOLDER, &gh->gh_iflags); + list_add_tail(&gh->gh_list, &gl->gl_holders); + trace_gfs2_promote(gh); + error = 0; +unlock: + spin_unlock(&gl->gl_lockref.lock); + return error; + } gh->gh_error = 0; spin_lock(&gl->gl_lockref.lock); add_to_queue(gh); - if (unlikely((LM_FLAG_NOEXP & gh->gh_flags) && - test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) { - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + if (unlikely((LM_FLAG_RECOVER & gh->gh_flags) && + test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags))) { + set_bit(GLF_HAVE_REPLY, &gl->gl_flags); gl->gl_lockref.count++; - __gfs2_glock_queue_work(gl, 0); + gfs2_glock_queue_work(gl, 0); } run_queue(gl, 1); spin_unlock(&gl->gl_lockref.lock); + error = 0; if (!(gh->gh_flags & GL_ASYNC)) error = gfs2_glock_wait(gh); @@ -1601,12 +1528,6 @@ int gfs2_glock_poll(struct gfs2_holder *gh) return test_bit(HIF_WAIT, &gh->gh_iflags) ? 0 : 1; } -static inline bool needs_demote(struct gfs2_glock *gl) -{ - return (test_bit(GLF_DEMOTE, &gl->gl_flags) || - test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags)); -} - static void __gfs2_glock_dq(struct gfs2_holder *gh) { struct gfs2_glock *gl = gh->gh_gl; @@ -1615,11 +1536,11 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh) /* * This holder should not be cached, so mark it for demote. - * Note: this should be done before the check for needs_demote - * below. + * Note: this should be done before the glock_needs_demote + * check below. */ if (gh->gh_flags & GL_NOCACHE) - handle_callback(gl, LM_ST_UNLOCKED, 0, false); + request_demote(gl, LM_ST_UNLOCKED, 0, false); list_del_init(&gh->gh_list); clear_bit(HIF_HOLDER, &gh->gh_iflags); @@ -1629,21 +1550,18 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh) * If there hasn't been a demote request we are done. * (Let the remaining holders, if any, keep holding it.) */ - if (!needs_demote(gl)) { + if (!glock_needs_demote(gl)) { if (list_empty(&gl->gl_holders)) fast_path = 1; } - if (!test_bit(GLF_LFLUSH, &gl->gl_flags) && demote_ok(gl)) - gfs2_glock_add_to_lru(gl); - if (unlikely(!fast_path)) { gl->gl_lockref.count++; if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && !test_bit(GLF_DEMOTE, &gl->gl_flags) && gl->gl_name.ln_type == LM_TYPE_INODE) delay = gl->gl_hold_time; - __gfs2_glock_queue_work(gl, delay); + gfs2_glock_queue_work(gl, delay); } } @@ -1655,7 +1573,6 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh) void gfs2_glock_dq(struct gfs2_holder *gh) { struct gfs2_glock *gl = gh->gh_gl; - struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; spin_lock(&gl->gl_lockref.lock); if (!gfs2_holder_queued(gh)) { @@ -1667,29 +1584,19 @@ void gfs2_glock_dq(struct gfs2_holder *gh) } if (list_is_first(&gh->gh_list, &gl->gl_holders) && - !test_bit(HIF_HOLDER, &gh->gh_iflags)) { + !test_bit(HIF_HOLDER, &gh->gh_iflags) && + test_bit(GLF_LOCK, &gl->gl_flags) && + !test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags) && + !test_bit(GLF_CANCELING, &gl->gl_flags)) { + set_bit(GLF_CANCELING, &gl->gl_flags); spin_unlock(&gl->gl_lockref.lock); gl->gl_name.ln_sbd->sd_lockstruct.ls_ops->lm_cancel(gl); wait_on_bit(&gh->gh_iflags, HIF_WAIT, TASK_UNINTERRUPTIBLE); spin_lock(&gl->gl_lockref.lock); - } - - /* - * If we're in the process of file system withdraw, we cannot just - * dequeue any glocks until our journal is recovered, lest we introduce - * file system corruption. We need two exceptions to this rule: We need - * to allow unlocking of nondisk glocks and the glock for our own - * journal that needs recovery. - */ - if (test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags) && - glock_blocked_by_withdraw(gl) && - gh->gh_gl != sdp->sd_jinode_gl) { - sdp->sd_glock_dqs_held++; - spin_unlock(&gl->gl_lockref.lock); - might_sleep(); - wait_on_bit(&sdp->sd_flags, SDF_WITHDRAW_RECOVERY, - TASK_UNINTERRUPTIBLE); - spin_lock(&gl->gl_lockref.lock); + clear_bit(GLF_CANCELING, &gl->gl_flags); + clear_bit(GLF_LOCK, &gl->gl_flags); + if (!gfs2_holder_queued(gh)) + goto out; } __gfs2_glock_dq(gh); @@ -1853,21 +1760,23 @@ void gfs2_glock_dq_m(unsigned int num_gh, struct gfs2_holder *ghs) void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state) { unsigned long delay = 0; - unsigned long holdtime; - unsigned long now = jiffies; gfs2_glock_hold(gl); spin_lock(&gl->gl_lockref.lock); - holdtime = gl->gl_tchange + gl->gl_hold_time; if (!list_empty(&gl->gl_holders) && gl->gl_name.ln_type == LM_TYPE_INODE) { + unsigned long now = jiffies; + unsigned long holdtime; + + holdtime = gl->gl_tchange + gl->gl_hold_time; + if (time_before(now, holdtime)) delay = holdtime - now; - if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags)) + if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags)) delay = gl->gl_hold_time; } - handle_callback(gl, state, delay, true); - __gfs2_glock_queue_work(gl, delay); + request_demote(gl, state, delay, true); + gfs2_glock_queue_work(gl, delay); spin_unlock(&gl->gl_lockref.lock); } @@ -1877,7 +1786,7 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state) * * Glocks are not frozen if (a) the result of the dlm operation is * an error, (b) the locking operation was an unlock operation or - * (c) if there is a "noexp" flagged request anywhere in the queue + * (c) if there is a "recover" flagged request anywhere in the queue * * Returns: 1 if freezing should occur, 0 otherwise */ @@ -1894,7 +1803,7 @@ static int gfs2_should_freeze(const struct gfs2_glock *gl) list_for_each_entry(gh, &gl->gl_holders, gh_list) { if (test_bit(HIF_HOLDER, &gh->gh_iflags)) continue; - if (LM_FLAG_NOEXP & gh->gh_flags) + if (LM_FLAG_RECOVER & gh->gh_flags) return 0; } @@ -1915,19 +1824,20 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret) struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct; spin_lock(&gl->gl_lockref.lock); + clear_bit(GLF_PENDING_REPLY, &gl->gl_flags); gl->gl_reply = ret; if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))) { if (gfs2_should_freeze(gl)) { - set_bit(GLF_FROZEN, &gl->gl_flags); + set_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags); spin_unlock(&gl->gl_lockref.lock); return; } } gl->gl_lockref.count++; - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); - __gfs2_glock_queue_work(gl, 0); + set_bit(GLF_HAVE_REPLY, &gl->gl_flags); + gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); } @@ -1947,6 +1857,16 @@ static int glock_cmp(void *priv, const struct list_head *a, return 0; } +static bool can_free_glock(struct gfs2_glock *gl) +{ + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + return !test_bit(GLF_LOCK, &gl->gl_flags) && + !gl->gl_lockref.count && + (!test_bit(GLF_LFLUSH, &gl->gl_flags) || + test_bit(SDF_KILL, &sdp->sd_flags)); +} + /** * gfs2_dispose_glock_lru - Demote a list of glocks * @list: The list to dispose of @@ -1961,37 +1881,38 @@ static int glock_cmp(void *priv, const struct list_head *a, * private) */ -static void gfs2_dispose_glock_lru(struct list_head *list) +static unsigned long gfs2_dispose_glock_lru(struct list_head *list) __releases(&lru_lock) __acquires(&lru_lock) { struct gfs2_glock *gl; + unsigned long freed = 0; list_sort(NULL, list, glock_cmp); while(!list_empty(list)) { gl = list_first_entry(list, struct gfs2_glock, gl_lru); - list_del_init(&gl->gl_lru); - clear_bit(GLF_LRU, &gl->gl_flags); if (!spin_trylock(&gl->gl_lockref.lock)) { add_back_to_lru: - list_add(&gl->gl_lru, &lru_list); - set_bit(GLF_LRU, &gl->gl_flags); - atomic_inc(&lru_count); + list_move(&gl->gl_lru, &lru_list); continue; } - if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) { + if (!can_free_glock(gl)) { spin_unlock(&gl->gl_lockref.lock); goto add_back_to_lru; } + list_del_init(&gl->gl_lru); + atomic_dec(&lru_count); + clear_bit(GLF_LRU, &gl->gl_flags); + freed++; gl->gl_lockref.count++; - if (demote_ok(gl)) - handle_callback(gl, LM_ST_UNLOCKED, 0, false); - WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags)); - __gfs2_glock_queue_work(gl, 0); + if (gl->gl_state != LM_ST_UNLOCKED) + request_demote(gl, LM_ST_UNLOCKED, 0, false); + gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); cond_resched_lock(&lru_lock); } + return freed; } /** @@ -2003,30 +1924,21 @@ add_back_to_lru: * gfs2_dispose_glock_lru() above. */ -static long gfs2_scan_glock_lru(int nr) +static unsigned long gfs2_scan_glock_lru(unsigned long nr) { struct gfs2_glock *gl, *next; LIST_HEAD(dispose); - long freed = 0; + unsigned long freed = 0; spin_lock(&lru_lock); list_for_each_entry_safe(gl, next, &lru_list, gl_lru) { - if (nr-- <= 0) + if (!nr--) break; - /* Test for being demotable */ - if (!test_bit(GLF_LOCK, &gl->gl_flags)) { - if (!spin_trylock(&gl->gl_lockref.lock)) - continue; - if (!gl->gl_lockref.count) { - list_move(&gl->gl_lru, &dispose); - atomic_dec(&lru_count); - freed++; - } - spin_unlock(&gl->gl_lockref.lock); - } + if (can_free_glock(gl)) + list_move(&gl->gl_lru, &dispose); } if (!list_empty(&dispose)) - gfs2_dispose_glock_lru(&dispose); + freed = gfs2_dispose_glock_lru(&dispose); spin_unlock(&lru_lock); return freed; @@ -2046,11 +1958,7 @@ static unsigned long gfs2_glock_shrink_count(struct shrinker *shrink, return vfs_pressure_ratio(atomic_read(&lru_count)); } -static struct shrinker glock_shrinker = { - .seeks = DEFAULT_SEEKS, - .count_objects = gfs2_glock_shrink_count, - .scan_objects = gfs2_glock_shrink_scan, -}; +static struct shrinker *glock_shrinker; /** * glock_hash_walk - Call a function for glock in a hash bucket @@ -2086,7 +1994,7 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp) void gfs2_cancel_delete_work(struct gfs2_glock *gl) { clear_bit(GLF_TRY_TO_EVICT, &gl->gl_flags); - clear_bit(GLF_VERIFY_EVICT, &gl->gl_flags); + clear_bit(GLF_VERIFY_DELETE, &gl->gl_flags); if (cancel_delayed_work(&gl->gl_delete)) gfs2_glock_put(gl); } @@ -2117,12 +2025,16 @@ void gfs2_flush_delete_work(struct gfs2_sbd *sdp) static void thaw_glock(struct gfs2_glock *gl) { - if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) + if (!test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags)) return; if (!lockref_get_not_dead(&gl->gl_lockref)) return; - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + + gfs2_glock_remove_from_lru(gl); + spin_lock(&gl->gl_lockref.lock); + set_bit(GLF_HAVE_REPLY, &gl->gl_flags); gfs2_glock_queue_work(gl, 0); + spin_unlock(&gl->gl_lockref.lock); } /** @@ -2139,8 +2051,8 @@ static void clear_glock(struct gfs2_glock *gl) if (!__lockref_is_dead(&gl->gl_lockref)) { gl->gl_lockref.count++; if (gl->gl_state != LM_ST_UNLOCKED) - handle_callback(gl, LM_ST_UNLOCKED, 0, false); - __gfs2_glock_queue_work(gl, 0); + request_demote(gl, LM_ST_UNLOCKED, 0, false); + gfs2_glock_queue_work(gl, 0); } spin_unlock(&gl->gl_lockref.lock); } @@ -2168,18 +2080,26 @@ static void dump_glock_func(struct gfs2_glock *gl) dump_glock(NULL, gl, true); } -static void withdraw_dq(struct gfs2_glock *gl) +static void withdraw_glock(struct gfs2_glock *gl) { spin_lock(&gl->gl_lockref.lock); - if (!__lockref_is_dead(&gl->gl_lockref) && - glock_blocked_by_withdraw(gl)) + if (!__lockref_is_dead(&gl->gl_lockref)) { + /* + * We don't want to write back any more dirty data. Unlock the + * remaining inode and resource group glocks; this will cause + * their ->go_inval() hooks to toss out all the remaining + * cached data, dirty or not. + */ + if (gl->gl_ops->go_inval && gl->gl_state != LM_ST_UNLOCKED) + request_demote(gl, LM_ST_UNLOCKED, 0, false); do_error(gl, LM_OUT_ERROR); /* remove pending waiters */ + } spin_unlock(&gl->gl_lockref.lock); } -void gfs2_gl_dq_holders(struct gfs2_sbd *sdp) +void gfs2_withdraw_glocks(struct gfs2_sbd *sdp) { - glock_hash_walk(withdraw_dq, sdp); + glock_hash_walk(withdraw_glock, sdp); } /** @@ -2191,14 +2111,31 @@ void gfs2_gl_dq_holders(struct gfs2_sbd *sdp) void gfs2_gl_hash_clear(struct gfs2_sbd *sdp) { + unsigned long start = jiffies; + bool timed_out = false; + set_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags); - flush_workqueue(glock_workqueue); + flush_workqueue(sdp->sd_glock_wq); glock_hash_walk(clear_glock, sdp); - flush_workqueue(glock_workqueue); - wait_event_timeout(sdp->sd_glock_wait, - atomic_read(&sdp->sd_glock_disposal) == 0, - HZ * 600); + flush_workqueue(sdp->sd_glock_wq); + + while (!timed_out) { + wait_event_timeout(sdp->sd_kill_wait, + !atomic_read(&sdp->sd_glock_disposal), + HZ * 60); + if (!atomic_read(&sdp->sd_glock_disposal)) + break; + timed_out = time_after(jiffies, start + (HZ * 600)); + fs_warn(sdp, "%u glocks left after %u seconds%s\n", + atomic_read(&sdp->sd_glock_disposal), + jiffies_to_msecs(jiffies - start) / 1000, + timed_out ? ":" : "; still waiting"); + } + gfs2_lm_unmount(sdp); + gfs2_free_dead_glocks(sdp); glock_hash_walk(dump_glock_func, sdp); + destroy_workqueue(sdp->sd_glock_wq); + sdp->sd_glock_wq = NULL; } static const char *state2str(unsigned state) @@ -2223,12 +2160,10 @@ static const char *hflags2str(char *buf, u16 flags, unsigned long iflags) *p++ = 't'; if (flags & LM_FLAG_TRY_1CB) *p++ = 'T'; - if (flags & LM_FLAG_NOEXP) + if (flags & LM_FLAG_RECOVER) *p++ = 'e'; if (flags & LM_FLAG_ANY) *p++ = 'A'; - if (flags & LM_FLAG_PRIORITY) - *p++ = 'p'; if (flags & LM_FLAG_NODE_SCOPE) *p++ = 'n'; if (flags & GL_ASYNC) @@ -2296,13 +2231,13 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl) *p++ = 'y'; if (test_bit(GLF_LFLUSH, gflags)) *p++ = 'f'; - if (test_bit(GLF_INVALIDATE_IN_PROGRESS, gflags)) - *p++ = 'i'; - if (test_bit(GLF_REPLY_PENDING, gflags)) + if (test_bit(GLF_PENDING_REPLY, gflags)) + *p++ = 'R'; + if (test_bit(GLF_HAVE_REPLY, gflags)) *p++ = 'r'; if (test_bit(GLF_INITIAL, gflags)) - *p++ = 'I'; - if (test_bit(GLF_FROZEN, gflags)) + *p++ = 'a'; + if (test_bit(GLF_HAVE_FROZEN_REPLY, gflags)) *p++ = 'F'; if (!list_empty(&gl->gl_holders)) *p++ = 'q'; @@ -2312,16 +2247,18 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl) *p++ = 'o'; if (test_bit(GLF_BLOCKING, gflags)) *p++ = 'b'; - if (test_bit(GLF_FREEING, gflags)) - *p++ = 'x'; if (test_bit(GLF_INSTANTIATE_NEEDED, gflags)) *p++ = 'n'; if (test_bit(GLF_INSTANTIATE_IN_PROG, gflags)) *p++ = 'N'; if (test_bit(GLF_TRY_TO_EVICT, gflags)) *p++ = 'e'; - if (test_bit(GLF_VERIFY_EVICT, gflags)) + if (test_bit(GLF_VERIFY_DELETE, gflags)) *p++ = 'E'; + if (test_bit(GLF_DEFER_DELETE, gflags)) + *p++ = 's'; + if (test_bit(GLF_CANCELING, gflags)) + *p++ = 'C'; *p = 0; return buf; } @@ -2465,19 +2402,16 @@ int __init gfs2_glock_init(void) if (ret < 0) return ret; - glock_workqueue = alloc_workqueue("glock_workqueue", WQ_MEM_RECLAIM | - WQ_HIGHPRI | WQ_FREEZABLE, 0); - if (!glock_workqueue) { + glock_shrinker = shrinker_alloc(0, "gfs2-glock"); + if (!glock_shrinker) { rhashtable_destroy(&gl_hash_table); return -ENOMEM; } - ret = register_shrinker(&glock_shrinker, "gfs2-glock"); - if (ret) { - destroy_workqueue(glock_workqueue); - rhashtable_destroy(&gl_hash_table); - return ret; - } + glock_shrinker->count_objects = gfs2_glock_shrink_count; + glock_shrinker->scan_objects = gfs2_glock_shrink_scan; + + shrinker_register(glock_shrinker); for (i = 0; i < GLOCK_WAIT_TABLE_SIZE; i++) init_waitqueue_head(glock_wait_table + i); @@ -2487,9 +2421,8 @@ int __init gfs2_glock_init(void) void gfs2_glock_exit(void) { - unregister_shrinker(&glock_shrinker); + shrinker_free(glock_shrinker); rhashtable_destroy(&gl_hash_table); - destroy_workqueue(glock_workqueue); } static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi, loff_t n) @@ -2499,8 +2432,7 @@ static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi, loff_t n) if (gl) { if (n == 0) return; - if (!lockref_put_not_zero(&gl->gl_lockref)) - gfs2_glock_queue_put(gl); + gfs2_glock_put_async(gl); } for (;;) { gl = rhashtable_walk_next(&gi->hti); @@ -2722,22 +2654,18 @@ static struct file *gfs2_glockfd_next_file(struct gfs2_glockfd_iter *i) i->file = NULL; } - rcu_read_lock(); for(;; i->fd++) { - struct inode *inode; - - i->file = task_lookup_next_fd_rcu(i->task, &i->fd); + i->file = fget_task_next(i->task, &i->fd); if (!i->file) { i->fd = 0; break; } - inode = file_inode(i->file); - if (inode->i_sb != i->sb) - continue; - if (get_file_rcu(i->file)) + + if (file_inode(i->file)->i_sb == i->sb) break; + + fput(i->file); } - rcu_read_unlock(); return i->file; } |
