From 96f8778f70d0f5b988146d757a26dcd5d5b44116 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 3 Aug 2017 14:30:03 -0400 Subject: xprtrdma: Add xdr_init_decode to rpcrdma_reply_handler() Transport header decoding deals with untrusted input data, therefore decoding this header needs to be hardened. Adopt the same infrastructure that is used when XDR decoding NFS replies. This is slightly more CPU-intensive than the replaced code, but we're not adding new atomics, locking, or context switches. The cost is manageable. Start by initializing an xdr_stream in rpcrdma_reply_handler(). Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index ca4d6e4528f3..24f58c7b3106 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -993,10 +993,11 @@ rpcrdma_reply_handler(struct work_struct *work) struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct xdr_stream *xdr = &rep->rr_stream; struct rpcrdma_msg *headerp; struct rpcrdma_req *req; struct rpc_rqst *rqst; - __be32 *iptr; + __be32 *iptr, *p, xid, vers, proc; int rdmalen, status, rmerr; unsigned long cwnd; struct list_head mws; @@ -1005,8 +1006,18 @@ rpcrdma_reply_handler(struct work_struct *work) if (rep->rr_len == RPCRDMA_BAD_LEN) goto out_badstatus; - if (rep->rr_len < RPCRDMA_HDRLEN_ERR) + + xdr_init_decode(xdr, &rep->rr_hdrbuf, + rep->rr_hdrbuf.head[0].iov_base); + + /* Fixed transport header fields */ + p = xdr_inline_decode(xdr, 4 * sizeof(*p)); + if (unlikely(!p)) goto out_shortreply; + xid = *p++; + vers = *p++; + p++; /* credits */ + proc = *p++; headerp = rdmab_to_msg(rep->rr_rdmabuf); #if defined(CONFIG_SUNRPC_BACKCHANNEL) @@ -1018,8 +1029,7 @@ rpcrdma_reply_handler(struct work_struct *work) * get context for handling any incoming chunks. */ spin_lock(&buf->rb_lock); - req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf, - headerp->rm_xid); + req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf, xid); if (!req) goto out_nomatch; if (req->rl_reply) @@ -1035,7 +1045,7 @@ rpcrdma_reply_handler(struct work_struct *work) spin_unlock(&buf->rb_lock); dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", - __func__, rep, req, be32_to_cpu(headerp->rm_xid)); + __func__, rep, req, be32_to_cpu(xid)); /* Invalidate and unmap the data payloads before waking the * waiting application. This guarantees the memory regions @@ -1052,16 +1062,16 @@ rpcrdma_reply_handler(struct work_struct *work) * the rep, rqst, and rq_task pointers remain stable. */ spin_lock_bh(&xprt->transport_lock); - rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); + rqst = xprt_lookup_rqst(xprt, xid); if (!rqst) goto out_norqst; xprt->reestablish_timeout = 0; - if (headerp->rm_vers != rpcrdma_version) + if (vers != rpcrdma_version) goto out_badversion; /* check for expected message types */ /* The order of some of these tests is important. */ - switch (headerp->rm_type) { + switch (proc) { case rdma_msg: /* never expect read chunks */ /* never expect reply chunks (two ways to check) */ @@ -1123,7 +1133,7 @@ badheader: default: dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", rqst->rq_task->tk_pid, __func__, - be32_to_cpu(headerp->rm_type)); + be32_to_cpu(proc)); status = -EIO; r_xprt->rx_stats.bad_reply_count++; break; @@ -1161,7 +1171,7 @@ out_bcall: */ out_badversion: dprintk("RPC: %s: invalid version %d\n", - __func__, be32_to_cpu(headerp->rm_vers)); + __func__, be32_to_cpu(vers)); status = -EIO; r_xprt->rx_stats.bad_reply_count++; goto out; @@ -1204,16 +1214,15 @@ out_shortreply: out_nomatch: spin_unlock(&buf->rb_lock); - dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", - __func__, be32_to_cpu(headerp->rm_xid), - rep->rr_len); + dprintk("RPC: %s: no match for incoming xid 0x%08x\n", + __func__, be32_to_cpu(xid)); goto repost; out_duplicate: spin_unlock(&buf->rb_lock); dprintk("RPC: %s: " "duplicate reply %p to RPC request %p: xid 0x%08x\n", - __func__, rep, req, be32_to_cpu(headerp->rm_xid)); + __func__, rep, req, be32_to_cpu(xid)); /* If no pending RPC transaction was matched, post a replacement * receive buffer before returning. -- cgit From 41c8f70f5a3db7e06179186b6525fd9ee1d7d314 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 3 Aug 2017 14:30:11 -0400 Subject: xprtrdma: Harden backchannel call decoding Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 58 +++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 21 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 24f58c7b3106..9b5ab598ab7b 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -949,35 +949,59 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws, } } -#if defined(CONFIG_SUNRPC_BACKCHANNEL) /* By convention, backchannel calls arrive via rdma_msg type * messages, and never populate the chunk lists. This makes * the RPC/RDMA header small and fixed in size, so it is * straightforward to check the RPC header's direction field. */ static bool -rpcrdma_is_bcall(struct rpcrdma_msg *headerp) +rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, + __be32 xid, __be32 proc) +#if defined(CONFIG_SUNRPC_BACKCHANNEL) { - __be32 *p = (__be32 *)headerp; + struct xdr_stream *xdr = &rep->rr_stream; + __be32 *p; - if (headerp->rm_type != rdma_msg) + if (proc != rdma_msg) return false; - if (headerp->rm_body.rm_chunks[0] != xdr_zero) + + /* Peek at stream contents without advancing. */ + p = xdr_inline_decode(xdr, 0); + + /* Chunk lists */ + if (*p++ != xdr_zero) return false; - if (headerp->rm_body.rm_chunks[1] != xdr_zero) + if (*p++ != xdr_zero) return false; - if (headerp->rm_body.rm_chunks[2] != xdr_zero) + if (*p++ != xdr_zero) return false; - /* sanity */ - if (p[7] != headerp->rm_xid) + /* RPC header */ + if (*p++ != xid) return false; - /* call direction */ - if (p[8] != cpu_to_be32(RPC_CALL)) + if (*p != cpu_to_be32(RPC_CALL)) return false; + /* Now that we are sure this is a backchannel call, + * advance to the RPC header. + */ + p = xdr_inline_decode(xdr, 3 * sizeof(*p)); + if (unlikely(!p)) + goto out_short; + + rpcrdma_bc_receive_call(r_xprt, rep); + return true; + +out_short: + pr_warn("RPC/RDMA short backward direction call\n"); + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep)) + xprt_disconnect_done(&r_xprt->rx_xprt); return true; } +#else /* CONFIG_SUNRPC_BACKCHANNEL */ +{ + return false; +} #endif /* CONFIG_SUNRPC_BACKCHANNEL */ /* Process received RPC/RDMA messages. @@ -1020,10 +1044,8 @@ rpcrdma_reply_handler(struct work_struct *work) proc = *p++; headerp = rdmab_to_msg(rep->rr_rdmabuf); -#if defined(CONFIG_SUNRPC_BACKCHANNEL) - if (rpcrdma_is_bcall(headerp)) - goto out_bcall; -#endif + if (rpcrdma_is_bcall(r_xprt, rep, xid, proc)) + return; /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. @@ -1159,12 +1181,6 @@ out_badstatus: } return; -#if defined(CONFIG_SUNRPC_BACKCHANNEL) -out_bcall: - rpcrdma_bc_receive_call(r_xprt, rep); - return; -#endif - /* If the incoming reply terminated a pending RPC, the next * RPC call will post a replacement receive buffer as it is * being marshaled. -- cgit From 07ff2dd510d8c5b1166827df7686036283be10e0 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 3 Aug 2017 14:30:19 -0400 Subject: xprtrdma: Refactor rpcrdma_reply_handler() Refactor the reply handler's transport header decoding logic to make it easier to understand and update. Convert some of the handler to use xdr_streams, which will enable stricter validation of input data and enable the eventual addition of support for new combinations of chunks, such as "Write + Reply" or "PZRC + normal Read". Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 212 +++++++++++++++++++++++++---------------- 1 file changed, 130 insertions(+), 82 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 9b5ab598ab7b..56f22773fa4b 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1004,6 +1004,124 @@ out_short: } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ +static int +rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, + struct rpc_rqst *rqst) +{ + struct xdr_stream *xdr = &rep->rr_stream; + int rdmalen, status; + __be32 *p; + + p = xdr_inline_decode(xdr, 2 * sizeof(*p)); + if (unlikely(!p)) + return -EIO; + + /* never expect read list */ + if (unlikely(*p++ != xdr_zero)) + return -EIO; + + /* Write list */ + if (*p != xdr_zero) { + char *base = rep->rr_hdrbuf.head[0].iov_base; + + p++; + rdmalen = rpcrdma_count_chunks(rep, 1, &p); + if (rdmalen < 0 || *p++ != xdr_zero) + return -EIO; + + rep->rr_len -= (char *)p - base; + status = rep->rr_len + rdmalen; + r_xprt->rx_stats.total_rdma_reply += rdmalen; + + /* special case - last segment may omit padding */ + rdmalen &= 3; + if (rdmalen) { + rdmalen = 4 - rdmalen; + status += rdmalen; + } + } else { + p = xdr_inline_decode(xdr, sizeof(*p)); + if (!p) + return -EIO; + + /* never expect reply chunk */ + if (*p++ != xdr_zero) + return -EIO; + rdmalen = 0; + rep->rr_len -= RPCRDMA_HDRLEN_MIN; + status = rep->rr_len; + } + + r_xprt->rx_stats.fixup_copy_count += + rpcrdma_inline_fixup(rqst, (char *)p, rep->rr_len, + rdmalen); + + return status; +} + +static noinline int +rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) +{ + struct xdr_stream *xdr = &rep->rr_stream; + int rdmalen; + __be32 *p; + + p = xdr_inline_decode(xdr, 3 * sizeof(*p)); + if (unlikely(!p)) + return -EIO; + + /* never expect Read chunks */ + if (unlikely(*p++ != xdr_zero)) + return -EIO; + /* never expect Write chunks */ + if (unlikely(*p++ != xdr_zero)) + return -EIO; + /* always expect a Reply chunk */ + if (unlikely(*p++ == xdr_zero)) + return -EIO; + + rdmalen = rpcrdma_count_chunks(rep, 0, &p); + if (rdmalen < 0) + return -EIO; + r_xprt->rx_stats.total_rdma_reply += rdmalen; + + /* Reply chunk buffer already is the reply vector - no fixup. */ + return rdmalen; +} + +static noinline int +rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, + struct rpc_rqst *rqst) +{ + struct xdr_stream *xdr = &rep->rr_stream; + __be32 *p; + + p = xdr_inline_decode(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EIO; + + switch (*p) { + case err_vers: + p = xdr_inline_decode(xdr, 2 * sizeof(*p)); + if (!p) + break; + dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n", + rqst->rq_task->tk_pid, __func__, + be32_to_cpup(p), be32_to_cpu(*(p + 1))); + break; + case err_chunk: + dprintk("RPC: %5u: %s: server reports header decoding error\n", + rqst->rq_task->tk_pid, __func__); + break; + default: + dprintk("RPC: %5u: %s: server reports unrecognized error %d\n", + rqst->rq_task->tk_pid, __func__, be32_to_cpup(p)); + } + + r_xprt->rx_stats.bad_reply_count++; + return -EREMOTEIO; +} + /* Process received RPC/RDMA messages. * * Errors must result in the RPC task either being awakened, or @@ -1018,13 +1136,12 @@ rpcrdma_reply_handler(struct work_struct *work) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpc_xprt *xprt = &r_xprt->rx_xprt; struct xdr_stream *xdr = &rep->rr_stream; - struct rpcrdma_msg *headerp; struct rpcrdma_req *req; struct rpc_rqst *rqst; - __be32 *iptr, *p, xid, vers, proc; - int rdmalen, status, rmerr; + __be32 *p, xid, vers, proc; unsigned long cwnd; struct list_head mws; + int status; dprintk("RPC: %s: incoming rep %p\n", __func__, rep); @@ -1043,7 +1160,6 @@ rpcrdma_reply_handler(struct work_struct *work) p++; /* credits */ proc = *p++; - headerp = rdmab_to_msg(rep->rr_rdmabuf); if (rpcrdma_is_bcall(r_xprt, rep, xid, proc)) return; @@ -1091,75 +1207,21 @@ rpcrdma_reply_handler(struct work_struct *work) if (vers != rpcrdma_version) goto out_badversion; - /* check for expected message types */ - /* The order of some of these tests is important. */ switch (proc) { case rdma_msg: - /* never expect read chunks */ - /* never expect reply chunks (two ways to check) */ - if (headerp->rm_body.rm_chunks[0] != xdr_zero || - (headerp->rm_body.rm_chunks[1] == xdr_zero && - headerp->rm_body.rm_chunks[2] != xdr_zero)) - goto badheader; - if (headerp->rm_body.rm_chunks[1] != xdr_zero) { - /* count any expected write chunks in read reply */ - /* start at write chunk array count */ - iptr = &headerp->rm_body.rm_chunks[2]; - rdmalen = rpcrdma_count_chunks(rep, 1, &iptr); - /* check for validity, and no reply chunk after */ - if (rdmalen < 0 || *iptr++ != xdr_zero) - goto badheader; - rep->rr_len -= - ((unsigned char *)iptr - (unsigned char *)headerp); - status = rep->rr_len + rdmalen; - r_xprt->rx_stats.total_rdma_reply += rdmalen; - /* special case - last chunk may omit padding */ - if (rdmalen &= 3) { - rdmalen = 4 - rdmalen; - status += rdmalen; - } - } else { - /* else ordinary inline */ - rdmalen = 0; - iptr = (__be32 *)((unsigned char *)headerp + - RPCRDMA_HDRLEN_MIN); - rep->rr_len -= RPCRDMA_HDRLEN_MIN; - status = rep->rr_len; - } - - r_xprt->rx_stats.fixup_copy_count += - rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, - rdmalen); + status = rpcrdma_decode_msg(r_xprt, rep, rqst); break; - case rdma_nomsg: - /* never expect read or write chunks, always reply chunks */ - if (headerp->rm_body.rm_chunks[0] != xdr_zero || - headerp->rm_body.rm_chunks[1] != xdr_zero || - headerp->rm_body.rm_chunks[2] != xdr_one) - goto badheader; - iptr = (__be32 *)((unsigned char *)headerp + - RPCRDMA_HDRLEN_MIN); - rdmalen = rpcrdma_count_chunks(rep, 0, &iptr); - if (rdmalen < 0) - goto badheader; - r_xprt->rx_stats.total_rdma_reply += rdmalen; - /* Reply chunk buffer already is the reply vector - no fixup. */ - status = rdmalen; + status = rpcrdma_decode_nomsg(r_xprt, rep); break; - case rdma_error: - goto out_rdmaerr; - -badheader: + status = rpcrdma_decode_error(r_xprt, rep, rqst); + break; default: - dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", - rqst->rq_task->tk_pid, __func__, - be32_to_cpu(proc)); status = -EIO; - r_xprt->rx_stats.bad_reply_count++; - break; } + if (status < 0) + goto out_badheader; out: cwnd = xprt->cwnd; @@ -1192,25 +1254,11 @@ out_badversion: r_xprt->rx_stats.bad_reply_count++; goto out; -out_rdmaerr: - rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err); - switch (rmerr) { - case ERR_VERS: - pr_err("%s: server reports header version error (%u-%u)\n", - __func__, - be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low), - be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high)); - break; - case ERR_CHUNK: - pr_err("%s: server reports header decoding error\n", - __func__); - break; - default: - pr_err("%s: server reports unknown error %d\n", - __func__, rmerr); - } - status = -EREMOTEIO; +out_badheader: + dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", + rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc)); r_xprt->rx_stats.bad_reply_count++; + status = -EIO; goto out; /* The req was still available, but by the time the transport_lock -- cgit From 264b0cdbcb93e6d7b419fcc82fca9413a13f87ae Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 3 Aug 2017 14:30:27 -0400 Subject: xprtrdma: Replace rpcrdma_count_chunks() Clean up chunk list decoding by using the xdr_stream set up in rpcrdma_reply_handler. This hardens decoding by checking for buffer overflow at every step while unmarshaling variable-length XDR objects. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 221 +++++++++++++++++++++++------------------ 1 file changed, 127 insertions(+), 94 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 56f22773fa4b..e422c0f63a69 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -792,48 +792,6 @@ out_err: return PTR_ERR(iptr); } -/* - * Chase down a received write or reply chunklist to get length - * RDMA'd by server. See map at rpcrdma_create_chunks()! :-) - */ -static int -rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp) -{ - unsigned int i, total_len; - struct rpcrdma_write_chunk *cur_wchunk; - char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); - - i = be32_to_cpu(**iptrp); - cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); - total_len = 0; - while (i--) { - struct rpcrdma_segment *seg = &cur_wchunk->wc_target; - ifdebug(FACILITY) { - u64 off; - xdr_decode_hyper((__be32 *)&seg->rs_offset, &off); - dprintk("RPC: %s: chunk %d@0x%016llx:0x%08x\n", - __func__, - be32_to_cpu(seg->rs_length), - (unsigned long long)off, - be32_to_cpu(seg->rs_handle)); - } - total_len += be32_to_cpu(seg->rs_length); - ++cur_wchunk; - } - /* check and adjust for properly terminated write chunk */ - if (wrchunk) { - __be32 *w = (__be32 *) cur_wchunk; - if (*w++ != xdr_zero) - return -1; - cur_wchunk = (struct rpcrdma_write_chunk *) w; - } - if ((char *)cur_wchunk > base + rep->rr_len) - return -1; - - *iptrp = (__be32 *) cur_wchunk; - return total_len; -} - /** * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs * @rqst: controlling RPC request @@ -1004,89 +962,164 @@ out_short: } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -static int -rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, - struct rpc_rqst *rqst) +static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length) { - struct xdr_stream *xdr = &rep->rr_stream; - int rdmalen, status; __be32 *p; - p = xdr_inline_decode(xdr, 2 * sizeof(*p)); + p = xdr_inline_decode(xdr, 4 * sizeof(*p)); if (unlikely(!p)) return -EIO; - /* never expect read list */ - if (unlikely(*p++ != xdr_zero)) - return -EIO; + ifdebug(FACILITY) { + u64 offset; + u32 handle; - /* Write list */ - if (*p != xdr_zero) { - char *base = rep->rr_hdrbuf.head[0].iov_base; + handle = be32_to_cpup(p++); + *length = be32_to_cpup(p++); + xdr_decode_hyper(p, &offset); + dprintk("RPC: %s: segment %u@0x%016llx:0x%08x\n", + __func__, *length, (unsigned long long)offset, + handle); + } else { + *length = be32_to_cpup(p + 1); + } - p++; - rdmalen = rpcrdma_count_chunks(rep, 1, &p); - if (rdmalen < 0 || *p++ != xdr_zero) + return 0; +} + +static int decode_write_chunk(struct xdr_stream *xdr, u32 *length) +{ + u32 segcount, seglength; + __be32 *p; + + p = xdr_inline_decode(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EIO; + + *length = 0; + segcount = be32_to_cpup(p); + while (segcount--) { + if (decode_rdma_segment(xdr, &seglength)) return -EIO; + *length += seglength; + } - rep->rr_len -= (char *)p - base; - status = rep->rr_len + rdmalen; - r_xprt->rx_stats.total_rdma_reply += rdmalen; + dprintk("RPC: %s: segcount=%u, %u bytes\n", + __func__, be32_to_cpup(p), *length); + return 0; +} - /* special case - last segment may omit padding */ - rdmalen &= 3; - if (rdmalen) { - rdmalen = 4 - rdmalen; - status += rdmalen; - } - } else { +/* In RPC-over-RDMA Version One replies, a Read list is never + * expected. This decoder is a stub that returns an error if + * a Read list is present. + */ +static int decode_read_list(struct xdr_stream *xdr) +{ + __be32 *p; + + p = xdr_inline_decode(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EIO; + if (unlikely(*p != xdr_zero)) + return -EIO; + return 0; +} + +/* Supports only one Write chunk in the Write list + */ +static int decode_write_list(struct xdr_stream *xdr, u32 *length) +{ + u32 chunklen; + bool first; + __be32 *p; + + *length = 0; + first = true; + do { p = xdr_inline_decode(xdr, sizeof(*p)); - if (!p) + if (unlikely(!p)) + return -EIO; + if (*p == xdr_zero) + break; + if (!first) return -EIO; - /* never expect reply chunk */ - if (*p++ != xdr_zero) + if (decode_write_chunk(xdr, &chunklen)) return -EIO; - rdmalen = 0; - rep->rr_len -= RPCRDMA_HDRLEN_MIN; - status = rep->rr_len; - } + *length += chunklen; + first = false; + } while (true); + return 0; +} + +static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length) +{ + __be32 *p; + + p = xdr_inline_decode(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EIO; + + *length = 0; + if (*p != xdr_zero) + if (decode_write_chunk(xdr, length)) + return -EIO; + return 0; +} +static int +rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, + struct rpc_rqst *rqst) +{ + struct xdr_stream *xdr = &rep->rr_stream; + u32 writelist, replychunk, rpclen; + char *base; + + /* Decode the chunk lists */ + if (decode_read_list(xdr)) + return -EIO; + if (decode_write_list(xdr, &writelist)) + return -EIO; + if (decode_reply_chunk(xdr, &replychunk)) + return -EIO; + + /* RDMA_MSG sanity checks */ + if (unlikely(replychunk)) + return -EIO; + + /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */ + base = (char *)xdr_inline_decode(xdr, 0); + rpclen = xdr_stream_remaining(xdr); r_xprt->rx_stats.fixup_copy_count += - rpcrdma_inline_fixup(rqst, (char *)p, rep->rr_len, - rdmalen); + rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3); - return status; + r_xprt->rx_stats.total_rdma_reply += writelist; + return rpclen + xdr_align_size(writelist); } static noinline int rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) { struct xdr_stream *xdr = &rep->rr_stream; - int rdmalen; - __be32 *p; + u32 writelist, replychunk; - p = xdr_inline_decode(xdr, 3 * sizeof(*p)); - if (unlikely(!p)) + /* Decode the chunk lists */ + if (decode_read_list(xdr)) return -EIO; - - /* never expect Read chunks */ - if (unlikely(*p++ != xdr_zero)) + if (decode_write_list(xdr, &writelist)) return -EIO; - /* never expect Write chunks */ - if (unlikely(*p++ != xdr_zero)) - return -EIO; - /* always expect a Reply chunk */ - if (unlikely(*p++ == xdr_zero)) + if (decode_reply_chunk(xdr, &replychunk)) return -EIO; - rdmalen = rpcrdma_count_chunks(rep, 0, &p); - if (rdmalen < 0) + /* RDMA_NOMSG sanity checks */ + if (unlikely(writelist)) + return -EIO; + if (unlikely(!replychunk)) return -EIO; - r_xprt->rx_stats.total_rdma_reply += rdmalen; - /* Reply chunk buffer already is the reply vector - no fixup. */ - return rdmalen; + /* Reply chunk buffer already is the reply vector */ + r_xprt->rx_stats.total_rdma_reply += replychunk; + return replychunk; } static noinline int -- cgit From e2a671904149c1c0aa438e3cbe7d0e8ad2cf8721 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 3 Aug 2017 14:30:44 -0400 Subject: xprtrdma: Remove rpcrdma_rep::rr_len This field is no longer used outside the Receive completion handler. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index e422c0f63a69..621986156495 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1178,7 +1178,7 @@ rpcrdma_reply_handler(struct work_struct *work) dprintk("RPC: %s: incoming rep %p\n", __func__, rep); - if (rep->rr_len == RPCRDMA_BAD_LEN) + if (rep->rr_hdrbuf.head[0].iov_len == 0) goto out_badstatus; xdr_init_decode(xdr, &rep->rr_hdrbuf, -- cgit From 09e60641fc58960c9c63a9b6d1f57b194572dafc Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 10 Aug 2017 12:47:12 -0400 Subject: xprtrdma: Clean up rpcrdma_marshal_req() synopsis Clean up: The caller already has rpcrdma_xprt, so pass that directly instead. And provide a documenting comment for this critical function. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 621986156495..d916e596d427 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -651,18 +651,27 @@ rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req) req->rl_mapped_sges = 0; } -/* - * Marshal a request: the primary job of this routine is to choose - * the transfer modes. See comments below. +/** + * rpcrdma_marshal_req - Marshal and send one RPC request + * @r_xprt: controlling transport + * @rqst: RPC request to be marshaled * - * Returns zero on success, otherwise a negative errno. + * For the RPC in "rqst", this function: + * - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG) + * - Registers Read, Write, and Reply chunks + * - Constructs the transport header + * - Posts a Send WR to send the transport header and request + * + * Returns: + * %0 if the RPC was sent successfully, + * %-ENOTCONN if the connection was lost, + * %-EAGAIN if not enough pages are available for on-demand reply buffer, + * %-ENOBUFS if no MRs are available to register chunks, + * %-EIO if a permanent problem occurred while marshaling. */ - int -rpcrdma_marshal_req(struct rpc_rqst *rqst) +rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { - struct rpc_xprt *xprt = rqst->rq_xprt; - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; -- cgit From f4a2805e7d14c530237d5c8d51c711157c276188 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 10 Aug 2017 12:47:20 -0400 Subject: xprtrdma: Remove rpclen from rpcrdma_marshal_req Clean up: Remove a variable whose result is no longer used. Commit 655fec6987be ("xprtrdma: Use gathered Send for large inline messages") should have removed it. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index d916e596d427..f1d63ac9d7c7 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -677,7 +677,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) struct rpcrdma_msg *headerp; bool ddp_allowed; ssize_t hdrlen; - size_t rpclen; __be32 *iptr; #if defined(CONFIG_SUNRPC_BACKCHANNEL) @@ -731,16 +730,12 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) */ if (rpcrdma_args_inline(r_xprt, rqst)) { rtype = rpcrdma_noch; - rpclen = rqst->rq_snd_buf.len; } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; - rpclen = rqst->rq_snd_buf.head[0].iov_len + - rqst->rq_snd_buf.tail[0].iov_len; } else { r_xprt->rx_stats.nomsg_call_count++; headerp->rm_type = htonl(RDMA_NOMSG); rtype = rpcrdma_areadch; - rpclen = 0; } req->rl_xid = rqst->rq_xid; @@ -780,10 +775,10 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) goto out_err; hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; - dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n", + dprintk("RPC: %5u %s: %s/%s: hdrlen %zd\n", rqst->rq_task->tk_pid, __func__, transfertypes[rtype], transfertypes[wtype], - hdrlen, rpclen); + hdrlen); if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen, &rqst->rq_snd_buf, rtype)) { -- cgit From 7a80f3f0ddf1d7814ac44728f56b6fbba5837703 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 10 Aug 2017 12:47:28 -0400 Subject: xprtrdma: Set up an xdr_stream in rpcrdma_marshal_req() Initialize an xdr_stream at the top of rpcrdma_marshal_req(), and use it to encode the fixed transport header fields. This xdr_stream will be used to encode the chunk lists in a subsequent patch. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index f1d63ac9d7c7..ffa99f0f6a28 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -667,17 +667,20 @@ rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req) * %-ENOTCONN if the connection was lost, * %-EAGAIN if not enough pages are available for on-demand reply buffer, * %-ENOBUFS if no MRs are available to register chunks, + * %-EMSGSIZE if the transport header is too small, * %-EIO if a permanent problem occurred while marshaling. */ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + struct xdr_stream *xdr = &req->rl_stream; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; bool ddp_allowed; ssize_t hdrlen; __be32 *iptr; + __be32 *p; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) @@ -685,11 +688,18 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) #endif headerp = rdmab_to_msg(req->rl_rdmabuf); - /* don't byte-swap XID, it's already done in request */ - headerp->rm_xid = rqst->rq_xid; - headerp->rm_vers = rpcrdma_version; - headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); - headerp->rm_type = rdma_msg; + rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); + xdr_init_encode(xdr, &req->rl_hdrbuf, + req->rl_rdmabuf->rg_base); + + /* Fixed header fields */ + iptr = ERR_PTR(-EMSGSIZE); + p = xdr_reserve_space(xdr, 4 * sizeof(*p)); + if (!p) + goto out_err; + *p++ = rqst->rq_xid; + *p++ = rpcrdma_version; + *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); /* When the ULP employs a GSS flavor that guarantees integrity * or privacy, direct data placement of individual data items @@ -729,12 +739,14 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * by themselves are larger than the inline threshold. */ if (rpcrdma_args_inline(r_xprt, rqst)) { + *p++ = rdma_msg; rtype = rpcrdma_noch; } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { + *p++ = rdma_msg; rtype = rpcrdma_readch; } else { r_xprt->rx_stats.nomsg_call_count++; - headerp->rm_type = htonl(RDMA_NOMSG); + *p++ = rdma_nomsg; rtype = rpcrdma_areadch; } -- cgit From 39f4cd9e9982f97a52033579bf996bb74c644c08 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Thu, 10 Aug 2017 12:47:36 -0400 Subject: xprtrdma: Harden chunk list encoding against send buffer overflow While marshaling chunk lists which are variable-length XDR objects, check for XDR buffer overflow at every step. Measurements show no significant changes in CPU utilization. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 228 +++++++++++++++++++++++++---------------- 1 file changed, 142 insertions(+), 86 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index ffa99f0f6a28..f27dbfd21a10 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -273,15 +273,70 @@ out_overflow: return -EIO; } -static inline __be32 * +static inline int +encode_item_present(struct xdr_stream *xdr) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + *p = xdr_one; + return 0; +} + +static inline int +encode_item_not_present(struct xdr_stream *xdr) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + *p = xdr_zero; + return 0; +} + +static void xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) { *iptr++ = cpu_to_be32(mw->mw_handle); *iptr++ = cpu_to_be32(mw->mw_length); - return xdr_encode_hyper(iptr, mw->mw_offset); + xdr_encode_hyper(iptr, mw->mw_offset); +} + +static int +encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, 4 * sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + xdr_encode_rdma_segment(p, mw); + return 0; +} + +static int +encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw, + u32 position) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, 6 * sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + *p++ = xdr_one; /* Item present */ + *p++ = cpu_to_be32(position); + xdr_encode_rdma_segment(p, mw); + return 0; } -/* XDR-encode the Read list. Supports encoding a list of read +/* Register and XDR encode the Read list. Supports encoding a list of read * segments that belong to a single read chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): @@ -290,24 +345,21 @@ xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) * N elements, position P (same P for all chunks of same arg!): * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 * - * Returns a pointer to the XDR word in the RDMA header following - * the end of the Read list, or an error pointer. + * Returns zero on success, or a negative errno if a failure occurred. + * @xdr is advanced to the next position in the stream. + * + * Only a single @pos value is currently supported. */ -static __be32 * -rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, struct rpc_rqst *rqst, - __be32 *iptr, enum rpcrdma_chunktype rtype) +static noinline int +rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype) { + struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; unsigned int pos; int n, nsegs; - if (rtype == rpcrdma_noch) { - *iptr++ = xdr_zero; /* item not present */ - return iptr; - } - pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) pos = 0; @@ -315,22 +367,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos, rtype, seg); if (nsegs < 0) - return ERR_PTR(nsegs); + return nsegs; do { n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false, &mw); if (n < 0) - return ERR_PTR(n); + return n; rpcrdma_push_mw(mw, &req->rl_registered); - *iptr++ = xdr_one; /* item present */ - - /* All read segments in this chunk - * have the same "position". - */ - *iptr++ = cpu_to_be32(pos); - iptr = xdr_encode_rdma_segment(iptr, mw); + if (encode_read_segment(xdr, mw, pos) < 0) + return -EMSGSIZE; dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, pos, @@ -342,13 +389,12 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, nsegs -= n; } while (nsegs); - /* Finish Read list */ - *iptr++ = xdr_zero; /* Next item not present */ - return iptr; + return 0; } -/* XDR-encode the Write list. Supports encoding a list containing - * one array of plain segments that belong to a single write chunk. +/* Register and XDR encode the Write list. Supports encoding a list + * containing one array of plain segments that belong to a single + * write chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -356,43 +402,45 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, * N elements: * 1 - N - HLOO - HLOO - ... - HLOO - 0 * - * Returns a pointer to the XDR word in the RDMA header following - * the end of the Write list, or an error pointer. + * Returns zero on success, or a negative errno if a failure occurred. + * @xdr is advanced to the next position in the stream. + * + * Only a single Write chunk is currently supported. */ -static __be32 * +static noinline int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, __be32 *iptr, - enum rpcrdma_chunktype wtype) + struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) { + struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; int n, nsegs, nchunks; __be32 *segcount; - if (wtype != rpcrdma_writech) { - *iptr++ = xdr_zero; /* no Write list present */ - return iptr; - } - seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, wtype, seg); if (nsegs < 0) - return ERR_PTR(nsegs); + return nsegs; - *iptr++ = xdr_one; /* Write list present */ - segcount = iptr++; /* save location of segment count */ + if (encode_item_present(xdr) < 0) + return -EMSGSIZE; + segcount = xdr_reserve_space(xdr, sizeof(*segcount)); + if (unlikely(!segcount)) + return -EMSGSIZE; + /* Actual value encoded below */ nchunks = 0; do { n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true, &mw); if (n < 0) - return ERR_PTR(n); + return n; rpcrdma_push_mw(mw, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, mw); + if (encode_rdma_segment(xdr, mw) < 0) + return -EMSGSIZE; dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, @@ -409,13 +457,11 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); - /* Finish Write list */ - *iptr++ = xdr_zero; /* Next item not present */ - return iptr; + return 0; } -/* XDR-encode the Reply chunk. Supports encoding an array of plain - * segments that belong to a single write (reply) chunk. +/* Register and XDR encode the Reply chunk. Supports encoding an array + * of plain segments that belong to a single write (reply) chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -423,41 +469,41 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, * N elements: * 1 - N - HLOO - HLOO - ... - HLOO * - * Returns a pointer to the XDR word in the RDMA header following - * the end of the Reply chunk, or an error pointer. + * Returns zero on success, or a negative errno if a failure occurred. + * @xdr is advanced to the next position in the stream. */ -static __be32 * -rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, struct rpc_rqst *rqst, - __be32 *iptr, enum rpcrdma_chunktype wtype) +static noinline int +rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) { + struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; int n, nsegs, nchunks; __be32 *segcount; - if (wtype != rpcrdma_replych) { - *iptr++ = xdr_zero; /* no Reply chunk present */ - return iptr; - } - seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) - return ERR_PTR(nsegs); + return nsegs; - *iptr++ = xdr_one; /* Reply chunk present */ - segcount = iptr++; /* save location of segment count */ + if (encode_item_present(xdr) < 0) + return -EMSGSIZE; + segcount = xdr_reserve_space(xdr, sizeof(*segcount)); + if (unlikely(!segcount)) + return -EMSGSIZE; + /* Actual value encoded below */ nchunks = 0; do { n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true, &mw); if (n < 0) - return ERR_PTR(n); + return n; rpcrdma_push_mw(mw, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, mw); + if (encode_rdma_segment(xdr, mw) < 0) + return -EMSGSIZE; dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, @@ -474,7 +520,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, /* Update count of segments in the Reply chunk */ *segcount = cpu_to_be32(nchunks); - return iptr; + return 0; } /* Prepare the RPC-over-RDMA header SGE. @@ -676,24 +722,21 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct xdr_stream *xdr = &req->rl_stream; enum rpcrdma_chunktype rtype, wtype; - struct rpcrdma_msg *headerp; bool ddp_allowed; - ssize_t hdrlen; - __be32 *iptr; __be32 *p; + int ret; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) return rpcrdma_bc_marshal_reply(rqst); #endif - headerp = rdmab_to_msg(req->rl_rdmabuf); rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); xdr_init_encode(xdr, &req->rl_hdrbuf, req->rl_rdmabuf->rg_base); /* Fixed header fields */ - iptr = ERR_PTR(-EMSGSIZE); + ret = -EMSGSIZE; p = xdr_reserve_space(xdr, 4 * sizeof(*p)); if (!p) goto out_err; @@ -775,37 +818,50 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * send a Call message with a Position Zero Read chunk and a * regular Read chunk at the same time. */ - iptr = headerp->rm_body.rm_chunks; - iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); - if (IS_ERR(iptr)) + if (rtype != rpcrdma_noch) { + ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype); + if (ret) + goto out_err; + } + ret = encode_item_not_present(xdr); + if (ret) goto out_err; - iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype); - if (IS_ERR(iptr)) + + if (wtype == rpcrdma_writech) { + ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype); + if (ret) + goto out_err; + } + ret = encode_item_not_present(xdr); + if (ret) goto out_err; - iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype); - if (IS_ERR(iptr)) + + if (wtype != rpcrdma_replych) + ret = encode_item_not_present(xdr); + else + ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype); + if (ret) goto out_err; - hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; - dprintk("RPC: %5u %s: %s/%s: hdrlen %zd\n", + dprintk("RPC: %5u %s: %s/%s: hdrlen %u rpclen\n", rqst->rq_task->tk_pid, __func__, transfertypes[rtype], transfertypes[wtype], - hdrlen); + xdr_stream_pos(xdr)); - if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen, + if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, + xdr_stream_pos(xdr), &rqst->rq_snd_buf, rtype)) { - iptr = ERR_PTR(-EIO); + ret = -EIO; goto out_err; } return 0; out_err: - if (PTR_ERR(iptr) != -ENOBUFS) { - pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n", - PTR_ERR(iptr)); + if (ret != -ENOBUFS) { + pr_err("rpcrdma: header marshaling failed (%d)\n", ret); r_xprt->rx_stats.failed_marshal_count++; } - return PTR_ERR(iptr); + return ret; } /** -- cgit From 28d9d56f4c7759e1f12e5b1bff60210082812edc Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Mon, 14 Aug 2017 15:38:22 -0400 Subject: xprtrdma: Remove imul instructions from rpcrdma_convert_iovs() Re-arrange the pointer arithmetic in rpcrdma_convert_iovs() to eliminate several integer multiplication instructions during Transport Header encoding. Also, array overflow does not occur outside development environments, so replace overflow checking with one spot check at the end. This reduces the number of conditional branches in the common case. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 105 +++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 57 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index f27dbfd21a10..211ac4b7979d 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -169,40 +169,41 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; } -/* Split "vec" on page boundaries into segments. FMR registers pages, - * not a byte range. Other modes coalesce these segments into a single - * MR when they can. +/* Split @vec on page boundaries into SGEs. FMR registers pages, not + * a byte range. Other modes coalesce these SGEs into a single MR + * when they can. + * + * Returns pointer to next available SGE, and bumps the total number + * of SGEs consumed. */ -static int -rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) +static struct rpcrdma_mr_seg * +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, + unsigned int *n) { - size_t page_offset; - u32 remaining; + u32 remaining, page_offset; char *base; base = vec->iov_base; page_offset = offset_in_page(base); remaining = vec->iov_len; - while (remaining && n < RPCRDMA_MAX_SEGS) { - seg[n].mr_page = NULL; - seg[n].mr_offset = base; - seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); - remaining -= seg[n].mr_len; - base += seg[n].mr_len; - ++n; + while (remaining) { + seg->mr_page = NULL; + seg->mr_offset = base; + seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); + remaining -= seg->mr_len; + base += seg->mr_len; + ++seg; + ++(*n); page_offset = 0; } - return n; + return seg; } -/* - * Chunk assembly from upper layer xdr_buf. - * - * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk - * elements. Segments are then coalesced when registered, if possible - * within the selected memreg mode. +/* Convert @xdrbuf into SGEs no larger than a page each. As they + * are registered, these SGEs are then coalesced into RDMA segments + * when the selected memreg mode supports it. * - * Returns positive number of segments converted, or a negative errno. + * Returns positive number of SGEs consumed, or a negative errno. */ static int @@ -210,47 +211,41 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, unsigned int pos, enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg) { - int len, n, p, page_base; + unsigned long page_base; + unsigned int len, n; struct page **ppages; n = 0; - if (pos == 0) { - n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n); - if (n == RPCRDMA_MAX_SEGS) - goto out_overflow; - } + if (pos == 0) + seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n); len = xdrbuf->page_len; ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); page_base = offset_in_page(xdrbuf->page_base); - p = 0; - while (len && n < RPCRDMA_MAX_SEGS) { - if (!ppages[p]) { - /* alloc the pagelist for receiving buffer */ - ppages[p] = alloc_page(GFP_ATOMIC); - if (!ppages[p]) + while (len) { + if (unlikely(!*ppages)) { + /* XXX: Certain upper layer operations do + * not provide receive buffer pages. + */ + *ppages = alloc_page(GFP_ATOMIC); + if (!*ppages) return -EAGAIN; } - seg[n].mr_page = ppages[p]; - seg[n].mr_offset = (void *)(unsigned long) page_base; - seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); - if (seg[n].mr_len > PAGE_SIZE) - goto out_overflow; - len -= seg[n].mr_len; + seg->mr_page = *ppages; + seg->mr_offset = (char *)page_base; + seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len); + len -= seg->mr_len; + ++ppages; + ++seg; ++n; - ++p; - page_base = 0; /* page offset only applies to first page */ + page_base = 0; } - /* Message overflows the seg array */ - if (len && n == RPCRDMA_MAX_SEGS) - goto out_overflow; - /* When encoding a Read chunk, the tail iovec contains an * XDR pad and may be omitted. */ if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup) - return n; + goto out; /* When encoding a Write chunk, some servers need to see an * extra segment for non-XDR-aligned Write chunks. The upper @@ -258,19 +253,15 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, * for this purpose. */ if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup) - return n; + goto out; - if (xdrbuf->tail[0].iov_len) { - n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n); - if (n == RPCRDMA_MAX_SEGS) - goto out_overflow; - } + if (xdrbuf->tail[0].iov_len) + seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n); +out: + if (unlikely(n > RPCRDMA_MAX_SEGS)) + return -EIO; return n; - -out_overflow: - pr_err("rpcrdma: segment array overflow\n"); - return -EIO; } static inline int -- cgit From 6748b0caf82101f1f01208e48f5c4fd3ce76d562 Mon Sep 17 00:00:00 2001 From: Chuck Lever <chuck.lever@oracle.com> Date: Mon, 14 Aug 2017 15:38:30 -0400 Subject: xprtrdma: Remove imul instructions from chunk list encoders Re-arrange the pointer arithmetic in the chunk list encoders to eliminate several more integer multiplication instructions during Transport Header encoding. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> --- net/sunrpc/xprtrdma/rpc_rdma.c | 45 ++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 24 deletions(-) (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 211ac4b7979d..84584caaa7e9 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -349,7 +349,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; unsigned int pos; - int n, nsegs; + int nsegs; pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) @@ -361,10 +361,10 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return nsegs; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - false, &mw); - if (n < 0) - return n; + seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + false, &mw); + if (IS_ERR(seg)) + return PTR_ERR(seg); rpcrdma_push_mw(mw, &req->rl_registered); if (encode_read_segment(xdr, mw, pos) < 0) @@ -373,11 +373,10 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, pos, mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, n < nsegs ? "more" : "last"); + mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); r_xprt->rx_stats.read_chunk_count++; - seg += n; - nsegs -= n; + nsegs -= mw->mw_nents; } while (nsegs); return 0; @@ -405,7 +404,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - int n, nsegs, nchunks; + int nsegs, nchunks; __be32 *segcount; seg = req->rl_segments; @@ -424,10 +423,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - true, &mw); - if (n < 0) - return n; + seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (IS_ERR(seg)) + return PTR_ERR(seg); rpcrdma_push_mw(mw, &req->rl_registered); if (encode_rdma_segment(xdr, mw) < 0) @@ -436,13 +435,12 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, n < nsegs ? "more" : "last"); + mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); r_xprt->rx_stats.write_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; nchunks++; - seg += n; - nsegs -= n; + nsegs -= mw->mw_nents; } while (nsegs); /* Update count of segments in this Write chunk */ @@ -470,7 +468,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - int n, nsegs, nchunks; + int nsegs, nchunks; __be32 *segcount; seg = req->rl_segments; @@ -487,10 +485,10 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - true, &mw); - if (n < 0) - return n; + seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (IS_ERR(seg)) + return PTR_ERR(seg); rpcrdma_push_mw(mw, &req->rl_registered); if (encode_rdma_segment(xdr, mw) < 0) @@ -499,13 +497,12 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, n < nsegs ? "more" : "last"); + mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); r_xprt->rx_stats.reply_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; nchunks++; - seg += n; - nsegs -= n; + nsegs -= mw->mw_nents; } while (nsegs); /* Update count of segments in the Reply chunk */ -- cgit