X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Funixsock.c;h=215abdd01ec1daeaf8819781bbdd79cac5cc4af9;hb=aa39b2bf774bf297f6c1a3967764bd7d84e0a49f;hp=12ebc0995cf0d2bfe85ca9c56768ca83988f8c92;hpb=79d36d9eb61ac09f253769feff144383b2b665c9;p=collectd.git diff --git a/src/unixsock.c b/src/unixsock.c index 12ebc099..215abdd0 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -23,14 +23,12 @@ #include "common.h" #include "plugin.h" #include "configfile.h" -#include "utils_debug.h" /* Folks without pthread will need to disable this plugin. */ #include #include #include -#include #include @@ -68,6 +66,8 @@ static const char *config_keys[] = }; static int config_keys_num = 3; +static int loop = 0; + /* socket configuration */ static int sock_fd = -1; static char *sock_file = NULL; @@ -97,65 +97,44 @@ static value_cache_t *cache_search (const char *name) return (vc); } /* value_cache_t *cache_search */ -static int cache_alloc_name (char *ret, int ret_len, - const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance) -{ - int status; - - assert (plugin != NULL); - assert (type != NULL); - - if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0)) - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s/%s", - hostname, plugin, type); - else - status = snprintf (ret, ret_len, "%s/%s/%s-%s", - hostname, plugin, type, type_instance); - } - else - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s-%s/%s", - hostname, plugin, plugin_instance, type); - else - status = snprintf (ret, ret_len, "%s/%s-%s/%s-%s", - hostname, plugin, plugin_instance, type, type_instance); - } - - if ((status < 1) || (status >= ret_len)) - return (-1); - return (0); -} /* int cache_alloc_name */ - static int cache_insert (const data_set_t *ds, const value_list_t *vl) { /* We're called from `cache_update' so we don't need to lock the mutex */ value_cache_t *vc; int i; - DBG ("ds->ds_num = %i; vl->values_len = %i;", - ds->ds_num, vl->values_len); + DEBUG ("unixsock plugin: cache_insert: ds->type = %s; ds->ds_num = %i;" + " vl->values_len = %i;", + ds->type, ds->ds_num, vl->values_len); +#if COLLECT_DEBUG assert (ds->ds_num == vl->values_len); +#else + if (ds->ds_num != vl->values_len) + { + ERROR ("unixsock plugin: ds->type = %s: (ds->ds_num = %i) != " + "(vl->values_len = %i)", + ds->type, ds->ds_num, vl->values_len); + return (-1); + } +#endif vc = (value_cache_t *) malloc (sizeof (value_cache_t)); if (vc == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: malloc failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } vc->gauge = (gauge_t *) malloc (sizeof (gauge_t) * vl->values_len); if (vc->gauge == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: malloc failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); free (vc); return (-1); } @@ -163,20 +142,19 @@ static int cache_insert (const data_set_t *ds, const value_list_t *vl) vc->counter = (counter_t *) malloc (sizeof (counter_t) * vl->values_len); if (vc->counter == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: malloc failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); free (vc->gauge); free (vc); return (-1); } - if (cache_alloc_name (vc->name, sizeof (vc->name), - vl->host, vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (vc->name, sizeof (vc->name), vl, ds)) { pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: cache_alloc_name failed."); + ERROR ("unixsock plugin: FORMAT_VL failed."); free (vc->counter); free (vc->gauge); free (vc); @@ -221,22 +199,31 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) value_cache_t *vc; int i; - if (cache_alloc_name (name, sizeof (name), - vl->host, - vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (name, sizeof (name), vl, ds) != 0) return (-1); pthread_mutex_lock (&cache_lock); vc = cache_search (name); + /* pthread_mutex_lock is called by cache_insert. */ if (vc == NULL) return (cache_insert (ds, vl)); assert (vc->values_num == ds->ds_num); assert (vc->values_num == vl->values_len); + /* Avoid floating-point exceptions due to division by zero. */ + if (vc->time >= vl->time) + { + pthread_mutex_unlock (&cache_lock); + ERROR ("unixsock plugin: vc->time >= vl->time. vc->time = %u; " + "vl->time = %u; vl = %s;", + (unsigned int) vc->time, (unsigned int) vl->time, + name); + return (-1); + } /* if (vc->time >= vl->time) */ + /* * Update the values. This is possibly a lot more that you'd expect * because we honor min and max values and handle counter overflows here. @@ -277,9 +264,9 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) vc->counter[i] = 0; } - if ((vc->gauge[i] == NAN) - || ((ds->ds[i].min != NAN) && (vc->gauge[i] < ds->ds[i].min)) - || ((ds->ds[i].max != NAN) && (vc->gauge[i] > ds->ds[i].max))) + if (isnan (vc->gauge[i]) + || (!isnan (ds->ds[i].min) && (vc->gauge[i] < ds->ds[i].min)) + || (!isnan (ds->ds[i].max) && (vc->gauge[i] > ds->ds[i].max))) vc->gauge[i] = NAN; } /* for i = 0 .. ds->ds_num */ @@ -352,8 +339,9 @@ static int us_open_socket (void) sock_fd = socket (PF_UNIX, SOCK_STREAM, 0); if (sock_fd < 0) { - syslog (LOG_ERR, "unixsock plugin: socket failed: %s", - strerror (errno)); + char errbuf[1024]; + ERROR ("unixsock plugin: socket failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -366,10 +354,10 @@ static int us_open_socket (void) status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { - DBG ("bind failed: %s; sa.sun_path = %s", - strerror (errno), sa.sun_path); - syslog (LOG_ERR, "unixsock plugin: bind failed: %s", - strerror (errno)); + char errbuf[1024]; + sstrerror (errno, errbuf, sizeof (errbuf)); + DEBUG ("bind failed: %s; sa.sun_path = %s", errbuf, sa.sun_path); + ERROR ("unixsock plugin: bind failed: %s", errbuf); close (sock_fd); sock_fd = -1; return (-1); @@ -378,8 +366,9 @@ static int us_open_socket (void) status = listen (sock_fd, 8); if (status != 0) { - syslog (LOG_ERR, "unixsock plugin: listen failed: %s", - strerror (errno)); + char errbuf[1024]; + ERROR ("unixsock plugin: listen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (sock_fd); sock_fd = -1; return (-1); @@ -387,29 +376,37 @@ static int us_open_socket (void) do { + char *grpname; struct group *g; + struct group sg; + char grbuf[2048]; - errno = 0; - g = getgrnam ((sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME); + grpname = (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME; + g = NULL; - if (errno != 0) + status = getgrnam_r (grpname, &sg, grbuf, sizeof (grbuf), &g); + if (status != 0) { - syslog (LOG_WARNING, "unixsock plugin: getgrnam (%s) failed: %s", - (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME, - strerror (errno)); + char errbuf[1024]; + WARNING ("unixsock plugin: getgrnam_r (%s) failed: %s", grpname, + sstrerror (errno, errbuf, sizeof (errbuf))); break; } - if (g == NULL) + { + WARNING ("unixsock plugin: No such group: `%s'", + grpname); break; + } if (chown ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH, (uid_t) -1, g->gr_gid) != 0) { - syslog (LOG_WARNING, "unixsock plugin: chown (%s, -1, %i) failed: %s", + char errbuf[1024]; + WARNING ("unixsock plugin: chown (%s, -1, %i) failed: %s", (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, (int) g->gr_gid, - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); } } while (0); @@ -418,7 +415,7 @@ static int us_open_socket (void) static int us_handle_getval (FILE *fh, char **fields, int fields_num) { - char *hostname = fields[1]; + char *hostname; char *plugin; char *plugin_instance; char *type; @@ -429,55 +426,52 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) int i; if (fields_num != 2) - return (-1); - - plugin = strchr (hostname, '/'); - if (plugin == NULL) - return (-1); - *plugin = '\0'; plugin++; - - type = strchr (plugin, '/'); - if (type == NULL) - return (-1); - *type = '\0'; type++; - - plugin_instance = strchr (plugin, '-'); - if (plugin_instance != NULL) { - *plugin_instance = '\0'; - plugin_instance++; + DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n", + fields_num); + fflush (fh); + return (-1); } + DEBUG ("unixsock plugin: Got query for `%s'", fields[1]); - type_instance = strchr (type, '-'); - if (type_instance != NULL) + status = parse_identifier (fields[1], &hostname, + &plugin, &plugin_instance, + &type, &type_instance); + if (status != 0) { - *type_instance = '\0'; - type_instance++; + DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); + fprintf (fh, "-1 Cannot parse identifier.\n"); + fflush (fh); + return (-1); } - status = cache_alloc_name (name, sizeof (name), + status = format_name (name, sizeof (name), hostname, plugin, plugin_instance, type, type_instance); if (status != 0) + { + fprintf (fh, "-1 format_name failed.\n"); return (-1); + } pthread_mutex_lock (&cache_lock); - DBG ("vc = cache_search (%s)", name); + DEBUG ("vc = cache_search (%s)", name); vc = cache_search (name); if (vc == NULL) { - DBG ("Did not find cache entry."); + DEBUG ("Did not find cache entry."); fprintf (fh, "-1 No such value"); } else { - DBG ("Found cache entry."); + DEBUG ("Found cache entry."); fprintf (fh, "%i", vc->values_num); for (i = 0; i < vc->values_num; i++) { fprintf (fh, " %s=", vc->ds->ds[i].name); - if (vc->gauge[i] == NAN) + if (isnan (vc->gauge[i])) fprintf (fh, "NaN"); else fprintf (fh, "%12e", vc->gauge[i]); @@ -493,6 +487,149 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) return (0); } /* int us_handle_getval */ +static int us_handle_putval (FILE *fh, char **fields, int fields_num) +{ + char *hostname; + char *plugin; + char *plugin_instance; + char *type; + char *type_instance; + int status; + int i; + + const data_set_t *ds; + value_list_t vl = VALUE_LIST_INIT; + + char **value_ptr; + + if (fields_num != 3) + { + DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 3.\n", + fields_num); + fflush (fh); + return (-1); + } + + status = parse_identifier (fields[1], &hostname, + &plugin, &plugin_instance, + &type, &type_instance); + if (status != 0) + { + DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); + fprintf (fh, "-1 Cannot parse identifier.\n"); + fflush (fh); + return (-1); + } + + if ((strlen (hostname) >= sizeof (vl.host)) + || (strlen (plugin) >= sizeof (vl.plugin)) + || ((plugin_instance != NULL) + && (strlen (plugin_instance) >= sizeof (vl.plugin_instance))) + || ((type_instance != NULL) + && (strlen (type_instance) >= sizeof (vl.type_instance)))) + { + fprintf (fh, "-1 Identifier too long."); + return (-1); + } + + strcpy (vl.host, hostname); + strcpy (vl.plugin, plugin); + if (plugin_instance != NULL) + strcpy (vl.plugin_instance, plugin_instance); + if (type_instance != NULL) + strcpy (vl.type_instance, type_instance); + + { /* parse the time */ + char *t = fields[2]; + char *v = strchr (t, ':'); + if (v == NULL) + { + fprintf (fh, "-1 No time found."); + return (-1); + } + *v = '\0'; v++; + + vl.time = (time_t) atoi (t); + if (vl.time == 0) + vl.time = time (NULL); + + fields[2] = v; + } + + ds = plugin_get_ds (type); + if (ds == NULL) + return (-1); + + value_ptr = (char **) calloc (ds->ds_num, sizeof (char *)); + if (value_ptr == NULL) + { + fprintf (fh, "-1 calloc failed."); + return (-1); + } + + { /* parse the value-list. It's colon-separated. */ + char *dummy; + char *ptr; + char *saveptr; + + i = 0; + dummy = fields[2]; + saveptr = NULL; + while ((ptr = strtok_r (dummy, ":", &saveptr)) != NULL) + { + dummy = NULL; + if (i >= ds->ds_num) + { + i = ds->ds_num + 1; + break; + } + value_ptr[i] = ptr; + i++; + } + + if (i != ds->ds_num) + { + sfree (value_ptr); + fprintf (fh, "-1 Number of values incorrect: Got %i, " + "expected %i.", i, ds->ds_num); + return (-1); + } + } /* done parsing the value-list */ + + vl.values_len = ds->ds_num; + vl.values = (value_t *) malloc (vl.values_len * sizeof (value_t)); + if (vl.values == NULL) + { + sfree (value_ptr); + fprintf (fh, "-1 malloc failed."); + return (-1); + } + DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values); + + for (i = 0; i < ds->ds_num; i++) + { + if (strcmp (value_ptr[i], "U") == 0) + vl.values[i].gauge = NAN; + else if (ds->ds[i].type == DS_TYPE_COUNTER) + vl.values[i].counter = atoll (value_ptr[i]); + else if (ds->ds[i].type == DS_TYPE_GAUGE) + vl.values[i].gauge = atof (value_ptr[i]); + } /* for (i = 2 .. fields_num) */ + + plugin_dispatch_values (type, &vl); + + DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values); + + sfree (value_ptr); + sfree (vl.values); + + fprintf (fh, "0 Success\n"); + fflush (fh); + + return (0); +} /* int us_handle_putval */ + static void *us_handle_client (void *arg) { int fd; @@ -505,13 +642,14 @@ static void *us_handle_client (void *arg) free (arg); arg = NULL; - DBG ("Reading from fd #%i", fd); + DEBUG ("Reading from fd #%i", fd); fh = fdopen (fd, "r+"); if (fh == NULL) { - syslog (LOG_ERR, "unixsock plugin: fdopen failed: %s", - strerror (errno)); + char errbuf[1024]; + ERROR ("unixsock plugin: fdopen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (fd); pthread_exit ((void *) 1); } @@ -528,7 +666,7 @@ static void *us_handle_client (void *arg) if (len == 0) continue; - DBG ("fgets -> buffer = %s; len = %i;", buffer, len); + DEBUG ("fgets -> buffer = %s; len = %i;", buffer, len); fields_num = strsplit (buffer, fields, sizeof (fields) / sizeof (fields[0])); @@ -543,14 +681,18 @@ static void *us_handle_client (void *arg) { us_handle_getval (fh, fields, fields_num); } + else if (strcasecmp (fields[0], "putval") == 0) + { + us_handle_putval (fh, fields, fields_num); + } else { - fprintf (fh, "Unknown command: %s\n", fields[0]); + fprintf (fh, "-1 Unknown command: %s\n", fields[0]); fflush (fh); } } /* while (fgets) */ - DBG ("Exiting.."); + DEBUG ("Exiting.."); close (fd); pthread_exit ((void *) 0); @@ -566,17 +708,19 @@ static void *us_server_thread (void *arg) if (us_open_socket () != 0) pthread_exit ((void *) 1); - while (42) + while (loop != 0) { - DBG ("Calling accept.."); + DEBUG ("Calling accept.."); status = accept (sock_fd, NULL, NULL); if (status < 0) { + char errbuf[1024]; + if (errno == EINTR) continue; - syslog (LOG_ERR, "unixsock plugin: accept failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: accept failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (sock_fd); sock_fd = -1; pthread_exit ((void *) 1); @@ -585,14 +729,15 @@ static void *us_server_thread (void *arg) remote_fd = (int *) malloc (sizeof (int)); if (remote_fd == NULL) { - syslog (LOG_WARNING, "unixsock plugin: malloc failed: %s", - strerror (errno)); + char errbuf[1024]; + WARNING ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (status); continue; } *remote_fd = status; - DBG ("Spawning child to handle connection on fd #%i", *remote_fd); + DEBUG ("Spawning child to handle connection on fd #%i", *remote_fd); pthread_attr_init (&th_attr); pthread_attr_setdetachstate (&th_attr, PTHREAD_CREATE_DETACHED); @@ -600,13 +745,26 @@ static void *us_server_thread (void *arg) status = pthread_create (&th, &th_attr, us_handle_client, (void *) remote_fd); if (status != 0) { - syslog (LOG_WARNING, "unixsock plugin: pthread_create failed: %s", - strerror (status)); + char errbuf[1024]; + WARNING ("unixsock plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (*remote_fd); free (remote_fd); continue; } - } /* while (42) */ + } /* while (loop) */ + + close (sock_fd); + sock_fd = -1; + + status = unlink ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH); + if (status != 0) + { + char errbuf[1024]; + NOTICE ("unixsock plugin: unlink (%s) failed: %s", + (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, + sstrerror (errno, errbuf, sizeof (errbuf))); + } return ((void *) 0); } /* void *us_server_thread */ @@ -639,11 +797,14 @@ static int us_init (void) { int status; + loop = 1; + status = pthread_create (&listen_thread, NULL, us_server_thread, NULL); if (status != 0) { - syslog (LOG_ERR, "unixsock plugin: pthread_create failed: %s", - strerror (status)); + char errbuf[1024]; + ERROR ("unixsock plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -654,6 +815,8 @@ static int us_shutdown (void) { void *ret; + loop = 0; + if (listen_thread != (pthread_t) 0) { pthread_kill (listen_thread, SIGTERM); @@ -671,7 +834,7 @@ static int us_shutdown (void) static int us_write (const data_set_t *ds, const value_list_t *vl) { cache_update (ds, vl); - cache_flush (2 * atoi (COLLECTD_STEP)); + cache_flush (2 * interval_g); return (0); }