#include "collectd.h"
-#include "common.h"
#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
#include "utils_complain.h"
-#include "utils_ignorelist.h"
-#include <asm/types.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
int tail;
int maxLen;
char **buffer;
- long long unsigned int *timestamp;
+ cdtime_t *timestamp;
} circbuf_t;
/*
* Private variables
*/
+
static ignorelist_t *ignorelist = NULL;
-static int sysevent_thread_loop = 0;
-static int sysevent_thread_error = 0;
-static pthread_t sysevent_thread_id;
-static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
+static int sysevent_socket_thread_loop = 0;
+static int sysevent_socket_thread_error = 0;
+static pthread_t sysevent_socket_thread_id;
+static int sysevent_dequeue_thread_loop = 0;
+static pthread_t sysevent_dequeue_thread_id;
+static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
static int sock = -1;
static int event_id = 0;
static circbuf_t ring;
*/
static int gen_message_payload(const char *msg, char *sev, int sev_num,
- char *process, char *host,
- long long unsigned int timestamp, char **buf) {
+ char *process, char *host, cdtime_t timestamp,
+ char **buf) {
const unsigned char *buf2;
yajl_gen g;
char json_str[DATA_MAX_NAME_LEN];
#if !defined(HAVE_YAJL_V2)
- yajl_gen_config conf = {};
-
- conf.beautify = 0;
+ yajl_gen_config conf = {0};
#endif
#if HAVE_YAJL_V2
goto err;
event_id = event_id + 1;
- int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, event_id_len, "%d", event_id);
+ snprintf(json_str, sizeof(json_str), "%d", event_id);
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
goto err;
- int event_name_len = 0;
- event_name_len = event_name_len + strlen(host); // host name
- event_name_len =
- event_name_len +
- 22; // "host", "rsyslog", "message", 3 spaces and null-terminator
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, event_name_len, "host %s rsyslog message", host);
+ snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host);
if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
yajl_gen_status_ok) {
yajl_gen_status_ok)
goto err;
- int last_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, last_epoch_microsec_len, "%llu",
- (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+ snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime()));
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
yajl_gen_status_ok)
goto err;
- int start_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, start_epoch_microsec_len, "%llu",
- (long long unsigned int)timestamp);
+ snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp));
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
yajl_gen_status_ok)
goto err;
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
- goto err;
-
// syslogMsg
if (msg != NULL) {
if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
// *** END syslog fields ***
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+ // close syslog and header fields
+ if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+ yajl_gen_map_close(g) != yajl_gen_status_ok)
goto err;
if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
goto err;
- *buf = malloc(strlen((char *)buf2) + 1);
+ *buf = strdup((char *)buf2);
- sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
+ if (*buf == NULL) {
+ ERROR("sysevent plugin: gen_message_payload strdup failed");
+ goto err;
+ }
yajl_gen_free(g);
return -1;
}
-static void *sysevent_thread(void *arg) /* {{{ */
-{
- pthread_mutex_lock(&sysevent_lock);
-
- while (sysevent_thread_loop > 0) {
- int status = 0;
+static int read_socket() {
+ int recv_flags = MSG_DONTWAIT;
- pthread_mutex_unlock(&sysevent_lock);
-
- if (sock == -1)
- return ((void *)0);
-
- char buffer[listen_buffer_size];
+ while (42) {
struct sockaddr_storage src_addr;
socklen_t src_addr_len = sizeof(src_addr);
+ char buffer[listen_buffer_size];
memset(buffer, '\0', listen_buffer_size);
- ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
+ ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
(struct sockaddr *)&src_addr, &src_addr_len);
- if (count == -1) {
- ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
- status = -1;
- } else if (count >= sizeof(buffer)) {
- WARNING("sysevent plugin: datagram too large for buffer: truncated");
- } else {
- // 1. Acquire lock
- // 2. Push to buffer if there is room, otherwise raise warning
-
- pthread_mutex_lock(&sysevent_lock);
-
- int next = ring.head + 1;
- if (next >= ring.maxLen)
- next = 0;
-
- if (next == ring.tail) {
- WARNING("sysevent plugin: ring buffer full");
+ if (count < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ pthread_mutex_lock(&sysevent_data_lock);
+
+ // There was nothing more to receive for now, so...
+ // If ring head does not equal ring tail, there is data
+ // in the ring buffer for the dequeue thread to read, so
+ // signal it
+ if (ring.head != ring.tail)
+ pthread_cond_signal(&sysevent_cond);
+
+ pthread_mutex_unlock(&sysevent_data_lock);
+
+ // Since there was nothing to receive, set recv to block and
+ // try again
+ recv_flags = 0;
+ continue;
+ } else if (errno != EINTR) {
+ ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
+ return -1;
} else {
- DEBUG("sysevent plugin: writing %s", buffer);
-
- strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
- ring.timestamp[ring.head] =
- (long long unsigned int)CDTIME_T_TO_US(cdtime());
- ring.head = next;
+ // Interrupt, so continue and try again
+ continue;
}
-
- pthread_mutex_unlock(&sysevent_lock);
}
- usleep(1000);
-
- pthread_mutex_lock(&sysevent_lock);
-
- if (status < 0) {
- WARNING("sysevent plugin: problem with thread status: %d", status);
- sysevent_thread_error = 1;
- break;
- }
-
- if (sysevent_thread_loop <= 0)
- break;
- } /* while (sysevent_thread_loop > 0) */
-
- pthread_mutex_unlock(&sysevent_lock);
-
- // pthread_exit instead of return?
- return ((void *)0);
-} /* }}} void *sysevent_thread */
-
-static int start_thread(void) /* {{{ */
-{
- int status;
-
- pthread_mutex_lock(&sysevent_lock);
-
- if (sysevent_thread_loop != 0) {
- pthread_mutex_unlock(&sysevent_lock);
- return (0);
- }
-
- sysevent_thread_loop = 1;
- sysevent_thread_error = 0;
-
- DEBUG("sysevent plugin: starting thread");
-
- status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
- sysevent_thread,
- /* arg = */ (void *)0, "sysevent");
- if (status != 0) {
- sysevent_thread_loop = 0;
- ERROR("sysevent plugin: starting thread failed.");
- pthread_mutex_unlock(&sysevent_lock);
- return (-1);
- }
-
- pthread_mutex_unlock(&sysevent_lock);
- return (0);
-} /* }}} int start_thread */
-
-static int stop_thread(int shutdown) /* {{{ */
-{
- int status;
-
- pthread_mutex_lock(&sysevent_lock);
-
- if (sysevent_thread_loop == 0) {
- pthread_mutex_unlock(&sysevent_lock);
- return (-1);
- }
-
- sysevent_thread_loop = 0;
- pthread_mutex_unlock(&sysevent_lock);
-
- if (shutdown == 1) {
- // Since the thread is blocking, calling pthread_join
- // doesn't actually succeed in stopping it. It will stick around
- // until a message is received on the socket (at which
- // it will realize that "sysevent_thread_loop" is 0 and will
- // break out of the read loop and be allowed to die). This is
- // fine when the process isn't supposed to be exiting, but in
- // the case of a process shutdown, we don't want to have an
- // idle thread hanging around. Calling pthread_cancel here in
- // the case of a shutdown is just assures that the thread is
- // gone and that the process has been fully terminated.
-
- DEBUG("sysevent plugin: Canceling thread for process shutdown");
-
- status = pthread_cancel(sysevent_thread_id);
-
- if (status != 0) {
- ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
- strerror(errno));
- status = -1;
- }
- } else {
- status = pthread_join(sysevent_thread_id, /* return = */ NULL);
- if (status != 0) {
- ERROR("sysevent plugin: Stopping thread failed.");
- status = -1;
+ if (count >= sizeof(buffer)) {
+ WARNING("sysevent plugin: datagram too large for buffer: truncated");
}
- }
- pthread_mutex_lock(&sysevent_lock);
- memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
- sysevent_thread_error = 0;
- pthread_mutex_unlock(&sysevent_lock);
+ // We successfully received a message, so don't block on the next
+ // read in case there are more (and if there aren't, it will be
+ // handled above in the EWOULDBLOCK error-checking)
+ recv_flags = MSG_DONTWAIT;
- DEBUG("sysevent plugin: Finished requesting stop of thread");
+ // 1. Acquire data lock
+ // 2. Push to buffer if there is room, otherwise raise warning
+ // and allow dequeue thread to take over
- return (status);
-} /* }}} int stop_thread */
+ pthread_mutex_lock(&sysevent_data_lock);
-static int sysevent_init(void) /* {{{ */
-{
- ring.head = 0;
- ring.tail = 0;
- ring.maxLen = buffer_length;
- ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
-
- for (int i = 0; i < buffer_length; i++) {
- ring.buffer[i] = malloc(listen_buffer_size);
- }
-
- ring.timestamp = (long long unsigned int *)malloc(
- buffer_length * sizeof(long long unsigned int));
-
- if (sock == -1) {
- const char *hostname = listen_ip;
- const char *portname = listen_port;
- struct addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_DGRAM;
- hints.ai_protocol = 0;
- hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
- struct addrinfo *res = 0;
+ int next = ring.head + 1;
+ if (next >= ring.maxLen)
+ next = 0;
- int err = getaddrinfo(hostname, portname, &hints, &res);
+ if (next == ring.tail) {
+ // Buffer is full, signal the dequeue thread to process the buffer
+ // and clean it out, and then sleep
+ WARNING("sysevent plugin: ring buffer full");
- if (err != 0) {
- ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
- err);
- freeaddrinfo(res);
- return (-1);
- }
+ pthread_cond_signal(&sysevent_cond);
+ pthread_mutex_unlock(&sysevent_data_lock);
- sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- if (sock == -1) {
- ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
- freeaddrinfo(res);
- return (-1);
- }
+ usleep(1000);
+ continue;
+ } else {
+ DEBUG("sysevent plugin: writing %s", buffer);
- if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
- ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
- freeaddrinfo(res);
- return (-1);
+ sstrncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
+ ring.timestamp[ring.head] = cdtime();
+ ring.head = next;
}
- freeaddrinfo(res);
+ pthread_mutex_unlock(&sysevent_data_lock);
}
-
- DEBUG("sysevent plugin: socket created and bound");
-
- return (start_thread());
-} /* }}} int sysevent_init */
-
-static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
-{
- if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
- ci->values[1].type != OCONFIG_TYPE_STRING) {
- ERROR("sysevent plugin: The `%s' config option needs "
- "two string arguments (ip and port).",
- ci->key);
- return (-1);
- }
-
- listen_ip = strdup(ci->values[0].value.string);
- listen_port = strdup(ci->values[1].value.string);
-
- return (0);
}
-static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
-{
- int tmp = 0;
-
- if (cf_util_get_int(ci, &tmp) != 0)
- return (-1);
- else if ((tmp >= 1024) && (tmp <= 65535))
- listen_buffer_size = tmp;
- else {
- WARNING(
- "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
- return (-1);
- }
-
- return (0);
-}
-
-static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
-{
- int tmp = 0;
-
- if (cf_util_get_int(ci, &tmp) != 0)
- return (-1);
- else if ((tmp >= 3) && (tmp <= 4096))
- buffer_length = tmp;
- else {
- WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
- return (-1);
- }
-
- return (0);
-}
-
-static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
-{
- if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
- ERROR("sysevent plugin: The `%s' config option needs "
- "one string argument, a regular expression.",
- ci->key);
- return (-1);
- }
-
-#if HAVE_REGEX_H
- if (ignorelist == NULL)
- ignorelist = ignorelist_create(/* invert = */ 1);
-
- int status = ignorelist_add(ignorelist, ci->values[0].value.string);
-
- if (status != 0) {
- ERROR("sysevent plugin: invalid regular expression: %s",
- ci->values[0].value.string);
- return (1);
- }
-
- monitor_all_messages = 0;
-#else
- WARNING("sysevent plugin: The plugin has been compiled without support "
- "for the \"RegexFilter\" option.");
-#endif
-
- return (0);
-}
-
-static int sysevent_config(oconfig_item_t *ci) /* {{{ */
-{
- for (int i = 0; i < ci->children_num; i++) {
- oconfig_item_t *child = ci->children + i;
-
- if (strcasecmp("Listen", child->key) == 0)
- sysevent_config_add_listen(child);
- else if (strcasecmp("BufferSize", child->key) == 0)
- sysevent_config_add_buffer_size(child);
- else if (strcasecmp("BufferLength", child->key) == 0)
- sysevent_config_add_buffer_length(child);
- else if (strcasecmp("RegexFilter", child->key) == 0)
- sysevent_config_add_regex_filter(child);
- else {
- WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
- }
- }
-
- return (0);
-} /* }}} int sysevent_config */
-
static void sysevent_dispatch_notification(const char *message,
#if HAVE_YAJL_V2
yajl_val *node,
#endif
- long long unsigned int timestamp) {
+ cdtime_t timestamp) {
char *buf = NULL;
- notification_t n = {NOTIF_OKAY, cdtime(), "", "", "sysevent",
- "", "", "", NULL};
+
+ notification_t n = {
+ .severity = NOTIF_OKAY,
+ .time = cdtime(),
+ .plugin = "sysevent",
+ .type = "gauge",
+ };
#if HAVE_YAJL_V2
if (node != NULL) {
// If we have a parsed-JSON node to work with, use that
-
- char process[listen_buffer_size];
- char severity[listen_buffer_size];
- char sev_num_str[listen_buffer_size];
- char msg[listen_buffer_size];
- char hostname_str[listen_buffer_size];
- int sev_num = -1;
-
// msg
const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
+ char msg[listen_buffer_size];
+
if (msg_v != NULL) {
memset(msg, '\0', listen_buffer_size);
snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
(const char *)0};
yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
+ char severity[listen_buffer_size];
+
if (severity_v != NULL) {
memset(severity, '\0', listen_buffer_size);
snprintf(severity, listen_buffer_size, "%s%c",
yajl_val sev_num_str_v =
yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
+ char sev_num_str[listen_buffer_size];
+ int sev_num = -1;
+
if (sev_num_str_v != NULL) {
memset(sev_num_str, '\0', listen_buffer_size);
snprintf(sev_num_str, listen_buffer_size, "%s%c",
(const char *)0};
yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
+ char process[listen_buffer_size];
+
if (process_v != NULL) {
memset(process, '\0', listen_buffer_size);
snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
+ char hostname_str[listen_buffer_size];
+
if (hostname_v != NULL) {
memset(hostname_str, '\0', listen_buffer_size);
snprintf(hostname_str, listen_buffer_size, "%s%c",
#endif
sstrncpy(n.host, hostname_g, sizeof(n.host));
- sstrncpy(n.type, "gauge", sizeof(n.type));
- notification_meta_t *m = calloc(1, sizeof(*m));
+ int status = plugin_notification_meta_add_string(&n, "ves", buf);
- if (m == NULL) {
- char errbuf[1024];
+ if (status < 0) {
sfree(buf);
- ERROR("sysevent plugin: unable to allocate metadata: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("sysevent plugin: unable to set notification VES metadata: %s",
+ STRERRNO);
return;
}
- sstrncpy(m->name, "ves", sizeof(m->name));
- m->nm_value.nm_string = sstrdup(buf);
- m->type = NM_TYPE_STRING;
- n.meta = m;
-
- DEBUG("sysevent plugin: notification message: %s",
+ DEBUG("sysevent plugin: notification VES metadata: %s",
n.meta->nm_value.nm_string);
DEBUG("sysevent plugin: dispatching message");
plugin_dispatch_notification(&n);
plugin_notification_meta_free(n.meta);
- // malloc'd in gen_message_payload
+ // strdup'd in gen_message_payload
if (buf != NULL)
sfree(buf);
}
-static int sysevent_read(void) /* {{{ */
-{
- if (sysevent_thread_error != 0) {
- ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
- "it.",
- sysevent_thread_error);
-
- stop_thread(0);
+static void read_ring_buffer() {
+ pthread_mutex_lock(&sysevent_data_lock);
- start_thread();
-
- return (-1);
- } /* if (sysevent_thread_error != 0) */
-
- pthread_mutex_lock(&sysevent_lock);
+ // If there's currently nothing to read from the buffer,
+ // then wait
+ if (ring.head == ring.tail)
+ pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
while (ring.head != ring.tail) {
- long long unsigned int timestamp;
- int is_match = 1;
- char *match_str = NULL;
int next = ring.tail + 1;
if (next >= ring.maxLen)
DEBUG("sysevent plugin: reading from ring buffer: %s",
ring.buffer[ring.tail]);
- timestamp = ring.timestamp[ring.tail];
+ cdtime_t timestamp = ring.timestamp[ring.tail];
+ char *match_str = NULL;
#if HAVE_YAJL_V2
// Try to parse JSON, and if it fails, fall back to plain string
- yajl_val node = NULL;
char errbuf[1024];
errbuf[0] = 0;
- node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
- sizeof(errbuf));
+ yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
+ errbuf, sizeof(errbuf));
if (node != NULL) {
// JSON rsyslog data
// If we have any regex filters, we need to see if the message portion of
// the data matches any of them (otherwise we're not interested)
if (monitor_all_messages == 0) {
- char json_val[listen_buffer_size];
const char *path[] = {"@message", (const char *)0};
yajl_val v = yajl_tree_get(node, path, yajl_t_string);
+ char json_val[listen_buffer_size];
memset(json_val, '\0', listen_buffer_size);
snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
match_str = ring.buffer[ring.tail];
#endif
+ int is_match = 1;
+
// If we care about matching, do that comparison here
if (match_str != NULL) {
- is_match = 1;
-
if (ignorelist_match(ignorelist, match_str) != 0)
is_match = 0;
else
ring.tail = next;
}
- pthread_mutex_unlock(&sysevent_lock);
+ pthread_mutex_unlock(&sysevent_data_lock);
+}
- return (0);
-} /* }}} int sysevent_read */
+static void *sysevent_socket_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&sysevent_thread_lock);
-static int sysevent_shutdown(void) /* {{{ */
+ while (sysevent_socket_thread_loop > 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ if (sock == -1)
+ return (void *)0;
+
+ int status = read_socket();
+
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ if (status < 0) {
+ WARNING("sysevent plugin: problem with socket thread (status: %d)",
+ status);
+ sysevent_socket_thread_error = 1;
+ break;
+ }
+ } /* while (sysevent_socket_thread_loop > 0) */
+
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ return (void *)0;
+} /* }}} void *sysevent_socket_thread */
+
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *sysevent_dequeue_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ while (sysevent_dequeue_thread_loop > 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ read_ring_buffer();
+
+ pthread_mutex_lock(&sysevent_thread_lock);
+ } /* while (sysevent_dequeue_thread_loop > 0) */
+
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ return (void *)0;
+} /* }}} void *sysevent_dequeue_thread */
+
+static int start_socket_thread(void) /* {{{ */
+{
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ if (sysevent_socket_thread_loop != 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+ return 0;
+ }
+
+ sysevent_socket_thread_loop = 1;
+ sysevent_socket_thread_error = 0;
+
+ DEBUG("sysevent plugin: starting socket thread");
+
+ int status = plugin_thread_create(&sysevent_socket_thread_id,
+ /* attr = */ NULL, sysevent_socket_thread,
+ /* arg = */ (void *)0, "sysevent");
+ if (status != 0) {
+ sysevent_socket_thread_loop = 0;
+ ERROR("sysevent plugin: starting socket thread failed.");
+ pthread_mutex_unlock(&sysevent_thread_lock);
+ return -1;
+ }
+
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ return 0;
+} /* }}} int start_socket_thread */
+
+static int start_dequeue_thread(void) /* {{{ */
{
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ if (sysevent_dequeue_thread_loop != 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+ return 0;
+ }
+
+ sysevent_dequeue_thread_loop = 1;
+
+ int status = plugin_thread_create(&sysevent_dequeue_thread_id,
+ /* attr = */ NULL, sysevent_dequeue_thread,
+ /* arg = */ (void *)0, "ssyevent");
+ if (status != 0) {
+ sysevent_dequeue_thread_loop = 0;
+ ERROR("sysevent plugin: Starting dequeue thread failed.");
+ pthread_mutex_unlock(&sysevent_thread_lock);
+ return -1;
+ }
+
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+ int status = start_socket_thread();
+ int status2 = start_dequeue_thread();
+
+ if (status != 0)
+ return status;
+ else
+ return status2;
+} /* }}} int start_threads */
+
+static int stop_socket_thread(int shutdown) /* {{{ */
+{
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ if (sysevent_socket_thread_loop == 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+ return -1;
+ }
+
+ sysevent_socket_thread_loop = 0;
+ pthread_cond_broadcast(&sysevent_cond);
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
int status;
+ if (shutdown == 1) {
+ // Since the thread is blocking, calling pthread_join
+ // doesn't actually succeed in stopping it. It will stick around
+ // until a message is received on the socket (at which
+ // it will realize that "sysevent_socket_thread_loop" is 0 and will
+ // break out of the read loop and be allowed to die). This is
+ // fine when the process isn't supposed to be exiting, but in
+ // the case of a process shutdown, we don't want to have an
+ // idle thread hanging around. Calling pthread_cancel here in
+ // the case of a shutdown is just assures that the thread is
+ // gone and that the process has been fully terminated.
+
+ DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
+
+ status = pthread_cancel(sysevent_socket_thread_id);
+
+ if (status != 0 && status != ESRCH) {
+ ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
+ STRERRNO);
+ status = -1;
+ } else
+ status = 0;
+ } else {
+ status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
+ if (status != 0 && status != ESRCH) {
+ ERROR("sysevent plugin: Stopping socket thread failed.");
+ status = -1;
+ } else
+ status = 0;
+ }
+
+ pthread_mutex_lock(&sysevent_thread_lock);
+ memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
+ sysevent_socket_thread_error = 0;
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ DEBUG("sysevent plugin: Finished requesting stop of socket thread");
+
+ return status;
+} /* }}} int stop_socket_thread */
+
+static int stop_dequeue_thread() /* {{{ */
+{
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ if (sysevent_dequeue_thread_loop == 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+ return -1;
+ }
+
+ sysevent_dequeue_thread_loop = 0;
+ pthread_cond_broadcast(&sysevent_cond);
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ // Since the thread is blocking, calling pthread_join
+ // doesn't actually succeed in stopping it. It will stick around
+ // until a message is received on the socket (at which
+ // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
+ // break out of the read loop and be allowed to die). Since this
+ // function is called when the processing is exiting, we don't want to
+ // have an idle thread hanging around. Calling pthread_cancel here
+ // just assures that the thread is gone and that the process has been
+ // fully terminated.
+
+ DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
+
+ int status = pthread_cancel(sysevent_dequeue_thread_id);
+
+ if (status != 0 && status != ESRCH) {
+ ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
+ STRERRNO);
+ status = -1;
+ } else
+ status = 0;
+
+ pthread_mutex_lock(&sysevent_thread_lock);
+ memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
+
+ return status;
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads() /* {{{ */
+{
+ int status = stop_socket_thread(1);
+ int status2 = stop_dequeue_thread();
+
+ if (status != 0)
+ return status;
+ else
+ return status2;
+} /* }}} int stop_threads */
+
+static int sysevent_init(void) /* {{{ */
+{
+ ring.head = 0;
+ ring.tail = 0;
+ ring.maxLen = buffer_length;
+ ring.buffer = (char **)calloc(buffer_length, sizeof(char *));
+
+ if (ring.buffer == NULL) {
+ ERROR("sysevent plugin: sysevent_init ring buffer calloc failed");
+ return -1;
+ }
+
+ for (int i = 0; i < buffer_length; i++) {
+ ring.buffer[i] = calloc(1, listen_buffer_size);
+
+ if (ring.buffer[i] == NULL) {
+ ERROR("sysevent plugin: sysevent_init ring buffer entry calloc failed");
+ return -1;
+ }
+ }
+
+ ring.timestamp = (cdtime_t *)calloc(buffer_length, sizeof(cdtime_t));
+
+ if (ring.timestamp == NULL) {
+ ERROR("sysevent plugin: sysevent_init ring buffer timestamp calloc failed");
+ return -1;
+ }
+
+ if (sock == -1) {
+ struct addrinfo hints = {
+ .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_DGRAM,
+ .ai_protocol = 0,
+ .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
+ };
+ struct addrinfo *res = 0;
+
+ int err = getaddrinfo(listen_ip, listen_port, &hints, &res);
+
+ if (err != 0) {
+ ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
+ err);
+ freeaddrinfo(res);
+ return -1;
+ }
+
+ sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (sock == -1) {
+ ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
+ freeaddrinfo(res);
+ return -1;
+ }
+
+ if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
+ ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
+ freeaddrinfo(res);
+ sock = -1;
+ return -1;
+ }
+
+ freeaddrinfo(res);
+ }
+
+ DEBUG("sysevent plugin: socket created and bound");
+
+ return start_threads();
+} /* }}} int sysevent_init */
+
+static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
+{
+ if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
+ ci->values[1].type != OCONFIG_TYPE_STRING) {
+ ERROR("sysevent plugin: The `%s' config option needs "
+ "two string arguments (ip and port).",
+ ci->key);
+ return -1;
+ }
+
+ listen_ip = strdup(ci->values[0].value.string);
+ listen_port = strdup(ci->values[1].value.string);
+
+ return 0;
+}
+
+static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
+{
+ int tmp = 0;
+
+ if (cf_util_get_int(ci, &tmp) != 0)
+ return -1;
+ else if ((tmp >= 1024) && (tmp <= 65535))
+ listen_buffer_size = tmp;
+ else {
+ WARNING(
+ "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
+{
+ int tmp = 0;
+
+ if (cf_util_get_int(ci, &tmp) != 0)
+ return -1;
+ else if ((tmp >= 3) && (tmp <= 4096))
+ buffer_length = tmp;
+ else {
+ WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
+{
+ if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
+ ERROR("sysevent plugin: The `%s' config option needs "
+ "one string argument, a regular expression.",
+ ci->key);
+ return -1;
+ }
+
+#if HAVE_REGEX_H
+ if (ignorelist == NULL)
+ ignorelist = ignorelist_create(/* invert = */ 1);
+
+ int status = ignorelist_add(ignorelist, ci->values[0].value.string);
+
+ if (status != 0) {
+ ERROR("sysevent plugin: invalid regular expression: %s",
+ ci->values[0].value.string);
+ return 1;
+ }
+
+ monitor_all_messages = 0;
+#else
+ WARNING("sysevent plugin: The plugin has been compiled without support "
+ "for the \"RegexFilter\" option.");
+#endif
+
+ return 0;
+}
+
+static int sysevent_config(oconfig_item_t *ci) /* {{{ */
+{
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Listen", child->key) == 0)
+ sysevent_config_add_listen(child);
+ else if (strcasecmp("BufferSize", child->key) == 0)
+ sysevent_config_add_buffer_size(child);
+ else if (strcasecmp("BufferLength", child->key) == 0)
+ sysevent_config_add_buffer_length(child);
+ else if (strcasecmp("RegexFilter", child->key) == 0)
+ sysevent_config_add_regex_filter(child);
+ else {
+ WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
+ }
+ }
+
+ return 0;
+} /* }}} int sysevent_config */
+
+static int sysevent_read(void) /* {{{ */
+{
+ pthread_mutex_lock(&sysevent_thread_lock);
+
+ if (sysevent_socket_thread_error != 0) {
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
+ "Restarting it.",
+ sysevent_socket_thread_error);
+
+ stop_threads();
+
+ start_threads();
+
+ return -1;
+ } /* if (sysevent_socket_thread_error != 0) */
+
+ pthread_mutex_unlock(&sysevent_thread_lock);
+
+ return 0;
+} /* }}} int sysevent_read */
+
+static int sysevent_shutdown(void) /* {{{ */
+{
DEBUG("sysevent plugin: Shutting down thread.");
- if (stop_thread(1) < 0)
- return (-1);
+
+ int status = stop_threads();
+ int status2 = 0;
if (sock != -1) {
- status = close(sock);
- if (status != 0) {
+ status2 = close(sock);
+ if (status2 != 0) {
ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
- strerror(errno));
- return (-1);
- } else
- sock = -1;
+ STRERRNO);
+ }
+
+ sock = -1;
}
free(listen_ip);
free(ring.buffer);
free(ring.timestamp);
- return (0);
+ if (status != 0)
+ return status;
+ else
+ return status2;
} /* }}} int sysevent_shutdown */
void module_register(void) {