Diffstat (limited to 'net/sunrpc/svcsock.c')
 net/sunrpc/svcsock.c | 131 ++++++++++++++++++++----------------------
 1 file changed, 59 insertions(+), 72 deletions(-)
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 8c9a8ee76aa0..998687421fa6 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -36,6 +36,8 @@
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h>
+#include <linux/bvec.h>
+
#include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
@@ -695,9 +697,10 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
.msg_name = &rqstp->rq_addr,
.msg_namelen = rqstp->rq_addrlen,
.msg_control = cmh,
+ .msg_flags = MSG_SPLICE_PAGES,
.msg_controllen = sizeof(buffer),
};
- unsigned int sent;
+ unsigned int count;
int err;
svc_udp_release_ctxt(xprt, rqstp->rq_xprt_ctxt);
@@ -710,22 +713,23 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
if (svc_xprt_is_dead(xprt))
goto out_notconn;
- err = xdr_alloc_bvec(xdr, GFP_KERNEL);
- if (err < 0)
- goto out_unlock;
+ count = xdr_buf_to_bvec(rqstp->rq_bvec,
+         ARRAY_SIZE(rqstp->rq_bvec), xdr);
- err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+         count, 0);
+ err = sock_sendmsg(svsk->sk_sock, &msg);
if (err == -ECONNREFUSED) {
/* ICMP error on earlier request. */
- err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+         count, 0);
+ err = sock_sendmsg(svsk->sk_sock, &msg);
}
- xdr_free_bvec(xdr);
+
trace_svcsock_udp_send(xprt, err);
-out_unlock:
+
mutex_unlock(&xprt->xpt_mutex);
- if (err < 0)
- return err;
- return sent;
+ return err;
out_notconn:
mutex_unlock(&xprt->xpt_mutex);
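The UDP path above now marshals the entire xdr_buf into the per-socket rq_bvec array and transmits it with a single sock_sendmsg() call, retrying once when -ECONNREFUSED reports an ICMP error raised by an earlier datagram. For illustration, a minimal userspace sketch of the same gather-send idea using plain sendmsg(2) with an iovec array (gather_send() is a hypothetical helper; the kernel path uses bio_vecs and MSG_SPLICE_PAGES, not iovecs):

#include <errno.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Send every message segment in one syscall instead of one
 * send() per segment. On UDP, ECONNREFUSED may report an ICMP
 * error generated by an *earlier* datagram, so retry once.
 */
static ssize_t gather_send(int sock, struct iovec *segs, size_t nsegs,
                           struct sockaddr *addr, socklen_t addrlen)
{
        struct msghdr msg = {
                .msg_name       = addr,
                .msg_namelen    = addrlen,
                .msg_iov        = segs,
                .msg_iovlen     = nsegs,
        };
        ssize_t ret;

        ret = sendmsg(sock, &msg, 0);
        if (ret < 0 && errno == ECONNREFUSED)
                ret = sendmsg(sock, &msg, 0);
        return ret;
}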
@@ -1089,6 +1093,9 @@ static void svc_tcp_fragment_received(struct svc_sock *svsk)
/* If we have more data, signal svc_xprt_enqueue() to try again */
svsk->sk_tcplen = 0;
svsk->sk_marker = xdr_zero;
+
+ smp_wmb();
+ tcp_set_rcvlowat(svsk->sk_sk, 1);
}
/**
@@ -1178,10 +1185,17 @@ err_incomplete:
goto err_delete;
if (len == want)
svc_tcp_fragment_received(svsk);
- else
+ else {
+ /* Avoid more ->sk_data_ready() calls until the rest
+ * of the message has arrived. This reduces service
+ * thread wake-ups on large incoming messages. */
+ tcp_set_rcvlowat(svsk->sk_sk,
+         svc_sock_reclen(svsk) - svsk->sk_tcplen);
+
trace_svcsock_tcp_recv_short(&svsk->sk_xprt,
svc_sock_reclen(svsk),
svsk->sk_tcplen - sizeof(rpc_fraghdr));
+ }
goto err_noclose;
error:
if (len != -EAGAIN)
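Raising the receive low-water mark is what suppresses the extra wake-ups: TCP will not signal ->sk_data_ready() again until the rest of the record has been queued, and svc_tcp_fragment_received() drops the mark back to 1 once a record completes (the smp_wmb() orders the sk_tcplen/sk_marker resets ahead of that update). The same mechanism is exposed to userspace as SO_RCVLOWAT; a sketch with hypothetical helper names:

#include <sys/socket.h>

/* Keep the socket from being marked readable (and from waking
 * poll()/epoll waiters) until at least @remaining bytes are queued.
 */
static int defer_readiness(int sock, int remaining)
{
        return setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT,
                          &remaining, sizeof(remaining));
}

/* Record complete: restore byte-at-a-time readiness. */
static int restore_readiness(int sock)
{
        int one = 1;

        return setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT,
                          &one, sizeof(one));
}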
@@ -1198,75 +1212,51 @@ err_noclose:
return 0; /* record not complete */
}
-static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
- int flags)
-{
- struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
-
- iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
- return sock_sendmsg(sock, &msg);
-}
-
/*
* MSG_SPLICE_PAGES is used exclusively to reduce the number of
* copy operations in this path. Therefore the caller must ensure
* that the pages backing @xdr are unchanging.
*
- * In addition, the logic assumes that .bv_len is never larger
- * than PAGE_SIZE.
+ * Note that the send is non-blocking. The caller has incremented
+ * the reference count on each page backing the RPC message, and
+ * the network layer will "put" these pages when transmission is
+ * complete.
+ *
+ * This is safe for our RPC services because the memory backing
+ * the head and tail components is never kmalloc'd. These always
+ * come from pages in the svc_rqst::rq_pages array.
*/
-static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
+static int svc_tcp_sendmsg(struct svc_sock *svsk, struct svc_rqst *rqstp,
rpc_fraghdr marker, unsigned int *sentp)
{
- const struct kvec *head = xdr->head;
- const struct kvec *tail = xdr->tail;
- struct kvec rm = {
- .iov_base = &marker,
- .iov_len = sizeof(marker),
- };
struct msghdr msg = {
- .msg_flags = 0,
+ .msg_flags = MSG_SPLICE_PAGES,
};
+ unsigned int count;
+ void *buf;
int ret;
*sentp = 0;
- ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
- if (ret < 0)
- return ret;
-
- ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
- if (ret < 0)
- return ret;
- *sentp += ret;
- if (ret != rm.iov_len)
- return -EAGAIN;
-
- ret = svc_tcp_send_kvec(sock, head, 0);
- if (ret < 0)
- return ret;
- *sentp += ret;
- if (ret != head->iov_len)
- goto out;
- if (xdr_buf_pagecount(xdr))
- xdr->bvec[0].bv_offset = offset_in_page(xdr->page_base);
-
- msg.msg_flags = MSG_SPLICE_PAGES;
- iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
- xdr_buf_pagecount(xdr), xdr->page_len);
- ret = sock_sendmsg(sock, &msg);
+ /* The stream record marker is copied into a temporary page
+ * fragment buffer so that it can be included in rq_bvec.
+ */
+ buf = page_frag_alloc(&svsk->sk_frag_cache, sizeof(marker),
+         GFP_KERNEL);
+ if (!buf)
+ return -ENOMEM;
+ memcpy(buf, &marker, sizeof(marker));
+ bvec_set_virt(rqstp->rq_bvec, buf, sizeof(marker));
+
+ count = xdr_buf_to_bvec(rqstp->rq_bvec + 1,
+         ARRAY_SIZE(rqstp->rq_bvec) - 1, &rqstp->rq_res);
+
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+         1 + count, sizeof(marker) + rqstp->rq_res.len);
+ ret = sock_sendmsg(svsk->sk_sock, &msg);
if (ret < 0)
return ret;
*sentp += ret;
-
- if (tail->iov_len) {
- ret = svc_tcp_send_kvec(sock, tail, 0);
- if (ret < 0)
- return ret;
- *sentp += ret;
- }
-
-out:
return 0;
}
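The marker copied into the page-fragment buffer above is the RPC-over-TCP record marking header (RFC 5531): a four-byte big-endian word whose high bit flags the final fragment and whose low 31 bits carry the fragment length, excluding the marker itself. A userspace sketch of its construction (hypothetical helper; the kernel receives it here already encoded as an rpc_fraghdr):

#include <arpa/inet.h>
#include <stdint.h>

#define LAST_FRAGMENT 0x80000000u

/* frag_len must fit in 31 bits and does not count the 4-byte
 * marker itself.
 */
static uint32_t rpc_record_marker(uint32_t frag_len)
{
        return htonl(LAST_FRAGMENT | frag_len);
}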
@@ -1292,23 +1282,17 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
svc_tcp_release_ctxt(xprt, rqstp->rq_xprt_ctxt);
rqstp->rq_xprt_ctxt = NULL;
- atomic_inc(&svsk->sk_sendqlen);
mutex_lock(&xprt->xpt_mutex);
if (svc_xprt_is_dead(xprt))
goto out_notconn;
- tcp_sock_set_cork(svsk->sk_sk, true);
- err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
- xdr_free_bvec(xdr);
+ err = svc_tcp_sendmsg(svsk, rqstp, marker, &sent);
trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
if (err < 0 || sent != (xdr->len + sizeof(marker)))
goto out_close;
- if (atomic_dec_and_test(&svsk->sk_sendqlen))
- tcp_sock_set_cork(svsk->sk_sk, false);
mutex_unlock(&xprt->xpt_mutex);
return sent;
out_notconn:
- atomic_dec(&svsk->sk_sendqlen);
mutex_unlock(&xprt->xpt_mutex);
return -ENOTCONN;
out_close:
@@ -1317,7 +1301,6 @@ out_close:
(err < 0) ? "got error" : "sent",
(err < 0) ? err : sent, xdr->len);
svc_xprt_deferred_close(xprt);
- atomic_dec(&svsk->sk_sendqlen);
mutex_unlock(&xprt->xpt_mutex);
return -EAGAIN;
}
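The deleted cork and sk_sendqlen logic dates from when each reply left the socket as several small sends (marker, head, page payload, tail); corking kept those from going out as undersized segments. With the whole reply now in one sendmsg() there is nothing left to coalesce, so both the cork and the counter that guarded it can go. For reference, the equivalent userspace knob (illustrative helper):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* While corked, TCP holds back partial frames and transmits only
 * full-sized segments; uncorking flushes any remainder.
 */
static int set_cork(int sock, int on)
{
        return setsockopt(sock, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
}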
@@ -1644,6 +1627,7 @@ static void svc_tcp_sock_detach(struct svc_xprt *xprt)
static void svc_sock_free(struct svc_xprt *xprt)
{
struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+ struct page_frag_cache *pfc = &svsk->sk_frag_cache;
struct socket *sock = svsk->sk_sock;
trace_svcsock_free(svsk, sock);
@@ -1653,5 +1637,8 @@ static void svc_sock_free(struct svc_xprt *xprt)
sockfd_put(sock);
else
sock_release(sock);
+ if (pfc->va)
+ __page_frag_cache_drain(virt_to_head_page(pfc->va),
+         pfc->pagecnt_bias);
kfree(svsk);
}
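This drain pairs with the page_frag_alloc() call added in svc_tcp_sendmsg(): each record-marker buffer consumes one unit of the reference count that the cache pre-charges on its backing page, and whatever bias remains is returned in one step at socket teardown. A kernel-style sketch of that lifetime, assuming the page_frag_cache API as used above (not standalone userspace code):

#include <linux/gfp.h>
#include <linux/mm.h>

struct frag_user {
        struct page_frag_cache cache;   /* assumed zero-initialized */
};

/* Hand out small sub-page buffers; each allocation consumes one
 * unit of the page's pre-charged reference count.
 */
static void *frag_user_alloc(struct frag_user *fu, size_t len)
{
        return page_frag_alloc(&fu->cache, len, GFP_KERNEL);
}

/* At teardown, return all unconsumed references in one call. */
static void frag_user_destroy(struct frag_user *fu)
{
        if (fu->cache.va)
                __page_frag_cache_drain(virt_to_head_page(fu->cache.va),
                                        fu->cache.pagecnt_bias);
}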