sysevent plugin initial commit
[collectd.git] / src / sysevent.c
1 /**
2  * collectd - src/sysevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32
33 #include <asm/types.h>
34 #include <errno.h>
35 #include <netdb.h>
36 #include <netinet/in.h>
37 #include <pthread.h>
38 #include <regex.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43 #include <yajl/yajl_tree.h>
44
45 #define SYSEVENT_REGEX_MATCHES 1
46
47 /*
48  * Private data types
49  */
50
51 typedef struct {
52   int head;
53   int tail;
54   int maxLen;
55   char **buffer;
56 } circbuf_t;
57
58 struct regexfilterlist_s {
59   char *regex_filter;
60   regex_t regex_filter_obj;
61
62   struct regexfilterlist_s *next;
63 };
64 typedef struct regexfilterlist_s regexfilterlist_t;
65
66 /*
67  * Private variables
68  */
69
70 static int sysevent_thread_loop = 0;
71 static int sysevent_thread_error = 0;
72 static pthread_t sysevent_thread_id;
73 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
74 static int sock = -1;
75 static circbuf_t ring;
76
77 static char *listen_ip;
78 static char *listen_port;
79 static int listen_buffer_size = 1024;
80 static int buffer_length = 10;
81
82 static regexfilterlist_t *regexfilterlist_head = NULL;
83
84 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
85 static const char *rsyslog_field_keys[4] = {"facility", "severity", "program",
86                                             "processid"};
87
88 /*
89  * Private functions
90  */
91
92 static void *sysevent_thread(void *arg) /* {{{ */
93 {
94   pthread_mutex_lock(&sysevent_lock);
95
96   while (sysevent_thread_loop > 0) {
97     int status = 0;
98
99     pthread_mutex_unlock(&sysevent_lock);
100
101     if (sock == -1)
102       return ((void *)0);
103
104     char buffer[listen_buffer_size];
105     struct sockaddr_storage src_addr;
106     socklen_t src_addr_len = sizeof(src_addr);
107
108     memset(buffer, '\0', listen_buffer_size);
109
110     ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
111                              (struct sockaddr *)&src_addr, &src_addr_len);
112
113     if (count == -1) {
114       ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
115       status = -1;
116     } else if (count >= sizeof(buffer)) {
117       WARNING("sysevent plugin: datagram too large for buffer: truncated");
118     } else {
119       // 1. Acquire lock
120       // 2. Push to buffer if there is room, otherwise raise warning
121
122       pthread_mutex_lock(&sysevent_lock);
123
124       int next = ring.head + 1;
125       if (next >= ring.maxLen)
126         next = 0;
127
128       if (next == ring.tail) {
129         WARNING("sysevent plugin: ring buffer full");
130       } else {
131         DEBUG("sysevent plugin: writing %s", buffer);
132
133         strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
134         ring.head = next;
135       }
136
137       pthread_mutex_unlock(&sysevent_lock);
138     }
139
140     usleep(1000);
141
142     pthread_mutex_lock(&sysevent_lock);
143
144     if (status < 0) {
145       WARNING("sysevent plugin: problem with thread status: %d", status);
146       sysevent_thread_error = 1;
147       break;
148     }
149
150     if (sysevent_thread_loop <= 0)
151       break;
152   } /* while (sysevent_thread_loop > 0) */
153
154   pthread_mutex_unlock(&sysevent_lock);
155
156   // pthread_exit instead of return
157   return ((void *)0);
158 } /* }}} void *sysevent_thread */
159
160 static int start_thread(void) /* {{{ */
161 {
162   int status;
163
164   pthread_mutex_lock(&sysevent_lock);
165
166   if (sysevent_thread_loop != 0) {
167     pthread_mutex_unlock(&sysevent_lock);
168     return (0);
169   }
170
171   sysevent_thread_loop = 1;
172   sysevent_thread_error = 0;
173
174   DEBUG("sysevent plugin: starting thread");
175
176   status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
177                                 sysevent_thread,
178                                 /* arg = */ (void *)0, "sysevent");
179   if (status != 0) {
180     sysevent_thread_loop = 0;
181     ERROR("sysevent plugin: starting thread failed.");
182     pthread_mutex_unlock(&sysevent_lock);
183     return (-1);
184   }
185
186   pthread_mutex_unlock(&sysevent_lock);
187   return (0);
188 } /* }}} int start_thread */
189
190 static int stop_thread(int shutdown) /* {{{ */
191 {
192   int status;
193
194   pthread_mutex_lock(&sysevent_lock);
195
196   if (sysevent_thread_loop == 0) {
197     pthread_mutex_unlock(&sysevent_lock);
198     return (-1);
199   }
200
201   sysevent_thread_loop = 0;
202   pthread_mutex_unlock(&sysevent_lock);
203
204   if (shutdown == 1) {
205     // Since the thread is blocking, calling pthread_join
206     // doesn't actually succeed in stopping it.  It will stick around
207     // until a message is received on the socket (at which
208     // it will realize that "sysevent_thread_loop" is 0 and will
209     // break out of the read loop and be allowed to die).  This is
210     // fine when the process isn't supposed to be exiting, but in
211     // the case of a process shutdown, we don't want to have an
212     // idle thread hanging around.  Calling pthread_cancel here in
213     // the case of a shutdown is just assures that the thread is
214     // gone and that the process has been fully terminated.
215
216     DEBUG("sysevent plugin: Canceling thread for process shutdown");
217
218     status = pthread_cancel(sysevent_thread_id);
219
220     if (status != 0) {
221       ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
222             strerror(errno));
223       status = -1;
224     }
225   } else {
226     status = pthread_join(sysevent_thread_id, /* return = */ NULL);
227     if (status != 0) {
228       ERROR("sysevent plugin: Stopping thread failed.");
229       status = -1;
230     }
231   }
232
233   pthread_mutex_lock(&sysevent_lock);
234   memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
235   sysevent_thread_error = 0;
236   pthread_mutex_unlock(&sysevent_lock);
237
238   DEBUG("sysevent plugin: Finished requesting stop of thread");
239
240   return (status);
241 } /* }}} int stop_thread */
242
243 static int sysevent_init(void) /* {{{ */
244 {
245   ring.head = 0;
246   ring.tail = 0;
247   ring.maxLen = buffer_length;
248   ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
249
250   for (int i = 0; i < buffer_length; i++) {
251     ring.buffer[i] = malloc(listen_buffer_size);
252   }
253
254   if (sock == -1) {
255     const char *hostname = listen_ip;
256     const char *portname = listen_port;
257     struct addrinfo hints;
258     memset(&hints, 0, sizeof(hints));
259     hints.ai_family = AF_UNSPEC;
260     hints.ai_socktype = SOCK_DGRAM;
261     hints.ai_protocol = 0;
262     hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
263     struct addrinfo *res = 0;
264
265     int err = getaddrinfo(hostname, portname, &hints, &res);
266
267     if (err != 0) {
268       ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
269             err);
270       freeaddrinfo(res);
271       return (-1);
272     }
273
274     sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
275     if (sock == -1) {
276       ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
277       freeaddrinfo(res);
278       return (-1);
279     }
280
281     if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
282       ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
283       freeaddrinfo(res);
284       return (-1);
285     }
286
287     freeaddrinfo(res);
288   }
289
290   DEBUG("sysevent plugin: socket created and bound");
291
292   return (start_thread());
293 } /* }}} int sysevent_init */
294
295 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
296 {
297   if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
298       ci->values[1].type != OCONFIG_TYPE_STRING) {
299     ERROR("sysevent plugin: The `%s' config option needs "
300           "two string arguments (ip and port).",
301           ci->key);
302     return (-1);
303   }
304
305   listen_ip = strdup(ci->values[0].value.string);
306   listen_port = strdup(ci->values[1].value.string);
307
308   return (0);
309 }
310
311 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
312 {
313   int tmp = 0;
314
315   if (cf_util_get_int(ci, &tmp) != 0)
316     return (-1);
317   else if ((tmp >= 1024) && (tmp <= 65535))
318     listen_buffer_size = tmp;
319   else {
320     WARNING(
321         "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
322     return (-1);
323   }
324
325   return (0);
326 }
327
328 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
329 {
330   int tmp = 0;
331
332   if (cf_util_get_int(ci, &tmp) != 0)
333     return (-1);
334   else if ((tmp >= 3) && (tmp <= 1024))
335     buffer_length = tmp;
336   else {
337     WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 1024.");
338     return (-1);
339   }
340
341   return (0);
342 }
343
344 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
345 {
346   if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
347     ERROR("sysevent plugin: The `%s' config option needs "
348           "one string argument, a regular expression.",
349           ci->key);
350     return (-1);
351   }
352
353   regexfilterlist_t *rl;
354   char *regexp_str;
355   regex_t regexp;
356   int status;
357
358   regexp_str = strdup(ci->values[0].value.string);
359
360   status = regcomp(&regexp, regexp_str, REG_EXTENDED);
361
362   if (status != 0) {
363     ERROR("sysevent plugin: 'RegexFilter' invalid regular expression: %s",
364           regexp_str);
365     return (-1);
366   }
367
368   rl = malloc(sizeof(*rl));
369   if (rl == NULL) {
370     char errbuf[1024];
371     ERROR("sysevent plugin: malloc failed during "
372           "sysevent_config_add_regex_filter: %s",
373           sstrerror(errno, errbuf, sizeof(errbuf)));
374     return (-1);
375   }
376
377   rl->regex_filter = regexp_str;
378   rl->regex_filter_obj = regexp;
379   rl->next = regexfilterlist_head;
380   regexfilterlist_head = rl;
381
382   return (0);
383 }
384
385 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
386 {
387   for (int i = 0; i < ci->children_num; i++) {
388     oconfig_item_t *child = ci->children + i;
389
390     if (strcasecmp("Listen", child->key) == 0)
391       sysevent_config_add_listen(child);
392     else if (strcasecmp("BufferSize", child->key) == 0)
393       sysevent_config_add_buffer_size(child);
394     else if (strcasecmp("BufferLength", child->key) == 0)
395       sysevent_config_add_buffer_length(child);
396     else if (strcasecmp("RegexFilter", child->key) == 0)
397       sysevent_config_add_regex_filter(child);
398     else {
399       WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
400     }
401   }
402
403   return (0);
404 } /* }}} int sysevent_config */
405
406 // TODO
407 static void submit(const char *message, yajl_val *node,
408                    const char *type, /* {{{ */
409                    gauge_t value) {
410   value_list_t vl = VALUE_LIST_INIT;
411
412   vl.values = &(value_t){.gauge = value};
413   vl.values_len = 1;
414   sstrncpy(vl.plugin, "sysevent", sizeof(vl.plugin));
415   sstrncpy(vl.type, type, sizeof(vl.type));
416
417   // Create metadata to store JSON key-values
418   meta_data_t *meta = meta_data_create();
419
420   if (node != NULL) {
421     // If we have a parsed-JSON node to work with, use that
422     size_t i = 0;
423
424     for (i = 0; i < sizeof(rsyslog_keys) / sizeof(*rsyslog_keys); i++) {
425       char json_val[listen_buffer_size];
426       const char *key = (const char *)rsyslog_keys[i];
427       const char *path[] = {key, (const char *)0};
428       yajl_val v = yajl_tree_get(*node, path, yajl_t_string);
429
430       memset(json_val, '\0', listen_buffer_size);
431
432       sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0');
433
434       DEBUG("sysevent plugin: adding jsonval: %s", json_val);
435
436       meta_data_add_string(meta, rsyslog_keys[i], json_val);
437     }
438
439     for (i = 0; i < sizeof(rsyslog_field_keys) / sizeof(*rsyslog_field_keys);
440          i++) {
441       char json_val[listen_buffer_size];
442       const char *key = (const char *)rsyslog_field_keys[i];
443       const char *path[] = {"@fields", key, (const char *)0};
444       yajl_val v = yajl_tree_get(*node, path, yajl_t_string);
445
446       memset(json_val, '\0', listen_buffer_size);
447
448       sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0');
449
450       DEBUG("sysevent plugin: adding jsonval: %s", json_val);
451
452       meta_data_add_string(meta, rsyslog_field_keys[i], json_val);
453     }
454   } else {
455     // Data was not sent in JSON format, so just treat the whole log entry
456     // as the message
457     meta_data_add_string(meta, "@message", strdup(message));
458   }
459
460   vl.meta = meta;
461
462   DEBUG("sysevent plugin: dispatching message");
463
464   plugin_dispatch_values(&vl);
465 } /* }}} void sysevent_submit */
466
467 static int sysevent_read(void) /* {{{ */
468 {
469   if (sysevent_thread_error != 0) {
470     ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
471           "it.",
472           sysevent_thread_error);
473
474     stop_thread(0);
475
476     start_thread();
477
478     return (-1);
479   } /* if (sysevent_thread_error != 0) */
480
481   pthread_mutex_lock(&sysevent_lock);
482
483   while (ring.head != ring.tail) {
484     int is_match = 1;
485     char *match_str = NULL;
486     regexfilterlist_t *rl = regexfilterlist_head;
487     int next = ring.tail + 1;
488
489     if (next >= ring.maxLen)
490       next = 0;
491
492     DEBUG("sysevent plugin: reading %s", ring.buffer[ring.tail]);
493
494     // Try to parse JSON, and if it fails, fall back to plain string
495     yajl_val node;
496     char errbuf[1024];
497
498     errbuf[0] = 0;
499
500     node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
501                            sizeof(errbuf));
502
503     if (node != NULL) {
504       // JSON rsyslog data
505
506       // If we have any regex filters, we need to see if the message portion of
507       // the data matches any of them (otherwise we're not interested)
508       if (regexfilterlist_head != NULL) {
509         char json_val[listen_buffer_size];
510         const char *path[] = {"@message", (const char *)0};
511         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
512
513         memset(json_val, '\0', listen_buffer_size);
514
515         sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0');
516
517         match_str = (char *)&json_val;
518       }
519     } else {
520       // non-JSON rsyslog data
521
522       // If we have any regex filters, we need to see if the message data
523       // matches any of them (otherwise we're not interested)
524       if (regexfilterlist_head != NULL)
525         match_str = ring.buffer[ring.tail];
526     }
527
528     // If we care about matching, do that comparison here
529     if (match_str != NULL) {
530       is_match = 0;
531
532       while (rl != NULL) {
533         regmatch_t matches[SYSEVENT_REGEX_MATCHES];
534
535         is_match = (regexec(&rl->regex_filter_obj, match_str,
536                             SYSEVENT_REGEX_MATCHES, matches, 0) == 0
537                         ? 1
538                         : 0);
539
540         if (is_match == 1) {
541           DEBUG("sysevent plugin: regex filter match: %s", rl->regex_filter);
542           break;
543         }
544
545         rl = rl->next;
546       }
547     }
548
549     if (is_match == 1 && node != NULL)
550       submit(NULL, &node, "gauge", 1);
551     else if (is_match == 1)
552       submit(ring.buffer[ring.tail], NULL, "gauge", 1);
553
554     if (node != NULL)
555       yajl_tree_free(node);
556
557     ring.tail = next;
558   }
559
560   pthread_mutex_unlock(&sysevent_lock);
561
562   return (0);
563 } /* }}} int sysevent_read */
564
565 static int sysevent_shutdown(void) /* {{{ */
566 {
567   int status;
568   regexfilterlist_t *rl;
569
570   DEBUG("sysevent plugin: Shutting down thread.");
571   if (stop_thread(1) < 0)
572     return (-1);
573
574   if (sock != -1) {
575     status = close(sock);
576     if (status != 0) {
577       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
578             strerror(errno));
579       return (-1);
580     } else
581       sock = -1;
582   }
583
584   free(listen_ip);
585   free(listen_port);
586
587   for (int i = 0; i < buffer_length; i++) {
588     free(ring.buffer[i]);
589   }
590
591   free(ring.buffer);
592
593   rl = regexfilterlist_head;
594   while (rl != NULL) {
595     regexfilterlist_t *rl_next;
596
597     rl_next = rl->next;
598
599     free(rl->regex_filter);
600     regfree(&rl->regex_filter_obj);
601
602     sfree(rl);
603
604     rl = rl_next;
605   }
606
607   return (0);
608 } /* }}} int sysevent_shutdown */
609
610 void module_register(void) {
611   plugin_register_complex_config("sysevent", sysevent_config);
612   plugin_register_init("sysevent", sysevent_init);
613   plugin_register_read("sysevent", sysevent_read);
614   plugin_register_shutdown("sysevent", sysevent_shutdown);
615 } /* void module_register */