summaryrefslogtreecommitdiff
path: root/fs/ocfs2/journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/journal.c')
-rw-r--r--fs/ocfs2/journal.c789
1 files changed, 528 insertions, 261 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 242170d83971..85239807dec7 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -1,26 +1,10 @@
-/* -*- mode: c; c-basic-offset: 8; -*-
- * vim: noexpandtab sw=8 ts=8 sts=0:
- *
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
* journal.c
*
* Defines functions of journalling api
*
* Copyright (C) 2003, 2004 Oracle. All rights reserved.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public
- * License along with this program; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 021110-1307, USA.
*/
#include <linux/fs.h>
@@ -30,6 +14,8 @@
#include <linux/kthread.h>
#include <linux/time.h>
#include <linux/random.h>
+#include <linux/delay.h>
+#include <linux/writeback.h>
#include <cluster/masklog.h>
@@ -49,6 +35,8 @@
#include "sysfile.h"
#include "uptodate.h"
#include "quota.h"
+#include "file.h"
+#include "namei.h"
#include "buffer_head_io.h"
#include "ocfs2_trace.h"
@@ -68,13 +56,15 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
static int ocfs2_trylock_journal(struct ocfs2_super *osb,
int slot_num);
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
- int slot);
+ int slot,
+ enum ocfs2_orphan_reco_type orphan_reco_type);
static int ocfs2_commit_thread(void *arg);
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
int slot_num,
struct ocfs2_dinode *la_dinode,
struct ocfs2_dinode *tl_dinode,
- struct ocfs2_quota_recovery *qrec);
+ struct ocfs2_quota_recovery *qrec,
+ enum ocfs2_orphan_reco_type orphan_reco_type);
static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
{
@@ -100,10 +90,10 @@ enum ocfs2_replay_state {
struct ocfs2_replay_map {
unsigned int rm_slots;
enum ocfs2_replay_state rm_state;
- unsigned char rm_replay_slots[0];
+ unsigned char rm_replay_slots[] __counted_by(rm_slots);
};
-void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
+static void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
{
if (!osb->replay_map)
return;
@@ -124,9 +114,9 @@ int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
if (osb->replay_map)
return 0;
- replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
- (osb->max_slots * sizeof(char)), GFP_KERNEL);
-
+ replay_map = kzalloc(struct_size(replay_map, rm_replay_slots,
+ osb->max_slots),
+ GFP_KERNEL);
if (!replay_map) {
mlog_errno(-ENOMEM);
return -ENOMEM;
@@ -148,7 +138,8 @@ int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
return 0;
}
-void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
+static void ocfs2_queue_replay_slots(struct ocfs2_super *osb,
+ enum ocfs2_orphan_reco_type orphan_reco_type)
{
struct ocfs2_replay_map *replay_map = osb->replay_map;
int i;
@@ -162,7 +153,8 @@ void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
for (i = 0; i < replay_map->rm_slots; i++)
if (replay_map->rm_replay_slots[i])
ocfs2_queue_recovery_completion(osb->journal, i, NULL,
- NULL, NULL);
+ NULL, NULL,
+ orphan_reco_type);
replay_map->rm_state = REPLAY_DONE;
}
@@ -182,49 +174,69 @@ int ocfs2_recovery_init(struct ocfs2_super *osb)
struct ocfs2_recovery_map *rm;
mutex_init(&osb->recovery_lock);
- osb->disable_recovery = 0;
+ osb->recovery_state = OCFS2_REC_ENABLED;
osb->recovery_thread_task = NULL;
init_waitqueue_head(&osb->recovery_event);
- rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
- osb->max_slots * sizeof(unsigned int),
+ rm = kzalloc(struct_size(rm, rm_entries, osb->max_slots),
GFP_KERNEL);
if (!rm) {
mlog_errno(-ENOMEM);
return -ENOMEM;
}
- rm->rm_entries = (unsigned int *)((char *)rm +
- sizeof(struct ocfs2_recovery_map));
osb->recovery_map = rm;
return 0;
}
-/* we can't grab the goofy sem lock from inside wait_event, so we use
- * memory barriers to make sure that we'll see the null task before
- * being woken up */
static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
{
- mb();
return osb->recovery_thread_task != NULL;
}
+static void ocfs2_recovery_disable(struct ocfs2_super *osb,
+ enum ocfs2_recovery_state state)
+{
+ mutex_lock(&osb->recovery_lock);
+ /*
+ * If recovery thread is not running, we can directly transition to
+ * final state.
+ */
+ if (!ocfs2_recovery_thread_running(osb)) {
+ osb->recovery_state = state + 1;
+ goto out_lock;
+ }
+ osb->recovery_state = state;
+ /* Wait for recovery thread to acknowledge state transition */
+ wait_event_cmd(osb->recovery_event,
+ !ocfs2_recovery_thread_running(osb) ||
+ osb->recovery_state >= state + 1,
+ mutex_unlock(&osb->recovery_lock),
+ mutex_lock(&osb->recovery_lock));
+out_lock:
+ mutex_unlock(&osb->recovery_lock);
+
+ /*
+ * At this point we know that no more recovery work can be queued so
+ * wait for any recovery completion work to complete.
+ */
+ if (osb->ocfs2_wq)
+ flush_workqueue(osb->ocfs2_wq);
+}
+
+void ocfs2_recovery_disable_quota(struct ocfs2_super *osb)
+{
+ ocfs2_recovery_disable(osb, OCFS2_REC_QUOTA_WANT_DISABLE);
+}
+
void ocfs2_recovery_exit(struct ocfs2_super *osb)
{
struct ocfs2_recovery_map *rm;
/* disable any new recovery threads and wait for any currently
* running ones to exit. Do this before setting the vol_state. */
- mutex_lock(&osb->recovery_lock);
- osb->disable_recovery = 1;
- mutex_unlock(&osb->recovery_lock);
- wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
-
- /* At this point, we know that no more recovery threads can be
- * launched, so wait for any recovery completion work to
- * complete. */
- flush_workqueue(ocfs2_wq);
+ ocfs2_recovery_disable(osb, OCFS2_REC_WANT_DISABLE);
/*
* Now that recovery is shut down, and the osb is about to be
@@ -316,7 +328,7 @@ static int ocfs2_commit_cache(struct ocfs2_super *osb)
}
jbd2_journal_lock_updates(journal->j_journal);
- status = jbd2_journal_flush(journal->j_journal);
+ status = jbd2_journal_flush(journal->j_journal, 0);
jbd2_journal_unlock_updates(journal->j_journal);
if (status < 0) {
up_write(&journal->j_trans_barrier);
@@ -367,7 +379,7 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
mlog_errno(PTR_ERR(handle));
if (is_journal_aborted(journal)) {
- ocfs2_abort(osb->sb, "Detected aborted journal");
+ ocfs2_abort(osb->sb, "Detected aborted journal\n");
handle = ERR_PTR(-EROFS);
}
} else {
@@ -426,14 +438,14 @@ int ocfs2_extend_trans(handle_t *handle, int nblocks)
if (!nblocks)
return 0;
- old_nblocks = handle->h_buffer_credits;
+ old_nblocks = jbd2_handle_buffer_credits(handle);
trace_ocfs2_extend_trans(old_nblocks, nblocks);
#ifdef CONFIG_OCFS2_DEBUG_FS
status = 1;
#else
- status = jbd2_journal_extend(handle, nblocks);
+ status = jbd2_journal_extend(handle, nblocks, 0);
if (status < 0) {
mlog_errno(status);
goto bail;
@@ -455,10 +467,56 @@ bail:
return status;
}
-struct ocfs2_triggers {
- struct jbd2_buffer_trigger_type ot_triggers;
- int ot_offset;
-};
+/*
+ * Make sure handle has at least 'nblocks' credits available. If it does not
+ * have that many credits available, we will try to extend the handle to have
+ * enough credits. If that fails, we will restart transaction to have enough
+ * credits. Similar notes regarding data consistency and locking implications
+ * as for ocfs2_extend_trans() apply here.
+ */
+int ocfs2_assure_trans_credits(handle_t *handle, int nblocks)
+{
+ int old_nblks = jbd2_handle_buffer_credits(handle);
+
+ trace_ocfs2_assure_trans_credits(old_nblks);
+ if (old_nblks >= nblocks)
+ return 0;
+ return ocfs2_extend_trans(handle, nblocks - old_nblks);
+}
+
+/*
+ * If we have fewer than thresh credits, extend by OCFS2_MAX_TRANS_DATA.
+ * If that fails, restart the transaction & regain write access for the
+ * buffer head which is used for metadata modifications.
+ * Taken from Ext4: extend_or_restart_transaction()
+ */
+int ocfs2_allocate_extend_trans(handle_t *handle, int thresh)
+{
+ int status, old_nblks;
+
+ BUG_ON(!handle);
+
+ old_nblks = jbd2_handle_buffer_credits(handle);
+ trace_ocfs2_allocate_extend_trans(old_nblks, thresh);
+
+ if (old_nblks < thresh)
+ return 0;
+
+ status = jbd2_journal_extend(handle, OCFS2_MAX_TRANS_DATA, 0);
+ if (status < 0) {
+ mlog_errno(status);
+ goto bail;
+ }
+
+ if (status > 0) {
+ status = jbd2_journal_restart(handle, OCFS2_MAX_TRANS_DATA);
+ if (status < 0)
+ mlog_errno(status);
+ }
+
+bail:
+ return status;
+}
static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers)
{
@@ -523,87 +581,76 @@ static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers,
struct buffer_head *bh)
{
+ struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers);
+
mlog(ML_ERROR,
"ocfs2_abort_trigger called by JBD2. bh = 0x%lx, "
"bh->b_blocknr = %llu\n",
(unsigned long)bh,
(unsigned long long)bh->b_blocknr);
- /* We aren't guaranteed to have the superblock here - but if we
- * don't, it'll just crash. */
- ocfs2_error(bh->b_assoc_map->host->i_sb,
+ ocfs2_error(ot->sb,
"JBD2 has aborted our journal, ocfs2 cannot continue\n");
}
-static struct ocfs2_triggers di_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_dinode, i_check),
-};
-
-static struct ocfs2_triggers eb_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_extent_block, h_check),
-};
-
-static struct ocfs2_triggers rb_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_refcount_block, rf_check),
-};
-
-static struct ocfs2_triggers gd_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_group_desc, bg_check),
-};
-
-static struct ocfs2_triggers db_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_db_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
-};
+static void ocfs2_setup_csum_triggers(struct super_block *sb,
+ enum ocfs2_journal_trigger_type type,
+ struct ocfs2_triggers *ot)
+{
+ BUG_ON(type >= OCFS2_JOURNAL_TRIGGER_COUNT);
-static struct ocfs2_triggers xb_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_xattr_block, xb_check),
-};
+ switch (type) {
+ case OCFS2_JTR_DI:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_dinode, i_check);
+ break;
+ case OCFS2_JTR_EB:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_extent_block, h_check);
+ break;
+ case OCFS2_JTR_RB:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_refcount_block, rf_check);
+ break;
+ case OCFS2_JTR_GD:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_group_desc, bg_check);
+ break;
+ case OCFS2_JTR_DB:
+ ot->ot_triggers.t_frozen = ocfs2_db_frozen_trigger;
+ break;
+ case OCFS2_JTR_XB:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_xattr_block, xb_check);
+ break;
+ case OCFS2_JTR_DQ:
+ ot->ot_triggers.t_frozen = ocfs2_dq_frozen_trigger;
+ break;
+ case OCFS2_JTR_DR:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_dx_root_block, dr_check);
+ break;
+ case OCFS2_JTR_DL:
+ ot->ot_triggers.t_frozen = ocfs2_frozen_trigger;
+ ot->ot_offset = offsetof(struct ocfs2_dx_leaf, dl_check);
+ break;
+ case OCFS2_JTR_NONE:
+ /* To make compiler happy... */
+ return;
+ }
-static struct ocfs2_triggers dq_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_dq_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
-};
+ ot->ot_triggers.t_abort = ocfs2_abort_trigger;
+ ot->sb = sb;
+}
-static struct ocfs2_triggers dr_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_dx_root_block, dr_check),
-};
+void ocfs2_initialize_journal_triggers(struct super_block *sb,
+ struct ocfs2_triggers triggers[])
+{
+ enum ocfs2_journal_trigger_type type;
-static struct ocfs2_triggers dl_triggers = {
- .ot_triggers = {
- .t_frozen = ocfs2_frozen_trigger,
- .t_abort = ocfs2_abort_trigger,
- },
- .ot_offset = offsetof(struct ocfs2_dx_leaf, dl_check),
-};
+ for (type = OCFS2_JTR_DI; type < OCFS2_JOURNAL_TRIGGER_COUNT; type++)
+ ocfs2_setup_csum_triggers(sb, type, &triggers[type]);
+}
static int __ocfs2_journal_access(handle_t *handle,
struct ocfs2_caching_info *ci,
@@ -626,9 +673,26 @@ static int __ocfs2_journal_access(handle_t *handle,
/* we can safely remove this assertion after testing. */
if (!buffer_uptodate(bh)) {
mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n");
- mlog(ML_ERROR, "b_blocknr=%llu\n",
- (unsigned long long)bh->b_blocknr);
- BUG();
+ mlog(ML_ERROR, "b_blocknr=%llu, b_state=0x%lx\n",
+ (unsigned long long)bh->b_blocknr, bh->b_state);
+
+ lock_buffer(bh);
+ /*
+ * A previous transaction with a couple of buffer heads fail
+ * to checkpoint, so all the bhs are marked as BH_Write_EIO.
+ * For current transaction, the bh is just among those error
+ * bhs which previous transaction handle. We can't just clear
+ * its BH_Write_EIO and reuse directly, since other bhs are
+ * not written to disk yet and that will cause metadata
+ * inconsistency. So we should set fs read-only to avoid
+ * further damage.
+ */
+ if (buffer_write_io_error(bh) && !buffer_uptodate(bh)) {
+ unlock_buffer(bh);
+ return ocfs2_error(osb->sb, "A previous attempt to "
+ "write this buffer head failed\n");
+ }
+ unlock_buffer(bh);
}
/* Set the current transaction information on the ci so
@@ -668,56 +732,91 @@ static int __ocfs2_journal_access(handle_t *handle,
int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_DI],
+ type);
}
int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_EB],
+ type);
}
int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &rb_triggers,
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_RB],
type);
}
int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_GD],
+ type);
}
int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_DB],
+ type);
}
int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_XB],
+ type);
}
int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_DQ],
+ type);
}
int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_DR],
+ type);
}
int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci,
struct buffer_head *bh, int type)
{
- return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type);
+ struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));
+
+ return __ocfs2_journal_access(handle, ci, bh,
+ &osb->s_journal_triggers[OCFS2_JTR_DL],
+ type);
}
int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci,
@@ -733,7 +832,22 @@ void ocfs2_journal_dirty(handle_t *handle, struct buffer_head *bh)
trace_ocfs2_journal_dirty((unsigned long long)bh->b_blocknr);
status = jbd2_journal_dirty_metadata(handle, bh);
- BUG_ON(status);
+ if (status) {
+ mlog_errno(status);
+ if (!is_handle_aborted(handle)) {
+ journal_t *journal = handle->h_transaction->t_journal;
+
+ mlog(ML_ERROR, "jbd2_journal_dirty_metadata failed: "
+ "handle type %u started at line %u, credits %u/%u "
+ "errcode %d. Aborting transaction and journal.\n",
+ handle->h_type, handle->h_line_no,
+ handle->h_requested_credits,
+ jbd2_handle_buffer_credits(handle), status);
+ handle->h_err = status;
+ jbd2_journal_abort_handle(handle);
+ jbd2_journal_abort(journal, status);
+ }
+ }
}
#define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE)
@@ -755,20 +869,54 @@ void ocfs2_set_journal_params(struct ocfs2_super *osb)
write_unlock(&journal->j_state_lock);
}
-int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
+/*
+ * alloc & initialize skeleton for journal structure.
+ * ocfs2_journal_init() will make fs have journal ability.
+ */
+int ocfs2_journal_alloc(struct ocfs2_super *osb)
+{
+ int status = 0;
+ struct ocfs2_journal *journal;
+
+ journal = kzalloc(sizeof(struct ocfs2_journal), GFP_KERNEL);
+ if (!journal) {
+ mlog(ML_ERROR, "unable to alloc journal\n");
+ status = -ENOMEM;
+ goto bail;
+ }
+ osb->journal = journal;
+ journal->j_osb = osb;
+
+ atomic_set(&journal->j_num_trans, 0);
+ init_rwsem(&journal->j_trans_barrier);
+ init_waitqueue_head(&journal->j_checkpointed);
+ spin_lock_init(&journal->j_lock);
+ journal->j_trans_id = 1UL;
+ INIT_LIST_HEAD(&journal->j_la_cleanups);
+ INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery);
+ journal->j_state = OCFS2_JOURNAL_FREE;
+
+bail:
+ return status;
+}
+
+static int ocfs2_journal_submit_inode_data_buffers(struct jbd2_inode *jinode)
+{
+ return filemap_fdatawrite_range(jinode->i_vfs_inode->i_mapping,
+ jinode->i_dirty_start, jinode->i_dirty_end);
+}
+
+int ocfs2_journal_init(struct ocfs2_super *osb, int *dirty)
{
int status = -1;
struct inode *inode = NULL; /* the journal inode */
journal_t *j_journal = NULL;
+ struct ocfs2_journal *journal = osb->journal;
struct ocfs2_dinode *di = NULL;
struct buffer_head *bh = NULL;
- struct ocfs2_super *osb;
int inode_lock = 0;
BUG_ON(!journal);
-
- osb = journal->j_osb;
-
/* already have the inode for our journal */
inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
osb->slot_num);
@@ -801,31 +949,35 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
inode_lock = 1;
di = (struct ocfs2_dinode *)bh->b_data;
- if (inode->i_size < OCFS2_MIN_JOURNAL_SIZE) {
+ if (i_size_read(inode) < OCFS2_MIN_JOURNAL_SIZE) {
mlog(ML_ERROR, "Journal file size (%lld) is too small!\n",
- inode->i_size);
+ i_size_read(inode));
status = -EINVAL;
goto done;
}
- trace_ocfs2_journal_init(inode->i_size,
+ trace_ocfs2_journal_init(i_size_read(inode),
(unsigned long long)inode->i_blocks,
OCFS2_I(inode)->ip_clusters);
/* call the kernels journal init function now */
j_journal = jbd2_journal_init_inode(inode);
- if (j_journal == NULL) {
+ if (IS_ERR(j_journal)) {
mlog(ML_ERROR, "Linux journal layer error\n");
- status = -EINVAL;
+ status = PTR_ERR(j_journal);
goto done;
}
- trace_ocfs2_journal_init_maxlen(j_journal->j_maxlen);
+ trace_ocfs2_journal_init_maxlen(j_journal->j_total_len);
*dirty = (le32_to_cpu(di->id1.journal1.ij_flags) &
OCFS2_JOURNAL_DIRTY_FL);
journal->j_journal = j_journal;
+ journal->j_journal->j_submit_inode_data_buffers =
+ ocfs2_journal_submit_inode_data_buffers;
+ journal->j_journal->j_finish_inode_data_buffers =
+ jbd2_journal_finish_inode_data_buffers;
journal->j_inode = inode;
journal->j_bh = bh;
@@ -918,7 +1070,7 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
if (!igrab(inode))
BUG();
- num_running_trans = atomic_read(&(osb->journal->j_num_trans));
+ num_running_trans = atomic_read(&(journal->j_num_trans));
trace_ocfs2_journal_shutdown(num_running_trans);
/* Do a commit_cache here. It will flush our journal, *and*
@@ -937,17 +1089,19 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
osb->commit_task = NULL;
}
- BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);
+ BUG_ON(atomic_read(&(journal->j_num_trans)) != 0);
- if (ocfs2_mount_local(osb)) {
+ if (ocfs2_mount_local(osb) &&
+ (journal->j_journal->j_flags & JBD2_LOADED)) {
jbd2_journal_lock_updates(journal->j_journal);
- status = jbd2_journal_flush(journal->j_journal);
+ status = jbd2_journal_flush(journal->j_journal, 0);
jbd2_journal_unlock_updates(journal->j_journal);
if (status < 0)
mlog_errno(status);
}
- if (status == 0) {
+ /* Shutdown the kernel journal system */
+ if (!jbd2_journal_destroy(journal->j_journal) && !status) {
/*
* Do not toggle if flush was unsuccessful otherwise
* will leave dirty metadata in a "clean" journal
@@ -956,9 +1110,6 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
if (status < 0)
mlog_errno(status);
}
-
- /* Shutdown the kernel journal system */
- jbd2_journal_destroy(journal->j_journal);
journal->j_journal = NULL;
OCFS2_I(inode)->ip_open_count--;
@@ -971,10 +1122,10 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
journal->j_state = OCFS2_JOURNAL_FREE;
-// up_write(&journal->j_trans_barrier);
done:
- if (inode)
- iput(inode);
+ iput(inode);
+ kfree(journal);
+ osb->journal = NULL;
}
static void ocfs2_clear_journal_error(struct super_block *sb,
@@ -1012,6 +1163,14 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed)
ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num);
+ if (replayed) {
+ jbd2_journal_lock_updates(journal->j_journal);
+ status = jbd2_journal_flush(journal->j_journal, 0);
+ jbd2_journal_unlock_updates(journal->j_journal);
+ if (status < 0)
+ mlog_errno(status);
+ }
+
status = ocfs2_journal_toggle_dirty(osb, 1, replayed);
if (status < 0) {
mlog_errno(status);
@@ -1021,7 +1180,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed)
/* Launch the commit thread */
if (!local) {
osb->commit_task = kthread_run(ocfs2_commit_thread, osb,
- "ocfs2cmt");
+ "ocfs2cmt-%s", osb->uuid_str);
if (IS_ERR(osb->commit_task)) {
status = PTR_ERR(osb->commit_task);
osb->commit_task = NULL;
@@ -1091,12 +1250,10 @@ static int ocfs2_force_read_journal(struct inode *inode)
int status = 0;
int i;
u64 v_blkno, p_blkno, p_blocks, num_blocks;
-#define CONCURRENT_JOURNAL_FILL 32ULL
- struct buffer_head *bhs[CONCURRENT_JOURNAL_FILL];
-
- memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL);
+ struct buffer_head *bh = NULL;
+ struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
- num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, inode->i_size);
+ num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode));
v_blkno = 0;
while (v_blkno < num_blocks) {
status = ocfs2_extent_map_get_blocks(inode, v_blkno,
@@ -1106,29 +1263,32 @@ static int ocfs2_force_read_journal(struct inode *inode)
goto bail;
}
- if (p_blocks > CONCURRENT_JOURNAL_FILL)
- p_blocks = CONCURRENT_JOURNAL_FILL;
-
- /* We are reading journal data which should not
- * be put in the uptodate cache */
- status = ocfs2_read_blocks_sync(OCFS2_SB(inode->i_sb),
- p_blkno, p_blocks, bhs);
- if (status < 0) {
- mlog_errno(status);
- goto bail;
- }
-
- for(i = 0; i < p_blocks; i++) {
- brelse(bhs[i]);
- bhs[i] = NULL;
+ for (i = 0; i < p_blocks; i++, p_blkno++) {
+ bh = __find_get_block_nonatomic(osb->sb->s_bdev, p_blkno,
+ osb->sb->s_blocksize);
+ /* block not cached. */
+ if (!bh)
+ continue;
+
+ brelse(bh);
+ bh = NULL;
+ /* We are reading journal data which should not
+ * be put in the uptodate cache.
+ */
+ status = ocfs2_read_blocks_sync(osb, p_blkno, 1, &bh);
+ if (status < 0) {
+ mlog_errno(status);
+ goto bail;
+ }
+
+ brelse(bh);
+ bh = NULL;
}
v_blkno += p_blocks;
}
bail:
- for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++)
- brelse(bhs[i]);
return status;
}
@@ -1138,6 +1298,7 @@ struct ocfs2_la_recovery_item {
struct ocfs2_dinode *lri_la_dinode;
struct ocfs2_dinode *lri_tl_dinode;
struct ocfs2_quota_recovery *lri_qrec;
+ enum ocfs2_orphan_reco_type lri_orphan_reco_type;
};
/* Does the second half of the recovery process. By this point, the
@@ -1159,6 +1320,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
struct ocfs2_dinode *la_dinode, *tl_dinode;
struct ocfs2_la_recovery_item *item, *n;
struct ocfs2_quota_recovery *qrec;
+ enum ocfs2_orphan_reco_type orphan_reco_type;
LIST_HEAD(tmp_la_list);
trace_ocfs2_complete_recovery(
@@ -1176,6 +1338,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
la_dinode = item->lri_la_dinode;
tl_dinode = item->lri_tl_dinode;
qrec = item->lri_qrec;
+ orphan_reco_type = item->lri_orphan_reco_type;
trace_ocfs2_complete_recovery_slot(item->lri_slot,
la_dinode ? le64_to_cpu(la_dinode->i_blkno) : 0,
@@ -1200,7 +1363,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
kfree(tl_dinode);
}
- ret = ocfs2_recover_orphans(osb, item->lri_slot);
+ ret = ocfs2_recover_orphans(osb, item->lri_slot,
+ orphan_reco_type);
if (ret < 0)
mlog_errno(ret);
@@ -1225,7 +1389,8 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
int slot_num,
struct ocfs2_dinode *la_dinode,
struct ocfs2_dinode *tl_dinode,
- struct ocfs2_quota_recovery *qrec)
+ struct ocfs2_quota_recovery *qrec,
+ enum ocfs2_orphan_reco_type orphan_reco_type)
{
struct ocfs2_la_recovery_item *item;
@@ -1249,10 +1414,11 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
item->lri_slot = slot_num;
item->lri_tl_dinode = tl_dinode;
item->lri_qrec = qrec;
+ item->lri_orphan_reco_type = orphan_reco_type;
spin_lock(&journal->j_lock);
list_add_tail(&item->lri_list, &journal->j_la_cleanups);
- queue_work(ocfs2_wq, &journal->j_recovery_work);
+ queue_work(journal->j_osb->ocfs2_wq, &journal->j_recovery_work);
spin_unlock(&journal->j_lock);
}
@@ -1268,15 +1434,15 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
/* No need to queue up our truncate_log as regular cleanup will catch
* that */
ocfs2_queue_recovery_completion(journal, osb->slot_num,
- osb->local_alloc_copy, NULL, NULL);
+ osb->local_alloc_copy, NULL, NULL,
+ ORPHAN_NEED_TRUNCATE);
ocfs2_schedule_truncate_log_flush(osb, 0);
osb->local_alloc_copy = NULL;
- osb->dirty = 0;
/* queue to recover orphan slots for all offline slots */
ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
- ocfs2_queue_replay_slots(osb);
+ ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE);
ocfs2_free_replay_slots(osb);
}
@@ -1287,7 +1453,8 @@ void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
osb->slot_num,
NULL,
NULL,
- osb->quota_rec);
+ osb->quota_rec,
+ ORPHAN_NEED_TRUNCATE);
osb->quota_rec = NULL;
}
}
@@ -1301,17 +1468,37 @@ static int __ocfs2_recovery_thread(void *arg)
int rm_quota_used = 0, i;
struct ocfs2_quota_recovery *qrec;
+ /* Whether the quota supported. */
+ int quota_enabled = OCFS2_HAS_RO_COMPAT_FEATURE(osb->sb,
+ OCFS2_FEATURE_RO_COMPAT_USRQUOTA)
+ || OCFS2_HAS_RO_COMPAT_FEATURE(osb->sb,
+ OCFS2_FEATURE_RO_COMPAT_GRPQUOTA);
+
status = ocfs2_wait_on_mount(osb);
if (status < 0) {
goto bail;
}
- rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
- if (!rm_quota) {
- status = -ENOMEM;
- goto bail;
+ if (quota_enabled) {
+ rm_quota = kcalloc(osb->max_slots, sizeof(int), GFP_NOFS);
+ if (!rm_quota) {
+ status = -ENOMEM;
+ goto bail;
+ }
}
restart:
+ if (quota_enabled) {
+ mutex_lock(&osb->recovery_lock);
+ /* Confirm that recovery thread will no longer recover quotas */
+ if (osb->recovery_state == OCFS2_REC_QUOTA_WANT_DISABLE) {
+ osb->recovery_state = OCFS2_REC_QUOTA_DISABLED;
+ wake_up(&osb->recovery_event);
+ }
+ if (osb->recovery_state >= OCFS2_REC_QUOTA_DISABLED)
+ quota_enabled = 0;
+ mutex_unlock(&osb->recovery_lock);
+ }
+
status = ocfs2_super_lock(osb, 1);
if (status < 0) {
mlog_errno(status);
@@ -1324,7 +1511,7 @@ restart:
/* queue recovery for our own slot */
ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
- NULL, NULL);
+ NULL, NULL, ORPHAN_NO_NEED_TRUNCATE);
spin_lock(&osb->osb_lock);
while (rm->rm_used) {
@@ -1345,9 +1532,14 @@ restart:
* then quota usage would be out of sync until some node takes
* the slot. So we remember which nodes need quota recovery
* and when everything else is done, we recover quotas. */
- for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
- if (i == rm_quota_used)
- rm_quota[rm_quota_used++] = slot_num;
+ if (quota_enabled) {
+ for (i = 0; i < rm_quota_used
+ && rm_quota[i] != slot_num; i++)
+ ;
+
+ if (i == rm_quota_used)
+ rm_quota[rm_quota_used++] = slot_num;
+ }
status = ocfs2_recover_node(osb, node_num, slot_num);
skip_recovery:
@@ -1375,21 +1567,25 @@ skip_recovery:
/* Now it is right time to recover quotas... We have to do this under
* superblock lock so that no one can start using the slot (and crash)
* before we recover it */
- for (i = 0; i < rm_quota_used; i++) {
- qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
- if (IS_ERR(qrec)) {
- status = PTR_ERR(qrec);
- mlog_errno(status);
- continue;
+ if (quota_enabled) {
+ for (i = 0; i < rm_quota_used; i++) {
+ qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
+ if (IS_ERR(qrec)) {
+ status = PTR_ERR(qrec);
+ mlog_errno(status);
+ continue;
+ }
+ ocfs2_queue_recovery_completion(osb->journal,
+ rm_quota[i],
+ NULL, NULL, qrec,
+ ORPHAN_NEED_TRUNCATE);
}
- ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
- NULL, NULL, qrec);
}
ocfs2_super_unlock(osb, 1);
/* queue recovery for offline slots */
- ocfs2_queue_replay_slots(osb);
+ ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE);
bail:
mutex_lock(&osb->recovery_lock);
@@ -1400,37 +1596,36 @@ bail:
ocfs2_free_replay_slots(osb);
osb->recovery_thread_task = NULL;
- mb(); /* sync with ocfs2_recovery_thread_running */
+ if (osb->recovery_state == OCFS2_REC_WANT_DISABLE)
+ osb->recovery_state = OCFS2_REC_DISABLED;
wake_up(&osb->recovery_event);
mutex_unlock(&osb->recovery_lock);
kfree(rm_quota);
- /* no one is callint kthread_stop() for us so the kthread() api
- * requires that we call do_exit(). And it isn't exported, but
- * complete_and_exit() seems to be a minimal wrapper around it. */
- complete_and_exit(NULL, status);
return status;
}
void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
{
+ int was_set = -1;
+
mutex_lock(&osb->recovery_lock);
+ if (osb->recovery_state < OCFS2_REC_WANT_DISABLE)
+ was_set = ocfs2_recovery_map_set(osb, node_num);
trace_ocfs2_recovery_thread(node_num, osb->node_num,
- osb->disable_recovery, osb->recovery_thread_task,
- osb->disable_recovery ?
- -1 : ocfs2_recovery_map_set(osb, node_num));
+ osb->recovery_state, osb->recovery_thread_task, was_set);
- if (osb->disable_recovery)
+ if (osb->recovery_state >= OCFS2_REC_WANT_DISABLE)
goto out;
if (osb->recovery_thread_task)
goto out;
osb->recovery_thread_task = kthread_run(__ocfs2_recovery_thread, osb,
- "ocfs2rec");
+ "ocfs2rec-%s", osb->uuid_str);
if (IS_ERR(osb->recovery_thread_task)) {
mlog_errno((int)PTR_ERR(osb->recovery_thread_task));
osb->recovery_thread_task = NULL;
@@ -1558,17 +1753,16 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
}
journal = jbd2_journal_init_inode(inode);
- if (journal == NULL) {
+ if (IS_ERR(journal)) {
mlog(ML_ERROR, "Linux journal layer error\n");
- status = -EIO;
+ status = PTR_ERR(journal);
goto done;
}
status = jbd2_journal_load(journal);
if (status < 0) {
mlog_errno(status);
- if (!igrab(inode))
- BUG();
+ BUG_ON(!igrab(inode));
jbd2_journal_destroy(journal);
goto done;
}
@@ -1577,7 +1771,7 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
/* wipe the journal */
jbd2_journal_lock_updates(journal);
- status = jbd2_journal_flush(journal);
+ status = jbd2_journal_flush(journal, 0);
jbd2_journal_unlock_updates(journal);
if (status < 0)
mlog_errno(status);
@@ -1597,8 +1791,7 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
if (status < 0)
mlog_errno(status);
- if (!igrab(inode))
- BUG();
+ BUG_ON(!igrab(inode));
jbd2_journal_destroy(journal);
@@ -1610,9 +1803,7 @@ done:
if (got_lock)
ocfs2_inode_unlock(inode, 1);
- if (inode)
- iput(inode);
-
+ iput(inode);
brelse(bh);
return status;
@@ -1676,7 +1867,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
/* This will kfree the memory pointed to by la_copy and tl_copy */
ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
- tl_copy, NULL);
+ tl_copy, NULL, ORPHAN_NEED_TRUNCATE);
status = 0;
done:
@@ -1719,8 +1910,7 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb,
ocfs2_inode_unlock(inode, 1);
bail:
- if (inode)
- iput(inode);
+ iput(inode);
return status;
}
@@ -1795,7 +1985,7 @@ bail:
/*
* Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
- * randomness to the timeout to minimize multple nodes firing the timer at the
+ * randomness to the timeout to minimize multiple nodes firing the timer at the
* same time.
*/
static inline unsigned long ocfs2_orphan_scan_timeout(void)
@@ -1834,7 +2024,7 @@ static inline unsigned long ocfs2_orphan_scan_timeout(void)
* hasn't happened. The node queues a scan and increments the
* sequence number in the LVB.
*/
-void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
+static void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
{
struct ocfs2_orphan_scan *os;
int status, i;
@@ -1866,14 +2056,14 @@ void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
for (i = 0; i < osb->max_slots; i++)
ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
- NULL);
+ NULL, ORPHAN_NO_NEED_TRUNCATE);
/*
* We queued a recovery on orphan slots, increment the sequence
* number and update LVB so other node will skip the scan for a while
*/
seqno++;
os->os_count++;
- os->os_scantime = CURRENT_TIME;
+ os->os_scantime = ktime_get_seconds();
unlock:
ocfs2_orphan_scan_unlock(osb, seqno);
out:
@@ -1883,7 +2073,7 @@ out:
}
/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
-void ocfs2_orphan_scan_work(struct work_struct *work)
+static void ocfs2_orphan_scan_work(struct work_struct *work)
{
struct ocfs2_orphan_scan *os;
struct ocfs2_super *osb;
@@ -1895,7 +2085,7 @@ void ocfs2_orphan_scan_work(struct work_struct *work)
mutex_lock(&os->os_lock);
ocfs2_queue_orphan_scan(osb);
if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
- queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work,
+ queue_delayed_work(osb->ocfs2_wq, &os->os_orphan_scan_work,
ocfs2_orphan_scan_timeout());
mutex_unlock(&os->os_lock);
}
@@ -1930,12 +2120,12 @@ void ocfs2_orphan_scan_start(struct ocfs2_super *osb)
struct ocfs2_orphan_scan *os;
os = &osb->osb_orphan_scan;
- os->os_scantime = CURRENT_TIME;
+ os->os_scantime = ktime_get_seconds();
if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
else {
atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
- queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work,
+ queue_delayed_work(osb->ocfs2_wq, &os->os_orphan_scan_work,
ocfs2_orphan_scan_timeout());
}
}
@@ -1944,24 +2134,44 @@ struct ocfs2_orphan_filldir_priv {
struct dir_context ctx;
struct inode *head;
struct ocfs2_super *osb;
+ enum ocfs2_orphan_reco_type orphan_reco_type;
};
-static int ocfs2_orphan_filldir(void *priv, const char *name, int name_len,
- loff_t pos, u64 ino, unsigned type)
+static bool ocfs2_orphan_filldir(struct dir_context *ctx, const char *name,
+ int name_len, loff_t pos, u64 ino,
+ unsigned type)
{
- struct ocfs2_orphan_filldir_priv *p = priv;
+ struct ocfs2_orphan_filldir_priv *p =
+ container_of(ctx, struct ocfs2_orphan_filldir_priv, ctx);
struct inode *iter;
if (name_len == 1 && !strncmp(".", name, 1))
- return 0;
+ return true;
if (name_len == 2 && !strncmp("..", name, 2))
- return 0;
+ return true;
+
+ /* do not include dio entry in case of orphan scan */
+ if ((p->orphan_reco_type == ORPHAN_NO_NEED_TRUNCATE) &&
+ (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX,
+ OCFS2_DIO_ORPHAN_PREFIX_LEN)))
+ return true;
/* Skip bad inodes so that recovery can continue */
iter = ocfs2_iget(p->osb, ino,
OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0);
if (IS_ERR(iter))
- return 0;
+ return true;
+
+ if (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX,
+ OCFS2_DIO_ORPHAN_PREFIX_LEN))
+ OCFS2_I(iter)->ip_flags |= OCFS2_INODE_DIO_ORPHAN_ENTRY;
+
+ /* Skip inodes which are already added to recover list, since dio may
+ * happen concurrently with unlink/rename */
+ if (OCFS2_I(iter)->ip_next_orphan) {
+ iput(iter);
+ return true;
+ }
trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno);
/* No locking is required for the next_orphan queue as there
@@ -1969,19 +2179,21 @@ static int ocfs2_orphan_filldir(void *priv, const char *name, int name_len,
OCFS2_I(iter)->ip_next_orphan = p->head;
p->head = iter;
- return 0;
+ return true;
}
static int ocfs2_queue_orphans(struct ocfs2_super *osb,
int slot,
- struct inode **head)
+ struct inode **head,
+ enum ocfs2_orphan_reco_type orphan_reco_type)
{
int status;
struct inode *orphan_dir_inode = NULL;
struct ocfs2_orphan_filldir_priv priv = {
.ctx.actor = ocfs2_orphan_filldir,
.osb = osb,
- .head = *head
+ .head = *head,
+ .orphan_reco_type = orphan_reco_type
};
orphan_dir_inode = ocfs2_get_system_file_inode(osb,
@@ -1993,7 +2205,7 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb,
return status;
}
- mutex_lock(&orphan_dir_inode->i_mutex);
+ inode_lock(orphan_dir_inode);
status = ocfs2_inode_lock(orphan_dir_inode, NULL, 0);
if (status < 0) {
mlog_errno(status);
@@ -2011,7 +2223,7 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb,
out_cluster:
ocfs2_inode_unlock(orphan_dir_inode, 0);
out:
- mutex_unlock(&orphan_dir_inode->i_mutex);
+ inode_unlock(orphan_dir_inode);
iput(orphan_dir_inode);
return status;
}
@@ -2071,17 +2283,20 @@ static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
* advertising our state to ocfs2_delete_inode().
*/
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
- int slot)
+ int slot,
+ enum ocfs2_orphan_reco_type orphan_reco_type)
{
int ret = 0;
struct inode *inode = NULL;
struct inode *iter;
struct ocfs2_inode_info *oi;
+ struct buffer_head *di_bh = NULL;
+ struct ocfs2_dinode *di = NULL;
trace_ocfs2_recover_orphans(slot);
ocfs2_mark_recovering_orphan_dir(osb, slot);
- ret = ocfs2_queue_orphans(osb, slot, &inode);
+ ret = ocfs2_queue_orphans(osb, slot, &inode, orphan_reco_type);
ocfs2_clear_recovering_orphan_dir(osb, slot);
/* Error here should be noted, but we want to continue with as
@@ -2095,21 +2310,61 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
(unsigned long long)oi->ip_blkno);
iter = oi->ip_next_orphan;
+ oi->ip_next_orphan = NULL;
- spin_lock(&oi->ip_lock);
- /* The remote delete code may have set these on the
- * assumption that the other node would wipe them
- * successfully. If they are still in the node's
- * orphan dir, we need to reset that state. */
- oi->ip_flags &= ~(OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE);
-
- /* Set the proper information to get us going into
- * ocfs2_delete_inode. */
- oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
- spin_unlock(&oi->ip_lock);
+ if (oi->ip_flags & OCFS2_INODE_DIO_ORPHAN_ENTRY) {
+ inode_lock(inode);
+ ret = ocfs2_rw_lock(inode, 1);
+ if (ret < 0) {
+ mlog_errno(ret);
+ goto unlock_mutex;
+ }
+ /*
+ * We need to take and drop the inode lock to
+ * force read inode from disk.
+ */
+ ret = ocfs2_inode_lock(inode, &di_bh, 1);
+ if (ret) {
+ mlog_errno(ret);
+ goto unlock_rw;
+ }
+
+ di = (struct ocfs2_dinode *)di_bh->b_data;
+
+ if (di->i_flags & cpu_to_le32(OCFS2_DIO_ORPHANED_FL)) {
+ ret = ocfs2_truncate_file(inode, di_bh,
+ i_size_read(inode));
+ if (ret < 0) {
+ if (ret != -ENOSPC)
+ mlog_errno(ret);
+ goto unlock_inode;
+ }
+
+ ret = ocfs2_del_inode_from_orphan(osb, inode,
+ di_bh, 0, 0);
+ if (ret)
+ mlog_errno(ret);
+ }
+unlock_inode:
+ ocfs2_inode_unlock(inode, 1);
+ brelse(di_bh);
+ di_bh = NULL;
+unlock_rw:
+ ocfs2_rw_unlock(inode, 1);
+unlock_mutex:
+ inode_unlock(inode);
+
+ /* clear dio flag in ocfs2_inode_info */
+ oi->ip_flags &= ~OCFS2_INODE_DIO_ORPHAN_ENTRY;
+ } else {
+ spin_lock(&oi->ip_lock);
+ /* Set the proper information to get us going into
+ * ocfs2_delete_inode. */
+ oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
+ spin_unlock(&oi->ip_lock);
+ }
iput(inode);
-
inode = iter;
}
@@ -2156,8 +2411,20 @@ static int ocfs2_commit_thread(void *arg)
|| kthread_should_stop());
status = ocfs2_commit_cache(osb);
- if (status < 0)
- mlog_errno(status);
+ if (status < 0) {
+ static unsigned long abort_warn_time;
+
+ /* Warn about this once per minute */
+ if (printk_timed_ratelimit(&abort_warn_time, 60*HZ))
+ mlog(ML_ERROR, "status = %d, journal is "
+ "already aborted.\n", status);
+ /*
+ * After ocfs2_commit_cache() fails, j_num_trans has a
+ * non-zero value. Sleep here to avoid a busy-wait
+ * loop.
+ */
+ msleep_interruptible(1000);
+ }
if (kthread_should_stop() && atomic_read(&journal->j_num_trans)){
mlog(ML_KTHREAD,