Diffstat (limited to 'kernel/rcu/srcutree.c')
-rw-r--r-- | kernel/rcu/srcutree.c | 1405 |
1 file changed, 1009 insertions, 396 deletions
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c index 6d3ef700fb0e..0351a4e83529 100644 --- a/kernel/rcu/srcutree.c +++ b/kernel/rcu/srcutree.c @@ -24,24 +24,12 @@ #include <linux/smp.h> #include <linux/delay.h> #include <linux/module.h> +#include <linux/slab.h> #include <linux/srcu.h> #include "rcu.h" #include "rcu_segcblist.h" -#ifndef data_race -#define data_race(expr) \ - ({ \ - expr; \ - }) -#endif -#ifndef ASSERT_EXCLUSIVE_WRITER -#define ASSERT_EXCLUSIVE_WRITER(var) do { } while (0) -#endif -#ifndef ASSERT_EXCLUSIVE_ACCESS -#define ASSERT_EXCLUSIVE_ACCESS(var) do { } while (0) -#endif - /* Holdoff in nanoseconds for auto-expediting. */ #define DEFAULT_SRCU_EXP_HOLDOFF (25 * 1000) static ulong exp_holdoff = DEFAULT_SRCU_EXP_HOLDOFF; @@ -51,6 +39,35 @@ module_param(exp_holdoff, ulong, 0444); static ulong counter_wrap_check = (ULONG_MAX >> 2); module_param(counter_wrap_check, ulong, 0444); +/* + * Control conversion to SRCU_SIZE_BIG: + * 0: Don't convert at all. + * 1: Convert at init_srcu_struct() time. + * 2: Convert when rcutorture invokes srcu_torture_stats_print(). + * 3: Decide at boot time based on system shape (default). + * 0x1x: Convert when excessive contention encountered. + */ +#define SRCU_SIZING_NONE 0 +#define SRCU_SIZING_INIT 1 +#define SRCU_SIZING_TORTURE 2 +#define SRCU_SIZING_AUTO 3 +#define SRCU_SIZING_CONTEND 0x10 +#define SRCU_SIZING_IS(x) ((convert_to_big & ~SRCU_SIZING_CONTEND) == x) +#define SRCU_SIZING_IS_NONE() (SRCU_SIZING_IS(SRCU_SIZING_NONE)) +#define SRCU_SIZING_IS_INIT() (SRCU_SIZING_IS(SRCU_SIZING_INIT)) +#define SRCU_SIZING_IS_TORTURE() (SRCU_SIZING_IS(SRCU_SIZING_TORTURE)) +#define SRCU_SIZING_IS_CONTEND() (convert_to_big & SRCU_SIZING_CONTEND) +static int convert_to_big = SRCU_SIZING_AUTO; +module_param(convert_to_big, int, 0444); + +/* Number of CPUs to trigger init_srcu_struct()-time transition to big. */ +static int big_cpu_lim __read_mostly = 128; +module_param(big_cpu_lim, int, 0444); + +/* Contention events per jiffy to initiate transition to big. */ +static int small_contention_lim __read_mostly = 100; +module_param(small_contention_lim, int, 0444); + /* Early-boot callback-management, so early that no lock is required! */ static LIST_HEAD(srcu_boot_list); static bool __read_mostly srcu_init_done; @@ -61,39 +78,90 @@ static void process_srcu(struct work_struct *work); static void srcu_delay_timer(struct timer_list *t); /* Wrappers for lock acquisition and release, see raw_spin_lock_rcu_node(). 
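 * The smp_mb__after_unlock_lock() in each acquisition wrapper upgrades the lock operation to a full memory barrier on architectures where an unlock followed by a lock does not already provide one.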
*/ -#define spin_lock_rcu_node(p) \ -do { \ - spin_lock(&ACCESS_PRIVATE(p, lock)); \ - smp_mb__after_unlock_lock(); \ +#define spin_lock_rcu_node(p) \ +do { \ + spin_lock(&ACCESS_PRIVATE(p, lock)); \ + smp_mb__after_unlock_lock(); \ } while (0) #define spin_unlock_rcu_node(p) spin_unlock(&ACCESS_PRIVATE(p, lock)) -#define spin_lock_irq_rcu_node(p) \ -do { \ - spin_lock_irq(&ACCESS_PRIVATE(p, lock)); \ - smp_mb__after_unlock_lock(); \ +#define spin_lock_irq_rcu_node(p) \ +do { \ + spin_lock_irq(&ACCESS_PRIVATE(p, lock)); \ + smp_mb__after_unlock_lock(); \ } while (0) -#define spin_unlock_irq_rcu_node(p) \ +#define spin_unlock_irq_rcu_node(p) \ spin_unlock_irq(&ACCESS_PRIVATE(p, lock)) -#define spin_lock_irqsave_rcu_node(p, flags) \ -do { \ - spin_lock_irqsave(&ACCESS_PRIVATE(p, lock), flags); \ - smp_mb__after_unlock_lock(); \ +#define spin_lock_irqsave_rcu_node(p, flags) \ +do { \ + spin_lock_irqsave(&ACCESS_PRIVATE(p, lock), flags); \ + smp_mb__after_unlock_lock(); \ } while (0) -#define spin_unlock_irqrestore_rcu_node(p, flags) \ - spin_unlock_irqrestore(&ACCESS_PRIVATE(p, lock), flags) \ +#define spin_trylock_irqsave_rcu_node(p, flags) \ +({ \ + bool ___locked = spin_trylock_irqsave(&ACCESS_PRIVATE(p, lock), flags); \ + \ + if (___locked) \ + smp_mb__after_unlock_lock(); \ + ___locked; \ +}) + +#define spin_unlock_irqrestore_rcu_node(p, flags) \ + spin_unlock_irqrestore(&ACCESS_PRIVATE(p, lock), flags) \ /* - * Initialize SRCU combining tree. Note that statically allocated + * Initialize SRCU per-CPU data. Note that statically allocated * srcu_struct structures might already have srcu_read_lock() and * srcu_read_unlock() running against them. So if the is_static parameter * is set, don't initialize ->srcu_lock_count[] and ->srcu_unlock_count[]. */ -static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static) +static void init_srcu_struct_data(struct srcu_struct *ssp) +{ + int cpu; + struct srcu_data *sdp; + + /* + * Initialize the per-CPU srcu_data array, which feeds into the + * leaves of the srcu_node tree. + */ + WARN_ON_ONCE(ARRAY_SIZE(sdp->srcu_lock_count) != + ARRAY_SIZE(sdp->srcu_unlock_count)); + for_each_possible_cpu(cpu) { + sdp = per_cpu_ptr(ssp->sda, cpu); + spin_lock_init(&ACCESS_PRIVATE(sdp, lock)); + rcu_segcblist_init(&sdp->srcu_cblist); + sdp->srcu_cblist_invoking = false; + sdp->srcu_gp_seq_needed = ssp->srcu_sup->srcu_gp_seq; + sdp->srcu_gp_seq_needed_exp = ssp->srcu_sup->srcu_gp_seq; + sdp->mynode = NULL; + sdp->cpu = cpu; + INIT_WORK(&sdp->work, srcu_invoke_callbacks); + timer_setup(&sdp->delay_work, srcu_delay_timer, 0); + sdp->ssp = ssp; + } +} + +/* Invalid seq state, used during snp node initialization */ +#define SRCU_SNP_INIT_SEQ 0x2 + +/* + * Check whether the sequence number corresponding to an snp node + * is invalid. + */ +static inline bool srcu_invl_snp_seq(unsigned long s) +{ + return s == SRCU_SNP_INIT_SEQ; +} + +/* + * Allocate and initialize the SRCU combining tree. Returns @true if + * allocation succeeded and @false otherwise. + */ +static bool init_srcu_struct_nodes(struct srcu_struct *ssp, gfp_t gfp_flags) { int cpu; int i; @@ -103,10 +171,16 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static) struct srcu_node *snp; struct srcu_node *snp_first; + /* Initialize geometry if it has not already been initialized. */ + rcu_init_geometry(); + ssp->srcu_sup->node = kcalloc(rcu_num_nodes, sizeof(*ssp->srcu_sup->node), gfp_flags); + if (!ssp->srcu_sup->node) + return false; + /* Work out the overall tree geometry.
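 * Each ->level[] entry points to the first srcu_node at that depth of the tree; successive levels are laid out contiguously in the just-allocated ->node[] array.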
*/ - ssp->level[0] = &ssp->node[0]; + ssp->srcu_sup->level[0] = &ssp->srcu_sup->node[0]; for (i = 1; i < rcu_num_lvls; i++) - ssp->level[i] = ssp->level[i - 1] + num_rcu_lvl[i - 1]; + ssp->srcu_sup->level[i] = ssp->srcu_sup->level[i - 1] + num_rcu_lvl[i - 1]; rcu_init_levelspread(levelspread, num_rcu_lvl); /* Each pass through this loop initializes one srcu_node structure. */ @@ -115,23 +189,23 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static) WARN_ON_ONCE(ARRAY_SIZE(snp->srcu_have_cbs) != ARRAY_SIZE(snp->srcu_data_have_cbs)); for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++) { - snp->srcu_have_cbs[i] = 0; + snp->srcu_have_cbs[i] = SRCU_SNP_INIT_SEQ; snp->srcu_data_have_cbs[i] = 0; } - snp->srcu_gp_seq_needed_exp = 0; + snp->srcu_gp_seq_needed_exp = SRCU_SNP_INIT_SEQ; snp->grplo = -1; snp->grphi = -1; - if (snp == &ssp->node[0]) { + if (snp == &ssp->srcu_sup->node[0]) { /* Root node, special case. */ snp->srcu_parent = NULL; continue; } /* Non-root node. */ - if (snp == ssp->level[level + 1]) + if (snp == ssp->srcu_sup->level[level + 1]) level++; - snp->srcu_parent = ssp->level[level - 1] + - (snp - ssp->level[level]) / + snp->srcu_parent = ssp->srcu_sup->level[level - 1] + + (snp - ssp->srcu_sup->level[level]) / levelspread[level - 1]; } @@ -139,62 +213,73 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static) * Initialize the per-CPU srcu_data array, which feeds into the * leaves of the srcu_node tree. */ - WARN_ON_ONCE(ARRAY_SIZE(sdp->srcu_lock_count) != - ARRAY_SIZE(sdp->srcu_unlock_count)); level = rcu_num_lvls - 1; - snp_first = ssp->level[level]; + snp_first = ssp->srcu_sup->level[level]; for_each_possible_cpu(cpu) { sdp = per_cpu_ptr(ssp->sda, cpu); - spin_lock_init(&ACCESS_PRIVATE(sdp, lock)); - rcu_segcblist_init(&sdp->srcu_cblist); - sdp->srcu_cblist_invoking = false; - sdp->srcu_gp_seq_needed = ssp->srcu_gp_seq; - sdp->srcu_gp_seq_needed_exp = ssp->srcu_gp_seq; sdp->mynode = &snp_first[cpu / levelspread[level]]; for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) { if (snp->grplo < 0) snp->grplo = cpu; snp->grphi = cpu; } - sdp->cpu = cpu; - INIT_WORK(&sdp->work, srcu_invoke_callbacks); - timer_setup(&sdp->delay_work, srcu_delay_timer, 0); - sdp->ssp = ssp; - sdp->grpmask = 1 << (cpu - sdp->mynode->grplo); - if (is_static) - continue; - - /* Dynamically allocated, better be no srcu_read_locks()! */ - for (i = 0; i < ARRAY_SIZE(sdp->srcu_lock_count); i++) { - sdp->srcu_lock_count[i] = 0; - sdp->srcu_unlock_count[i] = 0; - } + sdp->grpmask = 1UL << (cpu - sdp->mynode->grplo); } + smp_store_release(&ssp->srcu_sup->srcu_size_state, SRCU_SIZE_WAIT_BARRIER); + return true; } /* * Initialize non-compile-time initialized fields, including the - * associated srcu_node and srcu_data structures. The is_static - * parameter is passed through to init_srcu_struct_nodes(), and - * also tells us that ->sda has already been wired up to srcu_data. + * associated srcu_node and srcu_data structures. The is_static parameter + * tells us that ->sda has already been wired up to srcu_data. 
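 * It also tells us that ->srcu_sup already points to a statically allocated srcu_usage structure, which therefore must not be freed.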
*/ static int init_srcu_struct_fields(struct srcu_struct *ssp, bool is_static) { - mutex_init(&ssp->srcu_cb_mutex); - mutex_init(&ssp->srcu_gp_mutex); + if (!is_static) + ssp->srcu_sup = kzalloc(sizeof(*ssp->srcu_sup), GFP_KERNEL); + if (!ssp->srcu_sup) + return -ENOMEM; + if (!is_static) + spin_lock_init(&ACCESS_PRIVATE(ssp->srcu_sup, lock)); + ssp->srcu_sup->srcu_size_state = SRCU_SIZE_SMALL; + ssp->srcu_sup->node = NULL; + mutex_init(&ssp->srcu_sup->srcu_cb_mutex); + mutex_init(&ssp->srcu_sup->srcu_gp_mutex); ssp->srcu_idx = 0; - ssp->srcu_gp_seq = 0; - ssp->srcu_barrier_seq = 0; - mutex_init(&ssp->srcu_barrier_mutex); - atomic_set(&ssp->srcu_barrier_cpu_cnt, 0); - INIT_DELAYED_WORK(&ssp->work, process_srcu); + ssp->srcu_sup->srcu_gp_seq = 0; + ssp->srcu_sup->srcu_barrier_seq = 0; + mutex_init(&ssp->srcu_sup->srcu_barrier_mutex); + atomic_set(&ssp->srcu_sup->srcu_barrier_cpu_cnt, 0); + INIT_DELAYED_WORK(&ssp->srcu_sup->work, process_srcu); + ssp->srcu_sup->sda_is_static = is_static; if (!is_static) ssp->sda = alloc_percpu(struct srcu_data); - init_srcu_struct_nodes(ssp, is_static); - ssp->srcu_gp_seq_needed_exp = 0; - ssp->srcu_last_gp_end = ktime_get_mono_fast_ns(); - smp_store_release(&ssp->srcu_gp_seq_needed, 0); /* Init done. */ - return ssp->sda ? 0 : -ENOMEM; + if (!ssp->sda) + goto err_free_sup; + init_srcu_struct_data(ssp); + ssp->srcu_sup->srcu_gp_seq_needed_exp = 0; + ssp->srcu_sup->srcu_last_gp_end = ktime_get_mono_fast_ns(); + if (READ_ONCE(ssp->srcu_sup->srcu_size_state) == SRCU_SIZE_SMALL && SRCU_SIZING_IS_INIT()) { + if (!init_srcu_struct_nodes(ssp, GFP_ATOMIC)) + goto err_free_sda; + WRITE_ONCE(ssp->srcu_sup->srcu_size_state, SRCU_SIZE_BIG); + } + ssp->srcu_sup->srcu_ssp = ssp; + smp_store_release(&ssp->srcu_sup->srcu_gp_seq_needed, 0); /* Init done. */ + return 0; + +err_free_sda: + if (!is_static) { + free_percpu(ssp->sda); + ssp->sda = NULL; + } +err_free_sup: + if (!is_static) { + kfree(ssp->srcu_sup); + ssp->srcu_sup = NULL; + } + return -ENOMEM; } #ifdef CONFIG_DEBUG_LOCK_ALLOC @@ -205,7 +290,6 @@ int __init_srcu_struct(struct srcu_struct *ssp, const char *name, /* Don't re-initialize a lock while it is held. */ debug_check_no_locks_freed((void *)ssp, sizeof(*ssp)); lockdep_init_map(&ssp->dep_map, name, key, 0); - spin_lock_init(&ACCESS_PRIVATE(ssp, lock)); return init_srcu_struct_fields(ssp, false); } EXPORT_SYMBOL_GPL(__init_srcu_struct); @@ -222,7 +306,6 @@ EXPORT_SYMBOL_GPL(__init_srcu_struct); */ int init_srcu_struct(struct srcu_struct *ssp) { - spin_lock_init(&ACCESS_PRIVATE(ssp, lock)); return init_srcu_struct_fields(ssp, false); } EXPORT_SYMBOL_GPL(init_srcu_struct); @@ -230,6 +313,86 @@ EXPORT_SYMBOL_GPL(init_srcu_struct); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ /* + * Initiate a transition to SRCU_SIZE_BIG with lock held. + */ +static void __srcu_transition_to_big(struct srcu_struct *ssp) +{ + lockdep_assert_held(&ACCESS_PRIVATE(ssp->srcu_sup, lock)); + smp_store_release(&ssp->srcu_sup->srcu_size_state, SRCU_SIZE_ALLOC); +} + +/* + * Initiate an idempotent transition to SRCU_SIZE_BIG. + */ +static void srcu_transition_to_big(struct srcu_struct *ssp) +{ + unsigned long flags; + + /* Double-checked locking on ->srcu_size_state.
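 * The lockless load-acquire handles the common case in which the transition has already begun, and the state is then re-checked under ->lock so that at most one caller actually initiates it.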
*/ + if (smp_load_acquire(&ssp->srcu_sup->srcu_size_state) != SRCU_SIZE_SMALL) + return; + spin_lock_irqsave_rcu_node(ssp->srcu_sup, flags); + if (smp_load_acquire(&ssp->srcu_sup->srcu_size_state) != SRCU_SIZE_SMALL) { + spin_unlock_irqrestore_rcu_node(ssp->srcu_sup, flags); + return; + } + __srcu_transition_to_big(ssp); + spin_unlock_irqrestore_rcu_node(ssp->srcu_sup, flags); +} + +/* + * Check to see if the just-encountered contention event justifies + * a transition to SRCU_SIZE_BIG. + */ +static void spin_lock_irqsave_check_contention(struct srcu_struct *ssp) +{ + unsigned long j; + + if (!SRCU_SIZING_IS_CONTEND() || ssp->srcu_sup->srcu_size_state) + return; + j = jiffies; + if (ssp->srcu_sup->srcu_size_jiffies != j) { + ssp->srcu_sup->srcu_size_jiffies = j; + ssp->srcu_sup->srcu_n_lock_retries = 0; + } + if (++ssp->srcu_sup->srcu_n_lock_retries <= small_contention_lim) + return; + __srcu_transition_to_big(ssp); +} + +/* + * Acquire the specified srcu_data structure's ->lock, but check for + * excessive contention, which results in initiation of a transition + * to SRCU_SIZE_BIG. But only if the srcutree.convert_to_big module + * parameter permits this. + */ +static void spin_lock_irqsave_sdp_contention(struct srcu_data *sdp, unsigned long *flags) +{ + struct srcu_struct *ssp = sdp->ssp; + + if (spin_trylock_irqsave_rcu_node(sdp, *flags)) + return; + spin_lock_irqsave_rcu_node(ssp->srcu_sup, *flags); + spin_lock_irqsave_check_contention(ssp); + spin_unlock_irqrestore_rcu_node(ssp->srcu_sup, *flags); + spin_lock_irqsave_rcu_node(sdp, *flags); +} + +/* + * Acquire the specified srcu_struct structure's ->lock, but check for + * excessive contention, which results in initiation of a transition + * to SRCU_SIZE_BIG. But only if the srcutree.convert_to_big module + * parameter permits this. + */ +static void spin_lock_irqsave_ssp_contention(struct srcu_struct *ssp, unsigned long *flags) +{ + if (spin_trylock_irqsave_rcu_node(ssp->srcu_sup, *flags)) + return; + spin_lock_irqsave_rcu_node(ssp->srcu_sup, *flags); + spin_lock_irqsave_check_contention(ssp); +} + +/* * First-use initialization of statically allocated srcu_struct * structure. Wiring up the combining tree is more than can be * done with compile-time initialization, so this check is added @@ -242,15 +405,15 @@ static void check_init_srcu_struct(struct srcu_struct *ssp) unsigned long flags; /* The smp_load_acquire() pairs with the smp_store_release(). */ - if (!rcu_seq_state(smp_load_acquire(&ssp->srcu_gp_seq_needed))) /*^^^*/ + if (!rcu_seq_state(smp_load_acquire(&ssp->srcu_sup->srcu_gp_seq_needed))) /*^^^*/ return; /* Already initialized. 
*/ - spin_lock_irqsave_rcu_node(ssp, flags); - if (!rcu_seq_state(ssp->srcu_gp_seq_needed)) { - spin_unlock_irqrestore_rcu_node(ssp, flags); + spin_lock_irqsave_rcu_node(ssp->srcu_sup, flags); + if (!rcu_seq_state(ssp->srcu_sup->srcu_gp_seq_needed)) { + spin_unlock_irqrestore_rcu_node(ssp->srcu_sup, flags); return; } init_srcu_struct_fields(ssp, true); - spin_unlock_irqrestore_rcu_node(ssp, flags); + spin_unlock_irqrestore_rcu_node(ssp->srcu_sup, flags); } /* @@ -265,7 +428,7 @@ static unsigned long srcu_readers_lock_idx(struct srcu_struct *ssp, int idx) for_each_possible_cpu(cpu) { struct srcu_data *cpuc = per_cpu_ptr(ssp->sda, cpu); - sum += READ_ONCE(cpuc->srcu_lock_count[idx]); + sum += atomic_long_read(&cpuc->srcu_lock_count[idx]); } return sum; } @@ -277,13 +440,18 @@ static unsigned long srcu_readers_unlock_idx(struct srcu_struct *ssp, int idx) { int cpu; + unsigned long mask = 0; unsigned long sum = 0; for_each_possible_cpu(cpu) { struct srcu_data *cpuc = per_cpu_ptr(ssp->sda, cpu); - sum += READ_ONCE(cpuc->srcu_unlock_count[idx]); + sum += atomic_long_read(&cpuc->srcu_unlock_count[idx]); + if (IS_ENABLED(CONFIG_PROVE_RCU)) + mask = mask | READ_ONCE(cpuc->srcu_nmi_safety); } + WARN_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) && (mask & (mask >> 1)), + "Mixed NMI-safe readers for srcu_struct at %ps.\n", ssp); return sum; } @@ -312,24 +480,59 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *ssp, int idx) /* * If the locks are the same as the unlocks, then there must have - * been no readers on this index at some time in between. This does - * not mean that there are no more readers, as one could have read - * the current index but not have incremented the lock counter yet. + * been no readers on this index at some point in this function. + * But there might be more readers, as a task might have read + * the current ->srcu_idx but not yet have incremented its CPU's + * ->srcu_lock_count[idx] counter. In fact, it is possible + * that most of the tasks have been preempted between fetching + * ->srcu_idx and incrementing ->srcu_lock_count[idx]. And there + * could be almost (ULONG_MAX / sizeof(struct task_struct)) tasks + * in a system whose address space was fully populated with memory. + * Call this quantity Nt. + * + * So suppose that the updater is preempted at this point in the + * code for a long time. That now-preempted updater has already + * flipped ->srcu_idx (possibly during the preceding grace period), + * done an smp_mb() (again, possibly during the preceding grace + * period), and summed up the ->srcu_unlock_count[idx] counters. + * How many times can a given one of the aforementioned Nt tasks + * increment the old ->srcu_idx value's ->srcu_lock_count[idx] + * counter, in the absence of nesting? + * + * It can clearly do so once, given that it has already fetched + * the old value of ->srcu_idx and is just about to use that value + * to index its increment of ->srcu_lock_count[idx]. But as soon as + * it leaves that SRCU read-side critical section, it will increment + * ->srcu_unlock_count[idx], which must follow the updater's above + * read from that same value. Thus, as soon as the reading task does + * an smp_mb() and a later fetch from ->srcu_idx, that task will be + * guaranteed to get the new index. Except that the increment of + * ->srcu_unlock_count[idx] in __srcu_read_unlock() is after the + * smp_mb(), and the fetch from ->srcu_idx in __srcu_read_lock() + * is before the smp_mb().
Thus, that task might not see the new + * value of ->srcu_idx until the -second- __srcu_read_lock(), + * which in turn means that this task might well increment + * ->srcu_lock_count[idx] for the old value of ->srcu_idx twice, + * not just once. + * + * However, it is important to note that a given smp_mb() takes + * effect not just for the task executing it, but also for any + * later task running on that same CPU. * - * So suppose that the updater is preempted here for so long - * that more than ULONG_MAX non-nested readers come and go in - * the meantime. It turns out that this cannot result in overflow - * because if a reader modifies its unlock count after we read it - * above, then that reader's next load of ->srcu_idx is guaranteed - * to get the new value, which will cause it to operate on the - * other bank of counters, where it cannot contribute to the - * overflow of these counters. This means that there is a maximum - * of 2*NR_CPUS increments, which cannot overflow given current - * systems, especially not on 64-bit systems. + * That is, there can be almost Nt + Nc further increments of + * ->srcu_lock_count[idx] for the old index, where Nc is the number + * of CPUs. But this is OK because the size of the task_struct + * structure limits the value of Nt and current systems limit Nc + * to a few thousand. * - * OK, how about nesting? This does impose a limit on nesting - * of floor(ULONG_MAX/NR_CPUS/2), which should be sufficient, - * especially on 64-bit systems. + * OK, but what about nesting? This does impose a limit on + * nesting of half of the size of the task_struct structure + * (measured in bytes), which should be sufficient. A late 2022 + * TREE01 rcutorture run reported this size to be no less than + * 9408 bytes, allowing up to 4704 levels of nesting, which is + * comfortably beyond excessive. Especially on 64-bit systems, + * which are unlikely to be configured with an address space fully + * populated with memory, at least not anytime soon. */ return srcu_readers_lock_idx(ssp, idx) == unlocks; } @@ -351,15 +554,60 @@ static bool srcu_readers_active(struct srcu_struct *ssp) for_each_possible_cpu(cpu) { struct srcu_data *cpuc = per_cpu_ptr(ssp->sda, cpu); - sum += READ_ONCE(cpuc->srcu_lock_count[0]); - sum += READ_ONCE(cpuc->srcu_lock_count[1]); - sum -= READ_ONCE(cpuc->srcu_unlock_count[0]); - sum -= READ_ONCE(cpuc->srcu_unlock_count[1]); + sum += atomic_long_read(&cpuc->srcu_lock_count[0]); + sum += atomic_long_read(&cpuc->srcu_lock_count[1]); + sum -= atomic_long_read(&cpuc->srcu_unlock_count[0]); + sum -= atomic_long_read(&cpuc->srcu_unlock_count[1]); } return sum; } -#define SRCU_INTERVAL 1 +/* + * We use an adaptive strategy for synchronize_srcu() and especially for + * synchronize_srcu_expedited(). We spin for a fixed time period + * (defined below, boot time configurable) to allow SRCU readers to exit + * their read-side critical sections. If there are still some readers + * after one jiffy, we repeatedly block for one jiffy time periods. + * The blocking time is increased as the grace-period age increases, + * with max blocking time capped at 10 jiffies. + */ +#define SRCU_DEFAULT_RETRY_CHECK_DELAY 5 + +static ulong srcu_retry_check_delay = SRCU_DEFAULT_RETRY_CHECK_DELAY; +module_param(srcu_retry_check_delay, ulong, 0444); + +#define SRCU_INTERVAL 1 // Base delay if no expedited GPs pending. +#define SRCU_MAX_INTERVAL 10 // Maximum incremental delay from slow readers. 
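// A worked example of the auto-tuning below (an editorial illustration,
// using the default srcu_retry_check_delay of 5 microseconds):
// SRCU_DEFAULT_MAX_NODELAY_PHASE_ADJUSTED is 2 * 1000000 / HZ / 5,
// giving 400 at HZ=1000 (within the [3, 1000] clamp) and 1600 at HZ=250
// (clamped down to 1000), that is, about one jiffy of non-sleeping
// polling, with the factor of 2 covering the extra srcu_get_delay()
// call from process_srcu().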
+ +#define SRCU_DEFAULT_MAX_NODELAY_PHASE_LO 3UL // Lowmark on default per-GP-phase + // no-delay instances. +#define SRCU_DEFAULT_MAX_NODELAY_PHASE_HI 1000UL // Highmark on default per-GP-phase + // no-delay instances. + +#define SRCU_UL_CLAMP_LO(val, low) ((val) > (low) ? (val) : (low)) +#define SRCU_UL_CLAMP_HI(val, high) ((val) < (high) ? (val) : (high)) +#define SRCU_UL_CLAMP(val, low, high) SRCU_UL_CLAMP_HI(SRCU_UL_CLAMP_LO((val), (low)), (high)) +// per-GP-phase no-delay instances adjusted to allow non-sleeping polling for up to +// one jiffy. Mult by 2 is done to factor in the srcu_get_delay() +// called from process_srcu(). +#define SRCU_DEFAULT_MAX_NODELAY_PHASE_ADJUSTED \ + (2UL * USEC_PER_SEC / HZ / SRCU_DEFAULT_RETRY_CHECK_DELAY) + +// Maximum per-GP-phase consecutive no-delay instances. +#define SRCU_DEFAULT_MAX_NODELAY_PHASE \ + SRCU_UL_CLAMP(SRCU_DEFAULT_MAX_NODELAY_PHASE_ADJUSTED, \ + SRCU_DEFAULT_MAX_NODELAY_PHASE_LO, \ + SRCU_DEFAULT_MAX_NODELAY_PHASE_HI) + +static ulong srcu_max_nodelay_phase = SRCU_DEFAULT_MAX_NODELAY_PHASE; +module_param(srcu_max_nodelay_phase, ulong, 0444); + +// Maximum consecutive no-delay instances. +#define SRCU_DEFAULT_MAX_NODELAY (SRCU_DEFAULT_MAX_NODELAY_PHASE > 100 ? \ + SRCU_DEFAULT_MAX_NODELAY_PHASE : 100) + +static ulong srcu_max_nodelay = SRCU_DEFAULT_MAX_NODELAY; +module_param(srcu_max_nodelay, ulong, 0444); /* * Return grace-period delay, zero if there are expedited grace @@ -367,10 +615,25 @@ static bool srcu_readers_active(struct srcu_struct *ssp) */ static unsigned long srcu_get_delay(struct srcu_struct *ssp) { - if (ULONG_CMP_LT(READ_ONCE(ssp->srcu_gp_seq), - READ_ONCE(ssp->srcu_gp_seq_needed_exp))) - return 0; - return SRCU_INTERVAL; + unsigned long gpstart; + unsigned long j; + unsigned long jbase = SRCU_INTERVAL; + struct srcu_usage *sup = ssp->srcu_sup; + + if (ULONG_CMP_LT(READ_ONCE(sup->srcu_gp_seq), READ_ONCE(sup->srcu_gp_seq_needed_exp))) + jbase = 0; + if (rcu_seq_state(READ_ONCE(sup->srcu_gp_seq))) { + j = jiffies - 1; + gpstart = READ_ONCE(sup->srcu_gp_start); + if (time_after(j, gpstart)) + jbase += j - gpstart; + if (!jbase) { + WRITE_ONCE(sup->srcu_n_exp_nodelay, READ_ONCE(sup->srcu_n_exp_nodelay) + 1); + if (READ_ONCE(sup->srcu_n_exp_nodelay) > srcu_max_nodelay_phase) + jbase = 1; + } + } + return jbase > SRCU_MAX_INTERVAL ? SRCU_MAX_INTERVAL : jbase; } /** @@ -383,12 +646,13 @@ static unsigned long srcu_get_delay(struct srcu_struct *ssp) void cleanup_srcu_struct(struct srcu_struct *ssp) { int cpu; + struct srcu_usage *sup = ssp->srcu_sup; if (WARN_ON(!srcu_get_delay(ssp))) return; /* Just leak it! */ if (WARN_ON(srcu_readers_active(ssp))) return; /* Just leak it! */ - flush_delayed_work(&ssp->work); + flush_delayed_work(&sup->work); for_each_possible_cpu(cpu) { struct srcu_data *sdp = per_cpu_ptr(ssp->sda, cpu); @@ -397,17 +661,49 @@ void cleanup_srcu_struct(struct srcu_struct *ssp) if (WARN_ON(rcu_segcblist_n_cbs(&sdp->srcu_cblist))) return; /* Forgot srcu_barrier(), so just leak it!
*/ } - if (WARN_ON(rcu_seq_state(READ_ONCE(ssp->srcu_gp_seq)) != SRCU_STATE_IDLE) || + if (WARN_ON(rcu_seq_state(READ_ONCE(sup->srcu_gp_seq)) != SRCU_STATE_IDLE) || + WARN_ON(rcu_seq_current(&sup->srcu_gp_seq) != sup->srcu_gp_seq_needed) || WARN_ON(srcu_readers_active(ssp))) { - pr_info("%s: Active srcu_struct %p state: %d\n", - __func__, ssp, rcu_seq_state(READ_ONCE(ssp->srcu_gp_seq))); + pr_info("%s: Active srcu_struct %p read state: %d gp state: %lu/%lu\n", + __func__, ssp, rcu_seq_state(READ_ONCE(sup->srcu_gp_seq)), + rcu_seq_current(&sup->srcu_gp_seq), sup->srcu_gp_seq_needed); return; /* Caller forgot to stop doing call_srcu()? */ } - free_percpu(ssp->sda); - ssp->sda = NULL; + kfree(sup->node); + sup->node = NULL; + sup->srcu_size_state = SRCU_SIZE_SMALL; + if (!sup->sda_is_static) { + free_percpu(ssp->sda); + ssp->sda = NULL; + kfree(sup); + ssp->srcu_sup = NULL; + } } EXPORT_SYMBOL_GPL(cleanup_srcu_struct); +#ifdef CONFIG_PROVE_RCU +/* + * Check for consistent NMI safety. + */ +void srcu_check_nmi_safety(struct srcu_struct *ssp, bool nmi_safe) +{ + int nmi_safe_mask = 1 << nmi_safe; + int old_nmi_safe_mask; + struct srcu_data *sdp; + + /* NMI-unsafe use in NMI is a bad sign */ + WARN_ON_ONCE(!nmi_safe && in_nmi()); + sdp = raw_cpu_ptr(ssp->sda); + old_nmi_safe_mask = READ_ONCE(sdp->srcu_nmi_safety); + if (!old_nmi_safe_mask) { + WRITE_ONCE(sdp->srcu_nmi_safety, nmi_safe_mask); + return; + } + WARN_ONCE(old_nmi_safe_mask != nmi_safe_mask, "CPU %d old state %d new state %d\n", sdp->cpu, old_nmi_safe_mask, nmi_safe_mask); +} +EXPORT_SYMBOL_GPL(srcu_check_nmi_safety); +#endif /* CONFIG_PROVE_RCU */ + /* * Counts the new reader in the appropriate per-CPU element of the * srcu_struct. @@ -418,7 +714,7 @@ int __srcu_read_lock(struct srcu_struct *ssp) int idx; idx = READ_ONCE(ssp->srcu_idx) & 0x1; - this_cpu_inc(ssp->sda->srcu_lock_count[idx]); + this_cpu_inc(ssp->sda->srcu_lock_count[idx].counter); smp_mb(); /* B */ /* Avoid leaking the critical section. */ return idx; } @@ -432,38 +728,59 @@ EXPORT_SYMBOL_GPL(__srcu_read_lock); void __srcu_read_unlock(struct srcu_struct *ssp, int idx) { smp_mb(); /* C */ /* Avoid leaking the critical section. */ - this_cpu_inc(ssp->sda->srcu_unlock_count[idx]); + this_cpu_inc(ssp->sda->srcu_unlock_count[idx].counter); } EXPORT_SYMBOL_GPL(__srcu_read_unlock); +#ifdef CONFIG_NEED_SRCU_NMI_SAFE + /* - * We use an adaptive strategy for synchronize_srcu() and especially for - * synchronize_srcu_expedited(). We spin for a fixed time period - * (defined below) to allow SRCU readers to exit their read-side critical - * sections. If there are still some readers after a few microseconds, - * we repeatedly block for 1-millisecond time periods. + * Counts the new reader in the appropriate per-CPU element of the + * srcu_struct, but in an NMI-safe manner using RMW atomics. + * Returns an index that must be passed to the matching srcu_read_unlock(). */ -#define SRCU_RETRY_CHECK_DELAY 5 +int __srcu_read_lock_nmisafe(struct srcu_struct *ssp) +{ + int idx; + struct srcu_data *sdp = raw_cpu_ptr(ssp->sda); + + idx = READ_ONCE(ssp->srcu_idx) & 0x1; + atomic_long_inc(&sdp->srcu_lock_count[idx]); + smp_mb__after_atomic(); /* B */ /* Avoid leaking the critical section. */ + return idx; +} +EXPORT_SYMBOL_GPL(__srcu_read_lock_nmisafe); + +/* + * Removes the count for the old reader from the appropriate per-CPU + * element of the srcu_struct. Note that this may well be a different + * CPU than that which was incremented by the corresponding srcu_read_lock(). 
+ */ +void __srcu_read_unlock_nmisafe(struct srcu_struct *ssp, int idx) +{ + struct srcu_data *sdp = raw_cpu_ptr(ssp->sda); + + smp_mb__before_atomic(); /* C */ /* Avoid leaking the critical section. */ + atomic_long_inc(&sdp->srcu_unlock_count[idx]); +} +EXPORT_SYMBOL_GPL(__srcu_read_unlock_nmisafe); + +#endif // CONFIG_NEED_SRCU_NMI_SAFE /* * Start an SRCU grace period. */ static void srcu_gp_start(struct srcu_struct *ssp) { - struct srcu_data *sdp = this_cpu_ptr(ssp->sda); int state; - lockdep_assert_held(&ACCESS_PRIVATE(ssp, lock)); - WARN_ON_ONCE(ULONG_CMP_GE(ssp->srcu_gp_seq, ssp->srcu_gp_seq_needed)); - spin_lock_rcu_node(sdp); /* Interrupts already disabled. */ - rcu_segcblist_advance(&sdp->srcu_cblist, - rcu_seq_current(&ssp->srcu_gp_seq)); - (void)rcu_segcblist_accelerate(&sdp->srcu_cblist, - rcu_seq_snap(&ssp->srcu_gp_seq)); - spin_unlock_rcu_node(sdp); /* Interrupts remain disabled. */ + lockdep_assert_held(&ACCESS_PRIVATE(ssp->srcu_sup, lock)); + WARN_ON_ONCE(ULONG_CMP_GE(ssp->srcu_sup->srcu_gp_seq, ssp->srcu_sup->srcu_gp_seq_needed)); + WRITE_ONCE(ssp->srcu_sup->srcu_gp_start, jiffies); + WRITE_ONCE(ssp->srcu_sup->srcu_n_exp_nodelay, 0); smp_mb(); /* Order prior store to ->srcu_gp_seq_needed vs. GP start. */ - rcu_seq_start(&ssp->srcu_gp_seq); - state = rcu_seq_state(ssp->srcu_gp_seq); + rcu_seq_start(&ssp->srcu_sup->srcu_gp_seq); + state = rcu_seq_state(ssp->srcu_sup->srcu_gp_seq); WARN_ON_ONCE(state != SRCU_STATE_SCAN1); } @@ -507,7 +824,7 @@ static void srcu_schedule_cbs_snp(struct srcu_struct *ssp, struct srcu_node *snp int cpu; for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) { - if (!(mask & (1 << (cpu - snp->grplo)))) + if (!(mask & (1UL << (cpu - snp->grplo)))) continue; srcu_schedule_cbs_sdp(per_cpu_ptr(ssp->sda, cpu), delay); } @@ -524,7 +841,7 @@ static void srcu_schedule_cbs_snp(struct srcu_struct *ssp, struct srcu_node *snp */ static void srcu_gp_end(struct srcu_struct *ssp) { - unsigned long cbdelay; + unsigned long cbdelay = 1; bool cbs; bool last_lvl; int cpu; @@ -533,71 +850,92 @@ static void srcu_gp_end(struct srcu_struct *ssp) int idx; unsigned long mask; struct srcu_data *sdp; + unsigned long sgsne; struct srcu_node *snp; + int ss_state; + struct srcu_usage *sup = ssp->srcu_sup; /* Prevent more than one additional grace period. */ - mutex_lock(&ssp->srcu_cb_mutex); + mutex_lock(&sup->srcu_cb_mutex); /* End the current grace period. */ - spin_lock_irq_rcu_node(ssp); - idx = rcu_seq_state(ssp->srcu_gp_seq); + spin_lock_irq_rcu_node(sup); + idx = rcu_seq_state(sup->srcu_gp_seq); WARN_ON_ONCE(idx != SRCU_STATE_SCAN2); - cbdelay = srcu_get_delay(ssp); - WRITE_ONCE(ssp->srcu_last_gp_end, ktime_get_mono_fast_ns()); - rcu_seq_end(&ssp->srcu_gp_seq); - gpseq = rcu_seq_current(&ssp->srcu_gp_seq); - if (ULONG_CMP_LT(ssp->srcu_gp_seq_needed_exp, gpseq)) - WRITE_ONCE(ssp->srcu_gp_seq_needed_exp, gpseq); - spin_unlock_irq_rcu_node(ssp); - mutex_unlock(&ssp->srcu_gp_mutex); + if (ULONG_CMP_LT(READ_ONCE(sup->srcu_gp_seq), READ_ONCE(sup->srcu_gp_seq_needed_exp))) + cbdelay = 0; + + WRITE_ONCE(sup->srcu_last_gp_end, ktime_get_mono_fast_ns()); + rcu_seq_end(&sup->srcu_gp_seq); + gpseq = rcu_seq_current(&sup->srcu_gp_seq); + if (ULONG_CMP_LT(sup->srcu_gp_seq_needed_exp, gpseq)) + WRITE_ONCE(sup->srcu_gp_seq_needed_exp, gpseq); + spin_unlock_irq_rcu_node(sup); + mutex_unlock(&sup->srcu_gp_mutex); /* A new grace period can start at this point. But only one. */ /* Initiate callback invocation as needed. 
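 * While the srcu_struct remains small, all callbacks are queued on the boot CPU's srcu_data structure, which is therefore scheduled directly; once the combining tree is in use, it is instead walked breadth-first.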
*/ - idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs); - srcu_for_each_node_breadth_first(ssp, snp) { - spin_lock_irq_rcu_node(snp); - cbs = false; - last_lvl = snp >= ssp->level[rcu_num_lvls - 1]; - if (last_lvl) - cbs = snp->srcu_have_cbs[idx] == gpseq; - snp->srcu_have_cbs[idx] = gpseq; - rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1); - if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq)) - WRITE_ONCE(snp->srcu_gp_seq_needed_exp, gpseq); - mask = snp->srcu_data_have_cbs[idx]; - snp->srcu_data_have_cbs[idx] = 0; - spin_unlock_irq_rcu_node(snp); - if (cbs) - srcu_schedule_cbs_snp(ssp, snp, mask, cbdelay); - - /* Occasionally prevent srcu_data counter wrap. */ - if (!(gpseq & counter_wrap_check) && last_lvl) - for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) { - sdp = per_cpu_ptr(ssp->sda, cpu); - spin_lock_irqsave_rcu_node(sdp, flags); - if (ULONG_CMP_GE(gpseq, - sdp->srcu_gp_seq_needed + 100)) - sdp->srcu_gp_seq_needed = gpseq; - if (ULONG_CMP_GE(gpseq, - sdp->srcu_gp_seq_needed_exp + 100)) - sdp->srcu_gp_seq_needed_exp = gpseq; - spin_unlock_irqrestore_rcu_node(sdp, flags); - } + ss_state = smp_load_acquire(&sup->srcu_size_state); + if (ss_state < SRCU_SIZE_WAIT_BARRIER) { + srcu_schedule_cbs_sdp(per_cpu_ptr(ssp->sda, get_boot_cpu_id()), + cbdelay); + } else { + idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs); + srcu_for_each_node_breadth_first(ssp, snp) { + spin_lock_irq_rcu_node(snp); + cbs = false; + last_lvl = snp >= sup->level[rcu_num_lvls - 1]; + if (last_lvl) + cbs = ss_state < SRCU_SIZE_BIG || snp->srcu_have_cbs[idx] == gpseq; + snp->srcu_have_cbs[idx] = gpseq; + rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1); + sgsne = snp->srcu_gp_seq_needed_exp; + if (srcu_invl_snp_seq(sgsne) || ULONG_CMP_LT(sgsne, gpseq)) + WRITE_ONCE(snp->srcu_gp_seq_needed_exp, gpseq); + if (ss_state < SRCU_SIZE_BIG) + mask = ~0; + else + mask = snp->srcu_data_have_cbs[idx]; + snp->srcu_data_have_cbs[idx] = 0; + spin_unlock_irq_rcu_node(snp); + if (cbs) + srcu_schedule_cbs_snp(ssp, snp, mask, cbdelay); + } } + /* Occasionally prevent srcu_data counter wrap. */ + if (!(gpseq & counter_wrap_check)) + for_each_possible_cpu(cpu) { + sdp = per_cpu_ptr(ssp->sda, cpu); + spin_lock_irqsave_rcu_node(sdp, flags); + if (ULONG_CMP_GE(gpseq, sdp->srcu_gp_seq_needed + 100)) + sdp->srcu_gp_seq_needed = gpseq; + if (ULONG_CMP_GE(gpseq, sdp->srcu_gp_seq_needed_exp + 100)) + sdp->srcu_gp_seq_needed_exp = gpseq; + spin_unlock_irqrestore_rcu_node(sdp, flags); + } + /* Callback initiation done, allow grace periods after next. */ - mutex_unlock(&ssp->srcu_cb_mutex); + mutex_unlock(&sup->srcu_cb_mutex); /* Start a new grace period if needed. */ - spin_lock_irq_rcu_node(ssp); - gpseq = rcu_seq_current(&ssp->srcu_gp_seq); + spin_lock_irq_rcu_node(sup); + gpseq = rcu_seq_current(&sup->srcu_gp_seq); if (!rcu_seq_state(gpseq) && - ULONG_CMP_LT(gpseq, ssp->srcu_gp_seq_needed)) { + ULONG_CMP_LT(gpseq, sup->srcu_gp_seq_needed)) { srcu_gp_start(ssp); - spin_unlock_irq_rcu_node(ssp); + spin_unlock_irq_rcu_node(sup); srcu_reschedule(ssp, 0); } else { - spin_unlock_irq_rcu_node(ssp); + spin_unlock_irq_rcu_node(sup); + } + + /* Transition to big if needed. 
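 * SRCU_SIZE_ALLOC causes the srcu_node tree to be allocated here at the end of a grace period; each subsequent grace period then advances the intermediate SRCU_SIZE_WAIT_* states by one until SRCU_SIZE_BIG is reached.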
*/ + if (ss_state != SRCU_SIZE_SMALL && ss_state != SRCU_SIZE_BIG) { + if (ss_state == SRCU_SIZE_ALLOC) + init_srcu_struct_nodes(ssp, GFP_KERNEL); + else + smp_store_release(&sup->srcu_size_state, ss_state + 1); } } @@ -612,23 +950,27 @@ static void srcu_funnel_exp_start(struct srcu_struct *ssp, struct srcu_node *snp unsigned long s) { unsigned long flags; + unsigned long sgsne; - for (; snp != NULL; snp = snp->srcu_parent) { - if (rcu_seq_done(&ssp->srcu_gp_seq, s) || - ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s)) - return; - spin_lock_irqsave_rcu_node(snp, flags); - if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) { + if (snp) + for (; snp != NULL; snp = snp->srcu_parent) { + sgsne = READ_ONCE(snp->srcu_gp_seq_needed_exp); + if (WARN_ON_ONCE(rcu_seq_done(&ssp->srcu_sup->srcu_gp_seq, s)) || + (!srcu_invl_snp_seq(sgsne) && ULONG_CMP_GE(sgsne, s))) + return; + spin_lock_irqsave_rcu_node(snp, flags); + sgsne = snp->srcu_gp_seq_needed_exp; + if (!srcu_invl_snp_seq(sgsne) && ULONG_CMP_GE(sgsne, s)) { + spin_unlock_irqrestore_rcu_node(snp, flags); + return; + } + WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s); spin_unlock_irqrestore_rcu_node(snp, flags); - return; } - WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s); - spin_unlock_irqrestore_rcu_node(snp, flags); - } - spin_lock_irqsave_rcu_node(ssp, flags); - if (ULONG_CMP_LT(ssp->srcu_gp_seq_needed_exp, s)) - WRITE_ONCE(ssp->srcu_gp_seq_needed_exp, s); - spin_unlock_irqrestore_rcu_node(ssp, flags); + spin_lock_irqsave_ssp_contention(ssp, &flags); + if (ULONG_CMP_LT(ssp->srcu_sup->srcu_gp_seq_needed_exp, s)) + WRITE_ONCE(ssp->srcu_sup->srcu_gp_seq_needed_exp, s); + spin_unlock_irqrestore_rcu_node(ssp->srcu_sup, flags); } /* @@ -640,67 +982,85 @@ static void srcu_funnel_exp_start(struct srcu_struct *ssp, struct srcu_node *snp * * Note that this function also does the work of srcu_funnel_exp_start(), * in some cases by directly invoking it. + * + * The srcu read lock should be held around this function, and s is a seq snap + * taken after holding that lock. */ static void srcu_funnel_gp_start(struct srcu_struct *ssp, struct srcu_data *sdp, unsigned long s, bool do_norm) { unsigned long flags; int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs); - struct srcu_node *snp = sdp->mynode; + unsigned long sgsne; + struct srcu_node *snp; + struct srcu_node *snp_leaf; unsigned long snp_seq; + struct srcu_usage *sup = ssp->srcu_sup; - /* Each pass through the loop does one level of the srcu_node tree. */ - for (; snp != NULL; snp = snp->srcu_parent) { - if (rcu_seq_done(&ssp->srcu_gp_seq, s) && snp != sdp->mynode) - return; /* GP already done and CBs recorded. */ - spin_lock_irqsave_rcu_node(snp, flags); - if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) { + /* Ensure that snp node tree is fully initialized before traversing it */ + if (smp_load_acquire(&sup->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER) + snp_leaf = NULL; + else + snp_leaf = sdp->mynode; + + if (snp_leaf) + /* Each pass through the loop does one level of the srcu_node tree. */ + for (snp = snp_leaf; snp != NULL; snp = snp->srcu_parent) { + if (WARN_ON_ONCE(rcu_seq_done(&sup->srcu_gp_seq, s)) && snp != snp_leaf) + return; /* GP already done and CBs recorded. */ + spin_lock_irqsave_rcu_node(snp, flags); + snp_seq = snp->srcu_have_cbs[idx]; - if (snp == sdp->mynode && snp_seq == s) - snp->srcu_data_have_cbs[idx] |= sdp->grpmask; - spin_unlock_irqrestore_rcu_node(snp, flags); - if (snp == sdp->mynode && snp_seq != s) { - srcu_schedule_cbs_sdp(sdp, do_norm - ?
SRCU_INTERVAL - : 0); + if (!srcu_invl_snp_seq(snp_seq) && ULONG_CMP_GE(snp_seq, s)) { + if (snp == snp_leaf && snp_seq == s) + snp->srcu_data_have_cbs[idx] |= sdp->grpmask; + spin_unlock_irqrestore_rcu_node(snp, flags); + if (snp == snp_leaf && snp_seq != s) { + srcu_schedule_cbs_sdp(sdp, do_norm ? SRCU_INTERVAL : 0); + return; + } + if (!do_norm) + srcu_funnel_exp_start(ssp, snp, s); return; } - if (!do_norm) - srcu_funnel_exp_start(ssp, snp, s); - return; + snp->srcu_have_cbs[idx] = s; + if (snp == snp_leaf) + snp->srcu_data_have_cbs[idx] |= sdp->grpmask; + sgsne = snp->srcu_gp_seq_needed_exp; + if (!do_norm && (srcu_invl_snp_seq(sgsne) || ULONG_CMP_LT(sgsne, s))) + WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s); + spin_unlock_irqrestore_rcu_node(snp, flags); } - snp->srcu_have_cbs[idx] = s; - if (snp == sdp->mynode) - snp->srcu_data_have_cbs[idx] |= sdp->grpmask; - if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s)) - WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s); - spin_unlock_irqrestore_rcu_node(snp, flags); - } /* Top of tree, must ensure the grace period will be started. */ - spin_lock_irqsave_rcu_node(ssp, flags); - if (ULONG_CMP_LT(ssp->srcu_gp_seq_needed, s)) { + spin_lock_irqsave_ssp_contention(ssp, &flags); + if (ULONG_CMP_LT(sup->srcu_gp_seq_needed, s)) { /* * Record need for grace period s. Pair with load * acquire setting up for initialization. */ - smp_store_release(&ssp->srcu_gp_seq_needed, s); /*^^^*/ + smp_store_release(&sup->srcu_gp_seq_needed, s); /*^^^*/ } - if (!do_norm && ULONG_CMP_LT(ssp->srcu_gp_seq_needed_exp, s)) - WRITE_ONCE(ssp->srcu_gp_seq_needed_exp, s); + if (!do_norm && ULONG_CMP_LT(sup->srcu_gp_seq_needed_exp, s)) + WRITE_ONCE(sup->srcu_gp_seq_needed_exp, s); - /* If grace period not already done and none in progress, start it. */ - if (!rcu_seq_done(&ssp->srcu_gp_seq, s) && - rcu_seq_state(ssp->srcu_gp_seq) == SRCU_STATE_IDLE) { - WARN_ON_ONCE(ULONG_CMP_GE(ssp->srcu_gp_seq, ssp->srcu_gp_seq_needed)); + /* If grace period not already in progress, start it. */ + if (!WARN_ON_ONCE(rcu_seq_done(&sup->srcu_gp_seq, s)) && + rcu_seq_state(sup->srcu_gp_seq) == SRCU_STATE_IDLE) { + WARN_ON_ONCE(ULONG_CMP_GE(sup->srcu_gp_seq, sup->srcu_gp_seq_needed)); srcu_gp_start(ssp); + + // And how can that list_add() in the "else" clause + // possibly be safe for concurrent execution? Well, + // it isn't. And it does not have to be. After all, it + // can only be executed during early boot when there is only + // the one boot CPU running with interrupts still disabled. 
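 // Entries placed on srcu_boot_list are drained later by srcu_init(), which queues the deferred work once workqueues are available.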
if (likely(srcu_init_done)) - queue_delayed_work(rcu_gp_wq, &ssp->work, - srcu_get_delay(ssp)); - else if (list_empty(&ssp->work.work.entry)) - list_add(&ssp->work.work.entry, &srcu_boot_list); + queue_delayed_work(rcu_gp_wq, &sup->work, + !!srcu_get_delay(ssp)); + else if (list_empty(&sup->work.work.entry)) + list_add(&sup->work.work.entry, &srcu_boot_list); } - spin_unlock_irqrestore_rcu_node(ssp, flags); + spin_unlock_irqrestore_rcu_node(sup, flags); } /* @@ -710,12 +1070,16 @@ static void srcu_funnel_gp_start(struct srcu_struct *ssp, struct srcu_data *sdp, */ static bool try_check_zero(struct srcu_struct *ssp, int idx, int trycount) { + unsigned long curdelay; + + curdelay = !srcu_get_delay(ssp); + for (;;) { if (srcu_readers_active_idx_check(ssp, idx)) return true; - if (--trycount + !srcu_get_delay(ssp) <= 0) + if ((--trycount + curdelay) <= 0) return false; - udelay(SRCU_RETRY_CHECK_DELAY); + udelay(srcu_retry_check_delay); } } @@ -727,23 +1091,44 @@ static bool try_check_zero(struct srcu_struct *ssp, int idx, int trycount) static void srcu_flip(struct srcu_struct *ssp) { /* - * Ensure that if this updater saw a given reader's increment - * from __srcu_read_lock(), that reader was using an old value - * of ->srcu_idx. Also ensure that if a given reader sees the - * new value of ->srcu_idx, this updater's earlier scans cannot - * have seen that reader's increments (which is OK, because this - * grace period need not wait on that reader). + * Because the flip of ->srcu_idx is executed only if the + * preceding call to srcu_readers_active_idx_check() found that + * the ->srcu_unlock_count[] and ->srcu_lock_count[] sums matched + * and because that summing uses atomic_long_read(), there is + * ordering due to a control dependency between that summing and + * the WRITE_ONCE() in this call to srcu_flip(). This ordering + * ensures that if this updater saw a given reader's increment from + * __srcu_read_lock(), that reader was using a value of ->srcu_idx + * from before the previous call to srcu_flip(), which should be + * quite rare. This ordering thus helps forward progress because + * the grace period could otherwise be delayed by additional + * calls to __srcu_read_lock() using that old (soon to be new) + * value of ->srcu_idx. + * + * This sum-equality check and ordering also ensures that if + * a given call to __srcu_read_lock() uses the new value of + * ->srcu_idx, this updater's earlier scans cannot have seen + * that reader's increments, which is all to the good, because + * this grace period need not wait on that reader. After all, + * if those earlier scans had seen that reader, there would have + * been a sum mismatch and this code would not be reached. + * + * This means that the following smp_mb() is redundant, but + * it stays until either (1) Compilers learn about this sort of + * control dependency or (2) Some production workload running on + * a production system is unduly delayed by this slowpath smp_mb(). */ smp_mb(); /* E */ /* Pairs with B and C. */ - WRITE_ONCE(ssp->srcu_idx, ssp->srcu_idx + 1); + WRITE_ONCE(ssp->srcu_idx, ssp->srcu_idx + 1); // Flip the counter. /* * Ensure that if the updater misses an __srcu_read_unlock() - * increment, that task's next __srcu_read_lock() will see the - * above counter update. Note that both this memory barrier - * and the one in srcu_readers_active_idx_check() provide the - * guarantee for __srcu_read_lock(). 
+ * increment, that task's __srcu_read_lock() following its next + * __srcu_read_lock() or __srcu_read_unlock() will see the above + * counter update. Note that both this memory barrier and the + * one in srcu_readers_active_idx_check() provide the guarantee + * for __srcu_read_lock(). */ smp_mb(); /* D */ /* Pairs with C. */ } @@ -766,7 +1151,7 @@ static void srcu_flip(struct srcu_struct *ssp) * it, if this function was preempted for enough time for the counters * to wrap, it really doesn't matter whether or not we expedite the grace * period. The extra overhead of a needlessly expedited grace period is - * negligible when amoritized over that time period, and the extra latency + * negligible when amortized over that time period, and the extra latency * of a needlessly non-expedited grace period is similarly negligible. */ static bool srcu_might_be_idle(struct srcu_struct *ssp) @@ -777,35 +1162,36 @@ static bool srcu_might_be_idle(struct srcu_struct *ssp) unsigned long t; unsigned long tlast; + check_init_srcu_struct(ssp); /* If the local srcu_data structure has callbacks, not idle. */ - local_irq_save(flags); - sdp = this_cpu_ptr(ssp->sda); + sdp = raw_cpu_ptr(ssp->sda); + spin_lock_irqsave_rcu_node(sdp, flags); if (rcu_segcblist_pend_cbs(&sdp->srcu_cblist)) { - local_irq_restore(flags); + spin_unlock_irqrestore_rcu_node(sdp, flags); return false; /* Callbacks already present, so not idle. */ } - local_irq_restore(flags); + spin_unlock_irqrestore_rcu_node(sdp, flags); /* - * No local callbacks, so probabalistically probe global state. + * No local callbacks, so probabilistically probe global state. * Exact information would require acquiring locks, which would - * kill scalability, hence the probabalistic nature of the probe. + * kill scalability, hence the probabilistic nature of the probe. */ /* First, see if enough time has passed since the last GP. */ t = ktime_get_mono_fast_ns(); - tlast = READ_ONCE(ssp->srcu_last_gp_end); + tlast = READ_ONCE(ssp->srcu_sup->srcu_last_gp_end); if (exp_holdoff == 0 || time_in_range_open(t, tlast, tlast + exp_holdoff)) return false; /* Too soon after last GP. */ /* Next, check for probable idleness. */ - curseq = rcu_seq_current(&ssp->srcu_gp_seq); + curseq = rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq); smp_mb(); /* Order ->srcu_gp_seq with ->srcu_gp_seq_needed. */ - if (ULONG_CMP_LT(curseq, READ_ONCE(ssp->srcu_gp_seq_needed))) + if (ULONG_CMP_LT(curseq, READ_ONCE(ssp->srcu_sup->srcu_gp_seq_needed))) return false; /* Grace period in progress, so not idle. */ smp_mb(); /* Order ->srcu_gp_seq with prior access. */ - if (curseq != rcu_seq_current(&ssp->srcu_gp_seq)) + if (curseq != rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq)) return false; /* GP # changed, so not idle. */ return true; /* With reasonable probability, idle! */ } @@ -818,6 +1204,93 @@ static void srcu_leak_callback(struct rcu_head *rhp) } /* + * Start an SRCU grace period, and also queue the callback if non-NULL. + */ +static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp, + struct rcu_head *rhp, bool do_norm) +{ + unsigned long flags; + int idx; + bool needexp = false; + bool needgp = false; + unsigned long s; + struct srcu_data *sdp; + struct srcu_node *sdp_mynode; + int ss_state; + + check_init_srcu_struct(ssp); + /* + * While starting a new grace period, make sure we are in an + * SRCU read-side critical section so that the grace-period + * sequence number cannot wrap around in the meantime. 
+ */ + idx = __srcu_read_lock_nmisafe(ssp); + ss_state = smp_load_acquire(&ssp->srcu_sup->srcu_size_state); + if (ss_state < SRCU_SIZE_WAIT_CALL) + sdp = per_cpu_ptr(ssp->sda, get_boot_cpu_id()); + else + sdp = raw_cpu_ptr(ssp->sda); + spin_lock_irqsave_sdp_contention(sdp, &flags); + if (rhp) + rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp); + /* + * The snapshot for acceleration must be taken _before_ the read of the + * current gp sequence used for advancing, otherwise advancing may fail + * and acceleration may then fail too. + * + * This could happen if: + * + * 1) The RCU_WAIT_TAIL segment has callbacks (gp_num = X + 4) and the + * RCU_NEXT_READY_TAIL also has callbacks (gp_num = X + 8). + * + * 2) The grace period for RCU_WAIT_TAIL is seen as started but not + * completed so rcu_seq_current() returns X + SRCU_STATE_SCAN1. + * + * 3) This value is passed to rcu_segcblist_advance() which can't move + * any segment forward and fails. + * + * 4) srcu_gp_start_if_needed() still proceeds with callback acceleration. + * But then the call to rcu_seq_snap() observes the grace period for the + * RCU_WAIT_TAIL segment as completed and the subsequent one for the + * RCU_NEXT_READY_TAIL segment as started (ie: X + 4 + SRCU_STATE_SCAN1) + * so it returns a snapshot of the next grace period, which is X + 12. + * + * 5) The value of X + 12 is passed to rcu_segcblist_accelerate() but the + * freshly enqueued callback in RCU_NEXT_TAIL can't move to + * RCU_NEXT_READY_TAIL which already has callbacks for a previous grace + * period (gp_num = X + 8). So acceleration fails. + */ + s = rcu_seq_snap(&ssp->srcu_sup->srcu_gp_seq); + if (rhp) { + rcu_segcblist_advance(&sdp->srcu_cblist, + rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq)); + WARN_ON_ONCE(!rcu_segcblist_accelerate(&sdp->srcu_cblist, s)); + } + if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed, s)) { + sdp->srcu_gp_seq_needed = s; + needgp = true; + } + if (!do_norm && ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) { + sdp->srcu_gp_seq_needed_exp = s; + needexp = true; + } + spin_unlock_irqrestore_rcu_node(sdp, flags); + + /* Ensure that snp node tree is fully initialized before traversing it */ + if (ss_state < SRCU_SIZE_WAIT_BARRIER) + sdp_mynode = NULL; + else + sdp_mynode = sdp->mynode; + + if (needgp) + srcu_funnel_gp_start(ssp, sdp, s, do_norm); + else if (needexp) + srcu_funnel_exp_start(ssp, sdp_mynode, s); + __srcu_read_unlock_nmisafe(ssp, idx); + return s; +} + +/* * Enqueue an SRCU callback on the srcu_data structure associated with * the current CPU and the specified srcu_struct structure, initiating * grace-period processing if it is not already running. @@ -848,14 +1321,6 @@ static void srcu_leak_callback(struct rcu_head *rhp) static void __call_srcu(struct srcu_struct *ssp, struct rcu_head *rhp, rcu_callback_t func, bool do_norm) { - unsigned long flags; - int idx; - bool needexp = false; - bool needgp = false; - unsigned long s; - struct srcu_data *sdp; - - check_init_srcu_struct(ssp); if (debug_rcu_head_queue(rhp)) { /* Probable double call_srcu(), so leak the callback. 
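 * Pointing rhp->func at the empty srcu_leak_callback() ensures that nothing harmful runs should the leaked duplicate ever be invoked.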
*/ WRITE_ONCE(rhp->func, srcu_leak_callback); @@ -863,29 +1328,7 @@ static void __call_srcu(struct srcu_struct *ssp, struct rcu_head *rhp, return; } rhp->func = func; - idx = srcu_read_lock(ssp); - local_irq_save(flags); - sdp = this_cpu_ptr(ssp->sda); - spin_lock_rcu_node(sdp); - rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp); - rcu_segcblist_advance(&sdp->srcu_cblist, - rcu_seq_current(&ssp->srcu_gp_seq)); - s = rcu_seq_snap(&ssp->srcu_gp_seq); - (void)rcu_segcblist_accelerate(&sdp->srcu_cblist, s); - if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed, s)) { - sdp->srcu_gp_seq_needed = s; - needgp = true; - } - if (!do_norm && ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) { - sdp->srcu_gp_seq_needed_exp = s; - needexp = true; - } - spin_unlock_irqrestore_rcu_node(sdp, flags); - if (needgp) - srcu_funnel_gp_start(ssp, sdp, s, do_norm); - else if (needexp) - srcu_funnel_exp_start(ssp, sdp->mynode, s); - srcu_read_unlock(ssp, idx); + (void)srcu_gp_start_if_needed(ssp, rhp, do_norm); } /** @@ -919,7 +1362,9 @@ static void __synchronize_srcu(struct srcu_struct *ssp, bool do_norm) { struct rcu_synchronize rcu; - RCU_LOCKDEP_WARN(lock_is_held(&ssp->dep_map) || + srcu_lock_sync(&ssp->dep_map); + + RCU_LOCKDEP_WARN(lockdep_is_held(ssp) || lock_is_held(&rcu_bh_lock_map) || lock_is_held(&rcu_lock_map) || lock_is_held(&rcu_sched_lock_map), @@ -1000,6 +1445,9 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are * passed the same srcu_struct structure. * + * Implementation of these memory-ordering guarantees is similar to + * that of synchronize_rcu(). + * * If SRCU is likely idle, expedite the first request. This semantic * was provided by Classic SRCU, and is relied upon by its users, so TREE * SRCU must also provide it. Note that detecting idleness is heuristic @@ -1014,6 +1462,77 @@ void synchronize_srcu(struct srcu_struct *ssp) } EXPORT_SYMBOL_GPL(synchronize_srcu); +/** + * get_state_synchronize_srcu - Provide an end-of-grace-period cookie + * @ssp: srcu_struct to provide cookie for. + * + * This function returns a cookie that can be passed to + * poll_state_synchronize_srcu(), which will return true if a full grace + * period has elapsed in the meantime. It is the caller's responsibility + * to make sure that grace period happens, for example, by invoking + * call_srcu() after return from get_state_synchronize_srcu(). + */ +unsigned long get_state_synchronize_srcu(struct srcu_struct *ssp) +{ + // Any prior manipulation of SRCU-protected data must happen + // before the load from ->srcu_gp_seq. + smp_mb(); + return rcu_seq_snap(&ssp->srcu_sup->srcu_gp_seq); +} +EXPORT_SYMBOL_GPL(get_state_synchronize_srcu); + +/** + * start_poll_synchronize_srcu - Provide cookie and start grace period + * @ssp: srcu_struct to provide cookie for. + * + * This function returns a cookie that can be passed to + * poll_state_synchronize_srcu(), which will return true if a full grace + * period has elapsed in the meantime. Unlike get_state_synchronize_srcu(), + * this function also ensures that any needed SRCU grace period will be + * started. This convenience does come at a cost in terms of CPU overhead. + */ +unsigned long start_poll_synchronize_srcu(struct srcu_struct *ssp) +{ + return srcu_gp_start_if_needed(ssp, NULL, true); +} +EXPORT_SYMBOL_GPL(start_poll_synchronize_srcu); + +/** + * poll_state_synchronize_srcu - Has cookie's grace period ended? + * @ssp: srcu_struct to provide cookie for. 
+ * @cookie: Return value from get_state_synchronize_srcu() or start_poll_synchronize_srcu(). + * + * This function takes the cookie that was returned from either + * get_state_synchronize_srcu() or start_poll_synchronize_srcu(), and + * returns @true if an SRCU grace period elapsed since the time that the + * cookie was created. + * + * Because cookies are finite in size, wrapping/overflow is possible. + * This is more pronounced on 32-bit systems where cookies are 32 bits, + * where in theory wrapping could happen in about 14 hours assuming + * 25-microsecond expedited SRCU grace periods. However, a more likely + * overflow lower bound is on the order of 24 days in the case of + * one-millisecond SRCU grace periods. Of course, wrapping in a 64-bit + * system requires geologic timespans, as in more than seven million years + * even for expedited SRCU grace periods. + * + * Wrapping/overflow is much more of an issue for CONFIG_SMP=n systems + * that also have CONFIG_PREEMPTION=n, which selects Tiny SRCU. This uses + * a 16-bit cookie, which rcutorture routinely wraps in a matter of a + * few minutes. If this proves to be a problem, this counter will be + * expanded to the same size as for Tree SRCU. + */ +bool poll_state_synchronize_srcu(struct srcu_struct *ssp, unsigned long cookie) +{ + if (!rcu_seq_done(&ssp->srcu_sup->srcu_gp_seq, cookie)) + return false; + // Ensure that the end of the SRCU grace period happens before + // any subsequent code that the caller might execute. + smp_mb(); // ^^^ + return true; +} +EXPORT_SYMBOL_GPL(poll_state_synchronize_srcu); + /* * Callback function for srcu_barrier() use. */ @@ -1024,8 +1543,30 @@ static void srcu_barrier_cb(struct rcu_head *rhp) sdp = container_of(rhp, struct srcu_data, srcu_barrier_head); ssp = sdp->ssp; - if (atomic_dec_and_test(&ssp->srcu_barrier_cpu_cnt)) - complete(&ssp->srcu_barrier_completion); + if (atomic_dec_and_test(&ssp->srcu_sup->srcu_barrier_cpu_cnt)) + complete(&ssp->srcu_sup->srcu_barrier_completion); +} + +/* + * Enqueue an srcu_barrier() callback on the specified srcu_data + * structure's ->cblist, but only if that ->cblist already has at least one + * callback enqueued. Note that if a CPU already has callbacks enqueued, + * it must have already registered the need for a future grace period, + * so all we need do is enqueue a callback that will use the same grace + * period as the last callback already in the queue. + */ +static void srcu_barrier_one_cpu(struct srcu_struct *ssp, struct srcu_data *sdp) +{ + spin_lock_irq_rcu_node(sdp); + atomic_inc(&ssp->srcu_sup->srcu_barrier_cpu_cnt); + sdp->srcu_barrier_head.func = srcu_barrier_cb; + debug_rcu_head_queue(&sdp->srcu_barrier_head); + if (!rcu_segcblist_entrain(&sdp->srcu_cblist, + &sdp->srcu_barrier_head)) { + debug_rcu_head_unqueue(&sdp->srcu_barrier_head); + atomic_dec(&ssp->srcu_sup->srcu_barrier_cpu_cnt); + } + spin_unlock_irq_rcu_node(sdp); } /** @@ -1035,51 +1576,37 @@ static void srcu_barrier_cb(struct rcu_head *rhp) void srcu_barrier(struct srcu_struct *ssp) { int cpu; - struct srcu_data *sdp; - unsigned long s = rcu_seq_snap(&ssp->srcu_barrier_seq); + int idx; + unsigned long s = rcu_seq_snap(&ssp->srcu_sup->srcu_barrier_seq); check_init_srcu_struct(ssp); - mutex_lock(&ssp->srcu_barrier_mutex); - if (rcu_seq_done(&ssp->srcu_barrier_seq, s)) { + mutex_lock(&ssp->srcu_sup->srcu_barrier_mutex); + if (rcu_seq_done(&ssp->srcu_sup->srcu_barrier_seq, s)) { smp_mb(); /* Force ordering following return.
+ /* * Callback function for srcu_barrier() use. */ @@ -1024,8 +1543,30 @@ static void srcu_barrier_cb(struct rcu_head *rhp) sdp = container_of(rhp, struct srcu_data, srcu_barrier_head); ssp = sdp->ssp; - if (atomic_dec_and_test(&ssp->srcu_barrier_cpu_cnt)) - complete(&ssp->srcu_barrier_completion); + if (atomic_dec_and_test(&ssp->srcu_sup->srcu_barrier_cpu_cnt)) + complete(&ssp->srcu_sup->srcu_barrier_completion); +} + +/* + * Enqueue an srcu_barrier() callback on the specified srcu_data + * structure's ->cblist, but only if that ->cblist already has at least one + * callback enqueued. Note that if a CPU already has callbacks enqueued, + * it must have already registered the need for a future grace period, + * so all we need do is enqueue a callback that will use the same grace + * period as the last callback already in the queue. + */ +static void srcu_barrier_one_cpu(struct srcu_struct *ssp, struct srcu_data *sdp) +{ + spin_lock_irq_rcu_node(sdp); + atomic_inc(&ssp->srcu_sup->srcu_barrier_cpu_cnt); + sdp->srcu_barrier_head.func = srcu_barrier_cb; + debug_rcu_head_queue(&sdp->srcu_barrier_head); + if (!rcu_segcblist_entrain(&sdp->srcu_cblist, + &sdp->srcu_barrier_head)) { + debug_rcu_head_unqueue(&sdp->srcu_barrier_head); + atomic_dec(&ssp->srcu_sup->srcu_barrier_cpu_cnt); + } + spin_unlock_irq_rcu_node(sdp); } /** @@ -1035,51 +1576,37 @@ static void srcu_barrier_cb(struct rcu_head *rhp) void srcu_barrier(struct srcu_struct *ssp) { int cpu; - struct srcu_data *sdp; - unsigned long s = rcu_seq_snap(&ssp->srcu_barrier_seq); + int idx; + unsigned long s = rcu_seq_snap(&ssp->srcu_sup->srcu_barrier_seq); check_init_srcu_struct(ssp); - mutex_lock(&ssp->srcu_barrier_mutex); - if (rcu_seq_done(&ssp->srcu_barrier_seq, s)) { + mutex_lock(&ssp->srcu_sup->srcu_barrier_mutex); + if (rcu_seq_done(&ssp->srcu_sup->srcu_barrier_seq, s)) { smp_mb(); /* Force ordering following return. */ - mutex_unlock(&ssp->srcu_barrier_mutex); + mutex_unlock(&ssp->srcu_sup->srcu_barrier_mutex); return; /* Someone else did our work for us. */ } - rcu_seq_start(&ssp->srcu_barrier_seq); - init_completion(&ssp->srcu_barrier_completion); + rcu_seq_start(&ssp->srcu_sup->srcu_barrier_seq); + init_completion(&ssp->srcu_sup->srcu_barrier_completion); /* Initial count prevents reaching zero until all CBs are posted. */ - atomic_set(&ssp->srcu_barrier_cpu_cnt, 1); + atomic_set(&ssp->srcu_sup->srcu_barrier_cpu_cnt, 1); - /* - * Each pass through this loop enqueues a callback, but only - * on CPUs already having callbacks enqueued. Note that if - * a CPU already has callbacks enqueue, it must have already - * registered the need for a future grace period, so all we - * need do is enqueue a callback that will use the same - * grace period as the last callback already in the queue. - */ - for_each_possible_cpu(cpu) { - sdp = per_cpu_ptr(ssp->sda, cpu); - spin_lock_irq_rcu_node(sdp); - atomic_inc(&ssp->srcu_barrier_cpu_cnt); - sdp->srcu_barrier_head.func = srcu_barrier_cb; - debug_rcu_head_queue(&sdp->srcu_barrier_head); - if (!rcu_segcblist_entrain(&sdp->srcu_cblist, - &sdp->srcu_barrier_head)) { - debug_rcu_head_unqueue(&sdp->srcu_barrier_head); - atomic_dec(&ssp->srcu_barrier_cpu_cnt); - } - spin_unlock_irq_rcu_node(sdp); - } + idx = __srcu_read_lock_nmisafe(ssp); + if (smp_load_acquire(&ssp->srcu_sup->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER) + srcu_barrier_one_cpu(ssp, per_cpu_ptr(ssp->sda, get_boot_cpu_id())); + else + for_each_possible_cpu(cpu) + srcu_barrier_one_cpu(ssp, per_cpu_ptr(ssp->sda, cpu)); + __srcu_read_unlock_nmisafe(ssp, idx); /* Remove the initial count, at which point reaching zero can happen. */ - if (atomic_dec_and_test(&ssp->srcu_barrier_cpu_cnt)) - complete(&ssp->srcu_barrier_completion); - wait_for_completion(&ssp->srcu_barrier_completion); + if (atomic_dec_and_test(&ssp->srcu_sup->srcu_barrier_cpu_cnt)) + complete(&ssp->srcu_sup->srcu_barrier_completion); + wait_for_completion(&ssp->srcu_sup->srcu_barrier_completion); - rcu_seq_end(&ssp->srcu_barrier_seq); - mutex_unlock(&ssp->srcu_barrier_mutex); + rcu_seq_end(&ssp->srcu_sup->srcu_barrier_seq); + mutex_unlock(&ssp->srcu_sup->srcu_barrier_mutex); } EXPORT_SYMBOL_GPL(srcu_barrier);
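Context worth restating for this rework: srcu_barrier() waits only for invocation of callbacks that were already queued, not for a full grace period, which is why srcu_barrier_one_cpu() entrains its callback behind whatever is already queued on each CPU. Its canonical use is just before teardown. A minimal sketch under that assumption, in which my_srcu and my_teardown() are hypothetical names:

/*
 * Hypothetical teardown sequence (sketch only): any user of call_srcu()
 * must invoke srcu_barrier() before cleanup_srcu_struct(), because
 * cleanup must not run while callbacks are still queued or in flight.
 */
static struct srcu_struct my_srcu;      /* Assumed initialized via init_srcu_struct(). */

static void my_teardown(void)
{
        srcu_barrier(&my_srcu);         /* Wait for all queued SRCU callbacks. */
        cleanup_srcu_struct(&my_srcu);  /* Only now is cleanup safe. */
}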
@@ -1105,7 +1632,7 @@ static void srcu_advance_state(struct srcu_struct *ssp) { int idx; - mutex_lock(&ssp->srcu_gp_mutex); + mutex_lock(&ssp->srcu_sup->srcu_gp_mutex); /* * Because readers might be delayed for an extended period after @@ -1117,38 +1644,39 @@ static void srcu_advance_state(struct srcu_struct *ssp) * The load-acquire ensures that we see the accesses performed * by the prior grace period. */ - idx = rcu_seq_state(smp_load_acquire(&ssp->srcu_gp_seq)); /* ^^^ */ + idx = rcu_seq_state(smp_load_acquire(&ssp->srcu_sup->srcu_gp_seq)); /* ^^^ */ if (idx == SRCU_STATE_IDLE) { - spin_lock_irq_rcu_node(ssp); - if (ULONG_CMP_GE(ssp->srcu_gp_seq, ssp->srcu_gp_seq_needed)) { - WARN_ON_ONCE(rcu_seq_state(ssp->srcu_gp_seq)); - spin_unlock_irq_rcu_node(ssp); - mutex_unlock(&ssp->srcu_gp_mutex); + spin_lock_irq_rcu_node(ssp->srcu_sup); + if (ULONG_CMP_GE(ssp->srcu_sup->srcu_gp_seq, ssp->srcu_sup->srcu_gp_seq_needed)) { + WARN_ON_ONCE(rcu_seq_state(ssp->srcu_sup->srcu_gp_seq)); + spin_unlock_irq_rcu_node(ssp->srcu_sup); + mutex_unlock(&ssp->srcu_sup->srcu_gp_mutex); return; } - idx = rcu_seq_state(READ_ONCE(ssp->srcu_gp_seq)); + idx = rcu_seq_state(READ_ONCE(ssp->srcu_sup->srcu_gp_seq)); if (idx == SRCU_STATE_IDLE) srcu_gp_start(ssp); - spin_unlock_irq_rcu_node(ssp); + spin_unlock_irq_rcu_node(ssp->srcu_sup); if (idx != SRCU_STATE_IDLE) { - mutex_unlock(&ssp->srcu_gp_mutex); + mutex_unlock(&ssp->srcu_sup->srcu_gp_mutex); return; /* Someone else started the grace period. */ } } - if (rcu_seq_state(READ_ONCE(ssp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { + if (rcu_seq_state(READ_ONCE(ssp->srcu_sup->srcu_gp_seq)) == SRCU_STATE_SCAN1) { idx = 1 ^ (ssp->srcu_idx & 1); if (!try_check_zero(ssp, idx, 1)) { - mutex_unlock(&ssp->srcu_gp_mutex); + mutex_unlock(&ssp->srcu_sup->srcu_gp_mutex); return; /* readers present, retry later. */ } srcu_flip(ssp); - spin_lock_irq_rcu_node(ssp); - rcu_seq_set_state(&ssp->srcu_gp_seq, SRCU_STATE_SCAN2); - spin_unlock_irq_rcu_node(ssp); + spin_lock_irq_rcu_node(ssp->srcu_sup); + rcu_seq_set_state(&ssp->srcu_sup->srcu_gp_seq, SRCU_STATE_SCAN2); + ssp->srcu_sup->srcu_n_exp_nodelay = 0; + spin_unlock_irq_rcu_node(ssp->srcu_sup); } - if (rcu_seq_state(READ_ONCE(ssp->srcu_gp_seq)) == SRCU_STATE_SCAN2) { + if (rcu_seq_state(READ_ONCE(ssp->srcu_sup->srcu_gp_seq)) == SRCU_STATE_SCAN2) { /* * SRCU read-side critical sections are normally short, @@ -1156,9 +1684,10 @@ static void srcu_advance_state(struct srcu_struct *ssp) */ idx = 1 ^ (ssp->srcu_idx & 1); if (!try_check_zero(ssp, idx, 2)) { - mutex_unlock(&ssp->srcu_gp_mutex); + mutex_unlock(&ssp->srcu_sup->srcu_gp_mutex); return; /* readers present, retry later. */ } + ssp->srcu_sup->srcu_n_exp_nodelay = 0; srcu_gp_end(ssp); /* Releases ->srcu_gp_mutex. */ } }
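The IDLE, SCAN1, flip, SCAN2 sequence above is easier to see stripped of locking, the ->srcu_sup indirection, and the expedited/no-delay machinery. A deliberately toy model, in which struct toy_srcu, readers_quiesced(), and toy_srcu_one_gp() are all hypothetical stand-ins, with readers_quiesced() playing the role of try_check_zero():

/*
 * Toy model of one SRCU grace period (sketch only, not kernel code):
 * real SRCU must additionally handle IDLE-state races, counter wrap,
 * memory ordering, and per-CPU counter aggregation.
 */
struct toy_srcu {
        int srcu_idx;                           /* Low bit selects the active index. */
};

static bool readers_quiesced(struct toy_srcu *tsp, int idx);   /* Hypothetical. */

static void toy_srcu_one_gp(struct toy_srcu *tsp)
{
        int idx = 1 ^ (tsp->srcu_idx & 1);      /* Currently inactive index. */

        /* SCAN1: wait out stragglers still counted on the inactive index. */
        while (!readers_quiesced(tsp, idx))
                ;
        /* Flip: direct new readers at the index that just drained. */
        tsp->srcu_idx++;
        /* SCAN2: wait out pre-flip readers on the previously active index. */
        while (!readers_quiesced(tsp, idx ^ 1))
                ;
}

Two scans are needed because a reader can fetch ->srcu_idx, be delayed, and then increment the counter for what has meanwhile become the inactive index, which is exactly the race that the comment at the top of srcu_advance_state() describes.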
@@ -1171,6 +1700,7 @@ static void srcu_advance_state(struct srcu_struct *ssp) */ static void srcu_invoke_callbacks(struct work_struct *work) { + long len; bool more; struct rcu_cblist ready_cbs; struct rcu_head *rhp; @@ -1182,8 +1712,14 @@ static void srcu_invoke_callbacks(struct work_struct *work) ssp = sdp->ssp; rcu_cblist_init(&ready_cbs); spin_lock_irq_rcu_node(sdp); + WARN_ON_ONCE(!rcu_segcblist_segempty(&sdp->srcu_cblist, RCU_NEXT_TAIL)); rcu_segcblist_advance(&sdp->srcu_cblist, - rcu_seq_current(&ssp->srcu_gp_seq)); + rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq)); + /* + * Although this function is theoretically re-entrant, concurrent + * callback invocation is disallowed to avoid executing an SRCU barrier + * too early. + */ if (sdp->srcu_cblist_invoking || !rcu_segcblist_ready_cbs(&sdp->srcu_cblist)) { spin_unlock_irq_rcu_node(sdp); @@ -1193,26 +1729,28 @@ static void srcu_invoke_callbacks(struct work_struct *work) /* We are on the job! Extract and invoke ready callbacks. */ sdp->srcu_cblist_invoking = true; rcu_segcblist_extract_done_cbs(&sdp->srcu_cblist, &ready_cbs); + len = ready_cbs.len; spin_unlock_irq_rcu_node(sdp); rhp = rcu_cblist_dequeue(&ready_cbs); for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { debug_rcu_head_unqueue(rhp); + debug_rcu_head_callback(rhp); local_bh_disable(); rhp->func(rhp); local_bh_enable(); } + WARN_ON_ONCE(ready_cbs.len); /* * Update counts, accelerate new callbacks, and if needed, * schedule another round of callback invocation. */ spin_lock_irq_rcu_node(sdp); - rcu_segcblist_insert_count(&sdp->srcu_cblist, &ready_cbs); - (void)rcu_segcblist_accelerate(&sdp->srcu_cblist, - rcu_seq_snap(&ssp->srcu_gp_seq)); + rcu_segcblist_add_len(&sdp->srcu_cblist, -len); sdp->srcu_cblist_invoking = false; more = rcu_segcblist_ready_cbs(&sdp->srcu_cblist); spin_unlock_irq_rcu_node(sdp); + /* An SRCU barrier or callbacks from a previous round of work may be pending. */ if (more) srcu_schedule_cbs_sdp(sdp, 0); } @@ -1225,20 +1763,20 @@ static void srcu_reschedule(struct srcu_struct *ssp, unsigned long delay) { bool pushgp = true; - spin_lock_irq_rcu_node(ssp); - if (ULONG_CMP_GE(ssp->srcu_gp_seq, ssp->srcu_gp_seq_needed)) { - if (!WARN_ON_ONCE(rcu_seq_state(ssp->srcu_gp_seq))) { + spin_lock_irq_rcu_node(ssp->srcu_sup); + if (ULONG_CMP_GE(ssp->srcu_sup->srcu_gp_seq, ssp->srcu_sup->srcu_gp_seq_needed)) { + if (!WARN_ON_ONCE(rcu_seq_state(ssp->srcu_sup->srcu_gp_seq))) { /* All requests fulfilled, time to go idle. */ pushgp = false; } - } else if (!rcu_seq_state(ssp->srcu_gp_seq)) { + } else if (!rcu_seq_state(ssp->srcu_sup->srcu_gp_seq)) { /* Outstanding request and no GP. Start one. */ srcu_gp_start(ssp); } - spin_unlock_irq_rcu_node(ssp); + spin_unlock_irq_rcu_node(ssp->srcu_sup); if (pushgp) - queue_delayed_work(rcu_gp_wq, &ssp->work, delay); + queue_delayed_work(rcu_gp_wq, &ssp->srcu_sup->work, delay); } /* @@ -1246,12 +1784,30 @@ static void srcu_reschedule(struct srcu_struct *ssp, unsigned long delay) */ static void process_srcu(struct work_struct *work) { + unsigned long curdelay; + unsigned long j; struct srcu_struct *ssp; + struct srcu_usage *sup; - ssp = container_of(work, struct srcu_struct, work.work); + sup = container_of(work, struct srcu_usage, work.work); + ssp = sup->srcu_ssp; srcu_advance_state(ssp); - srcu_reschedule(ssp, srcu_get_delay(ssp)); + curdelay = srcu_get_delay(ssp); + if (curdelay) { + WRITE_ONCE(sup->reschedule_count, 0); + } else { + j = jiffies; + if (READ_ONCE(sup->reschedule_jiffies) == j) { + WRITE_ONCE(sup->reschedule_count, READ_ONCE(sup->reschedule_count) + 1); + if (READ_ONCE(sup->reschedule_count) > srcu_max_nodelay) + curdelay = 1; + } else { + WRITE_ONCE(sup->reschedule_count, 1); + WRITE_ONCE(sup->reschedule_jiffies, j); + } + } + srcu_reschedule(ssp, curdelay); } void srcutorture_get_gp_data(enum rcutorture_type test_type, @@ -1261,47 +1817,73 @@ void srcutorture_get_gp_data(enum rcutorture_type test_type, if (test_type != SRCU_FLAVOR) return; *flags = 0; - *gp_seq = rcu_seq_current(&ssp->srcu_gp_seq); + *gp_seq = rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq); } EXPORT_SYMBOL_GPL(srcutorture_get_gp_data); +static const char * const srcu_size_state_name[] = { + "SRCU_SIZE_SMALL", + "SRCU_SIZE_ALLOC", + "SRCU_SIZE_WAIT_BARRIER", + "SRCU_SIZE_WAIT_CALL", + "SRCU_SIZE_WAIT_CBS1", + "SRCU_SIZE_WAIT_CBS2", + "SRCU_SIZE_WAIT_CBS3", + "SRCU_SIZE_WAIT_CBS4", + "SRCU_SIZE_BIG", + "SRCU_SIZE_???", +}; + void srcu_torture_stats_print(struct srcu_struct *ssp, char *tt, char *tf) { int cpu; int idx; unsigned long s0
= 0, s1 = 0; + int ss_state = READ_ONCE(ssp->srcu_sup->srcu_size_state); + int ss_state_idx = ss_state; idx = ssp->srcu_idx & 0x1; - pr_alert("%s%s Tree SRCU g%ld per-CPU(idx=%d):", - tt, tf, rcu_seq_current(&ssp->srcu_gp_seq), idx); - for_each_possible_cpu(cpu) { - unsigned long l0, l1; - unsigned long u0, u1; - long c0, c1; - struct srcu_data *sdp; - - sdp = per_cpu_ptr(ssp->sda, cpu); - u0 = data_race(sdp->srcu_unlock_count[!idx]); - u1 = data_race(sdp->srcu_unlock_count[idx]); - - /* - * Make sure that a lock is always counted if the corresponding - * unlock is counted. - */ - smp_rmb(); - - l0 = data_race(sdp->srcu_lock_count[!idx]); - l1 = data_race(sdp->srcu_lock_count[idx]); - - c0 = l0 - u0; - c1 = l1 - u1; - pr_cont(" %d(%ld,%ld %c)", - cpu, c0, c1, - "C."[rcu_segcblist_empty(&sdp->srcu_cblist)]); - s0 += c0; - s1 += c1; + if (ss_state < 0 || ss_state >= ARRAY_SIZE(srcu_size_state_name)) + ss_state_idx = ARRAY_SIZE(srcu_size_state_name) - 1; + pr_alert("%s%s Tree SRCU g%ld state %d (%s)", + tt, tf, rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq), ss_state, + srcu_size_state_name[ss_state_idx]); + if (!ssp->sda) { + // Called after cleanup_srcu_struct(), perhaps. + pr_cont(" No per-CPU srcu_data structures (->sda == NULL).\n"); + } else { + pr_cont(" per-CPU(idx=%d):", idx); + for_each_possible_cpu(cpu) { + unsigned long l0, l1; + unsigned long u0, u1; + long c0, c1; + struct srcu_data *sdp; + + sdp = per_cpu_ptr(ssp->sda, cpu); + u0 = data_race(atomic_long_read(&sdp->srcu_unlock_count[!idx])); + u1 = data_race(atomic_long_read(&sdp->srcu_unlock_count[idx])); + + /* + * Make sure that a lock is always counted if the corresponding + * unlock is counted. + */ + smp_rmb(); + + l0 = data_race(atomic_long_read(&sdp->srcu_lock_count[!idx])); + l1 = data_race(atomic_long_read(&sdp->srcu_lock_count[idx])); + + c0 = l0 - u0; + c1 = l1 - u1; + pr_cont(" %d(%ld,%ld %c)", + cpu, c0, c1, + "C."[rcu_segcblist_empty(&sdp->srcu_cblist)]); + s0 += c0; + s1 += c1; + } + pr_cont(" T(%ld,%ld)\n", s0, s1); } - pr_cont(" T(%ld,%ld)\n", s0, s1); + if (SRCU_SIZING_IS_TORTURE()) + srcu_transition_to_big(ssp); } EXPORT_SYMBOL_GPL(srcu_torture_stats_print); @@ -1310,21 +1892,44 @@ static int __init srcu_bootup_announce(void) pr_info("Hierarchical SRCU implementation.\n"); if (exp_holdoff != DEFAULT_SRCU_EXP_HOLDOFF) pr_info("\tNon-default auto-expedite holdoff of %lu ns.\n", exp_holdoff); + if (srcu_retry_check_delay != SRCU_DEFAULT_RETRY_CHECK_DELAY) + pr_info("\tNon-default retry check delay of %lu us.\n", srcu_retry_check_delay); + if (srcu_max_nodelay != SRCU_DEFAULT_MAX_NODELAY) + pr_info("\tNon-default max no-delay of %lu.\n", srcu_max_nodelay); + pr_info("\tMax phase no-delay instances is %lu.\n", srcu_max_nodelay_phase); return 0; } early_initcall(srcu_bootup_announce);
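A usage note between the two boot-time functions here: these sizing knobs are read-only (0444) module parameters, so they can be set only on the kernel command line. For example (illustrative boot-argument fragments, not part of this patch): srcutree.convert_to_big=1 converts every srcu_struct at init_srcu_struct() time, srcutree.convert_to_big=0x10 converts only when excessive contention is encountered, and srcutree.big_cpu_lim=64 lowers the CPU count at which the auto strategy picks big from its default of 128.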
void __init srcu_init(void) { - struct srcu_struct *ssp; + struct srcu_usage *sup; + + /* Decide on srcu_struct-size strategy. */ + if (SRCU_SIZING_IS(SRCU_SIZING_AUTO)) { + if (nr_cpu_ids >= big_cpu_lim) { + convert_to_big = SRCU_SIZING_INIT; // Don't bother waiting for contention. + pr_info("%s: Setting srcu_struct sizes to big.\n", __func__); + } else { + convert_to_big = SRCU_SIZING_NONE | SRCU_SIZING_CONTEND; + pr_info("%s: Setting srcu_struct sizes based on contention.\n", __func__); + } + } + /* + * Once that is set, call_srcu() can follow the normal path and + * queue delayed work. This must follow the creation of RCU workqueues + * and timer initialization. + */ srcu_init_done = true; while (!list_empty(&srcu_boot_list)) { - ssp = list_first_entry(&srcu_boot_list, struct srcu_struct, + sup = list_first_entry(&srcu_boot_list, struct srcu_usage, work.work.entry); - check_init_srcu_struct(ssp); - list_del_init(&ssp->work.work.entry); - queue_work(rcu_gp_wq, &ssp->work.work); + list_del_init(&sup->work.work.entry); + if (SRCU_SIZING_IS(SRCU_SIZING_INIT) && + sup->srcu_size_state == SRCU_SIZE_SMALL) + sup->srcu_size_state = SRCU_SIZE_ALLOC; + queue_work(rcu_gp_wq, &sup->work.work); } } @@ -1334,13 +1939,14 @@ void __init srcu_init(void) static int srcu_module_coming(struct module *mod) { int i; + struct srcu_struct *ssp; struct srcu_struct **sspp = mod->srcu_struct_ptrs; - int ret; for (i = 0; i < mod->num_srcu_structs; i++) { - ret = init_srcu_struct(*(sspp++)); - if (WARN_ON_ONCE(ret)) - return ret; + ssp = *(sspp++); + ssp->sda = alloc_percpu(struct srcu_data); + if (WARN_ON_ONCE(!ssp->sda)) + return -ENOMEM; } return 0; } @@ -1349,10 +1955,17 @@ static int srcu_module_coming(struct module *mod) static void srcu_module_going(struct module *mod) { int i; + struct srcu_struct *ssp; struct srcu_struct **sspp = mod->srcu_struct_ptrs; - for (i = 0; i < mod->num_srcu_structs; i++) - cleanup_srcu_struct(*(sspp++)); + for (i = 0; i < mod->num_srcu_structs; i++) { + ssp = *(sspp++); + if (!rcu_seq_state(smp_load_acquire(&ssp->srcu_sup->srcu_gp_seq_needed)) && + !WARN_ON_ONCE(!ssp->srcu_sup->sda_is_static)) + cleanup_srcu_struct(ssp); + if (!WARN_ON(srcu_readers_active(ssp))) + free_percpu(ssp->sda); + } } /* Handle one module, either coming or going. */
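To close, a sketch of the module-side view of srcu_module_coming() and srcu_module_going(): a statically defined srcu_struct in a module needs no explicit init_srcu_struct() or cleanup_srcu_struct() calls, because the hooks above allocate and free its per-CPU ->sda across module load and unload. The names my_srcu, my_data, and my_read() are hypothetical:

#include <linux/module.h>
#include <linux/srcu.h>

DEFINE_STATIC_SRCU(my_srcu);    /* Lands in the module's srcu_struct_ptrs list. */

static int __rcu *my_data;      /* Hypothetical SRCU-protected pointer. */

static int my_read(void)
{
        int idx;
        int val = 0;
        int *p;

        idx = srcu_read_lock(&my_srcu);         /* Begin SRCU read-side critical section. */
        p = srcu_dereference(my_data, &my_srcu);
        if (p)
                val = *p;
        srcu_read_unlock(&my_srcu, idx);        /* End critical section. */
        return val;
}

Updaters would publish with rcu_assign_pointer() and retire old data with call_srcu() or synchronize_srcu() against &my_srcu.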