summaryrefslogtreecommitdiff
path: root/net/sunrpc
diff options
context:
space:
mode:
authorChuck Lever <chuck.lever@oracle.com>2023-12-18 17:32:07 -0500
committerChuck Lever <chuck.lever@oracle.com>2024-01-07 17:54:33 -0500
commitd3dba534100d4e9eb7a5204be97cd6f9ada2066e (patch)
tree8ff227f2cc04de812f1eb7f2692880f1d87547cd /net/sunrpc
parentecba85e951c178e3fe7cea04eebf1035e8168f93 (diff)
svcrdma: Implement multi-stage Read completion again
Having an nfsd thread waiting for an RDMA Read completion is problematic if the Read responder (ie, the client) stops responding. We need to go back to handling RDMA Reads by getting the svc scheduler to call svc_rdma_recvfrom() a second time to finish building an RPC message after a Read completion. This is the final patch, and makes several changes that have to happen concurrently: 1. svc_rdma_process_read_list no longer waits for a completion, but simply builds and posts the Read WRs. 2. svc_rdma_read_done() now queues a completed Read on sc_read_complete_q for later processing rather than calling complete(). 3. The completed RPC message is no longer built in the svc_rdma_process_read_list() path. Finishing the message is now done in svc_rdma_recvfrom() when it notices work on the sc_read_complete_q. The "finish building this RPC message" code is removed from the svc_rdma_process_read_list() path. This arrangement avoids the need for an nfsd thread to wait for an RDMA Read non-interruptibly without a timeout. It's basically the same code structure that Tom Tucker used for Read chunks along with some clean-up and modernization. Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c36
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_rw.c151
2 files changed, 76 insertions, 111 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 034bdd02f925..d72953f29258 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -214,6 +214,8 @@ struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
struct svc_rdma_recv_ctxt *ctxt)
{
+ svc_rdma_cc_release(rdma, &ctxt->rc_cc, DMA_FROM_DEVICE);
+
/* @rc_page_count is normally zero here, but error flows
* can leave pages in @rc_pages.
*/
@@ -870,6 +872,7 @@ static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp,
* procedure for that depends on what kind of RPC/RDMA
* chunks were provided by the client.
*/
+ rqstp->rq_arg = ctxt->rc_saved_arg;
if (pcl_is_empty(&ctxt->rc_call_pcl)) {
if (ctxt->rc_read_pcl.cl_count == 1)
svc_rdma_read_complete_one(rqstp, ctxt);
@@ -930,7 +933,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
if (ctxt) {
list_del(&ctxt->rc_list);
- spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ spin_unlock(&rdma_xprt->sc_rq_dto_lock);
+ svc_xprt_received(xprt);
svc_rdma_read_complete(rqstp, ctxt);
goto complete;
}
@@ -965,11 +969,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
- !pcl_is_empty(&ctxt->rc_call_pcl)) {
- ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
- if (ret < 0)
- goto out_readfail;
- }
+ !pcl_is_empty(&ctxt->rc_call_pcl))
+ goto out_readlist;
complete:
rqstp->rq_xprt_ctxt = ctxt;
@@ -983,12 +984,23 @@ out_err:
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
return 0;
-out_readfail:
- if (ret == -EINVAL)
- svc_rdma_send_error(rdma_xprt, ctxt, ret);
- svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
- svc_xprt_deferred_close(xprt);
- return -ENOTCONN;
+out_readlist:
+ /* This @rqstp is about to be recycled. Save the work
+ * already done constructing the Call message in rq_arg
+ * so it can be restored when the RDMA Reads have
+ * completed.
+ */
+ ctxt->rc_saved_arg = rqstp->rq_arg;
+
+ ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
+ if (ret < 0) {
+ if (ret == -EINVAL)
+ svc_rdma_send_error(rdma_xprt, ctxt, ret);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
+ svc_xprt_deferred_close(xprt);
+ return ret;
+ }
+ return 0;
out_backchannel:
svc_rdma_handle_bc_reply(rqstp, ctxt);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 28a34718dee5..c00fcce61d1e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -163,14 +163,15 @@ void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
cc->cc_sqecount = 0;
}
-/*
- * The consumed rw_ctx's are cleaned and placed on a local llist so
- * that only one atomic llist operation is needed to put them all
- * back on the free list.
+/**
+ * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
+ * @rdma: controlling transport instance
+ * @cc: svc_rdma_chunk_ctxt to be released
+ * @dir: DMA direction
*/
-static void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
- struct svc_rdma_chunk_ctxt *cc,
- enum dma_data_direction dir)
+void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc,
+ enum dma_data_direction dir)
{
struct llist_node *first, *last;
struct svc_rdma_rw_ctxt *ctxt;
@@ -300,12 +301,21 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
struct svc_rdma_recv_ctxt *ctxt;
+ svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+
+ ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
- ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
cc->cc_posttime);
- break;
+
+ spin_lock(&rdma->sc_rq_dto_lock);
+ list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
+ /* the unlock pairs with the smp_rmb in svc_xprt_ready */
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ spin_unlock(&rdma->sc_rq_dto_lock);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ return;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
break;
@@ -313,10 +323,13 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
}
- svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
- cc->cc_status = wc->status;
- complete(&cc->cc_done);
- return;
+ /* The RDMA Read has flushed, so the incoming RPC message
+ * cannot be constructed and must be dropped. Signal the
+ * loss to the client by closing the connection.
+ */
+ svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
+ svc_rdma_recv_ctxt_put(rdma, ctxt);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
/*
@@ -823,7 +836,6 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
- struct xdr_buf *buf = &rqstp->rq_arg;
struct svc_rdma_chunk *chunk, *next;
unsigned int start, length;
int ret;
@@ -853,18 +865,7 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
start += length;
length = head->rc_byte_len - start;
- ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
- if (ret < 0)
- return ret;
-
- buf->len += head->rc_readbytes;
- buf->buflen += head->rc_readbytes;
-
- buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
- buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
- buf->pages = &rqstp->rq_pages[1];
- buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
- return 0;
+ return svc_rdma_copy_inline_range(rqstp, head, start, length);
}
/**
@@ -888,42 +889,8 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
- struct xdr_buf *buf = &rqstp->rq_arg;
- struct svc_rdma_chunk *chunk;
- unsigned int length;
- int ret;
-
- chunk = pcl_first_chunk(&head->rc_read_pcl);
- ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
- if (ret < 0)
- goto out;
-
- /* Split the Receive buffer between the head and tail
- * buffers at Read chunk's position. XDR roundup of the
- * chunk is not included in either the pagelist or in
- * the tail.
- */
- buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
- buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
- buf->head[0].iov_len = chunk->ch_position;
-
- /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
- *
- * If the client already rounded up the chunk length, the
- * length does not change. Otherwise, the length of the page
- * list is increased to include XDR round-up.
- *
- * Currently these chunks always start at page offset 0,
- * thus the rounded-up length never crosses a page boundary.
- */
- buf->pages = &rqstp->rq_pages[0];
- length = xdr_align_size(chunk->ch_length);
- buf->page_len = length;
- buf->len += length;
- buf->buflen += length;
-
-out:
- return ret;
+ return svc_rdma_build_read_chunk(rqstp, head,
+ pcl_first_chunk(&head->rc_read_pcl));
}
/**
@@ -1051,23 +1018,28 @@ static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
- struct xdr_buf *buf = &rqstp->rq_arg;
- int ret;
-
- ret = svc_rdma_read_call_chunk(rqstp, head);
- if (ret < 0)
- goto out;
-
- buf->len += head->rc_readbytes;
- buf->buflen += head->rc_readbytes;
+ return svc_rdma_read_call_chunk(rqstp, head);
+}
- buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
- buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
- buf->pages = &rqstp->rq_pages[1];
- buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
+/* Pages under I/O have been copied to head->rc_pages. Ensure that
+ * svc_xprt_release() does not put them when svc_rdma_recvfrom()
+ * returns. This has to be done after all Read WRs are constructed
+ * to properly handle a page that happens to be part of I/O on behalf
+ * of two different RDMA segments.
+ *
+ * Note: if the subsequent post_send fails, these pages have already
+ * been moved to head->rc_pages and thus will be cleaned up by
+ * svc_rdma_recv_ctxt_put().
+ */
+static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
+{
+ unsigned int i;
-out:
- return ret;
+ for (i = 0; i < head->rc_page_count; i++) {
+ head->rc_pages[i] = rqstp->rq_pages[i];
+ rqstp->rq_pages[i] = NULL;
+ }
}
/**
@@ -1113,30 +1085,11 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
ret = svc_rdma_read_multiple_chunks(rqstp, head);
} else
ret = svc_rdma_read_special(rqstp, head);
+ svc_rdma_clear_rqst_pages(rqstp, head);
if (ret < 0)
- goto out_err;
+ return ret;
trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
- init_completion(&cc->cc_done);
ret = svc_rdma_post_chunk_ctxt(rdma, cc);
- if (ret < 0)
- goto out_err;
-
- ret = 1;
- wait_for_completion(&cc->cc_done);
- if (cc->cc_status != IB_WC_SUCCESS)
- ret = -EIO;
-
- /* rq_respages starts after the last arg page */
- rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
- /* Ensure svc_rdma_recv_ctxt_put() does not release pages
- * left in @rc_pages while I/O proceeds.
- */
- head->rc_page_count = 0;
-
-out_err:
- svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
- return ret;
+ return ret < 0 ? ret : 1;
}