/*
 * Test functionality of BPF filters with SO_REUSEPORT.  This program creates
 * an SO_REUSEPORT receiver group containing one socket per online CPU core.
 * It then attaches a BPF program that selects a socket from this group based
 * on the id of the core that receives the packet.  The sending code
 * artificially moves itself to run on different core ids and sends one
 * message from each core.  Since these packets are delivered over loopback,
 * they should arrive on the same core that sent them.  The receiving code
 * then ensures that the packet was received on the socket for the
 * corresponding core id.  This entire process is done for several different
 * core id permutations and for each IPv4/IPv6 and TCP/UDP combination.
 */
13 
14 #define _GNU_SOURCE
15 
16 #include <arpa/inet.h>
17 #include <errno.h>
18 #include <error.h>
19 #include <linux/filter.h>
20 #include <linux/in.h>
21 #include <linux/unistd.h>
22 #include <sched.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/epoll.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30 
/* Port every receive socket binds to and every sender connects to. */
static const int PORT = 8888;

/*
 * Create @len sockets of type @proto (SOCK_DGRAM or SOCK_STREAM) for
 * @family (AF_INET or AF_INET6), each with SO_REUSEPORT set and bound to
 * the wildcard address on PORT, so that together they form one reuseport
 * group.  The descriptors are stored in rcv_fd[0..len-1] in creation order.
 * For SOCK_STREAM each socket is also put into the listening state.
 *
 * Any failure terminates the program via error(3).
 */
static void build_rcv_group(int *rcv_fd, size_t len, int family, int proto)
{
	struct sockaddr_storage addr;
	struct sockaddr_in  *addr4;
	struct sockaddr_in6 *addr6;
	socklen_t addrlen;
	size_t i;
	int opt;

	/*
	 * Zero the whole storage so the fields not assigned below
	 * (sin_zero, sin6_flowinfo, sin6_scope_id) are not indeterminate
	 * stack bytes handed to bind().
	 */
	memset(&addr, 0, sizeof(addr));

	switch (family) {
	case AF_INET:
		addr4 = (struct sockaddr_in *)&addr;
		addr4->sin_family = AF_INET;
		addr4->sin_addr.s_addr = htonl(INADDR_ANY);
		addr4->sin_port = htons(PORT);
		addrlen = sizeof(struct sockaddr_in);
		break;
	case AF_INET6:
		addr6 = (struct sockaddr_in6 *)&addr;
		addr6->sin6_family = AF_INET6;
		addr6->sin6_addr = in6addr_any;
		addr6->sin6_port = htons(PORT);
		addrlen = sizeof(struct sockaddr_in6);
		break;
	default:
		error(1, 0, "Unsupported family %d", family);
		return; /* not reached: error() exits for a nonzero status */
	}

	for (i = 0; i < len; ++i) {
		rcv_fd[i] = socket(family, proto, 0);
		if (rcv_fd[i] < 0)
			error(1, errno, "failed to create receive socket");

		/* Must be set before bind() for the socket to join the group. */
		opt = 1;
		if (setsockopt(rcv_fd[i], SOL_SOCKET, SO_REUSEPORT, &opt,
			       sizeof(opt)))
			error(1, errno, "failed to set SO_REUSEPORT");

		if (bind(rcv_fd[i], (struct sockaddr *)&addr, addrlen))
			error(1, errno, "failed to bind receive socket");

		if (proto == SOCK_STREAM && listen(rcv_fd[i], len * 10))
			error(1, errno, "failed to listen on receive port");
	}
}
75 
/*
 * Attach a two-instruction classic BPF program to the reuseport group that
 * @fd belongs to.  The program loads the id of the CPU processing the packet
 * (the SKF_AD_CPU ancillary load) and returns it.  With
 * SO_ATTACH_REUSEPORT_CBPF the return value is the index of the socket in
 * the group that should receive the packet; per socket(7), an out-of-range
 * index falls back to the default SO_REUSEPORT selection.
 *
 * Any failure terminates the program via error(3).
 */
