X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Femail.c;h=9b82e10eee8fb6dc808bd5861bac5e08410bb2bc;hb=089d33f1f455da115ca0c87160df8dbd49286377;hp=eed91f170cc1c16639acf47f084975127132f9cc;hpb=358556cc2c0a8e10a4db94a584d049545ad01cd7;p=collectd.git diff --git a/src/email.c b/src/email.c index eed91f17..9b82e10e 100644 --- a/src/email.c +++ b/src/email.c @@ -1,11 +1,10 @@ /** * collectd - src/email.c - * Copyright (C) 2006,2007 Sebastian Harl + * Copyright (C) 2006-2008 Sebastian Harl * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. + * Free Software Foundation; only version 2 of the License is applicable. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of @@ -40,24 +39,15 @@ #include "configfile.h" +#include + #if HAVE_LIBPTHREAD # include #endif -#if HAVE_SYS_SELECT_H -# include -#endif /* HAVE_SYS_SELECT_H */ - -#if HAVE_SYS_SOCKET_H -# include -#endif /* HAVE_SYS_SOCKET_H */ - -/* *sigh* glibc does not define UNIX_PATH_MAX in sys/un.h ... */ -#if HAVE_LINUX_UN_H -# include -#elif HAVE_SYS_UN_H -# include -#endif /* HAVE_LINUX_UN_H | HAVE_SYS_UN_H */ +#include +#include +#include /* some systems (e.g. Darwin) seem to not define UNIX_PATH_MAX at all */ #ifndef UNIX_PATH_MAX @@ -68,21 +58,13 @@ # include #endif /* HAVE_GRP_H */ -#define MODULE_NAME "email" - -/* 256 bytes ought to be enough for anybody ;-) */ -#define BUFSIZE 256 - -#ifndef COLLECTD_SOCKET_PREFIX -# define COLLECTD_SOCKET_PREFIX "/tmp/.collectd-" -#endif /* COLLECTD_SOCKET_PREFIX */ - -#define SOCK_PATH COLLECTD_SOCKET_PREFIX"email" +#define SOCK_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-email" #define MAX_CONNS 5 #define MAX_CONNS_LIMIT 16384 -#define log_err(...) ERROR (MODULE_NAME": "__VA_ARGS__) -#define log_warn(...) WARNING (MODULE_NAME": "__VA_ARGS__) +#define log_debug(...) DEBUG ("email: "__VA_ARGS__) +#define log_err(...) ERROR ("email: "__VA_ARGS__) +#define log_warn(...) WARNING ("email: "__VA_ARGS__) /* * Private data structures @@ -104,19 +86,15 @@ typedef struct collector { pthread_t thread; /* socket descriptor of the current/last connection */ - int socket; + FILE *socket; } collector_t; /* linked list of pending connections */ typedef struct conn { /* socket to read data from */ - int socket; - - /* buffer to read data to */ - char *buffer; - int idx; /* current write position in buffer */ - int length; /* length of the current line, i.e. index of '\0' */ + FILE *socket; + /* linked list of connections */ struct conn *next; } conn_t; @@ -131,44 +109,16 @@ typedef struct { /* valid configuration file keys */ static const char *config_keys[] = { + "SocketFile", "SocketGroup", "SocketPerms", "MaxConns" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); -static data_source_t gauge_dsrc[1] = -{ - {"value", DS_TYPE_GAUGE, 0.0, NAN} -}; - -static data_set_t email_count_ds = -{ - "email_count", 1, gauge_dsrc -}; - -static data_set_t email_size_ds = -{ - "email_size", 1, gauge_dsrc -}; - -static data_set_t spam_check_ds = -{ - "spam_check", 1, gauge_dsrc -}; - -static data_source_t spam_score_dsrc[1] = -{ - {"score", DS_TYPE_GAUGE, NAN, NAN} -}; - -static data_set_t spam_score_ds = -{ - "spam_score", 1, spam_score_dsrc -}; - /* socket configuration */ -static char *sock_group = COLLECTD_GRP_NAME; +static char *sock_file = NULL; +static char *sock_group = NULL; static int sock_perms = S_IRWXU | S_IRWXG; static int max_conns = MAX_CONNS; @@ -190,30 +140,40 @@ static conn_list_t conns; static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER; /* collector threads */ -static collector_t **collectors; +static collector_t **collectors = NULL; static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER; static int available_collectors; static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t count; +static type_list_t list_count; +static type_list_t list_count_copy; static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t size; +static type_list_t list_size; +static type_list_t list_size_copy; static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER; static double score; static int score_count; static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t check; +static type_list_t list_check; +static type_list_t list_check_copy; /* * Private functions */ static int email_config (const char *key, const char *value) { - if (0 == strcasecmp (key, "SocketGroup")) { + if (0 == strcasecmp (key, "SocketFile")) { + if (NULL != sock_file) + free (sock_file); + sock_file = sstrdup (value); + } + else if (0 == strcasecmp (key, "SocketGroup")) { + if (NULL != sock_group) + free (sock_group); sock_group = sstrdup (value); } else if (0 == strcasecmp (key, "SocketPerms")) { @@ -280,139 +240,10 @@ static void type_list_incr (type_list_t *list, char *name, int incr) return; } /* static void type_list_incr (type_list_t *, char *) */ -/* Read a single character from the socket. If an error occurs or end-of-file - * is reached return '\0'. */ -static char read_char (conn_t *src) -{ - char ret = '\0'; - - fd_set fdset; - - FD_ZERO (&fdset); - FD_SET (src->socket, &fdset); - - if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { - char errbuf[1024]; - log_err ("select() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return '\0'; - } - - assert (FD_ISSET (src->socket, &fdset)); - - do { - ssize_t len = 0; - - errno = 0; - if (0 > (len = read (src->socket, (void *)&ret, 1))) { - if (EINTR != errno) { - char errbuf[1024]; - log_err ("read() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return '\0'; - } - } - - if (0 == len) - return '\0'; - } while (EINTR == errno); - return ret; -} /* static char read_char (conn_t *) */ - -/* Read a single line (terminated by '\n') from the the socket. - * - * The return value is zero terminated and does not contain any newline - * characters. - * - * If an error occurs or end-of-file is reached return NULL. - * - * IMPORTANT NOTE: If there is no newline character found in BUFSIZE - * characters of the input stream, the line will will be ignored! By - * definition we should not get any longer input lines, thus this is - * acceptable in this case ;-) */ -static char *read_line (conn_t *src) -{ - int i = 0; - - assert ((BUFSIZE >= src->idx) && (src->idx >= 0)); - assert ((src->idx > src->length) || (src->length == 0)); - - if (src->length > 0) { /* remove old line */ - src->idx -= (src->length + 1); - memmove (src->buffer, src->buffer + src->length + 1, src->idx); - src->length = 0; - } - - for (i = 0; i < src->idx; ++i) { - if ('\n' == src->buffer[i]) - break; - } - - if (i == src->idx) { - fd_set fdset; - - ssize_t len = 0; - - FD_ZERO (&fdset); - FD_SET (src->socket, &fdset); - - if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { - char errbuf[1024]; - log_err ("select() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return NULL; - } - - assert (FD_ISSET (src->socket, &fdset)); - - do { - errno = 0; - if (0 > (len = read (src->socket, - (void *)(src->buffer + src->idx), - BUFSIZE - src->idx))) { - if (EINTR != errno) { - char errbuf[1024]; - log_err ("read() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return NULL; - } - } - - if (0 == len) - return NULL; - } while (EINTR == errno); - - src->idx += len; - - for (i = src->idx - len; i < src->idx; ++i) { - if ('\n' == src->buffer[i]) - break; - } - - if (i == src->idx) { - src->length = 0; - - if (BUFSIZE == src->idx) { /* no space left in buffer */ - while ('\n' != read_char (src)) - /* ignore complete line */; - - src->idx = 0; - } - return read_line (src); - } - } - - src->buffer[i] = '\0'; - src->length = i; - - return src->buffer; -} /* static char *read_line (conn_t *) */ - static void *collect (void *arg) { collector_t *this = (collector_t *)arg; - - char *buffer = (char *)smalloc (BUFSIZE); + pthread_t self = pthread_self (); while (1) { int loop = 1; @@ -432,44 +263,51 @@ static void *collect (void *arg) conns.tail = NULL; } - this->socket = connection->socket; - pthread_mutex_unlock (&conns_mutex); - connection->buffer = buffer; - connection->idx = 0; - connection->length = 0; + /* make the socket available to the global + * thread and connection management */ + this->socket = connection->socket; - { /* put the socket in non-blocking mode */ - int flags = 0; + log_debug ("[thread #%5lu] handling connection on fd #%i", + self, fileno (this->socket)); - errno = 0; - if (-1 == fcntl (connection->socket, F_GETFL, &flags)) { - char errbuf[1024]; - log_err ("fcntl() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - loop = 0; - } + while (loop) { + /* 256 bytes ought to be enough for anybody ;-) */ + char line[256 + 1]; /* line + '\0' */ + int len = 0; errno = 0; - if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) { - char errbuf[1024]; - log_err ("fcntl() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); + if (NULL == fgets (line, sizeof (line), this->socket)) { loop = 0; + + if (0 != errno) { + char errbuf[1024]; + log_err ("[thread #%5lu] reading from socket (fd #%i) " + "failed: %s", self, fileno (this->socket), + sstrerror (errno, errbuf, sizeof (errbuf))); + } + break; } - } - while (loop) { - char *line = read_line (connection); + len = strlen (line); + if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) { + log_warn ("[thread #%5lu] line too long (> %lu characters): " + "'%s' (truncated)", self, sizeof (line) - 1, line); - if (NULL == line) { - loop = 0; - break; + while (NULL != fgets (line, sizeof (line), this->socket)) + if (('\n' == line[len - 1]) || ('\r' == line[len - 1])) + break; + continue; } + line[len - 1] = '\0'; + + log_debug ("[thread #%5lu] line = '%s'", self, line); + if (':' != line[1]) { - log_err ("syntax error in line '%s'", line); + log_err ("[thread #%5lu] syntax error in line '%s'", + self, line); continue; } @@ -480,19 +318,20 @@ static void *collect (void *arg) int bytes = 0; if (NULL == tmp) { - log_err ("syntax error in line '%s'", line); + log_err ("[thread #%5lu] syntax error in line '%s'", + self, line); continue; } bytes = atoi (tmp); pthread_mutex_lock (&count_mutex); - type_list_incr (&count, type, 1); + type_list_incr (&list_count, type, 1); pthread_mutex_unlock (&count_mutex); if (bytes > 0) { pthread_mutex_lock (&size_mutex); - type_list_incr (&size, type, bytes); + type_list_incr (&list_size, type, bytes); pthread_mutex_unlock (&size_mutex); } } @@ -509,19 +348,22 @@ static void *collect (void *arg) do { pthread_mutex_lock (&check_mutex); - type_list_incr (&check, type, 1); + type_list_incr (&list_check, type, 1); pthread_mutex_unlock (&check_mutex); } while (NULL != (type = strtok_r (NULL, ",", &ptr))); } else { - log_err ("unknown type '%c'", line[0]); + log_err ("[thread #%5lu] unknown type '%c'", self, line[0]); } } /* while (loop) */ - close (connection->socket); + log_debug ("[thread #%5lu] shutting down connection on fd #%i", + pthread_self (), fileno (this->socket)); + + fclose (connection->socket); free (connection); - this->socket = -1; + this->socket = NULL; pthread_mutex_lock (&available_mutex); ++available_collectors; @@ -530,7 +372,6 @@ static void *collect (void *arg) pthread_cond_signal (&collector_available); } /* while (1) */ - free (buffer); pthread_exit ((void *)0); } /* static void *collect (void *) */ @@ -538,6 +379,9 @@ static void *open_connection (void *arg) { struct sockaddr_un addr; + char *path = (NULL == sock_file) ? SOCK_PATH : sock_file; + char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group; + /* create UNIX socket */ errno = 0; if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) { @@ -550,8 +394,7 @@ static void *open_connection (void *arg) addr.sun_family = AF_UNIX; - strncpy (addr.sun_path, SOCK_PATH, (size_t)(UNIX_PATH_MAX - 1)); - addr.sun_path[UNIX_PATH_MAX - 1] = '\0'; + sstrncpy (addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1)); unlink (addr.sun_path); errno = 0; @@ -560,7 +403,8 @@ static void *open_connection (void *arg) + strlen(addr.sun_path))) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("bind() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); @@ -570,13 +414,13 @@ static void *open_connection (void *arg) if (-1 == listen (connector_socket, 5)) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("listen() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); } - if ((uid_t) 0 == geteuid ()) { struct group sg; struct group *grp; @@ -584,36 +428,32 @@ static void *open_connection (void *arg) int status; grp = NULL; - status = getgrnam_r (sock_group, &sg, grbuf, sizeof (grbuf), &grp); + status = getgrnam_r (group, &sg, grbuf, sizeof (grbuf), &grp); if (status != 0) { char errbuf[1024]; - log_warn ("getgrnam_r (%s) failed: %s", sock_group, + log_warn ("getgrnam_r (%s) failed: %s", group, sstrerror (errno, errbuf, sizeof (errbuf))); } else if (grp == NULL) { - log_warn ("No such group: `%s'", sock_group); + log_warn ("No such group: `%s'", group); } else { - status = chown (SOCK_PATH, (uid_t) -1, grp->gr_gid); + status = chown (path, (uid_t) -1, grp->gr_gid); if (status != 0) { char errbuf[1024]; log_warn ("chown (%s, -1, %i) failed: %s", - SOCK_PATH, (int) grp->gr_gid, + path, (int) grp->gr_gid, sstrerror (errno, errbuf, sizeof (errbuf))); } } } - else /* geteuid != 0 */ - { - log_warn ("not running as root"); - } errno = 0; - if (0 != chmod (SOCK_PATH, sock_perms)) { + if (0 != chmod (path, sock_perms)) { char errbuf[1024]; log_warn ("chmod() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); @@ -638,7 +478,7 @@ static void *open_connection (void *arg) for (i = 0; i < max_conns; ++i) { collectors[i] = (collector_t *)smalloc (sizeof (collector_t)); - collectors[i]->socket = -1; + collectors[i]->socket = NULL; if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr, collect, collectors[i]))) { @@ -673,7 +513,8 @@ static void *open_connection (void *arg) if (EINTR != errno) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("accept() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); @@ -683,9 +524,14 @@ static void *open_connection (void *arg) connection = (conn_t *)smalloc (sizeof (conn_t)); - connection->socket = remote; + connection->socket = fdopen (remote, "r"); connection->next = NULL; + if (NULL == connection->socket) { + close (remote); + continue; + } + pthread_mutex_lock (&conns_mutex); if (NULL == conns.head) { @@ -722,10 +568,9 @@ static int email_init (void) static int email_shutdown (void) { - int i = 0; + type_t *ptr = NULL; - if (disabled) - return (0); + int i = 0; if (connector != ((pthread_t) 0)) { pthread_kill (connector, SIGTERM); @@ -740,22 +585,64 @@ static int email_shutdown (void) /* don't allow any more connections to be processed */ pthread_mutex_lock (&conns_mutex); - for (i = 0; i < max_conns; ++i) { - if (collectors[i]->thread != ((pthread_t) 0)) { - pthread_kill (collectors[i]->thread, SIGTERM); - collectors[i]->thread = (pthread_t) 0; - } - - if (collectors[i]->socket >= 0) { - close (collectors[i]->socket); - collectors[i]->socket = -1; + available_collectors = 0; + + if (collectors != NULL) { + for (i = 0; i < max_conns; ++i) { + if (collectors[i] == NULL) + continue; + + if (collectors[i]->thread != ((pthread_t) 0)) { + pthread_kill (collectors[i]->thread, SIGTERM); + collectors[i]->thread = (pthread_t) 0; + } + + if (collectors[i]->socket != NULL) { + fclose (collectors[i]->socket); + collectors[i]->socket = NULL; + } + + sfree (collectors[i]); } - } + sfree (collectors); + } /* if (collectors != NULL) */ pthread_mutex_unlock (&conns_mutex); - unlink (SOCK_PATH); + for (ptr = list_count.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_size.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_check.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + unlink ((NULL == sock_file) ? SOCK_PATH : sock_file); + sfree (sock_file); + sfree (sock_group); return (0); } /* static void email_shutdown (void) */ @@ -771,9 +658,10 @@ static void email_submit (const char *type, const char *type_instance, gauge_t v vl.time = time (NULL); strcpy (vl.host, hostname_g); strcpy (vl.plugin, "email"); - strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); + sstrncpy (vl.type, type, sizeof (vl.type)); + sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); - plugin_dispatch_values (type, &vl); + plugin_dispatch_values (&vl); } /* void email_submit */ /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed @@ -818,92 +706,65 @@ static int email_read (void) { type_t *ptr; - double sc; - - static type_list_t *cnt; - static type_list_t *sz; - static type_list_t *chk; + double score_old; + int score_count_old; if (disabled) return (-1); - if (NULL == cnt) { - cnt = (type_list_t *)smalloc (sizeof (type_list_t)); - cnt->head = NULL; - } - - if (NULL == sz) { - sz = (type_list_t *)smalloc (sizeof (type_list_t)); - sz->head = NULL; - } - - if (NULL == chk) { - chk = (type_list_t *)smalloc (sizeof (type_list_t)); - chk->head = NULL; - } - /* email count */ pthread_mutex_lock (&count_mutex); - copy_type_list (&count, cnt); + copy_type_list (&list_count, &list_count_copy); pthread_mutex_unlock (&count_mutex); - for (ptr = cnt->head; NULL != ptr; ptr = ptr->next) { + for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) { email_submit ("email_count", ptr->name, ptr->value); } /* email size */ pthread_mutex_lock (&size_mutex); - copy_type_list (&size, sz); + copy_type_list (&list_size, &list_size_copy); pthread_mutex_unlock (&size_mutex); - for (ptr = sz->head; NULL != ptr; ptr = ptr->next) { + for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) { email_submit ("email_size", ptr->name, ptr->value); } /* spam score */ pthread_mutex_lock (&score_mutex); - sc = score; + score_old = score; + score_count_old = score_count; score = 0.0; score_count = 0; pthread_mutex_unlock (&score_mutex); - email_submit ("spam_score", "", sc); + if (score_count_old > 0) + email_submit ("spam_score", "", score_old); /* spam checks */ pthread_mutex_lock (&check_mutex); - copy_type_list (&check, chk); + copy_type_list (&list_check, &list_check_copy); pthread_mutex_unlock (&check_mutex); - for (ptr = chk->head; NULL != ptr; ptr = ptr->next) + for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) email_submit ("spam_check", ptr->name, ptr->value); return (0); } /* int email_read */ -void module_register (modreg_e load) +void module_register (void) { - if (load & MR_DATASETS) - { - plugin_register_data_set (&email_count_ds); - plugin_register_data_set (&email_size_ds); - plugin_register_data_set (&spam_check_ds); - plugin_register_data_set (&spam_score_ds); - } - - if (load & MR_READ) - { - plugin_register_config ("email", email_config, config_keys, config_keys_num); - plugin_register_init ("email", email_init); - plugin_register_read ("email", email_read); - } + plugin_register_config ("email", email_config, config_keys, config_keys_num); + plugin_register_init ("email", email_init); + plugin_register_read ("email", email_read); plugin_register_shutdown ("email", email_shutdown); } /* void module_register */