diff options
Diffstat (limited to 'net/smc/smc_tx.c')
| -rw-r--r-- | net/smc/smc_tx.c | 297 |
1 files changed, 202 insertions, 95 deletions
diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c index d8366ed51757..3144b4b1fe29 100644 --- a/net/smc/smc_tx.c +++ b/net/smc/smc_tx.c @@ -24,11 +24,13 @@ #include "smc.h" #include "smc_wr.h" #include "smc_cdc.h" +#include "smc_close.h" #include "smc_ism.h" #include "smc_tx.h" +#include "smc_stats.h" +#include "smc_tracepoint.h" -#define SMC_TX_WORK_DELAY HZ -#define SMC_TX_CORK_DELAY (HZ >> 2) /* 250 ms */ +#define SMC_TX_WORK_DELAY 0 /***************************** sndbuf producer *******************************/ @@ -44,6 +46,8 @@ static void smc_tx_write_space(struct sock *sk) /* similar to sk_stream_write_space */ if (atomic_read(&smc->conn.sndbuf_space) && sock) { + if (test_bit(SOCK_NOSPACE, &sock->flags)) + SMC_STAT_RMB_TX_FULL(smc, !smc->conn.lnk); clear_bit(SOCK_NOSPACE, &sock->flags); rcu_read_lock(); wq = rcu_dereference(sk->sk_wq); @@ -75,18 +79,17 @@ static int smc_tx_wait(struct smc_sock *smc, int flags) DEFINE_WAIT_FUNC(wait, woken_wake_function); struct smc_connection *conn = &smc->conn; struct sock *sk = &smc->sk; - bool noblock; long timeo; int rc = 0; /* similar to sk_stream_wait_memory */ timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT); - noblock = timeo ? false : true; add_wait_queue(sk_sleep(sk), &wait); while (1) { sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk); if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN) || + conn->killed || conn->local_tx_ctrl.conn_state_flags.peer_done_writing) { rc = -EPIPE; break; @@ -96,8 +99,8 @@ static int smc_tx_wait(struct smc_sock *smc, int flags) break; } if (!timeo) { - if (noblock) - set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); + /* ensure EPOLLOUT is subsequently generated */ + set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); rc = -EAGAIN; break; } @@ -110,8 +113,8 @@ static int smc_tx_wait(struct smc_sock *smc, int flags) break; /* at least 1 byte of free & no urgent data */ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); sk_wait_event(sk, &timeo, - sk->sk_err || - (sk->sk_shutdown & SEND_SHUTDOWN) || + READ_ONCE(sk->sk_err) || + (READ_ONCE(sk->sk_shutdown) & SEND_SHUTDOWN) || smc_cdc_rxed_any_close(conn) || (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend), @@ -128,6 +131,50 @@ static bool smc_tx_is_corked(struct smc_sock *smc) return (tp->nonagle & TCP_NAGLE_CORK) ? true : false; } +/* If we have pending CDC messages, do not send: + * Because CQE of this CDC message will happen shortly, it gives + * a chance to coalesce future sendmsg() payload in to one RDMA Write, + * without need for a timer, and with no latency trade off. + * Algorithm here: + * 1. First message should never cork + * 2. If we have pending Tx CDC messages, wait for the first CDC + * message's completion + * 3. Don't cork to much data in a single RDMA Write to prevent burst + * traffic, total corked message should not exceed sendbuf/2 + */ +static bool smc_should_autocork(struct smc_sock *smc) +{ + struct smc_connection *conn = &smc->conn; + int corking_size; + + corking_size = min_t(unsigned int, conn->sndbuf_desc->len >> 1, + sock_net(&smc->sk)->smc.sysctl_autocorking_size); + + if (atomic_read(&conn->cdc_pend_tx_wr) == 0 || + smc_tx_prepared_sends(conn) > corking_size) + return false; + return true; +} + +static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg) +{ + struct smc_connection *conn = &smc->conn; + + if (smc_should_autocork(smc)) + return true; + + /* for a corked socket defer the RDMA writes if + * sndbuf_space is still available. The applications + * should known how/when to uncork it. + */ + if ((msg->msg_flags & MSG_MORE || + smc_tx_is_corked(smc)) && + atomic_read(&conn->sndbuf_space)) + return true; + + return false; +} + /* sndbuf producer: main API called by socket layer. * called under sock lock. */ @@ -151,12 +198,22 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) goto out_err; } + if (sk->sk_state == SMC_INIT) + return -ENOTCONN; + + if (len > conn->sndbuf_desc->len) + SMC_STAT_RMB_TX_SIZE_SMALL(smc, !conn->lnk); + + if (len > conn->peer_rmbe_size) + SMC_STAT_RMB_TX_PEER_SIZE_SMALL(smc, !conn->lnk); + + if (msg->msg_flags & MSG_OOB) + SMC_STAT_INC(smc, urg_data_cnt); + while (msg_data_left(msg)) { - if (sk->sk_state == SMC_INIT) - return -ENOTCONN; if (smc->sk.sk_shutdown & SEND_SHUTDOWN || (smc->sk.sk_err == ECONNABORTED) || - conn->local_tx_ctrl.conn_state_flags.peer_conn_abort) + conn->killed) return -EPIPE; if (smc_cdc_rxed_any_close(conn)) return send_done ?: -ECONNRESET; @@ -165,12 +222,11 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) conn->local_tx_ctrl.prod_flags.urg_data_pending = 1; if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) { + if (send_done) + return send_done; rc = smc_tx_wait(smc, msg->msg_flags); - if (rc) { - if (send_done) - return send_done; + if (rc) goto out_err; - } continue; } @@ -189,7 +245,6 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) tx_cnt_prep); chunk_len_sum = chunk_len; chunk_off = tx_cnt_prep; - smc_sndbuf_sync_sg_for_cpu(conn); for (chunk = 0; chunk < 2; chunk++) { rc = memcpy_from_msg(sndbuf_base + chunk_off, msg, chunk_len); @@ -223,16 +278,13 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) */ if ((msg->msg_flags & MSG_OOB) && !send_remaining) conn->urg_tx_pend = true; - if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) && - (atomic_read(&conn->sndbuf_space) > - (conn->sndbuf_desc->len >> 1))) - /* for a corked socket defer the RDMA writes if there - * is still sufficient sndbuf_space available - */ - schedule_delayed_work(&conn->tx_work, - SMC_TX_CORK_DELAY); - else + /* If we need to cork, do nothing and wait for the next + * sendmsg() call or push on tx completion + */ + if (!smc_tx_should_cork(smc, msg)) smc_tx_sndbuf_nonempty(conn); + + trace_smc_tx_sendmsg(smc, copylen); } /* while (msg_data_left(msg)) */ return send_done; @@ -251,15 +303,11 @@ out_err: int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len, u32 offset, int signal) { - struct smc_ism_position pos; int rc; - memset(&pos, 0, sizeof(pos)); - pos.token = conn->peer_token; - pos.index = conn->peer_rmbe_idx; - pos.offset = conn->tx_off + offset; - pos.signal = signal; - rc = smc_ism_write(conn->lgr->smcd, &pos, data, len); + rc = smc_ism_write(conn->lgr->smcd, conn->peer_token, + conn->peer_rmbe_idx, signal, conn->tx_off + offset, + data, len); if (rc) conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1; return rc; @@ -267,31 +315,24 @@ int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len, /* sndbuf consumer: actual data transfer of one target chunk with RDMA write */ static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset, - int num_sges, struct ib_sge sges[]) + int num_sges, struct ib_rdma_wr *rdma_wr) { struct smc_link_group *lgr = conn->lgr; - struct ib_rdma_wr rdma_wr; - struct smc_link *link; + struct smc_link *link = conn->lnk; int rc; - memset(&rdma_wr, 0, sizeof(rdma_wr)); - link = &lgr->lnk[SMC_SINGLE_LINK]; - rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link); - rdma_wr.wr.sg_list = sges; - rdma_wr.wr.num_sge = num_sges; - rdma_wr.wr.opcode = IB_WR_RDMA_WRITE; - rdma_wr.remote_addr = - lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr + + rdma_wr->wr.wr_id = smc_wr_tx_get_next_wr_id(link); + rdma_wr->wr.num_sge = num_sges; + rdma_wr->remote_addr = + lgr->rtokens[conn->rtoken_idx][link->link_idx].dma_addr + /* RMBE within RMB */ conn->tx_off + /* offset within RMBE */ peer_rmbe_offset; - rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey; - rc = ib_post_send(link->roce_qp, &rdma_wr.wr, NULL); - if (rc) { - conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1; - smc_lgr_terminate(lgr); - } + rdma_wr->rkey = lgr->rtokens[conn->rtoken_idx][link->link_idx].rkey; + rc = ib_post_send(link->roce_qp, &rdma_wr->wr, NULL); + if (rc) + smcr_link_down_cond_sched(link); return rc; } @@ -314,24 +355,40 @@ static inline void smc_tx_advance_cursors(struct smc_connection *conn, /* SMC-R helper for smc_tx_rdma_writes() */ static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len, size_t src_off, size_t src_len, - size_t dst_off, size_t dst_len) + size_t dst_off, size_t dst_len, + struct smc_rdma_wr *wr_rdma_buf) { + struct smc_link *link = conn->lnk; + dma_addr_t dma_addr = - sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl); - struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK]; + sg_dma_address(conn->sndbuf_desc->sgt[link->link_idx].sgl); + u64 virt_addr = (uintptr_t)conn->sndbuf_desc->cpu_addr; int src_len_sum = src_len, dst_len_sum = dst_len; - struct ib_sge sges[SMC_IB_MAX_SEND_SGE]; int sent_count = src_off; int srcchunk, dstchunk; int num_sges; int rc; for (dstchunk = 0; dstchunk < 2; dstchunk++) { + struct ib_rdma_wr *wr = &wr_rdma_buf->wr_tx_rdma[dstchunk]; + struct ib_sge *sge = wr->wr.sg_list; + u64 base_addr = dma_addr; + + if (dst_len < link->qp_attr.cap.max_inline_data) { + base_addr = virt_addr; + wr->wr.send_flags |= IB_SEND_INLINE; + } else { + wr->wr.send_flags &= ~IB_SEND_INLINE; + } + num_sges = 0; for (srcchunk = 0; srcchunk < 2; srcchunk++) { - sges[srcchunk].addr = dma_addr + src_off; - sges[srcchunk].length = src_len; - sges[srcchunk].lkey = link->roce_pd->local_dma_lkey; + sge[srcchunk].addr = conn->sndbuf_desc->is_vm ? + (virt_addr + src_off) : (base_addr + src_off); + sge[srcchunk].length = src_len; + if (conn->sndbuf_desc->is_vm) + sge[srcchunk].lkey = + conn->sndbuf_desc->mr[link->link_idx]->lkey; num_sges++; src_off += src_len; @@ -344,7 +401,7 @@ static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len, src_len = dst_len - src_len; /* remainder */ src_len_sum += src_len; } - rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges); + rc = smc_tx_rdma_write(conn, dst_off, num_sges, wr); if (rc) return rc; if (dst_len_sum == len) @@ -369,6 +426,9 @@ static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len, int srcchunk, dstchunk; int rc; + if (conn->sndbuf_desc->is_attached) + return 0; + for (dstchunk = 0; dstchunk < 2; dstchunk++) { for (srcchunk = 0; srcchunk < 2; srcchunk++) { void *data = conn->sndbuf_desc->cpu_addr + src_off; @@ -403,7 +463,8 @@ static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len, /* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit; * usable snd_wnd as max transmit */ -static int smc_tx_rdma_writes(struct smc_connection *conn) +static int smc_tx_rdma_writes(struct smc_connection *conn, + struct smc_rdma_wr *wr_rdma_buf) { size_t len, src_len, dst_off, dst_len; /* current chunk values */ union smc_host_cursor sent, prep, prod, cons; @@ -422,8 +483,12 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) /* destination: RMBE */ /* cf. snd_wnd */ rmbespace = atomic_read(&conn->peer_rmbe_space); - if (rmbespace <= 0) + if (rmbespace <= 0) { + struct smc_sock *smc = container_of(conn, struct smc_sock, + conn); + SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk); return 0; + } smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn); smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn); @@ -464,7 +529,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) dst_off, dst_len); else rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len, - dst_off, dst_len); + dst_off, dst_len, wr_rdma_buf); if (rc) return rc; @@ -484,41 +549,51 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) */ static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn) { - struct smc_cdc_producer_flags *pflags; + struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags; + struct smc_link *link = conn->lnk; + struct smc_rdma_wr *wr_rdma_buf; struct smc_cdc_tx_pend *pend; struct smc_wr_buf *wr_buf; int rc; - spin_lock_bh(&conn->send_lock); - rc = smc_cdc_get_free_slot(conn, &wr_buf, &pend); + if (!link || !smc_wr_tx_link_hold(link)) + return -ENOLINK; + rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend); if (rc < 0) { + smc_wr_tx_link_put(link); if (rc == -EBUSY) { struct smc_sock *smc = container_of(conn, struct smc_sock, conn); - if (smc->sk.sk_err == ECONNABORTED) { - rc = sock_error(&smc->sk); - goto out_unlock; - } + if (smc->sk.sk_err == ECONNABORTED) + return sock_error(&smc->sk); + if (conn->killed) + return -EPIPE; rc = 0; - if (conn->alert_token_local) /* connection healthy */ - mod_delayed_work(system_wq, &conn->tx_work, - SMC_TX_WORK_DELAY); + mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work, + SMC_TX_WORK_DELAY); } - goto out_unlock; + return rc; } - if (!conn->local_tx_ctrl.prod_flags.urg_data_present) { - rc = smc_tx_rdma_writes(conn); + spin_lock_bh(&conn->send_lock); + if (link != conn->lnk) { + /* link of connection changed, tx_work will restart */ + smc_wr_tx_put_slot(link, + (struct smc_wr_tx_pend_priv *)pend); + rc = -ENOLINK; + goto out_unlock; + } + if (!pflags->urg_data_present) { + rc = smc_tx_rdma_writes(conn, wr_rdma_buf); if (rc) { - smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], + smc_wr_tx_put_slot(link, (struct smc_wr_tx_pend_priv *)pend); goto out_unlock; } } rc = smc_cdc_msg_send(conn, wr_buf, pend); - pflags = &conn->local_tx_ctrl.prod_flags; if (!rc && pflags->urg_data_present) { pflags->urg_data_pending = 0; pflags->urg_data_present = 0; @@ -526,6 +601,7 @@ static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn) out_unlock: spin_unlock_bh(&conn->send_lock); + smc_wr_tx_link_put(link); return rc; } @@ -536,7 +612,7 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn) spin_lock_bh(&conn->send_lock); if (!pflags->urg_data_present) - rc = smc_tx_rdma_writes(conn); + rc = smc_tx_rdma_writes(conn, NULL); if (!rc) rc = smcd_cdc_msg_send(conn); @@ -550,39 +626,69 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn) int smc_tx_sndbuf_nonempty(struct smc_connection *conn) { - int rc; + struct smc_sock *smc = container_of(conn, struct smc_sock, conn); + int rc = 0; + /* No data in the send queue */ + if (unlikely(smc_tx_prepared_sends(conn) <= 0)) + goto out; + + /* Peer don't have RMBE space */ + if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) { + SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk); + goto out; + } + + if (conn->killed || + conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) { + rc = -EPIPE; /* connection being aborted */ + goto out; + } if (conn->lgr->is_smcd) rc = smcd_tx_sndbuf_nonempty(conn); else rc = smcr_tx_sndbuf_nonempty(conn); + if (!rc) { + /* trigger socket release if connection is closing */ + smc_close_wake_tx_prepared(smc); + } + +out: return rc; } /* Wakeup sndbuf consumers from process context - * since there is more data to transmit + * since there is more data to transmit. The caller + * must hold sock lock. */ -void smc_tx_work(struct work_struct *work) +void smc_tx_pending(struct smc_connection *conn) { - struct smc_connection *conn = container_of(to_delayed_work(work), - struct smc_connection, - tx_work); struct smc_sock *smc = container_of(conn, struct smc_sock, conn); int rc; - lock_sock(&smc->sk); - if (smc->sk.sk_err || - !conn->alert_token_local || - conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) - goto out; + if (smc->sk.sk_err) + return; rc = smc_tx_sndbuf_nonempty(conn); if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked && !atomic_read(&conn->bytes_to_rcv)) conn->local_rx_ctrl.prod_flags.write_blocked = 0; +} -out: +/* Wakeup sndbuf consumers from process context + * since there is more data to transmit in locked + * sock. + */ +void smc_tx_work(struct work_struct *work) +{ + struct smc_connection *conn = container_of(to_delayed_work(work), + struct smc_connection, + tx_work); + struct smc_sock *smc = container_of(conn, struct smc_sock, conn); + + lock_sock(&smc->sk); + smc_tx_pending(conn); release_sock(&smc->sk); } @@ -598,7 +704,8 @@ void smc_tx_consumer_update(struct smc_connection *conn, bool force) if (to_confirm > conn->rmbe_update_limit) { smc_curs_copy(&prod, &conn->local_rx_ctrl.prod, conn); sender_free = conn->rmb_desc->len - - smc_curs_diff(conn->rmb_desc->len, &prod, &cfed); + smc_curs_diff_large(conn->rmb_desc->len, + &cfed, &prod); } if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || @@ -606,15 +713,15 @@ void smc_tx_consumer_update(struct smc_connection *conn, bool force) ((to_confirm > conn->rmbe_update_limit) && ((sender_free <= (conn->rmb_desc->len / 2)) || conn->local_rx_ctrl.prod_flags.write_blocked))) { + if (conn->killed || + conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) + return; if ((smc_cdc_get_slot_and_msg_send(conn) < 0) && - conn->alert_token_local) { /* connection healthy */ - schedule_delayed_work(&conn->tx_work, - SMC_TX_WORK_DELAY); + !conn->killed) { + queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work, + SMC_TX_WORK_DELAY); return; } - smc_curs_copy(&conn->rx_curs_confirmed, - &conn->local_tx_ctrl.cons, conn); - conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0; } if (conn->local_rx_ctrl.prod_flags.write_blocked && !atomic_read(&conn->bytes_to_rcv)) |
