summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2020-03-19 20:28:34 -0700
committerDavid S. Miller <davem@davemloft.net>2020-03-19 20:28:34 -0700
commit3ac9eb4210a95cf9b374f1d48ce99d87d79d2510 (patch)
tree36f2fbbcf4234024220993c2451c1fb25999ecde
parent6002059d7882c3512e6ac52fa82424272ddfcd5c (diff)
parent7d7587db0d7fd1138f2afcffdc46a8e15630b944 (diff)
Merge tag 'rxrpc-fixes-20200319' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs
David Howells says: ==================== rxrpc, afs: Interruptibility fixes Here are a number of fixes for AF_RXRPC and AFS that make AFS system calls less interruptible and so less likely to leave the filesystem in an uncertain state. There's also a miscellaneous patch to make tracing consistent. (1) Firstly, abstract out the Tx space calculation in sendmsg. Much the same code is replicated in a number of places that subsequent patches are going to alter, including adding another copy. (2) Fix Tx interruptibility by allowing a kernel service, such as AFS, to request that a call be interruptible only when waiting for a call slot to become available (ie. the call has not taken place yet) or that a call be not interruptible at all (e.g. when we want to do writeback and don't want a signal interrupting a VM-induced writeback). (3) Increase the minimum delay on MSG_WAITALL for userspace sendmsg() when waiting for Tx buffer space as a 2*RTT delay is really small over 10G ethernet and a 1 jiffy timeout might be essentially 0 if at the end of the jiffy period. (4) Fix some tracing output in AFS to make it consistent with rxrpc. (5) Make sure aborted asynchronous AFS operations are tidied up properly so we don't end up with stuck rxrpc calls. (6) Make AFS client calls uninterruptible in the Rx phase. If we don't wait for the reply to be fully gathered, we can't update the local VFS state and we end up in an indeterminate state with respect to the server. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--fs/afs/cmservice.c14
-rw-r--r--fs/afs/internal.h12
-rw-r--r--fs/afs/rxrpc.c74
-rw-r--r--include/net/af_rxrpc.h12
-rw-r--r--include/trace/events/afs.h2
-rw-r--r--net/rxrpc/af_rxrpc.c37
-rw-r--r--net/rxrpc/ar-internal.h5
-rw-r--r--net/rxrpc/call_object.c3
-rw-r--r--net/rxrpc/conn_client.c13
-rw-r--r--net/rxrpc/input.c1
-rw-r--r--net/rxrpc/sendmsg.c75
11 files changed, 115 insertions, 133 deletions
diff --git a/fs/afs/cmservice.c b/fs/afs/cmservice.c
index ff3994a6be23..6765949b3aab 100644
--- a/fs/afs/cmservice.c
+++ b/fs/afs/cmservice.c
@@ -244,6 +244,17 @@ static void afs_cm_destructor(struct afs_call *call)
}
/*
+ * Abort a service call from within an action function.
+ */
+static void afs_abort_service_call(struct afs_call *call, u32 abort_code, int error,
+ const char *why)
+{
+ rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
+ abort_code, error, why);
+ afs_set_call_complete(call, error, 0);
+}
+
+/*
* The server supplied a list of callbacks that it wanted to break.
*/
static void SRXAFSCB_CallBack(struct work_struct *work)
@@ -510,8 +521,7 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
if (memcmp(r, &call->net->uuid, sizeof(call->net->uuid)) == 0)
afs_send_empty_reply(call);
else
- rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
- 1, 1, "K-1");
+ afs_abort_service_call(call, 1, 1, "K-1");
afs_put_call(call);
_leave("");
diff --git a/fs/afs/internal.h b/fs/afs/internal.h
index 1d81fc4c3058..52de2112e1b1 100644
--- a/fs/afs/internal.h
+++ b/fs/afs/internal.h
@@ -154,7 +154,7 @@ struct afs_call {
};
unsigned char unmarshall; /* unmarshalling phase */
unsigned char addr_ix; /* Address in ->alist */
- bool incoming; /* T if incoming call */
+ bool drop_ref; /* T if need to drop ref for incoming call */
bool send_pages; /* T if data from mapping should be sent */
bool need_attention; /* T if RxRPC poked us */
bool async; /* T if asynchronous */
@@ -1209,8 +1209,16 @@ static inline void afs_set_call_complete(struct afs_call *call,
ok = true;
}
spin_unlock_bh(&call->state_lock);
- if (ok)
+ if (ok) {
trace_afs_call_done(call);
+
+ /* Asynchronous calls have two refs to release - one from the alloc and
+ * one queued with the work item - and we can't just deallocate the
+ * call because the work item may be queued again.
+ */
+ if (call->drop_ref)
+ afs_put_call(call);
+ }
}
/*
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index 58d396592250..1ecc67da6c1a 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -18,7 +18,6 @@ struct workqueue_struct *afs_async_calls;
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
-static void afs_delete_async_call(struct work_struct *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
@@ -169,7 +168,7 @@ void afs_put_call(struct afs_call *call)
int n = atomic_dec_return(&call->usage);
int o = atomic_read(&net->nr_outstanding_calls);
- trace_afs_call(call, afs_call_trace_put, n + 1, o,
+ trace_afs_call(call, afs_call_trace_put, n, o,
__builtin_return_address(0));
ASSERTCMP(n, >=, 0);
@@ -402,8 +401,10 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
/* If the call is going to be asynchronous, we need an extra ref for
* the call to hold itself so the caller need not hang on to its ref.
*/
- if (call->async)
+ if (call->async) {
afs_get_call(call, afs_call_trace_get);
+ call->drop_ref = true;
+ }
/* create a call */
rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
@@ -413,7 +414,8 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
afs_wake_up_async_call :
afs_wake_up_call_waiter),
call->upgrade,
- call->intr,
+ (call->intr ? RXRPC_PREINTERRUPTIBLE :
+ RXRPC_UNINTERRUPTIBLE),
call->debug_id);
if (IS_ERR(rxcall)) {
ret = PTR_ERR(rxcall);
@@ -584,8 +586,6 @@ static void afs_deliver_to_call(struct afs_call *call)
done:
if (call->type->done)
call->type->done(call);
- if (state == AFS_CALL_COMPLETE && call->incoming)
- afs_put_call(call);
out:
_leave("");
return;
@@ -604,11 +604,7 @@ call_complete:
long afs_wait_for_call_to_complete(struct afs_call *call,
struct afs_addr_cursor *ac)
{
- signed long rtt2, timeout;
long ret;
- bool stalled = false;
- u64 rtt;
- u32 life, last_life;
bool rxrpc_complete = false;
DECLARE_WAITQUEUE(myself, current);
@@ -619,14 +615,6 @@ long afs_wait_for_call_to_complete(struct afs_call *call,
if (ret < 0)
goto out;
- rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
- rtt2 = nsecs_to_jiffies64(rtt) * 2;
- if (rtt2 < 2)
- rtt2 = 2;
-
- timeout = rtt2;
- rxrpc_kernel_check_life(call->net->socket, call->rxcall, &last_life);
-
add_wait_queue(&call->waitq, &myself);
for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE);
@@ -637,37 +625,19 @@ long afs_wait_for_call_to_complete(struct afs_call *call,
call->need_attention = false;
__set_current_state(TASK_RUNNING);
afs_deliver_to_call(call);
- timeout = rtt2;
continue;
}
if (afs_check_call_state(call, AFS_CALL_COMPLETE))
break;
- if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall, &life)) {
+ if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall)) {
/* rxrpc terminated the call. */
rxrpc_complete = true;
break;
}
- if (call->intr && timeout == 0 &&
- life == last_life && signal_pending(current)) {
- if (stalled)
- break;
- __set_current_state(TASK_RUNNING);
- rxrpc_kernel_probe_life(call->net->socket, call->rxcall);
- timeout = rtt2;
- stalled = true;
- continue;
- }
-
- if (life != last_life) {
- timeout = rtt2;
- last_life = life;
- stalled = false;
- }
-
- timeout = schedule_timeout(timeout);
+ schedule();
}
remove_wait_queue(&call->waitq, &myself);
@@ -735,7 +705,7 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
u = atomic_fetch_add_unless(&call->usage, 1, 0);
if (u != 0) {
- trace_afs_call(call, afs_call_trace_wake, u,
+ trace_afs_call(call, afs_call_trace_wake, u + 1,
atomic_read(&call->net->nr_outstanding_calls),
__builtin_return_address(0));
@@ -745,21 +715,6 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
}
/*
- * Delete an asynchronous call. The work item carries a ref to the call struct
- * that we need to release.
- */
-static void afs_delete_async_call(struct work_struct *work)
-{
- struct afs_call *call = container_of(work, struct afs_call, async_work);
-
- _enter("");
-
- afs_put_call(call);
-
- _leave("");
-}
-
-/*
* Perform I/O processing on an asynchronous call. The work item carries a ref
* to the call struct that we either need to release or to pass on.
*/
@@ -774,16 +729,6 @@ static void afs_process_async_call(struct work_struct *work)
afs_deliver_to_call(call);
}
- if (call->state == AFS_CALL_COMPLETE) {
- /* We have two refs to release - one from the alloc and one
- * queued with the work item - and we can't just deallocate the
- * call because the work item may be queued again.
- */
- call->async_work.func = afs_delete_async_call;
- if (!queue_work(afs_async_calls, &call->async_work))
- afs_put_call(call);
- }
-
afs_put_call(call);
_leave("");
}
@@ -810,6 +755,7 @@ void afs_charge_preallocation(struct work_struct *work)
if (!call)
break;
+ call->drop_ref = true;
call->async = true;
call->state = AFS_CALL_SV_AWAIT_OP_ID;
init_waitqueue_head(&call->waitq);
diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index 1abae3c340a5..04e97bab6f28 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -16,6 +16,12 @@ struct sock;
struct socket;
struct rxrpc_call;
+enum rxrpc_interruptibility {
+ RXRPC_INTERRUPTIBLE, /* Call is interruptible */
+ RXRPC_PREINTERRUPTIBLE, /* Call can be cancelled whilst waiting for a slot */
+ RXRPC_UNINTERRUPTIBLE, /* Call should not be interruptible at all */
+};
+
/*
* Debug ID counter for tracing.
*/
@@ -41,7 +47,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
gfp_t,
rxrpc_notify_rx_t,
bool,
- bool,
+ enum rxrpc_interruptibility,
unsigned int);
int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
struct msghdr *, size_t,
@@ -58,9 +64,7 @@ int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t,
unsigned int);
void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64);
-bool rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *,
- u32 *);
-void rxrpc_kernel_probe_life(struct socket *, struct rxrpc_call *);
+bool rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *);
u32 rxrpc_kernel_get_epoch(struct socket *, struct rxrpc_call *);
bool rxrpc_kernel_get_reply_time(struct socket *, struct rxrpc_call *,
ktime_t *);
diff --git a/include/trace/events/afs.h b/include/trace/events/afs.h
index 564ba1b5cf57..c612cabbc378 100644
--- a/include/trace/events/afs.h
+++ b/include/trace/events/afs.h
@@ -233,7 +233,7 @@ enum afs_cb_break_reason {
EM(afs_call_trace_get, "GET ") \
EM(afs_call_trace_put, "PUT ") \
EM(afs_call_trace_wake, "WAKE ") \
- E_(afs_call_trace_work, "WORK ")
+ E_(afs_call_trace_work, "QUEUE")
#define afs_server_traces \
EM(afs_server_trace_alloc, "ALLOC ") \
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index fe42f986cd94..15ee92d79581 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -285,7 +285,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
gfp_t gfp,
rxrpc_notify_rx_t notify_rx,
bool upgrade,
- bool intr,
+ enum rxrpc_interruptibility interruptibility,
unsigned int debug_id)
{
struct rxrpc_conn_parameters cp;
@@ -310,7 +310,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
memset(&p, 0, sizeof(p));
p.user_call_ID = user_call_ID;
p.tx_total_len = tx_total_len;
- p.intr = intr;
+ p.interruptibility = interruptibility;
memset(&cp, 0, sizeof(cp));
cp.local = rx->local;
@@ -371,45 +371,18 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
* rxrpc_kernel_check_life - Check to see whether a call is still alive
* @sock: The socket the call is on
* @call: The call to check
- * @_life: Where to store the life value
*
- * Allow a kernel service to find out whether a call is still alive - ie. we're
- * getting ACKs from the server. Passes back in *_life a number representing
- * the life state which can be compared to that returned by a previous call and
- * return true if the call is still alive.
- *
- * If the life state stalls, rxrpc_kernel_probe_life() should be called and
- * then 2RTT waited.
+ * Allow a kernel service to find out whether a call is still alive -
+ * ie. whether it has completed.
*/
bool rxrpc_kernel_check_life(const struct socket *sock,
- const struct rxrpc_call *call,
- u32 *_life)
+ const struct rxrpc_call *call)
{
- *_life = call->acks_latest;
return call->state != RXRPC_CALL_COMPLETE;
}
EXPORT_SYMBOL(rxrpc_kernel_check_life);
/**
- * rxrpc_kernel_probe_life - Poke the peer to see if it's still alive
- * @sock: The socket the call is on
- * @call: The call to check
- *
- * In conjunction with rxrpc_kernel_check_life(), allow a kernel service to
- * find out whether a call is still alive by pinging it. This should cause the
- * life state to be bumped in about 2*RTT.
- *
- * The must be called in TASK_RUNNING state on pain of might_sleep() objecting.
- */
-void rxrpc_kernel_probe_life(struct socket *sock, struct rxrpc_call *call)
-{
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
- rxrpc_propose_ack_ping_for_check_life);
- rxrpc_send_ack_packet(call, true, NULL);
-}
-EXPORT_SYMBOL(rxrpc_kernel_probe_life);
-
-/**
* rxrpc_kernel_get_epoch - Retrieve the epoch value from a call.
* @sock: The socket the call is on
* @call: The call to query
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 7d730c438404..3eb1ab40ca5c 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -489,7 +489,6 @@ enum rxrpc_call_flag {
RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */
RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */
RXRPC_CALL_RX_UNDERRUN, /* Got data underrun */
- RXRPC_CALL_IS_INTR, /* The call is interruptible */
RXRPC_CALL_DISCONNECTED, /* The call has been disconnected */
};
@@ -598,6 +597,7 @@ struct rxrpc_call {
atomic_t usage;
u16 service_id; /* service ID */
u8 security_ix; /* Security type */
+ enum rxrpc_interruptibility interruptibility; /* At what point call may be interrupted */
u32 call_id; /* call ID on connection */
u32 cid; /* connection ID plus channel index */
int debug_id; /* debug ID for printks */
@@ -675,7 +675,6 @@ struct rxrpc_call {
/* transmission-phase ACK management */
ktime_t acks_latest_ts; /* Timestamp of latest ACK received */
- rxrpc_serial_t acks_latest; /* serial number of latest ACK received */
rxrpc_seq_t acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */
rxrpc_seq_t acks_lost_top; /* tx_top at the time lost-ack ping sent */
rxrpc_serial_t acks_lost_ping; /* Serial number of probe ACK */
@@ -721,7 +720,7 @@ struct rxrpc_call_params {
u32 normal; /* Max time since last call packet (msec) */
} timeouts;
u8 nr_timeouts; /* Number of timeouts specified */
- bool intr; /* The call is interruptible */
+ enum rxrpc_interruptibility interruptibility; /* How is interruptible is the call? */
};
struct rxrpc_send_params {
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index c9f34b0a11df..f07970207b54 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -237,8 +237,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call;
}
- if (p->intr)
- __set_bit(RXRPC_CALL_IS_INTR, &call->flags);
+ call->interruptibility = p->interruptibility;
call->tx_total_len = p->tx_total_len;
trace_rxrpc_call(call->debug_id, rxrpc_call_new_client,
atomic_read(&call->usage),
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index ea7d4c21f889..f2a1a5dbb5a7 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -655,13 +655,20 @@ static int rxrpc_wait_for_channel(struct rxrpc_call *call, gfp_t gfp)
add_wait_queue_exclusive(&call->waitq, &myself);
for (;;) {
- if (test_bit(RXRPC_CALL_IS_INTR, &call->flags))
+ switch (call->interruptibility) {
+ case RXRPC_INTERRUPTIBLE:
+ case RXRPC_PREINTERRUPTIBLE:
set_current_state(TASK_INTERRUPTIBLE);
- else
+ break;
+ case RXRPC_UNINTERRUPTIBLE:
+ default:
set_current_state(TASK_UNINTERRUPTIBLE);
+ break;
+ }
if (call->call_id)
break;
- if (test_bit(RXRPC_CALL_IS_INTR, &call->flags) &&
+ if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
+ call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
signal_pending(current)) {
ret = -ERESTARTSYS;
break;
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index ef10fbf71b15..69e09d69c896 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -882,7 +882,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
before(prev_pkt, call->ackr_prev_seq))
goto out;
call->acks_latest_ts = skb->tstamp;
- call->acks_latest = sp->hdr.serial;
call->ackr_first_seq = first_soft_ack;
call->ackr_prev_seq = prev_pkt;
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 813fd6888142..0fcf157aa09f 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -18,6 +18,21 @@
#include "ar-internal.h"
/*
+ * Return true if there's sufficient Tx queue space.
+ */
+static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
+{
+ unsigned int win_size =
+ min_t(unsigned int, call->tx_winsize,
+ call->cong_cwnd + call->cong_extra);
+ rxrpc_seq_t tx_win = READ_ONCE(call->tx_hard_ack);
+
+ if (_tx_win)
+ *_tx_win = tx_win;
+ return call->tx_top - tx_win < win_size;
+}
+
+/*
* Wait for space to appear in the Tx queue or a signal to occur.
*/
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
@@ -26,9 +41,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
{
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
- if (call->tx_top - call->tx_hard_ack <
- min_t(unsigned int, call->tx_winsize,
- call->cong_cwnd + call->cong_extra))
+ if (rxrpc_check_tx_space(call, NULL))
return 0;
if (call->state >= RXRPC_CALL_COMPLETE)
@@ -49,7 +62,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
* Wait for space to appear in the Tx queue uninterruptibly, but with
* a timeout of 2*RTT if no progress was made and a signal occurred.
*/
-static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
+static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
struct rxrpc_call *call)
{
rxrpc_seq_t tx_start, tx_win;
@@ -58,8 +71,8 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
rtt = READ_ONCE(call->peer->rtt);
rtt2 = nsecs_to_jiffies64(rtt) * 2;
- if (rtt2 < 1)
- rtt2 = 1;
+ if (rtt2 < 2)
+ rtt2 = 2;
timeout = rtt2;
tx_start = READ_ONCE(call->tx_hard_ack);
@@ -68,16 +81,13 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
set_current_state(TASK_UNINTERRUPTIBLE);
tx_win = READ_ONCE(call->tx_hard_ack);
- if (call->tx_top - tx_win <
- min_t(unsigned int, call->tx_winsize,
- call->cong_cwnd + call->cong_extra))
+ if (rxrpc_check_tx_space(call, &tx_win))
return 0;
if (call->state >= RXRPC_CALL_COMPLETE)
return call->error;
- if (test_bit(RXRPC_CALL_IS_INTR, &call->flags) &&
- timeout == 0 &&
+ if (timeout == 0 &&
tx_win == tx_start && signal_pending(current))
return -EINTR;
@@ -92,6 +102,26 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
}
/*
+ * Wait for space to appear in the Tx queue uninterruptibly.
+ */
+static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
+ struct rxrpc_call *call,
+ long *timeo)
+{
+ for (;;) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ if (rxrpc_check_tx_space(call, NULL))
+ return 0;
+
+ if (call->state >= RXRPC_CALL_COMPLETE)
+ return call->error;
+
+ trace_rxrpc_transmit(call, rxrpc_transmit_wait);
+ *timeo = schedule_timeout(*timeo);
+ }
+}
+
+/*
* wait for space to appear in the transmit/ACK window
* - caller holds the socket locked
*/
@@ -108,10 +138,19 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
add_wait_queue(&call->waitq, &myself);
- if (waitall)
- ret = rxrpc_wait_for_tx_window_nonintr(rx, call);
- else
- ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
+ switch (call->interruptibility) {
+ case RXRPC_INTERRUPTIBLE:
+ if (waitall)
+ ret = rxrpc_wait_for_tx_window_waitall(rx, call);
+ else
+ ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
+ break;
+ case RXRPC_PREINTERRUPTIBLE:
+ case RXRPC_UNINTERRUPTIBLE:
+ default:
+ ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
+ break;
+ }
remove_wait_queue(&call->waitq, &myself);
set_current_state(TASK_RUNNING);
@@ -302,9 +341,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
_debug("alloc");
- if (call->tx_top - call->tx_hard_ack >=
- min_t(unsigned int, call->tx_winsize,
- call->cong_cwnd + call->cong_extra)) {
+ if (!rxrpc_check_tx_space(call, NULL)) {
ret = -EAGAIN;
if (msg->msg_flags & MSG_DONTWAIT)
goto maybe_error;
@@ -619,7 +656,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
.call.tx_total_len = -1,
.call.user_call_ID = 0,
.call.nr_timeouts = 0,
- .call.intr = true,
+ .call.interruptibility = RXRPC_INTERRUPTIBLE,
.abort_code = 0,
.command = RXRPC_CMD_SEND_DATA,
.exclusive = false,