summaryrefslogtreecommitdiff
path: root/fs/dlm/midcomms.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/midcomms.c')
-rw-r--r--fs/dlm/midcomms.c126
1 files changed, 55 insertions, 71 deletions
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index c02c43e4980a..e1a0df67b566 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -136,7 +136,6 @@
#include <net/tcp.h>
#include "dlm_internal.h"
-#include "lockspace.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
@@ -149,12 +148,14 @@
/* 5 seconds wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
#define DLM_VERSION_NOT_SET 0
+#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
+#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
struct midcomms_node {
int nodeid;
uint32_t version;
- uint32_t seq_send;
- uint32_t seq_next;
+ atomic_t seq_send;
+ atomic_t seq_next;
/* These queues are unbound because we cannot drop any message in dlm.
* We could send a fence signal for a specific node to the cluster
* manager if queues hits some maximum value, however this handling
@@ -166,7 +167,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
-#define DLM_NODE_ULP_DELIVERED 4
+ atomic_t ulp_delivered;
unsigned long flags;
wait_queue_head_t shutdown_wait;
@@ -318,8 +319,9 @@ static void midcomms_node_reset(struct midcomms_node *node)
{
pr_debug("reset node %d\n", node->nodeid);
- node->seq_next = DLM_SEQ_INIT;
- node->seq_send = DLM_SEQ_INIT;
+ atomic_set(&node->seq_next, DLM_SEQ_INIT);
+ atomic_set(&node->seq_send, DLM_SEQ_INIT);
+ atomic_set(&node->ulp_delivered, 0);
node->version = DLM_VERSION_NOT_SET;
node->flags = 0;
@@ -394,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
return 0;
}
+static void dlm_send_ack_threshold(struct midcomms_node *node,
+ uint32_t threshold)
+{
+ uint32_t oval, nval;
+ bool send_ack;
+
+ /* let only send one user trigger threshold to send ack back */
+ do {
+ oval = atomic_read(&node->ulp_delivered);
+ send_ack = (oval > threshold);
+ /* abort if threshold is not reached */
+ if (!send_ack)
+ break;
+
+ nval = 0;
+ /* try to reset ulp_delivered counter */
+ } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
+
+ if (send_ack)
+ dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
+}
+
static int dlm_send_fin(struct midcomms_node *node,
void (*ack_rcv)(struct midcomms_node *node))
{
@@ -493,9 +517,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
struct midcomms_node *node,
uint32_t seq)
{
- if (seq == node->seq_next) {
- node->seq_next++;
+ bool is_expected_seq;
+ uint32_t oval, nval;
+ do {
+ oval = atomic_read(&node->seq_next);
+ is_expected_seq = (oval == seq);
+ if (!is_expected_seq)
+ break;
+
+ nval = oval + 1;
+ } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
+
+ if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
spin_lock(&node->state_lock);
@@ -504,7 +538,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
switch (node->state) {
case DLM_ESTABLISHED:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
/* passive shutdown DLM_LAST_ACK case 1
* additional we check if the node is used by
@@ -523,14 +557,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
}
break;
case DLM_FIN_WAIT1:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
node->state = DLM_CLOSING;
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_FIN_WAIT2:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -551,18 +585,20 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer_3_2_trace(seq, p);
dlm_receive_buffer(p, node->nodeid);
- set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
+ atomic_inc(&node->ulp_delivered);
+ /* unlikely case to send ack back when we don't transmit */
+ dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
break;
}
} else {
/* retry to ack message which we already have by sending back
* current node->seq_next number as ack.
*/
- if (seq < node->seq_next)
- dlm_send_ack(node->nodeid, node->seq_next);
+ if (seq < oval)
+ dlm_send_ack(node->nodeid, oval);
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
- seq, node->seq_next, node->nodeid);
+ seq, oval, node->nodeid);
}
}
@@ -960,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
-void dlm_midcomms_receive_done(int nodeid)
-{
- struct midcomms_node *node;
- int idx;
-
- idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
- if (!node) {
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- /* old protocol, we do nothing */
- switch (node->version) {
- case DLM_VERSION_3_2:
- break;
- default:
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- /* do nothing if we didn't delivered stateful to ulp */
- if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
- &node->flags)) {
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- spin_lock(&node->state_lock);
- /* we only ack if state is ESTABLISHED */
- switch (node->state) {
- case DLM_ESTABLISHED:
- spin_unlock(&node->state_lock);
- dlm_send_ack(node->nodeid, node->seq_next);
- break;
- default:
- spin_unlock(&node->state_lock);
- /* do nothing FIN has it's own ack send */
- break;
- }
- srcu_read_unlock(&nodes_srcu, idx);
-}
-
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;
@@ -1059,7 +1052,7 @@ static void midcomms_new_msg_cb(void *data)
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock_bh(&mh->node->send_queue_lock);
- mh->seq = mh->node->seq_send++;
+ mh->seq = atomic_fetch_inc(&mh->node->seq_send);
}
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
@@ -1133,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
goto err;
}
+ /* send ack back if necessary */
+ dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
break;
default:
dlm_free_mhandle(mh);
@@ -1281,9 +1276,6 @@ void dlm_midcomms_add_member(int nodeid)
struct midcomms_node *node;
int idx;
- if (nodeid == dlm_our_nodeid())
- return;
-
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, GFP_NOFS);
if (!node) {
@@ -1329,9 +1321,6 @@ void dlm_midcomms_remove_member(int nodeid)
struct midcomms_node *node;
int idx;
- if (nodeid == dlm_our_nodeid())
- return;
-
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
@@ -1488,11 +1477,6 @@ int dlm_midcomms_close(int nodeid)
struct midcomms_node *node;
int idx, ret;
- if (nodeid == dlm_our_nodeid())
- return 0;
-
- dlm_stop_lockspaces_check();
-
idx = srcu_read_lock(&nodes_srcu);
/* Abort pending close/remove operation */
node = nodeid2node(nodeid, 0);
@@ -1542,7 +1526,7 @@ static void midcomms_new_rawmsg_cb(void *data)
switch (h->h_cmd) {
case DLM_OPTS:
if (!h->u.h_seq)
- h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
+ h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
break;
default:
break;