summary refs log tree commit diff
path: root/fs/ocfs2/cluster/tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/cluster/tcp.c')
-rw-r--r-- fs/ocfs2/cluster/tcp.c 431
1 files changed, 218 insertions, 213 deletions
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index d644dc611425..79b281e32f4c 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -1,33 +1,17 @@
-/* -*- mode: c; c-basic-offset: 8; -*-
- *
- * vim: noexpandtab sw=8 ts=8 sts=0:
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
*
* Copyright (C) 2004 Oracle. All rights reserved.
*
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public
- * License along with this program; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 021110-1307, USA.
- *
* ----
*
- * Callers for this were originally written against a very simple synchronus
+ * Callers for this were originally written against a very simple synchronous
* API. This implementation reflects those simple callers. Some day I'm sure
* we'll need to move to a more robust posting/callback mechanism.
*
* Transmit calls pass in kernel virtual addresses and block copying this into
* the socket's tx buffers via a usual blocking sendmsg. They'll block waiting
- * for a failed socket to timeout. TX callers can also pass in a poniter to an
+ * for a failed socket to timeout. TX callers can also pass in a pointer to an
* 'int' which gets filled with an errno off the wire in response to the
* message they send.
*
@@ -54,6 +38,7 @@
*/
#include <linux/kernel.h>
+#include <linux/sched/mm.h>
#include <linux/jiffies.h>
#include <linux/slab.h>
#include <linux/idr.h>
@@ -61,8 +46,9 @@
#include <linux/net.h>
#include <linux/export.h>
#include <net/tcp.h>
+#include <trace/events/sock.h>
-#include <asm/uaccess.h>
+#include <linux/uaccess.h>
#include "heartbeat.h"
#include "tcp.h"
@@ -97,7 +83,7 @@
typeof(sc) __sc = (sc); \
mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \
"pg_off %zu] " fmt, __sc, \
- atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock, \
+ kref_read(&__sc->sc_kref), __sc->sc_sock, \
__sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \
##args); \
} while (0)
@@ -108,14 +94,14 @@ static struct rb_root o2net_handler_tree = RB_ROOT;
static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
/* XXX someday we'll need better accounting */
-static struct socket *o2net_listen_sock = NULL;
+static struct socket *o2net_listen_sock;
/*
* listen work is only queued by the listening socket callbacks on the
* o2net_wq. teardown detaches the callbacks before destroying the workqueue.
* quorum work is queued as sock containers are shutdown.. stop_listening
* tears down all the node's sock containers, preventing future shutdowns
- * and queued quroum work, before canceling delayed quorum work and
+ * and queued quorum work, before canceling delayed quorum work and
* destroying the work queue.
*/
static struct workqueue_struct *o2net_wq;
@@ -137,9 +123,9 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] =
static void o2net_sc_connect_completed(struct work_struct *work);
static void o2net_rx_until_empty(struct work_struct *work);
static void o2net_shutdown_sc(struct work_struct *work);
-static void o2net_listen_data_ready(struct sock *sk, int bytes);
+static void o2net_listen_data_ready(struct sock *sk);
static void o2net_sc_send_keep_req(struct work_struct *work);
-static void o2net_idle_timer(unsigned long data);
+static void o2net_idle_timer(struct timer_list *t);
static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
@@ -262,17 +248,17 @@ static void o2net_update_recv_stats(struct o2net_sock_container *sc)
#endif /* CONFIG_OCFS2_FS_STATS */
-static inline int o2net_reconnect_delay(void)
+static inline unsigned int o2net_reconnect_delay(void)
{
return o2nm_single_cluster->cl_reconnect_delay_ms;
}
-static inline int o2net_keepalive_delay(void)
+static inline unsigned int o2net_keepalive_delay(void)
{
return o2nm_single_cluster->cl_keepalive_delay_ms;
}
-static inline int o2net_idle_timeout(void)
+static inline unsigned int o2net_idle_timeout(void)
{
return o2nm_single_cluster->cl_idle_timeout_ms;
}
@@ -449,9 +435,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc);
INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req);
- init_timer(&sc->sc_idle_timeout);
- sc->sc_idle_timeout.function = o2net_idle_timer;
- sc->sc_idle_timeout.data = (unsigned long)sc;
+ timer_setup(&sc->sc_idle_timeout, o2net_idle_timer, 0);
sclog(sc, "alloced\n");
@@ -536,15 +520,16 @@ static void o2net_set_nn_state(struct o2net_node *nn,
if (nn->nn_persistent_error || nn->nn_sc_valid)
wake_up(&nn->nn_sc_wq);
- if (!was_err && nn->nn_persistent_error) {
+ if (was_valid && !was_err && nn->nn_persistent_error) {
o2quo_conn_err(o2net_num_from_nn(nn));
queue_delayed_work(o2net_wq, &nn->nn_still_up,
msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
}
if (was_valid && !valid) {
- printk(KERN_NOTICE "o2net: No longer connected to "
- SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
+ if (old_sc)
+ printk(KERN_NOTICE "o2net: No longer connected to "
+ SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
o2net_complete_nodes_nsw(nn);
}
@@ -596,13 +581,16 @@ static void o2net_set_nn_state(struct o2net_node *nn,
}
/* see o2net_register_callbacks() */
-static void o2net_data_ready(struct sock *sk, int bytes)
+static void o2net_data_ready(struct sock *sk)
{
- void (*ready)(struct sock *sk, int bytes);
+ void (*ready)(struct sock *sk);
+ struct o2net_sock_container *sc;
- read_lock(&sk->sk_callback_lock);
- if (sk->sk_user_data) {
- struct o2net_sock_container *sc = sk->sk_user_data;
+ trace_sk_data_ready(sk);
+
+ read_lock_bh(&sk->sk_callback_lock);
+ sc = sk->sk_user_data;
+ if (sc) {
sclog(sc, "data_ready hit\n");
o2net_set_data_ready_time(sc);
o2net_sc_queue_work(sc, &sc->sc_rx_work);
@@ -610,9 +598,9 @@ static void o2net_data_ready(struct sock *sk, int bytes)
} else {
ready = sk->sk_data_ready;
}
- read_unlock(&sk->sk_callback_lock);
+ read_unlock_bh(&sk->sk_callback_lock);
- ready(sk, bytes);
+ ready(sk);
}
/* see o2net_register_callbacks() */
@@ -621,7 +609,7 @@ static void o2net_state_change(struct sock *sk)
void (*state_change)(struct sock *sk);
struct o2net_sock_container *sc;
- read_lock(&sk->sk_callback_lock);
+ read_lock_bh(&sk->sk_callback_lock);
sc = sk->sk_user_data;
if (sc == NULL) {
state_change = sk->sk_state_change;
@@ -648,7 +636,7 @@ static void o2net_state_change(struct sock *sk)
break;
}
out:
- read_unlock(&sk->sk_callback_lock);
+ read_unlock_bh(&sk->sk_callback_lock);
state_change(sk);
}
@@ -736,7 +724,7 @@ static void o2net_shutdown_sc(struct work_struct *work)
if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
/* we shouldn't flush as we're in the thread, the
* races with pending sc work structs are harmless */
- del_timer_sync(&sc->sc_idle_timeout);
+ timer_delete_sync(&sc->sc_idle_timeout);
o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
sc_put(sc);
kernel_sock_shutdown(sc->sc_sock, SHUT_RDWR);
@@ -765,32 +753,32 @@ static struct o2net_msg_handler *
o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
struct rb_node **ret_parent)
{
- struct rb_node **p = &o2net_handler_tree.rb_node;
- struct rb_node *parent = NULL;
+ struct rb_node **p = &o2net_handler_tree.rb_node;
+ struct rb_node *parent = NULL;
struct o2net_msg_handler *nmh, *ret = NULL;
int cmp;
- while (*p) {
- parent = *p;
- nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
+ while (*p) {
+ parent = *p;
+ nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
cmp = o2net_handler_cmp(nmh, msg_type, key);
- if (cmp < 0)
- p = &(*p)->rb_left;
- else if (cmp > 0)
- p = &(*p)->rb_right;
- else {
+ if (cmp < 0)
+ p = &(*p)->rb_left;
+ else if (cmp > 0)
+ p = &(*p)->rb_right;
+ else {
ret = nmh;
- break;
+ break;
}
- }
+ }
- if (ret_p != NULL)
- *ret_p = p;
- if (ret_parent != NULL)
- *ret_parent = parent;
+ if (ret_p != NULL)
+ *ret_p = p;
+ if (ret_parent != NULL)
+ *ret_parent = parent;
- return ret;
+ return ret;
}
static void o2net_handler_kref_release(struct kref *kref)
@@ -871,8 +859,6 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
"for type %u key %08x\n", msg_type, key);
}
write_unlock(&o2net_handler_lock);
- if (ret)
- goto out;
out:
if (ret)
@@ -915,74 +901,51 @@ static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key)
static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
{
- int ret;
- mm_segment_t oldfs;
- struct kvec vec = {
- .iov_len = len,
- .iov_base = data,
- };
- struct msghdr msg = {
- .msg_iovlen = 1,
- .msg_iov = (struct iovec *)&vec,
- .msg_flags = MSG_DONTWAIT,
- };
-
- oldfs = get_fs();
- set_fs(get_ds());
- ret = sock_recvmsg(sock, &msg, len, msg.msg_flags);
- set_fs(oldfs);
-
- return ret;
+ struct kvec vec = { .iov_len = len, .iov_base = data, };
+ struct msghdr msg = { .msg_flags = MSG_DONTWAIT, };
+ iov_iter_kvec(&msg.msg_iter, ITER_DEST, &vec, 1, len);
+ return sock_recvmsg(sock, &msg, MSG_DONTWAIT);
}
static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec,
size_t veclen, size_t total)
{
int ret;
- mm_segment_t oldfs;
- struct msghdr msg = {
- .msg_iov = (struct iovec *)vec,
- .msg_iovlen = veclen,
- };
+ struct msghdr msg = {.msg_flags = 0,};
if (sock == NULL) {
ret = -EINVAL;
goto out;
}
- oldfs = get_fs();
- set_fs(get_ds());
- ret = sock_sendmsg(sock, &msg, total);
- set_fs(oldfs);
- if (ret != total) {
- mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret,
- total);
- if (ret >= 0)
- ret = -EPIPE; /* should be smarter, I bet */
- goto out;
- }
-
- ret = 0;
+ ret = kernel_sendmsg(sock, &msg, vec, veclen, total);
+ if (likely(ret == total))
+ return 0;
+ mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, total);
+ if (ret >= 0)
+ ret = -EPIPE; /* should be smarter, I bet */
out:
- if (ret < 0)
- mlog(0, "returning error: %d\n", ret);
+ mlog(0, "returning error: %d\n", ret);
return ret;
}
static void o2net_sendpage(struct o2net_sock_container *sc,
- void *kmalloced_virt,
- size_t size)
+ void *virt, size_t size)
{
struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+ struct msghdr msg = {};
+ struct bio_vec bv;
ssize_t ret;
+ bvec_set_virt(&bv, virt, size);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bv, 1, size);
+
while (1) {
+ msg.msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES;
mutex_lock(&sc->sc_send_lock);
- ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
- virt_to_page(kmalloced_virt),
- (long)kmalloced_virt & ~PAGE_MASK,
- size, MSG_DONTWAIT);
+ ret = sock_sendmsg(sc->sc_sock, &msg);
mutex_unlock(&sc->sc_send_lock);
+
if (ret == size)
break;
if (ret == (ssize_t)-EAGAIN) {
@@ -1033,16 +996,15 @@ static int o2net_tx_can_proceed(struct o2net_node *nn,
}
/* Get a map of all nodes to which this node is currently connected to */
-void o2net_fill_node_map(unsigned long *map, unsigned bytes)
+void o2net_fill_node_map(unsigned long *map, unsigned int bits)
{
struct o2net_sock_container *sc;
int node, ret;
- BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
-
- memset(map, 0, bytes);
+ bitmap_zero(map, bits);
for (node = 0; node < O2NM_MAX_NODES; ++node) {
- o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret);
+ if (!o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret))
+ continue;
if (!ret) {
set_bit(node, map);
sc_put(sc);
@@ -1102,7 +1064,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
o2net_set_nst_sock_container(&nst, sc);
veclen = caller_veclen + 1;
- vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC);
+ vec = kmalloc_array(veclen, sizeof(struct kvec), GFP_ATOMIC);
if (vec == NULL) {
mlog(0, "failed to %zu element kvec!\n", veclen);
ret = -ENOMEM;
@@ -1238,7 +1200,6 @@ static int o2net_process_message(struct o2net_sock_container *sc,
msglog(hdr, "bad magic\n");
ret = -EINVAL;
goto out;
- break;
}
/* find a handler for it */
@@ -1458,7 +1419,7 @@ out:
return ret;
}
-/* this work func is triggerd by data ready. it reads until it can read no
+/* this work func is triggered by data ready. it reads until it can read no
* more. it interprets 0, eof, as fatal. if data_ready hits while we're doing
* our work the work struct will be marked and we'll be called again. */
static void o2net_rx_until_empty(struct work_struct *work)
@@ -1481,31 +1442,6 @@ static void o2net_rx_until_empty(struct work_struct *work)
sc_put(sc);
}
-static int o2net_set_nodelay(struct socket *sock)
-{
- int ret, val = 1;
- mm_segment_t oldfs;
-
- oldfs = get_fs();
- set_fs(KERNEL_DS);
-
- /*
- * Dear unsuspecting programmer,
- *
- * Don't use sock_setsockopt() for SOL_TCP. It doesn't check its level
- * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will
- * silently turn into SO_DEBUG.
- *
- * Yours,
- * Keeper of hilariously fragile interfaces.
- */
- ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY,
- (char __user *)&val, sizeof(val));
-
- set_fs(oldfs);
- return ret;
-}
-
static void o2net_initialize_handshake(void)
{
o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
@@ -1547,12 +1483,13 @@ static void o2net_sc_send_keep_req(struct work_struct *work)
sc_put(sc);
}
-/* socket shutdown does a del_timer_sync against this as it tears down.
+/* socket shutdown does a timer_delete_sync against this as it tears down.
* we can't start this timer until we've got to the point in sc buildup
* where shutdown is going to be involved */
-static void o2net_idle_timer(unsigned long data)
+static void o2net_idle_timer(struct timer_list *t)
{
- struct o2net_sock_container *sc = (struct o2net_sock_container *)data;
+ struct o2net_sock_container *sc = timer_container_of(sc, t,
+ sc_idle_timeout);
struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
#ifdef CONFIG_DEBUG_FS
unsigned long msecs = ktime_to_ms(ktime_get()) -
@@ -1562,16 +1499,20 @@ static void o2net_idle_timer(unsigned long data)
#endif
printk(KERN_NOTICE "o2net: Connection to " SC_NODEF_FMT " has been "
- "idle for %lu.%lu secs, shutting it down.\n", SC_NODEF_ARGS(sc),
- msecs / 1000, msecs % 1000);
+ "idle for %lu.%lu secs.\n",
+ SC_NODEF_ARGS(sc), msecs / 1000, msecs % 1000);
- /*
- * Initialize the nn_timeout so that the next connection attempt
- * will continue in o2net_start_connect.
+ /* idle timeout happened; don't shut down the connection, but
+ * start making the fence decision. Maybe the connection can recover
+ * before the decision is made.
*/
atomic_set(&nn->nn_timeout, 1);
+ o2quo_conn_err(o2net_num_from_nn(nn));
+ queue_delayed_work(o2net_wq, &nn->nn_still_up,
+ msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
+
+ o2net_sc_reset_idle_timer(sc);
- o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
}
static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
@@ -1586,6 +1527,15 @@ static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
{
+ struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+
+ /* clear fence decision: the connection recovered from timeout */
+ if (atomic_read(&nn->nn_timeout)) {
+ o2quo_conn_up(o2net_num_from_nn(nn));
+ cancel_delayed_work(&nn->nn_still_up);
+ atomic_set(&nn->nn_timeout, 0);
+ }
+
/* Only push out an existing timer */
if (timer_pending(&sc->sc_idle_timeout))
o2net_sc_reset_idle_timer(sc);
@@ -1606,23 +1556,25 @@ static void o2net_start_connect(struct work_struct *work)
struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
int ret = 0, stop;
unsigned int timeout;
+ unsigned int nofs_flag;
+ /*
+ * sock_create allocates the sock with GFP_KERNEL. We must
+ * prevent the filesystem from being reentered by memory reclaim.
+ */
+ nofs_flag = memalloc_nofs_save();
/* if we're greater we initiate tx, otherwise we accept */
if (o2nm_this_node() <= o2net_num_from_nn(nn))
goto out;
/* watch for racing with tearing a node down */
node = o2nm_get_node_by_num(o2net_num_from_nn(nn));
- if (node == NULL) {
- ret = 0;
+ if (node == NULL)
goto out;
- }
mynode = o2nm_get_node_by_num(o2nm_this_node());
- if (mynode == NULL) {
- ret = 0;
+ if (mynode == NULL)
goto out;
- }
spin_lock(&nn->nn_lock);
/*
@@ -1657,12 +1609,13 @@ static void o2net_start_connect(struct work_struct *work)
sc->sc_sock = sock; /* freed by sc_kref_release */
sock->sk->sk_allocation = GFP_ATOMIC;
+ sock->sk->sk_use_task_frag = false;
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = mynode->nd_ipv4_address;
myaddr.sin_port = htons(0); /* any port */
- ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
+ ret = sock->ops->bind(sock, (struct sockaddr_unsized *)&myaddr,
sizeof(myaddr));
if (ret) {
mlog(ML_ERROR, "bind failed with %d at address %pI4\n",
@@ -1670,11 +1623,8 @@ static void o2net_start_connect(struct work_struct *work)
goto out;
}
- ret = o2net_set_nodelay(sc->sc_sock);
- if (ret) {
- mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
- goto out;
- }
+ tcp_sock_set_nodelay(sc->sc_sock->sk);
+ tcp_sock_set_user_timeout(sock->sk, O2NET_TCP_USER_TIMEOUT);
o2net_register_callbacks(sc->sc_sock->sk, sc);
@@ -1688,20 +1638,19 @@ static void o2net_start_connect(struct work_struct *work)
remoteaddr.sin_port = node->nd_ipv4_port;
ret = sc->sc_sock->ops->connect(sc->sc_sock,
- (struct sockaddr *)&remoteaddr,
+ (struct sockaddr_unsized *)&remoteaddr,
sizeof(remoteaddr),
O_NONBLOCK);
if (ret == -EINPROGRESS)
ret = 0;
out:
- if (ret) {
+ if (ret && sc) {
printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT
" failed with errno %d\n", SC_NODEF_ARGS(sc), ret);
/* 0 err so that another will be queued and attempted
* from set_nn_state */
- if (sc)
- o2net_ensure_shutdown(nn, sc, 0);
+ o2net_ensure_shutdown(nn, sc, 0);
}
if (sc)
sc_put(sc);
@@ -1710,6 +1659,7 @@ out:
if (mynode)
o2nm_node_put(mynode);
+ memalloc_nofs_restore(nofs_flag);
return;
}
@@ -1721,12 +1671,13 @@ static void o2net_connect_expired(struct work_struct *work)
spin_lock(&nn->nn_lock);
if (!nn->nn_sc_valid) {
printk(KERN_NOTICE "o2net: No connection established with "
- "node %u after %u.%u seconds, giving up.\n",
+ "node %u after %u.%u seconds, check network and"
+ " cluster configuration.\n",
o2net_num_from_nn(nn),
o2net_idle_timeout() / 1000,
o2net_idle_timeout() % 1000);
- o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
+ o2net_set_nn_state(nn, NULL, 0, 0);
}
spin_unlock(&nn->nn_lock);
}
@@ -1787,7 +1738,7 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
(msecs_to_jiffies(o2net_reconnect_delay()) + 1);
if (node_num != o2nm_this_node()) {
- /* believe it or not, accept and node hearbeating testing
+ /* believe it or not, accept and node heartbeating testing
* can succeed for this node before we got here.. so
* only use set_nn_state to clear the persistent error
* if that hasn't already happened */
@@ -1826,17 +1777,28 @@ int o2net_register_hb_callbacks(void)
/* ------------------------------------------------------------ */
-static int o2net_accept_one(struct socket *sock)
+static int o2net_accept_one(struct socket *sock, int *more)
{
- int ret, slen;
+ int ret;
struct sockaddr_in sin;
struct socket *new_sock = NULL;
struct o2nm_node *node = NULL;
struct o2nm_node *local_node = NULL;
struct o2net_sock_container *sc = NULL;
+ struct proto_accept_arg arg = {
+ .flags = O_NONBLOCK,
+ };
struct o2net_node *nn;
+ unsigned int nofs_flag;
+
+ /*
+ * sock_create_lite allocates the sock with GFP_KERNEL. We must
+ * prevent the filesystem from being reentered by memory reclaim.
+ */
+ nofs_flag = memalloc_nofs_save();
BUG_ON(sock == NULL);
+ *more = 0;
ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
sock->sk->sk_protocol, &new_sock);
if (ret)
@@ -1844,21 +1806,17 @@ static int o2net_accept_one(struct socket *sock)
new_sock->type = sock->type;
new_sock->ops = sock->ops;
- ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
+ ret = sock->ops->accept(sock, new_sock, &arg);
if (ret < 0)
goto out;
+ *more = 1;
new_sock->sk->sk_allocation = GFP_ATOMIC;
- ret = o2net_set_nodelay(new_sock);
- if (ret) {
- mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
- goto out;
- }
+ tcp_sock_set_nodelay(new_sock->sk);
+ tcp_sock_set_user_timeout(new_sock->sk, O2NET_TCP_USER_TIMEOUT);
- slen = sizeof(sin);
- ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
- &slen, 1);
+ ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin, 1);
if (ret < 0)
goto out;
@@ -1873,12 +1831,16 @@ static int o2net_accept_one(struct socket *sock)
if (o2nm_this_node() >= node->nd_num) {
local_node = o2nm_get_node_by_num(o2nm_this_node());
- printk(KERN_NOTICE "o2net: Unexpected connect attempt seen "
- "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
- "%pI4:%d)\n", local_node->nd_name, local_node->nd_num,
- &(local_node->nd_ipv4_address),
- ntohs(local_node->nd_ipv4_port), node->nd_name,
- node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port));
+ if (local_node)
+ printk(KERN_NOTICE "o2net: Unexpected connect attempt "
+ "seen at node '%s' (%u, %pI4:%d) from "
+ "node '%s' (%u, %pI4:%d)\n",
+ local_node->nd_name, local_node->nd_num,
+ &(local_node->nd_ipv4_address),
+ ntohs(local_node->nd_ipv4_port),
+ node->nd_name,
+ node->nd_num, &sin.sin_addr.s_addr,
+ ntohs(sin.sin_port));
ret = -EINVAL;
goto out;
}
@@ -1939,39 +1901,78 @@ out:
o2nm_node_put(local_node);
if (sc)
sc_put(sc);
+
+ memalloc_nofs_restore(nofs_flag);
return ret;
}
+/*
+ * This function is invoked in response to one or more
+ * pending accepts at softIRQ level. We must drain the
+ * entire queue before returning.
+ */
+
static void o2net_accept_many(struct work_struct *work)
{
struct socket *sock = o2net_listen_sock;
- while (o2net_accept_one(sock) == 0)
+ int more;
+
+ /*
+ * It is critical to note that due to interrupt moderation
+ * at the network driver level, we can't assume to get a
+ * softIRQ for every single conn since tcp SYN packets
+ * can arrive back-to-back, and therefore many pending
+ * accepts may result in just 1 softIRQ. If we terminate
+ * the o2net_accept_one() loop upon seeing an err, what happens
+ * to the rest of the conns in the queue? If no new SYN
+ * arrives for hours, no softIRQ will be delivered,
+ * and the connections will just sit in the queue.
+ */
+
+ for (;;) {
+ o2net_accept_one(sock, &more);
+ if (!more)
+ break;
cond_resched();
+ }
}
-static void o2net_listen_data_ready(struct sock *sk, int bytes)
+static void o2net_listen_data_ready(struct sock *sk)
{
- void (*ready)(struct sock *sk, int bytes);
+ void (*ready)(struct sock *sk);
+
+ trace_sk_data_ready(sk);
- read_lock(&sk->sk_callback_lock);
+ read_lock_bh(&sk->sk_callback_lock);
ready = sk->sk_user_data;
if (ready == NULL) { /* check for teardown race */
ready = sk->sk_data_ready;
goto out;
}
- /* ->sk_data_ready is also called for a newly established child socket
- * before it has been accepted and the acceptor has set up their
- * data_ready.. we only want to queue listen work for our listening
- * socket */
+ /* This callback may be called twice when a new connection
+ * is being established as a child socket inherits everything
+ * from a parent LISTEN socket, including the data_ready cb of
+ * the parent. This leads to a hazard. In o2net_accept_one()
+ * we are still initializing the child socket but have not
+ * changed the inherited data_ready callback yet when
+ * data starts arriving.
+ * We avoid this hazard by checking the state.
+ * For the listening socket, the state will be TCP_LISTEN; for the new
+ * socket, it will be TCP_ESTABLISHED. Also, in this case,
+ * sk->sk_user_data is not a valid function pointer.
+ */
+
if (sk->sk_state == TCP_LISTEN) {
- mlog(ML_TCP, "bytes: %d\n", bytes);
queue_work(o2net_wq, &o2net_listen_work);
+ } else {
+ ready = NULL;
}
out:
- read_unlock(&sk->sk_callback_lock);
- ready(sk, bytes);
+ read_unlock_bh(&sk->sk_callback_lock);
+ if (ready != NULL)
+ ready(sk);
}
static int o2net_open_listening_sock(__be32 addr, __be16 port)
@@ -2001,7 +2002,7 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port)
INIT_WORK(&o2net_listen_work, o2net_accept_many);
sock->sk->sk_reuse = SK_CAN_REUSE;
- ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
+ ret = sock->ops->bind(sock, (struct sockaddr_unsized *)&sin, sizeof(sin));
if (ret < 0) {
printk(KERN_ERR "o2net: Error %d while binding socket at "
"%pI4:%u\n", ret, &addr, ntohs(port));
@@ -2037,7 +2038,7 @@ int o2net_start_listening(struct o2nm_node *node)
BUG_ON(o2net_listen_sock != NULL);
mlog(ML_KTHREAD, "starting o2net thread...\n");
- o2net_wq = create_singlethread_workqueue("o2net");
+ o2net_wq = alloc_ordered_workqueue("o2net", WQ_MEM_RECLAIM);
if (o2net_wq == NULL) {
mlog(ML_ERROR, "unable to launch o2net thread\n");
return -ENOMEM; /* ? */
@@ -2093,22 +2094,23 @@ void o2net_stop_listening(struct o2nm_node *node)
int o2net_init(void)
{
+ struct folio *folio;
+ void *p;
unsigned long i;
o2quo_init();
+ o2net_debugfs_init();
- if (o2net_debugfs_init())
- return -ENOMEM;
+ folio = folio_alloc(GFP_KERNEL | __GFP_ZERO, 0);
+ if (!folio)
+ goto out;
- o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
- o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
- o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
- if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
- kfree(o2net_hand);
- kfree(o2net_keep_req);
- kfree(o2net_keep_resp);
- return -ENOMEM;
- }
+ p = folio_address(folio);
+ o2net_hand = p;
+ p += sizeof(struct o2net_handshake);
+ o2net_keep_req = p;
+ p += sizeof(struct o2net_msg);
+ o2net_keep_resp = p;
o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);
o2net_hand->connector_id = cpu_to_be64(1);
@@ -2133,13 +2135,16 @@ int o2net_init(void)
}
return 0;
+
+out:
+ o2net_debugfs_exit();
+ o2quo_exit();
+ return -ENOMEM;
}
void o2net_exit(void)
{
o2quo_exit();
- kfree(o2net_hand);
- kfree(o2net_keep_req);
- kfree(o2net_keep_resp);
o2net_debugfs_exit();
+ folio_put(virt_to_folio(o2net_hand));
}