From 9945e8043ef9273cfb633d930e2a5a9116009b09 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:40 -0400 Subject: tipc: limit usage of temporary skb list during packet reception During packet reception, the function tipc_link_rcv() adds its accepted packets to a temporary buffer queue, before finally splicing this queue into the lock protected input queue that will be delivered up to the socket layer. The purpose is to reduce potential contention on the input queue lock. However, since the vast majority of packets arrive in sequence, they will anyway be added one by one to the input queue, and the use of the temporary queue becomes a sub-optimization. The only case where this queue makes sense is when unpacking buffers from a bundle packet; here we want to avoid dozens of small buffers to be added individually to the lock-protected input queue in a tight loop. In this commit, we remove the general usage of the temporary queue, and keep it only for the packet unbundling case. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 75db07c78a69..11f74294e085 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -953,7 +953,7 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: case CONN_MANAGER: - __skb_queue_tail(inputq, skb); + skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: node->bclink.recv_permitted = true; @@ -982,6 +982,7 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, struct tipc_msg *hdr = buf_msg(skb); struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; + struct sk_buff_head tmpq; int usr = msg_user(hdr); int rc = 0; int pos = 0; @@ -1006,10 +1007,12 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, } if (usr == MSG_BUNDLER) { + skb_queue_head_init(&tmpq); l->stats.recv_bundles++; l->stats.recv_bundled += msg_msgcnt(hdr); while (tipc_msg_extract(skb, &iskb, &pos)) - tipc_data_input(l, iskb, inputq); + tipc_data_input(l, iskb, &tmpq); + tipc_skb_queue_splice_tail(&tmpq, inputq); return 0; } else if (usr == MSG_FRAGMENTER) { l->stats.recv_fragments++; @@ -1053,13 +1056,10 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { struct sk_buff_head *arrvq = &l->deferdq; - struct sk_buff_head tmpq; struct tipc_msg *hdr; u16 seqno, rcv_nxt; int rc = 0; - __skb_queue_head_init(&tmpq); - if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) tipc_link_build_proto_msg(l, STATE_MSG, 0, @@ -1114,8 +1114,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, /* Packet can be delivered */ l->rcv_nxt++; l->stats.recv_info++; - if (unlikely(!tipc_data_input(l, skb, &tmpq))) - rc = tipc_link_input(l, skb, &tmpq); + if (unlikely(!tipc_data_input(l, skb, l->inputq))) + rc = tipc_link_input(l, skb, l->inputq); /* Ack at regular intervals */ if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { @@ -1126,7 +1126,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, } } exit: - tipc_skb_queue_splice_tail(&tmpq, l->inputq); return rc; } -- cgit From f9aa358a8109f9f33e96c3a7efb9a07631670294 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:41 -0400 Subject: tipc: simplify tipc_link_rcv() reception loop Currently, all packets received in tipc_link_rcv() are unconditionally added to the packet deferred queue, whereafter that queue is walked and all its buffers evaluated for delivery. This is both non-optimal and and makes the queue sorting function unnecessary complex. This commit changes the loop so that an arrived packet is evaluated first, and added to the deferred queue only when a sequence number gap is discovered. A non-empty deferred queue is walked until it is empty or until its head's sequence number doesn't fit. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 84 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 47 insertions(+), 37 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 11f74294e085..8e23ab523530 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1047,44 +1047,55 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) return released; } +/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission + */ +void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) +{ + l->rcv_unacked = 0; + l->stats.sent_acks++; + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); +} + +/* tipc_link_build_nack_msg: prepare link nack message for transmission + */ +static void tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + u32 def_cnt = ++l->stats.deferred_recv; + + if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); +} + /* tipc_link_rcv - process TIPC packets/messages arriving from off-node - * @link: the link that should handle the message + * @l: the link that should handle the message * @skb: TIPC packet * @xmitq: queue to place packets to be sent after this call */ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { - struct sk_buff_head *arrvq = &l->deferdq; + struct sk_buff_head *defq = &l->deferdq; struct tipc_msg *hdr; u16 seqno, rcv_nxt; int rc = 0; - if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { - if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) - tipc_link_build_proto_msg(l, STATE_MSG, 0, - 0, 0, 0, xmitq); - return rc; - } - - while ((skb = skb_peek(arrvq))) { + do { hdr = buf_msg(skb); + seqno = msg_seqno(hdr); + rcv_nxt = l->rcv_nxt; /* Verify and update link state */ - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { - __skb_dequeue(arrvq); - rc = tipc_link_proto_rcv(l, skb, xmitq); - continue; - } + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) + return tipc_link_proto_rcv(l, skb, xmitq); if (unlikely(!link_is_up(l))) { rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - if (!link_is_up(l)) { - kfree_skb(__skb_dequeue(arrvq)); - goto exit; - } + if (!link_is_up(l)) + goto drop; } + /* Don't send probe at next timeout expiration */ l->silent_intv_cnt = 0; /* Forward queues and wake up waiting users */ @@ -1095,37 +1106,36 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, } /* Defer reception if there is a gap in the sequence */ - seqno = msg_seqno(hdr); - rcv_nxt = l->rcv_nxt; if (unlikely(less(rcv_nxt, seqno))) { - l->stats.deferred_recv++; - goto exit; + __tipc_skb_queue_sorted(defq, skb); + tipc_link_build_nack_msg(l, xmitq); + break; } - __skb_dequeue(arrvq); - /* Drop if packet already received */ if (unlikely(more(rcv_nxt, seqno))) { l->stats.duplicates++; - kfree_skb(skb); - goto exit; + goto drop; } /* Packet can be delivered */ l->rcv_nxt++; l->stats.recv_info++; - if (unlikely(!tipc_data_input(l, skb, l->inputq))) + + if (!tipc_data_input(l, skb, l->inputq)) rc = tipc_link_input(l, skb, l->inputq); + if (rc) + break; /* Ack at regular intervals */ - if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { - l->rcv_unacked = 0; - l->stats.sent_acks++; - tipc_link_build_proto_msg(l, STATE_MSG, - 0, 0, 0, 0, xmitq); - } - } -exit: + if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) + tipc_link_build_ack_msg(l, xmitq); + + } while ((skb = __skb_dequeue(defq))); + + return rc; +drop: + kfree_skb(skb); return rc; } @@ -1249,7 +1259,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, } /* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets - * with contents of the link's tranmsit and backlog queues. + * with contents of the link's transmit and backlog queues. */ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, int mtyp, struct sk_buff_head *xmitq) -- cgit From 81204c492b05274ade680c54787cd8ba234dcfd7 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:42 -0400 Subject: tipc: improve sequence number checking The sequence number of an incoming packet is currently only checked for less than, equality to, or bigger than the next expected number, meaning that the receive window in practice becomes one half sequence number cycle, or U16_MAX/2. This does not make sense, and may not even be safe if there are extreme delays in the network. Any packet sent by the peer during the ongoing cycle must belong inside his current send window, or should otherwise be dropped if possible. Since a link endpoint cannot know its peer's current send window, it has to base this sanity check on a worst-case assumption, i.e., that the peer is using a maximum sized window of 8191 packets. Using this assumption, we now add a check that the sequence number is not bigger than next_expected + TIPC_MAX_LINK_WIN. We also re-order the checks done, so that the receive window test is performed before the gap test. This way, we are guaranteed that no packet with illegal sequence numbers are ever added to the deferred queue. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 8e23ab523530..2b549f653d80 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1077,13 +1077,14 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, { struct sk_buff_head *defq = &l->deferdq; struct tipc_msg *hdr; - u16 seqno, rcv_nxt; + u16 seqno, rcv_nxt, win_lim; int rc = 0; do { hdr = buf_msg(skb); seqno = msg_seqno(hdr); rcv_nxt = l->rcv_nxt; + win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; /* Verify and update link state */ if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) @@ -1098,6 +1099,12 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, /* Don't send probe at next timeout expiration */ l->silent_intv_cnt = 0; + /* Drop if outside receive window */ + if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) { + l->stats.duplicates++; + goto drop; + } + /* Forward queues and wake up waiting users */ if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) { tipc_link_advance_backlog(l, xmitq); @@ -1105,29 +1112,20 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, link_prepare_wakeup(l); } - /* Defer reception if there is a gap in the sequence */ - if (unlikely(less(rcv_nxt, seqno))) { + /* Defer delivery if sequence gap */ + if (unlikely(seqno != rcv_nxt)) { __tipc_skb_queue_sorted(defq, skb); tipc_link_build_nack_msg(l, xmitq); break; } - /* Drop if packet already received */ - if (unlikely(more(rcv_nxt, seqno))) { - l->stats.duplicates++; - goto drop; - } - - /* Packet can be delivered */ + /* Deliver packet */ l->rcv_nxt++; l->stats.recv_info++; - if (!tipc_data_input(l, skb, l->inputq)) rc = tipc_link_input(l, skb, l->inputq); - if (rc) + if (unlikely(rc)) break; - - /* Ack at regular intervals */ if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) tipc_link_build_ack_msg(l, xmitq); -- cgit From 8306f99a517b91ebf8fa94d017c2c84ca62e107c Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:43 -0400 Subject: tipc: disallow packet duplicates in link deferred queue After the previous commits, we are guaranteed that no packets of type LINK_PROTOCOL or with illegal sequence numbers will be attempted added to the link deferred queue. This makes it possible to make some simplifications to the sorting algorithm in the function tipc_skb_queue_sorted(). We also alter the function so that it will drop packets if one with the same seqeunce number is already present in the queue. This is necessary because we have identified weird packet sequences, involving duplicate packets, where a legitimate in-sequence packet may advance to the head of the queue without being detected and de-queued. Finally, we make this function outline, since it will now be called only in exceptional cases. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 2b549f653d80..e7c608631276 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1114,7 +1114,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, /* Defer delivery if sequence gap */ if (unlikely(seqno != rcv_nxt)) { - __tipc_skb_queue_sorted(defq, skb); + __tipc_skb_queue_sorted(defq, seqno, skb); tipc_link_build_nack_msg(l, xmitq); break; } -- cgit From 73f646cec35477b5099d7e952297cb9e1855be45 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:44 -0400 Subject: tipc: delay ESTABLISH state event when link is established Link establishing, just like link teardown, is a non-atomic action, in the sense that discovering that conditions are right to establish a link, and the actual adding of the link to one of the node's send slots is done in two different lock contexts. The link FSM is designed to help bridging the gap between the two contexts in a safe manner. We have now discovered a weakness in the implementaton of this FSM. Because we directly let the link go from state LINK_ESTABLISHING to state LINK_ESTABLISHED already in the first lock context, we are unable to distinguish between a fully established link, i.e., a link that has been added to its slot, and a link that has not yet reached the second lock context. It may hence happen that a manual intervention, e.g., when disabling an interface, causes the function tipc_node_link_down() to try removing the link from the node slots, decrementing its active link counter etc, although the link was never added there in the first place. We solve this by delaying the actual state change until we reach the second lock context, inside the function tipc_node_link_up(). This makes it possible for potentail callers of __tipc_node_link_down() to know if they should proceed or not, and the problem is solved. Unforunately, the situation described above also has a second problem. Since there by necessity is a tipc_node_link_up() call pending once the node lock has been released, we must defuse that call by setting the link back from LINK_ESTABLISHING to LINK_RESET state. This forces us to make a slight modification to the link FSM, which will now look as follows. +------------------------------------+ |RESET_EVT | | | | +--------------+ | +-----------------| SYNCHING |-----------------+ | |FAILURE_EVT +--------------+ PEER_RESET_EVT| | | A | | | | | | | | | | | | | | |SYNCH_ |SYNCH_ | | | |BEGIN_EVT |END_EVT | | | | | | | V | V V | +-------------+ +--------------+ +------------+ | | RESETTING |<---------| ESTABLISHED |--------->| PEER_RESET | | +-------------+ FAILURE_ +--------------+ PEER_ +------------+ | | EVT | A RESET_EVT | | | | | | | | +----------------+ | | | RESET_EVT| |RESET_EVT | | | | | | | | | | |ESTABLISH_EVT | | | | +-------------+ | | | | | | RESET_EVT | | | | | | | | | | | V V V | | | | +-------------+ +--------------+ RESET_EVT| +--->| RESET |--------->| ESTABLISHING |<----------------+ +-------------+ PEER_ +--------------+ | A RESET_EVT | | | | | | | |FAILOVER_ |FAILOVER_ |FAILOVER_ |BEGIN_EVT |END_EVT |BEGIN_EVT | | | V | | +-------------+ | | FAILINGOVER |<----------------+ +-------------+ Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index e7c608631276..8c794c1dd531 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -125,6 +125,11 @@ bool tipc_link_is_reset(struct tipc_link *l) return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING); } +bool tipc_link_is_establishing(struct tipc_link *l) +{ + return l->state == LINK_ESTABLISHING; +} + bool tipc_link_is_synching(struct tipc_link *l) { return l->state == LINK_SYNCHING; @@ -321,14 +326,15 @@ int tipc_link_fsm_evt(struct tipc_link *l, int evt) switch (evt) { case LINK_ESTABLISH_EVT: l->state = LINK_ESTABLISHED; - rc |= TIPC_LINK_UP_EVT; break; case LINK_FAILOVER_BEGIN_EVT: l->state = LINK_FAILINGOVER; break; - case LINK_PEER_RESET_EVT: case LINK_RESET_EVT: + l->state = LINK_RESET; + break; case LINK_FAILURE_EVT: + case LINK_PEER_RESET_EVT: case LINK_SYNCH_BEGIN_EVT: case LINK_FAILOVER_END_EVT: break; @@ -1091,9 +1097,9 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, return tipc_link_proto_rcv(l, skb, xmitq); if (unlikely(!link_is_up(l))) { - rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - if (!link_is_up(l)) - goto drop; + if (l->state == LINK_ESTABLISHING) + rc = TIPC_LINK_UP_EVT; + goto drop; } /* Don't send probe at next timeout expiration */ @@ -1338,6 +1344,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, u16 peers_tol = msg_link_tolerance(hdr); u16 peers_prio = msg_linkprio(hdr); u16 rcv_nxt = l->rcv_nxt; + int mtyp = msg_type(hdr); char *if_name; int rc = 0; @@ -1347,7 +1354,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (link_own_addr(l) > msg_prevnode(hdr)) l->net_plane = msg_net_plane(hdr); - switch (msg_type(hdr)) { + switch (mtyp) { case RESET_MSG: /* Ignore duplicate RESET with old session number */ @@ -1374,12 +1381,14 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI)) l->priority = peers_prio; - if (msg_type(hdr) == RESET_MSG) { - rc |= tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); - } else if (!link_is_up(l)) { - tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); - rc |= tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - } + /* ACTIVATE_MSG serves as PEER_RESET if link is already down */ + if ((mtyp == RESET_MSG) || !link_is_up(l)) + rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); + + /* ACTIVATE_MSG takes up link if it was already locally reset */ + if ((mtyp == ACTIVATE_MSG) && (l->state == LINK_ESTABLISHING)) + rc = TIPC_LINK_UP_EVT; + l->peer_session = msg_session(hdr); l->peer_bearer_id = msg_bearer_id(hdr); if (l->mtu > msg_max_pkt(hdr)) @@ -1396,9 +1405,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, l->stats.recv_states++; if (msg_probe(hdr)) l->stats.recv_probes++; - rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - if (!link_is_up(l)) + + if (!link_is_up(l)) { + if (l->state == LINK_ESTABLISHING) + rc = TIPC_LINK_UP_EVT; break; + } /* Send NACK if peer has sent pkts we haven't received yet */ if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l)) -- cgit From 282b3a056225b35024246f63feb91d769d714dad Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:45 -0400 Subject: tipc: send out RESET immediately when link goes down When a link is taken down because of a node local event, such as disabling of a bearer or an interface, we currently leave it to the peer node to discover the broken communication. The default time for such failure discovery is 1.5-2 seconds. If we instead allow the terminating link endpoint to send out a RESET message at the moment it is reset, we can achieve the impression that both endpoints are going down instantly. Since this is a very common scenario, we find it worthwhile to make this small modification. Apart from letting the link produce the said message, we also have to ensure that the interface is able to transmit it before TIPC is detached. We do this by performing the disabling of a bearer in three steps: 1) Disable reception of TIPC packets from the interface in question. 2) Take down the links, while allowing them so send out a RESET message. 3) Disable transmission of TIPC packets on the interface. Apart from this, we now have to react on the NETDEV_GOING_DOWN event, instead of as currently the NEDEV_DOWN event, to ensure that such transmission is possible during the teardown phase. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 8c794c1dd531..737b5980020d 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1062,6 +1062,18 @@ void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); } +/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message + */ +void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq) +{ + int mtyp = RESET_MSG; + + if (l->state == LINK_ESTABLISHING) + mtyp = ACTIVATE_MSG; + + tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq); +} + /* tipc_link_build_nack_msg: prepare link nack message for transmission */ static void tipc_link_build_nack_msg(struct tipc_link *l, -- cgit From c819930090fe3f74c822be765c185b3431360193 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 15 Oct 2015 14:52:46 -0400 Subject: tipc: update node FSM when peer RESET message is received The change made in the previous commit revealed a small flaw in the way the node FSM is updated. When the function tipc_node_link_down() is called for the last link to a node, we should check whether this was caused by a local reset or by a received RESET message from the peer. In the latter case, we can directly issue a PEER_LOST_CONTACT_EVT to the node FSM, so that it is ready to re-establish contact. If this is not done, the peer node will sometimes have to go through a second establish cycle before the link becomes stable. We fix this in this commit by conditionally issuing the mentioned event in the function tipc_node_link_down(). We also move LINK_RESET FSM even away from the link_reset() function and into the caller function, partially because it is easier to follow the code when state changes are gathered at a limited number of locations, partially because there will be cases in future commits where we don't want the link to go RESET mode when link_reset() is called. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 737b5980020d..ff9b0b92e62e 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -120,6 +120,11 @@ bool tipc_link_is_up(struct tipc_link *l) return link_is_up(l); } +bool tipc_link_peer_is_down(struct tipc_link *l) +{ + return l->state == LINK_PEER_RESET; +} + bool tipc_link_is_reset(struct tipc_link *l) { return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING); @@ -584,8 +589,6 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr) void tipc_link_reset(struct tipc_link *l) { - tipc_link_fsm_evt(l, LINK_RESET_EVT); - /* Link is down, accept any session */ l->peer_session = WILDCARD_SESSION; -- cgit From 0e05498e9eae16a6d8c86543e77930ec152e655e Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:36 -0400 Subject: tipc: make link implementation independent from struct tipc_bearer In reality, the link implementation is already independent from struct tipc_bearer, in that it doesn't store any reference to it. However, we still pass on a pointer to a bearer instance in the function tipc_link_create(), just to have it extract some initialization information from it. I later commits, we need to create instances of tipc_link without having any associated struct tipc_bearer. To facilitate this, we want to extract the initialization data already in the creator function in node.c, before calling tipc_link_create(), and pass this info on as individual parameters in the call. This commit introduces this change. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 47 +++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index ff9b0b92e62e..0d8fdc8fe6d4 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -165,9 +165,16 @@ static u32 link_own_addr(struct tipc_link *l) /** * tipc_link_create - create a new link * @n: pointer to associated node - * @b: pointer to associated bearer + * @if_name: associated interface name + * @bearer_id: id (index) of associated bearer + * @tolerance: link tolerance to be used by link + * @net_plane: network plane (A,B,c..) this link belongs to + * @mtu: mtu to be advertised by link + * @priority: priority to be used by link + * @window: send window to be used by link + * @session: session to be used by link * @ownnode: identity of own node - * @peer: identity of peer node + * @peer: node id of peer node * @maddr: media address to be used * @inputq: queue to put messages ready for delivery * @namedq: queue to put binding table update messages ready for delivery @@ -175,47 +182,47 @@ static u32 link_own_addr(struct tipc_link *l) * * Returns true if link was created, otherwise false */ -bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, - u32 ownnode, u32 peer, struct tipc_media_addr *maddr, +bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, + int tolerance, char net_plane, u32 mtu, int priority, + int window, u32 session, u32 ownnode, u32 peer, + struct tipc_media_addr *maddr, struct sk_buff_head *inputq, struct sk_buff_head *namedq, struct tipc_link **link) { struct tipc_link *l; struct tipc_msg *hdr; - char *if_name; l = kzalloc(sizeof(*l), GFP_ATOMIC); if (!l) return false; *link = l; + l->pmsg = (struct tipc_msg *)&l->proto_msg; + hdr = l->pmsg; + tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); + msg_set_size(hdr, sizeof(l->proto_msg)); + msg_set_session(hdr, session); + msg_set_bearer_id(hdr, l->bearer_id); /* Note: peer i/f name is completed by reset/activate message */ - if_name = strchr(b->name, ':') + 1; sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); + strcpy((char *)msg_data(hdr), if_name); l->addr = peer; l->media_addr = maddr; l->owner = n; l->peer_session = WILDCARD_SESSION; - l->bearer_id = b->identity; - l->tolerance = b->tolerance; - l->net_plane = b->net_plane; - l->advertised_mtu = b->mtu; - l->mtu = b->mtu; - l->priority = b->priority; - tipc_link_set_queue_limits(l, b->window); + l->bearer_id = bearer_id; + l->tolerance = tolerance; + l->net_plane = net_plane; + l->advertised_mtu = mtu; + l->mtu = mtu; + l->priority = priority; + tipc_link_set_queue_limits(l, window); l->inputq = inputq; l->namedq = namedq; l->state = LINK_RESETTING; - l->pmsg = (struct tipc_msg *)&l->proto_msg; - hdr = l->pmsg; - tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); - msg_set_size(hdr, sizeof(l->proto_msg)); - msg_set_session(hdr, session); - msg_set_bearer_id(hdr, l->bearer_id); - strcpy((char *)msg_data(hdr), if_name); __skb_queue_head_init(&l->transmq); __skb_queue_head_init(&l->backlogq); __skb_queue_head_init(&l->deferdq); -- cgit From 323019069e8d96d87e9dba51f897060f94999821 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:37 -0400 Subject: tipc: use explicit allocation of broadcast send link The broadcast link instance (struct tipc_link) used for sending is currently aggregated into struct tipc_bclink. This means that we cannot use the regular tipc_link_create() function for initiating the link, but do instead have to initiate numerous fields directly from the bcast_init() function. We want to reduce dependencies between the broadcast functionality and the inner workings of tipc_link. In this commit, we introduce a new function tipc_bclink_create() to link.c, and allocate the instance of the link separately using this function. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 0d8fdc8fe6d4..f0cf768a59f3 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -50,6 +50,7 @@ */ static const char *link_co_err = "Link tunneling error, "; static const char *link_rst_msg = "Resetting link "; +static const char tipc_bclink_name[] = "broadcast-link"; static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, @@ -231,6 +232,34 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, return true; } +/** + * tipc_link_bc_create - create new link to be used for broadcast + * @n: pointer to associated node + * @mtu: mtu to be used + * @window: send window to be used + * @inputq: queue to put messages ready for delivery + * @namedq: queue to put binding table update messages ready for delivery + * @link: return value, pointer to put the created link + * + * Returns true if link was created, otherwise false + */ +bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, + struct tipc_link **link) +{ + struct tipc_link *l; + + if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, + 0, 0, 0, NULL, inputq, namedq, link)) + return false; + + l = *link; + strcpy(l->name, tipc_bclink_name); + tipc_link_reset(l); + return true; +} + /* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. * * Give a newly added peer node the sequence number where it should -- cgit From c1ab3f1dea3df566ad38caf98baf69c656679090 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:38 -0400 Subject: tipc: make struct tipc_link generic to support broadcast Realizing that unicast is just a special case of broadcast, we also see that we can go in the other direction, i.e., that modest changes to the current unicast link can make it generic enough to support broadcast. The following changes are introduced here: - A new counter ("ackers") in struct tipc_link, to indicate how many peers need to ack a packet before it can be released. - A corresponding counter in the skb user area, to keep track of how many peers a are left to ack before a buffer can be released. - A new counter ("acked"), to keep persistent track of how far a peer has acked at the moment, i.e., where in the transmission queue to start updating buffers when the next ack arrives. This is to avoid double acknowledgements from a peer, with inadvertent relase of packets as a result. - A more generic tipc_link_retrans() function, where retransmit starts from a given sequence number, instead of the first packet in the transmision queue. This is to minimize the number of retransmitted packets on the broadcast media. When the new functionality is taken into use in the next commits, we expect it to have minimal effect on unicast mode performance. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index f0cf768a59f3..dfc738e5cff9 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -221,6 +221,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, l->mtu = mtu; l->priority = priority; tipc_link_set_queue_limits(l, window); + l->ackers = 1; l->inputq = inputq; l->namedq = namedq; l->state = LINK_RESETTING; @@ -647,6 +648,7 @@ void tipc_link_reset(struct tipc_link *l) l->rcv_unacked = 0; l->snd_nxt = 1; l->rcv_nxt = 1; + l->acked = 0; l->silent_intv_cnt = 0; l->stats.recv_info = 0; l->stale_count = 0; @@ -769,6 +771,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, __skb_dequeue(list); __skb_queue_tail(transmq, skb); __skb_queue_tail(xmitq, _skb); + TIPC_SKB_CB(skb)->ackers = l->ackers; l->rcv_unacked = 0; seqno++; continue; @@ -829,6 +832,7 @@ void tipc_link_push_packets(struct tipc_link *link) skb = __skb_dequeue(&link->backlogq); if (!skb) break; + TIPC_SKB_CB(skb)->ackers = link->ackers; msg = buf_msg(skb); link->backlog[msg_importance(msg)].len--; msg_set_ack(msg, ack); @@ -862,6 +866,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) l->backlog[msg_importance(hdr)].len--; __skb_queue_tail(&l->transmq, skb); __skb_queue_tail(xmitq, _skb); + TIPC_SKB_CB(skb)->ackers = l->ackers; msg_set_ack(hdr, ack); msg_set_seqno(hdr, seqno); msg_set_bcast_ack(hdr, l->owner->bclink.last_in); @@ -947,11 +952,13 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, } } -static int tipc_link_retransm(struct tipc_link *l, int retransm, - struct sk_buff_head *xmitq) +int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, + struct sk_buff_head *xmitq) { struct sk_buff *_skb, *skb = skb_peek(&l->transmq); struct tipc_msg *hdr; + u16 ack = l->rcv_nxt - 1; + u16 bc_ack = l->owner->bclink.last_in; if (!skb) return 0; @@ -964,19 +971,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, link_retransmit_failure(l, skb); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } + + /* Move forward to where retransmission should start */ skb_queue_walk(&l->transmq, skb) { - if (!retransm) - return 0; + if (!less(buf_seqno(skb), from)) + break; + } + + skb_queue_walk_from(&l->transmq, skb) { + if (more(buf_seqno(skb), to)) + break; hdr = buf_msg(skb); _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); if (!_skb) return 0; hdr = buf_msg(_skb); - msg_set_ack(hdr, l->rcv_nxt - 1); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); - retransm--; l->stats.retransmitted++; } return 0; @@ -1390,7 +1403,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, { struct tipc_msg *hdr = buf_msg(skb); u16 rcvgap = 0; - u16 nacked_gap = msg_seq_gap(hdr); + u16 ack = msg_ack(hdr); + u16 gap = msg_seq_gap(hdr); u16 peers_snd_nxt = msg_next_sent(hdr); u16 peers_tol = msg_link_tolerance(hdr); u16 peers_prio = msg_linkprio(hdr); @@ -1469,11 +1483,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (rcvgap || (msg_probe(hdr))) tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 0, 0, xmitq); - tipc_link_release_pkts(l, msg_ack(hdr)); + tipc_link_release_pkts(l, ack); /* If NACK, retransmit will now start at right position */ - if (nacked_gap) { - rc = tipc_link_retransm(l, nacked_gap, xmitq); + if (gap) { + rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq); l->stats.recv_nacks++; } @@ -1550,7 +1564,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr) static void link_print(struct tipc_link *l, const char *str) { struct sk_buff *hskb = skb_peek(&l->transmq); - u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; + u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1; u16 tail = l->snd_nxt - 1; pr_info("%s Link <%s> state %x\n", str, l->name, l->state); -- cgit From 2f566124570625c29c3fd79bac4d9cd97c0c31a1 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:39 -0400 Subject: tipc: let broadcast transmission use new link transmit function This commit simplifies the broadcast link transmission function, by leveraging previous changes to the link transmission function and the broadcast transmission link life cycle. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index dfc738e5cff9..363da5f85704 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -158,6 +158,21 @@ int tipc_link_is_active(struct tipc_link *l) return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); } +void tipc_link_add_bc_peer(struct tipc_link *l) +{ + l->ackers++; +} + +void tipc_link_remove_bc_peer(struct tipc_link *l) +{ + l->ackers--; +} + +int tipc_link_bc_peers(struct tipc_link *l) +{ + return l->ackers; +} + static u32 link_own_addr(struct tipc_link *l) { return msg_prevnode(l->pmsg); @@ -258,6 +273,7 @@ bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, l = *link; strcpy(l->name, tipc_bclink_name); tipc_link_reset(l); + l->ackers = 0; return true; } @@ -898,8 +914,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, char addr_string[16]; pr_info("Msg seq number: %u, ", msg_seqno(msg)); - pr_cont("Outstanding acks: %lu\n", - (unsigned long) TIPC_SKB_CB(buf)->handle); + pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers); n_ptr = tipc_bclink_retransmit_to(net); -- cgit From fd556f209af53b9cdc45df8c467feb235376c4df Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:40 -0400 Subject: tipc: introduce capability bit for broadcast synchronization Until now, we have tried to support both the newer, dedicated broadcast synchronization mechanism along with the older, less safe, RESET_MSG/ ACTIVATE_MSG based one. The latter method has turned out to be a hazard in a highly dynamic cluster, so we find it safer to disable it completely when we find that the former mechanism is supported by the peer node. For this purpose, we now introduce a new capabability bit, TIPC_BCAST_SYNCH, to inform any peer nodes that dedicated broadcast syncronization is supported by the present node. The new bit is conveyed between peers in the 'capabilities' field of neighbor discovery messages. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 363da5f85704..6a1a9d9239ae 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -191,6 +191,7 @@ static u32 link_own_addr(struct tipc_link *l) * @session: session to be used by link * @ownnode: identity of own node * @peer: node id of peer node + * @peer_caps: bitmap describing peer node capabilities * @maddr: media address to be used * @inputq: queue to put messages ready for delivery * @namedq: queue to put binding table update messages ready for delivery @@ -201,7 +202,7 @@ static u32 link_own_addr(struct tipc_link *l) bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, int tolerance, char net_plane, u32 mtu, int priority, int window, u32 session, u32 ownnode, u32 peer, - struct tipc_media_addr *maddr, + u16 peer_caps, struct tipc_media_addr *maddr, struct sk_buff_head *inputq, struct sk_buff_head *namedq, struct tipc_link **link) { @@ -226,6 +227,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, strcpy((char *)msg_data(hdr), if_name); l->addr = peer; + l->peer_caps = peer_caps; l->media_addr = maddr; l->owner = n; l->peer_session = WILDCARD_SESSION; @@ -260,6 +262,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, * Returns true if link was created, otherwise false */ bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, + u16 peer_caps, struct sk_buff_head *inputq, struct sk_buff_head *namedq, struct tipc_link **link) @@ -267,7 +270,7 @@ bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, struct tipc_link *l; if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, - 0, 0, 0, NULL, inputq, namedq, link)) + 0, 0, 0, peer_caps, NULL, inputq, namedq, link)) return false; l = *link; -- cgit From 5266698661401afc5e4a1a521cf9ba10724d10dd Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:41 -0400 Subject: tipc: let broadcast packet reception use new link receive function The code path for receiving broadcast packets is currently distinct from the unicast path. This leads to unnecessary code and data duplication, something that can be avoided with some effort. We now introduce separate per-peer tipc_link instances for handling broadcast packet reception. Each receive link keeps a pointer to the common, single, broadcast link instance, and can hence handle release and retransmission of send buffers as if they belonged to the own instance. Furthermore, we let each unicast link instance keep a reference to both the pertaining broadcast receive link, and to the common send link. This makes it possible for the unicast links to easily access data for broadcast link synchronization, as well as for carrying acknowledges for received broadcast packets. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 435 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 312 insertions(+), 123 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 6a1a9d9239ae..ff725c398914 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -76,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } }; +/* Send states for broadcast NACKs + */ +enum { + BC_NACK_SND_CONDITIONAL, + BC_NACK_SND_UNCONDITIONAL, + BC_NACK_SND_SUPPRESS, +}; + /* * Interval between NACKs when packets arrive out of order */ @@ -111,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, struct sk_buff_head *xmitq); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); +static void tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +static void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +static bool tipc_link_release_pkts(struct tipc_link *l, u16 to); /* * Simple non-static link routines (i.e. referenced outside this file) @@ -151,6 +163,16 @@ bool tipc_link_is_blocked(struct tipc_link *l) return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); } +bool link_is_bc_sndlink(struct tipc_link *l) +{ + return !l->bc_sndlink; +} + +bool link_is_bc_rcvlink(struct tipc_link *l) +{ + return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l)); +} + int tipc_link_is_active(struct tipc_link *l) { struct tipc_node *n = l->owner; @@ -158,14 +180,31 @@ int tipc_link_is_active(struct tipc_link *l) return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); } -void tipc_link_add_bc_peer(struct tipc_link *l) +void tipc_link_add_bc_peer(struct tipc_link *snd_l, + struct tipc_link *uc_l, + struct sk_buff_head *xmitq) { - l->ackers++; + struct tipc_link *rcv_l = uc_l->bc_rcvlink; + + snd_l->ackers++; + rcv_l->acked = snd_l->snd_nxt - 1; + tipc_link_build_bc_init_msg(uc_l, xmitq); } -void tipc_link_remove_bc_peer(struct tipc_link *l) +void tipc_link_remove_bc_peer(struct tipc_link *snd_l, + struct tipc_link *rcv_l, + struct sk_buff_head *xmitq) { - l->ackers--; + u16 ack = snd_l->snd_nxt - 1; + + snd_l->ackers--; + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); + tipc_link_reset(rcv_l); + rcv_l->state = LINK_RESET; + if (!snd_l->ackers) { + tipc_link_reset(snd_l); + __skb_queue_purge(xmitq); + } } int tipc_link_bc_peers(struct tipc_link *l) @@ -193,6 +232,8 @@ static u32 link_own_addr(struct tipc_link *l) * @peer: node id of peer node * @peer_caps: bitmap describing peer node capabilities * @maddr: media address to be used + * @bc_sndlink: the namespace global link used for broadcast sending + * @bc_rcvlink: the peer specific link used for broadcast reception * @inputq: queue to put messages ready for delivery * @namedq: queue to put binding table update messages ready for delivery * @link: return value, pointer to put the created link @@ -202,8 +243,12 @@ static u32 link_own_addr(struct tipc_link *l) bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, int tolerance, char net_plane, u32 mtu, int priority, int window, u32 session, u32 ownnode, u32 peer, - u16 peer_caps, struct tipc_media_addr *maddr, - struct sk_buff_head *inputq, struct sk_buff_head *namedq, + u16 peer_caps, + struct tipc_media_addr *maddr, + struct tipc_link *bc_sndlink, + struct tipc_link *bc_rcvlink, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, struct tipc_link **link) { struct tipc_link *l; @@ -239,6 +284,8 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, l->priority = priority; tipc_link_set_queue_limits(l, window); l->ackers = 1; + l->bc_sndlink = bc_sndlink; + l->bc_rcvlink = bc_rcvlink; l->inputq = inputq; l->namedq = namedq; l->state = LINK_RESETTING; @@ -261,46 +308,32 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, * * Returns true if link was created, otherwise false */ -bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, - u16 peer_caps, +bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer, + int mtu, int window, u16 peer_caps, struct sk_buff_head *inputq, struct sk_buff_head *namedq, + struct tipc_link *bc_sndlink, struct tipc_link **link) { struct tipc_link *l; if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, - 0, 0, 0, peer_caps, NULL, inputq, namedq, link)) + 0, ownnode, peer, peer_caps, NULL, bc_sndlink, + NULL, inputq, namedq, link)) return false; l = *link; strcpy(l->name, tipc_bclink_name); tipc_link_reset(l); + l->state = LINK_RESET; l->ackers = 0; - return true; -} + l->bc_rcvlink = l; -/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. - * - * Give a newly added peer node the sequence number where it should - * start receiving and acking broadcast packets. - */ -void tipc_link_build_bcast_sync_msg(struct tipc_link *l, - struct sk_buff_head *xmitq) -{ - struct sk_buff *skb; - struct sk_buff_head list; - u16 last_sent; + /* Broadcast send link is always up */ + if (link_is_bc_sndlink(l)) + l->state = LINK_ESTABLISHED; - skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, - 0, l->addr, link_own_addr(l), 0, 0, 0); - if (!skb) - return; - last_sent = tipc_bclink_get_last_sent(l->owner->net); - msg_set_last_bcast(buf_msg(skb), last_sent); - __skb_queue_head_init(&list); - __skb_queue_tail(&list, skb); - tipc_link_xmit(l, &list, xmitq); + return true; } /** @@ -505,6 +538,8 @@ static void link_profile_stats(struct tipc_link *l) l->stats.msg_length_profile[6]++; } +/* tipc_link_timeout - perform periodic task as instructed from node timeout + */ /* tipc_link_timeout - perform periodic task as instructed from node timeout */ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) @@ -513,6 +548,9 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) int mtyp = STATE_MSG; bool xmit = false; bool prb = false; + u16 bc_snt = l->bc_sndlink->snd_nxt - 1; + u16 bc_acked = l->bc_rcvlink->acked; + bool bc_up = link_is_up(l->bc_rcvlink); link_profile_stats(l); @@ -520,7 +558,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) case LINK_ESTABLISHED: case LINK_SYNCHING: if (!l->silent_intv_cnt) { - if (tipc_bclink_acks_missing(l->owner)) + if (bc_up && (bc_acked != bc_snt)) xmit = true; } else if (l->silent_intv_cnt <= l->abort_limit) { xmit = true; @@ -671,6 +709,7 @@ void tipc_link_reset(struct tipc_link *l) l->silent_intv_cnt = 0; l->stats.recv_info = 0; l->stale_count = 0; + l->bc_peer_is_up = false; link_reset_statistics(l); } @@ -692,7 +731,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, uint mtu = link->mtu; u16 ack = mod(link->rcv_nxt - 1); u16 seqno = link->snd_nxt; - u16 bc_last_in = link->owner->bclink.last_in; + u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1; struct tipc_media_addr *addr = link->media_addr; struct sk_buff_head *transmq = &link->transmq; struct sk_buff_head *backlogq = &link->backlogq; @@ -712,7 +751,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, msg = buf_msg(skb); msg_set_seqno(msg, seqno); msg_set_ack(msg, ack); - msg_set_bcast_ack(msg, bc_last_in); + msg_set_bcast_ack(msg, bc_ack); if (likely(skb_queue_len(transmq) < maxwin)) { __skb_dequeue(list); @@ -762,7 +801,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, unsigned int mtu = l->mtu; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; - u16 bc_last_in = l->owner->bclink.last_in; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; struct sk_buff_head *transmq = &l->transmq; struct sk_buff_head *backlogq = &l->backlogq; struct sk_buff *skb, *_skb, *bskb; @@ -781,7 +820,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, hdr = buf_msg(skb); msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_last_in); + msg_set_bcast_ack(hdr, bc_ack); if (likely(skb_queue_len(transmq) < maxwin)) { _skb = skb_clone(skb, GFP_ATOMIC); @@ -815,23 +854,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return 0; } -/* - * tipc_link_sync_rcv - synchronize broadcast link endpoints. - * Receive the sequence number where we should start receiving and - * acking broadcast packets from a newly added peer node, and open - * up for reception of such packets. - * - * Called with node locked - */ -static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - - n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); - n->bclink.recv_permitted = true; - kfree_skb(buf); -} - /* * tipc_link_push_packets - push unsent packets to bearer * @@ -872,6 +894,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) struct tipc_msg *hdr; u16 seqno = l->snd_nxt; u16 ack = l->rcv_nxt - 1; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; while (skb_queue_len(&l->transmq) < l->window) { skb = skb_peek(&l->backlogq); @@ -886,54 +909,25 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) __skb_queue_tail(&l->transmq, skb); __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; - msg_set_ack(hdr, ack); msg_set_seqno(hdr, seqno); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); l->rcv_unacked = 0; seqno++; } l->snd_nxt = seqno; } -static void link_retransmit_failure(struct tipc_link *l_ptr, - struct sk_buff *buf) +static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb) { - struct tipc_msg *msg = buf_msg(buf); - struct net *net = l_ptr->owner->net; - - pr_warn("Retransmission failure on link <%s>\n", l_ptr->name); - - if (l_ptr->addr) { - /* Handle failure on standard link */ - link_print(l_ptr, "Resetting link "); - pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", - msg_user(msg), msg_type(msg), msg_size(msg), - msg_errcode(msg)); - pr_info("sqno %u, prev: %x, src: %x\n", - msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg)); - } else { - /* Handle failure on broadcast link */ - struct tipc_node *n_ptr; - char addr_string[16]; - - pr_info("Msg seq number: %u, ", msg_seqno(msg)); - pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers); - - n_ptr = tipc_bclink_retransmit_to(net); - - tipc_addr_string_fill(addr_string, n_ptr->addr); - pr_info("Broadcast link info for %s\n", addr_string); - pr_info("Reception permitted: %d, Acked: %u\n", - n_ptr->bclink.recv_permitted, - n_ptr->bclink.acked); - pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", - n_ptr->bclink.last_in, - n_ptr->bclink.oos_state, - n_ptr->bclink.last_sent); - - n_ptr->action_flags |= TIPC_BCAST_RESET; - l_ptr->stale_count = 0; - } + struct tipc_msg *hdr = buf_msg(skb); + + pr_warn("Retransmission failure on link <%s>\n", l->name); + link_print(l, "Resetting link "); + pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", + msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr)); + pr_info("sqno %u, prev: %x, src: %x\n", + msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); } void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, @@ -976,7 +970,7 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, struct sk_buff *_skb, *skb = skb_peek(&l->transmq); struct tipc_msg *hdr; u16 ack = l->rcv_nxt - 1; - u16 bc_ack = l->owner->bclink.last_in; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; if (!skb) return 0; @@ -1018,11 +1012,9 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, * Consumes buffer if message is of right type * Node lock must be held */ -static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, +static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - struct tipc_node *node = link->owner; - switch (msg_user(buf_msg(skb))) { case TIPC_LOW_IMPORTANCE: case TIPC_MEDIUM_IMPORTANCE: @@ -1032,8 +1024,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: - node->bclink.recv_permitted = true; - skb_queue_tail(link->namedq, skb); + l->bc_rcvlink->state = LINK_ESTABLISHED; + skb_queue_tail(l->namedq, skb); return true; case MSG_BUNDLER: case TUNNEL_PROTOCOL: @@ -1054,7 +1046,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - struct tipc_node *node = l->owner; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; @@ -1095,13 +1086,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, if (tipc_buf_append(reasm_skb, &skb)) { l->stats.recv_fragmented++; tipc_data_input(l, skb, inputq); - } else if (!*reasm_skb) { + } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) { + pr_warn_ratelimited("Unable to build fragment list\n"); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } return 0; } else if (usr == BCAST_PROTOCOL) { - tipc_link_sync_rcv(node, skb); - return 0; + tipc_bcast_lock(l->owner->net); + tipc_link_bc_init_rcv(l->bc_rcvlink, hdr); + tipc_bcast_unlock(l->owner->net); } drop: kfree_skb(skb); @@ -1124,12 +1117,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) } /* tipc_link_build_ack_msg: prepare link acknowledge message for transmission + * + * Note that sending of broadcast ack is coordinated among nodes, to reduce + * risk of ack storms towards the sender */ -void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) +int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) { + if (!l) + return 0; + + /* Broadcast ACK must be sent via a unicast link => defer to caller */ + if (link_is_bc_rcvlink(l)) { + if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf) + return 0; + l->rcv_unacked = 0; + return TIPC_LINK_SND_BC_ACK; + } + + /* Unicast ACK */ l->rcv_unacked = 0; l->stats.sent_acks++; tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); + return 0; } /* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message @@ -1151,6 +1160,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l, { u32 def_cnt = ++l->stats.deferred_recv; + if (link_is_bc_rcvlink(l)) + return; + if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); } @@ -1211,12 +1223,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, l->rcv_nxt++; l->stats.recv_info++; if (!tipc_data_input(l, skb, l->inputq)) - rc = tipc_link_input(l, skb, l->inputq); - if (unlikely(rc)) - break; + rc |= tipc_link_input(l, skb, l->inputq); if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) - tipc_link_build_ack_msg(l, xmitq); - + rc |= tipc_link_build_ack_msg(l, xmitq); + if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK)) + break; } while ((skb = __skb_dequeue(defq))); return rc; @@ -1284,18 +1295,13 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, kfree_skb(skb); } -/* tipc_link_build_proto_msg: prepare link protocol message for transmission - */ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq) { struct sk_buff *skb = NULL; struct tipc_msg *hdr = l->pmsg; - u16 snd_nxt = l->snd_nxt; - u16 rcv_nxt = l->rcv_nxt; - u16 rcv_last = rcv_nxt - 1; - int node_up = l->owner->bclink.recv_permitted; + bool node_up = link_is_up(l->bc_rcvlink); /* Don't send protocol message during reset or link failover */ if (tipc_link_is_blocked(l)) @@ -1303,33 +1309,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, msg_set_type(hdr, mtyp); msg_set_net_plane(hdr, l->net_plane); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); - msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); + msg_set_next_sent(hdr, l->snd_nxt); + msg_set_ack(hdr, l->rcv_nxt - 1); + msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1); + msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); msg_set_link_tolerance(hdr, tolerance); msg_set_linkprio(hdr, priority); msg_set_redundant_link(hdr, node_up); msg_set_seq_gap(hdr, 0); /* Compatibility: created msg must not be in sequence with pkt flow */ - msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); + msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2); if (mtyp == STATE_MSG) { if (!tipc_link_is_up(l)) return; - msg_set_next_sent(hdr, snd_nxt); /* Override rcvgap if there are packets in deferred queue */ if (!skb_queue_empty(&l->deferdq)) - rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; + rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt; if (rcvgap) { msg_set_seq_gap(hdr, rcvgap); l->stats.sent_nacks++; } - msg_set_ack(hdr, rcv_last); msg_set_probe(hdr, probe); if (probe) l->stats.sent_probes++; l->stats.sent_states++; + l->rcv_unacked = 0; } else { /* RESET_MSG or ACTIVATE_MSG */ msg_set_max_pkt(hdr, l->advertised_mtu); @@ -1431,7 +1438,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, char *if_name; int rc = 0; - if (tipc_link_is_blocked(l)) + if (tipc_link_is_blocked(l) || !xmitq) goto exit; if (link_own_addr(l) > msg_prevnode(hdr)) @@ -1518,6 +1525,188 @@ exit: return rc; } +/* tipc_link_build_bc_proto_msg() - create broadcast protocol message + */ +static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast, + u16 peers_snd_nxt, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + struct tipc_msg *hdr; + struct sk_buff *dfrd_skb = skb_peek(&l->deferdq); + u16 ack = l->rcv_nxt - 1; + u16 gap_to = peers_snd_nxt - 1; + + skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, + 0, l->addr, link_own_addr(l), 0, 0, 0); + if (!skb) + return false; + hdr = buf_msg(skb); + msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); + msg_set_bcast_ack(hdr, ack); + msg_set_bcgap_after(hdr, ack); + if (dfrd_skb) + gap_to = buf_seqno(dfrd_skb) - 1; + msg_set_bcgap_to(hdr, gap_to); + msg_set_non_seq(hdr, bcast); + __skb_queue_tail(xmitq, skb); + return true; +} + +/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + */ +void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + struct sk_buff_head list; + + __skb_queue_head_init(&list); + if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list)) + return; + tipc_link_xmit(l, &list, xmitq); +} + +/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer + */ +void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr) +{ + int mtyp = msg_type(hdr); + u16 peers_snd_nxt = msg_bc_snd_nxt(hdr); + + if (link_is_up(l)) + return; + + if (msg_user(hdr) == BCAST_PROTOCOL) { + l->rcv_nxt = peers_snd_nxt; + l->state = LINK_ESTABLISHED; + return; + } + + if (l->peer_caps & TIPC_BCAST_SYNCH) + return; + + if (msg_peer_node_is_up(hdr)) + return; + + /* Compatibility: accept older, less safe initial synch data */ + if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG)) + l->rcv_nxt = peers_snd_nxt; +} + +/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state + */ +void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) +{ + u16 peers_snd_nxt = msg_bc_snd_nxt(hdr); + + if (!link_is_up(l)) + return; + + if (!msg_peer_node_is_up(hdr)) + return; + + l->bc_peer_is_up = true; + + /* Ignore if peers_snd_nxt goes beyond receive window */ + if (more(peers_snd_nxt, l->rcv_nxt + l->window)) + return; + + if (!more(peers_snd_nxt, l->rcv_nxt)) { + l->nack_state = BC_NACK_SND_CONDITIONAL; + return; + } + + /* Don't NACK if one was recently sent or peeked */ + if (l->nack_state == BC_NACK_SND_SUPPRESS) { + l->nack_state = BC_NACK_SND_UNCONDITIONAL; + return; + } + + /* Conditionally delay NACK sending until next synch rcv */ + if (l->nack_state == BC_NACK_SND_CONDITIONAL) { + l->nack_state = BC_NACK_SND_UNCONDITIONAL; + if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN) + return; + } + + /* Send NACK now but suppress next one */ + tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq); + l->nack_state = BC_NACK_SND_SUPPRESS; +} + +void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb, *tmp; + struct tipc_link *snd_l = l->bc_sndlink; + + if (!link_is_up(l) || !l->bc_peer_is_up) + return; + + if (!more(acked, l->acked)) + return; + + /* Skip over packets peer has already acked */ + skb_queue_walk(&snd_l->transmq, skb) { + if (more(buf_seqno(skb), l->acked)) + break; + } + + /* Update/release the packets peer is acking now */ + skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) { + if (more(buf_seqno(skb), acked)) + break; + if (!--TIPC_SKB_CB(skb)->ackers) { + __skb_unlink(skb, &snd_l->transmq); + kfree_skb(skb); + } + } + l->acked = acked; + tipc_link_advance_backlog(snd_l, xmitq); + if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) + link_prepare_wakeup(snd_l); +} + +/* tipc_link_bc_nack_rcv(): receive broadcast nack message + */ +int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + u32 dnode = msg_destnode(hdr); + int mtyp = msg_type(hdr); + u16 acked = msg_bcast_ack(hdr); + u16 from = acked + 1; + u16 to = msg_bcgap_to(hdr); + u16 peers_snd_nxt = to + 1; + int rc = 0; + + kfree_skb(skb); + + if (!tipc_link_is_up(l) || !l->bc_peer_is_up) + return 0; + + if (mtyp != STATE_MSG) + return 0; + + if (dnode == link_own_addr(l)) { + tipc_link_bc_ack_rcv(l, acked, xmitq); + rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq); + l->stats.recv_nacks++; + return rc; + } + + /* Msg for other node => suppress own NACK at next sync if applicable */ + if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from)) + l->nack_state = BC_NACK_SND_SUPPRESS; + + return 0; +} + void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); -- cgit From 959e1781aa230aecc90e4deb80117fd9a53dede7 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:43 -0400 Subject: tipc: introduce jumbo frame support for broadcast Until now, we have only been supporting a fix MTU size of 1500 bytes for all broadcast media, irrespective of their actual capability. We now make the broadcast MTU adaptable to the carrying media, i.e., we use the smallest MTU supported by any of the interfaces attached to TIPC. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index ff725c398914..3b98f8e70626 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -212,6 +212,16 @@ int tipc_link_bc_peers(struct tipc_link *l) return l->ackers; } +void tipc_link_set_mtu(struct tipc_link *l, int mtu) +{ + l->mtu = mtu; +} + +int tipc_link_mtu(struct tipc_link *l) +{ + return l->mtu; +} + static u32 link_own_addr(struct tipc_link *l) { return msg_prevnode(l->pmsg); -- cgit From 60852d679575b0d7ce62497938116f92654ae908 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:44 -0400 Subject: tipc: let neighbor discoverer tranmsit consumable buffers The neighbor discovery function currently uses the function tipc_bearer_send() for transmitting packets, assuming that the sent buffers are not consumed by the called function. We want to change this, in order to avoid unnecessary buffer cloning elswhere in the code. This commit introduces a new function tipc_bearer_skb() which consumes the sent buffers, and let the discoverer functions use this new call instead. The discoverer does now itself perform the cloning when that is necessary. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 3b98f8e70626..7d3b6e7d852a 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1300,9 +1300,8 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, skb = __skb_dequeue(&xmitq); if (!skb) return; - tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); + tipc_bearer_xmit_skb(l->owner->net, l->bearer_id, skb, l->media_addr); l->rcv_unacked = 0; - kfree_skb(skb); } static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, -- cgit From c72fa872a23f03b2b9c17e88f3b0a8070924e5f1 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:46 -0400 Subject: tipc: eliminate link's reference to owner node With the recent commit series, we have established a one-way dependency between the link aggregation (struct tipc_node) instances and their pertaining tipc_link instances. This has enabled quite significant code and structure simplifications. In this commit, we eliminate the field 'owner', which points to an instance of struct tipc_node, from struct tipc_link, and replace it with a pointer to struct net, which is the only external reference now needed by a link instance. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 7d3b6e7d852a..819fb7163fa2 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -175,9 +175,12 @@ bool link_is_bc_rcvlink(struct tipc_link *l) int tipc_link_is_active(struct tipc_link *l) { - struct tipc_node *n = l->owner; + return l->active; +} - return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); +void tipc_link_set_active(struct tipc_link *l, bool active) +{ + l->active = active; } void tipc_link_add_bc_peer(struct tipc_link *snd_l, @@ -250,7 +253,7 @@ static u32 link_own_addr(struct tipc_link *l) * * Returns true if link was created, otherwise false */ -bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, +bool tipc_link_create(struct net *net, char *if_name, int bearer_id, int tolerance, char net_plane, u32 mtu, int priority, int window, u32 session, u32 ownnode, u32 peer, u16 peer_caps, @@ -284,7 +287,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, l->addr = peer; l->peer_caps = peer_caps; l->media_addr = maddr; - l->owner = n; + l->net = net; l->peer_session = WILDCARD_SESSION; l->bearer_id = bearer_id; l->tolerance = tolerance; @@ -318,7 +321,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, * * Returns true if link was created, otherwise false */ -bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer, +bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, int mtu, int window, u16 peer_caps, struct sk_buff_head *inputq, struct sk_buff_head *namedq, @@ -327,7 +330,7 @@ bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer, { struct tipc_link *l; - if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, + if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, 0, ownnode, peer, peer_caps, NULL, bc_sndlink, NULL, inputq, namedq, link)) return false; @@ -889,10 +892,10 @@ void tipc_link_push_packets(struct tipc_link *link) msg_set_ack(msg, ack); msg_set_seqno(msg, seqno); seqno = mod(seqno + 1); - msg_set_bcast_ack(msg, link->owner->bclink.last_in); + /* msg_set_bcast_ack(msg, link->owner->bclink.last_in); */ link->rcv_unacked = 0; __skb_queue_tail(&link->transmq, skb); - tipc_bearer_send(link->owner->net, link->bearer_id, + tipc_bearer_send(link->net, link->bearer_id, skb, link->media_addr); } link->snd_nxt = seqno; @@ -966,8 +969,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, break; msg = buf_msg(skb); msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb, + /* msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); */ + tipc_bearer_send(l_ptr->net, l_ptr->bearer_id, skb, l_ptr->media_addr); retransmits--; l_ptr->stats.retransmitted++; @@ -1102,9 +1105,9 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, } return 0; } else if (usr == BCAST_PROTOCOL) { - tipc_bcast_lock(l->owner->net); + tipc_bcast_lock(l->net); tipc_link_bc_init_rcv(l->bc_rcvlink, hdr); - tipc_bcast_unlock(l->owner->net); + tipc_bcast_unlock(l->net); } drop: kfree_skb(skb); @@ -1300,7 +1303,7 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, skb = __skb_dequeue(&xmitq); if (!skb) return; - tipc_bearer_xmit_skb(l->owner->net, l->bearer_id, skb, l->media_addr); + tipc_bearer_xmit_skb(l->net, l->bearer_id, skb, l->media_addr); l->rcv_unacked = 0; } @@ -2004,7 +2007,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, if (tipc_link_is_up(link)) if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) goto attr_msg_full; - if (tipc_link_is_active(link)) + if (link->active) if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) goto attr_msg_full; -- cgit From 2af5ae372a4b6d6e2d3314af0e9c865d6d64f8d3 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:48 -0400 Subject: tipc: clean up unused code and structures After the previous changes in this series, we can now remove some unused code and structures, both in the broadcast, link aggregation and link code. There are no functional changes in this commit. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 222 ++------------------------------------------------------ 1 file changed, 8 insertions(+), 214 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 819fb7163fa2..4449fa01e232 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -244,7 +244,6 @@ static u32 link_own_addr(struct tipc_link *l) * @ownnode: identity of own node * @peer: node id of peer node * @peer_caps: bitmap describing peer node capabilities - * @maddr: media address to be used * @bc_sndlink: the namespace global link used for broadcast sending * @bc_rcvlink: the peer specific link used for broadcast reception * @inputq: queue to put messages ready for delivery @@ -257,7 +256,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, int tolerance, char net_plane, u32 mtu, int priority, int window, u32 session, u32 ownnode, u32 peer, u16 peer_caps, - struct tipc_media_addr *maddr, struct tipc_link *bc_sndlink, struct tipc_link *bc_rcvlink, struct sk_buff_head *inputq, @@ -286,7 +284,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, l->addr = peer; l->peer_caps = peer_caps; - l->media_addr = maddr; l->net = net; l->peer_session = WILDCARD_SESSION; l->bearer_id = bearer_id; @@ -331,7 +328,7 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, struct tipc_link *l; if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, - 0, ownnode, peer, peer_caps, NULL, bc_sndlink, + 0, ownnode, peer, peer_caps, bc_sndlink, NULL, inputq, namedq, link)) return false; @@ -662,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l) } } -/** - * tipc_link_reset_fragments - purge link's inbound message fragments queue - * @l_ptr: pointer to link - */ -void tipc_link_reset_fragments(struct tipc_link *l_ptr) -{ - kfree_skb(l_ptr->reasm_buf); - l_ptr->reasm_buf = NULL; -} - -void tipc_link_purge_backlog(struct tipc_link *l) -{ - __skb_queue_purge(&l->backlogq); - l->backlog[TIPC_LOW_IMPORTANCE].len = 0; - l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; - l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; - l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; - l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; -} - -/** - * tipc_link_purge_queues - purge all pkt queues associated with link - * @l_ptr: pointer to link - */ -void tipc_link_purge_queues(struct tipc_link *l_ptr) -{ - __skb_queue_purge(&l_ptr->deferdq); - __skb_queue_purge(&l_ptr->transmq); - tipc_link_purge_backlog(l_ptr); - tipc_link_reset_fragments(l_ptr); -} - void tipc_link_reset(struct tipc_link *l) { /* Link is down, accept any session */ @@ -705,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l) /* Prepare for renewed mtu size negotiation */ l->mtu = l->advertised_mtu; - /* Clean up all queues: */ + /* Clean up all queues and counters: */ __skb_queue_purge(&l->transmq); __skb_queue_purge(&l->deferdq); skb_queue_splice_init(&l->wakeupq, l->inputq); - - tipc_link_purge_backlog(l); + __skb_queue_purge(&l->backlogq); + l->backlog[TIPC_LOW_IMPORTANCE].len = 0; + l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; + l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; + l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; + l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; @@ -726,74 +695,6 @@ void tipc_link_reset(struct tipc_link *l) link_reset_statistics(l); } -/** - * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked - * @link: link to use - * @list: chain of buffers containing message - * - * Consumes the buffer chain, except when returning an error code, - * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS - * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted - */ -int __tipc_link_xmit(struct net *net, struct tipc_link *link, - struct sk_buff_head *list) -{ - struct tipc_msg *msg = buf_msg(skb_peek(list)); - unsigned int maxwin = link->window; - unsigned int i, imp = msg_importance(msg); - uint mtu = link->mtu; - u16 ack = mod(link->rcv_nxt - 1); - u16 seqno = link->snd_nxt; - u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1; - struct tipc_media_addr *addr = link->media_addr; - struct sk_buff_head *transmq = &link->transmq; - struct sk_buff_head *backlogq = &link->backlogq; - struct sk_buff *skb, *bskb; - - /* Match msg importance against this and all higher backlog limits: */ - for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { - if (unlikely(link->backlog[i].len >= link->backlog[i].limit)) - return link_schedule_user(link, list); - } - if (unlikely(msg_size(msg) > mtu)) - return -EMSGSIZE; - - /* Prepare each packet for sending, and add to relevant queue: */ - while (skb_queue_len(list)) { - skb = skb_peek(list); - msg = buf_msg(skb); - msg_set_seqno(msg, seqno); - msg_set_ack(msg, ack); - msg_set_bcast_ack(msg, bc_ack); - - if (likely(skb_queue_len(transmq) < maxwin)) { - __skb_dequeue(list); - __skb_queue_tail(transmq, skb); - tipc_bearer_send(net, link->bearer_id, skb, addr); - link->rcv_unacked = 0; - seqno++; - continue; - } - if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) { - kfree_skb(__skb_dequeue(list)); - link->stats.sent_bundled++; - continue; - } - if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) { - kfree_skb(__skb_dequeue(list)); - __skb_queue_tail(backlogq, bskb); - link->backlog[msg_importance(buf_msg(bskb))].len++; - link->stats.sent_bundled++; - link->stats.sent_bundles++; - continue; - } - link->backlog[imp].len += skb_queue_len(list); - skb_queue_splice_tail_init(list, backlogq); - } - link->snd_nxt = seqno; - return 0; -} - /** * tipc_link_xmit(): enqueue buffer list according to queue situation * @link: link to use @@ -867,40 +768,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return 0; } -/* - * tipc_link_push_packets - push unsent packets to bearer - * - * Push out the unsent messages of a link where congestion - * has abated. Node is locked. - * - * Called with node locked - */ -void tipc_link_push_packets(struct tipc_link *link) -{ - struct sk_buff *skb; - struct tipc_msg *msg; - u16 seqno = link->snd_nxt; - u16 ack = mod(link->rcv_nxt - 1); - - while (skb_queue_len(&link->transmq) < link->window) { - skb = __skb_dequeue(&link->backlogq); - if (!skb) - break; - TIPC_SKB_CB(skb)->ackers = link->ackers; - msg = buf_msg(skb); - link->backlog[msg_importance(msg)].len--; - msg_set_ack(msg, ack); - msg_set_seqno(msg, seqno); - seqno = mod(seqno + 1); - /* msg_set_bcast_ack(msg, link->owner->bclink.last_in); */ - link->rcv_unacked = 0; - __skb_queue_tail(&link->transmq, skb); - tipc_bearer_send(link->net, link->bearer_id, - skb, link->media_addr); - } - link->snd_nxt = seqno; -} - void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) { struct sk_buff *skb, *_skb; @@ -943,40 +810,6 @@ static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb) msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); } -void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, - u32 retransmits) -{ - struct tipc_msg *msg; - - if (!skb) - return; - - msg = buf_msg(skb); - - /* Detect repeated retransmit failures */ - if (l_ptr->last_retransm == msg_seqno(msg)) { - if (++l_ptr->stale_count > 100) { - link_retransmit_failure(l_ptr, skb); - return; - } - } else { - l_ptr->last_retransm = msg_seqno(msg); - l_ptr->stale_count = 1; - } - - skb_queue_walk_from(&l_ptr->transmq, skb) { - if (!retransmits) - break; - msg = buf_msg(skb); - msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); - /* msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); */ - tipc_bearer_send(l_ptr->net, l_ptr->bearer_id, skb, - l_ptr->media_addr); - retransmits--; - l_ptr->stats.retransmitted++; - } -} - int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, struct sk_buff_head *xmitq) { @@ -1249,45 +1082,6 @@ drop: return rc; } -/** - * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue - * - * Returns increase in queue length (i.e. 0 or 1) - */ -u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) -{ - struct sk_buff *skb1; - u16 seq_no = buf_seqno(skb); - - /* Empty queue ? */ - if (skb_queue_empty(list)) { - __skb_queue_tail(list, skb); - return 1; - } - - /* Last ? */ - if (less(buf_seqno(skb_peek_tail(list)), seq_no)) { - __skb_queue_tail(list, skb); - return 1; - } - - /* Locate insertion point in queue, then insert; discard if duplicate */ - skb_queue_walk(list, skb1) { - u16 curr_seqno = buf_seqno(skb1); - - if (seq_no == curr_seqno) { - kfree_skb(skb); - return 0; - } - - if (less(seq_no, curr_seqno)) - break; - } - - __skb_queue_before(list, skb1, skb); - return 1; -} - /* * Send protocol message to the other endpoint. */ -- cgit From 742e038330a485350334ee5eb75dce4a9dff87cd Mon Sep 17 00:00:00 2001 From: Wu Fengguang Date: Sat, 24 Oct 2015 22:56:01 +0800 Subject: tipc: link_is_bc_sndlink() can be static TO: "David S. Miller" CC: netdev@vger.kernel.org CC: Jon Maloy CC: Ying Xue CC: tipc-discussion@lists.sourceforge.net CC: linux-kernel@vger.kernel.org Signed-off-by: Fengguang Wu Acked-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 4449fa01e232..9efbdbde2b08 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -163,12 +163,12 @@ bool tipc_link_is_blocked(struct tipc_link *l) return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); } -bool link_is_bc_sndlink(struct tipc_link *l) +static bool link_is_bc_sndlink(struct tipc_link *l) { return !l->bc_sndlink; } -bool link_is_bc_rcvlink(struct tipc_link *l) +static bool link_is_bc_rcvlink(struct tipc_link *l) { return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l)); } @@ -1364,8 +1364,8 @@ static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast, * Give a newly added peer node the sequence number where it should * start receiving and acking broadcast packets. */ -void tipc_link_build_bc_init_msg(struct tipc_link *l, - struct sk_buff_head *xmitq) +static void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) { struct sk_buff_head list; -- cgit