summaryrefslogtreecommitdiff
path: root/net/rxrpc/io_thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc/io_thread.c')
-rw-r--r--net/rxrpc/io_thread.c319
1 files changed, 146 insertions, 173 deletions
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index bc65d83fab88..19aa315eddf5 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -9,6 +9,10 @@
#include "ar-internal.h"
+static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
+ struct sockaddr_rxrpc *peer_srx,
+ struct sk_buff *skb);
+
/*
* handle data received on the local endpoint
* - may be called in interrupt context
@@ -63,45 +67,19 @@ void rxrpc_error_report(struct sock *sk)
}
/*
- * post connection-level events to the connection
- * - this includes challenges, responses, some aborts and call terminal packet
- * retransmission.
+ * Process event packets targeted at a local endpoint.
*/
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
- struct sk_buff *skb)
+static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
{
- _enter("%p,%p", conn, skb);
-
- rxrpc_get_skb(skb, rxrpc_skb_get_conn_work);
- skb_queue_tail(&conn->rx_queue, skb);
- rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
-}
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ char v;
-/*
- * post endpoint-level events to the local endpoint
- * - this includes debug and version messages
- */
-static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
- struct sk_buff *skb)
-{
- _enter("%p,%p", local, skb);
+ _enter("");
- if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
- rxrpc_get_skb(skb, rxrpc_skb_get_local_work);
- skb_queue_tail(&local->event_queue, skb);
- rxrpc_queue_local(local);
- }
-}
-
-/*
- * put a packet up for transport-level abort
- */
-static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
- if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
- rxrpc_get_skb(skb, rxrpc_skb_get_reject_work);
- skb_queue_tail(&local->reject_queue, skb);
- rxrpc_queue_local(local);
+ rxrpc_see_skb(skb, rxrpc_skb_see_version);
+ if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
+ if (v == 0)
+ rxrpc_send_version_request(local, &sp->hdr, skb);
}
}
@@ -156,22 +134,13 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
{
struct rxrpc_connection *conn;
struct sockaddr_rxrpc peer_srx;
- struct rxrpc_channel *chan;
- struct rxrpc_call *call = NULL;
struct rxrpc_skb_priv *sp;
struct rxrpc_peer *peer = NULL;
- struct rxrpc_sock *rx = NULL;
struct sk_buff *skb = *_skb;
- unsigned int channel;
-
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
+ int ret = 0;
skb_pull(skb, sizeof(struct udphdr));
- /* The UDP protocol already released all skb resources;
- * we are free to add our own data there.
- */
sp = rxrpc_skb(skb);
/* dig out the RxRPC connection details */
@@ -186,15 +155,13 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
}
}
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
trace_rxrpc_rx_packet(sp);
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_VERSION:
if (rxrpc_to_client(sp))
return 0;
- rxrpc_post_packet_to_local(local, skb);
+ rxrpc_input_version(local, skb);
return 0;
case RXRPC_PACKET_TYPE_BUSY:
@@ -259,7 +226,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
goto bad_message;
if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
- return 0; /* Unsupported address type - discard. */
+ return true; /* Unsupported address type - discard. */
if (peer_srx.transport.family != local->srx.transport.family &&
(peer_srx.transport.family == AF_INET &&
@@ -267,171 +234,172 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
peer_srx.transport.family,
local->srx.transport.family);
- return 0; /* Wrong address type - discard. */
+ return true; /* Wrong address type - discard. */
+ }
+
+ if (rxrpc_to_client(sp)) {
+ rcu_read_lock();
+ conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
+ conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
+ rcu_read_unlock();
+ if (!conn) {
+ trace_rxrpc_abort(0, "NCC", sp->hdr.cid,
+ sp->hdr.callNumber, sp->hdr.seq,
+ RXKADINCONSISTENCY, EBADMSG);
+ goto protocol_error;
+ }
+
+ ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
+ rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
+ return ret;
}
+ /* We need to look up service connections by the full protocol
+ * parameter set. We look up the peer first as an intermediate step
+ * and then the connection from the peer's tree.
+ */
rcu_read_lock();
- if (rxrpc_to_server(sp)) {
- /* Weed out packets to services we're not offering. Packets
- * that would begin a call are explicitly rejected and the rest
- * are just discarded.
- */
- rx = rcu_dereference(local->service);
- if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
- sp->hdr.serviceId != rx->second_service)
- ) {
- rcu_read_unlock();
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
- sp->hdr.seq == 1)
- goto unsupported_service;
- return 0;
- }
+ peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
+ if (!peer) {
+ rcu_read_unlock();
+ return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
}
- conn = rxrpc_find_connection_rcu(local, &peer_srx, skb, &peer);
+ conn = rxrpc_find_service_conn_rcu(peer, skb);
+ conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
if (conn) {
- if (sp->hdr.securityIndex != conn->security_ix)
- goto wrong_security;
+ rcu_read_unlock();
+ ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
+ rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
+ return ret;
+ }
- if (sp->hdr.serviceId != conn->service_id) {
- int old_id;
+ peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
+ rcu_read_unlock();
- if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
- goto reupgrade;
- old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
- sp->hdr.serviceId);
+ ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
+ rxrpc_put_peer(peer, rxrpc_peer_put_input);
+ if (ret < 0)
+ goto reject_packet;
+ return 0;
- if (old_id != conn->orig_service_id &&
- old_id != sp->hdr.serviceId)
- goto reupgrade;
- }
+bad_message:
+ trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+ RX_PROTOCOL_ERROR, EBADMSG);
+protocol_error:
+ skb->priority = RX_PROTOCOL_ERROR;
+ skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
+reject_packet:
+ rxrpc_reject_packet(local, skb);
+ return ret;
+}
- if (sp->hdr.callNumber == 0) {
- /* Connection-level packet */
- _debug("CONN %p {%d}", conn, conn->debug_id);
- conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_conn_input);
- rcu_read_unlock();
- if (conn) {
- rxrpc_post_packet_to_conn(conn, skb);
- rxrpc_put_connection(conn, rxrpc_conn_put_conn_input);
- }
- return 0;
- }
+/*
+ * Deal with a packet that's associated with an extant connection.
+ */
+static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
+ struct sockaddr_rxrpc *peer_srx,
+ struct sk_buff *skb)
+{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct rxrpc_channel *chan;
+ struct rxrpc_call *call = NULL;
+ unsigned int channel;
- if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
- conn->hi_serial = sp->hdr.serial;
+ if (sp->hdr.securityIndex != conn->security_ix)
+ goto wrong_security;
- /* Call-bound packets are routed by connection channel. */
- channel = sp->hdr.cid & RXRPC_CHANNELMASK;
- chan = &conn->channels[channel];
+ if (sp->hdr.serviceId != conn->service_id) {
+ int old_id;
- /* Ignore really old calls */
- if (sp->hdr.callNumber < chan->last_call) {
- rcu_read_unlock();
- return 0;
- }
+ if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+ goto reupgrade;
+ old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
+ sp->hdr.serviceId);
- if (sp->hdr.callNumber == chan->last_call) {
- if (chan->call ||
- sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) {
- rcu_read_unlock();
- return 0;
- }
+ if (old_id != conn->orig_service_id &&
+ old_id != sp->hdr.serviceId)
+ goto reupgrade;
+ }
- /* For the previous service call, if completed
- * successfully, we discard all further packets.
- */
- if (rxrpc_conn_is_service(conn) &&
- chan->last_type == RXRPC_PACKET_TYPE_ACK) {
- rcu_read_unlock();
- return 0;
- }
+ if (after(sp->hdr.serial, conn->hi_serial))
+ conn->hi_serial = sp->hdr.serial;
- /* But otherwise we need to retransmit the final packet
- * from data cached in the connection record.
- */
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
- trace_rxrpc_rx_data(chan->call_debug_id,
- sp->hdr.seq,
- sp->hdr.serial,
- sp->hdr.flags);
- conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
- rcu_read_unlock();
- if (conn) {
- rxrpc_post_packet_to_conn(conn, skb);
- rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
- }
+ /* It's a connection-level packet if the call number is 0. */
+ if (sp->hdr.callNumber == 0)
+ return rxrpc_input_conn_packet(conn, skb);
+
+ /* Call-bound packets are routed by connection channel. */
+ channel = sp->hdr.cid & RXRPC_CHANNELMASK;
+ chan = &conn->channels[channel];
+
+ /* Ignore really old calls */
+ if (sp->hdr.callNumber < chan->last_call)
+ return 0;
+
+ if (sp->hdr.callNumber == chan->last_call) {
+ if (chan->call ||
+ sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
return 0;
- }
- call = rcu_dereference(chan->call);
+ /* For the previous service call, if completed successfully, we
+ * discard all further packets.
+ */
+ if (rxrpc_conn_is_service(conn) &&
+ chan->last_type == RXRPC_PACKET_TYPE_ACK)
+ return 0;
- if (sp->hdr.callNumber > chan->call_id) {
- if (rxrpc_to_client(sp)) {
- rcu_read_unlock();
- goto reject_packet;
- }
- if (call) {
- rxrpc_input_implicit_end_call(conn, call);
- chan->call = NULL;
- call = NULL;
- }
- }
+ /* But otherwise we need to retransmit the final packet from
+ * data cached in the connection record.
+ */
+ if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
+ trace_rxrpc_rx_data(chan->call_debug_id,
+ sp->hdr.seq,
+ sp->hdr.serial,
+ sp->hdr.flags);
+ rxrpc_input_conn_packet(conn, skb);
+ return 0;
+ }
- if (call && !rxrpc_try_get_call(call, rxrpc_call_get_input))
- call = NULL;
+ rcu_read_lock();
+ call = rxrpc_try_get_call(rcu_dereference(chan->call),
+ rxrpc_call_get_input);
+ rcu_read_unlock();
+
+ if (sp->hdr.callNumber > chan->call_id) {
+ if (rxrpc_to_client(sp)) {
+ rxrpc_put_call(call, rxrpc_call_put_input);
+ goto reject_packet;
+ }
if (call) {
- if (sp->hdr.serviceId != call->dest_srx.srx_service)
- call->dest_srx.srx_service = sp->hdr.serviceId;
- if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
- call->rx_serial = sp->hdr.serial;
- if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
- set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
+ rxrpc_implicit_end_call(call, skb);
+ rxrpc_put_call(call, rxrpc_call_put_input);
+ call = NULL;
}
}
if (!call) {
- if (rxrpc_to_client(sp) ||
- sp->hdr.type != RXRPC_PACKET_TYPE_DATA) {
- rcu_read_unlock();
+ if (rxrpc_to_client(sp))
goto bad_message;
- }
- if (sp->hdr.seq != 1) {
- rcu_read_unlock();
+ if (rxrpc_new_incoming_call(conn->local, conn->peer, conn,
+ peer_srx, skb))
return 0;
- }
- call = rxrpc_new_incoming_call(local, rx, &peer_srx, skb);
- if (!call) {
- rcu_read_unlock();
- goto reject_packet;
- }
+ goto reject_packet;
}
- rcu_read_unlock();
-
- /* Process a call packet. */
rxrpc_input_call_event(call, skb);
rxrpc_put_call(call, rxrpc_call_put_input);
- trace_rxrpc_rx_done(0, 0);
return 0;
wrong_security:
- rcu_read_unlock();
trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RXKADINCONSISTENCY, EBADMSG);
skb->priority = RXKADINCONSISTENCY;
goto post_abort;
-unsupported_service:
- trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
- RX_INVALID_OPERATION, EOPNOTSUPP);
- skb->priority = RX_INVALID_OPERATION;
- goto post_abort;
-
reupgrade:
- rcu_read_unlock();
trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_PROTOCOL_ERROR, EBADMSG);
goto protocol_error;
@@ -444,7 +412,7 @@ protocol_error:
post_abort:
skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
reject_packet:
- rxrpc_reject_packet(local, skb);
+ rxrpc_reject_packet(conn->local, skb);
return 0;
}
@@ -479,6 +447,11 @@ int rxrpc_io_thread(void *data)
continue;
}
+ if (!list_empty(&local->ack_tx_queue)) {
+ rxrpc_transmit_ack_packets(local);
+ continue;
+ }
+
/* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) {
switch (skb->mark) {