X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Femail.c;h=8fc5509f3e7e5ece33a15015b0ffa40fde6d6ede;hb=de0fdb208de123fe753c5fcf03533833777a5b4a;hp=c78d76146835ab2e55bedac09c8a36f71752e132;hpb=d00449bdc71bb7e97e23fc42579a42855afc9492;p=collectd.git diff --git a/src/email.c b/src/email.c index c78d7614..8fc5509f 100644 --- a/src/email.c +++ b/src/email.c @@ -1,6 +1,6 @@ /** * collectd - src/email.c - * Copyright (C) 2006,2007 Sebastian Harl + * Copyright (C) 2006-2008 Sebastian Harl * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -58,17 +58,13 @@ # include #endif /* HAVE_GRP_H */ -#define MODULE_NAME "email" - -/* 256 bytes ought to be enough for anybody ;-) */ -#define BUFSIZE 256 - #define SOCK_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-email" #define MAX_CONNS 5 #define MAX_CONNS_LIMIT 16384 -#define log_err(...) ERROR (MODULE_NAME": "__VA_ARGS__) -#define log_warn(...) WARNING (MODULE_NAME": "__VA_ARGS__) +#define log_debug(...) DEBUG ("email: "__VA_ARGS__) +#define log_err(...) ERROR ("email: "__VA_ARGS__) +#define log_warn(...) WARNING ("email: "__VA_ARGS__) /* * Private data structures @@ -90,19 +86,15 @@ typedef struct collector { pthread_t thread; /* socket descriptor of the current/last connection */ - int socket; + FILE *socket; } collector_t; /* linked list of pending connections */ typedef struct conn { /* socket to read data from */ - int socket; - - /* buffer to read data to */ - char *buffer; - int idx; /* current write position in buffer */ - int length; /* length of the current line, i.e. index of '\0' */ + FILE *socket; + /* linked list of connections */ struct conn *next; } conn_t; @@ -125,8 +117,8 @@ static const char *config_keys[] = static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); /* socket configuration */ -static char *sock_file = SOCK_PATH; -static char *sock_group = COLLECTD_GRP_NAME; +static char *sock_file = NULL; +static char *sock_group = NULL; static int sock_perms = S_IRWXU | S_IRWXG; static int max_conns = MAX_CONNS; @@ -154,17 +146,20 @@ static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER; static int available_collectors; static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t count; +static type_list_t list_count; +static type_list_t list_count_copy; static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t size; +static type_list_t list_size; +static type_list_t list_size_copy; static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER; static double score; static int score_count; static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER; -static type_list_t check; +static type_list_t list_check; +static type_list_t list_check_copy; /* * Private functions @@ -172,9 +167,13 @@ static type_list_t check; static int email_config (const char *key, const char *value) { if (0 == strcasecmp (key, "SocketFile")) { + if (NULL != sock_file) + free (sock_file); sock_file = sstrdup (value); } else if (0 == strcasecmp (key, "SocketGroup")) { + if (NULL != sock_group) + free (sock_group); sock_group = sstrdup (value); } else if (0 == strcasecmp (key, "SocketPerms")) { @@ -188,12 +187,18 @@ static int email_config (const char *key, const char *value) fprintf (stderr, "email plugin: `MaxConns' was set to invalid " "value %li, will use default %i.\n", tmp, MAX_CONNS); + ERROR ("email plugin: `MaxConns' was set to invalid " + "value %li, will use default %i.\n", + tmp, MAX_CONNS); max_conns = MAX_CONNS; } else if (tmp > MAX_CONNS_LIMIT) { fprintf (stderr, "email plugin: `MaxConns' was set to invalid " "value %li, will use hardcoded limit %i.\n", tmp, MAX_CONNS_LIMIT); + ERROR ("email plugin: `MaxConns' was set to invalid " + "value %li, will use hardcoded limit %i.\n", + tmp, MAX_CONNS_LIMIT); max_conns = MAX_CONNS_LIMIT; } else { @@ -241,140 +246,10 @@ static void type_list_incr (type_list_t *list, char *name, int incr) return; } /* static void type_list_incr (type_list_t *, char *) */ -/* Read a single character from the socket. If an error occurs or end-of-file - * is reached return '\0'. */ -static char read_char (conn_t *src) -{ - char ret = '\0'; - - fd_set fdset; - - FD_ZERO (&fdset); - FD_SET (src->socket, &fdset); - - if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { - char errbuf[1024]; - log_err ("select() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return '\0'; - } - - assert (FD_ISSET (src->socket, &fdset)); - - do { - ssize_t len = 0; - - errno = 0; - if (0 > (len = read (src->socket, (void *)&ret, 1))) { - if (EINTR != errno) { - char errbuf[1024]; - log_err ("read() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return '\0'; - } - } - - if (0 == len) - return '\0'; - } while (EINTR == errno); - return ret; -} /* static char read_char (conn_t *) */ - -/* Read a single line (terminated by '\n') from the the socket. - * - * The return value is zero terminated and does not contain any newline - * characters. - * - * If an error occurs or end-of-file is reached return NULL. - * - * IMPORTANT NOTE: If there is no newline character found in BUFSIZE - * characters of the input stream, the line will will be ignored! By - * definition we should not get any longer input lines, thus this is - * acceptable in this case ;-) */ -static char *read_line (conn_t *src) -{ - int i = 0; - - assert ((BUFSIZE >= src->idx) && (src->idx >= 0)); - assert ((src->idx > src->length) || (src->length == 0)); - - if (src->length > 0) { /* remove old line */ - src->idx -= (src->length + 1); - memmove (src->buffer, src->buffer + src->length + 1, src->idx); - src->length = 0; - } - - for (i = 0; i < src->idx; ++i) { - if ('\n' == src->buffer[i]) - break; - } - - if (i == src->idx) { - fd_set fdset; - - ssize_t len = 0; - - FD_ZERO (&fdset); - FD_SET (src->socket, &fdset); - - if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { - char errbuf[1024]; - log_err ("select() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return NULL; - } - - assert (FD_ISSET (src->socket, &fdset)); - - do { - errno = 0; - if (0 > (len = read (src->socket, - (void *)(src->buffer + src->idx), - BUFSIZE - src->idx))) { - if (EINTR != errno) { - char errbuf[1024]; - log_err ("read() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return NULL; - } - } - - if (0 == len) - return NULL; - } while (EINTR == errno); - - src->idx += len; - - for (i = src->idx - len; i < src->idx; ++i) { - if ('\n' == src->buffer[i]) - break; - } - - if (i == src->idx) { - src->length = 0; - - if (BUFSIZE == src->idx) { /* no space left in buffer */ - while ('\n' != read_char (src)) - /* ignore complete line */; - - src->idx = 0; - } - return read_line (src); - } - } - - src->buffer[i] = '\0'; - src->length = i; - - return src->buffer; -} /* static char *read_line (conn_t *) */ - static void *collect (void *arg) { collector_t *this = (collector_t *)arg; - char *buffer = (char *)smalloc (BUFSIZE); - while (1) { int loop = 1; @@ -393,44 +268,50 @@ static void *collect (void *arg) conns.tail = NULL; } - this->socket = connection->socket; - pthread_mutex_unlock (&conns_mutex); - connection->buffer = buffer; - connection->idx = 0; - connection->length = 0; + /* make the socket available to the global + * thread and connection management */ + this->socket = connection->socket; - { /* put the socket in non-blocking mode */ - int flags = 0; + log_debug ("collect: handling connection on fd #%i", + fileno (this->socket)); - errno = 0; - if (-1 == fcntl (connection->socket, F_GETFL, &flags)) { - char errbuf[1024]; - log_err ("fcntl() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - loop = 0; - } + while (loop) { + /* 256 bytes ought to be enough for anybody ;-) */ + char line[256 + 1]; /* line + '\0' */ + int len = 0; errno = 0; - if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) { - char errbuf[1024]; - log_err ("fcntl() failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); + if (NULL == fgets (line, sizeof (line), this->socket)) { loop = 0; + + if (0 != errno) { + char errbuf[1024]; + log_err ("collect: reading from socket (fd #%i) " + "failed: %s", fileno (this->socket), + sstrerror (errno, errbuf, sizeof (errbuf))); + } + break; } - } - while (loop) { - char *line = read_line (connection); + len = strlen (line); + if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) { + log_warn ("collect: line too long (> %zu characters): " + "'%s' (truncated)", sizeof (line) - 1, line); - if (NULL == line) { - loop = 0; - break; + while (NULL != fgets (line, sizeof (line), this->socket)) + if (('\n' == line[len - 1]) || ('\r' == line[len - 1])) + break; + continue; } + line[len - 1] = '\0'; + + log_debug ("collect: line = '%s'", line); + if (':' != line[1]) { - log_err ("syntax error in line '%s'", line); + log_err ("collect: syntax error in line '%s'", line); continue; } @@ -441,19 +322,19 @@ static void *collect (void *arg) int bytes = 0; if (NULL == tmp) { - log_err ("syntax error in line '%s'", line); + log_err ("collect: syntax error in line '%s'", line); continue; } bytes = atoi (tmp); pthread_mutex_lock (&count_mutex); - type_list_incr (&count, type, 1); + type_list_incr (&list_count, type, 1); pthread_mutex_unlock (&count_mutex); if (bytes > 0) { pthread_mutex_lock (&size_mutex); - type_list_incr (&size, type, bytes); + type_list_incr (&list_size, type, bytes); pthread_mutex_unlock (&size_mutex); } } @@ -470,19 +351,22 @@ static void *collect (void *arg) do { pthread_mutex_lock (&check_mutex); - type_list_incr (&check, type, 1); + type_list_incr (&list_check, type, 1); pthread_mutex_unlock (&check_mutex); } while (NULL != (type = strtok_r (NULL, ",", &ptr))); } else { - log_err ("unknown type '%c'", line[0]); + log_err ("collect: unknown type '%c'", line[0]); } } /* while (loop) */ - close (connection->socket); + log_debug ("Shutting down connection on fd #%i", + fileno (this->socket)); + + fclose (connection->socket); free (connection); - this->socket = -1; + this->socket = NULL; pthread_mutex_lock (&available_mutex); ++available_collectors; @@ -491,14 +375,16 @@ static void *collect (void *arg) pthread_cond_signal (&collector_available); } /* while (1) */ - free (buffer); pthread_exit ((void *)0); } /* static void *collect (void *) */ -static void *open_connection (void *arg) +static void *open_connection (void __attribute__((unused)) *arg) { struct sockaddr_un addr; + char *path = (NULL == sock_file) ? SOCK_PATH : sock_file; + char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group; + /* create UNIX socket */ errno = 0; if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) { @@ -510,9 +396,7 @@ static void *open_connection (void *arg) } addr.sun_family = AF_UNIX; - - strncpy (addr.sun_path, sock_file, (size_t)(UNIX_PATH_MAX - 1)); - addr.sun_path[UNIX_PATH_MAX - 1] = '\0'; + sstrncpy (addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1)); errno = 0; if (-1 == bind (connector_socket, (struct sockaddr *)&addr, @@ -520,7 +404,8 @@ static void *open_connection (void *arg) + strlen(addr.sun_path))) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("bind() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); @@ -530,13 +415,13 @@ static void *open_connection (void *arg) if (-1 == listen (connector_socket, 5)) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("listen() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); } - if ((uid_t) 0 == geteuid ()) { struct group sg; struct group *grp; @@ -544,36 +429,32 @@ static void *open_connection (void *arg) int status; grp = NULL; - status = getgrnam_r (sock_group, &sg, grbuf, sizeof (grbuf), &grp); + status = getgrnam_r (group, &sg, grbuf, sizeof (grbuf), &grp); if (status != 0) { char errbuf[1024]; - log_warn ("getgrnam_r (%s) failed: %s", sock_group, + log_warn ("getgrnam_r (%s) failed: %s", group, sstrerror (errno, errbuf, sizeof (errbuf))); } else if (grp == NULL) { - log_warn ("No such group: `%s'", sock_group); + log_warn ("No such group: `%s'", group); } else { - status = chown (sock_file, (uid_t) -1, grp->gr_gid); + status = chown (path, (uid_t) -1, grp->gr_gid); if (status != 0) { char errbuf[1024]; log_warn ("chown (%s, -1, %i) failed: %s", - sock_file, (int) grp->gr_gid, + path, (int) grp->gr_gid, sstrerror (errno, errbuf, sizeof (errbuf))); } } } - else /* geteuid != 0 */ - { - log_warn ("not running as root"); - } errno = 0; - if (0 != chmod (sock_file, sock_perms)) { + if (0 != chmod (path, sock_perms)) { char errbuf[1024]; log_warn ("chmod() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); @@ -598,7 +479,7 @@ static void *open_connection (void *arg) for (i = 0; i < max_conns; ++i) { collectors[i] = (collector_t *)smalloc (sizeof (collector_t)); - collectors[i]->socket = -1; + collectors[i]->socket = NULL; if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr, collect, collectors[i]))) { @@ -633,7 +514,8 @@ static void *open_connection (void *arg) if (EINTR != errno) { char errbuf[1024]; disabled = 1; - connector_socket = -1; /* TODO: close? */ + close (connector_socket); + connector_socket = -1; log_err ("accept() failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); pthread_exit ((void *)1); @@ -643,9 +525,14 @@ static void *open_connection (void *arg) connection = (conn_t *)smalloc (sizeof (conn_t)); - connection->socket = remote; + connection->socket = fdopen (remote, "r"); connection->next = NULL; + if (NULL == connection->socket) { + close (remote); + continue; + } + pthread_mutex_lock (&conns_mutex); if (NULL == conns.head) { @@ -682,6 +569,8 @@ static int email_init (void) static int email_shutdown (void) { + type_t *ptr = NULL; + int i = 0; if (connector != ((pthread_t) 0)) { @@ -697,6 +586,8 @@ static int email_shutdown (void) /* don't allow any more connections to be processed */ pthread_mutex_lock (&conns_mutex); + available_collectors = 0; + if (collectors != NULL) { for (i = 0; i < max_conns; ++i) { if (collectors[i] == NULL) @@ -707,18 +598,52 @@ static int email_shutdown (void) collectors[i]->thread = (pthread_t) 0; } - if (collectors[i]->socket >= 0) { - close (collectors[i]->socket); - collectors[i]->socket = -1; + if (collectors[i]->socket != NULL) { + fclose (collectors[i]->socket); + collectors[i]->socket = NULL; } + + sfree (collectors[i]); } + sfree (collectors); } /* if (collectors != NULL) */ pthread_mutex_unlock (&conns_mutex); - unlink (sock_file); - errno = 0; + for (ptr = list_count.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_size.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_check.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) { + free (ptr->name); + free (ptr); + } + + unlink ((NULL == sock_file) ? SOCK_PATH : sock_file); + sfree (sock_file); + sfree (sock_group); return (0); } /* static void email_shutdown (void) */ @@ -731,12 +656,12 @@ static void email_submit (const char *type, const char *type_instance, gauge_t v vl.values = values; vl.values_len = 1; - vl.time = time (NULL); sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "email", sizeof (vl.plugin)); - strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); + sstrncpy (vl.type, type, sizeof (vl.type)); + sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); - plugin_dispatch_values (type, &vl); + plugin_dispatch_values (&vl); } /* void email_submit */ /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed @@ -784,47 +709,28 @@ static int email_read (void) double score_old; int score_count_old; - static type_list_t *cnt; - static type_list_t *sz; - static type_list_t *chk; - if (disabled) return (-1); - if (NULL == cnt) { - cnt = (type_list_t *)smalloc (sizeof (type_list_t)); - cnt->head = NULL; - } - - if (NULL == sz) { - sz = (type_list_t *)smalloc (sizeof (type_list_t)); - sz->head = NULL; - } - - if (NULL == chk) { - chk = (type_list_t *)smalloc (sizeof (type_list_t)); - chk->head = NULL; - } - /* email count */ pthread_mutex_lock (&count_mutex); - copy_type_list (&count, cnt); + copy_type_list (&list_count, &list_count_copy); pthread_mutex_unlock (&count_mutex); - for (ptr = cnt->head; NULL != ptr; ptr = ptr->next) { + for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) { email_submit ("email_count", ptr->name, ptr->value); } /* email size */ pthread_mutex_lock (&size_mutex); - copy_type_list (&size, sz); + copy_type_list (&list_size, &list_size_copy); pthread_mutex_unlock (&size_mutex); - for (ptr = sz->head; NULL != ptr; ptr = ptr->next) { + for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) { email_submit ("email_size", ptr->name, ptr->value); } @@ -844,11 +750,11 @@ static int email_read (void) /* spam checks */ pthread_mutex_lock (&check_mutex); - copy_type_list (&check, chk); + copy_type_list (&list_check, &list_check_copy); pthread_mutex_unlock (&check_mutex); - for (ptr = chk->head; NULL != ptr; ptr = ptr->next) + for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) email_submit ("spam_check", ptr->name, ptr->value); return (0);