summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/sunrpc/svc_rdma.h35
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_backchannel.c13
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c13
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c254
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c205
5 files changed, 254 insertions, 266 deletions
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 8827b4e36c3c..d3e2bb331264 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -109,8 +109,8 @@ struct svcxprt_rdma {
struct ib_pd *sc_pd;
- spinlock_t sc_ctxt_lock;
- struct list_head sc_ctxts;
+ spinlock_t sc_send_lock;
+ struct list_head sc_send_ctxts;
int sc_ctxt_used;
spinlock_t sc_rw_ctxt_lock;
struct list_head sc_rw_ctxts;
@@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt {
struct page *rc_pages[RPCSVC_MAXPAGES];
};
+enum {
+ RPCRDMA_MAX_SGES = 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE),
+};
+
+struct svc_rdma_send_ctxt {
+ struct list_head sc_list;
+ struct ib_send_wr sc_send_wr;
+ struct ib_cqe sc_cqe;
+ int sc_page_count;
+ struct page *sc_pages[RPCSVC_MAXPAGES];
+ struct ib_sge sc_sges[RPCRDMA_MAX_SGES];
+};
+
/* svc_rdma_backchannel.c */
extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
__be32 *rdma_resp,
@@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
struct xdr_buf *xdr);
/* svc_rdma_sendto.c */
+extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_send_ctxt *
+ svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma);
+extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt);
+extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
__be32 *rdma_resp, unsigned int len);
extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
u32 inv_rkey);
extern int svc_rdma_sendto(struct svc_rqst *);
/* svc_rdma_transport.c */
-extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
-extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
-extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
-extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
-extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
extern void svc_sq_reap(struct svcxprt_rdma *);
extern void svc_rq_reap(struct svcxprt_rdma *);
extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 0b9ba9f50a76..95e33511cc6f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-2.0
/*
- * Copyright (c) 2015 Oracle. All rights reserved.
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
*
* Support for backward direction RPCs on RPC/RDMA (server-side).
*/
@@ -117,10 +117,14 @@ out_notfound:
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
struct rpc_rqst *rqst)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt;
int ret;
- ctxt = svc_rdma_get_context(rdma);
+ ctxt = svc_rdma_send_ctxt_get(rdma);
+ if (!ctxt) {
+ ret = -ENOMEM;
+ goto out_err;
+ }
/* rpcrdma_bc_send_request builds the transport header and
* the backchannel RPC message in the same buffer. Thus only
@@ -144,8 +148,7 @@ out_err:
return ret;
out_unmap:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
ret = -EIO;
goto out_err;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index af6d2f3b3242..2d1e0db4c869 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -601,7 +601,7 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
__be32 *rdma_argp, int status)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt;
__be32 *p, *err_msgp;
unsigned int length;
struct page *page;
@@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
length = (unsigned long)p - (unsigned long)err_msgp;
/* Map transport header; no RPC message payload */
- ctxt = svc_rdma_get_context(xprt);
+ ctxt = svc_rdma_send_ctxt_get(xprt);
+ if (!ctxt)
+ return;
+
ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
if (ret) {
dprintk("svcrdma: Error %d mapping send for protocol error\n",
@@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
}
ret = svc_rdma_post_send_wr(xprt, ctxt, 0);
- if (ret) {
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
- }
+ if (ret)
+ svc_rdma_send_ctxt_put(xprt, ctxt);
}
/* By convention, backchannel calls arrive via rdma_msg type
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 4591017adc1e..b286d6a6e429 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -75,11 +75,11 @@
* DMA-unmap the pages under I/O for that Write segment. The Write
* completion handler does not release any pages.
*
- * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
* The ownership of all of the Reply's pages are transferred into that
* ctxt, the Send WR is posted, and sendto returns.
*
- * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * The svc_rdma_send_ctxt is presented when the Send WR completes. The
* Send completion handler finally releases the Reply's pages.
*
* This mechanism also assumes that completions on the transport's Send
@@ -114,6 +114,184 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_send_ctxt *
+svc_rdma_next_send_ctxt(struct list_head *list)
+{
+ return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
+ sc_list);
+}
+
+static struct svc_rdma_send_ctxt *
+svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+ int i;
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (!ctxt)
+ return NULL;
+
+ ctxt->sc_cqe.done = svc_rdma_wc_send;
+ ctxt->sc_send_wr.next = NULL;
+ ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
+ ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
+ ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+ for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++)
+ ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
+ return ctxt;
+}
+
+/**
+ * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+
+ while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
+ list_del(&ctxt->sc_list);
+ kfree(ctxt);
+ }
+}
+
+/**
+ * svc_rdma_send_ctxt_get - Get a free send_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a ready-to-use send_ctxt, or NULL if none are
+ * available and a fresh one cannot be allocated.
+ */
+struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+
+ spin_lock(&rdma->sc_send_lock);
+ ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
+ if (!ctxt)
+ goto out_empty;
+ list_del(&ctxt->sc_list);
+ spin_unlock(&rdma->sc_send_lock);
+
+out:
+ ctxt->sc_send_wr.num_sge = 0;
+ ctxt->sc_page_count = 0;
+ return ctxt;
+
+out_empty:
+ spin_unlock(&rdma->sc_send_lock);
+ ctxt = svc_rdma_send_ctxt_alloc(rdma);
+ if (!ctxt)
+ return NULL;
+ goto out;
+}
+
+/**
+ * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ *
+ * Pages left in sc_pages are DMA unmapped and released.
+ */
+void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ struct ib_device *device = rdma->sc_cm_id->device;
+ unsigned int i;
+
+ for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+ ib_dma_unmap_page(device,
+ ctxt->sc_sges[i].addr,
+ ctxt->sc_sges[i].length,
+ DMA_TO_DEVICE);
+
+ for (i = 0; i < ctxt->sc_page_count; ++i)
+ put_page(ctxt->sc_pages[i]);
+
+ spin_lock(&rdma->sc_send_lock);
+ list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
+ spin_unlock(&rdma->sc_send_lock);
+}
+
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Send completion handler could be running.
+ */
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct svcxprt_rdma *rdma = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_send_ctxt *ctxt;
+
+ trace_svcrdma_wc_send(wc);
+
+ atomic_inc(&rdma->sc_sq_avail);
+ wake_up(&rdma->sc_send_wait);
+
+ ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
+
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ }
+
+ svc_xprt_put(&rdma->sc_xprt);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr, *n_wr;
+ int wr_count;
+ int i;
+ int ret;
+
+ wr_count = 1;
+ for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
+ wr_count++;
+
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) {
+ atomic_inc(&rdma_stat_sq_starve);
+ trace_svcrdma_sq_full(rdma);
+ atomic_add(wr_count, &rdma->sc_sq_avail);
+ wait_event(rdma->sc_send_wait,
+ atomic_read(&rdma->sc_sq_avail) > wr_count);
+ if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+ return -ENOTCONN;
+ trace_svcrdma_sq_retry(rdma);
+ continue;
+ }
+ /* Take a transport ref for each WR posted */
+ for (i = 0; i < wr_count; i++)
+ svc_xprt_get(&rdma->sc_xprt);
+
+ /* Bump used SQ WR count and post */
+ ret = ib_post_send(rdma->sc_qp, wr, &bad_wr);
+ trace_svcrdma_post_send(wr, ret);
+ if (ret) {
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ for (i = 0; i < wr_count; i++)
+ svc_xprt_put(&rdma->sc_xprt);
+ wake_up(&rdma->sc_send_wait);
+ }
+ break;
+ }
+ return ret;
+}
+
static u32 xdr_padsize(u32 len)
{
return (len & 3) ? (4 - (len & 3)) : 0;
@@ -303,7 +481,7 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
unsigned int sge_no,
struct page *page,
unsigned long offset,
@@ -316,10 +494,9 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
if (ib_dma_mapping_error(dev, dma_addr))
goto out_maperr;
- ctxt->sge[sge_no].addr = dma_addr;
- ctxt->sge[sge_no].length = len;
- ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
- ctxt->mapped_sges++;
+ ctxt->sc_sges[sge_no].addr = dma_addr;
+ ctxt->sc_sges[sge_no].length = len;
+ ctxt->sc_send_wr.num_sge++;
return 0;
out_maperr:
@@ -331,7 +508,7 @@ out_maperr:
* handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
*/
static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
unsigned int sge_no,
unsigned char *base,
unsigned int len)
@@ -352,14 +529,13 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
* %-EIO if DMA mapping failed.
*/
int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
__be32 *rdma_resp,
unsigned int len)
{
- ctxt->direction = DMA_TO_DEVICE;
- ctxt->pages[0] = virt_to_page(rdma_resp);
- ctxt->count = 1;
- return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
+ ctxt->sc_pages[0] = virt_to_page(rdma_resp);
+ ctxt->sc_page_count++;
+ return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len);
}
/* Load the xdr_buf into the ctxt's sge array, and DMA map each
@@ -368,7 +544,7 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
* Returns zero on success, or a negative errno on failure.
*/
static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
struct xdr_buf *xdr, __be32 *wr_lst)
{
unsigned int len, sge_no, remaining;
@@ -436,13 +612,13 @@ tail:
* so they are released by the Send completion handler.
*/
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *ctxt)
+ struct svc_rdma_send_ctxt *ctxt)
{
int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
- ctxt->count += pages;
+ ctxt->sc_page_count += pages;
for (i = 0; i < pages; i++) {
- ctxt->pages[i + 1] = rqstp->rq_respages[i];
+ ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
rqstp->rq_respages[i] = NULL;
}
rqstp->rq_next_page = rqstp->rq_respages + 1;
@@ -461,37 +637,29 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
* %-ENOMEM if ib_post_send failed.
*/
int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
u32 inv_rkey)
{
- struct ib_send_wr *send_wr = &ctxt->send_wr;
-
dprintk("svcrdma: posting Send WR with %u sge(s)\n",
- ctxt->mapped_sges);
-
- send_wr->next = NULL;
- ctxt->cqe.done = svc_rdma_wc_send;
- send_wr->wr_cqe = &ctxt->cqe;
- send_wr->sg_list = ctxt->sge;
- send_wr->num_sge = ctxt->mapped_sges;
- send_wr->send_flags = IB_SEND_SIGNALED;
+ ctxt->sc_send_wr.num_sge);
+
if (inv_rkey) {
- send_wr->opcode = IB_WR_SEND_WITH_INV;
- send_wr->ex.invalidate_rkey = inv_rkey;
+ ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+ ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey;
} else {
- send_wr->opcode = IB_WR_SEND;
+ ctxt->sc_send_wr.opcode = IB_WR_SEND;
}
- return svc_rdma_send(rdma, send_wr);
+ return svc_rdma_send(rdma, &ctxt->sc_send_wr);
}
/* Prepare the portion of the RPC Reply that will be transmitted
* via RDMA Send. The RPC-over-RDMA transport header is prepared
- * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
*
* Depending on whether a Write list or Reply chunk is present,
* the server may send all, a portion of, or none of the xdr_buf.
- * In the latter case, only the transport header (sge[0]) is
+ * In the latter case, only the transport header (sc_sges[0]) is
* transmitted.
*
* RDMA Send is the last step of transmitting an RPC reply. Pages
@@ -508,11 +676,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
__be32 *wr_lst, __be32 *rp_ch)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt;
u32 inv_rkey;
int ret;
- ctxt = svc_rdma_get_context(rdma);
+ ctxt = svc_rdma_send_ctxt_get(rdma);
+ if (!ctxt)
+ return -ENOMEM;
ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
svc_rdma_reply_hdr_len(rdma_resp));
@@ -538,8 +708,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
return 0;
err:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
return ret;
}
@@ -553,11 +722,13 @@ err:
static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
__be32 *rdma_resp, struct svc_rqst *rqstp)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt;
__be32 *p;
int ret;
- ctxt = svc_rdma_get_context(rdma);
+ ctxt = svc_rdma_send_ctxt_get(rdma);
+ if (!ctxt)
+ return -ENOMEM;
/* Replace the original transport header with an
* RDMA_ERROR response. XID etc are preserved.
@@ -580,8 +751,7 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
return 0;
err:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
return ret;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index baeecbb2f763..3de81735a6cc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
*
@@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
-static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
- gfp_t flags)
-{
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = kmalloc(sizeof(*ctxt), flags);
- if (ctxt) {
- ctxt->xprt = xprt;
- INIT_LIST_HEAD(&ctxt->list);
- }
- return ctxt;
-}
-
-static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
-{
- unsigned int i;
-
- i = xprt->sc_sq_depth;
- while (i--) {
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = alloc_ctxt(xprt, GFP_KERNEL);
- if (!ctxt) {
- dprintk("svcrdma: No memory for RDMA ctxt\n");
- return false;
- }
- list_add(&ctxt->list, &xprt->sc_ctxts);
- }
- return true;
-}
-
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
-{
- struct svc_rdma_op_ctxt *ctxt = NULL;
-
- spin_lock(&xprt->sc_ctxt_lock);
- xprt->sc_ctxt_used++;
- if (list_empty(&xprt->sc_ctxts))
- goto out_empty;
-
- ctxt = list_first_entry(&xprt->sc_ctxts,
- struct svc_rdma_op_ctxt, list);
- list_del(&ctxt->list);
- spin_unlock(&xprt->sc_ctxt_lock);
-
-out:
- ctxt->count = 0;
- ctxt->mapped_sges = 0;
- return ctxt;
-
-out_empty:
- /* Either pre-allocation missed the mark, or send
- * queue accounting is broken.
- */
- spin_unlock(&xprt->sc_ctxt_lock);
-
- ctxt = alloc_ctxt(xprt, GFP_NOIO);
- if (ctxt)
- goto out;
-
- spin_lock(&xprt->sc_ctxt_lock);
- xprt->sc_ctxt_used--;
- spin_unlock(&xprt->sc_ctxt_lock);
- WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
- return NULL;
-}
-
-void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
-{
- struct svcxprt_rdma *xprt = ctxt->xprt;
- struct ib_device *device = xprt->sc_cm_id->device;
- unsigned int i;
-
- for (i = 0; i < ctxt->mapped_sges; i++)
- ib_dma_unmap_page(device,
- ctxt->sge[i].addr,
- ctxt->sge[i].length,
- ctxt->direction);
- ctxt->mapped_sges = 0;
-}
-
-void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
-{
- struct svcxprt_rdma *xprt = ctxt->xprt;
- int i;
-
- if (free_pages)
- for (i = 0; i < ctxt->count; i++)
- put_page(ctxt->pages[i]);
-
- spin_lock(&xprt->sc_ctxt_lock);
- xprt->sc_ctxt_used--;
- list_add(&ctxt->list, &xprt->sc_ctxts);
- spin_unlock(&xprt->sc_ctxt_lock);
-}
-
-static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
-{
- while (!list_empty(&xprt->sc_ctxts)) {
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = list_first_entry(&xprt->sc_ctxts,
- struct svc_rdma_op_ctxt, list);
- list_del(&ctxt->list);
- kfree(ctxt);
- }
-}
-
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
@@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
}
}
-/**
- * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct svcxprt_rdma *xprt = cq->cq_context;
- struct ib_cqe *cqe = wc->wr_cqe;
- struct svc_rdma_op_ctxt *ctxt;
-
- trace_svcrdma_wc_send(wc);
-
- atomic_inc(&xprt->sc_sq_avail);
- wake_up(&xprt->sc_send_wait);
-
- ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
-
- if (unlikely(wc->status != IB_WC_SUCCESS)) {
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_xprt_enqueue(&xprt->sc_xprt);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- }
-
- svc_xprt_put(&xprt->sc_xprt);
-}
-
static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
struct net *net)
{
@@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
- INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+ INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
- spin_lock_init(&cma_xprt->sc_ctxt_lock);
+ spin_lock_init(&cma_xprt->sc_send_lock);
spin_lock_init(&cma_xprt->sc_recv_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
@@ -640,9 +500,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
}
atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
- if (!svc_rdma_prealloc_ctxts(newxprt))
- goto errout;
-
newxprt->sc_pd = ib_alloc_pd(dev, 0);
if (IS_ERR(newxprt->sc_pd)) {
dprintk("svcrdma: error creating PD for connect request\n");
@@ -794,11 +651,6 @@ static void __svc_rdma_free(struct work_struct *work)
svc_rdma_flush_recv_queues(rdma);
- /* Warn if we leaked a resource or under-referenced */
- if (rdma->sc_ctxt_used != 0)
- pr_err("svcrdma: ctxt still in use? (%d)\n",
- rdma->sc_ctxt_used);
-
/* Final put of backchannel client transport */
if (xprt->xpt_bc_xprt) {
xprt_put(xprt->xpt_bc_xprt);
@@ -806,7 +658,7 @@ static void __svc_rdma_free(struct work_struct *work)
}
svc_rdma_destroy_rw_ctxts(rdma);
- svc_rdma_destroy_ctxts(rdma);
+ svc_rdma_send_ctxts_destroy(rdma);
svc_rdma_recv_ctxts_destroy(rdma);
/* Destroy the QP if present (not a listener) */
@@ -860,52 +712,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp)
static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}
-
-int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
-{
- struct ib_send_wr *bad_wr, *n_wr;
- int wr_count;
- int i;
- int ret;
-
- if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
- return -ENOTCONN;
-
- wr_count = 1;
- for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
- wr_count++;
-
- /* If the SQ is full, wait until an SQ entry is available */
- while (1) {
- if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
- atomic_inc(&rdma_stat_sq_starve);
- trace_svcrdma_sq_full(xprt);
- atomic_add(wr_count, &xprt->sc_sq_avail);
- wait_event(xprt->sc_send_wait,
- atomic_read(&xprt->sc_sq_avail) > wr_count);
- if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
- return -ENOTCONN;
- trace_svcrdma_sq_retry(xprt);
- continue;
- }
- /* Take a transport ref for each WR posted */
- for (i = 0; i < wr_count; i++)
- svc_xprt_get(&xprt->sc_xprt);
-
- /* Bump used SQ WR count and post */
- ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
- trace_svcrdma_post_send(wr, ret);
- if (ret) {
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- for (i = 0; i < wr_count; i ++)
- svc_xprt_put(&xprt->sc_xprt);
- dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
- dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n",
- atomic_read(&xprt->sc_sq_avail),
- xprt->sc_sq_depth);
- wake_up(&xprt->sc_send_wait);
- }
- break;
- }
- return ret;
-}