X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fsysevent.c;h=7f9aa9f65460a77badf9cdca69a64048375c2fa9;hb=0e73c26f5670b2f11eecfdadaf545bcc7f260658;hp=e7dfdab787d3b64fd90358b54a324361b5ce4b47;hpb=bcd52823cbd135061373c1e157a4e42cf0898605;p=collectd.git diff --git a/src/sysevent.c b/src/sysevent.c index e7dfdab7..7f9aa9f6 100644 --- a/src/sysevent.c +++ b/src/sysevent.c @@ -26,10 +26,10 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" +#include "utils/ignorelist/ignorelist.h" #include "utils_complain.h" -#include "utils_ignorelist.h" #include #include @@ -95,18 +95,23 @@ typedef struct { int tail; int maxLen; char **buffer; - long long unsigned int *timestamp; + cdtime_t *timestamp; } circbuf_t; /* * Private variables */ + static ignorelist_t *ignorelist = NULL; -static int sysevent_thread_loop = 0; -static int sysevent_thread_error = 0; -static pthread_t sysevent_thread_id; -static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER; +static int sysevent_socket_thread_loop = 0; +static int sysevent_socket_thread_error = 0; +static pthread_t sysevent_socket_thread_id; +static int sysevent_dequeue_thread_loop = 0; +static pthread_t sysevent_dequeue_thread_id; +static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER; static int sock = -1; static int event_id = 0; static circbuf_t ring; @@ -129,16 +134,14 @@ static const char *rsyslog_field_keys[5] = { */ static int gen_message_payload(const char *msg, char *sev, int sev_num, - char *process, char *host, - long long unsigned int timestamp, char **buf) { + char *process, char *host, cdtime_t timestamp, + char **buf) { const unsigned char *buf2; yajl_gen g; char json_str[DATA_MAX_NAME_LEN]; #if !defined(HAVE_YAJL_V2) - yajl_gen_config conf = {}; - - conf.beautify = 0; + yajl_gen_config conf = {0}; #endif #if HAVE_YAJL_V2 @@ -172,9 +175,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num, goto err; event_id = event_id + 1; - int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_id_len, "%d", event_id); + snprintf(json_str, sizeof(json_str), "%d", event_id); if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -185,13 +186,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num, strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok) goto err; - int event_name_len = 0; - event_name_len = event_name_len + strlen(host); // host name - event_name_len = - event_name_len + - 22; // "host", "rsyslog", "message", 3 spaces and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_name_len, "host %s rsyslog message", host); + snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host); if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -204,11 +199,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num, yajl_gen_status_ok) goto err; - int last_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, last_epoch_microsec_len, "%llu", - (long long unsigned int)CDTIME_T_TO_US(cdtime())); + snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime())); if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -282,11 +273,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num, yajl_gen_status_ok) goto err; - int start_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, start_epoch_microsec_len, "%llu", - (long long unsigned int)timestamp); + snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp)); if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -387,20 +374,22 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num, strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok) goto err; - if (yajl_gen_map_close(g) != yajl_gen_status_ok) - goto err; - // *** END syslog fields *** - if (yajl_gen_map_close(g) != yajl_gen_status_ok) + // close syslog and header fields + if (yajl_gen_map_close(g) != yajl_gen_status_ok || + yajl_gen_map_close(g) != yajl_gen_status_ok) goto err; if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); + if (*buf == NULL) { + ERROR("sysevent plugin: gen_message_payload strdup failed"); + goto err; + } yajl_gen_free(g); @@ -412,338 +401,109 @@ err: return -1; } -static void *sysevent_thread(void *arg) /* {{{ */ -{ - pthread_mutex_lock(&sysevent_lock); - - while (sysevent_thread_loop > 0) { - int status = 0; +static int read_socket() { + int recv_flags = MSG_DONTWAIT; - pthread_mutex_unlock(&sysevent_lock); - - if (sock == -1) - return ((void *)0); - - char buffer[listen_buffer_size]; + while (42) { struct sockaddr_storage src_addr; socklen_t src_addr_len = sizeof(src_addr); + char buffer[listen_buffer_size]; memset(buffer, '\0', listen_buffer_size); - ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0, + ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags, (struct sockaddr *)&src_addr, &src_addr_len); - if (count == -1) { - ERROR("sysevent plugin: failed to receive data: %s", strerror(errno)); - status = -1; - } else if (count >= sizeof(buffer)) { - WARNING("sysevent plugin: datagram too large for buffer: truncated"); - } else { - // 1. Acquire lock - // 2. Push to buffer if there is room, otherwise raise warning - - pthread_mutex_lock(&sysevent_lock); - - int next = ring.head + 1; - if (next >= ring.maxLen) - next = 0; - - if (next == ring.tail) { - WARNING("sysevent plugin: ring buffer full"); + if (count < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + pthread_mutex_lock(&sysevent_data_lock); + + // There was nothing more to receive for now, so... + // If ring head does not equal ring tail, there is data + // in the ring buffer for the dequeue thread to read, so + // signal it + if (ring.head != ring.tail) + pthread_cond_signal(&sysevent_cond); + + pthread_mutex_unlock(&sysevent_data_lock); + + // Since there was nothing to receive, set recv to block and + // try again + recv_flags = 0; + continue; + } else if (errno != EINTR) { + ERROR("sysevent plugin: failed to receive data: %s", STRERRNO); + return -1; } else { - DEBUG("sysevent plugin: writing %s", buffer); - - strncpy(ring.buffer[ring.head], buffer, sizeof(buffer)); - ring.timestamp[ring.head] = - (long long unsigned int)CDTIME_T_TO_US(cdtime()); - ring.head = next; + // Interrupt, so continue and try again + continue; } - - pthread_mutex_unlock(&sysevent_lock); } - usleep(1000); - - pthread_mutex_lock(&sysevent_lock); - - if (status < 0) { - WARNING("sysevent plugin: problem with thread status: %d", status); - sysevent_thread_error = 1; - break; - } - - if (sysevent_thread_loop <= 0) - break; - } /* while (sysevent_thread_loop > 0) */ - - pthread_mutex_unlock(&sysevent_lock); - - // pthread_exit instead of return? - return ((void *)0); -} /* }}} void *sysevent_thread */ - -static int start_thread(void) /* {{{ */ -{ - int status; - - pthread_mutex_lock(&sysevent_lock); - - if (sysevent_thread_loop != 0) { - pthread_mutex_unlock(&sysevent_lock); - return (0); - } - - sysevent_thread_loop = 1; - sysevent_thread_error = 0; - - DEBUG("sysevent plugin: starting thread"); - - status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL, - sysevent_thread, - /* arg = */ (void *)0, "sysevent"); - if (status != 0) { - sysevent_thread_loop = 0; - ERROR("sysevent plugin: starting thread failed."); - pthread_mutex_unlock(&sysevent_lock); - return (-1); - } - - pthread_mutex_unlock(&sysevent_lock); - return (0); -} /* }}} int start_thread */ - -static int stop_thread(int shutdown) /* {{{ */ -{ - int status; - - pthread_mutex_lock(&sysevent_lock); - - if (sysevent_thread_loop == 0) { - pthread_mutex_unlock(&sysevent_lock); - return (-1); - } - - sysevent_thread_loop = 0; - pthread_mutex_unlock(&sysevent_lock); - - if (shutdown == 1) { - // Since the thread is blocking, calling pthread_join - // doesn't actually succeed in stopping it. It will stick around - // until a message is received on the socket (at which - // it will realize that "sysevent_thread_loop" is 0 and will - // break out of the read loop and be allowed to die). This is - // fine when the process isn't supposed to be exiting, but in - // the case of a process shutdown, we don't want to have an - // idle thread hanging around. Calling pthread_cancel here in - // the case of a shutdown is just assures that the thread is - // gone and that the process has been fully terminated. - - DEBUG("sysevent plugin: Canceling thread for process shutdown"); - - status = pthread_cancel(sysevent_thread_id); - - if (status != 0) { - ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status, - strerror(errno)); - status = -1; - } - } else { - status = pthread_join(sysevent_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("sysevent plugin: Stopping thread failed."); - status = -1; + if (count >= sizeof(buffer)) { + WARNING("sysevent plugin: datagram too large for buffer: truncated"); } - } - pthread_mutex_lock(&sysevent_lock); - memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id)); - sysevent_thread_error = 0; - pthread_mutex_unlock(&sysevent_lock); + // We successfully received a message, so don't block on the next + // read in case there are more (and if there aren't, it will be + // handled above in the EWOULDBLOCK error-checking) + recv_flags = MSG_DONTWAIT; - DEBUG("sysevent plugin: Finished requesting stop of thread"); + // 1. Acquire data lock + // 2. Push to buffer if there is room, otherwise raise warning + // and allow dequeue thread to take over - return (status); -} /* }}} int stop_thread */ + pthread_mutex_lock(&sysevent_data_lock); -static int sysevent_init(void) /* {{{ */ -{ - ring.head = 0; - ring.tail = 0; - ring.maxLen = buffer_length; - ring.buffer = (char **)malloc(buffer_length * sizeof(char *)); - - for (int i = 0; i < buffer_length; i++) { - ring.buffer[i] = malloc(listen_buffer_size); - } - - ring.timestamp = (long long unsigned int *)malloc( - buffer_length * sizeof(long long unsigned int)); - - if (sock == -1) { - const char *hostname = listen_ip; - const char *portname = listen_port; - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = 0; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - struct addrinfo *res = 0; + int next = ring.head + 1; + if (next >= ring.maxLen) + next = 0; - int err = getaddrinfo(hostname, portname, &hints, &res); + if (next == ring.tail) { + // Buffer is full, signal the dequeue thread to process the buffer + // and clean it out, and then sleep + WARNING("sysevent plugin: ring buffer full"); - if (err != 0) { - ERROR("sysevent plugin: failed to resolve local socket address (err=%d)", - err); - freeaddrinfo(res); - return (-1); - } + pthread_cond_signal(&sysevent_cond); + pthread_mutex_unlock(&sysevent_data_lock); - sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (sock == -1) { - ERROR("sysevent plugin: failed to open socket: %s", strerror(errno)); - freeaddrinfo(res); - return (-1); - } + usleep(1000); + continue; + } else { + DEBUG("sysevent plugin: writing %s", buffer); - if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) { - ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno)); - freeaddrinfo(res); - return (-1); + sstrncpy(ring.buffer[ring.head], buffer, sizeof(buffer)); + ring.timestamp[ring.head] = cdtime(); + ring.head = next; } - freeaddrinfo(res); + pthread_mutex_unlock(&sysevent_data_lock); } - - DEBUG("sysevent plugin: socket created and bound"); - - return (start_thread()); -} /* }}} int sysevent_init */ - -static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */ -{ - if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING || - ci->values[1].type != OCONFIG_TYPE_STRING) { - ERROR("sysevent plugin: The `%s' config option needs " - "two string arguments (ip and port).", - ci->key); - return (-1); - } - - listen_ip = strdup(ci->values[0].value.string); - listen_port = strdup(ci->values[1].value.string); - - return (0); } -static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */ -{ - int tmp = 0; - - if (cf_util_get_int(ci, &tmp) != 0) - return (-1); - else if ((tmp >= 1024) && (tmp <= 65535)) - listen_buffer_size = tmp; - else { - WARNING( - "sysevent plugin: The `BufferSize' must be between 1024 and 65535."); - return (-1); - } - - return (0); -} - -static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */ -{ - int tmp = 0; - - if (cf_util_get_int(ci, &tmp) != 0) - return (-1); - else if ((tmp >= 3) && (tmp <= 4096)) - buffer_length = tmp; - else { - WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096."); - return (-1); - } - - return (0); -} - -static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */ -{ - if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) { - ERROR("sysevent plugin: The `%s' config option needs " - "one string argument, a regular expression.", - ci->key); - return (-1); - } - -#if HAVE_REGEX_H - if (ignorelist == NULL) - ignorelist = ignorelist_create(/* invert = */ 1); - - int status = ignorelist_add(ignorelist, ci->values[0].value.string); - - if (status != 0) { - ERROR("sysevent plugin: invalid regular expression: %s", - ci->values[0].value.string); - return (1); - } - - monitor_all_messages = 0; -#else - WARNING("sysevent plugin: The plugin has been compiled without support " - "for the \"RegexFilter\" option."); -#endif - - return (0); -} - -static int sysevent_config(oconfig_item_t *ci) /* {{{ */ -{ - for (int i = 0; i < ci->children_num; i++) { - oconfig_item_t *child = ci->children + i; - - if (strcasecmp("Listen", child->key) == 0) - sysevent_config_add_listen(child); - else if (strcasecmp("BufferSize", child->key) == 0) - sysevent_config_add_buffer_size(child); - else if (strcasecmp("BufferLength", child->key) == 0) - sysevent_config_add_buffer_length(child); - else if (strcasecmp("RegexFilter", child->key) == 0) - sysevent_config_add_regex_filter(child); - else { - WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key); - } - } - - return (0); -} /* }}} int sysevent_config */ - static void sysevent_dispatch_notification(const char *message, #if HAVE_YAJL_V2 yajl_val *node, #endif - long long unsigned int timestamp) { + cdtime_t timestamp) { char *buf = NULL; - notification_t n = {NOTIF_OKAY, cdtime(), "", "", "sysevent", - "", "", "", NULL}; + + notification_t n = { + .severity = NOTIF_OKAY, + .time = cdtime(), + .plugin = "sysevent", + .type = "gauge", + }; #if HAVE_YAJL_V2 if (node != NULL) { // If we have a parsed-JSON node to work with, use that - - char process[listen_buffer_size]; - char severity[listen_buffer_size]; - char sev_num_str[listen_buffer_size]; - char msg[listen_buffer_size]; - char hostname_str[listen_buffer_size]; - int sev_num = -1; - // msg const char *msg_path[] = {rsyslog_keys[2], (const char *)0}; yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string); + char msg[listen_buffer_size]; + if (msg_v != NULL) { memset(msg, '\0', listen_buffer_size); snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0'); @@ -754,6 +514,8 @@ static void sysevent_dispatch_notification(const char *message, (const char *)0}; yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string); + char severity[listen_buffer_size]; + if (severity_v != NULL) { memset(severity, '\0', listen_buffer_size); snprintf(severity, listen_buffer_size, "%s%c", @@ -766,6 +528,9 @@ static void sysevent_dispatch_notification(const char *message, yajl_val sev_num_str_v = yajl_tree_get(*node, sev_num_str_path, yajl_t_string); + char sev_num_str[listen_buffer_size]; + int sev_num = -1; + if (sev_num_str_v != NULL) { memset(sev_num_str, '\0', listen_buffer_size); snprintf(sev_num_str, listen_buffer_size, "%s%c", @@ -782,6 +547,8 @@ static void sysevent_dispatch_notification(const char *message, (const char *)0}; yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string); + char process[listen_buffer_size]; + if (process_v != NULL) { memset(process, '\0', listen_buffer_size); snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v), @@ -792,6 +559,8 @@ static void sysevent_dispatch_notification(const char *message, const char *hostname_path[] = {rsyslog_keys[1], (const char *)0}; yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string); + char hostname_str[listen_buffer_size]; + if (hostname_v != NULL) { memset(hostname_str, '\0', listen_buffer_size); snprintf(hostname_str, listen_buffer_size, "%s%c", @@ -816,24 +585,17 @@ static void sysevent_dispatch_notification(const char *message, #endif sstrncpy(n.host, hostname_g, sizeof(n.host)); - sstrncpy(n.type, "gauge", sizeof(n.type)); - notification_meta_t *m = calloc(1, sizeof(*m)); + int status = plugin_notification_meta_add_string(&n, "ves", buf); - if (m == NULL) { - char errbuf[1024]; + if (status < 0) { sfree(buf); - ERROR("sysevent plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("sysevent plugin: unable to set notification VES metadata: %s", + STRERRNO); return; } - sstrncpy(m->name, "ves", sizeof(m->name)); - m->nm_value.nm_string = sstrdup(buf); - m->type = NM_TYPE_STRING; - n.meta = m; - - DEBUG("sysevent plugin: notification message: %s", + DEBUG("sysevent plugin: notification VES metadata: %s", n.meta->nm_value.nm_string); DEBUG("sysevent plugin: dispatching message"); @@ -841,31 +603,20 @@ static void sysevent_dispatch_notification(const char *message, plugin_dispatch_notification(&n); plugin_notification_meta_free(n.meta); - // malloc'd in gen_message_payload + // strdup'd in gen_message_payload if (buf != NULL) sfree(buf); } -static int sysevent_read(void) /* {{{ */ -{ - if (sysevent_thread_error != 0) { - ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting " - "it.", - sysevent_thread_error); - - stop_thread(0); +static void read_ring_buffer() { + pthread_mutex_lock(&sysevent_data_lock); - start_thread(); - - return (-1); - } /* if (sysevent_thread_error != 0) */ - - pthread_mutex_lock(&sysevent_lock); + // If there's currently nothing to read from the buffer, + // then wait + if (ring.head == ring.tail) + pthread_cond_wait(&sysevent_cond, &sysevent_data_lock); while (ring.head != ring.tail) { - long long unsigned int timestamp; - int is_match = 1; - char *match_str = NULL; int next = ring.tail + 1; if (next >= ring.maxLen) @@ -874,15 +625,15 @@ static int sysevent_read(void) /* {{{ */ DEBUG("sysevent plugin: reading from ring buffer: %s", ring.buffer[ring.tail]); - timestamp = ring.timestamp[ring.tail]; + cdtime_t timestamp = ring.timestamp[ring.tail]; + char *match_str = NULL; #if HAVE_YAJL_V2 // Try to parse JSON, and if it fails, fall back to plain string - yajl_val node = NULL; char errbuf[1024]; errbuf[0] = 0; - node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf, - sizeof(errbuf)); + yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail], + errbuf, sizeof(errbuf)); if (node != NULL) { // JSON rsyslog data @@ -890,10 +641,10 @@ static int sysevent_read(void) /* {{{ */ // If we have any regex filters, we need to see if the message portion of // the data matches any of them (otherwise we're not interested) if (monitor_all_messages == 0) { - char json_val[listen_buffer_size]; const char *path[] = {"@message", (const char *)0}; yajl_val v = yajl_tree_get(node, path, yajl_t_string); + char json_val[listen_buffer_size]; memset(json_val, '\0', listen_buffer_size); snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v), @@ -916,10 +667,10 @@ static int sysevent_read(void) /* {{{ */ match_str = ring.buffer[ring.tail]; #endif + int is_match = 1; + // If we care about matching, do that comparison here if (match_str != NULL) { - is_match = 1; - if (ignorelist_match(ignorelist, match_str) != 0) is_match = 0; else @@ -940,27 +691,437 @@ static int sysevent_read(void) /* {{{ */ ring.tail = next; } - pthread_mutex_unlock(&sysevent_lock); + pthread_mutex_unlock(&sysevent_data_lock); +} - return (0); -} /* }}} int sysevent_read */ +static void *sysevent_socket_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&sysevent_thread_lock); -static int sysevent_shutdown(void) /* {{{ */ + while (sysevent_socket_thread_loop > 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + + if (sock == -1) + return (void *)0; + + int status = read_socket(); + + pthread_mutex_lock(&sysevent_thread_lock); + + if (status < 0) { + WARNING("sysevent plugin: problem with socket thread (status: %d)", + status); + sysevent_socket_thread_error = 1; + break; + } + } /* while (sysevent_socket_thread_loop > 0) */ + + pthread_mutex_unlock(&sysevent_thread_lock); + + return (void *)0; +} /* }}} void *sysevent_socket_thread */ + +// Entry point for thread responsible for reading from +// ring buffer and dispatching notifications +static void *sysevent_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&sysevent_thread_lock); + + while (sysevent_dequeue_thread_loop > 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + + read_ring_buffer(); + + pthread_mutex_lock(&sysevent_thread_lock); + } /* while (sysevent_dequeue_thread_loop > 0) */ + + pthread_mutex_unlock(&sysevent_thread_lock); + + return (void *)0; +} /* }}} void *sysevent_dequeue_thread */ + +static int start_socket_thread(void) /* {{{ */ +{ + pthread_mutex_lock(&sysevent_thread_lock); + + if (sysevent_socket_thread_loop != 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + return 0; + } + + sysevent_socket_thread_loop = 1; + sysevent_socket_thread_error = 0; + + DEBUG("sysevent plugin: starting socket thread"); + + int status = plugin_thread_create(&sysevent_socket_thread_id, + /* attr = */ NULL, sysevent_socket_thread, + /* arg = */ (void *)0, "sysevent"); + if (status != 0) { + sysevent_socket_thread_loop = 0; + ERROR("sysevent plugin: starting socket thread failed."); + pthread_mutex_unlock(&sysevent_thread_lock); + return -1; + } + + pthread_mutex_unlock(&sysevent_thread_lock); + + return 0; +} /* }}} int start_socket_thread */ + +static int start_dequeue_thread(void) /* {{{ */ { + pthread_mutex_lock(&sysevent_thread_lock); + + if (sysevent_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + return 0; + } + + sysevent_dequeue_thread_loop = 1; + + int status = plugin_thread_create(&sysevent_dequeue_thread_id, + /* attr = */ NULL, sysevent_dequeue_thread, + /* arg = */ (void *)0, "ssyevent"); + if (status != 0) { + sysevent_dequeue_thread_loop = 0; + ERROR("sysevent plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&sysevent_thread_lock); + return -1; + } + + pthread_mutex_unlock(&sysevent_thread_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status = start_socket_thread(); + int status2 = start_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_socket_thread(int shutdown) /* {{{ */ +{ + pthread_mutex_lock(&sysevent_thread_lock); + + if (sysevent_socket_thread_loop == 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + return -1; + } + + sysevent_socket_thread_loop = 0; + pthread_cond_broadcast(&sysevent_cond); + pthread_mutex_unlock(&sysevent_thread_lock); + int status; + if (shutdown == 1) { + // Since the thread is blocking, calling pthread_join + // doesn't actually succeed in stopping it. It will stick around + // until a message is received on the socket (at which + // it will realize that "sysevent_socket_thread_loop" is 0 and will + // break out of the read loop and be allowed to die). This is + // fine when the process isn't supposed to be exiting, but in + // the case of a process shutdown, we don't want to have an + // idle thread hanging around. Calling pthread_cancel here in + // the case of a shutdown is just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("sysevent plugin: Canceling socket thread for process shutdown"); + + status = pthread_cancel(sysevent_socket_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status, + STRERRNO); + status = -1; + } else + status = 0; + } else { + status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("sysevent plugin: Stopping socket thread failed."); + status = -1; + } else + status = 0; + } + + pthread_mutex_lock(&sysevent_thread_lock); + memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id)); + sysevent_socket_thread_error = 0; + pthread_mutex_unlock(&sysevent_thread_lock); + + DEBUG("sysevent plugin: Finished requesting stop of socket thread"); + + return status; +} /* }}} int stop_socket_thread */ + +static int stop_dequeue_thread() /* {{{ */ +{ + pthread_mutex_lock(&sysevent_thread_lock); + + if (sysevent_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + return -1; + } + + sysevent_dequeue_thread_loop = 0; + pthread_cond_broadcast(&sysevent_cond); + pthread_mutex_unlock(&sysevent_thread_lock); + + // Since the thread is blocking, calling pthread_join + // doesn't actually succeed in stopping it. It will stick around + // until a message is received on the socket (at which + // it will realize that "sysevent_dequeue_thread_loop" is 0 and will + // break out of the read loop and be allowed to die). Since this + // function is called when the processing is exiting, we don't want to + // have an idle thread hanging around. Calling pthread_cancel here + // just assures that the thread is gone and that the process has been + // fully terminated. + + DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown"); + + int status = pthread_cancel(sysevent_dequeue_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status, + STRERRNO); + status = -1; + } else + status = 0; + + pthread_mutex_lock(&sysevent_thread_lock); + memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id)); + pthread_mutex_unlock(&sysevent_thread_lock); + + DEBUG("sysevent plugin: Finished requesting stop of dequeue thread"); + + return status; +} /* }}} int stop_dequeue_thread */ + +static int stop_threads() /* {{{ */ +{ + int status = stop_socket_thread(1); + int status2 = stop_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int stop_threads */ + +static int sysevent_init(void) /* {{{ */ +{ + ring.head = 0; + ring.tail = 0; + ring.maxLen = buffer_length; + ring.buffer = (char **)calloc(buffer_length, sizeof(char *)); + + if (ring.buffer == NULL) { + ERROR("sysevent plugin: sysevent_init ring buffer calloc failed"); + return -1; + } + + for (int i = 0; i < buffer_length; i++) { + ring.buffer[i] = calloc(1, listen_buffer_size); + + if (ring.buffer[i] == NULL) { + ERROR("sysevent plugin: sysevent_init ring buffer entry calloc failed"); + return -1; + } + } + + ring.timestamp = (cdtime_t *)calloc(buffer_length, sizeof(cdtime_t)); + + if (ring.timestamp == NULL) { + ERROR("sysevent plugin: sysevent_init ring buffer timestamp calloc failed"); + return -1; + } + + if (sock == -1) { + struct addrinfo hints = { + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_DGRAM, + .ai_protocol = 0, + .ai_flags = AI_PASSIVE | AI_ADDRCONFIG, + }; + struct addrinfo *res = 0; + + int err = getaddrinfo(listen_ip, listen_port, &hints, &res); + + if (err != 0) { + ERROR("sysevent plugin: failed to resolve local socket address (err=%d)", + err); + freeaddrinfo(res); + return -1; + } + + sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sock == -1) { + ERROR("sysevent plugin: failed to open socket: %s", STRERRNO); + freeaddrinfo(res); + return -1; + } + + if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) { + ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO); + freeaddrinfo(res); + sock = -1; + return -1; + } + + freeaddrinfo(res); + } + + DEBUG("sysevent plugin: socket created and bound"); + + return start_threads(); +} /* }}} int sysevent_init */ + +static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */ +{ + if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING || + ci->values[1].type != OCONFIG_TYPE_STRING) { + ERROR("sysevent plugin: The `%s' config option needs " + "two string arguments (ip and port).", + ci->key); + return -1; + } + + listen_ip = strdup(ci->values[0].value.string); + listen_port = strdup(ci->values[1].value.string); + + return 0; +} + +static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */ +{ + int tmp = 0; + + if (cf_util_get_int(ci, &tmp) != 0) + return -1; + else if ((tmp >= 1024) && (tmp <= 65535)) + listen_buffer_size = tmp; + else { + WARNING( + "sysevent plugin: The `BufferSize' must be between 1024 and 65535."); + return -1; + } + + return 0; +} + +static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */ +{ + int tmp = 0; + + if (cf_util_get_int(ci, &tmp) != 0) + return -1; + else if ((tmp >= 3) && (tmp <= 4096)) + buffer_length = tmp; + else { + WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096."); + return -1; + } + + return 0; +} + +static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */ +{ + if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) { + ERROR("sysevent plugin: The `%s' config option needs " + "one string argument, a regular expression.", + ci->key); + return -1; + } + +#if HAVE_REGEX_H + if (ignorelist == NULL) + ignorelist = ignorelist_create(/* invert = */ 1); + + int status = ignorelist_add(ignorelist, ci->values[0].value.string); + + if (status != 0) { + ERROR("sysevent plugin: invalid regular expression: %s", + ci->values[0].value.string); + return 1; + } + + monitor_all_messages = 0; +#else + WARNING("sysevent plugin: The plugin has been compiled without support " + "for the \"RegexFilter\" option."); +#endif + + return 0; +} + +static int sysevent_config(oconfig_item_t *ci) /* {{{ */ +{ + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Listen", child->key) == 0) + sysevent_config_add_listen(child); + else if (strcasecmp("BufferSize", child->key) == 0) + sysevent_config_add_buffer_size(child); + else if (strcasecmp("BufferLength", child->key) == 0) + sysevent_config_add_buffer_length(child); + else if (strcasecmp("RegexFilter", child->key) == 0) + sysevent_config_add_regex_filter(child); + else { + WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key); + } + } + + return 0; +} /* }}} int sysevent_config */ + +static int sysevent_read(void) /* {{{ */ +{ + pthread_mutex_lock(&sysevent_thread_lock); + + if (sysevent_socket_thread_error != 0) { + pthread_mutex_unlock(&sysevent_thread_lock); + + ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). " + "Restarting it.", + sysevent_socket_thread_error); + + stop_threads(); + + start_threads(); + + return -1; + } /* if (sysevent_socket_thread_error != 0) */ + + pthread_mutex_unlock(&sysevent_thread_lock); + + return 0; +} /* }}} int sysevent_read */ + +static int sysevent_shutdown(void) /* {{{ */ +{ DEBUG("sysevent plugin: Shutting down thread."); - if (stop_thread(1) < 0) - return (-1); + + int status = stop_threads(); + int status2 = 0; if (sock != -1) { - status = close(sock); - if (status != 0) { + status2 = close(sock); + if (status2 != 0) { ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status, - strerror(errno)); - return (-1); - } else - sock = -1; + STRERRNO); + } + + sock = -1; } free(listen_ip); @@ -973,7 +1134,10 @@ static int sysevent_shutdown(void) /* {{{ */ free(ring.buffer); free(ring.timestamp); - return (0); + if (status != 0) + return status; + else + return status2; } /* }}} int sysevent_shutdown */ void module_register(void) {