X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Femail.c;h=fbdc785f9374ecdc0826f056a7176f3d1b3a49c1;hb=633c3966f770e4d46651a2fe219a18d8a9907a9f;hp=f5a544ac4ea0a2745eaa2b7648112859c904c67a;hpb=6d43c759c9495118ef3c088fd2d06fd09c4fda8f;p=collectd.git diff --git a/src/email.c b/src/email.c index f5a544ac..d1a87192 100644 --- a/src/email.c +++ b/src/email.c @@ -1,21 +1,26 @@ /** * collectd - src/email.c - * Copyright (C) 2006,2007 Sebastian Harl + * Copyright (C) 2006-2008 Sebastian Harl * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; only version 2 of the License is applicable. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * - * Author: + * Authors: * Sebastian Harl **/ @@ -39,6 +44,8 @@ #include "configfile.h" +#include + #if HAVE_LIBPTHREAD # include #endif @@ -56,17 +63,13 @@ # include #endif /* HAVE_GRP_H */ -#define MODULE_NAME "email" - -/* 256 bytes ought to be enough for anybody ;-) */ -#define BUFSIZE 256 - #define SOCK_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-email" #define MAX_CONNS 5 #define MAX_CONNS_LIMIT 16384 -#define log_err(...) ERROR (MODULE_NAME": "__VA_ARGS__) -#define log_warn(...) WARNING (MODULE_NAME": "__VA_ARGS__) +#define log_debug(...) DEBUG ("email: "__VA_ARGS__) +#define log_err(...) ERROR ("email: "__VA_ARGS__) +#define log_warn(...) WARNING ("email: "__VA_ARGS__) /* * Private data structures @@ -88,19 +91,15 @@ typedef struct collector { pthread_t thread; /* socket descriptor of the current/last connection */ - int socket; + FILE *socket; } collector_t; /* linked list of pending connections */ typedef struct conn { /* socket to read data from */ - int socket; - - /* buffer to read data to */ - char *buffer; - int idx; /* current write position in buffer */ - int length; /* length of the current line, i.e. index of '\0' */ + FILE *socket; + /* linked list of connections */ struct conn *next; } conn_t; @@ -123,8 +122,8 @@ static const char *config_keys[] = static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); /* socket configuration */ -static char *sock_file = SOCK_PATH; -static char *sock_group = COLLECTD_GRP_NAME; +static char *sock_file = NULL; +static char *sock_group = NULL; static int sock_perms = S_IRWXU | S_IRWXG; static int max_conns = MAX_CONNS; @@ -152,17 +151,20 @@ static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER; static int available_collectors; static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t count; +static type_list_t list_count; +static type_list_t list_count_copy; static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t size; +static type_list_t list_size; +static type_list_t list_size_copy; static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER; static double score; static int score_count; static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t check; +static type_list_t list_check; +static type_list_t list_check_copy; /* * Private functions @@ -170,9 +172,13 @@ static type_list_t check; static int email_config (const char *key, const char *value) { if (0 == strcasecmp (key, "SocketFile")) { + if (NULL != sock_file) + free (sock_file); sock_file = sstrdup (value); } else if (0 == strcasecmp (key, "SocketGroup")) { + if (NULL != sock_group) + free (sock_group); sock_group = sstrdup (value); } else if (0 == strcasecmp (key, "SocketPerms")) { @@ -186,12 +192,18 @@ static int email_config (const char *key, const char *value) fprintf (stderr, "email plugin: `MaxConns' was set to invalid " "value %li, will use default %i.\n", tmp, MAX_CONNS); + ERROR ("email plugin: `MaxConns' was set to invalid " + "value %li, will use default %i.\n", + tmp, MAX_CONNS); max_conns = MAX_CONNS; } else if (tmp > MAX_CONNS_LIMIT) { fprintf (stderr, "email plugin: `MaxConns' was set to invalid " "value %li, will use hardcoded limit %i.\n", tmp, MAX_CONNS_LIMIT); + ERROR ("email plugin: `MaxConns' was set to invalid " + "value %li, will use hardcoded limit %i.\n", + tmp, MAX_CONNS_LIMIT); max_conns = MAX_CONNS_LIMIT; } else { @@ -239,140 +251,10 @@ static void type_list_incr (type_list_t *list, char *name, int incr) return; } /* static void type_list_incr (type_list_t *, char *) */ -/* Read a single character from the socket. If an error occurs or end-of-file - * is reached return '\0'. */ -static char read_char (conn_t *src) -{ - char ret = '\0'; - - fd_set fdset; - - FD_ZERO (&fdset); - FD_SET (src->socket, &fdset); - - if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { - char errbuf[1024]; - log_err ("select() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return '\0'; - } - - assert (FD_ISSET (src->socket, &fdset)); - - do { - ssize_t len = 0; - - errno = 0; - if (0 > (len = read (src->socket, (void *)&ret, 1))) { - if (EINTR != errno) { - char errbuf[1024]; - log_err ("read() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return '\0'; - } - } - - if (0 == len) - return '\0'; - } while (EINTR == errno); - return ret; -} /* static char read_char (conn_t *) */ - -/* Read a single line (terminated by '\n') from the the socket. - * - * The return value is zero terminated and does not contain any newline - * characters. - * - * If an error occurs or end-of-file is reached return NULL. - * - * IMPORTANT NOTE: If there is no newline character found in BUFSIZE - * characters of the input stream, the line will will be ignored! By - * definition we should not get any longer input lines, thus this is - * acceptable in this case ;-) */ -static char *read_line (conn_t *src) -{ - int i = 0; - - assert ((BUFSIZE >= src->idx) && (src->idx >= 0)); - assert ((src->idx > src->length) || (src->length == 0)); - - if (src->length > 0) { /* remove old line */ - src->idx -= (src->length + 1); - memmove (src->buffer, src->buffer + src->length + 1, src->idx); - src->length = 0; - } - - for (i = 0; i < src->idx; ++i) { - if ('\n' == src->buffer[i]) - break; - } - - if (i == src->idx) { - fd_set fdset; - - ssize_t len = 0; - - FD_ZERO (&fdset); - FD_SET (src->socket, &fdset); - - if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { - char errbuf[1024]; - log_err ("select() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return NULL; - } - - assert (FD_ISSET (src->socket, &fdset)); - - do { - errno = 0; - if (0 > (len = read (src->socket, - (void *)(src->buffer + src->idx), - BUFSIZE - src->idx))) { - if (EINTR != errno) { - char errbuf[1024]; - log_err ("read() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return NULL; - } - } - - if (0 == len) - return NULL; - } while (EINTR == errno); - - src->idx += len; - - for (i = src->idx - len; i < src->idx; ++i) { - if ('\n' == src->buffer[i]) - break; - } - - if (i == src->idx) { - src->length = 0; - - if (BUFSIZE == src->idx) { /* no space left in buffer */ - while ('\n' != read_char (src)) - /* ignore complete line */; - - src->idx = 0; - } - return read_line (src); - } - } - - src->buffer[i] = '\0'; - src->length = i; - - return src->buffer; -} /* static char *read_line (conn_t *) */ - static void *collect (void *arg) { collector_t *this = (collector_t *)arg; - char *buffer = (char *)smalloc (BUFSIZE); - while (1) { int loop = 1; @@ -391,44 +273,50 @@ static void *collect (void *arg) conns.tail = NULL; } - this->socket = connection->socket; - pthread_mutex_unlock (&conns_mutex); - connection->buffer = buffer; - connection->idx = 0; - connection->length = 0; + /* make the socket available to the global + * thread and connection management */ + this->socket = connection->socket; - { /* put the socket in non-blocking mode */ - int flags = 0; + log_debug ("collect: handling connection on fd #%i", + fileno (this->socket)); - errno = 0; - if (-1 == fcntl (connection->socket, F_GETFL, &flags)) { - char errbuf[1024]; - log_err ("fcntl() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - loop = 0; - } + while (loop) { + /* 256 bytes ought to be enough for anybody ;-) */ + char line[256 + 1]; /* line + '\0' */ + int len = 0; errno = 0; - if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) { - char errbuf[1024]; - log_err ("fcntl() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); + if (NULL == fgets (line, sizeof (line), this->socket)) { loop = 0; + + if (0 != errno) { + char errbuf[1024]; + log_err ("collect: reading from socket (fd #%i) " + "failed: %s", fileno (this->socket), + sstrerror (errno, errbuf, sizeof (errbuf))); + } + break; } - } - while (loop) { - char *line = read_line (connection); + len = strlen (line); + if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) { + log_warn ("collect: line too long (> %zu characters): " + "'%s' (truncated)", sizeof (line) - 1, line); - if (NULL == line) { - loop = 0; - break; + while (NULL != fgets (line, sizeof (line), this->socket)) + if (('\n' == line[len - 1]) || ('\r' == line[len - 1])) + break; + continue; } + line[len - 1] = '\0'; + + log_debug ("collect: line = '%s'", line); + if (':' != line[1]) { - log_err ("syntax error in line '%s'", line); + log_err ("collect: syntax error in line '%s'", line); continue; } @@ -439,19 +327,19 @@ static void *collect (void *arg) int bytes = 0; if (NULL == tmp) { - log_err ("syntax error in line '%s'", line); + log_err ("collect: syntax error in line '%s'", line); continue; } bytes = atoi (tmp); pthread_mutex_lock (&count_mutex); - type_list_incr (&count, type, 1); + type_list_incr (&list_count, type, 1); pthread_mutex_unlock (&count_mutex); if (bytes > 0) { pthread_mutex_lock (&size_mutex); - type_list_incr (&size, type, bytes); + type_list_incr (&list_size, type, bytes); pthread_mutex_unlock (&size_mutex); } } @@ -468,19 +356,22 @@ static void *collect (void *arg) do { pthread_mutex_lock (&check_mutex); - type_list_incr (&check, type, 1); + type_list_incr (&list_check, type, 1); pthread_mutex_unlock (&check_mutex); } while (NULL != (type = strtok_r (NULL, ",", &ptr))); } else { - log_err ("unknown type '%c'", line[0]); + log_err ("collect: unknown type '%c'", line[0]); } } /* while (loop) */ - close (connection->socket); + log_debug ("Shutting down connection on fd #%i", + fileno (this->socket)); + + fclose (connection->socket); free (connection); - this->socket = -1; + this->socket = NULL; pthread_mutex_lock (&available_mutex); ++available_collectors; @@ -489,14 +380,17 @@ static void *collect (void *arg) pthread_cond_signal (&collector_available); } /* while (1) */ - free (buffer); pthread_exit ((void *)0); + return ((void *) 0); } /* static void *collect (void *) */ -static void *open_connection (void *arg) +static void *open_connection (void __attribute__((unused)) *arg) { struct sockaddr_un addr; + char *path = (NULL == sock_file) ? SOCK_PATH : sock_file; + char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group; + /* create UNIX socket */ errno = 0; if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) { @@ -508,10 +402,7 @@ static void *open_connection (void *arg) } addr.sun_family = AF_UNIX; - - strncpy (addr.sun_path, sock_file, (size_t)(UNIX_PATH_MAX - 1)); - addr.sun_path[UNIX_PATH_MAX - 1] = '\0'; - unlink (addr.sun_path); + sstrncpy (addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1)); errno = 0; if (-1 == bind (connector_socket, (struct sockaddr *)&addr, @@ -519,7 +410,8 @@ static void *open_connection (void *arg) + strlen(addr.sun_path))) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("bind() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); @@ -529,13 +421,13 @@ static void *open_connection (void *arg) if (-1 == listen (connector_socket, 5)) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("listen() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); } - if ((uid_t) 0 == geteuid ()) { struct group sg; struct group *grp; @@ -543,36 +435,32 @@ static void *open_connection (void *arg) int status; grp = NULL; - status = getgrnam_r (sock_group, &sg, grbuf, sizeof (grbuf), &grp); + status = getgrnam_r (group, &sg, grbuf, sizeof (grbuf), &grp); if (status != 0) { char errbuf[1024]; - log_warn ("getgrnam_r (%s) failed: %s", sock_group, + log_warn ("getgrnam_r (%s) failed: %s", group, sstrerror (errno, errbuf, sizeof (errbuf))); } else if (grp == NULL) { - log_warn ("No such group: `%s'", sock_group); + log_warn ("No such group: `%s'", group); } else { - status = chown (sock_file, (uid_t) -1, grp->gr_gid); + status = chown (path, (uid_t) -1, grp->gr_gid); if (status != 0) { char errbuf[1024]; log_warn ("chown (%s, -1, %i) failed: %s", - sock_file, (int) grp->gr_gid, + path, (int) grp->gr_gid, sstrerror (errno, errbuf, sizeof (errbuf))); } } } - else /* geteuid != 0 */ - { - log_warn ("not running as root"); - } errno = 0; - if (0 != chmod (sock_file, sock_perms)) { + if (0 != chmod (path, sock_perms)) { char errbuf[1024]; log_warn ("chmod() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); @@ -597,10 +485,10 @@ static void *open_connection (void *arg) for (i = 0; i < max_conns; ++i) { collectors[i] = (collector_t *)smalloc (sizeof (collector_t)); - collectors[i]->socket = -1; + collectors[i]->socket = NULL; - if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr, - collect, collectors[i]))) { + if (0 != (err = plugin_thread_create (&collectors[i]->thread, + &ptattr, collect, collectors[i]))) { char errbuf[1024]; log_err ("pthread_create() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); @@ -632,7 +520,8 @@ static void *open_connection (void *arg) if (EINTR != errno) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("accept() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); @@ -642,9 +531,14 @@ static void *open_connection (void *arg) connection = (conn_t *)smalloc (sizeof (conn_t)); - connection->socket = remote; + connection->socket = fdopen (remote, "r"); connection->next = NULL; + if (NULL == connection->socket) { + close (remote); + continue; + } + pthread_mutex_lock (&conns_mutex); if (NULL == conns.head) { @@ -660,14 +554,16 @@ static void *open_connection (void *arg) pthread_cond_signal (&conn_available); } - pthread_exit ((void *)0); + + pthread_exit ((void *) 0); + return ((void *) 0); } /* static void *open_connection (void *) */ static int email_init (void) { int err = 0; - if (0 != (err = pthread_create (&connector, NULL, + if (0 != (err = plugin_thread_create (&connector, NULL, open_connection, NULL))) { char errbuf[1024]; disabled = 1; @@ -681,6 +577,8 @@ static int email_init (void) static int email_shutdown (void) { + type_t *ptr = NULL; + int i = 0; if (connector != ((pthread_t) 0)) { @@ -696,6 +594,8 @@ static int email_shutdown (void) /* don't allow any more connections to be processed */ pthread_mutex_lock (&conns_mutex); + available_collectors = 0; + if (collectors != NULL) { for (i = 0; i < max_conns; ++i) { if (collectors[i] == NULL) @@ -706,18 +606,52 @@ static int email_shutdown (void) collectors[i]->thread = (pthread_t) 0; } - if (collectors[i]->socket >= 0) { - close (collectors[i]->socket); - collectors[i]->socket = -1; + if (collectors[i]->socket != NULL) { + fclose (collectors[i]->socket); + collectors[i]->socket = NULL; } + + sfree (collectors[i]); } + sfree (collectors); } /* if (collectors != NULL) */ pthread_mutex_unlock (&conns_mutex); - unlink (sock_file); - errno = 0; + for (ptr = list_count.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + for (ptr = list_size.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_check.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + unlink ((NULL == sock_file) ? SOCK_PATH : sock_file); + + sfree (sock_file); + sfree (sock_group); return (0); } /* static void email_shutdown (void) */ @@ -730,12 +664,12 @@ static void email_submit (const char *type, const char *type_instance, gauge_t v vl.values = values; vl.values_len = 1; - vl.time = time (NULL); - strcpy (vl.host, hostname_g); - strcpy (vl.plugin, "email"); - strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); + sstrncpy (vl.host, hostname_g, sizeof (vl.host)); + sstrncpy (vl.plugin, "email", sizeof (vl.plugin)); + sstrncpy (vl.type, type, sizeof (vl.type)); + sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); - plugin_dispatch_values (type, &vl); + plugin_dispatch_values (&vl); } /* void email_submit */ /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed @@ -783,47 +717,28 @@ static int email_read (void) double score_old; int score_count_old; - static type_list_t *cnt; - static type_list_t *sz; - static type_list_t *chk; - if (disabled) return (-1); - if (NULL == cnt) { - cnt = (type_list_t *)smalloc (sizeof (type_list_t)); - cnt->head = NULL; - } - - if (NULL == sz) { - sz = (type_list_t *)smalloc (sizeof (type_list_t)); - sz->head = NULL; - } - - if (NULL == chk) { - chk = (type_list_t *)smalloc (sizeof (type_list_t)); - chk->head = NULL; - } - /* email count */ pthread_mutex_lock (&count_mutex); - copy_type_list (&count, cnt); + copy_type_list (&list_count, &list_count_copy); pthread_mutex_unlock (&count_mutex); - for (ptr = cnt->head; NULL != ptr; ptr = ptr->next) { + for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) { email_submit ("email_count", ptr->name, ptr->value); } /* email size */ pthread_mutex_lock (&size_mutex); - copy_type_list (&size, sz); + copy_type_list (&list_size, &list_size_copy); pthread_mutex_unlock (&size_mutex); - for (ptr = sz->head; NULL != ptr; ptr = ptr->next) { + for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) { email_submit ("email_size", ptr->name, ptr->value); } @@ -843,11 +758,11 @@ static int email_read (void) /* spam checks */ pthread_mutex_lock (&check_mutex); - copy_type_list (&check, chk); + copy_type_list (&list_check, &list_check_copy); pthread_mutex_unlock (&check_mutex); - for (ptr = chk->head; NULL != ptr; ptr = ptr->next) + for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) email_submit ("spam_check", ptr->name, ptr->value); return (0);