diff options
Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r-- | fs/dlm/lowcomms.c | 411 |
1 files changed, 327 insertions, 84 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 166e36fcf3e4..0ea9ae35da0b 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -59,7 +59,6 @@ #include "config.h" #define NEEDED_RMEM (4*1024*1024) -#define CONN_HASH_SIZE 32 /* Number of messages to send before rescheduling */ #define MAX_SEND_MSG_COUNT 25 @@ -79,14 +78,20 @@ struct connection { #define CF_CLOSING 8 #define CF_SHUTDOWN 9 #define CF_CONNECTED 10 +#define CF_RECONNECT 11 +#define CF_DELAY_CONNECT 12 +#define CF_EOF 13 struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; + atomic_t writequeue_cnt; void (*connect_action) (struct connection *); /* What to do to connect */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ + bool (*eof_condition)(struct connection *con); /* What to do to eof check */ int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; struct connection *othercon; + struct connection *sendcon; struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ @@ -113,7 +118,22 @@ struct writequeue_entry { int len; int end; int users; + bool dirty; struct connection *con; + struct list_head msgs; + struct kref ref; +}; + +struct dlm_msg { + struct writequeue_entry *entry; + struct dlm_msg *orig_msg; + bool retransmit; + void *ppc; + int len; + int idx; /* new()/commit() idx exchange */ + + struct list_head list; + struct kref ref; }; struct dlm_node_addr { @@ -155,33 +175,23 @@ static void sctp_connect_to_sock(struct connection *con); static void tcp_connect_to_sock(struct connection *con); static void dlm_tcp_shutdown(struct connection *con); -/* This is deliberately very simple because most clusters have simple - sequential nodeids, so we should be able to go straight to a connection - struct in the array */ -static inline int nodeid_hash(int nodeid) +static struct connection *__find_con(int nodeid, int r) { - return nodeid & (CONN_HASH_SIZE-1); -} - -static struct connection *__find_con(int nodeid) -{ - int r, idx; struct connection *con; - r = nodeid_hash(nodeid); - - idx = srcu_read_lock(&connections_srcu); hlist_for_each_entry_rcu(con, &connection_hash[r], list) { - if (con->nodeid == nodeid) { - srcu_read_unlock(&connections_srcu, idx); + if (con->nodeid == nodeid) return con; - } } - srcu_read_unlock(&connections_srcu, idx); return NULL; } +static bool tcp_eof_condition(struct connection *con) +{ + return atomic_read(&con->writequeue_cnt); +} + static int dlm_con_init(struct connection *con, int nodeid) { con->rx_buflen = dlm_config.ci_buffer_size; @@ -193,15 +203,23 @@ static int dlm_con_init(struct connection *con, int nodeid) mutex_init(&con->sock_mutex); INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); + atomic_set(&con->writequeue_cnt, 0); INIT_WORK(&con->swork, process_send_sockets); INIT_WORK(&con->rwork, process_recv_sockets); init_waitqueue_head(&con->shutdown_wait); - if (dlm_config.ci_protocol == 0) { + switch (dlm_config.ci_protocol) { + case DLM_PROTO_TCP: con->connect_action = tcp_connect_to_sock; con->shutdown_action = dlm_tcp_shutdown; - } else { + con->eof_condition = tcp_eof_condition; + break; + case DLM_PROTO_SCTP: con->connect_action = sctp_connect_to_sock; + break; + default: + kfree(con->rx_buf); + return -EINVAL; } return 0; @@ -216,7 +234,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) struct connection *con, *tmp; int r, ret; - con = __find_con(nodeid); + r = nodeid_hash(nodeid); + con = __find_con(nodeid, r); if (con || !alloc) return con; @@ -230,8 +249,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) return NULL; } - r = nodeid_hash(nodeid); - spin_lock(&connections_lock); /* Because multiple workqueues/threads calls this function it can * race on multiple cpu's. Instead of locking hot path __find_con() @@ -239,7 +256,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) * under protection of connections_lock. If this is the case we * abort our connection creation and return the existing connection. */ - tmp = __find_con(nodeid); + tmp = __find_con(nodeid, r); if (tmp) { spin_unlock(&connections_lock); kfree(con->rx_buf); @@ -256,15 +273,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) /* Loop round all connections */ static void foreach_conn(void (*conn_func)(struct connection *c)) { - int i, idx; + int i; struct connection *con; - idx = srcu_read_lock(&connections_srcu); for (i = 0; i < CONN_HASH_SIZE; i++) { hlist_for_each_entry_rcu(con, &connection_hash[i], list) conn_func(con); } - srcu_read_unlock(&connections_srcu, idx); } static struct dlm_node_addr *find_node_addr(int nodeid) @@ -462,6 +477,9 @@ static void lowcomms_data_ready(struct sock *sk) static void lowcomms_listen_data_ready(struct sock *sk) { + if (!dlm_allow_conn) + return; + queue_work(recv_workqueue, &listen_con.rwork); } @@ -518,14 +536,21 @@ static void lowcomms_state_change(struct sock *sk) int dlm_lowcomms_connect_node(int nodeid) { struct connection *con; + int idx; if (nodeid == dlm_our_nodeid()) return 0; + idx = srcu_read_lock(&connections_srcu); con = nodeid2con(nodeid, GFP_NOFS); - if (!con) + if (!con) { + srcu_read_unlock(&connections_srcu, idx); return -ENOMEM; + } + lowcomms_connect_sock(con); + srcu_read_unlock(&connections_srcu, idx); + return 0; } @@ -587,6 +612,22 @@ static void lowcomms_error_report(struct sock *sk) dlm_config.ci_tcp_port, sk->sk_err, sk->sk_err_soft); } + + /* below sendcon only handling */ + if (test_bit(CF_IS_OTHERCON, &con->flags)) + con = con->sendcon; + + switch (sk->sk_err) { + case ECONNREFUSED: + set_bit(CF_DELAY_CONNECT, &con->flags); + break; + default: + break; + } + + if (!test_and_set_bit(CF_RECONNECT, &con->flags)) + queue_work(send_workqueue, &con->swork); + out: read_unlock_bh(&sk->sk_callback_lock); if (orig_report) @@ -669,6 +710,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); } +static void dlm_page_release(struct kref *kref) +{ + struct writequeue_entry *e = container_of(kref, struct writequeue_entry, + ref); + + __free_page(e->page); + kfree(e); +} + +static void dlm_msg_release(struct kref *kref) +{ + struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref); + + kref_put(&msg->entry->ref, dlm_page_release); + kfree(msg); +} + +static void free_entry(struct writequeue_entry *e) +{ + struct dlm_msg *msg, *tmp; + + list_for_each_entry_safe(msg, tmp, &e->msgs, list) { + if (msg->orig_msg) { + msg->orig_msg->retransmit = false; + kref_put(&msg->orig_msg->ref, dlm_msg_release); + } + + list_del(&msg->list); + kref_put(&msg->ref, dlm_msg_release); + } + + list_del(&e->list); + atomic_dec(&e->con->writequeue_cnt); + kref_put(&e->ref, dlm_page_release); +} + static void dlm_close_sock(struct socket **sock) { if (*sock) { @@ -683,6 +760,7 @@ static void close_connection(struct connection *con, bool and_other, bool tx, bool rx) { bool closing = test_and_set_bit(CF_CLOSING, &con->flags); + struct writequeue_entry *e; if (tx && !closing && cancel_work_sync(&con->swork)) { log_print("canceled swork for node %d", con->nodeid); @@ -698,12 +776,35 @@ static void close_connection(struct connection *con, bool and_other, if (con->othercon && and_other) { /* Will only re-enter once. */ - close_connection(con->othercon, false, true, true); + close_connection(con->othercon, false, tx, rx); + } + + /* if we send a writequeue entry only a half way, we drop the + * whole entry because reconnection and that we not start of the + * middle of a msg which will confuse the other end. + * + * we can always drop messages because retransmits, but what we + * cannot allow is to transmit half messages which may be processed + * at the other side. + * + * our policy is to start on a clean state when disconnects, we don't + * know what's send/received on transport layer in this case. + */ + spin_lock(&con->writequeue_lock); + if (!list_empty(&con->writequeue)) { + e = list_first_entry(&con->writequeue, struct writequeue_entry, + list); + if (e->dirty) + free_entry(e); } + spin_unlock(&con->writequeue_lock); con->rx_leftover = 0; con->retries = 0; clear_bit(CF_CONNECTED, &con->flags); + clear_bit(CF_DELAY_CONNECT, &con->flags); + clear_bit(CF_RECONNECT, &con->flags); + clear_bit(CF_EOF, &con->flags); mutex_unlock(&con->sock_mutex); clear_bit(CF_CLOSING, &con->flags); } @@ -841,19 +942,26 @@ out_resched: return -EAGAIN; out_close: - mutex_unlock(&con->sock_mutex); - if (ret != -EAGAIN) { - /* Reconnect when there is something to send */ - close_connection(con, false, true, false); - if (ret == 0) { - log_print("connection %p got EOF from %d", - con, con->nodeid); + if (ret == 0) { + log_print("connection %p got EOF from %d", + con, con->nodeid); + + if (con->eof_condition && con->eof_condition(con)) { + set_bit(CF_EOF, &con->flags); + mutex_unlock(&con->sock_mutex); + } else { + mutex_unlock(&con->sock_mutex); + close_connection(con, false, true, false); + /* handling for tcp shutdown */ clear_bit(CF_SHUTDOWN, &con->flags); wake_up(&con->shutdown_wait); - /* signal to breaking receive worker */ - ret = -1; } + + /* signal to breaking receive worker */ + ret = -1; + } else { + mutex_unlock(&con->sock_mutex); } return ret; } @@ -864,16 +972,12 @@ static int accept_from_sock(struct listen_connection *con) int result; struct sockaddr_storage peeraddr; struct socket *newsock; - int len; + int len, idx; int nodeid; struct connection *newcon; struct connection *addcon; unsigned int mark; - if (!dlm_allow_conn) { - return -1; - } - if (!con->sock) return -ENOTCONN; @@ -907,8 +1011,10 @@ static int accept_from_sock(struct listen_connection *con) * the same time and the connections cross on the wire. * In this case we store the incoming one in "othercon" */ + idx = srcu_read_lock(&connections_srcu); newcon = nodeid2con(nodeid, GFP_NOFS); if (!newcon) { + srcu_read_unlock(&connections_srcu, idx); result = -ENOMEM; goto accept_err; } @@ -924,6 +1030,7 @@ static int accept_from_sock(struct listen_connection *con) if (!othercon) { log_print("failed to allocate incoming socket"); mutex_unlock(&newcon->sock_mutex); + srcu_read_unlock(&connections_srcu, idx); result = -ENOMEM; goto accept_err; } @@ -932,11 +1039,14 @@ static int accept_from_sock(struct listen_connection *con) if (result < 0) { kfree(othercon); mutex_unlock(&newcon->sock_mutex); + srcu_read_unlock(&connections_srcu, idx); goto accept_err; } lockdep_set_subclass(&othercon->sock_mutex, 1); + set_bit(CF_IS_OTHERCON, &othercon->flags); newcon->othercon = othercon; + othercon->sendcon = newcon; } else { /* close other sock con if we have something new */ close_connection(othercon, false, true, false); @@ -966,6 +1076,8 @@ static int accept_from_sock(struct listen_connection *con) if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) queue_work(recv_workqueue, &addcon->rwork); + srcu_read_unlock(&connections_srcu, idx); + return 0; accept_err: @@ -977,12 +1089,6 @@ accept_err: return result; } -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - /* * writequeue_entry_complete - try to delete and free write queue entry * @e: write queue entry to try to delete @@ -994,11 +1100,11 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed) { e->offset += completed; e->len -= completed; + /* signal that page was half way transmitted */ + e->dirty = true; - if (e->len == 0 && e->users == 0) { - list_del(&e->list); + if (e->len == 0 && e->users == 0) free_entry(e); - } } /* @@ -1075,7 +1181,7 @@ static void sctp_connect_to_sock(struct connection *con) make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len); - log_print("connecting to %d", con->nodeid); + log_print_ratelimited("connecting to %d", con->nodeid); /* Turn off Nagle's algorithm */ sctp_sock_set_nodelay(sock->sk); @@ -1171,7 +1277,7 @@ static void tcp_connect_to_sock(struct connection *con) make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); - log_print("connecting to %d", con->nodeid); + log_print_ratelimited("connecting to %d", con->nodeid); /* Turn off Nagle's algorithm */ tcp_sock_set_nodelay(sock->sk); @@ -1364,12 +1470,16 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, entry->con = con; entry->users = 1; + kref_init(&entry->ref); + INIT_LIST_HEAD(&entry->msgs); return entry; } static struct writequeue_entry *new_wq_entry(struct connection *con, int len, - gfp_t allocation, char **ppc) + gfp_t allocation, char **ppc, + void (*cb)(struct dlm_mhandle *mh), + struct dlm_mhandle *mh) { struct writequeue_entry *e; @@ -1377,7 +1487,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, if (!list_empty(&con->writequeue)) { e = list_last_entry(&con->writequeue, struct writequeue_entry, list); if (DLM_WQ_REMAIN_BYTES(e) >= len) { + kref_get(&e->ref); + *ppc = page_address(e->page) + e->end; + if (cb) + cb(mh); + e->end += len; e->users++; spin_unlock(&con->writequeue_lock); @@ -1391,42 +1506,92 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, if (!e) return NULL; + kref_get(&e->ref); *ppc = page_address(e->page); e->end += len; + atomic_inc(&con->writequeue_cnt); spin_lock(&con->writequeue_lock); + if (cb) + cb(mh); + list_add_tail(&e->list, &con->writequeue); spin_unlock(&con->writequeue_lock); return e; }; -void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) +static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, + gfp_t allocation, char **ppc, + void (*cb)(struct dlm_mhandle *mh), + struct dlm_mhandle *mh) +{ + struct writequeue_entry *e; + struct dlm_msg *msg; + + msg = kzalloc(sizeof(*msg), allocation); + if (!msg) + return NULL; + + kref_init(&msg->ref); + + e = new_wq_entry(con, len, allocation, ppc, cb, mh); + if (!e) { + kfree(msg); + return NULL; + } + + msg->ppc = *ppc; + msg->len = len; + msg->entry = e; + + return msg; +} + +struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, + char **ppc, void (*cb)(struct dlm_mhandle *mh), + struct dlm_mhandle *mh) { struct connection *con; + struct dlm_msg *msg; + int idx; - if (len > DEFAULT_BUFFER_SIZE || + if (len > DLM_MAX_SOCKET_BUFSIZE || len < sizeof(struct dlm_header)) { - BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE); + BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); log_print("failed to allocate a buffer of size %d", len); WARN_ON(1); return NULL; } + idx = srcu_read_lock(&connections_srcu); con = nodeid2con(nodeid, allocation); - if (!con) + if (!con) { + srcu_read_unlock(&connections_srcu, idx); return NULL; + } - return new_wq_entry(con, len, allocation, ppc); + msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh); + if (!msg) { + srcu_read_unlock(&connections_srcu, idx); + return NULL; + } + + /* we assume if successful commit must called */ + msg->idx = idx; + return msg; } -void dlm_lowcomms_commit_buffer(void *mh) +static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) { - struct writequeue_entry *e = (struct writequeue_entry *)mh; + struct writequeue_entry *e = msg->entry; struct connection *con = e->con; int users; spin_lock(&con->writequeue_lock); + kref_get(&msg->ref); + list_add(&msg->list, &e->msgs); + users = --e->users; if (users) goto out; @@ -1442,6 +1607,42 @@ out: return; } +void dlm_lowcomms_commit_msg(struct dlm_msg *msg) +{ + _dlm_lowcomms_commit_msg(msg); + srcu_read_unlock(&connections_srcu, msg->idx); +} + +void dlm_lowcomms_put_msg(struct dlm_msg *msg) +{ + kref_put(&msg->ref, dlm_msg_release); +} + +/* does not held connections_srcu, usage workqueue only */ +int dlm_lowcomms_resend_msg(struct dlm_msg *msg) +{ + struct dlm_msg *msg_resend; + char *ppc; + + if (msg->retransmit) + return 1; + + msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, + GFP_ATOMIC, &ppc, NULL, NULL); + if (!msg_resend) + return -ENOMEM; + + msg->retransmit = true; + kref_get(&msg->ref); + msg_resend->orig_msg = msg; + + memcpy(ppc, msg->ppc, msg->len); + _dlm_lowcomms_commit_msg(msg_resend); + dlm_lowcomms_put_msg(msg_resend); + + return 0; +} + /* Send a message */ static void send_to_sock(struct connection *con) { @@ -1483,7 +1684,7 @@ static void send_to_sock(struct connection *con) cond_resched(); goto out; } else if (ret < 0) - goto send_error; + goto out; } /* Don't starve people filling buffers */ @@ -1496,16 +1697,23 @@ static void send_to_sock(struct connection *con) writequeue_entry_complete(e, ret); } spin_unlock(&con->writequeue_lock); -out: - mutex_unlock(&con->sock_mutex); + + /* close if we got EOF */ + if (test_and_clear_bit(CF_EOF, &con->flags)) { + mutex_unlock(&con->sock_mutex); + close_connection(con, false, false, true); + + /* handling for tcp shutdown */ + clear_bit(CF_SHUTDOWN, &con->flags); + wake_up(&con->shutdown_wait); + } else { + mutex_unlock(&con->sock_mutex); + } + return; -send_error: +out: mutex_unlock(&con->sock_mutex); - close_connection(con, false, false, true); - /* Requeue the send work. When the work daemon runs again, it will try - a new connection, then call this function again. */ - queue_work(send_workqueue, &con->swork); return; out_connect: @@ -1520,7 +1728,6 @@ static void clean_one_writequeue(struct connection *con) spin_lock(&con->writequeue_lock); list_for_each_entry_safe(e, safe, &con->writequeue, list) { - list_del(&e->list); free_entry(e); } spin_unlock(&con->writequeue_lock); @@ -1532,8 +1739,10 @@ int dlm_lowcomms_close(int nodeid) { struct connection *con; struct dlm_node_addr *na; + int idx; log_print("closing connection to node %d", nodeid); + idx = srcu_read_lock(&connections_srcu); con = nodeid2con(nodeid, 0); if (con) { set_bit(CF_CLOSE, &con->flags); @@ -1542,6 +1751,7 @@ int dlm_lowcomms_close(int nodeid) if (con->othercon) clean_one_writequeue(con->othercon); } + srcu_read_unlock(&connections_srcu, idx); spin_lock(&dlm_node_addrs_spin); na = find_node_addr(nodeid); @@ -1578,35 +1788,50 @@ static void process_send_sockets(struct work_struct *work) { struct connection *con = container_of(work, struct connection, swork); + WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags)); + clear_bit(CF_WRITE_PENDING, &con->flags); - if (con->sock == NULL) /* not mutex protected so check it inside too */ + + if (test_and_clear_bit(CF_RECONNECT, &con->flags)) { + close_connection(con, false, false, true); + dlm_midcomms_unack_msg_resend(con->nodeid); + } + + if (con->sock == NULL) { /* not mutex protected so check it inside too */ + if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags)) + msleep(1000); con->connect_action(con); + } if (!list_empty(&con->writequeue)) send_to_sock(con); } static void work_stop(void) { - if (recv_workqueue) + if (recv_workqueue) { destroy_workqueue(recv_workqueue); - if (send_workqueue) + recv_workqueue = NULL; + } + + if (send_workqueue) { destroy_workqueue(send_workqueue); + send_workqueue = NULL; + } } static int work_start(void) { - recv_workqueue = alloc_workqueue("dlm_recv", - WQ_UNBOUND | WQ_MEM_RECLAIM, 1); + recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM); if (!recv_workqueue) { log_print("can't start dlm_recv"); return -ENOMEM; } - send_workqueue = alloc_workqueue("dlm_send", - WQ_UNBOUND | WQ_MEM_RECLAIM, 1); + send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM); if (!send_workqueue) { log_print("can't start dlm_send"); destroy_workqueue(recv_workqueue); + recv_workqueue = NULL; return -ENOMEM; } @@ -1621,6 +1846,8 @@ static void shutdown_conn(struct connection *con) void dlm_lowcomms_shutdown(void) { + int idx; + /* Set all the flags to prevent any * socket activity. */ @@ -1633,7 +1860,9 @@ void dlm_lowcomms_shutdown(void) dlm_close_sock(&listen_con.sock); + idx = srcu_read_lock(&connections_srcu); foreach_conn(shutdown_conn); + srcu_read_unlock(&connections_srcu, idx); } static void _stop_conn(struct connection *con, bool and_other) @@ -1682,7 +1911,7 @@ static void free_conn(struct connection *con) static void work_flush(void) { - int ok, idx; + int ok; int i; struct connection *con; @@ -1693,7 +1922,6 @@ static void work_flush(void) flush_workqueue(recv_workqueue); if (send_workqueue) flush_workqueue(send_workqueue); - idx = srcu_read_lock(&connections_srcu); for (i = 0; i < CONN_HASH_SIZE && ok; i++) { hlist_for_each_entry_rcu(con, &connection_hash[i], list) { @@ -1707,14 +1935,17 @@ static void work_flush(void) } } } - srcu_read_unlock(&connections_srcu, idx); } while (!ok); } void dlm_lowcomms_stop(void) { + int idx; + + idx = srcu_read_lock(&connections_srcu); work_flush(); foreach_conn(free_conn); + srcu_read_unlock(&connections_srcu, idx); work_stop(); deinit_local(); } @@ -1738,15 +1969,24 @@ int dlm_lowcomms_start(void) error = work_start(); if (error) - goto fail; + goto fail_local; dlm_allow_conn = 1; /* Start listening */ - if (dlm_config.ci_protocol == 0) + switch (dlm_config.ci_protocol) { + case DLM_PROTO_TCP: error = tcp_listen_for_all(); - else + break; + case DLM_PROTO_SCTP: error = sctp_listen_for_all(&listen_con); + break; + default: + log_print("Invalid protocol identifier %d set", + dlm_config.ci_protocol); + error = -EINVAL; + break; + } if (error) goto fail_unlisten; @@ -1755,6 +1995,9 @@ int dlm_lowcomms_start(void) fail_unlisten: dlm_allow_conn = 0; dlm_close_sock(&listen_con.sock); + work_stop(); +fail_local: + deinit_local(); fail: return error; } |