summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c120
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c63
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c115
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_backchannel.c2
-rw-r--r--net/sunrpc/xprtrdma/transport.c105
-rw-r--r--net/sunrpc/xprtrdma/verbs.c338
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h121
7 files changed, 383 insertions, 481 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index d79b18c1f4cd..ce986591f213 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -19,45 +19,6 @@
#undef RPCRDMA_BACKCHANNEL_DEBUG
-static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
- unsigned int count)
-{
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- struct rpcrdma_req *req;
- struct rpc_rqst *rqst;
- unsigned int i;
-
- for (i = 0; i < (count << 1); i++) {
- struct rpcrdma_regbuf *rb;
- size_t size;
-
- req = rpcrdma_create_req(r_xprt);
- if (IS_ERR(req))
- return PTR_ERR(req);
- rqst = &req->rl_slot;
-
- rqst->rq_xprt = xprt;
- INIT_LIST_HEAD(&rqst->rq_bc_list);
- __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
- spin_lock(&xprt->bc_pa_lock);
- list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
- spin_unlock(&xprt->bc_pa_lock);
-
- size = r_xprt->rx_data.inline_rsize;
- rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
- if (IS_ERR(rb))
- goto out_fail;
- req->rl_sendbuf = rb;
- xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
- min_t(size_t, size, PAGE_SIZE));
- }
- return 0;
-
-out_fail:
- rpcrdma_req_destroy(req);
- return -ENOMEM;
-}
-
/**
* xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
* @xprt: transport associated with these backchannel resources
@@ -68,34 +29,10 @@ out_fail:
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- int rc;
- /* The backchannel reply path returns each rpc_rqst to the
- * bc_pa_list _after_ the reply is sent. If the server is
- * faster than the client, it can send another backward
- * direction request before the rpc_rqst is returned to the
- * list. The client rejects the request in this case.
- *
- * Twice as many rpc_rqsts are prepared to ensure there is
- * always an rpc_rqst available as soon as a reply is sent.
- */
- if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
- goto out_err;
-
- rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
- if (rc)
- goto out_free;
-
- r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
+ r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
trace_xprtrdma_cb_setup(r_xprt, reqs);
return 0;
-
-out_free:
- xprt_rdma_bc_destroy(xprt, reqs);
-
-out_err:
- pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
- return -ENOMEM;
}
/**
@@ -107,10 +44,10 @@ out_err:
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
size_t maxmsg;
- maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+ maxmsg = min_t(unsigned int, ep->rep_inline_send, ep->rep_inline_recv);
maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
return maxmsg - RPCRDMA_HDRLEN_MIN;
}
@@ -123,7 +60,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
- req->rl_rdmabuf->rg_base, rqst);
+ rdmab_data(req->rl_rdmabuf), rqst);
p = xdr_reserve_space(&req->rl_stream, 28);
if (unlikely(!p))
@@ -223,6 +160,43 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
spin_unlock(&xprt->bc_pa_lock);
}
+static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ size_t size;
+
+ spin_lock(&xprt->bc_pa_lock);
+ rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
+ rq_bc_pa_list);
+ if (!rqst)
+ goto create_req;
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock(&xprt->bc_pa_lock);
+ return rqst;
+
+create_req:
+ spin_unlock(&xprt->bc_pa_lock);
+
+ /* Set a limit to prevent a remote from overrunning our resources.
+ */
+ if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
+ return NULL;
+
+ size = min_t(size_t, r_xprt->rx_ep.rep_inline_recv, PAGE_SIZE);
+ req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
+ if (!req)
+ return NULL;
+
+ xprt->bc_alloc_count++;
+ rqst = &req->rl_slot;
+ rqst->rq_xprt = xprt;
+ __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+ xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+ return rqst;
+}
+
/**
* rpcrdma_bc_receive_call - Handle a backward direction call
* @r_xprt: transport receiving the call
@@ -254,18 +228,10 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif
- /* Grab a free bc rqst */
- spin_lock(&xprt->bc_pa_lock);
- if (list_empty(&xprt->bc_pa_list)) {
- spin_unlock(&xprt->bc_pa_lock);
+ rqst = rpcrdma_bc_rqst_get(r_xprt);
+ if (!rqst)
goto out_overflow;
- }
- rqst = list_first_entry(&xprt->bc_pa_list,
- struct rpc_rqst, rq_bc_pa_list);
- list_del(&rqst->rq_bc_pa_list);
- spin_unlock(&xprt->bc_pa_lock);
- /* Prepare rqst */
rqst->rq_reply_bytes_recvd = 0;
rqst->rq_xid = *p;
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 52cb6c1b0c2b..794ba4ca0994 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -82,13 +82,13 @@
/**
* frwr_is_supported - Check if device supports FRWR
- * @ia: interface adapter to check
+ * @device: interface adapter to check
*
* Returns true if device supports FRWR, otherwise false
*/
-bool frwr_is_supported(struct rpcrdma_ia *ia)
+bool frwr_is_supported(struct ib_device *device)
{
- struct ib_device_attr *attrs = &ia->ri_device->attrs;
+ struct ib_device_attr *attrs = &device->attrs;
if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
goto out_not_supported;
@@ -98,7 +98,7 @@ bool frwr_is_supported(struct rpcrdma_ia *ia)
out_not_supported:
pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
- ia->ri_device->name);
+ device->name);
return false;
}
@@ -131,7 +131,7 @@ frwr_mr_recycle_worker(struct work_struct *work)
if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
@@ -194,12 +194,11 @@ out_list_err:
* frwr_open - Prepare an endpoint for use with FRWR
* @ia: interface adapter this endpoint will use
* @ep: endpoint to prepare
- * @cdata: transport parameters
*
* On success, sets:
* ep->rep_attr.cap.max_send_wr
* ep->rep_attr.cap.max_recv_wr
- * cdata->max_requests
+ * ep->rep_max_requests
* ia->ri_max_segs
*
* And these FRWR-related fields:
@@ -208,10 +207,9 @@ out_list_err:
*
* On failure, a negative errno is returned.
*/
-int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
{
- struct ib_device_attr *attrs = &ia->ri_device->attrs;
+ struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
int max_qp_wr, depth, delta;
ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
@@ -253,24 +251,23 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
} while (delta > 0);
}
- max_qp_wr = ia->ri_device->attrs.max_qp_wr;
+ max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
max_qp_wr -= RPCRDMA_BACKWARD_WRS;
max_qp_wr -= 1;
if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
return -ENOMEM;
- if (cdata->max_requests > max_qp_wr)
- cdata->max_requests = max_qp_wr;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests * depth;
+ if (ep->rep_max_requests > max_qp_wr)
+ ep->rep_max_requests = max_qp_wr;
+ ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
- cdata->max_requests = max_qp_wr / depth;
- if (!cdata->max_requests)
+ ep->rep_max_requests = max_qp_wr / depth;
+ if (!ep->rep_max_requests)
return -EINVAL;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests *
- depth;
+ ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
}
ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
- ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
@@ -300,15 +297,6 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
(ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
}
-static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
-{
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
- wr, ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
-}
-
/**
* frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
* @cq: completion queue (ignored)
@@ -323,10 +311,8 @@ frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
container_of(cqe, struct rpcrdma_frwr, fr_cqe);
/* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
+ if (wc->status != IB_WC_SUCCESS)
frwr->fr_state = FRWR_FLUSHED_FR;
- __frwr_sendcompletion_flush(wc, "fastreg");
- }
trace_xprtrdma_wc_fastreg(wc, frwr);
}
@@ -344,10 +330,8 @@ frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
fr_cqe);
/* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
+ if (wc->status != IB_WC_SUCCESS)
frwr->fr_state = FRWR_FLUSHED_LI;
- __frwr_sendcompletion_flush(wc, "localinv");
- }
trace_xprtrdma_wc_li(wc, frwr);
}
@@ -366,12 +350,10 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
fr_cqe);
/* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
+ if (wc->status != IB_WC_SUCCESS)
frwr->fr_state = FRWR_FLUSHED_LI;
- __frwr_sendcompletion_flush(wc, "localinv");
- }
- complete(&frwr->fr_linv_done);
trace_xprtrdma_wc_li_wake(wc, frwr);
+ complete(&frwr->fr_linv_done);
}
/**
@@ -436,7 +418,8 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
}
mr->mr_dir = rpcrdma_data_dir(writing);
- mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
+ mr->mr_nents =
+ ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
if (!mr->mr_nents)
goto out_dmamap_err;
@@ -466,7 +449,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
return seg;
out_dmamap_err:
- frwr->fr_state = FRWR_IS_INVALID;
+ mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
rpcrdma_mr_put(mr);
return ERR_PTR(-EIO);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 6c1fb270f127..85115a2e2639 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -105,16 +105,23 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
return size;
}
+/**
+ * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
+ * @r_xprt: transport instance to initialize
+ *
+ * The max_inline fields contain the maximum size of an RPC message
+ * so the marshaling code doesn't have to repeat this calculation
+ * for every RPC.
+ */
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- unsigned int maxsegs = ia->ri_max_segs;
-
- ia->ri_max_inline_write = cdata->inline_wsize -
- rpcrdma_max_call_header_size(maxsegs);
- ia->ri_max_inline_read = cdata->inline_rsize -
- rpcrdma_max_reply_header_size(maxsegs);
+ unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+ ep->rep_max_inline_send =
+ ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
+ ep->rep_max_inline_recv =
+ ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}
/* The client can send a request inline as long as the RPCRDMA header
@@ -131,7 +138,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
struct xdr_buf *xdr = &rqst->rq_snd_buf;
unsigned int count, remaining, offset;
- if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
+ if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
return false;
if (xdr->page_len) {
@@ -159,9 +166,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
- return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
+ return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
}
/* The client is required to provide a Reply chunk if the maximum
@@ -173,10 +178,9 @@ rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
const struct rpc_rqst *rqst)
{
const struct xdr_buf *buf = &rqst->rq_rcv_buf;
- const struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- return buf->head[0].iov_len + buf->tail[0].iov_len <
- ia->ri_max_inline_read;
+ return (buf->head[0].iov_len + buf->tail[0].iov_len) <
+ r_xprt->rx_ep.rep_max_inline_recv;
}
/* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -238,7 +242,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
*/
if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
if (!*ppages)
- *ppages = alloc_page(GFP_ATOMIC);
+ *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
if (!*ppages)
return -ENOBUFS;
}
@@ -508,50 +512,45 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
}
/**
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap
*
*/
-void
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
- struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
struct ib_sge *sge;
- unsigned int count;
/* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so
* they can be cheaply re-used.
*/
- sge = &sc->sc_sges[2];
- for (count = sc->sc_unmap_count; count; ++sge, --count)
- ib_dma_unmap_page(ia->ri_device,
- sge->addr, sge->length, DMA_TO_DEVICE);
+ for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
+ ++sge, --sc->sc_unmap_count)
+ ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+ DMA_TO_DEVICE);
- if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
- smp_mb__after_atomic();
+ if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
+ &sc->sc_req->rl_flags))
wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
- }
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
*/
-static bool
-rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
- u32 len)
+static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, u32 len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
struct ib_sge *sge = sc->sc_sges;
- if (!rpcrdma_dma_map_regbuf(ia, rb))
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_regbuf;
sge->addr = rdmab_addr(rb);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
- ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
- sge->length, DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+ DMA_TO_DEVICE);
sc->sc_wr.num_sge++;
return true;
@@ -563,23 +562,23 @@ out_regbuf:
/* Prepare the Send SGEs. The head and tail iovec, and each entry
* in the page list, gets its own SGE.
*/
-static bool
-rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr,
+ enum rpcrdma_chunktype rtype)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
unsigned int sge_no, page_base, len, remaining;
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
- struct ib_device *device = ia->ri_device;
struct ib_sge *sge = sc->sc_sges;
- u32 lkey = ia->ri_pd->local_dma_lkey;
struct page *page, **ppages;
/* The head iovec is straightforward, as it is already
* DMA-mapped. Sync the content that has changed.
*/
- if (!rpcrdma_dma_map_regbuf(ia, rb))
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_regbuf;
+ sc->sc_device = rdmab_device(rb);
sge_no = 1;
sge[sge_no].addr = rdmab_addr(rb);
sge[sge_no].length = xdr->head[0].iov_len;
@@ -626,13 +625,14 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
goto out_mapping_overflow;
len = min_t(u32, PAGE_SIZE - page_base, remaining);
- sge[sge_no].addr = ib_dma_map_page(device, *ppages,
- page_base, len,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ sge[sge_no].addr =
+ ib_dma_map_page(rdmab_device(rb), *ppages,
+ page_base, len, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdmab_device(rb),
+ sge[sge_no].addr))
goto out_mapping_err;
sge[sge_no].length = len;
- sge[sge_no].lkey = lkey;
+ sge[sge_no].lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
ppages++;
@@ -653,13 +653,13 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
map_tail:
sge_no++;
- sge[sge_no].addr = ib_dma_map_page(device, page,
- page_base, len,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ sge[sge_no].addr =
+ ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
goto out_mapping_err;
sge[sge_no].length = len;
- sge[sge_no].lkey = lkey;
+ sge[sge_no].lkey = rdmab_lkey(rb);
sc->sc_unmap_count++;
}
@@ -674,12 +674,12 @@ out_regbuf:
return false;
out_mapping_overflow:
- rpcrdma_unmap_sendctx(sc);
+ rpcrdma_sendctx_unmap(sc);
pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
return false;
out_mapping_err:
- rpcrdma_unmap_sendctx(sc);
+ rpcrdma_sendctx_unmap(sc);
trace_xprtrdma_dma_maperr(sge[sge_no].addr);
return false;
}
@@ -699,7 +699,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
- req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+ req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
return -EAGAIN;
req->rl_sendctx->sc_wr.num_sge = 0;
@@ -707,11 +707,11 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
req->rl_sendctx->sc_req = req;
__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
- if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
+ if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
return -EIO;
if (rtype != rpcrdma_areadch)
- if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
+ if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
return -EIO;
return 0;
@@ -747,8 +747,8 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
int ret;
rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
- xdr_init_encode(xdr, &req->rl_hdrbuf,
- req->rl_rdmabuf->rg_base, rqst);
+ xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
+ rqst);
/* Fixed header fields */
ret = -EMSGSIZE;
@@ -876,6 +876,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
return 0;
out_err:
+ trace_xprtrdma_marshal_failed(rqst, ret);
switch (ret) {
case -EAGAIN:
xprt_wait_for_buffer_space(rqst->rq_xprt);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 907464c2a9f0..bed57d8b5c19 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -261,7 +261,7 @@ static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
.buf_alloc = xprt_rdma_bc_allocate,
.buf_free = xprt_rdma_bc_free,
.send_request = xprt_rdma_bc_send_request,
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = xprt_rdma_bc_close,
.destroy = xprt_rdma_bc_put,
.print_stats = xprt_rdma_print_stats
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 5d261353bd90..1f73a6a7e43c 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -68,9 +68,9 @@
* tunables
*/
-static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
+unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
-static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
+unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRWR;
int xprt_rdma_pad_optimize;
@@ -288,7 +288,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
- rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ rpcrdma_ep_destroy(r_xprt);
rpcrdma_buffer_destroy(&r_xprt->rx_buf);
rpcrdma_ia_close(&r_xprt->rx_ia);
@@ -311,10 +311,8 @@ static const struct rpc_timeout xprt_rdma_default_timeout = {
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
- struct rpcrdma_create_data_internal cdata;
struct rpc_xprt *xprt;
struct rpcrdma_xprt *new_xprt;
- struct rpcrdma_ep *new_ep;
struct sockaddr *sap;
int rc;
@@ -349,40 +347,12 @@ xprt_setup_rdma(struct xprt_create *args)
xprt_set_bound(xprt);
xprt_rdma_format_addresses(xprt, sap);
- cdata.max_requests = xprt_rdma_slot_table_entries;
-
- cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
- cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
-
- cdata.inline_wsize = xprt_rdma_max_inline_write;
- if (cdata.inline_wsize > cdata.wsize)
- cdata.inline_wsize = cdata.wsize;
-
- cdata.inline_rsize = xprt_rdma_max_inline_read;
- if (cdata.inline_rsize > cdata.rsize)
- cdata.inline_rsize = cdata.rsize;
-
- /*
- * Create new transport instance, which includes initialized
- * o ia
- * o endpoint
- * o buffers
- */
-
new_xprt = rpcx_to_rdmax(xprt);
-
rc = rpcrdma_ia_open(new_xprt);
if (rc)
goto out1;
- /*
- * initialize and create ep
- */
- new_xprt->rx_data = cdata;
- new_ep = &new_xprt->rx_ep;
-
- rc = rpcrdma_ep_create(&new_xprt->rx_ep,
- &new_xprt->rx_ia, &new_xprt->rx_data);
+ rc = rpcrdma_ep_create(new_xprt);
if (rc)
goto out2;
@@ -413,7 +383,7 @@ out4:
rpcrdma_buffer_destroy(&new_xprt->rx_buf);
rc = -ENODEV;
out3:
- rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+ rpcrdma_ep_destroy(new_xprt);
out2:
rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
@@ -585,52 +555,15 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
rpc_wake_up_next(&xprt->backlog);
}
-static bool
-rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- size_t size, gfp_t flags)
+static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb, size_t size,
+ gfp_t flags)
{
- struct rpcrdma_regbuf *rb;
-
- if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
- return true;
-
- rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
- if (IS_ERR(rb))
- return false;
-
- rpcrdma_free_regbuf(req->rl_sendbuf);
- r_xprt->rx_stats.hardway_register_count += size;
- req->rl_sendbuf = rb;
- return true;
-}
-
-/* The rq_rcv_buf is used only if a Reply chunk is necessary.
- * The decision to use a Reply chunk is made later in
- * rpcrdma_marshal_req. This buffer is registered at that time.
- *
- * Otherwise, the associated RPC Reply arrives in a separate
- * Receive buffer, arbitrarily chosen by the HCA. The buffer
- * allocated here for the RPC Reply is not utilized in that
- * case. See rpcrdma_inline_fixup.
- *
- * A regbuf is used here to remember the buffer size.
- */
-static bool
-rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- size_t size, gfp_t flags)
-{
- struct rpcrdma_regbuf *rb;
-
- if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
- return true;
-
- rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
- if (IS_ERR(rb))
- return false;
-
- rpcrdma_free_regbuf(req->rl_recvbuf);
- r_xprt->rx_stats.hardway_register_count += size;
- req->rl_recvbuf = rb;
+ if (unlikely(rdmab_length(rb) < size)) {
+ if (!rpcrdma_regbuf_realloc(rb, size, flags))
+ return false;
+ r_xprt->rx_stats.hardway_register_count += size;
+ }
return true;
}
@@ -655,13 +588,15 @@ xprt_rdma_allocate(struct rpc_task *task)
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
- if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+ if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
+ flags))
goto out_fail;
- if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
+ if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
+ flags))
goto out_fail;
- rqst->rq_buffer = req->rl_sendbuf->rg_base;
- rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
+ rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
+ rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
trace_xprtrdma_op_allocate(task, req);
return 0;
@@ -815,7 +750,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
.alloc_slot = xprt_rdma_alloc_slot,
.free_slot = xprt_rdma_free_slot,
.release_request = xprt_release_rqst_cong, /* ditto */
- .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
+ .wait_for_reply_request = xprt_wait_for_reply_request_def, /* ditto */
.timer = xprt_rdma_timer,
.rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
.set_port = xprt_rdma_set_port,
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 30cfc0efe699..bef5eac8ab38 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -76,11 +76,16 @@
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
-static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
+ gfp_t flags);
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
-/* Wait for outstanding transport work to finish.
+/* Wait for outstanding transport work to finish. ib_drain_qp
+ * handles the drains in the wrong order for us, so open code
+ * them here.
*/
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
@@ -132,11 +137,6 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_send(sc, wc);
- if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
-
rpcrdma_sendctx_put_locked(sc);
}
@@ -174,10 +174,6 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
return;
out_flushed:
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
rpcrdma_recv_buffer_put(rep);
}
@@ -185,7 +181,6 @@ static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
struct rdma_conn_param *param)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
const struct rpcrdma_connect_private *pmsg = param->private_data;
unsigned int rsize, wsize;
@@ -202,12 +197,13 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
- if (rsize < cdata->inline_rsize)
- cdata->inline_rsize = rsize;
- if (wsize < cdata->inline_wsize)
- cdata->inline_wsize = wsize;
- dprintk("RPC: %s: max send %u, max recv %u\n",
- __func__, cdata->inline_wsize, cdata->inline_rsize);
+ if (rsize < r_xprt->rx_ep.rep_inline_recv)
+ r_xprt->rx_ep.rep_inline_recv = rsize;
+ if (wsize < r_xprt->rx_ep.rep_inline_send)
+ r_xprt->rx_ep.rep_inline_send = wsize;
+ dprintk("RPC: %s: max send %u, max recv %u\n", __func__,
+ r_xprt->rx_ep.rep_inline_send,
+ r_xprt->rx_ep.rep_inline_recv);
rpcrdma_set_max_header_sizes(r_xprt);
}
@@ -247,7 +243,7 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
pr_info("rpcrdma: removing device %s for %s:%s\n",
- ia->ri_device->name,
+ ia->ri_id->device->name,
rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
@@ -256,7 +252,6 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
wait_for_completion(&ia->ri_remove_done);
ia->ri_id = NULL;
- ia->ri_device = NULL;
/* Return 1 to ensure the core destroys the id. */
return 1;
case RDMA_CM_EVENT_ESTABLISHED:
@@ -291,7 +286,7 @@ disconnected:
dprintk("RPC: %s: %s:%s on %s/frwr: %s\n", __func__,
rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
- ia->ri_device->name, rdma_event_msg(event->event));
+ ia->ri_id->device->name, rdma_event_msg(event->event));
return 0;
}
@@ -370,9 +365,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
rc = PTR_ERR(ia->ri_id);
goto out_err;
}
- ia->ri_device = ia->ri_id->device;
- ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
+ ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
if (IS_ERR(ia->ri_pd)) {
rc = PTR_ERR(ia->ri_pd);
pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
@@ -381,12 +375,12 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
switch (xprt_rdma_memreg_strategy) {
case RPCRDMA_FRWR:
- if (frwr_is_supported(ia))
+ if (frwr_is_supported(ia->ri_id->device))
break;
/*FALLTHROUGH*/
default:
pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
- ia->ri_device->name, xprt_rdma_memreg_strategy);
+ ia->ri_id->device->name, xprt_rdma_memreg_strategy);
rc = -EINVAL;
goto out_err;
}
@@ -438,11 +432,11 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
* mappings and MRs are gone.
*/
list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
- rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
+ rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
- rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
- rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
- rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
rpcrdma_mrs_destroy(buf);
ib_dealloc_pd(ia->ri_pd);
@@ -468,7 +462,6 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
rdma_destroy_id(ia->ri_id);
}
ia->ri_id = NULL;
- ia->ri_device = NULL;
/* If the pd is still busy, xprtrdma missed freeing a resource */
if (ia->ri_pd && !IS_ERR(ia->ri_pd))
@@ -476,19 +469,26 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
ia->ri_pd = NULL;
}
-/*
- * Create unconnected endpoint.
+/**
+ * rpcrdma_ep_create - Create unconnected endpoint
+ * @r_xprt: transport to instantiate
+ *
+ * Returns zero on success, or a negative errno.
*/
-int
-rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
- struct rpcrdma_create_data_internal *cdata)
+int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
struct ib_cq *sendcq, *recvcq;
unsigned int max_sge;
int rc;
- max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
+ ep->rep_max_requests = xprt_rdma_slot_table_entries;
+ ep->rep_inline_send = xprt_rdma_max_inline_write;
+ ep->rep_inline_recv = xprt_rdma_max_inline_read;
+
+ max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
RPCRDMA_MAX_SEND_SGES);
if (max_sge < RPCRDMA_MIN_SEND_SGES) {
pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
@@ -496,7 +496,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
}
ia->ri_max_send_sges = max_sge;
- rc = frwr_open(ia, ep, cdata);
+ rc = frwr_open(ia, ep);
if (rc)
return rc;
@@ -518,23 +518,21 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.cap.max_send_sge,
ep->rep_attr.cap.max_recv_sge);
- /* set trigger for requesting send completion */
- ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
- cdata->max_requests >> 2);
+ ep->rep_send_batch = ep->rep_max_requests >> 3;
ep->rep_send_count = ep->rep_send_batch;
init_waitqueue_head(&ep->rep_connect_wait);
ep->rep_receive_count = 0;
- sendcq = ib_alloc_cq(ia->ri_device, NULL,
+ sendcq = ib_alloc_cq(ia->ri_id->device, NULL,
ep->rep_attr.cap.max_send_wr + 1,
- ia->ri_device->num_comp_vectors > 1 ? 1 : 0,
+ ia->ri_id->device->num_comp_vectors > 1 ? 1 : 0,
IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
goto out1;
}
- recvcq = ib_alloc_cq(ia->ri_device, NULL,
+ recvcq = ib_alloc_cq(ia->ri_id->device, NULL,
ep->rep_attr.cap.max_recv_wr + 1,
0, IB_POLL_WORKQUEUE);
if (IS_ERR(recvcq)) {
@@ -552,15 +550,15 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
pmsg->cp_magic = rpcrdma_cmp_magic;
pmsg->cp_version = RPCRDMA_CMP_VERSION;
pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
- pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
- pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
+ pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
+ pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
ep->rep_remote_cma.private_data = pmsg;
ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
/* Client offers RDMA Read but does not initiate */
ep->rep_remote_cma.initiator_depth = 0;
ep->rep_remote_cma.responder_resources =
- min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
+ min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
/* Limit transport retries so client can detect server
* GID changes quickly. RPC layer handles re-establishing
@@ -583,16 +581,16 @@ out1:
return rc;
}
-/*
- * rpcrdma_ep_destroy
+/**
+ * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
+ * @r_xprt: transport instance to shut down
*
- * Disconnect and destroy endpoint. After this, the only
- * valid operations on the ep are to free it (if dynamically
- * allocated) or re-create it.
*/
-void
-rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+
if (ia->ri_id && ia->ri_id->qp) {
rpcrdma_ep_disconnect(ep, ia);
rdma_destroy_qp(ia->ri_id);
@@ -622,7 +620,7 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
goto out1;
rc = -ENOMEM;
- err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
+ err = rpcrdma_ep_create(r_xprt);
if (err) {
pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
goto out2;
@@ -639,7 +637,7 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
return 0;
out3:
- rpcrdma_ep_destroy(ep, ia);
+ rpcrdma_ep_destroy(r_xprt);
out2:
rpcrdma_ia_close(ia);
out1:
@@ -672,7 +670,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
*/
old = id;
rc = -ENETUNREACH;
- if (ia->ri_device != id->device) {
+ if (ia->ri_id->device != id->device) {
pr_err("rpcrdma: can't reconnect on different device!\n");
goto out_destroy;
}
@@ -796,8 +794,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
*/
/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
- * queue activity, and ib_drain_qp has flushed all remaining Send
- * requests.
+ * queue activity, and rpcrdma_xprt_drain has flushed all remaining
+ * Send requests.
*/
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
@@ -867,20 +865,20 @@ static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
/**
* rpcrdma_sendctx_get_locked - Acquire a send context
- * @buf: transport buffers from which to acquire an unused context
+ * @r_xprt: controlling transport instance
*
* Returns pointer to a free send completion context; or NULL if
* the queue is empty.
*
* Usage: Called to acquire an SGE array before preparing a Send WR.
*
- * The caller serializes calls to this function (per rpcrdma_buffer),
- * and provides an effective memory barrier that flushes the new value
+ * The caller serializes calls to this function (per transport), and
+ * provides an effective memory barrier that flushes the new value
* of rb_sc_head.
*/
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_xprt *r_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_sendctx *sc;
unsigned long next_head;
@@ -905,7 +903,6 @@ out_emptyq:
* backing up. Cause the caller to pause and try again.
*/
set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
- r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
r_xprt->rx_stats.empty_sendctx_q++;
return NULL;
}
@@ -917,7 +914,7 @@ out_emptyq:
* Usage: Called from Send completion to return a sendctxt
* to the queue.
*
- * The caller serializes calls to this function (per rpcrdma_buffer).
+ * The caller serializes calls to this function (per transport).
*/
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
@@ -925,7 +922,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
unsigned long next_tail;
- /* Unmap SGEs of previously completed by unsignaled
+ /* Unmap SGEs of previously completed but unsignaled
* Sends by walking up the queue until @sc is found.
*/
next_tail = buf->rb_sc_tail;
@@ -933,7 +930,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
next_tail = rpcrdma_sendctx_next(buf, next_tail);
/* ORDER: item must be accessed _before_ tail is updated */
- rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+ rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
} while (buf->rb_sc_ctxs[next_tail] != sc);
@@ -996,54 +993,70 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
rpcrdma_mrs_create(r_xprt);
}
-struct rpcrdma_req *
-rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+/**
+ * rpcrdma_req_create - Allocate an rpcrdma_req object
+ * @r_xprt: controlling r_xprt
+ * @size: initial size, in bytes, of send and receive buffers
+ * @flags: GFP flags passed to memory allocators
+ *
+ * Returns an allocated and fully initialized rpcrdma_req or NULL.
+ */
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
+ gfp_t flags)
{
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
- req = kzalloc(sizeof(*req), GFP_KERNEL);
+ req = kzalloc(sizeof(*req), flags);
if (req == NULL)
- return ERR_PTR(-ENOMEM);
+ goto out1;
- rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
- DMA_TO_DEVICE, GFP_KERNEL);
- if (IS_ERR(rb)) {
- kfree(req);
- return ERR_PTR(-ENOMEM);
- }
+ rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
+ if (!rb)
+ goto out2;
req->rl_rdmabuf = rb;
- xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
+ xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
+
+ req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
+ if (!req->rl_sendbuf)
+ goto out3;
+
+ req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
+ if (!req->rl_recvbuf)
+ goto out4;
+
req->rl_buffer = buffer;
INIT_LIST_HEAD(&req->rl_registered);
-
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_lock);
return req;
+
+out4:
+ kfree(req->rl_sendbuf);
+out3:
+ kfree(req->rl_rdmabuf);
+out2:
+ kfree(req);
+out1:
+ return NULL;
}
-static int
-rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
+static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_rep *rep;
- int rc;
- rc = -ENOMEM;
rep = kzalloc(sizeof(*rep), GFP_KERNEL);
if (rep == NULL)
goto out;
- rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
+ rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
DMA_FROM_DEVICE, GFP_KERNEL);
- if (IS_ERR(rep->rr_rdmabuf)) {
- rc = PTR_ERR(rep->rr_rdmabuf);
+ if (!rep->rr_rdmabuf)
goto out_free;
- }
- xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
+ xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf));
rep->rr_cqe.done = rpcrdma_wc_receive;
@@ -1058,22 +1071,27 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
spin_lock(&buf->rb_lock);
list_add(&rep->rr_list, &buf->rb_recv_bufs);
spin_unlock(&buf->rb_lock);
- return 0;
+ return true;
out_free:
kfree(rep);
out:
- return rc;
+ return false;
}
-int
-rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
+/**
+ * rpcrdma_buffer_create - Create initial set of req/rep objects
+ * @r_xprt: transport instance to (re)initialize
+ *
+ * Returns zero on success, otherwise a negative errno.
+ */
+int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc;
buf->rb_flags = 0;
- buf->rb_max_requests = r_xprt->rx_data.max_requests;
+ buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
@@ -1086,16 +1104,15 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
+
+ rc = -ENOMEM;
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
- req = rpcrdma_create_req(r_xprt);
- if (IS_ERR(req)) {
- dprintk("RPC: %s: request buffer %d alloc"
- " failed\n", __func__, i);
- rc = PTR_ERR(req);
+ req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+ GFP_KERNEL);
+ if (!req)
goto out;
- }
list_add(&req->rl_list, &buf->rb_send_bufs);
}
@@ -1121,10 +1138,9 @@ out:
return rc;
}
-static void
-rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
- rpcrdma_free_regbuf(rep->rr_rdmabuf);
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
kfree(rep);
}
@@ -1140,9 +1156,9 @@ rpcrdma_req_destroy(struct rpcrdma_req *req)
{
list_del(&req->rl_all);
- rpcrdma_free_regbuf(req->rl_recvbuf);
- rpcrdma_free_regbuf(req->rl_sendbuf);
- rpcrdma_free_regbuf(req->rl_rdmabuf);
+ rpcrdma_regbuf_free(req->rl_recvbuf);
+ rpcrdma_regbuf_free(req->rl_sendbuf);
+ rpcrdma_regbuf_free(req->rl_rdmabuf);
kfree(req);
}
@@ -1180,7 +1196,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
* rpcrdma_buffer_destroy - Release all hw resources
* @buf: root control block for resources
*
- * ORDERING: relies on a prior ib_drain_qp :
+ * ORDERING: relies on a prior rpcrdma_xprt_drain :
* - No more Send or Receive completions can occur
* - All MRs, reps, and reqs are returned to their free lists
*/
@@ -1202,7 +1218,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
rep = list_first_entry(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
list_del(&rep->rr_list);
- rpcrdma_destroy_rep(rep);
+ rpcrdma_rep_destroy(rep);
}
while (!list_empty(&buf->rb_send_bufs)) {
@@ -1281,7 +1297,7 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
@@ -1331,7 +1347,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
}
spin_unlock(&buffers->rb_lock);
if (rep)
- rpcrdma_destroy_rep(rep);
+ rpcrdma_rep_destroy(rep);
}
/*
@@ -1348,69 +1364,90 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
} else {
- rpcrdma_destroy_rep(rep);
+ rpcrdma_rep_destroy(rep);
}
}
-/**
- * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
- * @size: size of buffer to be allocated, in bytes
- * @direction: direction of data movement
- * @flags: GFP flags
- *
- * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
- * can be persistently DMA-mapped for I/O.
+/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
* or Replies they may be registered externally via frwr_map.
*/
-struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags)
{
struct rpcrdma_regbuf *rb;
- rb = kmalloc(sizeof(*rb) + size, flags);
- if (rb == NULL)
- return ERR_PTR(-ENOMEM);
+ rb = kmalloc(sizeof(*rb), flags);
+ if (!rb)
+ return NULL;
+ rb->rg_data = kmalloc(size, flags);
+ if (!rb->rg_data) {
+ kfree(rb);
+ return NULL;
+ }
rb->rg_device = NULL;
rb->rg_direction = direction;
rb->rg_iov.length = size;
-
return rb;
}
/**
- * __rpcrdma_map_regbuf - DMA-map a regbuf
- * @ia: controlling rpcrdma_ia
+ * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
+ * @rb: regbuf to reallocate
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns true if reallocation was successful. If false is
+ * returned, @rb is left untouched.
+ */
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
+{
+ void *buf;
+
+ buf = kmalloc(size, flags);
+ if (!buf)
+ return false;
+
+ rpcrdma_regbuf_dma_unmap(rb);
+ kfree(rb->rg_data);
+
+ rb->rg_data = buf;
+ rb->rg_iov.length = size;
+ return true;
+}
+
+/**
+ * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
* @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is now DMA mapped to @r_xprt's device
*/
-bool
-__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb)
{
- struct ib_device *device = ia->ri_device;
+ struct ib_device *device = r_xprt->rx_ia.ri_id->device;
if (rb->rg_direction == DMA_NONE)
return false;
- rb->rg_iov.addr = ib_dma_map_single(device,
- (void *)rb->rg_base,
- rdmab_length(rb),
- rb->rg_direction);
+ rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
+ rdmab_length(rb), rb->rg_direction);
if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
trace_xprtrdma_dma_maperr(rdmab_addr(rb));
return false;
}
rb->rg_device = device;
- rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+ rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
return true;
}
-static void
-rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
if (!rb)
return;
@@ -1418,19 +1455,16 @@ rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
if (!rpcrdma_regbuf_is_mapped(rb))
return;
- ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
- rdmab_length(rb), rb->rg_direction);
+ ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
+ rb->rg_direction);
rb->rg_device = NULL;
}
-/**
- * rpcrdma_free_regbuf - deregister and free registered buffer
- * @rb: regbuf to be deregistered and freed
- */
-void
-rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
- rpcrdma_dma_unmap_regbuf(rb);
+ rpcrdma_regbuf_dma_unmap(rb);
+ if (rb)
+ kfree(rb->rg_data);
kfree(rb);
}
@@ -1497,17 +1531,15 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
list_del(&rep->rr_list);
spin_unlock(&buf->rb_lock);
if (!rep) {
- if (rpcrdma_create_rep(r_xprt, temp))
+ if (!rpcrdma_rep_create(r_xprt, temp))
break;
continue;
}
rb = rep->rr_rdmabuf;
- if (!rpcrdma_regbuf_is_mapped(rb)) {
- if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
- rpcrdma_recv_buffer_put(rep);
- break;
- }
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) {
+ rpcrdma_recv_buffer_put(rep);
+ break;
}
trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 10f6593e1a6a..d1e0749bcbc4 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -66,20 +66,17 @@
* Interface Adapter -- one per transport instance
*/
struct rpcrdma_ia {
- struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
- struct completion ri_done;
- struct completion ri_remove_done;
int ri_async_rc;
unsigned int ri_max_segs;
unsigned int ri_max_frwr_depth;
- unsigned int ri_max_inline_write;
- unsigned int ri_max_inline_read;
unsigned int ri_max_send_sges;
bool ri_implicit_roundup;
enum ib_mr_type ri_mrtype;
unsigned long ri_flags;
+ struct completion ri_done;
+ struct completion ri_remove_done;
};
enum {
@@ -93,22 +90,29 @@ enum {
struct rpcrdma_ep {
unsigned int rep_send_count;
unsigned int rep_send_batch;
+ unsigned int rep_max_inline_send;
+ unsigned int rep_max_inline_recv;
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
+ unsigned int rep_max_requests; /* set by /proc */
+ unsigned int rep_inline_send; /* negotiated */
+ unsigned int rep_inline_recv; /* negotiated */
int rep_receive_count;
};
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
- * allocated when the forward channel is set up.
+ * allocated when the forward channel is set up, long before the
+ * backchannel is provisioned. This value is two times
+ * NFS4_DEF_CB_SLOT_TABLE_SIZE.
*/
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-#define RPCRDMA_BACKWARD_WRS (8)
+#define RPCRDMA_BACKWARD_WRS (32)
#else
-#define RPCRDMA_BACKWARD_WRS (0)
+#define RPCRDMA_BACKWARD_WRS (0)
#endif
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
@@ -121,33 +125,34 @@ struct rpcrdma_regbuf {
struct ib_sge rg_iov;
struct ib_device *rg_device;
enum dma_data_direction rg_direction;
- __be32 rg_base[0] __attribute__ ((aligned(256)));
+ void *rg_data;
};
-static inline u64
-rdmab_addr(struct rpcrdma_regbuf *rb)
+static inline u64 rdmab_addr(struct rpcrdma_regbuf *rb)
{
return rb->rg_iov.addr;
}
-static inline u32
-rdmab_length(struct rpcrdma_regbuf *rb)
+static inline u32 rdmab_length(struct rpcrdma_regbuf *rb)
{
return rb->rg_iov.length;
}
-static inline u32
-rdmab_lkey(struct rpcrdma_regbuf *rb)
+static inline u32 rdmab_lkey(struct rpcrdma_regbuf *rb)
{
return rb->rg_iov.lkey;
}
-static inline struct ib_device *
-rdmab_device(struct rpcrdma_regbuf *rb)
+static inline struct ib_device *rdmab_device(struct rpcrdma_regbuf *rb)
{
return rb->rg_device;
}
+static inline void *rdmab_data(const struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_data;
+}
+
#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN)
/* To ensure a transport can always make forward progress,
@@ -222,34 +227,18 @@ struct rpcrdma_xprt;
struct rpcrdma_sendctx {
struct ib_send_wr sc_wr;
struct ib_cqe sc_cqe;
+ struct ib_device *sc_device;
struct rpcrdma_xprt *sc_xprt;
struct rpcrdma_req *sc_req;
unsigned int sc_unmap_count;
struct ib_sge sc_sges[];
};
-/* Limit the number of SGEs that can be unmapped during one
- * Send completion. This caps the amount of work a single
- * completion can do before returning to the provider.
- *
- * Setting this to zero disables Send completion batching.
- */
-enum {
- RPCRDMA_MAX_SEND_BATCH = 7,
-};
-
/*
* struct rpcrdma_mr - external memory region metadata
*
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
- *
- * Each rpcrdma_buffer has a list of free MWs anchored in rb_mrs. During
- * call_allocate, rpcrdma_buffer_get() assigns one to each segment in
- * an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep
- * track of registration metadata while each RPC is pending.
- * rpcrdma_deregister_external() uses this metadata to unmap and
- * release these resources when an RPC is complete.
*/
enum rpcrdma_frwr_state {
FRWR_IS_INVALID, /* ready to be used */
@@ -419,20 +408,6 @@ enum {
};
/*
- * Internal structure for transport instance creation. This
- * exists primarily for modularity.
- *
- * This data should be set with mount options
- */
-struct rpcrdma_create_data_internal {
- unsigned int max_requests; /* max requests (slots) in flight */
- unsigned int rsize; /* mount rsize - max read hdr+data */
- unsigned int wsize; /* mount wsize - max write hdr+data */
- unsigned int inline_rsize; /* max non-rdma read data payload */
- unsigned int inline_wsize; /* max non-rdma write data payload */
-};
-
-/*
* Statistics for RPCRDMA
*/
struct rpcrdma_stats {
@@ -476,13 +451,11 @@ struct rpcrdma_xprt {
struct rpcrdma_ia rx_ia;
struct rpcrdma_ep rx_ep;
struct rpcrdma_buffer rx_buf;
- struct rpcrdma_create_data_internal rx_data;
struct delayed_work rx_connect_worker;
struct rpcrdma_stats rx_stats;
};
#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
-#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
static inline const char *
rpcrdma_addrstr(const struct rpcrdma_xprt *r_xprt)
@@ -516,9 +489,8 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
/*
* Endpoint calls - xprtrdma/verbs.c
*/
-int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
- struct rpcrdma_create_data_internal *);
-void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt);
+void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt);
int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
@@ -528,11 +500,12 @@ int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
/*
* Buffer calls - xprtrdma/verbs.c
*/
-struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
+ gfp_t flags);
void rpcrdma_req_destroy(struct rpcrdma_req *req);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
@@ -548,23 +521,34 @@ struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
-struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
- gfp_t);
-bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
-void rpcrdma_free_regbuf(struct rpcrdma_regbuf *);
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
+ gfp_t flags);
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb);
-static inline bool
-rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
+/**
+ * rpcrdma_regbuf_is_mapped - check if buffer is DMA mapped
+ *
+ * Returns true if the buffer is now mapped to rb->rg_device.
+ */
+static inline bool rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
{
return rb->rg_device != NULL;
}
-static inline bool
-rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+/**
+ * rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
+ * @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is currently DMA mapped.
+ */
+static inline bool rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_regbuf *rb)
{
if (likely(rpcrdma_regbuf_is_mapped(rb)))
return true;
- return __rpcrdma_dma_map_regbuf(ia, rb);
+ return __rpcrdma_regbuf_dma_map(r_xprt, rb);
}
/*
@@ -579,9 +563,8 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c
*/
-bool frwr_is_supported(struct rpcrdma_ia *);
-int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata);
+bool frwr_is_supported(struct ib_device *device);
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
void frwr_release_mr(struct rpcrdma_mr *mr);
size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
@@ -610,7 +593,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr,
enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
@@ -627,7 +610,9 @@ static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
/* RPC/RDMA module init - xprtrdma/transport.c
*/
+extern unsigned int xprt_rdma_slot_table_entries;
extern unsigned int xprt_rdma_max_inline_read;
+extern unsigned int xprt_rdma_max_inline_write;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
void xprt_rdma_close(struct rpc_xprt *xprt);