static void attach_bpf(int fd)
{
	struct sock_filter code[] = {
		/* A = raw_smp_processor_id() */
		{ BPF_LD  | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU },
		/* return A */
		{ BPF_RET | BPF_A, 0, 0, 0 },
	};
	struct sock_fprog p = {
		/* Derived from the array so it can never disagree with code[]. */
		.len = sizeof(code) / sizeof(code[0]),
		.filter = code,
	};

	if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF, &p, sizeof(p)))
		error(1, errno, "failed to set SO_ATTACH_REUSEPORT_CBPF");
}
92 
93 static void send_from_cpu(int cpu_id, int family, int proto)
94 {
95 	struct sockaddr_storage saddr, daddr;
96 	struct sockaddr_in  *saddr4, *daddr4;
97 	struct sockaddr_in6 *saddr6, *daddr6;
98 	cpu_set_t cpu_set;
99 	int fd;
100 
101 	switch (family) {
102 	case AF_INET:
103 		saddr4 = (struct sockaddr_in *)&saddr;
104 		saddr4->sin_family = AF_INET;
105 		saddr4->sin_addr.s_addr = htonl(INADDR_ANY);
106 		saddr4->sin_port = 0;
107 
108 		daddr4 = (struct sockaddr_in *)&daddr;
109 		daddr4->sin_family = AF_INET;
110 		daddr4->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
111 		daddr4->sin_port = htons(PORT);
112 		break;
113 	case AF_INET6:
114 		saddr6 = (struct sockaddr_in6 *)&saddr;
115 		saddr6->sin6_family = AF_INET6;
116 		saddr6->sin6_addr = in6addr_any;
117 		saddr6->sin6_port = 0;
118 
119 		daddr6 = (struct sockaddr_in6 *)&daddr;
120 		daddr6->sin6_family = AF_INET6;
121 		daddr6->sin6_addr = in6addr_loopback;
122 		daddr6->sin6_port = htons(PORT);
123 		break;
124 	default:
125 		error(1, 0, "Unsupported family %d", family);
126 	}
127 
128 	memset(&cpu_set, 0, sizeof(cpu_set));
129 	CPU_SET(cpu_id, &cpu_set);
130 	if (sched_setaffinity(0, sizeof(cpu_set), &cpu_set) < 0)
131 		error(1, errno, "failed to pin to cpu");
132 
133 	fd = socket(family, proto, 0);
134 	if (fd < 0)
135 		error(1, errno, "failed to create send socket");
136 
137 	if (bind(fd, (struct sockaddr *)&saddr, sizeof(saddr)))
138 		error(1, errno, "failed to bind send socket");
139 
140 	if (connect(fd, (struct sockaddr *)&daddr, sizeof(daddr)))
141 		error(1, errno, "failed to connect send socket");
142 
143 	if (send(fd, "a", 1, 0) < 0)
144 		error(1, errno, "failed to send message");
145 
146 	close(fd);
147 }
148 
/*
 * Wait for one incoming datagram (SOCK_DGRAM) or connection (SOCK_STREAM)
 * on any socket registered in @epfd and consume its data.  Then verify that
 * it arrived on rcv_fd[@cpu_id], i.e. on the group socket whose index
 * matches the CPU the message was sent from.
 *
 * @rcv_fd: receive group, one descriptor per index
 * @len:    number of entries in @rcv_fd
 * @epfd:   epoll instance on which the @rcv_fd sockets are registered
 * @cpu_id: sending CPU, and therefore the expected socket index
 * @proto:  SOCK_STREAM or SOCK_DGRAM
 *
 * Exits via error(3) on any failure, on an empty read, or if the message
 * landed on the wrong socket.
 */
