diff options
Diffstat (limited to 'kernel/rcu/tree.c')
-rw-r--r-- | kernel/rcu/tree.c | 1537 |
1 files changed, 555 insertions, 982 deletions
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index d9642dd06c25..475f31deed14 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -75,12 +75,10 @@ #define MODULE_PARAM_PREFIX "rcutree." /* Data structures. */ +static void rcu_sr_normal_gp_cleanup_work(struct work_struct *); static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = { .gpwrap = true, -#ifdef CONFIG_RCU_NOCB_CPU - .cblist.flags = SEGCBLIST_RCU_CORE, -#endif }; static struct rcu_state rcu_state = { .level = { &rcu_state.node[0] }, @@ -93,6 +91,12 @@ static struct rcu_state rcu_state = { .exp_mutex = __MUTEX_INITIALIZER(rcu_state.exp_mutex), .exp_wake_mutex = __MUTEX_INITIALIZER(rcu_state.exp_wake_mutex), .ofl_lock = __ARCH_SPIN_LOCK_UNLOCKED, + .srs_cleanup_work = __WORK_INITIALIZER(rcu_state.srs_cleanup_work, + rcu_sr_normal_gp_cleanup_work), + .srs_cleanups_pending = ATOMIC_INIT(0), +#ifdef CONFIG_RCU_NOCB_CPU + .nocb_mutex = __MUTEX_INITIALIZER(rcu_state.nocb_mutex), +#endif }; /* Dump rcu_node combining tree at boot to verify correct setup. */ @@ -145,7 +149,6 @@ static int rcu_scheduler_fully_active __read_mostly; static void rcu_report_qs_rnp(unsigned long mask, struct rcu_node *rnp, unsigned long gps, unsigned long flags); -static struct task_struct *rcu_boost_task(struct rcu_node *rnp); static void invoke_rcu_core(void); static void rcu_report_exp_rdp(struct rcu_data *rdp); static void sync_sched_exp_online_cleanup(int cpu); @@ -172,6 +175,9 @@ static int gp_init_delay; module_param(gp_init_delay, int, 0444); static int gp_cleanup_delay; module_param(gp_cleanup_delay, int, 0444); +static int nohz_full_patience_delay; +module_param(nohz_full_patience_delay, int, 0444); +static int nohz_full_patience_delay_jiffies; // Add delay to rcu_read_unlock() for strict grace periods. static int rcu_unlock_delay; @@ -179,26 +185,6 @@ static int rcu_unlock_delay; module_param(rcu_unlock_delay, int, 0444); #endif -/* - * This rcu parameter is runtime-read-only. It reflects - * a minimum allowed number of objects which can be cached - * per-CPU. Object size is equal to one page. This value - * can be changed at boot time. - */ -static int rcu_min_cached_objs = 5; -module_param(rcu_min_cached_objs, int, 0444); - -// A page shrinker can ask for pages to be freed to make them -// available for other parts of the system. This usually happens -// under low memory conditions, and in that case we should also -// defer page-cache filling for a short time period. -// -// The default value is 5 seconds, which is long enough to reduce -// interference with the shrinker while it asks other systems to -// drain their caches. -static int rcu_delay_page_cache_fill_msec = 5000; -module_param(rcu_delay_page_cache_fill_msec, int, 0444); - /* Retrieve RCU kthreads priority for rcutorture */ int rcu_get_gp_kthreads_prio(void) { @@ -240,76 +226,113 @@ static long rcu_get_n_cbs_cpu(int cpu) return 0; } +/** + * rcu_softirq_qs - Provide a set of RCU quiescent states in softirq processing + * + * Mark a quiescent state for RCU, Tasks RCU, and Tasks Trace RCU. + * This is a special-purpose function to be used in the softirq + * infrastructure and perhaps the occasional long-running softirq + * handler. + * + * Note that from RCU's viewpoint, a call to rcu_softirq_qs() is + * equivalent to momentarily completely enabling preemption. For + * example, given this code:: + * + * local_bh_disable(); + * do_something(); + * rcu_softirq_qs(); // A + * do_something_else(); + * local_bh_enable(); // B + * + * A call to synchronize_rcu() that began concurrently with the + * call to do_something() would be guaranteed to wait only until + * execution reached statement A. Without that rcu_softirq_qs(), + * that same synchronize_rcu() would instead be guaranteed to wait + * until execution reached statement B. + */ void rcu_softirq_qs(void) { + RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) || + lock_is_held(&rcu_lock_map) || + lock_is_held(&rcu_sched_lock_map), + "Illegal rcu_softirq_qs() in RCU read-side critical section"); rcu_qs(); rcu_preempt_deferred_qs(current); rcu_tasks_qs(current, false); } /* - * Reset the current CPU's ->dynticks counter to indicate that the + * Reset the current CPU's RCU_WATCHING counter to indicate that the * newly onlined CPU is no longer in an extended quiescent state. * This will either leave the counter unchanged, or increment it * to the next non-quiescent value. * * The non-atomic test/increment sequence works because the upper bits - * of the ->dynticks counter are manipulated only by the corresponding CPU, + * of the ->state variable are manipulated only by the corresponding CPU, * or when the corresponding CPU is offline. */ -static void rcu_dynticks_eqs_online(void) +static void rcu_watching_online(void) { - if (ct_dynticks() & RCU_DYNTICKS_IDX) + if (ct_rcu_watching() & CT_RCU_WATCHING) return; - ct_state_inc(RCU_DYNTICKS_IDX); + ct_state_inc(CT_RCU_WATCHING); } /* - * Snapshot the ->dynticks counter with full ordering so as to allow - * stable comparison of this counter with past and future snapshots. - */ -static int rcu_dynticks_snap(int cpu) -{ - smp_mb(); // Fundamental RCU ordering guarantee. - return ct_dynticks_cpu_acquire(cpu); -} - -/* - * Return true if the snapshot returned from rcu_dynticks_snap() + * Return true if the snapshot returned from ct_rcu_watching() * indicates that RCU is in an extended quiescent state. */ -static bool rcu_dynticks_in_eqs(int snap) +static bool rcu_watching_snap_in_eqs(int snap) { - return !(snap & RCU_DYNTICKS_IDX); + return !(snap & CT_RCU_WATCHING); } -/* - * Return true if the CPU corresponding to the specified rcu_data - * structure has spent some time in an extended quiescent state since - * rcu_dynticks_snap() returned the specified snapshot. +/** + * rcu_watching_snap_stopped_since() - Has RCU stopped watching a given CPU + * since the specified @snap? + * + * @rdp: The rcu_data corresponding to the CPU for which to check EQS. + * @snap: rcu_watching snapshot taken when the CPU wasn't in an EQS. + * + * Returns true if the CPU corresponding to @rdp has spent some time in an + * extended quiescent state since @snap. Note that this doesn't check if it + * /still/ is in an EQS, just that it went through one since @snap. + * + * This is meant to be used in a loop waiting for a CPU to go through an EQS. */ -static bool rcu_dynticks_in_eqs_since(struct rcu_data *rdp, int snap) +static bool rcu_watching_snap_stopped_since(struct rcu_data *rdp, int snap) { - return snap != rcu_dynticks_snap(rdp->cpu); + /* + * The first failing snapshot is already ordered against the accesses + * performed by the remote CPU after it exits idle. + * + * The second snapshot therefore only needs to order against accesses + * performed by the remote CPU prior to entering idle and therefore can + * rely solely on acquire semantics. + */ + if (WARN_ON_ONCE(rcu_watching_snap_in_eqs(snap))) + return true; + + return snap != ct_rcu_watching_cpu_acquire(rdp->cpu); } /* * Return true if the referenced integer is zero while the specified * CPU remains within a single extended quiescent state. */ -bool rcu_dynticks_zero_in_eqs(int cpu, int *vp) +bool rcu_watching_zero_in_eqs(int cpu, int *vp) { int snap; // If not quiescent, force back to earlier extended quiescent state. - snap = ct_dynticks_cpu(cpu) & ~RCU_DYNTICKS_IDX; - smp_rmb(); // Order ->dynticks and *vp reads. + snap = ct_rcu_watching_cpu(cpu) & ~CT_RCU_WATCHING; + smp_rmb(); // Order CT state and *vp reads. if (READ_ONCE(*vp)) return false; // Non-zero, so report failure; - smp_rmb(); // Order *vp read and ->dynticks re-read. + smp_rmb(); // Order *vp read and CT state re-read. // If still in the same extended quiescent state, we are good! - return snap == ct_dynticks_cpu(cpu); + return snap == ct_rcu_watching_cpu(cpu); } /* @@ -323,17 +346,17 @@ bool rcu_dynticks_zero_in_eqs(int cpu, int *vp) * * The caller must have disabled interrupts and must not be idle. */ -notrace void rcu_momentary_dyntick_idle(void) +notrace void rcu_momentary_eqs(void) { int seq; raw_cpu_write(rcu_data.rcu_need_heavy_qs, false); - seq = ct_state_inc(2 * RCU_DYNTICKS_IDX); + seq = ct_state_inc(2 * CT_RCU_WATCHING); /* It is illegal to call this from idle state. */ - WARN_ON_ONCE(!(seq & RCU_DYNTICKS_IDX)); + WARN_ON_ONCE(!(seq & CT_RCU_WATCHING)); rcu_preempt_deferred_qs(current); } -EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle); +EXPORT_SYMBOL_GPL(rcu_momentary_eqs); /** * rcu_is_cpu_rrupt_from_idle - see if 'interrupted' from idle @@ -355,13 +378,13 @@ static int rcu_is_cpu_rrupt_from_idle(void) lockdep_assert_irqs_disabled(); /* Check for counter underflows */ - RCU_LOCKDEP_WARN(ct_dynticks_nesting() < 0, - "RCU dynticks_nesting counter underflow!"); - RCU_LOCKDEP_WARN(ct_dynticks_nmi_nesting() <= 0, - "RCU dynticks_nmi_nesting counter underflow/zero!"); + RCU_LOCKDEP_WARN(ct_nesting() < 0, + "RCU nesting counter underflow!"); + RCU_LOCKDEP_WARN(ct_nmi_nesting() <= 0, + "RCU nmi_nesting counter underflow/zero!"); /* Are we at first interrupt nesting level? */ - nesting = ct_dynticks_nmi_nesting(); + nesting = ct_nmi_nesting(); if (nesting > 1) return false; @@ -371,7 +394,7 @@ static int rcu_is_cpu_rrupt_from_idle(void) WARN_ON_ONCE(!nesting && !is_idle_task(current)); /* Does CPU appear to be idle from an RCU standpoint? */ - return ct_dynticks_nesting() == 0; + return ct_nesting() == 0; } #define DEFAULT_RCU_BLIMIT (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD) ? 1000 : 10) @@ -508,17 +531,10 @@ static struct rcu_node *rcu_get_root(void) /* * Send along grace-period-related data for rcutorture diagnostics. */ -void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags, - unsigned long *gp_seq) +void rcutorture_get_gp_data(int *flags, unsigned long *gp_seq) { - switch (test_type) { - case RCU_FLAVOR: - *flags = READ_ONCE(rcu_state.gp_flags); - *gp_seq = rcu_seq_current(&rcu_state.gp_seq); - break; - default: - break; - } + *flags = READ_ONCE(rcu_state.gp_flags); + *gp_seq = rcu_seq_current(&rcu_state.gp_seq); } EXPORT_SYMBOL_GPL(rcutorture_get_gp_data); @@ -570,12 +586,12 @@ void rcu_irq_exit_check_preempt(void) { lockdep_assert_irqs_disabled(); - RCU_LOCKDEP_WARN(ct_dynticks_nesting() <= 0, - "RCU dynticks_nesting counter underflow/zero!"); - RCU_LOCKDEP_WARN(ct_dynticks_nmi_nesting() != - DYNTICK_IRQ_NONIDLE, - "Bad RCU dynticks_nmi_nesting counter\n"); - RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(), + RCU_LOCKDEP_WARN(ct_nesting() <= 0, + "RCU nesting counter underflow/zero!"); + RCU_LOCKDEP_WARN(ct_nmi_nesting() != + CT_NESTING_IRQ_NONIDLE, + "Bad RCU nmi_nesting counter\n"); + RCU_LOCKDEP_WARN(!rcu_is_watching_curr_cpu(), "RCU in extended quiescent state!"); } #endif /* #ifdef CONFIG_PROVE_RCU */ @@ -615,7 +631,7 @@ void __rcu_irq_enter_check_tick(void) if (in_nmi()) return; - RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(), + RCU_LOCKDEP_WARN(!rcu_is_watching_curr_cpu(), "Illegal rcu_irq_enter_check_tick() from extended quiescent state"); if (!tick_nohz_full_cpu(rdp->cpu) || @@ -697,7 +713,7 @@ notrace bool rcu_is_watching(void) bool ret; preempt_disable_notrace(); - ret = !rcu_dynticks_curr_cpu_in_eqs(); + ret = rcu_is_watching_curr_cpu(); preempt_enable_notrace(); return ret; } @@ -739,14 +755,25 @@ static void rcu_gpnum_ovf(struct rcu_node *rnp, struct rcu_data *rdp) } /* - * Snapshot the specified CPU's dynticks counter so that we can later + * Snapshot the specified CPU's RCU_WATCHING counter so that we can later * credit them with an implicit quiescent state. Return 1 if this CPU * is in dynticks idle mode, which is an extended quiescent state. */ -static int dyntick_save_progress_counter(struct rcu_data *rdp) +static int rcu_watching_snap_save(struct rcu_data *rdp) { - rdp->dynticks_snap = rcu_dynticks_snap(rdp->cpu); - if (rcu_dynticks_in_eqs(rdp->dynticks_snap)) { + /* + * Full ordering between remote CPU's post idle accesses and updater's + * accesses prior to current GP (and also the started GP sequence number) + * is enforced by rcu_seq_start() implicit barrier and even further by + * smp_mb__after_unlock_lock() barriers chained all the way throughout the + * rnp locking tree since rcu_gp_init() and up to the current leaf rnp + * locking. + * + * Ordering between remote CPU's pre idle accesses and post grace period + * updater's accesses is enforced by the below acquire semantic. + */ + rdp->watching_snap = ct_rcu_watching_cpu_acquire(rdp->cpu); + if (rcu_watching_snap_in_eqs(rdp->watching_snap)) { trace_rcu_fqs(rcu_state.name, rdp->gp_seq, rdp->cpu, TPS("dti")); rcu_gpnum_ovf(rdp->mynode, rdp); return 1; @@ -757,14 +784,14 @@ static int dyntick_save_progress_counter(struct rcu_data *rdp) /* * Returns positive if the specified CPU has passed through a quiescent state * by virtue of being in or having passed through an dynticks idle state since - * the last call to dyntick_save_progress_counter() for this same CPU, or by + * the last call to rcu_watching_snap_save() for this same CPU, or by * virtue of having been offline. * * Returns negative if the specified CPU needs a force resched. * * Returns zero otherwise. */ -static int rcu_implicit_dynticks_qs(struct rcu_data *rdp) +static int rcu_watching_snap_recheck(struct rcu_data *rdp) { unsigned long jtsq; int ret = 0; @@ -778,7 +805,7 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp) * read-side critical section that started before the beginning * of the current RCU grace period. */ - if (rcu_dynticks_in_eqs_since(rdp, rdp->dynticks_snap)) { + if (rcu_watching_snap_stopped_since(rdp, rdp->watching_snap)) { trace_rcu_fqs(rcu_state.name, rdp->gp_seq, rdp->cpu, TPS("dti")); rcu_gpnum_ovf(rnp, rdp); return 1; @@ -813,8 +840,8 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp) __func__, rnp1->grplo, rnp1->grphi, rnp1->qsmask, rnp1->qsmaskinit, rnp1->qsmaskinitnext, rnp1->rcu_gp_init_mask); pr_info("%s %d: %c online: %ld(%d) offline: %ld(%d)\n", __func__, rdp->cpu, ".o"[rcu_rdp_cpu_online(rdp)], - (long)rdp->rcu_onl_gp_seq, rdp->rcu_onl_gp_flags, - (long)rdp->rcu_ofl_gp_seq, rdp->rcu_ofl_gp_flags); + (long)rdp->rcu_onl_gp_seq, rdp->rcu_onl_gp_state, + (long)rdp->rcu_ofl_gp_seq, rdp->rcu_ofl_gp_state); return 1; /* Break things loose after complaining. */ } @@ -1423,6 +1450,326 @@ static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap) } /* + * There is a single llist, which is used for handling + * synchronize_rcu() users' enqueued rcu_synchronize nodes. + * Within this llist, there are two tail pointers: + * + * wait tail: Tracks the set of nodes, which need to + * wait for the current GP to complete. + * done tail: Tracks the set of nodes, for which grace + * period has elapsed. These nodes processing + * will be done as part of the cleanup work + * execution by a kworker. + * + * At every grace period init, a new wait node is added + * to the llist. This wait node is used as wait tail + * for this new grace period. Given that there are a fixed + * number of wait nodes, if all wait nodes are in use + * (which can happen when kworker callback processing + * is delayed) and additional grace period is requested. + * This means, a system is slow in processing callbacks. + * + * TODO: If a slow processing is detected, a first node + * in the llist should be used as a wait-tail for this + * grace period, therefore users which should wait due + * to a slow process are handled by _this_ grace period + * and not next. + * + * Below is an illustration of how the done and wait + * tail pointers move from one set of rcu_synchronize nodes + * to the other, as grace periods start and finish and + * nodes are processed by kworker. + * + * + * a. Initial llist callbacks list: + * + * +----------+ +--------+ +-------+ + * | | | | | | + * | head |---------> | cb2 |--------->| cb1 | + * | | | | | | + * +----------+ +--------+ +-------+ + * + * + * + * b. New GP1 Start: + * + * WAIT TAIL + * | + * | + * v + * +----------+ +--------+ +--------+ +-------+ + * | | | | | | | | + * | head ------> wait |------> cb2 |------> | cb1 | + * | | | head1 | | | | | + * +----------+ +--------+ +--------+ +-------+ + * + * + * + * c. GP completion: + * + * WAIT_TAIL == DONE_TAIL + * + * DONE TAIL + * | + * | + * v + * +----------+ +--------+ +--------+ +-------+ + * | | | | | | | | + * | head ------> wait |------> cb2 |------> | cb1 | + * | | | head1 | | | | | + * +----------+ +--------+ +--------+ +-------+ + * + * + * + * d. New callbacks and GP2 start: + * + * WAIT TAIL DONE TAIL + * | | + * | | + * v v + * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+ + * | | | | | | | | | | | | | | + * | head ------> wait |--->| cb4 |--->| cb3 |--->|wait |--->| cb2 |--->| cb1 | + * | | | head2| | | | | |head1| | | | | + * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+ + * + * + * + * e. GP2 completion: + * + * WAIT_TAIL == DONE_TAIL + * DONE TAIL + * | + * | + * v + * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+ + * | | | | | | | | | | | | | | + * | head ------> wait |--->| cb4 |--->| cb3 |--->|wait |--->| cb2 |--->| cb1 | + * | | | head2| | | | | |head1| | | | | + * +----------+ +------+ +------+ +------+ +-----+ +-----+ +-----+ + * + * + * While the llist state transitions from d to e, a kworker + * can start executing rcu_sr_normal_gp_cleanup_work() and + * can observe either the old done tail (@c) or the new + * done tail (@e). So, done tail updates and reads need + * to use the rel-acq semantics. If the concurrent kworker + * observes the old done tail, the newly queued work + * execution will process the updated done tail. If the + * concurrent kworker observes the new done tail, then + * the newly queued work will skip processing the done + * tail, as workqueue semantics guarantees that the new + * work is executed only after the previous one completes. + * + * f. kworker callbacks processing complete: + * + * + * DONE TAIL + * | + * | + * v + * +----------+ +--------+ + * | | | | + * | head ------> wait | + * | | | head2 | + * +----------+ +--------+ + * + */ +static bool rcu_sr_is_wait_head(struct llist_node *node) +{ + return &(rcu_state.srs_wait_nodes)[0].node <= node && + node <= &(rcu_state.srs_wait_nodes)[SR_NORMAL_GP_WAIT_HEAD_MAX - 1].node; +} + +static struct llist_node *rcu_sr_get_wait_head(void) +{ + struct sr_wait_node *sr_wn; + int i; + + for (i = 0; i < SR_NORMAL_GP_WAIT_HEAD_MAX; i++) { + sr_wn = &(rcu_state.srs_wait_nodes)[i]; + + if (!atomic_cmpxchg_acquire(&sr_wn->inuse, 0, 1)) + return &sr_wn->node; + } + + return NULL; +} + +static void rcu_sr_put_wait_head(struct llist_node *node) +{ + struct sr_wait_node *sr_wn = container_of(node, struct sr_wait_node, node); + + atomic_set_release(&sr_wn->inuse, 0); +} + +/* Disabled by default. */ +static int rcu_normal_wake_from_gp; +module_param(rcu_normal_wake_from_gp, int, 0644); +static struct workqueue_struct *sync_wq; + +static void rcu_sr_normal_complete(struct llist_node *node) +{ + struct rcu_synchronize *rs = container_of( + (struct rcu_head *) node, struct rcu_synchronize, head); + unsigned long oldstate = (unsigned long) rs->head.func; + + WARN_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) && + !poll_state_synchronize_rcu(oldstate), + "A full grace period is not passed yet: %lu", + rcu_seq_diff(get_state_synchronize_rcu(), oldstate)); + + /* Finally. */ + complete(&rs->completion); +} + +static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work) +{ + struct llist_node *done, *rcu, *next, *head; + + /* + * This work execution can potentially execute + * while a new done tail is being updated by + * grace period kthread in rcu_sr_normal_gp_cleanup(). + * So, read and updates of done tail need to + * follow acq-rel semantics. + * + * Given that wq semantics guarantees that a single work + * cannot execute concurrently by multiple kworkers, + * the done tail list manipulations are protected here. + */ + done = smp_load_acquire(&rcu_state.srs_done_tail); + if (WARN_ON_ONCE(!done)) + return; + + WARN_ON_ONCE(!rcu_sr_is_wait_head(done)); + head = done->next; + done->next = NULL; + + /* + * The dummy node, which is pointed to by the + * done tail which is acq-read above is not removed + * here. This allows lockless additions of new + * rcu_synchronize nodes in rcu_sr_normal_add_req(), + * while the cleanup work executes. The dummy + * nodes is removed, in next round of cleanup + * work execution. + */ + llist_for_each_safe(rcu, next, head) { + if (!rcu_sr_is_wait_head(rcu)) { + rcu_sr_normal_complete(rcu); + continue; + } + + rcu_sr_put_wait_head(rcu); + } + + /* Order list manipulations with atomic access. */ + atomic_dec_return_release(&rcu_state.srs_cleanups_pending); +} + +/* + * Helper function for rcu_gp_cleanup(). + */ +static void rcu_sr_normal_gp_cleanup(void) +{ + struct llist_node *wait_tail, *next = NULL, *rcu = NULL; + int done = 0; + + wait_tail = rcu_state.srs_wait_tail; + if (wait_tail == NULL) + return; + + rcu_state.srs_wait_tail = NULL; + ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_wait_tail); + WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail)); + + /* + * Process (a) and (d) cases. See an illustration. + */ + llist_for_each_safe(rcu, next, wait_tail->next) { + if (rcu_sr_is_wait_head(rcu)) + break; + + rcu_sr_normal_complete(rcu); + // It can be last, update a next on this step. + wait_tail->next = next; + + if (++done == SR_MAX_USERS_WAKE_FROM_GP) + break; + } + + /* + * Fast path, no more users to process except putting the second last + * wait head if no inflight-workers. If there are in-flight workers, + * they will remove the last wait head. + * + * Note that the ACQUIRE orders atomic access with list manipulation. + */ + if (wait_tail->next && wait_tail->next->next == NULL && + rcu_sr_is_wait_head(wait_tail->next) && + !atomic_read_acquire(&rcu_state.srs_cleanups_pending)) { + rcu_sr_put_wait_head(wait_tail->next); + wait_tail->next = NULL; + } + + /* Concurrent sr_normal_gp_cleanup work might observe this update. */ + ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_done_tail); + smp_store_release(&rcu_state.srs_done_tail, wait_tail); + + /* + * We schedule a work in order to perform a final processing + * of outstanding users(if still left) and releasing wait-heads + * added by rcu_sr_normal_gp_init() call. + */ + if (wait_tail->next) { + atomic_inc(&rcu_state.srs_cleanups_pending); + if (!queue_work(sync_wq, &rcu_state.srs_cleanup_work)) + atomic_dec(&rcu_state.srs_cleanups_pending); + } +} + +/* + * Helper function for rcu_gp_init(). + */ +static bool rcu_sr_normal_gp_init(void) +{ + struct llist_node *first; + struct llist_node *wait_head; + bool start_new_poll = false; + + first = READ_ONCE(rcu_state.srs_next.first); + if (!first || rcu_sr_is_wait_head(first)) + return start_new_poll; + + wait_head = rcu_sr_get_wait_head(); + if (!wait_head) { + // Kick another GP to retry. + start_new_poll = true; + return start_new_poll; + } + + /* Inject a wait-dummy-node. */ + llist_add(wait_head, &rcu_state.srs_next); + + /* + * A waiting list of rcu_synchronize nodes should be empty on + * this step, since a GP-kthread, rcu_gp_init() -> gp_cleanup(), + * rolls it over. If not, it is a BUG, warn a user. + */ + WARN_ON_ONCE(rcu_state.srs_wait_tail != NULL); + rcu_state.srs_wait_tail = wait_head; + ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_wait_tail); + + return start_new_poll; +} + +static void rcu_sr_normal_add_req(struct rcu_synchronize *rs) +{ + llist_add((struct llist_node *) &rs->head, &rcu_state.srs_next); +} + +/* * Initialize a new grace period. Return false if no grace period required. */ static noinline_for_stack bool rcu_gp_init(void) @@ -1432,10 +1779,11 @@ static noinline_for_stack bool rcu_gp_init(void) unsigned long mask; struct rcu_data *rdp; struct rcu_node *rnp = rcu_get_root(); + bool start_new_poll; WRITE_ONCE(rcu_state.gp_activity, jiffies); raw_spin_lock_irq_rcu_node(rnp); - if (!READ_ONCE(rcu_state.gp_flags)) { + if (!rcu_state.gp_flags) { /* Spurious wakeup, tell caller to go back to sleep. */ raw_spin_unlock_irq_rcu_node(rnp); return false; @@ -1456,11 +1804,25 @@ static noinline_for_stack bool rcu_gp_init(void) /* Record GP times before starting GP, hence rcu_seq_start(). */ rcu_seq_start(&rcu_state.gp_seq); ASSERT_EXCLUSIVE_WRITER(rcu_state.gp_seq); + start_new_poll = rcu_sr_normal_gp_init(); trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq, TPS("start")); rcu_poll_gp_seq_start(&rcu_state.gp_seq_polled_snap); raw_spin_unlock_irq_rcu_node(rnp); /* + * The "start_new_poll" is set to true, only when this GP is not able + * to handle anything and there are outstanding users. It happens when + * the rcu_sr_normal_gp_init() function was not able to insert a dummy + * separator to the llist, because there were no left any dummy-nodes. + * + * Number of dummy-nodes is fixed, it could be that we are run out of + * them, if so we start a new pool request to repeat a try. It is rare + * and it means that a system is doing a slow processing of callbacks. + */ + if (start_new_poll) + (void) start_poll_synchronize_rcu(); + + /* * Apply per-leaf buffered online and offline operations to * the rcu_node tree. Note that this new grace period need not * wait for subsequent online CPUs, and that RCU hooks in the CPU @@ -1472,7 +1834,7 @@ static noinline_for_stack bool rcu_gp_init(void) WRITE_ONCE(rcu_state.gp_state, RCU_GP_ONOFF); /* Exclude CPU hotplug operations. */ rcu_for_each_leaf_node(rnp) { - local_irq_save(flags); + local_irq_disable(); arch_spin_lock(&rcu_state.ofl_lock); raw_spin_lock_rcu_node(rnp); if (rnp->qsmaskinit == rnp->qsmaskinitnext && @@ -1480,7 +1842,7 @@ static noinline_for_stack bool rcu_gp_init(void) /* Nothing to do on this leaf rcu_node structure. */ raw_spin_unlock_rcu_node(rnp); arch_spin_unlock(&rcu_state.ofl_lock); - local_irq_restore(flags); + local_irq_enable(); continue; } @@ -1517,7 +1879,7 @@ static noinline_for_stack bool rcu_gp_init(void) raw_spin_unlock_rcu_node(rnp); arch_spin_unlock(&rcu_state.ofl_lock); - local_irq_restore(flags); + local_irq_enable(); } rcu_gp_slow(gp_preinit_delay); /* Races with CPU hotplug. */ @@ -1612,16 +1974,15 @@ static void rcu_gp_fqs(bool first_time) if (first_time) { /* Collect dyntick-idle snapshots. */ - force_qs_rnp(dyntick_save_progress_counter); + force_qs_rnp(rcu_watching_snap_save); } else { /* Handle dyntick-idle and offline CPUs. */ - force_qs_rnp(rcu_implicit_dynticks_qs); + force_qs_rnp(rcu_watching_snap_recheck); } /* Clear flag to prevent immediate re-entry. */ if (READ_ONCE(rcu_state.gp_flags) & RCU_GP_FLAG_FQS) { raw_spin_lock_irq_rcu_node(rnp); - WRITE_ONCE(rcu_state.gp_flags, - READ_ONCE(rcu_state.gp_flags) & ~RCU_GP_FLAG_FQS); + WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags & ~RCU_GP_FLAG_FQS); raw_spin_unlock_irq_rcu_node(rnp); } } @@ -1825,6 +2186,9 @@ static noinline void rcu_gp_cleanup(void) } raw_spin_unlock_irq_rcu_node(rnp); + // Make synchronize_rcu() users aware of the end of old grace period. + rcu_sr_normal_gp_cleanup(); + // If strict, make all CPUs aware of the end of the old grace period. if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD)) on_each_cpu(rcu_strict_gp_boundary, NULL, 0); @@ -1882,8 +2246,7 @@ static void rcu_report_qs_rsp(unsigned long flags) { raw_lockdep_assert_held_rcu_node(rcu_get_root()); WARN_ON_ONCE(!rcu_gp_in_progress()); - WRITE_ONCE(rcu_state.gp_flags, - READ_ONCE(rcu_state.gp_flags) | RCU_GP_FLAG_FQS); + WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags | RCU_GP_FLAG_FQS); raw_spin_unlock_irqrestore_rcu_node(rcu_get_root(), flags); rcu_gp_kthread_wake(); } @@ -2010,7 +2373,6 @@ rcu_report_qs_rdp(struct rcu_data *rdp) { unsigned long flags; unsigned long mask; - bool needacc = false; struct rcu_node *rnp; WARN_ON_ONCE(rdp->cpu != smp_processor_id()); @@ -2047,23 +2409,11 @@ rcu_report_qs_rdp(struct rcu_data *rdp) * to return true. So complain, but don't awaken. */ WARN_ON_ONCE(rcu_accelerate_cbs(rnp, rdp)); - } else if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) { - /* - * ...but NOCB kthreads may miss or delay callbacks acceleration - * if in the middle of a (de-)offloading process. - */ - needacc = true; } rcu_disable_urgency_upon_qs(rdp); rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags); /* ^^^ Released rnp->lock */ - - if (needacc) { - rcu_nocb_lock_irqsave(rdp, flags); - rcu_accelerate_cbs_unlocked(rnp, rdp); - rcu_nocb_unlock_irqrestore(rdp, flags); - } } } @@ -2398,8 +2748,7 @@ void rcu_force_quiescent_state(void) raw_spin_unlock_irqrestore_rcu_node(rnp_old, flags); return; /* Someone beat us to it. */ } - WRITE_ONCE(rcu_state.gp_flags, - READ_ONCE(rcu_state.gp_flags) | RCU_GP_FLAG_FQS); + WRITE_ONCE(rcu_state.gp_flags, rcu_state.gp_flags | RCU_GP_FLAG_FQS); raw_spin_unlock_irqrestore_rcu_node(rnp_old, flags); rcu_gp_kthread_wake(); } @@ -2419,24 +2768,6 @@ static __latent_entropy void rcu_core(void) unsigned long flags; struct rcu_data *rdp = raw_cpu_ptr(&rcu_data); struct rcu_node *rnp = rdp->mynode; - /* - * On RT rcu_core() can be preempted when IRQs aren't disabled. - * Therefore this function can race with concurrent NOCB (de-)offloading - * on this CPU and the below condition must be considered volatile. - * However if we race with: - * - * _ Offloading: In the worst case we accelerate or process callbacks - * concurrently with NOCB kthreads. We are guaranteed to - * call rcu_nocb_lock() if that happens. - * - * _ Deoffloading: In the worst case we miss callbacks acceleration or - * processing. This is fine because the early stage - * of deoffloading invokes rcu_core() after setting - * SEGCBLIST_RCU_CORE. So we guarantee that we'll process - * what could have been dismissed without the need to wait - * for the next rcu_pending() check in the next jiffy. - */ - const bool do_batch = !rcu_segcblist_completely_offloaded(&rdp->cblist); if (cpu_is_offline(smp_processor_id())) return; @@ -2456,17 +2787,17 @@ static __latent_entropy void rcu_core(void) /* No grace period and unregistered callbacks? */ if (!rcu_gp_in_progress() && - rcu_segcblist_is_enabled(&rdp->cblist) && do_batch) { - rcu_nocb_lock_irqsave(rdp, flags); + rcu_segcblist_is_enabled(&rdp->cblist) && !rcu_rdp_is_offloaded(rdp)) { + local_irq_save(flags); if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL)) rcu_accelerate_cbs_unlocked(rnp, rdp); - rcu_nocb_unlock_irqrestore(rdp, flags); + local_irq_restore(flags); } rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check()); /* If there are callbacks ready, invoke them. */ - if (do_batch && rcu_segcblist_ready_cbs(&rdp->cblist) && + if (!rcu_rdp_is_offloaded(rdp) && rcu_segcblist_ready_cbs(&rdp->cblist) && likely(READ_ONCE(rcu_scheduler_fully_active))) { rcu_do_batch(rdp); /* Re-invoke RCU core processing if there are callbacks remaining. */ @@ -2483,7 +2814,7 @@ static __latent_entropy void rcu_core(void) queue_work_on(rdp->cpu, rcu_gp_wq, &rdp->strict_work); } -static void rcu_core_si(struct softirq_action *h) +static void rcu_core_si(void) { rcu_core(); } @@ -2731,9 +3062,12 @@ __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy_in) } head->func = func; head->next = NULL; - kasan_record_aux_stack_noalloc(head); + kasan_record_aux_stack(head); + local_irq_save(flags); rdp = this_cpu_ptr(&rcu_data); + RCU_LOCKDEP_WARN(!rcu_rdp_cpu_online(rdp), "Callback enqueued on offline CPU!"); + lazy = lazy_in && !rcu_async_should_hurry(); /* Add the callback to our list. */ @@ -2839,724 +3173,60 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) } EXPORT_SYMBOL_GPL(call_rcu); -/* Maximum number of jiffies to wait before draining a batch. */ -#define KFREE_DRAIN_JIFFIES (5 * HZ) -#define KFREE_N_BATCHES 2 -#define FREE_N_CHANNELS 2 - -/** - * struct kvfree_rcu_bulk_data - single block to store kvfree_rcu() pointers - * @list: List node. All blocks are linked between each other - * @gp_snap: Snapshot of RCU state for objects placed to this bulk - * @nr_records: Number of active pointers in the array - * @records: Array of the kvfree_rcu() pointers - */ -struct kvfree_rcu_bulk_data { - struct list_head list; - struct rcu_gp_oldstate gp_snap; - unsigned long nr_records; - void *records[]; -}; - /* - * This macro defines how many entries the "records" array - * will contain. It is based on the fact that the size of - * kvfree_rcu_bulk_data structure becomes exactly one page. - */ -#define KVFREE_BULK_MAX_ENTR \ - ((PAGE_SIZE - sizeof(struct kvfree_rcu_bulk_data)) / sizeof(void *)) - -/** - * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests - * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period - * @head_free: List of kfree_rcu() objects waiting for a grace period - * @head_free_gp_snap: Grace-period snapshot to check for attempted premature frees. - * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period - * @krcp: Pointer to @kfree_rcu_cpu structure - */ - -struct kfree_rcu_cpu_work { - struct rcu_work rcu_work; - struct rcu_head *head_free; - struct rcu_gp_oldstate head_free_gp_snap; - struct list_head bulk_head_free[FREE_N_CHANNELS]; - struct kfree_rcu_cpu *krcp; -}; - -/** - * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period - * @head: List of kfree_rcu() objects not yet waiting for a grace period - * @head_gp_snap: Snapshot of RCU state for objects placed to "@head" - * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period - * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period - * @lock: Synchronize access to this structure - * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES - * @initialized: The @rcu_work fields have been initialized - * @head_count: Number of objects in rcu_head singular list - * @bulk_count: Number of objects in bulk-list - * @bkvcache: - * A simple cache list that contains objects for reuse purpose. - * In order to save some per-cpu space the list is singular. - * Even though it is lockless an access has to be protected by the - * per-cpu lock. - * @page_cache_work: A work to refill the cache when it is empty - * @backoff_page_cache_fill: Delay cache refills - * @work_in_progress: Indicates that page_cache_work is running - * @hrtimer: A hrtimer for scheduling a page_cache_work - * @nr_bkv_objs: number of allocated objects at @bkvcache. + * During early boot, any blocking grace-period wait automatically + * implies a grace period. * - * This is a per-CPU structure. The reason that it is not included in - * the rcu_data structure is to permit this code to be extracted from - * the RCU files. Such extraction could allow further optimization of - * the interactions with the slab allocators. - */ -struct kfree_rcu_cpu { - // Objects queued on a linked list - // through their rcu_head structures. - struct rcu_head *head; - unsigned long head_gp_snap; - atomic_t head_count; - - // Objects queued on a bulk-list. - struct list_head bulk_head[FREE_N_CHANNELS]; - atomic_t bulk_count[FREE_N_CHANNELS]; - - struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES]; - raw_spinlock_t lock; - struct delayed_work monitor_work; - bool initialized; - - struct delayed_work page_cache_work; - atomic_t backoff_page_cache_fill; - atomic_t work_in_progress; - struct hrtimer hrtimer; - - struct llist_head bkvcache; - int nr_bkv_objs; -}; - -static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc) = { - .lock = __RAW_SPIN_LOCK_UNLOCKED(krc.lock), -}; - -static __always_inline void -debug_rcu_bhead_unqueue(struct kvfree_rcu_bulk_data *bhead) -{ -#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD - int i; - - for (i = 0; i < bhead->nr_records; i++) - debug_rcu_head_unqueue((struct rcu_head *)(bhead->records[i])); -#endif -} - -static inline struct kfree_rcu_cpu * -krc_this_cpu_lock(unsigned long *flags) -{ - struct kfree_rcu_cpu *krcp; - - local_irq_save(*flags); // For safely calling this_cpu_ptr(). - krcp = this_cpu_ptr(&krc); - raw_spin_lock(&krcp->lock); - - return krcp; -} - -static inline void -krc_this_cpu_unlock(struct kfree_rcu_cpu *krcp, unsigned long flags) -{ - raw_spin_unlock_irqrestore(&krcp->lock, flags); -} - -static inline struct kvfree_rcu_bulk_data * -get_cached_bnode(struct kfree_rcu_cpu *krcp) -{ - if (!krcp->nr_bkv_objs) - return NULL; - - WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs - 1); - return (struct kvfree_rcu_bulk_data *) - llist_del_first(&krcp->bkvcache); -} - -static inline bool -put_cached_bnode(struct kfree_rcu_cpu *krcp, - struct kvfree_rcu_bulk_data *bnode) -{ - // Check the limit. - if (krcp->nr_bkv_objs >= rcu_min_cached_objs) - return false; - - llist_add((struct llist_node *) bnode, &krcp->bkvcache); - WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs + 1); - return true; -} - -static int -drain_page_cache(struct kfree_rcu_cpu *krcp) -{ - unsigned long flags; - struct llist_node *page_list, *pos, *n; - int freed = 0; - - if (!rcu_min_cached_objs) - return 0; - - raw_spin_lock_irqsave(&krcp->lock, flags); - page_list = llist_del_all(&krcp->bkvcache); - WRITE_ONCE(krcp->nr_bkv_objs, 0); - raw_spin_unlock_irqrestore(&krcp->lock, flags); - - llist_for_each_safe(pos, n, page_list) { - free_page((unsigned long)pos); - freed++; - } - - return freed; -} - -static void -kvfree_rcu_bulk(struct kfree_rcu_cpu *krcp, - struct kvfree_rcu_bulk_data *bnode, int idx) -{ - unsigned long flags; - int i; - - if (!WARN_ON_ONCE(!poll_state_synchronize_rcu_full(&bnode->gp_snap))) { - debug_rcu_bhead_unqueue(bnode); - rcu_lock_acquire(&rcu_callback_map); - if (idx == 0) { // kmalloc() / kfree(). - trace_rcu_invoke_kfree_bulk_callback( - rcu_state.name, bnode->nr_records, - bnode->records); - - kfree_bulk(bnode->nr_records, bnode->records); - } else { // vmalloc() / vfree(). - for (i = 0; i < bnode->nr_records; i++) { - trace_rcu_invoke_kvfree_callback( - rcu_state.name, bnode->records[i], 0); - - vfree(bnode->records[i]); - } - } - rcu_lock_release(&rcu_callback_map); - } - - raw_spin_lock_irqsave(&krcp->lock, flags); - if (put_cached_bnode(krcp, bnode)) - bnode = NULL; - raw_spin_unlock_irqrestore(&krcp->lock, flags); - - if (bnode) - free_page((unsigned long) bnode); - - cond_resched_tasks_rcu_qs(); -} - -static void -kvfree_rcu_list(struct rcu_head *head) -{ - struct rcu_head *next; - - for (; head; head = next) { - void *ptr = (void *) head->func; - unsigned long offset = (void *) head - ptr; - - next = head->next; - debug_rcu_head_unqueue((struct rcu_head *)ptr); - rcu_lock_acquire(&rcu_callback_map); - trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset); - - if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) - kvfree(ptr); - - rcu_lock_release(&rcu_callback_map); - cond_resched_tasks_rcu_qs(); - } -} - -/* - * This function is invoked in workqueue context after a grace period. - * It frees all the objects queued on ->bulk_head_free or ->head_free. - */ -static void kfree_rcu_work(struct work_struct *work) -{ - unsigned long flags; - struct kvfree_rcu_bulk_data *bnode, *n; - struct list_head bulk_head[FREE_N_CHANNELS]; - struct rcu_head *head; - struct kfree_rcu_cpu *krcp; - struct kfree_rcu_cpu_work *krwp; - struct rcu_gp_oldstate head_gp_snap; - int i; - - krwp = container_of(to_rcu_work(work), - struct kfree_rcu_cpu_work, rcu_work); - krcp = krwp->krcp; - - raw_spin_lock_irqsave(&krcp->lock, flags); - // Channels 1 and 2. - for (i = 0; i < FREE_N_CHANNELS; i++) - list_replace_init(&krwp->bulk_head_free[i], &bulk_head[i]); - - // Channel 3. - head = krwp->head_free; - krwp->head_free = NULL; - head_gp_snap = krwp->head_free_gp_snap; - raw_spin_unlock_irqrestore(&krcp->lock, flags); - - // Handle the first two channels. - for (i = 0; i < FREE_N_CHANNELS; i++) { - // Start from the tail page, so a GP is likely passed for it. - list_for_each_entry_safe(bnode, n, &bulk_head[i], list) - kvfree_rcu_bulk(krcp, bnode, i); - } - - /* - * This is used when the "bulk" path can not be used for the - * double-argument of kvfree_rcu(). This happens when the - * page-cache is empty, which means that objects are instead - * queued on a linked list through their rcu_head structures. - * This list is named "Channel 3". - */ - if (head && !WARN_ON_ONCE(!poll_state_synchronize_rcu_full(&head_gp_snap))) - kvfree_rcu_list(head); -} - -static bool -need_offload_krc(struct kfree_rcu_cpu *krcp) -{ - int i; - - for (i = 0; i < FREE_N_CHANNELS; i++) - if (!list_empty(&krcp->bulk_head[i])) - return true; - - return !!READ_ONCE(krcp->head); -} - -static bool -need_wait_for_krwp_work(struct kfree_rcu_cpu_work *krwp) -{ - int i; - - for (i = 0; i < FREE_N_CHANNELS; i++) - if (!list_empty(&krwp->bulk_head_free[i])) - return true; - - return !!krwp->head_free; -} - -static int krc_count(struct kfree_rcu_cpu *krcp) -{ - int sum = atomic_read(&krcp->head_count); - int i; - - for (i = 0; i < FREE_N_CHANNELS; i++) - sum += atomic_read(&krcp->bulk_count[i]); - - return sum; -} - -static void -schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp) -{ - long delay, delay_left; - - delay = krc_count(krcp) >= KVFREE_BULK_MAX_ENTR ? 1:KFREE_DRAIN_JIFFIES; - if (delayed_work_pending(&krcp->monitor_work)) { - delay_left = krcp->monitor_work.timer.expires - jiffies; - if (delay < delay_left) - mod_delayed_work(system_wq, &krcp->monitor_work, delay); - return; - } - queue_delayed_work(system_wq, &krcp->monitor_work, delay); -} - -static void -kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp) -{ - struct list_head bulk_ready[FREE_N_CHANNELS]; - struct kvfree_rcu_bulk_data *bnode, *n; - struct rcu_head *head_ready = NULL; - unsigned long flags; - int i; - - raw_spin_lock_irqsave(&krcp->lock, flags); - for (i = 0; i < FREE_N_CHANNELS; i++) { - INIT_LIST_HEAD(&bulk_ready[i]); - - list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) { - if (!poll_state_synchronize_rcu_full(&bnode->gp_snap)) - break; - - atomic_sub(bnode->nr_records, &krcp->bulk_count[i]); - list_move(&bnode->list, &bulk_ready[i]); - } - } - - if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) { - head_ready = krcp->head; - atomic_set(&krcp->head_count, 0); - WRITE_ONCE(krcp->head, NULL); - } - raw_spin_unlock_irqrestore(&krcp->lock, flags); - - for (i = 0; i < FREE_N_CHANNELS; i++) { - list_for_each_entry_safe(bnode, n, &bulk_ready[i], list) - kvfree_rcu_bulk(krcp, bnode, i); - } - - if (head_ready) - kvfree_rcu_list(head_ready); -} - -/* - * This function is invoked after the KFREE_DRAIN_JIFFIES timeout. + * Later on, this could in theory be the case for kernels built with + * CONFIG_SMP=y && CONFIG_PREEMPTION=y running on a single CPU, but this + * is not a common case. Furthermore, this optimization would cause + * the rcu_gp_oldstate structure to expand by 50%, so this potential + * grace-period optimization is ignored once the scheduler is running. */ -static void kfree_rcu_monitor(struct work_struct *work) -{ - struct kfree_rcu_cpu *krcp = container_of(work, - struct kfree_rcu_cpu, monitor_work.work); - unsigned long flags; - int i, j; - - // Drain ready for reclaim. - kvfree_rcu_drain_ready(krcp); - - raw_spin_lock_irqsave(&krcp->lock, flags); - - // Attempt to start a new batch. - for (i = 0; i < KFREE_N_BATCHES; i++) { - struct kfree_rcu_cpu_work *krwp = &(krcp->krw_arr[i]); - - // Try to detach bulk_head or head and attach it, only when - // all channels are free. Any channel is not free means at krwp - // there is on-going rcu work to handle krwp's free business. - if (need_wait_for_krwp_work(krwp)) - continue; - - // kvfree_rcu_drain_ready() might handle this krcp, if so give up. - if (need_offload_krc(krcp)) { - // Channel 1 corresponds to the SLAB-pointer bulk path. - // Channel 2 corresponds to vmalloc-pointer bulk path. - for (j = 0; j < FREE_N_CHANNELS; j++) { - if (list_empty(&krwp->bulk_head_free[j])) { - atomic_set(&krcp->bulk_count[j], 0); - list_replace_init(&krcp->bulk_head[j], - &krwp->bulk_head_free[j]); - } - } - - // Channel 3 corresponds to both SLAB and vmalloc - // objects queued on the linked list. - if (!krwp->head_free) { - krwp->head_free = krcp->head; - get_state_synchronize_rcu_full(&krwp->head_free_gp_snap); - atomic_set(&krcp->head_count, 0); - WRITE_ONCE(krcp->head, NULL); - } - - // One work is per one batch, so there are three - // "free channels", the batch can handle. It can - // be that the work is in the pending state when - // channels have been detached following by each - // other. - queue_rcu_work(system_wq, &krwp->rcu_work); - } - } - - raw_spin_unlock_irqrestore(&krcp->lock, flags); - - // If there is nothing to detach, it means that our job is - // successfully done here. In case of having at least one - // of the channels that is still busy we should rearm the - // work to repeat an attempt. Because previous batches are - // still in progress. - if (need_offload_krc(krcp)) - schedule_delayed_monitor_work(krcp); -} - -static enum hrtimer_restart -schedule_page_work_fn(struct hrtimer *t) -{ - struct kfree_rcu_cpu *krcp = - container_of(t, struct kfree_rcu_cpu, hrtimer); - - queue_delayed_work(system_highpri_wq, &krcp->page_cache_work, 0); - return HRTIMER_NORESTART; -} - -static void fill_page_cache_func(struct work_struct *work) -{ - struct kvfree_rcu_bulk_data *bnode; - struct kfree_rcu_cpu *krcp = - container_of(work, struct kfree_rcu_cpu, - page_cache_work.work); - unsigned long flags; - int nr_pages; - bool pushed; - int i; - - nr_pages = atomic_read(&krcp->backoff_page_cache_fill) ? - 1 : rcu_min_cached_objs; - - for (i = READ_ONCE(krcp->nr_bkv_objs); i < nr_pages; i++) { - bnode = (struct kvfree_rcu_bulk_data *) - __get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN); - - if (!bnode) - break; - - raw_spin_lock_irqsave(&krcp->lock, flags); - pushed = put_cached_bnode(krcp, bnode); - raw_spin_unlock_irqrestore(&krcp->lock, flags); - - if (!pushed) { - free_page((unsigned long) bnode); - break; - } - } - - atomic_set(&krcp->work_in_progress, 0); - atomic_set(&krcp->backoff_page_cache_fill, 0); -} - -static void -run_page_cache_worker(struct kfree_rcu_cpu *krcp) -{ - // If cache disabled, bail out. - if (!rcu_min_cached_objs) - return; - - if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING && - !atomic_xchg(&krcp->work_in_progress, 1)) { - if (atomic_read(&krcp->backoff_page_cache_fill)) { - queue_delayed_work(system_wq, - &krcp->page_cache_work, - msecs_to_jiffies(rcu_delay_page_cache_fill_msec)); - } else { - hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL); - krcp->hrtimer.function = schedule_page_work_fn; - hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL); - } - } -} - -// Record ptr in a page managed by krcp, with the pre-krc_this_cpu_lock() -// state specified by flags. If can_alloc is true, the caller must -// be schedulable and not be holding any locks or mutexes that might be -// acquired by the memory allocator or anything that it might invoke. -// Returns true if ptr was successfully recorded, else the caller must -// use a fallback. -static inline bool -add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp, - unsigned long *flags, void *ptr, bool can_alloc) +static int rcu_blocking_is_gp(void) { - struct kvfree_rcu_bulk_data *bnode; - int idx; - - *krcp = krc_this_cpu_lock(flags); - if (unlikely(!(*krcp)->initialized)) + if (rcu_scheduler_active != RCU_SCHEDULER_INACTIVE) { + might_sleep(); return false; - - idx = !!is_vmalloc_addr(ptr); - bnode = list_first_entry_or_null(&(*krcp)->bulk_head[idx], - struct kvfree_rcu_bulk_data, list); - - /* Check if a new block is required. */ - if (!bnode || bnode->nr_records == KVFREE_BULK_MAX_ENTR) { - bnode = get_cached_bnode(*krcp); - if (!bnode && can_alloc) { - krc_this_cpu_unlock(*krcp, *flags); - - // __GFP_NORETRY - allows a light-weight direct reclaim - // what is OK from minimizing of fallback hitting point of - // view. Apart of that it forbids any OOM invoking what is - // also beneficial since we are about to release memory soon. - // - // __GFP_NOMEMALLOC - prevents from consuming of all the - // memory reserves. Please note we have a fallback path. - // - // __GFP_NOWARN - it is supposed that an allocation can - // be failed under low memory or high memory pressure - // scenarios. - bnode = (struct kvfree_rcu_bulk_data *) - __get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN); - raw_spin_lock_irqsave(&(*krcp)->lock, *flags); - } - - if (!bnode) - return false; - - // Initialize the new block and attach it. - bnode->nr_records = 0; - list_add(&bnode->list, &(*krcp)->bulk_head[idx]); } - - // Finally insert and update the GP for this page. - bnode->records[bnode->nr_records++] = ptr; - get_state_synchronize_rcu_full(&bnode->gp_snap); - atomic_inc(&(*krcp)->bulk_count[idx]); - return true; } /* - * Queue a request for lazy invocation of the appropriate free routine - * after a grace period. Please note that three paths are maintained, - * two for the common case using arrays of pointers and a third one that - * is used only when the main paths cannot be used, for example, due to - * memory pressure. - * - * Each kvfree_call_rcu() request is added to a batch. The batch will be drained - * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will - * be free'd in workqueue context. This allows us to: batch requests together to - * reduce the number of grace periods during heavy kfree_rcu()/kvfree_rcu() load. + * Helper function for the synchronize_rcu() API. */ -void kvfree_call_rcu(struct rcu_head *head, void *ptr) +static void synchronize_rcu_normal(void) { - unsigned long flags; - struct kfree_rcu_cpu *krcp; - bool success; + struct rcu_synchronize rs; - /* - * Please note there is a limitation for the head-less - * variant, that is why there is a clear rule for such - * objects: it can be used from might_sleep() context - * only. For other places please embed an rcu_head to - * your data. - */ - if (!head) - might_sleep(); + trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("request")); - // Queue the object but don't yet schedule the batch. - if (debug_rcu_head_queue(ptr)) { - // Probable double kfree_rcu(), just leak. - WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n", - __func__, head); - - // Mark as success and leave. - return; + if (!READ_ONCE(rcu_normal_wake_from_gp)) { + wait_rcu_gp(call_rcu_hurry); + goto trace_complete_out; } - kasan_record_aux_stack_noalloc(ptr); - success = add_ptr_to_bulk_krc_lock(&krcp, &flags, ptr, !head); - if (!success) { - run_page_cache_worker(krcp); - - if (head == NULL) - // Inline if kvfree_rcu(one_arg) call. - goto unlock_return; - - head->func = ptr; - head->next = krcp->head; - WRITE_ONCE(krcp->head, head); - atomic_inc(&krcp->head_count); - - // Take a snapshot for this krcp. - krcp->head_gp_snap = get_state_synchronize_rcu(); - success = true; - } + init_rcu_head_on_stack(&rs.head); + init_completion(&rs.completion); /* - * The kvfree_rcu() caller considers the pointer freed at this point - * and likely removes any references to it. Since the actual slab - * freeing (and kmemleak_free()) is deferred, tell kmemleak to ignore - * this object (no scanning or false positives reporting). + * This code might be preempted, therefore take a GP + * snapshot before adding a request. */ - kmemleak_ignore(ptr); + if (IS_ENABLED(CONFIG_PROVE_RCU)) + rs.head.func = (void *) get_state_synchronize_rcu(); - // Set timer to drain after KFREE_DRAIN_JIFFIES. - if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING) - schedule_delayed_monitor_work(krcp); + rcu_sr_normal_add_req(&rs); -unlock_return: - krc_this_cpu_unlock(krcp, flags); + /* Kick a GP and start waiting. */ + (void) start_poll_synchronize_rcu(); - /* - * Inline kvfree() after synchronize_rcu(). We can do - * it from might_sleep() context only, so the current - * CPU can pass the QS state. - */ - if (!success) { - debug_rcu_head_unqueue((struct rcu_head *) ptr); - synchronize_rcu(); - kvfree(ptr); - } -} -EXPORT_SYMBOL_GPL(kvfree_call_rcu); - -static unsigned long -kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc) -{ - int cpu; - unsigned long count = 0; - - /* Snapshot count of all CPUs */ - for_each_possible_cpu(cpu) { - struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu); - - count += krc_count(krcp); - count += READ_ONCE(krcp->nr_bkv_objs); - atomic_set(&krcp->backoff_page_cache_fill, 1); - } - - return count == 0 ? SHRINK_EMPTY : count; -} - -static unsigned long -kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc) -{ - int cpu, freed = 0; - - for_each_possible_cpu(cpu) { - int count; - struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu); - - count = krc_count(krcp); - count += drain_page_cache(krcp); - kfree_rcu_monitor(&krcp->monitor_work.work); - - sc->nr_to_scan -= count; - freed += count; - - if (sc->nr_to_scan <= 0) - break; - } - - return freed == 0 ? SHRINK_STOP : freed; -} + /* Now we can wait. */ + wait_for_completion(&rs.completion); + destroy_rcu_head_on_stack(&rs.head); -void __init kfree_rcu_scheduler_running(void) -{ - int cpu; - - for_each_possible_cpu(cpu) { - struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu); - - if (need_offload_krc(krcp)) - schedule_delayed_monitor_work(krcp); - } -} - -/* - * During early boot, any blocking grace-period wait automatically - * implies a grace period. - * - * Later on, this could in theory be the case for kernels built with - * CONFIG_SMP=y && CONFIG_PREEMPTION=y running on a single CPU, but this - * is not a common case. Furthermore, this optimization would cause - * the rcu_gp_oldstate structure to expand by 50%, so this potential - * grace-period optimization is ignored once the scheduler is running. - */ -static int rcu_blocking_is_gp(void) -{ - if (rcu_scheduler_active != RCU_SCHEDULER_INACTIVE) { - might_sleep(); - return false; - } - return true; +trace_complete_out: + trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("complete")); } /** @@ -3610,7 +3280,7 @@ void synchronize_rcu(void) if (rcu_gp_is_expedited()) synchronize_rcu_expedited(); else - wait_rcu_gp(call_rcu_hurry); + synchronize_rcu_normal(); return; } @@ -3710,7 +3380,6 @@ static void start_poll_synchronize_rcu_common(void) struct rcu_data *rdp; struct rcu_node *rnp; - lockdep_assert_irqs_enabled(); local_irq_save(flags); rdp = this_cpu_ptr(&rcu_data); rnp = rdp->mynode; @@ -3735,9 +3404,6 @@ static void start_poll_synchronize_rcu_common(void) * grace period has elapsed in the meantime. If the needed grace period * is not already slated to start, notifies RCU core of the need for that * grace period. - * - * Interrupts must be enabled for the case where it is necessary to awaken - * the grace-period kthread. */ unsigned long start_poll_synchronize_rcu(void) { @@ -3758,9 +3424,6 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu); * grace period (whether normal or expedited) has elapsed in the meantime. * If the needed grace period is not already slated to start, notifies * RCU core of the need for that grace period. - * - * Interrupts must be enabled for the case where it is necessary to awaken - * the grace-period kthread. */ void start_poll_synchronize_rcu_full(struct rcu_gp_oldstate *rgosp) { @@ -3938,11 +3601,15 @@ static int rcu_pending(int user) return 1; /* Is this a nohz_full CPU in userspace or idle? (Ignore RCU if so.) */ - if ((user || rcu_is_cpu_rrupt_from_idle()) && rcu_nohz_full_cpu()) + gp_in_progress = rcu_gp_in_progress(); + if ((user || rcu_is_cpu_rrupt_from_idle() || + (gp_in_progress && + time_before(jiffies, READ_ONCE(rcu_state.gp_start) + + nohz_full_patience_delay_jiffies))) && + rcu_nohz_full_cpu()) return 0; /* Is the RCU core waiting for a quiescent state from this CPU? */ - gp_in_progress = rcu_gp_in_progress(); if (rdp->core_needs_qs && !rdp->cpu_no_qs.b.norm && gp_in_progress) return 1; @@ -3990,6 +3657,7 @@ static void rcu_barrier_callback(struct rcu_head *rhp) { unsigned long __maybe_unused s = rcu_state.barrier_sequence; + rhp->next = rhp; // Mark the callback as having been invoked. if (atomic_dec_and_test(&rcu_state.barrier_cpu_count)) { rcu_barrier_trace(TPS("LastCB"), -1, s); complete(&rcu_state.barrier_completion); @@ -4303,7 +3971,7 @@ EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online); // whether spinlocks may be acquired safely. static bool rcu_init_invoked(void) { - return !!rcu_state.n_online_cpus; + return !!READ_ONCE(rcu_state.n_online_cpus); } /* @@ -4391,18 +4059,34 @@ rcu_boot_init_percpu_data(int cpu) /* Set up local state, ensuring consistent view of global state. */ rdp->grpmask = leaf_node_cpu_bit(rdp->mynode, cpu); INIT_WORK(&rdp->strict_work, strict_work_handler); - WARN_ON_ONCE(ct->dynticks_nesting != 1); - WARN_ON_ONCE(rcu_dynticks_in_eqs(rcu_dynticks_snap(cpu))); + WARN_ON_ONCE(ct->nesting != 1); + WARN_ON_ONCE(rcu_watching_snap_in_eqs(ct_rcu_watching_cpu(cpu))); rdp->barrier_seq_snap = rcu_state.barrier_sequence; rdp->rcu_ofl_gp_seq = rcu_state.gp_seq; - rdp->rcu_ofl_gp_flags = RCU_GP_CLEANED; + rdp->rcu_ofl_gp_state = RCU_GP_CLEANED; rdp->rcu_onl_gp_seq = rcu_state.gp_seq; - rdp->rcu_onl_gp_flags = RCU_GP_CLEANED; + rdp->rcu_onl_gp_state = RCU_GP_CLEANED; rdp->last_sched_clock = jiffies; rdp->cpu = cpu; rcu_boot_init_nocb_percpu_data(rdp); } +static void rcu_thread_affine_rnp(struct task_struct *t, struct rcu_node *rnp) +{ + cpumask_var_t affinity; + int cpu; + + if (!zalloc_cpumask_var(&affinity, GFP_KERNEL)) + return; + + for_each_leaf_node_possible_cpu(rnp, cpu) + cpumask_set_cpu(cpu, affinity); + + kthread_affine_preferred(t, affinity); + + free_cpumask_var(affinity); +} + struct kthread_worker *rcu_exp_gp_kworker; static void rcu_spawn_exp_par_gp_kworker(struct rcu_node *rnp) @@ -4425,16 +4109,9 @@ static void rcu_spawn_exp_par_gp_kworker(struct rcu_node *rnp) if (IS_ENABLED(CONFIG_RCU_EXP_KTHREAD)) sched_setscheduler_nocheck(kworker->task, SCHED_FIFO, ¶m); -} - -static struct task_struct *rcu_exp_par_gp_task(struct rcu_node *rnp) -{ - struct kthread_worker *kworker = READ_ONCE(rnp->exp_kworker); - if (!kworker) - return NULL; - - return kworker->task; + rcu_thread_affine_rnp(kworker->task, rnp); + wake_up_process(kworker->task); } static void __init rcu_start_exp_gp_kworker(void) @@ -4442,7 +4119,7 @@ static void __init rcu_start_exp_gp_kworker(void) const char *name = "rcu_exp_gp_kthread_worker"; struct sched_param param = { .sched_priority = kthread_prio }; - rcu_exp_gp_kworker = kthread_create_worker(0, name); + rcu_exp_gp_kworker = kthread_run_worker(0, name); if (IS_ERR_OR_NULL(rcu_exp_gp_kworker)) { pr_err("Failed to create %s!\n", name); rcu_exp_gp_kworker = NULL; @@ -4485,7 +4162,7 @@ int rcutree_prepare_cpu(unsigned int cpu) rdp->qlen_last_fqs_check = 0; rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs); rdp->blimit = blimit; - ct->dynticks_nesting = 1; /* CPU not up, no tearing. */ + ct->nesting = 1; /* CPU not up, no tearing. */ raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ /* @@ -4513,73 +4190,13 @@ int rcutree_prepare_cpu(unsigned int cpu) raw_spin_unlock_irqrestore_rcu_node(rnp, flags); rcu_spawn_rnp_kthreads(rnp); rcu_spawn_cpu_nocb_kthread(cpu); + ASSERT_EXCLUSIVE_WRITER(rcu_state.n_online_cpus); WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus + 1); return 0; } /* - * Update kthreads affinity during CPU-hotplug changes. - * - * Set the per-rcu_node kthread's affinity to cover all CPUs that are - * served by the rcu_node in question. The CPU hotplug lock is still - * held, so the value of rnp->qsmaskinit will be stable. - * - * We don't include outgoingcpu in the affinity set, use -1 if there is - * no outgoing CPU. If there are no CPUs left in the affinity set, - * this function allows the kthread to execute on any CPU. - * - * Any future concurrent calls are serialized via ->kthread_mutex. - */ -static void rcutree_affinity_setting(unsigned int cpu, int outgoingcpu) -{ - cpumask_var_t cm; - unsigned long mask; - struct rcu_data *rdp; - struct rcu_node *rnp; - struct task_struct *task_boost, *task_exp; - - rdp = per_cpu_ptr(&rcu_data, cpu); - rnp = rdp->mynode; - - task_boost = rcu_boost_task(rnp); - task_exp = rcu_exp_par_gp_task(rnp); - - /* - * If CPU is the boot one, those tasks are created later from early - * initcall since kthreadd must be created first. - */ - if (!task_boost && !task_exp) - return; - - if (!zalloc_cpumask_var(&cm, GFP_KERNEL)) - return; - - mutex_lock(&rnp->kthread_mutex); - mask = rcu_rnp_online_cpus(rnp); - for_each_leaf_node_possible_cpu(rnp, cpu) - if ((mask & leaf_node_cpu_bit(rnp, cpu)) && - cpu != outgoingcpu) - cpumask_set_cpu(cpu, cm); - cpumask_and(cm, cm, housekeeping_cpumask(HK_TYPE_RCU)); - if (cpumask_empty(cm)) { - cpumask_copy(cm, housekeeping_cpumask(HK_TYPE_RCU)); - if (outgoingcpu >= 0) - cpumask_clear_cpu(outgoingcpu, cm); - } - - if (task_exp) - set_cpus_allowed_ptr(task_exp, cm); - - if (task_boost) - set_cpus_allowed_ptr(task_boost, cm); - - mutex_unlock(&rnp->kthread_mutex); - - free_cpumask_var(cm); -} - -/* * Has the specified (known valid) CPU ever been fully online? */ bool rcu_cpu_beenfullyonline(int cpu) @@ -4607,7 +4224,6 @@ int rcutree_online_cpu(unsigned int cpu) if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE) return 0; /* Too early in boot for scheduler work. */ sync_sched_exp_online_cleanup(cpu); - rcutree_affinity_setting(cpu, -1); // Stop-machine done, so allow nohz_full to disable tick. tick_dep_clear(TICK_DEP_BIT_RCU); @@ -4644,7 +4260,7 @@ void rcutree_report_cpu_starting(unsigned int cpu) rnp = rdp->mynode; mask = rdp->grpmask; arch_spin_lock(&rcu_state.ofl_lock); - rcu_dynticks_eqs_online(); + rcu_watching_online(); raw_spin_lock(&rcu_state.barrier_lock); raw_spin_lock_rcu_node(rnp); WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext | mask); @@ -4656,7 +4272,7 @@ void rcutree_report_cpu_starting(unsigned int cpu) ASSERT_EXCLUSIVE_WRITER(rcu_state.ncpus); rcu_gpnum_ovf(rnp, rdp); /* Offline-induced counter wrap? */ rdp->rcu_onl_gp_seq = READ_ONCE(rcu_state.gp_seq); - rdp->rcu_onl_gp_flags = READ_ONCE(rcu_state.gp_flags); + rdp->rcu_onl_gp_state = READ_ONCE(rcu_state.gp_state); /* An incoming CPU should never be blocking a grace period. */ if (WARN_ON_ONCE(rnp->qsmask & mask)) { /* RCU waiting on incoming CPU? */ @@ -4707,7 +4323,7 @@ void rcutree_report_cpu_dead(void) arch_spin_lock(&rcu_state.ofl_lock); raw_spin_lock_irqsave_rcu_node(rnp, flags); /* Enforce GP memory-order guarantee. */ rdp->rcu_ofl_gp_seq = READ_ONCE(rcu_state.gp_seq); - rdp->rcu_ofl_gp_flags = READ_ONCE(rcu_state.gp_flags); + rdp->rcu_ofl_gp_state = READ_ONCE(rcu_state.gp_state); if (rnp->qsmask & mask) { /* RCU waiting on outgoing CPU? */ /* Report quiescent state -before- changing ->qsmaskinitnext! */ rcu_disable_urgency_upon_qs(rdp); @@ -4734,11 +4350,15 @@ void rcutree_migrate_callbacks(int cpu) struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); bool needwake; - if (rcu_rdp_is_offloaded(rdp) || - rcu_segcblist_empty(&rdp->cblist)) - return; /* No callbacks to migrate. */ + if (rcu_rdp_is_offloaded(rdp)) + return; raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags); + if (rcu_segcblist_empty(&rdp->cblist)) { + raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags); + return; /* No callbacks to migrate. */ + } + WARN_ON_ONCE(rcu_rdp_cpu_online(rdp)); rcu_barrier_entrain(rdp); my_rdp = this_cpu_ptr(&rcu_data); @@ -4781,6 +4401,7 @@ void rcutree_migrate_callbacks(int cpu) */ int rcutree_dead_cpu(unsigned int cpu) { + ASSERT_EXCLUSIVE_WRITER(rcu_state.n_online_cpus); WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus - 1); // Stop-machine done, so allow nohz_full to disable tick. tick_dep_clear(TICK_DEP_BIT_RCU); @@ -4819,8 +4440,6 @@ int rcutree_offline_cpu(unsigned int cpu) rnp->ffmask &= ~rdp->grpmask; raw_spin_unlock_irqrestore_rcu_node(rnp, flags); - rcutree_affinity_setting(cpu, cpu); - // nohz_full CPUs need the tick for stop-machine to work quickly tick_dep_set(TICK_DEP_BIT_RCU); return 0; @@ -5005,6 +4624,8 @@ static void __init rcu_init_one(void) while (i > rnp->grphi) rnp++; per_cpu_ptr(&rcu_data, i)->mynode = rnp; + per_cpu_ptr(&rcu_data, i)->barrier_head.next = + &per_cpu_ptr(&rcu_data, i)->barrier_head; rcu_boot_init_percpu_data(i); } } @@ -5083,8 +4704,7 @@ void rcu_init_geometry(void) * Complain and fall back to the compile-time values if this * limit is exceeded. */ - if (rcu_fanout_leaf < 2 || - rcu_fanout_leaf > sizeof(unsigned long) * 8) { + if (rcu_fanout_leaf < 2 || rcu_fanout_leaf > BITS_PER_LONG) { rcu_fanout_leaf = RCU_FANOUT_LEAF; WARN_ON(1); return; @@ -5149,62 +4769,12 @@ static void __init rcu_dump_rcu_node_tree(void) struct workqueue_struct *rcu_gp_wq; -static void __init kfree_rcu_batch_init(void) -{ - int cpu; - int i, j; - struct shrinker *kfree_rcu_shrinker; - - /* Clamp it to [0:100] seconds interval. */ - if (rcu_delay_page_cache_fill_msec < 0 || - rcu_delay_page_cache_fill_msec > 100 * MSEC_PER_SEC) { - - rcu_delay_page_cache_fill_msec = - clamp(rcu_delay_page_cache_fill_msec, 0, - (int) (100 * MSEC_PER_SEC)); - - pr_info("Adjusting rcutree.rcu_delay_page_cache_fill_msec to %d ms.\n", - rcu_delay_page_cache_fill_msec); - } - - for_each_possible_cpu(cpu) { - struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu); - - for (i = 0; i < KFREE_N_BATCHES; i++) { - INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work); - krcp->krw_arr[i].krcp = krcp; - - for (j = 0; j < FREE_N_CHANNELS; j++) - INIT_LIST_HEAD(&krcp->krw_arr[i].bulk_head_free[j]); - } - - for (i = 0; i < FREE_N_CHANNELS; i++) - INIT_LIST_HEAD(&krcp->bulk_head[i]); - - INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor); - INIT_DELAYED_WORK(&krcp->page_cache_work, fill_page_cache_func); - krcp->initialized = true; - } - - kfree_rcu_shrinker = shrinker_alloc(0, "rcu-kfree"); - if (!kfree_rcu_shrinker) { - pr_err("Failed to allocate kfree_rcu() shrinker!\n"); - return; - } - - kfree_rcu_shrinker->count_objects = kfree_rcu_shrink_count; - kfree_rcu_shrinker->scan_objects = kfree_rcu_shrink_scan; - - shrinker_register(kfree_rcu_shrinker); -} - void __init rcu_init(void) { int cpu = smp_processor_id(); rcu_early_boot_tests(); - kfree_rcu_batch_init(); rcu_bootup_announce(); sanitize_kthread_prio(); rcu_init_geometry(); @@ -5229,6 +4799,9 @@ void __init rcu_init(void) rcu_gp_wq = alloc_workqueue("rcu_gp", WQ_MEM_RECLAIM, 0); WARN_ON(!rcu_gp_wq); + sync_wq = alloc_workqueue("sync_wq", WQ_MEM_RECLAIM, 0); + WARN_ON(!sync_wq); + /* Fill in default value for rcutree.qovld boot parameter. */ /* -After- the rcu_node ->lock fields are initialized! */ if (qovld < 0) |