Diffstat (limited to 'drivers/infiniband/sw/rxe/rxe_req.c')
-rw-r--r--  drivers/infiniband/sw/rxe/rxe_req.c  381
1 file changed, 234 insertions(+), 147 deletions(-)
diff --git a/drivers/infiniband/sw/rxe/rxe_req.c b/drivers/infiniband/sw/rxe/rxe_req.c
index 5eb89052dd66..373b03f223be 100644
--- a/drivers/infiniband/sw/rxe/rxe_req.c
+++ b/drivers/infiniband/sw/rxe/rxe_req.c
@@ -5,7 +5,6 @@
*/
#include <linux/skbuff.h>
-#include <crypto/hash.h>
#include "rxe.h"
#include "rxe_loc.h"
@@ -15,8 +14,7 @@ static int next_opcode(struct rxe_qp *qp, struct rxe_send_wqe *wqe,
u32 opcode);
static inline void retry_first_write_send(struct rxe_qp *qp,
- struct rxe_send_wqe *wqe,
- unsigned int mask, int npsn)
+ struct rxe_send_wqe *wqe, int npsn)
{
int i;
@@ -33,8 +31,6 @@ static inline void retry_first_write_send(struct rxe_qp *qp,
} else {
advance_dma_data(&wqe->dma, to_send);
}
- if (mask & WR_WRITE_MASK)
- wqe->iova += qp->mtu;
}
}
@@ -85,7 +81,7 @@ static void req_retry(struct rxe_qp *qp)
if (mask & WR_WRITE_OR_SEND_MASK) {
npsn = (qp->comp.psn - wqe->first_psn) &
BTH_PSN_MASK;
- retry_first_write_send(qp, wqe, mask, npsn);
+ retry_first_write_send(qp, wqe, npsn);
}
if (mask & WR_READ_MASK) {
@@ -101,45 +97,51 @@ static void req_retry(struct rxe_qp *qp)
void rnr_nak_timer(struct timer_list *t)
{
- struct rxe_qp *qp = from_timer(qp, t, rnr_nak_timer);
+ struct rxe_qp *qp = timer_container_of(qp, t, rnr_nak_timer);
+ unsigned long flags;
- pr_debug("qp#%d rnr nak timer fired\n", qp_num(qp));
- rxe_run_task(&qp->req.task, 1);
+ rxe_dbg_qp(qp, "nak timer fired\n");
+
+ spin_lock_irqsave(&qp->state_lock, flags);
+ if (qp->valid) {
+ /* request a send queue retry */
+ qp->req.need_retry = 1;
+ qp->req.wait_for_rnr_timer = 0;
+ rxe_sched_task(&qp->send_task);
+ }
+ spin_unlock_irqrestore(&qp->state_lock, flags);
}
-static struct rxe_send_wqe *req_next_wqe(struct rxe_qp *qp)
+static void req_check_sq_drain_done(struct rxe_qp *qp)
{
- struct rxe_send_wqe *wqe;
- struct rxe_queue *q = qp->sq.queue;
- unsigned int index = qp->req.wqe_index;
+ struct rxe_queue *q;
+ unsigned int index;
unsigned int cons;
- unsigned int prod;
+ struct rxe_send_wqe *wqe;
+ unsigned long flags;
- wqe = queue_head(q, QUEUE_TYPE_FROM_CLIENT);
- cons = queue_get_consumer(q, QUEUE_TYPE_FROM_CLIENT);
- prod = queue_get_producer(q, QUEUE_TYPE_FROM_CLIENT);
+ spin_lock_irqsave(&qp->state_lock, flags);
+ if (qp_state(qp) == IB_QPS_SQD) {
+ q = qp->sq.queue;
+ index = qp->req.wqe_index;
+ cons = queue_get_consumer(q, QUEUE_TYPE_FROM_CLIENT);
+ wqe = queue_addr_from_index(q, cons);
- if (unlikely(qp->req.state == QP_STATE_DRAIN)) {
/* check to see if we are drained;
* state_lock used by requester and completer
*/
- spin_lock_bh(&qp->state_lock);
do {
- if (qp->req.state != QP_STATE_DRAIN) {
+ if (!qp->attr.sq_draining)
/* comp just finished */
- spin_unlock_bh(&qp->state_lock);
break;
- }
if (wqe && ((index != cons) ||
- (wqe->state != wqe_state_posted))) {
+ (wqe->state != wqe_state_posted)))
/* comp not done yet */
- spin_unlock_bh(&qp->state_lock);
break;
- }
- qp->req.state = QP_STATE_DRAINED;
- spin_unlock_bh(&qp->state_lock);
+ qp->attr.sq_draining = 0;
+ spin_unlock_irqrestore(&qp->state_lock, flags);
if (qp->ibqp.event_handler) {
struct ib_event ev;
@@ -150,29 +152,74 @@ static struct rxe_send_wqe *req_next_wqe(struct rxe_qp *qp)
qp->ibqp.event_handler(&ev,
qp->ibqp.qp_context);
}
+ return;
} while (0);
}
+ spin_unlock_irqrestore(&qp->state_lock, flags);
+}
+static struct rxe_send_wqe *__req_next_wqe(struct rxe_qp *qp)
+{
+ struct rxe_queue *q = qp->sq.queue;
+ unsigned int index = qp->req.wqe_index;
+ unsigned int prod;
+
+ prod = queue_get_producer(q, QUEUE_TYPE_FROM_CLIENT);
if (index == prod)
return NULL;
+ else
+ return queue_addr_from_index(q, index);
+}
+
+static struct rxe_send_wqe *req_next_wqe(struct rxe_qp *qp)
+{
+ struct rxe_send_wqe *wqe;
+ unsigned long flags;
- wqe = queue_addr_from_index(q, index);
+ req_check_sq_drain_done(qp);
- if (unlikely((qp->req.state == QP_STATE_DRAIN ||
- qp->req.state == QP_STATE_DRAINED) &&
- (wqe->state != wqe_state_processing)))
+ wqe = __req_next_wqe(qp);
+ if (wqe == NULL)
return NULL;
- if (unlikely((wqe->wr.send_flags & IB_SEND_FENCE) &&
- (index != cons))) {
- qp->req.wait_fence = 1;
+ spin_lock_irqsave(&qp->state_lock, flags);
+ if (unlikely((qp_state(qp) == IB_QPS_SQD) &&
+ (wqe->state != wqe_state_processing))) {
+ spin_unlock_irqrestore(&qp->state_lock, flags);
return NULL;
}
+ spin_unlock_irqrestore(&qp->state_lock, flags);
wqe->mask = wr_opcode_mask(wqe->wr.opcode, qp);
return wqe;
}
+/**
+ * rxe_wqe_is_fenced - check if next wqe is fenced
+ * @qp: the queue pair
+ * @wqe: the next wqe
+ *
+ * Returns: 1 if wqe needs to wait
+ * 0 if wqe is ready to go
+ */
+static int rxe_wqe_is_fenced(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
+{
+ /* Local invalidate fence (LIF) see IBA 10.6.5.1
+ * Requires ALL previous operations on the send queue
+ * are complete. Make mandatory for the rxe driver.
+ */
+ if (wqe->wr.opcode == IB_WR_LOCAL_INV)
+ return qp->req.wqe_index != queue_get_consumer(qp->sq.queue,
+ QUEUE_TYPE_FROM_CLIENT);
+
+ /* Fence see IBA 10.8.3.3
+ * Requires that all previous read and atomic operations
+ * are complete.
+ */
+ return (wqe->wr.send_flags & IB_SEND_FENCE) &&
+ atomic_read(&qp->req.rd_atomic) != qp->attr.max_rd_atomic;
+}
+
static int next_opcode_rc(struct rxe_qp *qp, u32 opcode, int fits)
{
switch (opcode) {
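
The two fence rules added in rxe_wqe_is_fenced() above can be exercised in isolation. The sketch below is a hypothetical userspace model (fake_qp, fake_wqe, SEND_FENCE and the WR_* constants are invented here, not rxe or verbs types): a local-invalidate wqe waits until the send queue index catches up with the completer, while a fenced wqe waits until every read/atomic credit has been returned.

/*
 * Illustrative only: a userspace model of the two fence rules in
 * rxe_wqe_is_fenced(). The structs and constants are stand-ins,
 * not rxe or IB verbs types.
 */
#include <assert.h>
#include <stdbool.h>

enum { WR_LOCAL_INV, WR_RDMA_WRITE };
#define SEND_FENCE 0x1

struct fake_qp {
	unsigned int sq_producer;	/* next wqe to be processed */
	unsigned int sq_consumer;	/* oldest not-yet-completed wqe */
	int rd_atomic_credits;		/* remaining read/atomic credits */
	int max_rd_atomic;		/* size of the credit pool */
};

struct fake_wqe {
	int opcode;
	unsigned int send_flags;
};

/* Mirrors the decision above: returns true if the wqe must wait. */
static bool wqe_is_fenced(const struct fake_qp *qp, const struct fake_wqe *wqe)
{
	/* Local invalidate: wait until the send queue has drained. */
	if (wqe->opcode == WR_LOCAL_INV)
		return qp->sq_producer != qp->sq_consumer;

	/* Fenced wqe: wait until all read/atomic operations completed. */
	return (wqe->send_flags & SEND_FENCE) &&
	       qp->rd_atomic_credits != qp->max_rd_atomic;
}

int main(void)
{
	struct fake_qp qp = { .sq_producer = 3, .sq_consumer = 1,
			      .rd_atomic_credits = 2, .max_rd_atomic = 4 };
	struct fake_wqe inv = { .opcode = WR_LOCAL_INV };
	struct fake_wqe fenced_write = { .opcode = WR_RDMA_WRITE,
					 .send_flags = SEND_FENCE };

	assert(wqe_is_fenced(&qp, &inv));		/* queue not drained */
	assert(wqe_is_fenced(&qp, &fenced_write));	/* reads outstanding */

	qp.sq_consumer = qp.sq_producer;		/* queue drained */
	qp.rd_atomic_credits = qp.max_rd_atomic;	/* credits returned */
	assert(!wqe_is_fenced(&qp, &inv));
	assert(!wqe_is_fenced(&qp, &fenced_write));
	return 0;
}
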
@@ -220,6 +267,9 @@ static int next_opcode_rc(struct rxe_qp *qp, u32 opcode, int fits)
IB_OPCODE_RC_SEND_ONLY_WITH_IMMEDIATE :
IB_OPCODE_RC_SEND_FIRST;
+ case IB_WR_FLUSH:
+ return IB_OPCODE_RC_FLUSH;
+
case IB_WR_RDMA_READ:
return IB_OPCODE_RC_RDMA_READ_REQUEST;
@@ -237,6 +287,10 @@ static int next_opcode_rc(struct rxe_qp *qp, u32 opcode, int fits)
else
return fits ? IB_OPCODE_RC_SEND_ONLY_WITH_INVALIDATE :
IB_OPCODE_RC_SEND_FIRST;
+
+ case IB_WR_ATOMIC_WRITE:
+ return IB_OPCODE_RC_ATOMIC_WRITE;
+
case IB_WR_REG_MR:
case IB_WR_LOCAL_INV:
return opcode;
@@ -308,7 +362,6 @@ static int next_opcode(struct rxe_qp *qp, struct rxe_send_wqe *wqe,
case IB_QPT_UC:
return next_opcode_uc(qp, opcode, fits);
- case IB_QPT_SMI:
case IB_QPT_UD:
case IB_QPT_GSI:
switch (opcode) {
@@ -358,37 +411,25 @@ static inline int get_mtu(struct rxe_qp *qp)
}
static struct sk_buff *init_req_packet(struct rxe_qp *qp,
+ struct rxe_av *av,
struct rxe_send_wqe *wqe,
- int opcode, int payload,
+ int opcode, u32 payload,
struct rxe_pkt_info *pkt)
{
struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
struct sk_buff *skb;
struct rxe_send_wr *ibwr = &wqe->wr;
- struct rxe_av *av;
int pad = (-payload) & 0x3;
int paylen;
int solicited;
u32 qp_num;
- int ack_req;
+ int ack_req = 0;
/* length from start of bth to end of icrc */
paylen = rxe_opcode[opcode].length + payload + pad + RXE_ICRC_SIZE;
-
- /* pkt->hdr, port_num and mask are initialized in ifc layer */
- pkt->rxe = rxe;
- pkt->opcode = opcode;
- pkt->qp = qp;
- pkt->psn = qp->req.psn;
- pkt->mask = rxe_opcode[opcode].mask;
- pkt->paylen = paylen;
- pkt->wqe = wqe;
+ pkt->paylen = paylen;
/* init skb */
- av = rxe_get_av(pkt);
- if (!av)
- return NULL;
-
skb = rxe_init_packet(rxe, av, paylen, pkt);
if (unlikely(!skb))
return NULL;
@@ -403,8 +444,9 @@ static struct sk_buff *init_req_packet(struct rxe_qp *qp,
qp_num = (pkt->mask & RXE_DETH_MASK) ? ibwr->wr.ud.remote_qpn :
qp->attr.dest_qp_num;
- ack_req = ((pkt->mask & RXE_END_MASK) ||
- (qp->req.noack_pkts++ > RXE_MAX_PKT_PER_ACK));
+ if (qp_type(qp) != IB_QPT_UD && qp_type(qp) != IB_QPT_UC)
+ ack_req = ((pkt->mask & RXE_END_MASK) ||
+ (qp->req.noack_pkts++ > RXE_MAX_PKT_PER_ACK));
if (ack_req)
qp->req.noack_pkts = 0;
@@ -413,11 +455,18 @@ static struct sk_buff *init_req_packet(struct rxe_qp *qp,
/* init optional headers */
if (pkt->mask & RXE_RETH_MASK) {
- reth_set_rkey(pkt, ibwr->wr.rdma.rkey);
+ if (pkt->mask & RXE_FETH_MASK)
+ reth_set_rkey(pkt, ibwr->wr.flush.rkey);
+ else
+ reth_set_rkey(pkt, ibwr->wr.rdma.rkey);
reth_set_va(pkt, wqe->iova);
reth_set_len(pkt, wqe->dma.resid);
}
+ /* Fill Flush Extension Transport Header */
+ if (pkt->mask & RXE_FETH_MASK)
+ feth_init(pkt, ibwr->wr.flush.type, ibwr->wr.flush.level);
+
if (pkt->mask & RXE_IMMDT_MASK)
immdt_set_imm(pkt, ibwr->ex.imm_data);
@@ -426,8 +475,7 @@ static struct sk_buff *init_req_packet(struct rxe_qp *qp,
if (pkt->mask & RXE_ATMETH_MASK) {
atmeth_set_va(pkt, wqe->iova);
- if (opcode == IB_OPCODE_RC_COMPARE_SWAP ||
- opcode == IB_OPCODE_RD_COMPARE_SWAP) {
+ if (opcode == IB_OPCODE_RC_COMPARE_SWAP) {
atmeth_set_swap_add(pkt, ibwr->wr.atomic.swap);
atmeth_set_comp(pkt, ibwr->wr.atomic.compare_add);
} else {
@@ -447,13 +495,13 @@ static struct sk_buff *init_req_packet(struct rxe_qp *qp,
return skb;
}
-static int finish_packet(struct rxe_qp *qp, struct rxe_send_wqe *wqe,
- struct rxe_pkt_info *pkt, struct sk_buff *skb,
- int paylen)
+static int finish_packet(struct rxe_qp *qp, struct rxe_av *av,
+ struct rxe_send_wqe *wqe, struct rxe_pkt_info *pkt,
+ struct sk_buff *skb, u32 payload)
{
int err;
- err = rxe_prepare(pkt, skb);
+ err = rxe_prepare(av, pkt, skb);
if (err)
return err;
@@ -461,22 +509,30 @@ static int finish_packet(struct rxe_qp *qp, struct rxe_send_wqe *wqe,
if (wqe->wr.send_flags & IB_SEND_INLINE) {
u8 *tmp = &wqe->dma.inline_data[wqe->dma.sge_offset];
- memcpy(payload_addr(pkt), tmp, paylen);
+ memcpy(payload_addr(pkt), tmp, payload);
- wqe->dma.resid -= paylen;
- wqe->dma.sge_offset += paylen;
+ wqe->dma.resid -= payload;
+ wqe->dma.sge_offset += payload;
} else {
err = copy_data(qp->pd, 0, &wqe->dma,
- payload_addr(pkt), paylen,
+ payload_addr(pkt), payload,
RXE_FROM_MR_OBJ);
if (err)
return err;
}
if (bth_pad(pkt)) {
- u8 *pad = payload_addr(pkt) + paylen;
+ u8 *pad = payload_addr(pkt) + payload;
memset(pad, 0, bth_pad(pkt));
}
+ } else if (pkt->mask & RXE_FLUSH_MASK) {
+ /* oA19-2: shall have no payload. */
+ wqe->dma.resid = 0;
+ }
+
+ if (pkt->mask & RXE_ATOMIC_WRITE_MASK) {
+ memcpy(payload_addr(pkt), wqe->dma.atomic_wr, payload);
+ wqe->dma.resid -= payload;
}
return 0;
@@ -489,6 +545,8 @@ static void update_wqe_state(struct rxe_qp *qp,
if (pkt->mask & RXE_END_MASK) {
if (qp_type(qp) == IB_QPT_RC)
wqe->state = wqe_state_pending;
+ else
+ wqe->state = wqe_state_done;
} else {
wqe->state = wqe_state_processing;
}
@@ -497,7 +555,7 @@ static void update_wqe_state(struct rxe_qp *qp,
static void update_wqe_psn(struct rxe_qp *qp,
struct rxe_send_wqe *wqe,
struct rxe_pkt_info *pkt,
- int payload)
+ u32 payload)
{
/* number of packets left to send including current one */
int num_pkt = (wqe->dma.resid + payload + qp->mtu - 1) / qp->mtu;
@@ -517,30 +575,7 @@ static void update_wqe_psn(struct rxe_qp *qp,
qp->req.psn = (qp->req.psn + 1) & BTH_PSN_MASK;
}
-static void save_state(struct rxe_send_wqe *wqe,
- struct rxe_qp *qp,
- struct rxe_send_wqe *rollback_wqe,
- u32 *rollback_psn)
-{
- rollback_wqe->state = wqe->state;
- rollback_wqe->first_psn = wqe->first_psn;
- rollback_wqe->last_psn = wqe->last_psn;
- *rollback_psn = qp->req.psn;
-}
-
-static void rollback_state(struct rxe_send_wqe *wqe,
- struct rxe_qp *qp,
- struct rxe_send_wqe *rollback_wqe,
- u32 rollback_psn)
-{
- wqe->state = rollback_wqe->state;
- wqe->first_psn = rollback_wqe->first_psn;
- wqe->last_psn = rollback_wqe->last_psn;
- qp->req.psn = rollback_psn;
-}
-
-static void update_state(struct rxe_qp *qp, struct rxe_send_wqe *wqe,
- struct rxe_pkt_info *pkt, int payload)
+static void update_state(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
{
qp->req.opcode = pkt->opcode;
@@ -589,7 +624,7 @@ static int rxe_do_local_ops(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
}
break;
default:
- pr_err("Unexpected send wqe opcode %d\n", opcode);
+ rxe_dbg_qp(qp, "Unexpected send wqe opcode %d\n", opcode);
wqe->status = IB_WC_LOC_QP_OP_ERR;
return -EINVAL;
}
@@ -598,45 +633,63 @@ static int rxe_do_local_ops(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
wqe->status = IB_WC_SUCCESS;
qp->req.wqe_index = queue_next_index(qp->sq.queue, qp->req.wqe_index);
- if ((wqe->wr.send_flags & IB_SEND_SIGNALED) ||
- qp->sq_sig_type == IB_SIGNAL_ALL_WR)
- rxe_run_task(&qp->comp.task, 1);
-
return 0;
}
-int rxe_requester(void *arg)
+int rxe_requester(struct rxe_qp *qp)
{
- struct rxe_qp *qp = (struct rxe_qp *)arg;
+ struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
struct rxe_pkt_info pkt;
struct sk_buff *skb;
struct rxe_send_wqe *wqe;
enum rxe_hdr_mask mask;
- int payload;
+ u32 payload;
int mtu;
int opcode;
+ int err;
int ret;
- struct rxe_send_wqe rollback_wqe;
- u32 rollback_psn;
struct rxe_queue *q = qp->sq.queue;
+ struct rxe_ah *ah;
+ struct rxe_av *av;
+ unsigned long flags;
- rxe_add_ref(qp);
-
-next_wqe:
- if (unlikely(!qp->valid || qp->req.state == QP_STATE_ERROR))
+ spin_lock_irqsave(&qp->state_lock, flags);
+ if (unlikely(!qp->valid)) {
+ spin_unlock_irqrestore(&qp->state_lock, flags);
goto exit;
+ }
+
+ if (unlikely(qp_state(qp) == IB_QPS_ERR)) {
+ wqe = __req_next_wqe(qp);
+ spin_unlock_irqrestore(&qp->state_lock, flags);
+ if (wqe) {
+ wqe->status = IB_WC_WR_FLUSH_ERR;
+ goto err;
+ } else {
+ goto exit;
+ }
+ }
- if (unlikely(qp->req.state == QP_STATE_RESET)) {
+ if (unlikely(qp_state(qp) == IB_QPS_RESET)) {
qp->req.wqe_index = queue_get_consumer(q,
QUEUE_TYPE_FROM_CLIENT);
qp->req.opcode = -1;
qp->req.need_rd_atomic = 0;
qp->req.wait_psn = 0;
qp->req.need_retry = 0;
+ qp->req.wait_for_rnr_timer = 0;
+ spin_unlock_irqrestore(&qp->state_lock, flags);
goto exit;
}
+ spin_unlock_irqrestore(&qp->state_lock, flags);
- if (unlikely(qp->req.need_retry)) {
+ /* we come here if the retransmit timer has fired
+ * or if the rnr timer has fired. If the retransmit
+ * timer fires while we are processing an RNR NAK wait
+ * until the rnr timer has fired before starting the
+ * retry flow
+ */
+ if (unlikely(qp->req.need_retry && !qp->req.wait_for_rnr_timer)) {
req_retry(qp);
qp->req.need_retry = 0;
}
@@ -645,12 +698,17 @@ next_wqe:
if (unlikely(!wqe))
goto exit;
+ if (rxe_wqe_is_fenced(qp, wqe)) {
+ qp->req.wait_fence = 1;
+ goto exit;
+ }
+
if (wqe->mask & WR_LOCAL_OP_MASK) {
- ret = rxe_do_local_ops(qp, wqe);
- if (unlikely(ret))
+ err = rxe_do_local_ops(qp, wqe);
+ if (unlikely(err))
goto err;
else
- goto next_wqe;
+ goto done;
}
if (unlikely(qp_type(qp) == IB_QPT_RC &&
@@ -670,17 +728,19 @@ next_wqe:
opcode = next_opcode(qp, wqe, wqe->wr.opcode);
if (unlikely(opcode < 0)) {
wqe->status = IB_WC_LOC_QP_OP_ERR;
- goto exit;
+ goto err;
}
mask = rxe_opcode[opcode].mask;
- if (unlikely(mask & RXE_READ_OR_ATOMIC_MASK)) {
+ if (unlikely(mask & (RXE_READ_OR_ATOMIC_MASK |
+ RXE_ATOMIC_WRITE_MASK))) {
if (check_init_depth(qp, wqe))
goto exit;
}
mtu = get_mtu(qp);
- payload = (mask & RXE_WRITE_OR_SEND_MASK) ? wqe->dma.resid : 0;
+ payload = (mask & (RXE_WRITE_OR_SEND_MASK | RXE_ATOMIC_WRITE_MASK)) ?
+ wqe->dma.resid : 0;
if (payload > mtu) {
if (qp_type(qp) == IB_QPT_UD) {
/* C10-93.1.1: If the total sum of all the buffer lengths specified for a
@@ -698,64 +758,91 @@ next_wqe:
qp->req.wqe_index);
wqe->state = wqe_state_done;
wqe->status = IB_WC_SUCCESS;
- __rxe_do_task(&qp->comp.task);
- rxe_drop_ref(qp);
- return 0;
+ goto done;
}
payload = mtu;
}
- skb = init_req_packet(qp, wqe, opcode, payload, &pkt);
+ pkt.rxe = rxe;
+ pkt.opcode = opcode;
+ pkt.qp = qp;
+ pkt.psn = qp->req.psn;
+ pkt.mask = rxe_opcode[opcode].mask;
+ pkt.wqe = wqe;
+
+ av = rxe_get_av(&pkt, &ah);
+ if (unlikely(!av)) {
+ rxe_dbg_qp(qp, "Failed no address vector\n");
+ wqe->status = IB_WC_LOC_QP_OP_ERR;
+ goto err;
+ }
+
+ skb = init_req_packet(qp, av, wqe, opcode, payload, &pkt);
if (unlikely(!skb)) {
- pr_err("qp#%d Failed allocating skb\n", qp_num(qp));
+ rxe_dbg_qp(qp, "Failed allocating skb\n");
wqe->status = IB_WC_LOC_QP_OP_ERR;
+ if (ah)
+ rxe_put(ah);
goto err;
}
- ret = finish_packet(qp, wqe, &pkt, skb, payload);
- if (unlikely(ret)) {
- pr_debug("qp#%d Error during finish packet\n", qp_num(qp));
- if (ret == -EFAULT)
+ err = finish_packet(qp, av, wqe, &pkt, skb, payload);
+ if (unlikely(err)) {
+ rxe_dbg_qp(qp, "Error during finish packet\n");
+ if (err == -EFAULT)
wqe->status = IB_WC_LOC_PROT_ERR;
else
wqe->status = IB_WC_LOC_QP_OP_ERR;
kfree_skb(skb);
+ if (ah)
+ rxe_put(ah);
goto err;
}
- /*
- * To prevent a race on wqe access between requester and completer,
- * wqe members state and psn need to be set before calling
- * rxe_xmit_packet().
- * Otherwise, completer might initiate an unjustified retry flow.
- */
- save_state(wqe, qp, &rollback_wqe, &rollback_psn);
- update_wqe_state(qp, wqe, &pkt);
- update_wqe_psn(qp, wqe, &pkt, payload);
- ret = rxe_xmit_packet(qp, &pkt, skb);
- if (ret) {
- qp->need_req_skb = 1;
-
- rollback_state(wqe, qp, &rollback_wqe, rollback_psn);
-
- if (ret == -EAGAIN) {
- rxe_run_task(&qp->req.task, 1);
- goto exit;
- }
+ if (ah)
+ rxe_put(ah);
+ err = rxe_xmit_packet(qp, &pkt, skb);
+ if (err) {
wqe->status = IB_WC_LOC_QP_OP_ERR;
goto err;
}
- update_state(qp, wqe, &pkt, payload);
-
- goto next_wqe;
+ update_wqe_state(qp, wqe, &pkt);
+ update_wqe_psn(qp, wqe, &pkt, payload);
+ update_state(qp, &pkt);
+ /* A non-zero return value will cause rxe_do_task to
+ * exit its loop and end the work item. A zero return
+ * will continue looping and return to rxe_requester
+ */
+done:
+ ret = 0;
+ goto out;
err:
+ /* update wqe_index for each wqe completion */
+ qp->req.wqe_index = queue_next_index(qp->sq.queue, qp->req.wqe_index);
wqe->state = wqe_state_error;
- __rxe_do_task(&qp->comp.task);
-
+ rxe_qp_error(qp);
exit:
- rxe_drop_ref(qp);
- return -EAGAIN;
+ ret = -EAGAIN;
+out:
+ return ret;
+}
+
+int rxe_sender(struct rxe_qp *qp)
+{
+ int req_ret;
+ int comp_ret;
+
+ /* process the send queue */
+ req_ret = rxe_requester(qp);
+
+ /* process the response queue */
+ comp_ret = rxe_completer(qp);
+
+ /* exit the task loop if both requester and completer
+ * are ready
+ */
+ return (req_ret && comp_ret) ? -EAGAIN : 0;
}
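
For context on the return-value convention above: rxe_sender() returns 0 to ask the task machinery to call it again, and -EAGAIN once both the requester and the completer report they are done. The userspace sketch below mimics that loop to show how a zero return keeps the handler running while a non-zero return ends the work item; demo_task, demo_run_task and fake_sender are hypothetical names, not the driver's rxe_task API.

/* Illustrative only: a stand-in for the work-item loop that consumes
 * a rxe_sender()-style return value.
 */
#include <errno.h>
#include <stdio.h>

struct demo_task {
	int (*func)(void *arg);		/* e.g. a rxe_sender-style handler */
	void *arg;
};

/* Keep calling the handler until it returns non-zero (-EAGAIN here),
 * which is the "nothing more to do right now" signal.
 */
static void demo_run_task(struct demo_task *task)
{
	int calls = 1;

	while (task->func(task->arg) == 0)
		calls++;

	printf("handler ran %d time(s) before exiting\n", calls);
}

/* Fake handler: pretend two work requests are pending, then report
 * that both requester and completer are idle.
 */
static int fake_sender(void *arg)
{
	int *pending = arg;

	if (*pending > 0) {
		(*pending)--;
		return 0;		/* keep looping */
	}
	return -EAGAIN;			/* exit the loop */
}

int main(void)
{
	int pending = 2;
	struct demo_task task = { .func = fake_sender, .arg = &pending };

	demo_run_task(&task);
	return 0;
}
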