diff options
Diffstat (limited to 'net/rds/send.c')
| -rw-r--r-- | net/rds/send.c | 815 |
1 files changed, 595 insertions, 220 deletions
diff --git a/net/rds/send.c b/net/rds/send.c index 88eace57dd6b..0b3d0ef2f008 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2006 Oracle. All rights reserved. + * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -38,6 +38,7 @@ #include <linux/list.h> #include <linux/ratelimit.h> #include <linux/export.h> +#include <linux/sizes.h> #include "rds.h" @@ -51,7 +52,7 @@ * it to 0 will restore the old behavior (where we looped until we had * drained the queue). */ -static int send_batch_count = 64; +static int send_batch_count = SZ_1K; module_param(send_batch_count, int, 0444); MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); @@ -61,14 +62,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status); * Reset the send state. Callers must ensure that this doesn't race with * rds_send_xmit(). */ -void rds_send_reset(struct rds_connection *conn) +void rds_send_path_reset(struct rds_conn_path *cp) { struct rds_message *rm, *tmp; unsigned long flags; - if (conn->c_xmit_rm) { - rm = conn->c_xmit_rm; - conn->c_xmit_rm = NULL; + if (cp->cp_xmit_rm) { + rm = cp->cp_xmit_rm; + cp->cp_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's @@ -77,45 +78,45 @@ void rds_send_reset(struct rds_connection *conn) rds_message_put(rm); } - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_data_sent = 0; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_data_sent = 0; - conn->c_map_queued = 0; + cp->cp_conn->c_map_queued = 0; - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes; /* Mark messages as retransmissions, and move them to the send q */ - spin_lock_irqsave(&conn->c_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + spin_lock_irqsave(&cp->cp_lock, flags); + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); } - list_splice_init(&conn->c_retrans, &conn->c_send_queue); - spin_unlock_irqrestore(&conn->c_lock, flags); + list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); + spin_unlock_irqrestore(&cp->cp_lock, flags); } +EXPORT_SYMBOL_GPL(rds_send_path_reset); -static int acquire_in_xmit(struct rds_connection *conn) +static int acquire_in_xmit(struct rds_conn_path *cp) { - return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; + return test_and_set_bit_lock(RDS_IN_XMIT, &cp->cp_flags) == 0; } -static void release_in_xmit(struct rds_connection *conn) +static void release_in_xmit(struct rds_conn_path *cp) { - clear_bit(RDS_IN_XMIT, &conn->c_flags); - smp_mb__after_clear_bit(); + clear_bit_unlock(RDS_IN_XMIT, &cp->cp_flags); /* * We don't use wait_on_bit()/wake_up_bit() because our waking is in a * hot path and finding waiters is very rare. We don't want to walk * the system-wide hashed waitqueue buckets in the fast path only to * almost never find waiters. */ - if (waitqueue_active(&conn->c_waitq)) - wake_up_all(&conn->c_waitq); + if (waitqueue_active(&cp->cp_waitq)) + wake_up_all(&cp->cp_waitq); } /* @@ -132,16 +133,21 @@ static void release_in_xmit(struct rds_connection *conn) * - small message latency is higher behind queued large messages * - large message latency isn't starved by intervening small sends */ -int rds_send_xmit(struct rds_connection *conn) +int rds_send_xmit(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_message *rm; unsigned long flags; unsigned int tmp; struct scatterlist *sg; int ret = 0; LIST_HEAD(to_be_dropped); + int batch_count; + unsigned long send_gen = 0; + int same_rm = 0; restart: + batch_count = 0; /* * sendmsg calls here after having queued its message on the send @@ -150,24 +156,41 @@ restart: * avoids blocking the caller and trading per-connection data between * caches per message. */ - if (!acquire_in_xmit(conn)) { + if (!acquire_in_xmit(cp)) { rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; } + if (rds_destroy_pending(cp->cp_conn)) { + release_in_xmit(cp); + ret = -ENETUNREACH; /* dont requeue send work */ + goto out; + } + + /* + * we record the send generation after doing the xmit acquire. + * if someone else manages to jump in and do some work, we'll use + * this to avoid a goto restart farther down. + * + * The acquire_in_xmit() check above ensures that only one + * caller can increment c_send_gen at any time. + */ + send_gen = READ_ONCE(cp->cp_send_gen) + 1; + WRITE_ONCE(cp->cp_send_gen, send_gen); + /* * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, * we do the opposite to avoid races. */ - if (!rds_conn_up(conn)) { - release_in_xmit(conn); + if (!rds_conn_path_up(cp)) { + release_in_xmit(cp); ret = 0; goto out; } - if (conn->c_trans->xmit_prepare) - conn->c_trans->xmit_prepare(conn); + if (conn->c_trans->xmit_path_prepare) + conn->c_trans->xmit_path_prepare(cp); /* * spin trying to push headers and data down the connection until @@ -175,7 +198,18 @@ restart: */ while (1) { - rm = conn->c_xmit_rm; + rm = cp->cp_xmit_rm; + + if (!rm) { + same_rm = 0; + } else { + same_rm++; + if (same_rm >= 4096) { + rds_stats_inc(s_send_stuck_rm); + ret = -EAGAIN; + break; + } + } /* * If between sending messages, we can send a pending congestion @@ -188,24 +222,36 @@ restart: break; } rm->data.op_active = 1; + rm->m_inc.i_conn_path = cp; + rm->m_inc.i_conn = cp->cp_conn; - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* * If not already working on one, grab the next message. * - * c_xmit_rm holds a ref while we're sending this message down - * the connction. We can use this ref while holding the + * cp_xmit_rm holds a ref while we're sending this message down + * the connection. We can use this ref while holding the * send_sem.. rds_send_reset() is serialized with it. */ if (!rm) { unsigned int len; - spin_lock_irqsave(&conn->c_lock, flags); + batch_count++; - if (!list_empty(&conn->c_send_queue)) { - rm = list_entry(conn->c_send_queue.next, + /* we want to process as big a batch as we can, but + * we also want to avoid softlockups. If we've been + * through a lot of messages, lets back off and see + * if anyone else jumps in + */ + if (batch_count >= send_batch_count) + goto over_batch; + + spin_lock_irqsave(&cp->cp_lock, flags); + + if (!list_empty(&cp->cp_send_queue)) { + rm = list_entry(cp->cp_send_queue.next, struct rds_message, m_conn_item); rds_message_addref(rm); @@ -214,70 +260,82 @@ restart: * Move the message from the send queue to the retransmit * list right away. */ - list_move_tail(&rm->m_conn_item, &conn->c_retrans); + list_move_tail(&rm->m_conn_item, + &cp->cp_retrans); } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); if (!rm) break; /* Unfortunately, the way Infiniband deals with * RDMA to a bad MR key is by moving the entire - * queue pair to error state. We cold possibly + * queue pair to error state. We could possibly * recover from that, but right now we drop the * connection. * Therefore, we never retransmit messages with RDMA ops. */ - if (rm->rdma.op_active && - test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { - spin_lock_irqsave(&conn->c_lock, flags); + if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) || + (rm->rdma.op_active && + test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) { + spin_lock_irqsave(&cp->cp_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } /* Require an ACK every once in a while */ len = ntohl(rm->m_inc.i_hdr.h_len); - if (conn->c_unacked_packets == 0 || - conn->c_unacked_bytes < len) { - __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); - - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + if (cp->cp_unacked_packets == 0 || + cp->cp_unacked_bytes < len) { + set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); + + cp->cp_unacked_packets = + rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = + rds_sysctl_max_unacked_bytes; rds_stats_inc(s_send_ack_required); } else { - conn->c_unacked_bytes -= len; - conn->c_unacked_packets--; + cp->cp_unacked_bytes -= len; + cp->cp_unacked_packets--; } - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* The transport either sends the whole rdma or none of it */ - if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue + */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); - if (ret) + if (ret) { + clear_bit(RDS_MSG_MAPPED, &rm->m_flags); + wake_up_interruptible(&rm->m_flush_wait); break; - conn->c_xmit_rdma_sent = 1; + } + cp->cp_xmit_rdma_sent = 1; - /* The transport owns the mapped memory for now. - * You can't unmap it while it's on the send queue */ - set_bit(RDS_MSG_MAPPED, &rm->m_flags); } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue + */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); - if (ret) + if (ret) { + clear_bit(RDS_MSG_MAPPED, &rm->m_flags); + wake_up_interruptible(&rm->m_flush_wait); break; - conn->c_xmit_atomic_sent = 1; + } + cp->cp_xmit_atomic_sent = 1; - /* The transport owns the mapped memory for now. - * You can't unmap it while it's on the send queue */ - set_bit(RDS_MSG_MAPPED, &rm->m_flags); } /* @@ -302,41 +360,42 @@ restart: rm->data.op_active = 0; } - if (rm->data.op_active && !conn->c_xmit_data_sent) { + if (rm->data.op_active && !cp->cp_xmit_data_sent) { rm->m_final_op = &rm->data; + ret = conn->c_trans->xmit(conn, rm, - conn->c_xmit_hdr_off, - conn->c_xmit_sg, - conn->c_xmit_data_off); + cp->cp_xmit_hdr_off, + cp->cp_xmit_sg, + cp->cp_xmit_data_off); if (ret <= 0) break; - if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) { + if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) { tmp = min_t(int, ret, sizeof(struct rds_header) - - conn->c_xmit_hdr_off); - conn->c_xmit_hdr_off += tmp; + cp->cp_xmit_hdr_off); + cp->cp_xmit_hdr_off += tmp; ret -= tmp; } - sg = &rm->data.op_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[cp->cp_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - - conn->c_xmit_data_off); - conn->c_xmit_data_off += tmp; + cp->cp_xmit_data_off); + cp->cp_xmit_data_off += tmp; ret -= tmp; - if (conn->c_xmit_data_off == sg->length) { - conn->c_xmit_data_off = 0; + if (cp->cp_xmit_data_off == sg->length) { + cp->cp_xmit_data_off = 0; sg++; - conn->c_xmit_sg++; - BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.op_nents); + cp->cp_xmit_sg++; + BUG_ON(ret != 0 && cp->cp_xmit_sg == + rm->data.op_nents); } } - if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && - (conn->c_xmit_sg == rm->data.op_nents)) - conn->c_xmit_data_sent = 1; + if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) && + (cp->cp_xmit_sg == rm->data.op_nents)) + cp->cp_xmit_data_sent = 1; } /* @@ -344,23 +403,23 @@ restart: * if there is a data op. Thus, if the data is sent (or there was * none), then we're done with the rm. */ - if (!rm->data.op_active || conn->c_xmit_data_sent) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_data_sent = 0; + if (!rm->data.op_active || cp->cp_xmit_data_sent) { + cp->cp_xmit_rm = NULL; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_data_sent = 0; rds_message_put(rm); } } - if (conn->c_trans->xmit_complete) - conn->c_trans->xmit_complete(conn); - - release_in_xmit(conn); +over_batch: + if (conn->c_trans->xmit_path_complete) + conn->c_trans->xmit_path_complete(cp); + release_in_xmit(cp); /* Nuke any messages we decided not to retransmit. */ if (!list_empty(&to_be_dropped)) { @@ -380,17 +439,35 @@ restart: * If the transport cannot continue (i.e ret != 0), then it must * call us when more room is available, such as from the tx * completion handler. + * + * We have an extra generation check here so that if someone manages + * to jump in after our release_in_xmit, we'll see that they have done + * some work and we will skip our goto */ if (ret == 0) { + bool raced; + smp_mb(); - if (!list_empty(&conn->c_send_queue)) { + raced = send_gen != READ_ONCE(cp->cp_send_gen); + + if ((test_bit(0, &conn->c_map_queued) || + !list_empty(&cp->cp_send_queue)) && !raced) { + if (batch_count < send_batch_count) + goto restart; + rcu_read_lock(); + if (rds_destroy_pending(cp->cp_conn)) + ret = -ENETUNREACH; + else + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); + rcu_read_unlock(); + } else if (raced) { rds_stats_inc(s_send_lock_queue_raced); - goto restart; } } out: return ret; } +EXPORT_SYMBOL_GPL(rds_send_xmit); static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm) { @@ -517,42 +594,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) } /* - * This is called from the IB send completion when we detect - * a RDMA operation that failed with remote access error. - * So speed is not an issue here. - */ -struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rm_rdma_op *op) -{ - struct rds_message *rm, *tmp, *found = NULL; - unsigned long flags; - - spin_lock_irqsave(&conn->c_lock, flags); - - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - goto out; - } - } - - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - break; - } - } - -out: - spin_unlock_irqrestore(&conn->c_lock, flags); - - return found; -} -EXPORT_SYMBOL_GPL(rds_send_get_message); - -/* * This removes messages from the socket's list if they're on it. The list * argument must be private to the caller, we must be able to modify it * without locks. The messages must have a reference held for their @@ -593,8 +634,11 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status) sock_put(rds_rs_to_sk(rs)); } rs = rm->m_rs; - sock_hold(rds_rs_to_sk(rs)); + if (rs) + sock_hold(rds_rs_to_sk(rs)); } + if (!rs) + goto unlock_and_drop; spin_lock(&rs->rs_lock); if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) { @@ -614,7 +658,6 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status) rm->rdma.op_notifier = NULL; } was_on_sock = 1; - rm->m_rs = NULL; } spin_unlock(&rs->rs_lock); @@ -638,20 +681,17 @@ unlock_and_drop: * queue. This means that in the TCP case, the message may not have been * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked * checks the RDS_MSG_HAS_ACK_SEQ bit. - * - * XXX It's not clear to me how this is safely serialized with socket - * destruction. Maybe it should bail if it sees SOCK_DEAD. */ -void rds_send_drop_acked(struct rds_connection *conn, u64 ack, - is_acked_func is_acked) +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked) { struct rds_message *rm, *tmp; unsigned long flags; LIST_HEAD(list); - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { if (!rds_send_is_acked(rm, ack, is_acked)) break; @@ -661,19 +701,28 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, /* order flag updates with spin locks */ if (!list_empty(&list)) - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* now remove the messages from the sock list as needed */ rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); } +EXPORT_SYMBOL_GPL(rds_send_path_drop_acked); + +void rds_send_drop_acked(struct rds_connection *conn, u64 ack, + is_acked_func is_acked) +{ + WARN_ON(conn->c_trans->t_mp_capable); + rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked); +} EXPORT_SYMBOL_GPL(rds_send_drop_acked); -void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) +void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest) { struct rds_message *rm, *tmp; struct rds_connection *conn; + struct rds_conn_path *cp; unsigned long flags; LIST_HEAD(list); @@ -681,8 +730,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) spin_lock_irqsave(&rs->rs_lock, flags); list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) { - if (dest && (dest->sin_addr.s_addr != rm->m_daddr || - dest->sin_port != rm->m_inc.i_hdr.h_dport)) + if (dest && + (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) || + dest->sin6_port != rm->m_inc.i_hdr.h_dport)) continue; list_move(&rm->m_sock_item, &list); @@ -691,7 +741,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) } /* order flag updates with the rs lock */ - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); spin_unlock_irqrestore(&rs->rs_lock, flags); @@ -702,19 +752,23 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) list_for_each_entry(rm, &list, m_sock_item) { conn = rm->m_inc.i_conn; + if (conn->c_trans->t_mp_capable) + cp = rm->m_inc.i_conn_path; + else + cp = &conn->c_path[0]; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); /* * Maybe someone else beat us to removing rm from the conn. * If we race with their flag update we'll get the lock and * then really see that the flag has been cleared. */ if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } list_del_init(&rm->m_conn_item); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* * Couldn't grab m_rs_lock in top loop (lock ordering), @@ -726,7 +780,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); spin_unlock(&rs->rs_lock); - rm->m_rs = NULL; spin_unlock_irqrestore(&rm->m_rs_lock, flags); rds_message_put(rm); @@ -737,8 +790,21 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) while (!list_empty(&list)) { rm = list_entry(list.next, struct rds_message, m_sock_item); list_del_init(&rm->m_sock_item); - rds_message_wait(rm); + + /* just in case the code above skipped this message + * because RDS_MSG_ON_CONN wasn't set, run it again here + * taking m_rs_lock is the only thing that keeps us + * from racing with ack processing. + */ + spin_lock_irqsave(&rm->m_rs_lock, flags); + + spin_lock(&rs->rs_lock); + __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); + spin_unlock(&rs->rs_lock); + + spin_unlock_irqrestore(&rm->m_rs_lock, flags); + rds_message_put(rm); } } @@ -749,6 +815,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) * message from the flow with RDS_CANCEL_SENT_TO. */ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, + struct rds_conn_path *cp, struct rds_message *rm, __be16 sport, __be16 dport, int *queued) { @@ -781,24 +848,26 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, * throughput hits a certain threshold. */ if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2) - __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); + set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); list_add_tail(&rm->m_sock_item, &rs->rs_send_queue); set_bit(RDS_MSG_ON_SOCK, &rm->m_flags); rds_message_addref(rm); + sock_hold(rds_rs_to_sk(rs)); rm->m_rs = rs; /* The code ordering is a little weird, but we're trying to minimize the time we hold c_lock */ rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0); rm->m_inc.i_conn = conn; + rm->m_inc.i_conn_path = cp; rds_message_addref(rm); - spin_lock(&conn->c_lock); - rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock(&cp->cp_lock); + rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); - spin_unlock(&conn->c_lock); + spin_unlock(&cp->cp_lock); rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", rm, len, rs, rs->rs_snd_bytes, @@ -816,14 +885,20 @@ out: * rds_message is getting to be quite complicated, and we'd like to allocate * it all in one go. This figures out how big it needs to be up front. */ -static int rds_rm_size(struct msghdr *msg, int data_len) +static int rds_rm_size(struct msghdr *msg, int num_sgs, + struct rds_iov_vector_arr *vct) { struct cmsghdr *cmsg; int size = 0; int cmsg_groups = 0; int retval; + bool zcopy_cookie = false; + struct rds_iov_vector *iov, *tmp_iov; + + if (num_sgs < 0) + return -EINVAL; - for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + for_each_cmsghdr(cmsg, msg) { if (!CMSG_OK(msg, cmsg)) return -EINVAL; @@ -832,14 +907,34 @@ static int rds_rm_size(struct msghdr *msg, int data_len) switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: + if (vct->indx >= vct->len) { + vct->len += vct->incr; + tmp_iov = + krealloc(vct->vec, + vct->len * + sizeof(struct rds_iov_vector), + GFP_KERNEL); + if (!tmp_iov) { + vct->len -= vct->incr; + return -ENOMEM; + } + vct->vec = tmp_iov; + } + iov = &vct->vec[vct->indx]; + memset(iov, 0, sizeof(struct rds_iov_vector)); + vct->indx++; cmsg_groups |= 1; - retval = rds_rdma_extra_size(CMSG_DATA(cmsg)); + retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov); if (retval < 0) return retval; size += retval; break; + case RDS_CMSG_ZCOPY_COOKIE: + zcopy_cookie = true; + fallthrough; + case RDS_CMSG_RDMA_DEST: case RDS_CMSG_RDMA_MAP: cmsg_groups |= 2; @@ -860,7 +955,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len) } - size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); + if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie) + return -EINVAL; + + size += num_sgs * sizeof(struct scatterlist); /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */ if (cmsg_groups == 3) @@ -869,13 +967,27 @@ static int rds_rm_size(struct msghdr *msg, int data_len) return size; } +static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm, + struct cmsghdr *cmsg) +{ + u32 *cookie; + + if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) || + !rm->data.op_mmp_znotifier) + return -EINVAL; + cookie = CMSG_DATA(cmsg); + rm->data.op_mmp_znotifier->z_cookie = *cookie; + return 0; +} + static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, - struct msghdr *msg, int *allocated_mr) + struct msghdr *msg, int *allocated_mr, + struct rds_iov_vector_arr *vct) { struct cmsghdr *cmsg; - int ret = 0; + int ret = 0, ind = 0; - for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + for_each_cmsghdr(cmsg, msg) { if (!CMSG_OK(msg, cmsg)) return -EINVAL; @@ -887,7 +999,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, */ switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: - ret = rds_cmsg_rdma_args(rs, rm, cmsg); + if (ind >= vct->indx) + return -ENOMEM; + ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]); + ind++; break; case RDS_CMSG_RDMA_DEST: @@ -898,6 +1013,11 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, ret = rds_cmsg_rdma_map(rs, rm, cmsg); if (!ret) *allocated_mr = 1; + else if (ret == -ENODEV) + /* Accommodate the get_mr() case which can fail + * if connection isn't established yet. + */ + ret = -EAGAIN; break; case RDS_CMSG_ATOMIC_CSWP: case RDS_CMSG_ATOMIC_FADD: @@ -906,6 +1026,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, ret = rds_cmsg_atomic(rs, rm, cmsg); break; + case RDS_CMSG_ZCOPY_COOKIE: + ret = rds_cmsg_zcopy(rs, rm, cmsg); + break; + default: return -EINVAL; } @@ -917,13 +1041,68 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, return ret; } -int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, - size_t payload_len) +static int rds_send_mprds_hash(struct rds_sock *rs, + struct rds_connection *conn, int nonblock) +{ + int hash; + + if (conn->c_npaths == 0) + hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS); + else + hash = RDS_MPATH_HASH(rs, conn->c_npaths); + if (conn->c_npaths == 0 && hash != 0) { + rds_send_ping(conn, 0); + + /* The underlying connection is not up yet. Need to wait + * until it is up to be sure that the non-zero c_path can be + * used. But if we are interrupted, we have to use the zero + * c_path in case the connection ends up being non-MP capable. + */ + if (conn->c_npaths == 0) { + /* Cannot wait for the connection be made, so just use + * the base c_path. + */ + if (nonblock) + return 0; + if (wait_event_interruptible(conn->c_hs_waitq, + conn->c_npaths != 0)) + hash = 0; + } + if (conn->c_npaths == 1) + hash = 0; + } + return hash; +} + +static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes) +{ + struct rds_rdma_args *args; + struct cmsghdr *cmsg; + + for_each_cmsghdr(cmsg, msg) { + if (!CMSG_OK(msg, cmsg)) + return -EINVAL; + + if (cmsg->cmsg_level != SOL_RDS) + continue; + + if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) { + if (cmsg->cmsg_len < + CMSG_LEN(sizeof(struct rds_rdma_args))) + return -EINVAL; + args = CMSG_DATA(cmsg); + *rdma_bytes += args->remote_vec.bytes; + } + } + return 0; +} + +int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) { struct sock *sk = sock->sk; struct rds_sock *rs = rds_sk_to_rs(sk); - struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name; - __be32 daddr; + DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name); + DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name); __be16 dport; struct rds_message *rm = NULL; struct rds_connection *conn; @@ -931,38 +1110,157 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, int queued = 0, allocated_mr = 0; int nonblock = msg->msg_flags & MSG_DONTWAIT; long timeo = sock_sndtimeo(sk, nonblock); + struct rds_conn_path *cpath; + struct in6_addr daddr; + __u32 scope_id = 0; + size_t rdma_payload_len = 0; + bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) && + sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY)); + int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE); + int namelen; + struct rds_iov_vector_arr vct; + int ind; + + memset(&vct, 0, sizeof(vct)); + + /* expect 1 RDMA CMSG per rds_sendmsg. can still grow if more needed. */ + vct.incr = 1; /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ - if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) { + if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) { ret = -EOPNOTSUPP; goto out; } - if (msg->msg_namelen) { - /* XXX fail non-unicast destination IPs? */ - if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) { + namelen = msg->msg_namelen; + if (namelen != 0) { + if (namelen < sizeof(*usin)) { + ret = -EINVAL; + goto out; + } + switch (usin->sin_family) { + case AF_INET: + if (usin->sin_addr.s_addr == htonl(INADDR_ANY) || + usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) || + ipv4_is_multicast(usin->sin_addr.s_addr)) { + ret = -EINVAL; + goto out; + } + ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr); + dport = usin->sin_port; + break; + +#if IS_ENABLED(CONFIG_IPV6) + case AF_INET6: { + int addr_type; + + if (namelen < sizeof(*sin6)) { + ret = -EINVAL; + goto out; + } + addr_type = ipv6_addr_type(&sin6->sin6_addr); + if (!(addr_type & IPV6_ADDR_UNICAST)) { + __be32 addr4; + + if (!(addr_type & IPV6_ADDR_MAPPED)) { + ret = -EINVAL; + goto out; + } + + /* It is a mapped address. Need to do some + * sanity checks. + */ + addr4 = sin6->sin6_addr.s6_addr32[3]; + if (addr4 == htonl(INADDR_ANY) || + addr4 == htonl(INADDR_BROADCAST) || + ipv4_is_multicast(addr4)) { + ret = -EINVAL; + goto out; + } + } + if (addr_type & IPV6_ADDR_LINKLOCAL) { + if (sin6->sin6_scope_id == 0) { + ret = -EINVAL; + goto out; + } + scope_id = sin6->sin6_scope_id; + } + + daddr = sin6->sin6_addr; + dport = sin6->sin6_port; + break; + } +#endif + + default: ret = -EINVAL; goto out; } - daddr = usin->sin_addr.s_addr; - dport = usin->sin_port; } else { /* We only care about consistency with ->connect() */ lock_sock(sk); daddr = rs->rs_conn_addr; dport = rs->rs_conn_port; + scope_id = rs->rs_bound_scope_id; release_sock(sk); } - /* racing with another thread binding seems ok here */ - if (daddr == 0 || rs->rs_bound_addr == 0) { - ret = -ENOTCONN; /* XXX not a great errno */ + lock_sock(sk); + if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) { + release_sock(sk); + ret = -ENOTCONN; + goto out; + } else if (namelen != 0) { + /* Cannot send to an IPv4 address using an IPv6 source + * address and cannot send to an IPv6 address using an + * IPv4 source address. + */ + if (ipv6_addr_v4mapped(&daddr) ^ + ipv6_addr_v4mapped(&rs->rs_bound_addr)) { + release_sock(sk); + ret = -EOPNOTSUPP; + goto out; + } + /* If the socket is already bound to a link local address, + * it can only send to peers on the same link. But allow + * communicating between link local and non-link local address. + */ + if (scope_id != rs->rs_bound_scope_id) { + if (!scope_id) { + scope_id = rs->rs_bound_scope_id; + } else if (rs->rs_bound_scope_id) { + release_sock(sk); + ret = -EINVAL; + goto out; + } + } + } + release_sock(sk); + + ret = rds_rdma_bytes(msg, &rdma_payload_len); + if (ret) + goto out; + + if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) { + ret = -EMSGSIZE; + goto out; + } + + if (payload_len > rds_sk_sndbuf(rs)) { + ret = -EMSGSIZE; goto out; } + if (zcopy) { + if (rs->rs_transport->t_type != RDS_TRANS_TCP) { + ret = -EOPNOTSUPP; + goto out; + } + num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX); + } /* size of rm including all sgs */ - ret = rds_rm_size(msg, payload_len); + ret = rds_rm_size(msg, num_sgs, &vct); if (ret < 0) goto out; @@ -974,12 +1272,12 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, /* Attach data to the rm */ if (payload_len) { - rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); - if (!rm->data.op_sg) { - ret = -ENOMEM; + rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs); + if (IS_ERR(rm->data.op_sg)) { + ret = PTR_ERR(rm->data.op_sg); goto out; } - ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len); + ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy); if (ret) goto out; } @@ -989,12 +1287,15 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, /* rds_conn_create has a spinlock that runs with IRQ off. * Caching the conn in the socket helps a lot. */ - if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) + if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) && + rs->rs_tos == rs->rs_conn->c_tos) { conn = rs->rs_conn; - else { - conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr, - rs->rs_transport, - sock->sk->sk_allocation); + } else { + conn = rds_conn_create_outgoing(sock_net(sock->sk), + &rs->rs_bound_addr, &daddr, + rs->rs_transport, rs->rs_tos, + sock->sk->sk_allocation, + scope_id); if (IS_ERR(conn)) { ret = PTR_ERR(conn); goto out; @@ -1002,8 +1303,15 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, rs->rs_conn = conn; } + if (conn->c_trans->t_mp_capable) + cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)]; + else + cpath = &conn->c_path[0]; + + rm->m_conn_path = cpath; + /* Parse any control messages the user may have included. */ - ret = rds_cmsg_send(rs, rm, msg, &allocated_mr); + ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct); if (ret) goto out; @@ -1021,29 +1329,30 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; } - rds_conn_connect_if_down(conn); + if (rds_destroy_pending(conn)) { + ret = -EAGAIN; + goto out; + } + + if (rds_conn_path_down(cpath)) + rds_check_all_paths(conn); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { rs->rs_seen_congestion = 1; goto out; } - - while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, + while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); - /* XXX make sure this is reasonable */ - if (payload_len > rds_sk_sndbuf(rs)) { - ret = -EMSGSIZE; - goto out; - } + if (nonblock) { ret = -EAGAIN; goto out; } timeo = wait_event_interruptible_timeout(*sk_sleep(sk), - rds_send_queue_rm(rs, conn, rm, + rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued), @@ -1064,13 +1373,31 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, */ rds_stats_inc(s_send_queued); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - rds_send_xmit(conn); - + ret = rds_send_xmit(cpath); + if (ret == -ENOMEM || ret == -EAGAIN) { + ret = 0; + rcu_read_lock(); + if (rds_destroy_pending(cpath->cp_conn)) + ret = -ENETUNREACH; + else + queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); + rcu_read_unlock(); + } + if (ret) + goto out; rds_message_put(rm); + + for (ind = 0; ind < vct.indx; ind++) + kfree(vct.vec[ind].iov); + kfree(vct.vec); + return payload_len; out: + for (ind = 0; ind < vct.indx; ind++) + kfree(vct.vec[ind].iov); + kfree(vct.vec); + /* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly. * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN * or in any other way, we need to destroy the MR again */ @@ -1083,10 +1410,16 @@ out: } /* - * Reply to a ping packet. + * send out a probe. Can be shared by rds_send_ping, + * rds_send_pong, rds_send_hb. + * rds_send_hb should use h_flags + * RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED + * or + * RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED */ -int -rds_send_pong(struct rds_connection *conn, __be16 dport) +static int +rds_send_probe(struct rds_conn_path *cp, __be16 sport, + __be16 dport, u8 h_flags) { struct rds_message *rm; unsigned long flags; @@ -1098,31 +1431,50 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) goto out; } - rm->m_daddr = conn->c_faddr; + rm->m_daddr = cp->cp_conn->c_faddr; rm->data.op_active = 1; - rds_conn_connect_if_down(conn); + rds_conn_path_connect_if_down(cp); - ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); + ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); if (ret) goto out; - spin_lock_irqsave(&conn->c_lock, flags); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock_irqsave(&cp->cp_lock, flags); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); rds_message_addref(rm); - rm->m_inc.i_conn = conn; - - rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport, - conn->c_next_tx_seq); - conn->c_next_tx_seq++; - spin_unlock_irqrestore(&conn->c_lock, flags); + rm->m_inc.i_conn = cp->cp_conn; + rm->m_inc.i_conn_path = cp; + + rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, + cp->cp_next_tx_seq); + rm->m_inc.i_hdr.h_flags |= h_flags; + cp->cp_next_tx_seq++; + + if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) && + cp->cp_conn->c_trans->t_mp_capable) { + __be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS); + __be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num); + + rds_message_add_extension(&rm->m_inc.i_hdr, + RDS_EXTHDR_NPATHS, &npaths, + sizeof(npaths)); + rds_message_add_extension(&rm->m_inc.i_hdr, + RDS_EXTHDR_GEN_NUM, + &my_gen_num, + sizeof(u32)); + } + spin_unlock_irqrestore(&cp->cp_lock, flags); rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + /* schedule the send work on rds_wq */ + rcu_read_lock(); + if (!rds_destroy_pending(cp->cp_conn)) + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); + rcu_read_unlock(); rds_message_put(rm); return 0; @@ -1132,3 +1484,26 @@ out: rds_message_put(rm); return ret; } + +int +rds_send_pong(struct rds_conn_path *cp, __be16 dport) +{ + return rds_send_probe(cp, 0, dport, 0); +} + +void +rds_send_ping(struct rds_connection *conn, int cp_index) +{ + unsigned long flags; + struct rds_conn_path *cp = &conn->c_path[cp_index]; + + spin_lock_irqsave(&cp->cp_lock, flags); + if (conn->c_ping_triggered) { + spin_unlock_irqrestore(&cp->cp_lock, flags); + return; + } + conn->c_ping_triggered = 1; + spin_unlock_irqrestore(&cp->cp_lock, flags); + rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0); +} +EXPORT_SYMBOL_GPL(rds_send_ping); |
