Only declare rsyslog keys if yajl2 present
[collectd.git] / src / sysevent.c
1 /**
2  * collectd - src/sysevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <netdb.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <regex.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <yajl/yajl_common.h>
46 #include <yajl/yajl_gen.h>
47
48 #if HAVE_YAJL_YAJL_VERSION_H
49 #include <yajl/yajl_version.h>
50 #endif
51 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
52 #include <yajl/yajl_tree.h>
53 #define HAVE_YAJL_V2 1
54 #endif
55
56 #define SYSEVENT_DOMAIN_FIELD "domain"
57 #define SYSEVENT_DOMAIN_VALUE "syslog"
58 #define SYSEVENT_EVENT_ID_FIELD "eventId"
59 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
60 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
61 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
62 #define SYSEVENT_PRIORITY_FIELD "priority"
63 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
64 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
65 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
66 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
67 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
69 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
70 #define SYSEVENT_SEQUENCE_FIELD "sequence"
71 #define SYSEVENT_SEQUENCE_VALUE "0"
72 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
73 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
74 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
75 #define SYSEVENT_VERSION_FIELD "version"
76 #define SYSEVENT_VERSION_VALUE "1.0"
77
78 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
80 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
81 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
83 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
84 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
85 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
86 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
87 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
88 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
89
90 /*
91  * Private data types
92  */
93
94 typedef struct {
95   int head;
96   int tail;
97   int maxLen;
98   char **buffer;
99   long long unsigned int *timestamp;
100 } circbuf_t;
101
102 /*
103  * Private variables
104  */
105 static ignorelist_t *ignorelist = NULL;
106
107 static int sysevent_thread_loop = 0;
108 static int sysevent_thread_error = 0;
109 static pthread_t sysevent_thread_id;
110 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
111 static int sock = -1;
112 static int event_id = 0;
113 static circbuf_t ring;
114
115 static char *listen_ip;
116 static char *listen_port;
117 static int listen_buffer_size = 4096;
118 static int buffer_length = 10;
119
120 static int monitor_all_messages = 1;
121
122 #if HAVE_YAJL_V2
123 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
124 static const char *rsyslog_field_keys[5] = {
125     "facility", "severity", "severity-num", "program", "processid"};
126 #endif
127     
128 /*
129  * Private functions
130  */
131
132 static int gen_message_payload(const char *msg, char *sev, int sev_num,
133                                char *process, char *host,
134                                long long unsigned int timestamp, char **buf) {
135   const unsigned char *buf2;
136   yajl_gen g;
137   char json_str[DATA_MAX_NAME_LEN];
138
139 #if !defined(HAVE_YAJL_V2)
140   yajl_gen_config conf = {};
141
142   conf.beautify = 0;
143 #endif
144
145 #if HAVE_YAJL_V2
146   size_t len;
147   g = yajl_gen_alloc(NULL);
148   yajl_gen_config(g, yajl_gen_beautify, 0);
149 #else
150   unsigned int len;
151   g = yajl_gen_alloc(&conf, NULL);
152 #endif
153
154   yajl_gen_clear(g);
155
156   // *** BEGIN common event header ***
157
158   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
159     goto err;
160
161   // domain
162   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
163                       strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
164     goto err;
165
166   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
167                       strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
168     goto err;
169
170   // eventId
171   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
172                       strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
173     goto err;
174
175   event_id = event_id + 1;
176   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
177   memset(json_str, '\0', DATA_MAX_NAME_LEN);
178   snprintf(json_str, event_id_len, "%d", event_id);
179
180   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
181     goto err;
182   }
183
184   // eventName
185   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
186                       strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
187     goto err;
188
189   int event_name_len = 0;
190   event_name_len = event_name_len + strlen(host); // host name
191   event_name_len =
192       event_name_len +
193       22; // "host", "rsyslog", "message", 3 spaces and null-terminator
194   memset(json_str, '\0', DATA_MAX_NAME_LEN);
195   snprintf(json_str, event_name_len, "host %s rsyslog message", host);
196
197   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
198       yajl_gen_status_ok) {
199     goto err;
200   }
201
202   // lastEpochMicrosec
203   if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
204                       strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
205       yajl_gen_status_ok)
206     goto err;
207
208   int last_epoch_microsec_len =
209       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
210   memset(json_str, '\0', DATA_MAX_NAME_LEN);
211   snprintf(json_str, last_epoch_microsec_len, "%llu",
212            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
213
214   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
215     goto err;
216   }
217
218   // priority
219   if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
220                       strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
221     goto err;
222
223   switch (sev_num) {
224   case 4:
225     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
226                         strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
227         yajl_gen_status_ok)
228       goto err;
229     break;
230   case 5:
231     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
232                         strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
233         yajl_gen_status_ok)
234       goto err;
235     break;
236   case 6:
237   case 7:
238     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
239                         strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
240         yajl_gen_status_ok)
241       goto err;
242     break;
243   default:
244     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
245                         strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
246         yajl_gen_status_ok)
247       goto err;
248     break;
249   }
250
251   // reportingEntityName
252   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
253                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
254       yajl_gen_status_ok)
255     goto err;
256
257   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
258                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
259       yajl_gen_status_ok)
260     goto err;
261
262   // sequence
263   if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
264                       strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
265     goto err;
266
267   if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
268                       strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
269     goto err;
270
271   // sourceName
272   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
273                       strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
274     goto err;
275
276   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
277                       strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
278     goto err;
279
280   // startEpochMicrosec
281   if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
282                       strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
283       yajl_gen_status_ok)
284     goto err;
285
286   int start_epoch_microsec_len =
287       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
288   memset(json_str, '\0', DATA_MAX_NAME_LEN);
289   snprintf(json_str, start_epoch_microsec_len, "%llu",
290            (long long unsigned int)timestamp);
291
292   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
293     goto err;
294   }
295
296   // version
297   if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
298                       strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
299     goto err;
300
301   if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
302                       strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
303     goto err;
304
305   // *** END common event header ***
306
307   // *** BEGIN syslog fields ***
308
309   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
310                       strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
311       yajl_gen_status_ok)
312     goto err;
313
314   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
315     goto err;
316
317   // eventSourceHost
318   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
319                       strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
320       yajl_gen_status_ok)
321     goto err;
322
323   if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
324     goto err;
325
326   // eventSourceType
327   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
328                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
329       yajl_gen_status_ok)
330     goto err;
331
332   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
333                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
334       yajl_gen_status_ok)
335     goto err;
336
337   // syslogFieldsVersion
338   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
339                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
340       yajl_gen_status_ok)
341     goto err;
342
343   if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
344                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
345       yajl_gen_status_ok)
346     goto err;
347
348   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
349     goto err;
350
351   // syslogMsg
352   if (msg != NULL) {
353     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
354                         strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
355         yajl_gen_status_ok)
356       goto err;
357
358     if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
359       goto err;
360   }
361
362   // syslogProc
363   if (process != NULL) {
364     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
365                         strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
366         yajl_gen_status_ok)
367       goto err;
368
369     if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
370         yajl_gen_status_ok)
371       goto err;
372   }
373
374   // syslogSev
375   if (sev != NULL) {
376     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
377                         strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
378         yajl_gen_status_ok)
379       goto err;
380
381     if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
382       goto err;
383   }
384
385   // syslogTag
386   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
387                       strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
388     goto err;
389
390   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
391                       strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
392     goto err;
393
394   // *** END syslog fields ***
395
396   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
397     goto err;
398
399   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
400     goto err;
401
402   *buf = malloc(strlen((char *)buf2) + 1);
403
404   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
405
406   yajl_gen_free(g);
407
408   return 0;
409
410 err:
411   yajl_gen_free(g);
412   ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
413   return -1;
414 }
415
416 static void *sysevent_thread(void *arg) /* {{{ */
417 {
418   pthread_mutex_lock(&sysevent_lock);
419
420   while (sysevent_thread_loop > 0) {
421     int status = 0;
422
423     pthread_mutex_unlock(&sysevent_lock);
424
425     if (sock == -1)
426       return ((void *)0);
427
428     char buffer[listen_buffer_size];
429     struct sockaddr_storage src_addr;
430     socklen_t src_addr_len = sizeof(src_addr);
431
432     memset(buffer, '\0', listen_buffer_size);
433
434     ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
435                              (struct sockaddr *)&src_addr, &src_addr_len);
436
437     if (count == -1) {
438       ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
439       status = -1;
440     } else if (count >= sizeof(buffer)) {
441       WARNING("sysevent plugin: datagram too large for buffer: truncated");
442     } else {
443       // 1. Acquire lock
444       // 2. Push to buffer if there is room, otherwise raise warning
445
446       pthread_mutex_lock(&sysevent_lock);
447
448       int next = ring.head + 1;
449       if (next >= ring.maxLen)
450         next = 0;
451
452       if (next == ring.tail) {
453         WARNING("sysevent plugin: ring buffer full");
454       } else {
455         DEBUG("sysevent plugin: writing %s", buffer);
456
457         strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
458         ring.timestamp[ring.head] =
459             (long long unsigned int)CDTIME_T_TO_US(cdtime());
460         ring.head = next;
461       }
462
463       pthread_mutex_unlock(&sysevent_lock);
464     }
465
466     usleep(1000);
467
468     pthread_mutex_lock(&sysevent_lock);
469
470     if (status < 0) {
471       WARNING("sysevent plugin: problem with thread status: %d", status);
472       sysevent_thread_error = 1;
473       break;
474     }
475
476     if (sysevent_thread_loop <= 0)
477       break;
478   } /* while (sysevent_thread_loop > 0) */
479
480   pthread_mutex_unlock(&sysevent_lock);
481
482   // pthread_exit instead of return?
483   return ((void *)0);
484 } /* }}} void *sysevent_thread */
485
486 static int start_thread(void) /* {{{ */
487 {
488   int status;
489
490   pthread_mutex_lock(&sysevent_lock);
491
492   if (sysevent_thread_loop != 0) {
493     pthread_mutex_unlock(&sysevent_lock);
494     return (0);
495   }
496
497   sysevent_thread_loop = 1;
498   sysevent_thread_error = 0;
499
500   DEBUG("sysevent plugin: starting thread");
501
502   status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
503                                 sysevent_thread,
504                                 /* arg = */ (void *)0, "sysevent");
505   if (status != 0) {
506     sysevent_thread_loop = 0;
507     ERROR("sysevent plugin: starting thread failed.");
508     pthread_mutex_unlock(&sysevent_lock);
509     return (-1);
510   }
511
512   pthread_mutex_unlock(&sysevent_lock);
513   return (0);
514 } /* }}} int start_thread */
515
516 static int stop_thread(int shutdown) /* {{{ */
517 {
518   int status;
519
520   pthread_mutex_lock(&sysevent_lock);
521
522   if (sysevent_thread_loop == 0) {
523     pthread_mutex_unlock(&sysevent_lock);
524     return (-1);
525   }
526
527   sysevent_thread_loop = 0;
528   pthread_mutex_unlock(&sysevent_lock);
529
530   if (shutdown == 1) {
531     // Since the thread is blocking, calling pthread_join
532     // doesn't actually succeed in stopping it.  It will stick around
533     // until a message is received on the socket (at which
534     // it will realize that "sysevent_thread_loop" is 0 and will
535     // break out of the read loop and be allowed to die).  This is
536     // fine when the process isn't supposed to be exiting, but in
537     // the case of a process shutdown, we don't want to have an
538     // idle thread hanging around.  Calling pthread_cancel here in
539     // the case of a shutdown is just assures that the thread is
540     // gone and that the process has been fully terminated.
541
542     DEBUG("sysevent plugin: Canceling thread for process shutdown");
543
544     status = pthread_cancel(sysevent_thread_id);
545
546     if (status != 0) {
547       ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
548             strerror(errno));
549       status = -1;
550     }
551   } else {
552     status = pthread_join(sysevent_thread_id, /* return = */ NULL);
553     if (status != 0) {
554       ERROR("sysevent plugin: Stopping thread failed.");
555       status = -1;
556     }
557   }
558
559   pthread_mutex_lock(&sysevent_lock);
560   memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
561   sysevent_thread_error = 0;
562   pthread_mutex_unlock(&sysevent_lock);
563
564   DEBUG("sysevent plugin: Finished requesting stop of thread");
565
566   return (status);
567 } /* }}} int stop_thread */
568
569 static int sysevent_init(void) /* {{{ */
570 {
571   ring.head = 0;
572   ring.tail = 0;
573   ring.maxLen = buffer_length;
574   ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
575
576   for (int i = 0; i < buffer_length; i++) {
577     ring.buffer[i] = malloc(listen_buffer_size);
578   }
579
580   ring.timestamp = (long long unsigned int *)malloc(
581       buffer_length * sizeof(long long unsigned int));
582
583   if (sock == -1) {
584     const char *hostname = listen_ip;
585     const char *portname = listen_port;
586     struct addrinfo hints;
587     memset(&hints, 0, sizeof(hints));
588     hints.ai_family = AF_UNSPEC;
589     hints.ai_socktype = SOCK_DGRAM;
590     hints.ai_protocol = 0;
591     hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
592     struct addrinfo *res = 0;
593
594     int err = getaddrinfo(hostname, portname, &hints, &res);
595
596     if (err != 0) {
597       ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
598             err);
599       freeaddrinfo(res);
600       return (-1);
601     }
602
603     sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
604     if (sock == -1) {
605       ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
606       freeaddrinfo(res);
607       return (-1);
608     }
609
610     if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
611       ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
612       freeaddrinfo(res);
613       return (-1);
614     }
615
616     freeaddrinfo(res);
617   }
618
619   DEBUG("sysevent plugin: socket created and bound");
620
621   return (start_thread());
622 } /* }}} int sysevent_init */
623
624 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
625 {
626   if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
627       ci->values[1].type != OCONFIG_TYPE_STRING) {
628     ERROR("sysevent plugin: The `%s' config option needs "
629           "two string arguments (ip and port).",
630           ci->key);
631     return (-1);
632   }
633
634   listen_ip = strdup(ci->values[0].value.string);
635   listen_port = strdup(ci->values[1].value.string);
636
637   return (0);
638 }
639
640 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
641 {
642   int tmp = 0;
643
644   if (cf_util_get_int(ci, &tmp) != 0)
645     return (-1);
646   else if ((tmp >= 1024) && (tmp <= 65535))
647     listen_buffer_size = tmp;
648   else {
649     WARNING(
650         "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
651     return (-1);
652   }
653
654   return (0);
655 }
656
657 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
658 {
659   int tmp = 0;
660
661   if (cf_util_get_int(ci, &tmp) != 0)
662     return (-1);
663   else if ((tmp >= 3) && (tmp <= 4096))
664     buffer_length = tmp;
665   else {
666     WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
667     return (-1);
668   }
669
670   return (0);
671 }
672
673 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
674 {
675   if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
676     ERROR("sysevent plugin: The `%s' config option needs "
677           "one string argument, a regular expression.",
678           ci->key);
679     return (-1);
680   }
681
682 #if HAVE_REGEX_H
683   if (ignorelist == NULL)
684     ignorelist = ignorelist_create(/* invert = */ 1);
685
686   int status = ignorelist_add(ignorelist, ci->values[0].value.string);
687
688   if (status != 0) {
689     ERROR("sysevent plugin: invalid regular expression: %s",
690           ci->values[0].value.string);
691     return (1);
692   }
693
694   monitor_all_messages = 0;
695 #else
696   WARNING("sysevent plugin: The plugin has been compiled without support "
697           "for the \"RegexFilter\" option.");
698 #endif
699
700   return (0);
701 }
702
703 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
704 {
705   for (int i = 0; i < ci->children_num; i++) {
706     oconfig_item_t *child = ci->children + i;
707
708     if (strcasecmp("Listen", child->key) == 0)
709       sysevent_config_add_listen(child);
710     else if (strcasecmp("BufferSize", child->key) == 0)
711       sysevent_config_add_buffer_size(child);
712     else if (strcasecmp("BufferLength", child->key) == 0)
713       sysevent_config_add_buffer_length(child);
714     else if (strcasecmp("RegexFilter", child->key) == 0)
715       sysevent_config_add_regex_filter(child);
716     else {
717       WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
718     }
719   }
720
721   return (0);
722 } /* }}} int sysevent_config */
723
724 static void sysevent_dispatch_notification(const char *message,
725 #if HAVE_YAJL_V2
726                                            yajl_val *node,
727 #endif
728                                            long long unsigned int timestamp) {
729   char *buf = NULL;
730   notification_t n = {NOTIF_OKAY, cdtime(), "", "",  "sysevent",
731                       "",         "",       "", NULL};
732
733 #if HAVE_YAJL_V2
734   if (node != NULL) {
735     // If we have a parsed-JSON node to work with, use that
736
737     char process[listen_buffer_size];
738     char severity[listen_buffer_size];
739     char sev_num_str[listen_buffer_size];
740     char msg[listen_buffer_size];
741     char hostname_str[listen_buffer_size];
742     int sev_num = -1;
743
744     // msg
745     const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
746     yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
747
748     if (msg_v != NULL) {
749       memset(msg, '\0', listen_buffer_size);
750       snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
751     }
752
753     // severity
754     const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
755                                    (const char *)0};
756     yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
757
758     if (severity_v != NULL) {
759       memset(severity, '\0', listen_buffer_size);
760       snprintf(severity, listen_buffer_size, "%s%c",
761                YAJL_GET_STRING(severity_v), '\0');
762     }
763
764     // sev_num
765     const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
766                                       (const char *)0};
767     yajl_val sev_num_str_v =
768         yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
769
770     if (sev_num_str_v != NULL) {
771       memset(sev_num_str, '\0', listen_buffer_size);
772       snprintf(sev_num_str, listen_buffer_size, "%s%c",
773                YAJL_GET_STRING(sev_num_str_v), '\0');
774
775       sev_num = atoi(sev_num_str);
776
777       if (sev_num < 4)
778         n.severity = NOTIF_FAILURE;
779     }
780
781     // process
782     const char *process_path[] = {"@fields", rsyslog_field_keys[3],
783                                   (const char *)0};
784     yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
785
786     if (process_v != NULL) {
787       memset(process, '\0', listen_buffer_size);
788       snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
789                '\0');
790     }
791
792     // hostname
793     const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
794     yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
795
796     if (hostname_v != NULL) {
797       memset(hostname_str, '\0', listen_buffer_size);
798       snprintf(hostname_str, listen_buffer_size, "%s%c",
799                YAJL_GET_STRING(hostname_v), '\0');
800     }
801
802     gen_message_payload(
803         (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
804         (sev_num_str_v != NULL ? sev_num : -1),
805         (process_v != NULL ? process : NULL),
806         (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
807   } else {
808     // Data was not sent in JSON format, so just treat the whole log entry
809     // as the message (and we'll be unable to acquire certain data, so the
810     // payload
811     // generated below will be less informative)
812
813     gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
814   }
815 #else
816   gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
817 #endif
818
819   sstrncpy(n.host, hostname_g, sizeof(n.host));
820   sstrncpy(n.type, "gauge", sizeof(n.type));
821
822   notification_meta_t *m = calloc(1, sizeof(*m));
823
824   if (m == NULL) {
825     char errbuf[1024];
826     sfree(buf);
827     ERROR("sysevent plugin: unable to allocate metadata: %s",
828           sstrerror(errno, errbuf, sizeof(errbuf)));
829     return;
830   }
831
832   sstrncpy(m->name, "ves", sizeof(m->name));
833   m->nm_value.nm_string = sstrdup(buf);
834   m->type = NM_TYPE_STRING;
835   n.meta = m;
836
837   DEBUG("sysevent plugin: notification message: %s",
838         n.meta->nm_value.nm_string);
839
840   DEBUG("sysevent plugin: dispatching message");
841
842   plugin_dispatch_notification(&n);
843   plugin_notification_meta_free(n.meta);
844
845   // malloc'd in gen_message_payload
846   if (buf != NULL)
847     sfree(buf);
848 }
849
850 static int sysevent_read(void) /* {{{ */
851 {
852   if (sysevent_thread_error != 0) {
853     ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
854           "it.",
855           sysevent_thread_error);
856
857     stop_thread(0);
858
859     start_thread();
860
861     return (-1);
862   } /* if (sysevent_thread_error != 0) */
863
864   pthread_mutex_lock(&sysevent_lock);
865
866   while (ring.head != ring.tail) {
867     long long unsigned int timestamp;
868     int is_match = 1;
869     char *match_str = NULL;
870     int next = ring.tail + 1;
871
872     if (next >= ring.maxLen)
873       next = 0;
874
875     DEBUG("sysevent plugin: reading from ring buffer: %s",
876           ring.buffer[ring.tail]);
877
878     timestamp = ring.timestamp[ring.tail];
879
880 #if HAVE_YAJL_V2
881     // Try to parse JSON, and if it fails, fall back to plain string
882     yajl_val node = NULL;
883     char errbuf[1024];
884     errbuf[0] = 0;
885     node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
886                            sizeof(errbuf));
887
888     if (node != NULL) {
889       // JSON rsyslog data
890
891       // If we have any regex filters, we need to see if the message portion of
892       // the data matches any of them (otherwise we're not interested)
893       if (monitor_all_messages == 0) {
894         char json_val[listen_buffer_size];
895         const char *path[] = {"@message", (const char *)0};
896         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
897
898         memset(json_val, '\0', listen_buffer_size);
899
900         snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
901                  '\0');
902
903         match_str = (char *)&json_val;
904       }
905     } else {
906       // non-JSON rsyslog data
907
908       // If we have any regex filters, we need to see if the message data
909       // matches any of them (otherwise we're not interested)
910       if (monitor_all_messages == 0)
911         match_str = ring.buffer[ring.tail];
912     }
913 #else
914     // If we have any regex filters, we need to see if the message data
915     // matches any of them (otherwise we're not interested)
916     if (monitor_all_messages == 0)
917       match_str = ring.buffer[ring.tail];
918 #endif
919
920     // If we care about matching, do that comparison here
921     if (match_str != NULL) {
922       is_match = 1;
923
924       if (ignorelist_match(ignorelist, match_str) != 0)
925         is_match = 0;
926       else
927         DEBUG("sysevent plugin: regex filter match");
928     }
929
930 #if HAVE_YAJL_V2
931     if (is_match == 1 && node != NULL) {
932       sysevent_dispatch_notification(NULL, &node, timestamp);
933       yajl_tree_free(node);
934     } else if (is_match == 1)
935       sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
936 #else
937     if (is_match == 1)
938       sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
939 #endif
940
941     ring.tail = next;
942   }
943
944   pthread_mutex_unlock(&sysevent_lock);
945
946   return (0);
947 } /* }}} int sysevent_read */
948
949 static int sysevent_shutdown(void) /* {{{ */
950 {
951   int status;
952
953   DEBUG("sysevent plugin: Shutting down thread.");
954   if (stop_thread(1) < 0)
955     return (-1);
956
957   if (sock != -1) {
958     status = close(sock);
959     if (status != 0) {
960       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
961             strerror(errno));
962       return (-1);
963     } else
964       sock = -1;
965   }
966
967   free(listen_ip);
968   free(listen_port);
969
970   for (int i = 0; i < buffer_length; i++) {
971     free(ring.buffer[i]);
972   }
973
974   free(ring.buffer);
975   free(ring.timestamp);
976
977   return (0);
978 } /* }}} int sysevent_shutdown */
979
980 void module_register(void) {
981   plugin_register_complex_config("sysevent", sysevent_config);
982   plugin_register_init("sysevent", sysevent_init);
983   plugin_register_read("sysevent", sysevent_read);
984   plugin_register_shutdown("sysevent", sysevent_shutdown);
985 } /* void module_register */