2 * collectd - src/sysevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
30 #include "utils/common/common.h"
31 #include "utils/ignorelist/ignorelist.h"
32 #include "utils_complain.h"
36 #include <netinet/in.h>
41 #include <sys/socket.h>
44 #include <yajl/yajl_common.h>
45 #include <yajl/yajl_gen.h>
47 #if HAVE_YAJL_YAJL_VERSION_H
48 #include <yajl/yajl_version.h>
50 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
51 #include <yajl/yajl_tree.h>
52 #define HAVE_YAJL_V2 1
/*
 * JSON key names and fixed values used by gen_message_payload() when it
 * builds the notification payload.
 */

/* Common event header: keys and constant values. */
#define SYSEVENT_DOMAIN_FIELD "domain"
#define SYSEVENT_DOMAIN_VALUE "syslog"
#define SYSEVENT_EVENT_ID_FIELD "eventId"
#define SYSEVENT_EVENT_NAME_FIELD "eventName"
#define SYSEVENT_EVENT_NAME_VALUE "syslog message"
#define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
#define SYSEVENT_PRIORITY_FIELD "priority"
#define SYSEVENT_PRIORITY_VALUE_HIGH "high"
#define SYSEVENT_PRIORITY_VALUE_LOW "low"
#define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
#define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
#define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
#define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
#define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
#define SYSEVENT_SEQUENCE_FIELD "sequence"
#define SYSEVENT_SEQUENCE_VALUE "0"
#define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
#define SYSEVENT_SOURCE_NAME_VALUE "syslog"
#define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
#define SYSEVENT_VERSION_FIELD "version"
#define SYSEVENT_VERSION_VALUE "1.0"

/* Nested "syslogFields" object: keys and constant values. */
#define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
#define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
#define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
#define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
#define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
#define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
#define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
#define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
#define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
#define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
#define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
/* Regex filter list. Created on the first RegexFilter option and matched
 * against incoming messages by read_ring_buffer() when monitor_all_messages
 * is 0. */
105 static ignorelist_t *ignorelist = NULL;
/* Run flags, error flag and thread ids of the socket and dequeue threads.
 * The flags are read and written while holding sysevent_thread_lock. */
107 static int sysevent_socket_thread_loop = 0;
108 static int sysevent_socket_thread_error = 0;
109 static pthread_t sysevent_socket_thread_id;
110 static int sysevent_dequeue_thread_loop = 0;
111 static pthread_t sysevent_dequeue_thread_id;
112 static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
/* sysevent_data_lock guards `ring`; sysevent_cond is signalled by the socket
 * thread when the ring holds data and waited on by the dequeue thread. */
113 static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
114 static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
/* Listening datagram socket; -1 while not open. */
115 static int sock = -1;
/* Counter emitted as "eventId"; incremented once per generated payload. */
116 static int event_id = 0;
/* Ring buffer shared between the socket thread and the dequeue thread. */
117 static circbuf_t ring;
/* Address and port from the Listen option (strdup'd strings). */
119 static char *listen_ip;
120 static char *listen_port;
/* Size in bytes of each receive buffer / ring slot (BufferSize option,
 * accepted range 1024..65535). */
121 static int listen_buffer_size = 4096;
/* Number of slots in the ring buffer (BufferLength option, accepted range
 * 3..4096). */
122 static int buffer_length = 10;
/* 1 = dispatch every message; set to 0 once a RegexFilter is configured. */
124 static int monitor_all_messages = 1;
/* Keys looked up in JSON-formatted rsyslog input: top-level keys, and keys
 * nested under "@fields". */
127 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
128 static const char *rsyslog_field_keys[5] = {
129 "facility", "severity", "severity-num", "program", "processid"};
/*
 * Builds the JSON payload for one syslog message with the yajl generator and
 * stores a strdup()'d copy of the generated text through `buf`.
 *
 *   msg       - message text, emitted as "syslogMsg"
 *   sev       - severity text, emitted as "syslogSev"
 *   sev_num   - numeric severity; presumably selects which of the "priority"
 *               values below is emitted (the selecting statement is not
 *               visible in this view)
 *   process   - emitted as "syslogProc" only when non-NULL
 *   host      - emitted as "eventSourceHost" and embedded in "eventName"
 *   timestamp - emitted as "startEpochMicrosec", converted to microseconds
 *
 * NOTE(review): callers pass NULL for `sev` (and possibly `msg`), while
 * strlen(msg) and strlen(sev) are called below; the guarding conditions are
 * not visible here -- confirm both are NULL-checked before use.
 */
