1 /* 2 * 3 * sched-messaging.c 4 * 5 * messaging: Benchmark for scheduler and IPC mechanisms 6 * 7 * Based on hackbench by Rusty Russell <rusty@rustcorp.com.au> 8 * Ported to perf by Hitoshi Mitake <mitake@dcl.info.waseda.ac.jp> 9 * 10 */ 11 12 #include "../perf.h" 13 #include "../util/util.h" 14 #include <subcmd/parse-options.h> 15 #include "../builtin.h" 16 #include "bench.h" 17 18 /* Test groups of 20 processes spraying to 20 receivers */ 19 #include <pthread.h> 20 #include <stdio.h> 21 #include <stdlib.h> 22 #include <string.h> 23 #include <errno.h> 24 #include <unistd.h> 25 #include <sys/types.h> 26 #include <sys/socket.h> 27 #include <sys/wait.h> 28 #include <sys/time.h> 29 #include <poll.h> 30 #include <limits.h> 31 #include <err.h> 32 #include <linux/time64.h> 33 34 #define DATASIZE 100 35 36 static bool use_pipes = false; 37 static unsigned int nr_loops = 100; 38 static bool thread_mode = false; 39 static unsigned int num_groups = 10; 40 41 struct sender_context { 42 unsigned int num_fds; 43 int ready_out; 44 int wakefd; 45 int out_fds[0]; 46 }; 47 48 struct receiver_context { 49 unsigned int num_packets; 50 int in_fds[2]; 51 int ready_out; 52 int wakefd; 53 }; 54 55 static void fdpair(int fds[2]) 56 { 57 if (use_pipes) { 58 if (pipe(fds) == 0) 59 return; 60 } else { 61 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0) 62 return; 63 } 64 65 err(EXIT_FAILURE, use_pipes ? "pipe()" : "socketpair()"); 66 } 67 68 /* Block until we're ready to go */ 69 static void ready(int ready_out, int wakefd) 70 { 71 char dummy; 72 struct pollfd pollfd = { .fd = wakefd, .events = POLLIN }; 73 74 /* Tell them we're ready. */ 75 if (write(ready_out, &dummy, 1) != 1) 76 err(EXIT_FAILURE, "CLIENT: ready write"); 77 78 /* Wait for "GO" signal */ 79 if (poll(&pollfd, 1, -1) != 1) 80 err(EXIT_FAILURE, "poll"); 81 } 82 83 /* Sender sprays nr_loops messages down each file descriptor */ 84 static void *sender(struct sender_context *ctx) 85 { 86 char data[DATASIZE]; 87 unsigned int i, j; 88 89 ready(ctx->ready_out, ctx->wakefd); 90 91 /* Now pump to every receiver. */ 92 for (i = 0; i < nr_loops; i++) { 93 for (j = 0; j < ctx->num_fds; j++) { 94 int ret, done = 0; 95 96 again: 97 ret = write(ctx->out_fds[j], data + done, 98 sizeof(data)-done); 99 if (ret < 0) 100 err(EXIT_FAILURE, "SENDER: write"); 101 done += ret; 102 if (done < DATASIZE) 103 goto again; 104 } 105 } 106 107 return NULL; 108 } 109 110 111 /* One receiver per fd */ 112 static void *receiver(struct receiver_context* ctx) 113 { 114 unsigned int i; 115 116 if (!thread_mode) 117 close(ctx->in_fds[1]); 118 119 /* Wait for start... */ 120 ready(ctx->ready_out, ctx->wakefd); 121 122 /* Receive them all */ 123 for (i = 0; i < ctx->num_packets; i++) { 124 char data[DATASIZE]; 125 int ret, done = 0; 126 127 again: 128 ret = read(ctx->in_fds[0], data + done, DATASIZE - done); 129 if (ret < 0) 130 err(EXIT_FAILURE, "SERVER: read"); 131 done += ret; 132 if (done < DATASIZE) 133 goto again; 134 } 135 136 return NULL; 137 } 138 139 static pthread_t create_worker(void *ctx, void *(*func)(void *)) 140 { 141 pthread_attr_t attr; 142 pthread_t childid; 143 int ret; 144 145 if (!thread_mode) { 146 /* process mode */ 147 /* Fork the receiver. */ 148 switch (fork()) { 149 case -1: 150 err(EXIT_FAILURE, "fork()"); 151 break; 152 case 0: 153 (*func) (ctx); 154 exit(0); 155 break; 156 default: 157 break; 158 } 159 160 return (pthread_t)0; 161 } 162 163 if (pthread_attr_init(&attr) != 0) 164 err(EXIT_FAILURE, "pthread_attr_init:"); 165 166 #ifndef __ia64__ 167 if (pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN) != 0) 168 err(EXIT_FAILURE, "pthread_attr_setstacksize"); 169 #endif 170 171 ret = pthread_create(&childid, &attr, func, ctx); 172 if (ret != 0) 173 err(EXIT_FAILURE, "pthread_create failed"); 174 175 return childid; 176 } 177 178 static void reap_worker(pthread_t id) 179 { 180 int proc_status; 181 void *thread_status; 182 183 if (!thread_mode) { 184 /* process mode */ 185 wait(&proc_status); 186 if (!WIFEXITED(proc_status)) 187 exit(1); 188 } else { 189 pthread_join(id, &thread_status); 190 } 191 } 192 193 /* One group of senders and receivers */ 194 static unsigned int group(pthread_t *pth, 195 unsigned int num_fds, 196 int ready_out, 197 int wakefd) 198 { 199 unsigned int i; 200 struct sender_context *snd_ctx = malloc(sizeof(struct sender_context) 201 + num_fds * sizeof(int)); 202 203 if (!snd_ctx) 204 err(EXIT_FAILURE, "malloc()"); 205 206 for (i = 0; i < num_fds; i++) { 207 int fds[2]; 208 struct receiver_context *ctx = malloc(sizeof(*ctx)); 209 210 if (!ctx) 211 err(EXIT_FAILURE, "malloc()"); 212 213 214 /* Create the pipe between client and server */ 215 fdpair(fds); 216 217 ctx->num_packets = num_fds * nr_loops; 218 ctx->in_fds[0] = fds[0]; 219 ctx->in_fds[1] = fds[1]; 220 ctx->ready_out = ready_out; 221 ctx->wakefd = wakefd; 222 223 pth[i] = create_worker(ctx, (void *)receiver); 224 225 snd_ctx->out_fds[i] = fds[1]; 226 if (!thread_mode) 227 close(fds[0]); 228 } 229 230 /* Now we have all the fds, fork the senders */ 231 for (i = 0; i < num_fds; i++) { 232 snd_ctx->ready_out = ready_out; 233 snd_ctx->wakefd = wakefd; 234 snd_ctx->num_fds = num_fds; 235 236 pth[num_fds+i] = create_worker(snd_ctx, (void *)sender); 237 } 238 239 /* Close the fds we have left */ 240 if (!thread_mode) 241 for (i = 0; i < num_fds; i++) 242 close(snd_ctx->out_fds[i]); 243 244 /* Return number of children to reap */ 245 return num_fds * 2; 246 } 247 248 static const struct option options[] = { 249 OPT_BOOLEAN('p', "pipe", &use_pipes, 250 "Use pipe() instead of socketpair()"), 251 OPT_BOOLEAN('t', "thread", &thread_mode, 252 "Be multi thread instead of multi process"), 253 OPT_UINTEGER('g', "group", &num_groups, "Specify number of groups"), 254 OPT_UINTEGER('l', "nr_loops", &nr_loops, "Specify the number of loops to run (default: 100)"), 255 OPT_END() 256 }; 257 258 static const char * const bench_sched_message_usage[] = { 259 "perf bench sched messaging <options>", 260 NULL 261 }; 262 263 int bench_sched_messaging(int argc, const char **argv) 264 { 265 unsigned int i, total_children; 266 struct timeval start, stop, diff; 267 unsigned int num_fds = 20; 268 int readyfds[2], wakefds[2]; 269 char dummy; 270 pthread_t *pth_tab; 271 272 argc = parse_options(argc, argv, options, 273 bench_sched_message_usage, 0); 274 275 pth_tab = malloc(num_fds * 2 * num_groups * sizeof(pthread_t)); 276 if (!pth_tab) 277 err(EXIT_FAILURE, "main:malloc()"); 278 279 fdpair(readyfds); 280 fdpair(wakefds); 281 282 total_children = 0; 283 for (i = 0; i < num_groups; i++) 284 total_children += group(pth_tab+total_children, num_fds, 285 readyfds[1], wakefds[0]); 286 287 /* Wait for everyone to be ready */ 288 for (i = 0; i < total_children; i++) 289 if (read(readyfds[0], &dummy, 1) != 1) 290 err(EXIT_FAILURE, "Reading for readyfds"); 291 292 gettimeofday(&start, NULL); 293 294 /* Kick them off */ 295 if (write(wakefds[1], &dummy, 1) != 1) 296 err(EXIT_FAILURE, "Writing to start them"); 297 298 /* Reap them all */ 299 for (i = 0; i < total_children; i++) 300 reap_worker(pth_tab[i]); 301 302 gettimeofday(&stop, NULL); 303 304 timersub(&stop, &start, &diff); 305 306 switch (bench_format) { 307 case BENCH_FORMAT_DEFAULT: 308 printf("# %d sender and receiver %s per group\n", 309 num_fds, thread_mode ? "threads" : "processes"); 310 printf("# %d groups == %d %s run\n\n", 311 num_groups, num_groups * 2 * num_fds, 312 thread_mode ? "threads" : "processes"); 313 printf(" %14s: %lu.%03lu [sec]\n", "Total time", 314 diff.tv_sec, 315 (unsigned long) (diff.tv_usec / USEC_PER_MSEC)); 316 break; 317 case BENCH_FORMAT_SIMPLE: 318 printf("%lu.%03lu\n", diff.tv_sec, 319 (unsigned long) (diff.tv_usec / USEC_PER_MSEC)); 320 break; 321 default: 322 /* reaching here is something disaster */ 323 fprintf(stderr, "Unknown format:%d\n", bench_format); 324 exit(1); 325 break; 326 } 327 328 free(pth_tab); 329 330 return 0; 331 } 332