summaryrefslogtreecommitdiff
path: root/net/rds/ib_send.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/ib_send.c')
-rw-r--r--net/rds/ib_send.c358
1 files changed, 177 insertions, 181 deletions
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e59094981175..4190b90ff3b1 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006 Oracle. All rights reserved.
+ * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -36,42 +36,10 @@
#include <linux/dmapool.h>
#include <linux/ratelimit.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
-
-static char *rds_ib_wc_status_strings[] = {
-#define RDS_IB_WC_STATUS_STR(foo) \
- [IB_WC_##foo] = __stringify(IB_WC_##foo)
- RDS_IB_WC_STATUS_STR(SUCCESS),
- RDS_IB_WC_STATUS_STR(LOC_LEN_ERR),
- RDS_IB_WC_STATUS_STR(LOC_QP_OP_ERR),
- RDS_IB_WC_STATUS_STR(LOC_EEC_OP_ERR),
- RDS_IB_WC_STATUS_STR(LOC_PROT_ERR),
- RDS_IB_WC_STATUS_STR(WR_FLUSH_ERR),
- RDS_IB_WC_STATUS_STR(MW_BIND_ERR),
- RDS_IB_WC_STATUS_STR(BAD_RESP_ERR),
- RDS_IB_WC_STATUS_STR(LOC_ACCESS_ERR),
- RDS_IB_WC_STATUS_STR(REM_INV_REQ_ERR),
- RDS_IB_WC_STATUS_STR(REM_ACCESS_ERR),
- RDS_IB_WC_STATUS_STR(REM_OP_ERR),
- RDS_IB_WC_STATUS_STR(RETRY_EXC_ERR),
- RDS_IB_WC_STATUS_STR(RNR_RETRY_EXC_ERR),
- RDS_IB_WC_STATUS_STR(LOC_RDD_VIOL_ERR),
- RDS_IB_WC_STATUS_STR(REM_INV_RD_REQ_ERR),
- RDS_IB_WC_STATUS_STR(REM_ABORT_ERR),
- RDS_IB_WC_STATUS_STR(INV_EECN_ERR),
- RDS_IB_WC_STATUS_STR(INV_EEC_STATE_ERR),
- RDS_IB_WC_STATUS_STR(FATAL_ERR),
- RDS_IB_WC_STATUS_STR(RESP_TIMEOUT_ERR),
- RDS_IB_WC_STATUS_STR(GENERAL_ERR),
-#undef RDS_IB_WC_STATUS_STR
-};
-
-char *rds_ib_wc_status_str(enum ib_wc_status status)
-{
- return rds_str_array(rds_ib_wc_status_strings,
- ARRAY_SIZE(rds_ib_wc_status_strings), status);
-}
+#include "ib_mr.h"
/*
* Convert IB-specific error message to RDS error message and call core
@@ -234,11 +202,12 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
send->s_wr.ex.imm_data = 0;
sge = &send->s_sge[0];
- sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
+ sge->addr = ic->i_send_hdrs_dma[i];
+
sge->length = sizeof(struct rds_header);
- sge->lkey = ic->i_mr->lkey;
+ sge->lkey = ic->i_pd->local_dma_lkey;
- send->s_sge[1].lkey = ic->i_mr->lkey;
+ send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;
}
}
@@ -271,81 +240,71 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
* unallocs the next free entry in the ring it doesn't alter which is
* the next to be freed, which is what this is concerned with.
*/
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
{
- struct rds_connection *conn = context;
- struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_message *rm = NULL;
- struct ib_wc wc;
+ struct rds_connection *conn = ic->conn;
struct rds_ib_send_work *send;
u32 completed;
u32 oldest;
u32 i = 0;
- int ret;
int nr_sig = 0;
- rdsdebug("cq %p conn %p\n", cq, conn);
- rds_ib_stats_inc(s_ib_tx_cq_call);
- ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
- if (ret)
- rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
-
- while (ib_poll_cq(cq, 1, &wc) > 0) {
- rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
- (unsigned long long)wc.wr_id, wc.status,
- rds_ib_wc_status_str(wc.status), wc.byte_len,
- be32_to_cpu(wc.ex.imm_data));
- rds_ib_stats_inc(s_ib_tx_cq_event);
-
- if (wc.wr_id == RDS_IB_ACK_WR_ID) {
- if (ic->i_ack_queued + HZ/2 < jiffies)
- rds_ib_stats_inc(s_ib_tx_stalled);
- rds_ib_ack_send_complete(ic);
- continue;
- }
- oldest = rds_ib_ring_oldest(&ic->i_send_ring);
+ rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+ (unsigned long long)wc->wr_id, wc->status,
+ ib_wc_status_msg(wc->status), wc->byte_len,
+ be32_to_cpu(wc->ex.imm_data));
+ rds_ib_stats_inc(s_ib_tx_cq_event);
- completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+ if (wc->wr_id == RDS_IB_ACK_WR_ID) {
+ if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
+ rds_ib_stats_inc(s_ib_tx_stalled);
+ rds_ib_ack_send_complete(ic);
+ return;
+ }
- for (i = 0; i < completed; i++) {
- send = &ic->i_sends[oldest];
- if (send->s_wr.send_flags & IB_SEND_SIGNALED)
- nr_sig++;
+ oldest = rds_ib_ring_oldest(&ic->i_send_ring);
- rm = rds_ib_send_unmap_op(ic, send, wc.status);
+ completed = rds_ib_ring_completed(&ic->i_send_ring, wc->wr_id, oldest);
- if (send->s_queued + HZ/2 < jiffies)
- rds_ib_stats_inc(s_ib_tx_stalled);
+ for (i = 0; i < completed; i++) {
+ send = &ic->i_sends[oldest];
+ if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+ nr_sig++;
- if (send->s_op) {
- if (send->s_op == rm->m_final_op) {
- /* If anyone waited for this message to get flushed out, wake
- * them up now */
- rds_message_unmapped(rm);
- }
- rds_message_put(rm);
- send->s_op = NULL;
- }
+ rm = rds_ib_send_unmap_op(ic, send, wc->status);
- oldest = (oldest + 1) % ic->i_send_ring.w_nr;
- }
+ if (time_after(jiffies, send->s_queued + HZ / 2))
+ rds_ib_stats_inc(s_ib_tx_stalled);
- rds_ib_ring_free(&ic->i_send_ring, completed);
- rds_ib_sub_signaled(ic, nr_sig);
- nr_sig = 0;
-
- if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
- test_bit(0, &conn->c_map_queued))
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
-
- /* We expect errors as the qp is drained during shutdown */
- if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
- rds_ib_conn_error(conn, "send completion on %pI4 had status "
- "%u (%s), disconnecting and reconnecting\n",
- &conn->c_faddr, wc.status,
- rds_ib_wc_status_str(wc.status));
+ if (send->s_op) {
+ if (send->s_op == rm->m_final_op) {
+ /* If anyone waited for this message to get
+ * flushed out, wake them up now
+ */
+ rds_message_unmapped(rm);
+ }
+ rds_message_put(rm);
+ send->s_op = NULL;
}
+
+ oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+ }
+
+ rds_ib_ring_free(&ic->i_send_ring, completed);
+ rds_ib_sub_signaled(ic, nr_sig);
+
+ if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+ test_bit(0, &conn->c_map_queued))
+ queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+ /* We expect errors as the qp is drained during shutdown */
+ if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+ rds_ib_conn_error(conn, "send completion on <%pI6c,%pI6c,%d> had status %u (%s), vendor err 0x%x, disconnecting and reconnecting\n",
+ &conn->c_laddr, &conn->c_faddr,
+ conn->c_tos, wc->status,
+ ib_wc_status_msg(wc->status), wc->vendor_err);
}
}
@@ -409,7 +368,7 @@ try_again:
posted = IB_GET_POST_CREDITS(oldval);
avail = IB_GET_SEND_CREDITS(oldval);
- rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
+ rdsdebug("wanted=%u credits=%u posted=%u\n",
wanted, avail, posted);
/* The last credit must be used to send a credit update. */
@@ -453,7 +412,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
if (credits == 0)
return;
- rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
+ rdsdebug("credits=%u current=%u%s\n",
credits,
IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
@@ -530,7 +489,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
struct rds_ib_send_work *send = NULL;
struct rds_ib_send_work *first;
struct rds_ib_send_work *prev;
- struct ib_send_wr *failed_wr;
+ const struct ib_send_wr *failed_wr;
struct scatterlist *scat;
u32 pos;
u32 i;
@@ -552,16 +511,15 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
&& rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) {
rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
scat = &rm->data.op_sg[sg];
- ret = sizeof(struct rds_header) + RDS_CONG_MAP_BYTES;
- ret = min_t(int, ret, scat->length - conn->c_xmit_data_off);
- return ret;
+ ret = max_t(int, RDS_CONG_MAP_BYTES, scat->length);
+ return sizeof(struct rds_header) + ret;
}
/* FIXME we may overallocate here */
if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
i = 1;
else
- i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
+ i = DIV_ROUND_UP(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
if (work_alloc == 0) {
@@ -606,6 +564,8 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
}
rds_message_addref(rm);
+ rm->data.op_dmasg = 0;
+ rm->data.op_dmaoff = 0;
ic->i_data_op = &rm->data;
/* Finalize the header */
@@ -659,7 +619,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
send = &ic->i_sends[pos];
first = send;
prev = NULL;
- scat = &ic->i_data_op->op_sg[sg];
+ scat = &ic->i_data_op->op_sg[rm->data.op_dmasg];
i = 0;
do {
unsigned int len = 0;
@@ -672,36 +632,49 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
send->s_queued = jiffies;
send->s_op = NULL;
- send->s_sge[0].addr = ic->i_send_hdrs_dma
- + (pos * sizeof(struct rds_header));
+ send->s_sge[0].addr = ic->i_send_hdrs_dma[pos];
+
send->s_sge[0].length = sizeof(struct rds_header);
+ send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;
+
+ ib_dma_sync_single_for_cpu(ic->rds_ibdev->dev,
+ ic->i_send_hdrs_dma[pos],
+ sizeof(struct rds_header),
+ DMA_TO_DEVICE);
+ memcpy(ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
+ sizeof(struct rds_header));
- memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
/* Set up the data, if present */
if (i < work_alloc
&& scat != &rm->data.op_sg[rm->data.op_count]) {
- len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
+ len = min(RDS_FRAG_SIZE,
+ sg_dma_len(scat) - rm->data.op_dmaoff);
send->s_wr.num_sge = 2;
- send->s_sge[1].addr = ib_sg_dma_address(dev, scat) + off;
+ send->s_sge[1].addr = sg_dma_address(scat);
+ send->s_sge[1].addr += rm->data.op_dmaoff;
send->s_sge[1].length = len;
+ send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;
bytes_sent += len;
- off += len;
- if (off == ib_sg_dma_len(dev, scat)) {
+ rm->data.op_dmaoff += len;
+ if (rm->data.op_dmaoff == sg_dma_len(scat)) {
scat++;
- off = 0;
+ rm->data.op_dmasg++;
+ rm->data.op_dmaoff = 0;
}
}
- rds_ib_set_wr_signal_state(ic, send, 0);
+ rds_ib_set_wr_signal_state(ic, send, false);
/*
* Always signal the last one if we're stopping due to flow control.
*/
- if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
- send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+ if (ic->i_flowctl && flow_controlled && i == (work_alloc - 1)) {
+ rds_ib_set_wr_signal_state(ic, send, true);
+ send->s_wr.send_flags |= IB_SEND_SOLICITED;
+ }
if (send->s_wr.send_flags & IB_SEND_SIGNALED)
nr_sig++;
@@ -710,7 +683,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
if (ic->i_flowctl && adv_credits) {
- struct rds_header *hdr = &ic->i_send_hdrs[pos];
+ struct rds_header *hdr = ic->i_send_hdrs[pos];
/* add credit and redo the header checksum */
hdr->h_credit = adv_credits;
@@ -718,6 +691,10 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
adv_credits = 0;
rds_ib_stats_inc(s_ib_tx_credit_updates);
}
+ ib_dma_sync_single_for_device(ic->rds_ibdev->dev,
+ ic->i_send_hdrs_dma[pos],
+ sizeof(struct rds_header),
+ DMA_TO_DEVICE);
if (prev)
prev->s_wr.next = &send->s_wr;
@@ -739,6 +716,8 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (scat == &rm->data.op_sg[rm->data.op_count]) {
prev->s_op = ic->i_data_op;
prev->s_wr.send_flags |= IB_SEND_SOLICITED;
+ if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED))
+ nr_sig += rds_ib_set_wr_signal_state(ic, prev, true);
ic->i_data_op = NULL;
}
@@ -760,7 +739,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
first, &first->s_wr, ret, failed_wr);
BUG_ON(failed_wr != &first->s_wr);
if (ret) {
- printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
+ printk(KERN_WARNING "RDS/IB: ib_post_send to %pI6c "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_sub_signaled(ic, nr_sig);
@@ -788,18 +767,14 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
{
struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_ib_send_work *send = NULL;
- struct ib_send_wr *failed_wr;
- struct rds_ib_device *rds_ibdev;
+ const struct ib_send_wr *failed_wr;
u32 pos;
u32 work_alloc;
int ret;
int nr_sig = 0;
- rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
-
work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
if (work_alloc != 1) {
- rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_stats_inc(s_ib_tx_ring_full);
ret = -ENOMEM;
goto out;
@@ -810,23 +785,24 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
send->s_queued = jiffies;
if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
- send->s_wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
- send->s_wr.wr.atomic.compare_add = op->op_m_cswp.compare;
- send->s_wr.wr.atomic.swap = op->op_m_cswp.swap;
- send->s_wr.wr.atomic.compare_add_mask = op->op_m_cswp.compare_mask;
- send->s_wr.wr.atomic.swap_mask = op->op_m_cswp.swap_mask;
+ send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
+ send->s_atomic_wr.compare_add = op->op_m_cswp.compare;
+ send->s_atomic_wr.swap = op->op_m_cswp.swap;
+ send->s_atomic_wr.compare_add_mask = op->op_m_cswp.compare_mask;
+ send->s_atomic_wr.swap_mask = op->op_m_cswp.swap_mask;
} else { /* FADD */
- send->s_wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
- send->s_wr.wr.atomic.compare_add = op->op_m_fadd.add;
- send->s_wr.wr.atomic.swap = 0;
- send->s_wr.wr.atomic.compare_add_mask = op->op_m_fadd.nocarry_mask;
- send->s_wr.wr.atomic.swap_mask = 0;
+ send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
+ send->s_atomic_wr.compare_add = op->op_m_fadd.add;
+ send->s_atomic_wr.swap = 0;
+ send->s_atomic_wr.compare_add_mask = op->op_m_fadd.nocarry_mask;
+ send->s_atomic_wr.swap_mask = 0;
}
+ send->s_wr.send_flags = 0;
nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
- send->s_wr.num_sge = 1;
- send->s_wr.next = NULL;
- send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
- send->s_wr.wr.atomic.rkey = op->op_rkey;
+ send->s_atomic_wr.wr.num_sge = 1;
+ send->s_atomic_wr.wr.next = NULL;
+ send->s_atomic_wr.remote_addr = op->op_remote_addr;
+ send->s_atomic_wr.rkey = op->op_rkey;
send->s_op = op;
rds_message_addref(container_of(send->s_op, struct rds_message, atomic));
@@ -841,9 +817,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
}
/* Convert our struct scatterlist to struct ib_sge */
- send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
- send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
- send->s_sge[0].lkey = ic->i_mr->lkey;
+ send->s_sge[0].addr = sg_dma_address(op->op_sg);
+ send->s_sge[0].length = sg_dma_len(op->op_sg);
+ send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;
rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
send->s_sge[0].addr, send->s_sge[0].length);
@@ -851,22 +827,22 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
if (nr_sig)
atomic_add(nr_sig, &ic->i_signaled_sends);
- failed_wr = &send->s_wr;
- ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+ failed_wr = &send->s_atomic_wr.wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &send->s_atomic_wr.wr, &failed_wr);
rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
- send, &send->s_wr, ret, failed_wr);
- BUG_ON(failed_wr != &send->s_wr);
+ send, &send->s_atomic_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &send->s_atomic_wr.wr);
if (ret) {
- printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
+ printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI6c "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_sub_signaled(ic, nr_sig);
goto out;
}
- if (unlikely(failed_wr != &send->s_wr)) {
+ if (unlikely(failed_wr != &send->s_atomic_wr.wr)) {
printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
- BUG_ON(failed_wr != &send->s_wr);
+ BUG_ON(failed_wr != &send->s_atomic_wr.wr);
}
out:
@@ -879,7 +855,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
struct rds_ib_send_work *send = NULL;
struct rds_ib_send_work *first;
struct rds_ib_send_work *prev;
- struct ib_send_wr *failed_wr;
+ const struct ib_send_wr *failed_wr;
struct scatterlist *scat;
unsigned long len;
u64 remote_addr = op->op_remote_addr;
@@ -892,27 +868,36 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
int ret;
int num_sge;
int nr_sig = 0;
+ u64 odp_addr = op->op_odp_addr;
+ u32 odp_lkey = 0;
/* map the op the first time we see it */
- if (!op->op_mapped) {
- op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
- op->op_sg, op->op_nents, (op->op_write) ?
- DMA_TO_DEVICE : DMA_FROM_DEVICE);
- rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
- if (op->op_count == 0) {
- rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
- ret = -ENOMEM; /* XXX ? */
- goto out;
+ if (!op->op_odp_mr) {
+ if (!op->op_mapped) {
+ op->op_count =
+ ib_dma_map_sg(ic->i_cm_id->device, op->op_sg,
+ op->op_nents,
+ (op->op_write) ? DMA_TO_DEVICE :
+ DMA_FROM_DEVICE);
+ rdsdebug("ic %p mapping op %p: %d\n", ic, op,
+ op->op_count);
+ if (op->op_count == 0) {
+ rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+ ret = -ENOMEM; /* XXX ? */
+ goto out;
+ }
+ op->op_mapped = 1;
}
-
- op->op_mapped = 1;
+ } else {
+ op->op_count = op->op_nents;
+ odp_lkey = rds_ib_get_lkey(op->op_odp_mr->r_trans_private);
}
/*
* Instead of knowing how to return a partial rdma read/write we insist that there
* be enough work requests to send the entire message.
*/
- i = ceil(op->op_count, max_sge);
+ i = DIV_ROUND_UP(op->op_count, max_sge);
work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
if (work_alloc != i) {
@@ -934,40 +919,50 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
send->s_queued = jiffies;
send->s_op = NULL;
- nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+ if (!op->op_notify)
+ nr_sig += rds_ib_set_wr_signal_state(ic, send,
+ op->op_notify);
send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
- send->s_wr.wr.rdma.remote_addr = remote_addr;
- send->s_wr.wr.rdma.rkey = op->op_rkey;
+ send->s_rdma_wr.remote_addr = remote_addr;
+ send->s_rdma_wr.rkey = op->op_rkey;
if (num_sge > max_sge) {
- send->s_wr.num_sge = max_sge;
+ send->s_rdma_wr.wr.num_sge = max_sge;
num_sge -= max_sge;
} else {
- send->s_wr.num_sge = num_sge;
+ send->s_rdma_wr.wr.num_sge = num_sge;
}
- send->s_wr.next = NULL;
+ send->s_rdma_wr.wr.next = NULL;
if (prev)
- prev->s_wr.next = &send->s_wr;
-
- for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
- len = ib_sg_dma_len(ic->i_cm_id->device, scat);
- send->s_sge[j].addr =
- ib_sg_dma_address(ic->i_cm_id->device, scat);
+ prev->s_rdma_wr.wr.next = &send->s_rdma_wr.wr;
+
+ for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
+ scat != &op->op_sg[op->op_count]; j++) {
+ len = sg_dma_len(scat);
+ if (!op->op_odp_mr) {
+ send->s_sge[j].addr = sg_dma_address(scat);
+ send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;
+ } else {
+ send->s_sge[j].addr = odp_addr;
+ send->s_sge[j].lkey = odp_lkey;
+ }
send->s_sge[j].length = len;
- send->s_sge[j].lkey = ic->i_mr->lkey;
sent += len;
rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
remote_addr += len;
+ odp_addr += len;
scat++;
}
rdsdebug("send %p wr %p num_sge %u next %p\n", send,
- &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+ &send->s_rdma_wr.wr,
+ send->s_rdma_wr.wr.num_sge,
+ send->s_rdma_wr.wr.next);
prev = send;
if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
@@ -988,22 +983,22 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
if (nr_sig)
atomic_add(nr_sig, &ic->i_signaled_sends);
- failed_wr = &first->s_wr;
- ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+ failed_wr = &first->s_rdma_wr.wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr);
rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
- first, &first->s_wr, ret, failed_wr);
- BUG_ON(failed_wr != &first->s_wr);
+ first, &first->s_rdma_wr.wr, ret, failed_wr);
+ BUG_ON(failed_wr != &first->s_rdma_wr.wr);
if (ret) {
- printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
+ printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI6c "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_sub_signaled(ic, nr_sig);
goto out;
}
- if (unlikely(failed_wr != &first->s_wr)) {
+ if (unlikely(failed_wr != &first->s_rdma_wr.wr)) {
printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
- BUG_ON(failed_wr != &first->s_wr);
+ BUG_ON(failed_wr != &first->s_rdma_wr.wr);
}
@@ -1011,8 +1006,9 @@ out:
return ret;
}
-void rds_ib_xmit_complete(struct rds_connection *conn)
+void rds_ib_xmit_path_complete(struct rds_conn_path *cp)
{
+ struct rds_connection *conn = cp->cp_conn;
struct rds_ib_connection *ic = conn->c_transport_data;
/* We may have a pending ACK or window update we were unable