summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--net/tipc/group.c148
-rw-r--r--net/tipc/group.h11
-rw-r--r--net/tipc/msg.h5
-rw-r--r--net/tipc/socket.c48
4 files changed, 190 insertions, 22 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 1bfa9348b26d..b8ed70abba01 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -46,6 +46,7 @@
#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE ADV_UNIT
+#define ADV_ACTIVE (ADV_UNIT * 12)
enum mbr_state {
MBR_QUARANTINED,
@@ -59,16 +60,22 @@ enum mbr_state {
struct tipc_member {
struct rb_node tree_node;
struct list_head list;
+ struct list_head congested;
struct sk_buff *event_msg;
+ struct tipc_group *group;
u32 node;
u32 port;
u32 instance;
enum mbr_state state;
+ u16 advertised;
+ u16 window;
u16 bc_rcv_nxt;
+ bool usr_pending;
};
struct tipc_group {
struct rb_root members;
+ struct list_head congested;
struct tipc_nlist dests;
struct net *net;
int subid;
@@ -86,11 +93,24 @@ struct tipc_group {
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq);
+static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
+{
+ int mcnt = grp->member_cnt + 1;
+
+ /* Scale to bytes, considering worst-case truesize/msgsize ratio */
+ return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
+}
+
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
{
return grp->bc_snd_nxt;
}
+static bool tipc_group_is_enabled(struct tipc_member *m)
+{
+ return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
+}
+
static bool tipc_group_is_receiver(struct tipc_member *m)
{
return m && m->state >= MBR_JOINED;
@@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
if (!grp)
return NULL;
tipc_nlist_init(&grp->dests, tipc_own_addr(net));
+ INIT_LIST_HEAD(&grp->congested);
grp->members = RB_ROOT;
grp->net = net;
grp->portid = portid;
@@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
if (!m)
return NULL;
INIT_LIST_HEAD(&m->list);
+ INIT_LIST_HEAD(&m->congested);
+ m->group = grp;
m->node = node;
m->port = port;
grp->member_cnt++;
@@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
rb_erase(&m->tree_node, &grp->members);
grp->member_cnt--;
list_del_init(&m->list);
+ list_del_init(&m->congested);
/* If last member on a node, remove node from dest list */
if (!tipc_group_find_node(grp, m->node))
@@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
*scope = grp->scope;
}
-void tipc_group_update_bc_members(struct tipc_group *grp)
+void tipc_group_update_member(struct tipc_member *m, int len)
+{
+ struct tipc_group *grp = m->group;
+ struct tipc_member *_m, *tmp;
+
+ if (!tipc_group_is_enabled(m))
+ return;
+
+ m->window -= len;
+
+ if (m->window >= ADV_IDLE)
+ return;
+
+ if (!list_empty(&m->congested))
+ return;
+
+ /* Sort member into congested members' list */
+ list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
+ if (m->window > _m->window)
+ continue;
+ list_add_tail(&m->congested, &_m->congested);
+ return;
+ }
+ list_add_tail(&m->congested, &grp->congested);
+}
+
+void tipc_group_update_bc_members(struct tipc_group *grp, int len)
{
+ struct tipc_member *m;
+ struct rb_node *n;
+
+ for (n = rb_first(&grp->members); n; n = rb_next(n)) {
+ m = container_of(n, struct tipc_member, tree_node);
+ if (tipc_group_is_enabled(m))
+ tipc_group_update_member(m, len);
+ }
grp->bc_snd_nxt++;
}
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+{
+ struct tipc_member *m;
+
+ if (list_empty(&grp->congested))
+ return false;
+
+ m = list_first_entry(&grp->congested, struct tipc_member, congested);
+ if (m->window >= len)
+ return false;
+
+ return true;
+}
+
/* tipc_group_filter_msg() - determine if we should accept arriving message
*/
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
@@ -302,11 +374,36 @@ drop:
kfree_skb(skb);
}
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+ u32 port, struct sk_buff_head *xmitq)
+{
+ struct tipc_member *m;
+
+ m = tipc_group_find_member(grp, node, port);
+ if (!m)
+ return;
+
+ m->advertised -= blks;
+
+ switch (m->state) {
+ case MBR_JOINED:
+ if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ break;
+ case MBR_DISCOVERED:
+ case MBR_JOINING:
+ case MBR_LEAVING:
+ default:
+ break;
+ }
+}
+
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr;
struct sk_buff *skb;
+ int adv = 0;
skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
m->node, tipc_own_addr(grp->net),
@@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
if (!skb)
return;
+ if (m->state == MBR_JOINED)
+ adv = ADV_ACTIVE - m->advertised;
+
hdr = buf_msg(skb);
- if (mtyp == GRP_JOIN_MSG)
+
+ if (mtyp == GRP_JOIN_MSG) {
msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
+ msg_set_adv_win(hdr, adv);
+ m->advertised += adv;
+ } else if (mtyp == GRP_ADV_MSG) {
+ msg_set_adv_win(hdr, adv);
+ m->advertised += adv;
+ }
__skb_queue_tail(xmitq, skb);
}
-void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
- struct sk_buff_head *inputq,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
+ struct tipc_msg *hdr, struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
u32 node = msg_orignode(hdr);
@@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
if (!m)
return;
m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+ m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received */
if (m->state == MBR_DISCOVERED) {
m->state = MBR_JOINING;
} else if (m->state == MBR_PUBLISHED) {
m->state = MBR_JOINED;
+ *usr_wakeup = true;
+ m->usr_pending = false;
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
__skb_queue_tail(inputq, m->event_msg);
}
+ if (m->window < ADV_IDLE)
+ tipc_group_update_member(m, 0);
+ else
+ list_del_init(&m->congested);
return;
case GRP_LEAVE_MSG:
if (!m)
@@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
}
/* Otherwise deliver already received WITHDRAW event */
__skb_queue_tail(inputq, m->event_msg);
+ *usr_wakeup = m->usr_pending;
tipc_group_delete_member(grp, m);
+ list_del_init(&m->congested);
+ return;
+ case GRP_ADV_MSG:
+ if (!m)
+ return;
+ m->window += msg_adv_win(hdr);
+ *usr_wakeup = m->usr_pending;
+ m->usr_pending = false;
+ list_del_init(&m->congested);
return;
default:
pr_warn("Received unknown GROUP_PROTO message\n");
}
}
+/* tipc_group_member_evt() - receive and handle a member up/down event
+ */
void tipc_group_member_evt(struct tipc_group *grp,
+ bool *usr_wakeup,
+ int *sk_rcvbuf,
struct sk_buff *skb,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
@@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp,
} else {
__skb_queue_tail(inputq, skb);
m->state = MBR_JOINED;
+ *usr_wakeup = true;
+ m->usr_pending = false;
}
m->instance = instance;
TIPC_SKB_CB(skb)->orig_member = m->instance;
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+ if (m->window < ADV_IDLE)
+ tipc_group_update_member(m, 0);
+ else
+ list_del_init(&m->congested);
} else if (event == TIPC_WITHDRAWN) {
if (!m)
goto drop;
TIPC_SKB_CB(skb)->orig_member = m->instance;
+ *usr_wakeup = m->usr_pending;
+ m->usr_pending = false;
+
/* Hold back event if more messages might be expected */
if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
m->event_msg = skb;
@@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp,
__skb_queue_tail(inputq, skb);
tipc_group_delete_member(grp, m);
}
+ list_del_init(&m->congested);
}
+ *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
return;
drop:
kfree_skb(skb);
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 5d3f10d28967..0e2740e1da90 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -52,15 +52,18 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
void tipc_group_filter_msg(struct tipc_group *grp,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
-void tipc_group_member_evt(struct tipc_group *grp,
- struct sk_buff *skb,
+void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
+ int *sk_rcvbuf, struct sk_buff *skb,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
-void tipc_group_proto_rcv(struct tipc_group *grp,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
-void tipc_group_update_bc_members(struct tipc_group *grp);
+void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+bool tipc_group_bc_cong(struct tipc_group *grp, int len);
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+ u32 port, struct sk_buff_head *xmitq);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
int tipc_group_size(struct tipc_group *grp);
#endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 1b527b154e46..237d007499f9 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -538,6 +538,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
*/
#define GRP_JOIN_MSG 0
#define GRP_LEAVE_MSG 1
+#define GRP_ADV_MSG 2
/*
* Word 1
@@ -790,12 +791,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
-static inline u32 msg_adv_win(struct tipc_msg *m)
+static inline u16 msg_adv_win(struct tipc_msg *m)
{
return msg_bits(m, 9, 0, 0xffff);
}
-static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+static inline void msg_set_adv_win(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 9, 0, 0xffff, n);
}
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 0a2eac309177..50145c95ac96 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
return tsk->snt_unacked > tsk->snd_win;
}
+static u16 tsk_blocks(int len)
+{
+ return ((len / FLOWCTL_BLK_SZ) + 1);
+}
+
/* tsk_blocks(): translate a buffer size in bytes to number of
* advertisable blocks, taking into account the ratio truesize(len)/len
* We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
@@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
struct tipc_group *grp = tsk->group;
struct tipc_nlist *dsts = tipc_group_dests(grp);
struct tipc_mc_method *method = &tsk->mc_method;
+ int blks = tsk_blocks(MCAST_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
int mtu = tipc_bcast_get_mtu(net);
struct sk_buff_head pkts;
@@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
if (!dsts->local && !dsts->remote)
return -EHOSTUNREACH;
- /* Block or return if any destination link is congested */
- rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+ /* Block or return if any destination link or member is congested */
+ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
+ !tipc_group_bc_cong(grp, blks));
if (unlikely(rc))
return rc;
/* Complete message header */
msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
- msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+ msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_nameinst(hdr, 0);
@@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
if (unlikely(rc))
return rc;
- /* Update broadcast sequence number */
- tipc_group_update_bc_members(tsk->group);
-
+ /* Update broadcast sequence number and send windows */
+ tipc_group_update_bc_members(tsk->group, blks);
return dlen;
}
@@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
- if (unlikely(grp))
+ if (unlikely(grp && !dest))
return tipc_send_group_bcast(sock, m, dlen, timeout);
if (unlikely(!dest)) {
@@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
bool connected = !tipc_sk_type_connectionless(sk);
struct tipc_sock *tsk = tipc_sk(sk);
int rc, err, hlen, dlen, copy;
+ struct sk_buff_head xmitq;
struct tipc_msg *hdr;
struct sk_buff *skb;
bool grp_evt;
@@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
}
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ /* Step rcv queue to first msg with data or error; wait if necessary */
do {
- /* Look at first msg in receive queue; wait if necessary */
rc = tipc_wait_for_rcvmsg(sock, &timeout);
if (unlikely(rc))
goto exit;
@@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
if (unlikely(flags & MSG_PEEK))
goto exit;
+ /* Send group flow control advertisement when applicable */
+ if (tsk->group && msg_in_group(hdr) && !grp_evt) {
+ skb_queue_head_init(&xmitq);
+ tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
+ msg_orignode(hdr), msg_origport(hdr),
+ &xmitq);
+ tipc_node_distr_xmit(sock_net(sk), &xmitq);
+ }
+
tsk_advance_rx_queue(sk);
if (likely(!connected))
goto exit;
- /* Send connection flow control ack when applicable */
+ /* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);
@@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_group *grp = tsk->group;
+ bool wakeup = false;
switch (msg_user(hdr)) {
case CONN_MANAGER:
@@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk,
case SOCK_WAKEUP:
tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
tsk->cong_link_cnt--;
- sk->sk_write_space(sk);
+ wakeup = true;
break;
case GROUP_PROTOCOL:
- tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
+ tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
break;
case TOP_SRV:
- tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
+ tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
+ skb, inputq, xmitq);
skb = NULL;
break;
default:
break;
}
+ if (wakeup)
+ sk->sk_write_space(sk);
+
kfree_skb(skb);
}
@@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
+ if (unlikely(msg_in_group(hdr)))
+ return sk->sk_rcvbuf;
+
if (unlikely(!msg_connected(hdr)))
return sk->sk_rcvbuf << msg_importance(hdr);