summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/backchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/backchannel.c')
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c78
1 files changed, 46 insertions, 32 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 8b818bb3518a..ed1a4a3065ee 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -43,7 +43,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
req = rpcrdma_create_req(r_xprt);
if (IS_ERR(req))
return PTR_ERR(req);
- __set_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags);
rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
DMA_TO_DEVICE, GFP_KERNEL);
@@ -74,21 +73,13 @@ out_fail:
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
unsigned int count)
{
- struct rpcrdma_rep *rep;
int rc = 0;
while (count--) {
- rep = rpcrdma_create_rep(r_xprt);
- if (IS_ERR(rep)) {
- pr_err("RPC: %s: reply buffer alloc failed\n",
- __func__);
- rc = PTR_ERR(rep);
+ rc = rpcrdma_create_rep(r_xprt);
+ if (rc)
break;
- }
-
- rpcrdma_recv_buffer_put(rep);
}
-
return rc;
}
@@ -129,6 +120,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
rqst->rq_xprt = &r_xprt->rx_xprt;
INIT_LIST_HEAD(&rqst->rq_list);
INIT_LIST_HEAD(&rqst->rq_bc_list);
+ __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
goto out_free;
@@ -148,7 +140,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
buffer->rb_bc_srv_max_requests = reqs;
request_module("svcrdma");
-
+ trace_xprtrdma_cb_setup(r_xprt, reqs);
return 0;
out_free:
@@ -196,13 +188,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
return maxmsg - RPCRDMA_HDRLEN_MIN;
}
-/**
- * rpcrdma_bc_marshal_reply - Send backwards direction reply
- * @rqst: buffer containing RPC reply data
- *
- * Returns zero on success.
- */
-int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
+static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
@@ -226,7 +212,46 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
&rqst->rq_snd_buf, rpcrdma_noch))
return -EIO;
+
+ trace_xprtrdma_cb_reply(rqst);
+ return 0;
+}
+
+/**
+ * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
+ * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
+ *
+ * Caller holds the transport's write lock.
+ *
+ * Returns:
+ * %0 if the RPC message has been sent
+ * %-ENOTCONN if the caller should reconnect and call again
+ * %-EIO if a permanent error occurred and the request was not
+ * sent. Do not try to send this message again.
+ */
+int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ int rc;
+
+ if (!xprt_connected(rqst->rq_xprt))
+ goto drop_connection;
+
+ rc = rpcrdma_bc_marshal_reply(rqst);
+ if (rc < 0)
+ goto failed_marshal;
+
+ if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
+ goto drop_connection;
return 0;
+
+failed_marshal:
+ if (rc != -ENOTCONN)
+ return rc;
+drop_connection:
+ xprt_disconnect_done(rqst->rq_xprt);
+ return -ENOTCONN;
}
/**
@@ -262,11 +287,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
dprintk("RPC: %s: freeing rqst %p (req %p)\n",
__func__, rqst, rpcr_to_rdmar(rqst));
- smp_mb__before_atomic();
- WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
- clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
- smp_mb__after_atomic();
-
spin_lock_bh(&xprt->bc_pa_lock);
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
spin_unlock_bh(&xprt->bc_pa_lock);
@@ -274,7 +294,7 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
/**
* rpcrdma_bc_receive_call - Handle a backward direction call
- * @xprt: transport receiving the call
+ * @r_xprt: transport receiving the call
* @rep: receive buffer containing the call
*
* Operational assumptions:
@@ -313,7 +333,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst, rq_bc_pa_list);
list_del(&rqst->rq_bc_pa_list);
spin_unlock(&xprt->bc_pa_lock);
- dprintk("RPC: %s: using rqst %p\n", __func__, rqst);
/* Prepare rqst */
rqst->rq_reply_bytes_recvd = 0;
@@ -321,7 +340,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
rqst->rq_xid = *p;
rqst->rq_private_buf.len = size;
- set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
buf = &rqst->rq_rcv_buf;
memset(buf, 0, sizeof(*buf));
@@ -335,12 +353,8 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
* the Upper Layer is done decoding it.
*/
req = rpcr_to_rdmar(rqst);
- dprintk("RPC: %s: attaching rep %p to req %p\n",
- __func__, rep, req);
req->rl_reply = rep;
-
- /* Defeat the retransmit detection logic in send_request */
- req->rl_connect_cookie = 0;
+ trace_xprtrdma_cb_call(rqst);
/* Queue rqst for ULP's callback service */
bc_serv = xprt->bc_serv;