X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Funixsock.c;h=6abbfef9bd248abac42b3578eb6d3d1daf844df9;hb=c7495cc1cada2553050e1b5b8dae8335f34ce867;hp=82cfc60433d78209affcbbae61ccfb31a18cbd85;hpb=39d723f4cb1d50b7ebb90c3251f0ebabad576412;p=collectd.git diff --git a/src/unixsock.c b/src/unixsock.c index 82cfc604..6abbfef9 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -23,13 +23,13 @@ #include "common.h" #include "plugin.h" #include "configfile.h" +#include "utils_cmd_putval.h" /* Folks without pthread will need to disable this plugin. */ #include #include #include -#include #include @@ -37,7 +37,7 @@ # define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path) #endif -#define US_DEFAULT_PATH PREFIX"/var/run/"PACKAGE_NAME"-unixsock" +#define US_DEFAULT_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-unixsock" /* * Private data structures @@ -67,6 +67,8 @@ static const char *config_keys[] = }; static int config_keys_num = 3; +static int loop = 0; + /* socket configuration */ static int sock_fd = -1; static char *sock_file = NULL; @@ -83,52 +85,6 @@ static unsigned int cache_oldest = UINT_MAX; /* * Functions */ -static int parse_identifier (char *str, char **ret_host, - char **ret_plugin, char **ret_plugin_instance, - char **ret_type, char **ret_type_instance) -{ - char *hostname = NULL; - char *plugin = NULL; - char *plugin_instance = NULL; - char *type = NULL; - char *type_instance = NULL; - - hostname = str; - if (hostname == NULL) - return (-1); - - plugin = strchr (hostname, '/'); - if (plugin == NULL) - return (-1); - *plugin = '\0'; plugin++; - - type = strchr (plugin, '/'); - if (type == NULL) - return (-1); - *type = '\0'; type++; - - plugin_instance = strchr (plugin, '-'); - if (plugin_instance != NULL) - { - *plugin_instance = '\0'; - plugin_instance++; - } - - type_instance = strchr (type, '-'); - if (type_instance != NULL) - { - *type_instance = '\0'; - type_instance++; - } - - *ret_host = hostname; - *ret_plugin = plugin; - *ret_plugin_instance = plugin_instance; - *ret_type = type; - *ret_type_instance = type_instance; - return (0); -} /* int parse_identifier */ - static value_cache_t *cache_search (const char *name) { value_cache_t *vc; @@ -142,49 +98,26 @@ static value_cache_t *cache_search (const char *name) return (vc); } /* value_cache_t *cache_search */ -static int cache_alloc_name (char *ret, int ret_len, - const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance) -{ - int status; - - assert (plugin != NULL); - assert (type != NULL); - - if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0)) - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s/%s", - hostname, plugin, type); - else - status = snprintf (ret, ret_len, "%s/%s/%s-%s", - hostname, plugin, type, type_instance); - } - else - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s-%s/%s", - hostname, plugin, plugin_instance, type); - else - status = snprintf (ret, ret_len, "%s/%s-%s/%s-%s", - hostname, plugin, plugin_instance, type, type_instance); - } - - if ((status < 1) || (status >= ret_len)) - return (-1); - return (0); -} /* int cache_alloc_name */ - static int cache_insert (const data_set_t *ds, const value_list_t *vl) { /* We're called from `cache_update' so we don't need to lock the mutex */ value_cache_t *vc; int i; - DEBUG ("ds->ds_num = %i; vl->values_len = %i;", - ds->ds_num, vl->values_len); + DEBUG ("unixsock plugin: cache_insert: ds->type = %s; ds->ds_num = %i;" + " vl->values_len = %i;", + ds->type, ds->ds_num, vl->values_len); +#if COLLECT_DEBUG assert (ds->ds_num == vl->values_len); +#else + if (ds->ds_num != vl->values_len) + { + ERROR ("unixsock plugin: ds->type = %s: (ds->ds_num = %i) != " + "(vl->values_len = %i)", + ds->type, ds->ds_num, vl->values_len); + return (-1); + } +#endif vc = (value_cache_t *) malloc (sizeof (value_cache_t)); if (vc == NULL) @@ -219,12 +152,10 @@ static int cache_insert (const data_set_t *ds, const value_list_t *vl) return (-1); } - if (cache_alloc_name (vc->name, sizeof (vc->name), - vl->host, vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (vc->name, sizeof (vc->name), vl, ds)) { pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: cache_alloc_name failed."); + ERROR ("unixsock plugin: FORMAT_VL failed."); free (vc->counter); free (vc->gauge); free (vc); @@ -269,22 +200,31 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) value_cache_t *vc; int i; - if (cache_alloc_name (name, sizeof (name), - vl->host, - vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (name, sizeof (name), vl, ds) != 0) return (-1); pthread_mutex_lock (&cache_lock); vc = cache_search (name); + /* pthread_mutex_lock is called by cache_insert. */ if (vc == NULL) return (cache_insert (ds, vl)); assert (vc->values_num == ds->ds_num); assert (vc->values_num == vl->values_len); + /* Avoid floating-point exceptions due to division by zero. */ + if (vc->time >= vl->time) + { + pthread_mutex_unlock (&cache_lock); + ERROR ("unixsock plugin: vc->time >= vl->time. vc->time = %u; " + "vl->time = %u; vl = %s;", + (unsigned int) vc->time, (unsigned int) vl->time, + name); + return (-1); + } /* if (vc->time >= vl->time) */ + /* * Update the values. This is possibly a lot more that you'd expect * because we honor min and max values and handle counter overflows here. @@ -390,7 +330,7 @@ static void cache_flush (int max_age) } /* while (this != NULL) */ pthread_mutex_unlock (&cache_lock); -} /* int cache_flush */ +} /* void cache_flush */ static int us_open_socket (void) { @@ -412,12 +352,13 @@ static int us_open_socket (void) sizeof (sa.sun_path) - 1); /* unlink (sa.sun_path); */ + DEBUG ("unixsock plugin: socket path = %s", sa.sun_path); + status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { char errbuf[1024]; sstrerror (errno, errbuf, sizeof (errbuf)); - DEBUG ("bind failed: %s; sa.sun_path = %s", errbuf, sa.sun_path); ERROR ("unixsock plugin: bind failed: %s", errbuf); close (sock_fd); sock_fd = -1; @@ -487,18 +428,33 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) int i; if (fields_num != 2) + { + DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n", + fields_num); + fflush (fh); return (-1); + } + DEBUG ("unixsock plugin: Got query for `%s'", fields[1]); status = parse_identifier (fields[1], &hostname, &plugin, &plugin_instance, &type, &type_instance); if (status != 0) + { + DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); + fprintf (fh, "-1 Cannot parse identifier.\n"); + fflush (fh); return (-1); + } - status = cache_alloc_name (name, sizeof (name), + status = format_name (name, sizeof (name), hostname, plugin, plugin_instance, type, type_instance); if (status != 0) + { + fprintf (fh, "-1 format_name failed.\n"); return (-1); + } pthread_mutex_lock (&cache_lock); @@ -533,117 +489,55 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) return (0); } /* int us_handle_getval */ -static int us_handle_putval (FILE *fh, char **fields, int fields_num) +static int us_handle_listval (FILE *fh, char **fields, int fields_num) { - char *hostname; - char *plugin; - char *plugin_instance; - char *type; - char *type_instance; - int status; - int i; - - const data_set_t *ds; - value_list_t vl = VALUE_LIST_INIT; - - char **value_ptr; - - if (fields_num != 3) - return (-1); - - status = parse_identifier (fields[1], &hostname, - &plugin, &plugin_instance, - &type, &type_instance); - if (status != 0) - return (-1); + char buffer[1024]; + char **value_list = NULL; + int value_list_len = 0; + value_cache_t *entry; + int i; - if ((strlen (hostname) > sizeof (vl.host)) - || (strlen (plugin) > sizeof (vl.plugin)) - || (strlen (plugin_instance) > sizeof (vl.plugin_instance)) - || (strlen (type_instance) > sizeof (vl.type_instance))) + if (fields_num != 1) + { + DEBUG ("unixsock plugin: us_handle_listval: " + "Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 1.\n", + fields_num); + fflush (fh); return (-1); - - strcpy (vl.host, hostname); - strcpy (vl.plugin, plugin); - strcpy (vl.plugin_instance, plugin_instance); - strcpy (vl.type_instance, type_instance); - - { /* parse the time */ - char *t = fields[2]; - char *v = strchr (t, ':'); - if (v == NULL) - return (-1); - *v = '\0'; v++; - - vl.time = (time_t) atoi (t); - if (vl.time == 0) - vl.time = time (NULL); - - fields[2] = v; } - ds = plugin_get_ds (type); - if (ds == NULL) - return (-1); - - value_ptr = (char **) calloc (ds->ds_num, sizeof (char *)); - if (value_ptr == NULL) - return (-1); - - - { /* parse the value-list. It's colon-separated. */ - char *dummy; - char *ptr; - char *saveptr; - - i = 0; - dummy = fields[2]; - saveptr = NULL; - while ((ptr = strtok_r (dummy, ":", &saveptr)) != NULL) - { - dummy = NULL; - if (i >= ds->ds_num) - { - i = ds->ds_num + 1; - break; - } - value_ptr[i] = ptr; - i++; - } - - if (i != ds->ds_num) - { - free (value_ptr); - return (-1); - } - } /* done parsing the value-list */ + pthread_mutex_lock (&cache_lock); - vl.values_len = fields_num - 2; - vl.values = (value_t *) malloc (vl.values_len * sizeof (value_t)); - if (vl.values == NULL) + for (entry = cache_head; entry != NULL; entry = entry->next) { - free (value_ptr); - return (-1); - } - vl.values_len = ds->ds_num; + char **tmp; + + snprintf (buffer, sizeof (buffer), "%u %s\n", + (unsigned int) entry->time, entry->name); + buffer[sizeof (buffer) - 1] = '\0'; + + tmp = realloc (value_list, sizeof (char *) * (value_list_len + 1)); + if (tmp == NULL) + continue; + value_list = tmp; - for (i = 0; i < ds->ds_num; i++) - { - if (strcmp (value_ptr[i], "U") == 0) - vl.values[i].gauge = NAN; - else if (ds->ds[i].type == DS_TYPE_COUNTER) - vl.values[i].counter = atoll (value_ptr[i]); - else if (ds->ds[i].type == DS_TYPE_GAUGE) - vl.values[i].gauge = atof (value_ptr[i]); - } /* for (i = 2 .. fields_num) */ - sfree (value_ptr); + value_list[value_list_len] = strdup (buffer); + + if (value_list[value_list_len] != NULL) + value_list_len++; + } /* for (entry) */ - plugin_dispatch_values (type, &vl); + pthread_mutex_unlock (&cache_lock); - sfree (vl.values); + DEBUG ("unixsock plugin: us_handle_listval: value_list_len = %i", value_list_len); + fprintf (fh, "%i Values found\n", value_list_len); + for (i = 0; i < value_list_len; i++) + fputs (value_list[i], fh); + fflush (fh); return (0); -} /* int us_handle_putval */ +} /* int us_handle_listval */ static void *us_handle_client (void *arg) { @@ -698,11 +592,15 @@ static void *us_handle_client (void *arg) } else if (strcasecmp (fields[0], "putval") == 0) { - us_handle_putval (fh, fields, fields_num); + handle_putval (fh, fields, fields_num); + } + else if (strcasecmp (fields[0], "listval") == 0) + { + us_handle_listval (fh, fields, fields_num); } else { - fprintf (fh, "Unknown command: %s\n", fields[0]); + fprintf (fh, "-1 Unknown command: %s\n", fields[0]); fflush (fh); } } /* while (fgets) */ @@ -723,7 +621,7 @@ static void *us_server_thread (void *arg) if (us_open_socket () != 0) pthread_exit ((void *) 1); - while (42) + while (loop != 0) { DEBUG ("Calling accept.."); status = accept (sock_fd, NULL, NULL); @@ -767,7 +665,19 @@ static void *us_server_thread (void *arg) free (remote_fd); continue; } - } /* while (42) */ + } /* while (loop) */ + + close (sock_fd); + sock_fd = -1; + + status = unlink ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH); + if (status != 0) + { + char errbuf[1024]; + NOTICE ("unixsock plugin: unlink (%s) failed: %s", + (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, + sstrerror (errno, errbuf, sizeof (errbuf))); + } return ((void *) 0); } /* void *us_server_thread */ @@ -800,6 +710,8 @@ static int us_init (void) { int status; + loop = 1; + status = pthread_create (&listen_thread, NULL, us_server_thread, NULL); if (status != 0) { @@ -816,6 +728,8 @@ static int us_shutdown (void) { void *ret; + loop = 0; + if (listen_thread != (pthread_t) 0) { pthread_kill (listen_thread, SIGTERM); @@ -833,7 +747,7 @@ static int us_shutdown (void) static int us_write (const data_set_t *ds, const value_list_t *vl) { cache_update (ds, vl); - cache_flush (2 * atoi (COLLECTD_STEP)); + cache_flush (2 * interval_g); return (0); }