4948b2b87bd4b492ea3cafbc92b723e5ba022891
[collectd.git] / src / sysevent.c
1 /**
2  * collectd - src/sysevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
33
34 #include <errno.h>
35 #include <netdb.h>
36 #include <netinet/in.h>
37 #include <pthread.h>
38 #include <regex.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <yajl/yajl_common.h>
45 #include <yajl/yajl_gen.h>
46
47 #if HAVE_YAJL_YAJL_VERSION_H
48 #include <yajl/yajl_version.h>
49 #endif
50 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
51 #include <yajl/yajl_tree.h>
52 #define HAVE_YAJL_V2 1
53 #endif
54
55 #define SYSEVENT_DOMAIN_FIELD "domain"
56 #define SYSEVENT_DOMAIN_VALUE "syslog"
57 #define SYSEVENT_EVENT_ID_FIELD "eventId"
58 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
59 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
60 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
61 #define SYSEVENT_PRIORITY_FIELD "priority"
62 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
63 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
64 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
65 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
66 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
67 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
69 #define SYSEVENT_SEQUENCE_FIELD "sequence"
70 #define SYSEVENT_SEQUENCE_VALUE "0"
71 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
72 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
73 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
74 #define SYSEVENT_VERSION_FIELD "version"
75 #define SYSEVENT_VERSION_VALUE "1.0"
76
77 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
78 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
80 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
81 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
83 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
84 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
85 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
86 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
87 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
88
89 /*
90  * Private data types
91  */
92
93 typedef struct {
94   int head;
95   int tail;
96   int maxLen;
97   char **buffer;
98   long long unsigned int *timestamp;
99 } circbuf_t;
100
101 /*
102  * Private variables
103  */
104 static ignorelist_t *ignorelist = NULL;
105
106 static int sysevent_thread_loop = 0;
107 static int sysevent_thread_error = 0;
108 static pthread_t sysevent_thread_id;
109 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
110 static int sock = -1;
111 static int event_id = 0;
112 static circbuf_t ring;
113
114 static char *listen_ip;
115 static char *listen_port;
116 static int listen_buffer_size = 4096;
117 static int buffer_length = 10;
118
119 static int monitor_all_messages = 1;
120
121 #if HAVE_YAJL_V2
122 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
123 static const char *rsyslog_field_keys[5] = {
124     "facility", "severity", "severity-num", "program", "processid"};
125 #endif
126
127 /*
128  * Private functions
129  */
130
131 static int gen_message_payload(const char *msg, char *sev, int sev_num,
132                                char *process, char *host,
133                                long long unsigned int timestamp, char **buf) {
134   const unsigned char *buf2;
135   yajl_gen g;
136   char json_str[DATA_MAX_NAME_LEN];
137
138 #if !defined(HAVE_YAJL_V2)
139   yajl_gen_config conf = {};
140
141   conf.beautify = 0;
142 #endif
143
144 #if HAVE_YAJL_V2
145   size_t len;
146   g = yajl_gen_alloc(NULL);
147   yajl_gen_config(g, yajl_gen_beautify, 0);
148 #else
149   unsigned int len;
150   g = yajl_gen_alloc(&conf, NULL);
151 #endif
152
153   yajl_gen_clear(g);
154
155   // *** BEGIN common event header ***
156
157   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
158     goto err;
159
160   // domain
161   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
162                       strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
163     goto err;
164
165   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
166                       strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
167     goto err;
168
169   // eventId
170   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
171                       strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
172     goto err;
173
174   event_id = event_id + 1;
175   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
176   memset(json_str, '\0', DATA_MAX_NAME_LEN);
177   snprintf(json_str, event_id_len, "%d", event_id);
178
179   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
180     goto err;
181   }
182
183   // eventName
184   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
185                       strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
186     goto err;
187
188   int event_name_len = 0;
189   event_name_len = event_name_len + strlen(host); // host name
190   event_name_len =
191       event_name_len +
192       22; // "host", "rsyslog", "message", 3 spaces and null-terminator
193   memset(json_str, '\0', DATA_MAX_NAME_LEN);
194   snprintf(json_str, event_name_len, "host %s rsyslog message", host);
195
196   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
197       yajl_gen_status_ok) {
198     goto err;
199   }
200
201   // lastEpochMicrosec
202   if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
203                       strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
204       yajl_gen_status_ok)
205     goto err;
206
207   int last_epoch_microsec_len =
208       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
209   memset(json_str, '\0', DATA_MAX_NAME_LEN);
210   snprintf(json_str, last_epoch_microsec_len, "%llu",
211            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
212
213   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
214     goto err;
215   }
216
217   // priority
218   if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
219                       strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
220     goto err;
221
222   switch (sev_num) {
223   case 4:
224     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
225                         strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
226         yajl_gen_status_ok)
227       goto err;
228     break;
229   case 5:
230     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
231                         strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
232         yajl_gen_status_ok)
233       goto err;
234     break;
235   case 6:
236   case 7:
237     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
238                         strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
239         yajl_gen_status_ok)
240       goto err;
241     break;
242   default:
243     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
244                         strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
245         yajl_gen_status_ok)
246       goto err;
247     break;
248   }
249
250   // reportingEntityName
251   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
252                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
253       yajl_gen_status_ok)
254     goto err;
255
256   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
257                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
258       yajl_gen_status_ok)
259     goto err;
260
261   // sequence
262   if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
263                       strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
264     goto err;
265
266   if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
267                       strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
268     goto err;
269
270   // sourceName
271   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
272                       strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
273     goto err;
274
275   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
276                       strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
277     goto err;
278
279   // startEpochMicrosec
280   if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
281                       strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
282       yajl_gen_status_ok)
283     goto err;
284
285   int start_epoch_microsec_len =
286       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
287   memset(json_str, '\0', DATA_MAX_NAME_LEN);
288   snprintf(json_str, start_epoch_microsec_len, "%llu",
289            (long long unsigned int)timestamp);
290
291   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
292     goto err;
293   }
294
295   // version
296   if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
297                       strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
298     goto err;
299
300   if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
301                       strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
302     goto err;
303
304   // *** END common event header ***
305
306   // *** BEGIN syslog fields ***
307
308   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
309                       strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
310       yajl_gen_status_ok)
311     goto err;
312
313   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
314     goto err;
315
316   // eventSourceHost
317   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
318                       strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
319       yajl_gen_status_ok)
320     goto err;
321
322   if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
323     goto err;
324
325   // eventSourceType
326   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
327                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
328       yajl_gen_status_ok)
329     goto err;
330
331   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
332                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
333       yajl_gen_status_ok)
334     goto err;
335
336   // syslogFieldsVersion
337   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
338                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
339       yajl_gen_status_ok)
340     goto err;
341
342   if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
343                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
344       yajl_gen_status_ok)
345     goto err;
346
347   // syslogMsg
348   if (msg != NULL) {
349     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
350                         strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
351         yajl_gen_status_ok)
352       goto err;
353
354     if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
355       goto err;
356   }
357
358   // syslogProc
359   if (process != NULL) {
360     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
361                         strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
362         yajl_gen_status_ok)
363       goto err;
364
365     if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
366         yajl_gen_status_ok)
367       goto err;
368   }
369
370   // syslogSev
371   if (sev != NULL) {
372     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
373                         strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
374         yajl_gen_status_ok)
375       goto err;
376
377     if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
378       goto err;
379   }
380
381   // syslogTag
382   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
383                       strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
384     goto err;
385
386   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
387                       strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
388     goto err;
389
390   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
391     goto err;
392
393   // *** END syslog fields ***
394
395   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
396     goto err;
397
398   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
399     goto err;
400
401   *buf = malloc(strlen((char *)buf2) + 1);
402
403   if (*buf == NULL)
404   {
405     ERROR("sysevent plugin: gen_message_payload malloc failed");
406     goto err;
407   }
408
409   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
410
411   yajl_gen_free(g);
412
413   return 0;
414
415 err:
416   yajl_gen_free(g);
417   ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
418   return -1;
419 }
420
421 static void *sysevent_thread(void *arg) /* {{{ */
422 {
423   pthread_mutex_lock(&sysevent_lock);
424
425   while (sysevent_thread_loop > 0) {
426     int status = 0;
427
428     pthread_mutex_unlock(&sysevent_lock);
429
430     if (sock == -1)
431       return ((void *)0);
432
433     char buffer[listen_buffer_size];
434     struct sockaddr_storage src_addr;
435     socklen_t src_addr_len = sizeof(src_addr);
436
437     memset(buffer, '\0', listen_buffer_size);
438
439     ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
440                              (struct sockaddr *)&src_addr, &src_addr_len);
441
442     if (count == -1) {
443       ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
444       status = -1;
445     } else if (count >= sizeof(buffer)) {
446       WARNING("sysevent plugin: datagram too large for buffer: truncated");
447     } else {
448       // 1. Acquire lock
449       // 2. Push to buffer if there is room, otherwise raise warning
450
451       pthread_mutex_lock(&sysevent_lock);
452
453       int next = ring.head + 1;
454       if (next >= ring.maxLen)
455         next = 0;
456
457       if (next == ring.tail) {
458         WARNING("sysevent plugin: ring buffer full");
459       } else {
460         DEBUG("sysevent plugin: writing %s", buffer);
461
462         strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
463         ring.timestamp[ring.head] =
464             (long long unsigned int)CDTIME_T_TO_US(cdtime());
465         ring.head = next;
466       }
467
468       pthread_mutex_unlock(&sysevent_lock);
469     }
470
471     usleep(1000);
472
473     pthread_mutex_lock(&sysevent_lock);
474
475     if (status < 0) {
476       WARNING("sysevent plugin: problem with thread status: %d", status);
477       sysevent_thread_error = 1;
478       break;
479     }
480
481     if (sysevent_thread_loop <= 0)
482       break;
483   } /* while (sysevent_thread_loop > 0) */
484
485   pthread_mutex_unlock(&sysevent_lock);
486
487   // pthread_exit instead of return?
488   return ((void *)0);
489 } /* }}} void *sysevent_thread */
490
491 static int start_thread(void) /* {{{ */
492 {
493   int status;
494
495   pthread_mutex_lock(&sysevent_lock);
496
497   if (sysevent_thread_loop != 0) {
498     pthread_mutex_unlock(&sysevent_lock);
499     return (0);
500   }
501
502   sysevent_thread_loop = 1;
503   sysevent_thread_error = 0;
504
505   DEBUG("sysevent plugin: starting thread");
506
507   status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
508                                 sysevent_thread,
509                                 /* arg = */ (void *)0, "sysevent");
510   if (status != 0) {
511     sysevent_thread_loop = 0;
512     ERROR("sysevent plugin: starting thread failed.");
513     pthread_mutex_unlock(&sysevent_lock);
514     return (-1);
515   }
516
517   pthread_mutex_unlock(&sysevent_lock);
518   return (0);
519 } /* }}} int start_thread */
520
521 static int stop_thread(int shutdown) /* {{{ */
522 {
523   int status;
524
525   pthread_mutex_lock(&sysevent_lock);
526
527   if (sysevent_thread_loop == 0) {
528     pthread_mutex_unlock(&sysevent_lock);
529     return (-1);
530   }
531
532   sysevent_thread_loop = 0;
533   pthread_mutex_unlock(&sysevent_lock);
534
535   if (shutdown == 1) {
536     // Since the thread is blocking, calling pthread_join
537     // doesn't actually succeed in stopping it.  It will stick around
538     // until a message is received on the socket (at which
539     // it will realize that "sysevent_thread_loop" is 0 and will
540     // break out of the read loop and be allowed to die).  This is
541     // fine when the process isn't supposed to be exiting, but in
542     // the case of a process shutdown, we don't want to have an
543     // idle thread hanging around.  Calling pthread_cancel here in
544     // the case of a shutdown is just assures that the thread is
545     // gone and that the process has been fully terminated.
546
547     DEBUG("sysevent plugin: Canceling thread for process shutdown");
548
549     status = pthread_cancel(sysevent_thread_id);
550
551     if (status != 0) {
552       ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
553             strerror(errno));
554       status = -1;
555     }
556   } else {
557     status = pthread_join(sysevent_thread_id, /* return = */ NULL);
558     if (status != 0) {
559       ERROR("sysevent plugin: Stopping thread failed.");
560       status = -1;
561     }
562   }
563
564   pthread_mutex_lock(&sysevent_lock);
565   memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
566   sysevent_thread_error = 0;
567   pthread_mutex_unlock(&sysevent_lock);
568
569   DEBUG("sysevent plugin: Finished requesting stop of thread");
570
571   return (status);
572 } /* }}} int stop_thread */
573
574 static int sysevent_init(void) /* {{{ */
575 {
576   ring.head = 0;
577   ring.tail = 0;
578   ring.maxLen = buffer_length;
579   ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
580
581   if (ring.buffer == NULL)
582   {
583     ERROR("sysevent plugin: sysevent_init malloc failed");
584     return (-1);
585   }
586
587   for (int i = 0; i < buffer_length; i++) {
588     ring.buffer[i] = malloc(listen_buffer_size);
589   }
590
591   ring.timestamp = (long long unsigned int *)malloc(
592       buffer_length * sizeof(long long unsigned int));
593
594   if (sock == -1) {
595     const char *hostname = listen_ip;
596     const char *portname = listen_port;
597     struct addrinfo hints;
598     memset(&hints, 0, sizeof(hints));
599     hints.ai_family = AF_UNSPEC;
600     hints.ai_socktype = SOCK_DGRAM;
601     hints.ai_protocol = 0;
602     hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
603     struct addrinfo *res = 0;
604
605     int err = getaddrinfo(hostname, portname, &hints, &res);
606
607     if (err != 0) {
608       ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
609             err);
610       freeaddrinfo(res);
611       return (-1);
612     }
613
614     sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
615     if (sock == -1) {
616       ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
617       freeaddrinfo(res);
618       return (-1);
619     }
620
621     if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
622       ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
623       freeaddrinfo(res);
624       return (-1);
625     }
626
627     freeaddrinfo(res);
628   }
629
630   DEBUG("sysevent plugin: socket created and bound");
631
632   return (start_thread());
633 } /* }}} int sysevent_init */
634
635 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
636 {
637   if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
638       ci->values[1].type != OCONFIG_TYPE_STRING) {
639     ERROR("sysevent plugin: The `%s' config option needs "
640           "two string arguments (ip and port).",
641           ci->key);
642     return (-1);
643   }
644
645   listen_ip = strdup(ci->values[0].value.string);
646   listen_port = strdup(ci->values[1].value.string);
647
648   return (0);
649 }
650
651 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
652 {
653   int tmp = 0;
654
655   if (cf_util_get_int(ci, &tmp) != 0)
656     return (-1);
657   else if ((tmp >= 1024) && (tmp <= 65535))
658     listen_buffer_size = tmp;
659   else {
660     WARNING(
661         "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
662     return (-1);
663   }
664
665   return (0);
666 }
667
668 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
669 {
670   int tmp = 0;
671
672   if (cf_util_get_int(ci, &tmp) != 0)
673     return (-1);
674   else if ((tmp >= 3) && (tmp <= 4096))
675     buffer_length = tmp;
676   else {
677     WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
678     return (-1);
679   }
680
681   return (0);
682 }
683
684 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
685 {
686   if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
687     ERROR("sysevent plugin: The `%s' config option needs "
688           "one string argument, a regular expression.",
689           ci->key);
690     return (-1);
691   }
692
693 #if HAVE_REGEX_H
694   if (ignorelist == NULL)
695     ignorelist = ignorelist_create(/* invert = */ 1);
696
697   int status = ignorelist_add(ignorelist, ci->values[0].value.string);
698
699   if (status != 0) {
700     ERROR("sysevent plugin: invalid regular expression: %s",
701           ci->values[0].value.string);
702     return (1);
703   }
704
705   monitor_all_messages = 0;
706 #else
707   WARNING("sysevent plugin: The plugin has been compiled without support "
708           "for the \"RegexFilter\" option.");
709 #endif
710
711   return (0);
712 }
713
714 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
715 {
716   for (int i = 0; i < ci->children_num; i++) {
717     oconfig_item_t *child = ci->children + i;
718
719     if (strcasecmp("Listen", child->key) == 0)
720       sysevent_config_add_listen(child);
721     else if (strcasecmp("BufferSize", child->key) == 0)
722       sysevent_config_add_buffer_size(child);
723     else if (strcasecmp("BufferLength", child->key) == 0)
724       sysevent_config_add_buffer_length(child);
725     else if (strcasecmp("RegexFilter", child->key) == 0)
726       sysevent_config_add_regex_filter(child);
727     else {
728       WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
729     }
730   }
731
732   return (0);
733 } /* }}} int sysevent_config */
734
735 static void sysevent_dispatch_notification(const char *message,
736 #if HAVE_YAJL_V2
737                                            yajl_val *node,
738 #endif
739                                            long long unsigned int timestamp) {
740   char *buf = NULL;
741   notification_t n = {NOTIF_OKAY, cdtime(), "", "",  "sysevent",
742                       "",         "",       "", NULL};
743
744 #if HAVE_YAJL_V2
745   if (node != NULL) {
746     // If we have a parsed-JSON node to work with, use that
747
748     char process[listen_buffer_size];
749     char severity[listen_buffer_size];
750     char sev_num_str[listen_buffer_size];
751     char msg[listen_buffer_size];
752     char hostname_str[listen_buffer_size];
753     int sev_num = -1;
754
755     // msg
756     const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
757     yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
758
759     if (msg_v != NULL) {
760       memset(msg, '\0', listen_buffer_size);
761       snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
762     }
763
764     // severity
765     const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
766                                    (const char *)0};
767     yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
768
769     if (severity_v != NULL) {
770       memset(severity, '\0', listen_buffer_size);
771       snprintf(severity, listen_buffer_size, "%s%c",
772                YAJL_GET_STRING(severity_v), '\0');
773     }
774
775     // sev_num
776     const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
777                                       (const char *)0};
778     yajl_val sev_num_str_v =
779         yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
780
781     if (sev_num_str_v != NULL) {
782       memset(sev_num_str, '\0', listen_buffer_size);
783       snprintf(sev_num_str, listen_buffer_size, "%s%c",
784                YAJL_GET_STRING(sev_num_str_v), '\0');
785
786       sev_num = atoi(sev_num_str);
787
788       if (sev_num < 4)
789         n.severity = NOTIF_FAILURE;
790     }
791
792     // process
793     const char *process_path[] = {"@fields", rsyslog_field_keys[3],
794                                   (const char *)0};
795     yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
796
797     if (process_v != NULL) {
798       memset(process, '\0', listen_buffer_size);
799       snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
800                '\0');
801     }
802
803     // hostname
804     const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
805     yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
806
807     if (hostname_v != NULL) {
808       memset(hostname_str, '\0', listen_buffer_size);
809       snprintf(hostname_str, listen_buffer_size, "%s%c",
810                YAJL_GET_STRING(hostname_v), '\0');
811     }
812
813     gen_message_payload(
814         (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
815         (sev_num_str_v != NULL ? sev_num : -1),
816         (process_v != NULL ? process : NULL),
817         (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
818   } else {
819     // Data was not sent in JSON format, so just treat the whole log entry
820     // as the message (and we'll be unable to acquire certain data, so the
821     // payload
822     // generated below will be less informative)
823
824     gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
825   }
826 #else
827   gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
828 #endif
829
830   sstrncpy(n.host, hostname_g, sizeof(n.host));
831   sstrncpy(n.type, "gauge", sizeof(n.type));
832
833   notification_meta_t *m = calloc(1, sizeof(*m));
834
835   if (m == NULL) {
836     char errbuf[1024];
837     sfree(buf);
838     ERROR("sysevent plugin: unable to allocate metadata: %s",
839           sstrerror(errno, errbuf, sizeof(errbuf)));
840     return;
841   }
842
843   sstrncpy(m->name, "ves", sizeof(m->name));
844   m->nm_value.nm_string = sstrdup(buf);
845   m->type = NM_TYPE_STRING;
846   n.meta = m;
847
848   DEBUG("sysevent plugin: notification message: %s",
849         n.meta->nm_value.nm_string);
850
851   DEBUG("sysevent plugin: dispatching message");
852
853   plugin_dispatch_notification(&n);
854   plugin_notification_meta_free(n.meta);
855
856   // malloc'd in gen_message_payload
857   if (buf != NULL)
858     sfree(buf);
859 }
860
861 static int sysevent_read(void) /* {{{ */
862 {
863   if (sysevent_thread_error != 0) {
864     ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
865           "it.",
866           sysevent_thread_error);
867
868     stop_thread(0);
869
870     start_thread();
871
872     return (-1);
873   } /* if (sysevent_thread_error != 0) */
874
875   pthread_mutex_lock(&sysevent_lock);
876
877   while (ring.head != ring.tail) {
878     long long unsigned int timestamp;
879     int is_match = 1;
880     char *match_str = NULL;
881     int next = ring.tail + 1;
882
883     if (next >= ring.maxLen)
884       next = 0;
885
886     DEBUG("sysevent plugin: reading from ring buffer: %s",
887           ring.buffer[ring.tail]);
888
889     timestamp = ring.timestamp[ring.tail];
890
891 #if HAVE_YAJL_V2
892     // Try to parse JSON, and if it fails, fall back to plain string
893     yajl_val node = NULL;
894     char errbuf[1024];
895     errbuf[0] = 0;
896     node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
897                            sizeof(errbuf));
898
899     if (node != NULL) {
900       // JSON rsyslog data
901
902       // If we have any regex filters, we need to see if the message portion of
903       // the data matches any of them (otherwise we're not interested)
904       if (monitor_all_messages == 0) {
905         char json_val[listen_buffer_size];
906         const char *path[] = {"@message", (const char *)0};
907         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
908
909         memset(json_val, '\0', listen_buffer_size);
910
911         snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
912                  '\0');
913
914         match_str = (char *)&json_val;
915       }
916     } else {
917       // non-JSON rsyslog data
918
919       // If we have any regex filters, we need to see if the message data
920       // matches any of them (otherwise we're not interested)
921       if (monitor_all_messages == 0)
922         match_str = ring.buffer[ring.tail];
923     }
924 #else
925     // If we have any regex filters, we need to see if the message data
926     // matches any of them (otherwise we're not interested)
927     if (monitor_all_messages == 0)
928       match_str = ring.buffer[ring.tail];
929 #endif
930
931     // If we care about matching, do that comparison here
932     if (match_str != NULL) {
933       is_match = 1;
934
935       if (ignorelist_match(ignorelist, match_str) != 0)
936         is_match = 0;
937       else
938         DEBUG("sysevent plugin: regex filter match");
939     }
940
941 #if HAVE_YAJL_V2
942     if (is_match == 1 && node != NULL) {
943       sysevent_dispatch_notification(NULL, &node, timestamp);
944       yajl_tree_free(node);
945     } else if (is_match == 1)
946       sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
947 #else
948     if (is_match == 1)
949       sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
950 #endif
951
952     ring.tail = next;
953   }
954
955   pthread_mutex_unlock(&sysevent_lock);
956
957   return (0);
958 } /* }}} int sysevent_read */
959
960 static int sysevent_shutdown(void) /* {{{ */
961 {
962   int status;
963
964   DEBUG("sysevent plugin: Shutting down thread.");
965   if (stop_thread(1) < 0)
966     return (-1);
967
968   if (sock != -1) {
969     status = close(sock);
970     if (status != 0) {
971       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
972             strerror(errno));
973       return (-1);
974     } else
975       sock = -1;
976   }
977
978   free(listen_ip);
979   free(listen_port);
980
981   for (int i = 0; i < buffer_length; i++) {
982     free(ring.buffer[i]);
983   }
984
985   free(ring.buffer);
986   free(ring.timestamp);
987
988   return (0);
989 } /* }}} int sysevent_shutdown */
990
991 void module_register(void) {
992   plugin_register_complex_config("sysevent", sysevent_config);
993   plugin_register_init("sysevent", sysevent_init);
994   plugin_register_read("sysevent", sysevent_read);
995   plugin_register_shutdown("sysevent", sysevent_shutdown);
996 } /* void module_register */