From 2205363dce7447b8e85f1ead14387664c1a98753 Mon Sep 17 00:00:00 2001 From: Jan Kara Date: Mon, 20 Oct 2008 23:50:38 +0200 Subject: ocfs2: Implement quota recovery Implement functions for recovery after a crash. Functions just read local quota file and sync info to global quota file. Signed-off-by: Jan Kara Signed-off-by: Mark Fasheh --- fs/ocfs2/quota_local.c | 425 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 417 insertions(+), 8 deletions(-) (limited to 'fs/ocfs2/quota_local.c') diff --git a/fs/ocfs2/quota_local.c b/fs/ocfs2/quota_local.c index 40e82b483136..b98562174cd0 100644 --- a/fs/ocfs2/quota_local.c +++ b/fs/ocfs2/quota_local.c @@ -49,14 +49,25 @@ static unsigned int ol_quota_chunk_block(struct super_block *sb, int c) return 1 + (ol_chunk_blocks(sb) + 1) * c; } -/* Offset of the dquot structure in the quota file */ -static loff_t ol_dqblk_off(struct super_block *sb, int c, int off) +static unsigned int ol_dqblk_block(struct super_block *sb, int c, int off) +{ + int epb = ol_quota_entries_per_block(sb); + + return ol_quota_chunk_block(sb, c) + 1 + off / epb; +} + +static unsigned int ol_dqblk_block_off(struct super_block *sb, int c, int off) { int epb = ol_quota_entries_per_block(sb); - return ((ol_quota_chunk_block(sb, c) + 1 + off / epb) - << sb->s_blocksize_bits) + - (off % epb) * sizeof(struct ocfs2_local_disk_dqblk); + return (off % epb) * sizeof(struct ocfs2_local_disk_dqblk); +} + +/* Offset of the dquot structure in the quota file */ +static loff_t ol_dqblk_off(struct super_block *sb, int c, int off) +{ + return (ol_dqblk_block(sb, c, off) << sb->s_blocksize_bits) + + ol_dqblk_block_off(sb, c, off); } /* Compute block number from given offset */ @@ -253,6 +264,379 @@ static void olq_update_info(struct buffer_head *bh, void *private) spin_unlock(&dq_data_lock); } +static int ocfs2_add_recovery_chunk(struct super_block *sb, + struct ocfs2_local_disk_chunk *dchunk, + int chunk, + struct list_head *head) +{ + struct ocfs2_recovery_chunk *rc; + + rc = kmalloc(sizeof(struct ocfs2_recovery_chunk), GFP_NOFS); + if (!rc) + return -ENOMEM; + rc->rc_chunk = chunk; + rc->rc_bitmap = kmalloc(sb->s_blocksize, GFP_NOFS); + if (!rc->rc_bitmap) { + kfree(rc); + return -ENOMEM; + } + memcpy(rc->rc_bitmap, dchunk->dqc_bitmap, + (ol_chunk_entries(sb) + 7) >> 3); + list_add_tail(&rc->rc_list, head); + return 0; +} + +static void free_recovery_list(struct list_head *head) +{ + struct ocfs2_recovery_chunk *next; + struct ocfs2_recovery_chunk *rchunk; + + list_for_each_entry_safe(rchunk, next, head, rc_list) { + list_del(&rchunk->rc_list); + kfree(rchunk->rc_bitmap); + kfree(rchunk); + } +} + +void ocfs2_free_quota_recovery(struct ocfs2_quota_recovery *rec) +{ + int type; + + for (type = 0; type < MAXQUOTAS; type++) + free_recovery_list(&(rec->r_list[type])); + kfree(rec); +} + +/* Load entries in our quota file we have to recover*/ +static int ocfs2_recovery_load_quota(struct inode *lqinode, + struct ocfs2_local_disk_dqinfo *ldinfo, + int type, + struct list_head *head) +{ + struct super_block *sb = lqinode->i_sb; + struct buffer_head *hbh; + struct ocfs2_local_disk_chunk *dchunk; + int i, chunks = le32_to_cpu(ldinfo->dqi_chunks); + int status = 0; + + for (i = 0; i < chunks; i++) { + hbh = ocfs2_read_quota_block(lqinode, + ol_quota_chunk_block(sb, i), + &status); + if (!hbh) { + mlog_errno(status); + break; + } + dchunk = (struct ocfs2_local_disk_chunk *)hbh->b_data; + if (le32_to_cpu(dchunk->dqc_free) < ol_chunk_entries(sb)) + status = ocfs2_add_recovery_chunk(sb, dchunk, i, head); + brelse(hbh); + if (status < 0) + break; + } + if (status < 0) + free_recovery_list(head); + return status; +} + +static struct ocfs2_quota_recovery *ocfs2_alloc_quota_recovery(void) +{ + int type; + struct ocfs2_quota_recovery *rec; + + rec = kmalloc(sizeof(struct ocfs2_quota_recovery), GFP_NOFS); + if (!rec) + return NULL; + for (type = 0; type < MAXQUOTAS; type++) + INIT_LIST_HEAD(&(rec->r_list[type])); + return rec; +} + +/* Load information we need for quota recovery into memory */ +struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery( + struct ocfs2_super *osb, + int slot_num) +{ + unsigned int feature[MAXQUOTAS] = { OCFS2_FEATURE_RO_COMPAT_USRQUOTA, + OCFS2_FEATURE_RO_COMPAT_GRPQUOTA}; + unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE, + LOCAL_GROUP_QUOTA_SYSTEM_INODE }; + struct super_block *sb = osb->sb; + struct ocfs2_local_disk_dqinfo *ldinfo; + struct inode *lqinode; + struct buffer_head *bh; + int type; + int status = 0; + struct ocfs2_quota_recovery *rec; + + mlog(ML_NOTICE, "Beginning quota recovery in slot %u\n", slot_num); + rec = ocfs2_alloc_quota_recovery(); + if (!rec) + return ERR_PTR(-ENOMEM); + /* First init... */ + + for (type = 0; type < MAXQUOTAS; type++) { + if (!OCFS2_HAS_RO_COMPAT_FEATURE(sb, feature[type])) + continue; + /* At this point, journal of the slot is already replayed so + * we can trust metadata and data of the quota file */ + lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num); + if (!lqinode) { + status = -ENOENT; + goto out; + } + status = ocfs2_inode_lock_full(lqinode, NULL, 1, + OCFS2_META_LOCK_RECOVERY); + if (status < 0) { + mlog_errno(status); + goto out_put; + } + /* Now read local header */ + bh = ocfs2_read_quota_block(lqinode, 0, &status); + if (!bh) { + mlog_errno(status); + mlog(ML_ERROR, "failed to read quota file info header " + "(slot=%d type=%d)\n", slot_num, type); + goto out_lock; + } + ldinfo = (struct ocfs2_local_disk_dqinfo *)(bh->b_data + + OCFS2_LOCAL_INFO_OFF); + status = ocfs2_recovery_load_quota(lqinode, ldinfo, type, + &rec->r_list[type]); + brelse(bh); +out_lock: + ocfs2_inode_unlock(lqinode, 1); +out_put: + iput(lqinode); + if (status < 0) + break; + } +out: + if (status < 0) { + ocfs2_free_quota_recovery(rec); + rec = ERR_PTR(status); + } + return rec; +} + +/* Sync changes in local quota file into global quota file and + * reinitialize local quota file. + * The function expects local quota file to be already locked and + * dqonoff_mutex locked. */ +static int ocfs2_recover_local_quota_file(struct inode *lqinode, + int type, + struct ocfs2_quota_recovery *rec) +{ + struct super_block *sb = lqinode->i_sb; + struct ocfs2_mem_dqinfo *oinfo = sb_dqinfo(sb, type)->dqi_priv; + struct ocfs2_local_disk_chunk *dchunk; + struct ocfs2_local_disk_dqblk *dqblk; + struct dquot *dquot; + handle_t *handle; + struct buffer_head *hbh = NULL, *qbh = NULL; + int status = 0; + int bit, chunk; + struct ocfs2_recovery_chunk *rchunk, *next; + qsize_t spacechange, inodechange; + + mlog_entry("ino=%lu type=%u", (unsigned long)lqinode->i_ino, type); + + status = ocfs2_lock_global_qf(oinfo, 1); + if (status < 0) + goto out; + + list_for_each_entry_safe(rchunk, next, &(rec->r_list[type]), rc_list) { + chunk = rchunk->rc_chunk; + hbh = ocfs2_read_quota_block(lqinode, + ol_quota_chunk_block(sb, chunk), + &status); + if (!hbh) { + mlog_errno(status); + break; + } + dchunk = (struct ocfs2_local_disk_chunk *)hbh->b_data; + for_each_bit(bit, rchunk->rc_bitmap, ol_chunk_entries(sb)) { + qbh = ocfs2_read_quota_block(lqinode, + ol_dqblk_block(sb, chunk, bit), + &status); + if (!qbh) { + mlog_errno(status); + break; + } + dqblk = (struct ocfs2_local_disk_dqblk *)(qbh->b_data + + ol_dqblk_block_off(sb, chunk, bit)); + dquot = dqget(sb, le64_to_cpu(dqblk->dqb_id), type); + if (!dquot) { + status = -EIO; + mlog(ML_ERROR, "Failed to get quota structure " + "for id %u, type %d. Cannot finish quota " + "file recovery.\n", + (unsigned)le64_to_cpu(dqblk->dqb_id), + type); + goto out_put_bh; + } + handle = ocfs2_start_trans(OCFS2_SB(sb), + OCFS2_QSYNC_CREDITS); + if (IS_ERR(handle)) { + status = PTR_ERR(handle); + mlog_errno(status); + goto out_put_dquot; + } + mutex_lock(&sb_dqopt(sb)->dqio_mutex); + spin_lock(&dq_data_lock); + /* Add usage from quota entry into quota changes + * of our node. Auxiliary variables are important + * due to signedness */ + spacechange = le64_to_cpu(dqblk->dqb_spacemod); + inodechange = le64_to_cpu(dqblk->dqb_inodemod); + dquot->dq_dqb.dqb_curspace += spacechange; + dquot->dq_dqb.dqb_curinodes += inodechange; + spin_unlock(&dq_data_lock); + /* We want to drop reference held by the crashed + * node. Since we have our own reference we know + * global structure actually won't be freed. */ + status = ocfs2_global_release_dquot(dquot); + if (status < 0) { + mlog_errno(status); + goto out_commit; + } + /* Release local quota file entry */ + status = ocfs2_journal_access(handle, lqinode, + qbh, OCFS2_JOURNAL_ACCESS_WRITE); + if (status < 0) { + mlog_errno(status); + goto out_commit; + } + lock_buffer(qbh); + WARN_ON(!ocfs2_test_bit(bit, dchunk->dqc_bitmap)); + ocfs2_clear_bit(bit, dchunk->dqc_bitmap); + le32_add_cpu(&dchunk->dqc_free, 1); + unlock_buffer(qbh); + status = ocfs2_journal_dirty(handle, qbh); + if (status < 0) + mlog_errno(status); +out_commit: + mutex_unlock(&sb_dqopt(sb)->dqio_mutex); + ocfs2_commit_trans(OCFS2_SB(sb), handle); +out_put_dquot: + dqput(dquot); +out_put_bh: + brelse(qbh); + if (status < 0) + break; + } + brelse(hbh); + list_del(&rchunk->rc_list); + kfree(rchunk->rc_bitmap); + kfree(rchunk); + if (status < 0) + break; + } + ocfs2_unlock_global_qf(oinfo, 1); +out: + if (status < 0) + free_recovery_list(&(rec->r_list[type])); + mlog_exit(status); + return status; +} + +/* Recover local quota files for given node different from us */ +int ocfs2_finish_quota_recovery(struct ocfs2_super *osb, + struct ocfs2_quota_recovery *rec, + int slot_num) +{ + unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE, + LOCAL_GROUP_QUOTA_SYSTEM_INODE }; + struct super_block *sb = osb->sb; + struct ocfs2_local_disk_dqinfo *ldinfo; + struct buffer_head *bh; + handle_t *handle; + int type; + int status = 0; + struct inode *lqinode; + unsigned int flags; + + mlog(ML_NOTICE, "Finishing quota recovery in slot %u\n", slot_num); + mutex_lock(&sb_dqopt(sb)->dqonoff_mutex); + for (type = 0; type < MAXQUOTAS; type++) { + if (list_empty(&(rec->r_list[type]))) + continue; + mlog(0, "Recovering quota in slot %d\n", slot_num); + lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num); + if (!lqinode) { + status = -ENOENT; + goto out; + } + status = ocfs2_inode_lock_full(lqinode, NULL, 1, + OCFS2_META_LOCK_NOQUEUE); + /* Someone else is holding the lock? Then he must be + * doing the recovery. Just skip the file... */ + if (status == -EAGAIN) { + mlog(ML_NOTICE, "skipping quota recovery for slot %d " + "because quota file is locked.\n", slot_num); + status = 0; + goto out_put; + } else if (status < 0) { + mlog_errno(status); + goto out_put; + } + /* Now read local header */ + bh = ocfs2_read_quota_block(lqinode, 0, &status); + if (!bh) { + mlog_errno(status); + mlog(ML_ERROR, "failed to read quota file info header " + "(slot=%d type=%d)\n", slot_num, type); + goto out_lock; + } + ldinfo = (struct ocfs2_local_disk_dqinfo *)(bh->b_data + + OCFS2_LOCAL_INFO_OFF); + /* Is recovery still needed? */ + flags = le32_to_cpu(ldinfo->dqi_flags); + if (!(flags & OLQF_CLEAN)) + status = ocfs2_recover_local_quota_file(lqinode, + type, + rec); + /* We don't want to mark file as clean when it is actually + * active */ + if (slot_num == osb->slot_num) + goto out_bh; + /* Mark quota file as clean if we are recovering quota file of + * some other node. */ + handle = ocfs2_start_trans(osb, 1); + if (IS_ERR(handle)) { + status = PTR_ERR(handle); + mlog_errno(status); + goto out_bh; + } + status = ocfs2_journal_access(handle, lqinode, bh, + OCFS2_JOURNAL_ACCESS_WRITE); + if (status < 0) { + mlog_errno(status); + goto out_trans; + } + lock_buffer(bh); + ldinfo->dqi_flags = cpu_to_le32(flags | OLQF_CLEAN); + unlock_buffer(bh); + status = ocfs2_journal_dirty(handle, bh); + if (status < 0) + mlog_errno(status); +out_trans: + ocfs2_commit_trans(osb, handle); +out_bh: + brelse(bh); +out_lock: + ocfs2_inode_unlock(lqinode, 1); +out_put: + iput(lqinode); + if (status < 0) + break; + } +out: + mutex_unlock(&sb_dqopt(sb)->dqonoff_mutex); + kfree(rec); + return status; +} + /* Read information header from quota file */ static int ocfs2_local_read_info(struct super_block *sb, int type) { @@ -262,6 +646,7 @@ static int ocfs2_local_read_info(struct super_block *sb, int type) struct inode *lqinode = sb_dqopt(sb)->files[type]; int status; struct buffer_head *bh = NULL; + struct ocfs2_quota_recovery *rec; int locked = 0; info->dqi_maxblimit = 0x7fffffffffffffffLL; @@ -275,6 +660,7 @@ static int ocfs2_local_read_info(struct super_block *sb, int type) info->dqi_priv = oinfo; oinfo->dqi_type = type; INIT_LIST_HEAD(&oinfo->dqi_chunk); + oinfo->dqi_rec = NULL; oinfo->dqi_lqi_bh = NULL; oinfo->dqi_ibh = NULL; @@ -305,10 +691,27 @@ static int ocfs2_local_read_info(struct super_block *sb, int type) oinfo->dqi_ibh = bh; /* We crashed when using local quota file? */ - if (!(info->dqi_flags & OLQF_CLEAN)) - goto out_err; /* So far we just bail out. Later we should resync here */ + if (!(info->dqi_flags & OLQF_CLEAN)) { + rec = OCFS2_SB(sb)->quota_rec; + if (!rec) { + rec = ocfs2_alloc_quota_recovery(); + if (!rec) { + status = -ENOMEM; + mlog_errno(status); + goto out_err; + } + OCFS2_SB(sb)->quota_rec = rec; + } - status = ocfs2_load_local_quota_bitmaps(sb_dqopt(sb)->files[type], + status = ocfs2_recovery_load_quota(lqinode, ldinfo, type, + &rec->r_list[type]); + if (status < 0) { + mlog_errno(status); + goto out_err; + } + } + + status = ocfs2_load_local_quota_bitmaps(lqinode, ldinfo, &oinfo->dqi_chunk); if (status < 0) { @@ -394,6 +797,12 @@ static int ocfs2_local_free_info(struct super_block *sb, int type) } ocfs2_release_local_quota_bitmaps(&oinfo->dqi_chunk); + /* dqonoff_mutex protects us against racing with recovery thread... */ + if (oinfo->dqi_rec) { + ocfs2_free_quota_recovery(oinfo->dqi_rec); + mark_clean = 0; + } + if (!mark_clean) goto out; -- cgit