// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <tools/libc_compat.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"

/* make sure libbpf doesn't use kernel-only integer typedefs */
#pragma GCC poison u8 u16 u32 u64 s8 s16 s32 s64

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring *rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return -EINVAL;
	}

	tmp = reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return -ENOMEM;
	rb->rings = tmp;

	tmp = reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return -ENOMEM;
	rb->events = tmp;

	r = &rb->rings[rb->ring_cnt];
	memset(r, 0, sizeof(*r));

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
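	/* Illustrative note (added, not in the original): with max_entries of
	 * 4096 and a sample starting at offset 4090, the second contiguous
	 * mapping of the same data pages lets the consumer read the whole
	 * record linearly past offset 4095 instead of splitting the read at
	 * the end of the ring.
	 */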
	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, PROT_READ,
		   MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	rb->ring_cnt++;
	return 0;
}

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_unmap_ring(rb, &rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return NULL;
}

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}

static int ringbuf_process_ring(struct ring *r)
{
	int *len_ptr, len, err, cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers, or
 * a negative number if any of the callbacks returns an error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int i, err, res = 0;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = &rb->rings[i];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return err;
		res += err;
	}
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed, or a negative number if any of the
 * registered callbacks returned an error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt, err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = &rb->rings[ring_id];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return err;
		res += err;
	}
	return cnt < 0 ? -errno : res;
}
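
/* Usage sketch (illustrative only, not part of this file): a minimal
 * consumer loop over a single BPF_MAP_TYPE_RINGBUF map. It assumes
 * "rb_map_fd" is a valid fd for such a map (e.g. obtained via
 * bpf_object__find_map_fd_by_name()), "exiting" is set by a signal
 * handler, and handle_event() is the user-supplied sample callback.
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// process one committed sample; a non-zero return value
 *		// stops consumption and is propagated to the caller
 *		return 0;
 *	}
 *
 *	struct ring_buffer *rb;
 *	int n;
 *
 *	rb = ring_buffer__new(rb_map_fd, handle_event, NULL, NULL);
 *	if (!rb)
 *		return -1;
 *	while (!exiting) {
 *		n = ring_buffer__poll(rb, 100);	// timeout in ms
 *		if (n < 0 && n != -EINTR)
 *			break;
 *	}
 *	ring_buffer__free(rb);
 */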