diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_sendto.c')
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 1077 |
1 files changed, 689 insertions, 388 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index cf51b8f9b15f..914cd263c2f1 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -100,65 +100,68 @@ */ #include <linux/spinlock.h> -#include <asm/unaligned.h> +#include <linux/unaligned.h> #include <rdma/ib_verbs.h> #include <rdma/rdma_cm.h> #include <linux/sunrpc/debug.h> -#include <linux/sunrpc/rpc_rdma.h> #include <linux/sunrpc/svc_rdma.h> #include "xprt_rdma.h" #include <trace/events/rpcrdma.h> -#define RPCDBG_FACILITY RPCDBG_SVCXPRT - static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); -static inline struct svc_rdma_send_ctxt * -svc_rdma_next_send_ctxt(struct list_head *list) -{ - return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, - sc_list); -} - static struct svc_rdma_send_ctxt * svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) { + int node = ibdev_to_node(rdma->sc_cm_id->device); struct svc_rdma_send_ctxt *ctxt; + unsigned long pages; dma_addr_t addr; void *buffer; - size_t size; int i; - size = sizeof(*ctxt); - size += rdma->sc_max_send_sges * sizeof(struct ib_sge); - ctxt = kmalloc(size, GFP_KERNEL); + ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges), + GFP_KERNEL, node); if (!ctxt) goto fail0; - buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL); - if (!buffer) + pages = svc_serv_maxpages(rdma->sc_xprt.xpt_server); + ctxt->sc_pages = kcalloc_node(pages, sizeof(struct page *), + GFP_KERNEL, node); + if (!ctxt->sc_pages) goto fail1; + ctxt->sc_maxpages = pages; + buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node); + if (!buffer) + goto fail2; addr = ib_dma_map_single(rdma->sc_pd->device, buffer, rdma->sc_max_req_size, DMA_TO_DEVICE); if (ib_dma_mapping_error(rdma->sc_pd->device, addr)) - goto fail2; + goto fail3; + + svc_rdma_send_cid_init(rdma, &ctxt->sc_cid); + ctxt->sc_rdma = rdma; ctxt->sc_send_wr.next = NULL; ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; ctxt->sc_send_wr.sg_list = ctxt->sc_sges; ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; ctxt->sc_cqe.done = svc_rdma_wc_send; ctxt->sc_xprt_buf = buffer; + xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf, + rdma->sc_max_req_size); ctxt->sc_sges[0].addr = addr; for (i = 0; i < rdma->sc_max_send_sges; i++) ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; return ctxt; -fail2: +fail3: kfree(buffer); +fail2: + kfree(ctxt->sc_pages); fail1: kfree(ctxt); fail0: @@ -173,14 +176,16 @@ fail0: void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) { struct svc_rdma_send_ctxt *ctxt; + struct llist_node *node; - while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { - list_del(&ctxt->sc_list); + while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) { + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); ib_dma_unmap_single(rdma->sc_pd->device, ctxt->sc_sges[0].addr, rdma->sc_max_req_size, DMA_TO_DEVICE); kfree(ctxt->sc_xprt_buf); + kfree(ctxt->sc_pages); kfree(ctxt); } } @@ -195,56 +200,98 @@ void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) { struct svc_rdma_send_ctxt *ctxt; + struct llist_node *node; spin_lock(&rdma->sc_send_lock); - ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); - if (!ctxt) - goto out_empty; - list_del(&ctxt->sc_list); + node = llist_del_first(&rdma->sc_send_ctxts); spin_unlock(&rdma->sc_send_lock); + if (!node) + goto out_empty; + + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); out: + rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0); + xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf, + ctxt->sc_xprt_buf, NULL); + + svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc); ctxt->sc_send_wr.num_sge = 0; ctxt->sc_cur_sge_no = 0; ctxt->sc_page_count = 0; + ctxt->sc_wr_chain = &ctxt->sc_send_wr; + ctxt->sc_sqecount = 1; + return ctxt; out_empty: - spin_unlock(&rdma->sc_send_lock); ctxt = svc_rdma_send_ctxt_alloc(rdma); if (!ctxt) return NULL; goto out; } -/** - * svc_rdma_send_ctxt_put - Return send_ctxt to free list - * @rdma: controlling svcxprt_rdma - * @ctxt: object to return to the free list - * - * Pages left in sc_pages are DMA unmapped and released. - */ -void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt) +static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) { struct ib_device *device = rdma->sc_cm_id->device; unsigned int i; + svc_rdma_reply_chunk_release(rdma, ctxt); + + if (ctxt->sc_page_count) + release_pages(ctxt->sc_pages, ctxt->sc_page_count); + /* The first SGE contains the transport header, which * remains mapped until @ctxt is destroyed. */ - for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) + for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) { + trace_svcrdma_dma_unmap_page(&ctxt->sc_cid, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length); ib_dma_unmap_page(device, ctxt->sc_sges[i].addr, ctxt->sc_sges[i].length, DMA_TO_DEVICE); + } - for (i = 0; i < ctxt->sc_page_count; ++i) - put_page(ctxt->sc_pages[i]); + llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts); +} - spin_lock(&rdma->sc_send_lock); - list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); - spin_unlock(&rdma->sc_send_lock); +static void svc_rdma_send_ctxt_put_async(struct work_struct *work) +{ + struct svc_rdma_send_ctxt *ctxt; + + ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work); + svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt); +} + +/** + * svc_rdma_send_ctxt_put - Return send_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * + * Pages left in sc_pages are DMA unmapped and released. + */ +void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async); + queue_work(svcrdma_wq, &ctxt->sc_work); +} + +/** + * svc_rdma_wake_send_waiters - manage Send Queue accounting + * @rdma: controlling transport + * @avail: Number of additional SQEs that are now available + * + */ +void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail) +{ + atomic_add(avail, &rdma->sc_sq_avail); + smp_mb__after_atomic(); + if (unlikely(waitqueue_active(&rdma->sc_send_wait))) + wake_up(&rdma->sc_send_wait); } /** @@ -259,363 +306,556 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { struct svcxprt_rdma *rdma = cq->cq_context; struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_send_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt = + container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); - trace_svcrdma_wc_send(wc); + svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount); - atomic_inc(&rdma->sc_sq_avail); - wake_up(&rdma->sc_send_wait); + if (unlikely(wc->status != IB_WC_SUCCESS)) + goto flushed; - ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); + trace_svcrdma_wc_send(&ctxt->sc_cid); svc_rdma_send_ctxt_put(rdma, ctxt); + return; - if (unlikely(wc->status != IB_WC_SUCCESS)) { - set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); - svc_xprt_enqueue(&rdma->sc_xprt); - if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("svcrdma: Send: %s (%u/0x%x)\n", - ib_wc_status_msg(wc->status), - wc->status, wc->vendor_err); - } - - svc_xprt_put(&rdma->sc_xprt); +flushed: + if (wc->status != IB_WC_WR_FLUSH_ERR) + trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid); + else + trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid); + svc_rdma_send_ctxt_put(rdma, ctxt); + svc_xprt_deferred_close(&rdma->sc_xprt); } /** - * svc_rdma_send - Post a single Send WR - * @rdma: transport on which to post the WR - * @wr: prepared Send WR to post + * svc_rdma_post_send - Post a WR chain to the Send Queue + * @rdma: transport context + * @ctxt: WR chain to post + * + * Copy fields in @ctxt to stack variables in order to guarantee + * that these values remain available after the ib_post_send() call. + * In some error flow cases, svc_rdma_wc_send() releases @ctxt. + * + * Note there is potential for starvation when the Send Queue is + * full because there is no order to when waiting threads are + * awoken. The transport is typically provisioned with a deep + * enough Send Queue that SQ exhaustion should be a rare event. * - * Returns zero the Send WR was posted successfully. Otherwise, a - * negative errno is returned. + * Return values: + * %0: @ctxt's WR chain was posted successfully + * %-ENOTCONN: The connection was lost */ -int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) +int svc_rdma_post_send(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) { - int ret; + struct ib_send_wr *first_wr = ctxt->sc_wr_chain; + struct ib_send_wr *send_wr = &ctxt->sc_send_wr; + const struct ib_send_wr *bad_wr = first_wr; + struct rpc_rdma_cid cid = ctxt->sc_cid; + int ret, sqecount = ctxt->sc_sqecount; might_sleep(); + /* Sync the transport header buffer */ + ib_dma_sync_single_for_device(rdma->sc_pd->device, + send_wr->sg_list[0].addr, + send_wr->sg_list[0].length, + DMA_TO_DEVICE); + /* If the SQ is full, wait until an SQ entry is available */ - while (1) { - if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) { - atomic_inc(&rdma_stat_sq_starve); - trace_svcrdma_sq_full(rdma); - atomic_inc(&rdma->sc_sq_avail); + while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) { + if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) { + svc_rdma_wake_send_waiters(rdma, sqecount); + + /* When the transport is torn down, assume + * ib_drain_sq() will trigger enough Send + * completions to wake us. The XPT_CLOSE test + * above should then cause the while loop to + * exit. + */ + percpu_counter_inc(&svcrdma_stat_sq_starve); + trace_svcrdma_sq_full(rdma, &cid); wait_event(rdma->sc_send_wait, - atomic_read(&rdma->sc_sq_avail) > 1); - if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) - return -ENOTCONN; - trace_svcrdma_sq_retry(rdma); + atomic_read(&rdma->sc_sq_avail) > 0); + trace_svcrdma_sq_retry(rdma, &cid); continue; } - svc_xprt_get(&rdma->sc_xprt); - ret = ib_post_send(rdma->sc_qp, wr, NULL); - trace_svcrdma_post_send(wr, ret); + trace_svcrdma_post_send(ctxt); + ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); if (ret) { - set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); - svc_xprt_put(&rdma->sc_xprt); - wake_up(&rdma->sc_send_wait); + trace_svcrdma_sq_post_err(rdma, &cid, ret); + svc_xprt_deferred_close(&rdma->sc_xprt); + + /* If even one WR was posted, there will be a + * Send completion that bumps sc_sq_avail. + */ + if (bad_wr == first_wr) { + svc_rdma_wake_send_waiters(rdma, sqecount); + break; + } } - break; + return 0; } - return ret; + return -ENOTCONN; } -static u32 xdr_padsize(u32 len) +/** + * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list + * @sctxt: Send context for the RPC Reply + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Reply Read list + * %-EMSGSIZE on XDR buffer overflow + */ +static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt) { - return (len & 3) ? (4 - (len & 3)) : 0; + /* RPC-over-RDMA version 1 replies never have a Read list. */ + return xdr_stream_encode_item_absent(&sctxt->sc_stream); } -/* Returns length of transport header, in bytes. +/** + * svc_rdma_encode_write_segment - Encode one Write segment + * @sctxt: Send context for the RPC Reply + * @chunk: Write chunk to push + * @remaining: remaining bytes of the payload left in the Write chunk + * @segno: which segment in the chunk + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Write segment, and updates @remaining + * %-EMSGSIZE on XDR buffer overflow */ -static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) +static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk, + u32 *remaining, unsigned int segno) { - unsigned int nsegs; + const struct svc_rdma_segment *segment = &chunk->ch_segments[segno]; + const size_t len = rpcrdma_segment_maxsz * sizeof(__be32); + u32 length; __be32 *p; - p = rdma_resp; - - /* RPC-over-RDMA V1 replies never have a Read list. */ - p += rpcrdma_fixed_maxsz + 1; - - /* Skip Write list. */ - while (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } - - /* Skip Reply chunk. */ - if (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } - - return (unsigned long)p - (unsigned long)rdma_resp; + p = xdr_reserve_space(&sctxt->sc_stream, len); + if (!p) + return -EMSGSIZE; + + length = min_t(u32, *remaining, segment->rs_length); + *remaining -= length; + xdr_encode_rdma_segment(p, segment->rs_handle, length, + segment->rs_offset); + trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length, + segment->rs_offset); + return len; } -/* One Write chunk is copied from Call transport header to Reply - * transport header. Each segment's length field is updated to - * reflect number of bytes consumed in the segment. +/** + * svc_rdma_encode_write_chunk - Encode one Write chunk + * @sctxt: Send context for the RPC Reply + * @chunk: Write chunk to push * - * Returns number of segments in this chunk. + * Copy a Write chunk from the Call transport header to the + * Reply transport header. Update each segment's length field + * to reflect the number of bytes written in that segment. + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Write chunk + * %-EMSGSIZE on XDR buffer overflow */ -static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, - unsigned int remaining) +static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk) { - unsigned int i, nsegs; - u32 seg_len; - - /* Write list discriminator */ - *dst++ = *src++; - - /* number of segments in this chunk */ - nsegs = be32_to_cpup(src); - *dst++ = *src++; - - for (i = nsegs; i; i--) { - /* segment's RDMA handle */ - *dst++ = *src++; - - /* bytes returned in this segment */ - seg_len = be32_to_cpu(*src); - if (remaining >= seg_len) { - /* entire segment was consumed */ - *dst = *src; - remaining -= seg_len; - } else { - /* segment only partly filled */ - *dst = cpu_to_be32(remaining); - remaining = 0; - } - dst++; src++; + u32 remaining = chunk->ch_payload_length; + unsigned int segno; + ssize_t len, ret; - /* segment's RDMA offset */ - *dst++ = *src++; - *dst++ = *src++; - } + len = 0; + ret = xdr_stream_encode_item_present(&sctxt->sc_stream); + if (ret < 0) + return ret; + len += ret; - return nsegs; -} + ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount); + if (ret < 0) + return ret; + len += ret; -/* The client provided a Write list in the Call message. Fill in - * the segments in the first Write chunk in the Reply's transport - * header with the number of bytes consumed in each segment. - * Remaining chunks are returned unused. - * - * Assumptions: - * - Client has provided only one Write chunk - */ -static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, - unsigned int consumed) -{ - unsigned int nsegs; - __be32 *p, *q; - - /* RPC-over-RDMA V1 replies never have a Read list. */ - p = rdma_resp + rpcrdma_fixed_maxsz + 1; - - q = wr_ch; - while (*q != xdr_zero) { - nsegs = xdr_encode_write_chunk(p, q, consumed); - q += 2 + nsegs * rpcrdma_segment_maxsz; - p += 2 + nsegs * rpcrdma_segment_maxsz; - consumed = 0; + for (segno = 0; segno < chunk->ch_segcount; segno++) { + ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno); + if (ret < 0) + return ret; + len += ret; } - /* Terminate Write list */ - *p++ = xdr_zero; - - /* Reply chunk discriminator; may be replaced later */ - *p = xdr_zero; + return len; } -/* The client provided a Reply chunk in the Call message. Fill in - * the segments in the Reply chunk in the Reply message with the - * number of bytes consumed in each segment. +/** + * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list + * @rctxt: Reply context with information about the RPC Call + * @sctxt: Send context for the RPC Reply * - * Assumptions: - * - Reply can always fit in the provided Reply chunk + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Reply's Write list + * %-EMSGSIZE on XDR buffer overflow */ -static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, - unsigned int consumed) +static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_send_ctxt *sctxt) { - __be32 *p; + struct svc_rdma_chunk *chunk; + ssize_t len, ret; - /* Find the Reply chunk in the Reply's xprt header. - * RPC-over-RDMA V1 replies never have a Read list. - */ - p = rdma_resp + rpcrdma_fixed_maxsz + 1; + len = 0; + pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) { + ret = svc_rdma_encode_write_chunk(sctxt, chunk); + if (ret < 0) + return ret; + len += ret; + } - /* Skip past Write list */ - while (*p++ != xdr_zero) - p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; + /* Terminate the Write list */ + ret = xdr_stream_encode_item_absent(&sctxt->sc_stream); + if (ret < 0) + return ret; - xdr_encode_write_chunk(p, rp_ch, consumed); + return len + ret; } -/* Parse the RPC Call's transport header. +/** + * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk + * @rctxt: Reply context with information about the RPC Call + * @sctxt: Send context for the RPC Reply + * @length: size in bytes of the payload in the Reply chunk + * + * Return values: + * On success, returns length in bytes of the Reply XDR buffer + * that was consumed by the Reply's Reply chunk + * %-EMSGSIZE on XDR buffer overflow + * %-E2BIG if the RPC message is larger than the Reply chunk */ -static void svc_rdma_get_write_arrays(__be32 *rdma_argp, - __be32 **write, __be32 **reply) +static ssize_t +svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt, + struct svc_rdma_send_ctxt *sctxt, + unsigned int length) { - __be32 *p; + struct svc_rdma_chunk *chunk; - p = rdma_argp + rpcrdma_fixed_maxsz; + if (pcl_is_empty(&rctxt->rc_reply_pcl)) + return xdr_stream_encode_item_absent(&sctxt->sc_stream); - /* Read list */ - while (*p++ != xdr_zero) - p += 5; + chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); + if (length > chunk->ch_length) + return -E2BIG; - /* Write list */ - if (*p != xdr_zero) { - *write = p; - while (*p++ != xdr_zero) - p += 1 + be32_to_cpu(*p) * 4; - } else { - *write = NULL; - p++; - } - - /* Reply chunk */ - if (*p != xdr_zero) - *reply = p; - else - *reply = NULL; + chunk->ch_payload_length = length; + return svc_rdma_encode_write_chunk(sctxt, chunk); } -static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - struct page *page, - unsigned long offset, - unsigned int len) +struct svc_rdma_map_data { + struct svcxprt_rdma *md_rdma; + struct svc_rdma_send_ctxt *md_ctxt; +}; + +/** + * svc_rdma_page_dma_map - DMA map one page + * @data: pointer to arguments + * @page: struct page to DMA map + * @offset: offset into the page + * @len: number of bytes to map + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if the page cannot be DMA mapped + */ +static int svc_rdma_page_dma_map(void *data, struct page *page, + unsigned long offset, unsigned int len) { + struct svc_rdma_map_data *args = data; + struct svcxprt_rdma *rdma = args->md_rdma; + struct svc_rdma_send_ctxt *ctxt = args->md_ctxt; struct ib_device *dev = rdma->sc_cm_id->device; dma_addr_t dma_addr; + ++ctxt->sc_cur_sge_no; + dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; + trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len); ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; ctxt->sc_send_wr.num_sge++; return 0; out_maperr: - trace_svcrdma_dma_map_page(rdma, page); + trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len); return -EIO; } -/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() +/** + * svc_rdma_iov_dma_map - DMA map an iovec + * @data: pointer to arguments + * @iov: kvec to DMA map + * + * ib_dma_map_page() is used here because svc_rdma_dma_unmap() * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if the iovec cannot be DMA mapped */ -static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - unsigned char *base, - unsigned int len) +static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov) { - return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base), - offset_in_page(base), len); + if (!iov->iov_len) + return 0; + return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base), + offset_in_page(iov->iov_base), + iov->iov_len); } /** - * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer - * @rdma: controlling transport - * @ctxt: send_ctxt for the Send WR - * @len: length of transport header + * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf + * @xdr: xdr_buf containing portion of an RPC message to transmit + * @data: pointer to arguments + * + * Returns: + * %0 if DMA mapping was successful + * %-EIO if DMA mapping failed * + * On failure, any DMA mappings that have been already done must be + * unmapped by the caller. */ -void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - unsigned int len) +static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data) { - ctxt->sc_sges[0].length = len; - ctxt->sc_send_wr.num_sge++; - ib_dma_sync_single_for_device(rdma->sc_pd->device, - ctxt->sc_sges[0].addr, len, - DMA_TO_DEVICE); + unsigned int len, remaining; + unsigned long pageoff; + struct page **ppages; + int ret; + + ret = svc_rdma_iov_dma_map(data, &xdr->head[0]); + if (ret < 0) + return ret; + + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + pageoff = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + len = min_t(u32, PAGE_SIZE - pageoff, remaining); + + ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len); + if (ret < 0) + return ret; + + remaining -= len; + pageoff = 0; + } + + ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]); + if (ret < 0) + return ret; + + return xdr->len; } -/* svc_rdma_map_reply_msg - Map the buffer holding RPC message - * @rdma: controlling transport - * @ctxt: send_ctxt for the Send WR - * @xdr: prepared xdr_buf containing RPC message - * @wr_lst: pointer to Call header's Write list, or NULL +struct svc_rdma_pullup_data { + u8 *pd_dest; + unsigned int pd_length; + unsigned int pd_num_sges; +}; + +/** + * svc_rdma_xb_count_sges - Count how many SGEs will be needed + * @xdr: xdr_buf containing portion of an RPC message to transmit + * @data: pointer to arguments * - * Load the xdr_buf into the ctxt's sge array, and DMA map each - * element as it is added. + * Returns: + * Number of SGEs needed to Send the contents of @xdr inline + */ +static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr, + void *data) +{ + struct svc_rdma_pullup_data *args = data; + unsigned int remaining; + unsigned long offset; + + if (xdr->head[0].iov_len) + ++args->pd_num_sges; + + offset = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + ++args->pd_num_sges; + remaining -= min_t(u32, PAGE_SIZE - offset, remaining); + offset = 0; + } + + if (xdr->tail[0].iov_len) + ++args->pd_num_sges; + + args->pd_length += xdr->len; + return 0; +} + +/** + * svc_rdma_pull_up_needed - Determine whether to use pull-up + * @rdma: controlling transport + * @sctxt: send_ctxt for the Send WR + * @write_pcl: Write chunk list provided by client + * @xdr: xdr_buf containing RPC message to transmit * - * Returns zero on success, or a negative errno on failure. + * Returns: + * %true if pull-up must be used + * %false otherwise */ -int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - struct xdr_buf *xdr, __be32 *wr_lst) +static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma, + const struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_pcl *write_pcl, + const struct xdr_buf *xdr) { - unsigned int len, remaining; - unsigned long page_off; - struct page **ppages; - unsigned char *base; - u32 xdr_pad; + /* Resources needed for the transport header */ + struct svc_rdma_pullup_data args = { + .pd_length = sctxt->sc_hdrbuf.len, + .pd_num_sges = 1, + }; int ret; - if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges) - return -EIO; - ret = svc_rdma_dma_map_buf(rdma, ctxt, - xdr->head[0].iov_base, - xdr->head[0].iov_len); + ret = pcl_process_nonpayloads(write_pcl, xdr, + svc_rdma_xb_count_sges, &args); if (ret < 0) - return ret; + return false; - /* If a Write chunk is present, the xdr_buf's page list - * is not included inline. However the Upper Layer may - * have added XDR padding in the tail buffer, and that - * should not be included inline. - */ - if (wr_lst) { - base = xdr->tail[0].iov_base; - len = xdr->tail[0].iov_len; - xdr_pad = xdr_padsize(xdr->page_len); - - if (len && xdr_pad) { - base += xdr_pad; - len -= xdr_pad; - } + if (args.pd_length < RPCRDMA_PULLUP_THRESH) + return true; + return args.pd_num_sges >= rdma->sc_max_send_sges; +} - goto tail; +/** + * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer + * @xdr: xdr_buf containing portion of an RPC message to copy + * @data: pointer to arguments + * + * Returns: + * Always zero. + */ +static int svc_rdma_xb_linearize(const struct xdr_buf *xdr, + void *data) +{ + struct svc_rdma_pullup_data *args = data; + unsigned int len, remaining; + unsigned long pageoff; + struct page **ppages; + + if (xdr->head[0].iov_len) { + memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len); + args->pd_dest += xdr->head[0].iov_len; } ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); - page_off = xdr->page_base & ~PAGE_MASK; + pageoff = offset_in_page(xdr->page_base); remaining = xdr->page_len; while (remaining) { - len = min_t(u32, PAGE_SIZE - page_off, remaining); - - if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges) - return -EIO; - ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++, - page_off, len); - if (ret < 0) - return ret; - + len = min_t(u32, PAGE_SIZE - pageoff, remaining); + memcpy(args->pd_dest, page_address(*ppages) + pageoff, len); remaining -= len; - page_off = 0; + args->pd_dest += len; + pageoff = 0; + ppages++; } - base = xdr->tail[0].iov_base; - len = xdr->tail[0].iov_len; -tail: - if (len) { - if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges) - return -EIO; - ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len); - if (ret < 0) - return ret; + if (xdr->tail[0].iov_len) { + memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len); + args->pd_dest += xdr->tail[0].iov_len; } + args->pd_length += xdr->len; + return 0; +} + +/** + * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer + * @rdma: controlling transport + * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared + * @write_pcl: Write chunk list provided by client + * @xdr: prepared xdr_buf containing RPC message + * + * The device is not capable of sending the reply directly. + * Assemble the elements of @xdr into the transport header buffer. + * + * Assumptions: + * pull_up_needed has determined that @xdr will fit in the buffer. + * + * Returns: + * %0 if pull-up was successful + * %-EMSGSIZE if a buffer manipulation problem occurred + */ +static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_pcl *write_pcl, + const struct xdr_buf *xdr) +{ + struct svc_rdma_pullup_data args = { + .pd_dest = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len, + }; + int ret; + + ret = pcl_process_nonpayloads(write_pcl, xdr, + svc_rdma_xb_linearize, &args); + if (ret < 0) + return ret; + + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length; + trace_svcrdma_send_pullup(sctxt, args.pd_length); return 0; } +/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message + * @rdma: controlling transport + * @sctxt: send_ctxt for the Send WR + * @write_pcl: Write chunk list provided by client + * @reply_pcl: Reply chunk provided by client + * @xdr: prepared xdr_buf containing RPC message + * + * Returns: + * %0 if DMA mapping was successful. + * %-EMSGSIZE if a buffer manipulation problem occurred + * %-EIO if DMA mapping failed + * + * The Send WR's num_sge field is set in all cases. + */ +int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_pcl *write_pcl, + const struct svc_rdma_pcl *reply_pcl, + const struct xdr_buf *xdr) +{ + struct svc_rdma_map_data args = { + .md_rdma = rdma, + .md_ctxt = sctxt, + }; + + /* Set up the (persistently-mapped) transport header SGE. */ + sctxt->sc_send_wr.num_sge = 1; + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len; + + /* If there is a Reply chunk, nothing follows the transport + * header, so there is nothing to map. + */ + if (!pcl_is_empty(reply_pcl)) + return 0; + + /* For pull-up, svc_rdma_send() will sync the transport header. + * No additional DMA mapping is necessary. + */ + if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr)) + return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr); + + return pcl_process_nonpayloads(write_pcl, xdr, + svc_rdma_xb_dma_map, &args); +} + /* The svc_rqst and all resources it owns are released as soon as * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt * so they are released by the Send completion handler. @@ -640,78 +880,108 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. * * Depending on whether a Write list or Reply chunk is present, - * the server may send all, a portion of, or none of the xdr_buf. + * the server may Send all, a portion of, or none of the xdr_buf. * In the latter case, only the transport header (sc_sges[0]) is * transmitted. * - * RDMA Send is the last step of transmitting an RPC reply. Pages - * involved in the earlier RDMA Writes are here transferred out - * of the rqstp and into the sctxt's page array. These pages are - * DMA unmapped by each Write completion, but the subsequent Send - * completion finally releases these pages. - * * Assumptions: * - The Reply's transport header will never be larger than a page. */ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *sctxt, - struct svc_rdma_recv_ctxt *rctxt, - struct svc_rqst *rqstp, - __be32 *wr_lst, __be32 *rp_ch) + const struct svc_rdma_recv_ctxt *rctxt, + struct svc_rqst *rqstp) { + struct ib_send_wr *send_wr = &sctxt->sc_send_wr; int ret; - if (!rp_ch) { - ret = svc_rdma_map_reply_msg(rdma, sctxt, - &rqstp->rq_res, wr_lst); - if (ret < 0) - return ret; - } + ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl, + &rctxt->rc_reply_pcl, &rqstp->rq_res); + if (ret < 0) + return ret; + /* Transfer pages involved in RDMA Writes to the sctxt's + * page array. Completion handling releases these pages. + */ svc_rdma_save_io_pages(rqstp, sctxt); if (rctxt->rc_inv_rkey) { - sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; - sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey; + send_wr->opcode = IB_WR_SEND_WITH_INV; + send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey; } else { - sctxt->sc_send_wr.opcode = IB_WR_SEND; + send_wr->opcode = IB_WR_SEND; } - dprintk("svcrdma: posting Send WR with %u sge(s)\n", - sctxt->sc_send_wr.num_sge); - return svc_rdma_send(rdma, &sctxt->sc_send_wr); + + return svc_rdma_post_send(rdma, sctxt); } -/* Given the client-provided Write and Reply chunks, the server was not - * able to form a complete reply. Return an RDMA_ERROR message so the - * client can retire this RPC transaction. As above, the Send completion - * routine releases payload pages that were part of a previous RDMA Write. +/** + * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response + * @rdma: controlling transport context + * @sctxt: Send context for the response + * @rctxt: Receive context for incoming bad message + * @status: negative errno indicating error that occurred * - * Remote Invalidation is skipped for simplicity. + * Given the client-provided Read, Write, and Reply chunks, the + * server was not able to parse the Call or form a complete Reply. + * Return an RDMA_ERROR message so the client can retire the RPC + * transaction. + * + * The caller does not have to release @sctxt. It is released by + * Send completion, or by this function on error. */ -static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, - struct svc_rdma_send_ctxt *ctxt, - struct svc_rqst *rqstp) +void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + struct svc_rdma_recv_ctxt *rctxt, + int status) { + __be32 *rdma_argp = rctxt->rc_recv_buf; __be32 *p; - int ret; - p = ctxt->sc_xprt_buf; - trace_svcrdma_err_chunk(*p); - p += 3; - *p++ = rdma_error; - *p = err_chunk; - svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR); + rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0); + xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf, + sctxt->sc_xprt_buf, NULL); - svc_rdma_save_io_pages(rqstp, ctxt); + p = xdr_reserve_space(&sctxt->sc_stream, + rpcrdma_fixed_maxsz * sizeof(*p)); + if (!p) + goto put_ctxt; - ctxt->sc_send_wr.opcode = IB_WR_SEND; - ret = svc_rdma_send(rdma, &ctxt->sc_send_wr); - if (ret) { - svc_rdma_send_ctxt_put(rdma, ctxt); - return ret; + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); + *p++ = rdma->sc_fc_credits; + *p = rdma_error; + + switch (status) { + case -EPROTONOSUPPORT: + p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p)); + if (!p) + goto put_ctxt; + + *p++ = err_vers; + *p++ = rpcrdma_version; + *p = rpcrdma_version; + trace_svcrdma_err_vers(*rdma_argp); + break; + default: + p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p)); + if (!p) + goto put_ctxt; + + *p = err_chunk; + trace_svcrdma_err_chunk(*rdma_argp); } - return 0; + /* Remote Invalidation is skipped for simplicity. */ + sctxt->sc_send_wr.num_sge = 1; + sctxt->sc_send_wr.opcode = IB_WR_SEND; + sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len; + if (svc_rdma_post_send(rdma, sctxt)) + goto put_ctxt; + return; + +put_ctxt: + svc_rdma_send_ctxt_put(rdma, sctxt); } /** @@ -732,78 +1002,109 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; - __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch; - struct xdr_buf *xdr = &rqstp->rq_res; + __be32 *rdma_argp = rctxt->rc_recv_buf; struct svc_rdma_send_ctxt *sctxt; + unsigned int rc_size; + __be32 *p; int ret; - rdma_argp = rctxt->rc_recv_buf; - svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch); + ret = -ENOTCONN; + if (svc_xprt_is_dead(xprt)) + goto drop_connection; - /* Create the RDMA response header. xprt->xpt_mutex, - * acquired in svc_send(), serializes RPC replies. The - * code path below that inserts the credit grant value - * into each transport header runs only inside this - * critical section. - */ ret = -ENOMEM; sctxt = svc_rdma_send_ctxt_get(rdma); if (!sctxt) - goto err0; - rdma_resp = sctxt->sc_xprt_buf; + goto drop_connection; - p = rdma_resp; - *p++ = *rdma_argp; - *p++ = *(rdma_argp + 1); - *p++ = rdma->sc_fc_credits; - *p++ = rp_ch ? rdma_nomsg : rdma_msg; + ret = -EMSGSIZE; + p = xdr_reserve_space(&sctxt->sc_stream, + rpcrdma_fixed_maxsz * sizeof(*p)); + if (!p) + goto put_ctxt; - /* Start with empty chunks */ - *p++ = xdr_zero; - *p++ = xdr_zero; - *p = xdr_zero; + ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res); + if (ret < 0) + goto put_ctxt; - if (wr_lst) { - /* XXX: Presume the client sent only one Write chunk */ - ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr); - if (ret < 0) - goto err2; - svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); - } - if (rp_ch) { - ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr); + rc_size = 0; + if (!pcl_is_empty(&rctxt->rc_reply_pcl)) { + ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl, + &rctxt->rc_reply_pcl, sctxt, + &rqstp->rq_res); if (ret < 0) - goto err2; - svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); + goto reply_chunk; + rc_size = ret; } - svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp)); - ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp, - wr_lst, rp_ch); + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); + *p++ = rdma->sc_fc_credits; + *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg; + + ret = svc_rdma_encode_read_list(sctxt); if (ret < 0) - goto err1; - ret = 0; + goto put_ctxt; + ret = svc_rdma_encode_write_list(rctxt, sctxt); + if (ret < 0) + goto put_ctxt; + ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size); + if (ret < 0) + goto put_ctxt; -out: - rqstp->rq_xprt_ctxt = NULL; - svc_rdma_recv_ctxt_put(rdma, rctxt); - return ret; + ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp); + if (ret < 0) + goto put_ctxt; + return 0; - err2: +reply_chunk: if (ret != -E2BIG && ret != -EINVAL) - goto err1; + goto put_ctxt; - ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp); - if (ret < 0) - goto err1; - ret = 0; - goto out; + /* Send completion releases payload pages that were part + * of previously posted RDMA Writes. + */ + svc_rdma_save_io_pages(rqstp, sctxt); + svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret); + return 0; - err1: +put_ctxt: svc_rdma_send_ctxt_put(rdma, sctxt); - err0: - trace_svcrdma_send_failed(rqstp, ret); - set_bit(XPT_CLOSE, &xprt->xpt_flags); - ret = -ENOTCONN; - goto out; +drop_connection: + trace_svcrdma_send_err(rqstp, ret); + svc_xprt_deferred_close(&rdma->sc_xprt); + return -ENOTCONN; +} + +/** + * svc_rdma_result_payload - special processing for a result payload + * @rqstp: RPC transaction context + * @offset: payload's byte offset in @rqstp->rq_res + * @length: size of payload, in bytes + * + * Assign the passed-in result payload to the current Write chunk, + * and advance to cur_result_payload to the next Write chunk, if + * there is one. + * + * Return values: + * %0 if successful or nothing needed to be done + * %-E2BIG if the payload was larger than the Write chunk + */ +int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset, + unsigned int length) +{ + struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; + struct svc_rdma_chunk *chunk; + + chunk = rctxt->rc_cur_result_payload; + if (!length || !chunk) + return 0; + rctxt->rc_cur_result_payload = + pcl_next_chunk(&rctxt->rc_write_pcl, chunk); + + if (length > chunk->ch_length) + return -E2BIG; + chunk->ch_position = offset; + chunk->ch_payload_length = length; + return 0; } |
