X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Funixsock.c;h=f8dcc5ba6b5e6c864572101a861cf465ecb8f211;hb=1d3482c65a00342de681e777c74108734ece0eab;hp=e6017ef2584b532e85088f2c82087c50fee8bbed;hpb=b32df633638e1869ecba45290bf502b243092241;p=collectd.git diff --git a/src/unixsock.c b/src/unixsock.c index e6017ef2..f8dcc5ba 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -23,14 +23,14 @@ #include "common.h" #include "plugin.h" #include "configfile.h" -#include "utils_debug.h" +#include "utils_cmd_putval.h" /* Folks without pthread will need to disable this plugin. */ #include #include +#include #include -#include #include @@ -38,7 +38,7 @@ # define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path) #endif -#define US_DEFAULT_PATH PREFIX"/var/run/"PACKAGE_NAME"-unixsock" +#define US_DEFAULT_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-unixsock" /* * Private data structures @@ -68,13 +68,15 @@ static const char *config_keys[] = }; static int config_keys_num = 3; +static int loop = 0; + /* socket configuration */ static int sock_fd = -1; static char *sock_file = NULL; static char *sock_group = NULL; static int sock_perms = S_IRWXU | S_IRWXG; -static pthread_t listen_thread = -1; +static pthread_t listen_thread = (pthread_t) 0; /* Linked list and auxilliary variables for saving values */ static value_cache_t *cache_head = NULL; @@ -97,65 +99,44 @@ static value_cache_t *cache_search (const char *name) return (vc); } /* value_cache_t *cache_search */ -static int cache_alloc_name (char *ret, int ret_len, - const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance) -{ - int status; - - assert (plugin != NULL); - assert (type != NULL); - - if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0)) - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s/%s", - hostname, plugin, type); - else - status = snprintf (ret, ret_len, "%s/%s/%s-%s", - hostname, plugin, type, type_instance); - } - else - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s-%s/%s", - hostname, plugin, plugin_instance, type); - else - status = snprintf (ret, ret_len, "%s/%s-%s/%s-%s", - hostname, plugin, plugin_instance, type, type_instance); - } - - if ((status < 1) || (status >= ret_len)) - return (-1); - return (0); -} /* int cache_alloc_name */ - static int cache_insert (const data_set_t *ds, const value_list_t *vl) { /* We're called from `cache_update' so we don't need to lock the mutex */ value_cache_t *vc; int i; - DBG ("ds->ds_num = %i; vl->values_len = %i;", - ds->ds_num, vl->values_len); + DEBUG ("unixsock plugin: cache_insert: ds->type = %s; ds->ds_num = %i;" + " vl->values_len = %i;", + ds->type, ds->ds_num, vl->values_len); +#if COLLECT_DEBUG assert (ds->ds_num == vl->values_len); +#else + if (ds->ds_num != vl->values_len) + { + ERROR ("unixsock plugin: ds->type = %s: (ds->ds_num = %i) != " + "(vl->values_len = %i)", + ds->type, ds->ds_num, vl->values_len); + return (-1); + } +#endif vc = (value_cache_t *) malloc (sizeof (value_cache_t)); if (vc == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: malloc failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } vc->gauge = (gauge_t *) malloc (sizeof (gauge_t) * vl->values_len); if (vc->gauge == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: malloc failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); free (vc); return (-1); } @@ -163,20 +144,19 @@ static int cache_insert (const data_set_t *ds, const value_list_t *vl) vc->counter = (counter_t *) malloc (sizeof (counter_t) * vl->values_len); if (vc->counter == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: malloc failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); free (vc->gauge); free (vc); return (-1); } - if (cache_alloc_name (vc->name, sizeof (vc->name), - vl->host, vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (vc->name, sizeof (vc->name), vl, ds)) { pthread_mutex_unlock (&cache_lock); - syslog (LOG_ERR, "unixsock plugin: cache_alloc_name failed."); + ERROR ("unixsock plugin: FORMAT_VL failed."); free (vc->counter); free (vc->gauge); free (vc); @@ -221,22 +201,31 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) value_cache_t *vc; int i; - if (cache_alloc_name (name, sizeof (name), - vl->host, - vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (name, sizeof (name), vl, ds) != 0) return (-1); pthread_mutex_lock (&cache_lock); vc = cache_search (name); + /* pthread_mutex_lock is called by cache_insert. */ if (vc == NULL) return (cache_insert (ds, vl)); assert (vc->values_num == ds->ds_num); assert (vc->values_num == vl->values_len); + /* Avoid floating-point exceptions due to division by zero. */ + if (vc->time >= vl->time) + { + pthread_mutex_unlock (&cache_lock); + ERROR ("unixsock plugin: vc->time >= vl->time. vc->time = %u; " + "vl->time = %u; vl = %s;", + (unsigned int) vc->time, (unsigned int) vl->time, + name); + return (-1); + } /* if (vc->time >= vl->time) */ + /* * Update the values. This is possibly a lot more that you'd expect * because we honor min and max values and handle counter overflows here. @@ -264,20 +253,12 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) / (vl->time - vc->time); } - DBG ("name = %s; old counter: %llu; new counter: %llu; rate: %lf;", - name, - vc->counter[i], vl->values[i].counter, - vc->gauge[i]); - vc->counter[i] = vl->values[i].counter; } else if (ds->ds[i].type == DS_TYPE_GAUGE) { vc->gauge[i] = vl->values[i].gauge; vc->counter[i] = 0; - - DBG ("name, %s; gauge: %lf;", - name, vc->gauge[i]); } else { @@ -285,9 +266,9 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) vc->counter[i] = 0; } - if ((vc->gauge[i] == NAN) - || ((ds->ds[i].min != NAN) && (vc->gauge[i] < ds->ds[i].min)) - || ((ds->ds[i].max != NAN) && (vc->gauge[i] > ds->ds[i].max))) + if (isnan (vc->gauge[i]) + || (!isnan (ds->ds[i].min) && (vc->gauge[i] < ds->ds[i].min)) + || (!isnan (ds->ds[i].max) && (vc->gauge[i] > ds->ds[i].max))) vc->gauge[i] = NAN; } /* for i = 0 .. ds->ds_num */ @@ -350,7 +331,7 @@ static void cache_flush (int max_age) } /* while (this != NULL) */ pthread_mutex_unlock (&cache_lock); -} /* int cache_flush */ +} /* void cache_flush */ static int us_open_socket (void) { @@ -360,8 +341,9 @@ static int us_open_socket (void) sock_fd = socket (PF_UNIX, SOCK_STREAM, 0); if (sock_fd < 0) { - syslog (LOG_ERR, "unixsock plugin: socket failed: %s", - strerror (errno)); + char errbuf[1024]; + ERROR ("unixsock plugin: socket failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -371,23 +353,27 @@ static int us_open_socket (void) sizeof (sa.sun_path) - 1); /* unlink (sa.sun_path); */ + DEBUG ("unixsock plugin: socket path = %s", sa.sun_path); + status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { - DBG ("bind failed: %s; sa.sun_path = %s", - strerror (errno), sa.sun_path); - syslog (LOG_ERR, "unixsock plugin: bind failed: %s", - strerror (errno)); + char errbuf[1024]; + sstrerror (errno, errbuf, sizeof (errbuf)); + ERROR ("unixsock plugin: bind failed: %s", errbuf); close (sock_fd); sock_fd = -1; return (-1); } + chmod (sa.sun_path, sock_perms); + status = listen (sock_fd, 8); if (status != 0) { - syslog (LOG_ERR, "unixsock plugin: listen failed: %s", - strerror (errno)); + char errbuf[1024]; + ERROR ("unixsock plugin: listen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (sock_fd); sock_fd = -1; return (-1); @@ -395,29 +381,37 @@ static int us_open_socket (void) do { + char *grpname; struct group *g; + struct group sg; + char grbuf[2048]; - errno = 0; - g = getgrnam ((sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME); + grpname = (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME; + g = NULL; - if (errno != 0) + status = getgrnam_r (grpname, &sg, grbuf, sizeof (grbuf), &g); + if (status != 0) { - syslog (LOG_WARNING, "unixsock plugin: getgrnam (%s) failed: %s", - (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME, - strerror (errno)); + char errbuf[1024]; + WARNING ("unixsock plugin: getgrnam_r (%s) failed: %s", grpname, + sstrerror (errno, errbuf, sizeof (errbuf))); break; } - if (g == NULL) + { + WARNING ("unixsock plugin: No such group: `%s'", + grpname); break; + } if (chown ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH, (uid_t) -1, g->gr_gid) != 0) { - syslog (LOG_WARNING, "unixsock plugin: chown (%s, -1, %i) failed: %s", + char errbuf[1024]; + WARNING ("unixsock plugin: chown (%s, -1, %i) failed: %s", (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, (int) g->gr_gid, - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); } } while (0); @@ -426,7 +420,7 @@ static int us_open_socket (void) static int us_handle_getval (FILE *fh, char **fields, int fields_num) { - char *hostname = fields[1]; + char *hostname; char *plugin; char *plugin_instance; char *type; @@ -437,55 +431,52 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) int i; if (fields_num != 2) - return (-1); - - plugin = strchr (hostname, '/'); - if (plugin == NULL) - return (-1); - *plugin = '\0'; plugin++; - - type = strchr (plugin, '/'); - if (type == NULL) - return (-1); - *type = '\0'; type++; - - plugin_instance = strchr (plugin, '-'); - if (plugin_instance != NULL) { - *plugin_instance = '\0'; - plugin_instance++; + DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n", + fields_num); + fflush (fh); + return (-1); } + DEBUG ("unixsock plugin: Got query for `%s'", fields[1]); - type_instance = strchr (type, '-'); - if (type_instance != NULL) + status = parse_identifier (fields[1], &hostname, + &plugin, &plugin_instance, + &type, &type_instance); + if (status != 0) { - *type_instance = '\0'; - type_instance++; + DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); + fprintf (fh, "-1 Cannot parse identifier.\n"); + fflush (fh); + return (-1); } - status = cache_alloc_name (name, sizeof (name), + status = format_name (name, sizeof (name), hostname, plugin, plugin_instance, type, type_instance); if (status != 0) + { + fprintf (fh, "-1 format_name failed.\n"); return (-1); + } pthread_mutex_lock (&cache_lock); - DBG ("vc = cache_search (%s)", name); + DEBUG ("vc = cache_search (%s)", name); vc = cache_search (name); if (vc == NULL) { - DBG ("Did not find cache entry."); + DEBUG ("Did not find cache entry."); fprintf (fh, "-1 No such value"); } else { - DBG ("Found cache entry."); + DEBUG ("Found cache entry."); fprintf (fh, "%i", vc->values_num); for (i = 0; i < vc->values_num; i++) { fprintf (fh, " %s=", vc->ds->ds[i].name); - if (vc->gauge[i] == NAN) + if (isnan (vc->gauge[i])) fprintf (fh, "NaN"); else fprintf (fh, "%12e", vc->gauge[i]); @@ -501,10 +492,60 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) return (0); } /* int us_handle_getval */ +static int us_handle_listval (FILE *fh, char **fields, int fields_num) +{ + char buffer[1024]; + char **value_list = NULL; + int value_list_len = 0; + value_cache_t *entry; + int i; + + if (fields_num != 1) + { + DEBUG ("unixsock plugin: us_handle_listval: " + "Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 1.\n", + fields_num); + fflush (fh); + return (-1); + } + + pthread_mutex_lock (&cache_lock); + + for (entry = cache_head; entry != NULL; entry = entry->next) + { + char **tmp; + + snprintf (buffer, sizeof (buffer), "%u %s\n", + (unsigned int) entry->time, entry->name); + buffer[sizeof (buffer) - 1] = '\0'; + + tmp = realloc (value_list, sizeof (char *) * (value_list_len + 1)); + if (tmp == NULL) + continue; + value_list = tmp; + + value_list[value_list_len] = strdup (buffer); + + if (value_list[value_list_len] != NULL) + value_list_len++; + } /* for (entry) */ + + pthread_mutex_unlock (&cache_lock); + + DEBUG ("unixsock plugin: us_handle_listval: value_list_len = %i", value_list_len); + fprintf (fh, "%i Values found\n", value_list_len); + for (i = 0; i < value_list_len; i++) + fputs (value_list[i], fh); + fflush (fh); + + return (0); +} /* int us_handle_listval */ + static void *us_handle_client (void *arg) { int fd; - FILE *fh; + FILE *fhin, *fhout; char buffer[1024]; char *fields[128]; int fields_num; @@ -513,18 +554,29 @@ static void *us_handle_client (void *arg) free (arg); arg = NULL; - DBG ("Reading from fd #%i", fd); + DEBUG ("Reading from fd #%i", fd); - fh = fdopen (fd, "r+"); - if (fh == NULL) + fhin = fdopen (fd, "r"); + if (fhin == NULL) { - syslog (LOG_ERR, "unixsock plugin: fdopen failed: %s", - strerror (errno)); + char errbuf[1024]; + ERROR ("unixsock plugin: fdopen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (fd); pthread_exit ((void *) 1); } - while (fgets (buffer, sizeof (buffer), fh) != NULL) + fhout = fdopen (fd, "w"); + if (fhout == NULL) + { + char errbuf[1024]; + ERROR ("unixsock plugin: fdopen failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + fclose (fhin); /* this closes fd as well */ + pthread_exit ((void *) 1); + } + + while (fgets (buffer, sizeof (buffer), fhin) != NULL) { int len; @@ -536,7 +588,7 @@ static void *us_handle_client (void *arg) if (len == 0) continue; - DBG ("fgets -> buffer = %s; len = %i;", buffer, len); + DEBUG ("fgets -> buffer = %s; len = %i;", buffer, len); fields_num = strsplit (buffer, fields, sizeof (fields) / sizeof (fields[0])); @@ -549,17 +601,26 @@ static void *us_handle_client (void *arg) if (strcasecmp (fields[0], "getval") == 0) { - us_handle_getval (fh, fields, fields_num); + us_handle_getval (fhout, fields, fields_num); + } + else if (strcasecmp (fields[0], "putval") == 0) + { + handle_putval (fhout, fields, fields_num); + } + else if (strcasecmp (fields[0], "listval") == 0) + { + us_handle_listval (fhout, fields, fields_num); } else { - fprintf (fh, "Unknown command: %s\n", fields[0]); - fflush (fh); + fprintf (fhout, "-1 Unknown command: %s\n", fields[0]); + fflush (fhout); } } /* while (fgets) */ - DBG ("Exiting.."); - close (fd); + DEBUG ("Exiting.."); + fclose (fhin); + fclose (fhout); pthread_exit ((void *) 0); } /* void *us_handle_client */ @@ -569,21 +630,24 @@ static void *us_server_thread (void *arg) int status; int *remote_fd; pthread_t th; + pthread_attr_t th_attr; if (us_open_socket () != 0) pthread_exit ((void *) 1); - while (42) + while (loop != 0) { - DBG ("Calling accept.."); + DEBUG ("unixsock plugin: Calling accept.."); status = accept (sock_fd, NULL, NULL); if (status < 0) { + char errbuf[1024]; + if (errno == EINTR) continue; - syslog (LOG_ERR, "unixsock plugin: accept failed: %s", - strerror (errno)); + ERROR ("unixsock plugin: accept failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (sock_fd); sock_fd = -1; pthread_exit ((void *) 1); @@ -592,25 +656,42 @@ static void *us_server_thread (void *arg) remote_fd = (int *) malloc (sizeof (int)); if (remote_fd == NULL) { - syslog (LOG_WARNING, "unixsock plugin: malloc failed: %s", - strerror (errno)); + char errbuf[1024]; + WARNING ("unixsock plugin: malloc failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (status); continue; } *remote_fd = status; - DBG ("Spawning child to handle connection on fd #%i", *remote_fd); + DEBUG ("Spawning child to handle connection on fd #%i", *remote_fd); + + pthread_attr_init (&th_attr); + pthread_attr_setdetachstate (&th_attr, PTHREAD_CREATE_DETACHED); - status = pthread_create (&th, NULL, us_handle_client, (void *) remote_fd); + status = pthread_create (&th, &th_attr, us_handle_client, (void *) remote_fd); if (status != 0) { - syslog (LOG_WARNING, "unixsock plugin: pthread_create failed: %s", - strerror (status)); + char errbuf[1024]; + WARNING ("unixsock plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); close (*remote_fd); free (remote_fd); continue; } - } /* while (42) */ + } /* while (loop) */ + + close (sock_fd); + sock_fd = -1; + + status = unlink ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH); + if (status != 0) + { + char errbuf[1024]; + NOTICE ("unixsock plugin: unlink (%s) failed: %s", + (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, + sstrerror (errno, errbuf, sizeof (errbuf))); + } return ((void *) 0); } /* void *us_server_thread */ @@ -643,11 +724,14 @@ static int us_init (void) { int status; + loop = 1; + status = pthread_create (&listen_thread, NULL, us_server_thread, NULL); if (status != 0) { - syslog (LOG_ERR, "unixsock plugin: pthread_create failed: %s", - strerror (status)); + char errbuf[1024]; + ERROR ("unixsock plugin: pthread_create failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -658,19 +742,26 @@ static int us_shutdown (void) { void *ret; - if (listen_thread >= 0) + loop = 0; + + if (listen_thread != (pthread_t) 0) { pthread_kill (listen_thread, SIGTERM); pthread_join (listen_thread, &ret); + listen_thread = (pthread_t) 0; } + plugin_unregister_init ("unixsock"); + plugin_unregister_write ("unixsock"); + plugin_unregister_shutdown ("unixsock"); + return (0); } /* int us_shutdown */ static int us_write (const data_set_t *ds, const value_list_t *vl) { cache_update (ds, vl); - cache_flush (2 * atoi (COLLECTD_STEP)); + cache_flush (2 * interval_g); return (0); }