diff options
Diffstat (limited to 'fs/gfs2/glock.c')
-rw-r--r-- | fs/gfs2/glock.c | 638 |
1 files changed, 314 insertions, 324 deletions
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 34540f9d011c..ba25b884169e 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -34,8 +34,8 @@ #include <linux/lockref.h> #include <linux/rhashtable.h> #include <linux/pid_namespace.h> -#include <linux/fdtable.h> #include <linux/file.h> +#include <linux/random.h> #include "gfs2.h" #include "incore.h" @@ -61,12 +61,10 @@ struct gfs2_glock_iter { typedef void (*glock_examiner) (struct gfs2_glock * gl); static void do_xmote(struct gfs2_glock *gl, struct gfs2_holder *gh, unsigned int target); -static void __gfs2_glock_dq(struct gfs2_holder *gh); -static void handle_callback(struct gfs2_glock *gl, unsigned int state, - unsigned long delay, bool remote); +static void request_demote(struct gfs2_glock *gl, unsigned int state, + unsigned long delay, bool remote); static struct dentry *gfs2_root; -static struct workqueue_struct *glock_workqueue; static LIST_HEAD(lru_list); static atomic_t lru_count = ATOMIC_INIT(0); static DEFINE_SPINLOCK(lru_lock); @@ -166,19 +164,45 @@ static bool glock_blocked_by_withdraw(struct gfs2_glock *gl) return true; } -void gfs2_glock_free(struct gfs2_glock *gl) +static void __gfs2_glock_free(struct gfs2_glock *gl) { - struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - - gfs2_glock_assert_withdraw(gl, atomic_read(&gl->gl_revokes) == 0); rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms); smp_mb(); wake_up_glock(gl); call_rcu(&gl->gl_rcu, gfs2_glock_dealloc); +} + +void gfs2_glock_free(struct gfs2_glock *gl) { + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + __gfs2_glock_free(gl); if (atomic_dec_and_test(&sdp->sd_glock_disposal)) wake_up(&sdp->sd_kill_wait); } +void gfs2_glock_free_later(struct gfs2_glock *gl) { + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + spin_lock(&lru_lock); + list_add(&gl->gl_lru, &sdp->sd_dead_glocks); + spin_unlock(&lru_lock); + if (atomic_dec_and_test(&sdp->sd_glock_disposal)) + wake_up(&sdp->sd_kill_wait); +} + +static void gfs2_free_dead_glocks(struct gfs2_sbd *sdp) +{ + struct list_head *list = &sdp->sd_dead_glocks; + + while(!list_empty(list)) { + struct gfs2_glock *gl; + + gl = list_first_entry(list, struct gfs2_glock, gl_lru); + list_del_init(&gl->gl_lru); + __gfs2_glock_free(gl); + } +} + /** * gfs2_glock_hold() - increment reference count on glock * @gl: The glock to hold @@ -192,34 +216,9 @@ struct gfs2_glock *gfs2_glock_hold(struct gfs2_glock *gl) return gl; } -/** - * demote_ok - Check to see if it's ok to unlock a glock - * @gl: the glock - * - * Returns: 1 if it's ok - */ - -static int demote_ok(const struct gfs2_glock *gl) +static void gfs2_glock_add_to_lru(struct gfs2_glock *gl) { - const struct gfs2_glock_operations *glops = gl->gl_ops; - - if (gl->gl_state == LM_ST_UNLOCKED) - return 0; - if (!list_empty(&gl->gl_holders)) - return 0; - if (glops->go_demote_ok) - return glops->go_demote_ok(gl); - return 1; -} - - -void gfs2_glock_add_to_lru(struct gfs2_glock *gl) -{ - if (!(gl->gl_ops->go_flags & GLOF_LRU)) - return; - spin_lock(&lru_lock); - list_move_tail(&gl->gl_lru, &lru_list); if (!test_bit(GLF_LRU, &gl->gl_flags)) { @@ -232,9 +231,6 @@ void gfs2_glock_add_to_lru(struct gfs2_glock *gl) static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) { - if (!(gl->gl_ops->go_flags & GLOF_LRU)) - return; - spin_lock(&lru_lock); if (test_bit(GLF_LRU, &gl->gl_flags)) { list_del_init(&gl->gl_lru); @@ -248,8 +244,10 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) * Enqueue the glock on the work queue. Passes one glock reference on to the * work queue. */ -static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { - if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) { +static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + if (!queue_delayed_work(sdp->sd_glock_wq, &gl->gl_work, delay)) { /* * We are holding the lockref spinlock, and the work was still * queued above. The queued work (glock_work_func) takes that @@ -261,12 +259,6 @@ static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) } } -static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { - spin_lock(&gl->gl_lockref.lock); - __gfs2_glock_queue_work(gl, delay); - spin_unlock(&gl->gl_lockref.lock); -} - static void __gfs2_glock_put(struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; @@ -285,12 +277,18 @@ static void __gfs2_glock_put(struct gfs2_glock *gl) sdp->sd_lockstruct.ls_ops->lm_put_lock(gl); } -/* - * Cause the glock to be put in work queue context. - */ -void gfs2_glock_queue_put(struct gfs2_glock *gl) +static bool __gfs2_glock_put_or_lock(struct gfs2_glock *gl) { - gfs2_glock_queue_work(gl, 0); + if (lockref_put_or_lock(&gl->gl_lockref)) + return true; + GLOCK_BUG_ON(gl, gl->gl_lockref.count != 1); + if (gl->gl_state != LM_ST_UNLOCKED) { + gl->gl_lockref.count--; + gfs2_glock_add_to_lru(gl); + spin_unlock(&gl->gl_lockref.lock); + return true; + } + return false; } /** @@ -301,12 +299,28 @@ void gfs2_glock_queue_put(struct gfs2_glock *gl) void gfs2_glock_put(struct gfs2_glock *gl) { - if (lockref_put_or_lock(&gl->gl_lockref)) + if (__gfs2_glock_put_or_lock(gl)) return; __gfs2_glock_put(gl); } +/* + * gfs2_glock_put_async - Decrement reference count without sleeping + * @gl: The glock to put + * + * Decrement the reference count on glock immediately unless it is the last + * reference. Defer putting the last reference to work queue context. + */ +void gfs2_glock_put_async(struct gfs2_glock *gl) +{ + if (__gfs2_glock_put_or_lock(gl)) + return; + + gfs2_glock_queue_work(gl, 0); + spin_unlock(&gl->gl_lockref.lock); +} + /** * may_grant - check if it's ok to grant a new lock * @gl: The glock @@ -541,18 +555,6 @@ static inline struct gfs2_holder *find_last_waiter(const struct gfs2_glock *gl) static void state_change(struct gfs2_glock *gl, unsigned int new_state) { - int held1, held2; - - held1 = (gl->gl_state != LM_ST_UNLOCKED); - held2 = (new_state != LM_ST_UNLOCKED); - - if (held1 != held2) { - GLOCK_BUG_ON(gl, __lockref_is_dead(&gl->gl_lockref)); - if (held2) - gl->gl_lockref.count++; - else - gl->gl_lockref.count--; - } if (new_state != gl->gl_target) /* shorten our minimum hold time */ gl->gl_hold_time = max(gl->gl_hold_time - GL_GLOCK_HOLD_DECR, @@ -561,11 +563,11 @@ static void state_change(struct gfs2_glock *gl, unsigned int new_state) gl->gl_tchange = jiffies; } -static void gfs2_set_demote(struct gfs2_glock *gl) +static void gfs2_set_demote(int nr, struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - set_bit(GLF_DEMOTE, &gl->gl_flags); + set_bit(nr, &gl->gl_flags); smp_mb(); wake_up(&sdp->sd_async_glock_wait); } @@ -591,7 +593,6 @@ static void finish_xmote(struct gfs2_glock *gl, unsigned int ret) struct gfs2_holder *gh; unsigned state = ret & LM_OUT_ST_MASK; - spin_lock(&gl->gl_lockref.lock); trace_gfs2_glock_state_change(gl, state); state_change(gl, state); gh = find_first_waiter(gl); @@ -606,14 +607,19 @@ static void finish_xmote(struct gfs2_glock *gl, unsigned int ret) if (gh && (ret & LM_OUT_CANCELED)) gfs2_holder_wake(gh); if (gh && !test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags)) { - /* move to back of queue and try next entry */ if (ret & LM_OUT_CANCELED) { - list_move_tail(&gh->gh_list, &gl->gl_holders); + list_del_init(&gh->gh_list); + trace_gfs2_glock_queue(gh, 0); + gl->gl_target = gl->gl_state; gh = find_first_waiter(gl); - gl->gl_target = gh->gh_state; - if (do_promote(gl)) - goto out; - goto retry; + if (gh) { + gl->gl_target = gh->gh_state; + if (do_promote(gl)) + goto out; + do_xmote(gl, gh, gl->gl_target); + return; + } + goto out; } /* Some error or failed "try lock" - report it */ if ((ret & LM_OUT_ERROR) || @@ -626,7 +632,6 @@ static void finish_xmote(struct gfs2_glock *gl, unsigned int ret) switch(state) { /* Unlocked due to conversion deadlock, try again */ case LM_ST_UNLOCKED: -retry: do_xmote(gl, gh, gl->gl_target); break; /* Conversion fails, unlock and try again */ @@ -639,7 +644,6 @@ retry: gl->gl_target, state); GLOCK_BUG_ON(gl, 1); } - spin_unlock(&gl->gl_lockref.lock); return; } @@ -661,8 +665,8 @@ retry: do_promote(gl); } out: - clear_bit(GLF_LOCK, &gl->gl_flags); - spin_unlock(&gl->gl_lockref.lock); + if (!test_bit(GLF_CANCELING, &gl->gl_flags)) + clear_bit(GLF_LOCK, &gl->gl_flags); } static bool is_system_glock(struct gfs2_glock *gl) @@ -690,6 +694,7 @@ __acquires(&gl->gl_lockref.lock) { const struct gfs2_glock_operations *glops = gl->gl_ops; struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + struct lm_lockstruct *ls = &sdp->sd_lockstruct; unsigned int lck_flags = (unsigned int)(gh ? gh->gh_flags : 0); int ret; @@ -718,6 +723,9 @@ __acquires(&gl->gl_lockref.lock) (gl->gl_state == LM_ST_EXCLUSIVE) || (lck_flags & (LM_FLAG_TRY|LM_FLAG_TRY_1CB))) clear_bit(GLF_BLOCKING, &gl->gl_flags); + if (!glops->go_inval && !glops->go_sync) + goto skip_inval; + spin_unlock(&gl->gl_lockref.lock); if (glops->go_sync) { ret = glops->go_sync(gl); @@ -730,6 +738,7 @@ __acquires(&gl->gl_lockref.lock) fs_err(sdp, "Error %d syncing glock \n", ret); gfs2_dump_glock(NULL, gl, true); } + spin_lock(&gl->gl_lockref.lock); goto skip_inval; } } @@ -750,9 +759,10 @@ __acquires(&gl->gl_lockref.lock) glops->go_inval(gl, target == LM_ST_DEFERRED ? 0 : DIO_METADATA); clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags); } + spin_lock(&gl->gl_lockref.lock); skip_inval: - gfs2_glock_hold(gl); + gl->gl_lockref.count++; /* * Check for an error encountered since we called go_sync and go_inval. * If so, we can't withdraw from the glock code because the withdraw @@ -780,7 +790,7 @@ skip_inval: (target != LM_ST_UNLOCKED || test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags))) { if (!is_system_glock(gl)) { - handle_callback(gl, LM_ST_UNLOCKED, 0, false); /* sets demote */ + request_demote(gl, LM_ST_UNLOCKED, 0, false); /* * Ordinarily, we would call dlm and its callback would call * finish_xmote, which would call state_change() to the new state. @@ -795,30 +805,38 @@ skip_inval: clear_bit(GLF_LOCK, &gl->gl_flags); clear_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags); gfs2_glock_queue_work(gl, GL_GLOCK_DFT_HOLD); - goto out; + return; } else { clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags); } } - if (sdp->sd_lockstruct.ls_ops->lm_lock) { - /* lock_dlm */ - ret = sdp->sd_lockstruct.ls_ops->lm_lock(gl, target, lck_flags); + if (ls->ls_ops->lm_lock) { + set_bit(GLF_PENDING_REPLY, &gl->gl_flags); + spin_unlock(&gl->gl_lockref.lock); + ret = ls->ls_ops->lm_lock(gl, target, lck_flags); + spin_lock(&gl->gl_lockref.lock); + if (ret == -EINVAL && gl->gl_target == LM_ST_UNLOCKED && target == LM_ST_UNLOCKED && - test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) { - finish_xmote(gl, target); - gfs2_glock_queue_work(gl, 0); + test_bit(DFL_UNMOUNT, &ls->ls_recover_flags)) { + /* + * The lockspace has been released and the lock has + * been unlocked implicitly. + */ } else if (ret) { fs_err(sdp, "lm_lock ret %d\n", ret); - GLOCK_BUG_ON(gl, !gfs2_withdrawing_or_withdrawn(sdp)); + target = gl->gl_state | LM_OUT_ERROR; + } else { + /* The operation will be completed asynchronously. */ + return; } - } else { /* lock_nolock */ - finish_xmote(gl, target); - gfs2_glock_queue_work(gl, 0); + clear_bit(GLF_PENDING_REPLY, &gl->gl_flags); } -out: - spin_lock(&gl->gl_lockref.lock); + + /* Complete the operation now. */ + finish_xmote(gl, target); + gfs2_glock_queue_work(gl, 0); } /** @@ -832,11 +850,13 @@ static void run_queue(struct gfs2_glock *gl, const int nonblock) __releases(&gl->gl_lockref.lock) __acquires(&gl->gl_lockref.lock) { - struct gfs2_holder *gh = NULL; + struct gfs2_holder *gh; - if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) + if (test_bit(GLF_LOCK, &gl->gl_flags)) return; + set_bit(GLF_LOCK, &gl->gl_flags); + /* While a demote is in progress, the GLF_LOCK flag must be set. */ GLOCK_BUG_ON(gl, test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags)); if (test_bit(GLF_DEMOTE, &gl->gl_flags) && @@ -848,30 +868,33 @@ __acquires(&gl->gl_lockref.lock) set_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags); GLOCK_BUG_ON(gl, gl->gl_demote_state == LM_ST_EXCLUSIVE); gl->gl_target = gl->gl_demote_state; + do_xmote(gl, NULL, gl->gl_target); + return; } else { if (test_bit(GLF_DEMOTE, &gl->gl_flags)) gfs2_demote_wake(gl); if (do_promote(gl)) goto out_unlock; gh = find_first_waiter(gl); + if (!gh) + goto out_unlock; gl->gl_target = gh->gh_state; if (!(gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB))) do_error(gl, 0); /* Fail queued try locks */ + do_xmote(gl, gh, gl->gl_target); + return; } - do_xmote(gl, gh, gl->gl_target); - return; out_sched: clear_bit(GLF_LOCK, &gl->gl_flags); smp_mb__after_atomic(); gl->gl_lockref.count++; - __gfs2_glock_queue_work(gl, 0); + gfs2_glock_queue_work(gl, 0); return; out_unlock: clear_bit(GLF_LOCK, &gl->gl_flags); smp_mb__after_atomic(); - return; } /** @@ -887,12 +910,8 @@ void glock_set_object(struct gfs2_glock *gl, void *object) prev_object = gl->gl_object; gl->gl_object = object; spin_unlock(&gl->gl_lockref.lock); - if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == NULL)) { - pr_warn("glock=%u/%llx\n", - gl->gl_name.ln_type, - (unsigned long long)gl->gl_name.ln_number); + if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == NULL)) gfs2_dump_glock(NULL, gl, true); - } } /** @@ -908,12 +927,8 @@ void glock_clear_object(struct gfs2_glock *gl, void *object) prev_object = gl->gl_object; gl->gl_object = NULL; spin_unlock(&gl->gl_lockref.lock); - if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == object)) { - pr_warn("glock=%u/%llx\n", - gl->gl_name.ln_type, - (unsigned long long)gl->gl_name.ln_number); + if (gfs2_assert_warn(gl->gl_name.ln_sbd, prev_object == object)) gfs2_dump_glock(NULL, gl, true); - } } void gfs2_inode_remember_delete(struct gfs2_glock *gl, u64 generation) @@ -948,48 +963,56 @@ static void gfs2_glock_poke(struct gfs2_glock *gl) gfs2_holder_uninit(&gh); } -static bool gfs2_try_evict(struct gfs2_glock *gl) +static struct gfs2_inode *gfs2_grab_existing_inode(struct gfs2_glock *gl) +{ + struct gfs2_inode *ip; + + spin_lock(&gl->gl_lockref.lock); + ip = gl->gl_object; + if (ip && !igrab(&ip->i_inode)) + ip = NULL; + spin_unlock(&gl->gl_lockref.lock); + if (ip) { + wait_on_inode(&ip->i_inode); + if (is_bad_inode(&ip->i_inode)) { + iput(&ip->i_inode); + ip = NULL; + } + } + return ip; +} + +static void gfs2_try_evict(struct gfs2_glock *gl) { struct gfs2_inode *ip; - bool evicted = false; /* * If there is contention on the iopen glock and we have an inode, try - * to grab and release the inode so that it can be evicted. This will - * allow the remote node to go ahead and delete the inode without us - * having to do it, which will avoid rgrp glock thrashing. + * to grab and release the inode so that it can be evicted. The + * GIF_DEFER_DELETE flag indicates to gfs2_evict_inode() that the inode + * should not be deleted locally. This will allow the remote node to + * go ahead and delete the inode without us having to do it, which will + * avoid rgrp glock thrashing. * * The remote node is likely still holding the corresponding inode * glock, so it will run before we get to verify that the delete has - * happened below. + * happened below. (Verification is triggered by the call to + * gfs2_queue_verify_delete() in gfs2_evict_inode().) */ - spin_lock(&gl->gl_lockref.lock); - ip = gl->gl_object; - if (ip && !igrab(&ip->i_inode)) - ip = NULL; - spin_unlock(&gl->gl_lockref.lock); + ip = gfs2_grab_existing_inode(gl); if (ip) { - gl->gl_no_formal_ino = ip->i_no_formal_ino; - set_bit(GIF_DEFERRED_DELETE, &ip->i_flags); + set_bit(GLF_DEFER_DELETE, &gl->gl_flags); d_prune_aliases(&ip->i_inode); iput(&ip->i_inode); + clear_bit(GLF_DEFER_DELETE, &gl->gl_flags); /* If the inode was evicted, gl->gl_object will now be NULL. */ - spin_lock(&gl->gl_lockref.lock); - ip = gl->gl_object; - if (ip) { - clear_bit(GIF_DEFERRED_DELETE, &ip->i_flags); - if (!igrab(&ip->i_inode)) - ip = NULL; - } - spin_unlock(&gl->gl_lockref.lock); + ip = gfs2_grab_existing_inode(gl); if (ip) { gfs2_glock_poke(ip->i_gl); iput(&ip->i_inode); } - evicted = !ip; } - return evicted; } bool gfs2_queue_try_to_evict(struct gfs2_glock *gl) @@ -998,18 +1021,18 @@ bool gfs2_queue_try_to_evict(struct gfs2_glock *gl) if (test_and_set_bit(GLF_TRY_TO_EVICT, &gl->gl_flags)) return false; - return queue_delayed_work(sdp->sd_delete_wq, - &gl->gl_delete, 0); + return !mod_delayed_work(sdp->sd_delete_wq, &gl->gl_delete, 0); } -static bool gfs2_queue_verify_evict(struct gfs2_glock *gl) +bool gfs2_queue_verify_delete(struct gfs2_glock *gl, bool later) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + unsigned long delay; - if (test_and_set_bit(GLF_VERIFY_EVICT, &gl->gl_flags)) + if (test_and_set_bit(GLF_VERIFY_DELETE, &gl->gl_flags)) return false; - return queue_delayed_work(sdp->sd_delete_wq, - &gl->gl_delete, 5 * HZ); + delay = later ? HZ + get_random_long() % (HZ * 9) : 0; + return queue_delayed_work(sdp->sd_delete_wq, &gl->gl_delete, delay); } static void delete_work_func(struct work_struct *work) @@ -1017,43 +1040,21 @@ static void delete_work_func(struct work_struct *work) struct delayed_work *dwork = to_delayed_work(work); struct gfs2_glock *gl = container_of(dwork, struct gfs2_glock, gl_delete); struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - struct inode *inode; - u64 no_addr = gl->gl_name.ln_number; + bool verify_delete = test_and_clear_bit(GLF_VERIFY_DELETE, &gl->gl_flags); - if (test_and_clear_bit(GLF_TRY_TO_EVICT, &gl->gl_flags)) { - /* - * If we can evict the inode, give the remote node trying to - * delete the inode some time before verifying that the delete - * has happened. Otherwise, if we cause contention on the inode glock - * immediately, the remote node will think that we still have - * the inode in use, and so it will give up waiting. - * - * If we can't evict the inode, signal to the remote node that - * the inode is still in use. We'll later try to delete the - * inode locally in gfs2_evict_inode. - * - * FIXME: We only need to verify that the remote node has - * deleted the inode because nodes before this remote delete - * rework won't cooperate. At a later time, when we no longer - * care about compatibility with such nodes, we can skip this - * step entirely. - */ - if (gfs2_try_evict(gl)) { - if (test_bit(SDF_KILL, &sdp->sd_flags)) - goto out; - if (gfs2_queue_verify_evict(gl)) - return; - } - goto out; - } + if (test_and_clear_bit(GLF_TRY_TO_EVICT, &gl->gl_flags)) + gfs2_try_evict(gl); + + if (verify_delete) { + u64 no_addr = gl->gl_name.ln_number; + struct inode *inode; - if (test_and_clear_bit(GLF_VERIFY_EVICT, &gl->gl_flags)) { inode = gfs2_lookup_by_inum(sdp, no_addr, gl->gl_no_formal_ino, GFS2_BLKST_UNLINKED); if (IS_ERR(inode)) { if (PTR_ERR(inode) == -EAGAIN && !test_bit(SDF_KILL, &sdp->sd_flags) && - gfs2_queue_verify_evict(gl)) + gfs2_queue_verify_delete(gl, true)) return; } else { d_prune_aliases(inode); @@ -1061,7 +1062,6 @@ static void delete_work_func(struct work_struct *work) } } -out: gfs2_glock_put(gl); } @@ -1071,43 +1071,44 @@ static void glock_work_func(struct work_struct *work) struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work); unsigned int drop_refs = 1; - if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) { + spin_lock(&gl->gl_lockref.lock); + if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags)) { + clear_bit(GLF_HAVE_REPLY, &gl->gl_flags); finish_xmote(gl, gl->gl_reply); drop_refs++; } - spin_lock(&gl->gl_lockref.lock); if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && gl->gl_state != LM_ST_UNLOCKED && gl->gl_demote_state != LM_ST_EXCLUSIVE) { - unsigned long holdtime, now = jiffies; + if (gl->gl_name.ln_type == LM_TYPE_INODE) { + unsigned long holdtime, now = jiffies; - holdtime = gl->gl_tchange + gl->gl_hold_time; - if (time_before(now, holdtime)) - delay = holdtime - now; + holdtime = gl->gl_tchange + gl->gl_hold_time; + if (time_before(now, holdtime)) + delay = holdtime - now; + } if (!delay) { clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags); - gfs2_set_demote(gl); + gfs2_set_demote(GLF_DEMOTE, gl); } } run_queue(gl, 0); if (delay) { /* Keep one glock reference for the work we requeue. */ drop_refs--; - if (gl->gl_name.ln_type != LM_TYPE_INODE) - delay = 0; - __gfs2_glock_queue_work(gl, delay); + gfs2_glock_queue_work(gl, delay); } - /* - * Drop the remaining glock references manually here. (Mind that - * __gfs2_glock_queue_work depends on the lockref spinlock begin held - * here as well.) - */ + /* Drop the remaining glock references manually. */ + GLOCK_BUG_ON(gl, gl->gl_lockref.count < drop_refs); gl->gl_lockref.count -= drop_refs; if (!gl->gl_lockref.count) { - __gfs2_glock_put(gl); - return; + if (gl->gl_state == LM_ST_UNLOCKED) { + __gfs2_glock_put(gl); + return; + } + gfs2_glock_add_to_lru(gl); } spin_unlock(&gl->gl_lockref.lock); } @@ -1143,6 +1144,8 @@ again: out: rcu_read_unlock(); finish_wait(wq, &wait.wait); + if (gl) + gfs2_glock_remove_from_lru(gl); return gl; } @@ -1163,19 +1166,15 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, const struct gfs2_glock_operations *glops, int create, struct gfs2_glock **glp) { - struct super_block *s = sdp->sd_vfs; struct lm_lockname name = { .ln_number = number, .ln_type = glops->go_type, .ln_sbd = sdp }; struct gfs2_glock *gl, *tmp; struct address_space *mapping; - int ret = 0; gl = find_insert_glock(&name, NULL); - if (gl) { - *glp = gl; - return 0; - } + if (gl) + goto found; if (!create) return -ENOENT; @@ -1203,10 +1202,12 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, atomic_inc(&sdp->sd_glock_disposal); gl->gl_node.next = NULL; - gl->gl_flags = glops->go_instantiate ? BIT(GLF_INSTANTIATE_NEEDED) : 0; + gl->gl_flags = BIT(GLF_INITIAL); + if (glops->go_instantiate) + gl->gl_flags |= BIT(GLF_INSTANTIATE_NEEDED); gl->gl_name = name; + lockref_init(&gl->gl_lockref); lockdep_set_subclass(&gl->gl_lockref.lock, glops->go_subclass); - gl->gl_lockref.count = 1; gl->gl_state = LM_ST_UNLOCKED; gl->gl_target = LM_ST_UNLOCKED; gl->gl_demote_state = LM_ST_EXCLUSIVE; @@ -1227,7 +1228,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, mapping = gfs2_glock2aspace(gl); if (mapping) { mapping->a_ops = &gfs2_meta_aops; - mapping->host = s->s_bdev->bd_inode; + mapping->host = sdp->sd_inode; mapping->flags = 0; mapping_set_gfp_mask(mapping, GFP_NOFS); mapping->i_private_data = NULL; @@ -1235,23 +1236,19 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, } tmp = find_insert_glock(&name, gl); - if (!tmp) { - *glp = gl; - goto out; - } - if (IS_ERR(tmp)) { - ret = PTR_ERR(tmp); - goto out_free; - } - *glp = tmp; + if (tmp) { + gfs2_glock_dealloc(&gl->gl_rcu); + if (atomic_dec_and_test(&sdp->sd_glock_disposal)) + wake_up(&sdp->sd_kill_wait); -out_free: - gfs2_glock_dealloc(&gl->gl_rcu); - if (atomic_dec_and_test(&sdp->sd_glock_disposal)) - wake_up(&sdp->sd_kill_wait); + if (IS_ERR(tmp)) + return PTR_ERR(tmp); + gl = tmp; + } -out: - return ret; +found: + *glp = gl; + return 0; } /** @@ -1421,7 +1418,7 @@ out: } /** - * handle_callback - process a demote request + * request_demote - process a demote request * @gl: the glock * @state: the state the caller wants us to change to * @delay: zero to demote immediately; otherwise pending demote @@ -1431,13 +1428,10 @@ out: * practise: LM_ST_SHARED and LM_ST_UNLOCKED */ -static void handle_callback(struct gfs2_glock *gl, unsigned int state, - unsigned long delay, bool remote) +static void request_demote(struct gfs2_glock *gl, unsigned int state, + unsigned long delay, bool remote) { - if (delay) - set_bit(GLF_PENDING_DEMOTE, &gl->gl_flags); - else - gfs2_set_demote(gl); + gfs2_set_demote(delay ? GLF_PENDING_DEMOTE : GLF_DEMOTE, gl); if (gl->gl_demote_state == LM_ST_EXCLUSIVE) { gl->gl_demote_state = state; gl->gl_demote_time = jiffies; @@ -1473,9 +1467,7 @@ static inline bool pid_is_meaningful(const struct gfs2_holder *gh) { if (!(gh->gh_flags & GL_NOPID)) return true; - if (gh->gh_state == LM_ST_UNLOCKED) - return true; - return false; + return !test_bit(HIF_HOLDER, &gh->gh_iflags); } /** @@ -1494,7 +1486,6 @@ __acquires(&gl->gl_lockref.lock) { struct gfs2_glock *gl = gh->gh_gl; struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; - struct list_head *insert_pt = NULL; struct gfs2_holder *gh2; int try_futile = 0; @@ -1530,21 +1521,11 @@ fail: gfs2_holder_wake(gh); return; } - if (test_bit(HIF_HOLDER, &gh2->gh_iflags)) - continue; } trace_gfs2_glock_queue(gh, 1); gfs2_glstats_inc(gl, GFS2_LKS_QCOUNT); gfs2_sbstats_inc(gl, GFS2_LKS_QCOUNT); - if (likely(insert_pt == NULL)) { - list_add_tail(&gh->gh_list, &gl->gl_holders); - return; - } - list_add_tail(&gh->gh_list, insert_pt); - spin_unlock(&gl->gl_lockref.lock); - if (sdp->sd_lockstruct.ls_ops->lm_cancel) - sdp->sd_lockstruct.ls_ops->lm_cancel(gl); - spin_lock(&gl->gl_lockref.lock); + list_add_tail(&gh->gh_list, &gl->gl_holders); return; trap_recursive: @@ -1596,17 +1577,14 @@ unlock: return error; } - if (test_bit(GLF_LRU, &gl->gl_flags)) - gfs2_glock_remove_from_lru(gl); - gh->gh_error = 0; spin_lock(&gl->gl_lockref.lock); add_to_queue(gh); if (unlikely((LM_FLAG_NOEXP & gh->gh_flags) && - test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) { - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags))) { + set_bit(GLF_HAVE_REPLY, &gl->gl_flags); gl->gl_lockref.count++; - __gfs2_glock_queue_work(gl, 0); + gfs2_glock_queue_work(gl, 0); } run_queue(gl, 1); spin_unlock(&gl->gl_lockref.lock); @@ -1630,12 +1608,6 @@ int gfs2_glock_poll(struct gfs2_holder *gh) return test_bit(HIF_WAIT, &gh->gh_iflags) ? 0 : 1; } -static inline bool needs_demote(struct gfs2_glock *gl) -{ - return (test_bit(GLF_DEMOTE, &gl->gl_flags) || - test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags)); -} - static void __gfs2_glock_dq(struct gfs2_holder *gh) { struct gfs2_glock *gl = gh->gh_gl; @@ -1644,11 +1616,11 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh) /* * This holder should not be cached, so mark it for demote. - * Note: this should be done before the check for needs_demote - * below. + * Note: this should be done before the glock_needs_demote + * check below. */ if (gh->gh_flags & GL_NOCACHE) - handle_callback(gl, LM_ST_UNLOCKED, 0, false); + request_demote(gl, LM_ST_UNLOCKED, 0, false); list_del_init(&gh->gh_list); clear_bit(HIF_HOLDER, &gh->gh_iflags); @@ -1658,21 +1630,18 @@ static void __gfs2_glock_dq(struct gfs2_holder *gh) * If there hasn't been a demote request we are done. * (Let the remaining holders, if any, keep holding it.) */ - if (!needs_demote(gl)) { + if (!glock_needs_demote(gl)) { if (list_empty(&gl->gl_holders)) fast_path = 1; } - if (!test_bit(GLF_LFLUSH, &gl->gl_flags) && demote_ok(gl)) - gfs2_glock_add_to_lru(gl); - if (unlikely(!fast_path)) { gl->gl_lockref.count++; if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && !test_bit(GLF_DEMOTE, &gl->gl_flags) && gl->gl_name.ln_type == LM_TYPE_INODE) delay = gl->gl_hold_time; - __gfs2_glock_queue_work(gl, delay); + gfs2_glock_queue_work(gl, delay); } } @@ -1696,11 +1665,19 @@ void gfs2_glock_dq(struct gfs2_holder *gh) } if (list_is_first(&gh->gh_list, &gl->gl_holders) && - !test_bit(HIF_HOLDER, &gh->gh_iflags)) { + !test_bit(HIF_HOLDER, &gh->gh_iflags) && + test_bit(GLF_LOCK, &gl->gl_flags) && + !test_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags) && + !test_bit(GLF_CANCELING, &gl->gl_flags)) { + set_bit(GLF_CANCELING, &gl->gl_flags); spin_unlock(&gl->gl_lockref.lock); gl->gl_name.ln_sbd->sd_lockstruct.ls_ops->lm_cancel(gl); wait_on_bit(&gh->gh_iflags, HIF_WAIT, TASK_UNINTERRUPTIBLE); spin_lock(&gl->gl_lockref.lock); + clear_bit(GLF_CANCELING, &gl->gl_flags); + clear_bit(GLF_LOCK, &gl->gl_flags); + if (!gfs2_holder_queued(gh)) + goto out; } /* @@ -1882,21 +1859,23 @@ void gfs2_glock_dq_m(unsigned int num_gh, struct gfs2_holder *ghs) void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state) { unsigned long delay = 0; - unsigned long holdtime; - unsigned long now = jiffies; gfs2_glock_hold(gl); spin_lock(&gl->gl_lockref.lock); - holdtime = gl->gl_tchange + gl->gl_hold_time; if (!list_empty(&gl->gl_holders) && gl->gl_name.ln_type == LM_TYPE_INODE) { + unsigned long now = jiffies; + unsigned long holdtime; + + holdtime = gl->gl_tchange + gl->gl_hold_time; + if (time_before(now, holdtime)) delay = holdtime - now; - if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags)) + if (test_bit(GLF_HAVE_REPLY, &gl->gl_flags)) delay = gl->gl_hold_time; } - handle_callback(gl, state, delay, true); - __gfs2_glock_queue_work(gl, delay); + request_demote(gl, state, delay, true); + gfs2_glock_queue_work(gl, delay); spin_unlock(&gl->gl_lockref.lock); } @@ -1944,19 +1923,20 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret) struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct; spin_lock(&gl->gl_lockref.lock); + clear_bit(GLF_PENDING_REPLY, &gl->gl_flags); gl->gl_reply = ret; if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))) { if (gfs2_should_freeze(gl)) { - set_bit(GLF_FROZEN, &gl->gl_flags); + set_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags); spin_unlock(&gl->gl_lockref.lock); return; } } gl->gl_lockref.count++; - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); - __gfs2_glock_queue_work(gl, 0); + set_bit(GLF_HAVE_REPLY, &gl->gl_flags); + gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); } @@ -1976,6 +1956,16 @@ static int glock_cmp(void *priv, const struct list_head *a, return 0; } +static bool can_free_glock(struct gfs2_glock *gl) +{ + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + return !test_bit(GLF_LOCK, &gl->gl_flags) && + !gl->gl_lockref.count && + (!test_bit(GLF_LFLUSH, &gl->gl_flags) || + test_bit(SDF_KILL, &sdp->sd_flags)); +} + /** * gfs2_dispose_glock_lru - Demote a list of glocks * @list: The list to dispose of @@ -1990,37 +1980,38 @@ static int glock_cmp(void *priv, const struct list_head *a, * private) */ -static void gfs2_dispose_glock_lru(struct list_head *list) +static unsigned long gfs2_dispose_glock_lru(struct list_head *list) __releases(&lru_lock) __acquires(&lru_lock) { struct gfs2_glock *gl; + unsigned long freed = 0; list_sort(NULL, list, glock_cmp); while(!list_empty(list)) { gl = list_first_entry(list, struct gfs2_glock, gl_lru); - list_del_init(&gl->gl_lru); - clear_bit(GLF_LRU, &gl->gl_flags); if (!spin_trylock(&gl->gl_lockref.lock)) { add_back_to_lru: - list_add(&gl->gl_lru, &lru_list); - set_bit(GLF_LRU, &gl->gl_flags); - atomic_inc(&lru_count); + list_move(&gl->gl_lru, &lru_list); continue; } - if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) { + if (!can_free_glock(gl)) { spin_unlock(&gl->gl_lockref.lock); goto add_back_to_lru; } + list_del_init(&gl->gl_lru); + atomic_dec(&lru_count); + clear_bit(GLF_LRU, &gl->gl_flags); + freed++; gl->gl_lockref.count++; - if (demote_ok(gl)) - handle_callback(gl, LM_ST_UNLOCKED, 0, false); - WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags)); - __gfs2_glock_queue_work(gl, 0); + if (gl->gl_state != LM_ST_UNLOCKED) + request_demote(gl, LM_ST_UNLOCKED, 0, false); + gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); cond_resched_lock(&lru_lock); } + return freed; } /** @@ -2032,32 +2023,21 @@ add_back_to_lru: * gfs2_dispose_glock_lru() above. */ -static long gfs2_scan_glock_lru(int nr) +static unsigned long gfs2_scan_glock_lru(unsigned long nr) { struct gfs2_glock *gl, *next; LIST_HEAD(dispose); - long freed = 0; + unsigned long freed = 0; spin_lock(&lru_lock); list_for_each_entry_safe(gl, next, &lru_list, gl_lru) { - if (nr-- <= 0) + if (!nr--) break; - /* Test for being demotable */ - if (!test_bit(GLF_LOCK, &gl->gl_flags)) { - if (!spin_trylock(&gl->gl_lockref.lock)) - continue; - if (gl->gl_lockref.count <= 1 && - (gl->gl_state == LM_ST_UNLOCKED || - demote_ok(gl))) { - list_move(&gl->gl_lru, &dispose); - atomic_dec(&lru_count); - freed++; - } - spin_unlock(&gl->gl_lockref.lock); - } + if (can_free_glock(gl)) + list_move(&gl->gl_lru, &dispose); } if (!list_empty(&dispose)) - gfs2_dispose_glock_lru(&dispose); + freed = gfs2_dispose_glock_lru(&dispose); spin_unlock(&lru_lock); return freed; @@ -2113,7 +2093,7 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp) void gfs2_cancel_delete_work(struct gfs2_glock *gl) { clear_bit(GLF_TRY_TO_EVICT, &gl->gl_flags); - clear_bit(GLF_VERIFY_EVICT, &gl->gl_flags); + clear_bit(GLF_VERIFY_DELETE, &gl->gl_flags); if (cancel_delayed_work(&gl->gl_delete)) gfs2_glock_put(gl); } @@ -2144,12 +2124,16 @@ void gfs2_flush_delete_work(struct gfs2_sbd *sdp) static void thaw_glock(struct gfs2_glock *gl) { - if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) + if (!test_and_clear_bit(GLF_HAVE_FROZEN_REPLY, &gl->gl_flags)) return; if (!lockref_get_not_dead(&gl->gl_lockref)) return; - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + + gfs2_glock_remove_from_lru(gl); + spin_lock(&gl->gl_lockref.lock); + set_bit(GLF_HAVE_REPLY, &gl->gl_flags); gfs2_glock_queue_work(gl, 0); + spin_unlock(&gl->gl_lockref.lock); } /** @@ -2166,8 +2150,8 @@ static void clear_glock(struct gfs2_glock *gl) if (!__lockref_is_dead(&gl->gl_lockref)) { gl->gl_lockref.count++; if (gl->gl_state != LM_ST_UNLOCKED) - handle_callback(gl, LM_ST_UNLOCKED, 0, false); - __gfs2_glock_queue_work(gl, 0); + request_demote(gl, LM_ST_UNLOCKED, 0, false); + gfs2_glock_queue_work(gl, 0); } spin_unlock(&gl->gl_lockref.lock); } @@ -2218,14 +2202,31 @@ void gfs2_gl_dq_holders(struct gfs2_sbd *sdp) void gfs2_gl_hash_clear(struct gfs2_sbd *sdp) { + unsigned long start = jiffies; + bool timed_out = false; + set_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags); - flush_workqueue(glock_workqueue); + flush_workqueue(sdp->sd_glock_wq); glock_hash_walk(clear_glock, sdp); - flush_workqueue(glock_workqueue); - wait_event_timeout(sdp->sd_kill_wait, - atomic_read(&sdp->sd_glock_disposal) == 0, - HZ * 600); + flush_workqueue(sdp->sd_glock_wq); + + while (!timed_out) { + wait_event_timeout(sdp->sd_kill_wait, + !atomic_read(&sdp->sd_glock_disposal), + HZ * 60); + if (!atomic_read(&sdp->sd_glock_disposal)) + break; + timed_out = time_after(jiffies, start + (HZ * 600)); + fs_warn(sdp, "%u glocks left after %u seconds%s\n", + atomic_read(&sdp->sd_glock_disposal), + jiffies_to_msecs(jiffies - start) / 1000, + timed_out ? ":" : "; still waiting"); + } + gfs2_lm_unmount(sdp); + gfs2_free_dead_glocks(sdp); glock_hash_walk(dump_glock_func, sdp); + destroy_workqueue(sdp->sd_glock_wq); + sdp->sd_glock_wq = NULL; } static const char *state2str(unsigned state) @@ -2323,11 +2324,13 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl) *p++ = 'f'; if (test_bit(GLF_INVALIDATE_IN_PROGRESS, gflags)) *p++ = 'i'; - if (test_bit(GLF_REPLY_PENDING, gflags)) + if (test_bit(GLF_PENDING_REPLY, gflags)) + *p++ = 'R'; + if (test_bit(GLF_HAVE_REPLY, gflags)) *p++ = 'r'; if (test_bit(GLF_INITIAL, gflags)) - *p++ = 'I'; - if (test_bit(GLF_FROZEN, gflags)) + *p++ = 'a'; + if (test_bit(GLF_HAVE_FROZEN_REPLY, gflags)) *p++ = 'F'; if (!list_empty(&gl->gl_holders)) *p++ = 'q'; @@ -2337,7 +2340,7 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl) *p++ = 'o'; if (test_bit(GLF_BLOCKING, gflags)) *p++ = 'b'; - if (test_bit(GLF_FREEING, gflags)) + if (test_bit(GLF_UNLOCKED, gflags)) *p++ = 'x'; if (test_bit(GLF_INSTANTIATE_NEEDED, gflags)) *p++ = 'n'; @@ -2345,8 +2348,12 @@ static const char *gflags2str(char *buf, const struct gfs2_glock *gl) *p++ = 'N'; if (test_bit(GLF_TRY_TO_EVICT, gflags)) *p++ = 'e'; - if (test_bit(GLF_VERIFY_EVICT, gflags)) + if (test_bit(GLF_VERIFY_DELETE, gflags)) *p++ = 'E'; + if (test_bit(GLF_DEFER_DELETE, gflags)) + *p++ = 's'; + if (test_bit(GLF_CANCELING, gflags)) + *p++ = 'C'; *p = 0; return buf; } @@ -2490,16 +2497,8 @@ int __init gfs2_glock_init(void) if (ret < 0) return ret; - glock_workqueue = alloc_workqueue("glock_workqueue", WQ_MEM_RECLAIM | - WQ_HIGHPRI | WQ_FREEZABLE, 0); - if (!glock_workqueue) { - rhashtable_destroy(&gl_hash_table); - return -ENOMEM; - } - glock_shrinker = shrinker_alloc(0, "gfs2-glock"); if (!glock_shrinker) { - destroy_workqueue(glock_workqueue); rhashtable_destroy(&gl_hash_table); return -ENOMEM; } @@ -2519,7 +2518,6 @@ void gfs2_glock_exit(void) { shrinker_free(glock_shrinker); rhashtable_destroy(&gl_hash_table); - destroy_workqueue(glock_workqueue); } static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi, loff_t n) @@ -2529,8 +2527,7 @@ static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi, loff_t n) if (gl) { if (n == 0) return; - if (!lockref_put_not_zero(&gl->gl_lockref)) - gfs2_glock_queue_put(gl); + gfs2_glock_put_async(gl); } for (;;) { gl = rhashtable_walk_next(&gi->hti); @@ -2752,25 +2749,18 @@ static struct file *gfs2_glockfd_next_file(struct gfs2_glockfd_iter *i) i->file = NULL; } - rcu_read_lock(); for(;; i->fd++) { - struct inode *inode; - - i->file = task_lookup_next_fdget_rcu(i->task, &i->fd); + i->file = fget_task_next(i->task, &i->fd); if (!i->file) { i->fd = 0; break; } - inode = file_inode(i->file); - if (inode->i_sb == i->sb) + if (file_inode(i->file)->i_sb == i->sb) break; - rcu_read_unlock(); fput(i->file); - rcu_read_lock(); } - rcu_read_unlock(); return i->file; } |