summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c207
1 files changed, 115 insertions, 92 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 0f39e08ee580..78af7518f263 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -58,6 +58,7 @@
#include "sunrpc.h"
static void xs_close(struct rpc_xprt *xprt);
+static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
struct socket *sock);
@@ -427,9 +428,9 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
offset += want;
}
- want = xs_alloc_sparse_pages(buf,
- min_t(size_t, count - offset, buf->page_len),
- GFP_KERNEL);
+ want = xs_alloc_sparse_pages(
+ buf, min_t(size_t, count - offset, buf->page_len),
+ GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
if (seek < want) {
ret = xs_read_bvec(sock, msg, flags, buf->bvec,
xdr_buf_pagecount(buf),
@@ -763,12 +764,12 @@ xs_stream_start_connect(struct sock_xprt *transport)
/**
* xs_nospace - handle transmit was incomplete
* @req: pointer to RPC request
+ * @transport: pointer to struct sock_xprt
*
*/
-static int xs_nospace(struct rpc_rqst *req)
+static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
{
- struct rpc_xprt *xprt = req->rq_xprt;
- struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+ struct rpc_xprt *xprt = &transport->xprt;
struct sock *sk = transport->inet;
int ret = -EAGAIN;
@@ -780,24 +781,44 @@ static int xs_nospace(struct rpc_rqst *req)
/* Don't race with disconnect */
if (xprt_connected(xprt)) {
/* wait for more buffer space */
+ set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
+ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk->sk_write_pending++;
xprt_wait_for_buffer_space(xprt);
} else
ret = -ENOTCONN;
spin_unlock(&xprt->transport_lock);
+ return ret;
+}
- /* Race breaker in case memory is freed before above code is called */
- if (ret == -EAGAIN) {
- struct socket_wq *wq;
+static int xs_sock_nospace(struct rpc_rqst *req)
+{
+ struct sock_xprt *transport =
+ container_of(req->rq_xprt, struct sock_xprt, xprt);
+ struct sock *sk = transport->inet;
+ int ret = -EAGAIN;
- rcu_read_lock();
- wq = rcu_dereference(sk->sk_wq);
- set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags);
- rcu_read_unlock();
+ lock_sock(sk);
+ if (!sock_writeable(sk))
+ ret = xs_nospace(req, transport);
+ release_sock(sk);
+ return ret;
+}
- sk->sk_write_space(sk);
- }
+static int xs_stream_nospace(struct rpc_rqst *req, bool vm_wait)
+{
+ struct sock_xprt *transport =
+ container_of(req->rq_xprt, struct sock_xprt, xprt);
+ struct sock *sk = transport->inet;
+ int ret = -EAGAIN;
+
+ if (vm_wait)
+ return -ENOBUFS;
+ lock_sock(sk);
+ if (!sk_stream_memory_free(sk))
+ ret = xs_nospace(req, transport);
+ release_sock(sk);
return ret;
}
@@ -805,7 +826,8 @@ static void
xs_stream_prepare_request(struct rpc_rqst *req)
{
xdr_free_bvec(&req->rq_rcv_buf);
- req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL);
+ req->rq_task->tk_status = xdr_alloc_bvec(
+ &req->rq_rcv_buf, GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
}
/*
@@ -851,6 +873,7 @@ static int xs_local_send_request(struct rpc_rqst *req)
struct msghdr msg = {
.msg_flags = XS_SENDMSG_FLAGS,
};
+ bool vm_wait;
unsigned int sent;
int status;
@@ -863,15 +886,14 @@ static int xs_local_send_request(struct rpc_rqst *req)
xs_pktdump("packet data:",
req->rq_svec->iov_base, req->rq_svec->iov_len);
+ vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;
+
req->rq_xtime = ktime_get();
status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
transport->xmit.offset, rm, &sent);
dprintk("RPC: %s(%u) = %d\n",
__func__, xdr->len - transport->xmit.offset, status);
- if (status == -EAGAIN && sock_writeable(transport->inet))
- status = -ENOBUFS;
-
if (likely(sent > 0) || status == 0) {
transport->xmit.offset += sent;
req->rq_bytes_sent = transport->xmit.offset;
@@ -881,13 +903,12 @@ static int xs_local_send_request(struct rpc_rqst *req)
return 0;
}
status = -EAGAIN;
+ vm_wait = false;
}
switch (status) {
- case -ENOBUFS:
- break;
case -EAGAIN:
- status = xs_nospace(req);
+ status = xs_stream_nospace(req, vm_wait);
break;
default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
@@ -963,7 +984,7 @@ process_status:
/* Should we call xs_close() here? */
break;
case -EAGAIN:
- status = xs_nospace(req);
+ status = xs_sock_nospace(req);
break;
case -ENETUNREACH:
case -ENOBUFS:
@@ -1005,7 +1026,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
struct msghdr msg = {
.msg_flags = XS_SENDMSG_FLAGS,
};
- bool vm_wait = false;
+ bool vm_wait;
unsigned int sent;
int status;
@@ -1025,12 +1046,17 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
xs_tcp_set_socket_timeouts(xprt, transport->sock);
+ xs_set_srcport(transport, transport->sock);
+
/* Continue transmitting the packet/record. We must be careful
* to cope with writespace callbacks arriving _after_ we have
* called sendmsg(). */
req->rq_xtime = ktime_get();
tcp_sock_set_cork(transport->inet, true);
- while (1) {
+
+ vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;
+
+ do {
status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
transport->xmit.offset, rm, &sent);
@@ -1051,31 +1077,10 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
WARN_ON_ONCE(sent == 0 && status == 0);
- if (status == -EAGAIN ) {
- /*
- * Return EAGAIN if we're sure we're hitting the
- * socket send buffer limits.
- */
- if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
- break;
- /*
- * Did we hit a memory allocation failure?
- */
- if (sent == 0) {
- status = -ENOBUFS;
- if (vm_wait)
- break;
- /* Retry, knowing now that we're below the
- * socket send buffer limit
- */
- vm_wait = true;
- }
- continue;
- }
- if (status < 0)
- break;
- vm_wait = false;
- }
+ if (sent > 0)
+ vm_wait = false;
+
+ } while (status == 0);
switch (status) {
case -ENOTSOCK:
@@ -1083,7 +1088,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
/* Should we call xs_close() here? */
break;
case -EAGAIN:
- status = xs_nospace(req);
+ status = xs_stream_nospace(req, vm_wait);
break;
case -ECONNRESET:
case -ECONNREFUSED:
@@ -1124,6 +1129,7 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
+ clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
}
static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
@@ -1470,7 +1476,6 @@ static void xs_tcp_state_change(struct sock *sk)
static void xs_write_space(struct sock *sk)
{
- struct socket_wq *wq;
struct sock_xprt *transport;
struct rpc_xprt *xprt;
@@ -1481,15 +1486,10 @@ static void xs_write_space(struct sock *sk)
if (unlikely(!(xprt = xprt_from_sock(sk))))
return;
transport = container_of(xprt, struct sock_xprt, xprt);
- rcu_read_lock();
- wq = rcu_dereference(sk->sk_wq);
- if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
- goto out;
-
+ if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state))
+ return;
xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
sk->sk_write_pending--;
-out:
- rcu_read_unlock();
}
/**
@@ -1638,7 +1638,7 @@ static int xs_get_srcport(struct sock_xprt *transport)
return port;
}
-unsigned short get_srcport(struct rpc_xprt *xprt)
+static unsigned short xs_sock_srcport(struct rpc_xprt *xprt)
{
struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
unsigned short ret = 0;
@@ -1648,7 +1648,25 @@ unsigned short get_srcport(struct rpc_xprt *xprt)
mutex_unlock(&sock->recv_mutex);
return ret;
}
-EXPORT_SYMBOL(get_srcport);
+
+static int xs_sock_srcaddr(struct rpc_xprt *xprt, char *buf, size_t buflen)
+{
+ struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
+ union {
+ struct sockaddr sa;
+ struct sockaddr_storage st;
+ } saddr;
+ int ret = -ENOTCONN;
+
+ mutex_lock(&sock->recv_mutex);
+ if (sock->sock) {
+ ret = kernel_getsockname(sock->sock, &saddr.sa);
+ if (ret >= 0)
+ ret = snprintf(buf, buflen, "%pISc", &saddr.sa);
+ }
+ mutex_unlock(&sock->recv_mutex);
+ return ret;
+}
static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
{
@@ -1830,7 +1848,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
sk->sk_user_data = xprt;
sk->sk_data_ready = xs_data_ready;
sk->sk_write_space = xs_udp_write_space;
- sock_set_flag(sk, SOCK_FASYNC);
sk->sk_error_report = xs_error_report;
xprt_clear_connected(xprt);
@@ -1936,9 +1953,9 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task)
#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
- * Note that this should be called with XPRT_LOCKED held (or when we otherwise
- * know that we have exclusive access to the socket), to guard against
- * races with xs_reset_transport.
+ * Note that this should be called with XPRT_LOCKED held, or recv_mutex
+ * held, or when we otherwise know that we have exclusive access to the
+ * socket, to guard against races with xs_reset_transport.
*/
static void xs_set_memalloc(struct rpc_xprt *xprt)
{
@@ -1967,13 +1984,11 @@ xs_enable_swap(struct rpc_xprt *xprt)
{
struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
- if (atomic_inc_return(&xprt->swapper) != 1)
- return 0;
- if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE))
- return -ERESTARTSYS;
- if (xs->inet)
+ mutex_lock(&xs->recv_mutex);
+ if (atomic_inc_return(&xprt->swapper) == 1 &&
+ xs->inet)
sk_set_memalloc(xs->inet);
- xprt_release_xprt(xprt, NULL);
+ mutex_unlock(&xs->recv_mutex);
return 0;
}
@@ -1989,13 +2004,11 @@ xs_disable_swap(struct rpc_xprt *xprt)
{
struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
- if (!atomic_dec_and_test(&xprt->swapper))
- return;
- if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE))
- return;
- if (xs->inet)
+ mutex_lock(&xs->recv_mutex);
+ if (atomic_dec_and_test(&xprt->swapper) &&
+ xs->inet)
sk_clear_memalloc(xs->inet);
- xprt_release_xprt(xprt, NULL);
+ mutex_unlock(&xs->recv_mutex);
}
#else
static void xs_set_memalloc(struct rpc_xprt *xprt)
@@ -2028,7 +2041,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
sk->sk_user_data = xprt;
sk->sk_data_ready = xs_data_ready;
sk->sk_write_space = xs_udp_write_space;
- sock_set_flag(sk, SOCK_FASYNC);
xprt_set_connected(xprt);
@@ -2052,7 +2064,10 @@ static void xs_udp_setup_socket(struct work_struct *work)
struct rpc_xprt *xprt = &transport->xprt;
struct socket *sock;
int status = -EIO;
+ unsigned int pflags = current->flags;
+ if (atomic_read(&xprt->swapper))
+ current->flags |= PF_MEMALLOC;
sock = xs_create_sock(xprt, transport,
xs_addr(xprt)->sa_family, SOCK_DGRAM,
IPPROTO_UDP, false);
@@ -2072,6 +2087,7 @@ out:
xprt_clear_connecting(xprt);
xprt_unlock_connect(xprt, transport);
xprt_wake_pending_tasks(xprt, status);
+ current_restore_flags(pflags, PF_MEMALLOC);
}
/**
@@ -2191,7 +2207,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
sk->sk_data_ready = xs_data_ready;
sk->sk_state_change = xs_tcp_state_change;
sk->sk_write_space = xs_tcp_write_space;
- sock_set_flag(sk, SOCK_FASYNC);
sk->sk_error_report = xs_error_report;
/* socket options */
@@ -2231,11 +2246,19 @@ static void xs_tcp_setup_socket(struct work_struct *work)
struct socket *sock = transport->sock;
struct rpc_xprt *xprt = &transport->xprt;
int status;
+ unsigned int pflags = current->flags;
- if (!sock) {
- sock = xs_create_sock(xprt, transport,
- xs_addr(xprt)->sa_family, SOCK_STREAM,
- IPPROTO_TCP, true);
+ if (atomic_read(&xprt->swapper))
+ current->flags |= PF_MEMALLOC;
+
+ if (xprt_connected(xprt))
+ goto out;
+ if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT,
+ &transport->sock_state) ||
+ !sock) {
+ xs_reset_transport(transport);
+ sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family,
+ SOCK_STREAM, IPPROTO_TCP, true);
if (IS_ERR(sock)) {
xprt_wake_pending_tasks(xprt, PTR_ERR(sock));
goto out;
@@ -2255,10 +2278,9 @@ static void xs_tcp_setup_socket(struct work_struct *work)
sock->sk->sk_state);
switch (status) {
case 0:
- xs_set_srcport(transport, sock);
- fallthrough;
case -EINPROGRESS:
/* SYN_SENT! */
+ set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state);
if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
fallthrough;
@@ -2296,6 +2318,7 @@ out:
xprt_clear_connecting(xprt);
out_unlock:
xprt_unlock_connect(xprt, transport);
+ current_restore_flags(pflags, PF_MEMALLOC);
}
/**
@@ -2319,13 +2342,9 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
- if (transport->sock != NULL && !xprt_connecting(xprt)) {
+ if (transport->sock != NULL) {
dprintk("RPC: xs_connect delayed xprt %p for %lu "
- "seconds\n",
- xprt, xprt->reestablish_timeout / HZ);
-
- /* Start by resetting any existing state */
- xs_reset_transport(transport);
+ "seconds\n", xprt, xprt->reestablish_timeout / HZ);
delay = xprt_reconnect_delay(xprt);
xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
@@ -2487,7 +2506,7 @@ static int bc_malloc(struct rpc_task *task)
return -EINVAL;
}
- page = alloc_page(GFP_KERNEL);
+ page = alloc_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
if (!page)
return -ENOMEM;
@@ -2621,6 +2640,8 @@ static const struct rpc_xprt_ops xs_udp_ops = {
.rpcbind = rpcb_getport_async,
.set_port = xs_set_port,
.connect = xs_connect,
+ .get_srcaddr = xs_sock_srcaddr,
+ .get_srcport = xs_sock_srcport,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_udp_send_request,
@@ -2643,6 +2664,8 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
.rpcbind = rpcb_getport_async,
.set_port = xs_set_port,
.connect = xs_connect,
+ .get_srcaddr = xs_sock_srcaddr,
+ .get_srcport = xs_sock_srcport,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,