summaryrefslogtreecommitdiff
path: root/net/rxrpc/local_object.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2022-10-10 08:45:20 +0100
committerDavid Howells <dhowells@redhat.com>2022-12-01 13:36:40 +0000
commita275da62e8c111b897b9cb73eb91df2f4e475ca5 (patch)
tree44f1e9d3e31acb0992297eda75a03dfc57c7af90 /net/rxrpc/local_object.c
parent96b2d69b43a075a38df600597133f17d28525f24 (diff)
rxrpc: Create a per-local endpoint receive queue and I/O thread
Create a per-local receive queue to which, in a future patch, all incoming packets will be directed and an I/O thread that will process those packets and perform all transmission of packets. Destruction of the local endpoint is also moved from the local processor work item (which will be absorbed) to the thread. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/local_object.c')
-rw-r--r--net/rxrpc/local_object.c39
1 files changed, 22 insertions, 17 deletions
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 1617ce651b9b..7c61349984e3 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -103,6 +103,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
init_rwsem(&local->defrag_sem);
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
+ skb_queue_head_init(&local->rx_queue);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
spin_lock_init(&local->lock);
@@ -126,6 +127,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
struct udp_tunnel_sock_cfg tuncfg = {NULL};
struct sockaddr_rxrpc *srx = &local->srx;
struct udp_port_cfg udp_conf = {0};
+ struct task_struct *io_thread;
struct sock *usk;
int ret;
@@ -185,8 +187,23 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
BUG();
}
+ io_thread = kthread_run(rxrpc_io_thread, local,
+ "krxrpcio/%u", ntohs(udp_conf.local_udp_port));
+ if (IS_ERR(io_thread)) {
+ ret = PTR_ERR(io_thread);
+ goto error_sock;
+ }
+
+ local->io_thread = io_thread;
_leave(" = 0");
return 0;
+
+error_sock:
+ kernel_sock_shutdown(local->socket, SHUT_RDWR);
+ local->socket->sk->sk_user_data = NULL;
+ sock_release(local->socket);
+ local->socket = NULL;
+ return ret;
}
/*
@@ -360,19 +377,8 @@ struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local,
*/
void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
{
- unsigned int debug_id;
- int r, u;
-
- if (local) {
- debug_id = local->debug_id;
- r = refcount_read(&local->ref);
- u = atomic_dec_return(&local->active_users);
- trace_rxrpc_local(debug_id, why, r, u);
- if (u == 0) {
- rxrpc_get_local(local, rxrpc_local_get_queue);
- rxrpc_queue_local(local);
- }
- }
+ if (local && __rxrpc_unuse_local(local, why))
+ kthread_stop(local->io_thread);
}
/*
@@ -382,7 +388,7 @@ void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
* Closing the socket cannot be done from bottom half context or RCU callback
* context because it might sleep.
*/
-static void rxrpc_local_destroyer(struct rxrpc_local *local)
+void rxrpc_destroy_local(struct rxrpc_local *local)
{
struct socket *socket = local->socket;
struct rxrpc_net *rxnet = local->rxnet;
@@ -411,6 +417,7 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
*/
rxrpc_purge_queue(&local->reject_queue);
rxrpc_purge_queue(&local->event_queue);
+ rxrpc_purge_queue(&local->rx_queue);
}
/*
@@ -430,10 +437,8 @@ static void rxrpc_local_processor(struct work_struct *work)
do {
again = false;
- if (!__rxrpc_use_local(local, rxrpc_local_use_work)) {
- rxrpc_local_destroyer(local);
+ if (!__rxrpc_use_local(local, rxrpc_local_use_work))
break;
- }
if (!list_empty(&local->ack_tx_queue)) {
rxrpc_transmit_ack_packets(local);