summaryrefslogtreecommitdiff
path: root/net/kcm/kcmsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/kcm/kcmsock.c')
-rw-r--r--net/kcm/kcmsock.c423
1 files changed, 146 insertions, 277 deletions
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index 890a2423f559..5dd7e0509a48 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -19,6 +19,7 @@
#include <linux/rculist.h>
#include <linux/skbuff.h>
#include <linux/socket.h>
+#include <linux/splice.h>
#include <linux/uaccess.h>
#include <linux/workqueue.h>
#include <linux/syscalls.h>
@@ -28,6 +29,7 @@
#include <net/netns/generic.h>
#include <net/sock.h>
#include <uapi/linux/kcm.h>
+#include <trace/events/sock.h>
unsigned int kcm_net_id;
@@ -349,6 +351,8 @@ static void psock_data_ready(struct sock *sk)
{
struct kcm_psock *psock;
+ trace_sk_data_ready(sk);
+
read_lock_bh(&sk->sk_callback_lock);
psock = (struct kcm_psock *)sk->sk_user_data;
@@ -426,7 +430,7 @@ static void psock_write_space(struct sock *sk)
/* Check if the socket is reserved so someone is waiting for sending. */
kcm = psock->tx_kcm;
- if (kcm && !unlikely(kcm->tx_stopped))
+ if (kcm)
queue_work(kcm_wq, &kcm->tx_work);
spin_unlock_bh(&mux->lock);
@@ -578,12 +582,10 @@ static void kcm_report_tx_retry(struct kcm_sock *kcm)
*/
static int kcm_write_msgs(struct kcm_sock *kcm)
{
+ unsigned int total_sent = 0;
struct sock *sk = &kcm->sk;
struct kcm_psock *psock;
- struct sk_buff *skb, *head;
- struct kcm_tx_msg *txm;
- unsigned short fragidx, frag_offset;
- unsigned int sent, total_sent = 0;
+ struct sk_buff *head;
int ret = 0;
kcm->tx_wait_more = false;
@@ -597,72 +599,58 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
if (skb_queue_empty(&sk->sk_write_queue))
return 0;
- kcm_tx_msg(skb_peek(&sk->sk_write_queue))->sent = 0;
-
- } else if (skb_queue_empty(&sk->sk_write_queue)) {
- return 0;
+ kcm_tx_msg(skb_peek(&sk->sk_write_queue))->started_tx = false;
}
- head = skb_peek(&sk->sk_write_queue);
- txm = kcm_tx_msg(head);
+retry:
+ while ((head = skb_peek(&sk->sk_write_queue))) {
+ struct msghdr msg = {
+ .msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES,
+ };
+ struct kcm_tx_msg *txm = kcm_tx_msg(head);
+ struct sk_buff *skb;
+ unsigned int msize;
+ int i;
- if (txm->sent) {
- /* Send of first skbuff in queue already in progress */
- if (WARN_ON(!psock)) {
- ret = -EINVAL;
- goto out;
+ if (!txm->started_tx) {
+ psock = reserve_psock(kcm);
+ if (!psock)
+ goto out;
+ skb = head;
+ txm->frag_offset = 0;
+ txm->sent = 0;
+ txm->started_tx = true;
+ } else {
+ if (WARN_ON(!psock)) {
+ ret = -EINVAL;
+ goto out;
+ }
+ skb = txm->frag_skb;
}
- sent = txm->sent;
- frag_offset = txm->frag_offset;
- fragidx = txm->fragidx;
- skb = txm->frag_skb;
- goto do_frag;
- }
-
-try_again:
- psock = reserve_psock(kcm);
- if (!psock)
- goto out;
-
- do {
- skb = head;
- txm = kcm_tx_msg(head);
- sent = 0;
-
-do_frag_list:
- if (WARN_ON(!skb_shinfo(skb)->nr_frags)) {
+ if (WARN_ON(!skb_shinfo(skb)->nr_frags) ||
+ WARN_ON_ONCE(!skb_frag_page(&skb_shinfo(skb)->frags[0]))) {
ret = -EINVAL;
goto out;
}
- for (fragidx = 0; fragidx < skb_shinfo(skb)->nr_frags;
- fragidx++) {
- skb_frag_t *frag;
+ msize = 0;
+ for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
+ msize += skb_frag_size(&skb_shinfo(skb)->frags[i]);
- frag_offset = 0;
-do_frag:
- frag = &skb_shinfo(skb)->frags[fragidx];
- if (WARN_ON(!skb_frag_size(frag))) {
- ret = -EINVAL;
- goto out;
- }
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE,
+ (const struct bio_vec *)skb_shinfo(skb)->frags,
+ skb_shinfo(skb)->nr_frags, msize);
+ iov_iter_advance(&msg.msg_iter, txm->frag_offset);
- ret = kernel_sendpage(psock->sk->sk_socket,
- skb_frag_page(frag),
- skb_frag_off(frag) + frag_offset,
- skb_frag_size(frag) - frag_offset,
- MSG_DONTWAIT);
+ do {
+ ret = sock_sendmsg(psock->sk->sk_socket, &msg);
if (ret <= 0) {
if (ret == -EAGAIN) {
/* Save state to try again when there's
* write space on the socket
*/
- txm->sent = sent;
- txm->frag_offset = frag_offset;
- txm->fragidx = fragidx;
txm->frag_skb = skb;
-
ret = 0;
goto out;
}
@@ -675,45 +663,44 @@ do_frag:
kcm_abort_tx_psock(psock, ret ? -ret : EPIPE,
true);
unreserve_psock(kcm);
+ psock = NULL;
- txm->sent = 0;
+ txm->started_tx = false;
kcm_report_tx_retry(kcm);
ret = 0;
-
- goto try_again;
+ goto retry;
}
- sent += ret;
- frag_offset += ret;
+ txm->sent += ret;
+ txm->frag_offset += ret;
KCM_STATS_ADD(psock->stats.tx_bytes, ret);
- if (frag_offset < skb_frag_size(frag)) {
- /* Not finished with this frag */
- goto do_frag;
- }
- }
+ } while (msg.msg_iter.count > 0);
if (skb == head) {
if (skb_has_frag_list(skb)) {
- skb = skb_shinfo(skb)->frag_list;
- goto do_frag_list;
+ txm->frag_skb = skb_shinfo(skb)->frag_list;
+ txm->frag_offset = 0;
+ continue;
}
} else if (skb->next) {
- skb = skb->next;
- goto do_frag_list;
+ txm->frag_skb = skb->next;
+ txm->frag_offset = 0;
+ continue;
}
/* Successfully sent the whole packet, account for it. */
+ sk->sk_wmem_queued -= txm->sent;
+ total_sent += txm->sent;
skb_dequeue(&sk->sk_write_queue);
kfree_skb(head);
- sk->sk_wmem_queued -= sent;
- total_sent += sent;
KCM_STATS_INCR(psock->stats.tx_msgs);
- } while ((head = skb_peek(&sk->sk_write_queue)));
+ }
out:
if (!head) {
/* Done with all queued messages. */
WARN_ON(!skb_queue_empty(&sk->sk_write_queue));
- unreserve_psock(kcm);
+ if (psock)
+ unreserve_psock(kcm);
}
/* Check if write space is available */
@@ -758,149 +745,6 @@ static void kcm_push(struct kcm_sock *kcm)
kcm_write_msgs(kcm);
}
-static ssize_t kcm_sendpage(struct socket *sock, struct page *page,
- int offset, size_t size, int flags)
-
-{
- struct sock *sk = sock->sk;
- struct kcm_sock *kcm = kcm_sk(sk);
- struct sk_buff *skb = NULL, *head = NULL;
- long timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
- bool eor;
- int err = 0;
- int i;
-
- if (flags & MSG_SENDPAGE_NOTLAST)
- flags |= MSG_MORE;
-
- /* No MSG_EOR from splice, only look at MSG_MORE */
- eor = !(flags & MSG_MORE);
-
- lock_sock(sk);
-
- sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
-
- err = -EPIPE;
- if (sk->sk_err)
- goto out_error;
-
- if (kcm->seq_skb) {
- /* Previously opened message */
- head = kcm->seq_skb;
- skb = kcm_tx_msg(head)->last_skb;
- i = skb_shinfo(skb)->nr_frags;
-
- if (skb_can_coalesce(skb, i, page, offset)) {
- skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], size);
- skb_shinfo(skb)->flags |= SKBFL_SHARED_FRAG;
- goto coalesced;
- }
-
- if (i >= MAX_SKB_FRAGS) {
- struct sk_buff *tskb;
-
- tskb = alloc_skb(0, sk->sk_allocation);
- while (!tskb) {
- kcm_push(kcm);
- err = sk_stream_wait_memory(sk, &timeo);
- if (err)
- goto out_error;
- }
-
- if (head == skb)
- skb_shinfo(head)->frag_list = tskb;
- else
- skb->next = tskb;
-
- skb = tskb;
- skb->ip_summed = CHECKSUM_UNNECESSARY;
- i = 0;
- }
- } else {
- /* Call the sk_stream functions to manage the sndbuf mem. */
- if (!sk_stream_memory_free(sk)) {
- kcm_push(kcm);
- set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
- err = sk_stream_wait_memory(sk, &timeo);
- if (err)
- goto out_error;
- }
-
- head = alloc_skb(0, sk->sk_allocation);
- while (!head) {
- kcm_push(kcm);
- err = sk_stream_wait_memory(sk, &timeo);
- if (err)
- goto out_error;
- }
-
- skb = head;
- i = 0;
- }
-
- get_page(page);
- skb_fill_page_desc_noacc(skb, i, page, offset, size);
- skb_shinfo(skb)->flags |= SKBFL_SHARED_FRAG;
-
-coalesced:
- skb->len += size;
- skb->data_len += size;
- skb->truesize += size;
- sk->sk_wmem_queued += size;
- sk_mem_charge(sk, size);
-
- if (head != skb) {
- head->len += size;
- head->data_len += size;
- head->truesize += size;
- }
-
- if (eor) {
- bool not_busy = skb_queue_empty(&sk->sk_write_queue);
-
- /* Message complete, queue it on send buffer */
- __skb_queue_tail(&sk->sk_write_queue, head);
- kcm->seq_skb = NULL;
- KCM_STATS_INCR(kcm->stats.tx_msgs);
-
- if (flags & MSG_BATCH) {
- kcm->tx_wait_more = true;
- } else if (kcm->tx_wait_more || not_busy) {
- err = kcm_write_msgs(kcm);
- if (err < 0) {
- /* We got a hard error in write_msgs but have
- * already queued this message. Report an error
- * in the socket, but don't affect return value
- * from sendmsg
- */
- pr_warn("KCM: Hard failure on kcm_write_msgs\n");
- report_csk_error(&kcm->sk, -err);
- }
- }
- } else {
- /* Message not complete, save state */
- kcm->seq_skb = head;
- kcm_tx_msg(head)->last_skb = skb;
- }
-
- KCM_STATS_ADD(kcm->stats.tx_bytes, size);
-
- release_sock(sk);
- return size;
-
-out_error:
- kcm_push(kcm);
-
- err = sk_stream_error(sk, flags, err);
-
- /* make sure we wake any epoll edge trigger waiter */
- if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
- sk->sk_write_space(sk);
-
- release_sock(sk);
- return err;
-}
-
static int kcm_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
struct sock *sk = sock->sk;
@@ -912,6 +756,7 @@ static int kcm_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
!(msg->msg_flags & MSG_MORE) : !!(msg->msg_flags & MSG_EOR);
int err = -EPIPE;
+ mutex_lock(&kcm->tx_mutex);
lock_sock(sk);
/* Per tcp_sendmsg this should be in poll */
@@ -986,29 +831,51 @@ start:
merge = false;
}
- copy = min_t(int, msg_data_left(msg),
- pfrag->size - pfrag->offset);
+ if (msg->msg_flags & MSG_SPLICE_PAGES) {
+ copy = msg_data_left(msg);
+ if (!sk_wmem_schedule(sk, copy))
+ goto wait_for_memory;
- if (!sk_wmem_schedule(sk, copy))
- goto wait_for_memory;
+ err = skb_splice_from_iter(skb, &msg->msg_iter, copy);
+ if (err < 0) {
+ if (err == -EMSGSIZE)
+ goto wait_for_memory;
+ goto out_error;
+ }
- err = skb_copy_to_page_nocache(sk, &msg->msg_iter, skb,
- pfrag->page,
- pfrag->offset,
- copy);
- if (err)
- goto out_error;
+ copy = err;
+ skb_shinfo(skb)->flags |= SKBFL_SHARED_FRAG;
+ sk_wmem_queued_add(sk, copy);
+ sk_mem_charge(sk, copy);
- /* Update the skb. */
- if (merge) {
- skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
+ if (head != skb)
+ head->truesize += copy;
} else {
- skb_fill_page_desc(skb, i, pfrag->page,
- pfrag->offset, copy);
- get_page(pfrag->page);
+ copy = min_t(int, msg_data_left(msg),
+ pfrag->size - pfrag->offset);
+ if (!sk_wmem_schedule(sk, copy))
+ goto wait_for_memory;
+
+ err = skb_copy_to_page_nocache(sk, &msg->msg_iter, skb,
+ pfrag->page,
+ pfrag->offset,
+ copy);
+ if (err)
+ goto out_error;
+
+ /* Update the skb. */
+ if (merge) {
+ skb_frag_size_add(
+ &skb_shinfo(skb)->frags[i - 1], copy);
+ } else {
+ skb_fill_page_desc(skb, i, pfrag->page,
+ pfrag->offset, copy);
+ get_page(pfrag->page);
+ }
+
+ pfrag->offset += copy;
}
- pfrag->offset += copy;
copied += copy;
if (head != skb) {
head->len += copy;
@@ -1060,20 +927,24 @@ partial_message:
KCM_STATS_ADD(kcm->stats.tx_bytes, copied);
release_sock(sk);
+ mutex_unlock(&kcm->tx_mutex);
return copied;
out_error:
kcm_push(kcm);
- if (copied && sock->type == SOCK_SEQPACKET) {
+ if (sock->type == SOCK_SEQPACKET) {
/* Wrote some bytes before encountering an
* error, return partial success.
*/
- goto partial_message;
- }
-
- if (head != kcm->seq_skb)
+ if (copied)
+ goto partial_message;
+ if (head != kcm->seq_skb)
+ kfree_skb(head);
+ } else {
kfree_skb(head);
+ kcm->seq_skb = NULL;
+ }
err = sk_stream_error(sk, msg->msg_flags, err);
@@ -1082,9 +953,23 @@ out_error:
sk->sk_write_space(sk);
release_sock(sk);
+ mutex_unlock(&kcm->tx_mutex);
return err;
}
+static void kcm_splice_eof(struct socket *sock)
+{
+ struct sock *sk = sock->sk;
+ struct kcm_sock *kcm = kcm_sk(sk);
+
+ if (skb_queue_empty_lockless(&sk->sk_write_queue))
+ return;
+
+ lock_sock(sk);
+ kcm_write_msgs(kcm);
+ release_sock(sk);
+}
+
static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
size_t len, int flags)
{
@@ -1145,6 +1030,11 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
ssize_t copied;
struct sk_buff *skb;
+ if (sock->file->f_flags & O_NONBLOCK || flags & SPLICE_F_NONBLOCK)
+ flags = MSG_DONTWAIT;
+ else
+ flags = 0;
+
/* Only support splice for SOCKSEQPACKET */
skb = skb_recv_datagram(sk, flags, &err);
@@ -1271,10 +1161,11 @@ static int kcm_getsockopt(struct socket *sock, int level, int optname,
if (get_user(len, optlen))
return -EFAULT;
- len = min_t(unsigned int, len, sizeof(int));
if (len < 0)
return -EINVAL;
+ len = min_t(unsigned int, len, sizeof(int));
+
switch (optname) {
case KCM_RECV_DISABLE:
val = kcm->rx_disabled;
@@ -1321,6 +1212,7 @@ static void init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
spin_unlock_bh(&mux->lock);
INIT_WORK(&kcm->tx_work, kcm_tx_work);
+ mutex_init(&kcm->tx_mutex);
spin_lock_bh(&mux->rx_lock);
kcm_rcv_ready(kcm);
@@ -1668,24 +1560,16 @@ static int kcm_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
}
case SIOCKCMCLONE: {
struct kcm_clone info;
- struct file *file;
- info.fd = get_unused_fd_flags(0);
- if (unlikely(info.fd < 0))
- return info.fd;
+ FD_PREPARE(fdf, 0, kcm_clone(sock));
+ if (fdf.err)
+ return fdf.err;
- file = kcm_clone(sock);
- if (IS_ERR(file)) {
- put_unused_fd(info.fd);
- return PTR_ERR(file);
- }
- if (copy_to_user((void __user *)arg, &info,
- sizeof(info))) {
- put_unused_fd(info.fd);
- fput(file);
+ info.fd = fd_prepare_fd(fdf);
+ if (copy_to_user((void __user *)arg, &info, sizeof(info)))
return -EFAULT;
- }
- fd_install(info.fd, file);
+
+ fd_publish(fdf);
err = 0;
break;
}
@@ -1697,14 +1581,6 @@ static int kcm_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
return err;
}
-static void free_mux(struct rcu_head *rcu)
-{
- struct kcm_mux *mux = container_of(rcu,
- struct kcm_mux, rcu);
-
- kmem_cache_free(kcm_muxp, mux);
-}
-
static void release_mux(struct kcm_mux *mux)
{
struct kcm_net *knet = mux->knet;
@@ -1732,7 +1608,7 @@ static void release_mux(struct kcm_mux *mux)
knet->count--;
mutex_unlock(&knet->mutex);
- call_rcu(&mux->rcu, free_mux);
+ kfree_rcu(mux, rcu);
}
static void kcm_done(struct kcm_sock *kcm)
@@ -1809,12 +1685,6 @@ static int kcm_release(struct socket *sock)
*/
__skb_queue_purge(&sk->sk_write_queue);
- /* Set tx_stopped. This is checked when psock is bound to a kcm and we
- * get a writespace callback. This prevents further work being queued
- * from the callback (unbinding the psock occurs after canceling work.
- */
- kcm->tx_stopped = 1;
-
release_sock(sk);
spin_lock_bh(&mux->lock);
@@ -1830,7 +1700,7 @@ static int kcm_release(struct socket *sock)
/* Cancel work. After this point there should be no outside references
* to the kcm socket.
*/
- cancel_work_sync(&kcm->tx_work);
+ disable_work_sync(&kcm->tx_work);
lock_sock(sk);
psock = kcm->tx_psock;
@@ -1872,7 +1742,7 @@ static const struct proto_ops kcm_dgram_ops = {
.sendmsg = kcm_sendmsg,
.recvmsg = kcm_recvmsg,
.mmap = sock_no_mmap,
- .sendpage = kcm_sendpage,
+ .splice_eof = kcm_splice_eof,
};
static const struct proto_ops kcm_seqpacket_ops = {
@@ -1893,7 +1763,7 @@ static const struct proto_ops kcm_seqpacket_ops = {
.sendmsg = kcm_sendmsg,
.recvmsg = kcm_recvmsg,
.mmap = sock_no_mmap,
- .sendpage = kcm_sendpage,
+ .splice_eof = kcm_splice_eof,
.splice_read = kcm_splice_read,
};
@@ -1981,6 +1851,8 @@ static __net_exit void kcm_exit_net(struct net *net)
* that all multiplexors and psocks have been destroyed.
*/
WARN_ON(!list_empty(&knet->mux_list));
+
+ mutex_destroy(&knet->mutex);
}
static struct pernet_operations kcm_net_ops = {
@@ -1994,15 +1866,11 @@ static int __init kcm_init(void)
{
int err = -ENOMEM;
- kcm_muxp = kmem_cache_create("kcm_mux_cache",
- sizeof(struct kcm_mux), 0,
- SLAB_HWCACHE_ALIGN, NULL);
+ kcm_muxp = KMEM_CACHE(kcm_mux, SLAB_HWCACHE_ALIGN);
if (!kcm_muxp)
goto fail;
- kcm_psockp = kmem_cache_create("kcm_psock_cache",
- sizeof(struct kcm_psock), 0,
- SLAB_HWCACHE_ALIGN, NULL);
+ kcm_psockp = KMEM_CACHE(kcm_psock, SLAB_HWCACHE_ALIGN);
if (!kcm_psockp)
goto fail;
@@ -2063,4 +1931,5 @@ module_init(kcm_init);
module_exit(kcm_exit);
MODULE_LICENSE("GPL");
+MODULE_DESCRIPTION("KCM (Kernel Connection Multiplexor) sockets");
MODULE_ALIAS_NETPROTO(PF_KCM);