From a6edbeb4abb7bbeb8adaf40e6e815ec70ea193f2 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 11 Jul 2013 13:22:39 +0200 Subject: [PATCH] Implement concurrency. --- src/Makefile.am | 2 ++ src/statsd-tg.c | 107 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 95 insertions(+), 14 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index fe34a05..1189f8e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,3 +1,5 @@ bin_PROGRAMS = statsd-tg statsd_tg_SOURCES = statsd-tg.c +statsd_tg_CFLAGS = $(AM_CFLAGS) -pthread +statsd_tg_LDADD = -lrt diff --git a/src/statsd-tg.c b/src/statsd-tg.c index 8815a8b..3f792f7 100644 --- a/src/statsd-tg.c +++ b/src/statsd-tg.c @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include #include @@ -57,11 +59,13 @@ static int conf_set_size = DEF_SET_SIZE; static const char *conf_node = DEF_NODE; static const char *conf_service = DEF_SERVICE; -static int sock = -1; +static int conf_threads_num = 1; static struct sigaction sigint_action; static struct sigaction sigterm_action; +static unsigned long long events_sent = 0; +pthread_mutex_t events_sent_lock = PTHREAD_MUTEX_INITIALIZER; static _Bool loop = 1; __attribute__((noreturn)) @@ -101,6 +105,7 @@ static int sock_open (void) /* {{{ */ struct addrinfo ai_hints; struct addrinfo *ai_list = NULL; struct addrinfo *ai_ptr; + int sock; int status; @@ -147,17 +152,17 @@ static int sock_open (void) /* {{{ */ exit (EXIT_FAILURE); } - return (0); + return (sock); } /* }}} int sock_open */ -static int send_random_event (void) /* {{{ */ +static int send_random_event (int sock, unsigned short seed[static 3]) /* {{{ */ { long conf_num_total = conf_num_counters + conf_num_timers + conf_num_gauges + conf_num_sets; /* Not completely fair, but good enough for our use-case. */ - long rnd = lrand48 () % conf_num_total; + long rnd = nrand48 (seed) % conf_num_total; - long value = lrand48 (); + long value = nrand48 (seed); char *type; char buffer[1024]; @@ -199,7 +204,7 @@ static int send_random_event (void) /* {{{ */ status = send (sock, buffer, (size_t) buffer_size, /* flags = */ 0); if (status < 0) { - fprintf (stderr, "send failed: %s", strerror (errno)); + fprintf (stderr, "send failed: %s\n", strerror (errno)); return (-1); } @@ -239,6 +244,10 @@ static int read_options (int argc, char **argv) /* {{{ */ { int opt; +#ifdef _SC_NPROCESSORS_ONLN + conf_threads_num = (int) sysconf (_SC_NPROCESSORS_ONLN); +#endif + while ((opt = getopt (argc, argv, "c:t:g:s:S:d:D:h")) != -1) { switch (opt) @@ -271,6 +280,10 @@ static int read_options (int argc, char **argv) /* {{{ */ conf_service = optarg; break; + case 'T': + get_integer_opt (optarg, &conf_threads_num); + break; + case 'h': exit_usage (EXIT_SUCCESS); @@ -282,8 +295,76 @@ static int read_options (int argc, char **argv) /* {{{ */ return (0); } /* }}} int read_options */ +static void *send_thread (void *args __attribute__((unused))) /* {{{ */ +{ + int sock; + unsigned short seed[3]; + struct timespec ts; + + unsigned long long local_events_sent = 0; + + clock_gettime (CLOCK_REALTIME, &ts); + seed[2] = (unsigned short) (ts.tv_nsec); + seed[1] = (unsigned short) (ts.tv_nsec >> 16); + seed[0] = (unsigned short) (ts.tv_sec); + + sock = sock_open (); + + while (loop) + { + send_random_event (sock, seed); + local_events_sent++; + } + + close (sock); + + pthread_mutex_lock (&events_sent_lock); + events_sent += local_events_sent; + pthread_mutex_unlock (&events_sent_lock); + + return (NULL); +} /* }}} void *send_thread */ + +static void run_threads (void) /* {{{ */ +{ + pthread_t threads[conf_threads_num]; + int i; + + for (i = 0; i < conf_threads_num; i++) + { + int status; + + status = pthread_create (&threads[i], /* attr = */ NULL, + send_thread, /* args = */ NULL); + if (status != 0) + { + fprintf (stderr, "pthread_create failed."); + abort (); + } + } + + for (i = 0; i < conf_threads_num; i++) + pthread_join (threads[i], /* retval = */ NULL); +} /* }}} void run_threads */ + +static double timespec_diff (struct timespec const *ts0, /* {{{ */ + struct timespec const *ts1) +{ + time_t diff_sec; + long diff_nsec; + + diff_sec = ts1->tv_sec - ts0->tv_sec; + diff_nsec += ts1->tv_nsec - ts0->tv_nsec; + + return ((double) diff_sec) + (((double) diff_nsec) / 1.0e9); +} /* }}} double timespec_diff */ + int main (int argc, char **argv) /* {{{ */ { + struct timespec ts_begin; + struct timespec ts_end; + double runtime; + read_options (argc, argv); sigint_action.sa_handler = signal_handler; @@ -292,15 +373,13 @@ int main (int argc, char **argv) /* {{{ */ sigterm_action.sa_handler = signal_handler; sigaction (SIGTERM, &sigterm_action, /* old = */ NULL); - sock_open (); + clock_gettime (CLOCK_MONOTONIC, &ts_begin); + run_threads (); + clock_gettime (CLOCK_MONOTONIC, &ts_end); - while (loop) - { - send_random_event (); - } - - close (sock); - sock = -1; + runtime = timespec_diff (&ts_begin, &ts_end); + printf ("Sent %llu events in %.0fs (%.0f events/s).\n", + events_sent, runtime, ((double) events_sent) / runtime); exit (EXIT_SUCCESS); return (0); -- 2.11.0