diff options
Diffstat (limited to 'net/smc/smc_rx.c')
| -rw-r--r-- | net/smc/smc_rx.c | 192 |
1 files changed, 139 insertions, 53 deletions
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c index bbcf0fe4ae10..e7f1134453ef 100644 --- a/net/smc/smc_rx.c +++ b/net/smc/smc_rx.c @@ -13,14 +13,18 @@ #include <linux/net.h> #include <linux/rcupdate.h> #include <linux/sched/signal.h> +#include <linux/splice.h> #include <net/sock.h> +#include <trace/events/sock.h> #include "smc.h" #include "smc_core.h" #include "smc_cdc.h" #include "smc_tx.h" /* smc_tx_consumer_update() */ #include "smc_rx.h" +#include "smc_stats.h" +#include "smc_tracepoint.h" /* callback implementation to wakeup consumers blocked with smc_rx_wait(). * indirectly called by smc_cdc_msg_recv_action(). @@ -29,6 +33,8 @@ static void smc_rx_wake_up(struct sock *sk) { struct socket_wq *wq; + trace_sk_data_ready(sk); + /* derived from sock_def_readable() */ /* called already in smc_listen_work() */ rcu_read_lock(); @@ -36,10 +42,10 @@ static void smc_rx_wake_up(struct sock *sk) if (skwq_has_sleeper(wq)) wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI | EPOLLRDNORM | EPOLLRDBAND); - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); + sk_wake_async_rcu(sk, SOCK_WAKE_WAITD, POLL_IN); if ((sk->sk_shutdown == SHUTDOWN_MASK) || (sk->sk_state == SMC_CLOSED)) - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); + sk_wake_async_rcu(sk, SOCK_WAKE_WAITD, POLL_HUP); rcu_read_unlock(); } @@ -129,17 +135,8 @@ out: sock_put(sk); } -static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe, - struct pipe_buffer *buf) -{ - return 1; -} - static const struct pipe_buf_operations smc_pipe_ops = { - .can_merge = 0, - .confirm = generic_pipe_buf_confirm, .release = smc_rx_pipe_buf_release, - .steal = smc_rx_pipe_buf_nosteal, .get = generic_pipe_buf_get }; @@ -152,68 +149,130 @@ static void smc_rx_spd_release(struct splice_pipe_desc *spd, static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len, struct smc_sock *smc) { + struct smc_link_group *lgr = smc->conn.lgr; + int offset = offset_in_page(src); + struct partial_page *partial; struct splice_pipe_desc spd; - struct partial_page partial; - struct smc_spd_priv *priv; - int bytes; + struct smc_spd_priv **priv; + struct page **pages; + int bytes, nr_pages; + int i; - priv = kzalloc(sizeof(*priv), GFP_KERNEL); + nr_pages = !lgr->is_smcd && smc->conn.rmb_desc->is_vm ? + PAGE_ALIGN(len + offset) / PAGE_SIZE : 1; + + pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); + if (!pages) + goto out; + partial = kcalloc(nr_pages, sizeof(*partial), GFP_KERNEL); + if (!partial) + goto out_page; + priv = kcalloc(nr_pages, sizeof(*priv), GFP_KERNEL); if (!priv) - return -ENOMEM; - priv->len = len; - priv->smc = smc; - partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr; - partial.len = len; - partial.private = (unsigned long)priv; - - spd.nr_pages_max = 1; - spd.nr_pages = 1; - spd.pages = &smc->conn.rmb_desc->pages; - spd.partial = &partial; + goto out_part; + for (i = 0; i < nr_pages; i++) { + priv[i] = kzalloc(sizeof(**priv), GFP_KERNEL); + if (!priv[i]) + goto out_priv; + } + + if (lgr->is_smcd || + (!lgr->is_smcd && !smc->conn.rmb_desc->is_vm)) { + /* smcd or smcr that uses physically contiguous RMBs */ + priv[0]->len = len; + priv[0]->smc = smc; + partial[0].offset = src - (char *)smc->conn.rmb_desc->cpu_addr; + partial[0].len = len; + partial[0].private = (unsigned long)priv[0]; + pages[0] = smc->conn.rmb_desc->pages; + } else { + int size, left = len; + void *buf = src; + /* smcr that uses virtually contiguous RMBs*/ + for (i = 0; i < nr_pages; i++) { + size = min_t(int, PAGE_SIZE - offset, left); + priv[i]->len = size; + priv[i]->smc = smc; + pages[i] = vmalloc_to_page(buf); + partial[i].offset = offset; + partial[i].len = size; + partial[i].private = (unsigned long)priv[i]; + buf += size; + left -= size; + offset = 0; + } + } + spd.nr_pages_max = nr_pages; + spd.nr_pages = nr_pages; + spd.pages = pages; + spd.partial = partial; spd.ops = &smc_pipe_ops; spd.spd_release = smc_rx_spd_release; bytes = splice_to_pipe(pipe, &spd); if (bytes > 0) { sock_hold(&smc->sk); - get_page(smc->conn.rmb_desc->pages); + if (!lgr->is_smcd && smc->conn.rmb_desc->is_vm) { + for (i = 0; i < PAGE_ALIGN(bytes + offset) / PAGE_SIZE; i++) + get_page(pages[i]); + } else { + get_page(smc->conn.rmb_desc->pages); + } atomic_add(bytes, &smc->conn.splice_pending); } + kfree(priv); + kfree(partial); + kfree(pages); return bytes; + +out_priv: + for (i = (i - 1); i >= 0; i--) + kfree(priv[i]); + kfree(priv); +out_part: + kfree(partial); +out_page: + kfree(pages); +out: + return -ENOMEM; } -static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn) +static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn, size_t peeked) { - return atomic_read(&conn->bytes_to_rcv) && + return smc_rx_data_available(conn, peeked) && !atomic_read(&conn->splice_pending); } /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted * @smc smc socket * @timeo pointer to max seconds to wait, pointer to value 0 for no timeout + * @peeked number of bytes already peeked * @fcrit add'l criterion to evaluate as function pointer * Returns: * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown. * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted). */ -int smc_rx_wait(struct smc_sock *smc, long *timeo, - int (*fcrit)(struct smc_connection *conn)) +int smc_rx_wait(struct smc_sock *smc, long *timeo, size_t peeked, + int (*fcrit)(struct smc_connection *conn, size_t baseline)) { DEFINE_WAIT_FUNC(wait, woken_wake_function); struct smc_connection *conn = &smc->conn; + struct smc_cdc_conn_state_flags *cflags = + &conn->local_tx_ctrl.conn_state_flags; struct sock *sk = &smc->sk; int rc; - if (fcrit(conn)) + if (fcrit(conn, peeked)) return 1; sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); add_wait_queue(sk_sleep(sk), &wait); rc = sk_wait_event(sk, timeo, - sk->sk_err || - sk->sk_shutdown & RCV_SHUTDOWN || - fcrit(conn) || - smc_cdc_rxed_any_close_or_senddone(conn), + READ_ONCE(sk->sk_err) || + cflags->peer_conn_abort || + READ_ONCE(sk->sk_shutdown) & RCV_SHUTDOWN || + conn->killed || + fcrit(conn, peeked), &wait); remove_wait_queue(sk_sleep(sk), &wait); sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); @@ -233,6 +292,7 @@ static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, conn->urg_state == SMC_URG_READ) return -EINVAL; + SMC_STAT_INC(smc, urg_data_cnt); if (conn->urg_state == SMC_URG_VALID) { if (!(flags & MSG_PEEK)) smc->conn.urg_state = SMC_URG_READ; @@ -263,6 +323,18 @@ static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, return -EAGAIN; } +static bool smc_rx_recvmsg_data_available(struct smc_sock *smc, size_t peeked) +{ + struct smc_connection *conn = &smc->conn; + + if (smc_rx_data_available(conn, peeked)) + return true; + else if (conn->urg_state == SMC_URG_VALID) + /* we received a single urgent Byte - skip */ + smc_rx_update_cons(smc, 0); + return false; +} + /* smc_rx_recvmsg - receive data from RMBE * @msg: copy data to receive buffer * @pipe: copy data to pipe if set - indicates splice() call @@ -273,10 +345,10 @@ static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, struct pipe_inode_info *pipe, size_t len, int flags) { - size_t copylen, read_done = 0, read_remaining = len; + size_t copylen, read_done = 0, read_remaining = len, peeked_bytes = 0; size_t chunk_len, chunk_off, chunk_len_sum; struct smc_connection *conn = &smc->conn; - int (*func)(struct smc_connection *conn); + int (*func)(struct smc_connection *conn, size_t baseline); union smc_host_cursor cons; int readable, chunk; char *rcvbuf_base; @@ -297,6 +369,12 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); + readable = atomic_read(&conn->bytes_to_rcv); + if (readable >= conn->rmb_desc->len) + SMC_STAT_RMB_RX_FULL(smc, !conn->lnk); + + if (len < readable) + SMC_STAT_RMB_RX_SIZE_SMALL(smc, !conn->lnk); /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */ rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr; @@ -304,16 +382,20 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, if (read_done >= target || (pipe && read_done)) break; - if (atomic_read(&conn->bytes_to_rcv)) + if (conn->killed) + break; + + if (smc_rx_recvmsg_data_available(smc, peeked_bytes)) goto copy; - else if (conn->urg_state == SMC_URG_VALID) - /* we received a single urgent Byte - skip */ - smc_rx_update_cons(smc, 0); - if (sk->sk_shutdown & RCV_SHUTDOWN || - smc_cdc_rxed_any_close_or_senddone(conn) || - conn->local_tx_ctrl.conn_state_flags.peer_conn_abort) + if (sk->sk_shutdown & RCV_SHUTDOWN) { + /* smc_cdc_msg_recv_action() could have run after + * above smc_rx_recvmsg_data_available() + */ + if (smc_rx_recvmsg_data_available(smc, peeked_bytes)) + goto copy; break; + } if (read_done) { if (sk->sk_err || @@ -336,34 +418,36 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, } break; } + if (!timeo) + return -EAGAIN; if (signal_pending(current)) { read_done = sock_intr_errno(timeo); break; } - if (!timeo) - return -EAGAIN; } - if (!smc_rx_data_available(conn)) { - smc_rx_wait(smc, &timeo, smc_rx_data_available); + if (!smc_rx_data_available(conn, peeked_bytes)) { + smc_rx_wait(smc, &timeo, peeked_bytes, smc_rx_data_available); continue; } copy: /* initialize variables for 1st iteration of subsequent loop */ /* could be just 1 byte, even after waiting on data above */ - readable = atomic_read(&conn->bytes_to_rcv); + readable = smc_rx_data_available(conn, peeked_bytes); splbytes = atomic_read(&conn->splice_pending); if (!readable || (msg && splbytes)) { if (splbytes) func = smc_rx_data_available_and_no_splice_pend; else func = smc_rx_data_available; - smc_rx_wait(smc, &timeo, func); + smc_rx_wait(smc, &timeo, peeked_bytes, func); continue; } smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); + if ((flags & MSG_PEEK) && peeked_bytes) + smc_curs_add(conn->rmb_desc->len, &cons, peeked_bytes); /* subsequent splice() calls pick up where previous left */ if (splbytes) smc_curs_add(conn->rmb_desc->len, &cons, splbytes); @@ -394,12 +478,13 @@ copy: if (rc < 0) { if (!read_done) read_done = -EFAULT; - smc_rmb_sync_sg_for_device(conn); goto out; } } read_remaining -= chunk_len; read_done += chunk_len; + if (flags & MSG_PEEK) + peeked_bytes += chunk_len; if (chunk_len_sum == copylen) break; /* either on 1st or 2nd iteration */ @@ -408,7 +493,6 @@ copy: chunk_len_sum += chunk_len; chunk_off = 0; /* modulo offset in recv ring buffer */ } - smc_rmb_sync_sg_for_device(conn); /* update cursors */ if (!(flags & MSG_PEEK)) { @@ -420,6 +504,8 @@ copy: if (msg && smc_rx_update_consumer(smc, cons, copylen)) goto out; } + + trace_smc_rx_recvmsg(smc, copylen); } while (read_remaining); out: return read_done; |
