Diffstat (limited to 'kernel/rcu/tree.c')
-rw-r--r--   kernel/rcu/tree.c   2325
1 file changed, 1199 insertions, 1126 deletions
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index cf34a961821a..293bbd9ac3f4 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -31,6 +31,7 @@
#include <linux/bitops.h>
#include <linux/export.h>
#include <linux/completion.h>
+#include <linux/kmemleak.h>
#include <linux/moduleparam.h>
#include <linux/panic.h>
#include <linux/panic_notifier.h>
@@ -74,13 +75,20 @@
#define MODULE_PARAM_PREFIX "rcutree."
/* Data structures. */
+static void rcu_sr_normal_gp_cleanup_work(struct work_struct *);
static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = {
.gpwrap = true,
-#ifdef CONFIG_RCU_NOCB_CPU
- .cblist.flags = SEGCBLIST_RCU_CORE,
-#endif
};
+
+int rcu_get_gpwrap_count(int cpu)
+{
+ struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+
+ return READ_ONCE(rdp->gpwrap_count);
+}
+EXPORT_SYMBOL_GPL(rcu_get_gpwrap_count);
+
static struct rcu_state rcu_state = {
.level = { &rcu_state.node[0] },
.gp_state = RCU_GP_IDLE,
@@ -92,6 +100,12 @@ static struct rcu_state rcu_state = {
.exp_mutex = __MUTEX_INITIALIZER(rcu_state.exp_mutex),
.exp_wake_mutex = __MUTEX_INITIALIZER(rcu_state.exp_wake_mutex),
.ofl_lock = __ARCH_SPIN_LOCK_UNLOCKED,
+ .srs_cleanup_work = __WORK_INITIALIZER(rcu_state.srs_cleanup_work,
+ rcu_sr_normal_gp_cleanup_work),
+ .srs_cleanups_pending = ATOMIC_INIT(0),
+#ifdef CONFIG_RCU_NOCB_CPU
+ .nocb_mutex = __MUTEX_INITIALIZER(rcu_state.nocb_mutex),
+#endif
};
/* Dump rcu_node combining tree at boot to verify correct setup. */
@@ -144,14 +158,14 @@ static int rcu_scheduler_fully_active __read_mostly;
static void rcu_report_qs_rnp(unsigned long mask, struct rcu_node *rnp,
unsigned long gps, unsigned long flags);
-static void rcu_init_new_rnp(struct rcu_node *rnp_leaf);
-static void rcu_cleanup_dead_rnp(struct rcu_node *rnp_leaf);
-static void rcu_boost_kthread_setaffinity(struct rcu_node *rnp, int outgoingcpu);
static void invoke_rcu_core(void);
static void rcu_report_exp_rdp(struct rcu_data *rdp);
-static void sync_sched_exp_online_cleanup(int cpu);
static void check_cb_ovld_locked(struct rcu_data *rdp, struct rcu_node *rnp);
static bool rcu_rdp_is_offloaded(struct rcu_data *rdp);
+static bool rcu_rdp_cpu_online(struct rcu_data *rdp);
+static bool rcu_init_invoked(void);
+static void rcu_cleanup_dead_rnp(struct rcu_node *rnp_leaf);
+static void rcu_init_new_rnp(struct rcu_node *rnp_leaf);
/*
* rcuc/rcub/rcuop kthread realtime priority. The "rcuop"
@@ -169,6 +183,9 @@ static int gp_init_delay;
module_param(gp_init_delay, int, 0444);
static int gp_cleanup_delay;
module_param(gp_cleanup_delay, int, 0444);
+static int nohz_full_patience_delay;
+module_param(nohz_full_patience_delay, int, 0444);
+static int nohz_full_patience_delay_jiffies;
// Add delay to rcu_read_unlock() for strict grace periods.
static int rcu_unlock_delay;
@@ -176,26 +193,6 @@ static int rcu_unlock_delay;
module_param(rcu_unlock_delay, int, 0444);
#endif
-/*
- * This rcu parameter is runtime-read-only. It reflects
- * a minimum allowed number of objects which can be cached
- * per-CPU. Object size is equal to one page. This value
- * can be changed at boot time.
- */
-static int rcu_min_cached_objs = 5;
-module_param(rcu_min_cached_objs, int, 0444);
-
-// A page shrinker can ask for pages to be freed to make them
-// available for other parts of the system. This usually happens
-// under low memory conditions, and in that case we should also
-// defer page-cache filling for a short time period.
-//
-// The default value is 5 seconds, which is long enough to reduce
-// interference with the shrinker while it asks other systems to
-// drain their caches.
-static int rcu_delay_page_cache_fill_msec = 5000;
-module_param(rcu_delay_page_cache_fill_msec, int, 0444);
-
/* Retrieve RCU kthreads priority for rcutorture */
int rcu_get_gp_kthreads_prio(void)
{
@@ -215,27 +212,6 @@ EXPORT_SYMBOL_GPL(rcu_get_gp_kthreads_prio);
#define PER_RCU_NODE_PERIOD 3 /* Number of grace periods between delays for debugging. */
/*
- * Compute the mask of online CPUs for the specified rcu_node structure.
- * This will not be stable unless the rcu_node structure's ->lock is
- * held, but the bit corresponding to the current CPU will be stable
- * in most contexts.
- */
-static unsigned long rcu_rnp_online_cpus(struct rcu_node *rnp)
-{
- return READ_ONCE(rnp->qsmaskinitnext);
-}
-
-/*
- * Is the CPU corresponding to the specified rcu_data structure online
- * from RCU's perspective? This perspective is given by that structure's
- * ->qsmaskinitnext field rather than by the global cpu_online_mask.
- */
-static bool rcu_rdp_cpu_online(struct rcu_data *rdp)
-{
- return !!(rdp->grpmask & rcu_rnp_online_cpus(rdp->mynode));
-}
-
-/*
* Return true if an RCU grace period is in progress. The READ_ONCE()s
* permit this function to be invoked without holding the root rcu_node
* structure's ->lock, but of course results can be subject to change.
@@ -258,76 +234,113 @@ static long rcu_get_n_cbs_cpu(int cpu)
return 0;
}
+/**
+ * rcu_softirq_qs - Provide a set of RCU quiescent states in softirq processing
+ *
+ * Mark a quiescent state for RCU, Tasks RCU, and Tasks Trace RCU.
+ * This is a special-purpose function to be used in the softirq
+ * infrastructure and perhaps the occasional long-running softirq
+ * handler.
+ *
+ * Note that from RCU's viewpoint, a call to rcu_softirq_qs() is
+ * equivalent to momentarily completely enabling preemption. For
+ * example, given this code::
+ *
+ * local_bh_disable();
+ * do_something();
+ * rcu_softirq_qs(); // A
+ * do_something_else();
+ * local_bh_enable(); // B
+ *
+ * A call to synchronize_rcu() that began concurrently with the
+ * call to do_something() would be guaranteed to wait only until
+ * execution reached statement A. Without that rcu_softirq_qs(),
+ * that same synchronize_rcu() would instead be guaranteed to wait
+ * until execution reached statement B.
+ */
void rcu_softirq_qs(void)
{
+ RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
+ lock_is_held(&rcu_lock_map) ||
+ lock_is_held(&rcu_sched_lock_map),
+ "Illegal rcu_softirq_qs() in RCU read-side critical section");
rcu_qs();
rcu_preempt_deferred_qs(current);
rcu_tasks_qs(current, false);
}
/*
- * Reset the current CPU's ->dynticks counter to indicate that the
+ * Reset the current CPU's RCU_WATCHING counter to indicate that the
* newly onlined CPU is no longer in an extended quiescent state.
* This will either leave the counter unchanged, or increment it
* to the next non-quiescent value.
*
* The non-atomic test/increment sequence works because the upper bits
- * of the ->dynticks counter are manipulated only by the corresponding CPU,
+ * of the ->state variable are manipulated only by the corresponding CPU,
* or when the corresponding CPU is offline.
*/
-static void rcu_dynticks_eqs_online(void)
+static void rcu_watching_online(void)
{
- if (ct_dynticks() & RCU_DYNTICKS_IDX)
+ if (ct_rcu_watching() & CT_RCU_WATCHING)
return;
- ct_state_inc(RCU_DYNTICKS_IDX);
-}
-
-/*
- * Snapshot the ->dynticks counter with full ordering so as to allow
- * stable comparison of this counter with past and future snapshots.
- */
-static int rcu_dynticks_snap(int cpu)
-{
- smp_mb(); // Fundamental RCU ordering guarantee.
- return ct_dynticks_cpu_acquire(cpu);
+ ct_state_inc(CT_RCU_WATCHING);
}
/*
- * Return true if the snapshot returned from rcu_dynticks_snap()
+ * Return true if the snapshot returned from ct_rcu_watching()
* indicates that RCU is in an extended quiescent state.
*/
-static bool rcu_dynticks_in_eqs(int snap)
+static bool rcu_watching_snap_in_eqs(int snap)
{
- return !(snap & RCU_DYNTICKS_IDX);
+ return !(snap & CT_RCU_WATCHING);
}
-/*
- * Return true if the CPU corresponding to the specified rcu_data
- * structure has spent some time in an extended quiescent state since
- * rcu_dynticks_snap() returned the specified snapshot.
+/**
+ * rcu_watching_snap_stopped_since() - Has RCU stopped watching a given CPU
+ * since the specified @snap?
+ *
+ * @rdp: The rcu_data corresponding to the CPU for which to check EQS.
+ * @snap: rcu_watching snapshot taken when the CPU wasn't in an EQS.
+ *
+ * Returns true if the CPU corresponding to @rdp has spent some time in an
+ * extended quiescent state since @snap. Note that this doesn't check if it
+ * /still/ is in an EQS, just that it went through one since @snap.
+ *
+ * This is meant to be used in a loop waiting for a CPU to go through an EQS.
*/
-static bool rcu_dynticks_in_eqs_since(struct rcu_data *rdp, int snap)
+static bool rcu_watching_snap_stopped_since(struct rcu_data *rdp, int snap)
{
- return snap != rcu_dynticks_snap(rdp->cpu);
+ /*
+ * The first failing snapshot is already ordered against the accesses
+ * performed by the remote CPU after it exits idle.
+ *
+ * The second snapshot therefore only needs to order against accesses
+ * performed by the remote CPU prior to entering idle and therefore can
+ * rely solely on acquire semantics.
+ */
+ if (WARN_ON_ONCE(rcu_watching_snap_in_eqs(snap)))
+ return true;
+
+ return snap != ct_rcu_watching_cpu_acquire(rdp->cpu);
}
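The snapshot/recheck pattern this helper supports can be modeled in a few lines of ordinary C. Below is a toy, single-CPU userspace sketch (WATCHING_BIT and all names are invented; this is not the real context-tracking API): the counter's low bit means "RCU is watching", every EQS enter/exit bumps the counter, so any movement after a "watching" snapshot implies an EQS was crossed.

#include <stdbool.h>
#include <stdio.h>

#define WATCHING_BIT 0x1	/* toy stand-in for CT_RCU_WATCHING */

/* WATCHING_BIT set => RCU is watching; each enter/exit of an extended
 * quiescent state (EQS) bumps the counter by WATCHING_BIT. */
static bool snap_in_eqs(unsigned int snap)
{
	return !(snap & WATCHING_BIT);
}

static bool stopped_since(unsigned int snap, unsigned int now)
{
	if (snap_in_eqs(snap))	/* snapshot must have been taken while watching */
		return true;
	return snap != now;	/* any change means at least one EQS was crossed */
}

int main(void)
{
	unsigned int snap = 3;				/* watching */
	printf("%d\n", stopped_since(snap, 3));		/* 0: no EQS yet */
	printf("%d\n", stopped_since(snap, 5));		/* 1: went idle and came back */
	return 0;
}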
/*
* Return true if the referenced integer is zero while the specified
* CPU remains within a single extended quiescent state.
*/
-bool rcu_dynticks_zero_in_eqs(int cpu, int *vp)
+bool rcu_watching_zero_in_eqs(int cpu, int *vp)
{
int snap;
// If not quiescent, force back to earlier extended quiescent state.
- snap = ct_dynticks_cpu(cpu) & ~RCU_DYNTICKS_IDX;
- smp_rmb(); // Order ->dynticks and *vp reads.
+ snap = ct_rcu_watching_cpu(cpu) & ~CT_RCU_WATCHING;
+ smp_rmb(); // Order CT state and *vp reads.
if (READ_ONCE(*vp))
return false; // Non-zero, so report failure;
- smp_rmb(); // Order *vp read and ->dynticks re-read.
+ smp_rmb(); // Order *vp read and CT state re-read.
// If still in the same extended quiescent state, we are good!
- return snap == ct_dynticks_cpu(cpu);
+ return snap == ct_rcu_watching_cpu(cpu);
}
/*
@@ -341,17 +354,17 @@ bool rcu_dynticks_zero_in_eqs(int cpu, int *vp)
*
* The caller must have disabled interrupts and must not be idle.
*/
-notrace void rcu_momentary_dyntick_idle(void)
+notrace void rcu_momentary_eqs(void)
{
int seq;
raw_cpu_write(rcu_data.rcu_need_heavy_qs, false);
- seq = ct_state_inc(2 * RCU_DYNTICKS_IDX);
+ seq = ct_state_inc(2 * CT_RCU_WATCHING);
/* It is illegal to call this from idle state. */
- WARN_ON_ONCE(!(seq & RCU_DYNTICKS_IDX));
+ WARN_ON_ONCE(!(seq & CT_RCU_WATCHING));
rcu_preempt_deferred_qs(current);
}
-EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle);
+EXPORT_SYMBOL_GPL(rcu_momentary_eqs);
/**
* rcu_is_cpu_rrupt_from_idle - see if 'interrupted' from idle
@@ -363,7 +376,7 @@ EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle);
*/
static int rcu_is_cpu_rrupt_from_idle(void)
{
- long nesting;
+ long nmi_nesting = ct_nmi_nesting();
/*
* Usually called from the tick; but also used from smp_function_call()
@@ -373,23 +386,30 @@ static int rcu_is_cpu_rrupt_from_idle(void)
lockdep_assert_irqs_disabled();
/* Check for counter underflows */
- RCU_LOCKDEP_WARN(ct_dynticks_nesting() < 0,
- "RCU dynticks_nesting counter underflow!");
- RCU_LOCKDEP_WARN(ct_dynticks_nmi_nesting() <= 0,
- "RCU dynticks_nmi_nesting counter underflow/zero!");
-
- /* Are we at first interrupt nesting level? */
- nesting = ct_dynticks_nmi_nesting();
- if (nesting > 1)
+ RCU_LOCKDEP_WARN(ct_nesting() < 0,
+ "RCU nesting counter underflow!");
+
+ /* Non-idle interrupt or nested idle interrupt */
+ if (nmi_nesting > 1)
return false;
/*
- * If we're not in an interrupt, we must be in the idle task!
+ * Non-nested idle interrupt (interrupting a section where RCU
+ * wasn't watching).
*/
- WARN_ON_ONCE(!nesting && !is_idle_task(current));
+ if (nmi_nesting == 1)
+ return true;
+
+ /* Not in an interrupt */
+ if (!nmi_nesting) {
+ RCU_LOCKDEP_WARN(!in_task() || !is_idle_task(current),
+ "RCU nmi_nesting counter not in idle task!");
+ return !rcu_is_watching_curr_cpu();
+ }
+
+ RCU_LOCKDEP_WARN(1, "RCU nmi_nesting counter underflow/zero!");
- /* Does CPU appear to be idle from an RCU standpoint? */
- return ct_dynticks_nesting() == 0;
+ return false;
}
#define DEFAULT_RCU_BLIMIT (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD) ? 1000 : 10)
@@ -526,21 +546,34 @@ static struct rcu_node *rcu_get_root(void)
/*
* Send along grace-period-related data for rcutorture diagnostics.
*/
-void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
- unsigned long *gp_seq)
+void rcutorture_get_gp_data(int *flags, unsigned long *gp_seq)
{
- switch (test_type) {
- case RCU_FLAVOR:
- *flags = READ_ONCE(rcu_state.gp_flags);
- *gp_seq = rcu_seq_current(&rcu_state.gp_seq);
- break;
- default:
- break;
- }
+ *flags = READ_ONCE(rcu_state.gp_flags);
+ *gp_seq = rcu_seq_current(&rcu_state.gp_seq);
}
EXPORT_SYMBOL_GPL(rcutorture_get_gp_data);
-#if defined(CONFIG_NO_HZ_FULL) && (!defined(CONFIG_GENERIC_ENTRY) || !defined(CONFIG_KVM_XFER_TO_GUEST_WORK))
+/* Gather grace-period sequence numbers for rcutorture diagnostics. */
+unsigned long long rcutorture_gather_gp_seqs(void)
+{
+ return ((READ_ONCE(rcu_state.gp_seq) & 0xffffULL) << 40) |
+ ((READ_ONCE(rcu_state.expedited_sequence) & 0xffffffULL) << 16) |
+ (READ_ONCE(rcu_state.gp_seq_polled) & 0xffffULL);
+}
+EXPORT_SYMBOL_GPL(rcutorture_gather_gp_seqs);
+
+/* Format grace-period sequence numbers for rcutorture diagnostics. */
+void rcutorture_format_gp_seqs(unsigned long long seqs, char *cp, size_t len)
+{
+ unsigned int egp = (seqs >> 16) & 0xffffffULL;
+ unsigned int ggp = (seqs >> 40) & 0xffffULL;
+ unsigned int pgp = seqs & 0xffffULL;
+
+ snprintf(cp, len, "g%04x:e%06x:p%04x", ggp, egp, pgp);
+}
+EXPORT_SYMBOL_GPL(rcutorture_format_gp_seqs);
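The two rcutorture helpers above are plain bit-slicing: 16 bits of normal gp_seq at the top, 24 bits of expedited sequence in the middle, 16 bits of polled gp_seq at the bottom. A small standalone sketch (sample values invented, gather() re-implemented for userspace) shows how the pair round-trips:

#include <stdio.h>

/* Userspace re-implementation of the packing used above, for illustration. */
static unsigned long long gather(unsigned long gp, unsigned long exp, unsigned long pgp)
{
	return ((gp & 0xffffULL) << 40) |
	       ((exp & 0xffffffULL) << 16) |
	       (pgp & 0xffffULL);
}

int main(void)
{
	unsigned long long seqs = gather(0x1234, 0xabcdef, 0x5678);
	unsigned int egp = (seqs >> 16) & 0xffffffULL;
	unsigned int ggp = (seqs >> 40) & 0xffffULL;
	unsigned int pgp = seqs & 0xffffULL;

	printf("g%04x:e%06x:p%04x\n", ggp, egp, pgp);	/* prints g1234:eabcdef:p5678 */
	return 0;
}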
+
+#if defined(CONFIG_NO_HZ_FULL) && (!defined(CONFIG_GENERIC_ENTRY) || !defined(CONFIG_VIRT_XFER_TO_GUEST_WORK))
/*
* An empty function that will trigger a reschedule on
* IRQ tail once IRQs get re-enabled on userspace/guest resume.
@@ -569,7 +602,7 @@ noinstr void rcu_irq_work_resched(void)
if (IS_ENABLED(CONFIG_GENERIC_ENTRY) && !(current->flags & PF_VCPU))
return;
- if (IS_ENABLED(CONFIG_KVM_XFER_TO_GUEST_WORK) && (current->flags & PF_VCPU))
+ if (IS_ENABLED(CONFIG_VIRT_XFER_TO_GUEST_WORK) && (current->flags & PF_VCPU))
return;
instrumentation_begin();
@@ -578,7 +611,7 @@ noinstr void rcu_irq_work_resched(void)
}
instrumentation_end();
}
-#endif /* #if defined(CONFIG_NO_HZ_FULL) && (!defined(CONFIG_GENERIC_ENTRY) || !defined(CONFIG_KVM_XFER_TO_GUEST_WORK)) */
+#endif /* #if defined(CONFIG_NO_HZ_FULL) && (!defined(CONFIG_GENERIC_ENTRY) || !defined(CONFIG_VIRT_XFER_TO_GUEST_WORK)) */
#ifdef CONFIG_PROVE_RCU
/**
@@ -588,12 +621,12 @@ void rcu_irq_exit_check_preempt(void)
{
lockdep_assert_irqs_disabled();
- RCU_LOCKDEP_WARN(ct_dynticks_nesting() <= 0,
- "RCU dynticks_nesting counter underflow/zero!");
- RCU_LOCKDEP_WARN(ct_dynticks_nmi_nesting() !=
- DYNTICK_IRQ_NONIDLE,
- "Bad RCU dynticks_nmi_nesting counter\n");
- RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
+ RCU_LOCKDEP_WARN(ct_nesting() <= 0,
+ "RCU nesting counter underflow/zero!");
+ RCU_LOCKDEP_WARN(ct_nmi_nesting() !=
+ CT_NESTING_IRQ_NONIDLE,
+ "Bad RCU nmi_nesting counter\n");
+ RCU_LOCKDEP_WARN(!rcu_is_watching_curr_cpu(),
"RCU in extended quiescent state!");
}
#endif /* #ifdef CONFIG_PROVE_RCU */
@@ -633,7 +666,7 @@ void __rcu_irq_enter_check_tick(void)
if (in_nmi())
return;
- RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
+ RCU_LOCKDEP_WARN(!rcu_is_watching_curr_cpu(),
"Illegal rcu_irq_enter_check_tick() from extended quiescent state");
if (!tick_nohz_full_cpu(rdp->cpu) ||
@@ -651,7 +684,7 @@ void __rcu_irq_enter_check_tick(void)
// prevents self-deadlock. So we can safely recheck under the lock.
// Note that the nohz_full state currently cannot change.
raw_spin_lock_rcu_node(rdp->mynode);
- if (rdp->rcu_urgent_qs && !rdp->rcu_forced_tick) {
+ if (READ_ONCE(rdp->rcu_urgent_qs) && !rdp->rcu_forced_tick) {
// A nohz_full CPU is in the kernel and RCU needs a
// quiescent state. Turn on the tick!
WRITE_ONCE(rdp->rcu_forced_tick, true);
@@ -659,6 +692,7 @@ void __rcu_irq_enter_check_tick(void)
}
raw_spin_unlock_rcu_node(rdp->mynode);
}
+NOKPROBE_SYMBOL(__rcu_irq_enter_check_tick);
#endif /* CONFIG_NO_HZ_FULL */
/*
@@ -695,12 +729,16 @@ static void rcu_disable_urgency_upon_qs(struct rcu_data *rdp)
}
/**
- * rcu_is_watching - see if RCU thinks that the current CPU is not idle
+ * rcu_is_watching - RCU read-side critical sections permitted on current CPU?
+ *
+ * Return @true if RCU is watching the running CPU and @false otherwise.
+ * A @true return means that this CPU can safely enter RCU read-side
+ * critical sections.
*
- * Return true if RCU is watching the running CPU, which means that this
- * CPU can safely enter RCU read-side critical sections. In other words,
- * if the current CPU is not in its idle loop or is in an interrupt or
- * NMI handler, return true.
+ * Although calls to rcu_is_watching() from most parts of the kernel
+ * will return @true, there are important exceptions. For example, if the
+ * current CPU is deep within its idle loop, in kernel entry/exit code,
+ * or offline, rcu_is_watching() will return @false.
*
* Make notrace because it can be called by the internal functions of
* ftrace, and making this notrace removes unnecessary recursion calls.
@@ -710,7 +748,7 @@ notrace bool rcu_is_watching(void)
bool ret;
preempt_disable_notrace();
- ret = !rcu_dynticks_curr_cpu_in_eqs();
+ ret = rcu_is_watching_curr_cpu();
preempt_enable_notrace();
return ret;
}
@@ -734,45 +772,24 @@ void rcu_request_urgent_qs_task(struct task_struct *t)
smp_store_release(per_cpu_ptr(&rcu_data.rcu_urgent_qs, cpu), true);
}
-#if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU)
+static unsigned long seq_gpwrap_lag = ULONG_MAX / 4;
-/*
- * Is the current CPU online as far as RCU is concerned?
- *
- * Disable preemption to avoid false positives that could otherwise
- * happen due to the current CPU number being sampled, this task being
- * preempted, its old CPU being taken offline, resuming on some other CPU,
- * then determining that its old CPU is now offline.
- *
- * Disable checking if in an NMI handler because we cannot safely
- * report errors from NMI handlers anyway. In addition, it is OK to use
- * RCU on an offline processor during initial boot, hence the check for
- * rcu_scheduler_fully_active.
+/**
+ * rcu_set_gpwrap_lag - Set RCU GP sequence overflow lag value.
+ * @lag_gps: Set the overflow lag to this many grace periods' worth of counters,
+ * which is used by rcutorture to quickly force a gpwrap situation.
+ * A @lag_gps of zero resets the lag to its boot-time value.
*/
-bool rcu_lockdep_current_cpu_online(void)
+void rcu_set_gpwrap_lag(unsigned long lag_gps)
{
- struct rcu_data *rdp;
- bool ret = false;
+ unsigned long lag_seq_count;
- if (in_nmi() || !rcu_scheduler_fully_active)
- return true;
- preempt_disable_notrace();
- rdp = this_cpu_ptr(&rcu_data);
- /*
- * Strictly, we care here about the case where the current CPU is
- * in rcu_cpu_starting() and thus has an excuse for rdp->grpmask
- * not being up to date. So arch_spin_is_locked() might have a
- * false positive if it's held by some *other* CPU, but that's
- * OK because that just means a false *negative* on the warning.
- */
- if (rcu_rdp_cpu_online(rdp) || arch_spin_is_locked(&rcu_state.ofl_lock))
- ret = true;
- preempt_enable_notrace();
- return ret;
+ lag_seq_count = (lag_gps == 0)
+ ? ULONG_MAX / 4
+ : lag_gps << RCU_SEQ_CTR_SHIFT;
+ WRITE_ONCE(seq_gpwrap_lag, lag_seq_count);
}
-EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online);
-
-#endif /* #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU) */
+EXPORT_SYMBOL_GPL(rcu_set_gpwrap_lag);
/*
* When trying to report a quiescent state on behalf of some other CPU,
@@ -784,22 +801,35 @@ EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online);
static void rcu_gpnum_ovf(struct rcu_node *rnp, struct rcu_data *rdp)
{
raw_lockdep_assert_held_rcu_node(rnp);
- if (ULONG_CMP_LT(rcu_seq_current(&rdp->gp_seq) + ULONG_MAX / 4,
- rnp->gp_seq))
+ if (ULONG_CMP_LT(rcu_seq_current(&rdp->gp_seq) + seq_gpwrap_lag,
+ rnp->gp_seq)) {
WRITE_ONCE(rdp->gpwrap, true);
+ WRITE_ONCE(rdp->gpwrap_count, READ_ONCE(rdp->gpwrap_count) + 1);
+ }
if (ULONG_CMP_LT(rdp->rcu_iw_gp_seq + ULONG_MAX / 4, rnp->gp_seq))
rdp->rcu_iw_gp_seq = rnp->gp_seq + ULONG_MAX / 4;
}
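For readers unfamiliar with ULONG_CMP_LT(), it is a wrap-tolerant unsigned comparison, and the gpwrap test above asks whether the rcu_node's gp_seq has raced more than seq_gpwrap_lag ahead of this CPU's view. A minimal sketch, assuming the usual definition along the lines of ULONG_MAX/2 < (a) - (b) and a sequence-counter shift of 2 (both assumptions for illustration):

#include <limits.h>
#include <stdio.h>

/* Assumed wrap-tolerant "a < b": true when b leads a by less than half
 * the counter space, even across an unsigned-long overflow. */
#define CMP_LT(a, b) (ULONG_MAX / 2 < (a) - (b))

int main(void)
{
	unsigned long lag = 8UL << 2;				/* e.g. rcu_set_gpwrap_lag(8) */
	unsigned long rdp_gp_seq = 100;				/* this CPU's stale view */
	unsigned long rnp_gp_seq = rdp_gp_seq + lag + 4;	/* node raced ahead */

	if (CMP_LT(rdp_gp_seq + lag, rnp_gp_seq))
		printf("gpwrap: CPU's view lags by more than the configured lag\n");
	return 0;
}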
/*
- * Snapshot the specified CPU's dynticks counter so that we can later
+ * Snapshot the specified CPU's RCU_WATCHING counter so that we can later
* credit them with an implicit quiescent state. Return 1 if this CPU
* is in dynticks idle mode, which is an extended quiescent state.
*/
-static int dyntick_save_progress_counter(struct rcu_data *rdp)
+static int rcu_watching_snap_save(struct rcu_data *rdp)
{
- rdp->dynticks_snap = rcu_dynticks_snap(rdp->cpu);
- if (rcu_dynticks_in_eqs(rdp->dynticks_snap)) {
+ /*
+ * Full ordering between the remote CPU's post-idle accesses and the
+ * updater's accesses prior to the current GP (and also the started GP
+ * sequence number) is enforced by the implicit barrier in rcu_seq_start()
+ * and, further, by the smp_mb__after_unlock_lock() barriers chained all
+ * the way through the rnp locking tree from rcu_gp_init() up to the
+ * current leaf rnp locking.
+ *
+ * Ordering between the remote CPU's pre-idle accesses and the
+ * post-grace-period updater's accesses is enforced by the acquire
+ * semantics below.
+ */
+ rdp->watching_snap = ct_rcu_watching_cpu_acquire(rdp->cpu);
+ if (rcu_watching_snap_in_eqs(rdp->watching_snap)) {
trace_rcu_fqs(rcu_state.name, rdp->gp_seq, rdp->cpu, TPS("dti"));
rcu_gpnum_ovf(rdp->mynode, rdp);
return 1;
@@ -807,15 +837,24 @@ static int dyntick_save_progress_counter(struct rcu_data *rdp)
return 0;
}
+#ifndef arch_irq_stat_cpu
+#define arch_irq_stat_cpu(cpu) 0
+#endif
+
/*
- * Return true if the specified CPU has passed through a quiescent
- * state by virtue of being in or having passed through an dynticks
- * idle state since the last call to dyntick_save_progress_counter()
- * for this same CPU, or by virtue of having been offline.
+ * Returns positive if the specified CPU has passed through a quiescent state
+ * by virtue of being in or having passed through a dynticks idle state since
+ * the last call to rcu_watching_snap_save() for this same CPU, or by
+ * virtue of having been offline.
+ *
+ * Returns negative if the specified CPU needs a force resched.
+ *
+ * Returns zero otherwise.
*/
-static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
+static int rcu_watching_snap_recheck(struct rcu_data *rdp)
{
unsigned long jtsq;
+ int ret = 0;
struct rcu_node *rnp = rdp->mynode;
/*
@@ -826,7 +865,7 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
* read-side critical section that started before the beginning
* of the current RCU grace period.
*/
- if (rcu_dynticks_in_eqs_since(rdp, rdp->dynticks_snap)) {
+ if (rcu_watching_snap_stopped_since(rdp, rdp->watching_snap)) {
trace_rcu_fqs(rcu_state.name, rdp->gp_seq, rdp->cpu, TPS("dti"));
rcu_gpnum_ovf(rnp, rdp);
return 1;
@@ -861,8 +900,8 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
__func__, rnp1->grplo, rnp1->grphi, rnp1->qsmask, rnp1->qsmaskinit, rnp1->qsmaskinitnext, rnp1->rcu_gp_init_mask);
pr_info("%s %d: %c online: %ld(%d) offline: %ld(%d)\n",
__func__, rdp->cpu, ".o"[rcu_rdp_cpu_online(rdp)],
- (long)rdp->rcu_onl_gp_seq, rdp->rcu_onl_gp_flags,
- (long)rdp->rcu_ofl_gp_seq, rdp->rcu_ofl_gp_flags);
+ (long)rdp->rcu_onl_gp_seq, rdp->rcu_onl_gp_state,
+ (long)rdp->rcu_ofl_gp_seq, rdp->rcu_ofl_gp_state);
return 1; /* Break things loose after complaining. */
}
@@ -901,8 +940,8 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
(time_after(jiffies, READ_ONCE(rdp->last_fqs_resched) + jtsq * 3) ||
rcu_state.cbovld)) {
WRITE_ONCE(rdp->rcu_urgent_qs, true);
- resched_cpu(rdp->cpu);
WRITE_ONCE(rdp->last_fqs_resched, jiffies);
+ ret = -1;
}
/*
@@ -915,8 +954,8 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
if (time_after(jiffies, rcu_state.jiffies_resched)) {
if (time_after(jiffies,
READ_ONCE(rdp->last_fqs_resched) + jtsq)) {
- resched_cpu(rdp->cpu);
WRITE_ONCE(rdp->last_fqs_resched, jiffies);
+ ret = -1;
}
if (IS_ENABLED(CONFIG_IRQ_WORK) &&
!rdp->rcu_iw_pending && rdp->rcu_iw_gp_seq != rnp->gp_seq &&
@@ -925,9 +964,27 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
rdp->rcu_iw_gp_seq = rnp->gp_seq;
irq_work_queue_on(&rdp->rcu_iw, rdp->cpu);
}
+
+ if (rcu_cpu_stall_cputime && rdp->snap_record.gp_seq != rdp->gp_seq) {
+ int cpu = rdp->cpu;
+ struct rcu_snap_record *rsrp;
+ struct kernel_cpustat *kcsp;
+
+ kcsp = &kcpustat_cpu(cpu);
+
+ rsrp = &rdp->snap_record;
+ rsrp->cputime_irq = kcpustat_field(kcsp, CPUTIME_IRQ, cpu);
+ rsrp->cputime_softirq = kcpustat_field(kcsp, CPUTIME_SOFTIRQ, cpu);
+ rsrp->cputime_system = kcpustat_field(kcsp, CPUTIME_SYSTEM, cpu);
+ rsrp->nr_hardirqs = kstat_cpu_irqs_sum(cpu) + arch_irq_stat_cpu(cpu);
+ rsrp->nr_softirqs = kstat_cpu_softirqs_sum(cpu);
+ rsrp->nr_csw = nr_context_switches_cpu(cpu);
+ rsrp->jiffies = jiffies;
+ rsrp->gp_seq = rdp->gp_seq;
+ }
}
- return 0;
+ return ret;
}
/* Trace-event wrapper function for trace_rcu_future_grace_period. */
@@ -1225,7 +1282,7 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
/* Handle the ends of any preceding grace periods first. */
if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) ||
- unlikely(READ_ONCE(rdp->gpwrap))) {
+ unlikely(rdp->gpwrap)) {
if (!offloaded)
ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */
rdp->core_needs_qs = false;
@@ -1239,7 +1296,7 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
/* Now handle the beginnings of any new-to-this-CPU grace periods. */
if (rcu_seq_new_gp(rdp->gp_seq, rnp->gp_seq) ||
- unlikely(READ_ONCE(rdp->gpwrap))) {
+ unlikely(rdp->gpwrap)) {
/*
* If the current grace period is waiting for this CPU,
* set up to detect a quiescent state, otherwise don't
@@ -1254,7 +1311,7 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
rdp->gp_seq = rnp->gp_seq; /* Remember new grace-period state. */
if (ULONG_CMP_LT(rdp->gp_seq_needed, rnp->gp_seq_needed) || rdp->gpwrap)
WRITE_ONCE(rdp->gp_seq_needed, rnp->gp_seq_needed);
- if (IS_ENABLED(CONFIG_PROVE_RCU) && READ_ONCE(rdp->gpwrap))
+ if (IS_ENABLED(CONFIG_PROVE_RCU) && rdp->gpwrap)
WRITE_ONCE(rdp->last_sched_clock, jiffies);
WRITE_ONCE(rdp->gpwrap, false);
rcu_gpnum_ovf(rnp, rdp);
@@ -1296,7 +1353,7 @@ EXPORT_SYMBOL_GPL(rcu_gp_slow_register);
/* Unregister a counter, with NULL for not caring which. */
void rcu_gp_slow_unregister(atomic_t *rgssp)
{
- WARN_ON_ONCE(rgssp && rgssp != rcu_gp_slow_suppress);
+ WARN_ON_ONCE(rgssp && rgssp != rcu_gp_slow_suppress && rcu_gp_slow_suppress != NULL);
WRITE_ONCE(rcu_gp_slow_suppress, NULL);
}
@@ -1350,13 +1407,6 @@ static void rcu_strict_gp_boundary(void *unused)
invoke_rcu_core();
}
-// Has rcu_init() been invoked? This is used (for example) to determine
-// whether spinlocks may be acquired safely.
-static bool rcu_init_invoked(void)
-{
- return !!rcu_state.n_online_cpus;
-}
-
// Make the polled API aware of the beginning of a grace period.
static void rcu_poll_gp_seq_start(unsigned long *snap)
{
@@ -1428,6 +1478,326 @@ static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap)
}
/*
+ * There is a single llist, which is used for handling
+ * synchronize_rcu() users' enqueued rcu_synchronize nodes.
+ * Within this llist, there are two tail pointers:
+ *
+ * wait tail: Tracks the set of nodes that need to
+ * wait for the current GP to complete.
+ * done tail: Tracks the set of nodes whose grace
+ * period has elapsed. These nodes will be
+ * processed as part of the cleanup work
+ * executed by a kworker.
+ *
+ * At every grace-period init, a new wait node is added
+ * to the llist and used as the wait tail for the new
+ * grace period. There is a fixed number of wait nodes,
+ * so if all of them are in use (which can happen when
+ * kworker callback processing is delayed) while an
+ * additional grace period is requested, the system is
+ * slow in processing callbacks.
+ *
+ * TODO: If slow processing is detected, the first node
+ * in the llist should be used as the wait tail for this
+ * grace period, so that users delayed by the slow
+ * processing are handled by _this_ grace period and not
+ * the next one.
+ *
+ * Below is an illustration of how the done and wait
+ * tail pointers move from one set of rcu_synchronize nodes
+ * to the other, as grace periods start and finish and
+ * nodes are processed by kworker.
+ *
+ *
+ * a. Initial llist callbacks list:
+ *
+ * +----------+ +--------+ +-------+
+ * | | | | | |
+ * | head |---------> | cb2 |--------->| cb1 |
+ * | | | | | |
+ * +----------+ +--------+ +-------+
+ *
+ *
+ *
+ * b. New GP1 Start:
+ *
+ * WAIT TAIL
+ * |
+ * |
+ * v
+ * +----------+ +--------+ +--------+ +-------+
+ * | | | | | | | |
+ * | head ------> wait |------> cb2 |------> | cb1 |
+ * | | | head1 | | | | |
+ * +----------+ +--------+ +--------+ +-------+
+ *
+ *
+ *
+ * c. GP completion:
+ *
+ * WAIT_TAIL == DONE_TAIL
+ *
+ * DONE TAIL
+ * |
+ * |
+ * v
+ * +----------+ +--------+ +--------+ +-------+
+ * | | | | | | | |
+ * | head ------> wait |------> cb2 |------> | cb1 |
+ * | | | head1 | | | | |
+ * +----------+ +--------+ +--------+ +-------+
+ *
+ *
+ *
+ * d. New callbacks and GP2 start:
+ *
+ * WAIT TAIL DONE TAIL
+ * | |
+ * | |
+ * v v
+ * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+
+ * | | | | | | | | | | | | | |
+ * | head ------> wait |--->| cb4 |--->| cb3 |--->|wait |--->| cb2 |--->| cb1 |
+ * | | | head2| | | | | |head1| | | | |
+ * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+
+ *
+ *
+ *
+ * e. GP2 completion:
+ *
+ * WAIT_TAIL == DONE_TAIL
+ * DONE TAIL
+ * |
+ * |
+ * v
+ * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+
+ * | | | | | | | | | | | | | |
+ * | head ------> wait |--->| cb4 |--->| cb3 |--->|wait |--->| cb2 |--->| cb1 |
+ * | | | head2| | | | | |head1| | | | |
+ * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+
+ *
+ *
+ * While the llist state transitions from d to e, a kworker
+ * can start executing rcu_sr_normal_gp_cleanup_work() and
+ * can observe either the old done tail (@c) or the new
+ * done tail (@e). So, done tail updates and reads need
+ * to use the rel-acq semantics. If the concurrent kworker
+ * observes the old done tail, the newly queued work
+ * execution will process the updated done tail. If the
+ * concurrent kworker observes the new done tail, then
+ * the newly queued work will skip processing the done
+ * tail, as workqueue semantics guarantees that the new
+ * work is executed only after the previous one completes.
+ *
+ * f. kworker callbacks processing complete:
+ *
+ *
+ * DONE TAIL
+ * |
+ * |
+ * v
+ * +----------+ +--------+
+ * | | | |
+ * | head ------> wait |
+ * | | | head2 |
+ * +----------+ +--------+
+ *
+ */
+static bool rcu_sr_is_wait_head(struct llist_node *node)
+{
+ return &(rcu_state.srs_wait_nodes)[0].node <= node &&
+ node <= &(rcu_state.srs_wait_nodes)[SR_NORMAL_GP_WAIT_HEAD_MAX - 1].node;
+}
+
+static struct llist_node *rcu_sr_get_wait_head(void)
+{
+ struct sr_wait_node *sr_wn;
+ int i;
+
+ for (i = 0; i < SR_NORMAL_GP_WAIT_HEAD_MAX; i++) {
+ sr_wn = &(rcu_state.srs_wait_nodes)[i];
+
+ if (!atomic_cmpxchg_acquire(&sr_wn->inuse, 0, 1))
+ return &sr_wn->node;
+ }
+
+ return NULL;
+}
+
+static void rcu_sr_put_wait_head(struct llist_node *node)
+{
+ struct sr_wait_node *sr_wn = container_of(node, struct sr_wait_node, node);
+
+ atomic_set_release(&sr_wn->inuse, 0);
+}
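The wait-head pool above is just a fixed array of slots, each claimed with an acquire cmpxchg and returned with a release store. Below is a userspace sketch of the same claim/release pattern using C11 atomics (all names invented; the kernel uses atomic_cmpxchg_acquire()/atomic_set_release()):

#include <stdatomic.h>
#include <stdio.h>

#define POOL_MAX 5

/* Each slot has an "inuse" flag: claimed with an acquire CAS, released
 * with a release store, mirroring rcu_sr_get_wait_head()/_put_wait_head(). */
static atomic_int pool_inuse[POOL_MAX];

static int get_slot(void)
{
	for (int i = 0; i < POOL_MAX; i++) {
		int expected = 0;
		if (atomic_compare_exchange_strong_explicit(&pool_inuse[i],
				&expected, 1, memory_order_acquire,
				memory_order_relaxed))
			return i;
	}
	return -1;	/* all wait heads busy: caller must retry later */
}

static void put_slot(int i)
{
	atomic_store_explicit(&pool_inuse[i], 0, memory_order_release);
}

int main(void)
{
	int a = get_slot(), b = get_slot();

	printf("claimed slots %d and %d\n", a, b);
	put_slot(a);
	put_slot(b);
	return 0;
}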
+
+/* Enable rcu_normal_wake_from_gp automatically on small systems. */
+#define WAKE_FROM_GP_CPU_THRESHOLD 16
+
+static int rcu_normal_wake_from_gp = -1;
+module_param(rcu_normal_wake_from_gp, int, 0644);
+static struct workqueue_struct *sync_wq;
+
+static void rcu_sr_normal_complete(struct llist_node *node)
+{
+ struct rcu_synchronize *rs = container_of(
+ (struct rcu_head *) node, struct rcu_synchronize, head);
+
+ WARN_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) &&
+ !poll_state_synchronize_rcu_full(&rs->oldstate),
+ "A full grace period is not passed yet!\n");
+
+ /* Finally. */
+ complete(&rs->completion);
+}
+
+static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
+{
+ struct llist_node *done, *rcu, *next, *head;
+
+ /*
+ * This work item can execute concurrently with the
+ * grace-period kthread updating a new done tail in
+ * rcu_sr_normal_gp_cleanup(). So reads and updates of
+ * the done tail need to follow acquire-release semantics.
+ *
+ * Given that workqueue semantics guarantee that a single
+ * work item cannot be executed concurrently by multiple
+ * kworkers, the done-tail list manipulations are protected
+ * here.
+ */
+ done = smp_load_acquire(&rcu_state.srs_done_tail);
+ if (WARN_ON_ONCE(!done))
+ return;
+
+ WARN_ON_ONCE(!rcu_sr_is_wait_head(done));
+ head = done->next;
+ done->next = NULL;
+
+ /*
+ * The dummy node pointed to by the done tail that was
+ * acquire-read above is not removed here. This allows
+ * lockless additions of new rcu_synchronize nodes in
+ * rcu_sr_normal_add_req() while the cleanup work
+ * executes. The dummy node is removed in the next
+ * round of cleanup work execution.
+ */
+ llist_for_each_safe(rcu, next, head) {
+ if (!rcu_sr_is_wait_head(rcu)) {
+ rcu_sr_normal_complete(rcu);
+ continue;
+ }
+
+ rcu_sr_put_wait_head(rcu);
+ }
+
+ /* Order list manipulations with atomic access. */
+ atomic_dec_return_release(&rcu_state.srs_cleanups_pending);
+}
+
+/*
+ * Helper function for rcu_gp_cleanup().
+ */
+static void rcu_sr_normal_gp_cleanup(void)
+{
+ struct llist_node *wait_tail, *next = NULL, *rcu = NULL;
+ int done = 0;
+
+ wait_tail = rcu_state.srs_wait_tail;
+ if (wait_tail == NULL)
+ return;
+
+ rcu_state.srs_wait_tail = NULL;
+ ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_wait_tail);
+ WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail));
+
+ /*
+ * Process the (a) and (d) cases. See the illustration above.
+ */
+ llist_for_each_safe(rcu, next, wait_tail->next) {
+ if (rcu_sr_is_wait_head(rcu))
+ break;
+
+ rcu_sr_normal_complete(rcu);
+ // This node can be the last one; update wait_tail->next at each step.
+ wait_tail->next = next;
+
+ if (++done == SR_MAX_USERS_WAKE_FROM_GP)
+ break;
+ }
+
+ /*
+ * Fast path: no more users to process except putting the second-last
+ * wait head if there are no in-flight workers. If there are in-flight workers,
+ * they will remove the last wait head.
+ *
+ * Note that the ACQUIRE orders atomic access with list manipulation.
+ */
+ if (wait_tail->next && wait_tail->next->next == NULL &&
+ rcu_sr_is_wait_head(wait_tail->next) &&
+ !atomic_read_acquire(&rcu_state.srs_cleanups_pending)) {
+ rcu_sr_put_wait_head(wait_tail->next);
+ wait_tail->next = NULL;
+ }
+
+ /* Concurrent sr_normal_gp_cleanup work might observe this update. */
+ ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_done_tail);
+ smp_store_release(&rcu_state.srs_done_tail, wait_tail);
+
+ /*
+ * Schedule a work item to perform final processing of any
+ * remaining users and to release the wait heads added by
+ * rcu_sr_normal_gp_init().
+ */
+ if (wait_tail->next) {
+ atomic_inc(&rcu_state.srs_cleanups_pending);
+ if (!queue_work(sync_wq, &rcu_state.srs_cleanup_work))
+ atomic_dec(&rcu_state.srs_cleanups_pending);
+ }
+}
+
+/*
+ * Helper function for rcu_gp_init().
+ */
+static bool rcu_sr_normal_gp_init(void)
+{
+ struct llist_node *first;
+ struct llist_node *wait_head;
+ bool start_new_poll = false;
+
+ first = READ_ONCE(rcu_state.srs_next.first);
+ if (!first || rcu_sr_is_wait_head(first))
+ return start_new_poll;
+
+ wait_head = rcu_sr_get_wait_head();
+ if (!wait_head) {
+ // Kick another GP to retry.
+ start_new_poll = true;
+ return start_new_poll;
+ }
+
+ /* Inject a wait-dummy-node. */
+ llist_add(wait_head, &rcu_state.srs_next);
+
+ /*
+ * The wait list of rcu_synchronize nodes should be empty at
+ * this step, since the GP kthread, rcu_gp_init() -> rcu_gp_cleanup(),
+ * rolls it over. If not, it is a bug; warn the user.
+ */
+ WARN_ON_ONCE(rcu_state.srs_wait_tail != NULL);
+ rcu_state.srs_wait_tail = wait_head;
+ ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_wait_tail);
+
+ return start_new_poll;
+}
+
+static void rcu_sr_normal_add_req(struct rcu_synchronize *rs)
+{
+ llist_add((struct llist_node *) &rs->head, &rcu_state.srs_next);
+}
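Putting the pieces together: the llist in the diagrams interleaves request ("cb") nodes with wait heads, GP init pushes a wait head, and the cleanup pass walks the list completing requests while recycling wait heads. A single-threaded toy model of that walk (struct and names invented; the kernel uses llist_add()/llist_for_each_safe() and a real completion):

#include <stdbool.h>
#include <stdio.h>

struct node {
	struct node *next;
	bool is_wait_head;
	int id;
};

/* Prepend, like llist_add(): newest element becomes the list head. */
static struct node *push(struct node *head, struct node *n)
{
	n->next = head;
	return n;
}

static void cleanup(struct node *head)
{
	for (struct node *n = head, *next; n; n = next) {
		next = n->next;
		if (n->is_wait_head)
			printf("recycle wait head\n");
		else
			printf("complete request %d\n", n->id);
	}
}

int main(void)
{
	struct node cb1 = { .id = 1 }, cb2 = { .id = 2 }, wait = { .is_wait_head = true };
	struct node *head = NULL;

	head = push(head, &cb1);
	head = push(head, &cb2);
	head = push(head, &wait);	/* GP start injects a wait head */
	cleanup(head);			/* after GP: wait -> cb2 -> cb1 */
	return 0;
}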
+
+/*
* Initialize a new grace period. Return false if no grace period required.
*/
static noinline_for_stack bool rcu_gp_init(void)
@@ -1437,10 +1807,12 @@ static noinline_for_stack bool rcu_gp_init(void)
unsigned long mask;
struct rcu_data *rdp;
struct rcu_node *rnp = rcu_get_root();
+ bool start_new_poll;
+ unsigned long old_gp_seq;
WRITE_ONCE(rcu_state.gp_activity, jiffies);
raw_spin_lock_irq_rcu_node(rnp);
- if (!READ_ONCE(rcu_state.gp_flags)) {
+ if (!rcu_state.gp_flags) {
/* Spurious wakeup, tell caller to go back to sleep. */
raw_spin_unlock_irq_rcu_node(rnp);
return false;
@@ -1458,14 +1830,49 @@ static noinline_for_stack bool rcu_gp_init(void)
/* Advance to a new grace period and initialize state. */
record_gp_stall_check_time();
+ /*
+ * A new wait segment must be started before gp_seq is advanced, so
+ * that previous GP waiters won't observe the new gp_seq.
+ */
+ start_new_poll = rcu_sr_normal_gp_init();
/* Record GP times before starting GP, hence rcu_seq_start(). */
+ old_gp_seq = rcu_state.gp_seq;
+ /*
+ * Critical ordering: rcu_seq_start() must happen BEFORE the CPU hotplug
+ * scan below. Otherwise we risk a race where a newly onlining CPU could
+ * be missed by the current grace period, potentially leading to
+ * use-after-free errors. For a detailed explanation of this race, see
+ * Documentation/RCU/Design/Requirements/Requirements.rst in the
+ * "Hotplug CPU" section.
+ *
+ * Also note that the root rnp's gp_seq is kept separate from, and lags,
+ * the rcu_state's gp_seq, for a reason. See the Quick-Quiz on
+ * Single-node systems for more details (in Data-Structures.rst).
+ */
rcu_seq_start(&rcu_state.gp_seq);
+ /* Ensure that rcu_seq_done_exact() guardband doesn't give false positives. */
+ WARN_ON_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) &&
+ rcu_seq_done_exact(&old_gp_seq, rcu_seq_snap(&rcu_state.gp_seq)));
+
ASSERT_EXCLUSIVE_WRITER(rcu_state.gp_seq);
trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq, TPS("start"));
rcu_poll_gp_seq_start(&rcu_state.gp_seq_polled_snap);
raw_spin_unlock_irq_rcu_node(rnp);
/*
+ * The "start_new_poll" is set to true, only when this GP is not able
+ * to handle anything and there are outstanding users. It happens when
+ * the rcu_sr_normal_gp_init() function was not able to insert a dummy
+ * separator to the llist, because there were no left any dummy-nodes.
+ *
+ * Number of dummy-nodes is fixed, it could be that we are run out of
+ * them, if so we start a new pool request to repeat a try. It is rare
+ * and it means that a system is doing a slow processing of callbacks.
+ */
+ if (start_new_poll)
+ (void) start_poll_synchronize_rcu();
+
+ /*
* Apply per-leaf buffered online and offline operations to
* the rcu_node tree. Note that this new grace period need not
* wait for subsequent online CPUs, and that RCU hooks in the CPU
@@ -1477,7 +1884,11 @@ static noinline_for_stack bool rcu_gp_init(void)
WRITE_ONCE(rcu_state.gp_state, RCU_GP_ONOFF);
/* Exclude CPU hotplug operations. */
rcu_for_each_leaf_node(rnp) {
- local_irq_save(flags);
+ local_irq_disable();
+ /*
+ * Serialize with CPU offline. See Requirements.rst > Hotplug CPU >
+ * Concurrent Quiescent State Reporting for Offline CPUs.
+ */
arch_spin_lock(&rcu_state.ofl_lock);
raw_spin_lock_rcu_node(rnp);
if (rnp->qsmaskinit == rnp->qsmaskinitnext &&
@@ -1485,7 +1896,7 @@ static noinline_for_stack bool rcu_gp_init(void)
/* Nothing to do on this leaf rcu_node structure. */
raw_spin_unlock_rcu_node(rnp);
arch_spin_unlock(&rcu_state.ofl_lock);
- local_irq_restore(flags);
+ local_irq_enable();
continue;
}
@@ -1522,7 +1933,7 @@ static noinline_for_stack bool rcu_gp_init(void)
raw_spin_unlock_rcu_node(rnp);
arch_spin_unlock(&rcu_state.ofl_lock);
- local_irq_restore(flags);
+ local_irq_enable();
}
rcu_gp_slow(gp_preinit_delay); /* Races with CPU hotplug. */
@@ -1552,7 +1963,12 @@ static noinline_for_stack bool rcu_gp_init(void)
trace_rcu_grace_period_init(rcu_state.name, rnp->gp_seq,
rnp->level, rnp->grplo,
rnp->grphi, rnp->qsmask);
- /* Quiescent states for tasks on any now-offline CPUs. */
+ /*
+ * Quiescent states for tasks on any now-offline CPUs. Since we
+ * released the ofl and rnp lock before this loop, CPUs might
+ * have gone offline and we have to report QS on their behalf.
+ * See Requirements.rst > Hotplug CPU > Concurrent QS Reporting.
+ */
mask = rnp->qsmask & ~rnp->qsmaskinitnext;
rnp->rcu_gp_init_mask = mask;
if ((mask || rnp->wait_blkd_tasks) && rcu_is_leaf_node(rnp))
@@ -1599,22 +2015,33 @@ static bool rcu_gp_fqs_check_wake(int *gfp)
*/
static void rcu_gp_fqs(bool first_time)
{
+ int nr_fqs = READ_ONCE(rcu_state.nr_fqs_jiffies_stall);
struct rcu_node *rnp = rcu_get_root();
WRITE_ONCE(rcu_state.gp_activity, jiffies);
WRITE_ONCE(rcu_state.n_force_qs, rcu_state.n_force_qs + 1);
+
+ WARN_ON_ONCE(nr_fqs > 3);
+ /* Only countdown nr_fqs for stall purposes if jiffies moves. */
+ if (nr_fqs) {
+ if (nr_fqs == 1) {
+ WRITE_ONCE(rcu_state.jiffies_stall,
+ jiffies + rcu_jiffies_till_stall_check());
+ }
+ WRITE_ONCE(rcu_state.nr_fqs_jiffies_stall, --nr_fqs);
+ }
+
if (first_time) {
/* Collect dyntick-idle snapshots. */
- force_qs_rnp(dyntick_save_progress_counter);
+ force_qs_rnp(rcu_watching_snap_save);
} else {
/* Handle dyntick-idle and offline CPUs. */
- force_qs_rnp(rcu_implicit_dynticks_qs);
+ force_qs_rnp(rcu_watching_snap_recheck);
}
/* Clear flag to prevent immediate re-entry. */
if (READ_ONCE(rcu_state.gp_flags) & RCU_GP_FLAG_FQS) {
raw_spin_lock_irq_rcu_node(rnp);
- WRITE_ONCE(rcu_state.gp_flags,
- READ_ONCE(rcu_state.gp_flags) & ~RCU_GP_FLAG_FQS);
+ WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags & ~RCU_GP_FLAG_FQS);
raw_spin_unlock_irq_rcu_node(rnp);
}
}
@@ -1818,6 +2245,9 @@ static noinline void rcu_gp_cleanup(void)
}
raw_spin_unlock_irq_rcu_node(rnp);
+ // Make synchronize_rcu() users aware of the end of old grace period.
+ rcu_sr_normal_gp_cleanup();
+
// If strict, make all CPUs aware of the end of the old grace period.
if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
on_each_cpu(rcu_strict_gp_boundary, NULL, 0);
@@ -1875,8 +2305,7 @@ static void rcu_report_qs_rsp(unsigned long flags)
{
raw_lockdep_assert_held_rcu_node(rcu_get_root());
WARN_ON_ONCE(!rcu_gp_in_progress());
- WRITE_ONCE(rcu_state.gp_flags,
- READ_ONCE(rcu_state.gp_flags) | RCU_GP_FLAG_FQS);
+ WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags | RCU_GP_FLAG_FQS);
raw_spin_unlock_irqrestore_rcu_node(rcu_get_root(), flags);
rcu_gp_kthread_wake();
}
@@ -2003,8 +2432,6 @@ rcu_report_qs_rdp(struct rcu_data *rdp)
{
unsigned long flags;
unsigned long mask;
- bool needwake = false;
- bool needacc = false;
struct rcu_node *rnp;
WARN_ON_ONCE(rdp->cpu != smp_processor_id());
@@ -2035,26 +2462,17 @@ rcu_report_qs_rdp(struct rcu_data *rdp)
* NOCB kthreads have their own way to deal with that...
*/
if (!rcu_rdp_is_offloaded(rdp)) {
- needwake = rcu_accelerate_cbs(rnp, rdp);
- } else if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
/*
- * ...but NOCB kthreads may miss or delay callbacks acceleration
- * if in the middle of a (de-)offloading process.
+ * The current GP has not yet ended, so it
+ * should not be possible for rcu_accelerate_cbs()
+ * to return true. So complain, but don't awaken.
*/
- needacc = true;
+ WARN_ON_ONCE(rcu_accelerate_cbs(rnp, rdp));
}
rcu_disable_urgency_upon_qs(rdp);
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
/* ^^^ Released rnp->lock */
- if (needwake)
- rcu_gp_kthread_wake();
-
- if (needacc) {
- rcu_nocb_lock_irqsave(rdp, flags);
- rcu_accelerate_cbs_unlocked(rnp, rdp);
- rcu_nocb_unlock_irqrestore(rdp, flags);
- }
}
}
@@ -2091,90 +2509,16 @@ rcu_check_quiescent_state(struct rcu_data *rdp)
rcu_report_qs_rdp(rdp);
}
-/*
- * Near the end of the offline process. Trace the fact that this CPU
- * is going offline.
- */
-int rcutree_dying_cpu(unsigned int cpu)
+/* Return true if callback-invocation time limit exceeded. */
+static bool rcu_do_batch_check_time(long count, long tlimit,
+ bool jlimit_check, unsigned long jlimit)
{
- bool blkd;
- struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
- struct rcu_node *rnp = rdp->mynode;
-
- if (!IS_ENABLED(CONFIG_HOTPLUG_CPU))
- return 0;
-
- blkd = !!(READ_ONCE(rnp->qsmask) & rdp->grpmask);
- trace_rcu_grace_period(rcu_state.name, READ_ONCE(rnp->gp_seq),
- blkd ? TPS("cpuofl-bgp") : TPS("cpuofl"));
- return 0;
-}
-
-/*
- * All CPUs for the specified rcu_node structure have gone offline,
- * and all tasks that were preempted within an RCU read-side critical
- * section while running on one of those CPUs have since exited their RCU
- * read-side critical section. Some other CPU is reporting this fact with
- * the specified rcu_node structure's ->lock held and interrupts disabled.
- * This function therefore goes up the tree of rcu_node structures,
- * clearing the corresponding bits in the ->qsmaskinit fields. Note that
- * the leaf rcu_node structure's ->qsmaskinit field has already been
- * updated.
- *
- * This function does check that the specified rcu_node structure has
- * all CPUs offline and no blocked tasks, so it is OK to invoke it
- * prematurely. That said, invoking it after the fact will cost you
- * a needless lock acquisition. So once it has done its work, don't
- * invoke it again.
- */
-static void rcu_cleanup_dead_rnp(struct rcu_node *rnp_leaf)
-{
- long mask;
- struct rcu_node *rnp = rnp_leaf;
-
- raw_lockdep_assert_held_rcu_node(rnp_leaf);
- if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) ||
- WARN_ON_ONCE(rnp_leaf->qsmaskinit) ||
- WARN_ON_ONCE(rcu_preempt_has_tasks(rnp_leaf)))
- return;
- for (;;) {
- mask = rnp->grpmask;
- rnp = rnp->parent;
- if (!rnp)
- break;
- raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
- rnp->qsmaskinit &= ~mask;
- /* Between grace periods, so better already be zero! */
- WARN_ON_ONCE(rnp->qsmask);
- if (rnp->qsmaskinit) {
- raw_spin_unlock_rcu_node(rnp);
- /* irqs remain disabled. */
- return;
- }
- raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
- }
-}
-
-/*
- * The CPU has been completely removed, and some other CPU is reporting
- * this fact from process context. Do the remainder of the cleanup.
- * There can only be one CPU hotplug operation at a time, so no need for
- * explicit locking.
- */
-int rcutree_dead_cpu(unsigned int cpu)
-{
- struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
- struct rcu_node *rnp = rdp->mynode; /* Outgoing CPU's rdp & rnp. */
-
- if (!IS_ENABLED(CONFIG_HOTPLUG_CPU))
- return 0;
-
- WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus - 1);
- /* Adjust any no-longer-needed kthreads. */
- rcu_boost_kthread_setaffinity(rnp, -1);
- // Stop-machine done, so allow nohz_full to disable tick.
- tick_dep_clear(TICK_DEP_BIT_RCU);
- return 0;
+ // Invoke local_clock() only once per 32 consecutive callbacks.
+ return unlikely(tlimit) &&
+ (!likely(count & 31) ||
+ (IS_ENABLED(CONFIG_RCU_DOUBLE_CHECK_CB_TIME) &&
+ jlimit_check && time_after(jiffies, jlimit))) &&
+ local_clock() >= tlimit;
}
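The "& 31" trick above amortizes the cost of local_clock(): the clock is read only when the low five bits of the callback count are zero, i.e. once per 32 callbacks. A userspace sketch of the same throttle follows (clock_gettime() standing in for local_clock(); the CONFIG_RCU_DOUBLE_CHECK_CB_TIME jiffies double-check is omitted, and all names are invented):

#include <stdbool.h>
#include <stdio.h>
#include <time.h>

static long long now_ns(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

/* Read the clock only every 32nd iteration, then compare against the limit. */
static bool over_time_limit(long count, long long tlimit)
{
	if (!tlimit)
		return false;		/* no limit armed */
	if (count & 31)
		return false;		/* skip the clock read 31 times out of 32 */
	return now_ns() >= tlimit;
}

int main(void)
{
	long long tlimit = now_ns() + 100000;	/* 100us budget */

	for (long count = 1; count < 100000000; count++) {
		if (over_time_limit(count, tlimit)) {
			printf("stopped after %ld callbacks\n", count);
			break;
		}
	}
	return 0;
}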
/*
@@ -2183,13 +2527,17 @@ int rcutree_dead_cpu(unsigned int cpu)
*/
static void rcu_do_batch(struct rcu_data *rdp)
{
+ long bl;
+ long count = 0;
int div;
bool __maybe_unused empty;
unsigned long flags;
- struct rcu_head *rhp;
+ unsigned long jlimit;
+ bool jlimit_check = false;
+ long pending;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
- long bl, count = 0;
- long pending, tlimit = 0;
+ struct rcu_head *rhp;
+ long tlimit = 0;
/* If no callbacks are ready, just return. */
if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@@ -2206,18 +2554,28 @@ static void rcu_do_batch(struct rcu_data *rdp)
* Extract the list of ready callbacks, disabling IRQs to prevent
* races with call_rcu() from interrupt handlers. Leave the
* callback counts, as rcu_barrier() needs to be conservative.
+ *
+ * Callback execution is fully ordered against the preceding grace-period
+ * completion (materialized by the rnp->gp_seq update) thanks to the
+ * smp_mb__after_unlock_lock() upon the node locking required for callback
+ * advancing. In NOCB mode this ordering is then further relayed through
+ * the nocb locking that protects both callback advancing and extraction.
*/
rcu_nocb_lock_irqsave(rdp, flags);
WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
- pending = rcu_segcblist_n_cbs(&rdp->cblist);
+ pending = rcu_segcblist_get_seglen(&rdp->cblist, RCU_DONE_TAIL);
div = READ_ONCE(rcu_divisor);
div = div < 0 ? 7 : div > sizeof(long) * 8 - 2 ? sizeof(long) * 8 - 2 : div;
bl = max(rdp->blimit, pending >> div);
- if (in_serving_softirq() && unlikely(bl > 100)) {
+ if ((in_serving_softirq() || rdp->rcu_cpu_kthread_status == RCU_KTHREAD_RUNNING) &&
+ (IS_ENABLED(CONFIG_RCU_DOUBLE_CHECK_CB_TIME) || unlikely(bl > 100))) {
+ const long npj = NSEC_PER_SEC / HZ;
long rrn = READ_ONCE(rcu_resched_ns);
rrn = rrn < NSEC_PER_MSEC ? NSEC_PER_MSEC : rrn > NSEC_PER_SEC ? NSEC_PER_SEC : rrn;
tlimit = local_clock() + rrn;
+ jlimit = jiffies + (rrn + npj + 1) / npj;
+ jlimit_check = true;
}
trace_rcu_batch_start(rcu_state.name,
rcu_segcblist_n_cbs(&rdp->cblist), bl);
@@ -2242,6 +2600,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
trace_rcu_invoke_callback(rcu_state.name, rhp);
f = rhp->func;
+ debug_rcu_head_callback(rhp);
WRITE_ONCE(rhp->func, (rcu_callback_t)0L);
f(rhp);
@@ -2257,19 +2616,23 @@ static void rcu_do_batch(struct rcu_data *rdp)
* Make sure we don't spend too much time here and deprive other
* softirq vectors of CPU cycles.
*/
- if (unlikely(tlimit)) {
- /* only call local_clock() every 32 callbacks */
- if (likely((count & 31) || local_clock() < tlimit))
- continue;
- /* Exceeded the time limit, so leave. */
+ if (rcu_do_batch_check_time(count, tlimit, jlimit_check, jlimit))
break;
- }
} else {
+ // In rcuc/rcuoc context, so no worries about
+ // depriving other softirq vectors of CPU cycles.
local_bh_enable();
lockdep_assert_irqs_enabled();
cond_resched_tasks_rcu_qs();
lockdep_assert_irqs_enabled();
local_bh_disable();
+ // But rcuc kthreads can delay quiescent-state
+ // reporting, so check time limits for them.
+ if (rdp->rcu_cpu_kthread_status == RCU_KTHREAD_RUNNING &&
+ rcu_do_batch_check_time(count, tlimit, jlimit_check, jlimit)) {
+ rdp->rcu_cpu_has_work = 1;
+ break;
+ }
}
}
@@ -2333,10 +2696,8 @@ void rcu_sched_clock_irq(int user)
/* The load-acquire pairs with the store-release setting to true. */
if (smp_load_acquire(this_cpu_ptr(&rcu_data.rcu_urgent_qs))) {
/* Idle and userspace execution already are quiescent states. */
- if (!rcu_is_cpu_rrupt_from_idle() && !user) {
- set_tsk_need_resched(current);
- set_preempt_need_resched();
- }
+ if (!rcu_is_cpu_rrupt_from_idle() && !user)
+ set_need_resched_current();
__this_cpu_write(rcu_data.rcu_urgent_qs, false);
}
rcu_flavor_sched_clock_irq(user);
@@ -2360,15 +2721,15 @@ static void force_qs_rnp(int (*f)(struct rcu_data *rdp))
{
int cpu;
unsigned long flags;
- unsigned long mask;
- struct rcu_data *rdp;
struct rcu_node *rnp;
rcu_state.cbovld = rcu_state.cbovldnext;
rcu_state.cbovldnext = false;
rcu_for_each_leaf_node(rnp) {
+ unsigned long mask = 0;
+ unsigned long rsmask = 0;
+
cond_resched_tasks_rcu_qs();
- mask = 0;
raw_spin_lock_irqsave_rcu_node(rnp, flags);
rcu_state.cbovldnext |= !!rnp->cbovldmask;
if (rnp->qsmask == 0) {
@@ -2386,11 +2747,17 @@ static void force_qs_rnp(int (*f)(struct rcu_data *rdp))
continue;
}
for_each_leaf_node_cpu_mask(rnp, cpu, rnp->qsmask) {
+ struct rcu_data *rdp;
+ int ret;
+
rdp = per_cpu_ptr(&rcu_data, cpu);
- if (f(rdp)) {
+ ret = f(rdp);
+ if (ret > 0) {
mask |= rdp->grpmask;
rcu_disable_urgency_upon_qs(rdp);
}
+ if (ret < 0)
+ rsmask |= rdp->grpmask;
}
if (mask != 0) {
/* Idle/offline CPUs, report (releases rnp->lock). */
@@ -2399,6 +2766,9 @@ static void force_qs_rnp(int (*f)(struct rcu_data *rdp))
/* Nothing to do here, so just drop the lock. */
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
}
+
+ for_each_leaf_node_cpu_mask(rnp, cpu, rsmask)
+ resched_cpu(cpu);
}
}
@@ -2413,6 +2783,8 @@ void rcu_force_quiescent_state(void)
struct rcu_node *rnp;
struct rcu_node *rnp_old = NULL;
+ if (!rcu_gp_in_progress())
+ return;
/* Funnel through hierarchy to reduce memory contention. */
rnp = raw_cpu_read(rcu_data.mynode);
for (; rnp != NULL; rnp = rnp->parent) {
@@ -2433,8 +2805,7 @@ void rcu_force_quiescent_state(void)
raw_spin_unlock_irqrestore_rcu_node(rnp_old, flags);
return; /* Someone beat us to it. */
}
- WRITE_ONCE(rcu_state.gp_flags,
- READ_ONCE(rcu_state.gp_flags) | RCU_GP_FLAG_FQS);
+ WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags | RCU_GP_FLAG_FQS);
raw_spin_unlock_irqrestore_rcu_node(rnp_old, flags);
rcu_gp_kthread_wake();
}
@@ -2451,27 +2822,8 @@ static void strict_work_handler(struct work_struct *work)
/* Perform RCU core processing work for the current CPU. */
static __latent_entropy void rcu_core(void)
{
- unsigned long flags;
struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
struct rcu_node *rnp = rdp->mynode;
- /*
- * On RT rcu_core() can be preempted when IRQs aren't disabled.
- * Therefore this function can race with concurrent NOCB (de-)offloading
- * on this CPU and the below condition must be considered volatile.
- * However if we race with:
- *
- * _ Offloading: In the worst case we accelerate or process callbacks
- * concurrently with NOCB kthreads. We are guaranteed to
- * call rcu_nocb_lock() if that happens.
- *
- * _ Deoffloading: In the worst case we miss callbacks acceleration or
- * processing. This is fine because the early stage
- * of deoffloading invokes rcu_core() after setting
- * SEGCBLIST_RCU_CORE. So we guarantee that we'll process
- * what could have been dismissed without the need to wait
- * for the next rcu_pending() check in the next jiffy.
- */
- const bool do_batch = !rcu_segcblist_completely_offloaded(&rdp->cblist);
if (cpu_is_offline(smp_processor_id()))
return;
@@ -2482,8 +2834,8 @@ static __latent_entropy void rcu_core(void)
if (IS_ENABLED(CONFIG_PREEMPT_COUNT) && (!(preempt_count() & PREEMPT_MASK))) {
rcu_preempt_deferred_qs(current);
} else if (rcu_preempt_need_deferred_qs(current)) {
- set_tsk_need_resched(current);
- set_preempt_need_resched();
+ guard(irqsave)();
+ set_need_resched_current();
}
/* Update RCU state based on any recent quiescent states. */
@@ -2491,17 +2843,16 @@ static __latent_entropy void rcu_core(void)
/* No grace period and unregistered callbacks? */
if (!rcu_gp_in_progress() &&
- rcu_segcblist_is_enabled(&rdp->cblist) && do_batch) {
- rcu_nocb_lock_irqsave(rdp, flags);
+ rcu_segcblist_is_enabled(&rdp->cblist) && !rcu_rdp_is_offloaded(rdp)) {
+ guard(irqsave)();
if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
rcu_accelerate_cbs_unlocked(rnp, rdp);
- rcu_nocb_unlock_irqrestore(rdp, flags);
}
rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check());
/* If there are callbacks ready, invoke them. */
- if (do_batch && rcu_segcblist_ready_cbs(&rdp->cblist) &&
+ if (!rcu_rdp_is_offloaded(rdp) && rcu_segcblist_ready_cbs(&rdp->cblist) &&
likely(READ_ONCE(rcu_scheduler_fully_active))) {
rcu_do_batch(rdp);
/* Re-invoke RCU core processing if there are callbacks remaining. */
@@ -2518,7 +2869,7 @@ static __latent_entropy void rcu_core(void)
queue_work_on(rdp->cpu, rcu_gp_wq, &rdp->strict_work);
}
-static void rcu_core_si(struct softirq_action *h)
+static void rcu_core_si(void)
{
rcu_core();
}
@@ -2588,12 +2939,12 @@ static void rcu_cpu_kthread(unsigned int cpu)
*statusp = RCU_KTHREAD_RUNNING;
local_irq_disable();
work = *workp;
- *workp = 0;
+ WRITE_ONCE(*workp, 0);
local_irq_enable();
if (work)
rcu_core();
local_bh_enable();
- if (*workp == 0) {
+ if (!READ_ONCE(*workp)) {
trace_rcu_utilization(TPS("End CPU kthread@rcu_wait"));
*statusp = RCU_KTHREAD_WAITING;
return;
@@ -2632,12 +2983,21 @@ static int __init rcu_spawn_core_kthreads(void)
return 0;
}
+static void rcutree_enqueue(struct rcu_data *rdp, struct rcu_head *head, rcu_callback_t func)
+{
+ rcu_segcblist_enqueue(&rdp->cblist, head);
+ trace_rcu_callback(rcu_state.name, head,
+ rcu_segcblist_n_cbs(&rdp->cblist));
+ trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
+}
+
/*
* Handle any core-RCU processing required by a call_rcu() invocation.
*/
-static void __call_rcu_core(struct rcu_data *rdp, struct rcu_head *head,
- unsigned long flags)
+static void call_rcu_core(struct rcu_data *rdp, struct rcu_head *head,
+ rcu_callback_t func, unsigned long flags)
{
+ rcutree_enqueue(rdp, head, func);
/*
* If called from an extended quiescent state, invoke the RCU
* core in order to force a re-evaluation of RCU's idleness.
@@ -2727,16 +3087,20 @@ static void check_cb_ovld(struct rcu_data *rdp)
}
static void
-__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
+__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy_in)
{
static atomic_t doublefrees;
unsigned long flags;
+ bool lazy;
struct rcu_data *rdp;
- bool was_alldone;
/* Misaligned rcu_head! */
WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
+ /* Avoid NULL dereference if callback is NULL. */
+ if (WARN_ON_ONCE(!func))
+ return;
+
if (debug_rcu_head_queue(head)) {
/*
* Probable double call_rcu(), so leak the callback.
@@ -2752,9 +3116,13 @@ __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
}
head->func = func;
head->next = NULL;
- kasan_record_aux_stack_noalloc(head);
+ kasan_record_aux_stack(head);
+
local_irq_save(flags);
rdp = this_cpu_ptr(&rcu_data);
+ RCU_LOCKDEP_WARN(!rcu_rdp_cpu_online(rdp), "Callback enqueued on offline CPU!");
+
+ lazy = lazy_in && !rcu_async_should_hurry();
/* Add the callback to our list. */
if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
@@ -2768,30 +3136,18 @@ __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
}
check_cb_ovld(rdp);
- if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
- return; // Enqueued onto ->nocb_bypass, so just leave.
- // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
- rcu_segcblist_enqueue(&rdp->cblist, head);
- if (__is_kvfree_rcu_offset((unsigned long)func))
- trace_rcu_kvfree_callback(rcu_state.name, head,
- (unsigned long)func,
- rcu_segcblist_n_cbs(&rdp->cblist));
- else
- trace_rcu_callback(rcu_state.name, head,
- rcu_segcblist_n_cbs(&rdp->cblist));
-
- trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
- /* Go handle any RCU core processing required. */
- if (unlikely(rcu_rdp_is_offloaded(rdp))) {
- __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
- } else {
- __call_rcu_core(rdp, head, flags);
- local_irq_restore(flags);
- }
+ if (unlikely(rcu_rdp_is_offloaded(rdp)))
+ call_rcu_nocb(rdp, head, func, flags, lazy);
+ else
+ call_rcu_core(rdp, head, func, flags);
+ local_irq_restore(flags);
}
#ifdef CONFIG_RCU_LAZY
+static bool enable_rcu_lazy __read_mostly = !IS_ENABLED(CONFIG_RCU_LAZY_DEFAULT_OFF);
+module_param(enable_rcu_lazy, bool, 0444);
+
/**
* call_rcu_hurry() - Queue RCU callback for invocation after grace period, and
* flush all lazy callbacks (including the new one) to the main ->cblist while
@@ -2805,7 +3161,7 @@ __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
* critical sections have completed.
*
* Use this API instead of call_rcu() if you don't want the callback to be
- * invoked after very long periods of time, which can happen on systems without
+ * delayed for very long periods of time, which can happen on systems without
* memory pressure and on systems which are lightly loaded or mostly idle.
* This function will cause callbacks to be invoked sooner than later at the
* expense of extra power. Other than that, this function is identical to, and
@@ -2814,9 +3170,11 @@ __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
*/
void call_rcu_hurry(struct rcu_head *head, rcu_callback_t func)
{
- return __call_rcu_common(head, func, false);
+ __call_rcu_common(head, func, false);
}
EXPORT_SYMBOL_GPL(call_rcu_hurry);
+#else
+#define enable_rcu_lazy false
#endif
/**
@@ -2834,6 +3192,12 @@ EXPORT_SYMBOL_GPL(call_rcu_hurry);
* might well execute concurrently with RCU read-side critical sections
* that started after call_rcu() was invoked.
*
+ * It is perfectly legal to repost an RCU callback, potentially with
+ * a different callback function, from within its callback function.
+ * The specified function will be invoked after another full grace period
+ * has elapsed. This use case is similar in form to the common practice
+ * of reposting a timer from within its own handler.
+ *
* RCU read-side critical sections are delimited by rcu_read_lock()
* and rcu_read_unlock(), and may be nested. In addition, but only in
* v5.0 and later, regions of code across which interrupts, preemption,
@@ -2862,633 +3226,74 @@ EXPORT_SYMBOL_GPL(call_rcu_hurry);
*
* Implementation of these memory-ordering guarantees is described here:
* Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
+ *
+ * Specific to call_rcu() (as opposed to the other call_rcu*() functions),
+ * in kernels built with CONFIG_RCU_LAZY=y, call_rcu() might delay for many
+ * seconds before starting the grace period needed by the corresponding
+ * callback. This delay can significantly improve energy-efficiency
+ * on low-utilization battery-powered devices. To avoid this delay,
+ * in latency-sensitive kernel code, use call_rcu_hurry().
*/
void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
- return __call_rcu_common(head, func, IS_ENABLED(CONFIG_RCU_LAZY));
+ __call_rcu_common(head, func, enable_rcu_lazy);
}
EXPORT_SYMBOL_GPL(call_rcu);
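/*
 * Illustrative sketch, not part of this patch: the callback-reposting
 * pattern described in the kernel-doc above. A callback re-queues itself
 * for another grace period, much like a timer rearming itself from its
 * own handler. The structure and field names here are hypothetical.
 */
struct repost_example {
	struct rcu_head rh;
	int remaining;
};

static void repost_example_cb(struct rcu_head *rhp)
{
	struct repost_example *p = container_of(rhp, struct repost_example, rh);

	if (--p->remaining > 0)
		call_rcu(&p->rh, repost_example_cb);	/* repost from within the callback */
	else
		kfree(p);
}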
-/* Maximum number of jiffies to wait before draining a batch. */
-#define KFREE_DRAIN_JIFFIES (5 * HZ)
-#define KFREE_N_BATCHES 2
-#define FREE_N_CHANNELS 2
-
-/**
- * struct kvfree_rcu_bulk_data - single block to store kvfree_rcu() pointers
- * @nr_records: Number of active pointers in the array
- * @next: Next bulk object in the block chain
- * @records: Array of the kvfree_rcu() pointers
- */
-struct kvfree_rcu_bulk_data {
- unsigned long nr_records;
- struct kvfree_rcu_bulk_data *next;
- void *records[];
-};
-
/*
- * This macro defines how many entries the "records" array
- * will contain. It is based on the fact that the size of
- * kvfree_rcu_bulk_data structure becomes exactly one page.
- */
-#define KVFREE_BULK_MAX_ENTR \
- ((PAGE_SIZE - sizeof(struct kvfree_rcu_bulk_data)) / sizeof(void *))
-
-/**
- * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
- * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
- * @head_free: List of kfree_rcu() objects waiting for a grace period
- * @bkvhead_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
- * @krcp: Pointer to @kfree_rcu_cpu structure
- */
-
-struct kfree_rcu_cpu_work {
- struct rcu_work rcu_work;
- struct rcu_head *head_free;
- struct kvfree_rcu_bulk_data *bkvhead_free[FREE_N_CHANNELS];
- struct kfree_rcu_cpu *krcp;
-};
-
-/**
- * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
- * @head: List of kfree_rcu() objects not yet waiting for a grace period
- * @bkvhead: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
- * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
- * @lock: Synchronize access to this structure
- * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
- * @initialized: The @rcu_work fields have been initialized
- * @count: Number of objects for which GP not started
- * @bkvcache:
- * A simple cache list that contains objects for reuse purpose.
- * In order to save some per-cpu space the list is singular.
- * Even though it is lockless an access has to be protected by the
- * per-cpu lock.
- * @page_cache_work: A work to refill the cache when it is empty
- * @backoff_page_cache_fill: Delay cache refills
- * @work_in_progress: Indicates that page_cache_work is running
- * @hrtimer: A hrtimer for scheduling a page_cache_work
- * @nr_bkv_objs: number of allocated objects at @bkvcache.
+ * During early boot, any blocking grace-period wait automatically
+ * implies a grace period.
*
- * This is a per-CPU structure. The reason that it is not included in
- * the rcu_data structure is to permit this code to be extracted from
- * the RCU files. Such extraction could allow further optimization of
- * the interactions with the slab allocators.
- */
-struct kfree_rcu_cpu {
- struct rcu_head *head;
- struct kvfree_rcu_bulk_data *bkvhead[FREE_N_CHANNELS];
- struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
- raw_spinlock_t lock;
- struct delayed_work monitor_work;
- bool initialized;
- int count;
-
- struct delayed_work page_cache_work;
- atomic_t backoff_page_cache_fill;
- atomic_t work_in_progress;
- struct hrtimer hrtimer;
-
- struct llist_head bkvcache;
- int nr_bkv_objs;
-};
-
-static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc) = {
- .lock = __RAW_SPIN_LOCK_UNLOCKED(krc.lock),
-};
-
-static __always_inline void
-debug_rcu_bhead_unqueue(struct kvfree_rcu_bulk_data *bhead)
-{
-#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
- int i;
-
- for (i = 0; i < bhead->nr_records; i++)
- debug_rcu_head_unqueue((struct rcu_head *)(bhead->records[i]));
-#endif
-}
-
-static inline struct kfree_rcu_cpu *
-krc_this_cpu_lock(unsigned long *flags)
-{
- struct kfree_rcu_cpu *krcp;
-
- local_irq_save(*flags); // For safely calling this_cpu_ptr().
- krcp = this_cpu_ptr(&krc);
- raw_spin_lock(&krcp->lock);
-
- return krcp;
-}
-
-static inline void
-krc_this_cpu_unlock(struct kfree_rcu_cpu *krcp, unsigned long flags)
-{
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
-}
-
-static inline struct kvfree_rcu_bulk_data *
-get_cached_bnode(struct kfree_rcu_cpu *krcp)
-{
- if (!krcp->nr_bkv_objs)
- return NULL;
-
- WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs - 1);
- return (struct kvfree_rcu_bulk_data *)
- llist_del_first(&krcp->bkvcache);
-}
-
-static inline bool
-put_cached_bnode(struct kfree_rcu_cpu *krcp,
- struct kvfree_rcu_bulk_data *bnode)
-{
- // Check the limit.
- if (krcp->nr_bkv_objs >= rcu_min_cached_objs)
- return false;
-
- llist_add((struct llist_node *) bnode, &krcp->bkvcache);
- WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs + 1);
- return true;
-}
-
-static int
-drain_page_cache(struct kfree_rcu_cpu *krcp)
-{
- unsigned long flags;
- struct llist_node *page_list, *pos, *n;
- int freed = 0;
-
- raw_spin_lock_irqsave(&krcp->lock, flags);
- page_list = llist_del_all(&krcp->bkvcache);
- WRITE_ONCE(krcp->nr_bkv_objs, 0);
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
- llist_for_each_safe(pos, n, page_list) {
- free_page((unsigned long)pos);
- freed++;
- }
-
- return freed;
-}
-
-/*
- * This function is invoked in workqueue context after a grace period.
- * It frees all the objects queued on ->bkvhead_free or ->head_free.
- */
-static void kfree_rcu_work(struct work_struct *work)
-{
- unsigned long flags;
- struct kvfree_rcu_bulk_data *bkvhead[FREE_N_CHANNELS], *bnext;
- struct rcu_head *head, *next;
- struct kfree_rcu_cpu *krcp;
- struct kfree_rcu_cpu_work *krwp;
- int i, j;
-
- krwp = container_of(to_rcu_work(work),
- struct kfree_rcu_cpu_work, rcu_work);
- krcp = krwp->krcp;
-
- raw_spin_lock_irqsave(&krcp->lock, flags);
- // Channels 1 and 2.
- for (i = 0; i < FREE_N_CHANNELS; i++) {
- bkvhead[i] = krwp->bkvhead_free[i];
- krwp->bkvhead_free[i] = NULL;
- }
-
- // Channel 3.
- head = krwp->head_free;
- krwp->head_free = NULL;
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
- // Handle the first two channels.
- for (i = 0; i < FREE_N_CHANNELS; i++) {
- for (; bkvhead[i]; bkvhead[i] = bnext) {
- bnext = bkvhead[i]->next;
- debug_rcu_bhead_unqueue(bkvhead[i]);
-
- rcu_lock_acquire(&rcu_callback_map);
- if (i == 0) { // kmalloc() / kfree().
- trace_rcu_invoke_kfree_bulk_callback(
- rcu_state.name, bkvhead[i]->nr_records,
- bkvhead[i]->records);
-
- kfree_bulk(bkvhead[i]->nr_records,
- bkvhead[i]->records);
- } else { // vmalloc() / vfree().
- for (j = 0; j < bkvhead[i]->nr_records; j++) {
- trace_rcu_invoke_kvfree_callback(
- rcu_state.name,
- bkvhead[i]->records[j], 0);
-
- vfree(bkvhead[i]->records[j]);
- }
- }
- rcu_lock_release(&rcu_callback_map);
-
- raw_spin_lock_irqsave(&krcp->lock, flags);
- if (put_cached_bnode(krcp, bkvhead[i]))
- bkvhead[i] = NULL;
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
- if (bkvhead[i])
- free_page((unsigned long) bkvhead[i]);
-
- cond_resched_tasks_rcu_qs();
- }
- }
-
- /*
- * This is used when the "bulk" path can not be used for the
- * double-argument of kvfree_rcu(). This happens when the
- * page-cache is empty, which means that objects are instead
- * queued on a linked list through their rcu_head structures.
- * This list is named "Channel 3".
- */
- for (; head; head = next) {
- unsigned long offset = (unsigned long)head->func;
- void *ptr = (void *)head - offset;
-
- next = head->next;
- debug_rcu_head_unqueue((struct rcu_head *)ptr);
- rcu_lock_acquire(&rcu_callback_map);
- trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
-
- if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
- kvfree(ptr);
-
- rcu_lock_release(&rcu_callback_map);
- cond_resched_tasks_rcu_qs();
- }
-}
-
-static bool
-need_offload_krc(struct kfree_rcu_cpu *krcp)
-{
- int i;
-
- for (i = 0; i < FREE_N_CHANNELS; i++)
- if (krcp->bkvhead[i])
- return true;
-
- return !!krcp->head;
-}
-
-static void
-schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
-{
- long delay, delay_left;
-
- delay = READ_ONCE(krcp->count) >= KVFREE_BULK_MAX_ENTR ? 1:KFREE_DRAIN_JIFFIES;
- if (delayed_work_pending(&krcp->monitor_work)) {
- delay_left = krcp->monitor_work.timer.expires - jiffies;
- if (delay < delay_left)
- mod_delayed_work(system_wq, &krcp->monitor_work, delay);
- return;
- }
- queue_delayed_work(system_wq, &krcp->monitor_work, delay);
-}
-
-/*
- * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
+ * Later on, this could in theory be the case for kernels built with
+ * CONFIG_SMP=y && CONFIG_PREEMPTION=y running on a single CPU, but this
+ * is not a common case. Furthermore, this optimization would cause
+ * the rcu_gp_oldstate structure to expand by 50%, so this potential
+ * grace-period optimization is ignored once the scheduler is running.
*/
-static void kfree_rcu_monitor(struct work_struct *work)
-{
- struct kfree_rcu_cpu *krcp = container_of(work,
- struct kfree_rcu_cpu, monitor_work.work);
- unsigned long flags;
- int i, j;
-
- raw_spin_lock_irqsave(&krcp->lock, flags);
-
- // Attempt to start a new batch.
- for (i = 0; i < KFREE_N_BATCHES; i++) {
- struct kfree_rcu_cpu_work *krwp = &(krcp->krw_arr[i]);
-
- // Try to detach bkvhead or head and attach it over any
- // available corresponding free channel. It can be that
- // a previous RCU batch is in progress, it means that
- // immediately to queue another one is not possible so
- // in that case the monitor work is rearmed.
- if ((krcp->bkvhead[0] && !krwp->bkvhead_free[0]) ||
- (krcp->bkvhead[1] && !krwp->bkvhead_free[1]) ||
- (krcp->head && !krwp->head_free)) {
- // Channel 1 corresponds to the SLAB-pointer bulk path.
- // Channel 2 corresponds to vmalloc-pointer bulk path.
- for (j = 0; j < FREE_N_CHANNELS; j++) {
- if (!krwp->bkvhead_free[j]) {
- krwp->bkvhead_free[j] = krcp->bkvhead[j];
- krcp->bkvhead[j] = NULL;
- }
- }
-
- // Channel 3 corresponds to both SLAB and vmalloc
- // objects queued on the linked list.
- if (!krwp->head_free) {
- krwp->head_free = krcp->head;
- krcp->head = NULL;
- }
-
- WRITE_ONCE(krcp->count, 0);
-
- // One work is per one batch, so there are three
- // "free channels", the batch can handle. It can
- // be that the work is in the pending state when
- // channels have been detached following by each
- // other.
- queue_rcu_work(system_wq, &krwp->rcu_work);
- }
- }
-
- // If there is nothing to detach, it means that our job is
- // successfully done here. In case of having at least one
- // of the channels that is still busy we should rearm the
- // work to repeat an attempt. Because previous batches are
- // still in progress.
- if (need_offload_krc(krcp))
- schedule_delayed_monitor_work(krcp);
-
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
-}
-
-static enum hrtimer_restart
-schedule_page_work_fn(struct hrtimer *t)
-{
- struct kfree_rcu_cpu *krcp =
- container_of(t, struct kfree_rcu_cpu, hrtimer);
-
- queue_delayed_work(system_highpri_wq, &krcp->page_cache_work, 0);
- return HRTIMER_NORESTART;
-}
-
-static void fill_page_cache_func(struct work_struct *work)
-{
- struct kvfree_rcu_bulk_data *bnode;
- struct kfree_rcu_cpu *krcp =
- container_of(work, struct kfree_rcu_cpu,
- page_cache_work.work);
- unsigned long flags;
- int nr_pages;
- bool pushed;
- int i;
-
- nr_pages = atomic_read(&krcp->backoff_page_cache_fill) ?
- 1 : rcu_min_cached_objs;
-
- for (i = 0; i < nr_pages; i++) {
- bnode = (struct kvfree_rcu_bulk_data *)
- __get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
-
- if (!bnode)
- break;
-
- raw_spin_lock_irqsave(&krcp->lock, flags);
- pushed = put_cached_bnode(krcp, bnode);
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
- if (!pushed) {
- free_page((unsigned long) bnode);
- break;
- }
- }
-
- atomic_set(&krcp->work_in_progress, 0);
- atomic_set(&krcp->backoff_page_cache_fill, 0);
-}
-
-static void
-run_page_cache_worker(struct kfree_rcu_cpu *krcp)
-{
- if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
- !atomic_xchg(&krcp->work_in_progress, 1)) {
- if (atomic_read(&krcp->backoff_page_cache_fill)) {
- queue_delayed_work(system_wq,
- &krcp->page_cache_work,
- msecs_to_jiffies(rcu_delay_page_cache_fill_msec));
- } else {
- hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
- krcp->hrtimer.function = schedule_page_work_fn;
- hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
- }
- }
-}
-
-// Record ptr in a page managed by krcp, with the pre-krc_this_cpu_lock()
-// state specified by flags. If can_alloc is true, the caller must
-// be schedulable and not be holding any locks or mutexes that might be
-// acquired by the memory allocator or anything that it might invoke.
-// Returns true if ptr was successfully recorded, else the caller must
-// use a fallback.
-static inline bool
-add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp,
- unsigned long *flags, void *ptr, bool can_alloc)
+static int rcu_blocking_is_gp(void)
{
- struct kvfree_rcu_bulk_data *bnode;
- int idx;
-
- *krcp = krc_this_cpu_lock(flags);
- if (unlikely(!(*krcp)->initialized))
+ if (rcu_scheduler_active != RCU_SCHEDULER_INACTIVE) {
+ might_sleep();
return false;
-
- idx = !!is_vmalloc_addr(ptr);
-
- /* Check if a new block is required. */
- if (!(*krcp)->bkvhead[idx] ||
- (*krcp)->bkvhead[idx]->nr_records == KVFREE_BULK_MAX_ENTR) {
- bnode = get_cached_bnode(*krcp);
- if (!bnode && can_alloc) {
- krc_this_cpu_unlock(*krcp, *flags);
-
- // __GFP_NORETRY - allows a light-weight direct reclaim
- // what is OK from minimizing of fallback hitting point of
- // view. Apart of that it forbids any OOM invoking what is
- // also beneficial since we are about to release memory soon.
- //
- // __GFP_NOMEMALLOC - prevents from consuming of all the
- // memory reserves. Please note we have a fallback path.
- //
- // __GFP_NOWARN - it is supposed that an allocation can
- // be failed under low memory or high memory pressure
- // scenarios.
- bnode = (struct kvfree_rcu_bulk_data *)
- __get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
- *krcp = krc_this_cpu_lock(flags);
- }
-
- if (!bnode)
- return false;
-
- /* Initialize the new block. */
- bnode->nr_records = 0;
- bnode->next = (*krcp)->bkvhead[idx];
-
- /* Attach it to the head. */
- (*krcp)->bkvhead[idx] = bnode;
}
-
- /* Finally insert. */
- (*krcp)->bkvhead[idx]->records
- [(*krcp)->bkvhead[idx]->nr_records++] = ptr;
-
return true;
}
/*
- * Queue a request for lazy invocation of the appropriate free routine
- * after a grace period. Please note that three paths are maintained,
- * two for the common case using arrays of pointers and a third one that
- * is used only when the main paths cannot be used, for example, due to
- * memory pressure.
- *
- * Each kvfree_call_rcu() request is added to a batch. The batch will be drained
- * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
- * be free'd in workqueue context. This allows us to: batch requests together to
- * reduce the number of grace periods during heavy kfree_rcu()/kvfree_rcu() load.
+ * Helper function for the synchronize_rcu() API.
*/
-void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void synchronize_rcu_normal(void)
{
- unsigned long flags;
- struct kfree_rcu_cpu *krcp;
- bool success;
- void *ptr;
+ struct rcu_synchronize rs;
- if (head) {
- ptr = (void *) head - (unsigned long) func;
- } else {
- /*
- * Please note there is a limitation for the head-less
- * variant, that is why there is a clear rule for such
- * objects: it can be used from might_sleep() context
- * only. For other places please embed an rcu_head to
- * your data.
- */
- might_sleep();
- ptr = (unsigned long *) func;
- }
-
- // Queue the object but don't yet schedule the batch.
- if (debug_rcu_head_queue(ptr)) {
- // Probable double kfree_rcu(), just leak.
- WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
- __func__, head);
+ trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("request"));
- // Mark as success and leave.
- return;
+ if (READ_ONCE(rcu_normal_wake_from_gp) < 1) {
+ wait_rcu_gp(call_rcu_hurry);
+ goto trace_complete_out;
}
- kasan_record_aux_stack_noalloc(ptr);
- success = add_ptr_to_bulk_krc_lock(&krcp, &flags, ptr, !head);
- if (!success) {
- run_page_cache_worker(krcp);
-
- if (head == NULL)
- // Inline if kvfree_rcu(one_arg) call.
- goto unlock_return;
-
- head->func = func;
- head->next = krcp->head;
- krcp->head = head;
- success = true;
- }
-
- WRITE_ONCE(krcp->count, krcp->count + 1);
-
- // Set timer to drain after KFREE_DRAIN_JIFFIES.
- if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING)
- schedule_delayed_monitor_work(krcp);
-
-unlock_return:
- krc_this_cpu_unlock(krcp, flags);
+ init_rcu_head_on_stack(&rs.head);
+ init_completion(&rs.completion);
/*
- * Inline kvfree() after synchronize_rcu(). We can do
- * it from might_sleep() context only, so the current
- * CPU can pass the QS state.
+ * This code might be preempted, so take a GP
+ * snapshot before adding a request.
*/
- if (!success) {
- debug_rcu_head_unqueue((struct rcu_head *) ptr);
- synchronize_rcu();
- kvfree(ptr);
- }
-}
-EXPORT_SYMBOL_GPL(kvfree_call_rcu);
-
-static unsigned long
-kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
-{
- int cpu;
- unsigned long count = 0;
-
- /* Snapshot count of all CPUs */
- for_each_possible_cpu(cpu) {
- struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+ if (IS_ENABLED(CONFIG_PROVE_RCU))
+ get_state_synchronize_rcu_full(&rs.oldstate);
- count += READ_ONCE(krcp->count);
- count += READ_ONCE(krcp->nr_bkv_objs);
- atomic_set(&krcp->backoff_page_cache_fill, 1);
- }
-
- return count == 0 ? SHRINK_EMPTY : count;
-}
-
-static unsigned long
-kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
-{
- int cpu, freed = 0;
-
- for_each_possible_cpu(cpu) {
- int count;
- struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+ rcu_sr_normal_add_req(&rs);
- count = krcp->count;
- count += drain_page_cache(krcp);
- kfree_rcu_monitor(&krcp->monitor_work.work);
+ /* Kick a GP and start waiting. */
+ (void) start_poll_synchronize_rcu();
- sc->nr_to_scan -= count;
- freed += count;
+ /* Now we can wait. */
+ wait_for_completion(&rs.completion);
+ destroy_rcu_head_on_stack(&rs.head);
- if (sc->nr_to_scan <= 0)
- break;
- }
-
- return freed == 0 ? SHRINK_STOP : freed;
-}
-
-static struct shrinker kfree_rcu_shrinker = {
- .count_objects = kfree_rcu_shrink_count,
- .scan_objects = kfree_rcu_shrink_scan,
- .batch = 0,
- .seeks = DEFAULT_SEEKS,
-};
-
-void __init kfree_rcu_scheduler_running(void)
-{
- int cpu;
- unsigned long flags;
-
- for_each_possible_cpu(cpu) {
- struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
-
- raw_spin_lock_irqsave(&krcp->lock, flags);
- if (need_offload_krc(krcp))
- schedule_delayed_monitor_work(krcp);
- raw_spin_unlock_irqrestore(&krcp->lock, flags);
- }
-}
-
-/*
- * During early boot, any blocking grace-period wait automatically
- * implies a grace period.
- *
- * Later on, this could in theory be the case for kernels built with
- * CONFIG_SMP=y && CONFIG_PREEMPTION=y running on a single CPU, but this
- * is not a common case. Furthermore, this optimization would cause
- * the rcu_gp_oldstate structure to expand by 50%, so this potential
- * grace-period optimization is ignored once the scheduler is running.
- */
-static int rcu_blocking_is_gp(void)
-{
- if (rcu_scheduler_active != RCU_SCHEDULER_INACTIVE)
- return false;
- might_sleep(); /* Check for RCU read-side critical section. */
- return true;
+trace_complete_out:
+ trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("complete"));
}
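/*
 * Illustrative sketch, not part of this patch: the classic update-side
 * pattern whose blocking wait is serviced by synchronize_rcu_normal()
 * above (when rcu_normal_wake_from_gp is enabled). All names below are
 * hypothetical.
 */
struct foo_example {
	int val;
};

static DEFINE_MUTEX(foo_example_lock);
static struct foo_example __rcu *foo_example_ptr;

static void foo_example_replace(struct foo_example *newp)
{
	struct foo_example *oldp;

	mutex_lock(&foo_example_lock);
	oldp = rcu_replace_pointer(foo_example_ptr, newp,
				   lockdep_is_held(&foo_example_lock));
	mutex_unlock(&foo_example_lock);
	synchronize_rcu();	/* wait for all pre-existing readers */
	kfree(oldp);
}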
/**
@@ -3542,7 +3347,7 @@ void synchronize_rcu(void)
if (rcu_gp_is_expedited())
synchronize_rcu_expedited();
else
- wait_rcu_gp(call_rcu_hurry);
+ synchronize_rcu_normal();
return;
}
@@ -3619,14 +3424,17 @@ EXPORT_SYMBOL_GPL(get_state_synchronize_rcu);
*/
void get_state_synchronize_rcu_full(struct rcu_gp_oldstate *rgosp)
{
- struct rcu_node *rnp = rcu_get_root();
-
/*
* Any prior manipulation of RCU-protected data must happen
* before the loads from ->gp_seq and ->expedited_sequence.
*/
smp_mb(); /* ^^^ */
- rgosp->rgos_norm = rcu_seq_snap(&rnp->gp_seq);
+
+ // Yes, rcu_state.gp_seq, not rnp_root->gp_seq, the latter's use
+ // in poll_state_synchronize_rcu_full() notwithstanding. Use of
+ // the latter here would result in too-short grace periods due to
+ // interactions with newly onlined CPUs.
+ rgosp->rgos_norm = rcu_seq_snap(&rcu_state.gp_seq);
rgosp->rgos_exp = rcu_seq_snap(&rcu_state.expedited_sequence);
}
EXPORT_SYMBOL_GPL(get_state_synchronize_rcu_full);
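/*
 * Illustrative sketch, not part of this patch: typical use of the _full()
 * polled grace-period API documented above. Names below are hypothetical.
 */
static void example_poll_then_free(void *obj)
{
	struct rcu_gp_oldstate rgos;

	get_state_synchronize_rcu_full(&rgos);		/* snapshot GP state */
	/* ... other work, possibly sleeping ... */
	if (!poll_state_synchronize_rcu_full(&rgos))	/* no GP elapsed yet? */
		synchronize_rcu();			/* then wait for one */
	kfree(obj);					/* now safe to free */
}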
@@ -3642,7 +3450,6 @@ static void start_poll_synchronize_rcu_common(void)
struct rcu_data *rdp;
struct rcu_node *rnp;
- lockdep_assert_irqs_enabled();
local_irq_save(flags);
rdp = this_cpu_ptr(&rcu_data);
rnp = rdp->mynode;
@@ -3667,9 +3474,6 @@ static void start_poll_synchronize_rcu_common(void)
* grace period has elapsed in the meantime. If the needed grace period
* is not already slated to start, notifies RCU core of the need for that
* grace period.
- *
- * Interrupts must be enabled for the case where it is necessary to awaken
- * the grace-period kthread.
*/
unsigned long start_poll_synchronize_rcu(void)
{
@@ -3690,9 +3494,6 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu);
* grace period (whether normal or expedited) has elapsed in the meantime.
* If the needed grace period is not already slated to start, notifies
* RCU core of the need for that grace period.
- *
- * Interrupts must be enabled for the case where it is necessary to awaken
- * the grace-period kthread.
*/
void start_poll_synchronize_rcu_full(struct rcu_gp_oldstate *rgosp)
{
@@ -3711,7 +3512,9 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu_full);
* If @false is returned, it is the caller's responsibility to invoke this
* function later on until it does return @true. Alternatively, the caller
* can explicitly wait for a grace period, for example, by passing @oldstate
- * to cond_synchronize_rcu() or by directly invoking synchronize_rcu().
+ * to either cond_synchronize_rcu() or cond_synchronize_rcu_expedited()
+ * on the one hand or by directly invoking either synchronize_rcu() or
+ * synchronize_rcu_expedited() on the other.
*
* Yes, this function does not take counter wrap into account.
* But counter wrap is harmless. If the counter wraps, we have waited for
@@ -3722,6 +3525,12 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu_full);
* completed. Alternatively, they can use get_completed_synchronize_rcu()
* to get a guaranteed-completed grace-period state.
*
+ * In addition, because oldstate compresses the grace-period state for
+ * both normal and expedited grace periods into a single unsigned long,
+ * it can miss a grace period when synchronize_rcu() runs concurrently
+ * with synchronize_rcu_expedited(). If this is unacceptable, please
+ * instead use the _full() variant of these polling APIs.
+ *
* This function provides the same memory-ordering guarantees that
* would be provided by a synchronize_rcu() that was invoked at the call
* to the function that provided @oldstate, and that returned at the end
@@ -3862,11 +3671,15 @@ static int rcu_pending(int user)
return 1;
/* Is this a nohz_full CPU in userspace or idle? (Ignore RCU if so.) */
- if ((user || rcu_is_cpu_rrupt_from_idle()) && rcu_nohz_full_cpu())
+ gp_in_progress = rcu_gp_in_progress();
+ if ((user || rcu_is_cpu_rrupt_from_idle() ||
+ (gp_in_progress &&
+ time_before(jiffies, READ_ONCE(rcu_state.gp_start) +
+ nohz_full_patience_delay_jiffies))) &&
+ rcu_nohz_full_cpu())
return 0;
/* Is the RCU core waiting for a quiescent state from this CPU? */
- gp_in_progress = rcu_gp_in_progress();
if (rdp->core_needs_qs && !rdp->cpu_no_qs.b.norm && gp_in_progress)
return 1;
@@ -3914,6 +3727,7 @@ static void rcu_barrier_callback(struct rcu_head *rhp)
{
unsigned long __maybe_unused s = rcu_state.barrier_sequence;
+ rhp->next = rhp; // Mark the callback as having been invoked.
if (atomic_dec_and_test(&rcu_state.barrier_cpu_count)) {
rcu_barrier_trace(TPS("LastCB"), -1, s);
complete(&rcu_state.barrier_completion);
@@ -3982,6 +3796,11 @@ static void rcu_barrier_handler(void *cpu_in)
* to complete. For example, if there are no RCU callbacks queued anywhere
* in the system, then rcu_barrier() is within its rights to return
* immediately, without waiting for anything, much less an RCU grace period.
+ * In fact, rcu_barrier() will normally not result in any RCU grace periods
+ * beyond those that were already destined to be executed.
+ *
+ * In kernels built with CONFIG_RCU_LAZY=y, this function also hurries all
+ * pending lazy RCU callbacks.
*/
void rcu_barrier(void)
{
@@ -4079,6 +3898,202 @@ retry:
}
EXPORT_SYMBOL_GPL(rcu_barrier);
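/*
 * Illustrative sketch, not part of this patch: the classic rcu_barrier()
 * use case of draining callbacks before a module's code and data go away.
 * The cache name below is hypothetical.
 */
static struct kmem_cache *example_cache;	/* hypothetical */

static void __exit example_module_exit(void)
{
	/* 1. Stop publishing new objects and posting new call_rcu() callbacks. */
	/* 2. Wait for every already-posted callback to finish executing. */
	rcu_barrier();
	/* 3. Only now is it safe to tear down what the callbacks used. */
	kmem_cache_destroy(example_cache);
}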
+static unsigned long rcu_barrier_last_throttle;
+
+/**
+ * rcu_barrier_throttled - Do rcu_barrier(), but limit to one per second
+ *
+ * This can be thought of as guard rails around rcu_barrier() that
+ * permit unrestricted userspace use, at least assuming the hardware's
+ * try_cmpxchg() is robust. There will be at most one call per second to
+ * rcu_barrier() system-wide from use of this function, which means that
+ * callers might needlessly wait a second or three.
+ *
+ * This is intended for use by test suites to avoid OOM by flushing RCU
+ * callbacks from the previous test before starting the next. See the
+ * rcutree.do_rcu_barrier module parameter for more information.
+ *
+ * Why not simply make rcu_barrier() more scalable? That might be
+ * the eventual endpoint, but let's keep it simple for the time being.
+ * Note that the module parameter infrastructure serializes calls to a
+ * given .set() function, but should concurrent .set() invocation ever be
+ * possible, we are ready!
+ */
+static void rcu_barrier_throttled(void)
+{
+ unsigned long j = jiffies;
+ unsigned long old = READ_ONCE(rcu_barrier_last_throttle);
+ unsigned long s = rcu_seq_snap(&rcu_state.barrier_sequence);
+
+ while (time_in_range(j, old, old + HZ / 16) ||
+ !try_cmpxchg(&rcu_barrier_last_throttle, &old, j)) {
+ schedule_timeout_idle(HZ / 16);
+ if (rcu_seq_done(&rcu_state.barrier_sequence, s)) {
+ smp_mb(); /* caller's subsequent code after above check. */
+ return;
+ }
+ j = jiffies;
+ old = READ_ONCE(rcu_barrier_last_throttle);
+ }
+ rcu_barrier();
+}
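/*
 * Illustrative sketch, not part of this patch: the throttling idiom used
 * above, applied to a hypothetical expensive_op(). At most one invocation
 * per HZ/16 interval system-wide; losers of the try_cmpxchg() race back
 * off and retry. (The rcu_seq_done() early exit above is omitted here.)
 */
static void expensive_op(void);			/* hypothetical */
static unsigned long example_last_throttle;

static void example_throttled_op(void)
{
	unsigned long j = jiffies;
	unsigned long old = READ_ONCE(example_last_throttle);

	while (time_in_range(j, old, old + HZ / 16) ||
	       !try_cmpxchg(&example_last_throttle, &old, j)) {
		schedule_timeout_idle(HZ / 16);
		j = jiffies;
		old = READ_ONCE(example_last_throttle);
	}
	expensive_op();
}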
+
+/*
+ * Invoke rcu_barrier_throttled() when a rcutree.do_rcu_barrier
+ * request arrives. We insist on a true value to allow for possible
+ * future expansion.
+ */
+static int param_set_do_rcu_barrier(const char *val, const struct kernel_param *kp)
+{
+ bool b;
+ int ret;
+
+ if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING)
+ return -EAGAIN;
+ ret = kstrtobool(val, &b);
+ if (!ret && b) {
+ atomic_inc((atomic_t *)kp->arg);
+ rcu_barrier_throttled();
+ atomic_dec((atomic_t *)kp->arg);
+ }
+ return ret;
+}
+
+/*
+ * Output the number of outstanding rcutree.do_rcu_barrier requests.
+ */
+static int param_get_do_rcu_barrier(char *buffer, const struct kernel_param *kp)
+{
+ return sprintf(buffer, "%d\n", atomic_read((atomic_t *)kp->arg));
+}
+
+static const struct kernel_param_ops do_rcu_barrier_ops = {
+ .set = param_set_do_rcu_barrier,
+ .get = param_get_do_rcu_barrier,
+};
+static atomic_t do_rcu_barrier;
+module_param_cb(do_rcu_barrier, &do_rcu_barrier_ops, &do_rcu_barrier, 0644);
+
+/*
+ * Compute the mask of online CPUs for the specified rcu_node structure.
+ * This will not be stable unless the rcu_node structure's ->lock is
+ * held, but the bit corresponding to the current CPU will be stable
+ * in most contexts.
+ */
+static unsigned long rcu_rnp_online_cpus(struct rcu_node *rnp)
+{
+ return READ_ONCE(rnp->qsmaskinitnext);
+}
+
+/*
+ * Is the CPU corresponding to the specified rcu_data structure online
+ * from RCU's perspective? This perspective is given by that structure's
+ * ->qsmaskinitnext field rather than by the global cpu_online_mask.
+ */
+static bool rcu_rdp_cpu_online(struct rcu_data *rdp)
+{
+ return !!(rdp->grpmask & rcu_rnp_online_cpus(rdp->mynode));
+}
+
+bool rcu_cpu_online(int cpu)
+{
+ struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+
+ return rcu_rdp_cpu_online(rdp);
+}
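/*
 * Illustrative sketch, not part of this patch: rcu_cpu_online() answers
 * "is this CPU online from RCU's perspective?", which can differ briefly
 * from cpu_online_mask during hotplug (see also the RCU_LOCKDEP_WARN()
 * added to __call_rcu_common() earlier in this patch). A hypothetical
 * debug check:
 */
static void example_check_rcu_sees_cpu(int cpu)
{
	WARN_ONCE(!rcu_cpu_online(cpu),
		  "CPU %d is not yet tracked by RCU\n", cpu);
}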
+
+#if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU)
+
+/*
+ * Is the current CPU online as far as RCU is concerned?
+ *
+ * Disable preemption to avoid false positives that could otherwise
+ * happen due to the current CPU number being sampled, this task being
+ * preempted, its old CPU being taken offline, resuming on some other CPU,
+ * then determining that its old CPU is now offline.
+ *
+ * Disable checking if in an NMI handler because we cannot safely
+ * report errors from NMI handlers anyway. In addition, it is OK to use
+ * RCU on an offline processor during initial boot, hence the check for
+ * rcu_scheduler_fully_active.
+ */
+bool notrace rcu_lockdep_current_cpu_online(void)
+{
+ struct rcu_data *rdp;
+ bool ret = false;
+
+ if (in_nmi() || !rcu_scheduler_fully_active)
+ return true;
+ preempt_disable_notrace();
+ rdp = this_cpu_ptr(&rcu_data);
+ /*
+ * Strictly, we care here about the case where the current CPU is
+ * in rcutree_report_cpu_starting() and thus has an excuse for rdp->grpmask
+ * not being up to date. So arch_spin_is_locked() might have a
+ * false positive if it's held by some *other* CPU, but that's
+ * OK because that just means a false *negative* on the warning.
+ */
+ if (rcu_rdp_cpu_online(rdp) || arch_spin_is_locked(&rcu_state.ofl_lock))
+ ret = true;
+ preempt_enable_notrace();
+ return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online);
+
+#endif /* #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU) */
+
+// Has rcu_init() been invoked? This is used (for example) to determine
+// whether spinlocks may be acquired safely.
+static bool rcu_init_invoked(void)
+{
+ return !!READ_ONCE(rcu_state.n_online_cpus);
+}
+
+/*
+ * All CPUs for the specified rcu_node structure have gone offline,
+ * and all tasks that were preempted within an RCU read-side critical
+ * section while running on one of those CPUs have since exited their RCU
+ * read-side critical section. Some other CPU is reporting this fact with
+ * the specified rcu_node structure's ->lock held and interrupts disabled.
+ * This function therefore goes up the tree of rcu_node structures,
+ * clearing the corresponding bits in the ->qsmaskinit fields. Note that
+ * the leaf rcu_node structure's ->qsmaskinit field has already been
+ * updated.
+ *
+ * This function does check that the specified rcu_node structure has
+ * all CPUs offline and no blocked tasks, so it is OK to invoke it
+ * prematurely. That said, invoking it after the fact will cost you
+ * a needless lock acquisition. So once it has done its work, don't
+ * invoke it again.
+ */
+static void rcu_cleanup_dead_rnp(struct rcu_node *rnp_leaf)
+{
+ long mask;
+ struct rcu_node *rnp = rnp_leaf;
+
+ raw_lockdep_assert_held_rcu_node(rnp_leaf);
+ if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) ||
+ WARN_ON_ONCE(rnp_leaf->qsmaskinit) ||
+ WARN_ON_ONCE(rcu_preempt_has_tasks(rnp_leaf)))
+ return;
+ for (;;) {
+ mask = rnp->grpmask;
+ rnp = rnp->parent;
+ if (!rnp)
+ break;
+ raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
+ rnp->qsmaskinit &= ~mask;
+ /* Between grace periods, so better already be zero! */
+ WARN_ON_ONCE(rnp->qsmask);
+ if (rnp->qsmaskinit) {
+ raw_spin_unlock_rcu_node(rnp);
+ /* irqs remain disabled. */
+ return;
+ }
+ raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
+ }
+}
+
/*
* Propagate ->qsinitmask bits up the rcu_node tree to account for the
* first CPU in a given leaf rcu_node structure coming online. The caller
@@ -4119,18 +4134,87 @@ rcu_boot_init_percpu_data(int cpu)
/* Set up local state, ensuring consistent view of global state. */
rdp->grpmask = leaf_node_cpu_bit(rdp->mynode, cpu);
INIT_WORK(&rdp->strict_work, strict_work_handler);
- WARN_ON_ONCE(ct->dynticks_nesting != 1);
- WARN_ON_ONCE(rcu_dynticks_in_eqs(rcu_dynticks_snap(cpu)));
+ WARN_ON_ONCE(ct->nesting != 1);
+ WARN_ON_ONCE(rcu_watching_snap_in_eqs(ct_rcu_watching_cpu(cpu)));
rdp->barrier_seq_snap = rcu_state.barrier_sequence;
rdp->rcu_ofl_gp_seq = rcu_state.gp_seq;
- rdp->rcu_ofl_gp_flags = RCU_GP_CLEANED;
+ rdp->rcu_ofl_gp_state = RCU_GP_CLEANED;
rdp->rcu_onl_gp_seq = rcu_state.gp_seq;
- rdp->rcu_onl_gp_flags = RCU_GP_CLEANED;
+ rdp->rcu_onl_gp_state = RCU_GP_CLEANED;
rdp->last_sched_clock = jiffies;
rdp->cpu = cpu;
rcu_boot_init_nocb_percpu_data(rdp);
}
+static void rcu_thread_affine_rnp(struct task_struct *t, struct rcu_node *rnp)
+{
+ cpumask_var_t affinity;
+ int cpu;
+
+ if (!zalloc_cpumask_var(&affinity, GFP_KERNEL))
+ return;
+
+ for_each_leaf_node_possible_cpu(rnp, cpu)
+ cpumask_set_cpu(cpu, affinity);
+
+ kthread_affine_preferred(t, affinity);
+
+ free_cpumask_var(affinity);
+}
+
+struct kthread_worker *rcu_exp_gp_kworker;
+
+static void rcu_spawn_exp_par_gp_kworker(struct rcu_node *rnp)
+{
+ struct kthread_worker *kworker;
+ const char *name = "rcu_exp_par_gp_kthread_worker/%d";
+ struct sched_param param = { .sched_priority = kthread_prio };
+ int rnp_index = rnp - rcu_get_root();
+
+ if (rnp->exp_kworker)
+ return;
+
+ kworker = kthread_create_worker(0, name, rnp_index);
+ if (IS_ERR_OR_NULL(kworker)) {
+ pr_err("Failed to create par gp kworker on %d/%d\n",
+ rnp->grplo, rnp->grphi);
+ return;
+ }
+ WRITE_ONCE(rnp->exp_kworker, kworker);
+
+ if (IS_ENABLED(CONFIG_RCU_EXP_KTHREAD))
+ sched_setscheduler_nocheck(kworker->task, SCHED_FIFO, &param);
+
+ rcu_thread_affine_rnp(kworker->task, rnp);
+ wake_up_process(kworker->task);
+}
+
+static void __init rcu_start_exp_gp_kworker(void)
+{
+ const char *name = "rcu_exp_gp_kthread_worker";
+ struct sched_param param = { .sched_priority = kthread_prio };
+
+ rcu_exp_gp_kworker = kthread_run_worker(0, name);
+ if (IS_ERR_OR_NULL(rcu_exp_gp_kworker)) {
+ pr_err("Failed to create %s!\n", name);
+ rcu_exp_gp_kworker = NULL;
+ return;
+ }
+
+ if (IS_ENABLED(CONFIG_RCU_EXP_KTHREAD))
+ sched_setscheduler_nocheck(rcu_exp_gp_kworker->task, SCHED_FIFO, &param);
+}
+
+static void rcu_spawn_rnp_kthreads(struct rcu_node *rnp)
+{
+ if (rcu_scheduler_fully_active) {
+ mutex_lock(&rnp->kthread_mutex);
+ rcu_spawn_one_boost_kthread(rnp);
+ rcu_spawn_exp_par_gp_kworker(rnp);
+ mutex_unlock(&rnp->kthread_mutex);
+ }
+}
+
/*
* Invoked early in the CPU-online process, when pretty much all services
* are available. The incoming CPU is not present.
@@ -4153,7 +4237,7 @@ int rcutree_prepare_cpu(unsigned int cpu)
rdp->qlen_last_fqs_check = 0;
rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs);
rdp->blimit = blimit;
- ct->dynticks_nesting = 1; /* CPU not up, no tearing. */
+ ct->nesting = 1; /* CPU not up, no tearing. */
raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
/*
@@ -4170,7 +4254,6 @@ int rcutree_prepare_cpu(unsigned int cpu)
*/
rnp = rdp->mynode;
raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
- rdp->beenonline = true; /* We have now been online. */
rdp->gp_seq = READ_ONCE(rnp->gp_seq);
rdp->gp_seq_needed = rdp->gp_seq;
rdp->cpu_no_qs.b.norm = true;
@@ -4180,21 +4263,24 @@ int rcutree_prepare_cpu(unsigned int cpu)
rdp->rcu_iw_gp_seq = rdp->gp_seq - 1;
trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuonl"));
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
- rcu_spawn_one_boost_kthread(rnp);
+
+ rcu_preempt_deferred_qs_init(rdp);
+ rcu_spawn_rnp_kthreads(rnp);
rcu_spawn_cpu_nocb_kthread(cpu);
+ ASSERT_EXCLUSIVE_WRITER(rcu_state.n_online_cpus);
WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus + 1);
return 0;
}
/*
- * Update RCU priority boot kthread affinity for CPU-hotplug changes.
+ * Has the specified (known valid) CPU ever been fully online?
*/
-static void rcutree_affinity_setting(unsigned int cpu, int outgoing)
+bool rcu_cpu_beenfullyonline(int cpu)
{
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
- rcu_boost_kthread_setaffinity(rdp->mynode, outgoing);
+ return smp_load_acquire(&rdp->beenonline);
}
/*
@@ -4214,8 +4300,6 @@ int rcutree_online_cpu(unsigned int cpu)
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
return 0; /* Too early in boot for scheduler work. */
- sync_sched_exp_online_cleanup(cpu);
- rcutree_affinity_setting(cpu, -1);
// Stop-machine done, so allow nohz_full to disable tick.
tick_dep_clear(TICK_DEP_BIT_RCU);
@@ -4223,29 +4307,6 @@ int rcutree_online_cpu(unsigned int cpu)
}
/*
- * Near the beginning of the process. The CPU is still very much alive
- * with pretty much all services enabled.
- */
-int rcutree_offline_cpu(unsigned int cpu)
-{
- unsigned long flags;
- struct rcu_data *rdp;
- struct rcu_node *rnp;
-
- rdp = per_cpu_ptr(&rcu_data, cpu);
- rnp = rdp->mynode;
- raw_spin_lock_irqsave_rcu_node(rnp, flags);
- rnp->ffmask &= ~rdp->grpmask;
- raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-
- rcutree_affinity_setting(cpu, cpu);
-
- // nohz_full CPUs need the tick for stop-machine to work quickly
- tick_dep_set(TICK_DEP_BIT_RCU);
- return 0;
-}
-
-/*
* Mark the specified CPU as being online so that subsequent grace periods
* (both expedited and normal) will wait on it. Note that this means that
* incoming CPUs are not allowed to use RCU read-side critical sections
@@ -4255,15 +4316,18 @@ int rcutree_offline_cpu(unsigned int cpu)
* Note that this function is special in that it is invoked directly
* from the incoming CPU rather than from the cpuhp_step mechanism.
* This is because this function must be invoked at a precise location.
+ * The incoming CPU must not have enabled interrupts yet.
+ *
+ * This mirrors the effects of rcutree_report_cpu_dead().
*/
-void rcu_cpu_starting(unsigned int cpu)
+void rcutree_report_cpu_starting(unsigned int cpu)
{
- unsigned long flags;
unsigned long mask;
struct rcu_data *rdp;
struct rcu_node *rnp;
bool newcpu;
+ lockdep_assert_irqs_disabled();
rdp = per_cpu_ptr(&rcu_data, cpu);
if (rdp->cpu_started)
return;
@@ -4271,9 +4335,8 @@ void rcu_cpu_starting(unsigned int cpu)
rnp = rdp->mynode;
mask = rdp->grpmask;
- local_irq_save(flags);
arch_spin_lock(&rcu_state.ofl_lock);
- rcu_dynticks_eqs_online();
+ rcu_watching_online();
raw_spin_lock(&rcu_state.barrier_lock);
raw_spin_lock_rcu_node(rnp);
WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext | mask);
@@ -4285,22 +4348,22 @@ void rcu_cpu_starting(unsigned int cpu)
ASSERT_EXCLUSIVE_WRITER(rcu_state.ncpus);
rcu_gpnum_ovf(rnp, rdp); /* Offline-induced counter wrap? */
rdp->rcu_onl_gp_seq = READ_ONCE(rcu_state.gp_seq);
- rdp->rcu_onl_gp_flags = READ_ONCE(rcu_state.gp_flags);
+ rdp->rcu_onl_gp_state = READ_ONCE(rcu_state.gp_state);
/* An incoming CPU should never be blocking a grace period. */
if (WARN_ON_ONCE(rnp->qsmask & mask)) { /* RCU waiting on incoming CPU? */
/* rcu_report_qs_rnp() *really* wants some flags to restore */
- unsigned long flags2;
+ unsigned long flags;
- local_irq_save(flags2);
+ local_irq_save(flags);
rcu_disable_urgency_upon_qs(rdp);
/* Report QS -after- changing ->qsmaskinitnext! */
- rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags2);
+ rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
} else {
raw_spin_unlock_rcu_node(rnp);
}
arch_spin_unlock(&rcu_state.ofl_lock);
- local_irq_restore(flags);
+ smp_store_release(&rdp->beenonline, true);
smp_mb(); /* Ensure RCU read-side usage follows above initialization. */
}
@@ -4311,14 +4374,27 @@ void rcu_cpu_starting(unsigned int cpu)
* Note that this function is special in that it is invoked directly
* from the outgoing CPU rather than from the cpuhp_step mechanism.
* This is because this function must be invoked at a precise location.
+ *
+ * This mirrors the effect of rcutree_report_cpu_starting().
*/
-void rcu_report_dead(unsigned int cpu)
+void rcutree_report_cpu_dead(void)
{
- unsigned long flags, seq_flags;
+ unsigned long flags;
unsigned long mask;
- struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+ struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
struct rcu_node *rnp = rdp->mynode; /* Outgoing CPU's rdp & rnp. */
+ /*
+ * Interrupts must be disabled from now on and until the CPU dies, or an
+ * interrupt may open a new read-side critical section after this CPU has
+ * already been removed from the QS masks.
+ */
+ lockdep_assert_irqs_disabled();
+ /*
+ * CPUHP_AP_SMPCFD_DYING was the last call for rcu_exp_handler() execution.
+ * The requested QS must have been reported on the last context switch
+ * from stop machine to idle.
+ */
+ WARN_ON_ONCE(rdp->cpu_no_qs.b.exp);
// Do any dangling deferred wakeups.
do_nocb_deferred_wakeup(rdp);
@@ -4326,22 +4402,27 @@ void rcu_report_dead(unsigned int cpu)
/* Remove outgoing CPU from mask in the leaf rcu_node structure. */
mask = rdp->grpmask;
- local_irq_save(seq_flags);
+
+ /*
+ * Hold the ofl_lock and rnp lock to avoid races between this CPU going
+ * offline (including the QS report below) and rcu_gp_init().
+ * See Requirements.rst > Hotplug CPU > Concurrent QS Reporting section
+ * for more details.
+ */
arch_spin_lock(&rcu_state.ofl_lock);
raw_spin_lock_irqsave_rcu_node(rnp, flags); /* Enforce GP memory-order guarantee. */
rdp->rcu_ofl_gp_seq = READ_ONCE(rcu_state.gp_seq);
- rdp->rcu_ofl_gp_flags = READ_ONCE(rcu_state.gp_flags);
+ rdp->rcu_ofl_gp_state = READ_ONCE(rcu_state.gp_state);
if (rnp->qsmask & mask) { /* RCU waiting on outgoing CPU? */
/* Report quiescent state -before- changing ->qsmaskinitnext! */
rcu_disable_urgency_upon_qs(rdp);
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
raw_spin_lock_irqsave_rcu_node(rnp, flags);
}
+ /* Clear from ->qsmaskinitnext to mark offline. */
WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext & ~mask);
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
arch_spin_unlock(&rcu_state.ofl_lock);
- local_irq_restore(seq_flags);
-
rdp->cpu_started = false;
}
@@ -4359,11 +4440,15 @@ void rcutree_migrate_callbacks(int cpu)
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
bool needwake;
- if (rcu_rdp_is_offloaded(rdp) ||
- rcu_segcblist_empty(&rdp->cblist))
- return; /* No callbacks to migrate. */
+ if (rcu_rdp_is_offloaded(rdp))
+ return;
raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags);
+ if (rcu_segcblist_empty(&rdp->cblist)) {
+ raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
+ return; /* No callbacks to migrate. */
+ }
+
WARN_ON_ONCE(rcu_rdp_cpu_online(rdp));
rcu_barrier_entrain(rdp);
my_rdp = this_cpu_ptr(&rcu_data);
@@ -4385,8 +4470,9 @@ void rcutree_migrate_callbacks(int cpu)
__call_rcu_nocb_wake(my_rdp, true, flags);
} else {
rcu_nocb_unlock(my_rdp); /* irqs remain disabled. */
- raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags);
+ raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */
}
+ local_irq_restore(flags);
if (needwake)
rcu_gp_kthread_wake();
lockdep_assert_irqs_enabled();
@@ -4396,7 +4482,59 @@ void rcutree_migrate_callbacks(int cpu)
cpu, rcu_segcblist_n_cbs(&rdp->cblist),
rcu_segcblist_first_cb(&rdp->cblist));
}
-#endif
+
+/*
+ * The CPU has been completely removed, and some other CPU is reporting
+ * this fact from process context. Do the remainder of the cleanup.
+ * There can only be one CPU hotplug operation at a time, so no need for
+ * explicit locking.
+ */
+int rcutree_dead_cpu(unsigned int cpu)
+{
+ ASSERT_EXCLUSIVE_WRITER(rcu_state.n_online_cpus);
+ WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus - 1);
+ // Stop-machine done, so allow nohz_full to disable tick.
+ tick_dep_clear(TICK_DEP_BIT_RCU);
+ return 0;
+}
+
+/*
+ * Near the end of the offline process. Trace the fact that this CPU
+ * is going offline.
+ */
+int rcutree_dying_cpu(unsigned int cpu)
+{
+ bool blkd;
+ struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+ struct rcu_node *rnp = rdp->mynode;
+
+ blkd = !!(READ_ONCE(rnp->qsmask) & rdp->grpmask);
+ trace_rcu_grace_period(rcu_state.name, READ_ONCE(rnp->gp_seq),
+ blkd ? TPS("cpuofl-bgp") : TPS("cpuofl"));
+ return 0;
+}
+
+/*
+ * Near the beginning of the process. The CPU is still very much alive
+ * with pretty much all services enabled.
+ */
+int rcutree_offline_cpu(unsigned int cpu)
+{
+ unsigned long flags;
+ struct rcu_data *rdp;
+ struct rcu_node *rnp;
+
+ rdp = per_cpu_ptr(&rcu_data, cpu);
+ rnp = rdp->mynode;
+ raw_spin_lock_irqsave_rcu_node(rnp, flags);
+ rnp->ffmask &= ~rdp->grpmask;
+ raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+
+ // nohz_full CPUs need the tick for stop-machine to work quickly
+ tick_dep_set(TICK_DEP_BIT_RCU);
+ return 0;
+}
+#endif /* #ifdef CONFIG_HOTPLUG_CPU */
/*
* On non-huge systems, use expedited RCU grace periods to make suspend
@@ -4408,11 +4546,13 @@ static int rcu_pm_notify(struct notifier_block *self,
switch (action) {
case PM_HIBERNATION_PREPARE:
case PM_SUSPEND_PREPARE:
+ rcu_async_hurry();
rcu_expedite_gp();
break;
case PM_POST_HIBERNATION:
case PM_POST_SUSPEND:
rcu_unexpedite_gp();
+ rcu_async_relax();
break;
default:
break;
@@ -4420,51 +4560,6 @@ static int rcu_pm_notify(struct notifier_block *self,
return NOTIFY_OK;
}
-#ifdef CONFIG_RCU_EXP_KTHREAD
-struct kthread_worker *rcu_exp_gp_kworker;
-struct kthread_worker *rcu_exp_par_gp_kworker;
-
-static void __init rcu_start_exp_gp_kworkers(void)
-{
- const char *par_gp_kworker_name = "rcu_exp_par_gp_kthread_worker";
- const char *gp_kworker_name = "rcu_exp_gp_kthread_worker";
- struct sched_param param = { .sched_priority = kthread_prio };
-
- rcu_exp_gp_kworker = kthread_create_worker(0, gp_kworker_name);
- if (IS_ERR_OR_NULL(rcu_exp_gp_kworker)) {
- pr_err("Failed to create %s!\n", gp_kworker_name);
- return;
- }
-
- rcu_exp_par_gp_kworker = kthread_create_worker(0, par_gp_kworker_name);
- if (IS_ERR_OR_NULL(rcu_exp_par_gp_kworker)) {
- pr_err("Failed to create %s!\n", par_gp_kworker_name);
- kthread_destroy_worker(rcu_exp_gp_kworker);
- return;
- }
-
- sched_setscheduler_nocheck(rcu_exp_gp_kworker->task, SCHED_FIFO, &param);
- sched_setscheduler_nocheck(rcu_exp_par_gp_kworker->task, SCHED_FIFO,
- &param);
-}
-
-static inline void rcu_alloc_par_gp_wq(void)
-{
-}
-#else /* !CONFIG_RCU_EXP_KTHREAD */
-struct workqueue_struct *rcu_par_gp_wq;
-
-static void __init rcu_start_exp_gp_kworkers(void)
-{
-}
-
-static inline void rcu_alloc_par_gp_wq(void)
-{
- rcu_par_gp_wq = alloc_workqueue("rcu_par_gp", WQ_MEM_RECLAIM, 0);
- WARN_ON(!rcu_par_gp_wq);
-}
-#endif /* CONFIG_RCU_EXP_KTHREAD */
-
/*
* Spawn the kthreads that handle RCU's grace periods.
*/
@@ -4499,10 +4594,10 @@ static int __init rcu_spawn_gp_kthread(void)
* due to rcu_scheduler_fully_active.
*/
rcu_spawn_cpu_nocb_kthread(smp_processor_id());
- rcu_spawn_one_boost_kthread(rdp->mynode);
+ rcu_spawn_rnp_kthreads(rdp->mynode);
rcu_spawn_core_kthreads();
/* Create kthread worker for expedited GPs */
- rcu_start_exp_gp_kworkers();
+ rcu_start_exp_gp_kworker();
return 0;
}
early_initcall(rcu_spawn_gp_kthread);
@@ -4605,7 +4700,7 @@ static void __init rcu_init_one(void)
init_waitqueue_head(&rnp->exp_wq[2]);
init_waitqueue_head(&rnp->exp_wq[3]);
spin_lock_init(&rnp->exp_lock);
- mutex_init(&rnp->boost_kthread_mutex);
+ mutex_init(&rnp->kthread_mutex);
raw_spin_lock_init(&rnp->exp_poll_lock);
rnp->exp_seq_poll_rq = RCU_GET_STATE_COMPLETED;
INIT_WORK(&rnp->exp_poll_wq, sync_rcu_do_polled_gp);
@@ -4619,6 +4714,8 @@ static void __init rcu_init_one(void)
while (i > rnp->grphi)
rnp++;
per_cpu_ptr(&rcu_data, i)->mynode = rnp;
+ per_cpu_ptr(&rcu_data, i)->barrier_head.next =
+ &per_cpu_ptr(&rcu_data, i)->barrier_head;
rcu_boot_init_percpu_data(i);
}
}
@@ -4697,8 +4794,7 @@ void rcu_init_geometry(void)
* Complain and fall back to the compile-time values if this
* limit is exceeded.
*/
- if (rcu_fanout_leaf < 2 ||
- rcu_fanout_leaf > sizeof(unsigned long) * 8) {
+ if (rcu_fanout_leaf < 2 || rcu_fanout_leaf > BITS_PER_LONG) {
rcu_fanout_leaf = RCU_FANOUT_LEAF;
WARN_ON(1);
return;
@@ -4763,46 +4859,12 @@ static void __init rcu_dump_rcu_node_tree(void)
struct workqueue_struct *rcu_gp_wq;
-static void __init kfree_rcu_batch_init(void)
-{
- int cpu;
- int i;
-
- /* Clamp it to [0:100] seconds interval. */
- if (rcu_delay_page_cache_fill_msec < 0 ||
- rcu_delay_page_cache_fill_msec > 100 * MSEC_PER_SEC) {
-
- rcu_delay_page_cache_fill_msec =
- clamp(rcu_delay_page_cache_fill_msec, 0,
- (int) (100 * MSEC_PER_SEC));
-
- pr_info("Adjusting rcutree.rcu_delay_page_cache_fill_msec to %d ms.\n",
- rcu_delay_page_cache_fill_msec);
- }
-
- for_each_possible_cpu(cpu) {
- struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
-
- for (i = 0; i < KFREE_N_BATCHES; i++) {
- INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
- krcp->krw_arr[i].krcp = krcp;
- }
-
- INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
- INIT_DELAYED_WORK(&krcp->page_cache_work, fill_page_cache_func);
- krcp->initialized = true;
- }
- if (register_shrinker(&kfree_rcu_shrinker, "rcu-kfree"))
- pr_err("Failed to register kfree_rcu() shrinker!\n");
-}
-
void __init rcu_init(void)
{
int cpu = smp_processor_id();
rcu_early_boot_tests();
- kfree_rcu_batch_init();
rcu_bootup_announce();
sanitize_kthread_prio();
rcu_init_geometry();
@@ -4820,13 +4882,21 @@ void __init rcu_init(void)
pm_notifier(rcu_pm_notify, 0);
WARN_ON(num_online_cpus() > 1); // Only one CPU this early in boot.
rcutree_prepare_cpu(cpu);
- rcu_cpu_starting(cpu);
+ rcutree_report_cpu_starting(cpu);
rcutree_online_cpu(cpu);
/* Create workqueue for Tree SRCU and for expedited GPs. */
- rcu_gp_wq = alloc_workqueue("rcu_gp", WQ_MEM_RECLAIM, 0);
+ rcu_gp_wq = alloc_workqueue("rcu_gp", WQ_MEM_RECLAIM | WQ_PERCPU, 0);
WARN_ON(!rcu_gp_wq);
- rcu_alloc_par_gp_wq();
+
+ sync_wq = alloc_workqueue("sync_wq", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
+ WARN_ON(!sync_wq);
+
+ /* Respect if explicitly disabled via a boot parameter. */
+ if (rcu_normal_wake_from_gp < 0) {
+ if (num_possible_cpus() <= WAKE_FROM_GP_CPU_THRESHOLD)
+ rcu_normal_wake_from_gp = 1;
+ }
/* Fill in default value for rcutree.qovld boot parameter. */
/* -After- the rcu_node ->lock fields are initialized! */
@@ -4835,9 +4905,12 @@ void __init rcu_init(void)
else
qovld_calc = qovld;
- // Kick-start any polled grace periods that started early.
- if (!(per_cpu_ptr(&rcu_data, cpu)->mynode->exp_seq_poll_rq & 0x1))
- (void)start_poll_synchronize_rcu_expedited();
+ // Kick-start in case any polled grace periods started early.
+ (void)start_poll_synchronize_rcu_expedited();
+
+ rcu_test_sync_prims();
+
+ tasks_cblist_init_generic();
}
#include "tree_stall.h"