diff options
Diffstat (limited to 'fs/ocfs2/cluster/tcp.c')
| -rw-r--r-- | fs/ocfs2/cluster/tcp.c | 431 |
1 files changed, 218 insertions, 213 deletions
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index d644dc611425..79b281e32f4c 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -1,33 +1,17 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * - * vim: noexpandtab sw=8 ts=8 sts=0: +// SPDX-License-Identifier: GPL-2.0-or-later +/* * * Copyright (C) 2004 Oracle. All rights reserved. * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - * * ---- * - * Callers for this were originally written against a very simple synchronus + * Callers for this were originally written against a very simple synchronous * API. This implementation reflects those simple callers. Some day I'm sure * we'll need to move to a more robust posting/callback mechanism. * * Transmit calls pass in kernel virtual addresses and block copying this into * the socket's tx buffers via a usual blocking sendmsg. They'll block waiting - * for a failed socket to timeout. TX callers can also pass in a poniter to an + * for a failed socket to timeout. TX callers can also pass in a pointer to an * 'int' which gets filled with an errno off the wire in response to the * message they send. * @@ -54,6 +38,7 @@ */ #include <linux/kernel.h> +#include <linux/sched/mm.h> #include <linux/jiffies.h> #include <linux/slab.h> #include <linux/idr.h> @@ -61,8 +46,9 @@ #include <linux/net.h> #include <linux/export.h> #include <net/tcp.h> +#include <trace/events/sock.h> -#include <asm/uaccess.h> +#include <linux/uaccess.h> #include "heartbeat.h" #include "tcp.h" @@ -97,7 +83,7 @@ typeof(sc) __sc = (sc); \ mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \ "pg_off %zu] " fmt, __sc, \ - atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock, \ + kref_read(&__sc->sc_kref), __sc->sc_sock, \ __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \ ##args); \ } while (0) @@ -108,14 +94,14 @@ static struct rb_root o2net_handler_tree = RB_ROOT; static struct o2net_node o2net_nodes[O2NM_MAX_NODES]; /* XXX someday we'll need better accounting */ -static struct socket *o2net_listen_sock = NULL; +static struct socket *o2net_listen_sock; /* * listen work is only queued by the listening socket callbacks on the * o2net_wq. teardown detaches the callbacks before destroying the workqueue. * quorum work is queued as sock containers are shutdown.. stop_listening * tears down all the node's sock containers, preventing future shutdowns - * and queued quroum work, before canceling delayed quorum work and + * and queued quorum work, before canceling delayed quorum work and * destroying the work queue. */ static struct workqueue_struct *o2net_wq; @@ -137,9 +123,9 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] = static void o2net_sc_connect_completed(struct work_struct *work); static void o2net_rx_until_empty(struct work_struct *work); static void o2net_shutdown_sc(struct work_struct *work); -static void o2net_listen_data_ready(struct sock *sk, int bytes); +static void o2net_listen_data_ready(struct sock *sk); static void o2net_sc_send_keep_req(struct work_struct *work); -static void o2net_idle_timer(unsigned long data); +static void o2net_idle_timer(struct timer_list *t); static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc); @@ -262,17 +248,17 @@ static void o2net_update_recv_stats(struct o2net_sock_container *sc) #endif /* CONFIG_OCFS2_FS_STATS */ -static inline int o2net_reconnect_delay(void) +static inline unsigned int o2net_reconnect_delay(void) { return o2nm_single_cluster->cl_reconnect_delay_ms; } -static inline int o2net_keepalive_delay(void) +static inline unsigned int o2net_keepalive_delay(void) { return o2nm_single_cluster->cl_keepalive_delay_ms; } -static inline int o2net_idle_timeout(void) +static inline unsigned int o2net_idle_timeout(void) { return o2nm_single_cluster->cl_idle_timeout_ms; } @@ -449,9 +435,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node) INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc); INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req); - init_timer(&sc->sc_idle_timeout); - sc->sc_idle_timeout.function = o2net_idle_timer; - sc->sc_idle_timeout.data = (unsigned long)sc; + timer_setup(&sc->sc_idle_timeout, o2net_idle_timer, 0); sclog(sc, "alloced\n"); @@ -536,15 +520,16 @@ static void o2net_set_nn_state(struct o2net_node *nn, if (nn->nn_persistent_error || nn->nn_sc_valid) wake_up(&nn->nn_sc_wq); - if (!was_err && nn->nn_persistent_error) { + if (was_valid && !was_err && nn->nn_persistent_error) { o2quo_conn_err(o2net_num_from_nn(nn)); queue_delayed_work(o2net_wq, &nn->nn_still_up, msecs_to_jiffies(O2NET_QUORUM_DELAY_MS)); } if (was_valid && !valid) { - printk(KERN_NOTICE "o2net: No longer connected to " - SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc)); + if (old_sc) + printk(KERN_NOTICE "o2net: No longer connected to " + SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc)); o2net_complete_nodes_nsw(nn); } @@ -596,13 +581,16 @@ static void o2net_set_nn_state(struct o2net_node *nn, } /* see o2net_register_callbacks() */ -static void o2net_data_ready(struct sock *sk, int bytes) +static void o2net_data_ready(struct sock *sk) { - void (*ready)(struct sock *sk, int bytes); + void (*ready)(struct sock *sk); + struct o2net_sock_container *sc; - read_lock(&sk->sk_callback_lock); - if (sk->sk_user_data) { - struct o2net_sock_container *sc = sk->sk_user_data; + trace_sk_data_ready(sk); + + read_lock_bh(&sk->sk_callback_lock); + sc = sk->sk_user_data; + if (sc) { sclog(sc, "data_ready hit\n"); o2net_set_data_ready_time(sc); o2net_sc_queue_work(sc, &sc->sc_rx_work); @@ -610,9 +598,9 @@ static void o2net_data_ready(struct sock *sk, int bytes) } else { ready = sk->sk_data_ready; } - read_unlock(&sk->sk_callback_lock); + read_unlock_bh(&sk->sk_callback_lock); - ready(sk, bytes); + ready(sk); } /* see o2net_register_callbacks() */ @@ -621,7 +609,7 @@ static void o2net_state_change(struct sock *sk) void (*state_change)(struct sock *sk); struct o2net_sock_container *sc; - read_lock(&sk->sk_callback_lock); + read_lock_bh(&sk->sk_callback_lock); sc = sk->sk_user_data; if (sc == NULL) { state_change = sk->sk_state_change; @@ -648,7 +636,7 @@ static void o2net_state_change(struct sock *sk) break; } out: - read_unlock(&sk->sk_callback_lock); + read_unlock_bh(&sk->sk_callback_lock); state_change(sk); } @@ -736,7 +724,7 @@ static void o2net_shutdown_sc(struct work_struct *work) if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) { /* we shouldn't flush as we're in the thread, the * races with pending sc work structs are harmless */ - del_timer_sync(&sc->sc_idle_timeout); + timer_delete_sync(&sc->sc_idle_timeout); o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); sc_put(sc); kernel_sock_shutdown(sc->sc_sock, SHUT_RDWR); @@ -765,32 +753,32 @@ static struct o2net_msg_handler * o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p, struct rb_node **ret_parent) { - struct rb_node **p = &o2net_handler_tree.rb_node; - struct rb_node *parent = NULL; + struct rb_node **p = &o2net_handler_tree.rb_node; + struct rb_node *parent = NULL; struct o2net_msg_handler *nmh, *ret = NULL; int cmp; - while (*p) { - parent = *p; - nmh = rb_entry(parent, struct o2net_msg_handler, nh_node); + while (*p) { + parent = *p; + nmh = rb_entry(parent, struct o2net_msg_handler, nh_node); cmp = o2net_handler_cmp(nmh, msg_type, key); - if (cmp < 0) - p = &(*p)->rb_left; - else if (cmp > 0) - p = &(*p)->rb_right; - else { + if (cmp < 0) + p = &(*p)->rb_left; + else if (cmp > 0) + p = &(*p)->rb_right; + else { ret = nmh; - break; + break; } - } + } - if (ret_p != NULL) - *ret_p = p; - if (ret_parent != NULL) - *ret_parent = parent; + if (ret_p != NULL) + *ret_p = p; + if (ret_parent != NULL) + *ret_parent = parent; - return ret; + return ret; } static void o2net_handler_kref_release(struct kref *kref) @@ -871,8 +859,6 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, "for type %u key %08x\n", msg_type, key); } write_unlock(&o2net_handler_lock); - if (ret) - goto out; out: if (ret) @@ -915,74 +901,51 @@ static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key) static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len) { - int ret; - mm_segment_t oldfs; - struct kvec vec = { - .iov_len = len, - .iov_base = data, - }; - struct msghdr msg = { - .msg_iovlen = 1, - .msg_iov = (struct iovec *)&vec, - .msg_flags = MSG_DONTWAIT, - }; - - oldfs = get_fs(); - set_fs(get_ds()); - ret = sock_recvmsg(sock, &msg, len, msg.msg_flags); - set_fs(oldfs); - - return ret; + struct kvec vec = { .iov_len = len, .iov_base = data, }; + struct msghdr msg = { .msg_flags = MSG_DONTWAIT, }; + iov_iter_kvec(&msg.msg_iter, ITER_DEST, &vec, 1, len); + return sock_recvmsg(sock, &msg, MSG_DONTWAIT); } static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec, size_t veclen, size_t total) { int ret; - mm_segment_t oldfs; - struct msghdr msg = { - .msg_iov = (struct iovec *)vec, - .msg_iovlen = veclen, - }; + struct msghdr msg = {.msg_flags = 0,}; if (sock == NULL) { ret = -EINVAL; goto out; } - oldfs = get_fs(); - set_fs(get_ds()); - ret = sock_sendmsg(sock, &msg, total); - set_fs(oldfs); - if (ret != total) { - mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, - total); - if (ret >= 0) - ret = -EPIPE; /* should be smarter, I bet */ - goto out; - } - - ret = 0; + ret = kernel_sendmsg(sock, &msg, vec, veclen, total); + if (likely(ret == total)) + return 0; + mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, total); + if (ret >= 0) + ret = -EPIPE; /* should be smarter, I bet */ out: - if (ret < 0) - mlog(0, "returning error: %d\n", ret); + mlog(0, "returning error: %d\n", ret); return ret; } static void o2net_sendpage(struct o2net_sock_container *sc, - void *kmalloced_virt, - size_t size) + void *virt, size_t size) { struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); + struct msghdr msg = {}; + struct bio_vec bv; ssize_t ret; + bvec_set_virt(&bv, virt, size); + iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bv, 1, size); + while (1) { + msg.msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES; mutex_lock(&sc->sc_send_lock); - ret = sc->sc_sock->ops->sendpage(sc->sc_sock, - virt_to_page(kmalloced_virt), - (long)kmalloced_virt & ~PAGE_MASK, - size, MSG_DONTWAIT); + ret = sock_sendmsg(sc->sc_sock, &msg); mutex_unlock(&sc->sc_send_lock); + if (ret == size) break; if (ret == (ssize_t)-EAGAIN) { @@ -1033,16 +996,15 @@ static int o2net_tx_can_proceed(struct o2net_node *nn, } /* Get a map of all nodes to which this node is currently connected to */ -void o2net_fill_node_map(unsigned long *map, unsigned bytes) +void o2net_fill_node_map(unsigned long *map, unsigned int bits) { struct o2net_sock_container *sc; int node, ret; - BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); - - memset(map, 0, bytes); + bitmap_zero(map, bits); for (node = 0; node < O2NM_MAX_NODES; ++node) { - o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret); + if (!o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret)) + continue; if (!ret) { set_bit(node, map); sc_put(sc); @@ -1102,7 +1064,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec, o2net_set_nst_sock_container(&nst, sc); veclen = caller_veclen + 1; - vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC); + vec = kmalloc_array(veclen, sizeof(struct kvec), GFP_ATOMIC); if (vec == NULL) { mlog(0, "failed to %zu element kvec!\n", veclen); ret = -ENOMEM; @@ -1238,7 +1200,6 @@ static int o2net_process_message(struct o2net_sock_container *sc, msglog(hdr, "bad magic\n"); ret = -EINVAL; goto out; - break; } /* find a handler for it */ @@ -1458,7 +1419,7 @@ out: return ret; } -/* this work func is triggerd by data ready. it reads until it can read no +/* this work func is triggered by data ready. it reads until it can read no * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing * our work the work struct will be marked and we'll be called again. */ static void o2net_rx_until_empty(struct work_struct *work) @@ -1481,31 +1442,6 @@ static void o2net_rx_until_empty(struct work_struct *work) sc_put(sc); } -static int o2net_set_nodelay(struct socket *sock) -{ - int ret, val = 1; - mm_segment_t oldfs; - - oldfs = get_fs(); - set_fs(KERNEL_DS); - - /* - * Dear unsuspecting programmer, - * - * Don't use sock_setsockopt() for SOL_TCP. It doesn't check its level - * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will - * silently turn into SO_DEBUG. - * - * Yours, - * Keeper of hilariously fragile interfaces. - */ - ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY, - (char __user *)&val, sizeof(val)); - - set_fs(oldfs); - return ret; -} - static void o2net_initialize_handshake(void) { o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32( @@ -1547,12 +1483,13 @@ static void o2net_sc_send_keep_req(struct work_struct *work) sc_put(sc); } -/* socket shutdown does a del_timer_sync against this as it tears down. +/* socket shutdown does a timer_delete_sync against this as it tears down. * we can't start this timer until we've got to the point in sc buildup * where shutdown is going to be involved */ -static void o2net_idle_timer(unsigned long data) +static void o2net_idle_timer(struct timer_list *t) { - struct o2net_sock_container *sc = (struct o2net_sock_container *)data; + struct o2net_sock_container *sc = timer_container_of(sc, t, + sc_idle_timeout); struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); #ifdef CONFIG_DEBUG_FS unsigned long msecs = ktime_to_ms(ktime_get()) - @@ -1562,16 +1499,20 @@ static void o2net_idle_timer(unsigned long data) #endif printk(KERN_NOTICE "o2net: Connection to " SC_NODEF_FMT " has been " - "idle for %lu.%lu secs, shutting it down.\n", SC_NODEF_ARGS(sc), - msecs / 1000, msecs % 1000); + "idle for %lu.%lu secs.\n", + SC_NODEF_ARGS(sc), msecs / 1000, msecs % 1000); - /* - * Initialize the nn_timeout so that the next connection attempt - * will continue in o2net_start_connect. + /* idle timerout happen, don't shutdown the connection, but + * make fence decision. Maybe the connection can recover before + * the decision is made. */ atomic_set(&nn->nn_timeout, 1); + o2quo_conn_err(o2net_num_from_nn(nn)); + queue_delayed_work(o2net_wq, &nn->nn_still_up, + msecs_to_jiffies(O2NET_QUORUM_DELAY_MS)); + + o2net_sc_reset_idle_timer(sc); - o2net_sc_queue_work(sc, &sc->sc_shutdown_work); } static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc) @@ -1586,6 +1527,15 @@ static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc) static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) { + struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); + + /* clear fence decision since the connection recover from timeout*/ + if (atomic_read(&nn->nn_timeout)) { + o2quo_conn_up(o2net_num_from_nn(nn)); + cancel_delayed_work(&nn->nn_still_up); + atomic_set(&nn->nn_timeout, 0); + } + /* Only push out an existing timer */ if (timer_pending(&sc->sc_idle_timeout)) o2net_sc_reset_idle_timer(sc); @@ -1606,23 +1556,25 @@ static void o2net_start_connect(struct work_struct *work) struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; int ret = 0, stop; unsigned int timeout; + unsigned int nofs_flag; + /* + * sock_create allocates the sock with GFP_KERNEL. We must + * prevent the filesystem from being reentered by memory reclaim. + */ + nofs_flag = memalloc_nofs_save(); /* if we're greater we initiate tx, otherwise we accept */ if (o2nm_this_node() <= o2net_num_from_nn(nn)) goto out; /* watch for racing with tearing a node down */ node = o2nm_get_node_by_num(o2net_num_from_nn(nn)); - if (node == NULL) { - ret = 0; + if (node == NULL) goto out; - } mynode = o2nm_get_node_by_num(o2nm_this_node()); - if (mynode == NULL) { - ret = 0; + if (mynode == NULL) goto out; - } spin_lock(&nn->nn_lock); /* @@ -1657,12 +1609,13 @@ static void o2net_start_connect(struct work_struct *work) sc->sc_sock = sock; /* freed by sc_kref_release */ sock->sk->sk_allocation = GFP_ATOMIC; + sock->sk->sk_use_task_frag = false; myaddr.sin_family = AF_INET; myaddr.sin_addr.s_addr = mynode->nd_ipv4_address; myaddr.sin_port = htons(0); /* any port */ - ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr, + ret = sock->ops->bind(sock, (struct sockaddr_unsized *)&myaddr, sizeof(myaddr)); if (ret) { mlog(ML_ERROR, "bind failed with %d at address %pI4\n", @@ -1670,11 +1623,8 @@ static void o2net_start_connect(struct work_struct *work) goto out; } - ret = o2net_set_nodelay(sc->sc_sock); - if (ret) { - mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret); - goto out; - } + tcp_sock_set_nodelay(sc->sc_sock->sk); + tcp_sock_set_user_timeout(sock->sk, O2NET_TCP_USER_TIMEOUT); o2net_register_callbacks(sc->sc_sock->sk, sc); @@ -1688,20 +1638,19 @@ static void o2net_start_connect(struct work_struct *work) remoteaddr.sin_port = node->nd_ipv4_port; ret = sc->sc_sock->ops->connect(sc->sc_sock, - (struct sockaddr *)&remoteaddr, + (struct sockaddr_unsized *)&remoteaddr, sizeof(remoteaddr), O_NONBLOCK); if (ret == -EINPROGRESS) ret = 0; out: - if (ret) { + if (ret && sc) { printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT " failed with errno %d\n", SC_NODEF_ARGS(sc), ret); /* 0 err so that another will be queued and attempted * from set_nn_state */ - if (sc) - o2net_ensure_shutdown(nn, sc, 0); + o2net_ensure_shutdown(nn, sc, 0); } if (sc) sc_put(sc); @@ -1710,6 +1659,7 @@ out: if (mynode) o2nm_node_put(mynode); + memalloc_nofs_restore(nofs_flag); return; } @@ -1721,12 +1671,13 @@ static void o2net_connect_expired(struct work_struct *work) spin_lock(&nn->nn_lock); if (!nn->nn_sc_valid) { printk(KERN_NOTICE "o2net: No connection established with " - "node %u after %u.%u seconds, giving up.\n", + "node %u after %u.%u seconds, check network and" + " cluster configuration.\n", o2net_num_from_nn(nn), o2net_idle_timeout() / 1000, o2net_idle_timeout() % 1000); - o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); + o2net_set_nn_state(nn, NULL, 0, 0); } spin_unlock(&nn->nn_lock); } @@ -1787,7 +1738,7 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, (msecs_to_jiffies(o2net_reconnect_delay()) + 1); if (node_num != o2nm_this_node()) { - /* believe it or not, accept and node hearbeating testing + /* believe it or not, accept and node heartbeating testing * can succeed for this node before we got here.. so * only use set_nn_state to clear the persistent error * if that hasn't already happened */ @@ -1826,17 +1777,28 @@ int o2net_register_hb_callbacks(void) /* ------------------------------------------------------------ */ -static int o2net_accept_one(struct socket *sock) +static int o2net_accept_one(struct socket *sock, int *more) { - int ret, slen; + int ret; struct sockaddr_in sin; struct socket *new_sock = NULL; struct o2nm_node *node = NULL; struct o2nm_node *local_node = NULL; struct o2net_sock_container *sc = NULL; + struct proto_accept_arg arg = { + .flags = O_NONBLOCK, + }; struct o2net_node *nn; + unsigned int nofs_flag; + + /* + * sock_create_lite allocates the sock with GFP_KERNEL. We must + * prevent the filesystem from being reentered by memory reclaim. + */ + nofs_flag = memalloc_nofs_save(); BUG_ON(sock == NULL); + *more = 0; ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type, sock->sk->sk_protocol, &new_sock); if (ret) @@ -1844,21 +1806,17 @@ static int o2net_accept_one(struct socket *sock) new_sock->type = sock->type; new_sock->ops = sock->ops; - ret = sock->ops->accept(sock, new_sock, O_NONBLOCK); + ret = sock->ops->accept(sock, new_sock, &arg); if (ret < 0) goto out; + *more = 1; new_sock->sk->sk_allocation = GFP_ATOMIC; - ret = o2net_set_nodelay(new_sock); - if (ret) { - mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret); - goto out; - } + tcp_sock_set_nodelay(new_sock->sk); + tcp_sock_set_user_timeout(new_sock->sk, O2NET_TCP_USER_TIMEOUT); - slen = sizeof(sin); - ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin, - &slen, 1); + ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin, 1); if (ret < 0) goto out; @@ -1873,12 +1831,16 @@ static int o2net_accept_one(struct socket *sock) if (o2nm_this_node() >= node->nd_num) { local_node = o2nm_get_node_by_num(o2nm_this_node()); - printk(KERN_NOTICE "o2net: Unexpected connect attempt seen " - "at node '%s' (%u, %pI4:%d) from node '%s' (%u, " - "%pI4:%d)\n", local_node->nd_name, local_node->nd_num, - &(local_node->nd_ipv4_address), - ntohs(local_node->nd_ipv4_port), node->nd_name, - node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port)); + if (local_node) + printk(KERN_NOTICE "o2net: Unexpected connect attempt " + "seen at node '%s' (%u, %pI4:%d) from " + "node '%s' (%u, %pI4:%d)\n", + local_node->nd_name, local_node->nd_num, + &(local_node->nd_ipv4_address), + ntohs(local_node->nd_ipv4_port), + node->nd_name, + node->nd_num, &sin.sin_addr.s_addr, + ntohs(sin.sin_port)); ret = -EINVAL; goto out; } @@ -1939,39 +1901,78 @@ out: o2nm_node_put(local_node); if (sc) sc_put(sc); + + memalloc_nofs_restore(nofs_flag); return ret; } +/* + * This function is invoked in response to one or more + * pending accepts at softIRQ level. We must drain the + * entire que before returning. + */ + static void o2net_accept_many(struct work_struct *work) { struct socket *sock = o2net_listen_sock; - while (o2net_accept_one(sock) == 0) + int more; + + /* + * It is critical to note that due to interrupt moderation + * at the network driver level, we can't assume to get a + * softIRQ for every single conn since tcp SYN packets + * can arrive back-to-back, and therefore many pending + * accepts may result in just 1 softIRQ. If we terminate + * the o2net_accept_one() loop upon seeing an err, what happens + * to the rest of the conns in the queue? If no new SYN + * arrives for hours, no softIRQ will be delivered, + * and the connections will just sit in the queue. + */ + + for (;;) { + o2net_accept_one(sock, &more); + if (!more) + break; cond_resched(); + } } -static void o2net_listen_data_ready(struct sock *sk, int bytes) +static void o2net_listen_data_ready(struct sock *sk) { - void (*ready)(struct sock *sk, int bytes); + void (*ready)(struct sock *sk); + + trace_sk_data_ready(sk); - read_lock(&sk->sk_callback_lock); + read_lock_bh(&sk->sk_callback_lock); ready = sk->sk_user_data; if (ready == NULL) { /* check for teardown race */ ready = sk->sk_data_ready; goto out; } - /* ->sk_data_ready is also called for a newly established child socket - * before it has been accepted and the acceptor has set up their - * data_ready.. we only want to queue listen work for our listening - * socket */ + /* This callback may called twice when a new connection + * is being established as a child socket inherits everything + * from a parent LISTEN socket, including the data_ready cb of + * the parent. This leads to a hazard. In o2net_accept_one() + * we are still initializing the child socket but have not + * changed the inherited data_ready callback yet when + * data starts arriving. + * We avoid this hazard by checking the state. + * For the listening socket, the state will be TCP_LISTEN; for the new + * socket, will be TCP_ESTABLISHED. Also, in this case, + * sk->sk_user_data is not a valid function pointer. + */ + if (sk->sk_state == TCP_LISTEN) { - mlog(ML_TCP, "bytes: %d\n", bytes); queue_work(o2net_wq, &o2net_listen_work); + } else { + ready = NULL; } out: - read_unlock(&sk->sk_callback_lock); - ready(sk, bytes); + read_unlock_bh(&sk->sk_callback_lock); + if (ready != NULL) + ready(sk); } static int o2net_open_listening_sock(__be32 addr, __be16 port) @@ -2001,7 +2002,7 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port) INIT_WORK(&o2net_listen_work, o2net_accept_many); sock->sk->sk_reuse = SK_CAN_REUSE; - ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); + ret = sock->ops->bind(sock, (struct sockaddr_unsized *)&sin, sizeof(sin)); if (ret < 0) { printk(KERN_ERR "o2net: Error %d while binding socket at " "%pI4:%u\n", ret, &addr, ntohs(port)); @@ -2037,7 +2038,7 @@ int o2net_start_listening(struct o2nm_node *node) BUG_ON(o2net_listen_sock != NULL); mlog(ML_KTHREAD, "starting o2net thread...\n"); - o2net_wq = create_singlethread_workqueue("o2net"); + o2net_wq = alloc_ordered_workqueue("o2net", WQ_MEM_RECLAIM); if (o2net_wq == NULL) { mlog(ML_ERROR, "unable to launch o2net thread\n"); return -ENOMEM; /* ? */ @@ -2093,22 +2094,23 @@ void o2net_stop_listening(struct o2nm_node *node) int o2net_init(void) { + struct folio *folio; + void *p; unsigned long i; o2quo_init(); + o2net_debugfs_init(); - if (o2net_debugfs_init()) - return -ENOMEM; + folio = folio_alloc(GFP_KERNEL | __GFP_ZERO, 0); + if (!folio) + goto out; - o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL); - o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL); - o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL); - if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) { - kfree(o2net_hand); - kfree(o2net_keep_req); - kfree(o2net_keep_resp); - return -ENOMEM; - } + p = folio_address(folio); + o2net_hand = p; + p += sizeof(struct o2net_handshake); + o2net_keep_req = p; + p += sizeof(struct o2net_msg); + o2net_keep_resp = p; o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION); o2net_hand->connector_id = cpu_to_be64(1); @@ -2133,13 +2135,16 @@ int o2net_init(void) } return 0; + +out: + o2net_debugfs_exit(); + o2quo_exit(); + return -ENOMEM; } void o2net_exit(void) { o2quo_exit(); - kfree(o2net_hand); - kfree(o2net_keep_req); - kfree(o2net_keep_resp); o2net_debugfs_exit(); + folio_put(virt_to_folio(o2net_hand)); } |
