summaryrefslogtreecommitdiff
path: root/net/smc/smc_rx.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/smc/smc_rx.c')
-rw-r--r--net/smc/smc_rx.c43
1 files changed, 24 insertions, 19 deletions
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index 9a2f3638d161..e7f1134453ef 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -42,10 +42,10 @@ static void smc_rx_wake_up(struct sock *sk)
if (skwq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
- sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
+ sk_wake_async_rcu(sk, SOCK_WAKE_WAITD, POLL_IN);
if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
(sk->sk_state == SMC_CLOSED))
- sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
+ sk_wake_async_rcu(sk, SOCK_WAKE_WAITD, POLL_HUP);
rcu_read_unlock();
}
@@ -197,7 +197,7 @@ static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
partial[i].offset = offset;
partial[i].len = size;
partial[i].private = (unsigned long)priv[i];
- buf += size / sizeof(*buf);
+ buf += size;
left -= size;
offset = 0;
}
@@ -238,22 +238,23 @@ out:
return -ENOMEM;
}
-static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
+static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn, size_t peeked)
{
- return atomic_read(&conn->bytes_to_rcv) &&
+ return smc_rx_data_available(conn, peeked) &&
!atomic_read(&conn->splice_pending);
}
/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
* @smc smc socket
* @timeo pointer to max seconds to wait, pointer to value 0 for no timeout
+ * @peeked number of bytes already peeked
* @fcrit add'l criterion to evaluate as function pointer
* Returns:
* 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
* 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
*/
-int smc_rx_wait(struct smc_sock *smc, long *timeo,
- int (*fcrit)(struct smc_connection *conn))
+int smc_rx_wait(struct smc_sock *smc, long *timeo, size_t peeked,
+ int (*fcrit)(struct smc_connection *conn, size_t baseline))
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
struct smc_connection *conn = &smc->conn;
@@ -262,7 +263,7 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
struct sock *sk = &smc->sk;
int rc;
- if (fcrit(conn))
+ if (fcrit(conn, peeked))
return 1;
sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
add_wait_queue(sk_sleep(sk), &wait);
@@ -271,7 +272,7 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
cflags->peer_conn_abort ||
READ_ONCE(sk->sk_shutdown) & RCV_SHUTDOWN ||
conn->killed ||
- fcrit(conn),
+ fcrit(conn, peeked),
&wait);
remove_wait_queue(sk_sleep(sk), &wait);
sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
@@ -322,11 +323,11 @@ static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
return -EAGAIN;
}
-static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
+static bool smc_rx_recvmsg_data_available(struct smc_sock *smc, size_t peeked)
{
struct smc_connection *conn = &smc->conn;
- if (smc_rx_data_available(conn))
+ if (smc_rx_data_available(conn, peeked))
return true;
else if (conn->urg_state == SMC_URG_VALID)
/* we received a single urgent Byte - skip */
@@ -344,10 +345,10 @@ static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
struct pipe_inode_info *pipe, size_t len, int flags)
{
- size_t copylen, read_done = 0, read_remaining = len;
+ size_t copylen, read_done = 0, read_remaining = len, peeked_bytes = 0;
size_t chunk_len, chunk_off, chunk_len_sum;
struct smc_connection *conn = &smc->conn;
- int (*func)(struct smc_connection *conn);
+ int (*func)(struct smc_connection *conn, size_t baseline);
union smc_host_cursor cons;
int readable, chunk;
char *rcvbuf_base;
@@ -384,14 +385,14 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
if (conn->killed)
break;
- if (smc_rx_recvmsg_data_available(smc))
+ if (smc_rx_recvmsg_data_available(smc, peeked_bytes))
goto copy;
if (sk->sk_shutdown & RCV_SHUTDOWN) {
/* smc_cdc_msg_recv_action() could have run after
* above smc_rx_recvmsg_data_available()
*/
- if (smc_rx_recvmsg_data_available(smc))
+ if (smc_rx_recvmsg_data_available(smc, peeked_bytes))
goto copy;
break;
}
@@ -425,26 +426,28 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
}
}
- if (!smc_rx_data_available(conn)) {
- smc_rx_wait(smc, &timeo, smc_rx_data_available);
+ if (!smc_rx_data_available(conn, peeked_bytes)) {
+ smc_rx_wait(smc, &timeo, peeked_bytes, smc_rx_data_available);
continue;
}
copy:
/* initialize variables for 1st iteration of subsequent loop */
/* could be just 1 byte, even after waiting on data above */
- readable = atomic_read(&conn->bytes_to_rcv);
+ readable = smc_rx_data_available(conn, peeked_bytes);
splbytes = atomic_read(&conn->splice_pending);
if (!readable || (msg && splbytes)) {
if (splbytes)
func = smc_rx_data_available_and_no_splice_pend;
else
func = smc_rx_data_available;
- smc_rx_wait(smc, &timeo, func);
+ smc_rx_wait(smc, &timeo, peeked_bytes, func);
continue;
}
smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
+ if ((flags & MSG_PEEK) && peeked_bytes)
+ smc_curs_add(conn->rmb_desc->len, &cons, peeked_bytes);
/* subsequent splice() calls pick up where previous left */
if (splbytes)
smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
@@ -480,6 +483,8 @@ copy:
}
read_remaining -= chunk_len;
read_done += chunk_len;
+ if (flags & MSG_PEEK)
+ peeked_bytes += chunk_len;
if (chunk_len_sum == copylen)
break; /* either on 1st or 2nd iteration */