Implement concurrency.
[statsd-tg.git] / src / statsd-tg.c
1 /**
2  * collectd-td - collectd traffic generator
3  * Copyright (C) 2013       Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian Forster <ff at octo.it>
20  **/
21
22 #if HAVE_CONFIG_H
23 # include "config.h"
24 #endif
25
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <time.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <assert.h>
34 #include <time.h>
35 #include <pthread.h>
36
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40
41 #if !__GNUC__
42 # define __attribute__(x) /**/
43 #endif
44
45 #define DEF_NODE "localhost"
46 #define DEF_SERVICE "8125"
47
48 #define DEF_NUM_COUNTERS  1000
49 #define DEF_NUM_TIMERS    1000
50 #define DEF_NUM_GAUGES     100
51 #define DEF_NUM_SETS       100
52 #define DEF_SET_SIZE       128
53
54 static int conf_num_counters = DEF_NUM_COUNTERS;
55 static int conf_num_timers   = DEF_NUM_TIMERS;
56 static int conf_num_gauges   = DEF_NUM_GAUGES;
57 static int conf_num_sets     = DEF_NUM_SETS;
58 static int conf_set_size     = DEF_SET_SIZE;
59 static const char *conf_node = DEF_NODE;
60 static const char *conf_service = DEF_SERVICE;
61
62 static int conf_threads_num = 1;
63
64 static struct sigaction sigint_action;
65 static struct sigaction sigterm_action;
66
67 static unsigned long long events_sent = 0;
68 pthread_mutex_t events_sent_lock = PTHREAD_MUTEX_INITIALIZER;
69 static _Bool loop = 1;
70
71 __attribute__((noreturn))
72 static void exit_usage (int exit_status) /* {{{ */
73 {
74   fprintf ((exit_status == EXIT_FAILURE) ? stderr : stdout,
75       PACKAGE_NAME" -- statsd traffic generator\n"
76       "\n"
77       "  Usage: statsd-ng [OPTION]\n"
78       "\n"
79       "  Valid options:\n"
80       "    -c <number>    Number of counters to emulate. (Default: %i)\n"
81       "    -t <number>    Number of timers to emulate. (Default: %i)\n"
82       "    -g <number>    Number of gauges to emulate. (Default: %i)\n"
83       "    -s <number>    Number of sets to emulate. (Default: %i)\n"
84       "    -S <size>      Number of elements in each set. (Default: %i)\n"
85       "    -d <dest>      Destination address of the network packets.\n"
86       "                   (Default: "DEF_NODE")\n"
87       "    -D <port>      Destination port of the network packets.\n"
88       "                   (Default: "DEF_SERVICE")\n"
89       "    -h             Print usage information (this output).\n"
90       "\n"
91       "Copyright (C) 2013  Florian Forster\n"
92       "Licensed under the GNU General Public License, version 2 (GPLv2)\n",
93       DEF_NUM_COUNTERS, DEF_NUM_TIMERS, DEF_NUM_GAUGES,
94       DEF_NUM_SETS, DEF_SET_SIZE);
95   exit (exit_status);
96 } /* }}} void exit_usage */
97
98 static void signal_handler (int signal __attribute__((unused))) /* {{{ */
99 {
100   loop = 0;
101 } /* }}} void signal_handler */
102
103 static int sock_open (void) /* {{{ */
104 {
105   struct addrinfo ai_hints;
106   struct addrinfo *ai_list = NULL;
107   struct addrinfo *ai_ptr;
108   int sock;
109
110   int status;
111
112   memset (&ai_hints, 0, sizeof (ai_hints));
113 #ifdef AI_ADDRCONFIG
114   ai_hints.ai_flags = AI_ADDRCONFIG;
115 #endif
116   ai_hints.ai_family = AF_UNSPEC;
117   ai_hints.ai_socktype = SOCK_DGRAM;
118
119   status = getaddrinfo (conf_node, conf_service, &ai_hints, &ai_list);
120   if (status != 0)
121   {
122     fprintf (stderr, "getaddrinfo failed: %s\n", gai_strerror (status));
123     exit (EXIT_FAILURE);
124   }
125
126   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
127   {
128     int fd;
129
130     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
131     if (fd < 0)
132     {
133       continue;
134     }
135
136     status = connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
137     if (status != 0)
138     {
139       close (fd);
140       continue;
141     }
142
143     sock = fd;
144     break;
145   }
146
147   freeaddrinfo (ai_list);
148
149   if (sock < 0)
150   {
151     fprintf (stderr, "Opening network socket failed.\n");
152     exit (EXIT_FAILURE);
153   }
154
155   return (sock);
156 } /* }}} int sock_open */
157
158 static int send_random_event (int sock, unsigned short seed[static 3]) /* {{{ */
159 {
160   long conf_num_total = conf_num_counters + conf_num_timers
161       + conf_num_gauges + conf_num_sets;
162   /* Not completely fair, but good enough for our use-case. */
163   long rnd = nrand48 (seed) % conf_num_total;
164
165   long value = nrand48 (seed);
166   char *type;
167
168   char buffer[1024];
169   int buffer_size;
170   ssize_t status;
171
172   if (rnd < conf_num_counters)
173   {
174     /* counter */
175     type = "c";
176     value = (value % 8) + 1;
177   }
178   else if (rnd < (conf_num_counters + conf_num_timers))
179   {
180     /* timer */
181     type = "ms";
182     value = (value % 1024) + 1;
183   }
184   else if (rnd < (conf_num_counters + conf_num_timers + conf_num_gauges))
185   {
186     /* gauge */
187     type = "g";
188     value = (value % 128) - 64;
189   }
190   else
191   {
192     /* set */
193     type = "s";
194     value %= conf_set_size;
195   }
196
197   buffer_size = snprintf (buffer, sizeof (buffer), "%06li:%li|%s",
198                           rnd, value, type);
199   assert (buffer_size > 0);
200   if (((size_t) buffer_size) >= sizeof (buffer))
201     return (-1);
202   assert (buffer[buffer_size] == 0);
203
204   status = send (sock, buffer, (size_t) buffer_size, /* flags = */ 0);
205   if (status < 0)
206   {
207     fprintf (stderr, "send failed: %s\n", strerror (errno));
208     return (-1);
209   }
210
211   return (0);
212 } /* }}} int send_random_event */
213
214 static int get_integer_opt (const char *str, int *ret_value) /* {{{ */
215 {
216   char *endptr;
217   int tmp;
218
219   errno = 0;
220   endptr = NULL;
221   tmp = (int) strtol (str, &endptr, /* base = */ 0);
222   if (errno != 0)
223   {
224     fprintf (stderr, "Unable to parse option as a number: \"%s\": %s\n",
225         str, strerror (errno));
226     exit (EXIT_FAILURE);
227   }
228   else if (endptr == str)
229   {
230     fprintf (stderr, "Unable to parse option as a number: \"%s\"\n", str);
231     exit (EXIT_FAILURE);
232   }
233   else if (*endptr != 0)
234   {
235     fprintf (stderr, "Garbage after end of value: \"%s\"\n", str);
236     exit (EXIT_FAILURE);
237   }
238
239   *ret_value = tmp;
240   return (0);
241 } /* }}} int get_integer_opt */
242
243 static int read_options (int argc, char **argv) /* {{{ */
244 {
245   int opt;
246
247 #ifdef _SC_NPROCESSORS_ONLN
248   conf_threads_num = (int) sysconf (_SC_NPROCESSORS_ONLN);
249 #endif
250
251   while ((opt = getopt (argc, argv, "c:t:g:s:S:d:D:h")) != -1)
252   {
253     switch (opt)
254     {
255       case 'c':
256         get_integer_opt (optarg, &conf_num_counters);
257         break;
258
259       case 't':
260         get_integer_opt (optarg, &conf_num_timers);
261         break;
262
263       case 'g':
264         get_integer_opt (optarg, &conf_num_gauges);
265         break;
266
267       case 's':
268         get_integer_opt (optarg, &conf_num_sets);
269         break;
270
271       case 'S':
272         get_integer_opt (optarg, &conf_set_size);
273         break;
274
275       case 'd':
276         conf_node = optarg;
277         break;
278
279       case 'D':
280         conf_service = optarg;
281         break;
282
283       case 'T':
284         get_integer_opt (optarg, &conf_threads_num);
285         break;
286
287       case 'h':
288         exit_usage (EXIT_SUCCESS);
289
290       default:
291         exit_usage (EXIT_FAILURE);
292     } /* switch (opt) */
293   } /* while (getopt) */
294
295   return (0);
296 } /* }}} int read_options */
297
298 static void *send_thread (void *args __attribute__((unused))) /* {{{ */
299 {
300   int sock;
301   unsigned short seed[3];
302   struct timespec ts;
303
304   unsigned long long local_events_sent = 0;
305
306   clock_gettime (CLOCK_REALTIME, &ts);
307   seed[2] = (unsigned short) (ts.tv_nsec);
308   seed[1] = (unsigned short) (ts.tv_nsec >> 16);
309   seed[0] = (unsigned short) (ts.tv_sec);
310
311   sock = sock_open ();
312
313   while (loop)
314   {
315     send_random_event (sock, seed);
316     local_events_sent++;
317   }
318
319   close (sock);
320
321   pthread_mutex_lock (&events_sent_lock);
322   events_sent += local_events_sent;
323   pthread_mutex_unlock (&events_sent_lock);
324
325   return (NULL);
326 } /* }}} void *send_thread */
327
328 static void run_threads (void) /* {{{ */
329 {
330   pthread_t threads[conf_threads_num];
331   int i;
332
333   for (i = 0; i < conf_threads_num; i++)
334   {
335     int status;
336
337     status = pthread_create (&threads[i], /* attr = */ NULL,
338         send_thread, /* args = */ NULL);
339     if (status != 0)
340     {
341       fprintf (stderr, "pthread_create failed.");
342       abort ();
343     }
344   }
345
346   for (i = 0; i < conf_threads_num; i++)
347     pthread_join (threads[i], /* retval = */ NULL);
348 } /* }}} void run_threads */
349
350 static double timespec_diff (struct timespec const *ts0, /* {{{ */
351     struct timespec const *ts1)
352 {
353   time_t diff_sec;
354   long diff_nsec;
355
356   diff_sec = ts1->tv_sec - ts0->tv_sec;
357   diff_nsec += ts1->tv_nsec - ts0->tv_nsec;
358
359   return ((double) diff_sec) + (((double) diff_nsec) / 1.0e9);
360 } /* }}} double timespec_diff */
361
362 int main (int argc, char **argv) /* {{{ */
363 {
364   struct timespec ts_begin;
365   struct timespec ts_end;
366   double runtime;
367
368   read_options (argc, argv);
369
370   sigint_action.sa_handler = signal_handler;
371   sigaction (SIGINT, &sigint_action, /* old = */ NULL);
372
373   sigterm_action.sa_handler = signal_handler;
374   sigaction (SIGTERM, &sigterm_action, /* old = */ NULL);
375
376   clock_gettime (CLOCK_MONOTONIC, &ts_begin);
377   run_threads ();
378   clock_gettime (CLOCK_MONOTONIC, &ts_end);
379
380   runtime = timespec_diff (&ts_begin, &ts_end);
381   printf ("Sent %llu events in %.0fs (%.0f events/s).\n",
382       events_sent, runtime, ((double) events_sent) / runtime);
383
384   exit (EXIT_SUCCESS);
385   return (0);
386 } /* }}} int main */
387
388 /* vim: set sw=2 sts=2 et fdm=marker : */