2 * collectd - src/sysevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
34 #include <asm/types.h>
37 #include <netinet/in.h>
42 #include <sys/socket.h>
45 #include <yajl/yajl_common.h>
46 #include <yajl/yajl_gen.h>
48 #if HAVE_YAJL_YAJL_VERSION_H
49 #include <yajl/yajl_version.h>
51 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
52 #include <yajl/yajl_tree.h>
53 #define HAVE_YAJL_V2 1
/*
 * JSON keys (*_FIELD) and fixed values (*_VALUE) used by
 * gen_message_payload() in the section it marks as the common event header.
 * SYSEVENT_EVENT_NAME_VALUE and SYSEVENT_PRIORITY_VALUE_HIGH are not
 * referenced by any line visible in this listing.
 */
56 #define SYSEVENT_DOMAIN_FIELD "domain"
57 #define SYSEVENT_DOMAIN_VALUE "syslog"
58 #define SYSEVENT_EVENT_ID_FIELD "eventId"
59 #define SYSEVENT_EVENT_NAME_FIELD "eventName"
60 #define SYSEVENT_EVENT_NAME_VALUE "syslog message"
61 #define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
62 #define SYSEVENT_PRIORITY_FIELD "priority"
63 #define SYSEVENT_PRIORITY_VALUE_HIGH "high"
64 #define SYSEVENT_PRIORITY_VALUE_LOW "low"
65 #define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
66 #define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
67 #define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
68 #define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
69 #define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
70 #define SYSEVENT_SEQUENCE_FIELD "sequence"
71 #define SYSEVENT_SEQUENCE_VALUE "0"
72 #define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
73 #define SYSEVENT_SOURCE_NAME_VALUE "syslog"
74 #define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
75 #define SYSEVENT_VERSION_FIELD "version"
76 #define SYSEVENT_VERSION_VALUE "1.0"
/*
 * JSON keys and fixed values used by gen_message_payload() in the section it
 * marks as the syslog fields.
 */
78 #define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
79 #define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
80 #define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
81 #define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
82 #define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
83 #define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
84 #define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
85 #define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
86 #define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
87 #define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
88 #define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
/*
 * NOTE(review): the next line is a lone member declaration; the enclosing
 * type definition is not visible in this listing. It is presumably the
 * per-slot timestamp array of circbuf_t, since ring.timestamp is allocated
 * as an array of long long unsigned int in sysevent_init() - confirm.
 */
99 long long unsigned int *timestamp;
/* Regex filters registered through the RegexFilter option; NULL until the
 * first filter is added. */
105 static ignorelist_t *ignorelist = NULL;
/* Receiver thread control: the thread keeps looping while
 * sysevent_thread_loop is greater than zero; sysevent_thread_error is set by
 * the thread and inspected by sysevent_read(). */
107 static int sysevent_thread_loop = 0;
108 static int sysevent_thread_error = 0;
109 static pthread_t sysevent_thread_id;
/* Taken around accesses to the thread flags and to the ring buffer. */
110 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
/* Datagram socket created and bound in sysevent_init(). */
111 static int sock = -1;
/* Incremented once for every payload built by gen_message_payload(). */
112 static int event_id = 0;
/* Ring of received messages: filled by sysevent_thread(), drained by
 * sysevent_read(). */
113 static circbuf_t ring;
/* Address and port strings copied from the Listen option. */
115 static char *listen_ip;
116 static char *listen_port;
/* Size in bytes of each receive buffer and ring slot (BufferSize option,
 * accepted range 1024..65535). */
117 static int listen_buffer_size = 4096;
/* Number of ring slots allocated in sysevent_init() (BufferLength option). */
118 static int buffer_length = 10;
/* 1 while no regex filter is configured; cleared when a filter is added. */
120 static int monitor_all_messages = 1;
/* Top-level keys looked up in incoming JSON, and keys looked up below the
 * "@fields" object. Only some entries are used by the visible code. */
