Diffstat (limited to 'fs/xfs/xfs_log_cil.c')
 fs/xfs/xfs_log_cil.c | 1182
 1 file changed, 810 insertions(+), 372 deletions(-)
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
index 83a039762b81..778ac47adb8c 100644
--- a/fs/xfs/xfs_log_cil.c
+++ b/fs/xfs/xfs_log_cil.c
@@ -16,8 +16,7 @@
#include "xfs_log.h"
#include "xfs_log_priv.h"
#include "xfs_trace.h"
-
-struct workqueue_struct *xfs_discard_wq;
+#include "xfs_discard.h"
/*
* Allocate a new ticket. Failing to get a new ticket makes it really hard to
@@ -37,16 +36,59 @@ xlog_cil_ticket_alloc(
{
struct xlog_ticket *tic;
- tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0);
+ tic = xlog_ticket_alloc(log, 0, 1, 0);
/*
* set the current reservation to zero so we know to steal the basic
* transaction overhead reservation from the first transaction commit.
*/
tic->t_curr_res = 0;
+ tic->t_iclog_hdrs = 0;
return tic;
}
+static inline void
+xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil)
+{
+ struct xlog *log = cil->xc_log;
+
+ atomic_set(&cil->xc_iclog_hdrs,
+ (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) /
+ (log->l_iclog_size - log->l_iclog_hsize)));
+}
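
The division above budgets one iclog header for every iclog's worth of space the CIL may consume before it blocks. As a rough illustration of that arithmetic, a minimal userspace sketch with made-up geometry (the real values come from the log and XLOG_CIL_BLOCKING_SPACE_LIMIT(), not these numbers):

#include <stdio.h>

int main(void)
{
	/* assumed example geometry, not the real macro values */
	unsigned int blocking_limit = 32 * 1024 * 1024;	/* CIL blocking space limit */
	unsigned int iclog_size = 256 * 1024;		/* l_iclog_size */
	unsigned int iclog_hsize = 512;			/* l_iclog_hsize */

	/* usable payload per iclog excludes the record header */
	unsigned int payload = iclog_size - iclog_hsize;

	/* one header must be budgeted per iclog the CIL could fill */
	unsigned int hdrs = blocking_limit / payload;

	printf("budget %u iclog headers for this CIL context\n", hdrs);
	return 0;
}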
+
+/*
+ * Check if the current log item was first committed in this sequence.
+ * We can't rely on just the log item being in the CIL, we have to check
+ * the recorded commit sequence number.
+ *
+ * Note: for this to be used in a non-racy manner, it has to be called with
+ * CIL flushing locked out. As a result, it should only be used during the
+ * transaction commit process when deciding what to format into the item.
+ */
+static bool
+xlog_item_in_current_chkpt(
+ struct xfs_cil *cil,
+ struct xfs_log_item *lip)
+{
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
+ return false;
+
+ /*
+ * li_seq is written on the first commit of a log item to record the
+ * first checkpoint it is written to. Hence if it is different to the
+ * current sequence, we're in a new checkpoint.
+ */
+ return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
+}
+
+bool
+xfs_log_item_in_current_chkpt(
+ struct xfs_log_item *lip)
+{
+ return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip);
+}
+
/*
* Unavoidable forward declaration - xlog_cil_push_work() calls
* xlog_cil_ctx_alloc() itself.
@@ -58,18 +100,91 @@ xlog_cil_ctx_alloc(void)
{
struct xfs_cil_ctx *ctx;
- ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
+ ctx = kzalloc(sizeof(*ctx), GFP_KERNEL | __GFP_NOFAIL);
INIT_LIST_HEAD(&ctx->committing);
- INIT_LIST_HEAD(&ctx->busy_extents);
+ INIT_LIST_HEAD(&ctx->busy_extents.extent_list);
+ INIT_LIST_HEAD(&ctx->log_items);
+ INIT_LIST_HEAD(&ctx->lv_chain);
INIT_WORK(&ctx->push_work, xlog_cil_push_work);
return ctx;
}
+/*
+ * Aggregate the CIL per-cpu structures into global counts, lists, etc. and
+ * clear the per-cpu state ready for the next context to use. This is called
+ * from the push code with the context lock held exclusively, hence nothing else
+ * will be accessing or modifying the per-cpu counters.
+ */
+static void
+xlog_cil_push_pcp_aggregate(
+ struct xfs_cil *cil,
+ struct xfs_cil_ctx *ctx)
+{
+ struct xlog_cil_pcp *cilpcp;
+ int cpu;
+
+ for_each_cpu(cpu, &ctx->cil_pcpmask) {
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+
+ ctx->ticket->t_curr_res += cilpcp->space_reserved;
+ cilpcp->space_reserved = 0;
+
+ if (!list_empty(&cilpcp->busy_extents)) {
+ list_splice_init(&cilpcp->busy_extents,
+ &ctx->busy_extents.extent_list);
+ }
+ if (!list_empty(&cilpcp->log_items))
+ list_splice_init(&cilpcp->log_items, &ctx->log_items);
+
+ /*
+ * We're in the middle of switching cil contexts. Reset the
+ * counter we use to detect when the current context is nearing
+ * full.
+ */
+ cilpcp->space_used = 0;
+ }
+}
+
+/*
+ * Aggregate the CIL per-cpu space used counters into the global atomic value.
+ * This is called when the per-cpu counter aggregation will first pass the soft
+ * limit threshold so we can switch to atomic counter aggregation for accurate
+ * detection of hard limit traversal.
+ */
+static void
+xlog_cil_insert_pcp_aggregate(
+ struct xfs_cil *cil,
+ struct xfs_cil_ctx *ctx)
+{
+ int cpu;
+ int count = 0;
+
+ /* Trigger atomic updates then aggregate only for the first caller */
+ if (!test_and_clear_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags))
+ return;
+
+ /*
+ * We can race with other cpus setting cil_pcpmask. However, we've
+ * atomically cleared PCP_SPACE which forces other threads to add to
+ * the global space used count. cil_pcpmask is a superset of cilpcp
+ * structures that could have a nonzero space_used.
+ */
+ for_each_cpu(cpu, &ctx->cil_pcpmask) {
+ struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+
+ count += xchg(&cilpcp->space_used, 0);
+ }
+ atomic_add(count, &ctx->space_used);
+}
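
A minimal userspace model of the xchg()-based fold above, written with C11 atomics rather than the kernel per-cpu API (NCPUS and the counter names are assumptions for the example): each local counter is drained with an atomic exchange so the fold picks up its value without needing a lock, and the sum is added to the shared counter once.

#include <stdatomic.h>
#include <stdio.h>

#define NCPUS	4

static atomic_int pcp_space_used[NCPUS];	/* per-cpu deltas */
static atomic_int ctx_space_used;		/* shared, precise count */

static void fold_pcp_into_global(void)
{
	int count = 0;

	/* drain each per-cpu counter with an exchange, then add the sum once */
	for (int cpu = 0; cpu < NCPUS; cpu++)
		count += atomic_exchange(&pcp_space_used[cpu], 0);

	atomic_fetch_add(&ctx_space_used, count);
}

int main(void)
{
	atomic_store(&pcp_space_used[0], 4096);
	atomic_store(&pcp_space_used[2], 512);

	fold_pcp_into_global();
	printf("ctx space used: %d\n", atomic_load(&ctx_space_used));
	return 0;
}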
+
static void
xlog_cil_ctx_switch(
struct xfs_cil *cil,
struct xfs_cil_ctx *ctx)
{
+ xlog_cil_set_iclog_hdr_count(cil);
+ set_bit(XLOG_CIL_EMPTY, &cil->xc_flags);
+ set_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags);
ctx->sequence = ++cil->xc_current_sequence;
ctx->cil = cil;
cil->xc_ctx = ctx;
@@ -91,6 +206,7 @@ xlog_cil_init_post_recovery(
{
log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
log->l_cilp->xc_ctx->sequence = 1;
+ xlog_cil_set_iclog_hdr_count(log->l_cilp);
}
static inline int
@@ -103,39 +219,6 @@ xlog_cil_iovec_space(
}
/*
- * shadow buffers can be large, so we need to use kvmalloc() here to ensure
- * success. Unfortunately, kvmalloc() only allows GFP_KERNEL contexts to fall
- * back to vmalloc, so we can't actually do anything useful with gfp flags to
- * control the kmalloc() behaviour within kvmalloc(). Hence kmalloc() will do
- * direct reclaim and compaction in the slow path, both of which are
- * horrendously expensive. We just want kmalloc to fail fast and fall back to
- * vmalloc if it can't get somethign straight away from the free lists or buddy
- * allocator. Hence we have to open code kvmalloc outselves here.
- *
- * Also, we are in memalloc_nofs_save task context here, so despite the use of
- * GFP_KERNEL here, we are actually going to be doing GFP_NOFS allocations. This
- * is actually the only way to make vmalloc() do GFP_NOFS allocations, so lets
- * just all pretend this is a GFP_KERNEL context operation....
- */
-static inline void *
-xlog_cil_kvmalloc(
- size_t buf_size)
-{
- gfp_t flags = GFP_KERNEL;
- void *p;
-
- flags &= ~__GFP_DIRECT_RECLAIM;
- flags |= __GFP_NOWARN | __GFP_NORETRY;
- do {
- p = kmalloc(buf_size, flags);
- if (!p)
- p = vmalloc(buf_size);
- } while (!p);
-
- return p;
-}
-
-/*
* Allocate or pin log vector buffers for CIL insertion.
*
* The CIL currently uses disposable buffers for copying a snapshot of the
@@ -192,7 +275,7 @@ xlog_cil_alloc_shadow_bufs(
struct xfs_log_vec *lv;
int niovecs = 0;
int nbytes = 0;
- int buf_size;
+ int alloc_size;
bool ordered = false;
/* Skip items which aren't dirty in this transaction. */
@@ -214,28 +297,33 @@ xlog_cil_alloc_shadow_bufs(
}
/*
- * We 64-bit align the length of each iovec so that the start
- * of the next one is naturally aligned. We'll need to
- * account for that slack space here. Then round nbytes up
- * to 64-bit alignment so that the initial buffer alignment is
- * easy to calculate and verify.
+ * We 64-bit align the length of each iovec so that the start of
+ * the next one is naturally aligned. We'll need to account for
+ * that slack space here.
+ *
+ * We also add the xlog_op_header to each region when
+ * formatting, but that's not accounted for in the size of the
+ * item at this point. Hence we'll need an additional number of
+ * bytes for each vector to hold an opheader.
+ *
+ * Then round nbytes up to 64-bit alignment so that the initial
+ * buffer alignment is easy to calculate and verify.
*/
- nbytes += niovecs * sizeof(uint64_t);
- nbytes = round_up(nbytes, sizeof(uint64_t));
+ nbytes = xlog_item_space(niovecs, nbytes);
/*
* The data buffer needs to start 64-bit aligned, so round up
* that space to ensure we can align it appropriately and not
* overrun the buffer.
*/
- buf_size = nbytes + xlog_cil_iovec_space(niovecs);
+ alloc_size = nbytes + xlog_cil_iovec_space(niovecs);
/*
* if we have no shadow buffer, or it is too small, we need to
* reallocate it.
*/
if (!lip->li_lv_shadow ||
- buf_size > lip->li_lv_shadow->lv_size) {
+ alloc_size > lip->li_lv_shadow->lv_alloc_size) {
/*
* We free and allocate here as a realloc would copy
* unnecessary data. We don't use kvzalloc() for the
@@ -243,15 +331,16 @@ xlog_cil_alloc_shadow_bufs(
* the buffer, only the log vector header and the iovec
* storage.
*/
- kmem_free(lip->li_lv_shadow);
- lv = xlog_cil_kvmalloc(buf_size);
+ kvfree(lip->li_lv_shadow);
+ lv = xlog_kvmalloc(alloc_size);
memset(lv, 0, xlog_cil_iovec_space(niovecs));
+ INIT_LIST_HEAD(&lv->lv_list);
lv->lv_item = lip;
- lv->lv_size = buf_size;
+ lv->lv_alloc_size = alloc_size;
if (ordered)
- lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
+ lv->lv_buf_used = XFS_LOG_VEC_ORDERED;
else
lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1];
lip->li_lv_shadow = lv;
@@ -259,11 +348,10 @@ xlog_cil_alloc_shadow_bufs(
/* same or smaller, optimise common overwrite case */
lv = lip->li_lv_shadow;
if (ordered)
- lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
+ lv->lv_buf_used = XFS_LOG_VEC_ORDERED;
else
- lv->lv_buf_len = 0;
+ lv->lv_buf_used = 0;
lv->lv_bytes = 0;
- lv->lv_next = NULL;
}
/* Ensure the lv is set up according to ->iop_size */
@@ -277,40 +365,35 @@ xlog_cil_alloc_shadow_bufs(
/*
* Prepare the log item for insertion into the CIL. Calculate the difference in
- * log space and vectors it will consume, and if it is a new item pin it as
- * well.
+ * log space it will consume, and if it is a new item pin it as well.
*/
STATIC void
xfs_cil_prepare_item(
struct xlog *log,
+ struct xfs_log_item *lip,
struct xfs_log_vec *lv,
- struct xfs_log_vec *old_lv,
- int *diff_len,
- int *diff_iovecs)
+ int *diff_len)
{
/* Account for the new LV being passed in */
- if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) {
+ if (lv->lv_buf_used != XFS_LOG_VEC_ORDERED)
*diff_len += lv->lv_bytes;
- *diff_iovecs += lv->lv_niovecs;
- }
/*
* If there is no old LV, this is the first time we've seen the item in
* this CIL context and so we need to pin it. If we are replacing the
- * old_lv, then remove the space it accounts for and make it the shadow
+ * old lv, then remove the space it accounts for and make it the shadow
* buffer for later freeing. In both cases we are now switching to the
* shadow buffer, so update the pointer to it appropriately.
*/
- if (!old_lv) {
+ if (!lip->li_lv) {
if (lv->lv_item->li_ops->iop_pin)
lv->lv_item->li_ops->iop_pin(lv->lv_item);
lv->lv_item->li_lv_shadow = NULL;
- } else if (old_lv != lv) {
- ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);
+ } else if (lip->li_lv != lv) {
+ ASSERT(lv->lv_buf_used != XFS_LOG_VEC_ORDERED);
- *diff_len -= old_lv->lv_bytes;
- *diff_iovecs -= old_lv->lv_niovecs;
- lv->lv_item->li_lv_shadow = old_lv;
+ *diff_len -= lip->li_lv->lv_bytes;
+ lv->lv_item->li_lv_shadow = lip->li_lv;
}
/* attach new log vector to log item */
@@ -358,12 +441,10 @@ static void
xlog_cil_insert_format_items(
struct xlog *log,
struct xfs_trans *tp,
- int *diff_len,
- int *diff_iovecs)
+ int *diff_len)
{
struct xfs_log_item *lip;
-
/* Bail out if we didn't find a log item. */
if (list_empty(&tp->t_items)) {
ASSERT(0);
@@ -371,10 +452,8 @@ xlog_cil_insert_format_items(
}
list_for_each_entry(lip, &tp->t_items, li_trans) {
- struct xfs_log_vec *lv;
- struct xfs_log_vec *old_lv = NULL;
- struct xfs_log_vec *shadow;
- bool ordered = false;
+ struct xfs_log_vec *lv = lip->li_lv;
+ struct xfs_log_vec *shadow = lip->li_lv_shadow;
/* Skip items which aren't dirty in this transaction. */
if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
@@ -384,36 +463,35 @@ xlog_cil_insert_format_items(
* The formatting size information is already attached to
* the shadow lv on the log item.
*/
- shadow = lip->li_lv_shadow;
- if (shadow->lv_buf_len == XFS_LOG_VEC_ORDERED)
- ordered = true;
+ if (shadow->lv_buf_used == XFS_LOG_VEC_ORDERED) {
+ if (!lv) {
+ lv = shadow;
+ lv->lv_item = lip;
+ }
+ ASSERT(shadow->lv_alloc_size == lv->lv_alloc_size);
+ xfs_cil_prepare_item(log, lip, lv, diff_len);
+ continue;
+ }
/* Skip items that do not have any vectors for writing */
- if (!shadow->lv_niovecs && !ordered)
+ if (!shadow->lv_niovecs)
continue;
/* compare to existing item size */
- old_lv = lip->li_lv;
- if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
+ if (lv && shadow->lv_alloc_size <= lv->lv_alloc_size) {
/* same or smaller, optimise common overwrite case */
- lv = lip->li_lv;
- lv->lv_next = NULL;
-
- if (ordered)
- goto insert;
/*
* set the item up as though it is a new insertion so
* that the space reservation accounting is correct.
*/
- *diff_iovecs -= lv->lv_niovecs;
*diff_len -= lv->lv_bytes;
/* Ensure the lv is set up according to ->iop_size */
lv->lv_niovecs = shadow->lv_niovecs;
/* reset the lv buffer information for new formatting */
- lv->lv_buf_len = 0;
+ lv->lv_buf_used = 0;
lv->lv_bytes = 0;
lv->lv_buf = (char *)lv +
xlog_cil_iovec_space(lv->lv_niovecs);
@@ -421,21 +499,32 @@ xlog_cil_insert_format_items(
/* switch to shadow buffer! */
lv = shadow;
lv->lv_item = lip;
- if (ordered) {
- /* track as an ordered logvec */
- ASSERT(lip->li_lv == NULL);
- goto insert;
- }
}
ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
lip->li_ops->iop_format(lip, lv);
-insert:
- xfs_cil_prepare_item(log, lv, old_lv, diff_len, diff_iovecs);
+ xfs_cil_prepare_item(log, lip, lv, diff_len);
}
}
/*
+ * The use of lockless waitqueue_active() requires that the caller has
+ * serialised itself against the wakeup call in xlog_cil_push_work(). That
+ * can be done by either holding the push lock or the context lock.
+ */
+static inline bool
+xlog_cil_over_hard_limit(
+ struct xlog *log,
+ int32_t space_used)
+{
+ if (waitqueue_active(&log->l_cilp->xc_push_wait))
+ return true;
+ if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
+ return true;
+ return false;
+}
+
+/*
* Insert the log items into the CIL and calculate the difference in space
* consumed by the item. Add the space to the checkpoint ticket and calculate
* if the change requires additional log metadata. If it does, take that space
@@ -445,15 +534,18 @@ insert:
static void
xlog_cil_insert_items(
struct xlog *log,
- struct xfs_trans *tp)
+ struct xfs_trans *tp,
+ uint32_t released_space)
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_cil_ctx *ctx = cil->xc_ctx;
struct xfs_log_item *lip;
int len = 0;
- int diff_iovecs = 0;
- int iclog_space;
int iovhdr_res = 0, split_res = 0, ctx_res = 0;
+ int space_used;
+ int order;
+ unsigned int cpu_nr;
+ struct xlog_cil_pcp *cilpcp;
ASSERT(tp);
@@ -461,51 +553,119 @@ xlog_cil_insert_items(
* We can do this safely because the context can't checkpoint until we
* are done so it doesn't matter exactly how we update the CIL.
*/
- xlog_cil_insert_format_items(log, tp, &len, &diff_iovecs);
+ xlog_cil_insert_format_items(log, tp, &len);
- spin_lock(&cil->xc_cil_lock);
+ /*
+ * Subtract the space released by intent cancelation from the space we
+ * consumed so that we remove it from the CIL space and add it back to
+ * the current transaction reservation context.
+ */
+ len -= released_space;
- /* account for space used by new iovec headers */
- iovhdr_res = diff_iovecs * sizeof(xlog_op_header_t);
- len += iovhdr_res;
- ctx->nvecs += diff_iovecs;
+ /*
+ * Grab the per-cpu pointer for the CIL before we start any accounting.
+ * That ensures that we are running with pre-emption disabled and so we
+ * can't be scheduled away between split sample/update operations that
+ * are done without outside locking to serialise them.
+ */
+ cpu_nr = get_cpu();
+ cilpcp = this_cpu_ptr(cil->xc_pcp);
- /* attach the transaction to the CIL if it has any busy extents */
- if (!list_empty(&tp->t_busy))
- list_splice_init(&tp->t_busy, &ctx->busy_extents);
+ /* Tell the future push that there was work added by this CPU. */
+ if (!cpumask_test_cpu(cpu_nr, &ctx->cil_pcpmask))
+ cpumask_test_and_set_cpu(cpu_nr, &ctx->cil_pcpmask);
/*
- * Now transfer enough transaction reservation to the context ticket
- * for the checkpoint. The context ticket is special - the unit
- * reservation has to grow as well as the current reservation as we
- * steal from tickets so we can correctly determine the space used
- * during the transaction commit.
+ * We need to take the CIL checkpoint unit reservation on the first
+ * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
+ * unnecessarily do an atomic op in the fast path here. We can clear the
+ * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that
+ * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit.
*/
- if (ctx->ticket->t_curr_res == 0) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
+ test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
ctx_res = ctx->ticket->t_unit_res;
- ctx->ticket->t_curr_res = ctx_res;
- tp->t_ticket->t_curr_res -= ctx_res;
+
+ /*
+ * Check if we need to steal iclog headers. atomic_read() is not a
+ * locked atomic operation, so we can check the value before we do any
+ * real atomic ops in the fast path. If we've already taken the CIL unit
+ * reservation from this commit, we've already got one iclog header
+ * space reserved so we have to account for that otherwise we risk
+ * overrunning the reservation on this ticket.
+ *
+ * If the CIL is already at the hard limit, we might need more header
+ * space than originally reserved. So steal more header space from every
+ * commit that occurs once we are over the hard limit to ensure the CIL
+ * push won't run out of reservation space.
+ *
+ * This can steal more than we need, but that's OK.
+ *
+ * The cil->xc_ctx_lock provides the serialisation necessary for safely
+ * calling xlog_cil_over_hard_limit() in this context.
+ */
+ space_used = atomic_read(&ctx->space_used) + cilpcp->space_used + len;
+ if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
+ xlog_cil_over_hard_limit(log, space_used)) {
+ split_res = log->l_iclog_hsize +
+ sizeof(struct xlog_op_header);
+ if (ctx_res)
+ ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
+ else
+ ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
+ atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
+ }
+ cilpcp->space_reserved += ctx_res;
+
+ /*
+ * Accurately account when over the soft limit, otherwise fold the
+ * percpu count into the global count if over the per-cpu threshold.
+ */
+ if (!test_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) {
+ atomic_add(len, &ctx->space_used);
+ } else if (cilpcp->space_used + len >
+ (XLOG_CIL_SPACE_LIMIT(log) / num_online_cpus())) {
+ space_used = atomic_add_return(cilpcp->space_used + len,
+ &ctx->space_used);
+ cilpcp->space_used = 0;
+
+ /*
+ * If we just transitioned over the soft limit, we need to
+ * transition to the global atomic counter.
+ */
+ if (space_used >= XLOG_CIL_SPACE_LIMIT(log))
+ xlog_cil_insert_pcp_aggregate(cil, ctx);
+ } else {
+ cilpcp->space_used += len;
}
+ /* attach the transaction to the CIL if it has any busy extents */
+ if (!list_empty(&tp->t_busy))
+ list_splice_init(&tp->t_busy, &cilpcp->busy_extents);
- /* do we need space for more log record headers? */
- iclog_space = log->l_iclog_size - log->l_iclog_hsize;
- if (len > 0 && (ctx->space_used / iclog_space !=
- (ctx->space_used + len) / iclog_space)) {
- split_res = (len + iclog_space - 1) / iclog_space;
- /* need to take into account split region headers, too */
- split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
- ctx->ticket->t_unit_res += split_res;
- ctx->ticket->t_curr_res += split_res;
- tp->t_ticket->t_curr_res -= split_res;
- ASSERT(tp->t_ticket->t_curr_res >= len);
+ /*
+ * Now update the order of everything modified in the transaction
+ * and insert items into the CIL if they aren't already there.
+ * We do this here so we only need to take the CIL lock once during
+ * the transaction commit.
+ */
+ order = atomic_inc_return(&ctx->order_id);
+ list_for_each_entry(lip, &tp->t_items, li_trans) {
+ /* Skip items which aren't dirty in this transaction. */
+ if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
+ continue;
+
+ lip->li_order_id = order;
+ if (!list_empty(&lip->li_cil))
+ continue;
+ list_add_tail(&lip->li_cil, &cilpcp->log_items);
}
- tp->t_ticket->t_curr_res -= len;
- ctx->space_used += len;
+ put_cpu();
/*
* If we've overrun the reservation, dump the tx details before we move
* the log items. Shutdown is imminent...
*/
+ tp->t_ticket->t_curr_res -= ctx_res + len;
if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
xfs_warn(log->l_mp, "Transaction log reservation overrun:");
xfs_warn(log->l_mp,
@@ -515,115 +675,197 @@ xlog_cil_insert_items(
split_res);
xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
xlog_print_trans(tp);
+ xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
}
+}
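
The accounting above runs in one of two modes: per-cpu while the context is well below the soft limit, then a single shared counter once the aggregate nears it. A rough userspace sketch of that decision, with made-up limits and a plain array standing in for the kernel per-cpu data (assume each slot is only touched by its owning thread, which get_cpu() guarantees in the real code):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define SOFT_LIMIT	(4 * 1024 * 1024)	/* assumed soft limit */
#define NCPUS		4

static atomic_int ctx_space_used;		/* shared counter */
static int pcp_space_used[NCPUS];		/* per-cpu counters */
static atomic_bool pcp_mode = true;		/* models XLOG_CIL_PCP_SPACE */

static void account_space(int cpu, int len)
{
	if (!atomic_load(&pcp_mode)) {
		/* precise mode: everything goes straight to the shared counter */
		atomic_fetch_add(&ctx_space_used, len);
	} else if (pcp_space_used[cpu] + len > SOFT_LIMIT / NCPUS) {
		/* per-cpu delta got large: fold it into the shared counter */
		int used = atomic_fetch_add(&ctx_space_used,
					    pcp_space_used[cpu] + len) +
			   pcp_space_used[cpu] + len;
		pcp_space_used[cpu] = 0;

		/* crossing the soft limit switches to precise accounting */
		if (used >= SOFT_LIMIT)
			atomic_store(&pcp_mode, false);
	} else {
		/* common fast path: no shared cachelines touched */
		pcp_space_used[cpu] += len;
	}
}

int main(void)
{
	for (int i = 0; i < 8; i++)
		account_space(i % NCPUS, 600 * 1024);
	printf("shared count after folds: %d\n",
	       atomic_load(&ctx_space_used));
	return 0;
}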
- /*
- * Now (re-)position everything modified at the tail of the CIL.
- * We do this here so we only need to take the CIL lock once during
- * the transaction commit.
- */
- list_for_each_entry(lip, &tp->t_items, li_trans) {
-
- /* Skip items which aren't dirty in this transaction. */
- if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
- continue;
+static inline void
+xlog_cil_ail_insert_batch(
+ struct xfs_ail *ailp,
+ struct xfs_ail_cursor *cur,
+ struct xfs_log_item **log_items,
+ int nr_items,
+ xfs_lsn_t commit_lsn)
+{
+ int i;
- /*
- * Only move the item if it isn't already at the tail. This is
- * to prevent a transient list_empty() state when reinserting
- * an item that is already the only item in the CIL.
- */
- if (!list_is_last(&lip->li_cil, &cil->xc_cil))
- list_move_tail(&lip->li_cil, &cil->xc_cil);
- }
+ spin_lock(&ailp->ail_lock);
+ /* xfs_trans_ail_update_bulk drops ailp->ail_lock */
+ xfs_trans_ail_update_bulk(ailp, cur, log_items, nr_items, commit_lsn);
- spin_unlock(&cil->xc_cil_lock);
+ for (i = 0; i < nr_items; i++) {
+ struct xfs_log_item *lip = log_items[i];
- if (tp->t_ticket->t_curr_res < 0)
- xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
+ if (lip->li_ops->iop_unpin)
+ lip->li_ops->iop_unpin(lip, 0);
+ }
}
+/*
+ * Take the checkpoint's log vector chain of items and insert the attached log
+ * items into the AIL. This uses bulk insertion techniques to minimise AIL lock
+ * traffic.
+ *
+ * The AIL tracks log items via the start record LSN of the checkpoint,
+ * not the commit record LSN. This is because we can pipeline multiple
+ * checkpoints, and so the start record of checkpoint N+1 can be
+ * written before the commit record of checkpoint N. i.e:
+ *
+ * start N commit N
+ * +-------------+------------+----------------+
+ * start N+1 commit N+1
+ *
+ * The tail of the log cannot be moved to the LSN of commit N when all
+ * the items of that checkpoint are written back, because then the
+ * start record for N+1 is no longer in the active portion of the log
+ * and recovery will fail/corrupt the filesystem.
+ *
+ * Hence when all the log items in checkpoint N are written back, the
+ * tail of the log must now only move as far forwards as the start LSN
+ * of checkpoint N+1.
+ *
+ * If we are called with the aborted flag set, it is because a log write during
+ * a CIL checkpoint commit has failed. In this case, all the items in the
+ * checkpoint have already gone through iop_committed and iop_committing, which
+ * means that checkpoint commit abort handling is treated exactly the same as an
+ * iclog write error even though we haven't started any IO yet. Hence in this
+ * case all we need to do is iop_committed processing, followed by an
+ * iop_unpin(aborted) call.
+ *
+ * The AIL cursor is used to optimise the insert process. If commit_lsn is not
+ * at the end of the AIL, the insert cursor avoids the need to walk the AIL to
+ * find the insertion point on every xfs_log_item_batch_insert() call. This
+ * saves a lot of needless list walking and is a net win, even though it
+ * slightly increases the amount of AIL lock traffic to set it up and tear it
+ * down.
+ */
static void
-xlog_cil_free_logvec(
- struct xfs_log_vec *log_vector)
+xlog_cil_ail_insert(
+ struct xfs_cil_ctx *ctx,
+ bool aborted)
{
+#define LOG_ITEM_BATCH_SIZE 32
+ struct xfs_ail *ailp = ctx->cil->xc_log->l_ailp;
+ struct xfs_log_item *log_items[LOG_ITEM_BATCH_SIZE];
struct xfs_log_vec *lv;
+ struct xfs_ail_cursor cur;
+ xfs_lsn_t old_head;
+ int i = 0;
- for (lv = log_vector; lv; ) {
- struct xfs_log_vec *next = lv->lv_next;
- kmem_free(lv);
- lv = next;
- }
-}
+ /*
+ * Update the AIL head LSN with the commit record LSN of this
+ * checkpoint. As iclogs are always completed in order, this should
+ * always be the same (as iclogs can contain multiple commit records) or
+ * higher LSN than the current head. We do this before insertion of the
+ * items so that log space checks during insertion will reflect the
+ * space that this checkpoint has already consumed. We call
+ * xfs_ail_update_finish() so that tail space and space-based wakeups
+ * will be recalculated appropriately.
+ */
+ ASSERT(XFS_LSN_CMP(ctx->commit_lsn, ailp->ail_head_lsn) >= 0 ||
+ aborted);
+ spin_lock(&ailp->ail_lock);
+ xfs_trans_ail_cursor_last(ailp, &cur, ctx->start_lsn);
+ old_head = ailp->ail_head_lsn;
+ ailp->ail_head_lsn = ctx->commit_lsn;
+ /* xfs_ail_update_finish() drops the ail_lock */
+ xfs_ail_update_finish(ailp, NULLCOMMITLSN);
-static void
-xlog_discard_endio_work(
- struct work_struct *work)
-{
- struct xfs_cil_ctx *ctx =
- container_of(work, struct xfs_cil_ctx, discard_endio_work);
- struct xfs_mount *mp = ctx->cil->xc_log->l_mp;
+ /*
+ * We move the AIL head forwards to account for the space used in the
+ * log before we remove that space from the grant heads. This prevents a
+ * transient condition where reservation space appears to become
+ * available on return, only for it to disappear again immediately as
+ * the AIL head update accounts in the log tail space.
+ */
+ smp_wmb(); /* paired with smp_rmb in xlog_grant_space_left */
+ xlog_grant_return_space(ailp->ail_log, old_head, ailp->ail_head_lsn);
- xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
- kmem_free(ctx);
-}
+ /* unpin all the log items */
+ list_for_each_entry(lv, &ctx->lv_chain, lv_list) {
+ struct xfs_log_item *lip = lv->lv_item;
+ xfs_lsn_t item_lsn;
-/*
- * Queue up the actual completion to a thread to avoid IRQ-safe locking for
- * pagb_lock. Note that we need a unbounded workqueue, otherwise we might
- * get the execution delayed up to 30 seconds for weird reasons.
- */
-static void
-xlog_discard_endio(
- struct bio *bio)
-{
- struct xfs_cil_ctx *ctx = bio->bi_private;
+ if (aborted) {
+ trace_xlog_ail_insert_abort(lip);
+ set_bit(XFS_LI_ABORTED, &lip->li_flags);
+ }
- INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
- queue_work(xfs_discard_wq, &ctx->discard_endio_work);
- bio_put(bio);
-}
+ if (lip->li_ops->flags & XFS_ITEM_RELEASE_WHEN_COMMITTED) {
+ lip->li_ops->iop_release(lip);
+ continue;
+ }
-static void
-xlog_discard_busy_extents(
- struct xfs_mount *mp,
- struct xfs_cil_ctx *ctx)
-{
- struct list_head *list = &ctx->busy_extents;
- struct xfs_extent_busy *busyp;
- struct bio *bio = NULL;
- struct blk_plug plug;
- int error = 0;
+ if (lip->li_ops->iop_committed)
+ item_lsn = lip->li_ops->iop_committed(lip,
+ ctx->start_lsn);
+ else
+ item_lsn = ctx->start_lsn;
- ASSERT(xfs_has_discard(mp));
-
- blk_start_plug(&plug);
- list_for_each_entry(busyp, list, list) {
- trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
- busyp->length);
-
- error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
- XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
- XFS_FSB_TO_BB(mp, busyp->length),
- GFP_NOFS, 0, &bio);
- if (error && error != -EOPNOTSUPP) {
- xfs_info(mp,
- "discard failed for extent [0x%llx,%u], error %d",
- (unsigned long long)busyp->bno,
- busyp->length,
- error);
- break;
+ /* item_lsn of -1 means the item needs no further processing */
+ if (XFS_LSN_CMP(item_lsn, (xfs_lsn_t)-1) == 0)
+ continue;
+
+ /*
+ * if we are aborting the operation, no point in inserting the
+ * object into the AIL as we are in a shutdown situation.
+ */
+ if (aborted) {
+ ASSERT(xlog_is_shutdown(ailp->ail_log));
+ if (lip->li_ops->iop_unpin)
+ lip->li_ops->iop_unpin(lip, 1);
+ continue;
+ }
+
+ if (item_lsn != ctx->start_lsn) {
+
+ /*
+ * Not a bulk update option due to unusual item_lsn.
+ * Push into AIL immediately, rechecking the lsn once
+ * we have the ail lock. Then unpin the item. This does
+ * not affect the AIL cursor the bulk insert path is
+ * using.
+ */
+ spin_lock(&ailp->ail_lock);
+ if (XFS_LSN_CMP(item_lsn, lip->li_lsn) > 0)
+ xfs_trans_ail_update(ailp, lip, item_lsn);
+ else
+ spin_unlock(&ailp->ail_lock);
+ if (lip->li_ops->iop_unpin)
+ lip->li_ops->iop_unpin(lip, 0);
+ continue;
+ }
+
+ /* Item is a candidate for bulk AIL insert. */
+ log_items[i++] = lv->lv_item;
+ if (i >= LOG_ITEM_BATCH_SIZE) {
+ xlog_cil_ail_insert_batch(ailp, &cur, log_items,
+ LOG_ITEM_BATCH_SIZE, ctx->start_lsn);
+ i = 0;
}
}
- if (bio) {
- bio->bi_private = ctx;
- bio->bi_end_io = xlog_discard_endio;
- submit_bio(bio);
- } else {
- xlog_discard_endio_work(&ctx->discard_endio_work);
+ /* make sure we insert the remainder! */
+ if (i)
+ xlog_cil_ail_insert_batch(ailp, &cur, log_items, i,
+ ctx->start_lsn);
+
+ spin_lock(&ailp->ail_lock);
+ xfs_trans_ail_cursor_done(&cur);
+ spin_unlock(&ailp->ail_lock);
+}
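
xlog_cil_ail_insert() amortises AIL lock traffic by inserting items in fixed-size batches and flushing whatever is left over after the loop. A small userspace sketch of that batching shape (the lock, item type and process_batch() here are stand-ins, not the real AIL API):

#include <pthread.h>
#include <stdio.h>

#define BATCH_SIZE	32

static pthread_mutex_t ail_lock = PTHREAD_MUTEX_INITIALIZER;

/* one lock round trip covers a whole batch of insertions */
static void process_batch(const int *items, int nr)
{
	pthread_mutex_lock(&ail_lock);
	for (int i = 0; i < nr; i++)
		printf("insert item %d\n", items[i]);
	pthread_mutex_unlock(&ail_lock);
}

static void insert_all(const int *items, int nr_items)
{
	int batch[BATCH_SIZE];
	int i = 0;

	for (int n = 0; n < nr_items; n++) {
		batch[i++] = items[n];
		if (i >= BATCH_SIZE) {
			process_batch(batch, BATCH_SIZE);
			i = 0;
		}
	}
	/* make sure we insert the remainder */
	if (i)
		process_batch(batch, i);
}

int main(void)
{
	int items[70];

	for (int n = 0; n < 70; n++)
		items[n] = n;
	insert_all(items, 70);
	return 0;
}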
+
+static void
+xlog_cil_free_logvec(
+ struct list_head *lv_chain)
+{
+ struct xfs_log_vec *lv;
+
+ while (!list_empty(lv_chain)) {
+ lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list);
+ list_del_init(&lv->lv_list);
+ kvfree(lv);
}
- blk_finish_plug(&plug);
}
/*
@@ -652,23 +894,25 @@ xlog_cil_committed(
spin_unlock(&ctx->cil->xc_push_lock);
}
- xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
- ctx->start_lsn, abort);
+ xlog_cil_ail_insert(ctx, abort);
- xfs_extent_busy_sort(&ctx->busy_extents);
- xfs_extent_busy_clear(mp, &ctx->busy_extents,
+ xfs_extent_busy_sort(&ctx->busy_extents.extent_list);
+ xfs_extent_busy_clear(&ctx->busy_extents.extent_list,
xfs_has_discard(mp) && !abort);
spin_lock(&ctx->cil->xc_push_lock);
list_del(&ctx->committing);
spin_unlock(&ctx->cil->xc_push_lock);
- xlog_cil_free_logvec(ctx->lv_chain);
+ xlog_cil_free_logvec(&ctx->lv_chain);
- if (!list_empty(&ctx->busy_extents))
- xlog_discard_busy_extents(mp, ctx);
- else
- kmem_free(ctx);
+ if (!list_empty(&ctx->busy_extents.extent_list)) {
+ ctx->busy_extents.owner = ctx;
+ xfs_discard_extents(mp, &ctx->busy_extents);
+ return;
+ }
+
+ kfree(ctx);
}
void
@@ -696,7 +940,7 @@ xlog_cil_set_ctx_write_state(
struct xlog_in_core *iclog)
{
struct xfs_cil *cil = ctx->cil;
- xfs_lsn_t lsn = be64_to_cpu(iclog->ic_header.h_lsn);
+ xfs_lsn_t lsn = be64_to_cpu(iclog->ic_header->h_lsn);
ASSERT(!ctx->commit_lsn);
if (!ctx->start_lsn) {
@@ -705,11 +949,21 @@ xlog_cil_set_ctx_write_state(
* The LSN we need to pass to the log items on transaction
* commit is the LSN reported by the first log vector write, not
* the commit lsn. If we use the commit record lsn then we can
- * move the tail beyond the grant write head.
+ * move the grant write head beyond the tail LSN and overwrite
+ * it.
*/
ctx->start_lsn = lsn;
wake_up_all(&cil->xc_start_wait);
spin_unlock(&cil->xc_push_lock);
+
+ /*
+ * Make sure the metadata we are about to overwrite in the log
+ * has been flushed to stable storage before this iclog is
+ * issued.
+ */
+ spin_lock(&cil->xc_log->l_icloglock);
+ iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
+ spin_unlock(&cil->xc_log->l_icloglock);
return;
}
@@ -812,7 +1066,7 @@ restart:
static int
xlog_cil_write_chain(
struct xfs_cil_ctx *ctx,
- struct xfs_log_vec *chain)
+ uint32_t chain_len)
{
struct xlog *log = ctx->cil->xc_log;
int error;
@@ -820,7 +1074,7 @@ xlog_cil_write_chain(
error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
if (error)
return error;
- return xlog_write(log, ctx, chain, ctx->ticket, XLOG_START_TRANS);
+ return xlog_write(log, ctx, &ctx->lv_chain, ctx->ticket, chain_len);
}
/*
@@ -834,9 +1088,14 @@ xlog_cil_write_commit_record(
struct xfs_cil_ctx *ctx)
{
struct xlog *log = ctx->cil->xc_log;
+ struct xlog_op_header ophdr = {
+ .oh_clientid = XFS_TRANSACTION,
+ .oh_tid = cpu_to_be32(ctx->ticket->t_tid),
+ .oh_flags = XLOG_COMMIT_TRANS,
+ };
struct xfs_log_iovec reg = {
- .i_addr = NULL,
- .i_len = 0,
+ .i_addr = &ophdr,
+ .i_len = sizeof(struct xlog_op_header),
.i_type = XLOG_REG_TYPE_COMMIT,
};
struct xfs_log_vec vec = {
@@ -844,6 +1103,8 @@ xlog_cil_write_commit_record(
.lv_iovecp = &reg,
};
int error;
+ LIST_HEAD(lv_chain);
+ list_add(&vec.lv_list, &lv_chain);
if (xlog_is_shutdown(log))
return -EIO;
@@ -852,12 +1113,155 @@ xlog_cil_write_commit_record(
if (error)
return error;
- error = xlog_write(log, ctx, &vec, ctx->ticket, XLOG_COMMIT_TRANS);
+ /* account for space used by record data */
+ ctx->ticket->t_curr_res -= reg.i_len;
+ error = xlog_write(log, ctx, &lv_chain, ctx->ticket, reg.i_len);
if (error)
- xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
+ xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
return error;
}
+struct xlog_cil_trans_hdr {
+ struct xlog_op_header oph[2];
+ struct xfs_trans_header thdr;
+ struct xfs_log_iovec lhdr[2];
+};
+
+/*
+ * Build a checkpoint transaction header to begin the journal transaction. We
+ * need to account for the space used by the transaction header here as it is
+ * not accounted for in xlog_write().
+ *
+ * This is the only place we write a transaction header, so we also build the
+ * log opheaders that indicate the start of a log transaction and wrap the
+ * transaction header. We keep the start record in its own log vector rather
+ * than compacting them into a single region as this ends up making the logic
+ * in xlog_write() for handling empty opheaders for start, commit and unmount
+ * records much simpler.
+ */
+static void
+xlog_cil_build_trans_hdr(
+ struct xfs_cil_ctx *ctx,
+ struct xlog_cil_trans_hdr *hdr,
+ struct xfs_log_vec *lvhdr,
+ int num_iovecs)
+{
+ struct xlog_ticket *tic = ctx->ticket;
+ __be32 tid = cpu_to_be32(tic->t_tid);
+
+ memset(hdr, 0, sizeof(*hdr));
+
+ /* Log start record */
+ hdr->oph[0].oh_tid = tid;
+ hdr->oph[0].oh_clientid = XFS_TRANSACTION;
+ hdr->oph[0].oh_flags = XLOG_START_TRANS;
+
+ /* log iovec region pointer */
+ hdr->lhdr[0].i_addr = &hdr->oph[0];
+ hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
+ hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;
+
+ /* log opheader */
+ hdr->oph[1].oh_tid = tid;
+ hdr->oph[1].oh_clientid = XFS_TRANSACTION;
+ hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header));
+
+ /* transaction header in host byte order format */
+ hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
+ hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
+ hdr->thdr.th_tid = tic->t_tid;
+ hdr->thdr.th_num_items = num_iovecs;
+
+ /* log iovec region pointer */
+ hdr->lhdr[1].i_addr = &hdr->oph[1];
+ hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
+ sizeof(struct xfs_trans_header);
+ hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;
+
+ lvhdr->lv_niovecs = 2;
+ lvhdr->lv_iovecp = &hdr->lhdr[0];
+ lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
+
+ tic->t_curr_res -= lvhdr->lv_bytes;
+}
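
The reservation charged by xlog_cil_build_trans_hdr() is simply the sum of the two region lengths it sets up: one opheader for the start record, plus an opheader wrapping the transaction header. A sketch of that arithmetic with simplified stand-in structures (the field layouts are assumptions chosen to match the commonly quoted 12- and 16-byte sizes, not copies of the on-disk definitions):

#include <stdint.h>
#include <stdio.h>

struct op_header {		/* models struct xlog_op_header */
	uint32_t	oh_tid;
	uint32_t	oh_len;
	uint8_t		oh_clientid;
	uint8_t		oh_flags;
	uint16_t	oh_res2;
};

struct trans_header {		/* models struct xfs_trans_header */
	uint32_t	th_magic;
	uint32_t	th_type;
	int32_t		th_tid;
	uint32_t	th_num_items;
};

int main(void)
{
	size_t lhdr0 = sizeof(struct op_header);
	size_t lhdr1 = sizeof(struct op_header) + sizeof(struct trans_header);

	/* this total is what gets subtracted from tic->t_curr_res */
	printf("checkpoint header consumes %zu bytes of reservation\n",
	       lhdr0 + lhdr1);
	return 0;
}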
+
+/*
+ * CIL item reordering compare function. We want to order in ascending ID order,
+ * but we want to leave items with the same ID in the order they were added to
+ * the list. This is important for operations like reflink where we log 4 order
+ * dependent intents in a single transaction when we overwrite an existing
+ * shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop),
+ * CUI (inc), BUI(remap)...
+ */
+static int
+xlog_cil_order_cmp(
+ void *priv,
+ const struct list_head *a,
+ const struct list_head *b)
+{
+ struct xfs_log_vec *l1 = container_of(a, struct xfs_log_vec, lv_list);
+ struct xfs_log_vec *l2 = container_of(b, struct xfs_log_vec, lv_list);
+
+ return l1->lv_order_id > l2->lv_order_id;
+}
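
The comparator above only reports "greater" when l1's order_id is strictly higher, which is what keeps list_sort() stable for items sharing an order_id. A userspace sketch with a stable insertion sort standing in for list_sort() (the item names are made up for illustration):

#include <stdio.h>

struct lv {
	int		order_id;
	const char	*name;
};

static int order_cmp(const struct lv *a, const struct lv *b)
{
	return a->order_id > b->order_id;	/* 0 for equal keys => stable */
}

static void stable_sort(struct lv *v, int n)
{
	for (int i = 1; i < n; i++) {
		struct lv key = v[i];
		int j = i - 1;

		while (j >= 0 && order_cmp(&v[j], &key) > 0) {
			v[j + 1] = v[j];
			j--;
		}
		v[j + 1] = key;
	}
}

int main(void)
{
	/* equal order_ids keep their insertion order after sorting */
	struct lv chain[] = {
		{ 2, "BUI(unmap)" }, { 2, "CUI(drop)" },
		{ 2, "CUI(inc)" },   { 2, "BUI(remap)" },
		{ 1, "EFI" },
	};
	int n = sizeof(chain) / sizeof(chain[0]);

	stable_sort(chain, n);
	for (int i = 0; i < n; i++)
		printf("%d %s\n", chain[i].order_id, chain[i].name);
	return 0;
}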
+
+/*
+ * Pull all the log vectors off the items in the CIL, and remove the items from
+ * the CIL. We don't need the CIL lock here because it's only needed on the
+ * transaction commit side which is currently locked out by the flush lock.
+ *
+ * If a log item is marked with a whiteout, we do not need to write it to the
+ * journal and so we just move them to the whiteout list for the caller to
+ * dispose of appropriately.
+ */
+static void
+xlog_cil_build_lv_chain(
+ struct xfs_cil_ctx *ctx,
+ struct list_head *whiteouts,
+ uint32_t *num_iovecs,
+ uint32_t *num_bytes)
+{
+ while (!list_empty(&ctx->log_items)) {
+ struct xfs_log_item *item;
+ struct xfs_log_vec *lv;
+
+ item = list_first_entry(&ctx->log_items,
+ struct xfs_log_item, li_cil);
+
+ if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
+ list_move(&item->li_cil, whiteouts);
+ trace_xfs_cil_whiteout_skip(item);
+ continue;
+ }
+
+ lv = item->li_lv;
+ lv->lv_order_id = item->li_order_id;
+
+ /* we don't write ordered log vectors */
+ if (lv->lv_buf_used != XFS_LOG_VEC_ORDERED)
+ *num_bytes += lv->lv_bytes;
+ *num_iovecs += lv->lv_niovecs;
+ list_add_tail(&lv->lv_list, &ctx->lv_chain);
+
+ list_del_init(&item->li_cil);
+ item->li_order_id = 0;
+ item->li_lv = NULL;
+ }
+}
+
+static void
+xlog_cil_cleanup_whiteouts(
+ struct list_head *whiteouts)
+{
+ while (!list_empty(whiteouts)) {
+ struct xfs_log_item *item = list_first_entry(whiteouts,
+ struct xfs_log_item, li_cil);
+ list_del_init(&item->li_cil);
+ trace_xfs_cil_whiteout_unpin(item);
+ item->li_ops->iop_unpin(item, 1);
+ }
+}
+
/*
* Push the Committed Item List to the log.
*
@@ -871,28 +1275,32 @@ xlog_cil_write_commit_record(
* same sequence twice. If we get a race between multiple pushes for the same
* sequence they will block on the first one and then abort, hence avoiding
* needless pushes.
+ *
+ * This runs from a workqueue so it does not inherit any specific memory
+ * allocation context. However, we do not want to block on memory reclaim
+ * recursing back into the filesystem because this push may have been triggered
+ * by memory reclaim itself. Hence we really need to run under full GFP_NOFS
+ * constraints here.
*/
static void
xlog_cil_push_work(
struct work_struct *work)
{
+ unsigned int nofs_flags = memalloc_nofs_save();
struct xfs_cil_ctx *ctx =
container_of(work, struct xfs_cil_ctx, push_work);
struct xfs_cil *cil = ctx->cil;
struct xlog *log = cil->xc_log;
- struct xfs_log_vec *lv;
struct xfs_cil_ctx *new_ctx;
- struct xlog_ticket *tic;
- int num_iovecs;
+ int num_iovecs = 0;
+ int num_bytes = 0;
int error = 0;
- struct xfs_trans_header thdr;
- struct xfs_log_iovec lhdr;
- struct xfs_log_vec lvhdr = { NULL };
- xfs_lsn_t preflush_tail_lsn;
+ struct xlog_cil_trans_hdr thdr;
+ struct xfs_log_vec lvhdr = {};
xfs_csn_t push_seq;
- struct bio bio;
- DECLARE_COMPLETION_ONSTACK(bdev_flush);
bool push_commit_stable;
+ LIST_HEAD (whiteouts);
+ struct xlog_ticket *ticket;
new_ctx = xlog_cil_ctx_alloc();
new_ctx->ticket = xlog_cil_ticket_alloc(log);
@@ -916,12 +1324,14 @@ xlog_cil_push_work(
if (waitqueue_active(&cil->xc_push_wait))
wake_up_all(&cil->xc_push_wait);
+ xlog_cil_push_pcp_aggregate(cil, ctx);
+
/*
* Check if we've anything to push. If there is nothing, then we don't
* move on to a new sequence number and so we have to be able to push
* this sequence again later.
*/
- if (list_empty(&cil->xc_cil)) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
cil->xc_push_seq = 0;
spin_unlock(&cil->xc_push_lock);
goto out_skip;
@@ -961,45 +1371,7 @@ xlog_cil_push_work(
list_add(&ctx->committing, &cil->xc_committing);
spin_unlock(&cil->xc_push_lock);
- /*
- * The CIL is stable at this point - nothing new will be added to it
- * because we hold the flush lock exclusively. Hence we can now issue
- * a cache flush to ensure all the completed metadata in the journal we
- * are about to overwrite is on stable storage.
- *
- * Because we are issuing this cache flush before we've written the
- * tail lsn to the iclog, we can have metadata IO completions move the
- * tail forwards between the completion of this flush and the iclog
- * being written. In this case, we need to re-issue the cache flush
- * before the iclog write. To detect whether the log tail moves, sample
- * the tail LSN *before* we issue the flush.
- */
- preflush_tail_lsn = atomic64_read(&log->l_tail_lsn);
- xfs_flush_bdev_async(&bio, log->l_mp->m_ddev_targp->bt_bdev,
- &bdev_flush);
-
- /*
- * Pull all the log vectors off the items in the CIL, and remove the
- * items from the CIL. We don't need the CIL lock here because it's only
- * needed on the transaction commit side which is currently locked out
- * by the flush lock.
- */
- lv = NULL;
- num_iovecs = 0;
- while (!list_empty(&cil->xc_cil)) {
- struct xfs_log_item *item;
-
- item = list_first_entry(&cil->xc_cil,
- struct xfs_log_item, li_cil);
- list_del_init(&item->li_cil);
- if (!ctx->lv_chain)
- ctx->lv_chain = item->li_lv;
- else
- lv->lv_next = item->li_lv;
- lv = item->li_lv;
- item->li_lv = NULL;
- num_iovecs += lv->lv_niovecs;
- }
+ xlog_cil_build_lv_chain(ctx, &whiteouts, &num_iovecs, &num_bytes);
/*
* Switch the contexts so we can drop the context lock and move out
@@ -1032,35 +1404,30 @@ xlog_cil_push_work(
up_write(&cil->xc_ctx_lock);
/*
+ * Sort the log vector chain before we add the transaction headers.
+ * This ensures we always have the transaction headers at the start
+ * of the chain.
+ */
+ list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp);
+
+ /*
* Build a checkpoint transaction header and write it to the log to
* begin the transaction. We need to account for the space used by the
* transaction header here as it is not accounted for in xlog_write().
- *
- * The LSN we need to pass to the log items on transaction commit is
- * the LSN reported by the first log vector write. If we use the commit
- * record lsn then we can move the tail beyond the grant write head.
+ * Add the lvhdr to the head of the lv chain we pass to xlog_write() so
+ * it gets written into the iclog first.
*/
- tic = ctx->ticket;
- thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
- thdr.th_type = XFS_TRANS_CHECKPOINT;
- thdr.th_tid = tic->t_tid;
- thdr.th_num_items = num_iovecs;
- lhdr.i_addr = &thdr;
- lhdr.i_len = sizeof(xfs_trans_header_t);
- lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
- tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
-
- lvhdr.lv_niovecs = 1;
- lvhdr.lv_iovecp = &lhdr;
- lvhdr.lv_next = ctx->lv_chain;
+ xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
+ num_bytes += lvhdr.lv_bytes;
+ list_add(&lvhdr.lv_list, &ctx->lv_chain);
/*
- * Before we format and submit the first iclog, we have to ensure that
- * the metadata writeback ordering cache flush is complete.
+ * Take the lvhdr back off the lv_chain immediately after calling
+ * xlog_cil_write_chain() as it should not be passed to log IO
+ * completion.
*/
- wait_for_completion(&bdev_flush);
-
- error = xlog_cil_write_chain(ctx, &lvhdr);
+ error = xlog_cil_write_chain(ctx, num_bytes);
+ list_del(&lvhdr.lv_list);
if (error)
goto out_abort_free_ticket;
@@ -1068,7 +1435,14 @@ xlog_cil_push_work(
if (error)
goto out_abort_free_ticket;
- xfs_log_ticket_ungrant(log, tic);
+ /*
+ * Grab the ticket from the ctx so we can ungrant it after releasing the
+ * commit_iclog. The ctx may be freed by the time we return from
+ * releasing the commit_iclog (i.e. checkpoint has been completed and
+ * callback run) so we can't reference the ctx after the call to
+ * xlog_state_release_iclog().
+ */
+ ticket = ctx->ticket;
/*
* If the checkpoint spans multiple iclogs, wait for all previous iclogs
@@ -1084,9 +1458,9 @@ xlog_cil_push_work(
*/
spin_lock(&log->l_icloglock);
if (ctx->start_lsn != ctx->commit_lsn) {
- xfs_lsn_t plsn;
+ xfs_lsn_t plsn = be64_to_cpu(
+ ctx->commit_iclog->ic_prev->ic_header->h_lsn);
- plsn = be64_to_cpu(ctx->commit_iclog->ic_prev->ic_header.h_lsn);
if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
/*
* Waiting on ic_force_wait orders the completion of
@@ -1118,30 +1492,40 @@ xlog_cil_push_work(
if (push_commit_stable &&
ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
- xlog_state_release_iclog(log, ctx->commit_iclog, preflush_tail_lsn);
+ ticket = ctx->ticket;
+ xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
/* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock);
+ xlog_cil_cleanup_whiteouts(&whiteouts);
+ xfs_log_ticket_ungrant(log, ticket);
+ memalloc_nofs_restore(nofs_flags);
return;
out_skip:
up_write(&cil->xc_ctx_lock);
xfs_log_ticket_put(new_ctx->ticket);
- kmem_free(new_ctx);
+ kfree(new_ctx);
+ memalloc_nofs_restore(nofs_flags);
return;
out_abort_free_ticket:
- xfs_log_ticket_ungrant(log, tic);
ASSERT(xlog_is_shutdown(log));
+ xlog_cil_cleanup_whiteouts(&whiteouts);
if (!ctx->commit_iclog) {
+ xfs_log_ticket_ungrant(log, ctx->ticket);
xlog_cil_committed(ctx);
+ memalloc_nofs_restore(nofs_flags);
return;
}
spin_lock(&log->l_icloglock);
- xlog_state_release_iclog(log, ctx->commit_iclog, 0);
+ ticket = ctx->ticket;
+ xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
/* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock);
+ xfs_log_ticket_ungrant(log, ticket);
+ memalloc_nofs_restore(nofs_flags);
}
/*
@@ -1153,21 +1537,30 @@ out_abort_free_ticket:
*/
static void
xlog_cil_push_background(
- struct xlog *log) __releases(cil->xc_ctx_lock)
+ struct xlog *log)
{
struct xfs_cil *cil = log->l_cilp;
+ int space_used = atomic_read(&cil->xc_ctx->space_used);
/*
* The cil won't be empty because we are called while holding the
- * context lock so whatever we added to the CIL will still be there
+ * context lock so whatever we added to the CIL will still be there.
*/
- ASSERT(!list_empty(&cil->xc_cil));
+ ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
/*
- * Don't do a background push if we haven't used up all the
- * space available yet.
+ * We are done if:
+ * - we haven't used up all the space available yet; or
+ * - we've already queued up a push; and
+ * - we're not over the hard limit; and
+ * - nothing has been over the hard limit.
+ *
+ * If so, we don't need to take the push lock as there's nothing to do.
*/
- if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
+ if (space_used < XLOG_CIL_SPACE_LIMIT(log) ||
+ (cil->xc_push_seq == cil->xc_current_sequence &&
+ space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) &&
+ !waitqueue_active(&cil->xc_push_wait))) {
up_read(&cil->xc_ctx_lock);
return;
}
@@ -1194,12 +1587,11 @@ xlog_cil_push_background(
* dipping back down under the hard limit.
*
* The ctx->xc_push_lock provides the serialisation necessary for safely
- * using the lockless waitqueue_active() check in this context.
+ * calling xlog_cil_over_hard_limit() in this context.
*/
- if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
- waitqueue_active(&cil->xc_push_wait)) {
+ if (xlog_cil_over_hard_limit(log, space_used)) {
trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
- ASSERT(cil->xc_ctx->space_used < log->l_logsize);
+ ASSERT(space_used < log->l_logsize);
xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
return;
}
@@ -1243,18 +1635,28 @@ xlog_cil_push_now(
if (!async)
flush_workqueue(cil->xc_push_wq);
+ spin_lock(&cil->xc_push_lock);
+
+ /*
+ * If this is an async flush request, we always need to set the
+ * xc_push_commit_stable flag even if something else has already queued
+ * a push. The flush caller is asking for the CIL to be on stable
+ * storage when the next push completes, so regardless of who has queued
+ * the push, the flush requires stable semantics from it.
+ */
+ cil->xc_push_commit_stable = async;
+
/*
* If the CIL is empty or we've already pushed the sequence then
- * there's no work we need to do.
+ * there's no more work that we need to do.
*/
- spin_lock(&cil->xc_push_lock);
- if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) ||
+ push_seq <= cil->xc_push_seq) {
spin_unlock(&cil->xc_push_lock);
return;
}
cil->xc_push_seq = push_seq;
- cil->xc_push_commit_stable = async;
queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
spin_unlock(&cil->xc_push_lock);
}
@@ -1267,13 +1669,50 @@ xlog_cil_empty(
bool empty = false;
spin_lock(&cil->xc_push_lock);
- if (list_empty(&cil->xc_cil))
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
empty = true;
spin_unlock(&cil->xc_push_lock);
return empty;
}
/*
+ * If there are intent done items in this transaction and the related intent was
+ * committed in the current (same) CIL checkpoint, we don't need to write either
+ * the intent or intent done item to the journal as the change will be
+ * journalled atomically within this checkpoint. As we cannot remove items from
+ * the CIL here, mark the related intent with a whiteout so that the CIL push
+ * can remove it rather than writing it to the journal. Then remove the intent
+ * done item from the current transaction and release it so it doesn't get put
+ * into the CIL at all.
+ */
+static uint32_t
+xlog_cil_process_intents(
+ struct xfs_cil *cil,
+ struct xfs_trans *tp)
+{
+ struct xfs_log_item *lip, *ilip, *next;
+ uint32_t len = 0;
+
+ list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
+ if (!(lip->li_ops->flags & XFS_ITEM_INTENT_DONE))
+ continue;
+
+ ilip = lip->li_ops->iop_intent(lip);
+ if (!ilip || !xlog_item_in_current_chkpt(cil, ilip))
+ continue;
+ set_bit(XFS_LI_WHITEOUT, &ilip->li_flags);
+ trace_xfs_cil_whiteout_mark(ilip);
+ len += ilip->li_lv->lv_bytes;
+ kvfree(ilip->li_lv);
+ ilip->li_lv = NULL;
+
+ xfs_trans_del_item(lip);
+ lip->li_ops->iop_release(lip);
+ }
+ return len;
+}
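
The length returned here feeds straight back into xlog_cil_insert_items() as released_space: it is subtracted from the formatted length before the CIL space and the ticket are charged. A tiny arithmetic sketch with made-up numbers:

#include <stdio.h>

int main(void)
{
	int formatted_len = 3000;	/* assumed: bytes formatted by this commit */
	int released_space = 1200;	/* assumed: lv_bytes of the whited-out intent */
	int ctx_res = 0;		/* no CIL unit reservation stolen this time */

	/* mirrors len -= released_space in xlog_cil_insert_items() */
	int len = formatted_len - released_space;

	printf("CIL space consumed: %d bytes\n", len);
	/* mirrors tp->t_ticket->t_curr_res -= ctx_res + len */
	printf("ticket t_curr_res charged: %d bytes\n", ctx_res + len);
	return 0;
}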
+
+/*
* Commit a transaction with the given vector to the Committed Item List.
*
* To do this, we need to format the item, pin it in memory if required and
@@ -1295,6 +1734,7 @@ xlog_cil_commit(
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_log_item *lip, *next;
+ uint32_t released_space = 0;
/*
* Do all necessary memory allocation before we lock the CIL.
@@ -1306,7 +1746,10 @@ xlog_cil_commit(
/* lock out background commit */
down_read(&cil->xc_ctx_lock);
- xlog_cil_insert_items(log, tp);
+ if (tp->t_flags & XFS_TRANS_HAS_INTENT_DONE)
+ released_space = xlog_cil_process_intents(cil, tp);
+
+ xlog_cil_insert_items(log, tp, released_space);
if (regrant && !xlog_is_shutdown(log))
xfs_log_ticket_regrant(log, tp->t_ticket);
@@ -1352,6 +1795,13 @@ xlog_cil_flush(
trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
xlog_cil_push_now(log, seq, true);
+
+ /*
+ * If the CIL is empty, make sure that any previous checkpoint that may
+ * still be in an active iclog is pushed to stable storage.
+ */
+ if (test_bit(XLOG_CIL_EMPTY, &log->l_cilp->xc_flags))
+ xfs_log_force(log->l_mp, 0);
}
/*
@@ -1435,7 +1885,7 @@ restart:
* we would have found the context on the committing list.
*/
if (sequence == cil->xc_current_sequence &&
- !list_empty(&cil->xc_cil)) {
+ !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
spin_unlock(&cil->xc_push_lock);
goto restart;
}
@@ -1456,42 +1906,18 @@ out_shutdown:
}
/*
- * Check if the current log item was first committed in this sequence.
- * We can't rely on just the log item being in the CIL, we have to check
- * the recorded commit sequence number.
- *
- * Note: for this to be used in a non-racy manner, it has to be called with
- * CIL flushing locked out. As a result, it should only be used during the
- * transaction commit process when deciding what to format into the item.
- */
-bool
-xfs_log_item_in_current_chkpt(
- struct xfs_log_item *lip)
-{
- struct xfs_cil *cil = lip->li_mountp->m_log->l_cilp;
-
- if (list_empty(&lip->li_cil))
- return false;
-
- /*
- * li_seq is written on the first commit of a log item to record the
- * first checkpoint it is written to. Hence if it is different to the
- * current sequence, we're in a new checkpoint.
- */
- return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
-}
-
-/*
* Perform initial CIL structure initialisation.
*/
int
xlog_cil_init(
- struct xlog *log)
+ struct xlog *log)
{
- struct xfs_cil *cil;
- struct xfs_cil_ctx *ctx;
+ struct xfs_cil *cil;
+ struct xfs_cil_ctx *ctx;
+ struct xlog_cil_pcp *cilpcp;
+ int cpu;
- cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
+ cil = kzalloc(sizeof(*cil), GFP_KERNEL | __GFP_RETRY_MAYFAIL);
if (!cil)
return -ENOMEM;
/*
@@ -1504,24 +1930,33 @@ xlog_cil_init(
if (!cil->xc_push_wq)
goto out_destroy_cil;
- INIT_LIST_HEAD(&cil->xc_cil);
+ cil->xc_log = log;
+ cil->xc_pcp = alloc_percpu(struct xlog_cil_pcp);
+ if (!cil->xc_pcp)
+ goto out_destroy_wq;
+
+ for_each_possible_cpu(cpu) {
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+ INIT_LIST_HEAD(&cilpcp->busy_extents);
+ INIT_LIST_HEAD(&cilpcp->log_items);
+ }
+
INIT_LIST_HEAD(&cil->xc_committing);
- spin_lock_init(&cil->xc_cil_lock);
spin_lock_init(&cil->xc_push_lock);
init_waitqueue_head(&cil->xc_push_wait);
init_rwsem(&cil->xc_ctx_lock);
init_waitqueue_head(&cil->xc_start_wait);
init_waitqueue_head(&cil->xc_commit_wait);
- cil->xc_log = log;
log->l_cilp = cil;
ctx = xlog_cil_ctx_alloc();
xlog_cil_ctx_switch(cil, ctx);
-
return 0;
+out_destroy_wq:
+ destroy_workqueue(cil->xc_push_wq);
out_destroy_cil:
- kmem_free(cil);
+ kfree(cil);
return -ENOMEM;
}
@@ -1529,14 +1964,17 @@ void
xlog_cil_destroy(
struct xlog *log)
{
- if (log->l_cilp->xc_ctx) {
- if (log->l_cilp->xc_ctx->ticket)
- xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
- kmem_free(log->l_cilp->xc_ctx);
+ struct xfs_cil *cil = log->l_cilp;
+
+ if (cil->xc_ctx) {
+ if (cil->xc_ctx->ticket)
+ xfs_log_ticket_put(cil->xc_ctx->ticket);
+ kfree(cil->xc_ctx);
}
- ASSERT(list_empty(&log->l_cilp->xc_cil));
- destroy_workqueue(log->l_cilp->xc_push_wq);
- kmem_free(log->l_cilp);
+ ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
+ free_percpu(cil->xc_pcp);
+ destroy_workqueue(cil->xc_push_wq);
+ kfree(cil);
}