Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_sendto.c')
 net/sunrpc/xprtrdma/svc_rdma_sendto.c (-rw-r--r--) | 1241
 1 file changed, 823 insertions(+), 418 deletions(-)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 7c3a211e0e9a..914cd263c2f1 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,5 +1,6 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* - * Copyright (c) 2016 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * @@ -74,11 +75,11 @@ * DMA-unmap the pages under I/O for that Write segment. The Write * completion handler does not release any pages. * - * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt. + * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. * The ownership of all of the Reply's pages are transferred into that * ctxt, the Send WR is posted, and sendto returns. * - * The svc_rdma_op_ctxt is presented when the Send WR completes. The + * The svc_rdma_send_ctxt is presented when the Send WR completes. The * Send completion handler finally releases the Reply's pages. * * This mechanism also assumes that completions on the transport's Send @@ -98,512 +99,889 @@ * where two different Write segments send portions of the same page. */ -#include <linux/sunrpc/debug.h> -#include <linux/sunrpc/rpc_rdma.h> #include <linux/spinlock.h> -#include <asm/unaligned.h> +#include <linux/unaligned.h> + #include <rdma/ib_verbs.h> #include <rdma/rdma_cm.h> + +#include <linux/sunrpc/debug.h> #include <linux/sunrpc/svc_rdma.h> -#define RPCDBG_FACILITY RPCDBG_SVCXPRT +#include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> + +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); -static u32 xdr_padsize(u32 len) +static struct svc_rdma_send_ctxt * +svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) { - return (len & 3) ? (4 - (len & 3)) : 0; + int node = ibdev_to_node(rdma->sc_cm_id->device); + struct svc_rdma_send_ctxt *ctxt; + unsigned long pages; + dma_addr_t addr; + void *buffer; + int i; + + ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges), + GFP_KERNEL, node); + if (!ctxt) + goto fail0; + pages = svc_serv_maxpages(rdma->sc_xprt.xpt_server); + ctxt->sc_pages = kcalloc_node(pages, sizeof(struct page *), + GFP_KERNEL, node); + if (!ctxt->sc_pages) + goto fail1; + ctxt->sc_maxpages = pages; + buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node); + if (!buffer) + goto fail2; + addr = ib_dma_map_single(rdma->sc_pd->device, buffer, + rdma->sc_max_req_size, DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdma->sc_pd->device, addr)) + goto fail3; + + svc_rdma_send_cid_init(rdma, &ctxt->sc_cid); + + ctxt->sc_rdma = rdma; + ctxt->sc_send_wr.next = NULL; + ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; + ctxt->sc_send_wr.sg_list = ctxt->sc_sges; + ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; + ctxt->sc_cqe.done = svc_rdma_wc_send; + ctxt->sc_xprt_buf = buffer; + xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf, + rdma->sc_max_req_size); + ctxt->sc_sges[0].addr = addr; + + for (i = 0; i < rdma->sc_max_send_sges; i++) + ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; + return ctxt; + +fail3: + kfree(buffer); +fail2: + kfree(ctxt->sc_pages); +fail1: + kfree(ctxt); +fail0: + return NULL; } -/* Returns length of transport header, in bytes. 
+/** + * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt + * @rdma: svcxprt_rdma being torn down + * */ -static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) +void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) { - unsigned int nsegs; - __be32 *p; + struct svc_rdma_send_ctxt *ctxt; + struct llist_node *node; + + while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) { + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); + ib_dma_unmap_single(rdma->sc_pd->device, + ctxt->sc_sges[0].addr, + rdma->sc_max_req_size, + DMA_TO_DEVICE); + kfree(ctxt->sc_xprt_buf); + kfree(ctxt->sc_pages); + kfree(ctxt); + } +} - p = rdma_resp; +/** + * svc_rdma_send_ctxt_get - Get a free send_ctxt + * @rdma: controlling svcxprt_rdma + * + * Returns a ready-to-use send_ctxt, or NULL if none are + * available and a fresh one cannot be allocated. + */ +struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + struct llist_node *node; + + spin_lock(&rdma->sc_send_lock); + node = llist_del_first(&rdma->sc_send_ctxts); + spin_unlock(&rdma->sc_send_lock); + if (!node) + goto out_empty; + + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); + +out: + rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0); + xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf, + ctxt->sc_xprt_buf, NULL); + + svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc); + ctxt->sc_send_wr.num_sge = 0; + ctxt->sc_cur_sge_no = 0; + ctxt->sc_page_count = 0; + ctxt->sc_wr_chain = &ctxt->sc_send_wr; + ctxt->sc_sqecount = 1; + + return ctxt; + +out_empty: + ctxt = svc_rdma_send_ctxt_alloc(rdma); + if (!ctxt) + return NULL; + goto out; +} - /* RPC-over-RDMA V1 replies never have a Read list. */ - p += rpcrdma_fixed_maxsz + 1; +static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + struct ib_device *device = rdma->sc_cm_id->device; + unsigned int i; - /* Skip Write list. */ - while (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } + svc_rdma_reply_chunk_release(rdma, ctxt); - /* Skip Reply chunk. */ - if (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; + if (ctxt->sc_page_count) + release_pages(ctxt->sc_pages, ctxt->sc_page_count); + + /* The first SGE contains the transport header, which + * remains mapped until @ctxt is destroyed. + */ + for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) { + trace_svcrdma_dma_unmap_page(&ctxt->sc_cid, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length); + ib_dma_unmap_page(device, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length, + DMA_TO_DEVICE); } - return (unsigned long)p - (unsigned long)rdma_resp; + llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts); +} + +static void svc_rdma_send_ctxt_put_async(struct work_struct *work) +{ + struct svc_rdma_send_ctxt *ctxt; + + ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work); + svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt); } -/* One Write chunk is copied from Call transport header to Reply - * transport header. Each segment's length field is updated to - * reflect number of bytes consumed in the segment. +/** + * svc_rdma_send_ctxt_put - Return send_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list * - * Returns number of segments in this chunk. + * Pages left in sc_pages are DMA unmapped and released. 
*/ -static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, - unsigned int remaining) +void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) { - unsigned int i, nsegs; - u32 seg_len; - - /* Write list discriminator */ - *dst++ = *src++; - - /* number of segments in this chunk */ - nsegs = be32_to_cpup(src); - *dst++ = *src++; - - for (i = nsegs; i; i--) { - /* segment's RDMA handle */ - *dst++ = *src++; - - /* bytes returned in this segment */ - seg_len = be32_to_cpu(*src); - if (remaining >= seg_len) { - /* entire segment was consumed */ - *dst = *src; - remaining -= seg_len; - } else { - /* segment only partly filled */ - *dst = cpu_to_be32(remaining); - remaining = 0; - } - dst++; src++; - - /* segment's RDMA offset */ - *dst++ = *src++; - *dst++ = *src++; - } + INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async); + queue_work(svcrdma_wq, &ctxt->sc_work); +} - return nsegs; +/** + * svc_rdma_wake_send_waiters - manage Send Queue accounting + * @rdma: controlling transport + * @avail: Number of additional SQEs that are now available + * + */ +void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail) +{ + atomic_add(avail, &rdma->sc_sq_avail); + smp_mb__after_atomic(); + if (unlikely(waitqueue_active(&rdma->sc_send_wait))) + wake_up(&rdma->sc_send_wait); } -/* The client provided a Write list in the Call message. Fill in - * the segments in the first Write chunk in the Reply's transport - * header with the number of bytes consumed in each segment. - * Remaining chunks are returned unused. +/** + * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: Completion Queue context + * @wc: Work Completion object * - * Assumptions: - * - Client has provided only one Write chunk + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that + * the Send completion handler could be running. */ -static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, - unsigned int consumed) +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { - unsigned int nsegs; - __be32 *p, *q; - - /* RPC-over-RDMA V1 replies never have a Read list. */ - p = rdma_resp + rpcrdma_fixed_maxsz + 1; - - q = wr_ch; - while (*q != xdr_zero) { - nsegs = xdr_encode_write_chunk(p, q, consumed); - q += 2 + nsegs * rpcrdma_segment_maxsz; - p += 2 + nsegs * rpcrdma_segment_maxsz; - consumed = 0; - } + struct svcxprt_rdma *rdma = cq->cq_context; + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_send_ctxt *ctxt = + container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); + + svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount); + + if (unlikely(wc->status != IB_WC_SUCCESS)) + goto flushed; - /* Terminate Write list */ - *p++ = xdr_zero; + trace_svcrdma_wc_send(&ctxt->sc_cid); + svc_rdma_send_ctxt_put(rdma, ctxt); + return; - /* Reply chunk discriminator; may be replaced later */ - *p = xdr_zero; +flushed: + if (wc->status != IB_WC_WR_FLUSH_ERR) + trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid); + else + trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid); + svc_rdma_send_ctxt_put(rdma, ctxt); + svc_xprt_deferred_close(&rdma->sc_xprt); } -/* The client provided a Reply chunk in the Call message. Fill in - * the segments in the Reply chunk in the Reply message with the - * number of bytes consumed in each segment. 
+/** + * svc_rdma_post_send - Post a WR chain to the Send Queue + * @rdma: transport context + * @ctxt: WR chain to post * - * Assumptions: - * - Reply can always fit in the provided Reply chunk + * Copy fields in @ctxt to stack variables in order to guarantee + * that these values remain available after the ib_post_send() call. + * In some error flow cases, svc_rdma_wc_send() releases @ctxt. + * + * Note there is potential for starvation when the Send Queue is + * full because there is no order to when waiting threads are + * awoken. The transport is typically provisioned with a deep + * enough Send Queue that SQ exhaustion should be a rare event. + * + * Return values: + * %0: @ctxt's WR chain was posted successfully + * %-ENOTCONN: The connection was lost */ -static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, - unsigned int consumed) +int svc_rdma_post_send(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) { - __be32 *p; - - /* Find the Reply chunk in the Reply's xprt header. - * RPC-over-RDMA V1 replies never have a Read list. - */ - p = rdma_resp + rpcrdma_fixed_maxsz + 1; + struct ib_send_wr *first_wr = ctxt->sc_wr_chain; + struct ib_send_wr *send_wr = &ctxt->sc_send_wr; + const struct ib_send_wr *bad_wr = first_wr; + struct rpc_rdma_cid cid = ctxt->sc_cid; + int ret, sqecount = ctxt->sc_sqecount; + + might_sleep(); + + /* Sync the transport header buffer */ + ib_dma_sync_single_for_device(rdma->sc_pd->device, + send_wr->sg_list[0].addr, + send_wr->sg_list[0].length, + DMA_TO_DEVICE); + + /* If the SQ is full, wait until an SQ entry is available */ + while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) { + if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) { + svc_rdma_wake_send_waiters(rdma, sqecount); + + /* When the transport is torn down, assume + * ib_drain_sq() will trigger enough Send + * completions to wake us. The XPT_CLOSE test + * above should then cause the while loop to + * exit. + */ + percpu_counter_inc(&svcrdma_stat_sq_starve); + trace_svcrdma_sq_full(rdma, &cid); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > 0); + trace_svcrdma_sq_retry(rdma, &cid); + continue; + } - /* Skip past Write list */ - while (*p++ != xdr_zero) - p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; + trace_svcrdma_post_send(ctxt); + ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); + if (ret) { + trace_svcrdma_sq_post_err(rdma, &cid, ret); + svc_xprt_deferred_close(&rdma->sc_xprt); + + /* If even one WR was posted, there will be a + * Send completion that bumps sc_sq_avail. + */ + if (bad_wr == first_wr) { + svc_rdma_wake_send_waiters(rdma, sqecount); + break; + } + } + return 0; + } + return -ENOTCONN; +} - xdr_encode_write_chunk(p, rp_ch, consumed); +/** + * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list + * @sctxt: Send context for the RPC Reply + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Reply Read list + * %-EMSGSIZE on XDR buffer overflow + */ +static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt) +{ + /* RPC-over-RDMA version 1 replies never have a Read list. */ + return xdr_stream_encode_item_absent(&sctxt->sc_stream); } -/* Parse the RPC Call's transport header. 
+/** + * svc_rdma_encode_write_segment - Encode one Write segment + * @sctxt: Send context for the RPC Reply + * @chunk: Write chunk to push + * @remaining: remaining bytes of the payload left in the Write chunk + * @segno: which segment in the chunk + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Write segment, and updates @remaining + * %-EMSGSIZE on XDR buffer overflow */ -static void svc_rdma_get_write_arrays(__be32 *rdma_argp, - __be32 **write, __be32 **reply) +static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk, + u32 *remaining, unsigned int segno) { + const struct svc_rdma_segment *segment = &chunk->ch_segments[segno]; + const size_t len = rpcrdma_segment_maxsz * sizeof(__be32); + u32 length; __be32 *p; - p = rdma_argp + rpcrdma_fixed_maxsz; + p = xdr_reserve_space(&sctxt->sc_stream, len); + if (!p) + return -EMSGSIZE; + + length = min_t(u32, *remaining, segment->rs_length); + *remaining -= length; + xdr_encode_rdma_segment(p, segment->rs_handle, length, + segment->rs_offset); + trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length, + segment->rs_offset); + return len; +} - /* Read list */ - while (*p++ != xdr_zero) - p += 5; +/** + * svc_rdma_encode_write_chunk - Encode one Write chunk + * @sctxt: Send context for the RPC Reply + * @chunk: Write chunk to push + * + * Copy a Write chunk from the Call transport header to the + * Reply transport header. Update each segment's length field + * to reflect the number of bytes written in that segment. + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Write chunk + * %-EMSGSIZE on XDR buffer overflow + */ +static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk) +{ + u32 remaining = chunk->ch_payload_length; + unsigned int segno; + ssize_t len, ret; - /* Write list */ - if (*p != xdr_zero) { - *write = p; - while (*p++ != xdr_zero) - p += 1 + be32_to_cpu(*p) * 4; - } else { - *write = NULL; - p++; + len = 0; + ret = xdr_stream_encode_item_present(&sctxt->sc_stream); + if (ret < 0) + return ret; + len += ret; + + ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount); + if (ret < 0) + return ret; + len += ret; + + for (segno = 0; segno < chunk->ch_segcount; segno++) { + ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno); + if (ret < 0) + return ret; + len += ret; } - /* Reply chunk */ - if (*p != xdr_zero) - *reply = p; - else - *reply = NULL; + return len; } -/* RPC-over-RDMA Version One private extension: Remote Invalidation. - * Responder's choice: requester signals it can handle Send With - * Invalidate, and responder chooses one rkey to invalidate. - * - * Find a candidate rkey to invalidate when sending a reply. Picks the - * first R_key it finds in the chunk lists. +/** + * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list + * @rctxt: Reply context with information about the RPC Call + * @sctxt: Send context for the RPC Reply * - * Returns zero if RPC's chunk lists are empty. 
+ * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Reply's Write list + * %-EMSGSIZE on XDR buffer overflow */ -static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, - __be32 *wr_lst, __be32 *rp_ch) +static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_send_ctxt *sctxt) { - __be32 *p; + struct svc_rdma_chunk *chunk; + ssize_t len, ret; - p = rdma_argp + rpcrdma_fixed_maxsz; - if (*p != xdr_zero) - p += 2; - else if (wr_lst && be32_to_cpup(wr_lst + 1)) - p = wr_lst + 2; - else if (rp_ch && be32_to_cpup(rp_ch + 1)) - p = rp_ch + 2; - else - return 0; - return be32_to_cpup(p); + len = 0; + pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) { + ret = svc_rdma_encode_write_chunk(sctxt, chunk); + if (ret < 0) + return ret; + len += ret; + } + + /* Terminate the Write list */ + ret = xdr_stream_encode_item_absent(&sctxt->sc_stream); + if (ret < 0) + return ret; + + return len + ret; } -/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() - * is used during completion to DMA-unmap this memory, and - * it uses ib_dma_unmap_page() exclusively. +/** + * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk + * @rctxt: Reply context with information about the RPC Call + * @sctxt: Send context for the RPC Reply + * @length: size in bytes of the payload in the Reply chunk + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Reply's Reply chunk + * %-EMSGSIZE on XDR buffer overflow + * %-E2BIG if the RPC message is larger than the Reply chunk */ -static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - unsigned int sge_no, - unsigned char *base, - unsigned int len) +static ssize_t +svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_send_ctxt *sctxt, + unsigned int length) { - unsigned long offset = (unsigned long)base & ~PAGE_MASK; - struct ib_device *dev = rdma->sc_cm_id->device; - dma_addr_t dma_addr; + struct svc_rdma_chunk *chunk; - dma_addr = ib_dma_map_page(dev, virt_to_page(base), - offset, len, DMA_TO_DEVICE); - if (ib_dma_mapping_error(dev, dma_addr)) - goto out_maperr; + if (pcl_is_empty(&rctxt->rc_reply_pcl)) + return xdr_stream_encode_item_absent(&sctxt->sc_stream); - ctxt->sge[sge_no].addr = dma_addr; - ctxt->sge[sge_no].length = len; - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - svc_rdma_count_mappings(rdma, ctxt); - return 0; + chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); + if (length > chunk->ch_length) + return -E2BIG; -out_maperr: - pr_err("svcrdma: failed to map buffer\n"); - return -EIO; + chunk->ch_payload_length = length; + return svc_rdma_encode_write_chunk(sctxt, chunk); } -static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - unsigned int sge_no, - struct page *page, - unsigned int offset, - unsigned int len) +struct svc_rdma_map_data { + struct svcxprt_rdma *md_rdma; + struct svc_rdma_send_ctxt *md_ctxt; +}; + +/** + * svc_rdma_page_dma_map - DMA map one page + * @data: pointer to arguments + * @page: struct page to DMA map + * @offset: offset into the page + * @len: number of bytes to map + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if the page cannot be DMA mapped + */ +static int svc_rdma_page_dma_map(void *data, struct page *page, + unsigned long offset, unsigned int len) { + struct svc_rdma_map_data *args = data; + struct svcxprt_rdma *rdma = args->md_rdma; + 
struct svc_rdma_send_ctxt *ctxt = args->md_ctxt; struct ib_device *dev = rdma->sc_cm_id->device; dma_addr_t dma_addr; + ++ctxt->sc_cur_sge_no; + dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; - ctxt->sge[sge_no].addr = dma_addr; - ctxt->sge[sge_no].length = len; - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - svc_rdma_count_mappings(rdma, ctxt); + trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len); + ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; + ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; + ctxt->sc_send_wr.num_sge++; return 0; out_maperr: - pr_err("svcrdma: failed to map page\n"); + trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len); return -EIO; } /** - * svc_rdma_map_reply_hdr - DMA map the transport header buffer - * @rdma: controlling transport - * @ctxt: op_ctxt for the Send WR - * @rdma_resp: buffer containing transport header - * @len: length of transport header + * svc_rdma_iov_dma_map - DMA map an iovec + * @data: pointer to arguments + * @iov: kvec to DMA map + * + * ib_dma_map_page() is used here because svc_rdma_dma_unmap() + * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. * * Returns: - * %0 if the header is DMA mapped, - * %-EIO if DMA mapping failed. + * %0 if DMA mapping was successful + * %-EIO if the iovec cannot be DMA mapped */ -int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - __be32 *rdma_resp, - unsigned int len) +static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov) { - ctxt->direction = DMA_TO_DEVICE; - ctxt->pages[0] = virt_to_page(rdma_resp); - ctxt->count = 1; - return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); + if (!iov->iov_len) + return 0; + return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base), + offset_in_page(iov->iov_base), + iov->iov_len); } -/* Load the xdr_buf into the ctxt's sge array, and DMA map each - * element as it is added. +/** + * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf + * @xdr: xdr_buf containing portion of an RPC message to transmit + * @data: pointer to arguments + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if DMA mapping failed * - * Returns the number of sge elements loaded on success, or - * a negative errno on failure. + * On failure, any DMA mappings that have been already done must be + * unmapped by the caller. */ -static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - struct xdr_buf *xdr, __be32 *wr_lst) +static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data) { - unsigned int len, sge_no, remaining, page_off; + unsigned int len, remaining; + unsigned long pageoff; struct page **ppages; - unsigned char *base; - u32 xdr_pad; int ret; - sge_no = 1; - - ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, - xdr->head[0].iov_base, - xdr->head[0].iov_len); + ret = svc_rdma_iov_dma_map(data, &xdr->head[0]); if (ret < 0) return ret; - /* If a Write chunk is present, the xdr_buf's page list - * is not included inline. However the Upper Layer may - * have added XDR padding in the tail buffer, and that - * should not be included inline. 
- */ - if (wr_lst) { - base = xdr->tail[0].iov_base; - len = xdr->tail[0].iov_len; - xdr_pad = xdr_padsize(xdr->page_len); - - if (len && xdr_pad) { - base += xdr_pad; - len -= xdr_pad; - } - - goto tail; - } - ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); - page_off = xdr->page_base & ~PAGE_MASK; + pageoff = offset_in_page(xdr->page_base); remaining = xdr->page_len; while (remaining) { - len = min_t(u32, PAGE_SIZE - page_off, remaining); + len = min_t(u32, PAGE_SIZE - pageoff, remaining); - ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++, - *ppages++, page_off, len); + ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len); if (ret < 0) return ret; remaining -= len; - page_off = 0; + pageoff = 0; } - base = xdr->tail[0].iov_base; - len = xdr->tail[0].iov_len; -tail: - if (len) { - ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len); - if (ret < 0) - return ret; - } + ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]); + if (ret < 0) + return ret; - return sge_no - 1; + return xdr->len; } -/* The svc_rqst and all resources it owns are released as soon as - * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt - * so they are released by the Send completion handler. +struct svc_rdma_pullup_data { + u8 *pd_dest; + unsigned int pd_length; + unsigned int pd_num_sges; +}; + +/** + * svc_rdma_xb_count_sges - Count how many SGEs will be needed + * @xdr: xdr_buf containing portion of an RPC message to transmit + * @data: pointer to arguments + * + * Returns: + * Number of SGEs needed to Send the contents of @xdr inline */ -static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *ctxt) +static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr, + void *data) { - int i, pages = rqstp->rq_next_page - rqstp->rq_respages; + struct svc_rdma_pullup_data *args = data; + unsigned int remaining; + unsigned long offset; - ctxt->count += pages; - for (i = 0; i < pages; i++) { - ctxt->pages[i + 1] = rqstp->rq_respages[i]; - rqstp->rq_respages[i] = NULL; + if (xdr->head[0].iov_len) + ++args->pd_num_sges; + + offset = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + ++args->pd_num_sges; + remaining -= min_t(u32, PAGE_SIZE - offset, remaining); + offset = 0; } - rqstp->rq_next_page = rqstp->rq_respages + 1; + + if (xdr->tail[0].iov_len) + ++args->pd_num_sges; + + args->pd_length += xdr->len; + return 0; } /** - * svc_rdma_post_send_wr - Set up and post one Send Work Request + * svc_rdma_pull_up_needed - Determine whether to use pull-up * @rdma: controlling transport - * @ctxt: op_ctxt for transmitting the Send WR - * @num_sge: number of SGEs to send - * @inv_rkey: R_key argument to Send With Invalidate, or zero + * @sctxt: send_ctxt for the Send WR + * @write_pcl: Write chunk list provided by client + * @xdr: xdr_buf containing RPC message to transmit * * Returns: - * %0 if the Send* was posted successfully, - * %-ENOTCONN if the connection was lost or dropped, - * %-EINVAL if there was a problem with the Send we built, - * %-ENOMEM if ib_post_send failed. 
+ * %true if pull-up must be used + * %false otherwise */ -int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, int num_sge, - u32 inv_rkey) +static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma, + const struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_pcl *write_pcl, + const struct xdr_buf *xdr) { - struct ib_send_wr *send_wr = &ctxt->send_wr; + /* Resources needed for the transport header */ + struct svc_rdma_pullup_data args = { + .pd_length = sctxt->sc_hdrbuf.len, + .pd_num_sges = 1, + }; + int ret; - dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge); + ret = pcl_process_nonpayloads(write_pcl, xdr, + svc_rdma_xb_count_sges, &args); + if (ret < 0) + return false; - send_wr->next = NULL; - ctxt->cqe.done = svc_rdma_wc_send; - send_wr->wr_cqe = &ctxt->cqe; - send_wr->sg_list = ctxt->sge; - send_wr->num_sge = num_sge; - send_wr->send_flags = IB_SEND_SIGNALED; - if (inv_rkey) { - send_wr->opcode = IB_WR_SEND_WITH_INV; - send_wr->ex.invalidate_rkey = inv_rkey; - } else { - send_wr->opcode = IB_WR_SEND; + if (args.pd_length < RPCRDMA_PULLUP_THRESH) + return true; + return args.pd_num_sges >= rdma->sc_max_send_sges; +} + +/** + * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer + * @xdr: xdr_buf containing portion of an RPC message to copy + * @data: pointer to arguments + * + * Returns: + * Always zero. + */ +static int svc_rdma_xb_linearize(const struct xdr_buf *xdr, + void *data) +{ + struct svc_rdma_pullup_data *args = data; + unsigned int len, remaining; + unsigned long pageoff; + struct page **ppages; + + if (xdr->head[0].iov_len) { + memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len); + args->pd_dest += xdr->head[0].iov_len; + } + + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + pageoff = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + len = min_t(u32, PAGE_SIZE - pageoff, remaining); + memcpy(args->pd_dest, page_address(*ppages) + pageoff, len); + remaining -= len; + args->pd_dest += len; + pageoff = 0; + ppages++; + } + + if (xdr->tail[0].iov_len) { + memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len); + args->pd_dest += xdr->tail[0].iov_len; } - return svc_rdma_send(rdma, send_wr); + args->pd_length += xdr->len; + return 0; } -/* Prepare the portion of the RPC Reply that will be transmitted - * via RDMA Send. The RPC-over-RDMA transport header is prepared - * in sge[0], and the RPC xdr_buf is prepared in following sges. - * - * Depending on whether a Write list or Reply chunk is present, - * the server may send all, a portion of, or none of the xdr_buf. - * In the latter case, only the transport header (sge[0]) is - * transmitted. +/** + * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer + * @rdma: controlling transport + * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared + * @write_pcl: Write chunk list provided by client + * @xdr: prepared xdr_buf containing RPC message * - * RDMA Send is the last step of transmitting an RPC reply. Pages - * involved in the earlier RDMA Writes are here transferred out - * of the rqstp and into the ctxt's page array. These pages are - * DMA unmapped by each Write completion, but the subsequent Send - * completion finally releases these pages. + * The device is not capable of sending the reply directly. + * Assemble the elements of @xdr into the transport header buffer. * * Assumptions: - * - The Reply's transport header will never be larger than a page. 
+ * pull_up_needed has determined that @xdr will fit in the buffer. + * + * Returns: + * %0 if pull-up was successful + * %-EMSGSIZE if a buffer manipulation problem occurred */ -static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, - __be32 *rdma_argp, __be32 *rdma_resp, - struct svc_rqst *rqstp, - __be32 *wr_lst, __be32 *rp_ch) +static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_pcl *write_pcl, + const struct xdr_buf *xdr) { - struct svc_rdma_op_ctxt *ctxt; - u32 inv_rkey; + struct svc_rdma_pullup_data args = { + .pd_dest = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len, + }; int ret; - dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n", - (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"), - rqstp->rq_res.head[0].iov_len, - rqstp->rq_res.page_len, - rqstp->rq_res.tail[0].iov_len); + ret = pcl_process_nonpayloads(write_pcl, xdr, + svc_rdma_xb_linearize, &args); + if (ret < 0) + return ret; - ctxt = svc_rdma_get_context(rdma); + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length; + trace_svcrdma_send_pullup(sctxt, args.pd_length); + return 0; +} - ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, - svc_rdma_reply_hdr_len(rdma_resp)); - if (ret < 0) - goto err; +/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message + * @rdma: controlling transport + * @sctxt: send_ctxt for the Send WR + * @write_pcl: Write chunk list provided by client + * @reply_pcl: Reply chunk provided by client + * @xdr: prepared xdr_buf containing RPC message + * + * Returns: + * %0 if DMA mapping was successful. + * %-EMSGSIZE if a buffer manipulation problem occurred + * %-EIO if DMA mapping failed + * + * The Send WR's num_sge field is set in all cases. + */ +int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_pcl *write_pcl, + const struct svc_rdma_pcl *reply_pcl, + const struct xdr_buf *xdr) +{ + struct svc_rdma_map_data args = { + .md_rdma = rdma, + .md_ctxt = sctxt, + }; - if (!rp_ch) { - ret = svc_rdma_map_reply_msg(rdma, ctxt, - &rqstp->rq_res, wr_lst); - if (ret < 0) - goto err; - } + /* Set up the (persistently-mapped) transport header SGE. */ + sctxt->sc_send_wr.num_sge = 1; + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len; + + /* If there is a Reply chunk, nothing follows the transport + * header, so there is nothing to map. + */ + if (!pcl_is_empty(reply_pcl)) + return 0; - svc_rdma_save_io_pages(rqstp, ctxt); + /* For pull-up, svc_rdma_send() will sync the transport header. + * No additional DMA mapping is necessary. + */ + if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr)) + return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr); - inv_rkey = 0; - if (rdma->sc_snd_w_inv) - inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch); - ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey); - if (ret) - goto err; + return pcl_process_nonpayloads(write_pcl, xdr, + svc_rdma_xb_dma_map, &args); +} - return 0; +/* The svc_rqst and all resources it owns are released as soon as + * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt + * so they are released by the Send completion handler. 
+ */ +static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, + struct svc_rdma_send_ctxt *ctxt) +{ + int i, pages = rqstp->rq_next_page - rqstp->rq_respages; -err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - return ret; + ctxt->sc_page_count += pages; + for (i = 0; i < pages; i++) { + ctxt->sc_pages[i] = rqstp->rq_respages[i]; + rqstp->rq_respages[i] = NULL; + } + + /* Prevent svc_xprt_release from releasing pages in rq_pages */ + rqstp->rq_next_page = rqstp->rq_respages; } -/* Given the client-provided Write and Reply chunks, the server was not - * able to form a complete reply. Return an RDMA_ERROR message so the - * client can retire this RPC transaction. As above, the Send completion - * routine releases payload pages that were part of a previous RDMA Write. +/* Prepare the portion of the RPC Reply that will be transmitted + * via RDMA Send. The RPC-over-RDMA transport header is prepared + * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. * - * Remote Invalidation is skipped for simplicity. + * Depending on whether a Write list or Reply chunk is present, + * the server may Send all, a portion of, or none of the xdr_buf. + * In the latter case, only the transport header (sc_sges[0]) is + * transmitted. + * + * Assumptions: + * - The Reply's transport header will never be larger than a page. */ -static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, - __be32 *rdma_resp, struct svc_rqst *rqstp) +static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_recv_ctxt *rctxt, + struct svc_rqst *rqstp) { - struct svc_rdma_op_ctxt *ctxt; - __be32 *p; + struct ib_send_wr *send_wr = &sctxt->sc_send_wr; int ret; - ctxt = svc_rdma_get_context(rdma); + ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl, + &rctxt->rc_reply_pcl, &rqstp->rq_res); + if (ret < 0) + return ret; - /* Replace the original transport header with an - * RDMA_ERROR response. XID etc are preserved. + /* Transfer pages involved in RDMA Writes to the sctxt's + * page array. Completion handling releases these pages. */ - p = rdma_resp + 3; - *p++ = rdma_error; - *p = err_chunk; + svc_rdma_save_io_pages(rqstp, sctxt); - ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20); - if (ret < 0) - goto err; + if (rctxt->rc_inv_rkey) { + send_wr->opcode = IB_WR_SEND_WITH_INV; + send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey; + } else { + send_wr->opcode = IB_WR_SEND; + } + + return svc_rdma_post_send(rdma, sctxt); +} - svc_rdma_save_io_pages(rqstp, ctxt); +/** + * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response + * @rdma: controlling transport context + * @sctxt: Send context for the response + * @rctxt: Receive context for incoming bad message + * @status: negative errno indicating error that occurred + * + * Given the client-provided Read, Write, and Reply chunks, the + * server was not able to parse the Call or form a complete Reply. + * Return an RDMA_ERROR message so the client can retire the RPC + * transaction. + * + * The caller does not have to release @sctxt. It is released by + * Send completion, or by this function on error. 
+ */ +void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + struct svc_rdma_recv_ctxt *rctxt, + int status) +{ + __be32 *rdma_argp = rctxt->rc_recv_buf; + __be32 *p; - ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0); - if (ret) - goto err; + rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0); + xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf, + sctxt->sc_xprt_buf, NULL); - return 0; + p = xdr_reserve_space(&sctxt->sc_stream, + rpcrdma_fixed_maxsz * sizeof(*p)); + if (!p) + goto put_ctxt; -err: - pr_err("svcrdma: failed to post Send WR (%d)\n", ret); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - return ret; -} + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); + *p++ = rdma->sc_fc_credits; + *p = rdma_error; + + switch (status) { + case -EPROTONOSUPPORT: + p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p)); + if (!p) + goto put_ctxt; + + *p++ = err_vers; + *p++ = rpcrdma_version; + *p = rpcrdma_version; + trace_svcrdma_err_vers(*rdma_argp); + break; + default: + p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p)); + if (!p) + goto put_ctxt; + + *p = err_chunk; + trace_svcrdma_err_chunk(*rdma_argp); + } -void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) -{ + /* Remote Invalidation is skipped for simplicity. */ + sctxt->sc_send_wr.num_sge = 1; + sctxt->sc_send_wr.opcode = IB_WR_SEND; + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len; + if (svc_rdma_post_send(rdma, sctxt)) + goto put_ctxt; + return; + +put_ctxt: + svc_rdma_send_ctxt_put(rdma, sctxt); } /** @@ -623,83 +1001,110 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); - __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch; - struct xdr_buf *xdr = &rqstp->rq_res; - struct page *res_page; + struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; + __be32 *rdma_argp = rctxt->rc_recv_buf; + struct svc_rdma_send_ctxt *sctxt; + unsigned int rc_size; + __be32 *p; int ret; - /* Find the call's chunk lists to decide how to send the reply. - * Receive places the Call's xprt header at the start of page 0. - */ - rdma_argp = page_address(rqstp->rq_pages[0]); - svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch); - - dprintk("svcrdma: preparing response for XID 0x%08x\n", - be32_to_cpup(rdma_argp)); + ret = -ENOTCONN; + if (svc_xprt_is_dead(xprt)) + goto drop_connection; - /* Create the RDMA response header. xprt->xpt_mutex, - * acquired in svc_send(), serializes RPC replies. The - * code path below that inserts the credit grant value - * into each transport header runs only inside this - * critical section. - */ ret = -ENOMEM; - res_page = alloc_page(GFP_KERNEL); - if (!res_page) - goto err0; - rdma_resp = page_address(res_page); + sctxt = svc_rdma_send_ctxt_get(rdma); + if (!sctxt) + goto drop_connection; - p = rdma_resp; - *p++ = *rdma_argp; - *p++ = *(rdma_argp + 1); - *p++ = rdma->sc_fc_credits; - *p++ = rp_ch ? 
rdma_nomsg : rdma_msg; + ret = -EMSGSIZE; + p = xdr_reserve_space(&sctxt->sc_stream, + rpcrdma_fixed_maxsz * sizeof(*p)); + if (!p) + goto put_ctxt; - /* Start with empty chunks */ - *p++ = xdr_zero; - *p++ = xdr_zero; - *p = xdr_zero; + ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res); + if (ret < 0) + goto put_ctxt; - if (wr_lst) { - /* XXX: Presume the client sent only one Write chunk */ - ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr); - if (ret < 0) - goto err2; - svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); - } - if (rp_ch) { - ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr); + rc_size = 0; + if (!pcl_is_empty(&rctxt->rc_reply_pcl)) { + ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl, + &rctxt->rc_reply_pcl, sctxt, + &rqstp->rq_res); if (ret < 0) - goto err2; - svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); + goto reply_chunk; + rc_size = ret; } - ret = svc_rdma_post_recv(rdma, GFP_KERNEL); - if (ret) - goto err1; - ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp, - wr_lst, rp_ch); + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); + *p++ = rdma->sc_fc_credits; + *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg; + + ret = svc_rdma_encode_read_list(sctxt); + if (ret < 0) + goto put_ctxt; + ret = svc_rdma_encode_write_list(rctxt, sctxt); + if (ret < 0) + goto put_ctxt; + ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size); + if (ret < 0) + goto put_ctxt; + + ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp); if (ret < 0) - goto err0; + goto put_ctxt; return 0; - err2: +reply_chunk: if (ret != -E2BIG && ret != -EINVAL) - goto err1; + goto put_ctxt; - ret = svc_rdma_post_recv(rdma, GFP_KERNEL); - if (ret) - goto err1; - ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp); - if (ret < 0) - goto err0; + /* Send completion releases payload pages that were part + * of previously posted RDMA Writes. + */ + svc_rdma_save_io_pages(rqstp, sctxt); + svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret); return 0; - err1: - put_page(res_page); - err0: - pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n", - ret); - set_bit(XPT_CLOSE, &xprt->xpt_flags); +put_ctxt: + svc_rdma_send_ctxt_put(rdma, sctxt); +drop_connection: + trace_svcrdma_send_err(rqstp, ret); + svc_xprt_deferred_close(&rdma->sc_xprt); return -ENOTCONN; } + +/** + * svc_rdma_result_payload - special processing for a result payload + * @rqstp: RPC transaction context + * @offset: payload's byte offset in @rqstp->rq_res + * @length: size of payload, in bytes + * + * Assign the passed-in result payload to the current Write chunk, + * and advance to cur_result_payload to the next Write chunk, if + * there is one. + * + * Return values: + * %0 if successful or nothing needed to be done + * %-E2BIG if the payload was larger than the Write chunk + */ +int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset, + unsigned int length) +{ + struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; + struct svc_rdma_chunk *chunk; + + chunk = rctxt->rc_cur_result_payload; + if (!length || !chunk) + return 0; + rctxt->rc_cur_result_payload = + pcl_next_chunk(&rctxt->rc_write_pcl, chunk); + + if (length > chunk->ch_length) + return -E2BIG; + chunk->ch_position = offset; + chunk->ch_payload_length = length; + return 0; +} |
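
For readers following the new xdr_stream-based encoding path above, the sketch below lays out the same RPC-over-RDMA version 1 transport header that svc_rdma_sendto() now builds: four fixed XDR words (XID, version, credit grant, proc), followed by the Read list, Write list, and Reply chunk discriminators. This is an illustrative, userspace-only sketch and not part of the patch; the proc values are assumed from RFC 8166 and should be checked against include/linux/sunrpc/rpc_rdma.h before being relied on.

    /* Illustrative sketch only: encodes an RPC-over-RDMA v1 reply header
     * with empty chunk lists, mirroring what svc_rdma_sendto() and the
     * svc_rdma_encode_*_list() helpers emit via xdr_stream.
     */
    #include <stdint.h>
    #include <stdio.h>
    #include <arpa/inet.h>

    enum { RDMA_MSG = 0, RDMA_NOMSG = 1 };   /* proc values assumed from RFC 8166 */

    static size_t encode_rpcrdma_hdr(uint32_t *buf, uint32_t xid,
                                     uint32_t credits, int has_reply_chunk)
    {
            uint32_t *p = buf;

            *p++ = xid;                   /* copied verbatim from the Call (already big-endian) */
            *p++ = htonl(1);              /* RPC-over-RDMA version one */
            *p++ = htonl(credits);        /* flow-control credit grant (sc_fc_credits) */
            *p++ = htonl(has_reply_chunk ? RDMA_NOMSG : RDMA_MSG);

            *p++ = htonl(0);              /* Read list: always absent in replies */
            *p++ = htonl(0);              /* Write list: terminated (empty here) */
            *p++ = htonl(0);              /* Reply chunk: absent */
            return (p - buf) * sizeof(*p);
    }

    int main(void)
    {
            uint32_t hdr[7];
            size_t len = encode_rpcrdma_hdr(hdr, htonl(0xdeadbeef), 32, 0);

            printf("header is %zu bytes\n", len);   /* 28 bytes for an empty reply */
            return 0;
    }

In the patch itself the Write list and Reply chunk positions are filled in from the parsed chunk lists (rc_write_pcl, rc_reply_pcl) rather than left absent, and a failure to form the reply switches proc to rdma_error via svc_rdma_send_error_msg().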
