summaryrefslogtreecommitdiff
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c149
1 files changed, 120 insertions, 29 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 877d94f34814..b36e16cdc945 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -181,7 +181,10 @@ struct tipc_link {
u16 acked;
struct tipc_link *bc_rcvlink;
struct tipc_link *bc_sndlink;
- int nack_state;
+ unsigned long prev_retr;
+ u16 prev_from;
+ u16 prev_to;
+ u8 nack_state;
bool bc_peer_is_up;
/* Statistics */
@@ -202,6 +205,8 @@ enum {
BC_NACK_SND_SUPPRESS,
};
+#define TIPC_BC_RETR_LIMIT 10 /* [ms] */
+
/*
* Interval between NACKs when packets arrive out of order
*/
@@ -237,8 +242,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
struct sk_buff_head *xmitq);
static void link_print(struct tipc_link *l, const char *str);
-static void tipc_link_build_nack_msg(struct tipc_link *l,
- struct sk_buff_head *xmitq);
+static int tipc_link_build_nack_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq);
static void tipc_link_build_bc_init_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
@@ -367,6 +372,18 @@ int tipc_link_bc_peers(struct tipc_link *l)
return l->ackers;
}
+u16 link_bc_rcv_gap(struct tipc_link *l)
+{
+ struct sk_buff *skb = skb_peek(&l->deferdq);
+ u16 gap = 0;
+
+ if (more(l->snd_nxt, l->rcv_nxt))
+ gap = l->snd_nxt - l->rcv_nxt;
+ if (skb)
+ gap = buf_seqno(skb) - l->rcv_nxt;
+ return gap;
+}
+
void tipc_link_set_mtu(struct tipc_link *l, int mtu)
{
l->mtu = mtu;
@@ -807,7 +824,7 @@ void link_prepare_wakeup(struct tipc_link *l)
skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
- lim = l->window + l->backlog[imp].limit;
+ lim = l->backlog[imp].limit;
pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
if ((pnd[imp] + l->backlog[imp].len) >= lim)
break;
@@ -873,9 +890,11 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
struct sk_buff *skb, *_skb, *bskb;
/* Match msg importance against this and all higher backlog limits: */
- for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
- if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
- return link_schedule_user(l, list);
+ if (!skb_queue_empty(backlogq)) {
+ for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+ if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+ return link_schedule_user(l, list);
+ }
}
if (unlikely(msg_size(hdr) > mtu)) {
skb_queue_purge(list);
@@ -1133,7 +1152,10 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
return 0;
l->rcv_unacked = 0;
- return TIPC_LINK_SND_BC_ACK;
+
+ /* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */
+ l->snd_nxt = l->rcv_nxt;
+ return TIPC_LINK_SND_STATE;
}
/* Unicast ACK */
@@ -1162,17 +1184,26 @@ void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
}
/* tipc_link_build_nack_msg: prepare link nack message for transmission
+ * Note that sending of broadcast NACK is coordinated among nodes, to
+ * reduce the risk of NACK storms towards the sender
*/
-static void tipc_link_build_nack_msg(struct tipc_link *l,
- struct sk_buff_head *xmitq)
+static int tipc_link_build_nack_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq)
{
u32 def_cnt = ++l->stats.deferred_recv;
+ int match1, match2;
- if (link_is_bc_rcvlink(l))
- return;
+ if (link_is_bc_rcvlink(l)) {
+ match1 = def_cnt & 0xf;
+ match2 = tipc_own_addr(l->net) & 0xf;
+ if (match1 == match2)
+ return TIPC_LINK_SND_STATE;
+ return 0;
+ }
if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
+ return 0;
}
/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
@@ -1223,7 +1254,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
/* Defer delivery if sequence gap */
if (unlikely(seqno != rcv_nxt)) {
__tipc_skb_queue_sorted(defq, seqno, skb);
- tipc_link_build_nack_msg(l, xmitq);
+ rc |= tipc_link_build_nack_msg(l, xmitq);
break;
}
@@ -1234,7 +1265,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
rc |= tipc_link_input(l, skb, l->inputq);
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
rc |= tipc_link_build_state_msg(l, xmitq);
- if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
+ if (unlikely(rc & ~TIPC_LINK_SND_STATE))
break;
} while ((skb = __skb_dequeue(defq)));
@@ -1248,10 +1279,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
struct sk_buff_head *xmitq)
{
+ struct tipc_link *bcl = l->bc_rcvlink;
struct sk_buff *skb;
struct tipc_msg *hdr;
struct sk_buff_head *dfq = &l->deferdq;
- bool node_up = link_is_up(l->bc_rcvlink);
+ bool node_up = link_is_up(bcl);
struct tipc_mon_state *mstate = &l->mon_state;
int dlen = 0;
void *data;
@@ -1279,7 +1311,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
msg_set_net_plane(hdr, l->net_plane);
msg_set_next_sent(hdr, l->snd_nxt);
msg_set_ack(hdr, l->rcv_nxt - 1);
- msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
+ msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
msg_set_link_tolerance(hdr, tolerance);
msg_set_linkprio(hdr, priority);
@@ -1289,6 +1321,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
if (mtyp == STATE_MSG) {
msg_set_seq_gap(hdr, rcvgap);
+ msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
msg_set_probe(hdr, probe);
tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
msg_set_size(hdr, INT_H_SIZE + dlen);
@@ -1571,51 +1604,107 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
l->rcv_nxt = peers_snd_nxt;
}
+/* link_bc_retr eval()- check if the indicated range can be retransmitted now
+ * - Adjust permitted range if there is overlap with previous retransmission
+ */
+static bool link_bc_retr_eval(struct tipc_link *l, u16 *from, u16 *to)
+{
+ unsigned long elapsed = jiffies_to_msecs(jiffies - l->prev_retr);
+
+ if (less(*to, *from))
+ return false;
+
+ /* New retransmission request */
+ if ((elapsed > TIPC_BC_RETR_LIMIT) ||
+ less(*to, l->prev_from) || more(*from, l->prev_to)) {
+ l->prev_from = *from;
+ l->prev_to = *to;
+ l->prev_retr = jiffies;
+ return true;
+ }
+
+ /* Inside range of previous retransmit */
+ if (!less(*from, l->prev_from) && !more(*to, l->prev_to))
+ return false;
+
+ /* Fully or partially outside previous range => exclude overlap */
+ if (less(*from, l->prev_from)) {
+ *to = l->prev_from - 1;
+ l->prev_from = *from;
+ }
+ if (more(*to, l->prev_to)) {
+ *from = l->prev_to + 1;
+ l->prev_to = *to;
+ }
+ l->prev_retr = jiffies;
+ return true;
+}
+
/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
*/
-void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
- struct sk_buff_head *xmitq)
+int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
+ struct sk_buff_head *xmitq)
{
+ struct tipc_link *snd_l = l->bc_sndlink;
u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
+ u16 from = msg_bcast_ack(hdr) + 1;
+ u16 to = from + msg_bc_gap(hdr) - 1;
+ int rc = 0;
if (!link_is_up(l))
- return;
+ return rc;
if (!msg_peer_node_is_up(hdr))
- return;
+ return rc;
/* Open when peer ackowledges our bcast init msg (pkt #1) */
if (msg_ack(hdr))
l->bc_peer_is_up = true;
if (!l->bc_peer_is_up)
- return;
+ return rc;
+
+ l->stats.recv_nacks++;
/* Ignore if peers_snd_nxt goes beyond receive window */
if (more(peers_snd_nxt, l->rcv_nxt + l->window))
- return;
+ return rc;
+
+ if (link_bc_retr_eval(snd_l, &from, &to))
+ rc = tipc_link_retrans(snd_l, from, to, xmitq);
+
+ l->snd_nxt = peers_snd_nxt;
+ if (link_bc_rcv_gap(l))
+ rc |= TIPC_LINK_SND_STATE;
+
+ /* Return now if sender supports nack via STATE messages */
+ if (l->peer_caps & TIPC_BCAST_STATE_NACK)
+ return rc;
+
+ /* Otherwise, be backwards compatible */
if (!more(peers_snd_nxt, l->rcv_nxt)) {
l->nack_state = BC_NACK_SND_CONDITIONAL;
- return;
+ return 0;
}
/* Don't NACK if one was recently sent or peeked */
if (l->nack_state == BC_NACK_SND_SUPPRESS) {
l->nack_state = BC_NACK_SND_UNCONDITIONAL;
- return;
+ return 0;
}
/* Conditionally delay NACK sending until next synch rcv */
if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
l->nack_state = BC_NACK_SND_UNCONDITIONAL;
if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
- return;
+ return 0;
}
/* Send NACK now but suppress next one */
tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
l->nack_state = BC_NACK_SND_SUPPRESS;
+ return 0;
}
void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
@@ -1652,6 +1741,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
}
/* tipc_link_bc_nack_rcv(): receive broadcast nack message
+ * This function is here for backwards compatibility, since
+ * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5.
*/
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
@@ -1692,10 +1783,10 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
l->window = win;
- l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2;
- l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = win;
- l->backlog[TIPC_HIGH_IMPORTANCE].limit = win / 2 * 3;
- l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
+ l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win);
+ l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = max_t(u16, 100, win * 2);
+ l->backlog[TIPC_HIGH_IMPORTANCE].limit = max_t(u16, 150, win * 3);
+ l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = max_t(u16, 200, win * 4);
l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk;
}