Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_sendto.c')
 net/sunrpc/xprtrdma/svc_rdma_sendto.c | 1077 ++++++++++++++++----------
 1 file changed, 689 insertions(+), 388 deletions(-)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index cf51b8f9b15f..914cd263c2f1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -100,65 +100,68 @@
*/
#include <linux/spinlock.h>
-#include <asm/unaligned.h>
+#include <linux/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/debug.h>
-#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
-#define RPCDBG_FACILITY RPCDBG_SVCXPRT
-
static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
-static inline struct svc_rdma_send_ctxt *
-svc_rdma_next_send_ctxt(struct list_head *list)
-{
- return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
- sc_list);
-}
-
static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
{
+ int node = ibdev_to_node(rdma->sc_cm_id->device);
struct svc_rdma_send_ctxt *ctxt;
+ unsigned long pages;
dma_addr_t addr;
void *buffer;
- size_t size;
int i;
- size = sizeof(*ctxt);
- size += rdma->sc_max_send_sges * sizeof(struct ib_sge);
- ctxt = kmalloc(size, GFP_KERNEL);
+ ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges),
+ GFP_KERNEL, node);
if (!ctxt)
goto fail0;
- buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
- if (!buffer)
+ pages = svc_serv_maxpages(rdma->sc_xprt.xpt_server);
+ ctxt->sc_pages = kcalloc_node(pages, sizeof(struct page *),
+ GFP_KERNEL, node);
+ if (!ctxt->sc_pages)
goto fail1;
+ ctxt->sc_maxpages = pages;
+ buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
+ if (!buffer)
+ goto fail2;
addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
rdma->sc_max_req_size, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
- goto fail2;
+ goto fail3;
+
+ svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
+ ctxt->sc_rdma = rdma;
ctxt->sc_send_wr.next = NULL;
ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
ctxt->sc_xprt_buf = buffer;
+ xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+ rdma->sc_max_req_size);
ctxt->sc_sges[0].addr = addr;
for (i = 0; i < rdma->sc_max_send_sges; i++)
ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
return ctxt;
-fail2:
+fail3:
kfree(buffer);
+fail2:
+ kfree(ctxt->sc_pages);
fail1:
kfree(ctxt);
fail0:
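
A side note on the allocation change in this hunk: the open-coded sizeof(*ctxt) plus per-SGE arithmetic is replaced by struct_size(), which saturates to SIZE_MAX on overflow so an implausibly large SGE count cannot produce an undersized buffer. A minimal sketch of the idiom, using a hypothetical struct rather than the real send_ctxt:

#include <linux/overflow.h>
#include <linux/slab.h>
#include <rdma/ib_verbs.h>

/* Hypothetical example type; the real context lives in svc_rdma.h. */
struct demo_send_ctxt {
	unsigned int	d_count;
	struct ib_sge	d_sges[];	/* flexible array member */
};

static struct demo_send_ctxt *demo_ctxt_alloc(unsigned int count, int node)
{
	struct demo_send_ctxt *ctxt;

	/* struct_size() computes sizeof(*ctxt) plus count trailing
	 * elements, returning SIZE_MAX on overflow, which the
	 * allocator then refuses.
	 */
	ctxt = kzalloc_node(struct_size(ctxt, d_sges, count),
			    GFP_KERNEL, node);
	if (!ctxt)
		return NULL;
	ctxt->d_count = count;
	return ctxt;
}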
@@ -173,14 +176,16 @@ fail0:
void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
{
struct svc_rdma_send_ctxt *ctxt;
+ struct llist_node *node;
- while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
- list_del(&ctxt->sc_list);
+ while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
+ ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
ib_dma_unmap_single(rdma->sc_pd->device,
ctxt->sc_sges[0].addr,
rdma->sc_max_req_size,
DMA_TO_DEVICE);
kfree(ctxt->sc_xprt_buf);
+ kfree(ctxt->sc_pages);
kfree(ctxt);
}
}
@@ -195,56 +200,98 @@ void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
{
struct svc_rdma_send_ctxt *ctxt;
+ struct llist_node *node;
spin_lock(&rdma->sc_send_lock);
- ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
- if (!ctxt)
- goto out_empty;
- list_del(&ctxt->sc_list);
+ node = llist_del_first(&rdma->sc_send_ctxts);
spin_unlock(&rdma->sc_send_lock);
+ if (!node)
+ goto out_empty;
+
+ ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
out:
+ rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+ ctxt->sc_xprt_buf, NULL);
+
+ svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0;
+ ctxt->sc_wr_chain = &ctxt->sc_send_wr;
+ ctxt->sc_sqecount = 1;
+
return ctxt;
out_empty:
- spin_unlock(&rdma->sc_send_lock);
ctxt = svc_rdma_send_ctxt_alloc(rdma);
if (!ctxt)
return NULL;
goto out;
}
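
One subtlety in the conversion from list_head to llist above: llist_add() is safe lock-free from any context, but llist_del_first() must not race with another llist_del_first(), which is why this get path still takes sc_send_lock while the release path can push contexts back without it. A minimal sketch of that asymmetric pattern, with hypothetical names:

#include <linux/llist.h>
#include <linux/spinlock.h>

struct demo_item {
	struct llist_node d_node;
};

static LLIST_HEAD(demo_free_list);
static DEFINE_SPINLOCK(demo_consumer_lock);

/* Producers never need a lock: llist_add() is a lock-free push. */
static void demo_put(struct demo_item *item)
{
	llist_add(&item->d_node, &demo_free_list);
}

/* Consumers must serialize llist_del_first() against each other. */
static struct demo_item *demo_get(void)
{
	struct llist_node *node;

	spin_lock(&demo_consumer_lock);
	node = llist_del_first(&demo_free_list);
	spin_unlock(&demo_consumer_lock);
	return node ? llist_entry(node, struct demo_item, d_node) : NULL;
}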
-/**
- * svc_rdma_send_ctxt_put - Return send_ctxt to free list
- * @rdma: controlling svcxprt_rdma
- * @ctxt: object to return to the free list
- *
- * Pages left in sc_pages are DMA unmapped and released.
- */
-void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt)
+static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
{
struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i;
+ svc_rdma_reply_chunk_release(rdma, ctxt);
+
+ if (ctxt->sc_page_count)
+ release_pages(ctxt->sc_pages, ctxt->sc_page_count);
+
/* The first SGE contains the transport header, which
* remains mapped until @ctxt is destroyed.
*/
- for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
+ for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
+ trace_svcrdma_dma_unmap_page(&ctxt->sc_cid,
+ ctxt->sc_sges[i].addr,
+ ctxt->sc_sges[i].length);
ib_dma_unmap_page(device,
ctxt->sc_sges[i].addr,
ctxt->sc_sges[i].length,
DMA_TO_DEVICE);
+ }
- for (i = 0; i < ctxt->sc_page_count; ++i)
- put_page(ctxt->sc_pages[i]);
+ llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
+}
- spin_lock(&rdma->sc_send_lock);
- list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
- spin_unlock(&rdma->sc_send_lock);
+static void svc_rdma_send_ctxt_put_async(struct work_struct *work)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+
+ ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work);
+ svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt);
+}
+
+/**
+ * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ *
+ * Pages left in sc_pages are DMA unmapped and released.
+ */
+void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async);
+ queue_work(svcrdma_wq, &ctxt->sc_work);
+}
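
svc_rdma_send_ctxt_put() can be called from Send completion context, where release_pages() and the DMA unmaps in svc_rdma_send_ctxt_release() are too heavyweight, so the real cleanup is deferred to the svcrdma_wq workqueue. The general deferral idiom, as a sketch with hypothetical names:

#include <linux/slab.h>
#include <linux/workqueue.h>

struct demo_rel_ctxt {
	struct work_struct d_work;
	/* ... resources that need process-context cleanup ... */
};

static void demo_release_work(struct work_struct *work)
{
	struct demo_rel_ctxt *ctxt =
		container_of(work, struct demo_rel_ctxt, d_work);

	/* Heavyweight release runs here, in process context. */
	kfree(ctxt);
}

/* Safe to call from atomic (e.g. completion) context. */
static void demo_put_async(struct demo_rel_ctxt *ctxt,
			   struct workqueue_struct *wq)
{
	INIT_WORK(&ctxt->d_work, demo_release_work);
	queue_work(wq, &ctxt->d_work);
}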
+
+/**
+ * svc_rdma_wake_send_waiters - manage Send Queue accounting
+ * @rdma: controlling transport
+ * @avail: Number of additional SQEs that are now available
+ *
+ */
+void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
+{
+ atomic_add(avail, &rdma->sc_sq_avail);
+ smp_mb__after_atomic();
+ if (unlikely(waitqueue_active(&rdma->sc_send_wait)))
+ wake_up(&rdma->sc_send_wait);
}
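
The smp_mb__after_atomic() above is not decorative: waitqueue_active() is an unlocked check, and without a full barrier between bumping sc_sq_avail and inspecting the wait queue, the waker could miss a thread that has already tested the counter but not yet gone to sleep. A sketch of the two halves of the handshake, with hypothetical names:

#include <linux/atomic.h>
#include <linux/wait.h>

/* Waker: publish the counter update before peeking at the queue. */
static void demo_wake(atomic_t *avail, wait_queue_head_t *wq, int n)
{
	atomic_add(n, avail);
	smp_mb__after_atomic();	/* pairs with the barriers in wait_event() */
	if (waitqueue_active(wq))
		wake_up(wq);
}

/* Sleeper: wait_event() re-checks the condition after queueing itself,
 * so a wake-up between the check and the sleep is never lost.
 */
static void demo_wait(atomic_t *avail, wait_queue_head_t *wq)
{
	wait_event(*wq, atomic_read(avail) > 0);
}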
/**
@@ -259,363 +306,556 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
struct svcxprt_rdma *rdma = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
- struct svc_rdma_send_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt =
+ container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
- trace_svcrdma_wc_send(wc);
+ svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount);
- atomic_inc(&rdma->sc_sq_avail);
- wake_up(&rdma->sc_send_wait);
+ if (unlikely(wc->status != IB_WC_SUCCESS))
+ goto flushed;
- ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
+ trace_svcrdma_wc_send(&ctxt->sc_cid);
svc_rdma_send_ctxt_put(rdma, ctxt);
+ return;
- if (unlikely(wc->status != IB_WC_SUCCESS)) {
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- svc_xprt_enqueue(&rdma->sc_xprt);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- }
-
- svc_xprt_put(&rdma->sc_xprt);
+flushed:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid);
+ else
+ trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
/**
- * svc_rdma_send - Post a single Send WR
- * @rdma: transport on which to post the WR
- * @wr: prepared Send WR to post
+ * svc_rdma_post_send - Post a WR chain to the Send Queue
+ * @rdma: transport context
+ * @ctxt: WR chain to post
+ *
+ * Copy fields in @ctxt to stack variables in order to guarantee
+ * that these values remain available after the ib_post_send() call.
+ * In some error flow cases, svc_rdma_wc_send() releases @ctxt.
+ *
+ * Note there is potential for starvation when the Send Queue is
+ * full because there is no order to when waiting threads are
+ * awoken. The transport is typically provisioned with a deep
+ * enough Send Queue that SQ exhaustion should be a rare event.
*
- * Returns zero the Send WR was posted successfully. Otherwise, a
- * negative errno is returned.
+ * Return values:
+ * %0: @ctxt's WR chain was posted successfully
+ * %-ENOTCONN: The connection was lost
*/
-int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+int svc_rdma_post_send(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
{
- int ret;
+ struct ib_send_wr *first_wr = ctxt->sc_wr_chain;
+ struct ib_send_wr *send_wr = &ctxt->sc_send_wr;
+ const struct ib_send_wr *bad_wr = first_wr;
+ struct rpc_rdma_cid cid = ctxt->sc_cid;
+ int ret, sqecount = ctxt->sc_sqecount;
might_sleep();
+ /* Sync the transport header buffer */
+ ib_dma_sync_single_for_device(rdma->sc_pd->device,
+ send_wr->sg_list[0].addr,
+ send_wr->sg_list[0].length,
+ DMA_TO_DEVICE);
+
/* If the SQ is full, wait until an SQ entry is available */
- while (1) {
- if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
- atomic_inc(&rdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma);
- atomic_inc(&rdma->sc_sq_avail);
+ while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
+ if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
+ svc_rdma_wake_send_waiters(rdma, sqecount);
+
+ /* When the transport is torn down, assume
+ * ib_drain_sq() will trigger enough Send
+ * completions to wake us. The XPT_CLOSE test
+ * above should then cause the while loop to
+ * exit.
+ */
+ percpu_counter_inc(&svcrdma_stat_sq_starve);
+ trace_svcrdma_sq_full(rdma, &cid);
wait_event(rdma->sc_send_wait,
- atomic_read(&rdma->sc_sq_avail) > 1);
- if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
- return -ENOTCONN;
- trace_svcrdma_sq_retry(rdma);
+ atomic_read(&rdma->sc_sq_avail) > 0);
+ trace_svcrdma_sq_retry(rdma, &cid);
continue;
}
- svc_xprt_get(&rdma->sc_xprt);
- ret = ib_post_send(rdma->sc_qp, wr, NULL);
- trace_svcrdma_post_send(wr, ret);
+ trace_svcrdma_post_send(ctxt);
+ ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
if (ret) {
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- svc_xprt_put(&rdma->sc_xprt);
- wake_up(&rdma->sc_send_wait);
+ trace_svcrdma_sq_post_err(rdma, &cid, ret);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+
+ /* If even one WR was posted, there will be a
+ * Send completion that bumps sc_sq_avail.
+ */
+ if (bad_wr == first_wr) {
+ svc_rdma_wake_send_waiters(rdma, sqecount);
+ break;
+ }
}
- break;
+ return 0;
}
- return ret;
+ return -ENOTCONN;
}
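
The bad_wr test above leans on the verbs contract: on failure, ib_post_send() points bad_wr at the first work request it rejected, so bad_wr != first_wr means at least one WR reached the hardware and its eventual (flush) completion will run svc_rdma_wc_send() and refund those SQEs; only when nothing at all was posted may the refund happen inline. A sketch of recovering the posted count from that contract (illustrative helper, not part of the patch):

#include <rdma/ib_verbs.h>

/* Count the WRs that ib_post_send() accepted before failing.
 * Everything on the chain ahead of *bad_wr was posted and will
 * generate a completion.
 */
static int demo_posted_wrs(const struct ib_send_wr *first_wr,
			   const struct ib_send_wr *bad_wr)
{
	const struct ib_send_wr *wr;
	int posted = 0;

	for (wr = first_wr; wr && wr != bad_wr; wr = wr->next)
		posted++;
	return posted;
}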
-static u32 xdr_padsize(u32 len)
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply Read list
+ * %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
{
- return (len & 3) ? (4 - (len & 3)) : 0;
+ /* RPC-over-RDMA version 1 replies never have a Read list. */
+ return xdr_stream_encode_item_absent(&sctxt->sc_stream);
}
-/* Returns length of transport header, in bytes.
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @sctxt: Send context for the RPC Reply
+ * @chunk: Write chunk to push
+ * @remaining: remaining bytes of the payload left in the Write chunk
+ * @segno: which segment in the chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write segment, and updates @remaining
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
+static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk,
+ u32 *remaining, unsigned int segno)
{
- unsigned int nsegs;
+ const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
+ const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
+ u32 length;
__be32 *p;
- p = rdma_resp;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p += rpcrdma_fixed_maxsz + 1;
-
- /* Skip Write list. */
- while (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
- }
-
- /* Skip Reply chunk. */
- if (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
- }
-
- return (unsigned long)p - (unsigned long)rdma_resp;
+ p = xdr_reserve_space(&sctxt->sc_stream, len);
+ if (!p)
+ return -EMSGSIZE;
+
+ length = min_t(u32, *remaining, segment->rs_length);
+ *remaining -= length;
+ xdr_encode_rdma_segment(p, segment->rs_handle, length,
+ segment->rs_offset);
+ trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
+ segment->rs_offset);
+ return len;
}
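
The reserve-then-fill sequence above is the standard xdr_stream encoding idiom: ask xdr_reserve_space() for the exact number of bytes, treat a NULL return as buffer exhaustion (-EMSGSIZE), then write into the reserved slot. A minimal sketch using only the generic XDR helpers:

#include <linux/sunrpc/xdr.h>

/* Encode two 32-bit words into @xdr; mirrors the pattern above.
 * Illustrative only.
 */
static ssize_t demo_encode_pair(struct xdr_stream *xdr, u32 value, u32 handle)
{
	const size_t len = 2 * sizeof(__be32);
	__be32 *p;

	p = xdr_reserve_space(xdr, len);
	if (!p)
		return -EMSGSIZE;	/* stream has no room left */
	*p++ = cpu_to_be32(value);
	*p = cpu_to_be32(handle);
	return len;
}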
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @sctxt: Send context for the RPC Reply
+ * @chunk: Write chunk to push
*
- * Returns number of segments in this chunk.
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write chunk
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
- unsigned int remaining)
+static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk)
{
- unsigned int i, nsegs;
- u32 seg_len;
-
- /* Write list discriminator */
- *dst++ = *src++;
-
- /* number of segments in this chunk */
- nsegs = be32_to_cpup(src);
- *dst++ = *src++;
-
- for (i = nsegs; i; i--) {
- /* segment's RDMA handle */
- *dst++ = *src++;
-
- /* bytes returned in this segment */
- seg_len = be32_to_cpu(*src);
- if (remaining >= seg_len) {
- /* entire segment was consumed */
- *dst = *src;
- remaining -= seg_len;
- } else {
- /* segment only partly filled */
- *dst = cpu_to_be32(remaining);
- remaining = 0;
- }
- dst++; src++;
+ u32 remaining = chunk->ch_payload_length;
+ unsigned int segno;
+ ssize_t len, ret;
- /* segment's RDMA offset */
- *dst++ = *src++;
- *dst++ = *src++;
- }
+ len = 0;
+ ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+ if (ret < 0)
+ return ret;
+ len += ret;
- return nsegs;
-}
+ ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
+ if (ret < 0)
+ return ret;
+ len += ret;
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- * - Client has provided only one Write chunk
- */
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
- unsigned int consumed)
-{
- unsigned int nsegs;
- __be32 *p, *q;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
- q = wr_ch;
- while (*q != xdr_zero) {
- nsegs = xdr_encode_write_chunk(p, q, consumed);
- q += 2 + nsegs * rpcrdma_segment_maxsz;
- p += 2 + nsegs * rpcrdma_segment_maxsz;
- consumed = 0;
+ for (segno = 0; segno < chunk->ch_segcount; segno++) {
+ ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
+ if (ret < 0)
+ return ret;
+ len += ret;
}
- /* Terminate Write list */
- *p++ = xdr_zero;
-
- /* Reply chunk discriminator; may be replaced later */
- *p = xdr_zero;
+ return len;
}
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
*
- * Assumptions:
- * - Reply can always fit in the provided Reply chunk
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Write list
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
- unsigned int consumed)
+static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt)
{
- __be32 *p;
+ struct svc_rdma_chunk *chunk;
+ ssize_t len, ret;
- /* Find the Reply chunk in the Reply's xprt header.
- * RPC-over-RDMA V1 replies never have a Read list.
- */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+ len = 0;
+ pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+ ret = svc_rdma_encode_write_chunk(sctxt, chunk);
+ if (ret < 0)
+ return ret;
+ len += ret;
+ }
- /* Skip past Write list */
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+ /* Terminate the Write list */
+ ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+ if (ret < 0)
+ return ret;
- xdr_encode_write_chunk(p, rp_ch, consumed);
+ return len + ret;
}
-/* Parse the RPC Call's transport header.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Reply chunk
+ * %-EMSGSIZE on XDR buffer overflow
+ * %-E2BIG if the RPC message is larger than the Reply chunk
*/
-static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
- __be32 **write, __be32 **reply)
+static ssize_t
+svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- __be32 *p;
+ struct svc_rdma_chunk *chunk;
- p = rdma_argp + rpcrdma_fixed_maxsz;
+ if (pcl_is_empty(&rctxt->rc_reply_pcl))
+ return xdr_stream_encode_item_absent(&sctxt->sc_stream);
- /* Read list */
- while (*p++ != xdr_zero)
- p += 5;
+ chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+ if (length > chunk->ch_length)
+ return -E2BIG;
- /* Write list */
- if (*p != xdr_zero) {
- *write = p;
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpu(*p) * 4;
- } else {
- *write = NULL;
- p++;
- }
-
- /* Reply chunk */
- if (*p != xdr_zero)
- *reply = p;
- else
- *reply = NULL;
+ chunk->ch_payload_length = length;
+ return svc_rdma_encode_write_chunk(sctxt, chunk);
}
-static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct page *page,
- unsigned long offset,
- unsigned int len)
+struct svc_rdma_map_data {
+ struct svcxprt_rdma *md_rdma;
+ struct svc_rdma_send_ctxt *md_ctxt;
+};
+
+/**
+ * svc_rdma_page_dma_map - DMA map one page
+ * @data: pointer to arguments
+ * @page: struct page to DMA map
+ * @offset: offset into the page
+ * @len: number of bytes to map
+ *
+ * Returns:
+ * %0 if DMA mapping was successful
+ * %-EIO if the page cannot be DMA mapped
+ */
+static int svc_rdma_page_dma_map(void *data, struct page *page,
+ unsigned long offset, unsigned int len)
{
+ struct svc_rdma_map_data *args = data;
+ struct svcxprt_rdma *rdma = args->md_rdma;
+ struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
struct ib_device *dev = rdma->sc_cm_id->device;
dma_addr_t dma_addr;
+ ++ctxt->sc_cur_sge_no;
+
dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(dev, dma_addr))
goto out_maperr;
+ trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len);
ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
ctxt->sc_send_wr.num_sge++;
return 0;
out_maperr:
- trace_svcrdma_dma_map_page(rdma, page);
+ trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len);
return -EIO;
}
-/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
+/**
+ * svc_rdma_iov_dma_map - DMA map an iovec
+ * @data: pointer to arguments
+ * @iov: kvec to DMA map
+ *
+ * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
* handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
+ *
+ * Returns:
+ * %0 if DMA mapping was successful
+ * %-EIO if the iovec cannot be DMA mapped
*/
-static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- unsigned char *base,
- unsigned int len)
+static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
{
- return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base),
- offset_in_page(base), len);
+ if (!iov->iov_len)
+ return 0;
+ return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
+ offset_in_page(iov->iov_base),
+ iov->iov_len);
}
/**
- * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
- * @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
- * @len: length of transport header
+ * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
+ *
+ * Returns:
+ * %0 if DMA mapping was successful
+ * %-EIO if DMA mapping failed
*
+ * On failure, any DMA mappings that have been already done must be
+ * unmapped by the caller.
*/
-void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- unsigned int len)
+static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
{
- ctxt->sc_sges[0].length = len;
- ctxt->sc_send_wr.num_sge++;
- ib_dma_sync_single_for_device(rdma->sc_pd->device,
- ctxt->sc_sges[0].addr, len,
- DMA_TO_DEVICE);
+ unsigned int len, remaining;
+ unsigned long pageoff;
+ struct page **ppages;
+ int ret;
+
+ ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
+ if (ret < 0)
+ return ret;
+
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ pageoff = offset_in_page(xdr->page_base);
+ remaining = xdr->page_len;
+ while (remaining) {
+ len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+
+ ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
+ if (ret < 0)
+ return ret;
+
+ remaining -= len;
+ pageoff = 0;
+ }
+
+ ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
+ if (ret < 0)
+ return ret;
+
+ return xdr->len;
}
-/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
- * @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
- * @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
+struct svc_rdma_pullup_data {
+ u8 *pd_dest;
+ unsigned int pd_length;
+ unsigned int pd_num_sges;
+};
+
+/**
+ * svc_rdma_xb_count_sges - Count how many SGEs will be needed
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
*
- * Load the xdr_buf into the ctxt's sge array, and DMA map each
- * element as it is added.
+ * Returns:
+ * Number of SGEs needed to Send the contents of @xdr inline
+ */
+static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
+ void *data)
+{
+ struct svc_rdma_pullup_data *args = data;
+ unsigned int remaining;
+ unsigned long offset;
+
+ if (xdr->head[0].iov_len)
+ ++args->pd_num_sges;
+
+ offset = offset_in_page(xdr->page_base);
+ remaining = xdr->page_len;
+ while (remaining) {
+ ++args->pd_num_sges;
+ remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
+ offset = 0;
+ }
+
+ if (xdr->tail[0].iov_len)
+ ++args->pd_num_sges;
+
+ args->pd_length += xdr->len;
+ return 0;
+}
+
+/**
+ * svc_rdma_pull_up_needed - Determine whether to use pull-up
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR
+ * @write_pcl: Write chunk list provided by client
+ * @xdr: xdr_buf containing RPC message to transmit
*
- * Returns zero on success, or a negative errno on failure.
+ * Returns:
+ * %true if pull-up must be used
+ * %false otherwise
*/
-int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
+ const struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_pcl *write_pcl,
+ const struct xdr_buf *xdr)
{
- unsigned int len, remaining;
- unsigned long page_off;
- struct page **ppages;
- unsigned char *base;
- u32 xdr_pad;
+ /* Resources needed for the transport header */
+ struct svc_rdma_pullup_data args = {
+ .pd_length = sctxt->sc_hdrbuf.len,
+ .pd_num_sges = 1,
+ };
int ret;
- if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
- return -EIO;
- ret = svc_rdma_dma_map_buf(rdma, ctxt,
- xdr->head[0].iov_base,
- xdr->head[0].iov_len);
+ ret = pcl_process_nonpayloads(write_pcl, xdr,
+ svc_rdma_xb_count_sges, &args);
if (ret < 0)
- return ret;
+ return false;
- /* If a Write chunk is present, the xdr_buf's page list
- * is not included inline. However the Upper Layer may
- * have added XDR padding in the tail buffer, and that
- * should not be included inline.
- */
- if (wr_lst) {
- base = xdr->tail[0].iov_base;
- len = xdr->tail[0].iov_len;
- xdr_pad = xdr_padsize(xdr->page_len);
-
- if (len && xdr_pad) {
- base += xdr_pad;
- len -= xdr_pad;
- }
+ if (args.pd_length < RPCRDMA_PULLUP_THRESH)
+ return true;
+ return args.pd_num_sges >= rdma->sc_max_send_sges;
+}
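
Two independent conditions force pull-up above: a reply shorter than RPCRDMA_PULLUP_THRESH, where a memcpy into the already-mapped header buffer is cheaper than per-SGE DMA mapping, and a reply whose scatter list would exceed the device's send SGE limit. Restated compactly, with hypothetical parameters:

/* Condensed restatement of the decision. For example, with a device
 * advertising 4 send SGEs, a 200-byte reply is copied because the
 * copy is cheaper than mapping, while a reply needing header + 5
 * page SGEs is copied because it cannot be mapped at all.
 */
static bool demo_pull_up(unsigned int total_len, unsigned int num_sges,
			 unsigned int max_send_sges, unsigned int thresh)
{
	if (total_len < thresh)
		return true;			/* copy is cheaper */
	return num_sges >= max_send_sges;	/* device cannot map it */
}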
- goto tail;
+/**
+ * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
+ * @xdr: xdr_buf containing portion of an RPC message to copy
+ * @data: pointer to arguments
+ *
+ * Returns:
+ * Always zero.
+ */
+static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
+ void *data)
+{
+ struct svc_rdma_pullup_data *args = data;
+ unsigned int len, remaining;
+ unsigned long pageoff;
+ struct page **ppages;
+
+ if (xdr->head[0].iov_len) {
+ memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
+ args->pd_dest += xdr->head[0].iov_len;
}
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
- page_off = xdr->page_base & ~PAGE_MASK;
+ pageoff = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
- len = min_t(u32, PAGE_SIZE - page_off, remaining);
-
- if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
- return -EIO;
- ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
- page_off, len);
- if (ret < 0)
- return ret;
-
+ len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+ memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
remaining -= len;
- page_off = 0;
+ args->pd_dest += len;
+ pageoff = 0;
+ ppages++;
}
- base = xdr->tail[0].iov_base;
- len = xdr->tail[0].iov_len;
-tail:
- if (len) {
- if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges)
- return -EIO;
- ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
- if (ret < 0)
- return ret;
+ if (xdr->tail[0].iov_len) {
+ memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+ args->pd_dest += xdr->tail[0].iov_len;
}
+ args->pd_length += xdr->len;
+ return 0;
+}
+
+/**
+ * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
+ * @write_pcl: Write chunk list provided by client
+ * @xdr: prepared xdr_buf containing RPC message
+ *
+ * The device is not capable of sending the reply directly.
+ * Assemble the elements of @xdr into the transport header buffer.
+ *
+ * Assumptions:
+ * pull_up_needed has determined that @xdr will fit in the buffer.
+ *
+ * Returns:
+ * %0 if pull-up was successful
+ * %-EMSGSIZE if a buffer manipulation problem occurred
+ */
+static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_pcl *write_pcl,
+ const struct xdr_buf *xdr)
+{
+ struct svc_rdma_pullup_data args = {
+ .pd_dest = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
+ };
+ int ret;
+
+ ret = pcl_process_nonpayloads(write_pcl, xdr,
+ svc_rdma_xb_linearize, &args);
+ if (ret < 0)
+ return ret;
+
+ sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
+ trace_svcrdma_send_pullup(sctxt, args.pd_length);
return 0;
}
+/**
+ * svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR
+ * @write_pcl: Write chunk list provided by client
+ * @reply_pcl: Reply chunk provided by client
+ * @xdr: prepared xdr_buf containing RPC message
+ *
+ * Returns:
+ * %0 if DMA mapping was successful.
+ * %-EMSGSIZE if a buffer manipulation problem occurred
+ * %-EIO if DMA mapping failed
+ *
+ * The Send WR's num_sge field is set in all cases.
+ */
+int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_pcl *write_pcl,
+ const struct svc_rdma_pcl *reply_pcl,
+ const struct xdr_buf *xdr)
+{
+ struct svc_rdma_map_data args = {
+ .md_rdma = rdma,
+ .md_ctxt = sctxt,
+ };
+
+ /* Set up the (persistently-mapped) transport header SGE. */
+ sctxt->sc_send_wr.num_sge = 1;
+ sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+
+ /* If there is a Reply chunk, nothing follows the transport
+ * header, so there is nothing to map.
+ */
+ if (!pcl_is_empty(reply_pcl))
+ return 0;
+
+ /* For pull-up, svc_rdma_post_send() will sync the transport header.
+ * No additional DMA mapping is necessary.
+ */
+ if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr))
+ return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr);
+
+ return pcl_process_nonpayloads(write_pcl, xdr,
+ svc_rdma_xb_dma_map, &args);
+}
+
/* The svc_rqst and all resources it owns are released as soon as
* svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
* so they are released by the Send completion handler.
@@ -640,78 +880,108 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
* in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
*
* Depending on whether a Write list or Reply chunk is present,
- * the server may send all, a portion of, or none of the xdr_buf.
+ * the server may Send all, a portion of, or none of the xdr_buf.
* In the latter case, only the transport header (sc_sges[0]) is
* transmitted.
*
- * RDMA Send is the last step of transmitting an RPC reply. Pages
- * involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the sctxt's page array. These pages are
- * DMA unmapped by each Write completion, but the subsequent Send
- * completion finally releases these pages.
- *
* Assumptions:
* - The Reply's transport header will never be larger than a page.
*/
static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
- struct svc_rdma_recv_ctxt *rctxt,
- struct svc_rqst *rqstp,
- __be32 *wr_lst, __be32 *rp_ch)
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rqst *rqstp)
{
+ struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
int ret;
- if (!rp_ch) {
- ret = svc_rdma_map_reply_msg(rdma, sctxt,
- &rqstp->rq_res, wr_lst);
- if (ret < 0)
- return ret;
- }
+ ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
+ &rctxt->rc_reply_pcl, &rqstp->rq_res);
+ if (ret < 0)
+ return ret;
+ /* Transfer pages involved in RDMA Writes to the sctxt's
+ * page array. Completion handling releases these pages.
+ */
svc_rdma_save_io_pages(rqstp, sctxt);
if (rctxt->rc_inv_rkey) {
- sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
- sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+ send_wr->opcode = IB_WR_SEND_WITH_INV;
+ send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
} else {
- sctxt->sc_send_wr.opcode = IB_WR_SEND;
+ send_wr->opcode = IB_WR_SEND;
}
- dprintk("svcrdma: posting Send WR with %u sge(s)\n",
- sctxt->sc_send_wr.num_sge);
- return svc_rdma_send(rdma, &sctxt->sc_send_wr);
+
+ return svc_rdma_post_send(rdma, sctxt);
}
-/* Given the client-provided Write and Reply chunks, the server was not
- * able to form a complete reply. Return an RDMA_ERROR message so the
- * client can retire this RPC transaction. As above, the Send completion
- * routine releases payload pages that were part of a previous RDMA Write.
+/**
+ * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
+ * @rdma: controlling transport context
+ * @sctxt: Send context for the response
+ * @rctxt: Receive context for incoming bad message
+ * @status: negative errno indicating error that occurred
*
- * Remote Invalidation is skipped for simplicity.
+ * Given the client-provided Read, Write, and Reply chunks, the
+ * server was not able to parse the Call or form a complete Reply.
+ * Return an RDMA_ERROR message so the client can retire the RPC
+ * transaction.
+ *
+ * The caller does not have to release @sctxt. It is released by
+ * Send completion, or by this function on error.
*/
-static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct svc_rqst *rqstp)
+void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ struct svc_rdma_recv_ctxt *rctxt,
+ int status)
{
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
__be32 *p;
- int ret;
- p = ctxt->sc_xprt_buf;
- trace_svcrdma_err_chunk(*p);
- p += 3;
- *p++ = rdma_error;
- *p = err_chunk;
- svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
+ rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
+ sctxt->sc_xprt_buf, NULL);
- svc_rdma_save_io_pages(rqstp, ctxt);
+ p = xdr_reserve_space(&sctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
- ctxt->sc_send_wr.opcode = IB_WR_SEND;
- ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
- if (ret) {
- svc_rdma_send_ctxt_put(rdma, ctxt);
- return ret;
+ *p++ = *rdma_argp;
+ *p++ = *(rdma_argp + 1);
+ *p++ = rdma->sc_fc_credits;
+ *p = rdma_error;
+
+ switch (status) {
+ case -EPROTONOSUPPORT:
+ p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p++ = err_vers;
+ *p++ = rpcrdma_version;
+ *p = rpcrdma_version;
+ trace_svcrdma_err_vers(*rdma_argp);
+ break;
+ default:
+ p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p = err_chunk;
+ trace_svcrdma_err_chunk(*rdma_argp);
}
- return 0;
+ /* Remote Invalidation is skipped for simplicity. */
+ sctxt->sc_send_wr.num_sge = 1;
+ sctxt->sc_send_wr.opcode = IB_WR_SEND;
+ sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+ if (svc_rdma_post_send(rdma, sctxt))
+ goto put_ctxt;
+ return;
+
+put_ctxt:
+ svc_rdma_send_ctxt_put(rdma, sctxt);
}
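
For reference, the RDMA_ERROR message assembled above has a fixed shape on the wire (RFC 8166): the four-word RPC-over-RDMA header, then either an ERR_VERS body naming the lowest and highest versions the responder supports, or a bare ERR_CHUNK discriminator. Sketched as structs purely for illustration; the function marshals through an xdr_stream, not through structs:

#include <linux/types.h>

struct demo_rdma_error_vers {
	__be32 xid;		/* copied from the failing Call */
	__be32 vers;		/* RPC/RDMA version */
	__be32 credits;		/* flow-control credit grant */
	__be32 proc;		/* rdma_error */
	__be32 err;		/* err_vers */
	__be32 vers_low;	/* lowest supported version */
	__be32 vers_high;	/* highest supported version */
};

struct demo_rdma_error_chunk {
	__be32 xid;
	__be32 vers;
	__be32 credits;
	__be32 proc;		/* rdma_error */
	__be32 err;		/* err_chunk */
};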
/**
@@ -732,78 +1002,109 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
- __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
- struct xdr_buf *xdr = &rqstp->rq_res;
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
struct svc_rdma_send_ctxt *sctxt;
+ unsigned int rc_size;
+ __be32 *p;
int ret;
- rdma_argp = rctxt->rc_recv_buf;
- svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
+ ret = -ENOTCONN;
+ if (svc_xprt_is_dead(xprt))
+ goto drop_connection;
- /* Create the RDMA response header. xprt->xpt_mutex,
- * acquired in svc_send(), serializes RPC replies. The
- * code path below that inserts the credit grant value
- * into each transport header runs only inside this
- * critical section.
- */
ret = -ENOMEM;
sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt)
- goto err0;
- rdma_resp = sctxt->sc_xprt_buf;
+ goto drop_connection;
- p = rdma_resp;
- *p++ = *rdma_argp;
- *p++ = *(rdma_argp + 1);
- *p++ = rdma->sc_fc_credits;
- *p++ = rp_ch ? rdma_nomsg : rdma_msg;
+ ret = -EMSGSIZE;
+ p = xdr_reserve_space(&sctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
- /* Start with empty chunks */
- *p++ = xdr_zero;
- *p++ = xdr_zero;
- *p = xdr_zero;
+ ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
+ if (ret < 0)
+ goto put_ctxt;
- if (wr_lst) {
- /* XXX: Presume the client sent only one Write chunk */
- ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
- if (ret < 0)
- goto err2;
- svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
- }
- if (rp_ch) {
- ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
+ rc_size = 0;
+ if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
+ ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
+ &rctxt->rc_reply_pcl, sctxt,
+ &rqstp->rq_res);
if (ret < 0)
- goto err2;
- svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+ goto reply_chunk;
+ rc_size = ret;
}
- svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
- ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
- wr_lst, rp_ch);
+ *p++ = *rdma_argp;
+ *p++ = *(rdma_argp + 1);
+ *p++ = rdma->sc_fc_credits;
+ *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
+
+ ret = svc_rdma_encode_read_list(sctxt);
if (ret < 0)
- goto err1;
- ret = 0;
+ goto put_ctxt;
+ ret = svc_rdma_encode_write_list(rctxt, sctxt);
+ if (ret < 0)
+ goto put_ctxt;
+ ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size);
+ if (ret < 0)
+ goto put_ctxt;
-out:
- rqstp->rq_xprt_ctxt = NULL;
- svc_rdma_recv_ctxt_put(rdma, rctxt);
- return ret;
+ ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
+ if (ret < 0)
+ goto put_ctxt;
+ return 0;
- err2:
+reply_chunk:
if (ret != -E2BIG && ret != -EINVAL)
- goto err1;
+ goto put_ctxt;
- ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
- if (ret < 0)
- goto err1;
- ret = 0;
- goto out;
+ /* Send completion releases payload pages that were part
+ * of previously posted RDMA Writes.
+ */
+ svc_rdma_save_io_pages(rqstp, sctxt);
+ svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
+ return 0;
- err1:
+put_ctxt:
svc_rdma_send_ctxt_put(rdma, sctxt);
- err0:
- trace_svcrdma_send_failed(rqstp, ret);
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
- ret = -ENOTCONN;
- goto out;
+drop_connection:
+ trace_svcrdma_send_err(rqstp, ret);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+ return -ENOTCONN;
+}
+
+/**
+ * svc_rdma_result_payload - special processing for a result payload
+ * @rqstp: RPC transaction context
+ * @offset: payload's byte offset in @rqstp->rq_res
+ * @length: size of payload, in bytes
+ *
+ * Assign the passed-in result payload to the current Write chunk,
+ * and advance rc_cur_result_payload to the next Write chunk, if
+ * there is one.
+ *
+ * Return values:
+ * %0 if successful or nothing needed to be done
+ * %-E2BIG if the payload was larger than the Write chunk
+ */
+int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
+ unsigned int length)
+{
+ struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+ struct svc_rdma_chunk *chunk;
+
+ chunk = rctxt->rc_cur_result_payload;
+ if (!length || !chunk)
+ return 0;
+ rctxt->rc_cur_result_payload =
+ pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
+
+ if (length > chunk->ch_length)
+ return -E2BIG;
+ chunk->ch_position = offset;
+ chunk->ch_payload_length = length;
+ return 0;
}