diff options
Diffstat (limited to 'kernel/rcu/tree_nocb.h')
| -rw-r--r-- | kernel/rcu/tree_nocb.h | 913 |
1 files changed, 535 insertions, 378 deletions
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h index eeafb546a7a0..e6cd56603cad 100644 --- a/kernel/rcu/tree_nocb.h +++ b/kernel/rcu/tree_nocb.h @@ -16,10 +16,6 @@ #ifdef CONFIG_RCU_NOCB_CPU static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */ static bool __read_mostly rcu_nocb_poll; /* Offload kthread are to poll. */ -static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp) -{ - return lockdep_is_held(&rdp->nocb_lock); -} static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp) { @@ -60,9 +56,6 @@ static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp) * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. * If the list is invalid, a warning is emitted and all CPUs are offloaded. */ - -static bool rcu_nocb_is_setup; - static int __init rcu_nocb_setup(char *str) { alloc_bootmem_cpumask_var(&rcu_nocb_mask); @@ -72,7 +65,7 @@ static int __init rcu_nocb_setup(char *str) cpumask_setall(rcu_nocb_mask); } } - rcu_nocb_is_setup = true; + rcu_state.nocb_is_setup = true; return 1; } __setup("rcu_nocbs", rcu_nocb_setup); @@ -80,9 +73,9 @@ __setup("rcu_nocbs", rcu_nocb_setup); static int __init parse_rcu_nocb_poll(char *arg) { rcu_nocb_poll = true; - return 0; + return 1; } -early_param("rcu_nocb_poll", parse_rcu_nocb_poll); +__setup("rcu_nocb_poll", parse_rcu_nocb_poll); /* * Don't bother bypassing ->cblist if the call_rcu() rate is low. @@ -94,8 +87,7 @@ module_param(nocb_nobypass_lim_per_jiffy, int, 0); /* * Acquire the specified rcu_data structure's ->nocb_bypass_lock. If the - * lock isn't immediately available, increment ->nocb_lock_contended to - * flag the contention. + * lock isn't immediately available, perform minimal sanity check. */ static void rcu_nocb_bypass_lock(struct rcu_data *rdp) __acquires(&rdp->nocb_bypass_lock) @@ -103,29 +95,12 @@ static void rcu_nocb_bypass_lock(struct rcu_data *rdp) lockdep_assert_irqs_disabled(); if (raw_spin_trylock(&rdp->nocb_bypass_lock)) return; - atomic_inc(&rdp->nocb_lock_contended); + /* + * Contention expected only when local enqueue collide with + * remote flush from kthreads. + */ WARN_ON_ONCE(smp_processor_id() != rdp->cpu); - smp_mb__after_atomic(); /* atomic_inc() before lock. */ raw_spin_lock(&rdp->nocb_bypass_lock); - smp_mb__before_atomic(); /* atomic_dec() after lock. */ - atomic_dec(&rdp->nocb_lock_contended); -} - -/* - * Spinwait until the specified rcu_data structure's ->nocb_lock is - * not contended. Please note that this is extremely special-purpose, - * relying on the fact that at most two kthreads and one CPU contend for - * this lock, and also that the two kthreads are guaranteed to have frequent - * grace-period-duration time intervals between successive acquisitions - * of the lock. This allows us to use an extremely simple throttling - * mechanism, and further to apply it only to the CPU doing floods of - * call_rcu() invocations. Don't try this at home! - */ -static void rcu_nocb_wait_contended(struct rcu_data *rdp) -{ - WARN_ON_ONCE(smp_processor_id() != rdp->cpu); - while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended))) - cpu_relax(); } /* @@ -215,14 +190,6 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) init_swait_queue_head(&rnp->nocb_gp_wq[1]); } -/* Is the specified CPU a no-CBs CPU? */ -bool rcu_is_nocb_cpu(int cpu) -{ - if (cpumask_available(rcu_nocb_mask)) - return cpumask_test_cpu(cpu, rcu_nocb_mask); - return false; -} - static bool __wake_nocb_gp(struct rcu_data *rdp_gp, struct rcu_data *rdp, bool force, unsigned long flags) @@ -239,7 +206,7 @@ static bool __wake_nocb_gp(struct rcu_data *rdp_gp, if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) { WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - del_timer(&rdp_gp->nocb_timer); + timer_delete(&rdp_gp->nocb_timer); } if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) { @@ -249,7 +216,7 @@ static bool __wake_nocb_gp(struct rcu_data *rdp_gp, raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags); if (needwake) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake")); - wake_up_process(rdp_gp->nocb_gp_kthread); + swake_up_one(&rdp_gp->nocb_gp_wq); } return needwake; @@ -267,6 +234,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) return __wake_nocb_gp(rdp_gp, rdp, force, flags); } +#ifdef CONFIG_RCU_LAZY +/* + * LAZY_FLUSH_JIFFIES decides the maximum amount of time that + * can elapse before lazy callbacks are flushed. Lazy callbacks + * could be flushed much earlier for a number of other reasons + * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are + * left unsubmitted to RCU after those many jiffies. + */ +#define LAZY_FLUSH_JIFFIES (10 * HZ) +static unsigned long jiffies_lazy_flush = LAZY_FLUSH_JIFFIES; + +// To be called only from test code. +void rcu_set_jiffies_lazy_flush(unsigned long jif) +{ + jiffies_lazy_flush = jif; +} +EXPORT_SYMBOL(rcu_set_jiffies_lazy_flush); + +unsigned long rcu_get_jiffies_lazy_flush(void) +{ + return jiffies_lazy_flush; +} +EXPORT_SYMBOL(rcu_get_jiffies_lazy_flush); +#endif + /* * Arrange to wake the GP kthread for this NOCB group at some future * time when it is safe to do so. @@ -280,10 +272,14 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); /* - * Bypass wakeup overrides previous deferments. In case - * of callback storm, no need to wake up too early. + * Bypass wakeup overrides previous deferments. In case of + * callback storms, no need to wake up too early. */ - if (waketype == RCU_NOCB_WAKE_BYPASS) { + if (waketype == RCU_NOCB_WAKE_LAZY && + rdp_gp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) { + mod_timer(&rdp_gp->nocb_timer, jiffies + rcu_get_jiffies_lazy_flush()); + WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); + } else if (waketype == RCU_NOCB_WAKE_BYPASS) { mod_timer(&rdp_gp->nocb_timer, jiffies + 2); WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); } else { @@ -304,12 +300,16 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, * proves to be initially empty, just return false because the no-CB GP * kthread may need to be awakened in this case. * + * Return true if there was something to be flushed and it succeeded, otherwise + * false. + * * Note that this function always returns true if rhp is NULL. */ -static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, - unsigned long j) +static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp_in, + unsigned long j, bool lazy) { struct rcu_cblist rcl; + struct rcu_head *rhp = rhp_in; WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp)); rcu_lockdep_assert_cblist_protected(rdp); @@ -321,7 +321,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ if (rhp) rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ + + /* + * If the new CB requested was a lazy one, queue it onto the main + * ->cblist so that we can take advantage of the grace-period that will + * happen regardless. But queue it onto the bypass list first so that + * the lazy CB is ordered with the existing CBs in the bypass list. + */ + if (lazy && rhp) { + rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); + rhp = NULL; + } rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); + WRITE_ONCE(rdp->lazy_len, 0); + rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); WRITE_ONCE(rdp->nocb_bypass_first, j); rcu_nocb_bypass_unlock(rdp); @@ -337,13 +350,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, * Note that this function always returns true if rhp is NULL. */ static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, - unsigned long j) + unsigned long j, bool lazy) { if (!rcu_rdp_is_offloaded(rdp)) return true; rcu_lockdep_assert_cblist_protected(rdp); rcu_nocb_bypass_lock(rdp); - return rcu_nocb_do_flush_bypass(rdp, rhp, j); + return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy); } /* @@ -356,7 +369,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) if (!rcu_rdp_is_offloaded(rdp) || !rcu_nocb_bypass_trylock(rdp)) return; - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false)); } /* @@ -378,12 +391,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) * there is only one CPU in operation. */ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, - bool *was_alldone, unsigned long flags) + bool *was_alldone, unsigned long flags, + bool lazy) { unsigned long c; unsigned long cur_gp_seq; unsigned long j = jiffies; long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); + bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len)); lockdep_assert_irqs_disabled(); @@ -394,14 +409,6 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, return false; } - // In the process of (de-)offloading: no bypassing, but - // locking. - if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) { - rcu_nocb_lock(rdp); - *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); - return false; /* Not offloaded, no bypassing. */ - } - // Don't use ->nocb_bypass during early boot. if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) { rcu_nocb_lock(rdp); @@ -428,24 +435,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, // If there hasn't yet been all that many ->cblist enqueues // this jiffy, tell the caller to enqueue onto ->cblist. But flush // ->nocb_bypass first. - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { + // Lazy CBs throttle this back and do immediate bypass queuing. + if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) { rcu_nocb_lock(rdp); *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); if (*was_alldone) trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstQ")); - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); + + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); return false; // Caller must enqueue the callback. } // If ->nocb_bypass has been used too long or is too full, // flush ->nocb_bypass to ->cblist. - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || + if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) || + (ncbs && bypass_is_lazy && + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + rcu_get_jiffies_lazy_flush()))) || ncbs >= qhimark) { rcu_nocb_lock(rdp); - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { - *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); + *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); + + if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy)) { if (*was_alldone) trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstQ")); @@ -458,25 +470,38 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, rcu_advance_cbs_nowake(rdp->mynode, rdp); rdp->nocb_gp_adv_time = j; } - rcu_nocb_unlock_irqrestore(rdp, flags); + + // The flush succeeded and we moved CBs into the regular list. + // Don't wait for the wake up timer as it may be too far ahead. + // Wake up the GP thread now instead, if the cblist was empty. + __call_rcu_nocb_wake(rdp, *was_alldone, flags); + return true; // Callback already enqueued. } // We need to use the bypass. - rcu_nocb_wait_contended(rdp); rcu_nocb_bypass_lock(rdp); ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); + + if (lazy) + WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1); + if (!ncbs) { WRITE_ONCE(rdp->nocb_bypass_first, j); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); } rcu_nocb_bypass_unlock(rdp); - smp_mb(); /* Order enqueue before wake. */ - if (ncbs) { - local_irq_restore(flags); - } else { + + // A wake up of the grace period kthread or timer adjustment + // needs to be done only if: + // 1. Bypass list was fully empty before (this is the first + // bypass list entry), or: + // 2. Both of these conditions are met: + // a. The bypass list previously had only lazy CBs, and: + // b. The new CB is non-lazy. + if (!ncbs || (bypass_is_lazy && !lazy)) { // No-CBs GP kthread might be indefinitely asleep, if so, wake. rcu_nocb_lock(rdp); // Rare during call_rcu() flood. if (!rcu_segcblist_pend_cbs(&rdp->cblist)) { @@ -486,7 +511,7 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, } else { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQnoWake")); - rcu_nocb_unlock_irqrestore(rdp, flags); + rcu_nocb_unlock(rdp); } } return true; // Callback already enqueued. @@ -502,31 +527,41 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, unsigned long flags) __releases(rdp->nocb_lock) { + long bypass_len; unsigned long cur_gp_seq; unsigned long j; + long lazy_len; long len; struct task_struct *t; + struct rcu_data *rdp_gp = rdp->nocb_gp_rdp; // If we are being polled or there is no kthread, just leave. t = READ_ONCE(rdp->nocb_gp_kthread); if (rcu_nocb_poll || !t) { - rcu_nocb_unlock_irqrestore(rdp, flags); + rcu_nocb_unlock(rdp); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNotPoll")); return; } // Need to actually to a wakeup. len = rcu_segcblist_n_cbs(&rdp->cblist); + bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass); + lazy_len = READ_ONCE(rdp->lazy_len); if (was_alldone) { rdp->qlen_last_fqs_check = len; - if (!irqs_disabled_flags(flags)) { + // Only lazy CBs in bypass list + if (lazy_len && bypass_len == lazy_len) { + rcu_nocb_unlock(rdp); + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY, + TPS("WakeLazy")); + } else if (!irqs_disabled_flags(flags)) { /* ... if queue was empty ... */ - rcu_nocb_unlock_irqrestore(rdp, flags); + rcu_nocb_unlock(rdp); wake_nocb_gp(rdp, false); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeEmpty")); } else { - rcu_nocb_unlock_irqrestore(rdp, flags); + rcu_nocb_unlock(rdp); wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, TPS("WakeEmptyIsDeferred")); } @@ -543,66 +578,68 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, smp_mb(); /* Enqueue before timer_pending(). */ if ((rdp->nocb_cb_sleep || !rcu_segcblist_ready_cbs(&rdp->cblist)) && - !timer_pending(&rdp->nocb_timer)) { - rcu_nocb_unlock_irqrestore(rdp, flags); + !timer_pending(&rdp_gp->nocb_timer)) { + rcu_nocb_unlock(rdp); wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, TPS("WakeOvfIsDeferred")); } else { - rcu_nocb_unlock_irqrestore(rdp, flags); + rcu_nocb_unlock(rdp); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); } } else { - rcu_nocb_unlock_irqrestore(rdp, flags); + rcu_nocb_unlock(rdp); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); } } -/* - * Check if we ignore this rdp. - * - * We check that without holding the nocb lock but - * we make sure not to miss a freshly offloaded rdp - * with the current ordering: - * - * rdp_offload_toggle() nocb_gp_enabled_cb() - * ------------------------- ---------------------------- - * WRITE flags LOCK nocb_gp_lock - * LOCK nocb_gp_lock READ/WRITE nocb_gp_sleep - * READ/WRITE nocb_gp_sleep UNLOCK nocb_gp_lock - * UNLOCK nocb_gp_lock READ flags - */ -static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp) +static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head, + rcu_callback_t func, unsigned long flags, bool lazy) { - u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP; + bool was_alldone; - return rcu_segcblist_test_flags(&rdp->cblist, flags); + if (!rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy)) { + /* Not enqueued on bypass but locked, do regular enqueue */ + rcutree_enqueue(rdp, head, func); + __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */ + } } -static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp, - bool *needwake_state) +static void nocb_gp_toggle_rdp(struct rcu_data *rdp_gp, struct rcu_data *rdp) { struct rcu_segcblist *cblist = &rdp->cblist; - - if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) { - if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) { - rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP); - if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) - *needwake_state = true; - } - return false; - } + unsigned long flags; /* - * De-offloading. Clear our flag and notify the de-offload worker. - * We will ignore this rdp until it ever gets re-offloaded. + * Locking orders future de-offloaded callbacks enqueue against previous + * handling of this rdp. Ie: Make sure rcuog is done with this rdp before + * deoffloaded callbacks can be enqueued. */ - WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)); - rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP); - if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) - *needwake_state = true; - return true; + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) { + /* + * Offloading. Set our flag and notify the offload worker. + * We will handle this rdp until it ever gets de-offloaded. + */ + list_add_tail(&rdp->nocb_entry_rdp, &rdp_gp->nocb_head_rdp); + rcu_segcblist_set_flags(cblist, SEGCBLIST_OFFLOADED); + } else { + /* + * De-offloading. Clear our flag and notify the de-offload worker. + * We will ignore this rdp until it ever gets re-offloaded. + */ + list_del(&rdp->nocb_entry_rdp); + rcu_segcblist_clear_flags(cblist, SEGCBLIST_OFFLOADED); + } + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); } +static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu) +{ + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); + swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, + !READ_ONCE(my_rdp->nocb_gp_sleep)); + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep")); +} /* * No-CBs GP kthreads come here to wait for additional callbacks to show up @@ -611,16 +648,16 @@ static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp, static void nocb_gp_wait(struct rcu_data *my_rdp) { bool bypass = false; - long bypass_ncbs; int __maybe_unused cpu = my_rdp->cpu; unsigned long cur_gp_seq; unsigned long flags; bool gotcbs = false; unsigned long j = jiffies; + bool lazy = false; bool needwait_gp = false; // This prevents actual uninitialized use. bool needwake; bool needwake_gp; - struct rcu_data *rdp; + struct rcu_data *rdp, *rdp_toggling = NULL; struct rcu_node *rnp; unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning. bool wasempty = false; @@ -645,36 +682,44 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) * is added to the list, so the skipped-over rcu_data structures * won't be ignored for long. */ - list_for_each_entry_rcu(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp, 1) { - bool needwake_state = false; + list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) { + long bypass_ncbs; + bool flush_bypass = false; + long lazy_ncbs; - if (!nocb_gp_enabled_cb(rdp)) - continue; trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check")); rcu_nocb_lock_irqsave(rdp, flags); - if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) { - rcu_nocb_unlock_irqrestore(rdp, flags); - if (needwake_state) - swake_up_one(&rdp->nocb_state_wq); - continue; - } + lockdep_assert_held(&rdp->nocb_lock); bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); - if (bypass_ncbs && + lazy_ncbs = READ_ONCE(rdp->lazy_len); + + if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) && + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + rcu_get_jiffies_lazy_flush()) || + bypass_ncbs > 2 * qhimark)) { + flush_bypass = true; + } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) && (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || bypass_ncbs > 2 * qhimark)) { - // Bypass full or old, so flush it. - (void)rcu_nocb_try_flush_bypass(rdp, j); - bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); + flush_bypass = true; } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { rcu_nocb_unlock_irqrestore(rdp, flags); - if (needwake_state) - swake_up_one(&rdp->nocb_state_wq); continue; /* No callbacks here, try next. */ } + + if (flush_bypass) { + // Bypass full or old, so flush it. + (void)rcu_nocb_try_flush_bypass(rdp, j); + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); + lazy_ncbs = READ_ONCE(rdp->lazy_len); + } + if (bypass_ncbs) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("Bypass")); - bypass = true; + bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass")); + if (bypass_ncbs == lazy_ncbs) + lazy = true; + else + bypass = true; } rnp = rdp->mynode; @@ -705,7 +750,6 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) if (rcu_segcblist_ready_cbs(&rdp->cblist)) { needwake = rdp->nocb_cb_sleep; WRITE_ONCE(rdp->nocb_cb_sleep, false); - smp_mb(); /* CB invocation -after- GP end. */ } else { needwake = false; } @@ -716,31 +760,43 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) } if (needwake_gp) rcu_gp_kthread_wake(); - if (needwake_state) - swake_up_one(&rdp->nocb_state_wq); } my_rdp->nocb_gp_bypass = bypass; my_rdp->nocb_gp_gp = needwait_gp; my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; - if (bypass && !rcu_nocb_poll) { - // At least one child with non-empty ->nocb_bypass, so set - // timer in order to avoid stranding its callbacks. - wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, - TPS("WakeBypassIsDeferred")); + // At least one child with non-empty ->nocb_bypass, so set + // timer in order to avoid stranding its callbacks. + if (!rcu_nocb_poll) { + // If bypass list only has lazy CBs. Add a deferred lazy wake up. + if (lazy && !bypass) { + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY, + TPS("WakeLazyIsDeferred")); + // Otherwise add a deferred bypass wake up. + } else if (bypass) { + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, + TPS("WakeBypassIsDeferred")); + } } + if (rcu_nocb_poll) { /* Polling, so trace if first poll in the series. */ if (gotcbs) trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll")); - schedule_timeout_idle(1); + if (list_empty(&my_rdp->nocb_head_rdp)) { + raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); + if (!my_rdp->nocb_toggling_rdp) + WRITE_ONCE(my_rdp->nocb_gp_sleep, true); + raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); + /* Wait for any offloading rdp */ + nocb_gp_sleep(my_rdp, cpu); + } else { + schedule_timeout_idle(1); + } } else if (!needwait_gp) { /* Wait for callbacks to appear. */ - trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); - swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, - !READ_ONCE(my_rdp->nocb_gp_sleep)); - trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep")); + nocb_gp_sleep(my_rdp, cpu); } else { rnp = my_rdp->mynode; trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait")); @@ -750,15 +806,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) !READ_ONCE(my_rdp->nocb_gp_sleep)); trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait")); } + if (!rcu_nocb_poll) { raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); + // (De-)queue an rdp to/from the group if its nocb state is changing + rdp_toggling = my_rdp->nocb_toggling_rdp; + if (rdp_toggling) + my_rdp->nocb_toggling_rdp = NULL; + if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) { WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - del_timer(&my_rdp->nocb_timer); + timer_delete(&my_rdp->nocb_timer); } WRITE_ONCE(my_rdp->nocb_gp_sleep, true); raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); + } else { + rdp_toggling = READ_ONCE(my_rdp->nocb_toggling_rdp); + if (rdp_toggling) { + /* + * Paranoid locking to make sure nocb_toggling_rdp is well + * reset *before* we (re)set SEGCBLIST_KTHREAD_GP or we could + * race with another round of nocb toggling for this rdp. + * Nocb locking should prevent from that already but we stick + * to paranoia, especially in rare path. + */ + raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); + my_rdp->nocb_toggling_rdp = NULL; + raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); + } } + + if (rdp_toggling) { + nocb_gp_toggle_rdp(my_rdp, rdp_toggling); + swake_up_one(&rdp_toggling->nocb_state_wq); + } + my_rdp->nocb_gp_seq = -1; WARN_ON(signal_pending(current)); } @@ -783,16 +865,9 @@ static int rcu_nocb_gp_kthread(void *arg) return 0; } -static inline bool nocb_cb_can_run(struct rcu_data *rdp) -{ - u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB; - - return rcu_segcblist_test_flags(&rdp->cblist, flags); -} - static inline bool nocb_cb_wait_cond(struct rcu_data *rdp) { - return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep); + return !READ_ONCE(rdp->nocb_cb_sleep) || kthread_should_park(); } /* @@ -804,25 +879,33 @@ static void nocb_cb_wait(struct rcu_data *rdp) struct rcu_segcblist *cblist = &rdp->cblist; unsigned long cur_gp_seq; unsigned long flags; - bool needwake_state = false; bool needwake_gp = false; - bool can_sleep = true; struct rcu_node *rnp = rdp->mynode; - do { - swait_event_interruptible_exclusive(rdp->nocb_cb_wq, - nocb_cb_wait_cond(rdp)); - - // VVV Ensure CB invocation follows _sleep test. - if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^ - WARN_ON(signal_pending(current)); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); + swait_event_interruptible_exclusive(rdp->nocb_cb_wq, + nocb_cb_wait_cond(rdp)); + if (kthread_should_park()) { + /* + * kthread_park() must be preceded by an rcu_barrier(). + * But yet another rcu_barrier() might have sneaked in between + * the barrier callback execution and the callbacks counter + * decrement. + */ + if (rdp->nocb_cb_sleep) { + rcu_nocb_lock_irqsave(rdp, flags); + WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist)); + rcu_nocb_unlock_irqrestore(rdp, flags); + kthread_parkme(); } - } while (!nocb_cb_can_run(rdp)); + } else if (READ_ONCE(rdp->nocb_cb_sleep)) { + WARN_ON(signal_pending(current)); + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); + } + WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp)); local_irq_save(flags); - rcu_momentary_dyntick_idle(); + rcu_momentary_eqs(); local_irq_restore(flags); /* * Disable BH to provide the expected environment. Also, when @@ -842,37 +925,16 @@ static void nocb_cb_wait(struct rcu_data *rdp) raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ } - if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) { - if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) { - rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB); - if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) - needwake_state = true; - } - if (rcu_segcblist_ready_cbs(cblist)) - can_sleep = false; + if (!rcu_segcblist_ready_cbs(cblist)) { + WRITE_ONCE(rdp->nocb_cb_sleep, true); + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); } else { - /* - * De-offloading. Clear our flag and notify the de-offload worker. - * We won't touch the callbacks and keep sleeping until we ever - * get re-offloaded. - */ - WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)); - rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB); - if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) - needwake_state = true; + WRITE_ONCE(rdp->nocb_cb_sleep, false); } - WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep); - - if (rdp->nocb_cb_sleep) - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); - rcu_nocb_unlock_irqrestore(rdp, flags); if (needwake_gp) rcu_gp_kthread_wake(); - - if (needwake_state) - swake_up_one(&rdp->nocb_state_wq); } /* @@ -923,7 +985,7 @@ static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp, static void do_nocb_deferred_wakeup_timer(struct timer_list *t) { unsigned long flags; - struct rcu_data *rdp = from_timer(rdp, t, nocb_timer); + struct rcu_data *rdp = timer_container_of(rdp, t, nocb_timer); WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer")); @@ -956,97 +1018,93 @@ void rcu_nocb_flush_deferred_wakeup(void) } EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup); -static int rdp_offload_toggle(struct rcu_data *rdp, - bool offload, unsigned long flags) - __releases(rdp->nocb_lock) +static int rcu_nocb_queue_toggle_rdp(struct rcu_data *rdp) { - struct rcu_segcblist *cblist = &rdp->cblist; struct rcu_data *rdp_gp = rdp->nocb_gp_rdp; bool wake_gp = false; - - rcu_segcblist_offload(cblist, offload); - - if (rdp->nocb_cb_sleep) - rdp->nocb_cb_sleep = false; - rcu_nocb_unlock_irqrestore(rdp, flags); - - /* - * Ignore former value of nocb_cb_sleep and force wake up as it could - * have been spuriously set to false already. - */ - swake_up_one(&rdp->nocb_cb_wq); + unsigned long flags; raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); + // Queue this rdp for add/del to/from the list to iterate on rcuog + WRITE_ONCE(rdp_gp->nocb_toggling_rdp, rdp); if (rdp_gp->nocb_gp_sleep) { rdp_gp->nocb_gp_sleep = false; wake_gp = true; } raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags); - if (wake_gp) - wake_up_process(rdp_gp->nocb_gp_kthread); + return wake_gp; +} - return 0; +static bool rcu_nocb_rdp_deoffload_wait_cond(struct rcu_data *rdp) +{ + unsigned long flags; + bool ret; + + /* + * Locking makes sure rcuog is done handling this rdp before deoffloaded + * enqueue can happen. Also it keeps the SEGCBLIST_OFFLOADED flag stable + * while the ->nocb_lock is held. + */ + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + ret = !rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + + return ret; } -static long rcu_nocb_rdp_deoffload(void *arg) +static int rcu_nocb_rdp_deoffload(struct rcu_data *rdp) { - struct rcu_data *rdp = arg; - struct rcu_segcblist *cblist = &rdp->cblist; unsigned long flags; - int ret; + int wake_gp; + struct rcu_data *rdp_gp = rdp->nocb_gp_rdp; - WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id()); + /* CPU must be offline, unless it's early boot */ + WARN_ON_ONCE(cpu_online(rdp->cpu) && rdp->cpu != raw_smp_processor_id()); pr_info("De-offloading %d\n", rdp->cpu); - rcu_nocb_lock_irqsave(rdp, flags); - /* - * Flush once and for all now. This suffices because we are - * running on the target CPU holding ->nocb_lock (thus having - * interrupts disabled), and because rdp_offload_toggle() - * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED. - * Thus future calls to rcu_segcblist_completely_offloaded() will - * return false, which means that future calls to rcu_nocb_try_bypass() - * will refuse to put anything into the bypass. - */ - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); - /* - * Start with invoking rcu_core() early. This way if the current thread - * happens to preempt an ongoing call to rcu_core() in the middle, - * leaving some work dismissed because rcu_core() still thinks the rdp is - * completely offloaded, we are guaranteed a nearby future instance of - * rcu_core() to catch up. - */ - rcu_segcblist_set_flags(cblist, SEGCBLIST_RCU_CORE); - invoke_rcu_core(); - ret = rdp_offload_toggle(rdp, false, flags); - swait_event_exclusive(rdp->nocb_state_wq, - !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB | - SEGCBLIST_KTHREAD_GP)); - /* Stop nocb_gp_wait() from iterating over this structure. */ - list_del_rcu(&rdp->nocb_entry_rdp); - /* - * Lock one last time to acquire latest callback updates from kthreads - * so we can later handle callbacks locally without locking. - */ - rcu_nocb_lock_irqsave(rdp, flags); - /* - * Theoretically we could clear SEGCBLIST_LOCKING after the nocb - * lock is released but how about being paranoid for once? - */ - rcu_segcblist_clear_flags(cblist, SEGCBLIST_LOCKING); + /* Flush all callbacks from segcblist and bypass */ + rcu_barrier(); + /* - * Without SEGCBLIST_LOCKING, we can't use - * rcu_nocb_unlock_irqrestore() anymore. + * Make sure the rcuoc kthread isn't in the middle of a nocb locked + * sequence while offloading is deactivated, along with nocb locking. */ - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + if (rdp->nocb_cb_kthread) + kthread_park(rdp->nocb_cb_kthread); - /* Sanity check */ + rcu_nocb_lock_irqsave(rdp, flags); WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); + WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist)); + rcu_nocb_unlock_irqrestore(rdp, flags); + wake_gp = rcu_nocb_queue_toggle_rdp(rdp); - return ret; + mutex_lock(&rdp_gp->nocb_gp_kthread_mutex); + + if (rdp_gp->nocb_gp_kthread) { + if (wake_gp) + wake_up_process(rdp_gp->nocb_gp_kthread); + + swait_event_exclusive(rdp->nocb_state_wq, + rcu_nocb_rdp_deoffload_wait_cond(rdp)); + } else { + /* + * No kthread to clear the flags for us or remove the rdp from the nocb list + * to iterate. Do it here instead. Locking doesn't look stricly necessary + * but we stick to paranoia in this rare path. + */ + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_OFFLOADED); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + + list_del(&rdp->nocb_entry_rdp); + } + + mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex); + + return 0; } int rcu_nocb_cpu_deoffload(int cpu) @@ -1054,33 +1112,42 @@ int rcu_nocb_cpu_deoffload(int cpu) struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); int ret = 0; - mutex_lock(&rcu_state.barrier_mutex); cpus_read_lock(); + mutex_lock(&rcu_state.nocb_mutex); if (rcu_rdp_is_offloaded(rdp)) { - if (cpu_online(cpu)) { - ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp); + if (!cpu_online(cpu)) { + ret = rcu_nocb_rdp_deoffload(rdp); if (!ret) cpumask_clear_cpu(cpu, rcu_nocb_mask); } else { - pr_info("NOCB: Can't CB-deoffload an offline CPU\n"); + pr_info("NOCB: Cannot CB-deoffload online CPU %d\n", rdp->cpu); ret = -EINVAL; } } + mutex_unlock(&rcu_state.nocb_mutex); cpus_read_unlock(); - mutex_unlock(&rcu_state.barrier_mutex); return ret; } EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload); -static long rcu_nocb_rdp_offload(void *arg) +static bool rcu_nocb_rdp_offload_wait_cond(struct rcu_data *rdp) { - struct rcu_data *rdp = arg; - struct rcu_segcblist *cblist = &rdp->cblist; unsigned long flags; - int ret; + bool ret; - WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id()); + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + ret = rcu_segcblist_test_flags(&rdp->cblist, SEGCBLIST_OFFLOADED); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + + return ret; +} + +static int rcu_nocb_rdp_offload(struct rcu_data *rdp) +{ + int wake_gp; + + WARN_ON_ONCE(cpu_online(rdp->cpu)); /* * For now we only support re-offload, ie: the rdp must have been * offloaded on boot first. @@ -1088,54 +1155,24 @@ static long rcu_nocb_rdp_offload(void *arg) if (!rdp->nocb_gp_rdp) return -EINVAL; + if (WARN_ON_ONCE(!rdp->nocb_gp_kthread)) + return -EINVAL; + pr_info("Offloading %d\n", rdp->cpu); - /* - * Cause future nocb_gp_wait() invocations to iterate over - * structure, resetting ->nocb_gp_sleep and waking up the related - * "rcuog". Since nocb_gp_wait() in turn locks ->nocb_gp_lock - * before setting ->nocb_gp_sleep again, we are guaranteed to - * iterate this newly added structure before "rcuog" goes to - * sleep again. - */ - list_add_tail_rcu(&rdp->nocb_entry_rdp, &rdp->nocb_gp_rdp->nocb_head_rdp); + WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); + WARN_ON_ONCE(rcu_segcblist_n_cbs(&rdp->cblist)); - /* - * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING - * is set. - */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + wake_gp = rcu_nocb_queue_toggle_rdp(rdp); + if (wake_gp) + wake_up_process(rdp->nocb_gp_kthread); - /* - * We didn't take the nocb lock while working on the - * rdp->cblist with SEGCBLIST_LOCKING cleared (pure softirq/rcuc mode). - * Every modifications that have been done previously on - * rdp->cblist must be visible remotely by the nocb kthreads - * upon wake up after reading the cblist flags. - * - * The layout against nocb_lock enforces that ordering: - * - * __rcu_nocb_rdp_offload() nocb_cb_wait()/nocb_gp_wait() - * ------------------------- ---------------------------- - * WRITE callbacks rcu_nocb_lock() - * rcu_nocb_lock() READ flags - * WRITE flags READ callbacks - * rcu_nocb_unlock() rcu_nocb_unlock() - */ - ret = rdp_offload_toggle(rdp, true, flags); swait_event_exclusive(rdp->nocb_state_wq, - rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) && - rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)); + rcu_nocb_rdp_offload_wait_cond(rdp)); - /* - * All kthreads are ready to work, we can finally relieve rcu_core() and - * enable nocb bypass. - */ - rcu_nocb_lock_irqsave(rdp, flags); - rcu_segcblist_clear_flags(cblist, SEGCBLIST_RCU_CORE); - rcu_nocb_unlock_irqrestore(rdp, flags); + kthread_unpark(rdp->nocb_cb_kthread); - return ret; + return 0; } int rcu_nocb_cpu_offload(int cpu) @@ -1143,53 +1180,153 @@ int rcu_nocb_cpu_offload(int cpu) struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); int ret = 0; - mutex_lock(&rcu_state.barrier_mutex); cpus_read_lock(); + mutex_lock(&rcu_state.nocb_mutex); if (!rcu_rdp_is_offloaded(rdp)) { - if (cpu_online(cpu)) { - ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp); + if (!cpu_online(cpu)) { + ret = rcu_nocb_rdp_offload(rdp); if (!ret) cpumask_set_cpu(cpu, rcu_nocb_mask); } else { - pr_info("NOCB: Can't CB-offload an offline CPU\n"); + pr_info("NOCB: Cannot CB-offload online CPU %d\n", rdp->cpu); ret = -EINVAL; } } + mutex_unlock(&rcu_state.nocb_mutex); cpus_read_unlock(); - mutex_unlock(&rcu_state.barrier_mutex); return ret; } EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload); +#ifdef CONFIG_RCU_LAZY +static unsigned long +lazy_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc) +{ + int cpu; + unsigned long count = 0; + + if (WARN_ON_ONCE(!cpumask_available(rcu_nocb_mask))) + return 0; + + /* Protect rcu_nocb_mask against concurrent (de-)offloading. */ + if (!mutex_trylock(&rcu_state.nocb_mutex)) + return 0; + + /* Snapshot count of all CPUs */ + for_each_cpu(cpu, rcu_nocb_mask) { + struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); + + count += READ_ONCE(rdp->lazy_len); + } + + mutex_unlock(&rcu_state.nocb_mutex); + + return count ? count : SHRINK_EMPTY; +} + +static unsigned long +lazy_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc) +{ + int cpu; + unsigned long flags; + unsigned long count = 0; + + if (WARN_ON_ONCE(!cpumask_available(rcu_nocb_mask))) + return 0; + /* + * Protect against concurrent (de-)offloading. Otherwise nocb locking + * may be ignored or imbalanced. + */ + if (!mutex_trylock(&rcu_state.nocb_mutex)) { + /* + * But really don't insist if nocb_mutex is contended since we + * can't guarantee that it will never engage in a dependency + * chain involving memory allocation. The lock is seldom contended + * anyway. + */ + return 0; + } + + /* Snapshot count of all CPUs */ + for_each_cpu(cpu, rcu_nocb_mask) { + struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); + int _count; + + if (WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp))) + continue; + + if (!READ_ONCE(rdp->lazy_len)) + continue; + + rcu_nocb_lock_irqsave(rdp, flags); + /* + * Recheck under the nocb lock. Since we are not holding the bypass + * lock we may still race with increments from the enqueuer but still + * we know for sure if there is at least one lazy callback. + */ + _count = READ_ONCE(rdp->lazy_len); + if (!_count) { + rcu_nocb_unlock_irqrestore(rdp, flags); + continue; + } + rcu_nocb_try_flush_bypass(rdp, jiffies); + rcu_nocb_unlock_irqrestore(rdp, flags); + wake_nocb_gp(rdp, false); + sc->nr_to_scan -= _count; + count += _count; + if (sc->nr_to_scan <= 0) + break; + } + + mutex_unlock(&rcu_state.nocb_mutex); + + return count ? count : SHRINK_STOP; +} +#endif // #ifdef CONFIG_RCU_LAZY + void __init rcu_init_nohz(void) { int cpu; - bool need_rcu_nocb_mask = false; struct rcu_data *rdp; + const struct cpumask *cpumask = NULL; + struct shrinker * __maybe_unused lazy_rcu_shrinker; #if defined(CONFIG_NO_HZ_FULL) - if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask)) - need_rcu_nocb_mask = true; -#endif /* #if defined(CONFIG_NO_HZ_FULL) */ + if (tick_nohz_full_running && !cpumask_empty(tick_nohz_full_mask)) + cpumask = tick_nohz_full_mask; +#endif - if (need_rcu_nocb_mask) { + if (IS_ENABLED(CONFIG_RCU_NOCB_CPU_DEFAULT_ALL) && + !rcu_state.nocb_is_setup && !cpumask) + cpumask = cpu_possible_mask; + + if (cpumask) { if (!cpumask_available(rcu_nocb_mask)) { if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) { pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n"); return; } } - rcu_nocb_is_setup = true; + + cpumask_or(rcu_nocb_mask, rcu_nocb_mask, cpumask); + rcu_state.nocb_is_setup = true; } - if (!rcu_nocb_is_setup) + if (!rcu_state.nocb_is_setup) return; -#if defined(CONFIG_NO_HZ_FULL) - if (tick_nohz_full_running) - cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask); -#endif /* #if defined(CONFIG_NO_HZ_FULL) */ +#ifdef CONFIG_RCU_LAZY + lazy_rcu_shrinker = shrinker_alloc(0, "rcu-lazy"); + if (!lazy_rcu_shrinker) { + pr_err("Failed to allocate lazy_rcu shrinker!\n"); + } else { + lazy_rcu_shrinker->count_objects = lazy_rcu_shrink_count; + lazy_rcu_shrinker->scan_objects = lazy_rcu_shrink_scan; + + shrinker_register(lazy_rcu_shrinker); + } +#endif // #ifdef CONFIG_RCU_LAZY if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) { pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n"); @@ -1208,9 +1345,7 @@ void __init rcu_init_nohz(void) rdp = per_cpu_ptr(&rcu_data, cpu); if (rcu_segcblist_empty(&rdp->cblist)) rcu_segcblist_init(&rdp->cblist); - rcu_segcblist_offload(&rdp->cblist, true); - rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB | SEGCBLIST_KTHREAD_GP); - rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_RCU_CORE); + rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_OFFLOADED); } rcu_organize_nocb_kthreads(); } @@ -1226,6 +1361,8 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) raw_spin_lock_init(&rdp->nocb_gp_lock); timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0); rcu_cblist_init(&rdp->nocb_bypass); + WRITE_ONCE(rdp->lazy_len, 0); + mutex_init(&rdp->nocb_gp_kthread_mutex); } /* @@ -1238,8 +1375,9 @@ static void rcu_spawn_cpu_nocb_kthread(int cpu) struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); struct rcu_data *rdp_gp; struct task_struct *t; + struct sched_param sp; - if (!rcu_scheduler_fully_active || !rcu_nocb_is_setup) + if (!rcu_scheduler_fully_active || !rcu_state.nocb_is_setup) return; /* If there already is an rcuo kthread, then nothing to do. */ @@ -1247,38 +1385,54 @@ static void rcu_spawn_cpu_nocb_kthread(int cpu) return; /* If we didn't spawn the GP kthread first, reorganize! */ + sp.sched_priority = kthread_prio; rdp_gp = rdp->nocb_gp_rdp; + mutex_lock(&rdp_gp->nocb_gp_kthread_mutex); if (!rdp_gp->nocb_gp_kthread) { t = kthread_run(rcu_nocb_gp_kthread, rdp_gp, "rcuog/%d", rdp_gp->cpu); - if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__)) - return; + if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__)) { + mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex); + goto err; + } WRITE_ONCE(rdp_gp->nocb_gp_kthread, t); + if (kthread_prio) + sched_setscheduler_nocheck(t, SCHED_FIFO, &sp); } + mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex); /* Spawn the kthread for this CPU. */ - t = kthread_run(rcu_nocb_cb_kthread, rdp, - "rcuo%c/%d", rcu_state.abbr, cpu); + t = kthread_create(rcu_nocb_cb_kthread, rdp, + "rcuo%c/%d", rcu_state.abbr, cpu); if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__)) - return; + goto err; + + if (rcu_rdp_is_offloaded(rdp)) + wake_up_process(t); + else + kthread_park(t); + + if (IS_ENABLED(CONFIG_RCU_NOCB_CPU_CB_BOOST) && kthread_prio) + sched_setscheduler_nocheck(t, SCHED_FIFO, &sp); + WRITE_ONCE(rdp->nocb_cb_kthread, t); WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread); -} - -/* - * Once the scheduler is running, spawn rcuo kthreads for all online - * no-CBs CPUs. This assumes that the early_initcall()s happen before - * non-boot CPUs come online -- if this changes, we will need to add - * some mutual exclusion. - */ -static void __init rcu_spawn_nocb_kthreads(void) -{ - int cpu; + return; - if (rcu_nocb_is_setup) { - for_each_online_cpu(cpu) - rcu_spawn_cpu_nocb_kthread(cpu); +err: + /* + * No need to protect against concurrent rcu_barrier() + * because the number of callbacks should be 0 for a non-boot CPU, + * therefore rcu_barrier() shouldn't even try to grab the nocb_lock. + * But hold nocb_mutex to avoid nocb_lock imbalance from shrinker. + */ + WARN_ON_ONCE(system_state > SYSTEM_BOOTING && rcu_segcblist_n_cbs(&rdp->cblist)); + mutex_lock(&rcu_state.nocb_mutex); + if (rcu_rdp_is_offloaded(rdp)) { + rcu_nocb_rdp_deoffload(rdp); + cpumask_clear_cpu(cpu, rcu_nocb_mask); } + mutex_unlock(&rcu_state.nocb_mutex); } /* How many CB CPU IDs per GP kthread? Default of -1 for sqrt(nr_cpu_ids). */ @@ -1348,7 +1502,7 @@ static void __init rcu_organize_nocb_kthreads(void) */ void rcu_bind_current_to_nocb(void) { - if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask)) + if (cpumask_available(rcu_nocb_mask) && !cpumask_empty(rcu_nocb_mask)) WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask)); } EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb); @@ -1389,15 +1543,18 @@ static void show_rcu_nocb_gp_state(struct rcu_data *rdp) (long)rdp->nocb_gp_seq, rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops), rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.', - rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1, - show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread)); + rdp->nocb_gp_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1, + show_rcu_should_be_on_cpu(rdp->nocb_gp_kthread)); } /* Dump out nocb kthread state for the specified rcu_data structure. */ static void show_rcu_nocb_state(struct rcu_data *rdp) { - char bufw[20]; - char bufr[20]; + char bufd[22]; + char bufw[45]; + char bufr[45]; + char bufn[22]; + char bufb[22]; struct rcu_data *nocb_next_rdp; struct rcu_segcblist *rsclp = &rdp->cblist; bool waslocked; @@ -1406,19 +1563,25 @@ static void show_rcu_nocb_state(struct rcu_data *rdp) if (rdp->nocb_gp_rdp == rdp) show_rcu_nocb_gp_state(rdp); + if (!rcu_segcblist_is_offloaded(&rdp->cblist)) + return; + nocb_next_rdp = list_next_or_null_rcu(&rdp->nocb_gp_rdp->nocb_head_rdp, &rdp->nocb_entry_rdp, typeof(*rdp), nocb_entry_rdp); - sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]); - sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]); - pr_info(" CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n", + sprintf(bufd, "%ld", rsclp->seglen[RCU_DONE_TAIL]); + sprintf(bufw, "%ld(%ld)", rsclp->seglen[RCU_WAIT_TAIL], rsclp->gp_seq[RCU_WAIT_TAIL]); + sprintf(bufr, "%ld(%ld)", rsclp->seglen[RCU_NEXT_READY_TAIL], + rsclp->gp_seq[RCU_NEXT_READY_TAIL]); + sprintf(bufn, "%ld", rsclp->seglen[RCU_NEXT_TAIL]); + sprintf(bufb, "%ld", rcu_cblist_n_cbs(&rdp->nocb_bypass)); + pr_info(" CB %d^%d->%d %c%c%c%c%c F%ld L%ld C%d %c%s%c%s%c%s%c%s%c%s q%ld %c CPU %d%s\n", rdp->cpu, rdp->nocb_gp_rdp->cpu, nocb_next_rdp ? nocb_next_rdp->cpu : -1, "kK"[!!rdp->nocb_cb_kthread], "bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)], - "cC"[!!atomic_read(&rdp->nocb_lock_contended)], "lL"[raw_spin_is_locked(&rdp->nocb_lock)], "sS"[!!rdp->nocb_cb_sleep], ".W"[swait_active(&rdp->nocb_cb_wq)], @@ -1426,15 +1589,18 @@ static void show_rcu_nocb_state(struct rcu_data *rdp) jiffies - rdp->nocb_nobypass_last, rdp->nocb_nobypass_count, ".D"[rcu_segcblist_ready_cbs(rsclp)], + rcu_segcblist_segempty(rsclp, RCU_DONE_TAIL) ? "" : bufd, ".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)], rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw, ".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)], rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr, ".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)], + rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL) ? "" : bufn, ".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)], + !rcu_cblist_n_cbs(&rdp->nocb_bypass) ? "" : bufb, rcu_segcblist_n_cbs(&rdp->cblist), rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.', - rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1, + rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_cb_kthread) : -1, show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread)); /* It is OK for GP kthreads to have GP state. */ @@ -1455,16 +1621,6 @@ static void show_rcu_nocb_state(struct rcu_data *rdp) #else /* #ifdef CONFIG_RCU_NOCB_CPU */ -static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp) -{ - return 0; -} - -static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp) -{ - return false; -} - /* No ->nocb_lock to acquire. */ static void rcu_nocb_lock(struct rcu_data *rdp) { @@ -1501,16 +1657,21 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) { } +static bool wake_nocb_gp(struct rcu_data *rdp, bool force) +{ + return false; +} + static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, - unsigned long j) + unsigned long j, bool lazy) { return true; } -static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, - bool *was_alldone, unsigned long flags) +static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head, + rcu_callback_t func, unsigned long flags, bool lazy) { - return false; + WARN_ON_ONCE(1); /* Should be dead code! */ } static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, @@ -1537,10 +1698,6 @@ static void rcu_spawn_cpu_nocb_kthread(int cpu) { } -static void __init rcu_spawn_nocb_kthreads(void) -{ -} - static void show_rcu_nocb_state(struct rcu_data *rdp) { } |
