path: root/net/sunrpc/xprtrdma
author      Linus Torvalds <torvalds@linux-foundation.org>   2019-01-02 16:35:23 -0800
committer   Linus Torvalds <torvalds@linux-foundation.org>   2019-01-02 16:35:23 -0800
commit      e6b92572808467f35fd159d47c45b650de29e722 (patch)
tree        5fbd2e6279539c4f3eeeca0d6a69779bdbd0d6a4 /net/sunrpc/xprtrdma
parent      e45428a436765fcd154d461a2739b5640916dc00 (diff)
parent      260f71eff493a844531629854c0935fa8de4fa2c (diff)
Merge tag 'nfs-for-4.21-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:
 "Stable bugfixes:
   - xprtrdma: Yet another double DMA-unmap  # v4.20

  Features:
   - Allow some /proc/sys/sunrpc entries without CONFIG_SUNRPC_DEBUG
   - Per-xprt rdma receive workqueues
   - Drop support for FMR memory registration
   - Make port= mount option optional for RDMA mounts

  Other bugfixes and cleanups:
   - Remove unused nfs4_xdev_fs_type declaration
   - Fix comments for behavior that has changed
   - Remove generic RPC credentials by switching to 'struct cred'
   - Fix crossing mountpoints with different auth flavors
   - Various xprtrdma fixes from testing and auditing the close code
   - Fixes for disconnect issues when using xprtrdma with krb5
   - Clean up and improve xprtrdma trace points
   - Fix NFS v4.2 async copy reboot recovery"

* tag 'nfs-for-4.21-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (63 commits)
  sunrpc: convert to DEFINE_SHOW_ATTRIBUTE
  sunrpc: Add xprt after nfs4_test_session_trunk()
  sunrpc: convert unnecessary GFP_ATOMIC to GFP_NOFS
  sunrpc: handle ENOMEM in rpcb_getport_async
  NFS: remove unnecessary test for IS_ERR(cred)
  xprtrdma: Prevent leak of rpcrdma_rep objects
  NFSv4.2 fix async copy reboot recovery
  xprtrdma: Don't leak freed MRs
  xprtrdma: Add documenting comment for rpcrdma_buffer_destroy
  xprtrdma: Replace outdated comment for rpcrdma_ep_post
  xprtrdma: Update comments in frwr_op_send
  SUNRPC: Fix some kernel doc complaints
  SUNRPC: Simplify defining common RPC trace events
  NFS: Fix NFSv4 symbolic trace point output
  xprtrdma: Trace mapping, alloc, and dereg failures
  xprtrdma: Add trace points for calls to transport switch methods
  xprtrdma: Relocate the xprtrdma_mr_map trace points
  xprtrdma: Clean up of xprtrdma chunk trace points
  xprtrdma: Remove unused fields from rpcrdma_ia
  xprtrdma: Cull dprintk() call sites
  ...
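The shortlog entry "sunrpc: convert to DEFINE_SHOW_ATTRIBUTE" refers to the generic seq_file helper from <linux/seq_file.h>. The following is only a minimal sketch of that conversion pattern; rpc_foo_show() and its file names are hypothetical stand-ins, not the sunrpc symbols actually touched by the series.

#include <linux/module.h>
#include <linux/seq_file.h>
#include <linux/fs.h>

/* Hypothetical show routine standing in for a sunrpc procfs/debugfs file. */
static int rpc_foo_show(struct seq_file *m, void *v)
{
	seq_puts(m, "example\n");
	return 0;
}

/* Before the conversion, each file carried its own open helper and fops:
 *
 *	static int rpc_foo_open(struct inode *inode, struct file *file)
 *	{
 *		return single_open(file, rpc_foo_show, inode->i_private);
 *	}
 *	static const struct file_operations rpc_foo_fops = {
 *		.owner   = THIS_MODULE,
 *		.open    = rpc_foo_open,
 *		.read    = seq_read,
 *		.llseek  = seq_lseek,
 *		.release = single_release,
 *	};
 *
 * After the conversion, one macro generates rpc_foo_open() and rpc_foo_fops
 * with exactly that boilerplate:
 */
DEFINE_SHOW_ATTRIBUTE(rpc_foo);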
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--   net/sunrpc/xprtrdma/Makefile                     3
-rw-r--r--   net/sunrpc/xprtrdma/backchannel.c               39
-rw-r--r--   net/sunrpc/xprtrdma/fmr_ops.c                  337
-rw-r--r--   net/sunrpc/xprtrdma/frwr_ops.c                 209
-rw-r--r--   net/sunrpc/xprtrdma/rpc_rdma.c                  78
-rw-r--r--   net/sunrpc/xprtrdma/svc_rdma_backchannel.c       8
-rw-r--r--   net/sunrpc/xprtrdma/transport.c                 91
-rw-r--r--   net/sunrpc/xprtrdma/verbs.c                    255
-rw-r--r--   net/sunrpc/xprtrdma/xprt_rdma.h                 80
9 files changed, 343 insertions, 757 deletions
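Among the verbs.c changes below, the "Per-xprt rdma receive workqueues" feature replaces the module-global rpcrdma_receive_wq with a completion workqueue stored in each transport's rpcrdma_buffer (rb_completion_wq). The sketch below condenses that lifecycle under simplifying assumptions: struct example_xprt_bufs is a hypothetical stand-in for the real rpcrdma structures, and the actual patch drains the queue in rpcrdma_xprt_drain() and destroys it in rpcrdma_buffer_destroy() rather than in a single helper.

#include <linux/errno.h>
#include <linux/workqueue.h>

/* Hypothetical container; the real patch adds rb_completion_wq to
 * struct rpcrdma_buffer.
 */
struct example_xprt_bufs {
	struct workqueue_struct *completion_wq;
};

static int example_bufs_create(struct example_xprt_bufs *buf, const char *peer)
{
	/* One Receive-completion workqueue per transport, named after the
	 * peer address, instead of one shared module-global workqueue.
	 */
	buf->completion_wq = alloc_workqueue("rpcrdma-%s",
					     WQ_MEM_RECLAIM | WQ_HIGHPRI,
					     0, peer);
	return buf->completion_wq ? 0 : -ENOMEM;
}

static void example_bufs_destroy(struct example_xprt_bufs *buf)
{
	if (buf->completion_wq) {
		/* Wait for deferred Reply work, then release the queue. */
		drain_workqueue(buf->completion_wq);
		destroy_workqueue(buf->completion_wq);
		buf->completion_wq = NULL;
	}
}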
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 8bf19e142b6b..8ed0377d7a18 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,8 +1,7 @@
# SPDX-License-Identifier: GPL-2.0
obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
-rpcrdma-y := transport.o rpc_rdma.o verbs.o \
- fmr_ops.o frwr_ops.o \
+rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \
svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
module.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index edba0d35776b..0de9b3e63770 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -5,7 +5,6 @@
* Support for backward direction RPCs on RPC/RDMA.
*/
-#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
@@ -20,29 +19,16 @@
#undef RPCRDMA_BACKCHANNEL_DEBUG
-static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
- struct rpc_rqst *rqst)
-{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-
- spin_lock(&buf->rb_reqslock);
- list_del(&req->rl_all);
- spin_unlock(&buf->rb_reqslock);
-
- rpcrdma_destroy_req(req);
-}
-
static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
unsigned int count)
{
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpcrdma_req *req;
struct rpc_rqst *rqst;
unsigned int i;
for (i = 0; i < (count << 1); i++) {
struct rpcrdma_regbuf *rb;
- struct rpcrdma_req *req;
size_t size;
req = rpcrdma_create_req(r_xprt);
@@ -68,7 +54,7 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
return 0;
out_fail:
- rpcrdma_bc_free_rqst(r_xprt, rqst);
+ rpcrdma_req_destroy(req);
return -ENOMEM;
}
@@ -101,7 +87,6 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
goto out_free;
r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
- request_module("svcrdma");
trace_xprtrdma_cb_setup(r_xprt, reqs);
return 0;
@@ -173,21 +158,21 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*/
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
int rc;
- if (!xprt_connected(rqst->rq_xprt))
- goto drop_connection;
+ if (!xprt_connected(xprt))
+ return -ENOTCONN;
- if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
+ if (!xprt_request_get_cong(xprt, rqst))
return -EBADSLT;
rc = rpcrdma_bc_marshal_reply(rqst);
if (rc < 0)
goto failed_marshal;
- rpcrdma_post_recvs(r_xprt, true);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection;
return 0;
@@ -196,7 +181,7 @@ failed_marshal:
if (rc != -ENOTCONN)
return rc;
drop_connection:
- xprt_disconnect_done(rqst->rq_xprt);
+ xprt_rdma_close(xprt);
return -ENOTCONN;
}
@@ -207,7 +192,6 @@ drop_connection:
*/
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpc_rqst *rqst, *tmp;
spin_lock(&xprt->bc_pa_lock);
@@ -215,7 +199,7 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
list_del(&rqst->rq_bc_pa_list);
spin_unlock(&xprt->bc_pa_lock);
- rpcrdma_bc_free_rqst(r_xprt, rqst);
+ rpcrdma_req_destroy(rpcr_to_rdmar(rqst));
spin_lock(&xprt->bc_pa_lock);
}
@@ -231,9 +215,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpc_xprt *xprt = rqst->rq_xprt;
- dprintk("RPC: %s: freeing rqst %p (req %p)\n",
- __func__, rqst, req);
-
rpcrdma_recv_buffer_put(req->rl_reply);
req->rl_reply = NULL;
@@ -319,7 +300,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
out_overflow:
pr_warn("RPC/RDMA backchannel overflow\n");
- xprt_disconnect_done(xprt);
+ xprt_force_disconnect(xprt);
/* This receive buffer gets reposted automatically
* when the connection is re-established.
*/
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
deleted file mode 100644
index fd8fea59fe92..000000000000
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ /dev/null
@@ -1,337 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * Copyright (c) 2015, 2017 Oracle. All rights reserved.
- * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
- */
-
-/* Lightweight memory registration using Fast Memory Regions (FMR).
- * Referred to sometimes as MTHCAFMR mode.
- *
- * FMR uses synchronous memory registration and deregistration.
- * FMR registration is known to be fast, but FMR deregistration
- * can take tens of usecs to complete.
- */
-
-/* Normal operation
- *
- * A Memory Region is prepared for RDMA READ or WRITE using the
- * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is
- * finished, the Memory Region is unmapped using the ib_unmap_fmr
- * verb (fmr_op_unmap).
- */
-
-#include <linux/sunrpc/svc_rdma.h>
-
-#include "xprt_rdma.h"
-#include <trace/events/rpcrdma.h>
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY RPCDBG_TRANS
-#endif
-
-/* Maximum scatter/gather per FMR */
-#define RPCRDMA_MAX_FMR_SGES (64)
-
-/* Access mode of externally registered pages */
-enum {
- RPCRDMA_FMR_ACCESS_FLAGS = IB_ACCESS_REMOTE_WRITE |
- IB_ACCESS_REMOTE_READ,
-};
-
-bool
-fmr_is_supported(struct rpcrdma_ia *ia)
-{
- if (!ia->ri_device->ops.alloc_fmr) {
- pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n",
- ia->ri_device->name);
- return false;
- }
- return true;
-}
-
-static void
-__fmr_unmap(struct rpcrdma_mr *mr)
-{
- LIST_HEAD(l);
- int rc;
-
- list_add(&mr->fmr.fm_mr->list, &l);
- rc = ib_unmap_fmr(&l);
- list_del(&mr->fmr.fm_mr->list);
- if (rc)
- pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
- mr, rc);
-}
-
-/* Release an MR.
- */
-static void
-fmr_op_release_mr(struct rpcrdma_mr *mr)
-{
- int rc;
-
- kfree(mr->fmr.fm_physaddrs);
- kfree(mr->mr_sg);
-
- /* In case this one was left mapped, try to unmap it
- * to prevent dealloc_fmr from failing with EBUSY
- */
- __fmr_unmap(mr);
-
- rc = ib_dealloc_fmr(mr->fmr.fm_mr);
- if (rc)
- pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
- mr, rc);
-
- kfree(mr);
-}
-
-/* MRs are dynamically allocated, so simply clean up and release the MR.
- * A replacement MR will subsequently be allocated on demand.
- */
-static void
-fmr_mr_recycle_worker(struct work_struct *work)
-{
- struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-
- trace_xprtrdma_mr_recycle(mr);
-
- trace_xprtrdma_mr_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mr->mr_sg, mr->mr_nents, mr->mr_dir);
-
- spin_lock(&r_xprt->rx_buf.rb_mrlock);
- list_del(&mr->mr_all);
- r_xprt->rx_stats.mrs_recycled++;
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
- fmr_op_release_mr(mr);
-}
-
-static int
-fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
- static struct ib_fmr_attr fmr_attr = {
- .max_pages = RPCRDMA_MAX_FMR_SGES,
- .max_maps = 1,
- .page_shift = PAGE_SHIFT
- };
-
- mr->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
- sizeof(u64), GFP_KERNEL);
- if (!mr->fmr.fm_physaddrs)
- goto out_free;
-
- mr->mr_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
- sizeof(*mr->mr_sg), GFP_KERNEL);
- if (!mr->mr_sg)
- goto out_free;
-
- sg_init_table(mr->mr_sg, RPCRDMA_MAX_FMR_SGES);
-
- mr->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS,
- &fmr_attr);
- if (IS_ERR(mr->fmr.fm_mr))
- goto out_fmr_err;
-
- INIT_LIST_HEAD(&mr->mr_list);
- INIT_WORK(&mr->mr_recycle, fmr_mr_recycle_worker);
- return 0;
-
-out_fmr_err:
- dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__,
- PTR_ERR(mr->fmr.fm_mr));
-
-out_free:
- kfree(mr->mr_sg);
- kfree(mr->fmr.fm_physaddrs);
- return -ENOMEM;
-}
-
-/* On success, sets:
- * ep->rep_attr.cap.max_send_wr
- * ep->rep_attr.cap.max_recv_wr
- * cdata->max_requests
- * ia->ri_max_segs
- */
-static int
-fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
-{
- int max_qp_wr;
-
- max_qp_wr = ia->ri_device->attrs.max_qp_wr;
- max_qp_wr -= RPCRDMA_BACKWARD_WRS;
- max_qp_wr -= 1;
- if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
- return -ENOMEM;
- if (cdata->max_requests > max_qp_wr)
- cdata->max_requests = max_qp_wr;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests;
- ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
- ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
- ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
-
- ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
- RPCRDMA_MAX_FMR_SGES);
- ia->ri_max_segs += 2; /* segments for head and tail buffers */
- return 0;
-}
-
-/* FMR mode conveys up to 64 pages of payload per chunk segment.
- */
-static size_t
-fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
-{
- return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
-}
-
-/* Use the ib_map_phys_fmr() verb to register a memory region
- * for remote access via RDMA READ or RDMA WRITE.
- */
-static struct rpcrdma_mr_seg *
-fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, struct rpcrdma_mr **out)
-{
- struct rpcrdma_mr_seg *seg1 = seg;
- int len, pageoff, i, rc;
- struct rpcrdma_mr *mr;
- u64 *dma_pages;
-
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- return ERR_PTR(-EAGAIN);
-
- pageoff = offset_in_page(seg1->mr_offset);
- seg1->mr_offset -= pageoff; /* start of page */
- seg1->mr_len += pageoff;
- len = -pageoff;
- if (nsegs > RPCRDMA_MAX_FMR_SGES)
- nsegs = RPCRDMA_MAX_FMR_SGES;
- for (i = 0; i < nsegs;) {
- if (seg->mr_page)
- sg_set_page(&mr->mr_sg[i],
- seg->mr_page,
- seg->mr_len,
- offset_in_page(seg->mr_offset));
- else
- sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
- seg->mr_len);
- len += seg->mr_len;
- ++seg;
- ++i;
- /* Check for holes */
- if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
- offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
- break;
- }
- mr->mr_dir = rpcrdma_data_dir(writing);
-
- mr->mr_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device,
- mr->mr_sg, i, mr->mr_dir);
- if (!mr->mr_nents)
- goto out_dmamap_err;
- trace_xprtrdma_mr_map(mr);
-
- for (i = 0, dma_pages = mr->fmr.fm_physaddrs; i < mr->mr_nents; i++)
- dma_pages[i] = sg_dma_address(&mr->mr_sg[i]);
- rc = ib_map_phys_fmr(mr->fmr.fm_mr, dma_pages, mr->mr_nents,
- dma_pages[0]);
- if (rc)
- goto out_maperr;
-
- mr->mr_handle = mr->fmr.fm_mr->rkey;
- mr->mr_length = len;
- mr->mr_offset = dma_pages[0] + pageoff;
-
- *out = mr;
- return seg;
-
-out_dmamap_err:
- pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
- mr->mr_sg, i);
- rpcrdma_mr_put(mr);
- return ERR_PTR(-EIO);
-
-out_maperr:
- pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
- len, (unsigned long long)dma_pages[0],
- pageoff, mr->mr_nents, rc);
- rpcrdma_mr_unmap_and_put(mr);
- return ERR_PTR(-EIO);
-}
-
-/* Post Send WR containing the RPC Call message.
- */
-static int
-fmr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
-{
- return ib_post_send(ia->ri_id->qp, &req->rl_sendctx->sc_wr, NULL);
-}
-
-/* Invalidate all memory regions that were registered for "req".
- *
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
- */
-static void
-fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
-{
- struct rpcrdma_mr *mr;
- LIST_HEAD(unmap_list);
- int rc;
-
- /* ORDER: Invalidate all of the req's MRs first
- *
- * ib_unmap_fmr() is slow, so use a single call instead
- * of one call per mapped FMR.
- */
- list_for_each_entry(mr, mrs, mr_list) {
- dprintk("RPC: %s: unmapping fmr %p\n",
- __func__, &mr->fmr);
- trace_xprtrdma_mr_localinv(mr);
- list_add_tail(&mr->fmr.fm_mr->list, &unmap_list);
- }
- r_xprt->rx_stats.local_inv_needed++;
- rc = ib_unmap_fmr(&unmap_list);
- if (rc)
- goto out_release;
-
- /* ORDER: Now DMA unmap all of the req's MRs, and return
- * them to the free MW list.
- */
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- list_del(&mr->fmr.fm_mr->list);
- rpcrdma_mr_unmap_and_put(mr);
- }
-
- return;
-
-out_release:
- pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
-
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- list_del(&mr->fmr.fm_mr->list);
- rpcrdma_mr_recycle(mr);
- }
-}
-
-const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
- .ro_map = fmr_op_map,
- .ro_send = fmr_op_send,
- .ro_unmap_sync = fmr_op_unmap_sync,
- .ro_open = fmr_op_open,
- .ro_maxpages = fmr_op_maxpages,
- .ro_init_mr = fmr_op_init_mr,
- .ro_release_mr = fmr_op_release_mr,
- .ro_displayname = "fmr",
- .ro_send_w_inv_ok = 0,
-};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index fc6378cc0c1c..6a561056b538 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -15,21 +15,21 @@
/* Normal operation
*
* A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
- * Work Request (frwr_op_map). When the RDMA operation is finished, this
+ * Work Request (frwr_map). When the RDMA operation is finished, this
* Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_op_unmap_sync).
+ * (frwr_unmap_sync).
*
* Typically these Work Requests are not signaled, and neither are RDMA
* SEND Work Requests (with the exception of signaling occasionally to
* prevent provider work queue overflows). This greatly reduces HCA
* interrupt workload.
*
- * As an optimization, frwr_op_unmap marks MRs INVALID before the
+ * As an optimization, frwr_unmap marks MRs INVALID before the
* LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
* rb_mrs immediately so that no work (like managing a linked list
* under a spinlock) is needed in the completion upcall.
*
- * But this means that frwr_op_map() can occasionally encounter an MR
+ * But this means that frwr_map() can occasionally encounter an MR
* that is INVALID but the LOCAL_INV WR has not completed. Work Queue
* ordering prevents a subsequent FAST_REG WR from executing against
* that MR while it is still being invalidated.
@@ -57,14 +57,14 @@
* FLUSHED_LI: The MR was being invalidated when the QP entered ERROR
* state, and the pending WR was flushed.
*
- * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
+ * When frwr_map encounters FLUSHED and VALID MRs, they are recovered
* with ib_dereg_mr and then are re-initialized. Because MR recovery
* allocates fresh resources, it is deferred to a workqueue, and the
* recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_op_map allocates another MR for the current RPC while
+ * complete. frwr_map allocates another MR for the current RPC while
* the broken MR is reset.
*
- * To ensure that frwr_op_map doesn't encounter an MR that is marked
+ * To ensure that frwr_map doesn't encounter an MR that is marked
* INVALID but that is about to be flushed due to a previous transport
* disconnect, the transport connect worker attempts to drain all
* pending send queue WRs before the transport is reconnected.
@@ -80,8 +80,13 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-bool
-frwr_is_supported(struct rpcrdma_ia *ia)
+/**
+ * frwr_is_supported - Check if device supports FRWR
+ * @ia: interface adapter to check
+ *
+ * Returns true if device supports FRWR, otherwise false
+ */
+bool frwr_is_supported(struct rpcrdma_ia *ia)
{
struct ib_device_attr *attrs = &ia->ri_device->attrs;
@@ -97,15 +102,18 @@ out_not_supported:
return false;
}
-static void
-frwr_op_release_mr(struct rpcrdma_mr *mr)
+/**
+ * frwr_release_mr - Destroy one MR
+ * @mr: MR allocated by frwr_init_mr
+ *
+ */
+void frwr_release_mr(struct rpcrdma_mr *mr)
{
int rc;
rc = ib_dereg_mr(mr->frwr.fr_mr);
if (rc)
- pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
- mr, rc);
+ trace_xprtrdma_frwr_dereg(mr, rc);
kfree(mr->mr_sg);
kfree(mr);
}
@@ -117,60 +125,78 @@ static void
frwr_mr_recycle_worker(struct work_struct *work)
{
struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
- enum rpcrdma_frwr_state state = mr->frwr.fr_state;
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
trace_xprtrdma_mr_recycle(mr);
- if (state != FRWR_FLUSHED_LI) {
+ if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ mr->mr_dir = DMA_NONE;
}
spin_lock(&r_xprt->rx_buf.rb_mrlock);
list_del(&mr->mr_all);
r_xprt->rx_stats.mrs_recycled++;
spin_unlock(&r_xprt->rx_buf.rb_mrlock);
- frwr_op_release_mr(mr);
+
+ frwr_release_mr(mr);
}
-static int
-frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+/**
+ * frwr_init_mr - Initialize one MR
+ * @ia: interface adapter
+ * @mr: generic MR to prepare for FRWR
+ *
+ * Returns zero if successful. Otherwise a negative errno
+ * is returned.
+ */
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
unsigned int depth = ia->ri_max_frwr_depth;
- struct rpcrdma_frwr *frwr = &mr->frwr;
+ struct scatterlist *sg;
+ struct ib_mr *frmr;
int rc;
- frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
- if (IS_ERR(frwr->fr_mr))
+ frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
+ if (IS_ERR(frmr))
goto out_mr_err;
- mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL);
- if (!mr->mr_sg)
+ sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
+ if (!sg)
goto out_list_err;
+ mr->frwr.fr_mr = frmr;
+ mr->frwr.fr_state = FRWR_IS_INVALID;
+ mr->mr_dir = DMA_NONE;
INIT_LIST_HEAD(&mr->mr_list);
INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
- sg_init_table(mr->mr_sg, depth);
- init_completion(&frwr->fr_linv_done);
+ init_completion(&mr->frwr.fr_linv_done);
+
+ sg_init_table(sg, depth);
+ mr->mr_sg = sg;
return 0;
out_mr_err:
- rc = PTR_ERR(frwr->fr_mr);
- dprintk("RPC: %s: ib_alloc_mr status %i\n",
- __func__, rc);
+ rc = PTR_ERR(frmr);
+ trace_xprtrdma_frwr_alloc(mr, rc);
return rc;
out_list_err:
- rc = -ENOMEM;
dprintk("RPC: %s: sg allocation failure\n",
__func__);
- ib_dereg_mr(frwr->fr_mr);
- return rc;
+ ib_dereg_mr(frmr);
+ return -ENOMEM;
}
-/* On success, sets:
+/**
+ * frwr_open - Prepare an endpoint for use with FRWR
+ * @ia: interface adapter this endpoint will use
+ * @ep: endpoint to prepare
+ * @cdata: transport parameters
+ *
+ * On success, sets:
* ep->rep_attr.cap.max_send_wr
* ep->rep_attr.cap.max_recv_wr
* cdata->max_requests
@@ -179,10 +205,11 @@ out_list_err:
* And these FRWR-related fields:
* ia->ri_max_frwr_depth
* ia->ri_mrtype
+ *
+ * On failure, a negative errno is returned.
*/
-static int
-frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+ struct rpcrdma_create_data_internal *cdata)
{
struct ib_device_attr *attrs = &ia->ri_device->attrs;
int max_qp_wr, depth, delta;
@@ -191,10 +218,17 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
- ia->ri_max_frwr_depth =
- min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- attrs->max_fast_reg_page_list_len);
- dprintk("RPC: %s: device's max FR page list len = %u\n",
+ /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
+ * capability, but perform optimally when the MRs are not larger
+ * than a page.
+ */
+ if (attrs->max_sge_rd > 1)
+ ia->ri_max_frwr_depth = attrs->max_sge_rd;
+ else
+ ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
+ if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
+ ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
+ dprintk("RPC: %s: max FR page list depth = %u\n",
__func__, ia->ri_max_frwr_depth);
/* Add room for frwr register and invalidate WRs.
@@ -242,20 +276,28 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
ia->ri_max_frwr_depth);
- ia->ri_max_segs += 2; /* segments for head and tail buffers */
+ /* Reply chunks require segments for head and tail buffers */
+ ia->ri_max_segs += 2;
+ if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
+ ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
return 0;
}
-/* FRWR mode conveys a list of pages per chunk segment. The
+/**
+ * frwr_maxpages - Compute size of largest payload
+ * @r_xprt: transport
+ *
+ * Returns maximum size of an RPC message, in pages.
+ *
+ * FRWR mode conveys a list of pages per chunk segment. The
* maximum length of that list is the FRWR page list depth.
*/
-static size_t
-frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth);
+ (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
}
static void
@@ -332,12 +374,25 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
trace_xprtrdma_wc_li_wake(wc, frwr);
}
-/* Post a REG_MR Work Request to register a memory region
+/**
+ * frwr_map - Register a memory region
+ * @r_xprt: controlling transport
+ * @seg: memory region co-ordinates
+ * @nsegs: number of segments remaining
+ * @writing: true when RDMA Write will be used
+ * @xid: XID of RPC using the registered memory
+ * @out: initialized MR
+ *
+ * Prepare a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
+ *
+ * Returns the next segment or a negative errno pointer.
+ * On success, the prepared MR is planted in @out.
*/
-static struct rpcrdma_mr_seg *
-frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, struct rpcrdma_mr **out)
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing, u32 xid,
+ struct rpcrdma_mr **out)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
@@ -384,13 +439,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
if (!mr->mr_nents)
goto out_dmamap_err;
- trace_xprtrdma_mr_map(mr);
ibmr = frwr->fr_mr;
n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
if (unlikely(n != mr->mr_nents))
goto out_mapmr_err;
+ ibmr->iova &= 0x00000000ffffffff;
+ ibmr->iova |= ((u64)cpu_to_be32(xid)) << 32;
key = (u8)(ibmr->rkey & 0x000000FF);
ib_update_fast_reg_key(ibmr, ++key);
@@ -404,32 +460,35 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mr->mr_handle = ibmr->rkey;
mr->mr_length = ibmr->length;
mr->mr_offset = ibmr->iova;
+ trace_xprtrdma_mr_map(mr);
*out = mr;
return seg;
out_dmamap_err:
- pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
- mr->mr_sg, i);
frwr->fr_state = FRWR_IS_INVALID;
+ trace_xprtrdma_frwr_sgerr(mr, i);
rpcrdma_mr_put(mr);
return ERR_PTR(-EIO);
out_mapmr_err:
- pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
- frwr->fr_mr, n, mr->mr_nents);
+ trace_xprtrdma_frwr_maperr(mr, n);
rpcrdma_mr_recycle(mr);
return ERR_PTR(-EIO);
}
-/* Post Send WR containing the RPC Call message.
+/**
+ * frwr_send - post Send WR containing the RPC Call message
+ * @ia: interface adapter
+ * @req: Prepared RPC Call
*
- * For FRMR, chain any FastReg WRs to the Send WR. Only a
+ * For FRWR, chain any FastReg WRs to the Send WR. Only a
* single ib_post_send call is needed to register memory
* and then post the Send WR.
+ *
+ * Returns the result of ib_post_send.
*/
-static int
-frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
struct ib_send_wr *post_wr;
struct rpcrdma_mr *mr;
@@ -451,15 +510,18 @@ frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
}
/* If ib_post_send fails, the next ->send_request for
- * @req will queue these MWs for recovery.
+ * @req will queue these MRs for recovery.
*/
return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}
-/* Handle a remotely invalidated mr on the @mrs list
+/**
+ * frwr_reminv - handle a remotely invalidated mr on the @mrs list
+ * @rep: Received reply
+ * @mrs: list of MRs to check
+ *
*/
-static void
-frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
struct rpcrdma_mr *mr;
@@ -473,7 +535,10 @@ frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
}
}
-/* Invalidate all memory regions that were registered for "req".
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport
+ * @mrs: list of MRs to process
*
* Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions.
@@ -481,8 +546,7 @@ frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
* Caller ensures that @mrs is not empty before the call. This
* function empties the list.
*/
-static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
{
struct ib_send_wr *first, **prev, *last;
const struct ib_send_wr *bad_wr;
@@ -561,20 +625,7 @@ out_release:
mr = container_of(frwr, struct rpcrdma_mr, frwr);
bad_wr = bad_wr->next;
- list_del(&mr->mr_list);
- frwr_op_release_mr(mr);
+ list_del_init(&mr->mr_list);
+ rpcrdma_mr_recycle(mr);
}
}
-
-const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
- .ro_map = frwr_op_map,
- .ro_send = frwr_op_send,
- .ro_reminv = frwr_op_reminv,
- .ro_unmap_sync = frwr_op_unmap_sync,
- .ro_open = frwr_op_open,
- .ro_maxpages = frwr_op_maxpages,
- .ro_init_mr = frwr_op_init_mr,
- .ro_release_mr = frwr_op_release_mr,
- .ro_displayname = "frwr",
- .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK,
-};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 9f53e0240035..d18614e02b4e 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -218,11 +218,12 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdrbuf->page_base);
while (len) {
- if (unlikely(!*ppages)) {
- /* XXX: Certain upper layer operations do
- * not provide receive buffer pages.
- */
- *ppages = alloc_page(GFP_ATOMIC);
+ /* ACL likes to be lazy in allocating pages - ACLs
+ * are small by default but can get huge.
+ */
+ if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
+ if (!*ppages)
+ *ppages = alloc_page(GFP_ATOMIC);
if (!*ppages)
return -ENOBUFS;
}
@@ -356,8 +357,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return nsegs;
do {
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
- false, &mr);
+ seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
rpcrdma_mr_push(mr, &req->rl_registered);
@@ -365,7 +365,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (encode_read_segment(xdr, mr, pos) < 0)
return -EMSGSIZE;
- trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
+ trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
r_xprt->rx_stats.read_chunk_count++;
nsegs -= mr->mr_nents;
} while (nsegs);
@@ -414,8 +414,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
- true, &mr);
+ seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
rpcrdma_mr_push(mr, &req->rl_registered);
@@ -423,7 +422,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
- trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
+ trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
r_xprt->rx_stats.write_chunk_count++;
r_xprt->rx_stats.total_rdma_request += mr->mr_length;
nchunks++;
@@ -472,8 +471,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
- true, &mr);
+ seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
rpcrdma_mr_push(mr, &req->rl_registered);
@@ -481,7 +479,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
- trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
+ trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
r_xprt->rx_stats.reply_chunk_count++;
r_xprt->rx_stats.total_rdma_request += mr->mr_length;
nchunks++;
@@ -667,7 +665,7 @@ out_mapping_overflow:
out_mapping_err:
rpcrdma_unmap_sendctx(sc);
- pr_err("rpcrdma: Send mapping error\n");
+ trace_xprtrdma_dma_maperr(sge[sge_no].addr);
return false;
}
@@ -1188,17 +1186,20 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
p = xdr_inline_decode(xdr, 2 * sizeof(*p));
if (!p)
break;
- dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
- rqst->rq_task->tk_pid, __func__,
- be32_to_cpup(p), be32_to_cpu(*(p + 1)));
+ dprintk("RPC: %s: server reports "
+ "version error (%u-%u), xid %08x\n", __func__,
+ be32_to_cpup(p), be32_to_cpu(*(p + 1)),
+ be32_to_cpu(rep->rr_xid));
break;
case err_chunk:
- dprintk("RPC: %5u: %s: server reports header decoding error\n",
- rqst->rq_task->tk_pid, __func__);
+ dprintk("RPC: %s: server reports "
+ "header decoding error, xid %08x\n", __func__,
+ be32_to_cpu(rep->rr_xid));
break;
default:
- dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
- rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+ dprintk("RPC: %s: server reports "
+ "unrecognized error %d, xid %08x\n", __func__,
+ be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
}
r_xprt->rx_stats.bad_reply_count++;
@@ -1248,7 +1249,6 @@ out:
out_badheader:
trace_xprtrdma_reply_hdr(rep);
r_xprt->rx_stats.bad_reply_count++;
- status = -EIO;
goto out;
}
@@ -1262,8 +1262,7 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* RPC has relinquished all its Send Queue entries.
*/
if (!list_empty(&req->rl_registered))
- r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
- &req->rl_registered);
+ frwr_unmap_sync(r_xprt, &req->rl_registered);
/* Ensure that any DMA mapped pages associated with
* the Send of the RPC Call have been unmapped before
@@ -1292,7 +1291,7 @@ void rpcrdma_deferred_completion(struct work_struct *work)
trace_xprtrdma_defer_cmp(rep);
if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
- r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
+ frwr_reminv(rep, &req->rl_registered);
rpcrdma_release_rqst(r_xprt, req);
rpcrdma_complete_rqst(rep);
}
@@ -1312,11 +1311,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
u32 credits;
__be32 *p;
- --buf->rb_posted_receives;
-
- if (rep->rr_hdrbuf.head[0].iov_len == 0)
- goto out_badstatus;
-
/* Fixed transport header fields */
xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
rep->rr_hdrbuf.head[0].iov_base);
@@ -1356,36 +1350,30 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
}
req = rpcr_to_rdmar(rqst);
+ if (req->rl_reply) {
+ trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
+ rpcrdma_recv_buffer_put(req->rl_reply);
+ }
req->rl_reply = rep;
rep->rr_rqst = rqst;
clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-
- rpcrdma_post_recvs(r_xprt, false);
- queue_work(rpcrdma_receive_wq, &rep->rr_work);
+ queue_work(buf->rb_completion_wq, &rep->rr_work);
return;
out_badversion:
trace_xprtrdma_reply_vers(rep);
- goto repost;
+ goto out;
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
out_norqst:
spin_unlock(&xprt->queue_lock);
trace_xprtrdma_reply_rqst(rep);
- goto repost;
+ goto out;
out_shortreply:
trace_xprtrdma_reply_short(rep);
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
- rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index f3c147d70286..b908f2ca08fd 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -200,11 +200,10 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
svc_rdma_send_ctxt_put(rdma, ctxt);
goto drop_connection;
}
- return rc;
+ return 0;
drop_connection:
dprintk("svcrdma: failed to send bc call\n");
- xprt_disconnect_done(xprt);
return -ENOTCONN;
}
@@ -225,8 +224,11 @@ xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
ret = -ENOTCONN;
rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
- if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+ if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
ret = rpcrdma_bc_send_request(rdma, rqst);
+ if (ret == -ENOTCONN)
+ svc_close_xprt(sxprt);
+ }
mutex_unlock(&sxprt->xpt_mutex);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 9141068693fa..fbc171ebfe91 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -268,7 +268,7 @@ xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- trace_xprtrdma_inject_dsc(r_xprt);
+ trace_xprtrdma_op_inject_dsc(r_xprt);
rdma_disconnect(r_xprt->rx_ia.ri_id);
}
@@ -284,7 +284,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- trace_xprtrdma_destroy(r_xprt);
+ trace_xprtrdma_op_destroy(r_xprt);
cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
@@ -318,17 +318,12 @@ xprt_setup_rdma(struct xprt_create *args)
struct sockaddr *sap;
int rc;
- if (args->addrlen > sizeof(xprt->addr)) {
- dprintk("RPC: %s: address too large\n", __func__);
+ if (args->addrlen > sizeof(xprt->addr))
return ERR_PTR(-EBADF);
- }
xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
- if (xprt == NULL) {
- dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
- __func__);
+ if (!xprt)
return ERR_PTR(-ENOMEM);
- }
/* 60 second timeout, no retries */
xprt->timeout = &xprt_rdma_default_timeout;
@@ -399,7 +394,7 @@ xprt_setup_rdma(struct xprt_create *args)
INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
xprt_rdma_connect_worker);
- xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
+ xprt->max_payload = frwr_maxpages(new_xprt);
if (xprt->max_payload == 0)
goto out4;
xprt->max_payload <<= PAGE_SHIFT;
@@ -423,7 +418,7 @@ out3:
out2:
rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
- trace_xprtrdma_destroy(new_xprt);
+ trace_xprtrdma_op_destroy(new_xprt);
xprt_rdma_free_addresses(xprt);
xprt_free(xprt);
return ERR_PTR(rc);
@@ -433,29 +428,33 @@ out1:
* xprt_rdma_close - close a transport connection
* @xprt: transport context
*
- * Called during transport shutdown, reconnect, or device removal.
+ * Called during autoclose or device removal.
+ *
* Caller holds @xprt's send lock to prevent activity on this
* transport while the connection is torn down.
*/
-static void
-xprt_rdma_close(struct rpc_xprt *xprt)
+void xprt_rdma_close(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- dprintk("RPC: %s: closing xprt %p\n", __func__, xprt);
+ might_sleep();
+
+ trace_xprtrdma_op_close(r_xprt);
+
+ /* Prevent marshaling and sending of new requests */
+ xprt_clear_connected(xprt);
if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
- xprt_clear_connected(xprt);
rpcrdma_ia_remove(ia);
- return;
+ goto out;
}
+
if (ep->rep_connected == -ENODEV)
return;
if (ep->rep_connected > 0)
xprt->reestablish_timeout = 0;
- xprt_disconnect_done(xprt);
rpcrdma_ep_disconnect(ep, ia);
/* Prepare @xprt for the next connection by reinitializing
@@ -463,6 +462,10 @@ xprt_rdma_close(struct rpc_xprt *xprt)
*/
r_xprt->rx_buf.rb_credits = 1;
xprt->cwnd = RPC_CWNDSHIFT;
+
+out:
+ ++xprt->connect_cookie;
+ xprt_disconnect_done(xprt);
}
/**
@@ -525,6 +528,7 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ trace_xprtrdma_op_connect(r_xprt);
if (r_xprt->rx_ep.rep_connected != 0) {
/* Reconnect */
schedule_delayed_work(&r_xprt->rx_connect_worker,
@@ -659,11 +663,11 @@ xprt_rdma_allocate(struct rpc_task *task)
rqst->rq_buffer = req->rl_sendbuf->rg_base;
rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
- trace_xprtrdma_allocate(task, req);
+ trace_xprtrdma_op_allocate(task, req);
return 0;
out_fail:
- trace_xprtrdma_allocate(task, NULL);
+ trace_xprtrdma_op_allocate(task, NULL);
return -ENOMEM;
}
@@ -682,7 +686,7 @@ xprt_rdma_free(struct rpc_task *task)
if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
rpcrdma_release_rqst(r_xprt, req);
- trace_xprtrdma_rpc_done(task, req);
+ trace_xprtrdma_op_free(task, req);
}
/**
@@ -696,8 +700,10 @@ xprt_rdma_free(struct rpc_task *task)
* %-ENOTCONN if the caller should reconnect and call again
* %-EAGAIN if the caller should call again
* %-ENOBUFS if the caller should call again after a delay
- * %-EIO if a permanent error occurred and the request was not
- * sent. Do not try to send this message again.
+ * %-EMSGSIZE if encoding ran out of buffer space. The request
+ * was not sent. Do not try to send this message again.
+ * %-EIO if an I/O error occurred. The request was not sent.
+ * Do not try to send this message again.
*/
static int
xprt_rdma_send_request(struct rpc_rqst *rqst)
@@ -713,7 +719,7 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
if (!xprt_connected(xprt))
- goto drop_connection;
+ return -ENOTCONN;
if (!xprt_request_get_cong(xprt, rqst))
return -EBADSLT;
@@ -745,8 +751,8 @@ failed_marshal:
if (rc != -ENOTCONN)
return rc;
drop_connection:
- xprt_disconnect_done(xprt);
- return -ENOTCONN; /* implies disconnect */
+ xprt_rdma_close(xprt);
+ return -ENOTCONN;
}
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
@@ -843,58 +849,31 @@ static struct xprt_class xprt_rdma = {
void xprt_rdma_cleanup(void)
{
- int rc;
-
- dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (sunrpc_table_header) {
unregister_sysctl_table(sunrpc_table_header);
sunrpc_table_header = NULL;
}
#endif
- rc = xprt_unregister_transport(&xprt_rdma);
- if (rc)
- dprintk("RPC: %s: xprt_unregister returned %i\n",
- __func__, rc);
-
- rpcrdma_destroy_wq();
- rc = xprt_unregister_transport(&xprt_rdma_bc);
- if (rc)
- dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
- __func__, rc);
+ xprt_unregister_transport(&xprt_rdma);
+ xprt_unregister_transport(&xprt_rdma_bc);
}
int xprt_rdma_init(void)
{
int rc;
- rc = rpcrdma_alloc_wq();
- if (rc)
- return rc;
-
rc = xprt_register_transport(&xprt_rdma);
- if (rc) {
- rpcrdma_destroy_wq();
+ if (rc)
return rc;
- }
rc = xprt_register_transport(&xprt_rdma_bc);
if (rc) {
xprt_unregister_transport(&xprt_rdma);
- rpcrdma_destroy_wq();
return rc;
}
- dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
-
- dprintk("Defaults:\n");
- dprintk("\tSlots %d\n"
- "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
- xprt_rdma_slot_table_entries,
- xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
- dprintk("\tPadding 0\n\tMemreg %d\n", xprt_rdma_memreg_strategy);
-
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (!sunrpc_table_header)
sunrpc_table_header = register_sysctl_table(sunrpc_table);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3ddba94c939f..7749a2bf6887 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -78,53 +78,25 @@ static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
-{
- struct workqueue_struct *recv_wq;
-
- recv_wq = alloc_workqueue("xprtrdma_receive",
- WQ_MEM_RECLAIM | WQ_HIGHPRI,
- 0);
- if (!recv_wq)
- return -ENOMEM;
-
- rpcrdma_receive_wq = recv_wq;
- return 0;
-}
-
-void
-rpcrdma_destroy_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (rpcrdma_receive_wq) {
- wq = rpcrdma_receive_wq;
- rpcrdma_receive_wq = NULL;
- destroy_workqueue(wq);
- }
-}
-
-/**
- * rpcrdma_disconnect_worker - Force a disconnect
- * @work: endpoint to be disconnected
- *
- * Provider callbacks can possibly run in an IRQ context. This function
- * is invoked in a worker thread to guarantee that disconnect wake-up
- * calls are always done in process context.
+/* Wait for outstanding transport work to finish.
*/
-static void
-rpcrdma_disconnect_worker(struct work_struct *work)
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_ep *ep = container_of(work, struct rpcrdma_ep,
- rep_disconnect_worker.work);
- struct rpcrdma_xprt *r_xprt =
- container_of(ep, struct rpcrdma_xprt, rx_ep);
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- xprt_force_disconnect(&r_xprt->rx_xprt);
+ /* Flush Receives, then wait for deferred Reply work
+ * to complete.
+ */
+ ib_drain_qp(ia->ri_id->qp);
+ drain_workqueue(buf->rb_completion_wq);
+
+ /* Deferred Reply processing might have scheduled
+ * local invalidations.
+ */
+ ib_drain_sq(ia->ri_id->qp);
}
/**
@@ -143,15 +115,6 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context)
rx_ep);
trace_xprtrdma_qp_event(r_xprt, event);
- pr_err("rpcrdma: %s on device %s connected to %s:%s\n",
- ib_event_msg(event->event), event->device->name,
- rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
-
- if (ep->rep_connected == 1) {
- ep->rep_connected = -EIO;
- schedule_delayed_work(&ep->rep_disconnect_worker, 0);
- wake_up_all(&ep->rep_connect_wait);
- }
}
/**
@@ -189,11 +152,13 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
rr_cqe);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
- /* WARNING: Only wr_id and status are reliable at this point */
+ /* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_receive(wc);
+ --r_xprt->rx_ep.rep_receive_count;
if (wc->status != IB_WC_SUCCESS)
- goto out_fail;
+ goto out_flushed;
/* status == SUCCESS means all fields in wc are trustworthy */
rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
@@ -204,17 +169,16 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
rdmab_addr(rep->rr_rdmabuf),
wc->byte_len, DMA_FROM_DEVICE);
-out_schedule:
+ rpcrdma_post_recvs(r_xprt, false);
rpcrdma_reply_handler(rep);
return;
-out_fail:
+out_flushed:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
- rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
- goto out_schedule;
+ rpcrdma_recv_buffer_put(rep);
}
static void
@@ -316,7 +280,6 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
ep->rep_connected = -EAGAIN;
goto disconnected;
case RDMA_CM_EVENT_DISCONNECTED:
- ++xprt->connect_cookie;
ep->rep_connected = -ECONNABORTED;
disconnected:
xprt_force_disconnect(xprt);
@@ -326,10 +289,9 @@ disconnected:
break;
}
- dprintk("RPC: %s: %s:%s on %s/%s: %s\n", __func__,
+ dprintk("RPC: %s: %s:%s on %s/frwr: %s\n", __func__,
rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
- ia->ri_device->name, ia->ri_ops->ro_displayname,
- rdma_event_msg(event->event));
+ ia->ri_device->name, rdma_event_msg(event->event));
return 0;
}
@@ -347,22 +309,15 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
xprt, RDMA_PS_TCP, IB_QPT_RC);
- if (IS_ERR(id)) {
- rc = PTR_ERR(id);
- dprintk("RPC: %s: rdma_create_id() failed %i\n",
- __func__, rc);
+ if (IS_ERR(id))
return id;
- }
ia->ri_async_rc = -ETIMEDOUT;
rc = rdma_resolve_addr(id, NULL,
(struct sockaddr *)&xprt->rx_xprt.addr,
RDMA_RESOLVE_TIMEOUT);
- if (rc) {
- dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
- __func__, rc);
+ if (rc)
goto out;
- }
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
if (rc < 0) {
trace_xprtrdma_conn_tout(xprt);
@@ -375,11 +330,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
ia->ri_async_rc = -ETIMEDOUT;
rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
- if (rc) {
- dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
- __func__, rc);
+ if (rc)
goto out;
- }
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
if (rc < 0) {
trace_xprtrdma_conn_tout(xprt);
@@ -429,16 +381,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
switch (xprt_rdma_memreg_strategy) {
case RPCRDMA_FRWR:
- if (frwr_is_supported(ia)) {
- ia->ri_ops = &rpcrdma_frwr_memreg_ops;
- break;
- }
- /*FALLTHROUGH*/
- case RPCRDMA_MTHCAFMR:
- if (fmr_is_supported(ia)) {
- ia->ri_ops = &rpcrdma_fmr_memreg_ops;
+ if (frwr_is_supported(ia))
break;
- }
/*FALLTHROUGH*/
default:
pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
@@ -481,7 +425,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
* connection is already gone.
*/
if (ia->ri_id->qp) {
- ib_drain_qp(ia->ri_id->qp);
+ rpcrdma_xprt_drain(r_xprt);
rdma_destroy_qp(ia->ri_id);
ia->ri_id->qp = NULL;
}
@@ -552,7 +496,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
}
ia->ri_max_send_sges = max_sge;
- rc = ia->ri_ops->ro_open(ia, ep, cdata);
+ rc = frwr_open(ia, ep, cdata);
if (rc)
return rc;
@@ -579,16 +523,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
cdata->max_requests >> 2);
ep->rep_send_count = ep->rep_send_batch;
init_waitqueue_head(&ep->rep_connect_wait);
- INIT_DELAYED_WORK(&ep->rep_disconnect_worker,
- rpcrdma_disconnect_worker);
+ ep->rep_receive_count = 0;
sendcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_send_wr + 1,
1, IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
- dprintk("RPC: %s: failed to create send CQ: %i\n",
- __func__, rc);
goto out1;
}
@@ -597,8 +538,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
0, IB_POLL_WORKQUEUE);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
- dprintk("RPC: %s: failed to create recv CQ: %i\n",
- __func__, rc);
goto out2;
}
@@ -611,7 +550,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
/* Prepare RDMA-CM private message */
pmsg->cp_magic = rpcrdma_cmp_magic;
pmsg->cp_version = RPCRDMA_CMP_VERSION;
- pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
+ pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
ep->rep_remote_cma.private_data = pmsg;
@@ -653,8 +592,6 @@ out1:
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
- cancel_delayed_work_sync(&ep->rep_disconnect_worker);
-
if (ia->ri_id && ia->ri_id->qp) {
rpcrdma_ep_disconnect(ep, ia);
rdma_destroy_qp(ia->ri_id);
@@ -740,11 +677,8 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
}
err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
- if (err) {
- dprintk("RPC: %s: rdma_create_qp returned %d\n",
- __func__, err);
+ if (err)
goto out_destroy;
- }
/* Atomically replace the transport's ID and QP. */
rc = 0;
@@ -775,8 +709,6 @@ retry:
dprintk("RPC: %s: connecting...\n", __func__);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
if (rc) {
- dprintk("RPC: %s: rdma_create_qp failed %i\n",
- __func__, rc);
rc = -ENETUNREACH;
goto out_noupdate;
}
@@ -798,11 +730,8 @@ retry:
rpcrdma_post_recvs(r_xprt, true);
rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
- if (rc) {
- dprintk("RPC: %s: rdma_connect() failed with %i\n",
- __func__, rc);
+ if (rc)
goto out;
- }
wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
if (ep->rep_connected <= 0) {
@@ -822,8 +751,10 @@ out_noupdate:
return rc;
}
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
*
* This is separate from destroy to facilitate the ability
* to reconnect without recreating the endpoint.
@@ -834,19 +765,20 @@ out_noupdate:
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
+ struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+ rx_ep);
int rc;
+ /* returns without wait if ID is not connected */
rc = rdma_disconnect(ia->ri_id);
if (!rc)
- /* returns without wait if not connected */
wait_event_interruptible(ep->rep_connect_wait,
ep->rep_connected != 1);
else
ep->rep_connected = rc;
- trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
- rx_ep), rc);
+ trace_xprtrdma_disconnect(r_xprt, rc);
- ib_drain_qp(ia->ri_id->qp);
+ rpcrdma_xprt_drain(r_xprt);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1034,7 +966,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
if (!mr)
break;
- rc = ia->ri_ops->ro_init_mr(ia, mr);
+ rc = frwr_init_mr(ia, mr);
if (rc) {
kfree(mr);
break;
@@ -1089,9 +1021,9 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
req->rl_buffer = buffer;
INIT_LIST_HEAD(&req->rl_registered);
- spin_lock(&buffer->rb_reqslock);
+ spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
- spin_unlock(&buffer->rb_reqslock);
+ spin_unlock(&buffer->rb_lock);
return req;
}
@@ -1134,8 +1066,6 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
out_free:
kfree(rep);
out:
- dprintk("RPC: %s: reply buffer %d alloc failed\n",
- __func__, rc);
return rc;
}
@@ -1159,7 +1089,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
- spin_lock_init(&buf->rb_reqslock);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
@@ -1174,13 +1103,19 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
}
buf->rb_credits = 1;
- buf->rb_posted_receives = 0;
INIT_LIST_HEAD(&buf->rb_recv_bufs);
rc = rpcrdma_sendctxs_create(r_xprt);
if (rc)
goto out;
+ buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+ WQ_MEM_RECLAIM | WQ_HIGHPRI,
+ 0,
+ r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+ if (!buf->rb_completion_wq)
+ goto out;
+
return 0;
out:
rpcrdma_buffer_destroy(buf);
@@ -1194,9 +1129,18 @@ rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
kfree(rep);
}
+/**
+ * rpcrdma_req_destroy - Destroy an rpcrdma_req object
+ * @req: unused object to be destroyed
+ *
+ * This function assumes that the caller prevents concurrent device
+ * unload and transport tear-down.
+ */
void
-rpcrdma_destroy_req(struct rpcrdma_req *req)
+rpcrdma_req_destroy(struct rpcrdma_req *req)
{
+ list_del(&req->rl_all);
+
rpcrdma_free_regbuf(req->rl_recvbuf);
rpcrdma_free_regbuf(req->rl_sendbuf);
rpcrdma_free_regbuf(req->rl_rdmabuf);
@@ -1208,7 +1152,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
struct rpcrdma_mr *mr;
unsigned int count;
@@ -1224,7 +1167,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
if (!list_empty(&mr->mr_list))
list_del(&mr->mr_list);
- ia->ri_ops->ro_release_mr(mr);
+ frwr_release_mr(mr);
count++;
spin_lock(&buf->rb_mrlock);
}
@@ -1234,11 +1177,24 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
+/**
+ * rpcrdma_buffer_destroy - Release all hw resources
+ * @buf: root control block for resources
+ *
+ * ORDERING: relies on a prior ib_drain_qp :
+ * - No more Send or Receive completions can occur
+ * - All MRs, reps, and reqs are returned to their free lists
+ */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ if (buf->rb_completion_wq) {
+ destroy_workqueue(buf->rb_completion_wq);
+ buf->rb_completion_wq = NULL;
+ }
+
rpcrdma_sendctxs_destroy(buf);
while (!list_empty(&buf->rb_recv_bufs)) {
@@ -1250,19 +1206,14 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
rpcrdma_destroy_rep(rep);
}
- spin_lock(&buf->rb_reqslock);
- while (!list_empty(&buf->rb_allreqs)) {
+ while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req;
- req = list_first_entry(&buf->rb_allreqs,
- struct rpcrdma_req, rl_all);
- list_del(&req->rl_all);
-
- spin_unlock(&buf->rb_reqslock);
- rpcrdma_destroy_req(req);
- spin_lock(&buf->rb_reqslock);
+ req = list_first_entry(&buf->rb_send_bufs,
+ struct rpcrdma_req, rl_list);
+ list_del(&req->rl_list);
+ rpcrdma_req_destroy(req);
}
- spin_unlock(&buf->rb_reqslock);
rpcrdma_mrs_destroy(buf);
}
@@ -1329,9 +1280,12 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- trace_xprtrdma_mr_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ if (mr->mr_dir != DMA_NONE) {
+ trace_xprtrdma_mr_unmap(mr);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ mr->mr_dir = DMA_NONE;
+ }
__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}
@@ -1410,7 +1364,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
- * or Replies they may be registered externally via ro_map.
+ * or Replies they may be registered externally via frwr_map.
*/
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
@@ -1446,8 +1400,10 @@ __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
(void *)rb->rg_base,
rdmab_length(rb),
rb->rg_direction);
- if (ib_dma_mapping_error(device, rdmab_addr(rb)))
+ if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
+ trace_xprtrdma_dma_maperr(rdmab_addr(rb));
return false;
+ }
rb->rg_device = device;
rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
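
[Annotation] The new trace point fires only on the failure branch of ib_dma_mapping_error(). For reference, the map-then-check pattern on its own, applied to a hypothetical single buffer (names and the DMA direction are illustrative):

	static bool example_map_buffer(struct ib_device *device, void *base,
				       size_t len, u64 *dma_addr)
	{
		*dma_addr = ib_dma_map_single(device, base, len,
					      DMA_BIDIRECTIONAL);
		if (ib_dma_mapping_error(device, *dma_addr))
			return false;	/* caller must not post a WR for it */
		return true;
	}
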
@@ -1479,10 +1435,14 @@ rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
kfree(rb);
}
-/*
- * Prepost any receive buffer, then post send.
+/**
+ * rpcrdma_ep_post - Post WRs to a transport's Send Queue
+ * @ia: transport's device information
+ * @ep: transport's RDMA endpoint information
+ * @req: rpcrdma_req containing the Send WR to post
*
- * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ * Returns 0 if the post was successful, otherwise -ENOTCONN
+ * is returned.
*/
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
@@ -1501,32 +1461,27 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
--ep->rep_send_count;
}
- rc = ia->ri_ops->ro_send(ia, req);
+ rc = frwr_send(ia, req);
trace_xprtrdma_post_send(req, rc);
if (rc)
return -ENOTCONN;
return 0;
}
-/**
- * rpcrdma_post_recvs - Maybe post some Receive buffers
- * @r_xprt: controlling transport
- * @temp: when true, allocate temp rpcrdma_rep objects
- *
- */
-void
+static void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct ib_recv_wr *wr, *bad_wr;
int needed, count, rc;
rc = 0;
count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
- if (buf->rb_posted_receives > needed)
+ if (ep->rep_receive_count > needed)
goto out;
- needed -= buf->rb_posted_receives;
+ needed -= ep->rep_receive_count;
count = 0;
wr = NULL;
@@ -1574,7 +1529,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
--count;
}
}
- buf->rb_posted_receives += count;
+ ep->rep_receive_count += count;
out:
trace_xprtrdma_post_recvs(r_xprt, count, rc);
}
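
[Annotation] rpcrdma_post_recvs() now tops the Receive queue up against ep->rep_receive_count instead of a counter in the buffer object. A worked example of the arithmetic with illustrative numbers: a credit grant of 32 and a backchannel maximum of 2 gives needed = 32 + (2 << 1) = 36, so if 30 Receives are already posted, only 6 more are added. As a standalone sketch:

	static int example_recvs_to_post(u32 credits, u32 bc_srv_max, int posted)
	{
		int needed = credits + (bc_srv_max << 1);

		return posted > needed ? 0 : needed - posted;
	}
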
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 9218dbebedce..5a18472f2c9c 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -66,7 +66,6 @@
* Interface Adapter -- one per transport instance
*/
struct rpcrdma_ia {
- const struct rpcrdma_memreg_ops *ri_ops;
struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
@@ -81,8 +80,6 @@ struct rpcrdma_ia {
bool ri_implicit_roundup;
enum ib_mr_type ri_mrtype;
unsigned long ri_flags;
- struct ib_qp_attr ri_qp_attr;
- struct ib_qp_init_attr ri_qp_init_attr;
};
enum {
@@ -101,7 +98,7 @@ struct rpcrdma_ep {
wait_queue_head_t rep_connect_wait;
struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
- struct delayed_work rep_disconnect_worker;
+ int rep_receive_count;
};
/* Pre-allocate extra Work Requests for handling backward receives
@@ -262,20 +259,12 @@ struct rpcrdma_frwr {
};
};
-struct rpcrdma_fmr {
- struct ib_fmr *fm_mr;
- u64 *fm_physaddrs;
-};
-
struct rpcrdma_mr {
struct list_head mr_list;
struct scatterlist *mr_sg;
int mr_nents;
enum dma_data_direction mr_dir;
- union {
- struct rpcrdma_fmr fmr;
- struct rpcrdma_frwr frwr;
- };
+ struct rpcrdma_frwr frwr;
struct rpcrdma_xprt *mr_xprt;
u32 mr_handle;
u32 mr_length;
@@ -401,20 +390,18 @@ struct rpcrdma_buffer {
spinlock_t rb_lock; /* protect buf lists */
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
+ struct list_head rb_allreqs;
+
unsigned long rb_flags;
u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */
- int rb_posted_receives;
u32 rb_bc_srv_max_requests;
- spinlock_t rb_reqslock; /* protect rb_allreqs */
- struct list_head rb_allreqs;
-
u32 rb_bc_max_requests;
+ struct workqueue_struct *rb_completion_wq;
struct delayed_work rb_refresh_worker;
};
-#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
/* rb_flags */
enum {
@@ -465,35 +452,6 @@ struct rpcrdma_stats {
};
/*
- * Per-registration mode operations
- */
-struct rpcrdma_xprt;
-struct rpcrdma_memreg_ops {
- struct rpcrdma_mr_seg *
- (*ro_map)(struct rpcrdma_xprt *,
- struct rpcrdma_mr_seg *, int, bool,
- struct rpcrdma_mr **);
- int (*ro_send)(struct rpcrdma_ia *ia,
- struct rpcrdma_req *req);
- void (*ro_reminv)(struct rpcrdma_rep *rep,
- struct list_head *mrs);
- void (*ro_unmap_sync)(struct rpcrdma_xprt *,
- struct list_head *);
- int (*ro_open)(struct rpcrdma_ia *,
- struct rpcrdma_ep *,
- struct rpcrdma_create_data_internal *);
- size_t (*ro_maxpages)(struct rpcrdma_xprt *);
- int (*ro_init_mr)(struct rpcrdma_ia *,
- struct rpcrdma_mr *);
- void (*ro_release_mr)(struct rpcrdma_mr *mr);
- const char *ro_displayname;
- const int ro_send_w_inv_ok;
-};
-
-extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
-extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
-
-/*
* RPCRDMA transport -- encapsulates the structures above for
* integration with RPC.
*
@@ -544,10 +502,6 @@ extern unsigned int xprt_rdma_memreg_strategy;
int rpcrdma_ia_open(struct rpcrdma_xprt *xprt);
void rpcrdma_ia_remove(struct rpcrdma_ia *ia);
void rpcrdma_ia_close(struct rpcrdma_ia *);
-bool frwr_is_supported(struct rpcrdma_ia *);
-bool fmr_is_supported(struct rpcrdma_ia *);
-
-extern struct workqueue_struct *rpcrdma_receive_wq;
/*
* Endpoint calls - xprtrdma/verbs.c
@@ -560,13 +514,12 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/*
* Buffer calls - xprtrdma/verbs.c
*/
struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
-void rpcrdma_destroy_req(struct rpcrdma_req *);
+void rpcrdma_req_destroy(struct rpcrdma_req *req);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
@@ -604,9 +557,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
return __rpcrdma_dma_map_regbuf(ia, rb);
}
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/
@@ -617,6 +567,23 @@ rpcrdma_data_dir(bool writing)
return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
}
+/* Memory registration calls xprtrdma/frwr_ops.c
+ */
+bool frwr_is_supported(struct rpcrdma_ia *);
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+ struct rpcrdma_create_data_internal *cdata);
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
+void frwr_release_mr(struct rpcrdma_mr *mr);
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing, u32 xid,
+ struct rpcrdma_mr **mr);
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt,
+ struct list_head *mrs);
+
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
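
[Annotation] With the ro_* method table gone, callers invoke the FRWR routines declared above directly. A rough sketch of the register-then-send sequence under that assumption; the helper name and call site are illustrative, and the error handling assumes frwr_map() reports failure via an ERR_PTR, as its seg-pointer return type suggests. The real callers are in rpc_rdma.c and verbs.c.

	static int example_register_and_send(struct rpcrdma_xprt *r_xprt,
					     struct rpcrdma_req *req,
					     struct rpcrdma_mr_seg *seg,
					     int nsegs, bool writing, u32 xid)
	{
		struct rpcrdma_mr *mr;

		/* Register the segments with a Fast Registration WR */
		seg = frwr_map(r_xprt, seg, nsegs, writing, xid, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		/* Post the Send; rpcrdma_ep_post() calls frwr_send() itself */
		return rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req);
	}
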
@@ -653,6 +620,7 @@ static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
extern unsigned int xprt_rdma_max_inline_read;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
+void xprt_rdma_close(struct rpc_xprt *xprt);
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);