Diffstat (limited to 'kernel/bpf/ringbuf.c')
 -rw-r--r--  kernel/bpf/ringbuf.c | 201
 1 file changed, 152 insertions(+), 49 deletions(-)
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index 80f4b4d88aaf..f6a075ffac63 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -11,33 +11,27 @@
 #include <linux/kmemleak.h>
 #include <uapi/linux/btf.h>
 #include <linux/btf_ids.h>
+#include <asm/rqspinlock.h>
 
-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
+#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_RB_OVERWRITE)
 
 /* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
 #define RINGBUF_PGOFF \
 	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
 /* consumer page and producer page */
 #define RINGBUF_POS_PAGES 2
+#define RINGBUF_NR_META_PAGES (RINGBUF_PGOFF + RINGBUF_POS_PAGES)
 
 #define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)
 
-/* Maximum size of ring buffer area is limited by 32-bit page offset within
- * record header, counted in pages. Reserve 8 bits for extensibility, and take
- * into account few extra pages for consumer/producer pages and
- * non-mmap()'able parts. This gives 64GB limit, which seems plenty for single
- * ring buffer.
- */
-#define RINGBUF_MAX_DATA_SZ \
-	(((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
-
 struct bpf_ringbuf {
 	wait_queue_head_t waitq;
 	struct irq_work work;
 	u64 mask;
 	struct page **pages;
 	int nr_pages;
-	spinlock_t spinlock ____cacheline_aligned_in_smp;
+	bool overwrite_mode;
+	rqspinlock_t spinlock ____cacheline_aligned_in_smp;
 	/* For user-space producer ring buffers, an atomic_t busy bit is used
 	 * to synchronize access to the ring buffers in the kernel, rather than
 	 * the spinlock that is used for kernel-producer ring buffers. This is
@@ -59,7 +53,8 @@ struct bpf_ringbuf {
 	 * This prevents a user-space application from modifying the
 	 * position and ruining in-kernel tracking. The permissions of the
 	 * pages depend on who is producing samples: user-space or the
-	 * kernel.
+	 * kernel. Note that the pending counter is placed in the same
+	 * page as the producer, so that it shares the same cache line.
 	 *
 	 * Kernel-producer
 	 * ---------------
@@ -78,6 +73,8 @@ struct bpf_ringbuf {
 	 */
 	unsigned long consumer_pos __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
+	unsigned long pending_pos;
+	unsigned long overwrite_pos; /* position after the last overwritten record */
 	char data[] __aligned(PAGE_SIZE);
 };
 
@@ -96,7 +93,7 @@ static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
 {
 	const gfp_t flags = GFP_KERNEL_ACCOUNT | __GFP_RETRY_MAYFAIL |
			    __GFP_NOWARN | __GFP_ZERO;
-	int nr_meta_pages = RINGBUF_PGOFF + RINGBUF_POS_PAGES;
+	int nr_meta_pages = RINGBUF_NR_META_PAGES;
 	int nr_data_pages = data_sz >> PAGE_SHIFT;
 	int nr_pages = nr_meta_pages + nr_data_pages;
 	struct page **pages, *page;
@@ -160,7 +157,18 @@ static void bpf_ringbuf_notify(struct irq_work *work)
 	wake_up_all(&rb->waitq);
 }
 
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
+/* Maximum size of ring buffer area is limited by 32-bit page offset within
+ * record header, counted in pages. Reserve 8 bits for extensibility, and
+ * take into account few extra pages for consumer/producer pages and
+ * non-mmap()'able parts, the current maximum size would be:
+ *
+ *	(((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
+ *
+ * This gives 64GB limit, which seems plenty for single ring buffer. Now
+ * considering that the maximum value of data_sz is (4GB - 1), there
+ * will be no overflow, so just note the size limit in the comments.
+ */
+static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node, bool overwrite_mode)
 {
 	struct bpf_ringbuf *rb;
 
@@ -168,7 +176,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 	if (!rb)
 		return NULL;
 
-	spin_lock_init(&rb->spinlock);
+	raw_res_spin_lock_init(&rb->spinlock);
 	atomic_set(&rb->busy, 0);
 	init_waitqueue_head(&rb->waitq);
 	init_irq_work(&rb->work, bpf_ringbuf_notify);
@@ -176,35 +184,38 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 	rb->mask = data_sz - 1;
 	rb->consumer_pos = 0;
 	rb->producer_pos = 0;
+	rb->pending_pos = 0;
+	rb->overwrite_mode = overwrite_mode;
 
 	return rb;
 }
 
 static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 {
+	bool overwrite_mode = false;
 	struct bpf_ringbuf_map *rb_map;
 
 	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
 		return ERR_PTR(-EINVAL);
 
+	if (attr->map_flags & BPF_F_RB_OVERWRITE) {
+		if (attr->map_type != BPF_MAP_TYPE_RINGBUF)
+			return ERR_PTR(-EINVAL);
+		overwrite_mode = true;
+	}
+
 	if (attr->key_size || attr->value_size ||
	    !is_power_of_2(attr->max_entries) ||
	    !PAGE_ALIGNED(attr->max_entries))
 		return ERR_PTR(-EINVAL);
 
-#ifdef CONFIG_64BIT
-	/* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */
-	if (attr->max_entries > RINGBUF_MAX_DATA_SZ)
-		return ERR_PTR(-E2BIG);
-#endif
-
 	rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
 	if (!rb_map)
 		return ERR_PTR(-ENOMEM);
 
 	bpf_map_init_from_attr(&rb_map->map, attr);
 
-	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
+	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node, overwrite_mode);
 	if (!rb_map->rb) {
 		bpf_map_area_free(rb_map);
 		return ERR_PTR(-ENOMEM);
@@ -215,6 +226,8 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 
 static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
 {
+	irq_work_sync(&rb->work);
+
 	/* copy pages pointer and nr_pages to local variable, as we are going
 	 * to unmap rb itself with vunmap() below
 	 */
@@ -241,13 +254,13 @@ static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
 	return ERR_PTR(-ENOTSUPP);
 }
 
-static int ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value,
-				   u64 flags)
+static long ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value,
+				    u64 flags)
 {
 	return -ENOTSUPP;
 }
 
-static int ringbuf_map_delete_elem(struct bpf_map *map, void *key)
+static long ringbuf_map_delete_elem(struct bpf_map *map, void *key)
 {
 	return -ENOTSUPP;
 }
@@ -268,8 +281,6 @@ static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma
 		/* allow writable mapping for the consumer_pos only */
 		if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
 			return -EPERM;
-	} else {
-		vma->vm_flags &= ~VM_MAYWRITE;
 	}
 	/* remap_vmalloc_range() checks size and offset constraints */
 	return remap_vmalloc_range(vma, rb_map->rb,
@@ -289,20 +300,31 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
 		 * position, and the ring buffer data itself.
 		 */
 		return -EPERM;
-	} else {
-		vma->vm_flags &= ~VM_MAYWRITE;
 	}
 	/* remap_vmalloc_range() checks size and offset constraints */
 	return remap_vmalloc_range(vma, rb_map->rb,
				   vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+/*
+ * Return an estimate of the available data in the ring buffer.
+ * Note: the returned value can exceed the actual ring buffer size because the
+ * function is not synchronized with the producer. The producer acquires the
+ * ring buffer's spinlock, but this function does not.
+ */
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
-	unsigned long cons_pos, prod_pos;
+	unsigned long cons_pos, prod_pos, over_pos;
 
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
-	prod_pos = smp_load_acquire(&rb->producer_pos);
-	return prod_pos - cons_pos;
+
+	if (unlikely(rb->overwrite_mode)) {
+		over_pos = smp_load_acquire(&rb->overwrite_pos);
+		prod_pos = smp_load_acquire(&rb->producer_pos);
+		return prod_pos - max(cons_pos, over_pos);
+	} else {
+		prod_pos = smp_load_acquire(&rb->producer_pos);
+		return prod_pos - cons_pos;
+	}
 }
 
 static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
@@ -336,6 +358,21 @@ static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
 	return 0;
 }
 
+static u64 ringbuf_map_mem_usage(const struct bpf_map *map)
+{
+	struct bpf_ringbuf *rb;
+	int nr_data_pages;
+	int nr_meta_pages;
+	u64 usage = sizeof(struct bpf_ringbuf_map);
+
+	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+	usage += (u64)rb->nr_pages << PAGE_SHIFT;
+	nr_meta_pages = RINGBUF_NR_META_PAGES;
+	nr_data_pages = map->max_entries >> PAGE_SHIFT;
+	usage += (nr_meta_pages + 2 * nr_data_pages) * sizeof(struct page *);
+	return usage;
+}
+
 BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
 const struct bpf_map_ops ringbuf_map_ops = {
 	.map_meta_equal = bpf_map_meta_equal,
@@ -347,6 +384,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
 	.map_get_next_key = ringbuf_map_get_next_key,
+	.map_mem_usage = ringbuf_map_mem_usage,
 	.map_btf_id = &ringbuf_map_btf_ids[0],
 };
 
@@ -361,6 +399,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
 	.map_get_next_key = ringbuf_map_get_next_key,
+	.map_mem_usage = ringbuf_map_mem_usage,
 	.map_btf_id = &user_ringbuf_map_btf_ids[0],
 };
 
@@ -388,11 +427,43 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
 	return (void*)((addr & PAGE_MASK) - off);
 }
 
+static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
+				  unsigned long new_prod_pos,
+				  unsigned long cons_pos,
+				  unsigned long pend_pos)
+{
+	/*
+	 * No space if oldest not yet committed record until the newest
+	 * record span more than (ringbuf_size - 1).
+	 */
+	if (new_prod_pos - pend_pos > rb->mask)
+		return false;
+
+	/* Ok, we have space in overwrite mode */
+	if (unlikely(rb->overwrite_mode))
+		return true;
+
+	/*
+	 * No space if producer position advances more than (ringbuf_size - 1)
+	 * ahead of consumer position when not in overwrite mode.
+	 */
+	if (new_prod_pos - cons_pos > rb->mask)
+		return false;
+
+	return true;
+}
+
+static u32 bpf_ringbuf_round_up_hdr_len(u32 hdr_len)
+{
+	hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
+	return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
+}
+
 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 {
-	unsigned long cons_pos, prod_pos, new_prod_pos, flags;
-	u32 len, pg_off;
+	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos, flags;
 	struct bpf_ringbuf_hdr *hdr;
+	u32 len, pg_off, hdr_len;
 
 	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
 		return NULL;
@@ -403,24 +474,55 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
 
-	if (in_nmi()) {
-		if (!spin_trylock_irqsave(&rb->spinlock, flags))
-			return NULL;
-	} else {
-		spin_lock_irqsave(&rb->spinlock, flags);
-	}
+	if (raw_res_spin_lock_irqsave(&rb->spinlock, flags))
+		return NULL;
 
+	pend_pos = rb->pending_pos;
 	prod_pos = rb->producer_pos;
 	new_prod_pos = prod_pos + len;
 
-	/* check for out of ringbuf space by ensuring producer position
-	 * doesn't advance more than (ringbuf_size - 1) ahead
-	 */
-	if (new_prod_pos - cons_pos > rb->mask) {
-		spin_unlock_irqrestore(&rb->spinlock, flags);
+	while (pend_pos < prod_pos) {
+		hdr = (void *)rb->data + (pend_pos & rb->mask);
+		hdr_len = READ_ONCE(hdr->len);
+		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
+			break;
+		pend_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
+	}
+	rb->pending_pos = pend_pos;
+
+	if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
+		raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
 		return NULL;
 	}
 
+	/*
+	 * In overwrite mode, advance overwrite_pos when the ring buffer is full.
+	 * The key points are to stay on record boundaries and consume enough records
+	 * to fit the new one.
+	 */
+	if (unlikely(rb->overwrite_mode)) {
+		over_pos = rb->overwrite_pos;
+		while (new_prod_pos - over_pos > rb->mask) {
+			hdr = (void *)rb->data + (over_pos & rb->mask);
+			hdr_len = READ_ONCE(hdr->len);
+			/*
+			 * The bpf_ringbuf_has_space() check above ensures we won't
+			 * step over a record currently being worked on by another
+			 * producer.
+			 */
+			over_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
+		}
+		/*
+		 * smp_store_release(&rb->producer_pos, new_prod_pos) at
+		 * the end of the function ensures that when consumer sees
+		 * the updated rb->producer_pos, it always sees the updated
+		 * rb->overwrite_pos, so when consumer reads overwrite_pos
+		 * after smp_load_acquire(r->producer_pos), the overwrite_pos
+		 * will always be valid.
+		 */
+		WRITE_ONCE(rb->overwrite_pos, over_pos);
+	}
+
 	hdr = (void *)rb->data + (prod_pos & rb->mask);
 	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
 	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
@@ -429,7 +531,7 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 	/* pairs with consumer's smp_load_acquire() */
 	smp_store_release(&rb->producer_pos, new_prod_pos);
 
-	spin_unlock_irqrestore(&rb->spinlock, flags);
+	raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
 
 	return (void *)hdr + BPF_RINGBUF_HDR_SZ;
 }
@@ -550,6 +652,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
 		return smp_load_acquire(&rb->consumer_pos);
 	case BPF_RB_PROD_POS:
 		return smp_load_acquire(&rb->producer_pos);
+	case BPF_RB_OVERWRITE_POS:
+		return smp_load_acquire(&rb->overwrite_pos);
 	default:
 		return 0;
 	}
@@ -599,7 +703,7 @@ const struct bpf_func_proto bpf_ringbuf_reserve_dynptr_proto = {
 	.arg1_type = ARG_CONST_MAP_PTR,
 	.arg2_type = ARG_ANYTHING,
 	.arg3_type = ARG_ANYTHING,
-	.arg4_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | MEM_UNINIT,
+	.arg4_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | MEM_UNINIT | MEM_WRITE,
 };
 
 BPF_CALL_2(bpf_ringbuf_submit_dynptr, struct bpf_dynptr_kern *, ptr, u64, flags)
@@ -756,8 +860,7 @@ schedule_work_return:
 	/* Prevent the clearing of the busy-bit from being reordered before the
 	 * storing of any rb consumer or producer positions.
 	 */
-	smp_mb__before_atomic();
-	atomic_set(&rb->busy, 0);
+	atomic_set_release(&rb->busy, 0);
 
 	if (flags & BPF_RB_FORCE_WAKEUP)
 		irq_work_queue(&rb->work);
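
For context on how the new flag is meant to be consumed, below is a minimal BPF-side sketch of a kernel-producer program writing into an overwrite-mode ring buffer. It is not part of this patch: it assumes a libbpf-style build against uapi headers generated from this tree (so BPF_F_RB_OVERWRITE and BPF_RB_OVERWRITE_POS are visible), and the event layout, map size, and tracepoint are illustrative placeholders only.

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct event {
	__u64 ts;
	__u32 pid;
};

struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	/* max_entries must be a power of two and page-aligned,
	 * as enforced by ringbuf_map_alloc() above. */
	__uint(max_entries, 512 * 1024);
	/* overwrite committed records instead of failing when full */
	__uint(map_flags, BPF_F_RB_OVERWRITE);
} events SEC(".maps");

SEC("tracepoint/sched/sched_switch")
int handle_switch(void *ctx)
{
	struct event *e;

	/* In overwrite mode, a reservation is only refused while the
	 * oldest still-uncommitted (busy) record would be overrun;
	 * otherwise overwrite_pos is advanced past committed records. */
	e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
	if (!e)
		return 0;

	e->ts = bpf_ktime_get_ns();
	e->pid = (__u32)bpf_get_current_pid_tgid();
	bpf_ringbuf_submit(e, 0);

	/* Optional: BPF_RB_OVERWRITE_POS is the new bpf_ringbuf_query()
	 * flag added by this diff; the readable window for consumers
	 * starts at max(consumer_pos, overwrite_pos). */
	if (bpf_ringbuf_query(&events, BPF_RB_OVERWRITE_POS) == 0)
		bpf_printk("nothing overwritten yet");
	return 0;
}

char LICENSE[] SEC("license") = "GPL";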
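
On the user-space side, the flag can simply be passed through map_flags at map creation time. The sketch below uses libbpf's bpf_map_create(); the map name and size are placeholders, and it assumes BPF_F_RB_OVERWRITE is available from the installed linux/bpf.h built from this tree.

#include <linux/bpf.h>
#include <bpf/bpf.h>

int create_overwrite_ringbuf(void)
{
	LIBBPF_OPTS(bpf_map_create_opts, opts,
		.map_flags = BPF_F_RB_OVERWRITE,	/* new flag from this diff */
	);

	/* key_size and value_size must be 0 for ring buffer maps, and
	 * max_entries (the data area size) must be a page-aligned power
	 * of two; 256 KiB is an arbitrary example. */
	return bpf_map_create(BPF_MAP_TYPE_RINGBUF, "events_ow",
			      0, 0, 256 * 1024, &opts);
}

Note that per the ringbuf_map_alloc() check above, requesting BPF_F_RB_OVERWRITE on a BPF_MAP_TYPE_USER_RINGBUF map is rejected with -EINVAL; the flag applies only to kernel-producer ring buffers.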
