summaryrefslogtreecommitdiff
path: root/net/netfilter/ipvs/ip_vs_sync.c
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2012-05-08 19:39:49 +0200
committerPablo Neira Ayuso <pablo@netfilter.org>2012-05-08 19:39:53 +0200
commit1c003b1580e20ff9f500846677303a695b1837cc (patch)
treed00c951b6e4c88edd403b4d8ead96a57b9bfc808 /net/netfilter/ipvs/ip_vs_sync.c
parentcdcc5e905d59026fbf2e7f74f9cc834203b6207b (diff)
ipvs: wakeup master thread
High rate of sync messages in master can lead to overflowing the socket buffer and dropping the messages. Fixed sleep of 1 second without wakeup events is not suitable for loaded masters, Use delayed_work to schedule sending for queued messages and limit the delay to IPVS_SYNC_SEND_DELAY (20ms). This will reduce the rate of wakeups but to avoid sending long bursts we wakeup the master thread after IPVS_SYNC_WAKEUP_RATE (8) messages. Add hard limit for the queued messages before sending by using "sync_qlen_max" sysctl var. It defaults to 1/32 of the memory pages but actually represents number of messages. It will protect us from allocating large parts of memory when the sending rate is lower than the queuing rate. As suggested by Pablo, add new sysctl var "sync_sock_size" to configure the SNDBUF (master) or RCVBUF (slave) socket limit. Default value is 0 (preserve system defaults). Change the master thread to detect and block on SNDBUF overflow, so that we do not drop messages when the socket limit is low but the sync_qlen_max limit is not reached. On ENOBUFS or other errors just drop the messages. Change master thread to enter TASK_INTERRUPTIBLE state early, so that we do not miss wakeups due to messages or kthread_should_stop event. Thanks to Pablo Neira Ayuso for his valuable feedback! Signed-off-by: Julian Anastasov <ja@ssi.bg> Signed-off-by: Simon Horman <horms@verge.net.au>
Diffstat (limited to 'net/netfilter/ipvs/ip_vs_sync.c')
-rw-r--r--net/netfilter/ipvs/ip_vs_sync.c149
1 files changed, 117 insertions, 32 deletions
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index d2df694405f1..b3235b230139 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -307,11 +307,15 @@ static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
spin_lock_bh(&ipvs->sync_lock);
if (list_empty(&ipvs->sync_queue)) {
sb = NULL;
+ __set_current_state(TASK_INTERRUPTIBLE);
} else {
sb = list_entry(ipvs->sync_queue.next,
struct ip_vs_sync_buff,
list);
list_del(&sb->list);
+ ipvs->sync_queue_len--;
+ if (!ipvs->sync_queue_len)
+ ipvs->sync_queue_delay = 0;
}
spin_unlock_bh(&ipvs->sync_lock);
@@ -358,9 +362,16 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
struct ip_vs_sync_buff *sb = ipvs->sync_buff;
spin_lock(&ipvs->sync_lock);
- if (ipvs->sync_state & IP_VS_STATE_MASTER)
+ if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+ ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
+ if (!ipvs->sync_queue_len)
+ schedule_delayed_work(&ipvs->master_wakeup_work,
+ max(IPVS_SYNC_SEND_DELAY, 1));
+ ipvs->sync_queue_len++;
list_add_tail(&sb->list, &ipvs->sync_queue);
- else
+ if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+ wake_up_process(ipvs->master_thread);
+ } else
ip_vs_sync_buff_release(sb);
spin_unlock(&ipvs->sync_lock);
}
@@ -379,6 +390,7 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) {
sb = ipvs->sync_buff;
ipvs->sync_buff = NULL;
+ __set_current_state(TASK_RUNNING);
} else
sb = NULL;
spin_unlock_bh(&ipvs->sync_buff_lock);
@@ -392,26 +404,23 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
void ip_vs_sync_switch_mode(struct net *net, int mode)
{
struct netns_ipvs *ipvs = net_ipvs(net);
+ struct ip_vs_sync_buff *sb;
+ spin_lock_bh(&ipvs->sync_buff_lock);
if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
- return;
- if (mode == sysctl_sync_ver(ipvs) || !ipvs->sync_buff)
- return;
+ goto unlock;
+ sb = ipvs->sync_buff;
+ if (mode == sysctl_sync_ver(ipvs) || !sb)
+ goto unlock;
- spin_lock_bh(&ipvs->sync_buff_lock);
/* Buffer empty ? then let buf_create do the job */
- if (ipvs->sync_buff->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
- kfree(ipvs->sync_buff);
+ if (sb->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
+ ip_vs_sync_buff_release(sb);
ipvs->sync_buff = NULL;
- } else {
- spin_lock_bh(&ipvs->sync_lock);
- if (ipvs->sync_state & IP_VS_STATE_MASTER)
- list_add_tail(&ipvs->sync_buff->list,
- &ipvs->sync_queue);
- else
- ip_vs_sync_buff_release(ipvs->sync_buff);
- spin_unlock_bh(&ipvs->sync_lock);
- }
+ } else
+ sb_queue_tail(ipvs);
+
+unlock:
spin_unlock_bh(&ipvs->sync_buff_lock);
}
@@ -1130,6 +1139,28 @@ static void ip_vs_process_message(struct net *net, __u8 *buffer,
/*
+ * Setup sndbuf (mode=1) or rcvbuf (mode=0)
+ */
+static void set_sock_size(struct sock *sk, int mode, int val)
+{
+ /* setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)); */
+ /* setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)); */
+ lock_sock(sk);
+ if (mode) {
+ val = clamp_t(int, val, (SOCK_MIN_SNDBUF + 1) / 2,
+ sysctl_wmem_max);
+ sk->sk_sndbuf = val * 2;
+ sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
+ } else {
+ val = clamp_t(int, val, (SOCK_MIN_RCVBUF + 1) / 2,
+ sysctl_rmem_max);
+ sk->sk_rcvbuf = val * 2;
+ sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
+ }
+ release_sock(sk);
+}
+
+/*
* Setup loopback of outgoing multicasts on a sending socket
*/
static void set_mcast_loop(struct sock *sk, u_char loop)
@@ -1305,6 +1336,9 @@ static struct socket *make_send_sock(struct net *net)
set_mcast_loop(sock->sk, 0);
set_mcast_ttl(sock->sk, 1);
+ result = sysctl_sync_sock_size(ipvs);
+ if (result > 0)
+ set_sock_size(sock->sk, 1, result);
result = bind_mcastif_addr(sock, ipvs->master_mcast_ifn);
if (result < 0) {
@@ -1350,6 +1384,9 @@ static struct socket *make_receive_sock(struct net *net)
sk_change_net(sock->sk, net);
/* it is equivalent to the REUSEADDR option in user-space */
sock->sk->sk_reuse = SK_CAN_REUSE;
+ result = sysctl_sync_sock_size(ipvs);
+ if (result > 0)
+ set_sock_size(sock->sk, 0, result);
result = sock->ops->bind(sock, (struct sockaddr *) &mcast_addr,
sizeof(struct sockaddr));
@@ -1392,18 +1429,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
return len;
}
-static void
+static int
ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
{
int msize;
+ int ret;
msize = msg->size;
/* Put size in network byte order */
msg->size = htons(msg->size);
- if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
- pr_err("ip_vs_send_async error\n");
+ ret = ip_vs_send_async(sock, (char *)msg, msize);
+ if (ret >= 0 || ret == -EAGAIN)
+ return ret;
+ pr_err("ip_vs_send_async error %d\n", ret);
+ return 0;
}
static int
@@ -1428,36 +1469,75 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
return len;
}
+/* Wakeup the master thread for sending */
+static void master_wakeup_work_handler(struct work_struct *work)
+{
+ struct netns_ipvs *ipvs = container_of(work, struct netns_ipvs,
+ master_wakeup_work.work);
+
+ spin_lock_bh(&ipvs->sync_lock);
+ if (ipvs->sync_queue_len &&
+ ipvs->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) {
+ ipvs->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE;
+ wake_up_process(ipvs->master_thread);
+ }
+ spin_unlock_bh(&ipvs->sync_lock);
+}
+
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+ struct ip_vs_sync_buff *sb;
+
+ sb = sb_dequeue(ipvs);
+ if (sb)
+ return sb;
+ /* Do not delay entries in buffer for more than 2 seconds */
+ return get_curr_sync_buff(ipvs, 2 * HZ);
+}
static int sync_thread_master(void *data)
{
struct ip_vs_sync_thread_data *tinfo = data;
struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+ struct sock *sk = tinfo->sock->sk;
struct ip_vs_sync_buff *sb;
pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
"syncid = %d\n",
ipvs->master_mcast_ifn, ipvs->master_syncid);
- while (!kthread_should_stop()) {
- while ((sb = sb_dequeue(ipvs))) {
- ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
- ip_vs_sync_buff_release(sb);
+ for (;;) {
+ sb = next_sync_buff(ipvs);
+ if (unlikely(kthread_should_stop()))
+ break;
+ if (!sb) {
+ schedule_timeout(IPVS_SYNC_CHECK_PERIOD);
+ continue;
}
-
- /* check if entries stay in ipvs->sync_buff for 2 seconds */
- sb = get_curr_sync_buff(ipvs, 2 * HZ);
- if (sb) {
- ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
- ip_vs_sync_buff_release(sb);
+ while (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+ int ret = 0;
+
+ __wait_event_interruptible(*sk_sleep(sk),
+ sock_writeable(sk) ||
+ kthread_should_stop(),
+ ret);
+ if (unlikely(kthread_should_stop()))
+ goto done;
}
-
- schedule_timeout_interruptible(HZ);
+ ip_vs_sync_buff_release(sb);
}
+done:
+ __set_current_state(TASK_RUNNING);
+ if (sb)
+ ip_vs_sync_buff_release(sb);
+
/* clean up the sync_buff queue */
while ((sb = sb_dequeue(ipvs)))
ip_vs_sync_buff_release(sb);
+ __set_current_state(TASK_RUNNING);
/* clean up the current sync_buff */
sb = get_curr_sync_buff(ipvs, 0);
@@ -1538,6 +1618,10 @@ int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
realtask = &ipvs->master_thread;
name = "ipvs_master:%d";
threadfn = sync_thread_master;
+ ipvs->sync_queue_len = 0;
+ ipvs->sync_queue_delay = 0;
+ INIT_DELAYED_WORK(&ipvs->master_wakeup_work,
+ master_wakeup_work_handler);
sock = make_send_sock(net);
} else if (state == IP_VS_STATE_BACKUP) {
if (ipvs->backup_thread)
@@ -1623,6 +1707,7 @@ int stop_sync_thread(struct net *net, int state)
spin_lock_bh(&ipvs->sync_lock);
ipvs->sync_state &= ~IP_VS_STATE_MASTER;
spin_unlock_bh(&ipvs->sync_lock);
+ cancel_delayed_work_sync(&ipvs->master_wakeup_work);
retc = kthread_stop(ipvs->master_thread);
ipvs->master_thread = NULL;
} else if (state == IP_VS_STATE_BACKUP) {