2 * collectd - src/sysevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
36 #include <netinet/in.h>
41 #include <sys/socket.h>
44 #include <yajl/yajl_common.h>
45 #include <yajl/yajl_gen.h>
47 #if HAVE_YAJL_YAJL_VERSION_H
48 #include <yajl/yajl_version.h>
50 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
51 #include <yajl/yajl_tree.h>
52 #define HAVE_YAJL_V2 1
55 #define SYSEVENT_DOMAIN_FIELD "domain"
56 #define SYSEVENT_DOMAIN_VALUE "syslog"
57 #define SYSEVENT_EVENT_ID_FIELD "eventId"
58 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
59 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
60 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
61 #define SYSEVENT_PRIORITY_FIELD "priority"
62 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
63 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
64 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
65 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
66 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
67 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
69 #define SYSEVENT_SEQUENCE_FIELD "sequence"
70 #define SYSEVENT_SEQUENCE_VALUE "0"
71 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
72 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
73 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
74 #define SYSEVENT_VERSION_FIELD "version"
75 #define SYSEVENT_VERSION_VALUE "1.0"
77 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
78 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
80 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
81 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
83 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
84 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
85 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
86 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
87 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
98 long long unsigned int *timestamp;
104 static ignorelist_t *ignorelist = NULL;
106 static int sysevent_socket_thread_loop = 0;
107 static int sysevent_socket_thread_error = 0;
108 static pthread_t sysevent_socket_thread_id;
109 static int sysevent_dequeue_thread_loop = 0;
110 static pthread_t sysevent_dequeue_thread_id;
111 static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
112 static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
113 static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
114 static int sock = -1;
115 static int event_id = 0;
116 static circbuf_t ring;
118 static char *listen_ip;
119 static char *listen_port;
120 static int listen_buffer_size = 4096;
121 static int buffer_length = 10;
123 static int monitor_all_messages = 1;
126 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
127 static const char *rsyslog_field_keys[5] = {
128 "facility", "severity", "severity-num", "program", "processid"};
135 static void sysevent_dispatch_notification(const char *message,
139 long long unsigned int timestamp);
145 static int gen_message_payload(const char *msg, char *sev, int sev_num,
146 char *process, char *host,
147 long long unsigned int timestamp, char **buf) {
148 const unsigned char *buf2;
150 char json_str[DATA_MAX_NAME_LEN];
152 #if !defined(HAVE_YAJL_V2)
153 yajl_gen_config conf = {};
160 g = yajl_gen_alloc(NULL);
161 yajl_gen_config(g, yajl_gen_beautify, 0);
164 g = yajl_gen_alloc(&conf, NULL);
169 // *** BEGIN common event header ***
171 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
175 if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
176 strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
179 if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
180 strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
184 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
185 strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
188 event_id = event_id + 1;
189 int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
190 memset(json_str, '\0', DATA_MAX_NAME_LEN);
191 snprintf(json_str, event_id_len, "%d", event_id);
193 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
198 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
199 strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
202 int event_name_len = 0;
203 event_name_len = event_name_len + strlen(host); // host name
206 22; // "host", "rsyslog", "message", 3 spaces and null-terminator
207 memset(json_str, '\0', DATA_MAX_NAME_LEN);
208 snprintf(json_str, event_name_len, "host %s rsyslog message", host);
210 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
211 yajl_gen_status_ok) {
216 if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
217 strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
221 int last_epoch_microsec_len =
222 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
223 memset(json_str, '\0', DATA_MAX_NAME_LEN);
224 snprintf(json_str, last_epoch_microsec_len, "%llu",
225 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
227 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
232 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
233 strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
238 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
239 strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
244 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
245 strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
251 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
252 strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
257 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
258 strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
264 // reportingEntityName
265 if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
266 strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
270 if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
271 strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
276 if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
277 strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
280 if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
281 strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
285 if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
286 strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
289 if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
290 strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
293 // startEpochMicrosec
294 if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
295 strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
299 int start_epoch_microsec_len =
300 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
301 memset(json_str, '\0', DATA_MAX_NAME_LEN);
302 snprintf(json_str, start_epoch_microsec_len, "%llu",
303 (long long unsigned int)timestamp);
305 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
310 if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
311 strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
314 if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
315 strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
318 // *** END common event header ***
320 // *** BEGIN syslog fields ***
322 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
323 strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
327 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
331 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
332 strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
336 if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
340 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
341 strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
345 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
346 strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
350 // syslogFieldsVersion
351 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
352 strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
356 if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
357 strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
363 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
364 strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
368 if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
373 if (process != NULL) {
374 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
375 strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
379 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
386 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
387 strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
391 if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
396 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
397 strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
400 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
401 strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
404 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
407 // *** END syslog fields ***
409 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
412 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
415 *buf = strdup((char *)buf2);
418 ERROR("sysevent plugin: gen_message_payload strdup failed");
428 ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
432 static int read_socket() {
433 int recv_flags = MSG_DONTWAIT;
436 struct sockaddr_storage src_addr;
437 socklen_t src_addr_len = sizeof(src_addr);
439 char buffer[listen_buffer_size];
440 memset(buffer, '\0', listen_buffer_size);
442 ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
443 (struct sockaddr *)&src_addr, &src_addr_len);
446 if (errno == EAGAIN || errno == EWOULDBLOCK) {
447 pthread_mutex_lock(&sysevent_data_lock);
449 // There was nothing more to receive for now, so...
450 // If ring head does not equal ring tail, there is data
451 // in the ring buffer for the dequeue thread to read, so
453 if (ring.head != ring.tail)
454 pthread_cond_signal(&sysevent_cond);
456 pthread_mutex_unlock(&sysevent_data_lock);
458 // Since there was nothing to receive, set recv to block and
462 } else if (errno != EINTR) {
463 ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
466 // Interrupt, so just return
471 if (count >= sizeof(buffer)) {
472 WARNING("sysevent plugin: datagram too large for buffer: truncated");
475 // We successfully received a message, so don't block on the next
476 // read in case there are more (and if there aren't, it will be
477 // handled above in the error-checking)
478 recv_flags = MSG_DONTWAIT;
480 // 1. Acquire data lock
481 // 2. Push to buffer if there is room, otherwise raise warning
482 // and allow dequeue thread to take over
484 pthread_mutex_lock(&sysevent_data_lock);
486 int next = ring.head + 1;
487 if (next >= ring.maxLen)
490 if (next == ring.tail) {
491 // Buffer is full, signal the dequeue thread to process the buffer
492 // and clean it out, and then sleep
493 WARNING("sysevent plugin: ring buffer full");
495 pthread_cond_signal(&sysevent_cond);
496 pthread_mutex_unlock(&sysevent_data_lock);
501 DEBUG("sysevent plugin: writing %s", buffer);
503 strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
504 ring.timestamp[ring.head] =
505 (long long unsigned int)CDTIME_T_TO_US(cdtime());
509 pthread_mutex_unlock(&sysevent_data_lock);
513 static void read_ring_buffer() {
514 pthread_mutex_lock(&sysevent_data_lock);
516 // If there's currently nothing to read from the buffer,
518 if (ring.head == ring.tail)
519 pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
521 while (ring.head != ring.tail) {
522 int next = ring.tail + 1;
524 if (next >= ring.maxLen)
527 DEBUG("sysevent plugin: reading from ring buffer: %s",
528 ring.buffer[ring.tail]);
530 long long unsigned int timestamp = ring.timestamp[ring.tail];
531 char *match_str = NULL;
534 // Try to parse JSON, and if it fails, fall back to plain string
537 yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
538 errbuf, sizeof(errbuf));
543 // If we have any regex filters, we need to see if the message portion of
544 // the data matches any of them (otherwise we're not interested)
545 if (monitor_all_messages == 0) {
546 const char *path[] = {"@message", (const char *)0};
547 yajl_val v = yajl_tree_get(node, path, yajl_t_string);
549 char json_val[listen_buffer_size];
550 memset(json_val, '\0', listen_buffer_size);
552 snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
555 match_str = (char *)&json_val;
558 // non-JSON rsyslog data
560 // If we have any regex filters, we need to see if the message data
561 // matches any of them (otherwise we're not interested)
562 if (monitor_all_messages == 0)
563 match_str = ring.buffer[ring.tail];
566 // If we have any regex filters, we need to see if the message data
567 // matches any of them (otherwise we're not interested)
568 if (monitor_all_messages == 0)
569 match_str = ring.buffer[ring.tail];
574 // If we care about matching, do that comparison here
575 if (match_str != NULL) {
578 if (ignorelist_match(ignorelist, match_str) != 0)
581 DEBUG("sysevent plugin: regex filter match");
585 if (is_match == 1 && node != NULL) {
586 sysevent_dispatch_notification(NULL, &node, timestamp);
587 yajl_tree_free(node);
588 } else if (is_match == 1)
589 sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
592 sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
598 pthread_mutex_unlock(&sysevent_data_lock);
601 static void *sysevent_socket_thread(void *arg) /* {{{ */
603 pthread_mutex_lock(&sysevent_thread_lock);
605 while (sysevent_socket_thread_loop > 0) {
606 pthread_mutex_unlock(&sysevent_thread_lock);
611 int status = read_socket();
613 pthread_mutex_lock(&sysevent_thread_lock);
616 WARNING("sysevent plugin: problem with thread status: %d", status);
617 sysevent_socket_thread_error = 1;
620 } /* while (sysevent_socket_thread_loop > 0) */
622 pthread_mutex_unlock(&sysevent_thread_lock);
625 } /* }}} void *sysevent_socket_thread */
627 // Entry point for thread responsible for reading from
628 // ring buffer and dispatching notifications
629 static void *sysevent_dequeue_thread(void *arg) /* {{{ */
631 pthread_mutex_lock(&sysevent_thread_lock);
633 while (sysevent_dequeue_thread_loop > 0) {
634 pthread_mutex_unlock(&sysevent_thread_lock);
638 pthread_mutex_lock(&sysevent_thread_lock);
639 } /* while (sysevent_dequeue_thread_loop > 0) */
641 pthread_mutex_unlock(&sysevent_thread_lock);
644 } /* }}} void *sysevent_dequeue_thread */
646 static int start_socket_thread(void) /* {{{ */
648 pthread_mutex_lock(&sysevent_thread_lock);
650 if (sysevent_socket_thread_loop != 0) {
651 pthread_mutex_unlock(&sysevent_thread_lock);
655 sysevent_socket_thread_loop = 1;
656 sysevent_socket_thread_error = 0;
658 DEBUG("sysevent plugin: starting socket thread");
660 int status = plugin_thread_create(&sysevent_socket_thread_id,
661 /* attr = */ NULL, sysevent_socket_thread,
662 /* arg = */ (void *)0, "sysevent");
664 sysevent_socket_thread_loop = 0;
665 ERROR("sysevent plugin: starting socket thread failed.");
666 pthread_mutex_unlock(&sysevent_thread_lock);
670 pthread_mutex_unlock(&sysevent_thread_lock);
673 } /* }}} int start_socket_thread */
675 static int start_dequeue_thread(void) /* {{{ */
677 pthread_mutex_lock(&sysevent_thread_lock);
679 if (sysevent_dequeue_thread_loop != 0) {
680 pthread_mutex_unlock(&sysevent_thread_lock);
684 sysevent_dequeue_thread_loop = 1;
686 int status = plugin_thread_create(&sysevent_dequeue_thread_id,
687 /* attr = */ NULL, sysevent_dequeue_thread,
688 /* arg = */ (void *)0, "ssyevent");
690 sysevent_dequeue_thread_loop = 0;
691 ERROR("sysevent plugin: Starting dequeue thread failed.");
692 pthread_mutex_unlock(&sysevent_thread_lock);
696 pthread_mutex_unlock(&sysevent_thread_lock);
699 } /* }}} int start_dequeue_thread */
701 static int start_threads(void) /* {{{ */
703 int status = start_socket_thread();
704 int status2 = start_dequeue_thread();
710 } /* }}} int start_threads */
712 static int stop_socket_thread(int shutdown) /* {{{ */
714 pthread_mutex_lock(&sysevent_thread_lock);
716 if (sysevent_socket_thread_loop == 0) {
717 pthread_mutex_unlock(&sysevent_thread_lock);
721 sysevent_socket_thread_loop = 0;
722 pthread_cond_broadcast(&sysevent_cond);
723 pthread_mutex_unlock(&sysevent_thread_lock);
728 // Since the thread is blocking, calling pthread_join
729 // doesn't actually succeed in stopping it. It will stick around
730 // until a message is received on the socket (at which
731 // it will realize that "sysevent_socket_thread_loop" is 0 and will
732 // break out of the read loop and be allowed to die). This is
733 // fine when the process isn't supposed to be exiting, but in
734 // the case of a process shutdown, we don't want to have an
735 // idle thread hanging around. Calling pthread_cancel here in
736 // the case of a shutdown is just assures that the thread is
737 // gone and that the process has been fully terminated.
739 DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
741 status = pthread_cancel(sysevent_socket_thread_id);
743 if (status != 0 && status != ESRCH) {
744 ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
750 status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
751 if (status != 0 && status != ESRCH) {
752 ERROR("sysevent plugin: Stopping socket thread failed.");
758 pthread_mutex_lock(&sysevent_thread_lock);
759 memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
760 sysevent_socket_thread_error = 0;
761 pthread_mutex_unlock(&sysevent_thread_lock);
763 DEBUG("sysevent plugin: Finished requesting stop of socket thread");
766 } /* }}} int stop_socket_thread */
768 static int stop_dequeue_thread(int shutdown) /* {{{ */
770 pthread_mutex_lock(&sysevent_thread_lock);
772 if (sysevent_dequeue_thread_loop == 0) {
773 pthread_mutex_unlock(&sysevent_thread_lock);
777 sysevent_dequeue_thread_loop = 0;
778 pthread_cond_broadcast(&sysevent_cond);
779 pthread_mutex_unlock(&sysevent_thread_lock);
784 // Since the thread is blocking, calling pthread_join
785 // doesn't actually succeed in stopping it. It will stick around
786 // until a message is received on the socket (at which
787 // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
788 // break out of the read loop and be allowed to die). This is
789 // fine when the process isn't supposed to be exiting, but in
790 // the case of a process shutdown, we don't want to have an
791 // idle thread hanging around. Calling pthread_cancel here in
792 // the case of a shutdown is just assures that the thread is
793 // gone and that the process has been fully terminated.
795 DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
797 status = pthread_cancel(sysevent_dequeue_thread_id);
799 if (status != 0 && status != ESRCH) {
800 ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
806 status = pthread_join(sysevent_dequeue_thread_id, /* return = */ NULL);
807 if (status != 0 && status != ESRCH) {
808 ERROR("sysevent plugin: Stopping dequeue thread failed.");
814 pthread_mutex_lock(&sysevent_thread_lock);
815 memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
816 pthread_mutex_unlock(&sysevent_thread_lock);
818 DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
821 } /* }}} int stop_dequeue_thread */
823 static int stop_threads(int shutdown) /* {{{ */
825 int status = stop_socket_thread(shutdown);
826 int status2 = stop_dequeue_thread(shutdown);
832 } /* }}} int stop_threads */
834 static int sysevent_init(void) /* {{{ */
838 ring.maxLen = buffer_length;
839 ring.buffer = (char **)calloc(buffer_length, sizeof(char *));
841 if (ring.buffer == NULL) {
842 ERROR("sysevent plugin: sysevent_init calloc failed");
846 for (int i = 0; i < buffer_length; i++) {
847 ring.buffer[i] = calloc(1, listen_buffer_size);
850 ring.timestamp = (long long unsigned int *)calloc(
851 buffer_length, sizeof(long long unsigned int));
854 const char *hostname = listen_ip;
855 const char *portname = listen_port;
856 struct addrinfo hints;
857 memset(&hints, 0, sizeof(hints));
858 hints.ai_family = AF_UNSPEC;
859 hints.ai_socktype = SOCK_DGRAM;
860 hints.ai_protocol = 0;
861 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
862 struct addrinfo *res = 0;
864 int err = getaddrinfo(hostname, portname, &hints, &res);
867 ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
873 sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
875 ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
880 if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
881 ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
889 DEBUG("sysevent plugin: socket created and bound");
891 return start_threads();
892 } /* }}} int sysevent_init */
894 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
896 if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
897 ci->values[1].type != OCONFIG_TYPE_STRING) {
898 ERROR("sysevent plugin: The `%s' config option needs "
899 "two string arguments (ip and port).",
904 listen_ip = strdup(ci->values[0].value.string);
905 listen_port = strdup(ci->values[1].value.string);
910 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
914 if (cf_util_get_int(ci, &tmp) != 0)
916 else if ((tmp >= 1024) && (tmp <= 65535))
917 listen_buffer_size = tmp;
920 "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
927 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
931 if (cf_util_get_int(ci, &tmp) != 0)
933 else if ((tmp >= 3) && (tmp <= 4096))
936 WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
943 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
945 if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
946 ERROR("sysevent plugin: The `%s' config option needs "
947 "one string argument, a regular expression.",
953 if (ignorelist == NULL)
954 ignorelist = ignorelist_create(/* invert = */ 1);
956 int status = ignorelist_add(ignorelist, ci->values[0].value.string);
959 ERROR("sysevent plugin: invalid regular expression: %s",
960 ci->values[0].value.string);
964 monitor_all_messages = 0;
966 WARNING("sysevent plugin: The plugin has been compiled without support "
967 "for the \"RegexFilter\" option.");
973 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
975 for (int i = 0; i < ci->children_num; i++) {
976 oconfig_item_t *child = ci->children + i;
978 if (strcasecmp("Listen", child->key) == 0)
979 sysevent_config_add_listen(child);
980 else if (strcasecmp("BufferSize", child->key) == 0)
981 sysevent_config_add_buffer_size(child);
982 else if (strcasecmp("BufferLength", child->key) == 0)
983 sysevent_config_add_buffer_length(child);
984 else if (strcasecmp("RegexFilter", child->key) == 0)
985 sysevent_config_add_regex_filter(child);
987 WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
992 } /* }}} int sysevent_config */
994 static void sysevent_dispatch_notification(const char *message,
998 long long unsigned int timestamp) {
1000 notification_t n = {NOTIF_OKAY, cdtime(), "", "", "sysevent",
1005 // If we have a parsed-JSON node to work with, use that
1008 const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
1009 yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
1011 char msg[listen_buffer_size];
1013 if (msg_v != NULL) {
1014 memset(msg, '\0', listen_buffer_size);
1015 snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
1019 const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
1021 yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
1023 char severity[listen_buffer_size];
1025 if (severity_v != NULL) {
1026 memset(severity, '\0', listen_buffer_size);
1027 snprintf(severity, listen_buffer_size, "%s%c",
1028 YAJL_GET_STRING(severity_v), '\0');
1032 const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
1034 yajl_val sev_num_str_v =
1035 yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
1037 char sev_num_str[listen_buffer_size];
1040 if (sev_num_str_v != NULL) {
1041 memset(sev_num_str, '\0', listen_buffer_size);
1042 snprintf(sev_num_str, listen_buffer_size, "%s%c",
1043 YAJL_GET_STRING(sev_num_str_v), '\0');
1045 sev_num = atoi(sev_num_str);
1048 n.severity = NOTIF_FAILURE;
1052 const char *process_path[] = {"@fields", rsyslog_field_keys[3],
1054 yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
1056 char process[listen_buffer_size];
1058 if (process_v != NULL) {
1059 memset(process, '\0', listen_buffer_size);
1060 snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
1065 const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
1066 yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
1068 char hostname_str[listen_buffer_size];
1070 if (hostname_v != NULL) {
1071 memset(hostname_str, '\0', listen_buffer_size);
1072 snprintf(hostname_str, listen_buffer_size, "%s%c",
1073 YAJL_GET_STRING(hostname_v), '\0');
1076 gen_message_payload(
1077 (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
1078 (sev_num_str_v != NULL ? sev_num : -1),
1079 (process_v != NULL ? process : NULL),
1080 (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
1082 // Data was not sent in JSON format, so just treat the whole log entry
1083 // as the message (and we'll be unable to acquire certain data, so the
1085 // generated below will be less informative)
1087 gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
1090 gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
1093 sstrncpy(n.host, hostname_g, sizeof(n.host));
1094 sstrncpy(n.type, "gauge", sizeof(n.type));
1096 notification_meta_t *m = calloc(1, sizeof(*m));
1100 ERROR("sysevent plugin: unable to allocate metadata: %s", STRERRNO);
1104 sstrncpy(m->name, "ves", sizeof(m->name));
1105 m->nm_value.nm_string = sstrdup(buf);
1106 m->type = NM_TYPE_STRING;
1109 DEBUG("sysevent plugin: notification message: %s",
1110 n.meta->nm_value.nm_string);
1112 DEBUG("sysevent plugin: dispatching message");
1114 plugin_dispatch_notification(&n);
1115 plugin_notification_meta_free(n.meta);
1117 // strdup'd in gen_message_payload
1122 static int sysevent_read(void) /* {{{ */
1124 pthread_mutex_lock(&sysevent_thread_lock);
1126 if (sysevent_socket_thread_error != 0) {
1127 pthread_mutex_unlock(&sysevent_thread_lock);
1129 ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
1132 sysevent_socket_thread_error);
1139 } /* if (sysevent_socket_thread_error != 0) */
1141 pthread_mutex_unlock(&sysevent_thread_lock);
1144 } /* }}} int sysevent_read */
1146 static int sysevent_shutdown(void) /* {{{ */
1148 DEBUG("sysevent plugin: Shutting down thread.");
1150 int status = stop_threads(1);
1154 status2 = close(sock);
1156 ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
1165 for (int i = 0; i < buffer_length; i++) {
1166 free(ring.buffer[i]);
1170 free(ring.timestamp);
1176 } /* }}} int sysevent_shutdown */
1178 void module_register(void) {
1179 plugin_register_complex_config("sysevent", sysevent_config);
1180 plugin_register_init("sysevent", sysevent_init);
1181 plugin_register_read("sysevent", sysevent_read);
1182 plugin_register_shutdown("sysevent", sysevent_shutdown);
1183 } /* void module_register */