Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r--  fs/dlm/lowcomms.c  2572
1 file changed, 1379 insertions, 1193 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 76976d6e50f9..b3958008ba3f 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1,12 +1,10 @@
+// SPDX-License-Identifier: GPL-2.0-only
 /******************************************************************************
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
 **  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
 **
-**  This copyrighted material is made available to anyone wishing to use,
-**  modify, copy, or redistribute it subject to the terms and conditions
-**  of the GNU General Public License v.2.
 **
 *******************************************************************************
 ******************************************************************************/
@@ -55,78 +53,75 @@
 #include <net/sctp/sctp.h>
 #include <net/ipv6.h>
 
+#include <trace/events/dlm.h>
+#include <trace/events/sock.h>
+
 #include "dlm_internal.h"
 #include "lowcomms.h"
 #include "midcomms.h"
+#include "memory.h"
 #include "config.h"
 
+#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
+#define DLM_MAX_PROCESS_BUFFERS 24
 #define NEEDED_RMEM (4*1024*1024)
-#define CONN_HASH_SIZE 32
-
-/* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
-
-struct cbuf {
-	unsigned int base;
-	unsigned int len;
-	unsigned int mask;
-};
-
-static void cbuf_add(struct cbuf *cb, int n)
-{
-	cb->len += n;
-}
-
-static int cbuf_data(struct cbuf *cb)
-{
-	return ((cb->base + cb->len) & cb->mask);
-}
-
-static void cbuf_init(struct cbuf *cb, int size)
-{
-	cb->base = cb->len = 0;
-	cb->mask = size-1;
-}
-
-static void cbuf_eat(struct cbuf *cb, int n)
-{
-	cb->len -= n;
-	cb->base += n;
-	cb->base &= cb->mask;
-}
-
-static bool cbuf_empty(struct cbuf *cb)
-{
-	return cb->len == 0;
-}
 
 struct connection {
 	struct socket *sock;	/* NULL if not connected */
 	uint32_t nodeid;	/* So we know who we are in the list */
-	struct mutex sock_mutex;
+	/* this semaphore is used to allow parallel recv/send in read
+	 * lock mode. When we release a sock we need to hold the write lock.
+	 *
+	 * However this is locking code and not nice. When we remove the
+	 * othercon handling we can look into other mechanisms to synchronize
+	 * io handling to call sock_release() at the right time.
+	 */
+	struct rw_semaphore sock_lock;
 	unsigned long flags;
-#define CF_READ_PENDING 1
-#define CF_WRITE_PENDING 2
-#define CF_INIT_PENDING 4
+#define CF_APP_LIMITED 0
+#define CF_RECV_PENDING 1
+#define CF_SEND_PENDING 2
+#define CF_RECV_INTR 3
+#define CF_IO_STOP 4
 #define CF_IS_OTHERCON 5
-#define CF_CLOSE 6
-#define CF_APP_LIMITED 7
-#define CF_CLOSING 8
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
-	int (*rx_action) (struct connection *);	/* What to do when active */
-	void (*connect_action) (struct connection *);	/* What to do to connect */
-	struct page *rx_page;
-	struct cbuf cb;
 	int retries;
-#define MAX_CONNECT_RETRIES 3
 	struct hlist_node list;
+	/* due to some connect()/accept() races we can currently end up with
+	 * a second, cross-over connection attempt for one node.
+	 *
+	 * There is a solution to avoid the race by introducing a connect
+	 * rule, e.g. our_nodeid > nodeid_to_connect decides who is allowed
+	 * to connect. The other side can still connect, but that is then
+	 * only treated as a request to reconnect.
+	 *
+	 * However changing to this behaviour would break backwards
+	 * compatibility.
+	 * In a DLM protocol major version upgrade we should remove this!
+	 */
 	struct connection *othercon;
-	struct work_struct rwork; /* Receive workqueue */
-	struct work_struct swork; /* Send workqueue */
+	struct work_struct rwork; /* receive worker */
+	struct work_struct swork; /* send worker */
+	wait_queue_head_t shutdown_wait;
+	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
+	int rx_leftover;
+	int mark;
+	int addr_count;
+	int curr_addr_index;
+	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
+	spinlock_t addrs_lock;
+	struct rcu_head rcu;
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
+struct listen_connection {
+	struct socket *sock;
+	struct work_struct rwork;
+};
+
+#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
+#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
+
 /* An entry waiting to be sent */
 struct writequeue_entry {
 	struct list_head list;
@@ -135,15 +130,43 @@ struct writequeue_entry {
 	int len;
 	int end;
 	int users;
+	bool dirty;
 	struct connection *con;
+	struct list_head msgs;
+	struct kref ref;
 };
 
-struct dlm_node_addr {
+struct dlm_msg {
+	struct writequeue_entry *entry;
+	struct dlm_msg *orig_msg;
+	bool retransmit;
+	void *ppc;
+	int len;
+	int idx; /* new()/commit() idx exchange */
+	struct list_head list;
+	struct kref ref;
+};
+
+struct processqueue_entry {
+	unsigned char *buf;
 	int nodeid;
-	int addr_count;
-	int curr_addr_index;
-	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
+	int buflen;
+
+	struct list_head list;
+};
+
+struct dlm_proto_ops {
+	bool try_new_addr;
+	const char *name;
+	int proto;
+	int how;
+
+	void (*sockopts)(struct socket *sock);
+	int (*bind)(struct socket *sock);
+	int (*listen_validate)(void);
+	void (*listen_sockopts)(struct socket *sock);
+	int (*listen_bind)(struct socket *sock);
 };
 
 static struct listen_sock_callbacks {
@@ -153,122 +176,163 @@ static struct listen_sock_callbacks {
 	void (*sk_write_space)(struct sock *);
 } listen_sock;
 
-static LIST_HEAD(dlm_node_addrs);
-static DEFINE_SPINLOCK(dlm_node_addrs_spin);
-
-static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
+static struct listen_connection listen_con;
+static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
 static int dlm_local_count;
-static int dlm_allow_conn;
 
 /* Work queues */
-static struct workqueue_struct *recv_workqueue;
-static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *io_workqueue;
+static struct workqueue_struct *process_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
-static DEFINE_MUTEX(connections_lock);
-static struct kmem_cache *con_cache;
+static DEFINE_SPINLOCK(connections_lock);
+DEFINE_STATIC_SRCU(connections_srcu);
+
+static const struct dlm_proto_ops *dlm_proto_ops;
+
+#define DLM_IO_SUCCESS 0
+#define DLM_IO_END 1
+#define DLM_IO_EOF 2
+#define DLM_IO_RESCHED 3
+#define DLM_IO_FLUSH 4
 
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
+static void process_dlm_messages(struct work_struct *work);
+
+static DECLARE_WORK(process_work, process_dlm_messages);
+static DEFINE_SPINLOCK(processqueue_lock);
+static bool process_dlm_messages_pending;
+static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
+static atomic_t processqueue_count;
+static LIST_HEAD(processqueue);
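The new structures above are reference-counted: each dlm_msg keeps a kref on the page-backed writequeue_entry it was placed in, so the shared page is only freed once every message carved out of it has been released. A minimal userspace sketch of that ownership model (plain C with a manual refcount standing in for kref; all names hypothetical):

#include <stdlib.h>

/* hypothetical analogue of writequeue_entry / dlm_msg */
struct page_entry {
	unsigned char buf[4096];
	int end;	/* first free byte in buf */
	int refs;	/* one reference per live message */
};

struct msg {
	struct page_entry *entry;
	unsigned char *ppc;	/* payload pointer inside entry->buf */
	int len;
};

static struct msg *msg_new(struct page_entry *e, int len)
{
	struct msg *m;

	if (e->end + len > (int)sizeof(e->buf))
		return NULL;	/* caller must allocate a fresh entry */

	m = malloc(sizeof(*m));
	if (!m)
		return NULL;
	m->entry = e;
	m->ppc = e->buf + e->end;	/* reserve space in the shared page */
	m->len = len;
	e->end += len;
	e->refs++;	/* the message keeps the page alive */
	return m;
}

static void msg_put(struct msg *m)
{
	if (--m->entry->refs == 0)
		free(m->entry);	/* last message dropped: page can go */
	free(m);
}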
 
-/* This is deliberately very simple because most clusters have simple
-   sequential nodeids, so we should be able to go straight to a connection
-   struct in the array */
-static inline int nodeid_hash(int nodeid)
+bool dlm_lowcomms_is_running(void)
 {
-	return nodeid & (CONN_HASH_SIZE-1);
+	return !!listen_con.sock;
 }
 
-static struct connection *__find_con(int nodeid)
+static void lowcomms_queue_swork(struct connection *con)
 {
-	int r;
-	struct connection *con;
+	assert_spin_locked(&con->writequeue_lock);
 
-	r = nodeid_hash(nodeid);
+	if (!test_bit(CF_IO_STOP, &con->flags) &&
+	    !test_bit(CF_APP_LIMITED, &con->flags) &&
+	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
+		queue_work(io_workqueue, &con->swork);
+}
+
+static void lowcomms_queue_rwork(struct connection *con)
+{
+#ifdef CONFIG_LOCKDEP
+	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
+#endif
+
+	if (!test_bit(CF_IO_STOP, &con->flags) &&
+	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
+		queue_work(io_workqueue, &con->rwork);
+}
 
-	hlist_for_each_entry(con, &connection_hash[r], list) {
+static void writequeue_entry_ctor(void *data)
+{
+	struct writequeue_entry *entry = data;
+
+	INIT_LIST_HEAD(&entry->msgs);
+}
+
+struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
+{
+	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
+				 0, 0, writequeue_entry_ctor);
+}
+
+struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
+{
+	return KMEM_CACHE(dlm_msg, 0);
+}
+
+/* needs to be called with writequeue_lock held */
+static struct writequeue_entry *con_next_wq(struct connection *con)
+{
+	struct writequeue_entry *e;
+
+	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
+				     list);
+	/* if len is zero nothing is to send, if there are users filling
+	 * buffers we wait until the users are done so we can send more.
+	 */
+	if (!e || e->users || e->len == 0)
+		return NULL;
+
+	return e;
+}
+
+static struct connection *__find_con(int nodeid, int r)
+{
+	struct connection *con;
+
+	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
 		if (con->nodeid == nodeid)
 			return con;
 	}
+
 	return NULL;
 }
 
+static void dlm_con_init(struct connection *con, int nodeid)
+{
+	con->nodeid = nodeid;
+	init_rwsem(&con->sock_lock);
+	INIT_LIST_HEAD(&con->writequeue);
+	spin_lock_init(&con->writequeue_lock);
+	INIT_WORK(&con->swork, process_send_sockets);
+	INIT_WORK(&con->rwork, process_recv_sockets);
+	spin_lock_init(&con->addrs_lock);
+	init_waitqueue_head(&con->shutdown_wait);
+}
 /*
  * If 'allocation' is zero then we don't attempt to create a new
  * connection structure for this node.
  */
-static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
+static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 {
-	struct connection *con = NULL;
+	struct connection *con, *tmp;
 	int r;
 
-	con = __find_con(nodeid);
+	r = nodeid_hash(nodeid);
+	con = __find_con(nodeid, r);
 	if (con || !alloc)
 		return con;
 
-	con = kmem_cache_zalloc(con_cache, alloc);
+	con = kzalloc(sizeof(*con), alloc);
 	if (!con)
 		return NULL;
 
-	r = nodeid_hash(nodeid);
-	hlist_add_head(&con->list, &connection_hash[r]);
-
-	con->nodeid = nodeid;
-	mutex_init(&con->sock_mutex);
-	INIT_LIST_HEAD(&con->writequeue);
-	spin_lock_init(&con->writequeue_lock);
-	INIT_WORK(&con->swork, process_send_sockets);
-	INIT_WORK(&con->rwork, process_recv_sockets);
-
-	/* Setup action pointers for child sockets */
-	if (con->nodeid) {
-		struct connection *zerocon = __find_con(0);
-
-		con->connect_action = zerocon->connect_action;
-		if (!con->rx_action)
-			con->rx_action = zerocon->rx_action;
-	}
-
-	return con;
-}
-
-/* Loop round all connections */
-static void foreach_conn(void (*conn_func)(struct connection *c))
-{
-	int i;
-	struct hlist_node *n;
-	struct connection *con;
+	dlm_con_init(con, nodeid);
 
-	for (i = 0; i < CONN_HASH_SIZE; i++) {
-		hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
-			conn_func(con);
+	spin_lock(&connections_lock);
+	/* Because multiple workqueues/threads call this function it can
+	 * race on multiple CPUs. Instead of locking the hot path
+	 * __find_con() we just check again, in the rare case of recently
+	 * added nodes, under protection of connections_lock. If this is
+	 * the case we abort our connection creation and return the
+	 * existing connection.
	 */
+	tmp = __find_con(nodeid, r);
+	if (tmp) {
+		spin_unlock(&connections_lock);
+		kfree(con);
+		return tmp;
 	}
-}
-
-static struct connection *nodeid2con(int nodeid, gfp_t allocation)
-{
-	struct connection *con;
 
-	mutex_lock(&connections_lock);
-	con = __nodeid2con(nodeid, allocation);
-	mutex_unlock(&connections_lock);
+	hlist_add_head_rcu(&con->list, &connection_hash[r]);
+	spin_unlock(&connections_lock);
 
 	return con;
 }
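nodeid2con() above is the classic lookup-then-lock-then-recheck idiom: search the hash locklessly (under RCU), and only when the node is missing take connections_lock, re-check, and insert. A compact sketch of the same control flow with a pthread mutex standing in for the RCU/spinlock pair (the unlocked read is only safe with RCU or equivalent in the kernel; this sketch just shows the shape, all names hypothetical):

#include <pthread.h>
#include <stdlib.h>

struct node {
	int id;
	struct node *next;
};

static struct node *head;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static struct node *find(int id)
{
	struct node *n;

	for (n = head; n; n = n->next)
		if (n->id == id)
			return n;
	return NULL;
}

static struct node *get_or_create(int id)
{
	struct node *n, *tmp;

	n = find(id);	/* optimistic lookup, no lock (RCU in the kernel) */
	if (n)
		return n;

	n = calloc(1, sizeof(*n));
	if (!n)
		return NULL;
	n->id = id;

	pthread_mutex_lock(&lock);
	tmp = find(id);	/* re-check: another thread may have raced us */
	if (tmp) {
		pthread_mutex_unlock(&lock);
		free(n);	/* we lost the race, return the winner */
		return tmp;
	}
	n->next = head;
	head = n;	/* kernel equivalent: hlist_add_head_rcu() */
	pthread_mutex_unlock(&lock);
	return n;
}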
-static struct dlm_node_addr *find_node_addr(int nodeid)
-{
-	struct dlm_node_addr *na;
-
-	list_for_each_entry(na, &dlm_node_addrs, list) {
-		if (na->nodeid == nodeid)
-			return na;
-	}
-	return NULL;
-}
-
-static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
+static int addr_compare(const struct sockaddr_storage *x,
+			const struct sockaddr_storage *y)
 {
 	switch (x->ss_family) {
 	case AF_INET: {
@@ -296,41 +360,51 @@ static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 }
 
 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
-			  struct sockaddr *sa_out, bool try_new_addr)
+			  struct sockaddr *sa_out, bool try_new_addr,
+			  unsigned int *mark)
 {
 	struct sockaddr_storage sas;
-	struct dlm_node_addr *na;
+	struct connection *con;
+	int idx;
 
 	if (!dlm_local_count)
 		return -1;
 
-	spin_lock(&dlm_node_addrs_spin);
-	na = find_node_addr(nodeid);
-	if (na && na->addr_count) {
-		memcpy(&sas, na->addr[na->curr_addr_index],
-		       sizeof(struct sockaddr_storage));
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
+	}
 
-		if (try_new_addr) {
-			na->curr_addr_index++;
-			if (na->curr_addr_index == na->addr_count)
-				na->curr_addr_index = 0;
-		}
+	spin_lock(&con->addrs_lock);
+	if (!con->addr_count) {
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
 	}
-	spin_unlock(&dlm_node_addrs_spin);
 
-	if (!na)
-		return -EEXIST;
+	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));
 
-	if (!na->addr_count)
-		return -ENOENT;
+	if (try_new_addr) {
+		con->curr_addr_index++;
+		if (con->curr_addr_index == con->addr_count)
+			con->curr_addr_index = 0;
+	}
+
+	*mark = con->mark;
+	spin_unlock(&con->addrs_lock);
 
 	if (sas_out)
 		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 
-	if (!sa_out)
+	if (!sa_out) {
+		srcu_read_unlock(&connections_srcu, idx);
 		return 0;
+	}
 
-	if (dlm_local_addr[0]->ss_family == AF_INET) {
+	if (dlm_local_addr[0].ss_family == AF_INET) {
 		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
@@ -340,213 +414,231 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 		ret6->sin6_addr = in6->sin6_addr;
 	}
 
+	srcu_read_unlock(&connections_srcu, idx);
 	return 0;
 }
 
-static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
+static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
+			  unsigned int *mark)
 {
-	struct dlm_node_addr *na;
-	int rv = -EEXIST;
-	int addr_i;
-
-	spin_lock(&dlm_node_addrs_spin);
-	list_for_each_entry(na, &dlm_node_addrs, list) {
-		if (!na->addr_count)
-			continue;
+	struct connection *con;
+	int i, idx, addr_i;
 
-		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
-			if (addr_compare(na->addr[addr_i], addr)) {
-				*nodeid = na->nodeid;
-				rv = 0;
-				goto unlock;
+	idx = srcu_read_lock(&connections_srcu);
+	for (i = 0; i < CONN_HASH_SIZE; i++) {
+		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+			WARN_ON_ONCE(!con->addr_count);
+
+			spin_lock(&con->addrs_lock);
+			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
+				if (addr_compare(&con->addr[addr_i], addr)) {
+					*nodeid = con->nodeid;
+					*mark = con->mark;
+					spin_unlock(&con->addrs_lock);
+					srcu_read_unlock(&connections_srcu, idx);
+					return 0;
+				}
 			}
+			spin_unlock(&con->addrs_lock);
 		}
 	}
-unlock:
-	spin_unlock(&dlm_node_addrs_spin);
-	return rv;
+	srcu_read_unlock(&connections_srcu, idx);
+
+	return -ENOENT;
 }
 
-int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
+static bool dlm_lowcomms_con_has_addr(const struct connection *con,
+				      const struct sockaddr_storage *addr)
 {
-	struct sockaddr_storage *new_addr;
-	struct dlm_node_addr *new_node, *na;
+	int i;
 
-	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
-	if (!new_node)
-		return -ENOMEM;
+	for (i = 0; i < con->addr_count; i++) {
+		if (addr_compare(&con->addr[i], addr))
+			return true;
+	}
+
+	return false;
+}
+
+int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
+{
+	struct connection *con;
+	bool ret;
+	int idx;
 
-	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
-	if (!new_addr) {
-		kfree(new_node);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, GFP_NOFS);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
 		return -ENOMEM;
 	}
 
-	memcpy(new_addr, addr, len);
-
-	spin_lock(&dlm_node_addrs_spin);
-	na = find_node_addr(nodeid);
-	if (!na) {
-		new_node->nodeid = nodeid;
-		new_node->addr[0] = new_addr;
-		new_node->addr_count = 1;
-		list_add(&new_node->list, &dlm_node_addrs);
-		spin_unlock(&dlm_node_addrs_spin);
+	spin_lock(&con->addrs_lock);
+	if (!con->addr_count) {
+		memcpy(&con->addr[0], addr, sizeof(*addr));
+		con->addr_count = 1;
+		con->mark = dlm_config.ci_mark;
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
 		return 0;
 	}
-	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
-		spin_unlock(&dlm_node_addrs_spin);
-		kfree(new_addr);
-		kfree(new_node);
+	ret = dlm_lowcomms_con_has_addr(con, addr);
+	if (ret) {
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
+		return -EEXIST;
+	}
+
+	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
+		spin_unlock(&con->addrs_lock);
+		srcu_read_unlock(&connections_srcu, idx);
 		return -ENOSPC;
 	}
 
-	na->addr[na->addr_count++] = new_addr;
-	spin_unlock(&dlm_node_addrs_spin);
-	kfree(new_node);
+	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
+	srcu_read_unlock(&connections_srcu, idx);
+	spin_unlock(&con->addrs_lock);
 	return 0;
 }
 
 /* Data available on socket or listen socket received a connect */
 static void lowcomms_data_ready(struct sock *sk)
 {
-	struct connection *con;
+	struct connection *con = sock2con(sk);
+
+	trace_sk_data_ready(sk);
 
-	read_lock_bh(&sk->sk_callback_lock);
-	con = sock2con(sk);
-	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
-		queue_work(recv_workqueue, &con->rwork);
-	read_unlock_bh(&sk->sk_callback_lock);
+	set_bit(CF_RECV_INTR, &con->flags);
+	lowcomms_queue_rwork(con);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
-	struct connection *con;
-
-	read_lock_bh(&sk->sk_callback_lock);
-	con = sock2con(sk);
-	if (!con)
-		goto out;
+	struct connection *con = sock2con(sk);
 
 	clear_bit(SOCK_NOSPACE, &con->sock->flags);
 
+	spin_lock_bh(&con->writequeue_lock);
 	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 		con->sock->sk->sk_write_pending--;
 		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 	}
 
-	queue_work(send_workqueue, &con->swork);
-out:
-	read_unlock_bh(&sk->sk_callback_lock);
-}
-
-static inline void lowcomms_connect_sock(struct connection *con)
-{
-	if (test_bit(CF_CLOSE, &con->flags))
-		return;
-	queue_work(send_workqueue, &con->swork);
-	cond_resched();
+	lowcomms_queue_swork(con);
+	spin_unlock_bh(&con->writequeue_lock);
 }
 
 static void lowcomms_state_change(struct sock *sk)
 {
 	/* SCTP layer is not calling sk_data_ready when the connection
-	 * is done, so we catch the signal through here. Also, it
-	 * doesn't switch socket state when entering shutdown, so we
-	 * skip the write in that case.
+	 * is done, so we catch the signal through here.
	 */
-	if (sk->sk_shutdown) {
-		if (sk->sk_shutdown == RCV_SHUTDOWN)
-			lowcomms_data_ready(sk);
-	} else if (sk->sk_state == TCP_ESTABLISHED) {
-		lowcomms_write_space(sk);
-	}
+	if (sk->sk_shutdown & RCV_SHUTDOWN)
+		lowcomms_data_ready(sk);
+}
+
+static void lowcomms_listen_data_ready(struct sock *sk)
+{
+	trace_sk_data_ready(sk);
+
+	queue_work(io_workqueue, &listen_con.rwork);
 }
 
 int dlm_lowcomms_connect_node(int nodeid)
 {
 	struct connection *con;
+	int idx;
 
-	if (nodeid == dlm_our_nodeid())
-		return 0;
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!con)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
+	}
 
-	con = nodeid2con(nodeid, GFP_NOFS);
-	if (!con)
-		return -ENOMEM;
-	lowcomms_connect_sock(con);
+	down_read(&con->sock_lock);
+	if (!con->sock) {
+		spin_lock_bh(&con->writequeue_lock);
+		lowcomms_queue_swork(con);
+		spin_unlock_bh(&con->writequeue_lock);
+	}
+	up_read(&con->sock_lock);
+	srcu_read_unlock(&connections_srcu, idx);
+
+	cond_resched();
 	return 0;
 }
 
-static void lowcomms_error_report(struct sock *sk)
+int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
 {
 	struct connection *con;
-	struct sockaddr_storage saddr;
-	void (*orig_report)(struct sock *) = NULL;
+	int idx;
 
-	read_lock_bh(&sk->sk_callback_lock);
-	con = sock2con(sk);
-	if (con == NULL)
-		goto out;
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOENT;
+	}
 
-	orig_report = listen_sock.sk_error_report;
-	if (con->sock == NULL ||
-	    kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
-		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
-				   "sending to node %d, port %d, "
-				   "sk_err=%d/%d\n", dlm_our_nodeid(),
-				   con->nodeid, dlm_config.ci_tcp_port,
-				   sk->sk_err, sk->sk_err_soft);
-	} else if (saddr.ss_family == AF_INET) {
-		struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
+	spin_lock(&con->addrs_lock);
+	con->mark = mark;
+	spin_unlock(&con->addrs_lock);
+	srcu_read_unlock(&connections_srcu, idx);
+	return 0;
+}
+
+static void lowcomms_error_report(struct sock *sk)
+{
+	struct connection *con = sock2con(sk);
+	struct inet_sock *inet;
 
+	inet = inet_sk(sk);
+	switch (sk->sk_family) {
+	case AF_INET:
 		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
-				   "sending to node %d at %pI4, port %d, "
+				   "sending to node %d at %pI4, dport %d, "
 				   "sk_err=%d/%d\n", dlm_our_nodeid(),
-				   con->nodeid, &sin4->sin_addr.s_addr,
-				   dlm_config.ci_tcp_port, sk->sk_err,
-				   sk->sk_err_soft);
-	} else {
-		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
-
+				   con->nodeid, &inet->inet_daddr,
+				   ntohs(inet->inet_dport), sk->sk_err,
+				   READ_ONCE(sk->sk_err_soft));
+		break;
+#if IS_ENABLED(CONFIG_IPV6)
+	case AF_INET6:
+		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
+				   "sending to node %d at %pI6c, "
+				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
+				   con->nodeid, &sk->sk_v6_daddr,
+				   ntohs(inet->inet_dport), sk->sk_err,
+				   READ_ONCE(sk->sk_err_soft));
+		break;
+#endif
+	default:
 		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
-				   "sending to node %d at %u.%u.%u.%u, "
-				   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
-				   con->nodeid, sin6->sin6_addr.s6_addr32[0],
-				   sin6->sin6_addr.s6_addr32[1],
-				   sin6->sin6_addr.s6_addr32[2],
-				   sin6->sin6_addr.s6_addr32[3],
-				   dlm_config.ci_tcp_port, sk->sk_err,
-				   sk->sk_err_soft);
+				   "invalid socket family %d set, "
+				   "sk_err=%d/%d\n", dlm_our_nodeid(),
+				   sk->sk_family, sk->sk_err,
+				   READ_ONCE(sk->sk_err_soft));
+		break;
 	}
-out:
-	read_unlock_bh(&sk->sk_callback_lock);
-	if (orig_report)
-		orig_report(sk);
-}
 
-/* Note: sk_callback_lock must be locked before calling this function. */
-static void save_listen_callbacks(struct socket *sock)
-{
-	struct sock *sk = sock->sk;
+	dlm_midcomms_unack_msg_resend(con->nodeid);
 
-	listen_sock.sk_data_ready = sk->sk_data_ready;
-	listen_sock.sk_state_change = sk->sk_state_change;
-	listen_sock.sk_write_space = sk->sk_write_space;
-	listen_sock.sk_error_report = sk->sk_error_report;
+	listen_sock.sk_error_report(sk);
 }
 
-static void restore_callbacks(struct socket *sock)
+static void restore_callbacks(struct sock *sk)
 {
-	struct sock *sk = sock->sk;
+#ifdef CONFIG_LOCKDEP
+	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
+#endif
 
-	write_lock_bh(&sk->sk_callback_lock);
 	sk->sk_user_data = NULL;
 	sk->sk_data_ready = listen_sock.sk_data_ready;
 	sk->sk_state_change = listen_sock.sk_state_change;
 	sk->sk_write_space = listen_sock.sk_write_space;
 	sk->sk_error_report = listen_sock.sk_error_report;
-	write_unlock_bh(&sk->sk_callback_lock);
 }
 
 /* Make a socket active */
@@ -554,204 +646,351 @@ static void add_sock(struct socket *sock, struct connection *con)
 {
 	struct sock *sk = sock->sk;
 
-	write_lock_bh(&sk->sk_callback_lock);
+	lock_sock(sk);
 	con->sock = sock;
 
 	sk->sk_user_data = con;
-	/* Install a data_ready callback */
 	sk->sk_data_ready = lowcomms_data_ready;
 	sk->sk_write_space = lowcomms_write_space;
-	sk->sk_state_change = lowcomms_state_change;
+	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
+		sk->sk_state_change = lowcomms_state_change;
 	sk->sk_allocation = GFP_NOFS;
+	sk->sk_use_task_frag = false;
 	sk->sk_error_report = lowcomms_error_report;
-	write_unlock_bh(&sk->sk_callback_lock);
+	release_sock(sk);
 }
 
 /* Add the port number to an IPv6 or 4 sockaddr and return the address length */
-static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
+static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port,
			  int *addr_len)
 {
-	saddr->ss_family =  dlm_local_addr[0]->ss_family;
+	saddr->ss_family = dlm_local_addr[0].ss_family;
 	if (saddr->ss_family == AF_INET) {
 		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
-		in4_addr->sin_port = cpu_to_be16(port);
+		in4_addr->sin_port = port;
 		*addr_len = sizeof(struct sockaddr_in);
 		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 	} else {
 		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
-		in6_addr->sin6_port = cpu_to_be16(port);
+		in6_addr->sin6_port = port;
 		*addr_len = sizeof(struct sockaddr_in6);
 	}
 	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 }
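make_sockaddr() now takes the port already in network byte order (__be16) instead of converting internally with cpu_to_be16(). For reference, the classic userspace equivalent of filling a sockaddr with an explicit htons() conversion looks like this (a sketch, not DLM code):

#include <arpa/inet.h>
#include <string.h>

/* fill an IPv4 sockaddr from an address string and a host-order port */
static int fill_sockaddr_in(struct sockaddr_in *sa, const char *ip,
			    unsigned short port)
{
	memset(sa, 0, sizeof(*sa));
	sa->sin_family = AF_INET;
	sa->sin_port = htons(port);	/* host to network byte order */
	return inet_pton(AF_INET, ip, &sa->sin_addr) == 1 ? 0 : -1;
}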
-/* Close a remote connection and tidy up */
-static void close_connection(struct connection *con, bool and_other,
-			     bool tx, bool rx)
+static void dlm_page_release(struct kref *kref)
 {
-	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);
 
-	if (tx && !closing && cancel_work_sync(&con->swork)) {
-		log_print("canceled swork for node %d", con->nodeid);
-		clear_bit(CF_WRITE_PENDING, &con->flags);
-	}
-	if (rx && !closing && cancel_work_sync(&con->rwork)) {
-		log_print("canceled rwork for node %d", con->nodeid);
-		clear_bit(CF_READ_PENDING, &con->flags);
+	__free_page(e->page);
+	dlm_free_writequeue(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+	kref_put(&msg->entry->ref, dlm_page_release);
+	dlm_free_msg(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+	struct dlm_msg *msg, *tmp;
+
+	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+		if (msg->orig_msg) {
+			msg->orig_msg->retransmit = false;
+			kref_put(&msg->orig_msg->ref, dlm_msg_release);
+		}
+
+		list_del(&msg->list);
+		kref_put(&msg->ref, dlm_msg_release);
 	}
 
-	mutex_lock(&con->sock_mutex);
+	list_del(&e->list);
+	kref_put(&e->ref, dlm_page_release);
+}
+
+static void dlm_close_sock(struct socket **sock)
+{
+	lock_sock((*sock)->sk);
+	restore_callbacks((*sock)->sk);
+	release_sock((*sock)->sk);
+
+	sock_release(*sock);
+	*sock = NULL;
+}
+
+static void allow_connection_io(struct connection *con)
+{
+	if (con->othercon)
+		clear_bit(CF_IO_STOP, &con->othercon->flags);
+	clear_bit(CF_IO_STOP, &con->flags);
+}
+
+static void stop_connection_io(struct connection *con)
+{
+	if (con->othercon)
+		stop_connection_io(con->othercon);
+
+	spin_lock_bh(&con->writequeue_lock);
+	set_bit(CF_IO_STOP, &con->flags);
+	spin_unlock_bh(&con->writequeue_lock);
+
+	down_write(&con->sock_lock);
 	if (con->sock) {
-		restore_callbacks(con->sock);
-		sock_release(con->sock);
-		con->sock = NULL;
+		lock_sock(con->sock->sk);
+		restore_callbacks(con->sock->sk);
+		release_sock(con->sock->sk);
 	}
-	if (con->othercon && and_other) {
-		/* Will only re-enter once. */
-		close_connection(con->othercon, false, true, true);
+	up_write(&con->sock_lock);
+
+	cancel_work_sync(&con->swork);
+	cancel_work_sync(&con->rwork);
+}
+
+/* Close a remote connection and tidy up */
+static void close_connection(struct connection *con, bool and_other)
+{
+	struct writequeue_entry *e;
+
+	if (con->othercon && and_other)
+		close_connection(con->othercon, false);
+
+	down_write(&con->sock_lock);
+	if (!con->sock) {
+		up_write(&con->sock_lock);
+		return;
 	}
-	if (con->rx_page) {
-		__free_page(con->rx_page);
-		con->rx_page = NULL;
+
+	dlm_close_sock(&con->sock);
+
+	/* if we sent a writequeue entry only half way, we drop the whole
	 * entry on reconnect, so that we never resume in the middle of a
	 * message, which would confuse the other end.
	 *
	 * we can always drop messages because they will be retransmitted,
	 * but what we cannot allow is transmitting half a message that may
	 * already be processed at the other side.
	 *
	 * our policy is to start from a clean state after a disconnect; we
	 * don't know what has been sent/received at the transport layer in
	 * that case.
+	 */
+	spin_lock_bh(&con->writequeue_lock);
+	if (!list_empty(&con->writequeue)) {
+		e = list_first_entry(&con->writequeue, struct writequeue_entry,
+				     list);
+		if (e->dirty)
+			free_entry(e);
	}
+	spin_unlock_bh(&con->writequeue_lock);
+
+	con->rx_leftover = 0;
 	con->retries = 0;
-	mutex_unlock(&con->sock_mutex);
-	clear_bit(CF_CLOSING, &con->flags);
+	clear_bit(CF_APP_LIMITED, &con->flags);
+	clear_bit(CF_RECV_PENDING, &con->flags);
+	clear_bit(CF_SEND_PENDING, &con->flags);
+	up_write(&con->sock_lock);
 }
 
-/* Data received from remote end */
-static int receive_from_sock(struct connection *con)
+static void shutdown_connection(struct connection *con, bool and_other)
 {
-	int ret = 0;
-	struct msghdr msg = {};
-	struct kvec iov[2];
-	unsigned len;
-	int r;
-	int call_again_soon = 0;
-	int nvec;
+	int ret;
 
-	mutex_lock(&con->sock_mutex);
+	if (con->othercon && and_other)
+		shutdown_connection(con->othercon, false);
 
-	if (con->sock == NULL) {
-		ret = -EAGAIN;
-		goto out_close;
-	}
-	if (con->nodeid == 0) {
-		ret = -EINVAL;
-		goto out_close;
+	flush_workqueue(io_workqueue);
+	down_read(&con->sock_lock);
+	/* nothing to shutdown */
+	if (!con->sock) {
+		up_read(&con->sock_lock);
+		return;
 	}
 
-	if (con->rx_page == NULL) {
-		/*
-		 * This doesn't need to be atomic, but I think it should
-		 * improve performance if it is.
-		 */
-		con->rx_page = alloc_page(GFP_ATOMIC);
-		if (con->rx_page == NULL)
-			goto out_resched;
-		cbuf_init(&con->cb, PAGE_SIZE);
+	ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how);
+	up_read(&con->sock_lock);
+	if (ret) {
+		log_print("Connection %p failed to shutdown: %d will force close",
+			  con, ret);
+		goto force_close;
+	} else {
+		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
+					 DLM_SHUTDOWN_WAIT_TIMEOUT);
+		if (ret == 0) {
+			log_print("Connection %p shutdown timed out, will force close",
+				  con);
+			goto force_close;
+		}
 	}
 
-	/*
-	 * iov[0] is the bit of the circular buffer between the current end
-	 * point (cb.base + cb.len) and the end of the buffer.
-	 */
-	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
-	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
-	iov[1].iov_len = 0;
-	nvec = 1;
-
-	/*
-	 * iov[1] is the bit of the circular buffer between the start of the
-	 * buffer and the start of the currently used section (cb.base)
-	 */
-	if (cbuf_data(&con->cb) >= con->cb.base) {
-		iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
-		iov[1].iov_len = con->cb.base;
-		iov[1].iov_base = page_address(con->rx_page);
-		nvec = 2;
-	}
-	len = iov[0].iov_len + iov[1].iov_len;
-	iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);
-
-	r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
-	if (ret <= 0)
-		goto out_close;
-	else if (ret == len)
-		call_again_soon = 1;
-
-	cbuf_add(&con->cb, ret);
-	ret = dlm_process_incoming_buffer(con->nodeid,
-					  page_address(con->rx_page),
-					  con->cb.base, con->cb.len,
-					  PAGE_SIZE);
-	if (ret == -EBADMSG) {
-		log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
-			  page_address(con->rx_page), con->cb.base,
-			  con->cb.len, r);
-	}
-	if (ret < 0)
-		goto out_close;
-	cbuf_eat(&con->cb, ret);
+	return;
+
+force_close:
+	close_connection(con, false);
+}
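shutdown_connection() above attempts an orderly shutdown first and falls back to a forced close if the peer does not complete it within DLM_SHUTDOWN_WAIT_TIMEOUT. The same pattern with plain BSD sockets, sketched for illustration only (names hypothetical):

#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* half-close, then wait up to timeout_ms for the peer's EOF; force close otherwise */
static void shutdown_or_force_close(int fd, int timeout_ms)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	char buf[256];

	if (shutdown(fd, SHUT_WR) == 0) {
		while (poll(&pfd, 1, timeout_ms) > 0) {
			ssize_t n = read(fd, buf, sizeof(buf));
			if (n <= 0)	/* EOF or error: peer finished */
				break;
		}
	}
	close(fd);	/* forced close if shutdown failed or timed out */
}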
+static struct processqueue_entry *new_processqueue_entry(int nodeid,
+							 int buflen)
+{
+	struct processqueue_entry *pentry;
 
-	if (cbuf_empty(&con->cb) && !call_again_soon) {
-		__free_page(con->rx_page);
-		con->rx_page = NULL;
+	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
+	if (!pentry)
+		return NULL;
+
+	pentry->buf = kmalloc(buflen, GFP_NOFS);
+	if (!pentry->buf) {
+		kfree(pentry);
+		return NULL;
 	}
 
-	if (call_again_soon)
-		goto out_resched;
-	mutex_unlock(&con->sock_mutex);
-	return 0;
+	pentry->nodeid = nodeid;
+	return pentry;
+}
 
-out_resched:
-	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
-		queue_work(recv_workqueue, &con->rwork);
-	mutex_unlock(&con->sock_mutex);
-	return -EAGAIN;
+static void free_processqueue_entry(struct processqueue_entry *pentry)
+{
+	kfree(pentry->buf);
+	kfree(pentry);
+}
 
-out_close:
-	mutex_unlock(&con->sock_mutex);
-	if (ret != -EAGAIN) {
-		close_connection(con, true, true, false);
-		/* Reconnect when there is something to send */
+static void process_dlm_messages(struct work_struct *work)
+{
+	struct processqueue_entry *pentry;
+
+	spin_lock_bh(&processqueue_lock);
+	pentry = list_first_entry_or_null(&processqueue,
+					  struct processqueue_entry, list);
+	if (WARN_ON_ONCE(!pentry)) {
+		process_dlm_messages_pending = false;
+		spin_unlock_bh(&processqueue_lock);
+		return;
 	}
-	/* Don't return success if we really got EOF */
-	if (ret == 0)
-		ret = -EAGAIN;
 
-	return ret;
+	list_del(&pentry->list);
+	if (atomic_dec_and_test(&processqueue_count))
+		wake_up(&processqueue_wq);
+	spin_unlock_bh(&processqueue_lock);
+
+	for (;;) {
+		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
+					    pentry->buflen);
+		free_processqueue_entry(pentry);
+
+		spin_lock_bh(&processqueue_lock);
+		pentry = list_first_entry_or_null(&processqueue,
+						  struct processqueue_entry, list);
+		if (!pentry) {
+			process_dlm_messages_pending = false;
+			spin_unlock_bh(&processqueue_lock);
+			break;
+		}
+
+		list_del(&pentry->list);
+		if (atomic_dec_and_test(&processqueue_count))
+			wake_up(&processqueue_wq);
+		spin_unlock_bh(&processqueue_lock);
+	}
 }
 
-/* Listening socket is busy, accept a connection */
-static int tcp_accept_from_sock(struct connection *con)
+/* Data received from remote end */
+static int receive_from_sock(struct connection *con, int buflen)
 {
-	int result;
-	struct sockaddr_storage peeraddr;
-	struct socket *newsock;
-	int len;
-	int nodeid;
-	struct connection *newcon;
-	struct connection *addcon;
+	struct processqueue_entry *pentry;
+	int ret, buflen_real;
+	struct msghdr msg;
+	struct kvec iov;
 
-	mutex_lock(&connections_lock);
-	if (!dlm_allow_conn) {
-		mutex_unlock(&connections_lock);
-		return -1;
+	pentry = new_processqueue_entry(con->nodeid, buflen);
+	if (!pentry)
+		return DLM_IO_RESCHED;
+
+	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
+
+	/* calculate new buffer parameters regarding last receive and
	 * possible leftover bytes
	 */
+	iov.iov_base = pentry->buf + con->rx_leftover;
+	iov.iov_len = buflen - con->rx_leftover;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+	clear_bit(CF_RECV_INTR, &con->flags);
+again:
+	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+			     msg.msg_flags);
+	trace_dlm_recv(con->nodeid, ret);
+	if (ret == -EAGAIN) {
+		lock_sock(con->sock->sk);
+		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
+			release_sock(con->sock->sk);
+			goto again;
+		}
+
+		clear_bit(CF_RECV_PENDING, &con->flags);
+		release_sock(con->sock->sk);
+		free_processqueue_entry(pentry);
+		return DLM_IO_END;
+	} else if (ret == 0) {
+		/* close will clear CF_RECV_PENDING */
+		free_processqueue_entry(pentry);
+		return DLM_IO_EOF;
+	} else if (ret < 0) {
+		free_processqueue_entry(pentry);
+		return ret;
+	}
+
+	/* new buflen according to the bytes read plus leftover from the
	 * last receive
	 */
+	buflen_real = ret + con->rx_leftover;
+	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
+					   buflen_real);
+	if (ret < 0) {
+		free_processqueue_entry(pentry);
+		return ret;
 	}
-	mutex_unlock(&connections_lock);
 
-	mutex_lock_nested(&con->sock_mutex, 0);
+	pentry->buflen = ret;
 
-	if (!con->sock) {
-		mutex_unlock(&con->sock_mutex);
-		return -ENOTCONN;
+	/* calculate the leftover bytes after processing and copy them to
	 * the start of the receive buffer, so the next receive sees the
	 * full message at the start address of the receive buffer.
+	 */
+	con->rx_leftover = buflen_real - ret;
+	memmove(con->rx_leftover_buf, pentry->buf + ret,
+		con->rx_leftover);
+
+	spin_lock_bh(&processqueue_lock);
+	ret = atomic_inc_return(&processqueue_count);
+	list_add_tail(&pentry->list, &processqueue);
+	if (!process_dlm_messages_pending) {
+		process_dlm_messages_pending = true;
+		queue_work(process_workqueue, &process_work);
	}
+	spin_unlock_bh(&processqueue_lock);
 
-	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
-	if (result < 0)
+	if (ret > DLM_MAX_PROCESS_BUFFERS)
+		return DLM_IO_FLUSH;
+
+	return DLM_IO_SUCCESS;
+}
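receive_from_sock() keeps the tail of a partially received message in rx_leftover_buf and prepends it to the next read, so parsing always starts at a message boundary; it also counts queued buffers and returns DLM_IO_FLUSH past DLM_MAX_PROCESS_BUFFERS so the receive path can back off. The leftover-carry part in a userspace sketch (hypothetical names; parse_msgs() is assumed to consume only complete messages and return the number of bytes consumed):

#include <string.h>
#include <sys/socket.h>

#define BUFLEN 4096

static unsigned char leftover_buf[BUFLEN];
static int leftover;

extern int parse_msgs(unsigned char *buf, int len);

static int recv_step(int fd)
{
	unsigned char buf[BUFLEN];
	int n, total, done;

	/* start with the tail of the previous read */
	memcpy(buf, leftover_buf, leftover);
	n = recv(fd, buf + leftover, sizeof(buf) - leftover, 0);
	if (n <= 0)
		return n;

	total = leftover + n;
	done = parse_msgs(buf, total);	/* whole messages only */

	/* carry the incomplete tail over to the next call */
	leftover = total - done;
	memcpy(leftover_buf, buf + done, leftover);
	return total;
}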
+/* Listening socket is busy, accept a connection */
+static int accept_from_sock(void)
+{
+	struct sockaddr_storage peeraddr;
+	int len, idx, result, nodeid;
+	struct connection *newcon;
+	struct socket *newsock;
+	unsigned int mark;
+
+	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
+	if (result == -EAGAIN)
+		return DLM_IO_END;
+	else if (result < 0)
 		goto accept_err;
 
 	/* Get the connected socket's peer */
@@ -764,13 +1003,30 @@ static int tcp_accept_from_sock(struct connection *con)
 
 	/* Get the new node's NODEID */
 	make_sockaddr(&peeraddr, 0, &len);
-	if (addr_to_nodeid(&peeraddr, &nodeid)) {
-		unsigned char *b=(unsigned char *)&peeraddr;
-		log_print("connect from non cluster node");
-		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
-				     b, sizeof(struct sockaddr_storage));
+	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
+		switch (peeraddr.ss_family) {
+		case AF_INET: {
+			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
+
+			log_print("connect from non cluster IPv4 node %pI4",
+				  &sin->sin_addr);
+			break;
+		}
+#if IS_ENABLED(CONFIG_IPV6)
+		case AF_INET6: {
+			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
+
+			log_print("connect from non cluster IPv6 node %pI6c",
+				  &sin6->sin6_addr);
+			break;
+		}
+#endif
+		default:
+			log_print("invalid family from non cluster node");
+			break;
+		}
+
 		sock_release(newsock);
-		mutex_unlock(&con->sock_mutex);
 		return -1;
 	}
 
@@ -781,200 +1037,69 @@
	 * Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
-	newcon = nodeid2con(nodeid, GFP_NOFS);
-	if (!newcon) {
-		result = -ENOMEM;
+	idx = srcu_read_lock(&connections_srcu);
+	newcon = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!newcon)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		result = -ENOENT;
 		goto accept_err;
 	}
-	mutex_lock_nested(&newcon->sock_mutex, 1);
+
+	sock_set_mark(newsock->sk, mark);
+
+	down_write(&newcon->sock_lock);
 	if (newcon->sock) {
 		struct connection *othercon = newcon->othercon;
 
 		if (!othercon) {
-			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
+			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
 			if (!othercon) {
 				log_print("failed to allocate incoming socket");
-				mutex_unlock(&newcon->sock_mutex);
+				up_write(&newcon->sock_lock);
+				srcu_read_unlock(&connections_srcu, idx);
 				result = -ENOMEM;
 				goto accept_err;
 			}
-			othercon->nodeid = nodeid;
-			othercon->rx_action = receive_from_sock;
-			mutex_init(&othercon->sock_mutex);
-			INIT_LIST_HEAD(&othercon->writequeue);
-			spin_lock_init(&othercon->writequeue_lock);
-			INIT_WORK(&othercon->swork, process_send_sockets);
-			INIT_WORK(&othercon->rwork, process_recv_sockets);
-			set_bit(CF_IS_OTHERCON, &othercon->flags);
-		}
-		mutex_lock_nested(&othercon->sock_mutex, 2);
-		if (!othercon->sock) {
+
+			dlm_con_init(othercon, nodeid);
+			lockdep_set_subclass(&othercon->sock_lock, 1);
 			newcon->othercon = othercon;
-			add_sock(newsock, othercon);
-			addcon = othercon;
-			mutex_unlock(&othercon->sock_mutex);
-		}
-		else {
-			printk("Extra connection from node %d attempted\n", nodeid);
-			result = -EAGAIN;
-			mutex_unlock(&othercon->sock_mutex);
-			mutex_unlock(&newcon->sock_mutex);
-			goto accept_err;
+			set_bit(CF_IS_OTHERCON, &othercon->flags);
+		} else {
+			/* close other sock con if we have something new */
+			close_connection(othercon, false);
 		}
+
+		down_write(&othercon->sock_lock);
+		add_sock(newsock, othercon);
+
+		/* check if we received something while adding */
+		lock_sock(othercon->sock->sk);
+		lowcomms_queue_rwork(othercon);
+		release_sock(othercon->sock->sk);
+		up_write(&othercon->sock_lock);
 	}
 	else {
-		newcon->rx_action = receive_from_sock;
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively.
		*/
 		add_sock(newsock, newcon);
-		addcon = newcon;
-	}
-
-	mutex_unlock(&newcon->sock_mutex);
-
-	/*
-	 * Add it to the active queue in case we got data
-	 * between processing the accept adding the socket
-	 * to the read_sockets list
-	 */
-	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
-		queue_work(recv_workqueue, &addcon->rwork);
-	mutex_unlock(&con->sock_mutex);
-
-	return 0;
-
-accept_err:
-	mutex_unlock(&con->sock_mutex);
-	if (newsock)
-		sock_release(newsock);
-
-	if (result != -EAGAIN)
-		log_print("error accepting connection from node: %d", result);
-	return result;
-}
-
-static int sctp_accept_from_sock(struct connection *con)
-{
-	/* Check that the new node is in the lockspace */
-	struct sctp_prim prim;
-	int nodeid;
-	int prim_len, ret;
-	int addr_len;
-	struct connection *newcon;
-	struct connection *addcon;
-	struct socket *newsock;
-
-	mutex_lock(&connections_lock);
-	if (!dlm_allow_conn) {
-		mutex_unlock(&connections_lock);
-		return -1;
-	}
-	mutex_unlock(&connections_lock);
-
-	mutex_lock_nested(&con->sock_mutex, 0);
-
-	ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
-	if (ret < 0)
-		goto accept_err;
-
-	memset(&prim, 0, sizeof(struct sctp_prim));
-	prim_len = sizeof(struct sctp_prim);
-
-	ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
-				(char *)&prim, &prim_len);
-	if (ret < 0) {
-		log_print("getsockopt/sctp_primary_addr failed: %d", ret);
-		goto accept_err;
-	}
-
-	make_sockaddr(&prim.ssp_addr, 0, &addr_len);
-	ret = addr_to_nodeid(&prim.ssp_addr, &nodeid);
-	if (ret) {
-		unsigned char *b = (unsigned char *)&prim.ssp_addr;
-
-		log_print("reject connect from unknown addr");
-		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
-				     b, sizeof(struct sockaddr_storage));
-		goto accept_err;
-	}
-
-	newcon = nodeid2con(nodeid, GFP_NOFS);
-	if (!newcon) {
-		ret = -ENOMEM;
-		goto accept_err;
-	}
-
-	mutex_lock_nested(&newcon->sock_mutex, 1);
-
-	if (newcon->sock) {
-		struct connection *othercon = newcon->othercon;
-
-		if (!othercon) {
-			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
-			if (!othercon) {
-				log_print("failed to allocate incoming socket");
-				mutex_unlock(&newcon->sock_mutex);
-				ret = -ENOMEM;
-				goto accept_err;
-			}
-			othercon->nodeid = nodeid;
-			othercon->rx_action = receive_from_sock;
-			mutex_init(&othercon->sock_mutex);
-			INIT_LIST_HEAD(&othercon->writequeue);
-			spin_lock_init(&othercon->writequeue_lock);
-			INIT_WORK(&othercon->swork, process_send_sockets);
-			INIT_WORK(&othercon->rwork, process_recv_sockets);
-			set_bit(CF_IS_OTHERCON, &othercon->flags);
-		}
-		mutex_lock_nested(&othercon->sock_mutex, 2);
-		if (!othercon->sock) {
-			newcon->othercon = othercon;
-			add_sock(newsock, othercon);
-			addcon = othercon;
-			mutex_unlock(&othercon->sock_mutex);
-		} else {
-			printk("Extra connection from node %d attempted\n", nodeid);
-			ret = -EAGAIN;
-			mutex_unlock(&othercon->sock_mutex);
-			mutex_unlock(&newcon->sock_mutex);
-			goto accept_err;
-		}
-	} else {
-		newcon->rx_action = receive_from_sock;
-		add_sock(newsock, newcon);
-		addcon = newcon;
+		/* check if we received something while adding */
+		lock_sock(newcon->sock->sk);
+		lowcomms_queue_rwork(newcon);
+		release_sock(newcon->sock->sk);
 	}
+	up_write(&newcon->sock_lock);
+	srcu_read_unlock(&connections_srcu, idx);
 
-	log_print("connected to %d", nodeid);
-
-	mutex_unlock(&newcon->sock_mutex);
-
-	/*
-	 * Add it to the active queue in case we got data
-	 * between processing the accept adding the socket
-	 * to the read_sockets list
-	 */
-	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
-		queue_work(recv_workqueue, &addcon->rwork);
-	mutex_unlock(&con->sock_mutex);
-
-	return 0;
+	return DLM_IO_SUCCESS;
 
 accept_err:
-	mutex_unlock(&con->sock_mutex);
 	if (newsock)
 		sock_release(newsock);
 
-	if (ret != -EAGAIN)
-		log_print("error accepting connection from node: %d", ret);
-
-	return ret;
-}
-
-static void free_entry(struct writequeue_entry *e)
-{
-	__free_page(e->page);
-	kfree(e);
+	return result;
 }
 
 /*
@@ -988,33 +1113,30 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 {
 	e->offset += completed;
 	e->len -= completed;
+	/* signal that page was half way transmitted */
+	e->dirty = true;
 
-	if (e->len == 0 && e->users == 0) {
-		list_del(&e->list);
+	if (e->len == 0 && e->users == 0)
 		free_entry(e);
-	}
 }
 
 /*
  * sctp_bind_addrs - bind a SCTP socket to all our addresses
  */
-static int sctp_bind_addrs(struct connection *con, uint16_t port)
+static int sctp_bind_addrs(struct socket *sock, __be16 port)
 {
 	struct sockaddr_storage localaddr;
+	struct sockaddr_unsized *addr = (struct sockaddr_unsized *)&localaddr;
 	int i, addr_len, result = 0;
 
 	for (i = 0; i < dlm_local_count; i++) {
-		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
+		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
 		make_sockaddr(&localaddr, port, &addr_len);
 
 		if (!i)
-			result = kernel_bind(con->sock,
-					     (struct sockaddr *)&localaddr,
-					     addr_len);
+			result = kernel_bind(sock, addr, addr_len);
 		else
-			result = kernel_setsockopt(con->sock, SOL_SCTP,
-						   SCTP_SOCKOPT_BINDX_ADD,
-						   (char *)&localaddr, addr_len);
+			result = sock_bind_add(sock->sk, addr, addr_len);
 
 		if (result < 0) {
 			log_print("Can't bind to %d addr number %d, %d.\n",
@@ -1025,719 +1147,769 @@ static int sctp_bind_addrs(struct connection *con, uint16_t port)
 	return result;
 }
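sctp_bind_addrs() binds one SCTP socket to every local address: kernel_bind() for the first address, sock_bind_add() for the rest, which is what gives SCTP its multi-homing. The userspace equivalent from lksctp-tools is sctp_bindx(); a sketch assuming <netinet/sctp.h> is available (names hypothetical):

#include <netinet/sctp.h>

/* bind sd to a list of local IPv4 addresses for SCTP multi-homing */
static int bind_all(int sd, struct sockaddr_in *addrs, int count)
{
	int i, rc;

	for (i = 0; i < count; i++) {
		if (i == 0)	/* first address: plain bind() */
			rc = bind(sd, (struct sockaddr *)&addrs[i],
				  sizeof(addrs[i]));
		else		/* additional addresses: sctp_bindx() */
			rc = sctp_bindx(sd, (struct sockaddr *)&addrs[i], 1,
					SCTP_BINDX_ADD_ADDR);
		if (rc < 0)
			return -1;
	}
	return 0;
}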
-/* Initiate an SCTP association.
-   This is a special case of send_to_sock() in that we don't yet have a
-   peeled-off socket for this association, so we use the listening socket
-   and add the primary IP address of the remote node.
- */
-static void sctp_connect_to_sock(struct connection *con)
+/* Get local addresses */
+static void init_local(void)
 {
-	struct sockaddr_storage daddr;
-	int one = 1;
-	int result;
-	int addr_len;
-	struct socket *sock;
-	struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
+	struct sockaddr_storage sas;
+	int i;
 
-	if (con->nodeid == 0) {
-		log_print("attempt to connect sock 0 foiled");
-		return;
-	}
+	dlm_local_count = 0;
+	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
+		if (dlm_our_addr(&sas, i))
			break;
 
-	mutex_lock(&con->sock_mutex);
+		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
+	}
+}
 
-	/* Some odd races can cause double-connects, ignore them */
-	if (con->retries++ > MAX_CONNECT_RETRIES)
-		goto out;
+static struct writequeue_entry *new_writequeue_entry(struct connection *con)
+{
+	struct writequeue_entry *entry;
 
-	if (con->sock) {
-		log_print("node %d already connected.", con->nodeid);
-		goto out;
-	}
+	entry = dlm_allocate_writequeue();
+	if (!entry)
+		return NULL;
 
-	memset(&daddr, 0, sizeof(daddr));
-	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
-	if (result < 0) {
-		log_print("no address for nodeid %d", con->nodeid);
-		goto out;
+	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
+	if (!entry->page) {
+		dlm_free_writequeue(entry);
+		return NULL;
 	}
 
-	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_SCTP, &sock);
-	if (result < 0)
-		goto socket_err;
-
-	con->rx_action = receive_from_sock;
-	con->connect_action = sctp_connect_to_sock;
-	add_sock(sock, con);
+	entry->offset = 0;
+	entry->len = 0;
+	entry->end = 0;
+	entry->dirty = false;
+	entry->con = con;
+	entry->users = 1;
+	kref_init(&entry->ref);
+	return entry;
+}
 
-	/* Bind to all addresses. */
-	if (sctp_bind_addrs(con, 0))
-		goto bind_err;
+static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
+					     char **ppc, void (*cb)(void *data),
+					     void *data)
+{
+	struct writequeue_entry *e;
 
-	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
+	spin_lock_bh(&con->writequeue_lock);
+	if (!list_empty(&con->writequeue)) {
+		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
+		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
+			kref_get(&e->ref);
 
-	log_print("connecting to %d", con->nodeid);
+			*ppc = page_address(e->page) + e->end;
+			if (cb)
+				cb(data);
 
-	/* Turn off Nagle's algorithm */
-	kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
-			  sizeof(one));
+			e->end += len;
+			e->users++;
+			goto out;
+		}
+	}
 
-	/*
-	 * Make sock->ops->connect() function return in specified time,
-	 * since O_NONBLOCK argument in connect() function does not work here,
-	 * then, we should restore the default value of this attribute.
-	 */
-	kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
-			  sizeof(tv));
-	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
-				    0);
-	memset(&tv, 0, sizeof(tv));
-	kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
-			  sizeof(tv));
-
-	if (result == -EINPROGRESS)
-		result = 0;
-	if (result == 0)
+	e = new_writequeue_entry(con);
+	if (!e)
		goto out;
 
-bind_err:
-	con->sock = NULL;
-	sock_release(sock);
+	kref_get(&e->ref);
+	*ppc = page_address(e->page);
+	e->end += len;
+	if (cb)
+		cb(data);
 
-socket_err:
-	/*
-	 * Some errors are fatal and this list might need adjusting. For other
-	 * errors we try again until the max number of retries is reached.
-	 */
-	if (result != -EHOSTUNREACH &&
-	    result != -ENETUNREACH &&
-	    result != -ENETDOWN &&
-	    result != -EINVAL &&
-	    result != -EPROTONOSUPPORT) {
-		log_print("connect %d try %d error %d", con->nodeid,
-			  con->retries, result);
-		mutex_unlock(&con->sock_mutex);
-		msleep(1000);
-		lowcomms_connect_sock(con);
-		return;
-	}
+	list_add_tail(&e->list, &con->writequeue);
 
 out:
-	mutex_unlock(&con->sock_mutex);
-}
+	spin_unlock_bh(&con->writequeue_lock);
+	return e;
+};
 
-/* Connect a new socket to its peer */
-static void tcp_connect_to_sock(struct connection *con)
+static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
+						char **ppc, void (*cb)(void *data),
+						void *data)
 {
-	struct sockaddr_storage saddr, src_addr;
-	int addr_len;
-	struct socket *sock = NULL;
-	int one = 1;
-	int result;
+	struct writequeue_entry *e;
+	struct dlm_msg *msg;
 
-	if (con->nodeid == 0) {
-		log_print("attempt to connect sock 0 foiled");
-		return;
-	}
+	msg = dlm_allocate_msg();
+	if (!msg)
+		return NULL;
 
-	mutex_lock(&con->sock_mutex);
-	if (con->retries++ > MAX_CONNECT_RETRIES)
-		goto out;
+	kref_init(&msg->ref);
 
-	/* Some odd races can cause double-connects, ignore them */
-	if (con->sock)
-		goto out;
+	e = new_wq_entry(con, len, ppc, cb, data);
+	if (!e) {
+		dlm_free_msg(msg);
+		return NULL;
+	}
 
-	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_TCP, &sock);
-	if (result < 0)
-		goto out_err;
+	msg->retransmit = false;
+	msg->orig_msg = NULL;
+	msg->ppc = *ppc;
+	msg->len = len;
+	msg->entry = e;
 
-	memset(&saddr, 0, sizeof(saddr));
-	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
-	if (result < 0) {
-		log_print("no address for nodeid %d", con->nodeid);
-		goto out_err;
+	return msg;
+}
+
+/* avoid false positive for nodes_srcu, unlock happens in
 * dlm_lowcomms_commit_msg which must be called on success
 */
+#ifndef __CHECKER__
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
+				     void (*cb)(void *data), void *data)
+{
+	struct connection *con;
+	struct dlm_msg *msg;
+	int idx;
+
+	if (len > DLM_MAX_SOCKET_BUFSIZE ||
+	    len < sizeof(struct dlm_header)) {
+		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
+		log_print("failed to allocate a buffer of size %d", len);
+		WARN_ON_ONCE(1);
+		return NULL;
 	}
 
-	con->rx_action = receive_from_sock;
-	con->connect_action = tcp_connect_to_sock;
-	add_sock(sock, con);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, 0);
+	if (WARN_ON_ONCE(!con)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return NULL;
+	}
 
-	/* Bind to our cluster-known address connecting to avoid
-	   routing problems */
-	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
-	make_sockaddr(&src_addr, 0, &addr_len);
-	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
-				 addr_len);
-	if (result < 0) {
-		log_print("could not bind for connect: %d", result);
-		/* This *may* not indicate a critical error */
+	msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
+	if (!msg) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return NULL;
 	}
 
-	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
+	/* for dlm_lowcomms_commit_msg() */
+	kref_get(&msg->ref);
+	/* we assume commit must be called on success */
+	msg->idx = idx;
+	return msg;
+}
+#endif
+
+static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+{
+	struct writequeue_entry *e = msg->entry;
+	struct connection *con = e->con;
+	int users;
+	spin_lock_bh(&con->writequeue_lock);
+	kref_get(&msg->ref);
+	list_add(&msg->list, &e->msgs);
 
-	log_print("connecting to %d", con->nodeid);
-
-	/* Turn off Nagle's algorithm */
-	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
-			  sizeof(one));
-
-	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
-				    O_NONBLOCK);
-	if (result == -EINPROGRESS)
-		result = 0;
-	if (result == 0)
+	users = --e->users;
+	if (users)
		goto out;
 
-out_err:
-	if (con->sock) {
-		sock_release(con->sock);
-		con->sock = NULL;
-	} else if (sock) {
-		sock_release(sock);
-	}
-	/*
-	 * Some errors are fatal and this list might need adjusting. For other
-	 * errors we try again until the max number of retries is reached.
-	 */
-	if (result != -EHOSTUNREACH &&
-	    result != -ENETUNREACH &&
-	    result != -ENETDOWN &&
-	    result != -EINVAL &&
-	    result != -EPROTONOSUPPORT) {
-		log_print("connect %d try %d error %d", con->nodeid,
-			  con->retries, result);
-		mutex_unlock(&con->sock_mutex);
-		msleep(1000);
-		lowcomms_connect_sock(con);
-		return;
-	}
+	e->len = DLM_WQ_LENGTH_BYTES(e);
+
+	lowcomms_queue_swork(con);
+
 out:
-	mutex_unlock(&con->sock_mutex);
+	spin_unlock_bh(&con->writequeue_lock);
 	return;
 }
 
-static struct socket *tcp_create_listen_sock(struct connection *con,
-					     struct sockaddr_storage *saddr)
+/* avoid false positive for nodes_srcu, lock happened in
 * dlm_lowcomms_new_msg
 */
+#ifndef __CHECKER__
+void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 {
-	struct socket *sock = NULL;
-	int result = 0;
-	int one = 1;
-	int addr_len;
+	_dlm_lowcomms_commit_msg(msg);
+	srcu_read_unlock(&connections_srcu, msg->idx);
+	/* because dlm_lowcomms_new_msg() */
+	kref_put(&msg->ref, dlm_msg_release);
+}
+#endif
 
-	if (dlm_local_addr[0]->ss_family == AF_INET)
-		addr_len = sizeof(struct sockaddr_in);
-	else
-		addr_len = sizeof(struct sockaddr_in6);
+void dlm_lowcomms_put_msg(struct dlm_msg *msg)
+{
+	kref_put(&msg->ref, dlm_msg_release);
+}
 
-	/* Create a socket to communicate with */
-	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_TCP, &sock);
-	if (result < 0) {
-		log_print("Can't create listening comms socket");
-		goto create_out;
-	}
+/* does not hold connections_srcu, used by lowcomms_error_report only */
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
+{
+	struct dlm_msg *msg_resend;
+	char *ppc;
 
-	/* Turn off Nagle's algorithm */
-	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
-			  sizeof(one));
+	if (msg->retransmit)
+		return 1;
 
-	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-				   (char *)&one, sizeof(one));
+	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
+					      NULL, NULL);
+	if (!msg_resend)
+		return -ENOMEM;
 
-	if (result < 0) {
-		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
-	}
-	write_lock_bh(&sock->sk->sk_callback_lock);
-	sock->sk->sk_user_data = con;
-	save_listen_callbacks(sock);
-	con->rx_action = tcp_accept_from_sock;
-	con->connect_action = tcp_connect_to_sock;
-	write_unlock_bh(&sock->sk->sk_callback_lock);
+	msg->retransmit = true;
+	kref_get(&msg->ref);
+	msg_resend->orig_msg = msg;
 
-	/* Bind to our port */
-	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
-	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
-	if (result < 0) {
-		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
-		sock_release(sock);
-		sock = NULL;
-		con->sock = NULL;
-		goto create_out;
-	}
-	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
-				   (char *)&one, sizeof(one));
-	if (result < 0) {
-		log_print("Set keepalive failed: %d", result);
-	}
+	memcpy(ppc, msg->ppc, msg->len);
+	_dlm_lowcomms_commit_msg(msg_resend);
+	dlm_lowcomms_put_msg(msg_resend);
 
-	result = sock->ops->listen(sock, 5);
-	if (result < 0) {
-		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
-		sock_release(sock);
-		sock = NULL;
-		goto create_out;
-	}
-
-create_out:
-	return sock;
-}
+	return 0;
+}
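The dlm_msg functions above form a small allocate/fill/commit API: dlm_lowcomms_new_msg() reserves payload space in a writequeue page and returns a pointer to it, dlm_lowcomms_commit_msg() makes the message eligible for sending (and releases the SRCU index taken at allocation), and dlm_lowcomms_put_msg() drops the caller's reference. A hedged usage sketch based only on the signatures visible in this diff (send_example() is hypothetical):

/* sketch: send one buffer to a node via the lowcomms message API */
static int send_example(int nodeid, const void *data, int len)
{
	struct dlm_msg *msg;
	char *ppc;

	msg = dlm_lowcomms_new_msg(nodeid, len, &ppc, NULL, NULL);
	if (!msg)
		return -ENOMEM;

	memcpy(ppc, data, len);		/* fill the reserved payload area */
	dlm_lowcomms_commit_msg(msg);	/* queue it for the send worker */
	dlm_lowcomms_put_msg(msg);	/* drop our reference */
	return 0;
}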
%d", result); - } + memcpy(ppc, msg->ppc, msg->len); + _dlm_lowcomms_commit_msg(msg_resend); + dlm_lowcomms_put_msg(msg_resend); - result = sock->ops->listen(sock, 5); - if (result < 0) { - log_print("Can't listen on port %d", dlm_config.ci_tcp_port); - sock_release(sock); - sock = NULL; - goto create_out; + return 0; +} + +/* Send a message */ +static int send_to_sock(struct connection *con) +{ + struct writequeue_entry *e; + struct bio_vec bvec; + struct msghdr msg = { + .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL, + }; + int len, offset, ret; + + spin_lock_bh(&con->writequeue_lock); + e = con_next_wq(con); + if (!e) { + clear_bit(CF_SEND_PENDING, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + return DLM_IO_END; + } + + len = e->len; + offset = e->offset; + WARN_ON_ONCE(len == 0 && e->users == 0); + spin_unlock_bh(&con->writequeue_lock); + + bvec_set_page(&bvec, e->page, len, offset); + iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len); + ret = sock_sendmsg(con->sock, &msg); + trace_dlm_send(con->nodeid, ret); + if (ret == -EAGAIN || ret == 0) { + lock_sock(con->sock->sk); + spin_lock_bh(&con->writequeue_lock); + if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && + !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { + /* Notify TCP that we're limited by the + * application window size. + */ + set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); + con->sock->sk->sk_write_pending++; + + clear_bit(CF_SEND_PENDING, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + release_sock(con->sock->sk); + + /* wait for write_space() event */ + return DLM_IO_END; + } + spin_unlock_bh(&con->writequeue_lock); + release_sock(con->sock->sk); + + return DLM_IO_RESCHED; + } else if (ret < 0) { + return ret; } -create_out: - return sock; + spin_lock_bh(&con->writequeue_lock); + writequeue_entry_complete(e, ret); + spin_unlock_bh(&con->writequeue_lock); + + return DLM_IO_SUCCESS; } -/* Get local addresses */ -static void init_local(void) +static void clean_one_writequeue(struct connection *con) { - struct sockaddr_storage sas, *addr; - int i; - - dlm_local_count = 0; - for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { - if (dlm_our_addr(&sas, i)) - break; + struct writequeue_entry *e, *safe; - addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS); - if (!addr) - break; - dlm_local_addr[dlm_local_count++] = addr; + spin_lock_bh(&con->writequeue_lock); + list_for_each_entry_safe(e, safe, &con->writequeue, list) { + free_entry(e); } + spin_unlock_bh(&con->writequeue_lock); } -/* Initialise SCTP socket and bind to all interfaces */ -static int sctp_listen_for_all(void) +static void connection_release(struct rcu_head *rcu) { - struct socket *sock = NULL; - int result = -EINVAL; - struct connection *con = nodeid2con(0, GFP_NOFS); - int bufsize = NEEDED_RMEM; - int one = 1; + struct connection *con = container_of(rcu, struct connection, rcu); - if (!con) - return -ENOMEM; + WARN_ON_ONCE(!list_empty(&con->writequeue)); + WARN_ON_ONCE(con->sock); + kfree(con); +} + +/* Called from recovery when it knows that a node has + left the cluster */ +int dlm_lowcomms_close(int nodeid) +{ + struct connection *con; + int idx; - log_print("Using SCTP for communications"); + log_print("closing connection to node %d", nodeid); - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, - SOCK_STREAM, IPPROTO_SCTP, &sock); - if (result < 0) { - log_print("Can't create comms socket, check SCTP is loaded"); - goto out; + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, 0); + 
if (WARN_ON_ONCE(!con)) {
+ srcu_read_unlock(&connections_srcu, idx);
+ return -ENOENT;
 }

- result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
- (char *)&bufsize, sizeof(bufsize));
- if (result)
- log_print("Error increasing buffer space on socket %d", result);
+ stop_connection_io(con);
+ log_print("io handling for node %d stopped", nodeid);
+ close_connection(con, true);

- result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
- sizeof(one));
- if (result < 0)
- log_print("Could not set SCTP NODELAY error %d\n", result);
+ spin_lock(&connections_lock);
+ hlist_del_rcu(&con->list);
+ spin_unlock(&connections_lock);

- write_lock_bh(&sock->sk->sk_callback_lock);
- /* Init con struct */
- sock->sk->sk_user_data = con;
- save_listen_callbacks(sock);
- con->sock = sock;
- con->sock->sk->sk_data_ready = lowcomms_data_ready;
- con->rx_action = sctp_accept_from_sock;
- con->connect_action = sctp_connect_to_sock;
+ clean_one_writequeue(con);
+ call_srcu(&connections_srcu, &con->rcu, connection_release);
+ if (con->othercon) {
+ clean_one_writequeue(con->othercon);
+ call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
+ }
+ srcu_read_unlock(&connections_srcu, idx);
+
+ /* for debugging we print when we are done, to compare against other
+ * messages in between. This function needs to be correctly synchronized
+ * with the io handling.
+ */
+ log_print("closing connection to node %d done", nodeid);

- write_unlock_bh(&sock->sk->sk_callback_lock);
+ return 0;
+}

- /* Bind to all addresses. */
- if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
- goto create_delsock;
+/* Receive worker function */
+static void process_recv_sockets(struct work_struct *work)
+{
+ struct connection *con = container_of(work, struct connection, rwork);
+ int ret, buflen;

- result = sock->ops->listen(sock, 5);
- if (result < 0) {
- log_print("Can't set socket listening");
- goto create_delsock;
+ down_read(&con->sock_lock);
+ if (!con->sock) {
+ up_read(&con->sock_lock);
+ return;
 }
- return 0;
+ buflen = READ_ONCE(dlm_config.ci_buffer_size);
+ do {
+ ret = receive_from_sock(con, buflen);
+ } while (ret == DLM_IO_SUCCESS);
+ up_read(&con->sock_lock);

-create_delsock:
- sock_release(sock);
- con->sock = NULL;
-out:
- return result;
+ switch (ret) {
+ case DLM_IO_END:
+ /* CF_RECV_PENDING cleared */
+ break;
+ case DLM_IO_EOF:
+ close_connection(con, false);
+ wake_up(&con->shutdown_wait);
+ /* CF_RECV_PENDING cleared */
+ break;
+ case DLM_IO_FLUSH:
+ /* we can't flush process_workqueue here because flushing a
+ * non-WQ_MEM_RECLAIM workqueue such as process_workqueue from
+ * a WQ_MEM_RECLAIM workqueue can deadlock. Instead we have a
+ * waitqueue to wait until all messages are
+ * processed.
+ *
+ * This handling is only necessary to back off the sender and
+ * not queue all messages from the socket layer into the DLM
+ * processqueue. Once DLM is capable of parsing multiple messages
+ * on a per-socket basis, this handling might be
+ * removed. Especially in a message burst we are too slow to
+ * process messages and the queue will fill up memory. 
+ */ + wait_event(processqueue_wq, !atomic_read(&processqueue_count)); + fallthrough; + case DLM_IO_RESCHED: + cond_resched(); + queue_work(io_workqueue, &con->rwork); + /* CF_RECV_PENDING not cleared */ + break; + default: + if (ret < 0) { + if (test_bit(CF_IS_OTHERCON, &con->flags)) { + close_connection(con, false); + } else { + spin_lock_bh(&con->writequeue_lock); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); + } + + /* CF_RECV_PENDING cleared for othercon + * we trigger send queue if not already done + * and process_send_sockets will handle it + */ + break; + } + + WARN_ON_ONCE(1); + break; + } } -static int tcp_listen_for_all(void) +static void process_listen_recv_socket(struct work_struct *work) { - struct socket *sock = NULL; - struct connection *con = nodeid2con(0, GFP_NOFS); - int result = -EINVAL; + int ret; - if (!con) - return -ENOMEM; + if (WARN_ON_ONCE(!listen_con.sock)) + return; - /* We don't support multi-homed hosts */ - if (dlm_local_addr[1] != NULL) { - log_print("TCP protocol can't handle multi-homed hosts, " - "try SCTP"); - return -EINVAL; + do { + ret = accept_from_sock(); + } while (ret == DLM_IO_SUCCESS); + + if (ret < 0) + log_print("critical error accepting connection: %d", ret); +} + +static int dlm_connect(struct connection *con) +{ + struct sockaddr_storage addr; + int result, addr_len; + struct socket *sock; + unsigned int mark; + + memset(&addr, 0, sizeof(addr)); + result = nodeid_to_addr(con->nodeid, &addr, NULL, + dlm_proto_ops->try_new_addr, &mark); + if (result < 0) { + log_print("no address for nodeid %d", con->nodeid); + return result; } - log_print("Using TCP for communications"); + /* Create a socket to communicate with */ + result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, + SOCK_STREAM, dlm_proto_ops->proto, &sock); + if (result < 0) + return result; + + sock_set_mark(sock->sk, mark); + dlm_proto_ops->sockopts(sock); - sock = tcp_create_listen_sock(con, dlm_local_addr[0]); - if (sock) { - add_sock(sock, con); - result = 0; + result = dlm_proto_ops->bind(sock); + if (result < 0) { + sock_release(sock); + return result; } - else { - result = -EADDRINUSE; + + add_sock(sock, con); + + log_print_ratelimited("connecting to %d", con->nodeid); + make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); + result = kernel_connect(sock, (struct sockaddr_unsized *)&addr, addr_len, 0); + switch (result) { + case -EINPROGRESS: + /* not an error */ + fallthrough; + case 0: + break; + default: + if (result < 0) + dlm_close_sock(&con->sock); + + break; } return result; } - - -static struct writequeue_entry *new_writequeue_entry(struct connection *con, - gfp_t allocation) +/* Send worker function */ +static void process_send_sockets(struct work_struct *work) { - struct writequeue_entry *entry; + struct connection *con = container_of(work, struct connection, swork); + int ret; - entry = kmalloc(sizeof(struct writequeue_entry), allocation); - if (!entry) - return NULL; + WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); - entry->page = alloc_page(allocation); - if (!entry->page) { - kfree(entry); - return NULL; + down_read(&con->sock_lock); + if (!con->sock) { + up_read(&con->sock_lock); + down_write(&con->sock_lock); + if (!con->sock) { + ret = dlm_connect(con); + switch (ret) { + case 0: + break; + default: + /* CF_SEND_PENDING not cleared */ + up_write(&con->sock_lock); + log_print("connect to node %d try %d error %d", + con->nodeid, con->retries++, ret); + msleep(1000); + /* For now we try forever to reconnect. 
In the
+ * future we should send an event to the cluster
+ * manager to fence itself after a certain number
+ * of retries.
+ */
+ queue_work(io_workqueue, &con->swork);
+ return;
+ }
+ }
+ downgrade_write(&con->sock_lock);
 }
- entry->offset = 0;
- entry->len = 0;
- entry->end = 0;
- entry->users = 0;
- entry->con = con;
+ do {
+ ret = send_to_sock(con);
+ } while (ret == DLM_IO_SUCCESS);
+ up_read(&con->sock_lock);

- return entry;
+ switch (ret) {
+ case DLM_IO_END:
+ /* CF_SEND_PENDING cleared */
+ break;
+ case DLM_IO_RESCHED:
+ /* CF_SEND_PENDING not cleared */
+ cond_resched();
+ queue_work(io_workqueue, &con->swork);
+ break;
+ default:
+ if (ret < 0) {
+ close_connection(con, false);
+
+ /* CF_SEND_PENDING cleared */
+ spin_lock_bh(&con->writequeue_lock);
+ lowcomms_queue_swork(con);
+ spin_unlock_bh(&con->writequeue_lock);
+ break;
+ }
+
+ WARN_ON_ONCE(1);
+ break;
+ }
 }

-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
+static void work_stop(void)
 {
- struct connection *con;
- struct writequeue_entry *e;
- int offset = 0;
-
- con = nodeid2con(nodeid, allocation);
- if (!con)
- return NULL;
+ if (io_workqueue) {
+ destroy_workqueue(io_workqueue);
+ io_workqueue = NULL;
+ }

- spin_lock(&con->writequeue_lock);
- e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
- if ((&e->list == &con->writequeue) ||
- (PAGE_SIZE - e->end < len)) {
- e = NULL;
- } else {
- offset = e->end;
- e->end += len;
- e->users++;
+ if (process_workqueue) {
+ destroy_workqueue(process_workqueue);
+ process_workqueue = NULL;
 }
- spin_unlock(&con->writequeue_lock);
+}

- if (e) {
- got_one:
- *ppc = page_address(e->page) + offset;
- return e;
+static int work_start(void)
+{
+ io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
+ WQ_UNBOUND, 0);
+ if (!io_workqueue) {
+ log_print("can't start dlm_io");
+ return -ENOMEM;
 }

- e = new_writequeue_entry(con, allocation);
- if (e) {
- spin_lock(&con->writequeue_lock);
- offset = e->end;
- e->end += len;
- e->users++;
- list_add_tail(&e->list, &con->writequeue);
- spin_unlock(&con->writequeue_lock);
- goto got_one;
+ process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH | WQ_PERCPU, 0);
+ if (!process_workqueue) {
+ log_print("can't start dlm_process");
+ destroy_workqueue(io_workqueue);
+ io_workqueue = NULL;
+ return -ENOMEM;
 }
- return NULL;
+
+ return 0;
 }

-void dlm_lowcomms_commit_buffer(void *mh)
+void dlm_lowcomms_shutdown(void)
 {
- struct writequeue_entry *e = (struct writequeue_entry *)mh;
- struct connection *con = e->con;
- int users;
+ struct connection *con;
+ int i, idx;

- spin_lock(&con->writequeue_lock);
- users = --e->users;
- if (users)
- goto out;
- e->len = e->end - e->offset;
- spin_unlock(&con->writequeue_lock);
+ /* stop lowcomms_listen_data_ready calls */
+ lock_sock(listen_con.sock->sk);
+ listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
+ release_sock(listen_con.sock->sk);

- queue_work(send_workqueue, &con->swork);
- return;
+ cancel_work_sync(&listen_con.rwork);
+ dlm_close_sock(&listen_con.sock);

-out:
- spin_unlock(&con->writequeue_lock);
- return;
+ idx = srcu_read_lock(&connections_srcu);
+ for (i = 0; i < CONN_HASH_SIZE; i++) {
+ hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+ shutdown_connection(con, true);
+ stop_connection_io(con);
+ flush_workqueue(process_workqueue);
+ close_connection(con, true);
+
+ clean_one_writequeue(con);
+ if (con->othercon)
+ clean_one_writequeue(con->othercon);
+ allow_connection_io(con);
+ }
+ }
+ 
srcu_read_unlock(&connections_srcu, idx); } -/* Send a message */ -static void send_to_sock(struct connection *con) +void dlm_lowcomms_stop(void) { - int ret = 0; - const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; - struct writequeue_entry *e; - int len, offset; - int count = 0; - - mutex_lock(&con->sock_mutex); - if (con->sock == NULL) - goto out_connect; + work_stop(); + dlm_proto_ops = NULL; +} - spin_lock(&con->writequeue_lock); - for (;;) { - e = list_entry(con->writequeue.next, struct writequeue_entry, - list); - if ((struct list_head *) e == &con->writequeue) - break; +static int dlm_listen_for_all(void) +{ + struct socket *sock; + int result; - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&con->writequeue_lock); - - ret = 0; - if (len) { - ret = kernel_sendpage(con->sock, e->page, offset, len, - msg_flags); - if (ret == -EAGAIN || ret == 0) { - if (ret == -EAGAIN && - test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && - !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { - /* Notify TCP that we're limited by the - * application window size. - */ - set_bit(SOCK_NOSPACE, &con->sock->flags); - con->sock->sk->sk_write_pending++; - } - cond_resched(); - goto out; - } else if (ret < 0) - goto send_error; - } + log_print("Using %s for communications", + dlm_proto_ops->name); - /* Don't starve people filling buffers */ - if (++count >= MAX_SEND_MSG_COUNT) { - cond_resched(); - count = 0; - } + result = dlm_proto_ops->listen_validate(); + if (result < 0) + return result; - spin_lock(&con->writequeue_lock); - writequeue_entry_complete(e, ret); + result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, + SOCK_STREAM, dlm_proto_ops->proto, &sock); + if (result < 0) { + log_print("Can't create comms socket: %d", result); + return result; } - spin_unlock(&con->writequeue_lock); -out: - mutex_unlock(&con->sock_mutex); - return; -send_error: - mutex_unlock(&con->sock_mutex); - close_connection(con, true, false, true); - /* Requeue the send work. When the work daemon runs again, it will try - a new connection, then call this function again. 
*/
- queue_work(send_workqueue, &con->swork);
- return;
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
+ dlm_proto_ops->listen_sockopts(sock);

-out_connect:
- mutex_unlock(&con->sock_mutex);
- queue_work(send_workqueue, &con->swork);
- cond_resched();
-}
+ result = dlm_proto_ops->listen_bind(sock);
+ if (result < 0)
+ goto out;

-static void clean_one_writequeue(struct connection *con)
-{
- struct writequeue_entry *e, *safe;
+ lock_sock(sock->sk);
+ listen_sock.sk_data_ready = sock->sk->sk_data_ready;
+ listen_sock.sk_write_space = sock->sk->sk_write_space;
+ listen_sock.sk_error_report = sock->sk->sk_error_report;
+ listen_sock.sk_state_change = sock->sk->sk_state_change;

- spin_lock(&con->writequeue_lock);
- list_for_each_entry_safe(e, safe, &con->writequeue, list) {
- list_del(&e->list);
- free_entry(e);
+ listen_con.sock = sock;
+
+ sock->sk->sk_allocation = GFP_NOFS;
+ sock->sk->sk_use_task_frag = false;
+ sock->sk->sk_data_ready = lowcomms_listen_data_ready;
+ release_sock(sock->sk);
+
+ result = sock->ops->listen(sock, 128);
+ if (result < 0) {
+ dlm_close_sock(&listen_con.sock);
+ return result;
 }
- spin_unlock(&con->writequeue_lock);
+
+ return 0;
+
+out:
+ sock_release(sock);
+ return result;
 }

-/* Called from recovery when it knows that a node has
- left the cluster */
-int dlm_lowcomms_close(int nodeid)
+static int dlm_tcp_bind(struct socket *sock)
 {
- struct connection *con;
- struct dlm_node_addr *na;
+ struct sockaddr_storage src_addr;
+ int result, addr_len;

- log_print("closing connection to node %d", nodeid);
- con = nodeid2con(nodeid, 0);
- if (con) {
- set_bit(CF_CLOSE, &con->flags);
- close_connection(con, true, true, true);
- clean_one_writequeue(con);
- }
+ /* Bind to our cluster-known address when connecting to avoid
+ * routing problems. 
+ */
+ memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
+ make_sockaddr(&src_addr, 0, &addr_len);

- spin_lock(&dlm_node_addrs_spin);
- na = find_node_addr(nodeid);
- if (na) {
- list_del(&na->list);
- while (na->addr_count--)
- kfree(na->addr[na->addr_count]);
- kfree(na);
+ result = kernel_bind(sock, (struct sockaddr_unsized *)&src_addr,
+ addr_len);
+ if (result < 0) {
+ /* This *may* not indicate a critical error */
+ log_print("could not bind for connect: %d", result);
 }
- spin_unlock(&dlm_node_addrs_spin);

 return 0;
 }

-/* Receive workqueue function */
-static void process_recv_sockets(struct work_struct *work)
+static int dlm_tcp_listen_validate(void)
 {
- struct connection *con = container_of(work, struct connection, rwork);
- int err;
+ /* We don't support multi-homed hosts */
+ if (dlm_local_count > 1) {
+ log_print("Detected a multi-homed host, but only the first IP address will be used.");
+ log_print("Try SCTP if you want to enable multi-link.");
+ }

- clear_bit(CF_READ_PENDING, &con->flags);
- do {
- err = con->rx_action(con);
- } while (!err);
+ return 0;
 }

-/* Send workqueue function */
-static void process_send_sockets(struct work_struct *work)
+static void dlm_tcp_sockopts(struct socket *sock)
 {
- struct connection *con = container_of(work, struct connection, swork);
-
- clear_bit(CF_WRITE_PENDING, &con->flags);
- if (con->sock == NULL) /* not mutex protected so check it inside too */
- con->connect_action(con);
- if (!list_empty(&con->writequeue))
- send_to_sock(con);
+ /* Turn off Nagle's algorithm */
+ tcp_sock_set_nodelay(sock->sk);
 }
-
-/* Discard all entries on the write queues */
-static void clean_writequeues(void)
+static void dlm_tcp_listen_sockopts(struct socket *sock)
 {
- foreach_conn(clean_one_writequeue);
+ dlm_tcp_sockopts(sock);
+ sock_set_reuseaddr(sock->sk);
 }

-static void work_stop(void)
+static int dlm_tcp_listen_bind(struct socket *sock)
 {
- destroy_workqueue(recv_workqueue);
- destroy_workqueue(send_workqueue);
-}
+ int addr_len;

-static int work_start(void)
-{
- recv_workqueue = alloc_workqueue("dlm_recv",
- WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
- if (!recv_workqueue) {
- log_print("can't start dlm_recv");
- return -ENOMEM;
- }
+ /* Bind to our port */
+ make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
+ return kernel_bind(sock, (struct sockaddr_unsized *)&dlm_local_addr[0],
+ addr_len);
+}

- send_workqueue = alloc_workqueue("dlm_send",
- WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
- if (!send_workqueue) {
- log_print("can't start dlm_send");
- destroy_workqueue(recv_workqueue);
- return -ENOMEM;
- }
+static const struct dlm_proto_ops dlm_tcp_ops = {
+ .name = "TCP",
+ .proto = IPPROTO_TCP,
+ .how = SHUT_WR,
+ .sockopts = dlm_tcp_sockopts,
+ .bind = dlm_tcp_bind,
+ .listen_validate = dlm_tcp_listen_validate,
+ .listen_sockopts = dlm_tcp_listen_sockopts,
+ .listen_bind = dlm_tcp_listen_bind,
+};

- return 0;
+static int dlm_sctp_bind(struct socket *sock)
+{
+ return sctp_bind_addrs(sock, 0);
 }

-static void _stop_conn(struct connection *con, bool and_other)
+static int dlm_sctp_listen_validate(void)
 {
- mutex_lock(&con->sock_mutex);
- set_bit(CF_CLOSE, &con->flags);
- set_bit(CF_READ_PENDING, &con->flags);
- set_bit(CF_WRITE_PENDING, &con->flags);
- if (con->sock && con->sock->sk) {
- write_lock_bh(&con->sock->sk->sk_callback_lock);
- con->sock->sk->sk_user_data = NULL;
- write_unlock_bh(&con->sock->sk->sk_callback_lock);
+ if (!IS_ENABLED(CONFIG_IP_SCTP)) {
+ log_print("SCTP is not enabled by this kernel");
+ return -EOPNOTSUPP;
 }
- if (con->othercon && 
and_other) - _stop_conn(con->othercon, false); - mutex_unlock(&con->sock_mutex); -} -static void stop_conn(struct connection *con) -{ - _stop_conn(con, true); + request_module("sctp"); + return 0; } -static void free_conn(struct connection *con) +static int dlm_sctp_bind_listen(struct socket *sock) { - close_connection(con, true, true, true); - if (con->othercon) - kmem_cache_free(con_cache, con->othercon); - hlist_del(&con->list); - kmem_cache_free(con_cache, con); + return sctp_bind_addrs(sock, dlm_config.ci_tcp_port); } -static void work_flush(void) +static void dlm_sctp_sockopts(struct socket *sock) { - int ok; - int i; - struct hlist_node *n; - struct connection *con; - - flush_workqueue(recv_workqueue); - flush_workqueue(send_workqueue); - do { - ok = 1; - foreach_conn(stop_conn); - flush_workqueue(recv_workqueue); - flush_workqueue(send_workqueue); - for (i = 0; i < CONN_HASH_SIZE && ok; i++) { - hlist_for_each_entry_safe(con, n, - &connection_hash[i], list) { - ok &= test_bit(CF_READ_PENDING, &con->flags); - ok &= test_bit(CF_WRITE_PENDING, &con->flags); - if (con->othercon) { - ok &= test_bit(CF_READ_PENDING, - &con->othercon->flags); - ok &= test_bit(CF_WRITE_PENDING, - &con->othercon->flags); - } - } - } - } while (!ok); + /* Turn off Nagle's algorithm */ + sctp_sock_set_nodelay(sock->sk); + sock_set_rcvbuf(sock->sk, NEEDED_RMEM); } -void dlm_lowcomms_stop(void) -{ - /* Set all the flags to prevent any - socket activity. - */ - mutex_lock(&connections_lock); - dlm_allow_conn = 0; - mutex_unlock(&connections_lock); - work_flush(); - clean_writequeues(); - foreach_conn(free_conn); - work_stop(); - - kmem_cache_destroy(con_cache); -} +static const struct dlm_proto_ops dlm_sctp_ops = { + .name = "SCTP", + .proto = IPPROTO_SCTP, + .how = SHUT_RDWR, + .try_new_addr = true, + .sockopts = dlm_sctp_sockopts, + .bind = dlm_sctp_bind, + .listen_validate = dlm_sctp_listen_validate, + .listen_sockopts = dlm_sctp_sockopts, + .listen_bind = dlm_sctp_bind_listen, +}; int dlm_lowcomms_start(void) { - int error = -EINVAL; - struct connection *con; - int i; - - for (i = 0; i < CONN_HASH_SIZE; i++) - INIT_HLIST_HEAD(&connection_hash[i]); + int error; init_local(); if (!dlm_local_count) { @@ -1746,52 +1918,66 @@ int dlm_lowcomms_start(void) goto fail; } - error = -ENOMEM; - con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), - __alignof__(struct connection), 0, - NULL); - if (!con_cache) - goto fail; - error = work_start(); if (error) - goto fail_destroy; - - dlm_allow_conn = 1; + goto fail; /* Start listening */ - if (dlm_config.ci_protocol == 0) - error = tcp_listen_for_all(); - else - error = sctp_listen_for_all(); + switch (dlm_config.ci_protocol) { + case DLM_PROTO_TCP: + dlm_proto_ops = &dlm_tcp_ops; + break; + case DLM_PROTO_SCTP: + dlm_proto_ops = &dlm_sctp_ops; + break; + default: + log_print("Invalid protocol identifier %d set", + dlm_config.ci_protocol); + error = -EINVAL; + goto fail_proto_ops; + } + + error = dlm_listen_for_all(); if (error) - goto fail_unlisten; + goto fail_listen; return 0; -fail_unlisten: - dlm_allow_conn = 0; - con = nodeid2con(0,0); - if (con) { - close_connection(con, false, true, true); - kmem_cache_free(con_cache, con); - } -fail_destroy: - kmem_cache_destroy(con_cache); +fail_listen: + dlm_proto_ops = NULL; +fail_proto_ops: + work_stop(); fail: return error; } +void dlm_lowcomms_init(void) +{ + int i; + + for (i = 0; i < CONN_HASH_SIZE; i++) + INIT_HLIST_HEAD(&connection_hash[i]); + + INIT_WORK(&listen_con.rwork, process_listen_recv_socket); +} 
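After the protocol switch in dlm_lowcomms_start() above, everything transport-specific is reached through the dlm_proto_ops pointer rather than repeated checks on dlm_config.ci_protocol. A minimal userspace sketch of that ops-table pattern, with purely illustrative demo_* names (not from the kernel):

#include <stdio.h>

/* illustrative miniature of the dlm_proto_ops dispatch, userspace only */
struct demo_proto_ops {
	const char *name;
	int (*listen_validate)(void);
};

static int demo_tcp_validate(void) { return 0; }
static int demo_sctp_validate(void) { return 0; }

static const struct demo_proto_ops demo_tcp_ops = {
	.name = "TCP", .listen_validate = demo_tcp_validate,
};
static const struct demo_proto_ops demo_sctp_ops = {
	.name = "SCTP", .listen_validate = demo_sctp_validate,
};

static const struct demo_proto_ops *demo_ops;

int main(void)
{
	int protocol = 0;	/* stands in for dlm_config.ci_protocol */

	/* select the ops table once, as dlm_lowcomms_start() does */
	demo_ops = protocol ? &demo_sctp_ops : &demo_tcp_ops;

	/* generic code only calls through the pointers from here on */
	printf("Using %s for communications\n", demo_ops->name);
	return demo_ops->listen_validate();
}

The benefit is the same as in the diff: adding a transport means filling in one const ops struct instead of touching every call site.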
+
 void dlm_lowcomms_exit(void)
 {
- struct dlm_node_addr *na, *safe;
+ struct connection *con;
+ int i, idx;

- spin_lock(&dlm_node_addrs_spin);
- list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
- list_del(&na->list);
- while (na->addr_count--)
- kfree(na->addr[na->addr_count]);
- kfree(na);
+ idx = srcu_read_lock(&connections_srcu);
+ for (i = 0; i < CONN_HASH_SIZE; i++) {
+ hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+ spin_lock(&connections_lock);
+ hlist_del_rcu(&con->list);
+ spin_unlock(&connections_lock);
+
+ if (con->othercon)
+ call_srcu(&connections_srcu, &con->othercon->rcu,
+ connection_release);
+ call_srcu(&connections_srcu, &con->rcu, connection_release);
+ }
 }
- spin_unlock(&dlm_node_addrs_spin);
+ srcu_read_unlock(&connections_srcu, idx);
 }
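dlm_lowcomms_exit() above follows the common SRCU teardown pattern: entries are unlinked under connections_lock with hlist_del_rcu(), and the actual kfree() is deferred through call_srcu() until every srcu_read_lock() section has drained. A schematic kernel-style sketch of the same pattern, using invented demo_* names rather than the lowcomms ones:

#include <linux/srcu.h>
#include <linux/rculist.h>
#include <linux/spinlock.h>
#include <linux/slab.h>

/* schematic only: mirrors how lowcomms defers connection_release() */
struct demo_conn {
	struct hlist_node list;
	struct rcu_head rcu;
};

static HLIST_HEAD(demo_hash);
static DEFINE_SPINLOCK(demo_lock);
DEFINE_STATIC_SRCU(demo_srcu);

static void demo_release(struct rcu_head *rcu)
{
	kfree(container_of(rcu, struct demo_conn, rcu));
}

static void demo_exit(void)
{
	struct demo_conn *c;
	int idx;

	/* readers may still walk the list concurrently */
	idx = srcu_read_lock(&demo_srcu);
	hlist_for_each_entry_rcu(c, &demo_hash, list) {
		/* unlink under the writer lock ... */
		spin_lock(&demo_lock);
		hlist_del_rcu(&c->list);
		spin_unlock(&demo_lock);

		/* ... but free only after every srcu reader has finished */
		call_srcu(&demo_srcu, &c->rcu, demo_release);
	}
	srcu_read_unlock(&demo_srcu, idx);
}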

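Finally, the DLM_IO_FLUSH backoff seen in process_recv_sockets() boils down to a counter-plus-waitqueue handshake between the receive worker and message processing (processqueue_count and processqueue_wq in the real code, bounded by DLM_MAX_PROCESS_BUFFERS). A schematic kernel-style sketch of that handshake, again with invented demo_* names:

#include <linux/atomic.h>
#include <linux/wait.h>

#define DEMO_MAX_BUFFERS 24	/* cf. DLM_MAX_PROCESS_BUFFERS */

static atomic_t demo_count = ATOMIC_INIT(0);
static DECLARE_WAIT_QUEUE_HEAD(demo_wq);

/* receive side: account one queued message; ask the caller to back
 * off (the DLM_IO_FLUSH case) once the limit is exceeded
 */
static bool demo_queue_msg(void)
{
	return atomic_inc_return(&demo_count) > DEMO_MAX_BUFFERS;
}

/* processing side: wake the receive worker once everything drained */
static void demo_msg_done(void)
{
	if (atomic_dec_and_test(&demo_count))
		wake_up(&demo_wq);
}

/* receive side: what the DLM_IO_FLUSH case does before rescheduling */
static void demo_backoff(void)
{
	wait_event(demo_wq, !atomic_read(&demo_count));
}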