122 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
123 static const char *rsyslog_field_keys[5] = {
124 "facility", "severity", "severity-num", "program", "processid"};
/*
 * Builds the JSON text for one syslog event with the yajl generator and
 * stores a heap copy of it in *buf.
 *
 * msg       - message text, written as the "syslogMsg" value
 * sev       - severity string, written as the "syslogSev" value
 * sev_num   - numeric severity; NOTE(review): presumably selects the priority
 *             string, but the selecting statement is not visible here
 * process   - process name; "syslogProc" is only written when non-NULL
 * host      - host name, used in the event name and as "eventSourceHost"
 * timestamp - written as "startEpochMicrosec"
 * buf       - out: receives memory obtained from malloc() holding the text
 *
 * NOTE(review): the return statements and the failure jumps that follow each
 * generator check are not among the lines visible in this listing.
 */
130 static int gen_message_payload(const char *msg, char *sev, int sev_num,
131 char *process, char *host,
132 long long unsigned int timestamp, char **buf) {
133 const unsigned char *buf2;
// Scratch buffer reused for every formatted number and for the event name.
135 char json_str[DATA_MAX_NAME_LEN];
// yajl 1.x takes a config struct at allocation time; yajl 2.x is configured
// after allocation.
137 #if !defined(HAVE_YAJL_V2)
138 yajl_gen_config conf = {};
145 g = yajl_gen_alloc(NULL);
146 yajl_gen_config(g, yajl_gen_beautify, 0);
149 g = yajl_gen_alloc(&conf, NULL);
154 // *** BEGIN common event header ***
156 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
160 if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
161 strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
164 if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
165 strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
169 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
170 strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
// eventId comes from the file-scope counter, bumped once per payload.
173 event_id = event_id + 1;
// The snprintf bound below is sizeof(int) * 4 + 1, computed independently of
// the size of json_str.
174 int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
175 memset(json_str, '\0', DATA_MAX_NAME_LEN);
176 snprintf(json_str, event_id_len, "%d", event_id);
178 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
183 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
184 strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
// NOTE(review): event_name_len is derived from strlen(host) plus 22 and is
// then passed to snprintf as the size of json_str, which only holds
// DATA_MAX_NAME_LEN bytes. If the sum exceeds DATA_MAX_NAME_LEN, snprintf is
// allowed to write past json_str; sizeof(json_str) would be the safe bound.
// Part of this computation is not visible here - confirm.
187 int event_name_len = 0;
188 event_name_len = event_name_len + strlen(host); // host name
191 22; // "host", "rsyslog", "message", 3 spaces and null-terminator
192 memset(json_str, '\0', DATA_MAX_NAME_LEN);
193 snprintf(json_str, event_name_len, "host %s rsyslog message", host);
195 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
196 yajl_gen_status_ok) {
201 if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
202 strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
// lastEpochMicrosec is the current time from cdtime(), in microseconds.
206 int last_epoch_microsec_len =
207 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
208 memset(json_str, '\0', DATA_MAX_NAME_LEN);
209 snprintf(json_str, last_epoch_microsec_len, "%llu",
210 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
212 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
217 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
218 strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
// Four alternative priority values follow (medium, normal, low, unknown).
// NOTE(review): the statement choosing between them is not visible here, and
// no visible line emits SYSEVENT_PRIORITY_VALUE_HIGH.
223 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
224 strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
229 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
230 strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
236 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
237 strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
242 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
243 strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
249 // reportingEntityName
250 if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
251 strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
255 if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
256 strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
261 if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
262 strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
// The sequence value is always the constant SYSEVENT_SEQUENCE_VALUE, passed
// through yajl_gen_number rather than yajl_gen_string.
265 if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
266 strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
270 if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
271 strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
274 if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
275 strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
278 // startEpochMicrosec
279 if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
280 strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
// startEpochMicrosec is the timestamp argument supplied by the caller.
284 int start_epoch_microsec_len =
285 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
286 memset(json_str, '\0', DATA_MAX_NAME_LEN);
287 snprintf(json_str, start_epoch_microsec_len, "%llu",
288 (long long unsigned int)timestamp);
290 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
295 if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
296 strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
// The version is passed through yajl_gen_number, not yajl_gen_string.
299 if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
300 strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
303 // *** END common event header ***
305 // *** BEGIN syslog fields ***
// The syslogFields key is followed by a nested map.
307 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
308 strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
312 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
316 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
317 strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
321 if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
325 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
326 strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
330 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
331 strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
335 // syslogFieldsVersion
336 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
337 strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
341 if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
342 strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
// NOTE(review): a map is closed here, before syslogMsg, syslogProc,
// syslogSev and syslogTag are generated. With only two map_open calls
// visible, those keys would land in the outer map rather than inside
// syslogFields - confirm this is intended.
346 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
351 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
352 strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
// NOTE(review): strlen(msg) requires msg to be non-NULL, yet
// sysevent_dispatch_notification can pass NULL for it. Any guard is not
// visible in this listing - confirm.
356 if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
361 if (process != NULL) {
362 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
363 strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
367 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
374 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
375 strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
// NOTE(review): strlen(sev) requires sev to be non-NULL, yet callers pass
// NULL for sev. Any guard is not visible in this listing - confirm.
379 if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
384 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
385 strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
388 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
389 strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
392 // *** END syslog fields ***
394 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
// Fetch the generated text and copy it into a fresh allocation for the
// caller.
// NOTE(review): the malloc result is not checked in the visible lines, and
// the copy length is recomputed with strlen instead of using len.
397 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
400 *buf = malloc(strlen((char *)buf2) + 1);
402 sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
// Failure path message; presumably reached from the failed checks above.
410 ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
/*
 * Receiver thread body. While sysevent_thread_loop stays above zero it reads
 * one datagram from sock into a local buffer and copies it, together with
 * the current time, into the ring slot at ring.head. Flags and ring are
 * accessed with sysevent_lock held; the lock is released around the receive
 * call. arg is not used by the visible code.
 */
414 static void *sysevent_thread(void *arg) /* {{{ */
416 pthread_mutex_lock(&sysevent_lock);
418 while (sysevent_thread_loop > 0) {
421 pthread_mutex_unlock(&sysevent_lock);
// Variable-length array sized by the BufferSize setting, zeroed before each
// receive.
426 char buffer[listen_buffer_size];
427 struct sockaddr_storage src_addr;
428 socklen_t src_addr_len = sizeof(src_addr);
430 memset(buffer, '\0', listen_buffer_size);
// NOTE(review): when a datagram fills the whole buffer, count equals
// sizeof(buffer) and no terminating NUL remains. The branch below only
// warns, and the later strncpy with the same size does not terminate the
// ring slot either, while readers treat the slot as a C string. Handling may
// exist in lines not visible here - confirm.
432 ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
433 (struct sockaddr *)&src_addr, &src_addr_len);
436 ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
438 } else if (count >= sizeof(buffer)) {
439 WARNING("sysevent plugin: datagram too large for buffer: truncated");
442 // 2. Push to buffer if there is room, otherwise raise warning
444 pthread_mutex_lock(&sysevent_lock);
// next is the slot after head; the ring counts as full when that slot is the
// tail, so one slot always stays unused.
446 int next = ring.head + 1;
447 if (next >= ring.maxLen)
450 if (next == ring.tail) {
451 WARNING("sysevent plugin: ring buffer full");
453 DEBUG("sysevent plugin: writing %s", buffer);
455 strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
456 ring.timestamp[ring.head] =
457 (long long unsigned int)CDTIME_T_TO_US(cdtime());
461 pthread_mutex_unlock(&sysevent_lock);
466 pthread_mutex_lock(&sysevent_lock);
// NOTE(review): status is declared and assigned in lines not visible here.
469 WARNING("sysevent plugin: problem with thread status: %d", status);
470 sysevent_thread_error = 1;
474 if (sysevent_thread_loop <= 0)
476 } /* while (sysevent_thread_loop > 0) */
478 pthread_mutex_unlock(&sysevent_lock);
480 // pthread_exit instead of return?
482 } /* }}} void *sysevent_thread */
/*
 * Starts the receiver thread unless sysevent_thread_loop says one is already
 * running. The flags are updated and the thread is created with
 * sysevent_lock held. If plugin_thread_create fails the loop flag is reset
 * and an error is logged.
 * NOTE(review): the return statements, the status declaration and the thread
 * function argument are not among the lines visible in this listing.
 */
484 static int start_thread(void) /* {{{ */
488 pthread_mutex_lock(&sysevent_lock);
// A non-zero loop flag means a thread was already started.
490 if (sysevent_thread_loop != 0) {
491 pthread_mutex_unlock(&sysevent_lock);
495 sysevent_thread_loop = 1;
496 sysevent_thread_error = 0;
498 DEBUG("sysevent plugin: starting thread");
500 status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
502 /* arg = */ (void *)0, "sysevent");
// Creation failed: clear the flag again so a later start can retry.
504 sysevent_thread_loop = 0;
505 ERROR("sysevent plugin: starting thread failed.");
506 pthread_mutex_unlock(&sysevent_lock);
510 pthread_mutex_unlock(&sysevent_lock);
512 } /* }}} int start_thread */
/*
 * Asks the receiver thread to stop by clearing sysevent_thread_loop under
 * sysevent_lock, then cancels or joins it and finally resets the stored
 * thread id and the error flag.
 *
 * shutdown - sysevent_shutdown() passes 1. NOTE(review): the test on this
 *            parameter that chooses between pthread_cancel and pthread_join
 *            is not visible here; the comment below describes the intent.
 */
514 static int stop_thread(int shutdown) /* {{{ */
518 pthread_mutex_lock(&sysevent_lock);
// Nothing to stop when the loop flag is already clear.
520 if (sysevent_thread_loop == 0) {
521 pthread_mutex_unlock(&sysevent_lock);
525 sysevent_thread_loop = 0;
526 pthread_mutex_unlock(&sysevent_lock);
529 // Since the thread is blocking, calling pthread_join
530 // doesn't actually succeed in stopping it. It will stick around
531 // until a message is received on the socket (at which point
532 // it will realize that "sysevent_thread_loop" is 0 and will
533 // break out of the read loop and be allowed to die). This is
534 // fine when the process isn't supposed to be exiting, but in
535 // the case of a process shutdown, we don't want to have an
536 // idle thread hanging around. Calling pthread_cancel here in
537 // the case of a shutdown just ensures that the thread is
538 // gone and that the process has been fully terminated.
540 DEBUG("sysevent plugin: Canceling thread for process shutdown");
542 status = pthread_cancel(sysevent_thread_id);
545 ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
550 status = pthread_join(sysevent_thread_id, /* return = */ NULL);
552 ERROR("sysevent plugin: Stopping thread failed.");
// Forget the old thread id and clear the error flag under the lock.
557 pthread_mutex_lock(&sysevent_lock);
558 memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
559 sysevent_thread_error = 0;
560 pthread_mutex_unlock(&sysevent_lock);
562 DEBUG("sysevent plugin: Finished requesting stop of thread");
565 } /* }}} int stop_thread */
/*
 * Plugin init callback: allocates the ring buffer, resolves the configured
 * listen address, creates and binds a datagram socket, then starts the
 * receiver thread and returns the result of start_thread().
 * NOTE(review): the error returns after the ERROR calls, and any
 * initialisation of ring.head and ring.tail, are not visible in this
 * listing.
 */
567 static int sysevent_init(void) /* {{{ */
// buffer_length slots of listen_buffer_size bytes each, plus one timestamp
// per slot.
// NOTE(review): none of the malloc results is checked in the visible lines.
571 ring.maxLen = buffer_length;
572 ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
574 for (int i = 0; i < buffer_length; i++) {
575 ring.buffer[i] = malloc(listen_buffer_size);
578 ring.timestamp = (long long unsigned int *)malloc(
579 buffer_length * sizeof(long long unsigned int));
// listen_ip and listen_port are only assigned by the Listen option; both are
// NULL here when that option was never given.
582 const char *hostname = listen_ip;
583 const char *portname = listen_port;
584 struct addrinfo hints;
585 memset(&hints, 0, sizeof(hints));
586 hints.ai_family = AF_UNSPEC;
587 hints.ai_socktype = SOCK_DGRAM;
588 hints.ai_protocol = 0;
589 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
590 struct addrinfo *res = 0;
592 int err = getaddrinfo(hostname, portname, &hints, &res);
595 ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
// Only the first entry of the result list is tried.
// NOTE(review): no release of res is visible in this listing - confirm it is
// freed on every path.
601 sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
603 ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
608 if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
609 ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
617 DEBUG("sysevent plugin: socket created and bound");
619 return (start_thread());
620 } /* }}} int sysevent_init */
/*
 * Handles the Listen option: requires exactly two string values and stores
 * copies of them in listen_ip and listen_port.
 * NOTE(review): the strdup results are not checked in the visible lines, and
 * earlier copies are not released if the option is given more than once.
 * The return statements are not visible in this listing.
 */
622 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
624 if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
625 ci->values[1].type != OCONFIG_TYPE_STRING) {
626 ERROR("sysevent plugin: The `%s' config option needs "
627 "two string arguments (ip and port).",
632 listen_ip = strdup(ci->values[0].value.string);
633 listen_port = strdup(ci->values[1].value.string);
/*
 * Handles the BufferSize option: parses an integer with cf_util_get_int and
 * stores it in listen_buffer_size when it lies in 1024..65535 inclusive;
 * otherwise the message below is logged.
 * NOTE(review): the declaration of tmp, the logging call that owns the
 * message and the return statements are not visible in this listing.
 */
638 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
642 if (cf_util_get_int(ci, &tmp) != 0)
644 else if ((tmp >= 1024) && (tmp <= 65535))
645 listen_buffer_size = tmp;
648 "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
/*
 * Handles the BufferLength option: parses an integer with cf_util_get_int
 * and accepts values in 3..4096 inclusive; anything else produces the
 * warning below.
 * NOTE(review): the statement that stores the accepted value (presumably
 * into buffer_length) and the return statements are not visible here. The
 * warning spells the option `Bufferlength' while sysevent_config matches it
 * as "BufferLength".
 */
655 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
659 if (cf_util_get_int(ci, &tmp) != 0)
661 else if ((tmp >= 3) && (tmp <= 4096))
664 WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
/*
 * Handles the RegexFilter option: requires exactly one string value, creates
 * the ignorelist on first use, adds the expression to it and switches off
 * monitor_all_messages so that only matching messages are dispatched.
 * NOTE(review): the final warning presumably belongs to a build variant
 * without regex support; the preprocessor lines selecting it and the return
 * statements are not visible in this listing.
 */
671 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
673 if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
674 ERROR("sysevent plugin: The `%s' config option needs "
675 "one string argument, a regular expression.",
// The list is created lazily, with the invert flag set to 1.
681 if (ignorelist == NULL)
682 ignorelist = ignorelist_create(/* invert = */ 1);
684 int status = ignorelist_add(ignorelist, ci->values[0].value.string);
687 ERROR("sysevent plugin: invalid regular expression: %s",
688 ci->values[0].value.string);
// From now on sysevent_read() consults the ignorelist for every message.
692 monitor_all_messages = 0;
694 WARNING("sysevent plugin: The plugin has been compiled without support "
695 "for the \"RegexFilter\" option.");
/*
 * Complex-config callback: walks the children of the plugin block and hands
 * each one to the handler matching its key. Keys are compared without regard
 * to case, and an unknown key only produces a warning. The values returned
 * by the handlers are discarded.
 * NOTE(review): the return statement is not visible in this listing.
 */
701 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
703 for (int i = 0; i < ci->children_num; i++) {
704 oconfig_item_t *child = ci->children + i;
706 if (strcasecmp("Listen", child->key) == 0)
707 sysevent_config_add_listen(child);
708 else if (strcasecmp("BufferSize", child->key) == 0)
709 sysevent_config_add_buffer_size(child);
710 else if (strcasecmp("BufferLength", child->key) == 0)
711 sysevent_config_add_buffer_length(child);
712 else if (strcasecmp("RegexFilter", child->key) == 0)
713 sysevent_config_add_regex_filter(child);
715 WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
720 } /* }}} int sysevent_config */
/*
 * Builds a notification for one received message, attaches the JSON payload
 * from gen_message_payload() as a string metadata entry named "ves", and
 * dispatches it.
 *
 * message   - raw message text, used when no parsed JSON node is available
 * timestamp - receive time recorded by the receiver thread
 *
 * NOTE(review): the body also dereferences a parameter named node (a parsed
 * yajl tree); its declaration falls in signature lines that are not visible
 * in this listing.
 */
722 static void sysevent_dispatch_notification(const char *message,
726 long long unsigned int timestamp) {
728 notification_t n = {NOTIF_OKAY, cdtime(), "", "", "sysevent",
733 // If we have a parsed-JSON node to work with, use that
// Five variable-length arrays of listen_buffer_size bytes each; BufferSize
// may be as large as 65535.
735 char process[listen_buffer_size];
736 char severity[listen_buffer_size];
737 char sev_num_str[listen_buffer_size];
738 char msg[listen_buffer_size];
739 char hostname_str[listen_buffer_size];
// Each field below is looked up in the tree and, when found, copied into its
// local buffer; the trailing %c argument writes an explicit NUL.
743 const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
744 yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
747 memset(msg, '\0', listen_buffer_size);
748 snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
752 const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
754 yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
756 if (severity_v != NULL) {
757 memset(severity, '\0', listen_buffer_size);
758 snprintf(severity, listen_buffer_size, "%s%c",
759 YAJL_GET_STRING(severity_v), '\0');
763 const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
765 yajl_val sev_num_str_v =
766 yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
768 if (sev_num_str_v != NULL) {
769 memset(sev_num_str, '\0', listen_buffer_size);
770 snprintf(sev_num_str, listen_buffer_size, "%s%c",
771 YAJL_GET_STRING(sev_num_str_v), '\0');
// atoi reports no conversion errors; non-numeric text yields 0.
// NOTE(review): the condition guarding the NOTIF_FAILURE assignment is not
// visible in this listing.
773 sev_num = atoi(sev_num_str);
776 n.severity = NOTIF_FAILURE;
780 const char *process_path[] = {"@fields", rsyslog_field_keys[3],
782 yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
784 if (process_v != NULL) {
785 memset(process, '\0', listen_buffer_size);
786 snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
791 const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
792 yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
794 if (hostname_v != NULL) {
795 memset(hostname_str, '\0', listen_buffer_size);
796 snprintf(hostname_str, listen_buffer_size, "%s%c",
797 YAJL_GET_STRING(hostname_v), '\0');
// Arguments of the payload call: fields missing from the JSON are passed as
// NULL (or -1 for the severity number), and the host falls back to
// hostname_g.
801 (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
802 (sev_num_str_v != NULL ? sev_num : -1),
803 (process_v != NULL ? process : NULL),
804 (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
806 // Data was not sent in JSON format, so just treat the whole log entry
807 // as the message (and we'll be unable to acquire certain data, so the
809 // generated below will be less informative)
// NOTE(review): two identical fallback calls follow; they presumably sit in
// different branches or build variants whose delimiters are not visible.
811 gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
814 gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
817 sstrncpy(n.host, hostname_g, sizeof(n.host));
818 sstrncpy(n.type, "gauge", sizeof(n.type));
// One metadata entry carries the whole JSON payload as a string.
820 notification_meta_t *m = calloc(1, sizeof(*m));
825 ERROR("sysevent plugin: unable to allocate metadata: %s",
826 sstrerror(errno, errbuf, sizeof(errbuf)));
// NOTE(review): no visible line checks the result of gen_message_payload
// before buf is duplicated here - confirm buf is valid on every path.
830 sstrncpy(m->name, "ves", sizeof(m->name));
831 m->nm_value.nm_string = sstrdup(buf);
832 m->type = NM_TYPE_STRING;
835 DEBUG("sysevent plugin: notification message: %s",
836 n.meta->nm_value.nm_string);
838 DEBUG("sysevent plugin: dispatching message");
// The metadata is released right after the dispatch call returns.
840 plugin_dispatch_notification(&n);
841 plugin_notification_meta_free(n.meta);
843 // malloc'd in gen_message_payload
/*
 * Read callback: reports a receiver thread error if one was flagged, then
 * drains the ring buffer from tail to head while holding sysevent_lock. Each
 * entry is parsed as JSON where possible, checked against the regex filters
 * when any are configured, and handed to sysevent_dispatch_notification().
 * NOTE(review): the restart logic after the error message, the statement
 * advancing ring.tail, the preprocessor lines separating build variants and
 * the return statement are not visible in this listing.
 */
848 static int sysevent_read(void) /* {{{ */
850 if (sysevent_thread_error != 0) {
851 ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
853 sysevent_thread_error);
860 } /* if (sysevent_thread_error != 0) */
// The lock stays held for the whole drain loop, including the dispatch
// calls.
862 pthread_mutex_lock(&sysevent_lock);
864 while (ring.head != ring.tail) {
865 long long unsigned int timestamp;
867 char *match_str = NULL;
868 int next = ring.tail + 1;
870 if (next >= ring.maxLen)
873 DEBUG("sysevent plugin: reading from ring buffer: %s",
874 ring.buffer[ring.tail]);
876 timestamp = ring.timestamp[ring.tail];
879 // Try to parse JSON, and if it fails, fall back to plain string
880 yajl_val node = NULL;
883 node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
889 // If we have any regex filters, we need to see if the message portion of
890 // the data matches any of them (otherwise we're not interested)
// NOTE(review): json_val is declared inside the block opened by the next
// line, but match_str is consulted further down, apparently after that block
// has closed; the pointer would then refer to an object whose lifetime has
// ended. Also, v is not checked for NULL in the visible lines before
// YAJL_GET_STRING(v) is formatted - confirm both.
891 if (monitor_all_messages == 0) {
892 char json_val[listen_buffer_size];
893 const char *path[] = {"@message", (const char *)0};
894 yajl_val v = yajl_tree_get(node, path, yajl_t_string);
896 memset(json_val, '\0', listen_buffer_size);
898 snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
901 match_str = (char *)&json_val;
904 // non-JSON rsyslog data
906 // If we have any regex filters, we need to see if the message data
907 // matches any of them (otherwise we're not interested)
908 if (monitor_all_messages == 0)
909 match_str = ring.buffer[ring.tail];
912 // If we have any regex filters, we need to see if the message data
913 // matches any of them (otherwise we're not interested)
914 if (monitor_all_messages == 0)
915 match_str = ring.buffer[ring.tail];
918 // If we care about matching, do that comparison here
919 if (match_str != NULL) {
922 if (ignorelist_match(ignorelist, match_str) != 0)
925 DEBUG("sysevent plugin: regex filter match");
// Parsed entries are dispatched through the tree, which is freed afterwards;
// unparsed entries are dispatched as raw text.
929 if (is_match == 1 && node != NULL) {
930 sysevent_dispatch_notification(NULL, &node, timestamp);
931 yajl_tree_free(node);
932 } else if (is_match == 1)
933 sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
// NOTE(review): two-argument call, presumably for the build variant without
// yajl tree support.
936 sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
942 pthread_mutex_unlock(&sysevent_lock);
945 } /* }}} int sysevent_read */
/*
 * Shutdown callback: stops the receiver thread in shutdown mode, closes the
 * socket and releases the ring buffer slots and the timestamp array.
 * NOTE(review): the status handling between these steps, the release of
 * ring.buffer itself and the return statement are not visible in this
 * listing.
 */
947 static int sysevent_shutdown(void) /* {{{ */
951 DEBUG("sysevent plugin: Shutting down thread.");
// Passing 1 requests the shutdown behaviour of stop_thread().
952 if (stop_thread(1) < 0)
956 status = close(sock);
958 ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
// Frees as many slots as buffer_length currently says.
968 for (int i = 0; i < buffer_length; i++) {
969 free(ring.buffer[i]);
973 free(ring.timestamp);
976 } /* }}} int sysevent_shutdown */
/*
 * Plugin entry point: registers the config, init, read and shutdown
 * callbacks of this file under the plugin name "sysevent".
 */
978 void module_register(void) {
979 plugin_register_complex_config("sysevent", sysevent_config);
980 plugin_register_init("sysevent", sysevent_init);
981 plugin_register_read("sysevent", sysevent_read);
982 plugin_register_shutdown("sysevent", sysevent_shutdown);
983 } /* void module_register */