diff options
Diffstat (limited to 'net/sunrpc/xprtsock.c')
| -rw-r--r-- | net/sunrpc/xprtsock.c | 1606 |
1 files changed, 1074 insertions, 532 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 7754aa3e434f..2e1fe6013361 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -47,15 +47,23 @@ #include <net/checksum.h> #include <net/udp.h> #include <net/tcp.h> +#include <net/tls_prot.h> +#include <net/handshake.h> + #include <linux/bvec.h> #include <linux/highmem.h> #include <linux/uio.h> +#include <linux/sched/mm.h> +#include <trace/events/sock.h> #include <trace/events/sunrpc.h> +#include "socklib.h" #include "sunrpc.h" static void xs_close(struct rpc_xprt *xprt); +static void xs_reset_srcport(struct sock_xprt *transport); +static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock); static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, struct socket *sock); @@ -74,7 +82,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; /* * We can register our own files under /proc/sys/sunrpc by - * calling register_sysctl_table() again. The files in that + * calling register_sysctl() again. The files in that * directory become the union of all files registered there. * * We simply need to make sure that we don't collide with @@ -89,6 +97,12 @@ static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT; static struct ctl_table_header *sunrpc_table_header; +static struct xprt_class xs_local_transport; +static struct xprt_class xs_udp_transport; +static struct xprt_class xs_tcp_transport; +static struct xprt_class xs_tcp_tls_transport; +static struct xprt_class xs_bc_tcp_transport; + /* * FIXME: changing the UDP slot table size should also resize the UDP * socket buffers for existing UDP transports @@ -146,16 +160,6 @@ static struct ctl_table xs_tunables_table[] = { .mode = 0644, .proc_handler = proc_dointvec_jiffies, }, - { }, -}; - -static struct ctl_table sunrpc_table[] = { - { - .procname = "sunrpc", - .mode = 0555, - .child = xs_tunables_table - }, - { }, }; /* @@ -187,6 +191,11 @@ static struct ctl_table sunrpc_table[] = { */ #define XS_IDLE_DISC_TO (5U * 60 * HZ) +/* + * TLS handshake timeout. + */ +#define XS_TLS_HANDSHAKE_TO (10U * HZ) + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_TRANS @@ -253,7 +262,12 @@ static void xs_format_common_peer_addresses(struct rpc_xprt *xprt) switch (sap->sa_family) { case AF_LOCAL: sun = xs_addr_un(xprt); - strlcpy(buf, sun->sun_path, sizeof(buf)); + if (sun->sun_path[0]) { + strscpy(buf, sun->sun_path, sizeof(buf)); + } else { + buf[0] = '@'; + strscpy(buf+1, sun->sun_path+1, sizeof(buf)-1); + } xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL); break; @@ -342,6 +356,66 @@ xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp) return want; } +static int +xs_sock_process_cmsg(struct socket *sock, struct msghdr *msg, + unsigned int *msg_flags, struct cmsghdr *cmsg, int ret) +{ + u8 content_type = tls_get_record_type(sock->sk, cmsg); + u8 level, description; + + switch (content_type) { + case 0: + break; + case TLS_RECORD_TYPE_DATA: + /* TLS sets EOR at the end of each application data + * record, even though there might be more frames + * waiting to be decrypted. + */ + *msg_flags &= ~MSG_EOR; + break; + case TLS_RECORD_TYPE_ALERT: + tls_alert_recv(sock->sk, msg, &level, &description); + ret = (level == TLS_ALERT_LEVEL_FATAL) ? + -EACCES : -EAGAIN; + break; + default: + /* discard this record type */ + ret = -EAGAIN; + } + return ret; +} + +static int +xs_sock_recv_cmsg(struct socket *sock, unsigned int *msg_flags, int flags) +{ + union { + struct cmsghdr cmsg; + u8 buf[CMSG_SPACE(sizeof(u8))]; + } u; + u8 alert[2]; + struct kvec alert_kvec = { + .iov_base = alert, + .iov_len = sizeof(alert), + }; + struct msghdr msg = { + .msg_flags = *msg_flags, + .msg_control = &u, + .msg_controllen = sizeof(u), + }; + int ret; + + iov_iter_kvec(&msg.msg_iter, ITER_DEST, &alert_kvec, 1, + alert_kvec.iov_len); + ret = sock_recvmsg(sock, &msg, flags); + if (ret > 0) { + if (tls_get_record_type(sock->sk, &u.cmsg) == TLS_RECORD_TYPE_ALERT) + iov_iter_revert(&msg.msg_iter, ret); + ret = xs_sock_process_cmsg(sock, &msg, msg_flags, &u.cmsg, + -EAGAIN); + } + return ret; +} + static ssize_t xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek) { @@ -349,6 +423,12 @@ xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek) if (seek != 0) iov_iter_advance(&msg->msg_iter, seek); ret = sock_recvmsg(sock, msg, flags); + /* Handle TLS inband control message lazily */ + if (msg->msg_flags & MSG_CTRUNC) { + msg->msg_flags &= ~(MSG_CTRUNC | MSG_EOR); + if (ret == 0 || ret == -EIO) + ret = xs_sock_recv_cmsg(sock, &msg->msg_flags, flags); + } return ret > 0 ? ret + seek : ret; } @@ -356,7 +436,7 @@ static ssize_t xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags, struct kvec *kvec, size_t count, size_t seek) { - iov_iter_kvec(&msg->msg_iter, READ, kvec, 1, count); + iov_iter_kvec(&msg->msg_iter, ITER_DEST, kvec, 1, count); return xs_sock_recvmsg(sock, msg, flags, seek); } @@ -365,7 +445,7 @@ xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags, struct bio_vec *bvec, unsigned long nr, size_t count, size_t seek) { - iov_iter_bvec(&msg->msg_iter, READ, bvec, nr, count); + iov_iter_bvec(&msg->msg_iter, ITER_DEST, bvec, nr, count); return xs_sock_recvmsg(sock, msg, flags, seek); } @@ -373,8 +453,8 @@ static ssize_t xs_read_discard(struct socket *sock, struct msghdr *msg, int flags, size_t count) { - iov_iter_discard(&msg->msg_iter, READ, count); - return sock_recvmsg(sock, msg, flags); + iov_iter_discard(&msg->msg_iter, ITER_DEST, count); + return xs_sock_recvmsg(sock, msg, flags, 0); } #if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE @@ -404,8 +484,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, size_t want, seek_init = seek, offset = 0; ssize_t ret; - if (seek < buf->head[0].iov_len) { - want = min_t(size_t, count, buf->head[0].iov_len); + want = min_t(size_t, count, buf->head[0].iov_len); + if (seek < want) { ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek); if (ret <= 0) goto sock_err; @@ -416,13 +496,13 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, goto out; seek = 0; } else { - seek -= buf->head[0].iov_len; - offset += buf->head[0].iov_len; + seek -= want; + offset += want; } - want = xs_alloc_sparse_pages(buf, - min_t(size_t, count - offset, buf->page_len), - GFP_NOWAIT); + want = xs_alloc_sparse_pages( + buf, min_t(size_t, count - offset, buf->page_len), + GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN); if (seek < want) { ret = xs_read_bvec(sock, msg, flags, buf->bvec, xdr_buf_pagecount(buf), @@ -431,7 +511,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, if (ret <= 0) goto sock_err; xs_flush_bvec(buf->bvec, ret, seek + buf->page_base); - offset += ret - buf->page_base; + ret -= buf->page_base; + offset += ret; if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) goto out; if (ret != want) @@ -442,8 +523,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, offset += want; } - if (seek < buf->tail[0].iov_len) { - want = min_t(size_t, count - offset, buf->tail[0].iov_len); + want = min_t(size_t, count - offset, buf->tail[0].iov_len); + if (seek < want) { ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek); if (ret <= 0) goto sock_err; @@ -452,8 +533,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, goto out; if (ret != want) goto out; - } else - offset += buf->tail[0].iov_len; + } else if (offset < seek_init) + offset = seek_init; ret = -EMSGSIZE; out: *read = offset - seek_init; @@ -481,6 +562,14 @@ xs_read_stream_request_done(struct sock_xprt *transport) return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); } +static void +xs_read_stream_check_eor(struct sock_xprt *transport, + struct msghdr *msg) +{ + if (xs_read_stream_request_done(transport)) + msg->msg_flags |= MSG_EOR; +} + static ssize_t xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, int flags, struct rpc_rqst *req) @@ -492,17 +581,21 @@ xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, xs_read_header(transport, buf); want = transport->recv.len - transport->recv.offset; - ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, - transport->recv.copied + want, transport->recv.copied, - &read); - transport->recv.offset += read; - transport->recv.copied += read; - if (transport->recv.offset == transport->recv.len) { - if (xs_read_stream_request_done(transport)) - msg->msg_flags |= MSG_EOR; - return read; + if (want != 0) { + ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, + transport->recv.copied + want, + transport->recv.copied, + &read); + transport->recv.offset += read; + transport->recv.copied += read; } + if (transport->recv.offset == transport->recv.len) + xs_read_stream_check_eor(transport, msg); + + if (want == 0) + return 0; + switch (ret) { default: break; @@ -543,16 +636,24 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) struct rpc_rqst *req; ssize_t ret; + /* Is this transport associated with the backchannel? */ + if (!xprt->bc_serv) + return -ESHUTDOWN; + /* Look up and lock the request corresponding to the given XID */ req = xprt_lookup_bc_request(xprt, transport->recv.xid); if (!req) { printk(KERN_WARNING "Callback slot table overflowed\n"); return -ESHUTDOWN; } + if (transport->recv.copied && !req->rq_private_buf.len) + return -ESHUTDOWN; ret = xs_read_stream_request(transport, msg, flags, req); if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) xprt_complete_bc_request(req, transport->recv.copied); + else + req->rq_private_buf.len = transport->recv.copied; return ret; } @@ -574,7 +675,7 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) /* Look up and lock the request corresponding to the given XID */ spin_lock(&xprt->queue_lock); req = xprt_lookup_rqst(xprt, transport->recv.xid); - if (!req) { + if (!req || (transport->recv.copied && !req->rq_private_buf.len)) { msg->msg_flags |= MSG_TRUNC; goto out; } @@ -586,6 +687,8 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) spin_lock(&xprt->queue_lock); if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) xprt_complete_rqst(req->rq_task, transport->recv.copied); + else + req->rq_private_buf.len = transport->recv.copied; xprt_unpin_rqst(req); out: spin_unlock(&xprt->queue_lock); @@ -655,13 +758,37 @@ out_err: return ret != 0 ? ret : -ESHUTDOWN; } +static __poll_t xs_poll_socket(struct sock_xprt *transport) +{ + return transport->sock->ops->poll(transport->file, transport->sock, + NULL); +} + +static bool xs_poll_socket_readable(struct sock_xprt *transport) +{ + __poll_t events = xs_poll_socket(transport); + + return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP); +} + +static void xs_poll_check_readable(struct sock_xprt *transport) +{ + + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state)) + return; + if (!xs_poll_socket_readable(transport)) + return; + if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + queue_work(xprtiod_workqueue, &transport->recv_worker); +} + static void xs_stream_data_receive(struct sock_xprt *transport) { size_t read = 0; ssize_t ret = 0; mutex_lock(&transport->recv_mutex); - clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); if (transport->sock == NULL) goto out; for (;;) { @@ -671,6 +798,12 @@ static void xs_stream_data_receive(struct sock_xprt *transport) read += ret; cond_resched(); } + if (ret == -ESHUTDOWN) + kernel_sock_shutdown(transport->sock, SHUT_RDWR); + else if (ret == -EACCES) + xprt_wake_pending_tasks(&transport->xprt, -EACCES); + else + xs_poll_check_readable(transport); out: mutex_unlock(&transport->recv_mutex); trace_xs_stream_read_data(&transport->xprt, ret, read); @@ -680,7 +813,10 @@ static void xs_stream_data_receive_workfn(struct work_struct *work) { struct sock_xprt *transport = container_of(work, struct sock_xprt, recv_worker); + unsigned int pflags = memalloc_nofs_save(); + xs_stream_data_receive(transport); + memalloc_nofs_restore(pflags); } static void @@ -690,173 +826,92 @@ xs_stream_reset_connect(struct sock_xprt *transport) transport->recv.len = 0; transport->recv.copied = 0; transport->xmit.offset = 0; - transport->xprt.stat.connect_count++; - transport->xprt.stat.connect_start = jiffies; -} - -#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) - -static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) -{ - struct msghdr msg = { - .msg_name = addr, - .msg_namelen = addrlen, - .msg_flags = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0), - }; - struct kvec iov = { - .iov_base = vec->iov_base + base, - .iov_len = vec->iov_len - base, - }; - - if (iov.iov_len != 0) - return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); - return kernel_sendmsg(sock, &msg, NULL, 0, 0); } -static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p) +static void +xs_stream_start_connect(struct sock_xprt *transport) { - ssize_t (*do_sendpage)(struct socket *sock, struct page *page, - int offset, size_t size, int flags); - struct page **ppage; - unsigned int remainder; - int err; - - remainder = xdr->page_len - base; - base += xdr->page_base; - ppage = xdr->pages + (base >> PAGE_SHIFT); - base &= ~PAGE_MASK; - do_sendpage = sock->ops->sendpage; - if (!zerocopy) - do_sendpage = sock_no_sendpage; - for(;;) { - unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder); - int flags = XS_SENDMSG_FLAGS; - - remainder -= len; - if (more) - flags |= MSG_MORE; - if (remainder != 0) - flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; - err = do_sendpage(sock, *ppage, base, len, flags); - if (remainder == 0 || err != len) - break; - *sent_p += err; - ppage++; - base = 0; - } - if (err > 0) { - *sent_p += err; - err = 0; - } - return err; + transport->xprt.stat.connect_count++; + transport->xprt.stat.connect_start = jiffies; } -/** - * xs_sendpages - write pages directly to a socket - * @sock: socket to send on - * @addr: UDP only -- address of destination - * @addrlen: UDP only -- length of destination address - * @xdr: buffer containing this request - * @base: starting position in the buffer - * @zerocopy: true if it is safe to use sendpage() - * @sent_p: return the total number of bytes successfully queued for sending - * - */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p) -{ - unsigned int remainder = xdr->len - base; - int err = 0; - int sent = 0; - - if (unlikely(!sock)) - return -ENOTSOCK; - - if (base != 0) { - addr = NULL; - addrlen = 0; - } - - if (base < xdr->head[0].iov_len || addr != NULL) { - unsigned int len = xdr->head[0].iov_len - base; - remainder -= len; - err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0); - if (remainder == 0 || err != len) - goto out; - *sent_p += err; - base = 0; - } else - base -= xdr->head[0].iov_len; - - if (base < xdr->page_len) { - unsigned int len = xdr->page_len - base; - remainder -= len; - err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent); - *sent_p += sent; - if (remainder == 0 || sent != len) - goto out; - base = 0; - } else - base -= xdr->page_len; - - if (base >= xdr->tail[0].iov_len) - return 0; - err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0); -out: - if (err > 0) { - *sent_p += err; - err = 0; - } - return err; -} +#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) /** * xs_nospace - handle transmit was incomplete * @req: pointer to RPC request + * @transport: pointer to struct sock_xprt * */ -static int xs_nospace(struct rpc_rqst *req) +static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport) { - struct rpc_xprt *xprt = req->rq_xprt; - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct rpc_xprt *xprt = &transport->xprt; struct sock *sk = transport->inet; int ret = -EAGAIN; - dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", - req->rq_task->tk_pid, - req->rq_slen - transport->xmit.offset, - req->rq_slen); + trace_rpc_socket_nospace(req, transport); /* Protect against races with write_space */ - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); /* Don't race with disconnect */ if (xprt_connected(xprt)) { /* wait for more buffer space */ + set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state); + set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); sk->sk_write_pending++; xprt_wait_for_buffer_space(xprt); } else ret = -ENOTCONN; - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); + return ret; +} - /* Race breaker in case memory is freed before above code is called */ - if (ret == -EAGAIN) { - struct socket_wq *wq; +static int xs_sock_nospace(struct rpc_rqst *req) +{ + struct sock_xprt *transport = + container_of(req->rq_xprt, struct sock_xprt, xprt); + struct sock *sk = transport->inet; + int ret = -EAGAIN; - rcu_read_lock(); - wq = rcu_dereference(sk->sk_wq); - set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags); - rcu_read_unlock(); + lock_sock(sk); + if (!sock_writeable(sk)) + ret = xs_nospace(req, transport); + release_sock(sk); + return ret; +} - sk->sk_write_space(sk); - } +static int xs_stream_nospace(struct rpc_rqst *req, bool vm_wait) +{ + struct sock_xprt *transport = + container_of(req->rq_xprt, struct sock_xprt, xprt); + struct sock *sk = transport->inet; + int ret = -EAGAIN; + + if (vm_wait) + return -ENOBUFS; + lock_sock(sk); + if (!sk_stream_memory_free(sk)) + ret = xs_nospace(req, transport); + release_sock(sk); return ret; } -static void -xs_stream_prepare_request(struct rpc_rqst *req) +static int xs_stream_prepare_request(struct rpc_rqst *req, struct xdr_buf *buf) { - req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO); + return xdr_alloc_bvec(buf, rpc_task_gfp_mask()); +} + +static void xs_stream_abort_send_request(struct rpc_rqst *req) +{ + struct rpc_xprt *xprt = req->rq_xprt; + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + if (transport->xmit.offset != 0 && + !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) + xprt_force_disconnect(xprt); } /* @@ -870,13 +925,14 @@ xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req) } /* - * Construct a stream transport record marker in @buf. + * Return the stream record marker field for a record of length < 2^31-1 */ -static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) +static rpc_fraghdr +xs_stream_record_marker(struct xdr_buf *xdr) { - u32 reclen = buf->len - sizeof(rpc_fraghdr); - rpc_fraghdr *base = buf->head[0].iov_base; - *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen); + if (!xdr->len) + return 0; + return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len); } /** @@ -888,7 +944,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) * EAGAIN: The socket was blocked, please call again later to * complete the request * ENOTCONN: Caller needs to invoke connect logic then call again - * other: Some other error occured, the request was not sent + * other: Some other error occurred, the request was not sent */ static int xs_local_send_request(struct rpc_rqst *req) { @@ -896,54 +952,54 @@ static int xs_local_send_request(struct rpc_rqst *req) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; + rpc_fraghdr rm = xs_stream_record_marker(xdr); + unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; + struct msghdr msg = { + .msg_flags = XS_SENDMSG_FLAGS, + }; + bool vm_wait; + unsigned int sent; int status; - int sent = 0; /* Close the stream if the previous transmission was incomplete */ if (xs_send_request_was_aborted(transport, req)) { - xs_close(xprt); + xprt_force_disconnect(xprt); return -ENOTCONN; } - xs_encode_stream_record_marker(&req->rq_snd_buf); - xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); + vm_wait = sk_stream_is_writeable(transport->inet) ? true : false; + req->rq_xtime = ktime_get(); - status = xs_sendpages(transport->sock, NULL, 0, xdr, - transport->xmit.offset, - true, &sent); + status = xprt_sock_sendmsg(transport->sock, &msg, xdr, + transport->xmit.offset, rm, &sent); dprintk("RPC: %s(%u) = %d\n", __func__, xdr->len - transport->xmit.offset, status); - if (status == -EAGAIN && sock_writeable(transport->inet)) - status = -ENOBUFS; - if (likely(sent > 0) || status == 0) { transport->xmit.offset += sent; req->rq_bytes_sent = transport->xmit.offset; - if (likely(req->rq_bytes_sent >= req->rq_slen)) { + if (likely(req->rq_bytes_sent >= msglen)) { req->rq_xmit_bytes_sent += transport->xmit.offset; - req->rq_bytes_sent = 0; transport->xmit.offset = 0; return 0; } status = -EAGAIN; + vm_wait = false; } switch (status) { - case -ENOBUFS: - break; case -EAGAIN: - status = xs_nospace(req); + status = xs_stream_nospace(req, vm_wait); break; default: dprintk("RPC: sendmsg returned unrecognized error %d\n", -status); - /* fall through */ + fallthrough; case -EPIPE: - xs_close(xprt); + xprt_force_disconnect(xprt); status = -ENOTCONN; } @@ -966,7 +1022,12 @@ static int xs_udp_send_request(struct rpc_rqst *req) struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; - int sent = 0; + struct msghdr msg = { + .msg_name = xs_addr(xprt), + .msg_namelen = xprt->addrlen, + .msg_flags = XS_SENDMSG_FLAGS, + }; + unsigned int sent; int status; xs_pktdump("packet data:", @@ -979,9 +1040,11 @@ static int xs_udp_send_request(struct rpc_rqst *req) if (!xprt_request_get_cong(xprt, req)) return -EBADSLT; + status = xdr_alloc_bvec(xdr, rpc_task_gfp_mask()); + if (status < 0) + return status; req->rq_xtime = ktime_get(); - status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, - xdr, 0, true, &sent); + status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent); dprintk("RPC: xs_udp_send_request(%u) = %d\n", xdr->len, status); @@ -1008,7 +1071,7 @@ process_status: /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(req); + status = xs_sock_nospace(req); break; case -ENETUNREACH: case -ENOBUFS: @@ -1045,10 +1108,14 @@ static int xs_tcp_send_request(struct rpc_rqst *req) struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; - bool zerocopy = true; - bool vm_wait = false; + rpc_fraghdr rm = xs_stream_record_marker(xdr); + unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; + struct msghdr msg = { + .msg_flags = XS_SENDMSG_FLAGS, + }; + bool vm_wait; + unsigned int sent; int status; - int sent; /* Close the stream if the previous transmission was incomplete */ if (xs_send_request_was_aborted(transport, req)) { @@ -1056,31 +1123,29 @@ static int xs_tcp_send_request(struct rpc_rqst *req) kernel_sock_shutdown(transport->sock, SHUT_RDWR); return -ENOTCONN; } - - xs_encode_stream_record_marker(&req->rq_snd_buf); + if (!transport->inet) + return -ENOTCONN; xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); - /* Don't use zero copy if this is a resend. If the RPC call - * completes while the socket holds a reference to the pages, - * then we may end up resending corrupted data. - */ - if (req->rq_task->tk_flags & RPC_TASK_SENT) - zerocopy = false; if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state)) xs_tcp_set_socket_timeouts(xprt, transport->sock); + xs_set_srcport(transport, transport->sock); + /* Continue transmitting the packet/record. We must be careful * to cope with writespace callbacks arriving _after_ we have * called sendmsg(). */ req->rq_xtime = ktime_get(); - while (1) { - sent = 0; - status = xs_sendpages(transport->sock, NULL, 0, xdr, - transport->xmit.offset, - zerocopy, &sent); + tcp_sock_set_cork(transport->inet, true); + + vm_wait = sk_stream_is_writeable(transport->inet) ? true : false; + + do { + status = xprt_sock_sendmsg(transport->sock, &msg, xdr, + transport->xmit.offset, rm, &sent); dprintk("RPC: xs_tcp_send_request(%u) = %d\n", xdr->len - transport->xmit.offset, status); @@ -1089,40 +1154,20 @@ static int xs_tcp_send_request(struct rpc_rqst *req) * reset the count of bytes sent. */ transport->xmit.offset += sent; req->rq_bytes_sent = transport->xmit.offset; - if (likely(req->rq_bytes_sent >= req->rq_slen)) { + if (likely(req->rq_bytes_sent >= msglen)) { req->rq_xmit_bytes_sent += transport->xmit.offset; - req->rq_bytes_sent = 0; transport->xmit.offset = 0; + if (atomic_long_read(&xprt->xmit_queuelen) == 1) + tcp_sock_set_cork(transport->inet, false); return 0; } WARN_ON_ONCE(sent == 0 && status == 0); - if (status == -EAGAIN ) { - /* - * Return EAGAIN if we're sure we're hitting the - * socket send buffer limits. - */ - if (test_bit(SOCK_NOSPACE, &transport->sock->flags)) - break; - /* - * Did we hit a memory allocation failure? - */ - if (sent == 0) { - status = -ENOBUFS; - if (vm_wait) - break; - /* Retry, knowing now that we're below the - * socket send buffer limit - */ - vm_wait = true; - } - continue; - } - if (status < 0) - break; - vm_wait = false; - } + if (sent > 0) + vm_wait = false; + + } while (status == 0); switch (status) { case -ENOTSOCK: @@ -1130,7 +1175,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req) /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(req); + status = xs_stream_nospace(req, vm_wait); break; case -ECONNRESET: case -ECONNREFUSED: @@ -1167,11 +1212,24 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + transport->xprt_err = 0; clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state); + clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state); + clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); +} + +static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr) +{ + set_bit(nr, &transport->sock_state); + queue_work(xprtiod_workqueue, &transport->error_worker); } static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) { + xprt->connect_cookie++; smp_mb__before_atomic(); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state); @@ -1188,22 +1246,23 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) */ static void xs_error_report(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - int err; - read_lock_bh(&sk->sk_callback_lock); if (!(xprt = xprt_from_sock(sk))) - goto out; + return; - err = -sk->sk_err; - if (err == 0) - goto out; + transport = container_of(xprt, struct sock_xprt, xprt); + transport->xprt_err = -sk->sk_err; + if (transport->xprt_err == 0) + return; dprintk("RPC: xs_error_report client %p, error=%d...\n", - xprt, -err); - trace_rpc_socket_error(xprt, sk->sk_socket, err); - xprt_wake_pending_tasks(xprt, err); - out: - read_unlock_bh(&sk->sk_callback_lock); + xprt, -transport->xprt_err); + trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err); + + /* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */ + smp_mb__before_atomic(); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR); } static void xs_reset_transport(struct sock_xprt *transport) @@ -1211,30 +1270,47 @@ static void xs_reset_transport(struct sock_xprt *transport) struct socket *sock = transport->sock; struct sock *sk = transport->inet; struct rpc_xprt *xprt = &transport->xprt; + struct file *filp = transport->file; if (sk == NULL) return; + /* + * Make sure we're calling this in a context from which it is safe + * to call __fput_sync(). In practice that means rpciod and the + * system workqueue. + */ + if (!(current->flags & PF_WQ_WORKER)) { + WARN_ON_ONCE(1); + set_bit(XPRT_CLOSE_WAIT, &xprt->state); + return; + } if (atomic_read(&transport->xprt.swapper)) sk_clear_memalloc(sk); + tls_handshake_cancel(sk); + kernel_sock_shutdown(sock, SHUT_RDWR); mutex_lock(&transport->recv_mutex); - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); transport->inet = NULL; transport->sock = NULL; + transport->file = NULL; sk->sk_user_data = NULL; + sk->sk_sndtimeo = 0; xs_restore_old_callbacks(transport, sk); xprt_clear_connected(xprt); - write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); + /* Reset stream record info */ + xs_stream_reset_connect(transport); + release_sock(sk); mutex_unlock(&transport->recv_mutex); trace_rpc_socket_close(xprt, sock); - sock_release(sock); + __fput_sync(filp); xprt_disconnect_done(xprt); } @@ -1255,6 +1331,8 @@ static void xs_close(struct rpc_xprt *xprt) dprintk("RPC: xs_close xprt %p\n", xprt); + if (transport->sock) + tls_handshake_close(transport->sock); xs_reset_transport(transport); xprt->reestablish_timeout = 0; } @@ -1286,6 +1364,7 @@ static void xs_destroy(struct rpc_xprt *xprt) cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); cancel_work_sync(&transport->recv_worker); + cancel_work_sync(&transport->error_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); } @@ -1339,9 +1418,9 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, } - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, copied); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); @@ -1358,18 +1437,18 @@ static void xs_udp_data_receive(struct sock_xprt *transport) int err; mutex_lock(&transport->recv_mutex); - clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); sk = transport->inet; if (sk == NULL) goto out; for (;;) { - skb = skb_recv_udp(sk, 0, 1, &err); + skb = skb_recv_udp(sk, MSG_DONTWAIT, &err); if (skb == NULL) break; xs_udp_data_read_skb(&transport->xprt, sk, skb); consume_skb(skb); cond_resched(); } + xs_poll_check_readable(transport); out: mutex_unlock(&transport->recv_mutex); } @@ -1378,11 +1457,14 @@ static void xs_udp_data_receive_workfn(struct work_struct *work) { struct sock_xprt *transport = container_of(work, struct sock_xprt, recv_worker); + unsigned int pflags = memalloc_nofs_save(); + xs_udp_data_receive(transport); + memalloc_nofs_restore(pflags); } /** - * xs_data_ready - "data ready" callback for UDP sockets + * xs_data_ready - "data ready" callback for sockets * @sk: socket with data to read * */ @@ -1390,13 +1472,20 @@ static void xs_data_ready(struct sock *sk) { struct rpc_xprt *xprt; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: xs_data_ready...\n"); + trace_sk_data_ready(sk); + xprt = xprt_from_sock(sk); if (xprt != NULL) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + + trace_xs_data_ready(xprt); + transport->old_data_ready(sk); + + if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state)) + return; + /* Any data means we had a useful conversation, so * then we don't need to delay the next reconnect */ @@ -1405,7 +1494,6 @@ static void xs_data_ready(struct sock *sk) if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) queue_work(xprtiod_workqueue, &transport->recv_worker); } - read_unlock_bh(&sk->sk_callback_lock); } /* @@ -1425,6 +1513,26 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) #endif /* CONFIG_SUNRPC_BACKCHANNEL */ /** + * xs_local_state_change - callback to handle AF_LOCAL socket state changes + * @sk: socket whose state has changed + * + */ +static void xs_local_state_change(struct sock *sk) +{ + struct rpc_xprt *xprt; + struct sock_xprt *transport; + + if (!(xprt = xprt_from_sock(sk))) + return; + transport = container_of(xprt, struct sock_xprt, xprt); + if (sk->sk_shutdown & SHUTDOWN_MASK) { + clear_bit(XPRT_CONNECTED, &xprt->state); + /* Trigger the socket release */ + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); + } +} + +/** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed * @@ -1434,9 +1542,8 @@ static void xs_tcp_state_change(struct sock *sk) struct rpc_xprt *xprt; struct sock_xprt *transport; - read_lock_bh(&sk->sk_callback_lock); if (!(xprt = xprt_from_sock(sk))) - goto out; + return; dprintk("RPC: xs_tcp_state_change client %p...\n", xprt); dprintk("RPC: state %x conn %d dead %d zapped %d sk_shutdown %d\n", sk->sk_state, xprt_connected(xprt), @@ -1448,7 +1555,6 @@ static void xs_tcp_state_change(struct sock *sk) trace_rpc_socket_state_change(xprt, sk->sk_socket); switch (sk->sk_state) { case TCP_ESTABLISHED: - spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); @@ -1457,9 +1563,8 @@ static void xs_tcp_state_change(struct sock *sk) xprt->stat.connect_count++; xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; - xprt_wake_pending_tasks(xprt, -EAGAIN); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING); } - spin_unlock(&xprt->transport_lock); break; case TCP_FIN_WAIT1: /* The client initiated a shutdown of the socket */ @@ -1475,8 +1580,8 @@ static void xs_tcp_state_change(struct sock *sk) /* The server initiated a shutdown of the socket */ xprt->connect_cookie++; clear_bit(XPRT_CONNECTED, &xprt->state); - xs_tcp_force_close(xprt); - /* fall through */ + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); + fallthrough; case TCP_CLOSING: /* * If the server closed down the connection, make sure that @@ -1493,19 +1598,19 @@ static void xs_tcp_state_change(struct sock *sk) break; case TCP_CLOSE: if (test_and_clear_bit(XPRT_SOCK_CONNECTING, - &transport->sock_state)) + &transport->sock_state)) { + xs_reset_srcport(transport); xprt_clear_connecting(xprt); + } clear_bit(XPRT_CLOSING, &xprt->state); /* Trigger the socket release */ - xs_tcp_force_close(xprt); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); } - out: - read_unlock_bh(&sk->sk_callback_lock); } static void xs_write_space(struct sock *sk) { - struct socket_wq *wq; + struct sock_xprt *transport; struct rpc_xprt *xprt; if (!sk->sk_socket) @@ -1514,15 +1619,11 @@ static void xs_write_space(struct sock *sk) if (unlikely(!(xprt = xprt_from_sock(sk)))) return; - rcu_read_lock(); - wq = rcu_dereference(sk->sk_wq); - if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) - goto out; - - if (xprt_write_space(xprt)) - sk->sk_write_pending--; -out: - rcu_read_unlock(); + transport = container_of(xprt, struct sock_xprt, xprt); + if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state)) + return; + xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE); + sk->sk_write_pending--; } /** @@ -1537,13 +1638,9 @@ out: */ static void xs_udp_write_space(struct sock *sk) { - read_lock_bh(&sk->sk_callback_lock); - /* from net/core/sock.c:sock_def_write_space */ if (sock_writeable(sk)) xs_write_space(sk); - - read_unlock_bh(&sk->sk_callback_lock); } /** @@ -1558,13 +1655,9 @@ static void xs_udp_write_space(struct sock *sk) */ static void xs_tcp_write_space(struct sock *sk) { - read_lock_bh(&sk->sk_callback_lock); - /* from net/core/stream.c:sk_stream_write_space */ if (sk_stream_is_writeable(sk)) xs_write_space(sk); - - read_unlock_bh(&sk->sk_callback_lock); } static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt) @@ -1614,9 +1707,9 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t */ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) { - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static int xs_get_random_port(void) @@ -1628,25 +1721,10 @@ static int xs_get_random_port(void) if (max < min) return -EADDRINUSE; range = max - min + 1; - rand = (unsigned short) prandom_u32() % range; + rand = get_random_u32_below(range); return rand + min; } -/** - * xs_set_reuseaddr_port - set the socket's port and address reuse options - * @sock: socket - * - * Note that this function has to be called on all sockets that share the - * same port, and it must be called before binding. - */ -static void xs_sock_set_reuseport(struct socket *sock) -{ - int opt = 1; - - kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, - (char *)&opt, sizeof(opt)); -} - static unsigned short xs_sock_getport(struct socket *sock) { struct sockaddr_storage buf; @@ -1679,9 +1757,14 @@ static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) xs_update_peer_port(xprt); } +static void xs_reset_srcport(struct sock_xprt *transport) +{ + transport->srcport = 0; +} + static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) { - if (transport->srcport == 0) + if (transport->srcport == 0 && transport->xprt.reuseport) transport->srcport = xs_sock_getport(sock); } @@ -1694,6 +1777,36 @@ static int xs_get_srcport(struct sock_xprt *transport) return port; } +static unsigned short xs_sock_srcport(struct rpc_xprt *xprt) +{ + struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt); + unsigned short ret = 0; + mutex_lock(&sock->recv_mutex); + if (sock->sock) + ret = xs_sock_getport(sock->sock); + mutex_unlock(&sock->recv_mutex); + return ret; +} + +static int xs_sock_srcaddr(struct rpc_xprt *xprt, char *buf, size_t buflen) +{ + struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt); + union { + struct sockaddr sa; + struct sockaddr_storage st; + } saddr; + int ret = -ENOTCONN; + + mutex_lock(&sock->recv_mutex); + if (sock->sock) { + ret = kernel_getsockname(sock->sock, &saddr.sa); + if (ret >= 0) + ret = snprintf(buf, buflen, "%pISc", &saddr.sa); + } + mutex_unlock(&sock->recv_mutex); + return ret; +} + static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port) { if (transport->srcport != 0) @@ -1720,7 +1833,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) * This ensures that we can continue to establish TCP * connections even when all local ephemeral ports are already * a part of some TCP connection. This makes no difference - * for UDP sockets, but also doens't harm them. + * for UDP sockets, but also doesn't harm them. * * If we're asking for any reserved port (i.e. port == 0 && * transport->xprt.resvport == 1) xs_get_srcport above will @@ -1732,10 +1845,11 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); do { rpc_set_port((struct sockaddr *)&myaddr, port); - err = kernel_bind(sock, (struct sockaddr *)&myaddr, - transport->xprt.addrlen); + err = kernel_bind(sock, (struct sockaddr_unsized *)&myaddr, + transport->xprt.addrlen); if (err == 0) { - transport->srcport = port; + if (transport->xprt.reuseport) + transport->srcport = port; break; } last = port; @@ -1768,15 +1882,15 @@ static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) } #ifdef CONFIG_DEBUG_LOCK_ALLOC -static struct lock_class_key xs_key[2]; -static struct lock_class_key xs_slock_key[2]; +static struct lock_class_key xs_key[3]; +static struct lock_class_key xs_slock_key[3]; static inline void xs_reclassify_socketu(struct socket *sock) { struct sock *sk = sock->sk; sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC", - &xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]); + &xs_slock_key[0], "sk_lock-AF_LOCAL-RPC", &xs_key[0]); } static inline void xs_reclassify_socket4(struct socket *sock) @@ -1784,7 +1898,7 @@ static inline void xs_reclassify_socket4(struct socket *sock) struct sock *sk = sock->sk; sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC", - &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]); + &xs_slock_key[1], "sk_lock-AF_INET-RPC", &xs_key[1]); } static inline void xs_reclassify_socket6(struct socket *sock) @@ -1792,7 +1906,7 @@ static inline void xs_reclassify_socket6(struct socket *sock) struct sock *sk = sock->sk; sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC", - &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]); + &xs_slock_key[2], "sk_lock-AF_INET6-RPC", &xs_key[2]); } static inline void xs_reclassify_socket(int family, struct socket *sock) @@ -1826,6 +1940,7 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt, struct sock_xprt *transport, int family, int type, int protocol, bool reuseport) { + struct file *filp; struct socket *sock; int err; @@ -1838,7 +1953,7 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt, xs_reclassify_socket(family, sock); if (reuseport) - xs_sock_set_reuseport(sock); + sock_set_reuseport(sock->sk); err = xs_bind(transport, sock); if (err) { @@ -1846,6 +1961,14 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt, goto out; } + if (protocol == IPPROTO_TCP) + sk_net_refcnt_upgrade(sock->sk); + + filp = sock_alloc_file(sock, O_NONBLOCK, NULL); + if (IS_ERR(filp)) + return ERR_CAST(filp); + transport->file = filp; + return sock; out: return ERR_PTR(err); @@ -1860,16 +1983,16 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, if (!transport->inet) { struct sock *sk = sock->sk; - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; - sock_set_flag(sk, SOCK_FASYNC); + sk->sk_state_change = xs_local_state_change; sk->sk_error_report = xs_error_report; - sk->sk_allocation = GFP_NOIO; + sk->sk_use_task_frag = false; xprt_clear_connected(xprt); @@ -1877,12 +2000,12 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, transport->sock = sock; transport->inet = sk; - write_unlock_bh(&sk->sk_callback_lock); + release_sock(sk); } - xs_stream_reset_connect(transport); + xs_stream_start_connect(transport); - return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); + return kernel_connect(sock, (struct sockaddr_unsized *)xs_addr(xprt), xprt->addrlen, 0); } /** @@ -1892,8 +2015,9 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, static int xs_local_setup_socket(struct sock_xprt *transport) { struct rpc_xprt *xprt = &transport->xprt; + struct file *filp; struct socket *sock; - int status = -EIO; + int status; status = __sock_create(xprt->xprt_net, AF_LOCAL, SOCK_STREAM, 0, &sock, 1); @@ -1904,6 +2028,13 @@ static int xs_local_setup_socket(struct sock_xprt *transport) } xs_reclassify_socket(AF_LOCAL, sock); + filp = sock_alloc_file(sock, O_NONBLOCK, NULL); + if (IS_ERR(filp)) { + status = PTR_ERR(filp); + goto out; + } + transport->file = filp; + dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); @@ -1917,6 +2048,7 @@ static int xs_local_setup_socket(struct sock_xprt *transport) xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; xprt_set_connected(xprt); + break; case -ENOBUFS: break; case -ENOENT: @@ -1944,7 +2076,10 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); int ret; - if (RPC_IS_ASYNC(task)) { + if (transport->file) + goto force_disconnect; + + if (RPC_IS_ASYNC(task)) { /* * We want the AF_LOCAL connect to be resolved in the * filesystem namespace of the process making the rpc @@ -1954,19 +2089,25 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) * we'll need to figure out how to pass a namespace to * connect. */ - rpc_exit(task, -ENOTCONN); - return; + rpc_task_set_rpc_status(task, -ENOTCONN); + goto out_wake; } ret = xs_local_setup_socket(transport); if (ret && !RPC_IS_SOFTCONN(task)) msleep_interruptible(15000); + return; +force_disconnect: + xprt_force_disconnect(xprt); +out_wake: + xprt_clear_connecting(xprt); + xprt_wake_pending_tasks(xprt, -ENOTCONN); } #if IS_ENABLED(CONFIG_SUNRPC_SWAP) /* - * Note that this should be called with XPRT_LOCKED held (or when we otherwise - * know that we have exclusive access to the socket), to guard against - * races with xs_reset_transport. + * Note that this should be called with XPRT_LOCKED held, or recv_mutex + * held, or when we otherwise know that we have exclusive access to the + * socket, to guard against races with xs_reset_transport. */ static void xs_set_memalloc(struct rpc_xprt *xprt) { @@ -1995,13 +2136,11 @@ xs_enable_swap(struct rpc_xprt *xprt) { struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); - if (atomic_inc_return(&xprt->swapper) != 1) - return 0; - if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) - return -ERESTARTSYS; - if (xs->inet) + mutex_lock(&xs->recv_mutex); + if (atomic_inc_return(&xprt->swapper) == 1 && + xs->inet) sk_set_memalloc(xs->inet); - xprt_release_xprt(xprt, NULL); + mutex_unlock(&xs->recv_mutex); return 0; } @@ -2017,13 +2156,11 @@ xs_disable_swap(struct rpc_xprt *xprt) { struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt); - if (!atomic_dec_and_test(&xprt->swapper)) - return; - if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) - return; - if (xs->inet) + mutex_lock(&xs->recv_mutex); + if (atomic_dec_and_test(&xprt->swapper) && + xs->inet) sk_clear_memalloc(xs->inet); - xprt_release_xprt(xprt, NULL); + mutex_unlock(&xs->recv_mutex); } #else static void xs_set_memalloc(struct rpc_xprt *xprt) @@ -2049,15 +2186,14 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) if (!transport->inet) { struct sock *sk = sock->sk; - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; - sock_set_flag(sk, SOCK_FASYNC); - sk->sk_allocation = GFP_NOIO; + sk->sk_use_task_frag = false; xprt_set_connected(xprt); @@ -2067,7 +2203,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_set_memalloc(xprt); - write_unlock_bh(&sk->sk_callback_lock); + release_sock(sk); } xs_udp_do_set_buffer_size(xprt); @@ -2081,7 +2217,10 @@ static void xs_udp_setup_socket(struct work_struct *work) struct rpc_xprt *xprt = &transport->xprt; struct socket *sock; int status = -EIO; + unsigned int pflags = current->flags; + if (atomic_read(&xprt->swapper)) + current->flags |= PF_MEMALLOC; sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP, false); @@ -2101,6 +2240,7 @@ out: xprt_clear_connecting(xprt); xprt_unlock_connect(xprt, transport); xprt_wake_pending_tasks(xprt, status); + current_restore_flags(pflags, PF_MEMALLOC); } /** @@ -2118,13 +2258,21 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt) if (sock == NULL) return; + if (!xprt->reuseport) { + xs_close(xprt); + return; + } switch (skst) { - default: + case TCP_FIN_WAIT1: + case TCP_FIN_WAIT2: + case TCP_LAST_ACK: + break; + case TCP_ESTABLISHED: + case TCP_CLOSE_WAIT: kernel_sock_shutdown(sock, SHUT_RDWR); trace_rpc_socket_shutdown(xprt, sock); break; - case TCP_CLOSE: - case TCP_TIME_WAIT: + default: xs_reset_transport(transport); } } @@ -2133,32 +2281,59 @@ static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct net *net = sock_net(sock->sk); + unsigned long connect_timeout; + unsigned long syn_retries; unsigned int keepidle; unsigned int keepcnt; - unsigned int opt_on = 1; unsigned int timeo; + unsigned long t; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ); keepcnt = xprt->timeout->to_retries + 1; timeo = jiffies_to_msecs(xprt->timeout->to_initval) * (xprt->timeout->to_retries + 1); clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); /* TCP Keepalive options */ - kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, - (char *)&opt_on, sizeof(opt_on)); - kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, - (char *)&keepidle, sizeof(keepidle)); - kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, - (char *)&keepidle, sizeof(keepidle)); - kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, - (char *)&keepcnt, sizeof(keepcnt)); + sock_set_keepalive(sock->sk); + tcp_sock_set_keepidle(sock->sk, keepidle); + tcp_sock_set_keepintvl(sock->sk, keepidle); + tcp_sock_set_keepcnt(sock->sk, keepcnt); /* TCP user timeout (see RFC5482) */ - kernel_setsockopt(sock, SOL_TCP, TCP_USER_TIMEOUT, - (char *)&timeo, sizeof(timeo)); + tcp_sock_set_user_timeout(sock->sk, timeo); + + /* Connect timeout */ + connect_timeout = max_t(unsigned long, + DIV_ROUND_UP(xprt->connect_timeout, HZ), 1); + syn_retries = max_t(unsigned long, + READ_ONCE(net->ipv4.sysctl_tcp_syn_retries), 1); + for (t = 0; t <= syn_retries && (1UL << t) < connect_timeout; t++) + ; + if (t <= syn_retries) + tcp_sock_set_syncnt(sock->sk, t - 1); +} + +static void xs_tcp_do_set_connect_timeout(struct rpc_xprt *xprt, + unsigned long connect_timeout) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_timeout to; + unsigned long initval; + + memcpy(&to, xprt->timeout, sizeof(to)); + /* Arbitrary lower limit */ + initval = max_t(unsigned long, connect_timeout, XS_TCP_INIT_REEST_TO); + to.to_initval = initval; + to.to_maxval = initval; + to.to_retries = 0; + memcpy(&transport->tcp_timeout, &to, sizeof(transport->tcp_timeout)); + xprt->timeout = &transport->tcp_timeout; + xprt->connect_timeout = connect_timeout; } static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, @@ -2166,37 +2341,22 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, unsigned long reconnect_timeout) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct rpc_timeout to; - unsigned long initval; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); if (reconnect_timeout < xprt->max_reconnect_timeout) xprt->max_reconnect_timeout = reconnect_timeout; - if (connect_timeout < xprt->connect_timeout) { - memcpy(&to, xprt->timeout, sizeof(to)); - initval = DIV_ROUND_UP(connect_timeout, to.to_retries + 1); - /* Arbitrary lower limit */ - if (initval < XS_TCP_INIT_REEST_TO << 1) - initval = XS_TCP_INIT_REEST_TO << 1; - to.to_initval = initval; - to.to_maxval = initval; - memcpy(&transport->tcp_timeout, &to, - sizeof(transport->tcp_timeout)); - xprt->timeout = &transport->tcp_timeout; - xprt->connect_timeout = connect_timeout; - } + if (connect_timeout < xprt->connect_timeout) + xs_tcp_do_set_connect_timeout(xprt, connect_timeout); set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - int ret = -ENOTCONN; if (!transport->inet) { struct sock *sk = sock->sk; - unsigned int addr_pref = IPV6_PREFER_SRC_PUBLIC; /* Avoid temporary address, they are bad for long-lived * connections such as NFS mounts. @@ -2205,12 +2365,15 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) * knowledge about the normal duration of connections, * MAY override this as appropriate. */ - kernel_setsockopt(sock, SOL_IPV6, IPV6_ADDR_PREFERENCES, - (char *)&addr_pref, sizeof(addr_pref)); + if (xs_addr(xprt)->sa_family == PF_INET6) { + ip6_sock_set_addr_preferences(sk, + IPV6_PREFER_SRC_PUBLIC); + } xs_tcp_set_socket_timeouts(xprt, sock); + tcp_sock_set_nodelay(sk); - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); xs_save_old_callbacks(transport, sk); @@ -2218,13 +2381,11 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_data_ready = xs_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; - sock_set_flag(sk, SOCK_FASYNC); sk->sk_error_report = xs_error_report; - sk->sk_allocation = GFP_NOIO; + sk->sk_use_task_frag = false; /* socket options */ sock_reset_flag(sk, SOCK_LINGER); - tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; xprt_clear_connected(xprt); @@ -2232,35 +2393,20 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) transport->sock = sock; transport->inet = sk; - write_unlock_bh(&sk->sk_callback_lock); + release_sock(sk); } if (!xprt_bound(xprt)) - goto out; + return -ENOTCONN; xs_set_memalloc(xprt); - /* Reset TCP record info */ - xs_stream_reset_connect(transport); + xs_stream_start_connect(transport); /* Tell the socket layer to start connecting... */ set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); - ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); - switch (ret) { - case 0: - xs_set_srcport(transport, sock); - /* fall through */ - case -EINPROGRESS: - /* SYN_SENT! */ - if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; - break; - case -EADDRNOTAVAIL: - /* Source port number is unavailable. Try a new one! */ - transport->srcport = 0; - } -out: - return ret; + return kernel_connect(sock, (struct sockaddr_unsized *)xs_addr(xprt), + xprt->addrlen, O_NONBLOCK); } /** @@ -2275,14 +2421,22 @@ static void xs_tcp_setup_socket(struct work_struct *work) container_of(work, struct sock_xprt, connect_worker.work); struct socket *sock = transport->sock; struct rpc_xprt *xprt = &transport->xprt; - int status = -EIO; + int status; + unsigned int pflags = current->flags; + + if (atomic_read(&xprt->swapper)) + current->flags |= PF_MEMALLOC; - if (!sock) { - sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_STREAM, - IPPROTO_TCP, true); + if (xprt_connected(xprt)) + goto out; + if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT, + &transport->sock_state) || + !sock) { + xs_reset_transport(transport); + sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family, + SOCK_STREAM, IPPROTO_TCP, true); if (IS_ERR(sock)) { - status = PTR_ERR(sock); + xprt_wake_pending_tasks(xprt, PTR_ERR(sock)); goto out; } } @@ -2299,21 +2453,27 @@ static void xs_tcp_setup_socket(struct work_struct *work) xprt, -status, xprt_connected(xprt), sock->sk->sk_state); switch (status) { - default: - printk("%s: connect returned unhandled error %d\n", - __func__, status); - /* fall through */ - case -EADDRNOTAVAIL: - /* We're probably in TIME_WAIT. Get rid of existing socket, - * and retry - */ - xs_tcp_force_close(xprt); - break; case 0: case -EINPROGRESS: + /* SYN_SENT! */ + set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state); + if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) + xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; + fallthrough; case -EALREADY: - xprt_unlock_connect(xprt, transport); - return; + goto out_unlock; + case -EADDRNOTAVAIL: + /* Source port number is unavailable. Try a new one! */ + transport->srcport = 0; + status = -EAGAIN; + break; + case -EPERM: + /* Happens, for instance, if a BPF program is preventing + * the connect. Remap the error so upper layers can better + * deal with it. + */ + status = -ECONNREFUSED; + fallthrough; case -EINVAL: /* Happens, for instance, if the user specified a link * local IPv6 address without a scope-id. @@ -2325,39 +2485,293 @@ static void xs_tcp_setup_socket(struct work_struct *work) case -EHOSTUNREACH: case -EADDRINUSE: case -ENOBUFS: - /* - * xs_tcp_force_close() wakes tasks with -EIO. - * We need to wake them first to ensure the - * correct error code. - */ - xprt_wake_pending_tasks(xprt, status); - xs_tcp_force_close(xprt); - goto out; + case -ENOTCONN: + break; + default: + printk("%s: connect returned unhandled error %d\n", + __func__, status); + status = -EAGAIN; } - status = -EAGAIN; + + /* xs_tcp_force_close() wakes tasks with a fixed error code. + * We need to wake them first to ensure the correct error code. + */ + xprt_wake_pending_tasks(xprt, status); + xs_tcp_force_close(xprt); out: xprt_clear_connecting(xprt); +out_unlock: xprt_unlock_connect(xprt, transport); - xprt_wake_pending_tasks(xprt, status); + current_restore_flags(pflags, PF_MEMALLOC); } -static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt) +/* + * Transfer the connected socket to @upper_transport, then mark that + * xprt CONNECTED. + */ +static int xs_tcp_tls_finish_connecting(struct rpc_xprt *lower_xprt, + struct sock_xprt *upper_transport) { - unsigned long start, now = jiffies; + struct sock_xprt *lower_transport = + container_of(lower_xprt, struct sock_xprt, xprt); + struct rpc_xprt *upper_xprt = &upper_transport->xprt; + + if (!upper_transport->inet) { + struct socket *sock = lower_transport->sock; + struct sock *sk = sock->sk; + + /* Avoid temporary address, they are bad for long-lived + * connections such as NFS mounts. + * RFC4941, section 3.6 suggests that: + * Individual applications, which have specific + * knowledge about the normal duration of connections, + * MAY override this as appropriate. + */ + if (xs_addr(upper_xprt)->sa_family == PF_INET6) + ip6_sock_set_addr_preferences(sk, IPV6_PREFER_SRC_PUBLIC); + + xs_tcp_set_socket_timeouts(upper_xprt, sock); + tcp_sock_set_nodelay(sk); + + lock_sock(sk); - start = xprt->stat.connect_start + xprt->reestablish_timeout; - if (time_after(start, now)) - return start - now; + /* @sk is already connected, so it now has the RPC callbacks. + * Reach into @lower_transport to save the original ones. + */ + upper_transport->old_data_ready = lower_transport->old_data_ready; + upper_transport->old_state_change = lower_transport->old_state_change; + upper_transport->old_write_space = lower_transport->old_write_space; + upper_transport->old_error_report = lower_transport->old_error_report; + sk->sk_user_data = upper_xprt; + + /* socket options */ + sock_reset_flag(sk, SOCK_LINGER); + + xprt_clear_connected(upper_xprt); + + upper_transport->sock = sock; + upper_transport->inet = sk; + upper_transport->file = lower_transport->file; + + release_sock(sk); + + /* Reset lower_transport before shutting down its clnt */ + mutex_lock(&lower_transport->recv_mutex); + lower_transport->inet = NULL; + lower_transport->sock = NULL; + lower_transport->file = NULL; + + xprt_clear_connected(lower_xprt); + xs_sock_reset_connection_flags(lower_xprt); + xs_stream_reset_connect(lower_transport); + mutex_unlock(&lower_transport->recv_mutex); + } + + if (!xprt_bound(upper_xprt)) + return -ENOTCONN; + + xs_set_memalloc(upper_xprt); + + if (!xprt_test_and_set_connected(upper_xprt)) { + upper_xprt->connect_cookie++; + clear_bit(XPRT_SOCK_CONNECTING, &upper_transport->sock_state); + xprt_clear_connecting(upper_xprt); + + upper_xprt->stat.connect_count++; + upper_xprt->stat.connect_time += (long)jiffies - + upper_xprt->stat.connect_start; + xs_run_error_worker(upper_transport, XPRT_SOCK_WAKE_PENDING); + } return 0; } -static void xs_reconnect_backoff(struct rpc_xprt *xprt) +/** + * xs_tls_handshake_done - TLS handshake completion handler + * @data: address of xprt to wake + * @status: status of handshake + * @peerid: serial number of key containing the remote's identity + * + */ +static void xs_tls_handshake_done(void *data, int status, key_serial_t peerid) +{ + struct rpc_xprt *lower_xprt = data; + struct sock_xprt *lower_transport = + container_of(lower_xprt, struct sock_xprt, xprt); + + switch (status) { + case 0: + case -EACCES: + case -ETIMEDOUT: + lower_transport->xprt_err = status; + break; + default: + lower_transport->xprt_err = -EACCES; + } + complete(&lower_transport->handshake_done); + xprt_put(lower_xprt); +} + +static int xs_tls_handshake_sync(struct rpc_xprt *lower_xprt, struct xprtsec_parms *xprtsec) +{ + struct sock_xprt *lower_transport = + container_of(lower_xprt, struct sock_xprt, xprt); + struct tls_handshake_args args = { + .ta_sock = lower_transport->sock, + .ta_done = xs_tls_handshake_done, + .ta_data = xprt_get(lower_xprt), + .ta_peername = lower_xprt->servername, + }; + struct sock *sk = lower_transport->inet; + int rc; + + init_completion(&lower_transport->handshake_done); + set_bit(XPRT_SOCK_IGNORE_RECV, &lower_transport->sock_state); + lower_transport->xprt_err = -ETIMEDOUT; + switch (xprtsec->policy) { + case RPC_XPRTSEC_TLS_ANON: + rc = tls_client_hello_anon(&args, GFP_KERNEL); + if (rc) + goto out_put_xprt; + break; + case RPC_XPRTSEC_TLS_X509: + args.ta_my_cert = xprtsec->cert_serial; + args.ta_my_privkey = xprtsec->privkey_serial; + rc = tls_client_hello_x509(&args, GFP_KERNEL); + if (rc) + goto out_put_xprt; + break; + default: + rc = -EACCES; + goto out_put_xprt; + } + + rc = wait_for_completion_interruptible_timeout(&lower_transport->handshake_done, + XS_TLS_HANDSHAKE_TO); + if (rc <= 0) { + tls_handshake_cancel(sk); + if (rc == 0) + rc = -ETIMEDOUT; + goto out_put_xprt; + } + + rc = lower_transport->xprt_err; + +out: + xs_stream_reset_connect(lower_transport); + clear_bit(XPRT_SOCK_IGNORE_RECV, &lower_transport->sock_state); + return rc; + +out_put_xprt: + xprt_put(lower_xprt); + goto out; +} + +/** + * xs_tcp_tls_setup_socket - establish a TLS session on a TCP socket + * @work: queued work item + * + * Invoked by a work queue tasklet. + * + * For RPC-with-TLS, there is a two-stage connection process. + * + * The "upper-layer xprt" is visible to the RPC consumer. Once it has + * been marked connected, the consumer knows that a TCP connection and + * a TLS session have been established. + * + * A "lower-layer xprt", created in this function, handles the mechanics + * of connecting the TCP socket, performing the RPC_AUTH_TLS probe, and + * then driving the TLS handshake. Once all that is complete, the upper + * layer xprt is marked connected. + */ +static void xs_tcp_tls_setup_socket(struct work_struct *work) { - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) - xprt->reestablish_timeout = xprt->max_reconnect_timeout; - if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; + struct sock_xprt *upper_transport = + container_of(work, struct sock_xprt, connect_worker.work); + struct rpc_clnt *upper_clnt = upper_transport->clnt; + struct rpc_xprt *upper_xprt = &upper_transport->xprt; + struct rpc_create_args args = { + .net = upper_xprt->xprt_net, + .protocol = upper_xprt->prot, + .address = (struct sockaddr *)&upper_xprt->addr, + .addrsize = upper_xprt->addrlen, + .timeout = upper_clnt->cl_timeout, + .servername = upper_xprt->servername, + .program = upper_clnt->cl_program, + .prognumber = upper_clnt->cl_prog, + .version = upper_clnt->cl_vers, + .authflavor = RPC_AUTH_TLS, + .cred = upper_clnt->cl_cred, + .xprtsec = { + .policy = RPC_XPRTSEC_NONE, + }, + .stats = upper_clnt->cl_stats, + }; + unsigned int pflags = current->flags; + struct rpc_clnt *lower_clnt; + struct rpc_xprt *lower_xprt; + int status; + + if (atomic_read(&upper_xprt->swapper)) + current->flags |= PF_MEMALLOC; + + xs_stream_start_connect(upper_transport); + + /* This implicitly sends an RPC_AUTH_TLS probe */ + lower_clnt = rpc_create(&args); + if (IS_ERR(lower_clnt)) { + trace_rpc_tls_unavailable(upper_clnt, upper_xprt); + clear_bit(XPRT_SOCK_CONNECTING, &upper_transport->sock_state); + xprt_clear_connecting(upper_xprt); + xprt_wake_pending_tasks(upper_xprt, PTR_ERR(lower_clnt)); + xs_run_error_worker(upper_transport, XPRT_SOCK_WAKE_PENDING); + goto out_unlock; + } + + /* RPC_AUTH_TLS probe was successful. Try a TLS handshake on + * the lower xprt. + */ + rcu_read_lock(); + lower_xprt = rcu_dereference(lower_clnt->cl_xprt); + rcu_read_unlock(); + + if (wait_on_bit_lock(&lower_xprt->state, XPRT_LOCKED, TASK_KILLABLE)) + goto out_unlock; + + status = xs_tls_handshake_sync(lower_xprt, &upper_xprt->xprtsec); + if (status) { + trace_rpc_tls_not_started(upper_clnt, upper_xprt); + goto out_close; + } + + status = xs_tcp_tls_finish_connecting(lower_xprt, upper_transport); + if (status) + goto out_close; + xprt_release_write(lower_xprt, NULL); + trace_rpc_socket_connect(upper_xprt, upper_transport->sock, 0); + rpc_shutdown_client(lower_clnt); + + /* Check for ingress data that arrived before the socket's + * ->data_ready callback was set up. + */ + xs_poll_check_readable(upper_transport); + +out_unlock: + current_restore_flags(pflags, PF_MEMALLOC); + upper_transport->clnt = NULL; + xprt_unlock_connect(upper_xprt, upper_transport); + return; + +out_close: + xprt_release_write(lower_xprt, NULL); + rpc_shutdown_client(lower_clnt); + + /* xprt_force_disconnect() wakes tasks with a fixed tk_status code. + * Wake them first here to ensure they get our tk_status code. + */ + xprt_wake_pending_tasks(upper_xprt, status); + xs_tcp_force_close(upper_xprt); + xprt_clear_connecting(upper_xprt); + goto out_unlock; } /** @@ -2383,25 +2797,64 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) if (transport->sock != NULL) { dprintk("RPC: xs_connect delayed xprt %p for %lu " - "seconds\n", - xprt, xprt->reestablish_timeout / HZ); + "seconds\n", xprt, xprt->reestablish_timeout / HZ); - /* Start by resetting any existing state */ - xs_reset_transport(transport); - - delay = xs_reconnect_delay(xprt); - xs_reconnect_backoff(xprt); + delay = xprt_reconnect_delay(xprt); + xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO); } else dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); + transport->clnt = task->tk_client; queue_delayed_work(xprtiod_workqueue, &transport->connect_worker, delay); } +static void xs_wake_disconnect(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state)) + xs_tcp_force_close(&transport->xprt); +} + +static void xs_wake_write(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state)) + xprt_write_space(&transport->xprt); +} + +static void xs_wake_error(struct sock_xprt *transport) +{ + int sockerr; + + if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) + return; + sockerr = xchg(&transport->xprt_err, 0); + if (sockerr < 0) { + xprt_wake_pending_tasks(&transport->xprt, sockerr); + xs_tcp_force_close(&transport->xprt); + } +} + +static void xs_wake_pending(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state)) + xprt_wake_pending_tasks(&transport->xprt, -EAGAIN); +} + +static void xs_error_handle(struct work_struct *work) +{ + struct sock_xprt *transport = container_of(work, + struct sock_xprt, error_worker); + + xs_wake_disconnect(transport); + xs_wake_write(transport); + xs_wake_error(transport); + xs_wake_pending(transport); +} + /** - * xs_local_print_stats - display AF_LOCAL socket-specifc stats + * xs_local_print_stats - display AF_LOCAL socket-specific stats * @xprt: rpc_xprt struct containing statistics * @seq: output file * @@ -2430,7 +2883,7 @@ static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) } /** - * xs_udp_print_stats - display UDP socket-specifc stats + * xs_udp_print_stats - display UDP socket-specific stats * @xprt: rpc_xprt struct containing statistics * @seq: output file * @@ -2454,7 +2907,7 @@ static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) } /** - * xs_tcp_print_stats - display TCP socket-specifc stats + * xs_tcp_print_stats - display TCP socket-specific stats * @xprt: rpc_xprt struct containing statistics * @seq: output file * @@ -2502,7 +2955,7 @@ static int bc_malloc(struct rpc_task *task) return -EINVAL; } - page = alloc_page(GFP_KERNEL); + page = alloc_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN); if (!page) return -ENOMEM; @@ -2526,46 +2979,46 @@ static void bc_free(struct rpc_task *task) free_page((unsigned long)buf); } -/* - * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex - * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request. - */ static int bc_sendto(struct rpc_rqst *req) { - int len; - struct xdr_buf *xbufp = &req->rq_snd_buf; - struct rpc_xprt *xprt = req->rq_xprt; + struct xdr_buf *xdr = &req->rq_snd_buf; struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct socket *sock = transport->sock; - unsigned long headoff; - unsigned long tailoff; - - xs_encode_stream_record_marker(xbufp); - - tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK; - headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK; - len = svc_send_common(sock, xbufp, - virt_to_page(xbufp->head[0].iov_base), headoff, - xbufp->tail[0].iov_base, tailoff); - - if (len != xbufp->len) { - printk(KERN_NOTICE "Error sending entire callback!\n"); - len = -EAGAIN; - } + container_of(req->rq_xprt, struct sock_xprt, xprt); + struct msghdr msg = { + .msg_flags = 0, + }; + rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | + (u32)xdr->len); + unsigned int sent = 0; + int err; - return len; + req->rq_xtime = ktime_get(); + err = xdr_alloc_bvec(xdr, rpc_task_gfp_mask()); + if (err < 0) + return err; + err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent); + xdr_free_bvec(xdr); + if (err < 0 || sent != (xdr->len + sizeof(marker))) + return -EAGAIN; + return sent; } -/* - * The send routine. Borrows from svc_send +/** + * bc_send_request - Send a backchannel Call on a TCP socket + * @req: rpc_rqst containing Call message to be sent + * + * xpt_mutex ensures @rqstp's whole message is written to the socket + * without interruption. + * + * Return values: + * %0 if the message was sent successfully + * %ENOTCONN if the message was not sent */ static int bc_send_request(struct rpc_rqst *req) { struct svc_xprt *xprt; int len; - dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); /* * Get the server socket associated with this callback xprt */ @@ -2588,19 +3041,11 @@ static int bc_send_request(struct rpc_rqst *req) return len; } -/* - * The close routine. Since this is client initiated, we do nothing - */ - static void bc_close(struct rpc_xprt *xprt) { + xprt_disconnect_done(xprt); } -/* - * The xprt destroy routine. Again, because this connection is client - * initiated, we do nothing - */ - static void bc_destroy(struct rpc_xprt *xprt) { dprintk("RPC: bc_destroy xprt %p\n", xprt); @@ -2621,7 +3066,8 @@ static const struct rpc_xprt_ops xs_local_ops = { .buf_free = rpc_free, .prepare_request = xs_stream_prepare_request, .send_request = xs_local_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_def, + .abort_send_request = xs_stream_abort_send_request, + .wait_for_reply_request = xprt_wait_for_reply_request_def, .close = xs_close, .destroy = xs_destroy, .print_stats = xs_local_print_stats, @@ -2638,10 +3084,12 @@ static const struct rpc_xprt_ops xs_udp_ops = { .rpcbind = rpcb_getport_async, .set_port = xs_set_port, .connect = xs_connect, + .get_srcaddr = xs_sock_srcaddr, + .get_srcport = xs_sock_srcport, .buf_alloc = rpc_malloc, .buf_free = rpc_free, .send_request = xs_udp_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_rtt, + .wait_for_reply_request = xprt_wait_for_reply_request_rtt, .timer = xs_udp_timer, .release_request = xprt_release_rqst_cong, .close = xs_close, @@ -2660,11 +3108,14 @@ static const struct rpc_xprt_ops xs_tcp_ops = { .rpcbind = rpcb_getport_async, .set_port = xs_set_port, .connect = xs_connect, + .get_srcaddr = xs_sock_srcaddr, + .get_srcport = xs_sock_srcport, .buf_alloc = rpc_malloc, .buf_free = rpc_free, .prepare_request = xs_stream_prepare_request, .send_request = xs_tcp_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_def, + .abort_send_request = xs_stream_abort_send_request, + .wait_for_reply_request = xprt_wait_for_reply_request_def, .close = xs_tcp_shutdown, .destroy = xs_destroy, .set_connect_timeout = xs_tcp_set_connect_timeout, @@ -2675,6 +3126,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = { #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, .bc_maxpayload = xs_tcp_bc_maxpayload, + .bc_num_slots = xprt_bc_max_slots, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif @@ -2692,7 +3144,7 @@ static const struct rpc_xprt_ops bc_tcp_ops = { .buf_alloc = bc_malloc, .buf_free = bc_free, .send_request = bc_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_def, + .wait_for_reply_request = xprt_wait_for_reply_request_def, .close = bc_close, .destroy = bc_destroy, .print_stats = xs_tcp_print_stats, @@ -2793,7 +3245,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = 0; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); + xprt->xprt_class = &xs_local_transport; xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; @@ -2804,11 +3256,12 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->timeout = &xs_local_default_timeout; INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); switch (sun->sun_family) { case AF_LOCAL: - if (sun->sun_path[0] != '/') { + if (sun->sun_path[0] != '/' && sun->sun_path[0] != '\0') { dprintk("RPC: bad AF_LOCAL address: %s\n", sun->sun_path); ret = ERR_PTR(-EINVAL); @@ -2816,9 +3269,6 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) } xprt_set_bound(xprt); xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); - ret = ERR_PTR(xs_local_setup_socket(transport)); - if (ret) - goto out_err; break; default: ret = ERR_PTR(-EAFNOSUPPORT); @@ -2862,7 +3312,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_UDP; - xprt->tsh_size = 0; + xprt->xprt_class = &xs_udp_transport; /* XXX: header size can vary due to auth type, IPv6, etc. */ xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); @@ -2875,6 +3325,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); switch (addr->sa_family) { @@ -2942,7 +3393,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); + xprt->xprt_class = &xs_tcp_transport; xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; @@ -2953,10 +3404,16 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->timeout = &xs_tcp_default_timeout; xprt->max_reconnect_timeout = xprt->timeout->to_maxval; + if (args->reconnect_timeout) + xprt->max_reconnect_timeout = args->reconnect_timeout; + xprt->connect_timeout = xprt->timeout->to_initval * (xprt->timeout->to_retries + 1); + if (args->connect_timeout) + xs_tcp_do_set_connect_timeout(xprt, args->connect_timeout); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); switch (addr->sa_family) { @@ -2996,6 +3453,94 @@ out_err: } /** + * xs_setup_tcp_tls - Set up transport to use a TCP with TLS + * @args: rpc transport creation arguments + * + */ +static struct rpc_xprt *xs_setup_tcp_tls(struct xprt_create *args) +{ + struct sockaddr *addr = args->dstaddr; + struct rpc_xprt *xprt; + struct sock_xprt *transport; + struct rpc_xprt *ret; + unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; + + if (args->flags & XPRT_CREATE_INFINITE_SLOTS) + max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; + + xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, + max_slot_table_size); + if (IS_ERR(xprt)) + return xprt; + transport = container_of(xprt, struct sock_xprt, xprt); + + xprt->prot = IPPROTO_TCP; + xprt->xprt_class = &xs_tcp_transport; + xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; + + xprt->bind_timeout = XS_BIND_TO; + xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; + xprt->idle_timeout = XS_IDLE_DISC_TO; + + xprt->ops = &xs_tcp_ops; + xprt->timeout = &xs_tcp_default_timeout; + + xprt->max_reconnect_timeout = xprt->timeout->to_maxval; + xprt->connect_timeout = xprt->timeout->to_initval * + (xprt->timeout->to_retries + 1); + + INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); + + switch (args->xprtsec.policy) { + case RPC_XPRTSEC_TLS_ANON: + case RPC_XPRTSEC_TLS_X509: + xprt->xprtsec = args->xprtsec; + INIT_DELAYED_WORK(&transport->connect_worker, + xs_tcp_tls_setup_socket); + break; + default: + ret = ERR_PTR(-EACCES); + goto out_err; + } + + switch (addr->sa_family) { + case AF_INET: + if (((struct sockaddr_in *)addr)->sin_port != htons(0)) + xprt_set_bound(xprt); + + xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); + break; + case AF_INET6: + if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) + xprt_set_bound(xprt); + + xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); + break; + default: + ret = ERR_PTR(-EAFNOSUPPORT); + goto out_err; + } + + if (xprt_bound(xprt)) + dprintk("RPC: set up xprt to %s (port %s) via %s\n", + xprt->address_strings[RPC_DISPLAY_ADDR], + xprt->address_strings[RPC_DISPLAY_PORT], + xprt->address_strings[RPC_DISPLAY_PROTO]); + else + dprintk("RPC: set up xprt to %s (autobind) via %s\n", + xprt->address_strings[RPC_DISPLAY_ADDR], + xprt->address_strings[RPC_DISPLAY_PROTO]); + + if (try_module_get(THIS_MODULE)) + return xprt; + ret = ERR_PTR(-EINVAL); +out_err: + xs_xprt_free(xprt); + return ret; +} + +/** * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket * @args: rpc transport creation arguments * @@ -3015,7 +3560,7 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); + xprt->xprt_class = &xs_bc_tcp_transport; xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->timeout = &xs_tcp_default_timeout; @@ -3083,6 +3628,7 @@ static struct xprt_class xs_local_transport = { .owner = THIS_MODULE, .ident = XPRT_TRANSPORT_LOCAL, .setup = xs_setup_local, + .netid = { "" }, }; static struct xprt_class xs_udp_transport = { @@ -3091,6 +3637,7 @@ static struct xprt_class xs_udp_transport = { .owner = THIS_MODULE, .ident = XPRT_TRANSPORT_UDP, .setup = xs_setup_udp, + .netid = { "udp", "udp6", "" }, }; static struct xprt_class xs_tcp_transport = { @@ -3099,6 +3646,16 @@ static struct xprt_class xs_tcp_transport = { .owner = THIS_MODULE, .ident = XPRT_TRANSPORT_TCP, .setup = xs_setup_tcp, + .netid = { "tcp", "tcp6", "" }, +}; + +static struct xprt_class xs_tcp_tls_transport = { + .list = LIST_HEAD_INIT(xs_tcp_tls_transport.list), + .name = "tcp-with-tls", + .owner = THIS_MODULE, + .ident = XPRT_TRANSPORT_TCP_TLS, + .setup = xs_setup_tcp_tls, + .netid = { "tcp", "tcp6", "" }, }; static struct xprt_class xs_bc_tcp_transport = { @@ -3107,6 +3664,7 @@ static struct xprt_class xs_bc_tcp_transport = { .owner = THIS_MODULE, .ident = XPRT_TRANSPORT_BC_TCP, .setup = xs_setup_bc_tcp, + .netid = { "" }, }; /** @@ -3116,11 +3674,12 @@ static struct xprt_class xs_bc_tcp_transport = { int init_socket_xprt(void) { if (!sunrpc_table_header) - sunrpc_table_header = register_sysctl_table(sunrpc_table); + sunrpc_table_header = register_sysctl("sunrpc", xs_tunables_table); xprt_register_transport(&xs_local_transport); xprt_register_transport(&xs_udp_transport); xprt_register_transport(&xs_tcp_transport); + xprt_register_transport(&xs_tcp_tls_transport); xprt_register_transport(&xs_bc_tcp_transport); return 0; @@ -3140,27 +3699,10 @@ void cleanup_socket_xprt(void) xprt_unregister_transport(&xs_local_transport); xprt_unregister_transport(&xs_udp_transport); xprt_unregister_transport(&xs_tcp_transport); + xprt_unregister_transport(&xs_tcp_tls_transport); xprt_unregister_transport(&xs_bc_tcp_transport); } -static int param_set_uint_minmax(const char *val, - const struct kernel_param *kp, - unsigned int min, unsigned int max) -{ - unsigned int num; - int ret; - - if (!val) - return -EINVAL; - ret = kstrtouint(val, 0, &num); - if (ret) - return ret; - if (num < min || num > max) - return -EINVAL; - *((unsigned int *)kp->arg) = num; - return 0; -} - static int param_set_portnr(const char *val, const struct kernel_param *kp) { return param_set_uint_minmax(val, kp, |
