summaryrefslogtreecommitdiff
path: root/net/rxrpc/input.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2022-10-07 17:44:39 +0100
committerDavid Howells <dhowells@redhat.com>2022-11-08 16:42:28 +0000
commitd4d02d8bb5c412d977af7ea7c7ea91977a6a64dc (patch)
treefa01723df80b270b8b8ccad43c6c97129c79b60c /net/rxrpc/input.c
parentfaf92e8d53f5f03842da25af971a3f0ef88ffba2 (diff)
rxrpc: Clone received jumbo subpackets and queue separately
Split up received jumbo packets into separate skbuffs by cloning the original skbuff for each subpacket and setting the offset and length of the data in that subpacket in the skbuff's private data. The subpackets are then placed on the recvmsg queue separately. The security class then gets to revise the offset and length to remove its metadata. If we fail to clone a packet, we just drop it and let the peer resend it. The original packet gets used for the final subpacket. This should make it easier to handle parallel decryption of the subpackets. It also simplifies the handling of lost or misordered packets in the queuing/buffering loop as the possibility of overlapping jumbo packets no longer needs to be considered. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/input.c')
-rw-r--r--net/rxrpc/input.c401
1 files changed, 185 insertions, 216 deletions
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 4df1bdc4de6c..0e7545ed0128 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -313,78 +313,178 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
}
/*
- * Scan a data packet to validate its structure and to work out how many
- * subpackets it contains.
- *
- * A jumbo packet is a collection of consecutive packets glued together with
- * little headers between that indicate how to change the initial header for
- * each subpacket.
- *
- * RXRPC_JUMBO_PACKET must be set on all but the last subpacket - and all but
- * the last are RXRPC_JUMBO_DATALEN in size. The last subpacket may be of any
- * size.
+ * Process a DATA packet, adding the packet to the Rx ring. The caller's
+ * packet ref must be passed on or discarded.
*/
-static bool rxrpc_validate_data(struct sk_buff *skb)
+static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
- unsigned int offset = sizeof(struct rxrpc_wire_header);
- unsigned int len = skb->len;
- u8 flags = sp->hdr.flags;
+ rxrpc_serial_t serial = sp->hdr.serial;
+ rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
+ unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
+ bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
+ bool acked = false;
- for (;;) {
- if (flags & RXRPC_REQUEST_ACK)
- __set_bit(sp->nr_subpackets, sp->rx_req_ack);
- sp->nr_subpackets++;
+ rxrpc_inc_stat(call->rxnet, stat_rx_data);
+ if (sp->hdr.flags & RXRPC_REQUEST_ACK)
+ rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
+ if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
+ rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);
- if (!(flags & RXRPC_JUMBO_PACKET))
- break;
+ hard_ack = READ_ONCE(call->rx_hard_ack);
- if (len - offset < RXRPC_JUMBO_SUBPKTLEN)
- goto protocol_error;
- if (flags & RXRPC_LAST_PACKET)
- goto protocol_error;
- offset += RXRPC_JUMBO_DATALEN;
- if (skb_copy_bits(skb, offset, &flags, 1) < 0)
- goto protocol_error;
- offset += sizeof(struct rxrpc_jumbo_header);
+ _proto("Rx DATA %%%u { #%x l=%u }", serial, seq, last);
+
+ if (last) {
+ if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+ seq != call->rx_top) {
+ rxrpc_proto_abort("LSN", call, seq);
+ goto out;
+ }
+ } else {
+ if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+ after_eq(seq, call->rx_top)) {
+ rxrpc_proto_abort("LSA", call, seq);
+ goto out;
+ }
}
- if (flags & RXRPC_LAST_PACKET)
- sp->rx_flags |= RXRPC_SKB_INCL_LAST;
- return true;
+ trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);
-protocol_error:
- return false;
+ if (before_eq(seq, hard_ack)) {
+ rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
+ rxrpc_propose_ack_input_data);
+ goto out;
+ }
+
+ if (call->rxtx_buffer[ix]) {
+ rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
+ rxrpc_propose_ack_input_data);
+ goto out;
+ }
+
+ if (after(seq, hard_ack + call->rx_winsize)) {
+ rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
+ rxrpc_propose_ack_input_data);
+ goto out;
+ }
+
+ if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
+ rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
+ rxrpc_propose_ack_input_data);
+ acked = true;
+ }
+
+ if (after(seq, call->ackr_highest_seq))
+ call->ackr_highest_seq = seq;
+
+ /* Queue the packet. We use a couple of memory barriers here as need
+ * to make sure that rx_top is perceived to be set after the buffer
+ * pointer and that the buffer pointer is set after the annotation and
+ * the skb data.
+ *
+ * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
+ * and also rxrpc_fill_out_ack().
+ */
+ call->rxtx_annotations[ix] = 1;
+ smp_wmb();
+ call->rxtx_buffer[ix] = skb;
+ if (after(seq, call->rx_top)) {
+ smp_store_release(&call->rx_top, seq);
+ } else if (before(seq, call->rx_top)) {
+ /* Send an immediate ACK if we fill in a hole */
+ if (!acked) {
+ rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
+ rxrpc_propose_ack_input_data_hole);
+ acked = true;
+ }
+ }
+
+ /* From this point on, we're not allowed to touch the packet any longer
+ * as its ref now belongs to the Rx ring.
+ */
+ skb = NULL;
+ sp = NULL;
+
+ if (last) {
+ set_bit(RXRPC_CALL_RX_LAST, &call->flags);
+ trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
+ } else {
+ trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
+ }
+
+ if (after_eq(seq, call->rx_expect_next)) {
+ if (after(seq, call->rx_expect_next)) {
+ _net("OOS %u > %u", seq, call->rx_expect_next);
+ rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
+ rxrpc_propose_ack_input_data);
+ acked = true;
+ }
+ call->rx_expect_next = seq + 1;
+ }
+
+out:
+ if (!acked &&
+ atomic_inc_return(&call->ackr_nr_unacked) > 2)
+ rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+ rxrpc_propose_ack_input_data);
+ else
+ rxrpc_propose_delay_ACK(call, serial,
+ rxrpc_propose_ack_input_data);
+
+ trace_rxrpc_notify_socket(call->debug_id, serial);
+ rxrpc_notify_socket(call);
+
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ _leave(" [queued]");
}
/*
- * Handle reception of a duplicate packet.
- *
- * We have to take care to avoid an attack here whereby we're given a series of
- * jumbograms, each with a sequence number one before the preceding one and
- * filled up to maximum UDP size. If they never send us the first packet in
- * the sequence, they can cause us to have to hold on to around 2MiB of kernel
- * space until the call times out.
- *
- * We limit the space usage by only accepting three duplicate jumbo packets per
- * call. After that, we tell the other side we're no longer accepting jumbos
- * (that information is encoded in the ACK packet).
+ * Split a jumbo packet and file the bits separately.
*/
-static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
- bool is_jumbo, bool *_jumbo_bad)
+static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
- /* Discard normal packets that are duplicates. */
- if (is_jumbo)
- return;
+ struct rxrpc_jumbo_header jhdr;
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
+ struct sk_buff *jskb;
+ unsigned int offset = sizeof(struct rxrpc_wire_header);
+ unsigned int len = skb->len - offset;
- /* Skip jumbo subpackets that are duplicates. When we've had three or
- * more partially duplicate jumbo packets, we refuse to take any more
- * jumbos for this call.
- */
- if (!*_jumbo_bad) {
- call->nr_jumbo_bad++;
- *_jumbo_bad = true;
+ while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
+ if (len < RXRPC_JUMBO_SUBPKTLEN)
+ goto protocol_error;
+ if (sp->hdr.flags & RXRPC_LAST_PACKET)
+ goto protocol_error;
+ if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
+ &jhdr, sizeof(jhdr)) < 0)
+ goto protocol_error;
+
+ jskb = skb_clone(skb, GFP_ATOMIC);
+ if (!jskb) {
+ kdebug("couldn't clone");
+ return false;
+ }
+ rxrpc_new_skb(jskb, rxrpc_skb_cloned_jumbo);
+ jsp = rxrpc_skb(jskb);
+ jsp->offset = offset;
+ jsp->len = RXRPC_JUMBO_DATALEN;
+ rxrpc_input_data_one(call, jskb);
+
+ sp->hdr.flags = jhdr.flags;
+ sp->hdr._rsvd = ntohs(jhdr._rsvd);
+ sp->hdr.seq++;
+ sp->hdr.serial++;
+ offset += RXRPC_JUMBO_SUBPKTLEN;
+ len -= RXRPC_JUMBO_SUBPKTLEN;
}
+
+ sp->offset = offset;
+ sp->len = len;
+ rxrpc_input_data_one(call, skb);
+ return true;
+
+protocol_error:
+ return false;
}
/*
@@ -395,16 +495,14 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
enum rxrpc_call_state state;
- unsigned int j, nr_subpackets, nr_unacked = 0;
- rxrpc_serial_t serial = sp->hdr.serial, ack_serial = serial;
- rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack;
- bool jumbo_bad = false;
+ rxrpc_serial_t serial = sp->hdr.serial;
+ rxrpc_seq_t seq0 = sp->hdr.seq;
_enter("{%u,%u},{%u,%u}",
call->rx_hard_ack, call->rx_top, skb->len, seq0);
- _proto("Rx DATA %%%u { #%u f=%02x n=%u }",
- sp->hdr.serial, seq0, sp->hdr.flags, sp->nr_subpackets);
+ _proto("Rx DATA %%%u { #%u f=%02x }",
+ sp->hdr.serial, seq0, sp->hdr.flags);
state = READ_ONCE(call->state);
if (state >= RXRPC_CALL_COMPLETE) {
@@ -412,6 +510,24 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
return;
}
+ /* Unshare the packet so that it can be modified for in-place
+ * decryption.
+ */
+ if (sp->hdr.securityIndex != 0) {
+ struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
+ if (!nskb) {
+ rxrpc_eaten_skb(skb, rxrpc_skb_unshared_nomem);
+ return;
+ }
+
+ if (nskb != skb) {
+ rxrpc_eaten_skb(skb, rxrpc_skb_received);
+ skb = nskb;
+ rxrpc_new_skb(skb, rxrpc_skb_unshared);
+ sp = rxrpc_skb(skb);
+ }
+ }
+
if (state == RXRPC_CALL_SERVER_RECV_REQUEST) {
unsigned long timo = READ_ONCE(call->next_req_timo);
unsigned long now, expect_req_by;
@@ -425,12 +541,6 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
}
}
- rxrpc_inc_stat(call->rxnet, stat_rx_data);
- if (sp->hdr.flags & RXRPC_REQUEST_ACK)
- rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
- if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
- rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);
-
spin_lock(&call->input_lock);
/* Received data implicitly ACKs all of the request packets we sent
@@ -439,154 +549,15 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
if ((state == RXRPC_CALL_CLIENT_SEND_REQUEST ||
state == RXRPC_CALL_CLIENT_AWAIT_REPLY) &&
!rxrpc_receiving_reply(call))
- goto unlock;
-
- hard_ack = READ_ONCE(call->rx_hard_ack);
-
- nr_subpackets = sp->nr_subpackets;
- if (nr_subpackets > 1) {
- if (call->nr_jumbo_bad > 3) {
- rxrpc_send_ACK(call, RXRPC_ACK_NOSPACE, serial,
- rxrpc_propose_ack_input_data);
- goto unlock;
- }
- }
-
- for (j = 0; j < nr_subpackets; j++) {
- rxrpc_serial_t serial = sp->hdr.serial + j;
- rxrpc_seq_t seq = seq0 + j;
- unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
- bool terminal = (j == nr_subpackets - 1);
- bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST);
- bool acked = false;
- u8 flags, annotation = j;
-
- _proto("Rx DATA+%u %%%u { #%x t=%u l=%u }",
- j, serial, seq, terminal, last);
-
- if (last) {
- if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- seq != call->rx_top) {
- rxrpc_proto_abort("LSN", call, seq);
- goto unlock;
- }
- } else {
- if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- after_eq(seq, call->rx_top)) {
- rxrpc_proto_abort("LSA", call, seq);
- goto unlock;
- }
- }
-
- flags = 0;
- if (last)
- flags |= RXRPC_LAST_PACKET;
- if (!terminal)
- flags |= RXRPC_JUMBO_PACKET;
- if (test_bit(j, sp->rx_req_ack))
- flags |= RXRPC_REQUEST_ACK;
- trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
-
- if (before_eq(seq, hard_ack)) {
- rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
- rxrpc_propose_ack_input_data);
- continue;
- }
-
- if (call->rxtx_buffer[ix]) {
- rxrpc_input_dup_data(call, seq, nr_subpackets > 1,
- &jumbo_bad);
- rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
- rxrpc_propose_ack_input_data);
- continue;
- }
-
- if (after(seq, hard_ack + call->rx_winsize)) {
- rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
- rxrpc_propose_ack_input_data);
- if (flags & RXRPC_JUMBO_PACKET) {
- if (!jumbo_bad) {
- call->nr_jumbo_bad++;
- jumbo_bad = true;
- }
- }
-
- goto unlock;
- }
-
- if (flags & RXRPC_REQUEST_ACK) {
- rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
- rxrpc_propose_ack_input_data);
- acked = true;
- }
-
- if (after(seq0, call->ackr_highest_seq))
- call->ackr_highest_seq = seq0;
-
- /* Queue the packet. We use a couple of memory barriers here as need
- * to make sure that rx_top is perceived to be set after the buffer
- * pointer and that the buffer pointer is set after the annotation and
- * the skb data.
- *
- * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
- * and also rxrpc_fill_out_ack().
- */
- if (!terminal)
- rxrpc_get_skb(skb, rxrpc_skb_got);
- call->rxtx_annotations[ix] = annotation;
- smp_wmb();
- call->rxtx_buffer[ix] = skb;
- if (after(seq, call->rx_top)) {
- smp_store_release(&call->rx_top, seq);
- } else if (before(seq, call->rx_top)) {
- /* Send an immediate ACK if we fill in a hole */
- if (!acked) {
- rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
- rxrpc_propose_ack_input_data_hole);
- acked = true;
- }
- }
-
- if (terminal) {
- /* From this point on, we're not allowed to touch the
- * packet any longer as its ref now belongs to the Rx
- * ring.
- */
- skb = NULL;
- sp = NULL;
- }
-
- if (last) {
- set_bit(RXRPC_CALL_RX_LAST, &call->flags);
- trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
- } else {
- trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
- }
-
- if (after_eq(seq, call->rx_expect_next)) {
- if (after(seq, call->rx_expect_next)) {
- _net("OOS %u > %u", seq, call->rx_expect_next);
- rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
- rxrpc_propose_ack_input_data);
- acked = true;
- }
- call->rx_expect_next = seq + 1;
- }
+ goto out;
- if (!acked) {
- nr_unacked++;
- ack_serial = serial;
- }
+ if (!rxrpc_input_split_jumbo(call, skb)) {
+ rxrpc_proto_abort("VLD", call, sp->hdr.seq);
+ goto out;
}
+ skb = NULL;
-unlock:
- if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2)
- rxrpc_send_ACK(call, RXRPC_ACK_IDLE, ack_serial,
- rxrpc_propose_ack_input_data);
- else
- rxrpc_propose_delay_ACK(call, ack_serial,
- rxrpc_propose_ack_input_data);
-
+out:
trace_rxrpc_notify_socket(call->debug_id, serial);
rxrpc_notify_socket(call);
@@ -1288,8 +1259,6 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
if (sp->hdr.callNumber == 0 ||
sp->hdr.seq == 0)
goto bad_message;
- if (!rxrpc_validate_data(skb))
- goto bad_message;
/* Unshare the packet so that it can be modified for in-place
* decryption.
@@ -1403,7 +1372,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
trace_rxrpc_rx_data(chan->call_debug_id,
sp->hdr.seq,
sp->hdr.serial,
- sp->hdr.flags, 0);
+ sp->hdr.flags);
rxrpc_post_packet_to_conn(conn, skb);
goto out;
}