diff options
Diffstat (limited to 'net/sched/sch_fq.c')
| -rw-r--r-- | net/sched/sch_fq.c | 1366 |
1 files changed, 1366 insertions, 0 deletions
diff --git a/net/sched/sch_fq.c b/net/sched/sch_fq.c new file mode 100644 index 000000000000..6e5f2f4f2415 --- /dev/null +++ b/net/sched/sch_fq.c @@ -0,0 +1,1366 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * net/sched/sch_fq.c Fair Queue Packet Scheduler (per flow pacing) + * + * Copyright (C) 2013-2023 Eric Dumazet <edumazet@google.com> + * + * Meant to be mostly used for locally generated traffic : + * Fast classification depends on skb->sk being set before reaching us. + * If not, (router workload), we use rxhash as fallback, with 32 bits wide hash. + * All packets belonging to a socket are considered as a 'flow'. + * + * Flows are dynamically allocated and stored in a hash table of RB trees + * They are also part of one Round Robin 'queues' (new or old flows) + * + * Burst avoidance (aka pacing) capability : + * + * Transport (eg TCP) can set in sk->sk_pacing_rate a rate, enqueue a + * bunch of packets, and this packet scheduler adds delay between + * packets to respect rate limitation. + * + * enqueue() : + * - lookup one RB tree (out of 1024 or more) to find the flow. + * If non existent flow, create it, add it to the tree. + * Add skb to the per flow list of skb (fifo). + * - Use a special fifo for high prio packets + * + * dequeue() : serves flows in Round Robin + * Note : When a flow becomes empty, we do not immediately remove it from + * rb trees, for performance reasons (its expected to send additional packets, + * or SLAB cache will reuse socket for another flow) + */ + +#include <linux/module.h> +#include <linux/types.h> +#include <linux/kernel.h> +#include <linux/jiffies.h> +#include <linux/string.h> +#include <linux/in.h> +#include <linux/errno.h> +#include <linux/init.h> +#include <linux/skbuff.h> +#include <linux/slab.h> +#include <linux/rbtree.h> +#include <linux/hash.h> +#include <linux/prefetch.h> +#include <linux/vmalloc.h> +#include <net/netlink.h> +#include <net/pkt_sched.h> +#include <net/sock.h> +#include <net/tcp_states.h> +#include <net/tcp.h> + +struct fq_skb_cb { + u64 time_to_send; + u8 band; +}; + +static inline struct fq_skb_cb *fq_skb_cb(struct sk_buff *skb) +{ + qdisc_cb_private_validate(skb, sizeof(struct fq_skb_cb)); + return (struct fq_skb_cb *)qdisc_skb_cb(skb)->data; +} + +/* + * Per flow structure, dynamically allocated. + * If packets have monotically increasing time_to_send, they are placed in O(1) + * in linear list (head,tail), otherwise are placed in a rbtree (t_root). + */ +struct fq_flow { +/* First cache line : used in fq_gc(), fq_enqueue(), fq_dequeue() */ + struct rb_root t_root; + struct sk_buff *head; /* list of skbs for this flow : first skb */ + union { + struct sk_buff *tail; /* last skb in the list */ + unsigned long age; /* (jiffies | 1UL) when flow was emptied, for gc */ + }; + union { + struct rb_node fq_node; /* anchor in fq_root[] trees */ + /* Following field is only used for q->internal, + * because q->internal is not hashed in fq_root[] + */ + u64 stat_fastpath_packets; + }; + struct sock *sk; + u32 socket_hash; /* sk_hash */ + int qlen; /* number of packets in flow queue */ + +/* Second cache line */ + int credit; + int band; + struct fq_flow *next; /* next pointer in RR lists */ + + struct rb_node rate_node; /* anchor in q->delayed tree */ + u64 time_next_packet; +}; + +struct fq_flow_head { + struct fq_flow *first; + struct fq_flow *last; +}; + +struct fq_perband_flows { + struct fq_flow_head new_flows; + struct fq_flow_head old_flows; + int credit; + int quantum; /* based on band nr : 576KB, 192KB, 64KB */ +}; + +#define FQ_PRIO2BAND_CRUMB_SIZE ((TC_PRIO_MAX + 1) >> 2) + +struct fq_sched_data { +/* Read mostly cache line */ + + u64 offload_horizon; + u32 quantum; + u32 initial_quantum; + u32 flow_refill_delay; + u32 flow_plimit; /* max packets per flow */ + unsigned long flow_max_rate; /* optional max rate per flow */ + u64 ce_threshold; + u64 horizon; /* horizon in ns */ + u32 orphan_mask; /* mask for orphaned skb */ + u32 low_rate_threshold; + struct rb_root *fq_root; + u8 rate_enable; + u8 fq_trees_log; + u8 horizon_drop; + u8 prio2band[FQ_PRIO2BAND_CRUMB_SIZE]; + u32 timer_slack; /* hrtimer slack in ns */ + +/* Read/Write fields. */ + + unsigned int band_nr; /* band being serviced in fq_dequeue() */ + + struct fq_perband_flows band_flows[FQ_BANDS]; + + struct fq_flow internal; /* fastpath queue. */ + struct rb_root delayed; /* for rate limited flows */ + u64 time_next_delayed_flow; + unsigned long unthrottle_latency_ns; + + u32 band_pkt_count[FQ_BANDS]; + u32 flows; + u32 inactive_flows; /* Flows with no packet to send. */ + u32 throttled_flows; + + u64 stat_throttled; + struct qdisc_watchdog watchdog; + u64 stat_gc_flows; + +/* Seldom used fields. */ + + u64 stat_band_drops[FQ_BANDS]; + u64 stat_ce_mark; + u64 stat_horizon_drops; + u64 stat_horizon_caps; + u64 stat_flows_plimit; + u64 stat_pkts_too_long; + u64 stat_allocation_errors; +}; + +/* return the i-th 2-bit value ("crumb") */ +static u8 fq_prio2band(const u8 *prio2band, unsigned int prio) +{ + return (READ_ONCE(prio2band[prio / 4]) >> (2 * (prio & 0x3))) & 0x3; +} + +/* + * f->tail and f->age share the same location. + * We can use the low order bit to differentiate if this location points + * to a sk_buff or contains a jiffies value, if we force this value to be odd. + * This assumes f->tail low order bit must be 0 since alignof(struct sk_buff) >= 2 + */ +static void fq_flow_set_detached(struct fq_flow *f) +{ + f->age = jiffies | 1UL; +} + +static bool fq_flow_is_detached(const struct fq_flow *f) +{ + return !!(f->age & 1UL); +} + +/* special value to mark a throttled flow (not on old/new list) */ +static struct fq_flow throttled; + +static bool fq_flow_is_throttled(const struct fq_flow *f) +{ + return f->next == &throttled; +} + +enum new_flow { + NEW_FLOW, + OLD_FLOW +}; + +static void fq_flow_add_tail(struct fq_sched_data *q, struct fq_flow *flow, + enum new_flow list_sel) +{ + struct fq_perband_flows *pband = &q->band_flows[flow->band]; + struct fq_flow_head *head = (list_sel == NEW_FLOW) ? + &pband->new_flows : + &pband->old_flows; + + if (head->first) + head->last->next = flow; + else + head->first = flow; + head->last = flow; + flow->next = NULL; +} + +static void fq_flow_unset_throttled(struct fq_sched_data *q, struct fq_flow *f) +{ + rb_erase(&f->rate_node, &q->delayed); + q->throttled_flows--; + fq_flow_add_tail(q, f, OLD_FLOW); +} + +static void fq_flow_set_throttled(struct fq_sched_data *q, struct fq_flow *f) +{ + struct rb_node **p = &q->delayed.rb_node, *parent = NULL; + + while (*p) { + struct fq_flow *aux; + + parent = *p; + aux = rb_entry(parent, struct fq_flow, rate_node); + if (f->time_next_packet >= aux->time_next_packet) + p = &parent->rb_right; + else + p = &parent->rb_left; + } + rb_link_node(&f->rate_node, parent, p); + rb_insert_color(&f->rate_node, &q->delayed); + q->throttled_flows++; + q->stat_throttled++; + + f->next = &throttled; + if (q->time_next_delayed_flow > f->time_next_packet) + q->time_next_delayed_flow = f->time_next_packet; +} + + +static struct kmem_cache *fq_flow_cachep __read_mostly; + + +/* limit number of collected flows per round */ +#define FQ_GC_MAX 8 +#define FQ_GC_AGE (3*HZ) + +static bool fq_gc_candidate(const struct fq_flow *f) +{ + return fq_flow_is_detached(f) && + time_after(jiffies, f->age + FQ_GC_AGE); +} + +static void fq_gc(struct fq_sched_data *q, + struct rb_root *root, + struct sock *sk) +{ + struct rb_node **p, *parent; + void *tofree[FQ_GC_MAX]; + struct fq_flow *f; + int i, fcnt = 0; + + p = &root->rb_node; + parent = NULL; + while (*p) { + parent = *p; + + f = rb_entry(parent, struct fq_flow, fq_node); + if (f->sk == sk) + break; + + if (fq_gc_candidate(f)) { + tofree[fcnt++] = f; + if (fcnt == FQ_GC_MAX) + break; + } + + if (f->sk > sk) + p = &parent->rb_right; + else + p = &parent->rb_left; + } + + if (!fcnt) + return; + + for (i = fcnt; i > 0; ) { + f = tofree[--i]; + rb_erase(&f->fq_node, root); + } + q->flows -= fcnt; + q->inactive_flows -= fcnt; + q->stat_gc_flows += fcnt; + + kmem_cache_free_bulk(fq_flow_cachep, fcnt, tofree); +} + +/* Fast path can be used if : + * 1) Packet tstamp is in the past, or within the pacing offload horizon. + * 2) FQ qlen == 0 OR + * (no flow is currently eligible for transmit, + * AND fast path queue has less than 8 packets) + * 3) No SO_MAX_PACING_RATE on the socket (if any). + * 4) No @maxrate attribute on this qdisc, + * + * FQ can not use generic TCQ_F_CAN_BYPASS infrastructure. + */ +static bool fq_fastpath_check(const struct Qdisc *sch, struct sk_buff *skb, + u64 now) +{ + const struct fq_sched_data *q = qdisc_priv(sch); + const struct sock *sk; + + if (fq_skb_cb(skb)->time_to_send > now + q->offload_horizon) + return false; + + if (sch->q.qlen != 0) { + /* Even if some packets are stored in this qdisc, + * we can still enable fast path if all of them are + * scheduled in the future (ie no flows are eligible) + * or in the fast path queue. + */ + if (q->flows != q->inactive_flows + q->throttled_flows) + return false; + + /* Do not allow fast path queue to explode, we want Fair Queue mode + * under pressure. + */ + if (q->internal.qlen >= 8) + return false; + + /* Ordering invariants fall apart if some delayed flows + * are ready but we haven't serviced them, yet. + */ + if (q->time_next_delayed_flow <= now + q->offload_horizon) + return false; + } + + sk = skb->sk; + if (sk && sk_fullsock(sk) && !sk_is_tcp(sk) && + sk->sk_max_pacing_rate != ~0UL) + return false; + + if (q->flow_max_rate != ~0UL) + return false; + + return true; +} + +static struct fq_flow *fq_classify(struct Qdisc *sch, struct sk_buff *skb, + u64 now) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct rb_node **p, *parent; + struct sock *sk = skb->sk; + struct rb_root *root; + struct fq_flow *f; + + /* SYNACK messages are attached to a TCP_NEW_SYN_RECV request socket + * or a listener (SYNCOOKIE mode) + * 1) request sockets are not full blown, + * they do not contain sk_pacing_rate + * 2) They are not part of a 'flow' yet + * 3) We do not want to rate limit them (eg SYNFLOOD attack), + * especially if the listener set SO_MAX_PACING_RATE + * 4) We pretend they are orphaned + * TCP can also associate TIME_WAIT sockets with RST or ACK packets. + */ + if (!sk || sk_listener_or_tw(sk)) { + unsigned long hash = skb_get_hash(skb) & q->orphan_mask; + + /* By forcing low order bit to 1, we make sure to not + * collide with a local flow (socket pointers are word aligned) + */ + sk = (struct sock *)((hash << 1) | 1UL); + skb_orphan(skb); + } else if (sk->sk_state == TCP_CLOSE) { + unsigned long hash = skb_get_hash(skb) & q->orphan_mask; + /* + * Sockets in TCP_CLOSE are non connected. + * Typical use case is UDP sockets, they can send packets + * with sendto() to many different destinations. + * We probably could use a generic bit advertising + * non connected sockets, instead of sk_state == TCP_CLOSE, + * if we care enough. + */ + sk = (struct sock *)((hash << 1) | 1UL); + } + + if (fq_fastpath_check(sch, skb, now)) { + q->internal.stat_fastpath_packets++; + if (skb->sk == sk && q->rate_enable && + READ_ONCE(sk->sk_pacing_status) != SK_PACING_FQ) + smp_store_release(&sk->sk_pacing_status, + SK_PACING_FQ); + return &q->internal; + } + + root = &q->fq_root[hash_ptr(sk, q->fq_trees_log)]; + + fq_gc(q, root, sk); + + p = &root->rb_node; + parent = NULL; + while (*p) { + parent = *p; + + f = rb_entry(parent, struct fq_flow, fq_node); + if (f->sk == sk) { + /* socket might have been reallocated, so check + * if its sk_hash is the same. + * It not, we need to refill credit with + * initial quantum + */ + if (unlikely(skb->sk == sk && + f->socket_hash != sk->sk_hash)) { + f->credit = q->initial_quantum; + f->socket_hash = sk->sk_hash; + if (q->rate_enable) + smp_store_release(&sk->sk_pacing_status, + SK_PACING_FQ); + if (fq_flow_is_throttled(f)) + fq_flow_unset_throttled(q, f); + f->time_next_packet = 0ULL; + } + return f; + } + if (f->sk > sk) + p = &parent->rb_right; + else + p = &parent->rb_left; + } + + f = kmem_cache_zalloc(fq_flow_cachep, GFP_ATOMIC | __GFP_NOWARN); + if (unlikely(!f)) { + q->stat_allocation_errors++; + return &q->internal; + } + /* f->t_root is already zeroed after kmem_cache_zalloc() */ + + fq_flow_set_detached(f); + f->sk = sk; + if (skb->sk == sk) { + f->socket_hash = sk->sk_hash; + if (q->rate_enable) + smp_store_release(&sk->sk_pacing_status, + SK_PACING_FQ); + } + f->credit = q->initial_quantum; + + rb_link_node(&f->fq_node, parent, p); + rb_insert_color(&f->fq_node, root); + + q->flows++; + q->inactive_flows++; + return f; +} + +static struct sk_buff *fq_peek(struct fq_flow *flow) +{ + struct sk_buff *skb = skb_rb_first(&flow->t_root); + struct sk_buff *head = flow->head; + + if (!skb) + return head; + + if (!head) + return skb; + + if (fq_skb_cb(skb)->time_to_send < fq_skb_cb(head)->time_to_send) + return skb; + return head; +} + +static void fq_erase_head(struct Qdisc *sch, struct fq_flow *flow, + struct sk_buff *skb) +{ + if (skb == flow->head) { + struct sk_buff *next = skb->next; + + prefetch(next); + flow->head = next; + } else { + rb_erase(&skb->rbnode, &flow->t_root); + skb->dev = qdisc_dev(sch); + } +} + +/* Remove one skb from flow queue. + * This skb must be the return value of prior fq_peek(). + */ +static void fq_dequeue_skb(struct Qdisc *sch, struct fq_flow *flow, + struct sk_buff *skb) +{ + fq_erase_head(sch, flow, skb); + skb_mark_not_on_list(skb); + qdisc_qstats_backlog_dec(sch, skb); + sch->q.qlen--; + qdisc_bstats_update(sch, skb); +} + +static void flow_queue_add(struct fq_flow *flow, struct sk_buff *skb) +{ + struct rb_node **p, *parent; + struct sk_buff *head, *aux; + + head = flow->head; + if (!head || + fq_skb_cb(skb)->time_to_send >= fq_skb_cb(flow->tail)->time_to_send) { + if (!head) + flow->head = skb; + else + flow->tail->next = skb; + flow->tail = skb; + skb->next = NULL; + return; + } + + p = &flow->t_root.rb_node; + parent = NULL; + + while (*p) { + parent = *p; + aux = rb_to_skb(parent); + if (fq_skb_cb(skb)->time_to_send >= fq_skb_cb(aux)->time_to_send) + p = &parent->rb_right; + else + p = &parent->rb_left; + } + rb_link_node(&skb->rbnode, parent, p); + rb_insert_color(&skb->rbnode, &flow->t_root); +} + +static bool fq_packet_beyond_horizon(const struct sk_buff *skb, + const struct fq_sched_data *q, u64 now) +{ + return unlikely((s64)skb->tstamp > (s64)(now + q->horizon)); +} + +#define FQDR(reason) SKB_DROP_REASON_FQ_##reason + +static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch, + struct sk_buff **to_free) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct fq_flow *f; + u64 now; + u8 band; + + band = fq_prio2band(q->prio2band, skb->priority & TC_PRIO_MAX); + if (unlikely(q->band_pkt_count[band] >= sch->limit)) { + q->stat_band_drops[band]++; + return qdisc_drop_reason(skb, sch, to_free, + FQDR(BAND_LIMIT)); + } + + now = ktime_get_ns(); + if (!skb->tstamp) { + fq_skb_cb(skb)->time_to_send = now; + } else { + /* Check if packet timestamp is too far in the future. */ + if (fq_packet_beyond_horizon(skb, q, now)) { + if (q->horizon_drop) { + q->stat_horizon_drops++; + return qdisc_drop_reason(skb, sch, to_free, + FQDR(HORIZON_LIMIT)); + } + q->stat_horizon_caps++; + skb->tstamp = now + q->horizon; + } + fq_skb_cb(skb)->time_to_send = skb->tstamp; + } + + f = fq_classify(sch, skb, now); + + if (f != &q->internal) { + if (unlikely(f->qlen >= q->flow_plimit)) { + q->stat_flows_plimit++; + return qdisc_drop_reason(skb, sch, to_free, + FQDR(FLOW_LIMIT)); + } + + if (fq_flow_is_detached(f)) { + fq_flow_add_tail(q, f, NEW_FLOW); + if (time_after(jiffies, f->age + q->flow_refill_delay)) + f->credit = max_t(u32, f->credit, q->quantum); + } + + f->band = band; + q->band_pkt_count[band]++; + fq_skb_cb(skb)->band = band; + if (f->qlen == 0) + q->inactive_flows--; + } + + f->qlen++; + /* Note: this overwrites f->age */ + flow_queue_add(f, skb); + + qdisc_qstats_backlog_inc(sch, skb); + sch->q.qlen++; + + return NET_XMIT_SUCCESS; +} +#undef FQDR + +static void fq_check_throttled(struct fq_sched_data *q, u64 now) +{ + unsigned long sample; + struct rb_node *p; + + if (q->time_next_delayed_flow > now + q->offload_horizon) + return; + + /* Update unthrottle latency EWMA. + * This is cheap and can help diagnosing timer/latency problems. + */ + sample = (unsigned long)(now - q->time_next_delayed_flow); + if ((long)sample > 0) { + q->unthrottle_latency_ns -= q->unthrottle_latency_ns >> 3; + q->unthrottle_latency_ns += sample >> 3; + } + now += q->offload_horizon; + + q->time_next_delayed_flow = ~0ULL; + while ((p = rb_first(&q->delayed)) != NULL) { + struct fq_flow *f = rb_entry(p, struct fq_flow, rate_node); + + if (f->time_next_packet > now) { + q->time_next_delayed_flow = f->time_next_packet; + break; + } + fq_flow_unset_throttled(q, f); + } +} + +static struct fq_flow_head *fq_pband_head_select(struct fq_perband_flows *pband) +{ + if (pband->credit <= 0) + return NULL; + + if (pband->new_flows.first) + return &pband->new_flows; + + return pband->old_flows.first ? &pband->old_flows : NULL; +} + +static struct sk_buff *fq_dequeue(struct Qdisc *sch) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct fq_perband_flows *pband; + struct fq_flow_head *head; + struct sk_buff *skb; + struct fq_flow *f; + unsigned long rate; + int retry; + u32 plen; + u64 now; + + if (!sch->q.qlen) + return NULL; + + skb = fq_peek(&q->internal); + if (unlikely(skb)) { + q->internal.qlen--; + fq_dequeue_skb(sch, &q->internal, skb); + goto out; + } + + now = ktime_get_ns(); + fq_check_throttled(q, now); + retry = 0; + pband = &q->band_flows[q->band_nr]; +begin: + head = fq_pband_head_select(pband); + if (!head) { + while (++retry <= FQ_BANDS) { + if (++q->band_nr == FQ_BANDS) + q->band_nr = 0; + pband = &q->band_flows[q->band_nr]; + pband->credit = min(pband->credit + pband->quantum, + pband->quantum); + if (pband->credit > 0) + goto begin; + retry = 0; + } + if (q->time_next_delayed_flow != ~0ULL) + qdisc_watchdog_schedule_range_ns(&q->watchdog, + q->time_next_delayed_flow, + q->timer_slack); + return NULL; + } + f = head->first; + retry = 0; + if (f->credit <= 0) { + f->credit += q->quantum; + head->first = f->next; + fq_flow_add_tail(q, f, OLD_FLOW); + goto begin; + } + + skb = fq_peek(f); + if (skb) { + u64 time_next_packet = max_t(u64, fq_skb_cb(skb)->time_to_send, + f->time_next_packet); + + if (now + q->offload_horizon < time_next_packet) { + head->first = f->next; + f->time_next_packet = time_next_packet; + fq_flow_set_throttled(q, f); + goto begin; + } + prefetch(&skb->end); + fq_dequeue_skb(sch, f, skb); + if ((s64)(now - time_next_packet - q->ce_threshold) > 0) { + INET_ECN_set_ce(skb); + q->stat_ce_mark++; + } + if (--f->qlen == 0) + q->inactive_flows++; + q->band_pkt_count[fq_skb_cb(skb)->band]--; + } else { + head->first = f->next; + /* force a pass through old_flows to prevent starvation */ + if (head == &pband->new_flows) { + fq_flow_add_tail(q, f, OLD_FLOW); + } else { + fq_flow_set_detached(f); + } + goto begin; + } + plen = qdisc_pkt_len(skb); + f->credit -= plen; + pband->credit -= plen; + + if (!q->rate_enable) + goto out; + + rate = q->flow_max_rate; + + /* If EDT time was provided for this skb, we need to + * update f->time_next_packet only if this qdisc enforces + * a flow max rate. + */ + if (!skb->tstamp) { + if (skb->sk) + rate = min(READ_ONCE(skb->sk->sk_pacing_rate), rate); + + if (rate <= q->low_rate_threshold) { + f->credit = 0; + } else { + plen = max(plen, q->quantum); + if (f->credit > 0) + goto out; + } + } + if (rate != ~0UL) { + u64 len = (u64)plen * NSEC_PER_SEC; + + if (likely(rate)) + len = div64_ul(len, rate); + /* Since socket rate can change later, + * clamp the delay to 1 second. + * Really, providers of too big packets should be fixed ! + */ + if (unlikely(len > NSEC_PER_SEC)) { + len = NSEC_PER_SEC; + q->stat_pkts_too_long++; + } + /* Account for schedule/timers drifts. + * f->time_next_packet was set when prior packet was sent, + * and current time (@now) can be too late by tens of us. + */ + if (f->time_next_packet) + len -= min(len/2, now - f->time_next_packet); + f->time_next_packet = now + len; + } +out: + return skb; +} + +static void fq_flow_purge(struct fq_flow *flow) +{ + struct rb_node *p = rb_first(&flow->t_root); + + while (p) { + struct sk_buff *skb = rb_to_skb(p); + + p = rb_next(p); + rb_erase(&skb->rbnode, &flow->t_root); + rtnl_kfree_skbs(skb, skb); + } + rtnl_kfree_skbs(flow->head, flow->tail); + flow->head = NULL; + flow->qlen = 0; +} + +static void fq_reset(struct Qdisc *sch) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct rb_root *root; + struct rb_node *p; + struct fq_flow *f; + unsigned int idx; + + sch->q.qlen = 0; + sch->qstats.backlog = 0; + + fq_flow_purge(&q->internal); + + if (!q->fq_root) + return; + + for (idx = 0; idx < (1U << q->fq_trees_log); idx++) { + root = &q->fq_root[idx]; + while ((p = rb_first(root)) != NULL) { + f = rb_entry(p, struct fq_flow, fq_node); + rb_erase(p, root); + + fq_flow_purge(f); + + kmem_cache_free(fq_flow_cachep, f); + } + } + for (idx = 0; idx < FQ_BANDS; idx++) { + q->band_flows[idx].new_flows.first = NULL; + q->band_flows[idx].old_flows.first = NULL; + } + q->delayed = RB_ROOT; + q->flows = 0; + q->inactive_flows = 0; + q->throttled_flows = 0; +} + +static void fq_rehash(struct fq_sched_data *q, + struct rb_root *old_array, u32 old_log, + struct rb_root *new_array, u32 new_log) +{ + struct rb_node *op, **np, *parent; + struct rb_root *oroot, *nroot; + struct fq_flow *of, *nf; + int fcnt = 0; + u32 idx; + + for (idx = 0; idx < (1U << old_log); idx++) { + oroot = &old_array[idx]; + while ((op = rb_first(oroot)) != NULL) { + rb_erase(op, oroot); + of = rb_entry(op, struct fq_flow, fq_node); + if (fq_gc_candidate(of)) { + fcnt++; + kmem_cache_free(fq_flow_cachep, of); + continue; + } + nroot = &new_array[hash_ptr(of->sk, new_log)]; + + np = &nroot->rb_node; + parent = NULL; + while (*np) { + parent = *np; + + nf = rb_entry(parent, struct fq_flow, fq_node); + BUG_ON(nf->sk == of->sk); + + if (nf->sk > of->sk) + np = &parent->rb_right; + else + np = &parent->rb_left; + } + + rb_link_node(&of->fq_node, parent, np); + rb_insert_color(&of->fq_node, nroot); + } + } + q->flows -= fcnt; + q->inactive_flows -= fcnt; + q->stat_gc_flows += fcnt; +} + +static void fq_free(void *addr) +{ + kvfree(addr); +} + +static int fq_resize(struct Qdisc *sch, u32 log) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct rb_root *array; + void *old_fq_root; + u32 idx; + + if (q->fq_root && log == q->fq_trees_log) + return 0; + + /* If XPS was setup, we can allocate memory on right NUMA node */ + array = kvmalloc_node(sizeof(struct rb_root) << log, GFP_KERNEL | __GFP_RETRY_MAYFAIL, + netdev_queue_numa_node_read(sch->dev_queue)); + if (!array) + return -ENOMEM; + + for (idx = 0; idx < (1U << log); idx++) + array[idx] = RB_ROOT; + + sch_tree_lock(sch); + + old_fq_root = q->fq_root; + if (old_fq_root) + fq_rehash(q, old_fq_root, q->fq_trees_log, array, log); + + q->fq_root = array; + WRITE_ONCE(q->fq_trees_log, log); + + sch_tree_unlock(sch); + + fq_free(old_fq_root); + + return 0; +} + +static const struct netlink_range_validation iq_range = { + .max = INT_MAX, +}; + +static const struct nla_policy fq_policy[TCA_FQ_MAX + 1] = { + [TCA_FQ_UNSPEC] = { .strict_start_type = TCA_FQ_TIMER_SLACK }, + + [TCA_FQ_PLIMIT] = { .type = NLA_U32 }, + [TCA_FQ_FLOW_PLIMIT] = { .type = NLA_U32 }, + [TCA_FQ_QUANTUM] = { .type = NLA_U32 }, + [TCA_FQ_INITIAL_QUANTUM] = NLA_POLICY_FULL_RANGE(NLA_U32, &iq_range), + [TCA_FQ_RATE_ENABLE] = { .type = NLA_U32 }, + [TCA_FQ_FLOW_DEFAULT_RATE] = { .type = NLA_U32 }, + [TCA_FQ_FLOW_MAX_RATE] = { .type = NLA_U32 }, + [TCA_FQ_BUCKETS_LOG] = { .type = NLA_U32 }, + [TCA_FQ_FLOW_REFILL_DELAY] = { .type = NLA_U32 }, + [TCA_FQ_ORPHAN_MASK] = { .type = NLA_U32 }, + [TCA_FQ_LOW_RATE_THRESHOLD] = { .type = NLA_U32 }, + [TCA_FQ_CE_THRESHOLD] = { .type = NLA_U32 }, + [TCA_FQ_TIMER_SLACK] = { .type = NLA_U32 }, + [TCA_FQ_HORIZON] = { .type = NLA_U32 }, + [TCA_FQ_HORIZON_DROP] = { .type = NLA_U8 }, + [TCA_FQ_PRIOMAP] = NLA_POLICY_EXACT_LEN(sizeof(struct tc_prio_qopt)), + [TCA_FQ_WEIGHTS] = NLA_POLICY_EXACT_LEN(FQ_BANDS * sizeof(s32)), + [TCA_FQ_OFFLOAD_HORIZON] = { .type = NLA_U32 }, +}; + +/* compress a u8 array with all elems <= 3 to an array of 2-bit fields */ +static void fq_prio2band_compress_crumb(const u8 *in, u8 *out) +{ + const int num_elems = TC_PRIO_MAX + 1; + u8 tmp[FQ_PRIO2BAND_CRUMB_SIZE]; + int i; + + memset(tmp, 0, sizeof(tmp)); + for (i = 0; i < num_elems; i++) + tmp[i / 4] |= in[i] << (2 * (i & 0x3)); + + for (i = 0; i < FQ_PRIO2BAND_CRUMB_SIZE; i++) + WRITE_ONCE(out[i], tmp[i]); +} + +static void fq_prio2band_decompress_crumb(const u8 *in, u8 *out) +{ + const int num_elems = TC_PRIO_MAX + 1; + int i; + + for (i = 0; i < num_elems; i++) + out[i] = fq_prio2band(in, i); +} + +static int fq_load_weights(struct fq_sched_data *q, + const struct nlattr *attr, + struct netlink_ext_ack *extack) +{ + s32 *weights = nla_data(attr); + int i; + + for (i = 0; i < FQ_BANDS; i++) { + if (weights[i] < FQ_MIN_WEIGHT) { + NL_SET_ERR_MSG_FMT_MOD(extack, "Weight %d less that minimum allowed %d", + weights[i], FQ_MIN_WEIGHT); + return -EINVAL; + } + } + for (i = 0; i < FQ_BANDS; i++) + WRITE_ONCE(q->band_flows[i].quantum, weights[i]); + return 0; +} + +static int fq_load_priomap(struct fq_sched_data *q, + const struct nlattr *attr, + struct netlink_ext_ack *extack) +{ + const struct tc_prio_qopt *map = nla_data(attr); + int i; + + if (map->bands != FQ_BANDS) { + NL_SET_ERR_MSG_MOD(extack, "FQ only supports 3 bands"); + return -EINVAL; + } + for (i = 0; i < TC_PRIO_MAX + 1; i++) { + if (map->priomap[i] >= FQ_BANDS) { + NL_SET_ERR_MSG_FMT_MOD(extack, "FQ priomap field %d maps to a too high band %d", + i, map->priomap[i]); + return -EINVAL; + } + } + fq_prio2band_compress_crumb(map->priomap, q->prio2band); + return 0; +} + +static int fq_change(struct Qdisc *sch, struct nlattr *opt, + struct netlink_ext_ack *extack) +{ + unsigned int dropped_pkts = 0, dropped_bytes = 0; + struct fq_sched_data *q = qdisc_priv(sch); + struct nlattr *tb[TCA_FQ_MAX + 1]; + u32 fq_log; + int err; + + err = nla_parse_nested_deprecated(tb, TCA_FQ_MAX, opt, fq_policy, + NULL); + if (err < 0) + return err; + + sch_tree_lock(sch); + + fq_log = q->fq_trees_log; + + if (tb[TCA_FQ_BUCKETS_LOG]) { + u32 nval = nla_get_u32(tb[TCA_FQ_BUCKETS_LOG]); + + if (nval >= 1 && nval <= ilog2(256*1024)) + fq_log = nval; + else + err = -EINVAL; + } + if (tb[TCA_FQ_PLIMIT]) + WRITE_ONCE(sch->limit, + nla_get_u32(tb[TCA_FQ_PLIMIT])); + + if (tb[TCA_FQ_FLOW_PLIMIT]) + WRITE_ONCE(q->flow_plimit, + nla_get_u32(tb[TCA_FQ_FLOW_PLIMIT])); + + if (tb[TCA_FQ_QUANTUM]) { + u32 quantum = nla_get_u32(tb[TCA_FQ_QUANTUM]); + + if (quantum > 0 && quantum <= (1 << 20)) { + WRITE_ONCE(q->quantum, quantum); + } else { + NL_SET_ERR_MSG_MOD(extack, "invalid quantum"); + err = -EINVAL; + } + } + + if (tb[TCA_FQ_INITIAL_QUANTUM]) + WRITE_ONCE(q->initial_quantum, + nla_get_u32(tb[TCA_FQ_INITIAL_QUANTUM])); + + if (tb[TCA_FQ_FLOW_DEFAULT_RATE]) + pr_warn_ratelimited("sch_fq: defrate %u ignored.\n", + nla_get_u32(tb[TCA_FQ_FLOW_DEFAULT_RATE])); + + if (tb[TCA_FQ_FLOW_MAX_RATE]) { + u32 rate = nla_get_u32(tb[TCA_FQ_FLOW_MAX_RATE]); + + WRITE_ONCE(q->flow_max_rate, + (rate == ~0U) ? ~0UL : rate); + } + if (tb[TCA_FQ_LOW_RATE_THRESHOLD]) + WRITE_ONCE(q->low_rate_threshold, + nla_get_u32(tb[TCA_FQ_LOW_RATE_THRESHOLD])); + + if (tb[TCA_FQ_RATE_ENABLE]) { + u32 enable = nla_get_u32(tb[TCA_FQ_RATE_ENABLE]); + + if (enable <= 1) + WRITE_ONCE(q->rate_enable, + enable); + else + err = -EINVAL; + } + + if (tb[TCA_FQ_FLOW_REFILL_DELAY]) { + u32 usecs_delay = nla_get_u32(tb[TCA_FQ_FLOW_REFILL_DELAY]) ; + + WRITE_ONCE(q->flow_refill_delay, + usecs_to_jiffies(usecs_delay)); + } + + if (!err && tb[TCA_FQ_PRIOMAP]) + err = fq_load_priomap(q, tb[TCA_FQ_PRIOMAP], extack); + + if (!err && tb[TCA_FQ_WEIGHTS]) + err = fq_load_weights(q, tb[TCA_FQ_WEIGHTS], extack); + + if (tb[TCA_FQ_ORPHAN_MASK]) + WRITE_ONCE(q->orphan_mask, + nla_get_u32(tb[TCA_FQ_ORPHAN_MASK])); + + if (tb[TCA_FQ_CE_THRESHOLD]) + WRITE_ONCE(q->ce_threshold, + (u64)NSEC_PER_USEC * + nla_get_u32(tb[TCA_FQ_CE_THRESHOLD])); + + if (tb[TCA_FQ_TIMER_SLACK]) + WRITE_ONCE(q->timer_slack, + nla_get_u32(tb[TCA_FQ_TIMER_SLACK])); + + if (tb[TCA_FQ_HORIZON]) + WRITE_ONCE(q->horizon, + (u64)NSEC_PER_USEC * + nla_get_u32(tb[TCA_FQ_HORIZON])); + + if (tb[TCA_FQ_HORIZON_DROP]) + WRITE_ONCE(q->horizon_drop, + nla_get_u8(tb[TCA_FQ_HORIZON_DROP])); + + if (tb[TCA_FQ_OFFLOAD_HORIZON]) { + u64 offload_horizon = (u64)NSEC_PER_USEC * + nla_get_u32(tb[TCA_FQ_OFFLOAD_HORIZON]); + + if (offload_horizon <= qdisc_dev(sch)->max_pacing_offload_horizon) { + WRITE_ONCE(q->offload_horizon, offload_horizon); + } else { + NL_SET_ERR_MSG_MOD(extack, "invalid offload_horizon"); + err = -EINVAL; + } + } + if (!err) { + + sch_tree_unlock(sch); + err = fq_resize(sch, fq_log); + sch_tree_lock(sch); + } + + while (sch->q.qlen > sch->limit) { + struct sk_buff *skb = qdisc_dequeue_internal(sch, false); + + if (!skb) + break; + + dropped_pkts++; + dropped_bytes += qdisc_pkt_len(skb); + rtnl_kfree_skbs(skb, skb); + } + qdisc_tree_reduce_backlog(sch, dropped_pkts, dropped_bytes); + + sch_tree_unlock(sch); + return err; +} + +static void fq_destroy(struct Qdisc *sch) +{ + struct fq_sched_data *q = qdisc_priv(sch); + + fq_reset(sch); + fq_free(q->fq_root); + qdisc_watchdog_cancel(&q->watchdog); +} + +static int fq_init(struct Qdisc *sch, struct nlattr *opt, + struct netlink_ext_ack *extack) +{ + struct fq_sched_data *q = qdisc_priv(sch); + int i, err; + + sch->limit = 10000; + q->flow_plimit = 100; + q->quantum = 2 * psched_mtu(qdisc_dev(sch)); + q->initial_quantum = 10 * psched_mtu(qdisc_dev(sch)); + q->flow_refill_delay = msecs_to_jiffies(40); + q->flow_max_rate = ~0UL; + q->time_next_delayed_flow = ~0ULL; + q->rate_enable = 1; + for (i = 0; i < FQ_BANDS; i++) { + q->band_flows[i].new_flows.first = NULL; + q->band_flows[i].old_flows.first = NULL; + } + q->band_flows[0].quantum = 9 << 16; + q->band_flows[1].quantum = 3 << 16; + q->band_flows[2].quantum = 1 << 16; + q->delayed = RB_ROOT; + q->fq_root = NULL; + q->fq_trees_log = ilog2(1024); + q->orphan_mask = 1024 - 1; + q->low_rate_threshold = 550000 / 8; + + q->timer_slack = 10 * NSEC_PER_USEC; /* 10 usec of hrtimer slack */ + + q->horizon = 10ULL * NSEC_PER_SEC; /* 10 seconds */ + q->horizon_drop = 1; /* by default, drop packets beyond horizon */ + + /* Default ce_threshold of 4294 seconds */ + q->ce_threshold = (u64)NSEC_PER_USEC * ~0U; + + fq_prio2band_compress_crumb(sch_default_prio2band, q->prio2band); + qdisc_watchdog_init_clockid(&q->watchdog, sch, CLOCK_MONOTONIC); + + if (opt) + err = fq_change(sch, opt, extack); + else + err = fq_resize(sch, q->fq_trees_log); + + return err; +} + +static int fq_dump(struct Qdisc *sch, struct sk_buff *skb) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct tc_prio_qopt prio = { + .bands = FQ_BANDS, + }; + struct nlattr *opts; + u64 offload_horizon; + u64 ce_threshold; + s32 weights[3]; + u64 horizon; + + opts = nla_nest_start_noflag(skb, TCA_OPTIONS); + if (opts == NULL) + goto nla_put_failure; + + /* TCA_FQ_FLOW_DEFAULT_RATE is not used anymore */ + + ce_threshold = READ_ONCE(q->ce_threshold); + do_div(ce_threshold, NSEC_PER_USEC); + + horizon = READ_ONCE(q->horizon); + do_div(horizon, NSEC_PER_USEC); + + offload_horizon = READ_ONCE(q->offload_horizon); + do_div(offload_horizon, NSEC_PER_USEC); + + if (nla_put_u32(skb, TCA_FQ_PLIMIT, + READ_ONCE(sch->limit)) || + nla_put_u32(skb, TCA_FQ_FLOW_PLIMIT, + READ_ONCE(q->flow_plimit)) || + nla_put_u32(skb, TCA_FQ_QUANTUM, + READ_ONCE(q->quantum)) || + nla_put_u32(skb, TCA_FQ_INITIAL_QUANTUM, + READ_ONCE(q->initial_quantum)) || + nla_put_u32(skb, TCA_FQ_RATE_ENABLE, + READ_ONCE(q->rate_enable)) || + nla_put_u32(skb, TCA_FQ_FLOW_MAX_RATE, + min_t(unsigned long, + READ_ONCE(q->flow_max_rate), ~0U)) || + nla_put_u32(skb, TCA_FQ_FLOW_REFILL_DELAY, + jiffies_to_usecs(READ_ONCE(q->flow_refill_delay))) || + nla_put_u32(skb, TCA_FQ_ORPHAN_MASK, + READ_ONCE(q->orphan_mask)) || + nla_put_u32(skb, TCA_FQ_LOW_RATE_THRESHOLD, + READ_ONCE(q->low_rate_threshold)) || + nla_put_u32(skb, TCA_FQ_CE_THRESHOLD, (u32)ce_threshold) || + nla_put_u32(skb, TCA_FQ_BUCKETS_LOG, + READ_ONCE(q->fq_trees_log)) || + nla_put_u32(skb, TCA_FQ_TIMER_SLACK, + READ_ONCE(q->timer_slack)) || + nla_put_u32(skb, TCA_FQ_HORIZON, (u32)horizon) || + nla_put_u32(skb, TCA_FQ_OFFLOAD_HORIZON, (u32)offload_horizon) || + nla_put_u8(skb, TCA_FQ_HORIZON_DROP, + READ_ONCE(q->horizon_drop))) + goto nla_put_failure; + + fq_prio2band_decompress_crumb(q->prio2band, prio.priomap); + if (nla_put(skb, TCA_FQ_PRIOMAP, sizeof(prio), &prio)) + goto nla_put_failure; + + weights[0] = READ_ONCE(q->band_flows[0].quantum); + weights[1] = READ_ONCE(q->band_flows[1].quantum); + weights[2] = READ_ONCE(q->band_flows[2].quantum); + if (nla_put(skb, TCA_FQ_WEIGHTS, sizeof(weights), &weights)) + goto nla_put_failure; + + return nla_nest_end(skb, opts); + +nla_put_failure: + return -1; +} + +static int fq_dump_stats(struct Qdisc *sch, struct gnet_dump *d) +{ + struct fq_sched_data *q = qdisc_priv(sch); + struct tc_fq_qd_stats st; + int i; + + st.pad = 0; + + sch_tree_lock(sch); + + st.gc_flows = q->stat_gc_flows; + st.highprio_packets = 0; + st.fastpath_packets = q->internal.stat_fastpath_packets; + st.tcp_retrans = 0; + st.throttled = q->stat_throttled; + st.flows_plimit = q->stat_flows_plimit; + st.pkts_too_long = q->stat_pkts_too_long; + st.allocation_errors = q->stat_allocation_errors; + st.time_next_delayed_flow = q->time_next_delayed_flow + q->timer_slack - + ktime_get_ns(); + st.flows = q->flows; + st.inactive_flows = q->inactive_flows; + st.throttled_flows = q->throttled_flows; + st.unthrottle_latency_ns = min_t(unsigned long, + q->unthrottle_latency_ns, ~0U); + st.ce_mark = q->stat_ce_mark; + st.horizon_drops = q->stat_horizon_drops; + st.horizon_caps = q->stat_horizon_caps; + for (i = 0; i < FQ_BANDS; i++) { + st.band_drops[i] = q->stat_band_drops[i]; + st.band_pkt_count[i] = q->band_pkt_count[i]; + } + sch_tree_unlock(sch); + + return gnet_stats_copy_app(d, &st, sizeof(st)); +} + +static struct Qdisc_ops fq_qdisc_ops __read_mostly = { + .id = "fq", + .priv_size = sizeof(struct fq_sched_data), + + .enqueue = fq_enqueue, + .dequeue = fq_dequeue, + .peek = qdisc_peek_dequeued, + .init = fq_init, + .reset = fq_reset, + .destroy = fq_destroy, + .change = fq_change, + .dump = fq_dump, + .dump_stats = fq_dump_stats, + .owner = THIS_MODULE, +}; +MODULE_ALIAS_NET_SCH("fq"); + +static int __init fq_module_init(void) +{ + int ret; + + fq_flow_cachep = kmem_cache_create("fq_flow_cache", + sizeof(struct fq_flow), + 0, SLAB_HWCACHE_ALIGN, NULL); + if (!fq_flow_cachep) + return -ENOMEM; + + ret = register_qdisc(&fq_qdisc_ops); + if (ret) + kmem_cache_destroy(fq_flow_cachep); + return ret; +} + +static void __exit fq_module_exit(void) +{ + unregister_qdisc(&fq_qdisc_ops); + kmem_cache_destroy(fq_flow_cachep); +} + +module_init(fq_module_init) +module_exit(fq_module_exit) +MODULE_AUTHOR("Eric Dumazet"); +MODULE_LICENSE("GPL"); +MODULE_DESCRIPTION("Fair Queue Packet Scheduler"); |
