From c008413850d1d48cc02c940280bf2dcf76160f4c Mon Sep 17 00:00:00 2001 From: Erik Hugne Date: Wed, 7 Nov 2012 02:40:07 -0500 Subject: tipc: remove obsolete flush of stale reassembly buffer Each link instance has a periodic job checking if there is a stale ongoing message reassembly associated to the link. If no new fragment has been received during the last 4*[link_tolerance] period, it is assumed the missing fragment will never arrive. As a consequence, the reassembly buffer is discarded, and a gap in the message sequence occurs. This assumption is wrong. After we abandoned our ambition to develop packet routing for multi-cluster networks, only single-hop packet transfer remains as an option. For those, all packets are guaranteed to be delivered in sequence to the defragmentation layer. Any failure to achieve sequenced delivery will eventually lead to link reset, and the reassembly buffer will be flushed anyway. So we just remove this periodic check, which is now obsolete. Signed-off-by: Erik Hugne Acked-by: Ying Xue Signed-off-by: Jon Maloy [PG: also delete get/inc_timer count, since they are now unused] Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 44 -------------------------------------------- 1 file changed, 44 deletions(-) (limited to 'net') diff --git a/net/tipc/link.c b/net/tipc/link.c index 87bf5aad704b..daa6080a2a0c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -97,7 +97,6 @@ static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, u32 num_sect, unsigned int total_len, u32 destnode); -static void link_check_defragm_bufs(struct tipc_link *l_ptr); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); @@ -271,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr) } /* do all other link processing performed on a periodic basis */ - link_check_defragm_bufs(l_ptr); link_state_event(l_ptr, TIMEOUT_EVT); @@ -2497,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp) msg_set_bcast_ack(buf_msg(buf), exp); } -static u32 get_timer_cnt(struct sk_buff *buf) -{ - return msg_reroute_cnt(buf_msg(buf)); -} - -static void incr_timer_cnt(struct sk_buff *buf) -{ - msg_incr_reroute_cnt(buf_msg(buf)); -} - /* * tipc_link_recv_fragment(): Called with node lock on. Returns * the reassembled buffer if message is complete. @@ -2585,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, return 0; } -/** - * link_check_defragm_bufs - flush stale incoming message fragments - * @l_ptr: pointer to link - */ -static void link_check_defragm_bufs(struct tipc_link *l_ptr) -{ - struct sk_buff *prev = NULL; - struct sk_buff *next = NULL; - struct sk_buff *buf = l_ptr->defragm_buf; - - if (!buf) - return; - if (!link_working_working(l_ptr)) - return; - while (buf) { - u32 cnt = get_timer_cnt(buf); - - next = buf->next; - if (cnt < 4) { - incr_timer_cnt(buf); - prev = buf; - } else { - if (prev) - prev->next = buf->next; - else - l_ptr->defragm_buf = buf->next; - kfree_skb(buf); - } - buf = next; - } -} - static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) { if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) -- cgit From 9da3d475874f4da49057767913af95ce01063ba3 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Tue, 27 Nov 2012 06:15:27 -0500 Subject: tipc: eliminate aggregate sk_receive_queue limit As a complement to the per-socket sk_recv_queue limit, TIPC keeps a global atomic counter for the sum of sk_recv_queue sizes across all tipc sockets. When incremented, the counter is compared to an upper threshold value, and if this is reached, the message is rejected with error code TIPC_OVERLOAD. This check was originally meant to protect the node against buffer exhaustion and general CPU overload. However, all experience indicates that the feature not only is redundant on Linux, but even harmful. Users run into the limit very often, causing disturbances for their applications, while removing it seems to have no negative effects at all. We have also seen that overall performance is boosted significantly when this bottleneck is removed. Furthermore, we don't see any other network protocols maintaining such a mechanism, something strengthening our conviction that this control can be eliminated. As a result, the atomic variable tipc_queue_size is now unused and so it can be deleted. There is a getsockopt call that used to allow reading it; we retain that but just return zero for maximum compatibility. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Cc: Neil Horman [PG: phase out tipc_queue_size as pointed out by Neil Horman] Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 1a720c86e80a..848be692cb45 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2,7 +2,7 @@ * net/tipc/socket.c: TIPC socket API * * Copyright (c) 2001-2007, Ericsson AB - * Copyright (c) 2004-2008, 2010-2011, Wind River Systems + * Copyright (c) 2004-2008, 2010-2012, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -73,8 +73,6 @@ static struct proto tipc_proto; static int sockets_enabled; -static atomic_t tipc_queue_size = ATOMIC_INIT(0); - /* * Revised TIPC socket locking policy: * @@ -128,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0); static void advance_rx_queue(struct sock *sk) { kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); - atomic_dec(&tipc_queue_size); } /** @@ -140,10 +137,8 @@ static void discard_rx_queue(struct sock *sk) { struct sk_buff *buf; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { - atomic_dec(&tipc_queue_size); + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) kfree_skb(buf); - } } /** @@ -155,10 +150,8 @@ static void reject_rx_queue(struct sock *sk) { struct sk_buff *buf; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) tipc_reject_msg(buf, TIPC_ERR_NO_PORT); - atomic_dec(&tipc_queue_size); - } } /** @@ -280,7 +273,6 @@ static int release(struct socket *sock) buf = __skb_dequeue(&sk->sk_receive_queue); if (buf == NULL) break; - atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) kfree_skb(buf); else { @@ -1241,11 +1233,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) } /* Reject message if there isn't room to queue it */ - recv_q_len = (u32)atomic_read(&tipc_queue_size); - if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) { - if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE)) - return TIPC_ERR_OVERLOAD; - } recv_q_len = skb_queue_len(&sk->sk_receive_queue); if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) @@ -1254,7 +1241,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) /* Enqueue message (finally!) */ TIPC_SKB_CB(buf)->handle = 0; - atomic_inc(&tipc_queue_size); __skb_queue_tail(&sk->sk_receive_queue, buf); /* Initiate connection termination for an incoming 'FIN' */ @@ -1578,7 +1564,6 @@ restart: /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ buf = __skb_dequeue(&sk->sk_receive_queue); if (buf) { - atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) { kfree_skb(buf); goto restart; @@ -1717,7 +1702,7 @@ static int getsockopt(struct socket *sock, /* no need to set "res", since already 0 at this point */ break; case TIPC_NODE_RECVQ_DEPTH: - value = (u32)atomic_read(&tipc_queue_size); + value = 0; /* was tipc_queue_size, now obsolete */ break; case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); -- cgit From e643df156ade104b0430588562d25b8638683fc1 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Tue, 27 Nov 2012 06:15:29 -0500 Subject: tipc: change sk_receive_queue upper limit The sk_recv_queue upper limit for connectionless sockets has empirically turned out to be too low. When we double the current limit we get much fewer rejected messages and no noticable negative side-effects. Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 848be692cb45..f553fdecc1d2 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, Ericsson AB + * Copyright (c) 2001-2007, 2012 Ericsson AB * Copyright (c) 2004-2008, 2010-2012, Wind River Systems * All rights reserved. * @@ -43,7 +43,7 @@ #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define OVERLOAD_LIMIT_BASE 5000 +#define OVERLOAD_LIMIT_BASE 10000 #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ struct tipc_sock { -- cgit From bc879117d4cf2a6fcf5c5a43f157143bbbe88e84 Mon Sep 17 00:00:00 2001 From: Paul Gortmaker Date: Thu, 29 Nov 2012 13:48:40 -0500 Subject: tipc: standardize across connect/disconnect function naming Currently we have tipc_disconnect and tipc_disconnect_port. It is not clear from the names alone, what they do or how they differ. It turns out that tipc_disconnect just deals with the port locking and then calls tipc_disconnect_port which does all the work. If we rename as follows: tipc_disconnect_port --> __tipc_disconnect then we will be following typical linux convention, where: __tipc_disconnect: "raw" function that does all the work. tipc_disconnect: wrapper that deals with locking and then calls the real core __tipc_disconnect function With this, the difference is immediately evident, and locking violations are more apt to be spotted by chance while working on, or even just while reading the code. On the connect side of things, we currently only have the single "tipc_connect2port" function. It does both the locking at enter/exit, and the core of the work. Pending changes will make it desireable to have the connect be a two part locking wrapper + worker function, just like the disconnect is already. Here, we make the connect look just like the updated disconnect case, for the above reason, and for consistency. In the process, we also get rid of the "2port" suffix that was on the original name, since it adds no descriptive value. On close examination, one might notice that the above connect changes implicitly move the call to tipc_link_get_max_pkt() to be within the scope of tipc_port_lock() protected region; when it was not previously. We don't see any issues with this, and it is in keeping with __tipc_connect doing the work and tipc_connect just handling the locking. Signed-off-by: Paul Gortmaker --- net/tipc/port.c | 32 +++++++++++++++++++++++--------- net/tipc/port.h | 6 ++++-- net/tipc/socket.c | 6 +++--- net/tipc/subscr.c | 2 +- 4 files changed, 31 insertions(+), 15 deletions(-) (limited to 'net') diff --git a/net/tipc/port.c b/net/tipc/port.c index 07c42fba672b..18098cac62f2 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy) if (unlikely(!cb)) goto reject; if (unlikely(!connected)) { - if (tipc_connect2port(dref, &orig)) + if (tipc_connect(dref, &orig)) goto reject; } else if (peer_invalid) goto reject; @@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) return res; } -int tipc_connect2port(u32 ref, struct tipc_portid const *peer) +int tipc_connect(u32 ref, struct tipc_portid const *peer) { struct tipc_port *p_ptr; - struct tipc_msg *msg; - int res = -EINVAL; + int res; p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; + res = __tipc_connect(ref, p_ptr, peer); + tipc_port_unlock(p_ptr); + return res; +} + +/* + * __tipc_connect - connect to a remote peer + * + * Port must be locked. + */ +int __tipc_connect(u32 ref, struct tipc_port *p_ptr, + struct tipc_portid const *peer) +{ + struct tipc_msg *msg; + int res = -EINVAL; + if (p_ptr->published || p_ptr->connected) goto exit; if (!peer->ref) @@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer) (net_ev_handler)port_handle_node_down); res = 0; exit: - tipc_port_unlock(p_ptr); p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); return res; } -/** - * tipc_disconnect_port - disconnect port from peer +/* + * __tipc_disconnect - disconnect port from peer * * Port must be locked. */ -int tipc_disconnect_port(struct tipc_port *tp_ptr) +int __tipc_disconnect(struct tipc_port *tp_ptr) { int res; @@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref) p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; - res = tipc_disconnect_port(p_ptr); + res = __tipc_disconnect(p_ptr); tipc_port_unlock(p_ptr); return res; } diff --git a/net/tipc/port.h b/net/tipc/port.h index 4660e3065790..fb66e2e5f4d1 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope, int tipc_withdraw(u32 portref, unsigned int scope, struct tipc_name_seq const *name_seq); -int tipc_connect2port(u32 portref, struct tipc_portid const *port); +int tipc_connect(u32 portref, struct tipc_portid const *port); int tipc_disconnect(u32 portref); @@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref); /* * The following routines require that the port be locked on entry */ -int tipc_disconnect_port(struct tipc_port *tp_ptr); +int __tipc_disconnect(struct tipc_port *tp_ptr); +int __tipc_connect(u32 ref, struct tipc_port *p_ptr, + struct tipc_portid const *peer); int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); /* diff --git a/net/tipc/socket.c b/net/tipc/socket.c index f553fdecc1d2..b630f3839ca3 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -783,7 +783,7 @@ static int auto_connect(struct socket *sock, struct tipc_msg *msg) tsock->peer_name.ref = msg_origport(msg); tsock->peer_name.node = msg_orignode(msg); - tipc_connect2port(tsock->p->ref, &tsock->peer_name); + tipc_connect(tsock->p->ref, &tsock->peer_name); tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); sock->state = SS_CONNECTED; return 0; @@ -1246,7 +1246,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) /* Initiate connection termination for an incoming 'FIN' */ if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { sock->state = SS_DISCONNECTING; - tipc_disconnect_port(tipc_sk_port(sk)); + __tipc_disconnect(tipc_sk_port(sk)); } sk->sk_data_ready(sk, 0); @@ -1506,7 +1506,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) /* Connect new socket to it's peer */ new_tsock->peer_name.ref = msg_origport(msg); new_tsock->peer_name.node = msg_orignode(msg); - tipc_connect2port(new_ref, &new_tsock->peer_name); + tipc_connect(new_ref, &new_tsock->peer_name); new_sock->state = SS_CONNECTED; tipc_set_portimportance(new_ref, msg_importance(msg)); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 0f7d0d007e22..6b42d47029af 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle, kfree(subscriber); return; } - tipc_connect2port(subscriber->port_ref, orig); + tipc_connect(subscriber->port_ref, orig); /* Lock server port (& save lock address for future use) */ subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; -- cgit From 7e6c131e1568dcc2033736739a9880dce1976886 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 29 Nov 2012 18:39:14 -0500 Subject: tipc: consolidate connection-oriented message reception in one function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Handling of connection-related message reception is currently scattered around at different places in the code. This makes it harder to verify that things are handled correctly in all possible scenarios. So we consolidate the existing processing of connection-oriented message reception in a single routine. In the process, we convert the chain of if/else into a switch/case for improved readability. A cast on the socket_state in the switch is needed to avoid compile warnings on 32 bit, like "net/tipc/socket.c:1252:2: warning: case value ‘4294967295’ not in enumerated type". This happens because existing tipc code pseudo extends the default linux socket state values with: #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ It may make sense to add these as _positive_ values to the existing socket state enum list someday, vs. these already existing defines. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy [PG: add cast to fix warning; remove returns from middle of switch] Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 75 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 24 deletions(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b630f3839ca3..d16a6de32ea1 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1186,6 +1186,53 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base) return queue_size >= threshold; } +/** + * filter_connect - Handle all incoming messages for a connection-based socket + * @tsock: TIPC socket + * @msg: message + * + * Returns TIPC error status code and socket error status code + * once it encounters some errors + */ +static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) +{ + struct socket *sock = tsock->sk.sk_socket; + struct tipc_msg *msg = buf_msg(*buf); + u32 retval = TIPC_ERR_NO_PORT; + + if (msg_mcast(msg)) + return retval; + + switch ((int)sock->state) { + case SS_CONNECTED: + /* Accept only connection-based messages sent by peer */ + if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) { + if (unlikely(msg_errcode(msg))) { + sock->state = SS_DISCONNECTING; + __tipc_disconnect(tsock->p); + } + retval = TIPC_OK; + } + break; + case SS_CONNECTING: + /* Accept only ACK or NACK message */ + if (msg_connected(msg) || (msg_errcode(msg))) + retval = TIPC_OK; + break; + case SS_LISTENING: + case SS_UNCONNECTED: + /* Accept only SYN message */ + if (!msg_connected(msg) && !(msg_errcode(msg))) + retval = TIPC_OK; + break; + case SS_DISCONNECTING: + break; + default: + pr_err("Unknown socket state %u\n", sock->state); + } + return retval; +} + /** * filter_rcv - validate incoming message * @sk: socket @@ -1203,6 +1250,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(buf); u32 recv_q_len; + u32 res = TIPC_OK; /* Reject message if it is wrong sort of message for socket */ if (msg_type(msg) > TIPC_DIRECT_MSG) @@ -1212,24 +1260,9 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) if (msg_connected(msg)) return TIPC_ERR_NO_PORT; } else { - if (msg_mcast(msg)) - return TIPC_ERR_NO_PORT; - if (sock->state == SS_CONNECTED) { - if (!msg_connected(msg) || - !tipc_port_peer_msg(tipc_sk_port(sk), msg)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_CONNECTING) { - if (!msg_connected(msg) && (msg_errcode(msg) == 0)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_LISTENING) { - if (msg_connected(msg) || msg_errcode(msg)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_DISCONNECTING) { - return TIPC_ERR_NO_PORT; - } else /* (sock->state == SS_UNCONNECTED) */ { - if (msg_connected(msg) || msg_errcode(msg)) - return TIPC_ERR_NO_PORT; - } + res = filter_connect(tipc_sk(sk), &buf); + if (res != TIPC_OK || buf == NULL) + return res; } /* Reject message if there isn't room to queue it */ @@ -1243,12 +1276,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) TIPC_SKB_CB(buf)->handle = 0; __skb_queue_tail(&sk->sk_receive_queue, buf); - /* Initiate connection termination for an incoming 'FIN' */ - if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { - sock->state = SS_DISCONNECTING; - __tipc_disconnect(tipc_sk_port(sk)); - } - sk->sk_data_ready(sk, 0); return TIPC_OK; } -- cgit From 584d24b3960e4c877fc623214815f278708f127c Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 29 Nov 2012 18:51:19 -0500 Subject: tipc: introduce non-blocking socket connect TIPC has so far only supported blocking connect(), meaning that a call to connect() doesn't return until either the connection is fully established, or an error occurs. This has proved insufficient for many users, so we now introduce non-blocking connect(), analogous to how this is done in TCP and other protocols. With this feature, if a connection cannot be established instantly, connect() will return the error code "-EINPROGRESS". If the user later calls connect() again, he will either have the return code "-EALREADY" or "-EISCONN", depending on whether the connection has been established or not. The user must have explicitly set the socket to be non-blocking (SOCK_NONBLOCK or O_NONBLOCK, depending on method used), so unless for some reason they had set this already (the socket would anyway remain blocking in current TIPC) this change should be completely backwards compatible. It is also now possible to call select() or poll() to wait for the completion of a connection. An effect of the above is that the actual completion of a connection may now be performed asynchronously, independent of the calls from user space. Therefore, we now execute this code in BH context, in the function filter_rcv(), which is executed upon reception of messages in the socket. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy [PG: minor refactoring for improved connect/disconnect function names] Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 158 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 93 insertions(+), 65 deletions(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d16a6de32ea1..dbce2745f0a8 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -775,16 +775,19 @@ exit: static int auto_connect(struct socket *sock, struct tipc_msg *msg) { struct tipc_sock *tsock = tipc_sk(sock->sk); - - if (msg_errcode(msg)) { - sock->state = SS_DISCONNECTING; - return -ECONNREFUSED; - } + struct tipc_port *p_ptr; tsock->peer_name.ref = msg_origport(msg); tsock->peer_name.node = msg_orignode(msg); - tipc_connect(tsock->p->ref, &tsock->peer_name); - tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); + p_ptr = tipc_port_deref(tsock->p->ref); + if (!p_ptr) + return -EINVAL; + + __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name); + + if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) + return -EINVAL; + msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg)); sock->state = SS_CONNECTED; return 0; } @@ -1198,7 +1201,9 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) { struct socket *sock = tsock->sk.sk_socket; struct tipc_msg *msg = buf_msg(*buf); + struct sock *sk = &tsock->sk; u32 retval = TIPC_ERR_NO_PORT; + int res; if (msg_mcast(msg)) return retval; @@ -1216,8 +1221,36 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) break; case SS_CONNECTING: /* Accept only ACK or NACK message */ - if (msg_connected(msg) || (msg_errcode(msg))) + if (unlikely(msg_errcode(msg))) { + sock->state = SS_DISCONNECTING; + sk->sk_err = -ECONNREFUSED; + retval = TIPC_OK; + break; + } + + if (unlikely(!msg_connected(msg))) + break; + + res = auto_connect(sock, msg); + if (res) { + sock->state = SS_DISCONNECTING; + sk->sk_err = res; retval = TIPC_OK; + break; + } + + /* If an incoming message is an 'ACK-', it should be + * discarded here because it doesn't contain useful + * data. In addition, we should try to wake up + * connect() routine if sleeping. + */ + if (msg_data_sz(msg) == 0) { + kfree_skb(*buf); + *buf = NULL; + if (waitqueue_active(sk_sleep(sk))) + wake_up_interruptible(sk_sleep(sk)); + } + retval = TIPC_OK; break; case SS_LISTENING: case SS_UNCONNECTED: @@ -1361,8 +1394,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, struct sock *sk = sock->sk; struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; struct msghdr m = {NULL,}; - struct sk_buff *buf; - struct tipc_msg *msg; unsigned int timeout; int res; @@ -1374,26 +1405,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - /* For now, TIPC does not support the non-blocking form of connect() */ - if (flags & O_NONBLOCK) { - res = -EOPNOTSUPP; - goto exit; - } - - /* Issue Posix-compliant error code if socket is in the wrong state */ - if (sock->state == SS_LISTENING) { - res = -EOPNOTSUPP; - goto exit; - } - if (sock->state == SS_CONNECTING) { - res = -EALREADY; - goto exit; - } - if (sock->state != SS_UNCONNECTED) { - res = -EISCONN; - goto exit; - } - /* * Reject connection attempt using multicast address * @@ -1405,49 +1416,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - /* Reject any messages already in receive queue (very unlikely) */ - reject_rx_queue(sk); + timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; + + switch (sock->state) { + case SS_UNCONNECTED: + /* Send a 'SYN-' to destination */ + m.msg_name = dest; + m.msg_namelen = destlen; + + /* If connect is in non-blocking case, set MSG_DONTWAIT to + * indicate send_msg() is never blocked. + */ + if (!timeout) + m.msg_flags = MSG_DONTWAIT; + + res = send_msg(NULL, sock, &m, 0); + if ((res < 0) && (res != -EWOULDBLOCK)) + goto exit; - /* Send a 'SYN-' to destination */ - m.msg_name = dest; - m.msg_namelen = destlen; - res = send_msg(NULL, sock, &m, 0); - if (res < 0) + /* Just entered SS_CONNECTING state; the only + * difference is that return value in non-blocking + * case is EINPROGRESS, rather than EALREADY. + */ + res = -EINPROGRESS; + break; + case SS_CONNECTING: + res = -EALREADY; + break; + case SS_CONNECTED: + res = -EISCONN; + break; + default: + res = -EINVAL; goto exit; + } - /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ - timeout = tipc_sk(sk)->conn_timeout; - release_sock(sk); - res = wait_event_interruptible_timeout(*sk_sleep(sk), - (!skb_queue_empty(&sk->sk_receive_queue) || - (sock->state != SS_CONNECTING)), - timeout ? (long)msecs_to_jiffies(timeout) - : MAX_SCHEDULE_TIMEOUT); - lock_sock(sk); + if (sock->state == SS_CONNECTING) { + if (!timeout) + goto exit; - if (res > 0) { - buf = skb_peek(&sk->sk_receive_queue); - if (buf != NULL) { - msg = buf_msg(buf); - res = auto_connect(sock, msg); - if (!res) { - if (!msg_data_sz(msg)) - advance_rx_queue(sk); - } - } else { - if (sock->state == SS_CONNECTED) - res = -EISCONN; + /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ + release_sock(sk); + res = wait_event_interruptible_timeout(*sk_sleep(sk), + sock->state != SS_CONNECTING, + timeout ? (long)msecs_to_jiffies(timeout) + : MAX_SCHEDULE_TIMEOUT); + lock_sock(sk); + if (res <= 0) { + if (res == 0) + res = -ETIMEDOUT; else - res = -ECONNREFUSED; + ; /* leave "res" unchanged */ + goto exit; } - } else { - if (res == 0) - res = -ETIMEDOUT; - else - ; /* leave "res" unchanged */ - sock->state = SS_DISCONNECTING; } + if (unlikely(sock->state == SS_DISCONNECTING)) + res = sock_error(sk); + else + res = 0; + exit: release_sock(sk); return res; -- cgit From cbab368790f23bc917d97fcf7a338c5ba5336ee0 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 29 Nov 2012 16:28:30 +0800 Subject: tipc: eliminate connection setup for implied connect in recv_msg() As connection setup is now completed asynchronously in BH context, in the function filter_connect(), the corresponding code in recv_msg() becomes redundant. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 7 ------- 1 file changed, 7 deletions(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index dbce2745f0a8..ef75b6270247 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -946,13 +946,6 @@ restart: sz = msg_data_sz(msg); err = msg_errcode(msg); - /* Complete connection setup for an implied connect */ - if (unlikely(sock->state == SS_CONNECTING)) { - res = auto_connect(sock, msg); - if (res) - goto exit; - } - /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { advance_rx_queue(sk); -- cgit From 258f8667a29d72b1c220065632b39c0faeb061ca Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 3 Dec 2012 16:12:07 +0800 Subject: tipc: add lock nesting notation to quiet lockdep warning TIPC accept() call grabs the socket lock on a newly allocated socket while holding the socket lock on an old socket. But lockdep worries that this might be a recursive lock attempt: [ INFO: possible recursive locking detected ] --------------------------------------------- kworker/u:0/6 is trying to acquire lock: (sk_lock-AF_TIPC){+.+.+.}, at: [] accept+0x15c/0x310 [tipc] but task is already holding lock: (sk_lock-AF_TIPC){+.+.+.}, at: [] accept+0x28/0x310 [tipc] other info that might help us debug this: Possible unsafe locking scenario: CPU0 ---- lock(sk_lock-AF_TIPC); lock(sk_lock-AF_TIPC); *** DEADLOCK *** May be due to missing lock nesting notation [...] Tell lockdep that this locking is safe by using lock_sock_nested(). This is similar to what was done in commit 5131a184a3458d9 for SCTP code ("SCTP: lock_sock_nested in sctp_sock_migrate"). Also note that this is isn't something that is seen normally, as it was uncovered with some experimental work-in-progress code not yet ready for mainline. So no need for stable backports or similar of this commit. Signed-off-by: Ying Xue Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index ef75b6270247..b5c9795cf151 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1543,7 +1543,8 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) u32 new_ref = new_tport->ref; struct tipc_msg *msg = buf_msg(buf); - lock_sock(new_sk); + /* we lock on new_sk; but lockdep sees the lock on sk */ + lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); /* * Reject any stray messages received by new socket -- cgit From 0fef8f205f6f4cdff1869e54e44f317a79902785 Mon Sep 17 00:00:00 2001 From: Paul Gortmaker Date: Tue, 4 Dec 2012 11:01:55 -0500 Subject: tipc: refactor accept() code for improved readability In TIPC's accept() routine, there is a large block of code relating to initialization of a new socket, all within an if condition checking if the allocation succeeded. Here, we simply flip the check of the if, so that the main execution path stays at the same indentation level, which improves readability. If the allocation fails, we jump to an already existing exit label. Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 89 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 48 insertions(+), 41 deletions(-) (limited to 'net') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b5c9795cf151..9b4e4833a484 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1509,8 +1509,13 @@ static int listen(struct socket *sock, int len) */ static int accept(struct socket *sock, struct socket *new_sock, int flags) { - struct sock *sk = sock->sk; + struct sock *new_sk, *sk = sock->sk; struct sk_buff *buf; + struct tipc_sock *new_tsock; + struct tipc_port *new_tport; + struct tipc_msg *msg; + u32 new_ref; + int res; lock_sock(sk); @@ -1536,49 +1541,51 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) buf = skb_peek(&sk->sk_receive_queue); res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); - if (!res) { - struct sock *new_sk = new_sock->sk; - struct tipc_sock *new_tsock = tipc_sk(new_sk); - struct tipc_port *new_tport = new_tsock->p; - u32 new_ref = new_tport->ref; - struct tipc_msg *msg = buf_msg(buf); - - /* we lock on new_sk; but lockdep sees the lock on sk */ - lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); - - /* - * Reject any stray messages received by new socket - * before the socket lock was taken (very, very unlikely) - */ - reject_rx_queue(new_sk); - - /* Connect new socket to it's peer */ - new_tsock->peer_name.ref = msg_origport(msg); - new_tsock->peer_name.node = msg_orignode(msg); - tipc_connect(new_ref, &new_tsock->peer_name); - new_sock->state = SS_CONNECTED; - - tipc_set_portimportance(new_ref, msg_importance(msg)); - if (msg_named(msg)) { - new_tport->conn_type = msg_nametype(msg); - new_tport->conn_instance = msg_nameinst(msg); - } + if (res) + goto exit; - /* - * Respond to 'SYN-' by discarding it & returning 'ACK'-. - * Respond to 'SYN+' by queuing it on new socket. - */ - if (!msg_data_sz(msg)) { - struct msghdr m = {NULL,}; + new_sk = new_sock->sk; + new_tsock = tipc_sk(new_sk); + new_tport = new_tsock->p; + new_ref = new_tport->ref; + msg = buf_msg(buf); - advance_rx_queue(sk); - send_packet(NULL, new_sock, &m, 0); - } else { - __skb_dequeue(&sk->sk_receive_queue); - __skb_queue_head(&new_sk->sk_receive_queue, buf); - } - release_sock(new_sk); + /* we lock on new_sk; but lockdep sees the lock on sk */ + lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); + + /* + * Reject any stray messages received by new socket + * before the socket lock was taken (very, very unlikely) + */ + reject_rx_queue(new_sk); + + /* Connect new socket to it's peer */ + new_tsock->peer_name.ref = msg_origport(msg); + new_tsock->peer_name.node = msg_orignode(msg); + tipc_connect(new_ref, &new_tsock->peer_name); + new_sock->state = SS_CONNECTED; + + tipc_set_portimportance(new_ref, msg_importance(msg)); + if (msg_named(msg)) { + new_tport->conn_type = msg_nametype(msg); + new_tport->conn_instance = msg_nameinst(msg); } + + /* + * Respond to 'SYN-' by discarding it & returning 'ACK'-. + * Respond to 'SYN+' by queuing it on new socket. + */ + if (!msg_data_sz(msg)) { + struct msghdr m = {NULL,}; + + advance_rx_queue(sk); + send_packet(NULL, new_sock, &m, 0); + } else { + __skb_dequeue(&sk->sk_receive_queue); + __skb_queue_head(&new_sk->sk_receive_queue, buf); + } + release_sock(new_sk); + exit: release_sock(sk); return res; -- cgit