136 static int gen_message_payload(const char *msg, char *sev, int sev_num,
137 char *process, char *host, cdtime_t timestamp,
139 const unsigned char *buf2;
/* Scratch buffer reused for every formatted number/string value. */
141 char json_str[DATA_MAX_NAME_LEN];
/* yajl 1.x and 2.x allocate and configure the generator differently. */
143 #if !defined(HAVE_YAJL_V2)
144 yajl_gen_config conf = {0};
149 g = yajl_gen_alloc(NULL);
150 yajl_gen_config(g, yajl_gen_beautify, 0);
153 g = yajl_gen_alloc(&conf, NULL);
158 // *** BEGIN common event header ***
160 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
/* "domain": "syslog" */
164 if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
165 strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
168 if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
169 strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
/* "eventId": the file-scope counter, bumped for every payload. */
173 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
174 strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
177 event_id = event_id + 1;
178 snprintf(json_str, sizeof(json_str), "%d", event_id);
180 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
/* "eventName": "host <host> rsyslog message" (SYSEVENT_EVENT_NAME_VALUE is
 * not used here). */
185 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
186 strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
189 snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host);
191 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
192 yajl_gen_status_ok) {
/* "lastEpochMicrosec": the current time, not the message timestamp. */
197 if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
198 strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
202 snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime()));
204 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
/* "priority": one of medium / normal / low / unknown. */
209 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
210 strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
215 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
216 strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
221 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
222 strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
228 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
229 strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
234 if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
235 strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
241 // reportingEntityName
242 if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
243 strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
247 if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
248 strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
/* "sequence": constant 0, emitted as a JSON number. */
253 if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
254 strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
257 if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
258 strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
/* "sourceName": "syslog" */
262 if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
263 strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
266 if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
267 strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
270 // startEpochMicrosec
271 if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
272 strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
276 snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp));
278 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
/* "version": written with yajl_gen_number, so "1.0" appears unquoted. */
283 if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
284 strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
287 if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
288 strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
291 // *** END common event header ***
293 // *** BEGIN syslog fields ***
/* "syslogFields": nested map holding the syslog-specific values. */
295 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
296 strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
300 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
/* "eventSourceHost": <host> */
304 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
305 strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
309 if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
/* "eventSourceType": "host" */
313 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
314 strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
318 if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
319 strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
323 // syslogFieldsVersion
324 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
325 strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
329 if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
330 strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
/* "syslogMsg": <msg> */
336 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
337 strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
341 if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
/* "syslogProc" is optional: skipped when no process name is known. */
346 if (process != NULL) {
347 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
348 strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
352 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
/* "syslogSev": <sev> */
359 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
360 strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
364 if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
/* "syslogTag": constant "NILVALUE" */
369 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
370 strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
373 if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
374 strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
377 // *** END syslog fields ***
379 // close syslog and header fields
380 if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
381 yajl_gen_map_close(g) != yajl_gen_status_ok)
/* Fetch the generator's buffer and hand the caller its own heap copy. */
384 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
387 *buf = strdup((char *)buf2);
390 ERROR("sysevent plugin: gen_message_payload strdup failed");
400 ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
/*
 * Receives datagrams from `sock` and stores each one, together with its
 * arrival time, in the shared ring buffer for the dequeue thread.
 *
 * The receive starts out non-blocking (MSG_DONTWAIT). When nothing is pending
 * (EAGAIN/EWOULDBLOCK) the dequeue thread is signalled if the ring holds
 * data; EINTR is retried and any other receive error is logged.
 *
 * NOTE(review): the loop construct, the return statements and several
 * assignments of this function are not visible in this view, so exact return
 * values are not documented here.
 */
