diff options
author | David Howells <dhowells@redhat.com> | 2020-01-23 13:13:41 +0000 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2022-12-01 13:36:42 +0000 |
commit | 5e6ef4f1017c7f844e305283bbd8875af475e2fc (patch) | |
tree | a1af5b9ab3f538d84a50214be6c41fd0700d4bca /net/rxrpc/local_object.c | |
parent | 393a2a2007d13df7ae54c94328b45b6c2269b6a9 (diff) |
rxrpc: Make the I/O thread take over the call and local processor work
Move the functions from the call->processor and local->processor work items
into the domain of the I/O thread.
The call event processor, now called from the I/O thread, then takes over
the job of cranking the call state machine, processing incoming packets and
transmitting DATA, ACK and ABORT packets. In a future patch,
rxrpc_send_ACK() will transmit the ACK on the spot rather than queuing it
for later transmission.
The call event processor becomes purely received-skb driven. It only
transmits things in response to events. We use "pokes" to queue a dummy
skb to make it do things like start/resume transmitting data. Timer expiry
also results in pokes.
The connection event processor, becomes similar, though crypto events, such
as dealing with CHALLENGE and RESPONSE packets is offloaded to a work item
to avoid doing crypto in the I/O thread.
The local event processor is removed and VERSION response packets are
generated directly from the packet parser. Similarly, ABORTs generated in
response to protocol errors will be transmitted immediately rather than
being pushed onto a queue for later transmission.
Changes:
========
ver #2)
- Fix a couple of introduced lock context imbalances.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/local_object.c')
-rw-r--r-- | net/rxrpc/local_object.c | 69 |
1 files changed, 1 insertions, 68 deletions
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c index 03f491cc23ef..c73a5a1bc088 100644 --- a/net/rxrpc/local_object.c +++ b/net/rxrpc/local_object.c @@ -20,7 +20,6 @@ #include <net/af_rxrpc.h> #include "ar-internal.h" -static void rxrpc_local_processor(struct work_struct *); static void rxrpc_local_rcu(struct rcu_head *); /* @@ -97,12 +96,9 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet, atomic_set(&local->active_users, 1); local->rxnet = rxnet; INIT_HLIST_NODE(&local->link); - INIT_WORK(&local->processor, rxrpc_local_processor); INIT_LIST_HEAD(&local->ack_tx_queue); spin_lock_init(&local->ack_tx_lock); init_rwsem(&local->defrag_sem); - skb_queue_head_init(&local->reject_queue); - skb_queue_head_init(&local->event_queue); skb_queue_head_init(&local->rx_queue); INIT_LIST_HEAD(&local->call_attend_q); local->client_bundles = RB_ROOT; @@ -319,21 +315,6 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local, } /* - * Queue a local endpoint and pass the caller's reference to the work item. - */ -void rxrpc_queue_local(struct rxrpc_local *local) -{ - unsigned int debug_id = local->debug_id; - int r = refcount_read(&local->ref); - int u = atomic_read(&local->active_users); - - if (rxrpc_queue_work(&local->processor)) - trace_rxrpc_local(debug_id, rxrpc_local_queued, r, u); - else - rxrpc_put_local(local, rxrpc_local_put_already_queued); -} - -/* * Drop a ref on a local endpoint. */ void rxrpc_put_local(struct rxrpc_local *local, enum rxrpc_local_trace why) @@ -374,7 +355,7 @@ struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local, /* * Cease using a local endpoint. Once the number of active users reaches 0, we - * start the closure of the transport in the work processor. + * start the closure of the transport in the I/O thread.. */ void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why) { @@ -416,66 +397,18 @@ void rxrpc_destroy_local(struct rxrpc_local *local) /* At this point, there should be no more packets coming in to the * local endpoint. */ - rxrpc_purge_queue(&local->reject_queue); - rxrpc_purge_queue(&local->event_queue); rxrpc_purge_queue(&local->rx_queue); } /* - * Process events on an endpoint. The work item carries a ref which - * we must release. - */ -static void rxrpc_local_processor(struct work_struct *work) -{ - struct rxrpc_local *local = - container_of(work, struct rxrpc_local, processor); - bool again; - - if (local->dead) - return; - - rxrpc_see_local(local, rxrpc_local_processing); - - do { - again = false; - if (!__rxrpc_use_local(local, rxrpc_local_use_work)) - break; - - if (!list_empty(&local->ack_tx_queue)) { - rxrpc_transmit_ack_packets(local); - again = true; - } - - if (!skb_queue_empty(&local->reject_queue)) { - rxrpc_reject_packets(local); - again = true; - } - - if (!skb_queue_empty(&local->event_queue)) { - rxrpc_process_local_events(local); - again = true; - } - - __rxrpc_unuse_local(local, rxrpc_local_unuse_work); - } while (again); - - rxrpc_put_local(local, rxrpc_local_put_queue); -} - -/* * Destroy a local endpoint after the RCU grace period expires. */ static void rxrpc_local_rcu(struct rcu_head *rcu) { struct rxrpc_local *local = container_of(rcu, struct rxrpc_local, rcu); - _enter("%d", local->debug_id); - - ASSERT(!work_pending(&local->processor)); - rxrpc_see_local(local, rxrpc_local_free); kfree(local); - _leave(""); } /* |