X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Funixsock.c;h=0b897482c9d956547d74be3f905f9d9941b157be;hb=3b4201d2235c25ed21174c41c526c9b7894de539;hp=734403e87c944fb606670f205e0e5b43a479bd75;hpb=7e5df1a2c6611bd4ac9fb8ac4b78106f9139ae6e;p=collectd.git diff --git a/src/unixsock.c b/src/unixsock.c index 734403e8..0b897482 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -1,6 +1,6 @@ /** * collectd - src/unixsock.c - * Copyright (C) 2007 Florian octo Forster + * Copyright (C) 2007,2008 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -24,10 +24,18 @@ #include "plugin.h" #include "configfile.h" +#include "utils_cmd_flush.h" +#include "utils_cmd_getval.h" +#include "utils_cmd_getthreshold.h" +#include "utils_cmd_listval.h" +#include "utils_cmd_putval.h" +#include "utils_cmd_putnotif.h" + /* Folks without pthread will need to disable this plugin. */ #include #include +#include #include #include @@ -36,22 +44,7 @@ # define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path) #endif -#define US_DEFAULT_PATH PREFIX"/var/run/"PACKAGE_NAME"-unixsock" - -/* - * Private data structures - */ -/* linked list of cached values */ -typedef struct value_cache_s -{ - char name[4*DATA_MAX_NAME_LEN]; - int values_num; - gauge_t *gauge; - counter_t *counter; - const data_set_t *ds; - time_t time; - struct value_cache_s *next; -} value_cache_t; +#define US_DEFAULT_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-unixsock" /* * Private variables @@ -61,10 +54,9 @@ static const char *config_keys[] = { "SocketFile", "SocketGroup", - "SocketPerms", - NULL + "SocketPerms" }; -static int config_keys_num = 3; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); static int loop = 0; @@ -76,261 +68,9 @@ static int sock_perms = S_IRWXU | S_IRWXG; static pthread_t listen_thread = (pthread_t) 0; -/* Linked list and auxilliary variables for saving values */ -static value_cache_t *cache_head = NULL; -static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; -static unsigned int cache_oldest = UINT_MAX; - /* * Functions */ -static value_cache_t *cache_search (const char *name) -{ - value_cache_t *vc; - - for (vc = cache_head; vc != NULL; vc = vc->next) - { - if (strcmp (vc->name, name) == 0) - break; - } /* for vc = cache_head .. NULL */ - - return (vc); -} /* value_cache_t *cache_search */ - -static int cache_insert (const data_set_t *ds, const value_list_t *vl) -{ - /* We're called from `cache_update' so we don't need to lock the mutex */ - value_cache_t *vc; - int i; - - DEBUG ("unixsock plugin: cache_insert: ds->type = %s; ds->ds_num = %i;" - " vl->values_len = %i;", - ds->type, ds->ds_num, vl->values_len); -#if COLLECT_DEBUG - assert (ds->ds_num == vl->values_len); -#else - if (ds->ds_num != vl->values_len) - { - ERROR ("unixsock plugin: ds->type = %s: (ds->ds_num = %i) != " - "(vl->values_len = %i)", - ds->type, ds->ds_num, vl->values_len); - return (-1); - } -#endif - - vc = (value_cache_t *) malloc (sizeof (value_cache_t)); - if (vc == NULL) - { - char errbuf[1024]; - pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: malloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return (-1); - } - - vc->gauge = (gauge_t *) malloc (sizeof (gauge_t) * vl->values_len); - if (vc->gauge == NULL) - { - char errbuf[1024]; - pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: malloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - free (vc); - return (-1); - } - - vc->counter = (counter_t *) malloc (sizeof (counter_t) * vl->values_len); - if (vc->counter == NULL) - { - char errbuf[1024]; - pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: malloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - free (vc->gauge); - free (vc); - return (-1); - } - - if (FORMAT_VL (vc->name, sizeof (vc->name), vl, ds)) - { - pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: FORMAT_VL failed."); - free (vc->counter); - free (vc->gauge); - free (vc); - return (-1); - } - - for (i = 0; i < ds->ds_num; i++) - { - if (ds->ds[i].type == DS_TYPE_COUNTER) - { - vc->gauge[i] = 0.0; - vc->counter[i] = vl->values[i].counter; - } - else if (ds->ds[i].type == DS_TYPE_GAUGE) - { - vc->gauge[i] = vl->values[i].gauge; - vc->counter[i] = 0; - } - else - { - vc->gauge[i] = 0.0; - vc->counter[i] = 0; - } - } - vc->values_num = ds->ds_num; - vc->ds = ds; - - vc->next = cache_head; - cache_head = vc; - - vc->time = vl->time; - if (vc->time < cache_oldest) - cache_oldest = vc->time; - - pthread_mutex_unlock (&cache_lock); - return (0); -} /* int cache_insert */ - -static int cache_update (const data_set_t *ds, const value_list_t *vl) -{ - char name[4*DATA_MAX_NAME_LEN];; - value_cache_t *vc; - int i; - - if (FORMAT_VL (name, sizeof (name), vl, ds) != 0) - return (-1); - - pthread_mutex_lock (&cache_lock); - - vc = cache_search (name); - - /* pthread_mutex_lock is called by cache_insert. */ - if (vc == NULL) - return (cache_insert (ds, vl)); - - assert (vc->values_num == ds->ds_num); - assert (vc->values_num == vl->values_len); - - /* Avoid floating-point exceptions due to division by zero. */ - if (vc->time >= vl->time) - { - pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: vc->time >= vl->time. vc->time = %u; " - "vl->time = %u; vl = %s;", - (unsigned int) vc->time, (unsigned int) vl->time, - name); - return (-1); - } /* if (vc->time >= vl->time) */ - - /* - * Update the values. This is possibly a lot more that you'd expect - * because we honor min and max values and handle counter overflows here. - */ - for (i = 0; i < ds->ds_num; i++) - { - if (ds->ds[i].type == DS_TYPE_COUNTER) - { - if (vl->values[i].counter < vc->counter[i]) - { - if (vl->values[i].counter <= 4294967295U) - { - vc->gauge[i] = ((4294967295U - vl->values[i].counter) - + vc->counter[i]) / (vl->time - vc->time); - } - else - { - vc->gauge[i] = ((18446744073709551615ULL - vl->values[i].counter) - + vc->counter[i]) / (vl->time - vc->time); - } - } - else - { - vc->gauge[i] = (vl->values[i].counter - vc->counter[i]) - / (vl->time - vc->time); - } - - vc->counter[i] = vl->values[i].counter; - } - else if (ds->ds[i].type == DS_TYPE_GAUGE) - { - vc->gauge[i] = vl->values[i].gauge; - vc->counter[i] = 0; - } - else - { - vc->gauge[i] = NAN; - vc->counter[i] = 0; - } - - if (isnan (vc->gauge[i]) - || (!isnan (ds->ds[i].min) && (vc->gauge[i] < ds->ds[i].min)) - || (!isnan (ds->ds[i].max) && (vc->gauge[i] > ds->ds[i].max))) - vc->gauge[i] = NAN; - } /* for i = 0 .. ds->ds_num */ - - vc->ds = ds; - vc->time = vl->time; - - if (vc->time < cache_oldest) - cache_oldest = vc->time; - - pthread_mutex_unlock (&cache_lock); - return (0); -} /* int cache_update */ - -static void cache_flush (int max_age) -{ - value_cache_t *this; - value_cache_t *prev; - time_t now; - - pthread_mutex_lock (&cache_lock); - - now = time (NULL); - - if ((now - cache_oldest) <= max_age) - { - pthread_mutex_unlock (&cache_lock); - return; - } - - cache_oldest = now; - - prev = NULL; - this = cache_head; - - while (this != NULL) - { - if ((now - this->time) <= max_age) - { - if (this->time < cache_oldest) - cache_oldest = this->time; - - prev = this; - this = this->next; - continue; - } - - if (prev == NULL) - cache_head = this->next; - else - prev->next = this->next; - - free (this->gauge); - free (this->counter); - free (this); - - if (prev == NULL) - this = cache_head; - else - this = prev->next; - } /* while (this != NULL) */ - - pthread_mutex_unlock (&cache_lock); -} /* int cache_flush */ - static int us_open_socket (void) { struct sockaddr_un sa; @@ -347,8 +87,8 @@ static int us_open_socket (void) memset (&sa, '\0', sizeof (sa)); sa.sun_family = AF_UNIX; - strncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, - sizeof (sa.sun_path) - 1); + sstrncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, + sizeof (sa.sun_path)); /* unlink (sa.sun_path); */ DEBUG ("unixsock plugin: socket path = %s", sa.sun_path); @@ -364,6 +104,8 @@ static int us_open_socket (void) return (-1); } + chmod (sa.sun_path, sock_perms); + status = listen (sock_fd, 8); if (status != 0) { @@ -414,250 +156,81 @@ static int us_open_socket (void) return (0); } /* int us_open_socket */ -static int us_handle_getval (FILE *fh, char **fields, int fields_num) +static void *us_handle_client (void *arg) { - char *hostname; - char *plugin; - char *plugin_instance; - char *type; - char *type_instance; - char name[4*DATA_MAX_NAME_LEN]; - value_cache_t *vc; - int status; - int i; - - if (fields_num != 2) - { - DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); - fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n", - fields_num); - fflush (fh); - return (-1); - } - DEBUG ("unixsock plugin: Got query for `%s'", fields[1]); + int fdin; + int fdout; + FILE *fhin, *fhout; - status = parse_identifier (fields[1], &hostname, - &plugin, &plugin_instance, - &type, &type_instance); - if (status != 0) - { - DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); - fprintf (fh, "-1 Cannot parse identifier.\n"); - fflush (fh); - return (-1); - } - - status = format_name (name, sizeof (name), - hostname, plugin, plugin_instance, type, type_instance); - if (status != 0) - { - fprintf (fh, "-1 format_name failed.\n"); - return (-1); - } - - pthread_mutex_lock (&cache_lock); + fdin = *((int *) arg); + free (arg); + arg = NULL; - DEBUG ("vc = cache_search (%s)", name); - vc = cache_search (name); + DEBUG ("unixsock plugin: us_handle_client: Reading from fd #%i", fdin); - if (vc == NULL) - { - DEBUG ("Did not find cache entry."); - fprintf (fh, "-1 No such value"); - } - else + fdout = dup (fdin); + if (fdout < 0) { - DEBUG ("Found cache entry."); - fprintf (fh, "%i", vc->values_num); - for (i = 0; i < vc->values_num; i++) - { - fprintf (fh, " %s=", vc->ds->ds[i].name); - if (isnan (vc->gauge[i])) - fprintf (fh, "NaN"); - else - fprintf (fh, "%12e", vc->gauge[i]); - } + char errbuf[1024]; + ERROR ("unixsock plugin: dup failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + close (fdin); + pthread_exit ((void *) 1); } - /* Free the mutex as soon as possible and definitely before flushing */ - pthread_mutex_unlock (&cache_lock); - - fprintf (fh, "\n"); - fflush (fh); - - return (0); -} /* int us_handle_getval */ - -static int us_handle_putval (FILE *fh, char **fields, int fields_num) -{ - char *hostname; - char *plugin; - char *plugin_instance; - char *type; - char *type_instance; - int status; - int i; - - const data_set_t *ds; - value_list_t vl = VALUE_LIST_INIT; - - char **value_ptr; - - if (fields_num != 3) + fhin = fdopen (fdin, "r"); + if (fhin == NULL) { - DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); - fprintf (fh, "-1 Wrong number of fields: Got %i, expected 3.\n", - fields_num); - fflush (fh); - return (-1); + char errbuf[1024]; + ERROR ("unixsock plugin: fdopen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + close (fdin); + close (fdout); + pthread_exit ((void *) 1); } - status = parse_identifier (fields[1], &hostname, - &plugin, &plugin_instance, - &type, &type_instance); - if (status != 0) + fhout = fdopen (fdout, "w"); + if (fhout == NULL) { - DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); - fprintf (fh, "-1 Cannot parse identifier.\n"); - fflush (fh); - return (-1); + char errbuf[1024]; + ERROR ("unixsock plugin: fdopen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + fclose (fhin); /* this closes fdin as well */ + close (fdout); + pthread_exit ((void *) 1); } - if ((strlen (hostname) >= sizeof (vl.host)) - || (strlen (plugin) >= sizeof (vl.plugin)) - || ((plugin_instance != NULL) - && (strlen (plugin_instance) >= sizeof (vl.plugin_instance))) - || ((type_instance != NULL) - && (strlen (type_instance) >= sizeof (vl.type_instance)))) + /* change output buffer to line buffered mode */ + if (setvbuf (fhout, NULL, _IOLBF, 0) != 0) { - fprintf (fh, "-1 Identifier too long."); - return (-1); - } - - strcpy (vl.host, hostname); - strcpy (vl.plugin, plugin); - if (plugin_instance != NULL) - strcpy (vl.plugin_instance, plugin_instance); - if (type_instance != NULL) - strcpy (vl.type_instance, type_instance); - - { /* parse the time */ - char *t = fields[2]; - char *v = strchr (t, ':'); - if (v == NULL) - { - fprintf (fh, "-1 No time found."); - return (-1); - } - *v = '\0'; v++; - - vl.time = (time_t) atoi (t); - if (vl.time == 0) - vl.time = time (NULL); - - fields[2] = v; + char errbuf[1024]; + ERROR ("unixsock plugin: setvbuf failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + fclose (fhin); + fclose (fhout); + pthread_exit ((void *) 1); } - ds = plugin_get_ds (type); - if (ds == NULL) - return (-1); - - value_ptr = (char **) calloc (ds->ds_num, sizeof (char *)); - if (value_ptr == NULL) + while (42) { - fprintf (fh, "-1 calloc failed."); - return (-1); - } + char buffer[1024]; + char buffer_copy[1024]; + char *fields[128]; + int fields_num; + int len; - { /* parse the value-list. It's colon-separated. */ - char *dummy; - char *ptr; - char *saveptr; - - i = 0; - dummy = fields[2]; - saveptr = NULL; - while ((ptr = strtok_r (dummy, ":", &saveptr)) != NULL) + errno = 0; + if (fgets (buffer, sizeof (buffer), fhin) == NULL) { - dummy = NULL; - if (i >= ds->ds_num) + if (errno != 0) { - i = ds->ds_num + 1; - break; + char errbuf[1024]; + WARNING ("unixsock plugin: failed to read from socket #%i: %s", + fileno (fhin), + sstrerror (errno, errbuf, sizeof (errbuf))); } - value_ptr[i] = ptr; - i++; - } - - if (i != ds->ds_num) - { - sfree (value_ptr); - fprintf (fh, "-1 Number of values incorrect: Got %i, " - "expected %i.", i, ds->ds_num); - return (-1); + break; } - } /* done parsing the value-list */ - - vl.values_len = ds->ds_num; - vl.values = (value_t *) malloc (vl.values_len * sizeof (value_t)); - if (vl.values == NULL) - { - sfree (value_ptr); - fprintf (fh, "-1 malloc failed."); - return (-1); - } - DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values); - - for (i = 0; i < ds->ds_num; i++) - { - if (strcmp (value_ptr[i], "U") == 0) - vl.values[i].gauge = NAN; - else if (ds->ds[i].type == DS_TYPE_COUNTER) - vl.values[i].counter = atoll (value_ptr[i]); - else if (ds->ds[i].type == DS_TYPE_GAUGE) - vl.values[i].gauge = atof (value_ptr[i]); - } /* for (i = 2 .. fields_num) */ - - plugin_dispatch_values (type, &vl); - - DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values); - - sfree (value_ptr); - sfree (vl.values); - - fprintf (fh, "0 Success\n"); - fflush (fh); - - return (0); -} /* int us_handle_putval */ - -static void *us_handle_client (void *arg) -{ - int fd; - FILE *fh; - char buffer[1024]; - char *fields[128]; - int fields_num; - - fd = *((int *) arg); - free (arg); - arg = NULL; - - DEBUG ("Reading from fd #%i", fd); - - fh = fdopen (fd, "r+"); - if (fh == NULL) - { - char errbuf[1024]; - ERROR ("unixsock plugin: fdopen failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - close (fd); - pthread_exit ((void *) 1); - } - - while (fgets (buffer, sizeof (buffer), fh) != NULL) - { - int len; len = strlen (buffer); while ((len > 0) @@ -667,39 +240,64 @@ static void *us_handle_client (void *arg) if (len == 0) continue; - DEBUG ("fgets -> buffer = %s; len = %i;", buffer, len); + sstrncpy (buffer_copy, buffer, sizeof (buffer_copy)); - fields_num = strsplit (buffer, fields, + fields_num = strsplit (buffer_copy, fields, sizeof (fields) / sizeof (fields[0])); - if (fields_num < 1) { - close (fd); - break; + fprintf (fhout, "-1 Internal error\n"); + fclose (fhin); + fclose (fhout); + pthread_exit ((void *) 1); } if (strcasecmp (fields[0], "getval") == 0) { - us_handle_getval (fh, fields, fields_num); + handle_getval (fhout, buffer); + } + else if (strcasecmp (fields[0], "getthreshold") == 0) + { + handle_getthreshold (fhout, buffer); } else if (strcasecmp (fields[0], "putval") == 0) { - us_handle_putval (fh, fields, fields_num); + handle_putval (fhout, buffer); + } + else if (strcasecmp (fields[0], "listval") == 0) + { + handle_listval (fhout, buffer); + } + else if (strcasecmp (fields[0], "putnotif") == 0) + { + handle_putnotif (fhout, buffer); + } + else if (strcasecmp (fields[0], "flush") == 0) + { + handle_flush (fhout, buffer); } else { - fprintf (fh, "-1 Unknown command: %s\n", fields[0]); - fflush (fh); + if (fprintf (fhout, "-1 Unknown command: %s\n", fields[0]) < 0) + { + char errbuf[1024]; + WARNING ("unixsock plugin: failed to write to socket #%i: %s", + fileno (fhout), + sstrerror (errno, errbuf, sizeof (errbuf))); + break; + } } } /* while (fgets) */ - DEBUG ("Exiting.."); - close (fd); + DEBUG ("unixsock plugin: us_handle_client: Exiting.."); + fclose (fhin); + fclose (fhout); pthread_exit ((void *) 0); + return ((void *) 0); } /* void *us_handle_client */ -static void *us_server_thread (void *arg) +static void *us_server_thread (void __attribute__((unused)) *arg) { int status; int *remote_fd; @@ -711,7 +309,7 @@ static void *us_server_thread (void *arg) while (loop != 0) { - DEBUG ("Calling accept.."); + DEBUG ("unixsock plugin: Calling accept.."); status = accept (sock_fd, NULL, NULL); if (status < 0) { @@ -774,13 +372,21 @@ static int us_config (const char *key, const char *val) { if (strcasecmp (key, "SocketFile") == 0) { + char *new_sock_file = strdup (val); + if (new_sock_file == NULL) + return (1); + sfree (sock_file); - sock_file = strdup (val); + sock_file = new_sock_file; } else if (strcasecmp (key, "SocketGroup") == 0) { + char *new_sock_group = strdup (val); + if (new_sock_group == NULL) + return (1); + sfree (sock_group); - sock_group = strdup (val); + sock_group = new_sock_group; } else if (strcasecmp (key, "SocketPerms") == 0) { @@ -796,8 +402,15 @@ static int us_config (const char *key, const char *val) static int us_init (void) { + static int have_init = 0; + int status; + /* Initialize only once. */ + if (have_init != 0) + return (0); + have_init = 1; + loop = 1; status = pthread_create (&listen_thread, NULL, us_server_thread, NULL); @@ -826,26 +439,16 @@ static int us_shutdown (void) } plugin_unregister_init ("unixsock"); - plugin_unregister_write ("unixsock"); plugin_unregister_shutdown ("unixsock"); return (0); } /* int us_shutdown */ -static int us_write (const data_set_t *ds, const value_list_t *vl) -{ - cache_update (ds, vl); - cache_flush (2 * interval_g); - - return (0); -} - void module_register (void) { plugin_register_config ("unixsock", us_config, config_keys, config_keys_num); plugin_register_init ("unixsock", us_init); - plugin_register_write ("unixsock", us_write); plugin_register_shutdown ("unixsock", us_shutdown); } /* void module_register (void) */