static
void receive_on_cpu(int *rcv_fd, int len, int epfd, int cpu_id, int proto)
{
	struct epoll_event ev;
	ssize_t nbytes;
	int idx, fd;
	char buf[8];

	if (epoll_wait(epfd, &ev, 1, -1) < 0)
		error(1, errno, "epoll_wait failed");

	if (proto == SOCK_STREAM) {
		/* A readable listener has a pending connection: accept it. */
		fd = accept(ev.data.fd, NULL, NULL);
		if (fd < 0)
			error(1, errno, "failed to accept");
	} else {
		fd = ev.data.fd;
	}

	/*
	 * Check the result before close(): a successful close() may still
	 * overwrite errno, which would make the error report misleading.
	 */
	nbytes = recv(fd, buf, sizeof(buf), 0);
	if (nbytes < 0)
		error(1, errno, "failed to recv");
	if (nbytes == 0)
		error(1, 0, "failed to recv: no data");

	if (proto == SOCK_STREAM)
		close(fd);

	for (idx = 0; idx < len; ++idx)
		if (ev.data.fd == rcv_fd[idx])
			break;
	if (idx == len)
		error(1, 0, "failed to find socket");
	fprintf(stderr, "send cpu %d, receive socket %d\n", cpu_id, idx);
	if (cpu_id != idx)
		error(1, 0, "cpu id/receive socket mismatch");
}
182 
/*
 * Visit CPU ids first, first + step, first + 2 * step, ... for as long as
 * the id stays within [0, len).  For each id, send one message from that
 * CPU and check that it is received on the matching socket.
 */
static void send_receive_sequence(int *rcv_fd, int len, int epfd, int family,
				  int proto, int first, int step)
{
	int cpu;

	for (cpu = first; cpu >= 0 && cpu < len; cpu += step) {
		send_from_cpu(cpu, family, proto);
		receive_on_cpu(rcv_fd, len, epfd, cpu, proto);
	}
}

/*
 * Run the whole check for one (@family, @proto) combination:
 * - build a reuseport group of @len sockets into @rcv_fd;
 * - attach the CPU-steering BPF program via the first socket;
 * - watch every socket with epoll;
 * - exercise the CPUs in forward, reverse, even-only and odd-only order.
 *
 * The epoll instance and all group sockets are closed before returning.
 */
static void test(int *rcv_fd, int len, int family, int proto)
{
	struct epoll_event ev;
	int epfd, idx;

	build_rcv_group(rcv_fd, len, family, proto);
	attach_bpf(rcv_fd[0]);

	epfd = epoll_create(1);
	if (epfd < 0)
		error(1, errno, "failed to create epoll");

	for (idx = 0; idx < len; ++idx) {
		ev.events = EPOLLIN;
		ev.data.fd = rcv_fd[idx];
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, rcv_fd[idx], &ev))
			error(1, errno, "failed to register sock epoll");
	}

	/* Forward: 0, 1, ..., len - 1 */
	send_receive_sequence(rcv_fd, len, epfd, family, proto, 0, 1);
	/* Reverse: len - 1, ..., 1, 0 */
	send_receive_sequence(rcv_fd, len, epfd, family, proto, len - 1, -1);
	/* Even cores: 0, 2, 4, ... */
	send_receive_sequence(rcv_fd, len, epfd, family, proto, 0, 2);
	/* Odd cores: 1, 3, 5, ... */
	send_receive_sequence(rcv_fd, len, epfd, family, proto, 1, 2);

	close(epfd);
	for (idx = 0; idx < len; ++idx)
		close(rcv_fd[idx]);
}
229 
/*
 * Size the receive group to the number of online CPUs, then run the
 * CPU-steering check for IPv4/IPv6 over UDP and TCP, in that order.  Any
 * failure exits via error(3); on success "SUCCESS" is printed to stderr
 * and 0 is returned.
 */
int main(void)
{
	static const struct {
		const char *banner;
		int family;
		int proto;
	} cases[] = {
		{ "---- IPv4 UDP ----\n", AF_INET,  SOCK_DGRAM },
		{ "---- IPv6 UDP ----\n", AF_INET6, SOCK_DGRAM },
		{ "---- IPv4 TCP ----\n", AF_INET,  SOCK_STREAM },
		{ "---- IPv6 TCP ----\n", AF_INET6, SOCK_STREAM },
	};
	size_t c;
	int *rcv_fd;
	int cpus;

	cpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (cpus <= 0)
		error(1, errno, "failed counting cpus");

	rcv_fd = calloc(cpus, sizeof(*rcv_fd));
	if (rcv_fd == NULL)
		error(1, 0, "failed to allocate array");

	for (c = 0; c < sizeof(cases) / sizeof(cases[0]); ++c) {
		fputs(cases[c].banner, stderr);
		test(rcv_fd, cpus, cases[c].family, cases[c].proto);
	}

	free(rcv_fd);

	fputs("SUCCESS\n", stderr);
	return 0;
}
259