Check for sysevent plugin init calloc failures
[collectd.git] / src / sysevent.c
1 /**
2  * collectd - src/sysevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/common/common.h"
31 #include "utils/ignorelist/ignorelist.h"
32 #include "utils_complain.h"
33
34 #include <errno.h>
35 #include <netdb.h>
36 #include <netinet/in.h>
37 #include <pthread.h>
38 #include <regex.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <yajl/yajl_common.h>
45 #include <yajl/yajl_gen.h>
46
47 #if HAVE_YAJL_YAJL_VERSION_H
48 #include <yajl/yajl_version.h>
49 #endif
50 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
51 #include <yajl/yajl_tree.h>
52 #define HAVE_YAJL_V2 1
53 #endif
54
55 #define SYSEVENT_DOMAIN_FIELD "domain"
56 #define SYSEVENT_DOMAIN_VALUE "syslog"
57 #define SYSEVENT_EVENT_ID_FIELD "eventId"
58 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
59 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
60 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
61 #define SYSEVENT_PRIORITY_FIELD "priority"
62 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
63 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
64 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
65 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
66 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
67 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
69 #define SYSEVENT_SEQUENCE_FIELD "sequence"
70 #define SYSEVENT_SEQUENCE_VALUE "0"
71 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
72 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
73 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
74 #define SYSEVENT_VERSION_FIELD "version"
75 #define SYSEVENT_VERSION_VALUE "1.0"
76
77 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
78 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
80 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
81 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
83 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
84 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
85 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
86 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
87 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
88
89 /*
90  * Private data types
91  */
92
93 typedef struct {
94   int head;
95   int tail;
96   int maxLen;
97   char **buffer;
98   cdtime_t *timestamp;
99 } circbuf_t;
100
101 /*
102  * Private variables
103  */
104
105 static ignorelist_t *ignorelist = NULL;
106
107 static int sysevent_socket_thread_loop = 0;
108 static int sysevent_socket_thread_error = 0;
109 static pthread_t sysevent_socket_thread_id;
110 static int sysevent_dequeue_thread_loop = 0;
111 static pthread_t sysevent_dequeue_thread_id;
112 static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
113 static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
114 static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
115 static int sock = -1;
116 static int event_id = 0;
117 static circbuf_t ring;
118
119 static char *listen_ip;
120 static char *listen_port;
121 static int listen_buffer_size = 4096;
122 static int buffer_length = 10;
123
124 static int monitor_all_messages = 1;
125
126 #if HAVE_YAJL_V2
127 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
128 static const char *rsyslog_field_keys[5] = {
129     "facility", "severity", "severity-num", "program", "processid"};
130 #endif
131
132 /*
133  * Private functions
134  */
135
136 static int gen_message_payload(const char *msg, char *sev, int sev_num,
137                                char *process, char *host, cdtime_t timestamp,
138                                char **buf) {
139   const unsigned char *buf2;
140   yajl_gen g;
141   char json_str[DATA_MAX_NAME_LEN];
142
143 #if !defined(HAVE_YAJL_V2)
144   yajl_gen_config conf = {0};
145 #endif
146
147 #if HAVE_YAJL_V2
148   size_t len;
149   g = yajl_gen_alloc(NULL);
150   yajl_gen_config(g, yajl_gen_beautify, 0);
151 #else
152   unsigned int len;
153   g = yajl_gen_alloc(&conf, NULL);
154 #endif
155
156   yajl_gen_clear(g);
157
158   // *** BEGIN common event header ***
159
160   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
161     goto err;
162
163   // domain
164   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
165                       strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
166     goto err;
167
168   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
169                       strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
170     goto err;
171
172   // eventId
173   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
174                       strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
175     goto err;
176
177   event_id = event_id + 1;
178   snprintf(json_str, sizeof(json_str), "%d", event_id);
179
180   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
181     goto err;
182   }
183
184   // eventName
185   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
186                       strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
187     goto err;
188
189   snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host);
190
191   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
192       yajl_gen_status_ok) {
193     goto err;
194   }
195
196   // lastEpochMicrosec
197   if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
198                       strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
199       yajl_gen_status_ok)
200     goto err;
201
202   snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime()));
203
204   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
205     goto err;
206   }
207
208   // priority
209   if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
210                       strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
211     goto err;
212
213   switch (sev_num) {
214   case 4:
215     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
216                         strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
217         yajl_gen_status_ok)
218       goto err;
219     break;
220   case 5:
221     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
222                         strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
223         yajl_gen_status_ok)
224       goto err;
225     break;
226   case 6:
227   case 7:
228     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
229                         strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
230         yajl_gen_status_ok)
231       goto err;
232     break;
233   default:
234     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
235                         strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
236         yajl_gen_status_ok)
237       goto err;
238     break;
239   }
240
241   // reportingEntityName
242   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
243                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
244       yajl_gen_status_ok)
245     goto err;
246
247   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
248                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
249       yajl_gen_status_ok)
250     goto err;
251
252   // sequence
253   if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
254                       strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
255     goto err;
256
257   if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
258                       strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
259     goto err;
260
261   // sourceName
262   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
263                       strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
264     goto err;
265
266   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
267                       strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
268     goto err;
269
270   // startEpochMicrosec
271   if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
272                       strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
273       yajl_gen_status_ok)
274     goto err;
275
276   snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp));
277
278   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
279     goto err;
280   }
281
282   // version
283   if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
284                       strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
285     goto err;
286
287   if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
288                       strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
289     goto err;
290
291   // *** END common event header ***
292
293   // *** BEGIN syslog fields ***
294
295   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
296                       strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
297       yajl_gen_status_ok)
298     goto err;
299
300   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
301     goto err;
302
303   // eventSourceHost
304   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
305                       strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
306       yajl_gen_status_ok)
307     goto err;
308
309   if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
310     goto err;
311
312   // eventSourceType
313   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
314                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
315       yajl_gen_status_ok)
316     goto err;
317
318   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
319                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
320       yajl_gen_status_ok)
321     goto err;
322
323   // syslogFieldsVersion
324   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
325                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
326       yajl_gen_status_ok)
327     goto err;
328
329   if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
330                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
331       yajl_gen_status_ok)
332     goto err;
333
334   // syslogMsg
335   if (msg != NULL) {
336     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
337                         strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
338         yajl_gen_status_ok)
339       goto err;
340
341     if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
342       goto err;
343   }
344
345   // syslogProc
346   if (process != NULL) {
347     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
348                         strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
349         yajl_gen_status_ok)
350       goto err;
351
352     if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
353         yajl_gen_status_ok)
354       goto err;
355   }
356
357   // syslogSev
358   if (sev != NULL) {
359     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
360                         strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
361         yajl_gen_status_ok)
362       goto err;
363
364     if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
365       goto err;
366   }
367
368   // syslogTag
369   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
370                       strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
371     goto err;
372
373   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
374                       strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
375     goto err;
376
377   // *** END syslog fields ***
378
379   // close syslog and header fields
380   if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
381       yajl_gen_map_close(g) != yajl_gen_status_ok)
382     goto err;
383
384   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
385     goto err;
386
387   *buf = strdup((char *)buf2);
388
389   if (*buf == NULL) {
390     ERROR("sysevent plugin: gen_message_payload strdup failed");
391     goto err;
392   }
393
394   yajl_gen_free(g);
395
396   return 0;
397
398 err:
399   yajl_gen_free(g);
400   ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
401   return -1;
402 }
403
404 static int read_socket() {
405   int recv_flags = MSG_DONTWAIT;
406
407   while (42) {
408     struct sockaddr_storage src_addr;
409     socklen_t src_addr_len = sizeof(src_addr);
410
411     char buffer[listen_buffer_size];
412     memset(buffer, '\0', listen_buffer_size);
413
414     ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
415                              (struct sockaddr *)&src_addr, &src_addr_len);
416
417     if (count < 0) {
418       if (errno == EAGAIN || errno == EWOULDBLOCK) {
419         pthread_mutex_lock(&sysevent_data_lock);
420
421         // There was nothing more to receive for now, so...
422         // If ring head does not equal ring tail, there is data
423         // in the ring buffer for the dequeue thread to read, so
424         // signal it
425         if (ring.head != ring.tail)
426           pthread_cond_signal(&sysevent_cond);
427
428         pthread_mutex_unlock(&sysevent_data_lock);
429
430         // Since there was nothing to receive, set recv to block and
431         // try again
432         recv_flags = 0;
433         continue;
434       } else if (errno != EINTR) {
435         ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
436         return -1;
437       } else {
438         // Interrupt, so continue and try again
439         continue;
440       }
441     }
442
443     if (count >= sizeof(buffer)) {
444       WARNING("sysevent plugin: datagram too large for buffer: truncated");
445     }
446
447     // We successfully received a message, so don't block on the next
448     // read in case there are more (and if there aren't, it will be
449     // handled above in the EWOULDBLOCK error-checking)
450     recv_flags = MSG_DONTWAIT;
451
452     // 1. Acquire data lock
453     // 2. Push to buffer if there is room, otherwise raise warning
454     //    and allow dequeue thread to take over
455
456     pthread_mutex_lock(&sysevent_data_lock);
457
458     int next = ring.head + 1;
459     if (next >= ring.maxLen)
460       next = 0;
461
462     if (next == ring.tail) {
463       // Buffer is full, signal the dequeue thread to process the buffer
464       // and clean it out, and then sleep
465       WARNING("sysevent plugin: ring buffer full");
466
467       pthread_cond_signal(&sysevent_cond);
468       pthread_mutex_unlock(&sysevent_data_lock);
469
470       usleep(1000);
471       continue;
472     } else {
473       DEBUG("sysevent plugin: writing %s", buffer);
474
475       sstrncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
476       ring.timestamp[ring.head] = cdtime();
477       ring.head = next;
478     }
479
480     pthread_mutex_unlock(&sysevent_data_lock);
481   }
482 }
483
484 static void sysevent_dispatch_notification(const char *message,
485 #if HAVE_YAJL_V2
486                                            yajl_val *node,
487 #endif
488                                            cdtime_t timestamp) {
489   char *buf = NULL;
490
491   notification_t n = {
492       .severity = NOTIF_OKAY,
493       .time = cdtime(),
494       .plugin = "sysevent",
495       .type = "gauge",
496   };
497
498 #if HAVE_YAJL_V2
499   if (node != NULL) {
500     // If we have a parsed-JSON node to work with, use that
501     // msg
502     const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
503     yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
504
505     char msg[listen_buffer_size];
506
507     if (msg_v != NULL) {
508       memset(msg, '\0', listen_buffer_size);
509       snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
510     }
511
512     // severity
513     const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
514                                    (const char *)0};
515     yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
516
517     char severity[listen_buffer_size];
518
519     if (severity_v != NULL) {
520       memset(severity, '\0', listen_buffer_size);
521       snprintf(severity, listen_buffer_size, "%s%c",
522                YAJL_GET_STRING(severity_v), '\0');
523     }
524
525     // sev_num
526     const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
527                                       (const char *)0};
528     yajl_val sev_num_str_v =
529         yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
530
531     char sev_num_str[listen_buffer_size];
532     int sev_num = -1;
533
534     if (sev_num_str_v != NULL) {
535       memset(sev_num_str, '\0', listen_buffer_size);
536       snprintf(sev_num_str, listen_buffer_size, "%s%c",
537                YAJL_GET_STRING(sev_num_str_v), '\0');
538
539       sev_num = atoi(sev_num_str);
540
541       if (sev_num < 4)
542         n.severity = NOTIF_FAILURE;
543     }
544
545     // process
546     const char *process_path[] = {"@fields", rsyslog_field_keys[3],
547                                   (const char *)0};
548     yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
549
550     char process[listen_buffer_size];
551
552     if (process_v != NULL) {
553       memset(process, '\0', listen_buffer_size);
554       snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
555                '\0');
556     }
557
558     // hostname
559     const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
560     yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
561
562     char hostname_str[listen_buffer_size];
563
564     if (hostname_v != NULL) {
565       memset(hostname_str, '\0', listen_buffer_size);
566       snprintf(hostname_str, listen_buffer_size, "%s%c",
567                YAJL_GET_STRING(hostname_v), '\0');
568     }
569
570     gen_message_payload(
571         (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
572         (sev_num_str_v != NULL ? sev_num : -1),
573         (process_v != NULL ? process : NULL),
574         (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
575   } else {
576     // Data was not sent in JSON format, so just treat the whole log entry
577     // as the message (and we'll be unable to acquire certain data, so the
578     // payload
579     // generated below will be less informative)
580
581     gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
582   }
583 #else
584   gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
585 #endif
586
587   sstrncpy(n.host, hostname_g, sizeof(n.host));
588
589   int status = plugin_notification_meta_add_string(&n, "ves", buf);
590
591   if (status < 0) {
592     sfree(buf);
593     ERROR("sysevent plugin: unable to set notification VES metadata: %s",
594           STRERRNO);
595     return;
596   }
597
598   DEBUG("sysevent plugin: notification VES metadata: %s",
599         n.meta->nm_value.nm_string);
600
601   DEBUG("sysevent plugin: dispatching message");
602
603   plugin_dispatch_notification(&n);
604   plugin_notification_meta_free(n.meta);
605
606   // strdup'd in gen_message_payload
607   if (buf != NULL)
608     sfree(buf);
609 }
610
611 static void read_ring_buffer() {
612   pthread_mutex_lock(&sysevent_data_lock);
613
614   // If there's currently nothing to read from the buffer,
615   // then wait
616   if (ring.head == ring.tail)
617     pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
618
619   while (ring.head != ring.tail) {
620     int next = ring.tail + 1;
621
622     if (next >= ring.maxLen)
623       next = 0;
624
625     DEBUG("sysevent plugin: reading from ring buffer: %s",
626           ring.buffer[ring.tail]);
627
628     cdtime_t timestamp = ring.timestamp[ring.tail];
629     char *match_str = NULL;
630
631 #if HAVE_YAJL_V2
632     // Try to parse JSON, and if it fails, fall back to plain string
633     char errbuf[1024];
634     errbuf[0] = 0;
635     yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
636                                     errbuf, sizeof(errbuf));
637
638     if (node != NULL) {
639       // JSON rsyslog data
640
641       // If we have any regex filters, we need to see if the message portion of
642       // the data matches any of them (otherwise we're not interested)
643       if (monitor_all_messages == 0) {
644         const char *path[] = {"@message", (const char *)0};
645         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
646
647         char json_val[listen_buffer_size];
648         memset(json_val, '\0', listen_buffer_size);
649
650         snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
651                  '\0');
652
653         match_str = (char *)&json_val;
654       }
655     } else {
656       // non-JSON rsyslog data
657
658       // If we have any regex filters, we need to see if the message data
659       // matches any of them (otherwise we're not interested)
660       if (monitor_all_messages == 0)
661         match_str = ring.buffer[ring.tail];
662     }
663 #else
664     // If we have any regex filters, we need to see if the message data
665     // matches any of them (otherwise we're not interested)
666     if (monitor_all_messages == 0)
667       match_str = ring.buffer[ring.tail];
668 #endif
669
670     int is_match = 1;
671
672     // If we care about matching, do that comparison here
673     if (match_str != NULL) {
674       if (ignorelist_match(ignorelist, match_str) != 0)
675         is_match = 0;
676       else
677         DEBUG("sysevent plugin: regex filter match");
678     }
679
680 #if HAVE_YAJL_V2
681     if (is_match == 1 && node != NULL) {
682       sysevent_dispatch_notification(NULL, &node, timestamp);
683       yajl_tree_free(node);
684     } else if (is_match == 1)
685       sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
686 #else
687     if (is_match == 1)
688       sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
689 #endif
690
691     ring.tail = next;
692   }
693
694   pthread_mutex_unlock(&sysevent_data_lock);
695 }
696
697 static void *sysevent_socket_thread(void *arg) /* {{{ */
698 {
699   pthread_mutex_lock(&sysevent_thread_lock);
700
701   while (sysevent_socket_thread_loop > 0) {
702     pthread_mutex_unlock(&sysevent_thread_lock);
703
704     if (sock == -1)
705       return (void *)0;
706
707     int status = read_socket();
708
709     pthread_mutex_lock(&sysevent_thread_lock);
710
711     if (status < 0) {
712       WARNING("sysevent plugin: problem with socket thread (status: %d)",
713               status);
714       sysevent_socket_thread_error = 1;
715       break;
716     }
717   } /* while (sysevent_socket_thread_loop > 0) */
718
719   pthread_mutex_unlock(&sysevent_thread_lock);
720
721   return (void *)0;
722 } /* }}} void *sysevent_socket_thread */
723
724 // Entry point for thread responsible for reading from
725 // ring buffer and dispatching notifications
726 static void *sysevent_dequeue_thread(void *arg) /* {{{ */
727 {
728   pthread_mutex_lock(&sysevent_thread_lock);
729
730   while (sysevent_dequeue_thread_loop > 0) {
731     pthread_mutex_unlock(&sysevent_thread_lock);
732
733     read_ring_buffer();
734
735     pthread_mutex_lock(&sysevent_thread_lock);
736   } /* while (sysevent_dequeue_thread_loop > 0) */
737
738   pthread_mutex_unlock(&sysevent_thread_lock);
739
740   return (void *)0;
741 } /* }}} void *sysevent_dequeue_thread */
742
743 static int start_socket_thread(void) /* {{{ */
744 {
745   pthread_mutex_lock(&sysevent_thread_lock);
746
747   if (sysevent_socket_thread_loop != 0) {
748     pthread_mutex_unlock(&sysevent_thread_lock);
749     return 0;
750   }
751
752   sysevent_socket_thread_loop = 1;
753   sysevent_socket_thread_error = 0;
754
755   DEBUG("sysevent plugin: starting socket thread");
756
757   int status = plugin_thread_create(&sysevent_socket_thread_id,
758                                     /* attr = */ NULL, sysevent_socket_thread,
759                                     /* arg = */ (void *)0, "sysevent");
760   if (status != 0) {
761     sysevent_socket_thread_loop = 0;
762     ERROR("sysevent plugin: starting socket thread failed.");
763     pthread_mutex_unlock(&sysevent_thread_lock);
764     return -1;
765   }
766
767   pthread_mutex_unlock(&sysevent_thread_lock);
768
769   return 0;
770 } /* }}} int start_socket_thread */
771
772 static int start_dequeue_thread(void) /* {{{ */
773 {
774   pthread_mutex_lock(&sysevent_thread_lock);
775
776   if (sysevent_dequeue_thread_loop != 0) {
777     pthread_mutex_unlock(&sysevent_thread_lock);
778     return 0;
779   }
780
781   sysevent_dequeue_thread_loop = 1;
782
783   int status = plugin_thread_create(&sysevent_dequeue_thread_id,
784                                     /* attr = */ NULL, sysevent_dequeue_thread,
785                                     /* arg = */ (void *)0, "ssyevent");
786   if (status != 0) {
787     sysevent_dequeue_thread_loop = 0;
788     ERROR("sysevent plugin: Starting dequeue thread failed.");
789     pthread_mutex_unlock(&sysevent_thread_lock);
790     return -1;
791   }
792
793   pthread_mutex_unlock(&sysevent_thread_lock);
794
795   return status;
796 } /* }}} int start_dequeue_thread */
797
798 static int start_threads(void) /* {{{ */
799 {
800   int status = start_socket_thread();
801   int status2 = start_dequeue_thread();
802
803   if (status != 0)
804     return status;
805   else
806     return status2;
807 } /* }}} int start_threads */
808
809 static int stop_socket_thread(int shutdown) /* {{{ */
810 {
811   pthread_mutex_lock(&sysevent_thread_lock);
812
813   if (sysevent_socket_thread_loop == 0) {
814     pthread_mutex_unlock(&sysevent_thread_lock);
815     return -1;
816   }
817
818   sysevent_socket_thread_loop = 0;
819   pthread_cond_broadcast(&sysevent_cond);
820   pthread_mutex_unlock(&sysevent_thread_lock);
821
822   int status;
823
824   if (shutdown == 1) {
825     // Since the thread is blocking, calling pthread_join
826     // doesn't actually succeed in stopping it.  It will stick around
827     // until a message is received on the socket (at which
828     // it will realize that "sysevent_socket_thread_loop" is 0 and will
829     // break out of the read loop and be allowed to die).  This is
830     // fine when the process isn't supposed to be exiting, but in
831     // the case of a process shutdown, we don't want to have an
832     // idle thread hanging around.  Calling pthread_cancel here in
833     // the case of a shutdown is just assures that the thread is
834     // gone and that the process has been fully terminated.
835
836     DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
837
838     status = pthread_cancel(sysevent_socket_thread_id);
839
840     if (status != 0 && status != ESRCH) {
841       ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
842             STRERRNO);
843       status = -1;
844     } else
845       status = 0;
846   } else {
847     status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
848     if (status != 0 && status != ESRCH) {
849       ERROR("sysevent plugin: Stopping socket thread failed.");
850       status = -1;
851     } else
852       status = 0;
853   }
854
855   pthread_mutex_lock(&sysevent_thread_lock);
856   memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
857   sysevent_socket_thread_error = 0;
858   pthread_mutex_unlock(&sysevent_thread_lock);
859
860   DEBUG("sysevent plugin: Finished requesting stop of socket thread");
861
862   return status;
863 } /* }}} int stop_socket_thread */
864
865 static int stop_dequeue_thread() /* {{{ */
866 {
867   pthread_mutex_lock(&sysevent_thread_lock);
868
869   if (sysevent_dequeue_thread_loop == 0) {
870     pthread_mutex_unlock(&sysevent_thread_lock);
871     return -1;
872   }
873
874   sysevent_dequeue_thread_loop = 0;
875   pthread_cond_broadcast(&sysevent_cond);
876   pthread_mutex_unlock(&sysevent_thread_lock);
877
878   // Since the thread is blocking, calling pthread_join
879   // doesn't actually succeed in stopping it.  It will stick around
880   // until a message is received on the socket (at which
881   // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
882   // break out of the read loop and be allowed to die).  Since this
883   // function is called when the processing is exiting, we don't want to
884   // have an idle thread hanging around.  Calling pthread_cancel here
885   // just assures that the thread is gone and that the process has been
886   // fully terminated.
887
888   DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
889
890   int status = pthread_cancel(sysevent_dequeue_thread_id);
891
892   if (status != 0 && status != ESRCH) {
893     ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
894           STRERRNO);
895     status = -1;
896   } else
897     status = 0;
898
899   pthread_mutex_lock(&sysevent_thread_lock);
900   memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
901   pthread_mutex_unlock(&sysevent_thread_lock);
902
903   DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
904
905   return status;
906 } /* }}} int stop_dequeue_thread */
907
908 static int stop_threads() /* {{{ */
909 {
910   int status = stop_socket_thread(1);
911   int status2 = stop_dequeue_thread();
912
913   if (status != 0)
914     return status;
915   else
916     return status2;
917 } /* }}} int stop_threads */
918
919 static int sysevent_init(void) /* {{{ */
920 {
921   ring.head = 0;
922   ring.tail = 0;
923   ring.maxLen = buffer_length;
924   ring.buffer = (char **)calloc(buffer_length, sizeof(char *));
925
926   if (ring.buffer == NULL) {
927     ERROR("sysevent plugin: sysevent_init ring buffer calloc failed");
928     return -1;
929   }
930
931   for (int i = 0; i < buffer_length; i++) {
932     ring.buffer[i] = calloc(1, listen_buffer_size);
933
934     if (ring.buffer[i] == NULL) {
935       ERROR("sysevent plugin: sysevent_init ring buffer entry calloc failed");
936       return -1;
937     }
938   }
939
940   ring.timestamp = (cdtime_t *)calloc(buffer_length, sizeof(cdtime_t));
941
942   if (ring.timestamp == NULL) {
943     ERROR("sysevent plugin: sysevent_init ring buffer timestamp calloc failed");
944     return -1;
945   }
946
947   if (sock == -1) {
948     struct addrinfo hints = {
949         .ai_family = AF_UNSPEC,
950         .ai_socktype = SOCK_DGRAM,
951         .ai_protocol = 0,
952         .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
953     };
954     struct addrinfo *res = 0;
955
956     int err = getaddrinfo(listen_ip, listen_port, &hints, &res);
957
958     if (err != 0) {
959       ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
960             err);
961       freeaddrinfo(res);
962       return -1;
963     }
964
965     sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
966     if (sock == -1) {
967       ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
968       freeaddrinfo(res);
969       return -1;
970     }
971
972     if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
973       ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
974       freeaddrinfo(res);
975       sock = -1;
976       return -1;
977     }
978
979     freeaddrinfo(res);
980   }
981
982   DEBUG("sysevent plugin: socket created and bound");
983
984   return start_threads();
985 } /* }}} int sysevent_init */
986
987 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
988 {
989   if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
990       ci->values[1].type != OCONFIG_TYPE_STRING) {
991     ERROR("sysevent plugin: The `%s' config option needs "
992           "two string arguments (ip and port).",
993           ci->key);
994     return -1;
995   }
996
997   listen_ip = strdup(ci->values[0].value.string);
998   listen_port = strdup(ci->values[1].value.string);
999
1000   return 0;
1001 }
1002
1003 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
1004 {
1005   int tmp = 0;
1006
1007   if (cf_util_get_int(ci, &tmp) != 0)
1008     return -1;
1009   else if ((tmp >= 1024) && (tmp <= 65535))
1010     listen_buffer_size = tmp;
1011   else {
1012     WARNING(
1013         "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
1014     return -1;
1015   }
1016
1017   return 0;
1018 }
1019
1020 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
1021 {
1022   int tmp = 0;
1023
1024   if (cf_util_get_int(ci, &tmp) != 0)
1025     return -1;
1026   else if ((tmp >= 3) && (tmp <= 4096))
1027     buffer_length = tmp;
1028   else {
1029     WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
1030     return -1;
1031   }
1032
1033   return 0;
1034 }
1035
1036 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
1037 {
1038   if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
1039     ERROR("sysevent plugin: The `%s' config option needs "
1040           "one string argument, a regular expression.",
1041           ci->key);
1042     return -1;
1043   }
1044
1045 #if HAVE_REGEX_H
1046   if (ignorelist == NULL)
1047     ignorelist = ignorelist_create(/* invert = */ 1);
1048
1049   int status = ignorelist_add(ignorelist, ci->values[0].value.string);
1050
1051   if (status != 0) {
1052     ERROR("sysevent plugin: invalid regular expression: %s",
1053           ci->values[0].value.string);
1054     return 1;
1055   }
1056
1057   monitor_all_messages = 0;
1058 #else
1059   WARNING("sysevent plugin: The plugin has been compiled without support "
1060           "for the \"RegexFilter\" option.");
1061 #endif
1062
1063   return 0;
1064 }
1065
1066 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
1067 {
1068   for (int i = 0; i < ci->children_num; i++) {
1069     oconfig_item_t *child = ci->children + i;
1070
1071     if (strcasecmp("Listen", child->key) == 0)
1072       sysevent_config_add_listen(child);
1073     else if (strcasecmp("BufferSize", child->key) == 0)
1074       sysevent_config_add_buffer_size(child);
1075     else if (strcasecmp("BufferLength", child->key) == 0)
1076       sysevent_config_add_buffer_length(child);
1077     else if (strcasecmp("RegexFilter", child->key) == 0)
1078       sysevent_config_add_regex_filter(child);
1079     else {
1080       WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
1081     }
1082   }
1083
1084   return 0;
1085 } /* }}} int sysevent_config */
1086
1087 static int sysevent_read(void) /* {{{ */
1088 {
1089   pthread_mutex_lock(&sysevent_thread_lock);
1090
1091   if (sysevent_socket_thread_error != 0) {
1092     pthread_mutex_unlock(&sysevent_thread_lock);
1093
1094     ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
1095           "Restarting it.",
1096           sysevent_socket_thread_error);
1097
1098     stop_threads();
1099
1100     start_threads();
1101
1102     return -1;
1103   } /* if (sysevent_socket_thread_error != 0) */
1104
1105   pthread_mutex_unlock(&sysevent_thread_lock);
1106
1107   return 0;
1108 } /* }}} int sysevent_read */
1109
1110 static int sysevent_shutdown(void) /* {{{ */
1111 {
1112   DEBUG("sysevent plugin: Shutting down thread.");
1113
1114   int status = stop_threads();
1115   int status2 = 0;
1116
1117   if (sock != -1) {
1118     status2 = close(sock);
1119     if (status2 != 0) {
1120       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
1121             STRERRNO);
1122     }
1123
1124     sock = -1;
1125   }
1126
1127   free(listen_ip);
1128   free(listen_port);
1129
1130   for (int i = 0; i < buffer_length; i++) {
1131     free(ring.buffer[i]);
1132   }
1133
1134   free(ring.buffer);
1135   free(ring.timestamp);
1136
1137   if (status != 0)
1138     return status;
1139   else
1140     return status2;
1141 } /* }}} int sysevent_shutdown */
1142
1143 void module_register(void) {
1144   plugin_register_complex_config("sysevent", sysevent_config);
1145   plugin_register_init("sysevent", sysevent_init);
1146   plugin_register_read("sysevent", sysevent_read);
1147   plugin_register_shutdown("sysevent", sysevent_shutdown);
1148 } /* void module_register */