summaryrefslogtreecommitdiff
path: root/fs/dlm/lowcomms.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r--fs/dlm/lowcomms.c411
1 files changed, 327 insertions, 84 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 166e36fcf3e4..0ea9ae35da0b 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -59,7 +59,6 @@
#include "config.h"
#define NEEDED_RMEM (4*1024*1024)
-#define CONN_HASH_SIZE 32
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
@@ -79,14 +78,20 @@ struct connection {
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
#define CF_CONNECTED 10
+#define CF_RECONNECT 11
+#define CF_DELAY_CONNECT 12
+#define CF_EOF 13
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
+ atomic_t writequeue_cnt;
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
+ bool (*eof_condition)(struct connection *con); /* What to do to eof check */
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
struct connection *othercon;
+ struct connection *sendcon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
@@ -113,7 +118,22 @@ struct writequeue_entry {
int len;
int end;
int users;
+ bool dirty;
struct connection *con;
+ struct list_head msgs;
+ struct kref ref;
+};
+
+struct dlm_msg {
+ struct writequeue_entry *entry;
+ struct dlm_msg *orig_msg;
+ bool retransmit;
+ void *ppc;
+ int len;
+ int idx; /* new()/commit() idx exchange */
+
+ struct list_head list;
+ struct kref ref;
};
struct dlm_node_addr {
@@ -155,33 +175,23 @@ static void sctp_connect_to_sock(struct connection *con);
static void tcp_connect_to_sock(struct connection *con);
static void dlm_tcp_shutdown(struct connection *con);
-/* This is deliberately very simple because most clusters have simple
- sequential nodeids, so we should be able to go straight to a connection
- struct in the array */
-static inline int nodeid_hash(int nodeid)
+static struct connection *__find_con(int nodeid, int r)
{
- return nodeid & (CONN_HASH_SIZE-1);
-}
-
-static struct connection *__find_con(int nodeid)
-{
- int r, idx;
struct connection *con;
- r = nodeid_hash(nodeid);
-
- idx = srcu_read_lock(&connections_srcu);
hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
- if (con->nodeid == nodeid) {
- srcu_read_unlock(&connections_srcu, idx);
+ if (con->nodeid == nodeid)
return con;
- }
}
- srcu_read_unlock(&connections_srcu, idx);
return NULL;
}
+static bool tcp_eof_condition(struct connection *con)
+{
+ return atomic_read(&con->writequeue_cnt);
+}
+
static int dlm_con_init(struct connection *con, int nodeid)
{
con->rx_buflen = dlm_config.ci_buffer_size;
@@ -193,15 +203,23 @@ static int dlm_con_init(struct connection *con, int nodeid)
mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
+ atomic_set(&con->writequeue_cnt, 0);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
init_waitqueue_head(&con->shutdown_wait);
- if (dlm_config.ci_protocol == 0) {
+ switch (dlm_config.ci_protocol) {
+ case DLM_PROTO_TCP:
con->connect_action = tcp_connect_to_sock;
con->shutdown_action = dlm_tcp_shutdown;
- } else {
+ con->eof_condition = tcp_eof_condition;
+ break;
+ case DLM_PROTO_SCTP:
con->connect_action = sctp_connect_to_sock;
+ break;
+ default:
+ kfree(con->rx_buf);
+ return -EINVAL;
}
return 0;
@@ -216,7 +234,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
struct connection *con, *tmp;
int r, ret;
- con = __find_con(nodeid);
+ r = nodeid_hash(nodeid);
+ con = __find_con(nodeid, r);
if (con || !alloc)
return con;
@@ -230,8 +249,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
- r = nodeid_hash(nodeid);
-
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@@ -239,7 +256,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
* under protection of connections_lock. If this is the case we
* abort our connection creation and return the existing connection.
*/
- tmp = __find_con(nodeid);
+ tmp = __find_con(nodeid, r);
if (tmp) {
spin_unlock(&connections_lock);
kfree(con->rx_buf);
@@ -256,15 +273,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
- int i, idx;
+ int i;
struct connection *con;
- idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(con, &connection_hash[i], list)
conn_func(con);
}
- srcu_read_unlock(&connections_srcu, idx);
}
static struct dlm_node_addr *find_node_addr(int nodeid)
@@ -462,6 +477,9 @@ static void lowcomms_data_ready(struct sock *sk)
static void lowcomms_listen_data_ready(struct sock *sk)
{
+ if (!dlm_allow_conn)
+ return;
+
queue_work(recv_workqueue, &listen_con.rwork);
}
@@ -518,14 +536,21 @@ static void lowcomms_state_change(struct sock *sk)
int dlm_lowcomms_connect_node(int nodeid)
{
struct connection *con;
+ int idx;
if (nodeid == dlm_our_nodeid())
return 0;
+ idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, GFP_NOFS);
- if (!con)
+ if (!con) {
+ srcu_read_unlock(&connections_srcu, idx);
return -ENOMEM;
+ }
+
lowcomms_connect_sock(con);
+ srcu_read_unlock(&connections_srcu, idx);
+
return 0;
}
@@ -587,6 +612,22 @@ static void lowcomms_error_report(struct sock *sk)
dlm_config.ci_tcp_port, sk->sk_err,
sk->sk_err_soft);
}
+
+ /* below sendcon only handling */
+ if (test_bit(CF_IS_OTHERCON, &con->flags))
+ con = con->sendcon;
+
+ switch (sk->sk_err) {
+ case ECONNREFUSED:
+ set_bit(CF_DELAY_CONNECT, &con->flags);
+ break;
+ default:
+ break;
+ }
+
+ if (!test_and_set_bit(CF_RECONNECT, &con->flags))
+ queue_work(send_workqueue, &con->swork);
+
out:
read_unlock_bh(&sk->sk_callback_lock);
if (orig_report)
@@ -669,6 +710,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
+static void dlm_page_release(struct kref *kref)
+{
+ struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+ ref);
+
+ __free_page(e->page);
+ kfree(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+ struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+ kref_put(&msg->entry->ref, dlm_page_release);
+ kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+ struct dlm_msg *msg, *tmp;
+
+ list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+ if (msg->orig_msg) {
+ msg->orig_msg->retransmit = false;
+ kref_put(&msg->orig_msg->ref, dlm_msg_release);
+ }
+
+ list_del(&msg->list);
+ kref_put(&msg->ref, dlm_msg_release);
+ }
+
+ list_del(&e->list);
+ atomic_dec(&e->con->writequeue_cnt);
+ kref_put(&e->ref, dlm_page_release);
+}
+
static void dlm_close_sock(struct socket **sock)
{
if (*sock) {
@@ -683,6 +760,7 @@ static void close_connection(struct connection *con, bool and_other,
bool tx, bool rx)
{
bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+ struct writequeue_entry *e;
if (tx && !closing && cancel_work_sync(&con->swork)) {
log_print("canceled swork for node %d", con->nodeid);
@@ -698,12 +776,35 @@ static void close_connection(struct connection *con, bool and_other,
if (con->othercon && and_other) {
/* Will only re-enter once. */
- close_connection(con->othercon, false, true, true);
+ close_connection(con->othercon, false, tx, rx);
+ }
+
+ /* if we send a writequeue entry only a half way, we drop the
+ * whole entry because reconnection and that we not start of the
+ * middle of a msg which will confuse the other end.
+ *
+ * we can always drop messages because retransmits, but what we
+ * cannot allow is to transmit half messages which may be processed
+ * at the other side.
+ *
+ * our policy is to start on a clean state when disconnects, we don't
+ * know what's send/received on transport layer in this case.
+ */
+ spin_lock(&con->writequeue_lock);
+ if (!list_empty(&con->writequeue)) {
+ e = list_first_entry(&con->writequeue, struct writequeue_entry,
+ list);
+ if (e->dirty)
+ free_entry(e);
}
+ spin_unlock(&con->writequeue_lock);
con->rx_leftover = 0;
con->retries = 0;
clear_bit(CF_CONNECTED, &con->flags);
+ clear_bit(CF_DELAY_CONNECT, &con->flags);
+ clear_bit(CF_RECONNECT, &con->flags);
+ clear_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
}
@@ -841,19 +942,26 @@ out_resched:
return -EAGAIN;
out_close:
- mutex_unlock(&con->sock_mutex);
- if (ret != -EAGAIN) {
- /* Reconnect when there is something to send */
- close_connection(con, false, true, false);
- if (ret == 0) {
- log_print("connection %p got EOF from %d",
- con, con->nodeid);
+ if (ret == 0) {
+ log_print("connection %p got EOF from %d",
+ con, con->nodeid);
+
+ if (con->eof_condition && con->eof_condition(con)) {
+ set_bit(CF_EOF, &con->flags);
+ mutex_unlock(&con->sock_mutex);
+ } else {
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, true, false);
+
/* handling for tcp shutdown */
clear_bit(CF_SHUTDOWN, &con->flags);
wake_up(&con->shutdown_wait);
- /* signal to breaking receive worker */
- ret = -1;
}
+
+ /* signal to breaking receive worker */
+ ret = -1;
+ } else {
+ mutex_unlock(&con->sock_mutex);
}
return ret;
}
@@ -864,16 +972,12 @@ static int accept_from_sock(struct listen_connection *con)
int result;
struct sockaddr_storage peeraddr;
struct socket *newsock;
- int len;
+ int len, idx;
int nodeid;
struct connection *newcon;
struct connection *addcon;
unsigned int mark;
- if (!dlm_allow_conn) {
- return -1;
- }
-
if (!con->sock)
return -ENOTCONN;
@@ -907,8 +1011,10 @@ static int accept_from_sock(struct listen_connection *con)
* the same time and the connections cross on the wire.
* In this case we store the incoming one in "othercon"
*/
+ idx = srcu_read_lock(&connections_srcu);
newcon = nodeid2con(nodeid, GFP_NOFS);
if (!newcon) {
+ srcu_read_unlock(&connections_srcu, idx);
result = -ENOMEM;
goto accept_err;
}
@@ -924,6 +1030,7 @@ static int accept_from_sock(struct listen_connection *con)
if (!othercon) {
log_print("failed to allocate incoming socket");
mutex_unlock(&newcon->sock_mutex);
+ srcu_read_unlock(&connections_srcu, idx);
result = -ENOMEM;
goto accept_err;
}
@@ -932,11 +1039,14 @@ static int accept_from_sock(struct listen_connection *con)
if (result < 0) {
kfree(othercon);
mutex_unlock(&newcon->sock_mutex);
+ srcu_read_unlock(&connections_srcu, idx);
goto accept_err;
}
lockdep_set_subclass(&othercon->sock_mutex, 1);
+ set_bit(CF_IS_OTHERCON, &othercon->flags);
newcon->othercon = othercon;
+ othercon->sendcon = newcon;
} else {
/* close other sock con if we have something new */
close_connection(othercon, false, true, false);
@@ -966,6 +1076,8 @@ static int accept_from_sock(struct listen_connection *con)
if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
queue_work(recv_workqueue, &addcon->rwork);
+ srcu_read_unlock(&connections_srcu, idx);
+
return 0;
accept_err:
@@ -977,12 +1089,6 @@ accept_err:
return result;
}
-static void free_entry(struct writequeue_entry *e)
-{
- __free_page(e->page);
- kfree(e);
-}
-
/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
@@ -994,11 +1100,11 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
e->offset += completed;
e->len -= completed;
+ /* signal that page was half way transmitted */
+ e->dirty = true;
- if (e->len == 0 && e->users == 0) {
- list_del(&e->list);
+ if (e->len == 0 && e->users == 0)
free_entry(e);
- }
}
/*
@@ -1075,7 +1181,7 @@ static void sctp_connect_to_sock(struct connection *con)
make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
- log_print("connecting to %d", con->nodeid);
+ log_print_ratelimited("connecting to %d", con->nodeid);
/* Turn off Nagle's algorithm */
sctp_sock_set_nodelay(sock->sk);
@@ -1171,7 +1277,7 @@ static void tcp_connect_to_sock(struct connection *con)
make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
- log_print("connecting to %d", con->nodeid);
+ log_print_ratelimited("connecting to %d", con->nodeid);
/* Turn off Nagle's algorithm */
tcp_sock_set_nodelay(sock->sk);
@@ -1364,12 +1470,16 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
entry->con = con;
entry->users = 1;
+ kref_init(&entry->ref);
+ INIT_LIST_HEAD(&entry->msgs);
return entry;
}
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
- gfp_t allocation, char **ppc)
+ gfp_t allocation, char **ppc,
+ void (*cb)(struct dlm_mhandle *mh),
+ struct dlm_mhandle *mh)
{
struct writequeue_entry *e;
@@ -1377,7 +1487,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
if (!list_empty(&con->writequeue)) {
e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
if (DLM_WQ_REMAIN_BYTES(e) >= len) {
+ kref_get(&e->ref);
+
*ppc = page_address(e->page) + e->end;
+ if (cb)
+ cb(mh);
+
e->end += len;
e->users++;
spin_unlock(&con->writequeue_lock);
@@ -1391,42 +1506,92 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
if (!e)
return NULL;
+ kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
+ atomic_inc(&con->writequeue_cnt);
spin_lock(&con->writequeue_lock);
+ if (cb)
+ cb(mh);
+
list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock);
return e;
};
-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
+static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
+ gfp_t allocation, char **ppc,
+ void (*cb)(struct dlm_mhandle *mh),
+ struct dlm_mhandle *mh)
+{
+ struct writequeue_entry *e;
+ struct dlm_msg *msg;
+
+ msg = kzalloc(sizeof(*msg), allocation);
+ if (!msg)
+ return NULL;
+
+ kref_init(&msg->ref);
+
+ e = new_wq_entry(con, len, allocation, ppc, cb, mh);
+ if (!e) {
+ kfree(msg);
+ return NULL;
+ }
+
+ msg->ppc = *ppc;
+ msg->len = len;
+ msg->entry = e;
+
+ return msg;
+}
+
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
+ char **ppc, void (*cb)(struct dlm_mhandle *mh),
+ struct dlm_mhandle *mh)
{
struct connection *con;
+ struct dlm_msg *msg;
+ int idx;
- if (len > DEFAULT_BUFFER_SIZE ||
+ if (len > DLM_MAX_SOCKET_BUFSIZE ||
len < sizeof(struct dlm_header)) {
- BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
+ BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
log_print("failed to allocate a buffer of size %d", len);
WARN_ON(1);
return NULL;
}
+ idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, allocation);
- if (!con)
+ if (!con) {
+ srcu_read_unlock(&connections_srcu, idx);
return NULL;
+ }
- return new_wq_entry(con, len, allocation, ppc);
+ msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
+ if (!msg) {
+ srcu_read_unlock(&connections_srcu, idx);
+ return NULL;
+ }
+
+ /* we assume if successful commit must called */
+ msg->idx = idx;
+ return msg;
}
-void dlm_lowcomms_commit_buffer(void *mh)
+static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
- struct writequeue_entry *e = (struct writequeue_entry *)mh;
+ struct writequeue_entry *e = msg->entry;
struct connection *con = e->con;
int users;
spin_lock(&con->writequeue_lock);
+ kref_get(&msg->ref);
+ list_add(&msg->list, &e->msgs);
+
users = --e->users;
if (users)
goto out;
@@ -1442,6 +1607,42 @@ out:
return;
}
+void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+{
+ _dlm_lowcomms_commit_msg(msg);
+ srcu_read_unlock(&connections_srcu, msg->idx);
+}
+
+void dlm_lowcomms_put_msg(struct dlm_msg *msg)
+{
+ kref_put(&msg->ref, dlm_msg_release);
+}
+
+/* does not held connections_srcu, usage workqueue only */
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
+{
+ struct dlm_msg *msg_resend;
+ char *ppc;
+
+ if (msg->retransmit)
+ return 1;
+
+ msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
+ GFP_ATOMIC, &ppc, NULL, NULL);
+ if (!msg_resend)
+ return -ENOMEM;
+
+ msg->retransmit = true;
+ kref_get(&msg->ref);
+ msg_resend->orig_msg = msg;
+
+ memcpy(ppc, msg->ppc, msg->len);
+ _dlm_lowcomms_commit_msg(msg_resend);
+ dlm_lowcomms_put_msg(msg_resend);
+
+ return 0;
+}
+
/* Send a message */
static void send_to_sock(struct connection *con)
{
@@ -1483,7 +1684,7 @@ static void send_to_sock(struct connection *con)
cond_resched();
goto out;
} else if (ret < 0)
- goto send_error;
+ goto out;
}
/* Don't starve people filling buffers */
@@ -1496,16 +1697,23 @@ static void send_to_sock(struct connection *con)
writequeue_entry_complete(e, ret);
}
spin_unlock(&con->writequeue_lock);
-out:
- mutex_unlock(&con->sock_mutex);
+
+ /* close if we got EOF */
+ if (test_and_clear_bit(CF_EOF, &con->flags)) {
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, false, true);
+
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ } else {
+ mutex_unlock(&con->sock_mutex);
+ }
+
return;
-send_error:
+out:
mutex_unlock(&con->sock_mutex);
- close_connection(con, false, false, true);
- /* Requeue the send work. When the work daemon runs again, it will try
- a new connection, then call this function again. */
- queue_work(send_workqueue, &con->swork);
return;
out_connect:
@@ -1520,7 +1728,6 @@ static void clean_one_writequeue(struct connection *con)
spin_lock(&con->writequeue_lock);
list_for_each_entry_safe(e, safe, &con->writequeue, list) {
- list_del(&e->list);
free_entry(e);
}
spin_unlock(&con->writequeue_lock);
@@ -1532,8 +1739,10 @@ int dlm_lowcomms_close(int nodeid)
{
struct connection *con;
struct dlm_node_addr *na;
+ int idx;
log_print("closing connection to node %d", nodeid);
+ idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, 0);
if (con) {
set_bit(CF_CLOSE, &con->flags);
@@ -1542,6 +1751,7 @@ int dlm_lowcomms_close(int nodeid)
if (con->othercon)
clean_one_writequeue(con->othercon);
}
+ srcu_read_unlock(&connections_srcu, idx);
spin_lock(&dlm_node_addrs_spin);
na = find_node_addr(nodeid);
@@ -1578,35 +1788,50 @@ static void process_send_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, swork);
+ WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
+
clear_bit(CF_WRITE_PENDING, &con->flags);
- if (con->sock == NULL) /* not mutex protected so check it inside too */
+
+ if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
+ close_connection(con, false, false, true);
+ dlm_midcomms_unack_msg_resend(con->nodeid);
+ }
+
+ if (con->sock == NULL) { /* not mutex protected so check it inside too */
+ if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
+ msleep(1000);
con->connect_action(con);
+ }
if (!list_empty(&con->writequeue))
send_to_sock(con);
}
static void work_stop(void)
{
- if (recv_workqueue)
+ if (recv_workqueue) {
destroy_workqueue(recv_workqueue);
- if (send_workqueue)
+ recv_workqueue = NULL;
+ }
+
+ if (send_workqueue) {
destroy_workqueue(send_workqueue);
+ send_workqueue = NULL;
+ }
}
static int work_start(void)
{
- recv_workqueue = alloc_workqueue("dlm_recv",
- WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
+ recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
if (!recv_workqueue) {
log_print("can't start dlm_recv");
return -ENOMEM;
}
- send_workqueue = alloc_workqueue("dlm_send",
- WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
+ send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
if (!send_workqueue) {
log_print("can't start dlm_send");
destroy_workqueue(recv_workqueue);
+ recv_workqueue = NULL;
return -ENOMEM;
}
@@ -1621,6 +1846,8 @@ static void shutdown_conn(struct connection *con)
void dlm_lowcomms_shutdown(void)
{
+ int idx;
+
/* Set all the flags to prevent any
* socket activity.
*/
@@ -1633,7 +1860,9 @@ void dlm_lowcomms_shutdown(void)
dlm_close_sock(&listen_con.sock);
+ idx = srcu_read_lock(&connections_srcu);
foreach_conn(shutdown_conn);
+ srcu_read_unlock(&connections_srcu, idx);
}
static void _stop_conn(struct connection *con, bool and_other)
@@ -1682,7 +1911,7 @@ static void free_conn(struct connection *con)
static void work_flush(void)
{
- int ok, idx;
+ int ok;
int i;
struct connection *con;
@@ -1693,7 +1922,6 @@ static void work_flush(void)
flush_workqueue(recv_workqueue);
if (send_workqueue)
flush_workqueue(send_workqueue);
- idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
hlist_for_each_entry_rcu(con, &connection_hash[i],
list) {
@@ -1707,14 +1935,17 @@ static void work_flush(void)
}
}
}
- srcu_read_unlock(&connections_srcu, idx);
} while (!ok);
}
void dlm_lowcomms_stop(void)
{
+ int idx;
+
+ idx = srcu_read_lock(&connections_srcu);
work_flush();
foreach_conn(free_conn);
+ srcu_read_unlock(&connections_srcu, idx);
work_stop();
deinit_local();
}
@@ -1738,15 +1969,24 @@ int dlm_lowcomms_start(void)
error = work_start();
if (error)
- goto fail;
+ goto fail_local;
dlm_allow_conn = 1;
/* Start listening */
- if (dlm_config.ci_protocol == 0)
+ switch (dlm_config.ci_protocol) {
+ case DLM_PROTO_TCP:
error = tcp_listen_for_all();
- else
+ break;
+ case DLM_PROTO_SCTP:
error = sctp_listen_for_all(&listen_con);
+ break;
+ default:
+ log_print("Invalid protocol identifier %d set",
+ dlm_config.ci_protocol);
+ error = -EINVAL;
+ break;
+ }
if (error)
goto fail_unlisten;
@@ -1755,6 +1995,9 @@ int dlm_lowcomms_start(void)
fail_unlisten:
dlm_allow_conn = 0;
dlm_close_sock(&listen_con.sock);
+ work_stop();
+fail_local:
+ deinit_local();
fail:
return error;
}