VES-enabled sysvent plugin
[collectd.git] / src / sysevent.c
1 /**
2  * collectd - src/sysevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <netdb.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <regex.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <yajl/yajl_common.h>
46 #include <yajl/yajl_gen.h>
47
48 #if HAVE_YAJL_YAJL_VERSION_H
49 #include <yajl/yajl_version.h>
50 #endif
51 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
52 #include <yajl/yajl_tree.h>
53 #define HAVE_YAJL_V2 1
54 #endif
55
56 #define SYSEVENT_DOMAIN_FIELD "domain"
57 #define SYSEVENT_DOMAIN_VALUE "syslog"
58 #define SYSEVENT_EVENT_ID_FIELD "eventId"
59 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
60 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
61 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
62 #define SYSEVENT_PRIORITY_FIELD "priority"
63 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
64 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
65 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
66 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
67 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
69 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
70 #define SYSEVENT_SEQUENCE_FIELD "sequence"
71 #define SYSEVENT_SEQUENCE_VALUE "0"
72 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
73 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
74 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
75 #define SYSEVENT_VERSION_FIELD "version"
76 #define SYSEVENT_VERSION_VALUE "1.0"
77
78 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
80 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
81 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
83 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
84 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
85 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
86 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
87 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
88 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
89
90 /*
91  * Private data types
92  */
93
94 typedef struct {
95   int head;
96   int tail;
97   int maxLen;
98   char **buffer;
99   long long unsigned int *timestamp;
100 } circbuf_t;
101
102 /*
103  * Private variables
104  */
105 static ignorelist_t *ignorelist = NULL;
106
107 static int sysevent_thread_loop = 0;
108 static int sysevent_thread_error = 0;
109 static pthread_t sysevent_thread_id;
110 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
111 static int sock = -1;
112 static int event_id = 0;
113 static circbuf_t ring;
114
115 static char *listen_ip;
116 static char *listen_port;
117 static int listen_buffer_size = 4096;
118 static int buffer_length = 10;
119
120 static int monitor_all_messages = 1;
121
122 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
123 static const char *rsyslog_field_keys[5] = {
124     "facility", "severity", "severity-num", "program", "processid"};
125
126 /*
127  * Private functions
128  */
129
130 static int gen_message_payload(const char *msg, char *sev, int sev_num,
131                                char *process, char *host,
132                                long long unsigned int timestamp, char **buf) {
133   const unsigned char *buf2;
134   yajl_gen g;
135   char json_str[DATA_MAX_NAME_LEN];
136
137 #if !defined(HAVE_YAJL_V2)
138   yajl_gen_config conf = {};
139
140   conf.beautify = 0;
141 #endif
142
143 #if HAVE_YAJL_V2
144   size_t len;
145   g = yajl_gen_alloc(NULL);
146   yajl_gen_config(g, yajl_gen_beautify, 0);
147 #else
148   unsigned int len;
149   g = yajl_gen_alloc(&conf, NULL);
150 #endif
151
152   yajl_gen_clear(g);
153
154   // *** BEGIN common event header ***
155
156   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
157     goto err;
158
159   // domain
160   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
161                       strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
162     goto err;
163
164   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
165                       strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
166     goto err;
167
168   // eventId
169   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
170                       strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
171     goto err;
172
173   event_id = event_id + 1;
174   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
175   memset(json_str, '\0', DATA_MAX_NAME_LEN);
176   snprintf(json_str, event_id_len, "%d", event_id);
177
178   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
179     goto err;
180   }
181
182   // eventName
183   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
184                       strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
185     goto err;
186
187   int event_name_len = 0;
188   event_name_len = event_name_len + strlen(host); // host name
189   event_name_len =
190       event_name_len +
191       22; // "host", "rsyslog", "message", 3 spaces and null-terminator
192   memset(json_str, '\0', DATA_MAX_NAME_LEN);
193   snprintf(json_str, event_name_len, "host %s rsyslog message", host);
194
195   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
196       yajl_gen_status_ok) {
197     goto err;
198   }
199
200   // lastEpochMicrosec
201   if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
202                       strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
203       yajl_gen_status_ok)
204     goto err;
205
206   int last_epoch_microsec_len =
207       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
208   memset(json_str, '\0', DATA_MAX_NAME_LEN);
209   snprintf(json_str, last_epoch_microsec_len, "%llu",
210            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
211
212   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
213     goto err;
214   }
215
216   // priority
217   if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
218                       strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
219     goto err;
220
221   switch (sev_num) {
222   case 4:
223     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
224                         strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
225         yajl_gen_status_ok)
226       goto err;
227     break;
228   case 5:
229     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
230                         strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
231         yajl_gen_status_ok)
232       goto err;
233     break;
234   case 6:
235   case 7:
236     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
237                         strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
238         yajl_gen_status_ok)
239       goto err;
240     break;
241   default:
242     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
243                         strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
244         yajl_gen_status_ok)
245       goto err;
246     break;
247   }
248
249   // reportingEntityName
250   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
251                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
252       yajl_gen_status_ok)
253     goto err;
254
255   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
256                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
257       yajl_gen_status_ok)
258     goto err;
259
260   // sequence
261   if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
262                       strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
263     goto err;
264
265   if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
266                       strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
267     goto err;
268
269   // sourceName
270   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
271                       strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
272     goto err;
273
274   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
275                       strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
276     goto err;
277
278   // startEpochMicrosec
279   if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
280                       strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
281       yajl_gen_status_ok)
282     goto err;
283
284   int start_epoch_microsec_len =
285       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
286   memset(json_str, '\0', DATA_MAX_NAME_LEN);
287   snprintf(json_str, start_epoch_microsec_len, "%llu",
288            (long long unsigned int)timestamp);
289
290   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
291     goto err;
292   }
293
294   // version
295   if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
296                       strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
297     goto err;
298
299   if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
300                       strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
301     goto err;
302
303   // *** END common event header ***
304
305   // *** BEGIN syslog fields ***
306
307   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
308                       strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
309       yajl_gen_status_ok)
310     goto err;
311
312   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
313     goto err;
314
315   // eventSourceHost
316   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
317                       strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
318       yajl_gen_status_ok)
319     goto err;
320
321   if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
322     goto err;
323
324   // eventSourceType
325   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
326                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
327       yajl_gen_status_ok)
328     goto err;
329
330   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
331                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
332       yajl_gen_status_ok)
333     goto err;
334
335   // syslogFieldsVersion
336   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
337                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
338       yajl_gen_status_ok)
339     goto err;
340
341   if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
342                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
343       yajl_gen_status_ok)
344     goto err;
345
346   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
347     goto err;
348
349   // syslogMsg
350   if (msg != NULL) {
351     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
352                         strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
353         yajl_gen_status_ok)
354       goto err;
355
356     if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
357       goto err;
358   }
359
360   // syslogProc
361   if (process != NULL) {
362     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
363                         strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
364         yajl_gen_status_ok)
365       goto err;
366
367     if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
368         yajl_gen_status_ok)
369       goto err;
370   }
371
372   // syslogSev
373   if (sev != NULL) {
374     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
375                         strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
376         yajl_gen_status_ok)
377       goto err;
378
379     if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
380       goto err;
381   }
382
383   // syslogTag
384   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
385                       strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
386     goto err;
387
388   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
389                       strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
390     goto err;
391
392   // *** END syslog fields ***
393
394   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
395     goto err;
396
397   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
398     goto err;
399
400   *buf = malloc(strlen((char *)buf2) + 1);
401
402   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
403
404   yajl_gen_free(g);
405
406   return 0;
407
408 err:
409   yajl_gen_free(g);
410   ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
411   return -1;
412 }
413
414 static void *sysevent_thread(void *arg) /* {{{ */
415 {
416   pthread_mutex_lock(&sysevent_lock);
417
418   while (sysevent_thread_loop > 0) {
419     int status = 0;
420
421     pthread_mutex_unlock(&sysevent_lock);
422
423     if (sock == -1)
424       return ((void *)0);
425
426     char buffer[listen_buffer_size];
427     struct sockaddr_storage src_addr;
428     socklen_t src_addr_len = sizeof(src_addr);
429
430     memset(buffer, '\0', listen_buffer_size);
431
432     ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
433                              (struct sockaddr *)&src_addr, &src_addr_len);
434
435     if (count == -1) {
436       ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
437       status = -1;
438     } else if (count >= sizeof(buffer)) {
439       WARNING("sysevent plugin: datagram too large for buffer: truncated");
440     } else {
441       // 1. Acquire lock
442       // 2. Push to buffer if there is room, otherwise raise warning
443
444       pthread_mutex_lock(&sysevent_lock);
445
446       int next = ring.head + 1;
447       if (next >= ring.maxLen)
448         next = 0;
449
450       if (next == ring.tail) {
451         WARNING("sysevent plugin: ring buffer full");
452       } else {
453         DEBUG("sysevent plugin: writing %s", buffer);
454
455         strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
456         ring.timestamp[ring.head] =
457             (long long unsigned int)CDTIME_T_TO_US(cdtime());
458         ring.head = next;
459       }
460
461       pthread_mutex_unlock(&sysevent_lock);
462     }
463
464     usleep(1000);
465
466     pthread_mutex_lock(&sysevent_lock);
467
468     if (status < 0) {
469       WARNING("sysevent plugin: problem with thread status: %d", status);
470       sysevent_thread_error = 1;
471       break;
472     }
473
474     if (sysevent_thread_loop <= 0)
475       break;
476   } /* while (sysevent_thread_loop > 0) */
477
478   pthread_mutex_unlock(&sysevent_lock);
479
480   // pthread_exit instead of return?
481   return ((void *)0);
482 } /* }}} void *sysevent_thread */
483
484 static int start_thread(void) /* {{{ */
485 {
486   int status;
487
488   pthread_mutex_lock(&sysevent_lock);
489
490   if (sysevent_thread_loop != 0) {
491     pthread_mutex_unlock(&sysevent_lock);
492     return (0);
493   }
494
495   sysevent_thread_loop = 1;
496   sysevent_thread_error = 0;
497
498   DEBUG("sysevent plugin: starting thread");
499
500   status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
501                                 sysevent_thread,
502                                 /* arg = */ (void *)0, "sysevent");
503   if (status != 0) {
504     sysevent_thread_loop = 0;
505     ERROR("sysevent plugin: starting thread failed.");
506     pthread_mutex_unlock(&sysevent_lock);
507     return (-1);
508   }
509
510   pthread_mutex_unlock(&sysevent_lock);
511   return (0);
512 } /* }}} int start_thread */
513
514 static int stop_thread(int shutdown) /* {{{ */
515 {
516   int status;
517
518   pthread_mutex_lock(&sysevent_lock);
519
520   if (sysevent_thread_loop == 0) {
521     pthread_mutex_unlock(&sysevent_lock);
522     return (-1);
523   }
524
525   sysevent_thread_loop = 0;
526   pthread_mutex_unlock(&sysevent_lock);
527
528   if (shutdown == 1) {
529     // Since the thread is blocking, calling pthread_join
530     // doesn't actually succeed in stopping it.  It will stick around
531     // until a message is received on the socket (at which
532     // it will realize that "sysevent_thread_loop" is 0 and will
533     // break out of the read loop and be allowed to die).  This is
534     // fine when the process isn't supposed to be exiting, but in
535     // the case of a process shutdown, we don't want to have an
536     // idle thread hanging around.  Calling pthread_cancel here in
537     // the case of a shutdown is just assures that the thread is
538     // gone and that the process has been fully terminated.
539
540     DEBUG("sysevent plugin: Canceling thread for process shutdown");
541
542     status = pthread_cancel(sysevent_thread_id);
543
544     if (status != 0) {
545       ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
546             strerror(errno));
547       status = -1;
548     }
549   } else {
550     status = pthread_join(sysevent_thread_id, /* return = */ NULL);
551     if (status != 0) {
552       ERROR("sysevent plugin: Stopping thread failed.");
553       status = -1;
554     }
555   }
556
557   pthread_mutex_lock(&sysevent_lock);
558   memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
559   sysevent_thread_error = 0;
560   pthread_mutex_unlock(&sysevent_lock);
561
562   DEBUG("sysevent plugin: Finished requesting stop of thread");
563
564   return (status);
565 } /* }}} int stop_thread */
566
567 static int sysevent_init(void) /* {{{ */
568 {
569   ring.head = 0;
570   ring.tail = 0;
571   ring.maxLen = buffer_length;
572   ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
573
574   for (int i = 0; i < buffer_length; i++) {
575     ring.buffer[i] = malloc(listen_buffer_size);
576   }
577
578   ring.timestamp = (long long unsigned int *)malloc(
579       buffer_length * sizeof(long long unsigned int));
580
581   if (sock == -1) {
582     const char *hostname = listen_ip;
583     const char *portname = listen_port;
584     struct addrinfo hints;
585     memset(&hints, 0, sizeof(hints));
586     hints.ai_family = AF_UNSPEC;
587     hints.ai_socktype = SOCK_DGRAM;
588     hints.ai_protocol = 0;
589     hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
590     struct addrinfo *res = 0;
591
592     int err = getaddrinfo(hostname, portname, &hints, &res);
593
594     if (err != 0) {
595       ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
596             err);
597       freeaddrinfo(res);
598       return (-1);
599     }
600
601     sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
602     if (sock == -1) {
603       ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
604       freeaddrinfo(res);
605       return (-1);
606     }
607
608     if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
609       ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
610       freeaddrinfo(res);
611       return (-1);
612     }
613
614     freeaddrinfo(res);
615   }
616
617   DEBUG("sysevent plugin: socket created and bound");
618
619   return (start_thread());
620 } /* }}} int sysevent_init */
621
622 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
623 {
624   if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
625       ci->values[1].type != OCONFIG_TYPE_STRING) {
626     ERROR("sysevent plugin: The `%s' config option needs "
627           "two string arguments (ip and port).",
628           ci->key);
629     return (-1);
630   }
631
632   listen_ip = strdup(ci->values[0].value.string);
633   listen_port = strdup(ci->values[1].value.string);
634
635   return (0);
636 }
637
638 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
639 {
640   int tmp = 0;
641
642   if (cf_util_get_int(ci, &tmp) != 0)
643     return (-1);
644   else if ((tmp >= 1024) && (tmp <= 65535))
645     listen_buffer_size = tmp;
646   else {
647     WARNING(
648         "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
649     return (-1);
650   }
651
652   return (0);
653 }
654
655 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
656 {
657   int tmp = 0;
658
659   if (cf_util_get_int(ci, &tmp) != 0)
660     return (-1);
661   else if ((tmp >= 3) && (tmp <= 4096))
662     buffer_length = tmp;
663   else {
664     WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
665     return (-1);
666   }
667
668   return (0);
669 }
670
671 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
672 {
673   if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
674     ERROR("sysevent plugin: The `%s' config option needs "
675           "one string argument, a regular expression.",
676           ci->key);
677     return (-1);
678   }
679
680 #if HAVE_REGEX_H
681   if (ignorelist == NULL)
682     ignorelist = ignorelist_create(/* invert = */ 1);
683
684   int status = ignorelist_add(ignorelist, ci->values[0].value.string);
685
686   if (status != 0) {
687     ERROR("sysevent plugin: invalid regular expression: %s",
688           ci->values[0].value.string);
689     return (1);
690   }
691
692   monitor_all_messages = 0;
693 #else
694   WARNING("sysevent plugin: The plugin has been compiled without support "
695           "for the \"RegexFilter\" option.");
696 #endif
697
698   return (0);
699 }
700
701 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
702 {
703   for (int i = 0; i < ci->children_num; i++) {
704     oconfig_item_t *child = ci->children + i;
705
706     if (strcasecmp("Listen", child->key) == 0)
707       sysevent_config_add_listen(child);
708     else if (strcasecmp("BufferSize", child->key) == 0)
709       sysevent_config_add_buffer_size(child);
710     else if (strcasecmp("BufferLength", child->key) == 0)
711       sysevent_config_add_buffer_length(child);
712     else if (strcasecmp("RegexFilter", child->key) == 0)
713       sysevent_config_add_regex_filter(child);
714     else {
715       WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
716     }
717   }
718
719   return (0);
720 } /* }}} int sysevent_config */
721
722 static void sysevent_dispatch_notification(const char *message,
723 #if HAVE_YAJL_V2
724                                            yajl_val *node,
725 #endif
726                                            long long unsigned int timestamp) {
727   char *buf = NULL;
728   notification_t n = {NOTIF_OKAY, cdtime(), "", "",  "sysevent",
729                       "",         "",       "", NULL};
730
731 #if HAVE_YAJL_V2
732   if (node != NULL) {
733     // If we have a parsed-JSON node to work with, use that
734
735     char process[listen_buffer_size];
736     char severity[listen_buffer_size];
737     char sev_num_str[listen_buffer_size];
738     char msg[listen_buffer_size];
739     char hostname_str[listen_buffer_size];
740     int sev_num = -1;
741
742     // msg
743     const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
744     yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
745
746     if (msg_v != NULL) {
747       memset(msg, '\0', listen_buffer_size);
748       snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
749     }
750
751     // severity
752     const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
753                                    (const char *)0};
754     yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
755
756     if (severity_v != NULL) {
757       memset(severity, '\0', listen_buffer_size);
758       snprintf(severity, listen_buffer_size, "%s%c",
759                YAJL_GET_STRING(severity_v), '\0');
760     }
761
762     // sev_num
763     const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
764                                       (const char *)0};
765     yajl_val sev_num_str_v =
766         yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
767
768     if (sev_num_str_v != NULL) {
769       memset(sev_num_str, '\0', listen_buffer_size);
770       snprintf(sev_num_str, listen_buffer_size, "%s%c",
771                YAJL_GET_STRING(sev_num_str_v), '\0');
772
773       sev_num = atoi(sev_num_str);
774
775       if (sev_num < 4)
776         n.severity = NOTIF_FAILURE;
777     }
778
779     // process
780     const char *process_path[] = {"@fields", rsyslog_field_keys[3],
781                                   (const char *)0};
782     yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
783
784     if (process_v != NULL) {
785       memset(process, '\0', listen_buffer_size);
786       snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
787                '\0');
788     }
789
790     // hostname
791     const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
792     yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
793
794     if (hostname_v != NULL) {
795       memset(hostname_str, '\0', listen_buffer_size);
796       snprintf(hostname_str, listen_buffer_size, "%s%c",
797                YAJL_GET_STRING(hostname_v), '\0');
798     }
799
800     gen_message_payload(
801         (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
802         (sev_num_str_v != NULL ? sev_num : -1),
803         (process_v != NULL ? process : NULL),
804         (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
805   } else {
806     // Data was not sent in JSON format, so just treat the whole log entry
807     // as the message (and we'll be unable to acquire certain data, so the
808     // payload
809     // generated below will be less informative)
810
811     gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
812   }
813 #else
814   gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
815 #endif
816
817   sstrncpy(n.host, hostname_g, sizeof(n.host));
818   sstrncpy(n.type, "gauge", sizeof(n.type));
819
820   notification_meta_t *m = calloc(1, sizeof(*m));
821
822   if (m == NULL) {
823     char errbuf[1024];
824     sfree(buf);
825     ERROR("sysevent plugin: unable to allocate metadata: %s",
826           sstrerror(errno, errbuf, sizeof(errbuf)));
827     return;
828   }
829
830   sstrncpy(m->name, "ves", sizeof(m->name));
831   m->nm_value.nm_string = sstrdup(buf);
832   m->type = NM_TYPE_STRING;
833   n.meta = m;
834
835   DEBUG("sysevent plugin: notification message: %s",
836         n.meta->nm_value.nm_string);
837
838   DEBUG("sysevent plugin: dispatching message");
839
840   plugin_dispatch_notification(&n);
841   plugin_notification_meta_free(n.meta);
842
843   // malloc'd in gen_message_payload
844   if (buf != NULL)
845     sfree(buf);
846 }
847
848 static int sysevent_read(void) /* {{{ */
849 {
850   if (sysevent_thread_error != 0) {
851     ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
852           "it.",
853           sysevent_thread_error);
854
855     stop_thread(0);
856
857     start_thread();
858
859     return (-1);
860   } /* if (sysevent_thread_error != 0) */
861
862   pthread_mutex_lock(&sysevent_lock);
863
864   while (ring.head != ring.tail) {
865     long long unsigned int timestamp;
866     int is_match = 1;
867     char *match_str = NULL;
868     int next = ring.tail + 1;
869
870     if (next >= ring.maxLen)
871       next = 0;
872
873     DEBUG("sysevent plugin: reading from ring buffer: %s",
874           ring.buffer[ring.tail]);
875
876     timestamp = ring.timestamp[ring.tail];
877
878 #if HAVE_YAJL_V2
879     // Try to parse JSON, and if it fails, fall back to plain string
880     yajl_val node = NULL;
881     char errbuf[1024];
882     errbuf[0] = 0;
883     node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
884                            sizeof(errbuf));
885
886     if (node != NULL) {
887       // JSON rsyslog data
888
889       // If we have any regex filters, we need to see if the message portion of
890       // the data matches any of them (otherwise we're not interested)
891       if (monitor_all_messages == 0) {
892         char json_val[listen_buffer_size];
893         const char *path[] = {"@message", (const char *)0};
894         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
895
896         memset(json_val, '\0', listen_buffer_size);
897
898         snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
899                  '\0');
900
901         match_str = (char *)&json_val;
902       }
903     } else {
904       // non-JSON rsyslog data
905
906       // If we have any regex filters, we need to see if the message data
907       // matches any of them (otherwise we're not interested)
908       if (monitor_all_messages == 0)
909         match_str = ring.buffer[ring.tail];
910     }
911 #else
912     // If we have any regex filters, we need to see if the message data
913     // matches any of them (otherwise we're not interested)
914     if (monitor_all_messages == 0)
915       match_str = ring.buffer[ring.tail];
916 #endif
917
918     // If we care about matching, do that comparison here
919     if (match_str != NULL) {
920       is_match = 1;
921
922       if (ignorelist_match(ignorelist, match_str) != 0)
923         is_match = 0;
924       else
925         DEBUG("sysevent plugin: regex filter match");
926     }
927
928 #if HAVE_YAJL_V2
929     if (is_match == 1 && node != NULL) {
930       sysevent_dispatch_notification(NULL, &node, timestamp);
931       yajl_tree_free(node);
932     } else if (is_match == 1)
933       sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
934 #else
935     if (is_match == 1)
936       sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
937 #endif
938
939     ring.tail = next;
940   }
941
942   pthread_mutex_unlock(&sysevent_lock);
943
944   return (0);
945 } /* }}} int sysevent_read */
946
947 static int sysevent_shutdown(void) /* {{{ */
948 {
949   int status;
950
951   DEBUG("sysevent plugin: Shutting down thread.");
952   if (stop_thread(1) < 0)
953     return (-1);
954
955   if (sock != -1) {
956     status = close(sock);
957     if (status != 0) {
958       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
959             strerror(errno));
960       return (-1);
961     } else
962       sock = -1;
963   }
964
965   free(listen_ip);
966   free(listen_port);
967
968   for (int i = 0; i < buffer_length; i++) {
969     free(ring.buffer[i]);
970   }
971
972   free(ring.buffer);
973   free(ring.timestamp);
974
975   return (0);
976 } /* }}} int sysevent_shutdown */
977
978 void module_register(void) {
979   plugin_register_complex_config("sysevent", sysevent_config);
980   plugin_register_init("sysevent", sysevent_init);
981   plugin_register_read("sysevent", sysevent_read);
982   plugin_register_shutdown("sysevent", sysevent_shutdown);
983 } /* void module_register */