Diffstat (limited to 'net/ipv4/udp.c')
-rw-r--r--  net/ipv4/udp.c  117
1 file changed, 71 insertions(+), 46 deletions(-)
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index 85cfc32eb2cc..95241093b7f0 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1685,25 +1685,6 @@ static void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
udp_rmem_release(sk, udp_skb_truesize(skb), 1, true);
}
-/* Idea of busylocks is to let producers grab an extra spinlock
- * to relieve pressure on the receive_queue spinlock shared by consumer.
- * Under flood, this means that only one producer can be in line
- * trying to acquire the receive_queue spinlock.
- */
-static spinlock_t *busylock_acquire(struct sock *sk)
-{
- spinlock_t *busy = &udp_sk(sk)->busylock;
-
- spin_lock(busy);
- return busy;
-}
-
-static void busylock_release(spinlock_t *busy)
-{
- if (busy)
- spin_unlock(busy);
-}
-
static int udp_rmem_schedule(struct sock *sk, int size)
{
int delta;
@@ -1718,14 +1699,24 @@ static int udp_rmem_schedule(struct sock *sk, int size)
int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
{
struct sk_buff_head *list = &sk->sk_receive_queue;
+ struct udp_prod_queue *udp_prod_queue;
+ struct sk_buff *next, *to_drop = NULL;
+ struct llist_node *ll_list;
unsigned int rmem, rcvbuf;
- spinlock_t *busy = NULL;
int size, err = -ENOMEM;
+ int total_size = 0;
+ int q_size = 0;
+ int dropcount;
+ int nb = 0;
rmem = atomic_read(&sk->sk_rmem_alloc);
rcvbuf = READ_ONCE(sk->sk_rcvbuf);
size = skb->truesize;
+ udp_prod_queue = &udp_sk(sk)->udp_prod_queue[numa_node_id()];
+
+ rmem += atomic_read(&udp_prod_queue->rmem_alloc);
+
/* Immediately drop when the receive queue is full.
* Cast to unsigned int performs the boundary check for INT_MAX.
*/
@@ -1747,45 +1738,77 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
if (rmem > (rcvbuf >> 1)) {
skb_condense(skb);
size = skb->truesize;
- rmem = atomic_add_return(size, &sk->sk_rmem_alloc);
- if (rmem > rcvbuf)
- goto uncharge_drop;
- busy = busylock_acquire(sk);
- } else {
- atomic_add(size, &sk->sk_rmem_alloc);
}
udp_set_dev_scratch(skb);
+ atomic_add(size, &udp_prod_queue->rmem_alloc);
+
+ if (!llist_add(&skb->ll_node, &udp_prod_queue->ll_root))
+ return 0;
+
+ dropcount = sock_flag(sk, SOCK_RXQ_OVFL) ? sk_drops_read(sk) : 0;
+
spin_lock(&list->lock);
- err = udp_rmem_schedule(sk, size);
- if (err) {
- spin_unlock(&list->lock);
- goto uncharge_drop;
- }
- sk_forward_alloc_add(sk, -size);
+ ll_list = llist_del_all(&udp_prod_queue->ll_root);
- /* no need to setup a destructor, we will explicitly release the
- * forward allocated memory on dequeue
- */
- sock_skb_set_dropcount(sk, skb);
+ ll_list = llist_reverse_order(ll_list);
+
+ llist_for_each_entry_safe(skb, next, ll_list, ll_node) {
+ size = udp_skb_truesize(skb);
+ total_size += size;
+ err = udp_rmem_schedule(sk, size);
+ if (unlikely(err)) {
+ /* Free the skbs outside of locked section. */
+ skb->next = to_drop;
+ to_drop = skb;
+ continue;
+ }
+
+ q_size += size;
+ sk_forward_alloc_add(sk, -size);
+
+ /* no need to setup a destructor, we will explicitly release the
+ * forward allocated memory on dequeue
+ */
+ SOCK_SKB_CB(skb)->dropcount = dropcount;
+ nb++;
+ __skb_queue_tail(list, skb);
+ }
+
+ atomic_add(q_size, &sk->sk_rmem_alloc);
- __skb_queue_tail(list, skb);
spin_unlock(&list->lock);
- if (!sock_flag(sk, SOCK_DEAD))
- INDIRECT_CALL_1(sk->sk_data_ready, sock_def_readable, sk);
+ if (!sock_flag(sk, SOCK_DEAD)) {
+ /* Multiple threads might be blocked in recvmsg(),
+ * using prepare_to_wait_exclusive().
+ */
+ while (nb) {
+ INDIRECT_CALL_1(sk->sk_data_ready,
+ sock_def_readable, sk);
+ nb--;
+ }
+ }
+
+ if (unlikely(to_drop)) {
+ for (nb = 0; to_drop != NULL; nb++) {
+ skb = to_drop;
+ to_drop = skb->next;
+ skb_mark_not_on_list(skb);
+ /* TODO: update SNMP values. */
+ sk_skb_reason_drop(sk, skb, SKB_DROP_REASON_PROTO_MEM);
+ }
+ numa_drop_add(&udp_sk(sk)->drop_counters, nb);
+ }
- busylock_release(busy);
- return 0;
+ atomic_sub(total_size, &udp_prod_queue->rmem_alloc);
-uncharge_drop:
- atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
+ return 0;
drop:
udp_drops_inc(sk);
- busylock_release(busy);
return err;
}
EXPORT_IPV6_MOD_GPL(__udp_enqueue_schedule_skb);
@@ -1803,6 +1826,7 @@ void udp_destruct_common(struct sock *sk)
kfree_skb(skb);
}
udp_rmem_release(sk, total, 0, true);
+ kfree(up->udp_prod_queue);
}
EXPORT_IPV6_MOD_GPL(udp_destruct_common);
@@ -1814,10 +1838,11 @@ static void udp_destruct_sock(struct sock *sk)
int udp_init_sock(struct sock *sk)
{
- udp_lib_init_sock(sk);
+ int res = udp_lib_init_sock(sk);
+
sk->sk_destruct = udp_destruct_sock;
set_bit(SOCK_SUPPORT_ZC, &sk->sk_socket->flags);
- return 0;
+ return res;
}
void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
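
The patch above replaces the per-socket busylock with a per-NUMA-node lock-free producer list: every producer pushes its skb onto the node-local llist, and only the producer that finds the list empty goes on to take the receive_queue lock, splicing and reversing the whole batch in one pass. Below is a minimal userspace sketch of that pattern, using C11 atomics in place of the kernel's llist_add()/llist_del_all()/llist_reverse_order(); all names here (struct prod_queue, prod_queue_add, flush_to_consumer) are illustrative only and not part of the kernel patch.

/*
 * Minimal userspace sketch of the producer-batching pattern used above.
 * Producers push onto a lock-free stack; the producer that observes an
 * empty stack becomes responsible for flushing, so only one thread at a
 * time contends for the consumer lock (the role played by
 * sk_receive_queue.lock in the patch).
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct node {
	struct node *next;
	int payload;
};

struct prod_queue {
	_Atomic(struct node *) head;	/* lock-free producer stack */
};

static pthread_mutex_t consumer_lock = PTHREAD_MUTEX_INITIALIZER;

/* Push one entry; returns true if the stack was empty (caller must flush). */
static bool prod_queue_add(struct prod_queue *q, struct node *n)
{
	struct node *old = atomic_load_explicit(&q->head, memory_order_relaxed);

	do {
		n->next = old;
	} while (!atomic_compare_exchange_weak_explicit(&q->head, &old, n,
							memory_order_release,
							memory_order_relaxed));
	return old == NULL;
}

/* Detach every pending entry at once, like llist_del_all(). */
static struct node *prod_queue_del_all(struct prod_queue *q)
{
	return atomic_exchange_explicit(&q->head, NULL, memory_order_acquire);
}

/* Splice the whole batch into the consumer under one lock acquisition. */
static void flush_to_consumer(struct prod_queue *q)
{
	struct node *n = prod_queue_del_all(q);
	struct node *prev = NULL;

	/* The stack is LIFO; reverse it to restore arrival order,
	 * as llist_reverse_order() does in the patch.
	 */
	while (n) {
		struct node *next = n->next;

		n->next = prev;
		prev = n;
		n = next;
	}

	pthread_mutex_lock(&consumer_lock);
	for (n = prev; n; n = n->next)
		printf("deliver %d\n", n->payload);
	pthread_mutex_unlock(&consumer_lock);
}

int main(void)
{
	struct prod_queue q = { .head = NULL };
	struct node a = { .payload = 1 }, b = { .payload = 2 };

	prod_queue_add(&q, &a);		/* stack was empty: this caller flushes */
	prod_queue_add(&q, &b);		/* stack non-empty: piggy-backs on the flush */
	flush_to_consumer(&q);		/* prints 1 then 2 */
	return 0;
}

In the patch, the return value of llist_add() plays the role of the old == NULL check here: only the producer that installed the first entry proceeds to take sk_receive_queue.lock, while later producers return immediately after charging udp_prod_queue->rmem_alloc.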