404 static int read_socket() {
405 int recv_flags = MSG_DONTWAIT;
408 struct sockaddr_storage src_addr;
409 socklen_t src_addr_len = sizeof(src_addr);
/* Stack buffer (VLA) of listen_buffer_size bytes, zeroed before the receive. */
411 char buffer[listen_buffer_size];
412 memset(buffer, '\0', listen_buffer_size);
414 ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
415 (struct sockaddr *)&src_addr, &src_addr_len);
418 if (errno == EAGAIN || errno == EWOULDBLOCK) {
419 pthread_mutex_lock(&sysevent_data_lock);
421 // There was nothing more to receive for now, so...
422 // If ring head does not equal ring tail, there is data
423 // in the ring buffer for the dequeue thread to read, so
425 if (ring.head != ring.tail)
426 pthread_cond_signal(&sysevent_cond);
428 pthread_mutex_unlock(&sysevent_data_lock);
430 // Since there was nothing to receive, set recv to block and
434 } else if (errno != EINTR) {
435 ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
438 // Interrupt, so continue and try again
/* NOTE(review): `count` is ssize_t and sizeof(buffer) is size_t, so this is a
 * signed/unsigned comparison; it is only safe if negative counts were handled
 * above (that check is not visible here). When count equals sizeof(buffer)
 * the buffer holds no terminating NUL, yet it is printed with "%s" below. */
443 if (count >= sizeof(buffer)) {
444 WARNING("sysevent plugin: datagram too large for buffer: truncated");
447 // We successfully received a message, so don't block on the next
448 // read in case there are more (and if there aren't, it will be
449 // handled above in the EWOULDBLOCK error-checking)
450 recv_flags = MSG_DONTWAIT;
452 // 1. Acquire data lock
453 // 2. Push to buffer if there is room, otherwise raise warning
454 // and allow dequeue thread to take over
456 pthread_mutex_lock(&sysevent_data_lock);
/* Candidate head position after this write; compared against ring.maxLen for
 * wrap-around. */
458 int next = ring.head + 1;
459 if (next >= ring.maxLen)
/* One slot is always left unused so that head == tail means "empty". */
462 if (next == ring.tail) {
463 // Buffer is full, signal the dequeue thread to process the buffer
464 // and clean it out, and then sleep
465 WARNING("sysevent plugin: ring buffer full");
467 pthread_cond_signal(&sysevent_cond);
468 pthread_mutex_unlock(&sysevent_data_lock);
473 DEBUG("sysevent plugin: writing %s", buffer);
/* Each ring slot is allocated with listen_buffer_size bytes in
 * sysevent_init(), the same size as `buffer`. */
475 sstrncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
476 ring.timestamp[ring.head] = cdtime();
480 pthread_mutex_unlock(&sysevent_data_lock);
/*
 * Builds a collectd notification for one syslog entry, attaches the generated
 * JSON payload as the "ves" metadata string and dispatches it.
 *
 * When a parsed JSON tree is available, the message, severity, severity
 * number, process and source host are read from it; otherwise the raw
 * `message` string is used as the message and the remaining fields are left
 * unset (NULL / -1, host = hostname_g).
 *
 * NOTE(review): up to five listen_buffer_size-sized VLAs live on this stack
 * frame at once (listen_buffer_size may be configured up to 65535).
 */
484 static void sysevent_dispatch_notification(const char *message,
488 cdtime_t timestamp) {
492 .severity = NOTIF_OKAY,
494 .plugin = "sysevent",
500 // If we have a parsed-JSON node to work with, use that
/* "@message" */
502 const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
503 yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
505 char msg[listen_buffer_size];
/* The explicit "%c" / '\0' pair is redundant: snprintf already terminates. */
508 memset(msg, '\0', listen_buffer_size);
509 snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
/* "@fields" -> "severity" */
513 const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
515 yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
517 char severity[listen_buffer_size];
519 if (severity_v != NULL) {
520 memset(severity, '\0', listen_buffer_size);
521 snprintf(severity, listen_buffer_size, "%s%c",
522 YAJL_GET_STRING(severity_v), '\0');
/* "@fields" -> "severity-num", looked up as a string and converted below. */
526 const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
528 yajl_val sev_num_str_v =
529 yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
531 char sev_num_str[listen_buffer_size];
534 if (sev_num_str_v != NULL) {
535 memset(sev_num_str, '\0', listen_buffer_size);
536 snprintf(sev_num_str, listen_buffer_size, "%s%c",
537 YAJL_GET_STRING(sev_num_str_v), '\0');
/* atoi() yields 0 for non-numeric text; no error is reported. */
539 sev_num = atoi(sev_num_str);
/* NOTE(review): the condition guarding this assignment is not visible here. */
542 n.severity = NOTIF_FAILURE;
/* "@fields" -> "program" */
546 const char *process_path[] = {"@fields", rsyslog_field_keys[3],
548 yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
550 char process[listen_buffer_size];
552 if (process_v != NULL) {
553 memset(process, '\0', listen_buffer_size);
554 snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
/* "@source_host"; falls back to hostname_g when absent. */
559 const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
560 yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
562 char hostname_str[listen_buffer_size];
564 if (hostname_v != NULL) {
565 memset(hostname_str, '\0', listen_buffer_size);
566 snprintf(hostname_str, listen_buffer_size, "%s%c",
567 YAJL_GET_STRING(hostname_v), '\0');
/* Arguments of the payload-generation call for the JSON case: every field
 * that was not found is passed as NULL (or -1 for the severity number). */
571 (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
572 (sev_num_str_v != NULL ? sev_num : -1),
573 (process_v != NULL ? process : NULL),
574 (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
576 // Data was not sent in JSON format, so just treat the whole log entry
577 // as the message (and we'll be unable to acquire certain data, so the
579 // generated below will be less informative)
/* NOTE(review): the two identical calls below are presumably alternative
 * branches (plain-text fallback with and without JSON support compiled in);
 * the surrounding conditionals are not visible here. */
581 gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
584 gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
/* The notification itself is always attributed to the local host. */
587 sstrncpy(n.host, hostname_g, sizeof(n.host));
589 int status = plugin_notification_meta_add_string(&n, "ves", buf);
593 ERROR("sysevent plugin: unable to set notification VES metadata: %s",
598 DEBUG("sysevent plugin: notification VES metadata: %s",
599 n.meta->nm_value.nm_string);
601 DEBUG("sysevent plugin: dispatching message");
603 plugin_dispatch_notification(&n);
604 plugin_notification_meta_free(n.meta);
606 // strdup'd in gen_message_payload
/*
 * Drains the ring buffer: waits on sysevent_cond when the ring is empty, then
 * processes every queued entry from tail to head. Each entry is parsed as
 * JSON when possible, optionally matched against the regex filter list, and
 * handed to sysevent_dispatch_notification().
 *
 * NOTE(review): as far as this view shows, sysevent_data_lock is held from
 * entry to exit, i.e. also while notifications are dispatched, which keeps
 * read_socket() from enqueueing during that time -- confirm intended.
 */
611 static void read_ring_buffer() {
612 pthread_mutex_lock(&sysevent_data_lock);
614 // If there's currently nothing to read from the buffer,
/* A single `if` is sufficient here: after a spurious wakeup the while loop
 * below simply finds head == tail and does nothing. */
616 if (ring.head == ring.tail)
617 pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
619 while (ring.head != ring.tail) {
/* Tail position after this entry; compared against ring.maxLen for
 * wrap-around. */
620 int next = ring.tail + 1;
622 if (next >= ring.maxLen)
625 DEBUG("sysevent plugin: reading from ring buffer: %s",
626 ring.buffer[ring.tail]);
628 cdtime_t timestamp = ring.timestamp[ring.tail];
/* Text to run through the regex filter; stays NULL when no filter is set. */
629 char *match_str = NULL;
632 // Try to parse JSON, and if it fails, fall back to plain string
635 yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
636 errbuf, sizeof(errbuf));
641 // If we have any regex filters, we need to see if the message portion of
642 // the data matches any of them (otherwise we're not interested)
643 if (monitor_all_messages == 0) {
644 const char *path[] = {"@message", (const char *)0};
645 yajl_val v = yajl_tree_get(node, path, yajl_t_string);
/* NOTE(review): `v` is used without a NULL check; a JSON document without a
 * string "@message" would pass a NULL string to "%s" -- confirm. */
647 char json_val[listen_buffer_size];
648 memset(json_val, '\0', listen_buffer_size);
650 snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
/* NOTE(review): json_val is declared inside the `if` opened above, while
 * match_str is consumed by ignorelist_match() further down. If that block
 * closes in between (closing braces are not visible here), match_str points
 * at an out-of-scope array. */
653 match_str = (char *)&json_val;
656 // non-JSON rsyslog data
658 // If we have any regex filters, we need to see if the message data
659 // matches any of them (otherwise we're not interested)
660 if (monitor_all_messages == 0)
661 match_str = ring.buffer[ring.tail];
664 // If we have any regex filters, we need to see if the message data
665 // matches any of them (otherwise we're not interested)
666 if (monitor_all_messages == 0)
667 match_str = ring.buffer[ring.tail];
672 // If we care about matching, do that comparison here
673 if (match_str != NULL) {
674 if (ignorelist_match(ignorelist, match_str) != 0)
677 DEBUG("sysevent plugin: regex filter match");
/* Dispatch from the parsed tree when there is one (and free it afterwards),
 * otherwise from the raw ring entry. */
681 if (is_match == 1 && node != NULL) {
682 sysevent_dispatch_notification(NULL, &node, timestamp);
683 yajl_tree_free(node);
684 } else if (is_match == 1)
685 sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
/* Two-argument call form; presumably the variant used when the dispatch
 * function is built without the JSON-node parameter -- TODO confirm. */
688 sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
694 pthread_mutex_unlock(&sysevent_data_lock);
/*
 * Entry point of the socket thread: calls read_socket() for as long as
 * sysevent_socket_thread_loop is positive. The loop flag is only examined
 * while sysevent_thread_lock is held; the lock is released around the call
 * to read_socket(). A problem reported by read_socket() is logged and
 * recorded in sysevent_socket_thread_error.
 */
697 static void *sysevent_socket_thread(void *arg) /* {{{ */
699 pthread_mutex_lock(&sysevent_thread_lock);
701 while (sysevent_socket_thread_loop > 0) {
702 pthread_mutex_unlock(&sysevent_thread_lock);
707 int status = read_socket();
709 pthread_mutex_lock(&sysevent_thread_lock);
712 WARNING("sysevent plugin: problem with socket thread (status: %d)",
714 sysevent_socket_thread_error = 1;
717 } /* while (sysevent_socket_thread_loop > 0) */
719 pthread_mutex_unlock(&sysevent_thread_lock);
722 } /* }}} void *sysevent_socket_thread */
724 // Entry point for thread responsible for reading from
725 // ring buffer and dispatching notifications
// Runs for as long as sysevent_dequeue_thread_loop is positive. The flag is
// only examined while sysevent_thread_lock is held; the lock is released
// for the body of each iteration.
726 static void *sysevent_dequeue_thread(void *arg) /* {{{ */
728 pthread_mutex_lock(&sysevent_thread_lock);
730 while (sysevent_dequeue_thread_loop > 0) {
731 pthread_mutex_unlock(&sysevent_thread_lock);
735 pthread_mutex_lock(&sysevent_thread_lock);
736 } /* while (sysevent_dequeue_thread_loop > 0) */
738 pthread_mutex_unlock(&sysevent_thread_lock);
741 } /* }}} void *sysevent_dequeue_thread */
/*
 * Starts the socket thread unless its loop flag shows it is already running.
 * Sets the loop flag and clears the error flag before creating the thread,
 * and resets the loop flag again if thread creation fails. All of this
 * happens under sysevent_thread_lock.
 */
743 static int start_socket_thread(void) /* {{{ */
745 pthread_mutex_lock(&sysevent_thread_lock);
/* Already running: nothing to start. */
747 if (sysevent_socket_thread_loop != 0) {
748 pthread_mutex_unlock(&sysevent_thread_lock);
752 sysevent_socket_thread_loop = 1;
753 sysevent_socket_thread_error = 0;
755 DEBUG("sysevent plugin: starting socket thread");
757 int status = plugin_thread_create(&sysevent_socket_thread_id,
758 /* attr = */ NULL, sysevent_socket_thread,
759 /* arg = */ (void *)0, "sysevent");
/* Creation failed: roll the loop flag back so a later start can retry. */
761 sysevent_socket_thread_loop = 0;
762 ERROR("sysevent plugin: starting socket thread failed.");
763 pthread_mutex_unlock(&sysevent_thread_lock);
767 pthread_mutex_unlock(&sysevent_thread_lock);
770 } /* }}} int start_socket_thread */
772 static int start_dequeue_thread(void) /* {{{ */
774 pthread_mutex_lock(&sysevent_thread_lock);
776 if (sysevent_dequeue_thread_loop != 0) {
777 pthread_mutex_unlock(&sysevent_thread_lock);
781 sysevent_dequeue_thread_loop = 1;
783 int status = plugin_thread_create(&sysevent_dequeue_thread_id,
784 /* attr = */ NULL, sysevent_dequeue_thread,
785 /* arg = */ (void *)0, "ssyevent");
787 sysevent_dequeue_thread_loop = 0;
788 ERROR("sysevent plugin: Starting dequeue thread failed.");
789 pthread_mutex_unlock(&sysevent_thread_lock);
793 pthread_mutex_unlock(&sysevent_thread_lock);
796 } /* }}} int start_dequeue_thread */
/*
 * Starts both worker threads. The dequeue thread is started even when
 * starting the socket thread reported a failure; how the two statuses are
 * combined into the return value is not visible in this view.
 */
798 static int start_threads(void) /* {{{ */
800 int status = start_socket_thread();
801 int status2 = start_dequeue_thread();
807 } /* }}} int start_threads */
/*
 * Requests the socket thread to stop: clears its loop flag, wakes any waiter
 * on sysevent_cond, then cancels and/or joins the thread and finally resets
 * the stored thread id and error flag.
 *
 * NOTE(review): how `shutdown` selects between pthread_cancel() and
 * pthread_join(), and the value returned, are not visible in this view.
 */
809 static int stop_socket_thread(int shutdown) /* {{{ */
811 pthread_mutex_lock(&sysevent_thread_lock);
/* Not running: nothing to stop. */
813 if (sysevent_socket_thread_loop == 0) {
814 pthread_mutex_unlock(&sysevent_thread_lock);
818 sysevent_socket_thread_loop = 0;
819 pthread_cond_broadcast(&sysevent_cond);
820 pthread_mutex_unlock(&sysevent_thread_lock);
825 // Since the thread is blocking, calling pthread_join
826 // doesn't actually succeed in stopping it. It will stick around
827 // until a message is received on the socket (at which
828 // it will realize that "sysevent_socket_thread_loop" is 0 and will
829 // break out of the read loop and be allowed to die). This is
830 // fine when the process isn't supposed to be exiting, but in
831 // the case of a process shutdown, we don't want to have an
832 // idle thread hanging around. Calling pthread_cancel here in
833 // the case of a shutdown just assures that the thread is
834 // gone and that the process has been fully terminated.
836 DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
838 status = pthread_cancel(sysevent_socket_thread_id);
/* ESRCH (no such thread) is tolerated: the thread has already exited. */
840 if (status != 0 && status != ESRCH) {
841 ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
847 status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
848 if (status != 0 && status != ESRCH) {
849 ERROR("sysevent plugin: Stopping socket thread failed.");
855 pthread_mutex_lock(&sysevent_thread_lock);
856 memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
857 sysevent_socket_thread_error = 0;
858 pthread_mutex_unlock(&sysevent_thread_lock);
860 DEBUG("sysevent plugin: Finished requesting stop of socket thread");
863 } /* }}} int stop_socket_thread */
/*
 * Requests the dequeue thread to stop: clears its loop flag, wakes it via
 * sysevent_cond, cancels it and resets the stored thread id.
 *
 * NOTE(review): the explanatory comment below was written for the socket
 * thread; the dequeue thread blocks in pthread_cond_wait() on sysevent_cond
 * (see read_ring_buffer()), not on the socket. No pthread_join() is visible
 * in this function -- confirm the cancelled thread is reaped somewhere.
 */
865 static int stop_dequeue_thread() /* {{{ */
867 pthread_mutex_lock(&sysevent_thread_lock);
/* Not running: nothing to stop. */
869 if (sysevent_dequeue_thread_loop == 0) {
870 pthread_mutex_unlock(&sysevent_thread_lock);
874 sysevent_dequeue_thread_loop = 0;
875 pthread_cond_broadcast(&sysevent_cond);
876 pthread_mutex_unlock(&sysevent_thread_lock);
878 // Since the thread is blocking, calling pthread_join
879 // doesn't actually succeed in stopping it. It will stick around
880 // until a message is received on the socket (at which
881 // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
882 // break out of the read loop and be allowed to die). Since this
883 // function is called when the process is exiting, we don't want to
884 // have an idle thread hanging around. Calling pthread_cancel here
885 // just assures that the thread is gone and that the process has been
888 DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
890 int status = pthread_cancel(sysevent_dequeue_thread_id);
/* ESRCH (no such thread) is tolerated: the thread has already exited. */
892 if (status != 0 && status != ESRCH) {
893 ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
899 pthread_mutex_lock(&sysevent_thread_lock);
900 memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
901 pthread_mutex_unlock(&sysevent_thread_lock);
903 DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
906 } /* }}} int stop_dequeue_thread */
/*
 * Stops both worker threads; the socket thread is stopped in shutdown mode
 * (argument 1). The dequeue thread is stopped regardless of the first
 * result; how the two statuses are combined into the return value is not
 * visible in this view.
 */
908 static int stop_threads() /* {{{ */
910 int status = stop_socket_thread(1);
911 int status2 = stop_dequeue_thread();
917 } /* }}} int stop_threads */
/*
 * Plugin init callback: allocates the ring buffer (buffer_length slots of
 * listen_buffer_size bytes each, plus one timestamp per slot), opens and
 * binds the listening datagram socket if it is not open yet, and starts the
 * worker threads.
 *
 * Returns the result of start_threads() on success and -1 on any allocation,
 * address-resolution or socket failure. Ring memory allocated before a
 * failure is left in place for sysevent_shutdown() to release.
 */
static int sysevent_init(void) /* {{{ */
{
  ring.head = 0;
  ring.tail = 0;
  ring.maxLen = buffer_length;
  ring.buffer = calloc(buffer_length, sizeof(*ring.buffer));

  if (ring.buffer == NULL) {
    ERROR("sysevent plugin: sysevent_init calloc failed");
    return -1;
  }

  for (int i = 0; i < buffer_length; i++) {
    ring.buffer[i] = calloc(1, listen_buffer_size);
    /* A missing slot would later be written through by read_socket(). */
    if (ring.buffer[i] == NULL) {
      ERROR("sysevent plugin: sysevent_init calloc failed");
      return -1;
    }
  }

  ring.timestamp = calloc(buffer_length, sizeof(*ring.timestamp));
  if (ring.timestamp == NULL) {
    ERROR("sysevent plugin: sysevent_init calloc failed");
    return -1;
  }

  if (sock == -1) {
    struct addrinfo hints = {
        .ai_family = AF_UNSPEC,
        .ai_socktype = SOCK_DGRAM,
        .ai_protocol = 0,
        .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
    };
    struct addrinfo *res = NULL;

    int err = getaddrinfo(listen_ip, listen_port, &hints, &res);
    if (err != 0) {
      /* Nothing to free: getaddrinfo() produced no result list. */
      ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
            err);
      return -1;
    }

    sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (sock == -1) {
      ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
      freeaddrinfo(res);
      return -1;
    }

    if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
      ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
      freeaddrinfo(res);
      /* Close the descriptor before forgetting it, otherwise it leaks. */
      close(sock);
      sock = -1;
      return -1;
    }

    freeaddrinfo(res);
  }

  DEBUG("sysevent plugin: socket created and bound");

  return start_threads();
} /* }}} int sysevent_init */
977 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
979 if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
980 ci->values[1].type != OCONFIG_TYPE_STRING) {
981 ERROR("sysevent plugin: The `%s' config option needs "
982 "two string arguments (ip and port).",
987 listen_ip = strdup(ci->values[0].value.string);
988 listen_port = strdup(ci->values[1].value.string);
/*
 * Handles the `BufferSize' option: reads an integer from the config item and
 * stores it in listen_buffer_size when it lies within 1024..65535; values
 * outside that range produce the message below and leave the setting
 * unchanged.
 */
993 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
997 if (cf_util_get_int(ci, &tmp) != 0)
999 else if ((tmp >= 1024) && (tmp <= 65535))
1000 listen_buffer_size = tmp;
1003 "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
/*
 * Handles the `BufferLength' option: reads an integer from the config item
 * and stores it in buffer_length (the number of ring-buffer slots) when it
 * lies within 3..4096; other values are rejected with a warning and leave
 * the setting unchanged.
 *
 * NOTE(review): the warning spells the option `Bufferlength', whereas
 * sysevent_config() matches it as "BufferLength".
 */
1010 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
1014 if (cf_util_get_int(ci, &tmp) != 0)
1016 else if ((tmp >= 3) && (tmp <= 4096))
1017 buffer_length = tmp;
1019 WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
/*
 * Handles the `RegexFilter' option, which takes one string argument (a
 * regular expression). The expression is added to the filter list, which is
 * created on first use, and monitor_all_messages is cleared so that messages
 * are matched against the list.
 *
 * The final warning is presumably the branch taken when the plugin is built
 * without regex support; the preprocessor conditionals are not visible here.
 */
1026 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
1028 if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
1029 ERROR("sysevent plugin: The `%s' config option needs "
1030 "one string argument, a regular expression.",
/* The list is created in inverted mode (invert = 1). */
1036 if (ignorelist == NULL)
1037 ignorelist = ignorelist_create(/* invert = */ 1);
1039 int status = ignorelist_add(ignorelist, ci->values[0].value.string);
1042 ERROR("sysevent plugin: invalid regular expression: %s",
1043 ci->values[0].value.string);
1047 monitor_all_messages = 0;
1049 WARNING("sysevent plugin: The plugin has been compiled without support "
1050 "for the \"RegexFilter\" option.");
/*
 * Complex-config callback: walks the children of the plugin's config block
 * and dispatches each one by case-insensitive key to the matching option
 * handler. Unknown keys are reported with a warning.
 *
 * NOTE(review): the handlers' return values are not examined here.
 */
1056 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
1058 for (int i = 0; i < ci->children_num; i++) {
1059 oconfig_item_t *child = ci->children + i;
1061 if (strcasecmp("Listen", child->key) == 0)
1062 sysevent_config_add_listen(child);
1063 else if (strcasecmp("BufferSize", child->key) == 0)
1064 sysevent_config_add_buffer_size(child);
1065 else if (strcasecmp("BufferLength", child->key) == 0)
1066 sysevent_config_add_buffer_length(child);
1067 else if (strcasecmp("RegexFilter", child->key) == 0)
1068 sysevent_config_add_regex_filter(child);
1070 WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
1075 } /* }}} int sysevent_config */
/*
 * Read callback: produces no values itself. It checks, under
 * sysevent_thread_lock, whether the socket thread has flagged an error and
 * logs it if so. The recovery steps and return values that follow the log
 * message are not visible in this view.
 */
1077 static int sysevent_read(void) /* {{{ */
1079 pthread_mutex_lock(&sysevent_thread_lock);
1081 if (sysevent_socket_thread_error != 0) {
/* The lock is dropped before logging and before whatever recovery follows. */
1082 pthread_mutex_unlock(&sysevent_thread_lock);
1084 ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
1086 sysevent_socket_thread_error);
1093 } /* if (sysevent_socket_thread_error != 0) */
1095 pthread_mutex_unlock(&sysevent_thread_lock);
1098 } /* }}} int sysevent_read */
1100 static int sysevent_shutdown(void) /* {{{ */
1102 DEBUG("sysevent plugin: Shutting down thread.");
1104 int status = stop_threads();
1108 status2 = close(sock);
1110 ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
1120 for (int i = 0; i < buffer_length; i++) {
1121 free(ring.buffer[i]);
1125 free(ring.timestamp);
1131 } /* }}} int sysevent_shutdown */
1133 void module_register(void) {
1134 plugin_register_complex_config("sysevent", sysevent_config);
1135 plugin_register_init("sysevent", sysevent_init);
1136 plugin_register_read("sysevent", sysevent_read);
1137 plugin_register_shutdown("sysevent", sysevent_shutdown);
1138 } /* void module_register */