diff options
Diffstat (limited to 'fs/dlm/midcomms.c')
-rw-r--r-- | fs/dlm/midcomms.c | 126 |
1 files changed, 55 insertions, 71 deletions
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index c02c43e4980a..e1a0df67b566 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -136,7 +136,6 @@ #include <net/tcp.h> #include "dlm_internal.h" -#include "lockspace.h" #include "lowcomms.h" #include "config.h" #include "memory.h" @@ -149,12 +148,14 @@ /* 5 seconds wait to sync ending of dlm */ #define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000) #define DLM_VERSION_NOT_SET 0 +#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32 +#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8) struct midcomms_node { int nodeid; uint32_t version; - uint32_t seq_send; - uint32_t seq_next; + atomic_t seq_send; + atomic_t seq_next; /* These queues are unbound because we cannot drop any message in dlm. * We could send a fence signal for a specific node to the cluster * manager if queues hits some maximum value, however this handling @@ -166,7 +167,7 @@ struct midcomms_node { #define DLM_NODE_FLAG_CLOSE 1 #define DLM_NODE_FLAG_STOP_TX 2 #define DLM_NODE_FLAG_STOP_RX 3 -#define DLM_NODE_ULP_DELIVERED 4 + atomic_t ulp_delivered; unsigned long flags; wait_queue_head_t shutdown_wait; @@ -318,8 +319,9 @@ static void midcomms_node_reset(struct midcomms_node *node) { pr_debug("reset node %d\n", node->nodeid); - node->seq_next = DLM_SEQ_INIT; - node->seq_send = DLM_SEQ_INIT; + atomic_set(&node->seq_next, DLM_SEQ_INIT); + atomic_set(&node->seq_send, DLM_SEQ_INIT); + atomic_set(&node->ulp_delivered, 0); node->version = DLM_VERSION_NOT_SET; node->flags = 0; @@ -394,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq) return 0; } +static void dlm_send_ack_threshold(struct midcomms_node *node, + uint32_t threshold) +{ + uint32_t oval, nval; + bool send_ack; + + /* let only send one user trigger threshold to send ack back */ + do { + oval = atomic_read(&node->ulp_delivered); + send_ack = (oval > threshold); + /* abort if threshold is not reached */ + if (!send_ack) + break; + + nval = 0; + /* try to reset ulp_delivered counter */ + } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval); + + if (send_ack) + dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); +} + static int dlm_send_fin(struct midcomms_node *node, void (*ack_rcv)(struct midcomms_node *node)) { @@ -493,9 +517,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, struct midcomms_node *node, uint32_t seq) { - if (seq == node->seq_next) { - node->seq_next++; + bool is_expected_seq; + uint32_t oval, nval; + do { + oval = atomic_read(&node->seq_next); + is_expected_seq = (oval == seq); + if (!is_expected_seq) + break; + + nval = oval + 1; + } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval); + + if (is_expected_seq) { switch (p->header.h_cmd) { case DLM_FIN: spin_lock(&node->state_lock); @@ -504,7 +538,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, switch (node->state) { case DLM_ESTABLISHED: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); /* passive shutdown DLM_LAST_ACK case 1 * additional we check if the node is used by @@ -523,14 +557,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, } break; case DLM_FIN_WAIT1: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); node->state = DLM_CLOSING; set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); pr_debug("switch node %d to state %s\n", node->nodeid, dlm_state_str(node->state)); break; case DLM_FIN_WAIT2: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); midcomms_node_reset(node); pr_debug("switch node %d to state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -551,18 +585,20 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); dlm_receive_buffer_3_2_trace(seq, p); dlm_receive_buffer(p, node->nodeid); - set_bit(DLM_NODE_ULP_DELIVERED, &node->flags); + atomic_inc(&node->ulp_delivered); + /* unlikely case to send ack back when we don't transmit */ + dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD); break; } } else { /* retry to ack message which we already have by sending back * current node->seq_next number as ack. */ - if (seq < node->seq_next) - dlm_send_ack(node->nodeid, node->seq_next); + if (seq < oval) + dlm_send_ack(node->nodeid, oval); log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d", - seq, node->seq_next, node->nodeid); + seq, oval, node->nodeid); } } @@ -960,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) return ret; } -void dlm_midcomms_receive_done(int nodeid) -{ - struct midcomms_node *node; - int idx; - - idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - /* old protocol, we do nothing */ - switch (node->version) { - case DLM_VERSION_3_2: - break; - default: - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - /* do nothing if we didn't delivered stateful to ulp */ - if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED, - &node->flags)) { - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - spin_lock(&node->state_lock); - /* we only ack if state is ESTABLISHED */ - switch (node->state) { - case DLM_ESTABLISHED: - spin_unlock(&node->state_lock); - dlm_send_ack(node->nodeid, node->seq_next); - break; - default: - spin_unlock(&node->state_lock); - /* do nothing FIN has it's own ack send */ - break; - } - srcu_read_unlock(&nodes_srcu, idx); -} - void dlm_midcomms_unack_msg_resend(int nodeid) { struct midcomms_node *node; @@ -1059,7 +1052,7 @@ static void midcomms_new_msg_cb(void *data) list_add_tail_rcu(&mh->list, &mh->node->send_queue); spin_unlock_bh(&mh->node->send_queue_lock); - mh->seq = mh->node->seq_send++; + mh->seq = atomic_fetch_inc(&mh->node->seq_send); } static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid, @@ -1133,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, goto err; } + /* send ack back if necessary */ + dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD); break; default: dlm_free_mhandle(mh); @@ -1281,9 +1276,6 @@ void dlm_midcomms_add_member(int nodeid) struct midcomms_node *node; int idx; - if (nodeid == dlm_our_nodeid()) - return; - idx = srcu_read_lock(&nodes_srcu); node = nodeid2node(nodeid, GFP_NOFS); if (!node) { @@ -1329,9 +1321,6 @@ void dlm_midcomms_remove_member(int nodeid) struct midcomms_node *node; int idx; - if (nodeid == dlm_our_nodeid()) - return; - idx = srcu_read_lock(&nodes_srcu); node = nodeid2node(nodeid, 0); if (!node) { @@ -1488,11 +1477,6 @@ int dlm_midcomms_close(int nodeid) struct midcomms_node *node; int idx, ret; - if (nodeid == dlm_our_nodeid()) - return 0; - - dlm_stop_lockspaces_check(); - idx = srcu_read_lock(&nodes_srcu); /* Abort pending close/remove operation */ node = nodeid2node(nodeid, 0); @@ -1542,7 +1526,7 @@ static void midcomms_new_rawmsg_cb(void *data) switch (h->h_cmd) { case DLM_OPTS: if (!h->u.h_seq) - h->u.h_seq = cpu_to_le32(rd->node->seq_send++); + h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send)); break; default: break; |