53602053f68ff5dbec087cd8c77358423a3f8436
[collectd.git] / src / email.c
1 /**
2  * collectd - src/email.c
3  * Copyright (C) 2006-2008  Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 /*
28  * This plugin communicates with a spam filter, a virus scanner or similar
29  * software using a UNIX socket and a very simple protocol:
30  *
31  * e-mail type (e.g. ham, spam, virus, ...) and size
32  * e:<type>:<bytes>
33  *
34  * spam score
35  * s:<value>
36  *
37  * successful spam checks
38  * c:<type1>[,<type2>,...]
39  */
40
41 #include "collectd.h"
42
43 #include "plugin.h"
44 #include "utils/common/common.h"
45
46 #include <stddef.h>
47
48 #include <sys/select.h>
49 #include <sys/un.h>
50
51 /* some systems (e.g. Darwin) seem to not define UNIX_PATH_MAX at all */
52 #ifndef UNIX_PATH_MAX
53 #define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
54 #endif /* UNIX_PATH_MAX */
55
56 #if HAVE_GRP_H
57 #include <grp.h>
58 #endif /* HAVE_GRP_H */
59
60 #define SOCK_PATH LOCALSTATEDIR "/run/" PACKAGE_NAME "-email"
61 #define MAX_CONNS 5
62 #define MAX_CONNS_LIMIT 16384
63
64 #define log_debug(...) DEBUG("email: "__VA_ARGS__)
65 #define log_err(...) ERROR("email: "__VA_ARGS__)
66 #define log_warn(...) WARNING("email: "__VA_ARGS__)
67
68 /*
69  * Private data structures
70  */
71 /* linked list of email and check types */
72 typedef struct type {
73   char *name;
74   int value;
75   struct type *next;
76 } type_t;
77
78 typedef struct {
79   type_t *head;
80   type_t *tail;
81 } type_list_t;
82
83 /* collector thread control information */
84 typedef struct collector {
85   pthread_t thread;
86
87   /* socket descriptor of the current/last connection */
88   FILE *socket;
89 } collector_t;
90
91 /* linked list of pending connections */
92 typedef struct conn {
93   /* socket to read data from */
94   FILE *socket;
95
96   /* linked list of connections */
97   struct conn *next;
98 } conn_t;
99
100 typedef struct {
101   conn_t *head;
102   conn_t *tail;
103 } conn_list_t;
104
105 /*
106  * Private variables
107  */
108 /* valid configuration file keys */
109 static const char *config_keys[] = {"SocketFile", "SocketGroup", "SocketPerms",
110                                     "MaxConns"};
111 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
112
113 /* socket configuration */
114 static char *sock_file;
115 static char *sock_group;
116 static int sock_perms = S_IRWXU | S_IRWXG;
117 static int max_conns = MAX_CONNS;
118
119 /* state of the plugin */
120 static int disabled;
121
122 /* thread managing "client" connections */
123 static pthread_t connector = (pthread_t)0;
124 static int connector_socket = -1;
125
126 /* tell the collector threads that a new connection is available */
127 static pthread_cond_t conn_available = PTHREAD_COND_INITIALIZER;
128
129 /* connections that are waiting to be processed */
130 static pthread_mutex_t conns_mutex = PTHREAD_MUTEX_INITIALIZER;
131 static conn_list_t conns;
132
133 /* tell the connector thread that a collector is available */
134 static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER;
135
136 /* collector threads */
137 static collector_t **collectors;
138
139 static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
140 static int available_collectors;
141
142 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
143 static type_list_t list_count;
144 static type_list_t list_count_copy;
145
146 static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
147 static type_list_t list_size;
148 static type_list_t list_size_copy;
149
150 static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
151 static double score;
152 static int score_count;
153
154 static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
155 static type_list_t list_check;
156 static type_list_t list_check_copy;
157
158 /*
159  * Private functions
160  */
161 static int email_config(const char *key, const char *value) {
162   if (strcasecmp(key, "SocketFile") == 0) {
163     if (sock_file != NULL)
164       free(sock_file);
165     sock_file = sstrdup(value);
166   } else if (strcasecmp(key, "SocketGroup") == 0) {
167     if (sock_group != NULL)
168       free(sock_group);
169     sock_group = sstrdup(value);
170   } else if (strcasecmp(key, "SocketPerms") == 0) {
171     /* the user is responsible for providing reasonable values */
172     sock_perms = (int)strtol(value, NULL, 8);
173   } else if (strcasecmp(key, "MaxConns") == 0) {
174     long int tmp = strtol(value, NULL, 0);
175
176     if (tmp < 1) {
177       fprintf(stderr,
178               "email plugin: `MaxConns' was set to invalid "
179               "value %li, will use default %i.\n",
180               tmp, MAX_CONNS);
181       ERROR("email plugin: `MaxConns' was set to invalid "
182             "value %li, will use default %i.\n",
183             tmp, MAX_CONNS);
184       max_conns = MAX_CONNS;
185     } else if (tmp > MAX_CONNS_LIMIT) {
186       fprintf(stderr,
187               "email plugin: `MaxConns' was set to invalid "
188               "value %li, will use hardcoded limit %i.\n",
189               tmp, MAX_CONNS_LIMIT);
190       ERROR("email plugin: `MaxConns' was set to invalid "
191             "value %li, will use hardcoded limit %i.\n",
192             tmp, MAX_CONNS_LIMIT);
193       max_conns = MAX_CONNS_LIMIT;
194     } else {
195       max_conns = (int)tmp;
196     }
197   } else {
198     return -1;
199   }
200   return 0;
201 } /* static int email_config (char *, char *) */
202
203 /* Increment the value of the given name in the given list by incr. */
204 static void type_list_incr(type_list_t *list, char *name, int incr) {
205   if (list->head == NULL) {
206     list->head = smalloc(sizeof(*list->head));
207
208     list->head->name = sstrdup(name);
209     list->head->value = incr;
210     list->head->next = NULL;
211
212     list->tail = list->head;
213   } else {
214     type_t *ptr;
215
216     for (ptr = list->head; NULL != ptr; ptr = ptr->next) {
217       if (strcmp(name, ptr->name) == 0)
218         break;
219     }
220
221     if (ptr == NULL) {
222       list->tail->next = smalloc(sizeof(*list->tail->next));
223       list->tail = list->tail->next;
224
225       list->tail->name = sstrdup(name);
226       list->tail->value = incr;
227       list->tail->next = NULL;
228     } else {
229       ptr->value += incr;
230     }
231   }
232   return;
233 } /* static void type_list_incr (type_list_t *, char *) */
234
235 static void *collect(void *arg) {
236   collector_t *this = (collector_t *)arg;
237
238   while (1) {
239     conn_t *connection;
240
241     pthread_mutex_lock(&conns_mutex);
242
243     while (conns.head == NULL) {
244       pthread_cond_wait(&conn_available, &conns_mutex);
245     }
246
247     connection = conns.head;
248     conns.head = conns.head->next;
249
250     if (conns.head == NULL) {
251       conns.tail = NULL;
252     }
253
254     pthread_mutex_unlock(&conns_mutex);
255
256     /* make the socket available to the global
257      * thread and connection management */
258     this->socket = connection->socket;
259
260     log_debug("collect: handling connection on fd #%i", fileno(this->socket));
261
262     while (42) {
263       /* 256 bytes ought to be enough for anybody ;-) */
264       char line[256 + 1]; /* line + '\0' */
265
266       errno = 0;
267       if (fgets(line, sizeof(line), this->socket) == NULL) {
268         if (errno != 0) {
269           log_err("collect: reading from socket (fd #%i) "
270                   "failed: %s",
271                   fileno(this->socket), STRERRNO);
272         }
273         break;
274       }
275
276       size_t len = strlen(line);
277       if ((line[len - 1] != '\n') && (line[len - 1] != '\r')) {
278         log_warn("collect: line too long (> %" PRIsz " characters): "
279                  "'%s' (truncated)",
280                  sizeof(line) - 1, line);
281
282         while (fgets(line, sizeof(line), this->socket) != NULL)
283           if ((line[len - 1] == '\n') || (line[len - 1] == '\r'))
284             break;
285         continue;
286       }
287       if (len < 3) { /* [a-z] ':' '\n' */
288         continue;
289       }
290
291       line[len - 1] = '\0';
292
293       log_debug("collect: line = '%s'", line);
294
295       if (line[1] != ':') {
296         log_err("collect: syntax error in line '%s'", line);
297         continue;
298       }
299
300       if (line[0] == 'e') { /* e:<type>:<bytes> */
301         char *type = line + 2;
302         char *bytes_str = strchr(type, ':');
303         if (bytes_str == NULL) {
304           log_err("collect: syntax error in line '%s'", line);
305           continue;
306         }
307
308         *bytes_str = 0;
309         bytes_str++;
310
311         pthread_mutex_lock(&count_mutex);
312         type_list_incr(&list_count, type, /* increment = */ 1);
313         pthread_mutex_unlock(&count_mutex);
314
315         int bytes = atoi(bytes_str);
316         if (bytes > 0) {
317           pthread_mutex_lock(&size_mutex);
318           type_list_incr(&list_size, type, /* increment = */ bytes);
319           pthread_mutex_unlock(&size_mutex);
320         }
321       } else if (line[0] == 's') { /* s:<value> */
322         pthread_mutex_lock(&score_mutex);
323         score = (score * (double)score_count + atof(line + 2)) /
324                 (double)(score_count + 1);
325         ++score_count;
326         pthread_mutex_unlock(&score_mutex);
327       } else if (line[0] == 'c') { /* c:<type1>[,<type2>,...] */
328         char *dummy = line + 2;
329         char *endptr = NULL;
330         char *type;
331
332         pthread_mutex_lock(&check_mutex);
333         while ((type = strtok_r(dummy, ",", &endptr)) != NULL) {
334           dummy = NULL;
335           type_list_incr(&list_check, type, /* increment = */ 1);
336         }
337         pthread_mutex_unlock(&check_mutex);
338       } else {
339         log_err("collect: unknown type '%c'", line[0]);
340       }
341     } /* while (42) */
342
343     log_debug("Shutting down connection on fd #%i", fileno(this->socket));
344
345     fclose(connection->socket);
346     free(connection);
347
348     this->socket = NULL;
349
350     pthread_mutex_lock(&available_mutex);
351     ++available_collectors;
352     pthread_mutex_unlock(&available_mutex);
353
354     pthread_cond_signal(&collector_available);
355   } /* while (1) */
356
357   pthread_exit((void *)0);
358   return (void *)0;
359 } /* static void *collect (void *) */
360
361 static void *open_connection(void __attribute__((unused)) * arg) {
362   const char *path = (NULL == sock_file) ? SOCK_PATH : sock_file;
363   const char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group;
364
365   /* create UNIX socket */
366   errno = 0;
367   if ((connector_socket = socket(PF_UNIX, SOCK_STREAM, 0)) == -1) {
368     disabled = 1;
369     log_err("socket() failed: %s", STRERRNO);
370     pthread_exit((void *)1);
371   }
372
373   struct sockaddr_un addr = {
374       .sun_family = AF_UNIX,
375   };
376   sstrncpy(addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1));
377
378   errno = 0;
379   if (bind(connector_socket, (struct sockaddr *)&addr,
380            offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) ==
381       -1) {
382     disabled = 1;
383     close(connector_socket);
384     connector_socket = -1;
385     log_err("bind() failed: %s", STRERRNO);
386     pthread_exit((void *)1);
387   }
388
389   errno = 0;
390   if (listen(connector_socket, 5) == -1) {
391     disabled = 1;
392     close(connector_socket);
393     connector_socket = -1;
394     log_err("listen() failed: %s", STRERRNO);
395     pthread_exit((void *)1);
396   }
397
398   {
399     struct group sg;
400     struct group *grp;
401     int status;
402
403     long int grbuf_size = sysconf(_SC_GETGR_R_SIZE_MAX);
404     if (grbuf_size <= 0)
405       grbuf_size = sysconf(_SC_PAGESIZE);
406     if (grbuf_size <= 0)
407       grbuf_size = 4096;
408     char grbuf[grbuf_size];
409
410     grp = NULL;
411     status = getgrnam_r(group, &sg, grbuf, sizeof(grbuf), &grp);
412     if (status != 0) {
413       log_warn("getgrnam_r (%s) failed: %s", group, STRERROR(status));
414     } else if (grp == NULL) {
415       log_warn("No such group: `%s'", group);
416     } else {
417       status = chown(path, (uid_t)-1, grp->gr_gid);
418       if (status != 0) {
419         log_warn("chown (%s, -1, %i) failed: %s", path, (int)grp->gr_gid,
420                  STRERRNO);
421       }
422     }
423   }
424
425   errno = 0;
426   if (chmod(path, sock_perms) != 0) {
427     log_warn("chmod() failed: %s", STRERRNO);
428   }
429
430   { /* initialize collector threads */
431     pthread_attr_t ptattr;
432
433     conns.head = NULL;
434     conns.tail = NULL;
435
436     pthread_attr_init(&ptattr);
437     pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED);
438
439     available_collectors = max_conns;
440
441     collectors = smalloc(max_conns * sizeof(*collectors));
442
443     for (int i = 0; i < max_conns; ++i) {
444       collectors[i] = smalloc(sizeof(*collectors[i]));
445       collectors[i]->socket = NULL;
446
447       if (plugin_thread_create(&collectors[i]->thread, &ptattr, collect,
448                                collectors[i], "email collector") != 0) {
449         log_err("plugin_thread_create() failed: %s", STRERRNO);
450         collectors[i]->thread = (pthread_t)0;
451       }
452     }
453
454     pthread_attr_destroy(&ptattr);
455   }
456
457   while (1) {
458     int remote = 0;
459
460     conn_t *connection;
461
462     pthread_mutex_lock(&available_mutex);
463
464     while (available_collectors == 0) {
465       pthread_cond_wait(&collector_available, &available_mutex);
466     }
467
468     --available_collectors;
469
470     pthread_mutex_unlock(&available_mutex);
471
472     while (42) {
473       errno = 0;
474
475       remote = accept(connector_socket, NULL, NULL);
476       if (remote == -1) {
477         if (errno == EINTR)
478           continue;
479
480         disabled = 1;
481         close(connector_socket);
482         connector_socket = -1;
483         log_err("accept() failed: %s", STRERRNO);
484         pthread_exit((void *)1);
485       }
486
487       /* access() succeeded. */
488       break;
489     }
490
491     connection = calloc(1, sizeof(*connection));
492     if (connection == NULL) {
493       close(remote);
494       continue;
495     }
496
497     connection->socket = fdopen(remote, "r");
498     connection->next = NULL;
499
500     if (connection->socket == NULL) {
501       close(remote);
502       sfree(connection);
503       continue;
504     }
505
506     pthread_mutex_lock(&conns_mutex);
507
508     if (conns.head == NULL) {
509       conns.head = connection;
510       conns.tail = connection;
511     } else {
512       conns.tail->next = connection;
513       conns.tail = conns.tail->next;
514     }
515
516     pthread_mutex_unlock(&conns_mutex);
517
518     pthread_cond_signal(&conn_available);
519   }
520
521   pthread_exit((void *)0);
522   return (void *)0;
523 } /* static void *open_connection (void *) */
524
525 static int email_init(void) {
526   if (plugin_thread_create(&connector, NULL, open_connection, NULL,
527                            "email listener") != 0) {
528     disabled = 1;
529     log_err("plugin_thread_create() failed: %s", STRERRNO);
530     return -1;
531   }
532
533   return 0;
534 } /* int email_init */
535
536 static void type_list_free(type_list_t *t) {
537   type_t *this;
538
539   this = t->head;
540   while (this != NULL) {
541     type_t *next = this->next;
542
543     sfree(this->name);
544     sfree(this);
545
546     this = next;
547   }
548
549   t->head = NULL;
550   t->tail = NULL;
551 }
552
553 static int email_shutdown(void) {
554   if (connector != ((pthread_t)0)) {
555     pthread_kill(connector, SIGTERM);
556     connector = (pthread_t)0;
557   }
558
559   if (connector_socket >= 0) {
560     close(connector_socket);
561     connector_socket = -1;
562   }
563
564   /* don't allow any more connections to be processed */
565   pthread_mutex_lock(&conns_mutex);
566
567   available_collectors = 0;
568
569   if (collectors != NULL) {
570     for (int i = 0; i < max_conns; ++i) {
571       if (collectors[i] == NULL)
572         continue;
573
574       if (collectors[i]->thread != ((pthread_t)0)) {
575         pthread_kill(collectors[i]->thread, SIGTERM);
576         collectors[i]->thread = (pthread_t)0;
577       }
578
579       if (collectors[i]->socket != NULL) {
580         fclose(collectors[i]->socket);
581         collectors[i]->socket = NULL;
582       }
583
584       sfree(collectors[i]);
585     }
586     sfree(collectors);
587   } /* if (collectors != NULL) */
588
589   pthread_mutex_unlock(&conns_mutex);
590
591   type_list_free(&list_count);
592   type_list_free(&list_count_copy);
593   type_list_free(&list_size);
594   type_list_free(&list_size_copy);
595   type_list_free(&list_check);
596   type_list_free(&list_check_copy);
597
598   unlink((sock_file == NULL) ? SOCK_PATH : sock_file);
599
600   sfree(sock_file);
601   sfree(sock_group);
602   return 0;
603 } /* static void email_shutdown (void) */
604
605 static void email_submit(const char *type, const char *type_instance,
606                          gauge_t value) {
607   value_list_t vl = VALUE_LIST_INIT;
608
609   vl.values = &(value_t){.gauge = value};
610   vl.values_len = 1;
611   sstrncpy(vl.plugin, "email", sizeof(vl.plugin));
612   sstrncpy(vl.type, type, sizeof(vl.type));
613   sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
614
615   plugin_dispatch_values(&vl);
616 } /* void email_submit */
617
618 /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed
619  * that neither the order nor the name of any element of either list is
620  * changed and no elements are deleted. The values of l1 are reset to zero
621  * after they have been copied to l2. */
622 static void copy_type_list(type_list_t *l1, type_list_t *l2) {
623   type_t *last = NULL;
624
625   for (type_t *ptr1 = l1->head, *ptr2 = l2->head; ptr1 != NULL;
626        ptr1 = ptr1->next, last = ptr2, ptr2 = ptr2->next) {
627     if (ptr2 == NULL) {
628       ptr2 = smalloc(sizeof(*ptr2));
629       ptr2->name = NULL;
630       ptr2->next = NULL;
631
632       if (last == NULL) {
633         l2->head = ptr2;
634       } else {
635         last->next = ptr2;
636       }
637
638       l2->tail = ptr2;
639     }
640
641     if (ptr2->name == NULL) {
642       ptr2->name = sstrdup(ptr1->name);
643     }
644
645     ptr2->value = ptr1->value;
646     ptr1->value = 0;
647   }
648   return;
649 }
650
651 static int email_read(void) {
652   double score_old;
653   int score_count_old;
654
655   if (disabled)
656     return -1;
657
658   /* email count */
659   pthread_mutex_lock(&count_mutex);
660
661   copy_type_list(&list_count, &list_count_copy);
662
663   pthread_mutex_unlock(&count_mutex);
664
665   for (type_t *ptr = list_count_copy.head; ptr != NULL; ptr = ptr->next) {
666     email_submit("email_count", ptr->name, ptr->value);
667   }
668
669   /* email size */
670   pthread_mutex_lock(&size_mutex);
671
672   copy_type_list(&list_size, &list_size_copy);
673
674   pthread_mutex_unlock(&size_mutex);
675
676   for (type_t *ptr = list_size_copy.head; ptr != NULL; ptr = ptr->next) {
677     email_submit("email_size", ptr->name, ptr->value);
678   }
679
680   /* spam score */
681   pthread_mutex_lock(&score_mutex);
682
683   score_old = score;
684   score_count_old = score_count;
685   score = 0.0;
686   score_count = 0;
687
688   pthread_mutex_unlock(&score_mutex);
689
690   if (score_count_old > 0)
691     email_submit("spam_score", "", score_old);
692
693   /* spam checks */
694   pthread_mutex_lock(&check_mutex);
695
696   copy_type_list(&list_check, &list_check_copy);
697
698   pthread_mutex_unlock(&check_mutex);
699
700   for (type_t *ptr = list_check_copy.head; ptr != NULL; ptr = ptr->next)
701     email_submit("spam_check", ptr->name, ptr->value);
702
703   return 0;
704 } /* int email_read */
705
706 void module_register(void) {
707   plugin_register_config("email", email_config, config_keys, config_keys_num);
708   plugin_register_init("email", email_init);
709   plugin_register_read("email", email_read);
710   plugin_register_shutdown("email", email_shutdown);
711 } /* void module_register */