Add dequeue thread + styling changes
[collectd.git] / src / sysevent.c
1 /**
2  * collectd - src/sysevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
33
34 #include <errno.h>
35 #include <netdb.h>
36 #include <netinet/in.h>
37 #include <pthread.h>
38 #include <regex.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <yajl/yajl_common.h>
45 #include <yajl/yajl_gen.h>
46
47 #if HAVE_YAJL_YAJL_VERSION_H
48 #include <yajl/yajl_version.h>
49 #endif
50 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
51 #include <yajl/yajl_tree.h>
52 #define HAVE_YAJL_V2 1
53 #endif
54
55 #define SYSEVENT_DOMAIN_FIELD "domain"
56 #define SYSEVENT_DOMAIN_VALUE "syslog"
57 #define SYSEVENT_EVENT_ID_FIELD "eventId"
58 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
59 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
60 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
61 #define SYSEVENT_PRIORITY_FIELD "priority"
62 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
63 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
64 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
65 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
66 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
67 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
69 #define SYSEVENT_SEQUENCE_FIELD "sequence"
70 #define SYSEVENT_SEQUENCE_VALUE "0"
71 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
72 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
73 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
74 #define SYSEVENT_VERSION_FIELD "version"
75 #define SYSEVENT_VERSION_VALUE "1.0"
76
77 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
78 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
80 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
81 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
83 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
84 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
85 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
86 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
87 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
88
89 /*
90  * Private data types
91  */
92
93 typedef struct {
94   int head;
95   int tail;
96   int maxLen;
97   char **buffer;
98   long long unsigned int *timestamp;
99 } circbuf_t;
100
101 /*
102  * Private variables
103  */
104 static ignorelist_t *ignorelist = NULL;
105
106 static int sysevent_socket_thread_loop = 0;
107 static int sysevent_socket_thread_error = 0;
108 static pthread_t sysevent_socket_thread_id;
109 static int sysevent_dequeue_thread_loop = 0;
110 static pthread_t sysevent_dequeue_thread_id;
111 static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
112 static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
113 static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
114 static int sock = -1;
115 static int event_id = 0;
116 static circbuf_t ring;
117
118 static char *listen_ip;
119 static char *listen_port;
120 static int listen_buffer_size = 4096;
121 static int buffer_length = 10;
122
123 static int monitor_all_messages = 1;
124
125 #if HAVE_YAJL_V2
126 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
127 static const char *rsyslog_field_keys[5] = {
128     "facility", "severity", "severity-num", "program", "processid"};
129 #endif
130
131 /*
132  * Prototypes
133  */
134
135 static void sysevent_dispatch_notification(const char *message,
136 #if HAVE_YAJL_V2
137                                            yajl_val *node,
138 #endif
139                                            long long unsigned int timestamp);
140
141 /*
142  * Private functions
143  */
144
145 static int gen_message_payload(const char *msg, char *sev, int sev_num,
146                                char *process, char *host,
147                                long long unsigned int timestamp, char **buf) {
148   const unsigned char *buf2;
149   yajl_gen g;
150   char json_str[DATA_MAX_NAME_LEN];
151
152 #if !defined(HAVE_YAJL_V2)
153   yajl_gen_config conf = {};
154
155   conf.beautify = 0;
156 #endif
157
158 #if HAVE_YAJL_V2
159   size_t len;
160   g = yajl_gen_alloc(NULL);
161   yajl_gen_config(g, yajl_gen_beautify, 0);
162 #else
163   unsigned int len;
164   g = yajl_gen_alloc(&conf, NULL);
165 #endif
166
167   yajl_gen_clear(g);
168
169   // *** BEGIN common event header ***
170
171   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
172     goto err;
173
174   // domain
175   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
176                       strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
177     goto err;
178
179   if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
180                       strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
181     goto err;
182
183   // eventId
184   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
185                       strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
186     goto err;
187
188   event_id = event_id + 1;
189   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
190   memset(json_str, '\0', DATA_MAX_NAME_LEN);
191   snprintf(json_str, event_id_len, "%d", event_id);
192
193   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
194     goto err;
195   }
196
197   // eventName
198   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
199                       strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
200     goto err;
201
202   int event_name_len = 0;
203   event_name_len = event_name_len + strlen(host); // host name
204   event_name_len =
205       event_name_len +
206       22; // "host", "rsyslog", "message", 3 spaces and null-terminator
207   memset(json_str, '\0', DATA_MAX_NAME_LEN);
208   snprintf(json_str, event_name_len, "host %s rsyslog message", host);
209
210   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
211       yajl_gen_status_ok) {
212     goto err;
213   }
214
215   // lastEpochMicrosec
216   if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
217                       strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
218       yajl_gen_status_ok)
219     goto err;
220
221   int last_epoch_microsec_len =
222       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
223   memset(json_str, '\0', DATA_MAX_NAME_LEN);
224   snprintf(json_str, last_epoch_microsec_len, "%llu",
225            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
226
227   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
228     goto err;
229   }
230
231   // priority
232   if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
233                       strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
234     goto err;
235
236   switch (sev_num) {
237   case 4:
238     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
239                         strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
240         yajl_gen_status_ok)
241       goto err;
242     break;
243   case 5:
244     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
245                         strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
246         yajl_gen_status_ok)
247       goto err;
248     break;
249   case 6:
250   case 7:
251     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
252                         strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
253         yajl_gen_status_ok)
254       goto err;
255     break;
256   default:
257     if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
258                         strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
259         yajl_gen_status_ok)
260       goto err;
261     break;
262   }
263
264   // reportingEntityName
265   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
266                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
267       yajl_gen_status_ok)
268     goto err;
269
270   if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
271                       strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
272       yajl_gen_status_ok)
273     goto err;
274
275   // sequence
276   if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
277                       strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
278     goto err;
279
280   if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
281                       strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
282     goto err;
283
284   // sourceName
285   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
286                       strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
287     goto err;
288
289   if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
290                       strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
291     goto err;
292
293   // startEpochMicrosec
294   if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
295                       strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
296       yajl_gen_status_ok)
297     goto err;
298
299   int start_epoch_microsec_len =
300       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
301   memset(json_str, '\0', DATA_MAX_NAME_LEN);
302   snprintf(json_str, start_epoch_microsec_len, "%llu",
303            (long long unsigned int)timestamp);
304
305   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
306     goto err;
307   }
308
309   // version
310   if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
311                       strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
312     goto err;
313
314   if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
315                       strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
316     goto err;
317
318   // *** END common event header ***
319
320   // *** BEGIN syslog fields ***
321
322   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
323                       strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
324       yajl_gen_status_ok)
325     goto err;
326
327   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
328     goto err;
329
330   // eventSourceHost
331   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
332                       strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
333       yajl_gen_status_ok)
334     goto err;
335
336   if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
337     goto err;
338
339   // eventSourceType
340   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
341                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
342       yajl_gen_status_ok)
343     goto err;
344
345   if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
346                       strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
347       yajl_gen_status_ok)
348     goto err;
349
350   // syslogFieldsVersion
351   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
352                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
353       yajl_gen_status_ok)
354     goto err;
355
356   if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
357                       strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
358       yajl_gen_status_ok)
359     goto err;
360
361   // syslogMsg
362   if (msg != NULL) {
363     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
364                         strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
365         yajl_gen_status_ok)
366       goto err;
367
368     if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
369       goto err;
370   }
371
372   // syslogProc
373   if (process != NULL) {
374     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
375                         strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
376         yajl_gen_status_ok)
377       goto err;
378
379     if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
380         yajl_gen_status_ok)
381       goto err;
382   }
383
384   // syslogSev
385   if (sev != NULL) {
386     if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
387                         strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
388         yajl_gen_status_ok)
389       goto err;
390
391     if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
392       goto err;
393   }
394
395   // syslogTag
396   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
397                       strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
398     goto err;
399
400   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
401                       strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
402     goto err;
403
404   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
405     goto err;
406
407   // *** END syslog fields ***
408
409   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
410     goto err;
411
412   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
413     goto err;
414
415   *buf = strdup((char *)buf2);
416
417   if (*buf == NULL) {
418     ERROR("sysevent plugin: gen_message_payload strdup failed");
419     goto err;
420   }
421
422   yajl_gen_free(g);
423
424   return 0;
425
426 err:
427   yajl_gen_free(g);
428   ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
429   return -1;
430 }
431
432 static int read_socket() {
433   int recv_flags = MSG_DONTWAIT;
434
435   while (42) {
436     struct sockaddr_storage src_addr;
437     socklen_t src_addr_len = sizeof(src_addr);
438
439     char buffer[listen_buffer_size];
440     memset(buffer, '\0', listen_buffer_size);
441
442     ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
443                              (struct sockaddr *)&src_addr, &src_addr_len);
444
445     if (count < 0) {
446       if (errno == EAGAIN || errno == EWOULDBLOCK) {
447         pthread_mutex_lock(&sysevent_data_lock);
448
449         // There was nothing more to receive for now, so...
450         // If ring head does not equal ring tail, there is data
451         // in the ring buffer for the dequeue thread to read, so
452         // signal it
453         if (ring.head != ring.tail)
454           pthread_cond_signal(&sysevent_cond);
455
456         pthread_mutex_unlock(&sysevent_data_lock);
457
458         // Since there was nothing to receive, set recv to block and
459         // try again
460         recv_flags = 0;
461         continue;
462       } else if (errno != EINTR) {
463         ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
464         return -1;
465       } else {
466         // Interrupt, so just return
467         return 0;
468       }
469     }
470
471     if (count >= sizeof(buffer)) {
472       WARNING("sysevent plugin: datagram too large for buffer: truncated");
473     }
474
475     // We successfully received a message, so don't block on the next
476     // read in case there are more (and if there aren't, it will be
477     // handled above in the error-checking)
478     recv_flags = MSG_DONTWAIT;
479
480     // 1. Acquire data lock
481     // 2. Push to buffer if there is room, otherwise raise warning
482     //    and allow dequeue thread to take over
483
484     pthread_mutex_lock(&sysevent_data_lock);
485
486     int next = ring.head + 1;
487     if (next >= ring.maxLen)
488       next = 0;
489
490     if (next == ring.tail) {
491       // Buffer is full, signal the dequeue thread to process the buffer
492       // and clean it out, and then sleep
493       WARNING("sysevent plugin: ring buffer full");
494
495       pthread_cond_signal(&sysevent_cond);
496       pthread_mutex_unlock(&sysevent_data_lock);
497
498       usleep(1000);
499       continue;
500     } else {
501       DEBUG("sysevent plugin: writing %s", buffer);
502
503       strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
504       ring.timestamp[ring.head] =
505           (long long unsigned int)CDTIME_T_TO_US(cdtime());
506       ring.head = next;
507     }
508
509     pthread_mutex_unlock(&sysevent_data_lock);
510   }
511 }
512
513 static void read_ring_buffer() {
514   pthread_mutex_lock(&sysevent_data_lock);
515
516   // If there's currently nothing to read from the buffer,
517   // then wait
518   if (ring.head == ring.tail)
519     pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
520
521   while (ring.head != ring.tail) {
522     int next = ring.tail + 1;
523
524     if (next >= ring.maxLen)
525       next = 0;
526
527     DEBUG("sysevent plugin: reading from ring buffer: %s",
528           ring.buffer[ring.tail]);
529
530     long long unsigned int timestamp = ring.timestamp[ring.tail];
531     char *match_str = NULL;
532
533 #if HAVE_YAJL_V2
534     // Try to parse JSON, and if it fails, fall back to plain string
535     char errbuf[1024];
536     errbuf[0] = 0;
537     yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
538                                     errbuf, sizeof(errbuf));
539
540     if (node != NULL) {
541       // JSON rsyslog data
542
543       // If we have any regex filters, we need to see if the message portion of
544       // the data matches any of them (otherwise we're not interested)
545       if (monitor_all_messages == 0) {
546         const char *path[] = {"@message", (const char *)0};
547         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
548
549         char json_val[listen_buffer_size];
550         memset(json_val, '\0', listen_buffer_size);
551
552         snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
553                  '\0');
554
555         match_str = (char *)&json_val;
556       }
557     } else {
558       // non-JSON rsyslog data
559
560       // If we have any regex filters, we need to see if the message data
561       // matches any of them (otherwise we're not interested)
562       if (monitor_all_messages == 0)
563         match_str = ring.buffer[ring.tail];
564     }
565 #else
566     // If we have any regex filters, we need to see if the message data
567     // matches any of them (otherwise we're not interested)
568     if (monitor_all_messages == 0)
569       match_str = ring.buffer[ring.tail];
570 #endif
571
572     int is_match = 1;
573
574     // If we care about matching, do that comparison here
575     if (match_str != NULL) {
576       is_match = 1;
577
578       if (ignorelist_match(ignorelist, match_str) != 0)
579         is_match = 0;
580       else
581         DEBUG("sysevent plugin: regex filter match");
582     }
583
584 #if HAVE_YAJL_V2
585     if (is_match == 1 && node != NULL) {
586       sysevent_dispatch_notification(NULL, &node, timestamp);
587       yajl_tree_free(node);
588     } else if (is_match == 1)
589       sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
590 #else
591     if (is_match == 1)
592       sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
593 #endif
594
595     ring.tail = next;
596   }
597
598   pthread_mutex_unlock(&sysevent_data_lock);
599 }
600
601 static void *sysevent_socket_thread(void *arg) /* {{{ */
602 {
603   pthread_mutex_lock(&sysevent_thread_lock);
604
605   while (sysevent_socket_thread_loop > 0) {
606     pthread_mutex_unlock(&sysevent_thread_lock);
607
608     if (sock == -1)
609       return (void *)0;
610
611     int status = read_socket();
612
613     pthread_mutex_lock(&sysevent_thread_lock);
614
615     if (status < 0) {
616       WARNING("sysevent plugin: problem with thread status: %d", status);
617       sysevent_socket_thread_error = 1;
618       break;
619     }
620   } /* while (sysevent_socket_thread_loop > 0) */
621
622   pthread_mutex_unlock(&sysevent_thread_lock);
623
624   return (void *)0;
625 } /* }}} void *sysevent_socket_thread */
626
627 // Entry point for thread responsible for reading from
628 // ring buffer and dispatching notifications
629 static void *sysevent_dequeue_thread(void *arg) /* {{{ */
630 {
631   pthread_mutex_lock(&sysevent_thread_lock);
632
633   while (sysevent_dequeue_thread_loop > 0) {
634     pthread_mutex_unlock(&sysevent_thread_lock);
635
636     read_ring_buffer();
637
638     pthread_mutex_lock(&sysevent_thread_lock);
639   } /* while (sysevent_dequeue_thread_loop > 0) */
640
641   pthread_mutex_unlock(&sysevent_thread_lock);
642
643   return (void *)0;
644 } /* }}} void *sysevent_dequeue_thread */
645
646 static int start_socket_thread(void) /* {{{ */
647 {
648   pthread_mutex_lock(&sysevent_thread_lock);
649
650   if (sysevent_socket_thread_loop != 0) {
651     pthread_mutex_unlock(&sysevent_thread_lock);
652     return 0;
653   }
654
655   sysevent_socket_thread_loop = 1;
656   sysevent_socket_thread_error = 0;
657
658   DEBUG("sysevent plugin: starting socket thread");
659
660   int status = plugin_thread_create(&sysevent_socket_thread_id,
661                                     /* attr = */ NULL, sysevent_socket_thread,
662                                     /* arg = */ (void *)0, "sysevent");
663   if (status != 0) {
664     sysevent_socket_thread_loop = 0;
665     ERROR("sysevent plugin: starting socket thread failed.");
666     pthread_mutex_unlock(&sysevent_thread_lock);
667     return -1;
668   }
669
670   pthread_mutex_unlock(&sysevent_thread_lock);
671
672   return 0;
673 } /* }}} int start_socket_thread */
674
675 static int start_dequeue_thread(void) /* {{{ */
676 {
677   pthread_mutex_lock(&sysevent_thread_lock);
678
679   if (sysevent_dequeue_thread_loop != 0) {
680     pthread_mutex_unlock(&sysevent_thread_lock);
681     return 0;
682   }
683
684   sysevent_dequeue_thread_loop = 1;
685
686   int status = plugin_thread_create(&sysevent_dequeue_thread_id,
687                                     /* attr = */ NULL, sysevent_dequeue_thread,
688                                     /* arg = */ (void *)0, "ssyevent");
689   if (status != 0) {
690     sysevent_dequeue_thread_loop = 0;
691     ERROR("sysevent plugin: Starting dequeue thread failed.");
692     pthread_mutex_unlock(&sysevent_thread_lock);
693     return -1;
694   }
695
696   pthread_mutex_unlock(&sysevent_thread_lock);
697
698   return status;
699 } /* }}} int start_dequeue_thread */
700
701 static int start_threads(void) /* {{{ */
702 {
703   int status = start_socket_thread();
704   int status2 = start_dequeue_thread();
705
706   if (status != 0)
707     return status;
708   else
709     return status2;
710 } /* }}} int start_threads */
711
712 static int stop_socket_thread(int shutdown) /* {{{ */
713 {
714   pthread_mutex_lock(&sysevent_thread_lock);
715
716   if (sysevent_socket_thread_loop == 0) {
717     pthread_mutex_unlock(&sysevent_thread_lock);
718     return -1;
719   }
720
721   sysevent_socket_thread_loop = 0;
722   pthread_cond_broadcast(&sysevent_cond);
723   pthread_mutex_unlock(&sysevent_thread_lock);
724
725   int status;
726
727   if (shutdown == 1) {
728     // Since the thread is blocking, calling pthread_join
729     // doesn't actually succeed in stopping it.  It will stick around
730     // until a message is received on the socket (at which
731     // it will realize that "sysevent_socket_thread_loop" is 0 and will
732     // break out of the read loop and be allowed to die).  This is
733     // fine when the process isn't supposed to be exiting, but in
734     // the case of a process shutdown, we don't want to have an
735     // idle thread hanging around.  Calling pthread_cancel here in
736     // the case of a shutdown is just assures that the thread is
737     // gone and that the process has been fully terminated.
738
739     DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
740
741     status = pthread_cancel(sysevent_socket_thread_id);
742
743     if (status != 0 && status != ESRCH) {
744       ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
745             STRERRNO);
746       status = -1;
747     } else
748       status = 0;
749   } else {
750     status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
751     if (status != 0 && status != ESRCH) {
752       ERROR("sysevent plugin: Stopping socket thread failed.");
753       status = -1;
754     } else
755       status = 0;
756   }
757
758   pthread_mutex_lock(&sysevent_thread_lock);
759   memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
760   sysevent_socket_thread_error = 0;
761   pthread_mutex_unlock(&sysevent_thread_lock);
762
763   DEBUG("sysevent plugin: Finished requesting stop of socket thread");
764
765   return status;
766 } /* }}} int stop_socket_thread */
767
768 static int stop_dequeue_thread(int shutdown) /* {{{ */
769 {
770   pthread_mutex_lock(&sysevent_thread_lock);
771
772   if (sysevent_dequeue_thread_loop == 0) {
773     pthread_mutex_unlock(&sysevent_thread_lock);
774     return -1;
775   }
776
777   sysevent_dequeue_thread_loop = 0;
778   pthread_cond_broadcast(&sysevent_cond);
779   pthread_mutex_unlock(&sysevent_thread_lock);
780
781   int status;
782
783   if (shutdown == 1) {
784     // Since the thread is blocking, calling pthread_join
785     // doesn't actually succeed in stopping it.  It will stick around
786     // until a message is received on the socket (at which
787     // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
788     // break out of the read loop and be allowed to die).  This is
789     // fine when the process isn't supposed to be exiting, but in
790     // the case of a process shutdown, we don't want to have an
791     // idle thread hanging around.  Calling pthread_cancel here in
792     // the case of a shutdown is just assures that the thread is
793     // gone and that the process has been fully terminated.
794
795     DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
796
797     status = pthread_cancel(sysevent_dequeue_thread_id);
798
799     if (status != 0 && status != ESRCH) {
800       ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
801             STRERRNO);
802       status = -1;
803     } else
804       status = 0;
805   } else {
806     status = pthread_join(sysevent_dequeue_thread_id, /* return = */ NULL);
807     if (status != 0 && status != ESRCH) {
808       ERROR("sysevent plugin: Stopping dequeue thread failed.");
809       status = -1;
810     } else
811       status = 0;
812   }
813
814   pthread_mutex_lock(&sysevent_thread_lock);
815   memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
816   pthread_mutex_unlock(&sysevent_thread_lock);
817
818   DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
819
820   return status;
821 } /* }}} int stop_dequeue_thread */
822
823 static int stop_threads(int shutdown) /* {{{ */
824 {
825   int status = stop_socket_thread(shutdown);
826   int status2 = stop_dequeue_thread(shutdown);
827
828   if (status != 0)
829     return status;
830   else
831     return status2;
832 } /* }}} int stop_threads */
833
834 static int sysevent_init(void) /* {{{ */
835 {
836   ring.head = 0;
837   ring.tail = 0;
838   ring.maxLen = buffer_length;
839   ring.buffer = (char **)calloc(buffer_length, sizeof(char *));
840
841   if (ring.buffer == NULL) {
842     ERROR("sysevent plugin: sysevent_init calloc failed");
843     return -1;
844   }
845
846   for (int i = 0; i < buffer_length; i++) {
847     ring.buffer[i] = calloc(1, listen_buffer_size);
848   }
849
850   ring.timestamp = (long long unsigned int *)calloc(
851       buffer_length, sizeof(long long unsigned int));
852
853   if (sock == -1) {
854     const char *hostname = listen_ip;
855     const char *portname = listen_port;
856     struct addrinfo hints;
857     memset(&hints, 0, sizeof(hints));
858     hints.ai_family = AF_UNSPEC;
859     hints.ai_socktype = SOCK_DGRAM;
860     hints.ai_protocol = 0;
861     hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
862     struct addrinfo *res = 0;
863
864     int err = getaddrinfo(hostname, portname, &hints, &res);
865
866     if (err != 0) {
867       ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
868             err);
869       freeaddrinfo(res);
870       return -1;
871     }
872
873     sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
874     if (sock == -1) {
875       ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
876       freeaddrinfo(res);
877       return -1;
878     }
879
880     if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
881       ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
882       freeaddrinfo(res);
883       return -1;
884     }
885
886     freeaddrinfo(res);
887   }
888
889   DEBUG("sysevent plugin: socket created and bound");
890
891   return start_threads();
892 } /* }}} int sysevent_init */
893
894 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
895 {
896   if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
897       ci->values[1].type != OCONFIG_TYPE_STRING) {
898     ERROR("sysevent plugin: The `%s' config option needs "
899           "two string arguments (ip and port).",
900           ci->key);
901     return -1;
902   }
903
904   listen_ip = strdup(ci->values[0].value.string);
905   listen_port = strdup(ci->values[1].value.string);
906
907   return 0;
908 }
909
910 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
911 {
912   int tmp = 0;
913
914   if (cf_util_get_int(ci, &tmp) != 0)
915     return -1;
916   else if ((tmp >= 1024) && (tmp <= 65535))
917     listen_buffer_size = tmp;
918   else {
919     WARNING(
920         "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
921     return -1;
922   }
923
924   return 0;
925 }
926
927 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
928 {
929   int tmp = 0;
930
931   if (cf_util_get_int(ci, &tmp) != 0)
932     return -1;
933   else if ((tmp >= 3) && (tmp <= 4096))
934     buffer_length = tmp;
935   else {
936     WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
937     return -1;
938   }
939
940   return 0;
941 }
942
943 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
944 {
945   if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
946     ERROR("sysevent plugin: The `%s' config option needs "
947           "one string argument, a regular expression.",
948           ci->key);
949     return -1;
950   }
951
952 #if HAVE_REGEX_H
953   if (ignorelist == NULL)
954     ignorelist = ignorelist_create(/* invert = */ 1);
955
956   int status = ignorelist_add(ignorelist, ci->values[0].value.string);
957
958   if (status != 0) {
959     ERROR("sysevent plugin: invalid regular expression: %s",
960           ci->values[0].value.string);
961     return 1;
962   }
963
964   monitor_all_messages = 0;
965 #else
966   WARNING("sysevent plugin: The plugin has been compiled without support "
967           "for the \"RegexFilter\" option.");
968 #endif
969
970   return 0;
971 }
972
973 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
974 {
975   for (int i = 0; i < ci->children_num; i++) {
976     oconfig_item_t *child = ci->children + i;
977
978     if (strcasecmp("Listen", child->key) == 0)
979       sysevent_config_add_listen(child);
980     else if (strcasecmp("BufferSize", child->key) == 0)
981       sysevent_config_add_buffer_size(child);
982     else if (strcasecmp("BufferLength", child->key) == 0)
983       sysevent_config_add_buffer_length(child);
984     else if (strcasecmp("RegexFilter", child->key) == 0)
985       sysevent_config_add_regex_filter(child);
986     else {
987       WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
988     }
989   }
990
991   return 0;
992 } /* }}} int sysevent_config */
993
994 static void sysevent_dispatch_notification(const char *message,
995 #if HAVE_YAJL_V2
996                                            yajl_val *node,
997 #endif
998                                            long long unsigned int timestamp) {
999   char *buf = NULL;
1000   notification_t n = {NOTIF_OKAY, cdtime(), "", "",  "sysevent",
1001                       "",         "",       "", NULL};
1002
1003 #if HAVE_YAJL_V2
1004   if (node != NULL) {
1005     // If we have a parsed-JSON node to work with, use that
1006
1007     // msg
1008     const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
1009     yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
1010
1011     char msg[listen_buffer_size];
1012
1013     if (msg_v != NULL) {
1014       memset(msg, '\0', listen_buffer_size);
1015       snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
1016     }
1017
1018     // severity
1019     const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
1020                                    (const char *)0};
1021     yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
1022
1023     char severity[listen_buffer_size];
1024
1025     if (severity_v != NULL) {
1026       memset(severity, '\0', listen_buffer_size);
1027       snprintf(severity, listen_buffer_size, "%s%c",
1028                YAJL_GET_STRING(severity_v), '\0');
1029     }
1030
1031     // sev_num
1032     const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
1033                                       (const char *)0};
1034     yajl_val sev_num_str_v =
1035         yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
1036
1037     char sev_num_str[listen_buffer_size];
1038     int sev_num = -1;
1039
1040     if (sev_num_str_v != NULL) {
1041       memset(sev_num_str, '\0', listen_buffer_size);
1042       snprintf(sev_num_str, listen_buffer_size, "%s%c",
1043                YAJL_GET_STRING(sev_num_str_v), '\0');
1044
1045       sev_num = atoi(sev_num_str);
1046
1047       if (sev_num < 4)
1048         n.severity = NOTIF_FAILURE;
1049     }
1050
1051     // process
1052     const char *process_path[] = {"@fields", rsyslog_field_keys[3],
1053                                   (const char *)0};
1054     yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
1055
1056     char process[listen_buffer_size];
1057
1058     if (process_v != NULL) {
1059       memset(process, '\0', listen_buffer_size);
1060       snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
1061                '\0');
1062     }
1063
1064     // hostname
1065     const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
1066     yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
1067
1068     char hostname_str[listen_buffer_size];
1069
1070     if (hostname_v != NULL) {
1071       memset(hostname_str, '\0', listen_buffer_size);
1072       snprintf(hostname_str, listen_buffer_size, "%s%c",
1073                YAJL_GET_STRING(hostname_v), '\0');
1074     }
1075
1076     gen_message_payload(
1077         (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
1078         (sev_num_str_v != NULL ? sev_num : -1),
1079         (process_v != NULL ? process : NULL),
1080         (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
1081   } else {
1082     // Data was not sent in JSON format, so just treat the whole log entry
1083     // as the message (and we'll be unable to acquire certain data, so the
1084     // payload
1085     // generated below will be less informative)
1086
1087     gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
1088   }
1089 #else
1090   gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
1091 #endif
1092
1093   sstrncpy(n.host, hostname_g, sizeof(n.host));
1094   sstrncpy(n.type, "gauge", sizeof(n.type));
1095
1096   notification_meta_t *m = calloc(1, sizeof(*m));
1097
1098   if (m == NULL) {
1099     sfree(buf);
1100     ERROR("sysevent plugin: unable to allocate metadata: %s", STRERRNO);
1101     return;
1102   }
1103
1104   sstrncpy(m->name, "ves", sizeof(m->name));
1105   m->nm_value.nm_string = sstrdup(buf);
1106   m->type = NM_TYPE_STRING;
1107   n.meta = m;
1108
1109   DEBUG("sysevent plugin: notification message: %s",
1110         n.meta->nm_value.nm_string);
1111
1112   DEBUG("sysevent plugin: dispatching message");
1113
1114   plugin_dispatch_notification(&n);
1115   plugin_notification_meta_free(n.meta);
1116
1117   // strdup'd in gen_message_payload
1118   if (buf != NULL)
1119     sfree(buf);
1120 }
1121
1122 static int sysevent_read(void) /* {{{ */
1123 {
1124   pthread_mutex_lock(&sysevent_thread_lock);
1125
1126   if (sysevent_socket_thread_error != 0) {
1127     pthread_mutex_unlock(&sysevent_thread_lock);
1128
1129     ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
1130           "Restarting "
1131           "it.",
1132           sysevent_socket_thread_error);
1133
1134     stop_threads(0);
1135
1136     start_threads();
1137
1138     return -1;
1139   } /* if (sysevent_socket_thread_error != 0) */
1140
1141   pthread_mutex_unlock(&sysevent_thread_lock);
1142
1143   return 0;
1144 } /* }}} int sysevent_read */
1145
1146 static int sysevent_shutdown(void) /* {{{ */
1147 {
1148   DEBUG("sysevent plugin: Shutting down thread.");
1149
1150   int status = stop_threads(1);
1151   int status2 = 0;
1152
1153   if (sock != -1) {
1154     status2 = close(sock);
1155     if (status2 != 0) {
1156       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
1157             STRERRNO);
1158     } else
1159       sock = -1;
1160   }
1161
1162   free(listen_ip);
1163   free(listen_port);
1164
1165   for (int i = 0; i < buffer_length; i++) {
1166     free(ring.buffer[i]);
1167   }
1168
1169   free(ring.buffer);
1170   free(ring.timestamp);
1171
1172   if (status != 0)
1173     return status;
1174   else
1175     return status2;
1176 } /* }}} int sysevent_shutdown */
1177
1178 void module_register(void) {
1179   plugin_register_complex_config("sysevent", sysevent_config);
1180   plugin_register_init("sysevent", sysevent_init);
1181   plugin_register_read("sysevent", sysevent_read);
1182   plugin_register_shutdown("sysevent", sysevent_shutdown);
1183 } /* void module_register */