summaryrefslogtreecommitdiff
path: root/net/rxrpc/call_object.c
diff options
context:
space:
mode:
author Jakub Kicinski <kuba@kernel.org> 2024-12-09 13:48:35 -0800
committer Jakub Kicinski <kuba@kernel.org> 2024-12-09 13:48:37 -0800
commit f9663b7cafa5e15b700efc1fdfabe33e31c133e7 (patch)
tree a97e28155f864d42668731c195828491f01e33d1 /net/rxrpc/call_object.c
parent 6145fefc1e42c1895c0c1c2c8593de2c085d8c56 (diff)
parent 7c482665931b6ce7bc72fa5feae6c35567070296 (diff)
Merge branch 'rxrpc-implement-jumbo-data-transmission-and-rack-tlp'
David Howells says: ==================== rxrpc: Implement jumbo DATA transmission and RACK-TLP Here's a series of patches to implement two main features: (1) The transmission of jumbo data packets whereby several DATA packets of a particular size can be glued together into a single UDP packet, allowing us to make use of larger MTU sizes. The basic jumbo subpacket capacity is 1412 bytes (RXRPC_JUMBO_DATALEN) and, say, an MTU of 8192 allows five of them to be transmitted as one. An alternative (and possibly more efficient) way would be to expand/shrink the capacity of each DATA packet to match the MTU and thus save on header and tail-gap overhead, but the Rx protocol does not provide a mechanism for splitting the data - especially as the transported data is encrypted per-packet - and so UDP fragmentation would be the only way to handle this. In fact, in the future, AF_RXRPC also needs to look at shrinking the packet size where the MTU is smaller - for instance in the case of being carried by IPv6 over wifi where there isn't room for a 1412-byte subpacket capacity. (2) RACK-TLP to manage packet loss and retransmission in conjunction with the congestion control algorithm. These allow for better data throughput and work towards being able to have larger transmission windows. To this end, the following changes are also made: (1) Use a single large array of kvec structs for the I/O thread rather than having one per transmission buffer. We need a much bigger collection of kvecs for ping padding. (2) Implement path-MTU probing by sending padded PING ACK packets and monitoring for PING RESPONSE ACKs. The pmtud value determined is used to configure the construction of jumbo DATA packets. 
(3) The transmission queue is changed from a linked list of transmission buffer structs to a linked list of transmission-queue structs, each of which points to either 32 or 64 transmission buffers (depending on cpu word size) and various bits of metadata are concentrated in the queue structs rather than the buffers to make better use of the cpu cache. (4) SACK data is stored in the transmission-queue structures in batches of 32 or 64 making it faster to process rather than being spread amongst all the individual packet buffers. (5) Don't change the DF flag on the UDP socket unless we need to - and basically only enable it for path-MTU probing. There are also some additional bits: (1) Fix the handling of connection aborts to poke the aborted connections. (2) Don't set the MORE-PACKETS Rx header flag on the wire. No one actually checks it and it is, in any case, generated inconsistently between implementations. (3) Request an ACK when, during call transmission, there's a stall in the app generating the data to be transmitted. (4) Fix attention starvation in the I/O thread by making sure we go through all outstanding events rather than returning to the beginning of the check cycle after any time we process an event. (5) Don't use the skbuff timestamp in the calculation of timeouts and RTT as we really should include local processing time in that too. Further, getting receive skbuff timestamps may be expensive. (6) Make RTT tracking per call with the saving of the value between calls, even within the same connection channel. The initial call timeout starts off large to allow the server time to set up its state before the initial reply. (7) Don't allocate txbuf structs for ACK packets, but rather use page frags and MSG_SPLICE_PAGES. (8) Use irq-disabling locks for interactions between app threads and I/O threads so that the I/O thread doesn't get held up. 
(9) Make rxrpc set the REQUEST-ACK flag on an outgoing packet when cwnd is at RXRPC_MIN_CWND (currently 4), not at 2 which it can never reach. (10) Add some tracing bits and pieces (including displaying the userStatus field in an ACK header) and some more stats counters (including different sizes of jumbo packets sent/received). Link: https://lore.kernel.org/r/20240306000655.1100294-1-dhowells@redhat.com/ [1] ==================== Link: https://patch.msgid.link/20241204074710.990092-1-dhowells@redhat.com Signed-off-by: Jakub Kicinski <kuba@kernel.org>
Diffstat (limited to 'net/rxrpc/call_object.c')
-rw-r--r--net/rxrpc/call_object.c66
1 files changed, 36 insertions, 30 deletions
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index f9e983a12c14..5a543c3f6fb0 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -49,7 +49,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
bool busy;
if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) {
- spin_lock_bh(&local->lock);
+ spin_lock_irq(&local->lock);
busy = !list_empty(&call->attend_link);
trace_rxrpc_poke_call(call, busy, what);
if (!busy && !rxrpc_try_get_call(call, rxrpc_call_get_poke))
@@ -57,7 +57,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
if (!busy) {
list_add_tail(&call->attend_link, &local->call_attend_q);
}
- spin_unlock_bh(&local->lock);
+ spin_unlock_irq(&local->lock);
if (!busy)
rxrpc_wake_up_io_thread(local);
}
@@ -146,23 +146,21 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
INIT_LIST_HEAD(&call->attend_link);
- INIT_LIST_HEAD(&call->tx_sendmsg);
- INIT_LIST_HEAD(&call->tx_buffer);
+ skb_queue_head_init(&call->rx_queue);
skb_queue_head_init(&call->recvmsg_queue);
skb_queue_head_init(&call->rx_oos_queue);
init_waitqueue_head(&call->waitq);
spin_lock_init(&call->notify_lock);
- spin_lock_init(&call->tx_lock);
refcount_set(&call->ref, 1);
call->debug_id = debug_id;
call->tx_total_len = -1;
+ call->tx_jumbo_max = 1;
call->next_rx_timo = 20 * HZ;
call->next_req_timo = 1 * HZ;
call->ackr_window = 1;
call->ackr_wtop = 1;
call->delay_ack_at = KTIME_MAX;
- call->ack_lost_at = KTIME_MAX;
- call->resend_at = KTIME_MAX;
+ call->rack_timo_at = KTIME_MAX;
call->ping_at = KTIME_MAX;
call->keepalive_at = KTIME_MAX;
call->expect_rx_by = KTIME_MAX;
@@ -177,6 +175,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
call->cong_cwnd = RXRPC_MIN_CWND;
call->cong_ssthresh = RXRPC_TX_MAX_WINDOW;
+ rxrpc_call_init_rtt(call);
+
call->rxnet = rxnet;
call->rtt_avail = RXRPC_CALL_RTT_AVAIL_MASK;
atomic_inc(&rxnet->nr_calls);
@@ -220,9 +220,9 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
__set_bit(RXRPC_CALL_EXCLUSIVE, &call->flags);
if (p->timeouts.normal)
- call->next_rx_timo = min(p->timeouts.normal, 1);
+ call->next_rx_timo = umin(p->timeouts.normal, 1);
if (p->timeouts.idle)
- call->next_req_timo = min(p->timeouts.idle, 1);
+ call->next_req_timo = umin(p->timeouts.idle, 1);
if (p->timeouts.hard)
call->hard_timo = p->timeouts.hard;
@@ -302,9 +302,9 @@ static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call);
rxrpc_get_call(call, rxrpc_call_get_io_thread);
- spin_lock(&local->client_call_lock);
+ spin_lock_irq(&local->client_call_lock);
list_add_tail(&call->wait_link, &local->new_client_calls);
- spin_unlock(&local->client_call_lock);
+ spin_unlock_irq(&local->client_call_lock);
rxrpc_wake_up_io_thread(local);
return 0;
@@ -434,7 +434,7 @@ error_attached_to_socket:
/*
* Set up an incoming call. call->conn points to the connection.
- * This is called in BH context and isn't allowed to fail.
+ * This is called with interrupts disabled and isn't allowed to fail.
*/
void rxrpc_incoming_call(struct rxrpc_sock *rx,
struct rxrpc_call *call,
@@ -531,11 +531,29 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
}
/*
- * Clean up the Rx skb ring.
+ * Clean up the transmission buffers.
+ */
+static void rxrpc_cleanup_tx_buffers(struct rxrpc_call *call)
+{
+ struct rxrpc_txqueue *tq, *next;
+
+ for (tq = call->tx_queue; tq; tq = next) {
+ next = tq->next;
+ for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+ if (tq->bufs[i])
+ rxrpc_put_txbuf(tq->bufs[i], rxrpc_txbuf_put_cleaned);
+ trace_rxrpc_tq(call, tq, 0, rxrpc_tq_cleaned);
+ kfree(tq);
+ }
+}
+
+/*
+ * Clean up the receive buffers.
*/
-static void rxrpc_cleanup_ring(struct rxrpc_call *call)
+static void rxrpc_cleanup_rx_buffers(struct rxrpc_call *call)
{
rxrpc_purge_queue(&call->recvmsg_queue);
+ rxrpc_purge_queue(&call->rx_queue);
rxrpc_purge_queue(&call->rx_oos_queue);
}
@@ -558,7 +576,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
rxrpc_put_call_slot(call);
/* Make sure we don't get any more notifications */
- spin_lock(&rx->recvmsg_lock);
+ spin_lock_irq(&rx->recvmsg_lock);
if (!list_empty(&call->recvmsg_link)) {
_debug("unlinking once-pending call %p { e=%lx f=%lx }",
@@ -571,7 +589,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
call->recvmsg_link.next = NULL;
call->recvmsg_link.prev = NULL;
- spin_unlock(&rx->recvmsg_lock);
+ spin_unlock_irq(&rx->recvmsg_lock);
if (put)
rxrpc_put_call(call, rxrpc_call_put_unnotify);
@@ -671,23 +689,11 @@ static void rxrpc_rcu_free_call(struct rcu_head *rcu)
static void rxrpc_destroy_call(struct work_struct *work)
{
struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
- struct rxrpc_txbuf *txb;
del_timer_sync(&call->timer);
- rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
- rxrpc_cleanup_ring(call);
- while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
- struct rxrpc_txbuf, call_link))) {
- list_del(&txb->call_link);
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
- }
- while ((txb = list_first_entry_or_null(&call->tx_buffer,
- struct rxrpc_txbuf, call_link))) {
- list_del(&txb->call_link);
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
- }
-
+ rxrpc_cleanup_tx_buffers(call);
+ rxrpc_cleanup_rx_buffers(call);
rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
rxrpc_deactivate_bundle(call->bundle);