Merge branch 'collectd-5.7'
[collectd.git] / src / email.c
1 /**
2  * collectd - src/email.c
3  * Copyright (C) 2006-2008  Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 /*
28  * This plugin communicates with a spam filter, a virus scanner or similar
29  * software using a UNIX socket and a very simple protocol:
30  *
31  * e-mail type (e.g. ham, spam, virus, ...) and size
32  * e:<type>:<bytes>
33  *
34  * spam score
35  * s:<value>
36  *
37  * successful spam checks
38  * c:<type1>[,<type2>,...]
39  */
40
41 #include "collectd.h"
42
43 #include "common.h"
44 #include "plugin.h"
45
46 #include <stddef.h>
47
48 #include <sys/select.h>
49 #include <sys/un.h>
50
51 /* some systems (e.g. Darwin) seem to not define UNIX_PATH_MAX at all */
52 #ifndef UNIX_PATH_MAX
53 #define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
54 #endif /* UNIX_PATH_MAX */
55
56 #if HAVE_GRP_H
57 #include <grp.h>
58 #endif /* HAVE_GRP_H */
59
60 #define SOCK_PATH LOCALSTATEDIR "/run/" PACKAGE_NAME "-email"
61 #define MAX_CONNS 5
62 #define MAX_CONNS_LIMIT 16384
63
64 #define log_debug(...) DEBUG("email: "__VA_ARGS__)
65 #define log_err(...) ERROR("email: "__VA_ARGS__)
66 #define log_warn(...) WARNING("email: "__VA_ARGS__)
67
68 /*
69  * Private data structures
70  */
71 /* linked list of email and check types */
72 typedef struct type {
73   char *name;
74   int value;
75   struct type *next;
76 } type_t;
77
78 typedef struct {
79   type_t *head;
80   type_t *tail;
81 } type_list_t;
82
83 /* collector thread control information */
84 typedef struct collector {
85   pthread_t thread;
86
87   /* socket descriptor of the current/last connection */
88   FILE *socket;
89 } collector_t;
90
91 /* linked list of pending connections */
92 typedef struct conn {
93   /* socket to read data from */
94   FILE *socket;
95
96   /* linked list of connections */
97   struct conn *next;
98 } conn_t;
99
100 typedef struct {
101   conn_t *head;
102   conn_t *tail;
103 } conn_list_t;
104
105 /*
106  * Private variables
107  */
108 /* valid configuration file keys */
109 static const char *config_keys[] = {"SocketFile", "SocketGroup", "SocketPerms",
110                                     "MaxConns"};
111 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
112
113 /* socket configuration */
114 static char *sock_file = NULL;
115 static char *sock_group = NULL;
116 static int sock_perms = S_IRWXU | S_IRWXG;
117 static int max_conns = MAX_CONNS;
118
119 /* state of the plugin */
120 static int disabled = 0;
121
122 /* thread managing "client" connections */
123 static pthread_t connector = (pthread_t)0;
124 static int connector_socket = -1;
125
126 /* tell the collector threads that a new connection is available */
127 static pthread_cond_t conn_available = PTHREAD_COND_INITIALIZER;
128
129 /* connections that are waiting to be processed */
130 static pthread_mutex_t conns_mutex = PTHREAD_MUTEX_INITIALIZER;
131 static conn_list_t conns;
132
133 /* tell the connector thread that a collector is available */
134 static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER;
135
136 /* collector threads */
137 static collector_t **collectors = NULL;
138
139 static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
140 static int available_collectors;
141
142 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
143 static type_list_t list_count;
144 static type_list_t list_count_copy;
145
146 static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
147 static type_list_t list_size;
148 static type_list_t list_size_copy;
149
150 static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
151 static double score;
152 static int score_count;
153
154 static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
155 static type_list_t list_check;
156 static type_list_t list_check_copy;
157
158 /*
159  * Private functions
160  */
161 static int email_config(const char *key, const char *value) {
162   if (0 == strcasecmp(key, "SocketFile")) {
163     if (NULL != sock_file)
164       free(sock_file);
165     sock_file = sstrdup(value);
166   } else if (0 == strcasecmp(key, "SocketGroup")) {
167     if (NULL != sock_group)
168       free(sock_group);
169     sock_group = sstrdup(value);
170   } else if (0 == strcasecmp(key, "SocketPerms")) {
171     /* the user is responsible for providing reasonable values */
172     sock_perms = (int)strtol(value, NULL, 8);
173   } else if (0 == strcasecmp(key, "MaxConns")) {
174     long int tmp = strtol(value, NULL, 0);
175
176     if (tmp < 1) {
177       fprintf(stderr, "email plugin: `MaxConns' was set to invalid "
178                       "value %li, will use default %i.\n",
179               tmp, MAX_CONNS);
180       ERROR("email plugin: `MaxConns' was set to invalid "
181             "value %li, will use default %i.\n",
182             tmp, MAX_CONNS);
183       max_conns = MAX_CONNS;
184     } else if (tmp > MAX_CONNS_LIMIT) {
185       fprintf(stderr, "email plugin: `MaxConns' was set to invalid "
186                       "value %li, will use hardcoded limit %i.\n",
187               tmp, MAX_CONNS_LIMIT);
188       ERROR("email plugin: `MaxConns' was set to invalid "
189             "value %li, will use hardcoded limit %i.\n",
190             tmp, MAX_CONNS_LIMIT);
191       max_conns = MAX_CONNS_LIMIT;
192     } else {
193       max_conns = (int)tmp;
194     }
195   } else {
196     return -1;
197   }
198   return 0;
199 } /* static int email_config (char *, char *) */
200
201 /* Increment the value of the given name in the given list by incr. */
202 static void type_list_incr(type_list_t *list, char *name, int incr) {
203   if (NULL == list->head) {
204     list->head = smalloc(sizeof(*list->head));
205
206     list->head->name = sstrdup(name);
207     list->head->value = incr;
208     list->head->next = NULL;
209
210     list->tail = list->head;
211   } else {
212     type_t *ptr;
213
214     for (ptr = list->head; NULL != ptr; ptr = ptr->next) {
215       if (0 == strcmp(name, ptr->name))
216         break;
217     }
218
219     if (NULL == ptr) {
220       list->tail->next = smalloc(sizeof(*list->tail->next));
221       list->tail = list->tail->next;
222
223       list->tail->name = sstrdup(name);
224       list->tail->value = incr;
225       list->tail->next = NULL;
226     } else {
227       ptr->value += incr;
228     }
229   }
230   return;
231 } /* static void type_list_incr (type_list_t *, char *) */
232
233 static void *collect(void *arg) {
234   collector_t *this = (collector_t *)arg;
235
236   while (1) {
237     conn_t *connection;
238
239     pthread_mutex_lock(&conns_mutex);
240
241     while (NULL == conns.head) {
242       pthread_cond_wait(&conn_available, &conns_mutex);
243     }
244
245     connection = conns.head;
246     conns.head = conns.head->next;
247
248     if (NULL == conns.head) {
249       conns.tail = NULL;
250     }
251
252     pthread_mutex_unlock(&conns_mutex);
253
254     /* make the socket available to the global
255      * thread and connection management */
256     this->socket = connection->socket;
257
258     log_debug("collect: handling connection on fd #%i", fileno(this->socket));
259
260     while (42) {
261       /* 256 bytes ought to be enough for anybody ;-) */
262       char line[256 + 1]; /* line + '\0' */
263       int len = 0;
264
265       errno = 0;
266       if (NULL == fgets(line, sizeof(line), this->socket)) {
267         if (0 != errno) {
268           char errbuf[1024];
269           log_err("collect: reading from socket (fd #%i) "
270                   "failed: %s",
271                   fileno(this->socket),
272                   sstrerror(errno, errbuf, sizeof(errbuf)));
273         }
274         break;
275       }
276
277       len = strlen(line);
278       if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) {
279         log_warn("collect: line too long (> %zu characters): "
280                  "'%s' (truncated)",
281                  sizeof(line) - 1, line);
282
283         while (NULL != fgets(line, sizeof(line), this->socket))
284           if (('\n' == line[len - 1]) || ('\r' == line[len - 1]))
285             break;
286         continue;
287       }
288       if (len < 3) { /* [a-z] ':' '\n' */
289         continue;
290       }
291
292       line[len - 1] = 0;
293
294       log_debug("collect: line = '%s'", line);
295
296       if (':' != line[1]) {
297         log_err("collect: syntax error in line '%s'", line);
298         continue;
299       }
300
301       if ('e' == line[0]) { /* e:<type>:<bytes> */
302         char *ptr = NULL;
303         char *type = strtok_r(line + 2, ":", &ptr);
304         char *tmp = strtok_r(NULL, ":", &ptr);
305         int bytes = 0;
306
307         if (NULL == tmp) {
308           log_err("collect: syntax error in line '%s'", line);
309           continue;
310         }
311
312         bytes = atoi(tmp);
313
314         pthread_mutex_lock(&count_mutex);
315         type_list_incr(&list_count, type, /* increment = */ 1);
316         pthread_mutex_unlock(&count_mutex);
317
318         if (bytes > 0) {
319           pthread_mutex_lock(&size_mutex);
320           type_list_incr(&list_size, type, /* increment = */ bytes);
321           pthread_mutex_unlock(&size_mutex);
322         }
323       } else if ('s' == line[0]) { /* s:<value> */
324         pthread_mutex_lock(&score_mutex);
325         score = (score * (double)score_count + atof(line + 2)) /
326                 (double)(score_count + 1);
327         ++score_count;
328         pthread_mutex_unlock(&score_mutex);
329       } else if ('c' == line[0]) { /* c:<type1>[,<type2>,...] */
330         char *dummy = line + 2;
331         char *endptr = NULL;
332         char *type;
333
334         pthread_mutex_lock(&check_mutex);
335         while ((type = strtok_r(dummy, ",", &endptr)) != NULL) {
336           dummy = NULL;
337           type_list_incr(&list_check, type, /* increment = */ 1);
338         }
339         pthread_mutex_unlock(&check_mutex);
340       } else {
341         log_err("collect: unknown type '%c'", line[0]);
342       }
343     } /* while (42) */
344
345     log_debug("Shutting down connection on fd #%i", fileno(this->socket));
346
347     fclose(connection->socket);
348     free(connection);
349
350     this->socket = NULL;
351
352     pthread_mutex_lock(&available_mutex);
353     ++available_collectors;
354     pthread_mutex_unlock(&available_mutex);
355
356     pthread_cond_signal(&collector_available);
357   } /* while (1) */
358
359   pthread_exit((void *)0);
360   return ((void *)0);
361 } /* static void *collect (void *) */
362
363 static void *open_connection(void __attribute__((unused)) * arg) {
364   const char *path = (NULL == sock_file) ? SOCK_PATH : sock_file;
365   const char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group;
366
367   /* create UNIX socket */
368   errno = 0;
369   if (-1 == (connector_socket = socket(PF_UNIX, SOCK_STREAM, 0))) {
370     char errbuf[1024];
371     disabled = 1;
372     log_err("socket() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
373     pthread_exit((void *)1);
374   }
375
376   struct sockaddr_un addr = {
377     .sun_family = AF_UNIX
378   };
379   sstrncpy(addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1));
380
381   errno = 0;
382   if (-1 ==
383       bind(connector_socket, (struct sockaddr *)&addr,
384            offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path))) {
385     char errbuf[1024];
386     disabled = 1;
387     close(connector_socket);
388     connector_socket = -1;
389     log_err("bind() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
390     pthread_exit((void *)1);
391   }
392
393   errno = 0;
394   if (-1 == listen(connector_socket, 5)) {
395     char errbuf[1024];
396     disabled = 1;
397     close(connector_socket);
398     connector_socket = -1;
399     log_err("listen() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
400     pthread_exit((void *)1);
401   }
402
403   {
404     struct group sg;
405     struct group *grp;
406     char grbuf[2048];
407     int status;
408
409     grp = NULL;
410     status = getgrnam_r(group, &sg, grbuf, sizeof(grbuf), &grp);
411     if (status != 0) {
412       char errbuf[1024];
413       log_warn("getgrnam_r (%s) failed: %s", group,
414                sstrerror(errno, errbuf, sizeof(errbuf)));
415     } else if (grp == NULL) {
416       log_warn("No such group: `%s'", group);
417     } else {
418       status = chown(path, (uid_t)-1, grp->gr_gid);
419       if (status != 0) {
420         char errbuf[1024];
421         log_warn("chown (%s, -1, %i) failed: %s", path, (int)grp->gr_gid,
422                  sstrerror(errno, errbuf, sizeof(errbuf)));
423       }
424     }
425   }
426
427   errno = 0;
428   if (0 != chmod(path, sock_perms)) {
429     char errbuf[1024];
430     log_warn("chmod() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
431   }
432
433   { /* initialize collector threads */
434     pthread_attr_t ptattr;
435
436     conns.head = NULL;
437     conns.tail = NULL;
438
439     pthread_attr_init(&ptattr);
440     pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED);
441
442     available_collectors = max_conns;
443
444     collectors = smalloc(max_conns * sizeof(*collectors));
445
446     for (int i = 0; i < max_conns; ++i) {
447       collectors[i] = smalloc(sizeof(*collectors[i]));
448       collectors[i]->socket = NULL;
449
450       if (plugin_thread_create(&collectors[i]->thread, &ptattr, collect,
451                                collectors[i], "email collector") != 0) {
452         char errbuf[1024];
453         log_err("plugin_thread_create() failed: %s",
454                 sstrerror(errno, errbuf, sizeof(errbuf)));
455         collectors[i]->thread = (pthread_t)0;
456       }
457     }
458
459     pthread_attr_destroy(&ptattr);
460   }
461
462   while (1) {
463     int remote = 0;
464
465     conn_t *connection;
466
467     pthread_mutex_lock(&available_mutex);
468
469     while (0 == available_collectors) {
470       pthread_cond_wait(&collector_available, &available_mutex);
471     }
472
473     --available_collectors;
474
475     pthread_mutex_unlock(&available_mutex);
476
477     while (42) {
478       errno = 0;
479
480       remote = accept(connector_socket, NULL, NULL);
481       if (remote == -1) {
482         char errbuf[1024];
483
484         if (errno == EINTR)
485           continue;
486
487         disabled = 1;
488         close(connector_socket);
489         connector_socket = -1;
490         log_err("accept() failed: %s",
491                 sstrerror(errno, errbuf, sizeof(errbuf)));
492         pthread_exit((void *)1);
493       }
494
495       /* access() succeeded. */
496       break;
497     }
498
499     connection = calloc(1, sizeof(*connection));
500     if (connection == NULL) {
501       close(remote);
502       continue;
503     }
504
505     connection->socket = fdopen(remote, "r");
506     connection->next = NULL;
507
508     if (NULL == connection->socket) {
509       close(remote);
510       sfree(connection);
511       continue;
512     }
513
514     pthread_mutex_lock(&conns_mutex);
515
516     if (NULL == conns.head) {
517       conns.head = connection;
518       conns.tail = connection;
519     } else {
520       conns.tail->next = connection;
521       conns.tail = conns.tail->next;
522     }
523
524     pthread_mutex_unlock(&conns_mutex);
525
526     pthread_cond_signal(&conn_available);
527   }
528
529   pthread_exit((void *)0);
530   return ((void *)0);
531 } /* static void *open_connection (void *) */
532
533 static int email_init(void) {
534   if (plugin_thread_create(&connector, NULL, open_connection, NULL,
535                            "email listener") != 0) {
536     char errbuf[1024];
537     disabled = 1;
538     log_err("plugin_thread_create() failed: %s",
539             sstrerror(errno, errbuf, sizeof(errbuf)));
540     return (-1);
541   }
542
543   return (0);
544 } /* int email_init */
545
546 static void type_list_free(type_list_t *t) {
547   type_t *this;
548
549   this = t->head;
550   while (this != NULL) {
551     type_t *next = this->next;
552
553     sfree(this->name);
554     sfree(this);
555
556     this = next;
557   }
558
559   t->head = NULL;
560   t->tail = NULL;
561 }
562
563 static int email_shutdown(void) {
564   if (connector != ((pthread_t)0)) {
565     pthread_kill(connector, SIGTERM);
566     connector = (pthread_t)0;
567   }
568
569   if (connector_socket >= 0) {
570     close(connector_socket);
571     connector_socket = -1;
572   }
573
574   /* don't allow any more connections to be processed */
575   pthread_mutex_lock(&conns_mutex);
576
577   available_collectors = 0;
578
579   if (collectors != NULL) {
580     for (int i = 0; i < max_conns; ++i) {
581       if (collectors[i] == NULL)
582         continue;
583
584       if (collectors[i]->thread != ((pthread_t)0)) {
585         pthread_kill(collectors[i]->thread, SIGTERM);
586         collectors[i]->thread = (pthread_t)0;
587       }
588
589       if (collectors[i]->socket != NULL) {
590         fclose(collectors[i]->socket);
591         collectors[i]->socket = NULL;
592       }
593
594       sfree(collectors[i]);
595     }
596     sfree(collectors);
597   } /* if (collectors != NULL) */
598
599   pthread_mutex_unlock(&conns_mutex);
600
601   type_list_free(&list_count);
602   type_list_free(&list_count_copy);
603   type_list_free(&list_size);
604   type_list_free(&list_size_copy);
605   type_list_free(&list_check);
606   type_list_free(&list_check_copy);
607
608   unlink((NULL == sock_file) ? SOCK_PATH : sock_file);
609
610   sfree(sock_file);
611   sfree(sock_group);
612   return (0);
613 } /* static void email_shutdown (void) */
614
615 static void email_submit(const char *type, const char *type_instance,
616                          gauge_t value) {
617   value_list_t vl = VALUE_LIST_INIT;
618
619   vl.values = &(value_t){.gauge = value};
620   vl.values_len = 1;
621   sstrncpy(vl.plugin, "email", sizeof(vl.plugin));
622   sstrncpy(vl.type, type, sizeof(vl.type));
623   sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
624
625   plugin_dispatch_values(&vl);
626 } /* void email_submit */
627
628 /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed
629  * that neither the order nor the name of any element of either list is
630  * changed and no elements are deleted. The values of l1 are reset to zero
631  * after they have been copied to l2. */
632 static void copy_type_list(type_list_t *l1, type_list_t *l2) {
633   type_t *last = NULL;
634
635   for (type_t *ptr1 = l1->head, *ptr2 = l2->head; NULL != ptr1;
636        ptr1 = ptr1->next, last = ptr2, ptr2 = ptr2->next) {
637     if (NULL == ptr2) {
638       ptr2 = smalloc(sizeof(*ptr2));
639       ptr2->name = NULL;
640       ptr2->next = NULL;
641
642       if (NULL == last) {
643         l2->head = ptr2;
644       } else {
645         last->next = ptr2;
646       }
647
648       l2->tail = ptr2;
649     }
650
651     if (NULL == ptr2->name) {
652       ptr2->name = sstrdup(ptr1->name);
653     }
654
655     ptr2->value = ptr1->value;
656     ptr1->value = 0;
657   }
658   return;
659 }
660
661 static int email_read(void) {
662   double score_old;
663   int score_count_old;
664
665   if (disabled)
666     return (-1);
667
668   /* email count */
669   pthread_mutex_lock(&count_mutex);
670
671   copy_type_list(&list_count, &list_count_copy);
672
673   pthread_mutex_unlock(&count_mutex);
674
675   for (type_t *ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) {
676     email_submit("email_count", ptr->name, ptr->value);
677   }
678
679   /* email size */
680   pthread_mutex_lock(&size_mutex);
681
682   copy_type_list(&list_size, &list_size_copy);
683
684   pthread_mutex_unlock(&size_mutex);
685
686   for (type_t *ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) {
687     email_submit("email_size", ptr->name, ptr->value);
688   }
689
690   /* spam score */
691   pthread_mutex_lock(&score_mutex);
692
693   score_old = score;
694   score_count_old = score_count;
695   score = 0.0;
696   score_count = 0;
697
698   pthread_mutex_unlock(&score_mutex);
699
700   if (score_count_old > 0)
701     email_submit("spam_score", "", score_old);
702
703   /* spam checks */
704   pthread_mutex_lock(&check_mutex);
705
706   copy_type_list(&list_check, &list_check_copy);
707
708   pthread_mutex_unlock(&check_mutex);
709
710   for (type_t *ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next)
711     email_submit("spam_check", ptr->name, ptr->value);
712
713   return (0);
714 } /* int email_read */
715
716 void module_register(void) {
717   plugin_register_config("email", email_config, config_keys, config_keys_num);
718   plugin_register_init("email", email_init);
719   plugin_register_read("email", email_read);
720   plugin_register_shutdown("email", email_shutdown);
721 } /* void module_register */