/**
* collectd - src/unixsock.c
- * Copyright (C) 2007 Florian octo Forster
+ * Copyright (C) 2007,2008 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
#include "common.h"
#include "plugin.h"
#include "configfile.h"
-#include "utils_debug.h"
+
+#include "utils_cmd_flush.h"
+#include "utils_cmd_getval.h"
+#include "utils_cmd_getthreshold.h"
+#include "utils_cmd_listval.h"
+#include "utils_cmd_putval.h"
+#include "utils_cmd_putnotif.h"
/* Folks without pthread will need to disable this plugin. */
#include <pthread.h>
#include <sys/socket.h>
+#include <sys/stat.h>
#include <sys/un.h>
-#include <sys/poll.h>
#include <grp.h>
# define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path)
#endif
-#define US_DEFAULT_PATH PREFIX"/var/run/"PACKAGE_NAME"-unixsock"
-
-/*
- * Private data structures
- */
-/* linked list of cached values */
-typedef struct value_cache_s
-{
- char name[4*DATA_MAX_NAME_LEN];
- int values_num;
- gauge_t *gauge;
- counter_t *counter;
- const data_set_t *ds;
- time_t time;
- struct value_cache_s *next;
-} value_cache_t;
+#define US_DEFAULT_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-unixsock"
/*
* Private variables
{
"SocketFile",
"SocketGroup",
- "SocketPerms",
- NULL
+ "SocketPerms"
};
-static int config_keys_num = 3;
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static int loop = 0;
/* socket configuration */
static int sock_fd = -1;
static char *sock_group = NULL;
static int sock_perms = S_IRWXU | S_IRWXG;
-static pthread_t listen_thread = -1;
-
-/* Linked list and auxilliary variables for saving values */
-static value_cache_t *cache_head = NULL;
-static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
-static unsigned int cache_oldest = UINT_MAX;
+static pthread_t listen_thread = (pthread_t) 0;
/*
* Functions
*/
-static value_cache_t *cache_search (const char *name)
-{
- value_cache_t *vc;
-
- for (vc = cache_head; vc != NULL; vc = vc->next)
- {
- if (strcmp (vc->name, name) == 0)
- break;
- } /* for vc = cache_head .. NULL */
-
- return (vc);
-} /* value_cache_t *cache_search */
-
-static int cache_alloc_name (char *ret, int ret_len,
- const char *plugin, const char *plugin_instance,
- const char *type, const char *type_instance)
-{
- int status;
-
- assert (plugin != NULL);
- assert (type != NULL);
-
- if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0))
- {
- if ((type_instance == NULL) || (strlen (type_instance) == 0))
- status = snprintf (ret, ret_len, "%s/%s",
- plugin, type);
- else
- status = snprintf (ret, ret_len, "%s/%s-%s",
- plugin, type, type_instance);
- }
- else
- {
- if ((type_instance == NULL) || (strlen (type_instance) == 0))
- status = snprintf (ret, ret_len, "%s-%s/%s",
- plugin, plugin_instance, type);
- else
- status = snprintf (ret, ret_len, "%s-%s/%s-%s",
- plugin, plugin_instance,
- type, type_instance);
- }
-
- if ((status < 1) || (status >= ret_len))
- return (-1);
- return (0);
-} /* int cache_alloc_name */
-
-static int cache_insert (const data_set_t *ds, const value_list_t *vl)
-{
- /* We're called from `cache_update' so we don't need to lock the mutex */
- value_cache_t *vc;
- int i;
-
- DBG ("ds->ds_num = %i; vl->values_len = %i;",
- ds->ds_num, vl->values_len);
- assert (ds->ds_num == vl->values_len);
-
- vc = (value_cache_t *) malloc (sizeof (value_cache_t));
- if (vc == NULL)
- {
- pthread_mutex_unlock (&cache_lock);
- syslog (LOG_ERR, "unixsock plugin: malloc failed: %s",
- strerror (errno));
- return (-1);
- }
-
- vc->gauge = (gauge_t *) malloc (sizeof (gauge_t) * vl->values_len);
- if (vc->gauge == NULL)
- {
- pthread_mutex_unlock (&cache_lock);
- syslog (LOG_ERR, "unixsock plugin: malloc failed: %s",
- strerror (errno));
- free (vc);
- return (-1);
- }
-
- vc->counter = (counter_t *) malloc (sizeof (counter_t) * vl->values_len);
- if (vc->counter == NULL)
- {
- pthread_mutex_unlock (&cache_lock);
- syslog (LOG_ERR, "unixsock plugin: malloc failed: %s",
- strerror (errno));
- free (vc->gauge);
- free (vc);
- return (-1);
- }
-
- if (cache_alloc_name (vc->name, sizeof (vc->name),
- vl->plugin, vl->plugin_instance,
- ds->type, vl->type_instance) != 0)
- {
- pthread_mutex_unlock (&cache_lock);
- syslog (LOG_ERR, "unixsock plugin: cache_alloc_name failed.");
- free (vc->counter);
- free (vc->gauge);
- free (vc);
- return (-1);
- }
-
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
- {
- vc->gauge[i] = 0.0;
- vc->counter[i] = vl->values[i].counter;
- }
- else if (ds->ds[i].type == DS_TYPE_GAUGE)
- {
- vc->gauge[i] = vl->values[i].gauge;
- vc->counter[i] = 0;
- }
- else
- {
- vc->gauge[i] = 0.0;
- vc->counter[i] = 0;
- }
- }
- vc->values_num = ds->ds_num;
- vc->ds = ds;
-
- vc->next = cache_head;
- cache_head = vc;
-
- vc->time = vl->time;
- if (vc->time < cache_oldest)
- cache_oldest = vc->time;
-
- pthread_mutex_unlock (&cache_lock);
- return (0);
-} /* int cache_insert */
-
-static int cache_update (const data_set_t *ds, const value_list_t *vl)
-{
- char name[4*DATA_MAX_NAME_LEN];;
- value_cache_t *vc;
- int i;
-
- if (cache_alloc_name (name, sizeof (name),
- vl->plugin, vl->plugin_instance,
- ds->type, vl->type_instance) != 0)
- return (-1);
-
- pthread_mutex_lock (&cache_lock);
-
- vc = cache_search (name);
-
- if (vc == NULL)
- return (cache_insert (ds, vl));
-
- assert (vc->values_num == ds->ds_num);
- assert (vc->values_num == vl->values_len);
-
- /*
- * Update the values. This is possibly a lot more that you'd expect
- * because we honor min and max values and handle counter overflows here.
- */
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
- {
- if (vl->values[i].counter < vc->counter[i])
- {
- if (vl->values[i].counter <= 4294967295U)
- {
- vc->gauge[i] = ((4294967295U - vl->values[i].counter)
- + vc->counter[i]) / (vl->time - vc->time);
- }
- else
- {
- vc->gauge[i] = ((18446744073709551615ULL - vl->values[i].counter)
- + vc->counter[i]) / (vl->time - vc->time);
- }
- }
- else
- {
- vc->gauge[i] = (vl->values[i].counter - vc->counter[i])
- / (vl->time - vc->time);
- }
-
- DBG ("name = %s; old counter: %llu; new counter: %llu; rate: %lf;",
- name,
- vc->counter[i], vl->values[i].counter,
- vc->gauge[i]);
-
- vc->counter[i] = vl->values[i].counter;
- }
- else if (ds->ds[i].type == DS_TYPE_GAUGE)
- {
- vc->gauge[i] = vl->values[i].gauge;
- vc->counter[i] = 0;
-
- DBG ("name, %s; gauge: %lf;",
- name, vc->gauge[i]);
- }
- else
- {
- vc->gauge[i] = NAN;
- vc->counter[i] = 0;
- }
-
- if ((vc->gauge[i] == NAN)
- || ((ds->ds[i].min != NAN) && (vc->gauge[i] < ds->ds[i].min))
- || ((ds->ds[i].max != NAN) && (vc->gauge[i] > ds->ds[i].max)))
- vc->gauge[i] = NAN;
- } /* for i = 0 .. ds->ds_num */
-
- vc->ds = ds;
- vc->time = vl->time;
-
- if (vc->time < cache_oldest)
- cache_oldest = vc->time;
-
- pthread_mutex_unlock (&cache_lock);
- return (0);
-} /* int cache_update */
-
-static void cache_flush (int max_age)
-{
- value_cache_t *this;
- value_cache_t *prev;
- time_t now;
-
- pthread_mutex_lock (&cache_lock);
-
- now = time (NULL);
-
- if ((now - cache_oldest) <= max_age)
- {
- pthread_mutex_unlock (&cache_lock);
- return;
- }
-
- cache_oldest = now;
-
- prev = NULL;
- this = cache_head;
-
- while (this != NULL)
- {
- if ((now - this->time) <= max_age)
- {
- if (this->time < cache_oldest)
- cache_oldest = this->time;
-
- prev = this;
- this = this->next;
- continue;
- }
-
- if (prev == NULL)
- cache_head = this->next;
- else
- prev->next = this->next;
-
- free (this->gauge);
- free (this->counter);
- free (this);
-
- if (prev == NULL)
- this = cache_head;
- else
- this = prev->next;
- } /* while (this != NULL) */
-
- pthread_mutex_unlock (&cache_lock);
-} /* int cache_flush */
-
static int us_open_socket (void)
{
struct sockaddr_un sa;
sock_fd = socket (PF_UNIX, SOCK_STREAM, 0);
if (sock_fd < 0)
{
- syslog (LOG_ERR, "unixsock plugin: socket failed: %s",
- strerror (errno));
+ char errbuf[1024];
+ ERROR ("unixsock plugin: socket failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
memset (&sa, '\0', sizeof (sa));
sa.sun_family = AF_UNIX;
- strncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
- sizeof (sa.sun_path) - 1);
+ sstrncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
+ sizeof (sa.sun_path));
/* unlink (sa.sun_path); */
+ DEBUG ("unixsock plugin: socket path = %s", sa.sun_path);
+
status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa));
if (status != 0)
{
- DBG ("bind failed: %s; sa.sun_path = %s",
- strerror (errno), sa.sun_path);
- syslog (LOG_ERR, "unixsock plugin: bind failed: %s",
- strerror (errno));
+ char errbuf[1024];
+ sstrerror (errno, errbuf, sizeof (errbuf));
+ ERROR ("unixsock plugin: bind failed: %s", errbuf);
close (sock_fd);
sock_fd = -1;
return (-1);
}
+ chmod (sa.sun_path, sock_perms);
+
status = listen (sock_fd, 8);
if (status != 0)
{
- syslog (LOG_ERR, "unixsock plugin: listen failed: %s",
- strerror (errno));
+ char errbuf[1024];
+ ERROR ("unixsock plugin: listen failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
close (sock_fd);
sock_fd = -1;
return (-1);
do
{
+ char *grpname;
struct group *g;
+ struct group sg;
+ char grbuf[2048];
- errno = 0;
- g = getgrnam ((sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME);
+ grpname = (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME;
+ g = NULL;
- if (errno != 0)
+ status = getgrnam_r (grpname, &sg, grbuf, sizeof (grbuf), &g);
+ if (status != 0)
{
- syslog (LOG_WARNING, "unixsock plugin: getgrnam (%s) failed: %s",
- (sock_group != NULL) ? sock_group : COLLECTD_GRP_NAME,
- strerror (errno));
+ char errbuf[1024];
+ WARNING ("unixsock plugin: getgrnam_r (%s) failed: %s", grpname,
+ sstrerror (errno, errbuf, sizeof (errbuf)));
break;
}
-
if (g == NULL)
+ {
+ WARNING ("unixsock plugin: No such group: `%s'",
+ grpname);
break;
+ }
if (chown ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
(uid_t) -1, g->gr_gid) != 0)
{
- syslog (LOG_WARNING, "unixsock plugin: chown (%s, -1, %i) failed: %s",
+ char errbuf[1024];
+ WARNING ("unixsock plugin: chown (%s, -1, %i) failed: %s",
(sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
(int) g->gr_gid,
- strerror (errno));
+ sstrerror (errno, errbuf, sizeof (errbuf)));
}
} while (0);
return (0);
} /* int us_open_socket */
-static int us_handle_getval (FILE *fh, char **fields, int fields_num)
+static void *us_handle_client (void *arg)
{
- char *plugin = fields[1];
- char *plugin_instance;
- char *type;
- char *type_instance;
- char name[4*DATA_MAX_NAME_LEN];
- value_cache_t *vc;
- int status;
- int i;
-
- if (fields_num != 2)
- return (-1);
+ int fdin;
+ int fdout;
+ FILE *fhin, *fhout;
- type = strchr (plugin, '/');
- if (type == NULL)
- return (-1);
- *type = '\0'; type++;
+ fdin = *((int *) arg);
+ free (arg);
+ arg = NULL;
- plugin_instance = strchr (plugin, '-');
- if (plugin_instance != NULL)
- {
- *plugin_instance = '\0';
- plugin_instance++;
- }
+ DEBUG ("unixsock plugin: us_handle_client: Reading from fd #%i", fdin);
- type_instance = strchr (type, '-');
- if (type_instance != NULL)
+ fdout = dup (fdin);
+ if (fdout < 0)
{
- *type_instance = '\0';
- type_instance++;
+ char errbuf[1024];
+ ERROR ("unixsock plugin: dup failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ close (fdin);
+ pthread_exit ((void *) 1);
}
- status = cache_alloc_name (name, sizeof (name),
- plugin, plugin_instance, type, type_instance);
- if (status != 0)
- return (-1);
-
- pthread_mutex_lock (&cache_lock);
-
- DBG ("vc = cache_search (%s)", name);
- vc = cache_search (name);
-
- if (vc == NULL)
+ fhin = fdopen (fdin, "r");
+ if (fhin == NULL)
{
- DBG ("Did not find cache entry.");
- fprintf (fh, "-1 No such value");
+ char errbuf[1024];
+ ERROR ("unixsock plugin: fdopen failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ close (fdin);
+ close (fdout);
+ pthread_exit ((void *) 1);
}
- else
+
+ fhout = fdopen (fdout, "w");
+ if (fhout == NULL)
{
- DBG ("Found cache entry.");
- fprintf (fh, "%i", vc->values_num);
- for (i = 0; i < vc->values_num; i++)
- {
- fprintf (fh, " %s=", vc->ds->ds[i].name);
- if (vc->gauge[i] == NAN)
- fprintf (fh, "NaN");
- else
- fprintf (fh, "%12e", vc->gauge[i]);
- }
+ char errbuf[1024];
+ ERROR ("unixsock plugin: fdopen failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ fclose (fhin); /* this closes fdin as well */
+ close (fdout);
+ pthread_exit ((void *) 1);
}
- /* Free the mutex as soon as possible and definitely before flushing */
- pthread_mutex_unlock (&cache_lock);
-
- fprintf (fh, "\n");
- fflush (fh);
-
- return (0);
-} /* int us_handle_getval */
-
-static void *us_handle_client (void *arg)
-{
- int fd;
- FILE *fh;
- char buffer[1024];
- char *fields[128];
- int fields_num;
-
- fd = *((int *) arg);
- free (arg);
- arg = NULL;
-
- DBG ("Reading from fd #%i", fd);
-
- fh = fdopen (fd, "r+");
- if (fh == NULL)
+ /* change output buffer to line buffered mode */
+ if (setvbuf (fhout, NULL, _IOLBF, 0) != 0)
{
- syslog (LOG_ERR, "unixsock plugin: fdopen failed: %s",
- strerror (errno));
- close (fd);
+ char errbuf[1024];
+ ERROR ("unixsock plugin: setvbuf failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ fclose (fhin);
+ fclose (fhout);
pthread_exit ((void *) 1);
}
- while (fgets (buffer, sizeof (buffer), fh) != NULL)
+ while (42)
{
- int len;
+ char buffer[1024];
+ char buffer_copy[1024];
+ char *fields[128];
+ int fields_num;
+ int len;
+
+ errno = 0;
+ if (fgets (buffer, sizeof (buffer), fhin) == NULL)
+ {
+ if (errno != 0)
+ {
+ char errbuf[1024];
+ WARNING ("unixsock plugin: failed to read from socket #%i: %s",
+ fileno (fhin),
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ }
+ break;
+ }
len = strlen (buffer);
while ((len > 0)
if (len == 0)
continue;
- DBG ("fgets -> buffer = %s; len = %i;", buffer, len);
+ sstrncpy (buffer_copy, buffer, sizeof (buffer_copy));
- fields_num = strsplit (buffer, fields,
+ fields_num = strsplit (buffer_copy, fields,
sizeof (fields) / sizeof (fields[0]));
-
if (fields_num < 1)
{
- close (fd);
- break;
+ fprintf (fhout, "-1 Internal error\n");
+ fclose (fhin);
+ fclose (fhout);
+ pthread_exit ((void *) 1);
}
if (strcasecmp (fields[0], "getval") == 0)
{
- us_handle_getval (fh, fields, fields_num);
+ handle_getval (fhout, buffer);
+ }
+ else if (strcasecmp (fields[0], "getthreshold") == 0)
+ {
+ handle_getthreshold (fhout, buffer);
+ }
+ else if (strcasecmp (fields[0], "putval") == 0)
+ {
+ handle_putval (fhout, buffer);
+ }
+ else if (strcasecmp (fields[0], "listval") == 0)
+ {
+ handle_listval (fhout, buffer);
+ }
+ else if (strcasecmp (fields[0], "putnotif") == 0)
+ {
+ handle_putnotif (fhout, buffer);
+ }
+ else if (strcasecmp (fields[0], "flush") == 0)
+ {
+ handle_flush (fhout, buffer);
}
else
{
- fprintf (fh, "Unknown command: %s\n", fields[0]);
- fflush (fh);
+ if (fprintf (fhout, "-1 Unknown command: %s\n", fields[0]) < 0)
+ {
+ char errbuf[1024];
+ WARNING ("unixsock plugin: failed to write to socket #%i: %s",
+ fileno (fhout),
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ break;
+ }
}
} /* while (fgets) */
- DBG ("Exiting..");
- close (fd);
+ DEBUG ("unixsock plugin: us_handle_client: Exiting..");
+ fclose (fhin);
+ fclose (fhout);
pthread_exit ((void *) 0);
+ return ((void *) 0);
} /* void *us_handle_client */
-static void *us_server_thread (void *arg)
+static void *us_server_thread (void __attribute__((unused)) *arg)
{
int status;
int *remote_fd;
pthread_t th;
+ pthread_attr_t th_attr;
if (us_open_socket () != 0)
pthread_exit ((void *) 1);
- while (42)
+ while (loop != 0)
{
- DBG ("Calling accept..");
+ DEBUG ("unixsock plugin: Calling accept..");
status = accept (sock_fd, NULL, NULL);
if (status < 0)
{
+ char errbuf[1024];
+
if (errno == EINTR)
continue;
- syslog (LOG_ERR, "unixsock plugin: accept failed: %s",
- strerror (errno));
+ ERROR ("unixsock plugin: accept failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
close (sock_fd);
sock_fd = -1;
pthread_exit ((void *) 1);
remote_fd = (int *) malloc (sizeof (int));
if (remote_fd == NULL)
{
- syslog (LOG_WARNING, "unixsock plugin: malloc failed: %s",
- strerror (errno));
+ char errbuf[1024];
+ WARNING ("unixsock plugin: malloc failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
close (status);
continue;
}
*remote_fd = status;
- DBG ("Spawning child to handle connection on fd #%i", *remote_fd);
+ DEBUG ("Spawning child to handle connection on fd #%i", *remote_fd);
- status = pthread_create (&th, NULL, us_handle_client, (void *) remote_fd);
+ pthread_attr_init (&th_attr);
+ pthread_attr_setdetachstate (&th_attr, PTHREAD_CREATE_DETACHED);
+
+ status = pthread_create (&th, &th_attr, us_handle_client, (void *) remote_fd);
if (status != 0)
{
- syslog (LOG_WARNING, "unixsock plugin: pthread_create failed: %s",
- strerror (status));
+ char errbuf[1024];
+ WARNING ("unixsock plugin: pthread_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
close (*remote_fd);
free (remote_fd);
continue;
}
- } /* while (42) */
+ } /* while (loop) */
+
+ close (sock_fd);
+ sock_fd = -1;
+
+ status = unlink ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ NOTICE ("unixsock plugin: unlink (%s) failed: %s",
+ (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ }
return ((void *) 0);
} /* void *us_server_thread */
{
if (strcasecmp (key, "SocketFile") == 0)
{
+ char *new_sock_file = strdup (val);
+ if (new_sock_file == NULL)
+ return (1);
+
sfree (sock_file);
- sock_file = strdup (val);
+ sock_file = new_sock_file;
}
else if (strcasecmp (key, "SocketGroup") == 0)
{
+ char *new_sock_group = strdup (val);
+ if (new_sock_group == NULL)
+ return (1);
+
sfree (sock_group);
- sock_group = strdup (val);
+ sock_group = new_sock_group;
}
else if (strcasecmp (key, "SocketPerms") == 0)
{
static int us_init (void)
{
+ static int have_init = 0;
+
int status;
+ /* Initialize only once. */
+ if (have_init != 0)
+ return (0);
+ have_init = 1;
+
+ loop = 1;
+
status = pthread_create (&listen_thread, NULL, us_server_thread, NULL);
if (status != 0)
{
- syslog (LOG_ERR, "unixsock plugin: pthread_create failed: %s",
- strerror (status));
+ char errbuf[1024];
+ ERROR ("unixsock plugin: pthread_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
{
void *ret;
- if (listen_thread >= 0)
+ loop = 0;
+
+ if (listen_thread != (pthread_t) 0)
{
pthread_kill (listen_thread, SIGTERM);
pthread_join (listen_thread, &ret);
+ listen_thread = (pthread_t) 0;
}
- return (0);
-} /* int us_shutdown */
-
-static int us_write (const data_set_t *ds, const value_list_t *vl)
-{
- cache_update (ds, vl);
- cache_flush (2 * atoi (COLLECTD_STEP));
+ plugin_unregister_init ("unixsock");
+ plugin_unregister_shutdown ("unixsock");
return (0);
-}
+} /* int us_shutdown */
void module_register (void)
{
plugin_register_config ("unixsock", us_config,
config_keys, config_keys_num);
plugin_register_init ("unixsock", us_init);
- plugin_register_write ("unixsock", us_write);
plugin_register_shutdown ("unixsock", us_shutdown);
} /* void module_register (void) */