Fix mutex lock in read_event
[collectd.git] / src / sysevent.c
index 538618c..b63767d 100644 (file)
 
 #include "collectd.h"
 
-#include "common.h"
 #include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
 #include "utils_complain.h"
-#include "utils_ignorelist.h"
 
 #include <errno.h>
 #include <netdb.h>
@@ -95,18 +95,23 @@ typedef struct {
   int tail;
   int maxLen;
   char **buffer;
-  long long unsigned int *timestamp;
+  cdtime_t *timestamp;
 } circbuf_t;
 
 /*
  * Private variables
  */
+
 static ignorelist_t *ignorelist = NULL;
 
-static int sysevent_thread_loop = 0;
-static int sysevent_thread_error = 0;
-static pthread_t sysevent_thread_id;
-static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
+static int sysevent_socket_thread_loop = 0;
+static int sysevent_socket_thread_error = 0;
+static pthread_t sysevent_socket_thread_id;
+static int sysevent_dequeue_thread_loop = 0;
+static pthread_t sysevent_dequeue_thread_id;
+static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
 static int sock = -1;
 static int event_id = 0;
 static circbuf_t ring;
@@ -129,16 +134,14 @@ static const char *rsyslog_field_keys[5] = {
  */
 
 static int gen_message_payload(const char *msg, char *sev, int sev_num,
-                               char *process, char *host,
-                               long long unsigned int timestamp, char **buf) {
+                               char *process, char *host, cdtime_t timestamp,
+                               char **buf) {
   const unsigned char *buf2;
   yajl_gen g;
   char json_str[DATA_MAX_NAME_LEN];
 
 #if !defined(HAVE_YAJL_V2)
-  yajl_gen_config conf = {};
-
-  conf.beautify = 0;
+  yajl_gen_config conf = {0};
 #endif
 
 #if HAVE_YAJL_V2
@@ -172,9 +175,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num,
     goto err;
 
   event_id = event_id + 1;
-  int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, event_id_len, "%d", event_id);
+  snprintf(json_str, sizeof(json_str), "%d", event_id);
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -185,13 +186,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num,
                       strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
     goto err;
 
-  int event_name_len = 0;
-  event_name_len = event_name_len + strlen(host); // host name
-  event_name_len =
-      event_name_len +
-      22; // "host", "rsyslog", "message", 3 spaces and null-terminator
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, event_name_len, "host %s rsyslog message", host);
+  snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host);
 
   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
       yajl_gen_status_ok) {
@@ -204,11 +199,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num,
       yajl_gen_status_ok)
     goto err;
 
-  int last_epoch_microsec_len =
-      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, last_epoch_microsec_len, "%llu",
-           (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+  snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime()));
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -282,11 +273,7 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num,
       yajl_gen_status_ok)
     goto err;
 
-  int start_epoch_microsec_len =
-      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, start_epoch_microsec_len, "%llu",
-           (long long unsigned int)timestamp);
+  snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp));
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -386,21 +373,23 @@ static int gen_message_payload(const char *msg, char *sev, int sev_num,
   if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
                       strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
     goto err;
-  
-  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
-    goto err;
 
   // *** END syslog fields ***
 
-  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+  // close syslog and header fields
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+      yajl_gen_map_close(g) != yajl_gen_status_ok)
     goto err;
 
   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
     goto err;
 
-  *buf = malloc(strlen((char *)buf2) + 1);
+  *buf = strdup((char *)buf2);
 
-  sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
+  if (*buf == NULL) {
+    ERROR("sysevent plugin: gen_message_payload strdup failed");
+    goto err;
+  }
 
   yajl_gen_free(g);
 
@@ -412,338 +401,109 @@ err:
   return -1;
 }
 
-static void *sysevent_thread(void *arg) /* {{{ */
-{
-  pthread_mutex_lock(&sysevent_lock);
-
-  while (sysevent_thread_loop > 0) {
-    int status = 0;
-
-    pthread_mutex_unlock(&sysevent_lock);
+static int read_socket() {
+  int recv_flags = MSG_DONTWAIT;
 
-    if (sock == -1)
-      return ((void *)0);
-
-    char buffer[listen_buffer_size];
+  while (42) {
     struct sockaddr_storage src_addr;
     socklen_t src_addr_len = sizeof(src_addr);
 
+    char buffer[listen_buffer_size];
     memset(buffer, '\0', listen_buffer_size);
 
-    ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
+    ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
                              (struct sockaddr *)&src_addr, &src_addr_len);
 
-    if (count == -1) {
-      ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
-      status = -1;
-    } else if (count >= sizeof(buffer)) {
-      WARNING("sysevent plugin: datagram too large for buffer: truncated");
-    } else {
-      // 1. Acquire lock
-      // 2. Push to buffer if there is room, otherwise raise warning
-
-      pthread_mutex_lock(&sysevent_lock);
-
-      int next = ring.head + 1;
-      if (next >= ring.maxLen)
-        next = 0;
-
-      if (next == ring.tail) {
-        WARNING("sysevent plugin: ring buffer full");
+    if (count < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        pthread_mutex_lock(&sysevent_data_lock);
+
+        // There was nothing more to receive for now, so...
+        // If ring head does not equal ring tail, there is data
+        // in the ring buffer for the dequeue thread to read, so
+        // signal it
+        if (ring.head != ring.tail)
+          pthread_cond_signal(&sysevent_cond);
+
+        pthread_mutex_unlock(&sysevent_data_lock);
+
+        // Since there was nothing to receive, set recv to block and
+        // try again
+        recv_flags = 0;
+        continue;
+      } else if (errno != EINTR) {
+        ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
+        return -1;
       } else {
-        DEBUG("sysevent plugin: writing %s", buffer);
-
-        strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
-        ring.timestamp[ring.head] =
-            (long long unsigned int)CDTIME_T_TO_US(cdtime());
-        ring.head = next;
+        // Interrupt, so continue and try again
+        continue;
       }
-
-      pthread_mutex_unlock(&sysevent_lock);
-    }
-
-    usleep(1000);
-
-    pthread_mutex_lock(&sysevent_lock);
-
-    if (status < 0) {
-      WARNING("sysevent plugin: problem with thread status: %d", status);
-      sysevent_thread_error = 1;
-      break;
     }
 
-    if (sysevent_thread_loop <= 0)
-      break;
-  } /* while (sysevent_thread_loop > 0) */
-
-  pthread_mutex_unlock(&sysevent_lock);
-
-  // pthread_exit instead of return?
-  return ((void *)0);
-} /* }}} void *sysevent_thread */
-
-static int start_thread(void) /* {{{ */
-{
-  int status;
-
-  pthread_mutex_lock(&sysevent_lock);
-
-  if (sysevent_thread_loop != 0) {
-    pthread_mutex_unlock(&sysevent_lock);
-    return (0);
-  }
-
-  sysevent_thread_loop = 1;
-  sysevent_thread_error = 0;
-
-  DEBUG("sysevent plugin: starting thread");
-
-  status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
-                                sysevent_thread,
-                                /* arg = */ (void *)0, "sysevent");
-  if (status != 0) {
-    sysevent_thread_loop = 0;
-    ERROR("sysevent plugin: starting thread failed.");
-    pthread_mutex_unlock(&sysevent_lock);
-    return (-1);
-  }
-
-  pthread_mutex_unlock(&sysevent_lock);
-  return (0);
-} /* }}} int start_thread */
-
-static int stop_thread(int shutdown) /* {{{ */
-{
-  int status;
-
-  pthread_mutex_lock(&sysevent_lock);
-
-  if (sysevent_thread_loop == 0) {
-    pthread_mutex_unlock(&sysevent_lock);
-    return (-1);
-  }
-
-  sysevent_thread_loop = 0;
-  pthread_mutex_unlock(&sysevent_lock);
-
-  if (shutdown == 1) {
-    // Since the thread is blocking, calling pthread_join
-    // doesn't actually succeed in stopping it.  It will stick around
-    // until a message is received on the socket (at which
-    // it will realize that "sysevent_thread_loop" is 0 and will
-    // break out of the read loop and be allowed to die).  This is
-    // fine when the process isn't supposed to be exiting, but in
-    // the case of a process shutdown, we don't want to have an
-    // idle thread hanging around.  Calling pthread_cancel here in
-    // the case of a shutdown is just assures that the thread is
-    // gone and that the process has been fully terminated.
-
-    DEBUG("sysevent plugin: Canceling thread for process shutdown");
-
-    status = pthread_cancel(sysevent_thread_id);
-
-    if (status != 0) {
-      ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
-            strerror(errno));
-      status = -1;
-    }
-  } else {
-    status = pthread_join(sysevent_thread_id, /* return = */ NULL);
-    if (status != 0) {
-      ERROR("sysevent plugin: Stopping thread failed.");
-      status = -1;
+    if (count >= sizeof(buffer)) {
+      WARNING("sysevent plugin: datagram too large for buffer: truncated");
     }
-  }
 
-  pthread_mutex_lock(&sysevent_lock);
-  memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
-  sysevent_thread_error = 0;
-  pthread_mutex_unlock(&sysevent_lock);
+    // We successfully received a message, so don't block on the next
+    // read in case there are more (and if there aren't, it will be
+    // handled above in the EWOULDBLOCK error-checking)
+    recv_flags = MSG_DONTWAIT;
 
-  DEBUG("sysevent plugin: Finished requesting stop of thread");
+    // 1. Acquire data lock
+    // 2. Push to buffer if there is room, otherwise raise warning
+    //    and allow dequeue thread to take over
 
-  return (status);
-} /* }}} int stop_thread */
+    pthread_mutex_lock(&sysevent_data_lock);
 
-static int sysevent_init(void) /* {{{ */
-{
-  ring.head = 0;
-  ring.tail = 0;
-  ring.maxLen = buffer_length;
-  ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
-
-  for (int i = 0; i < buffer_length; i++) {
-    ring.buffer[i] = malloc(listen_buffer_size);
-  }
-
-  ring.timestamp = (long long unsigned int *)malloc(
-      buffer_length * sizeof(long long unsigned int));
-
-  if (sock == -1) {
-    const char *hostname = listen_ip;
-    const char *portname = listen_port;
-    struct addrinfo hints;
-    memset(&hints, 0, sizeof(hints));
-    hints.ai_family = AF_UNSPEC;
-    hints.ai_socktype = SOCK_DGRAM;
-    hints.ai_protocol = 0;
-    hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
-    struct addrinfo *res = 0;
+    int next = ring.head + 1;
+    if (next >= ring.maxLen)
+      next = 0;
 
-    int err = getaddrinfo(hostname, portname, &hints, &res);
+    if (next == ring.tail) {
+      // Buffer is full, signal the dequeue thread to process the buffer
+      // and clean it out, and then sleep
+      WARNING("sysevent plugin: ring buffer full");
 
-    if (err != 0) {
-      ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
-            err);
-      freeaddrinfo(res);
-      return (-1);
-    }
+      pthread_cond_signal(&sysevent_cond);
+      pthread_mutex_unlock(&sysevent_data_lock);
 
-    sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-    if (sock == -1) {
-      ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
-      freeaddrinfo(res);
-      return (-1);
-    }
+      usleep(1000);
+      continue;
+    } else {
+      DEBUG("sysevent plugin: writing %s", buffer);
 
-    if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
-      ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
-      freeaddrinfo(res);
-      return (-1);
+      strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
+      ring.timestamp[ring.head] = cdtime();
+      ring.head = next;
     }
 
-    freeaddrinfo(res);
+    pthread_mutex_unlock(&sysevent_data_lock);
   }
-
-  DEBUG("sysevent plugin: socket created and bound");
-
-  return (start_thread());
-} /* }}} int sysevent_init */
-
-static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
-{
-  if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
-      ci->values[1].type != OCONFIG_TYPE_STRING) {
-    ERROR("sysevent plugin: The `%s' config option needs "
-          "two string arguments (ip and port).",
-          ci->key);
-    return (-1);
-  }
-
-  listen_ip = strdup(ci->values[0].value.string);
-  listen_port = strdup(ci->values[1].value.string);
-
-  return (0);
 }
 
-static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
-{
-  int tmp = 0;
-
-  if (cf_util_get_int(ci, &tmp) != 0)
-    return (-1);
-  else if ((tmp >= 1024) && (tmp <= 65535))
-    listen_buffer_size = tmp;
-  else {
-    WARNING(
-        "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
-    return (-1);
-  }
-
-  return (0);
-}
-
-static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
-{
-  int tmp = 0;
-
-  if (cf_util_get_int(ci, &tmp) != 0)
-    return (-1);
-  else if ((tmp >= 3) && (tmp <= 4096))
-    buffer_length = tmp;
-  else {
-    WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
-    return (-1);
-  }
-
-  return (0);
-}
-
-static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
-{
-  if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
-    ERROR("sysevent plugin: The `%s' config option needs "
-          "one string argument, a regular expression.",
-          ci->key);
-    return (-1);
-  }
-
-#if HAVE_REGEX_H
-  if (ignorelist == NULL)
-    ignorelist = ignorelist_create(/* invert = */ 1);
-
-  int status = ignorelist_add(ignorelist, ci->values[0].value.string);
-
-  if (status != 0) {
-    ERROR("sysevent plugin: invalid regular expression: %s",
-          ci->values[0].value.string);
-    return (1);
-  }
-
-  monitor_all_messages = 0;
-#else
-  WARNING("sysevent plugin: The plugin has been compiled without support "
-          "for the \"RegexFilter\" option.");
-#endif
-
-  return (0);
-}
-
-static int sysevent_config(oconfig_item_t *ci) /* {{{ */
-{
-  for (int i = 0; i < ci->children_num; i++) {
-    oconfig_item_t *child = ci->children + i;
-
-    if (strcasecmp("Listen", child->key) == 0)
-      sysevent_config_add_listen(child);
-    else if (strcasecmp("BufferSize", child->key) == 0)
-      sysevent_config_add_buffer_size(child);
-    else if (strcasecmp("BufferLength", child->key) == 0)
-      sysevent_config_add_buffer_length(child);
-    else if (strcasecmp("RegexFilter", child->key) == 0)
-      sysevent_config_add_regex_filter(child);
-    else {
-      WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
-    }
-  }
-
-  return (0);
-} /* }}} int sysevent_config */
-
 static void sysevent_dispatch_notification(const char *message,
 #if HAVE_YAJL_V2
                                            yajl_val *node,
 #endif
-                                           long long unsigned int timestamp) {
+                                           cdtime_t timestamp) {
   char *buf = NULL;
-  notification_t n = {NOTIF_OKAY, cdtime(), "", "",  "sysevent",
-                      "",         "",       "", NULL};
+
+  notification_t n = {
+      .severity = NOTIF_OKAY,
+      .time = cdtime(),
+      .plugin = "sysevent",
+      .type = "gauge",
+  };
 
 #if HAVE_YAJL_V2
   if (node != NULL) {
     // If we have a parsed-JSON node to work with, use that
-
-    char process[listen_buffer_size];
-    char severity[listen_buffer_size];
-    char sev_num_str[listen_buffer_size];
-    char msg[listen_buffer_size];
-    char hostname_str[listen_buffer_size];
-    int sev_num = -1;
-
     // msg
     const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
     yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
 
+    char msg[listen_buffer_size];
+
     if (msg_v != NULL) {
       memset(msg, '\0', listen_buffer_size);
       snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
@@ -754,6 +514,8 @@ static void sysevent_dispatch_notification(const char *message,
                                    (const char *)0};
     yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
 
+    char severity[listen_buffer_size];
+
     if (severity_v != NULL) {
       memset(severity, '\0', listen_buffer_size);
       snprintf(severity, listen_buffer_size, "%s%c",
@@ -766,6 +528,9 @@ static void sysevent_dispatch_notification(const char *message,
     yajl_val sev_num_str_v =
         yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
 
+    char sev_num_str[listen_buffer_size];
+    int sev_num = -1;
+
     if (sev_num_str_v != NULL) {
       memset(sev_num_str, '\0', listen_buffer_size);
       snprintf(sev_num_str, listen_buffer_size, "%s%c",
@@ -782,6 +547,8 @@ static void sysevent_dispatch_notification(const char *message,
                                   (const char *)0};
     yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
 
+    char process[listen_buffer_size];
+
     if (process_v != NULL) {
       memset(process, '\0', listen_buffer_size);
       snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
@@ -792,6 +559,8 @@ static void sysevent_dispatch_notification(const char *message,
     const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
     yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
 
+    char hostname_str[listen_buffer_size];
+
     if (hostname_v != NULL) {
       memset(hostname_str, '\0', listen_buffer_size);
       snprintf(hostname_str, listen_buffer_size, "%s%c",
@@ -816,24 +585,17 @@ static void sysevent_dispatch_notification(const char *message,
 #endif
 
   sstrncpy(n.host, hostname_g, sizeof(n.host));
-  sstrncpy(n.type, "gauge", sizeof(n.type));
 
-  notification_meta_t *m = calloc(1, sizeof(*m));
+  int status = plugin_notification_meta_add_string(&n, "ves", buf);
 
-  if (m == NULL) {
-    char errbuf[1024];
+  if (status < 0) {
     sfree(buf);
-    ERROR("sysevent plugin: unable to allocate metadata: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+    ERROR("sysevent plugin: unable to set notification VES metadata: %s",
+          STRERRNO);
     return;
   }
 
-  sstrncpy(m->name, "ves", sizeof(m->name));
-  m->nm_value.nm_string = sstrdup(buf);
-  m->type = NM_TYPE_STRING;
-  n.meta = m;
-
-  DEBUG("sysevent plugin: notification message: %s",
+  DEBUG("sysevent plugin: notification VES metadata: %s",
         n.meta->nm_value.nm_string);
 
   DEBUG("sysevent plugin: dispatching message");
@@ -841,31 +603,20 @@ static void sysevent_dispatch_notification(const char *message,
   plugin_dispatch_notification(&n);
   plugin_notification_meta_free(n.meta);
 
-  // malloc'd in gen_message_payload
+  // strdup'd in gen_message_payload
   if (buf != NULL)
     sfree(buf);
 }
 
-static int sysevent_read(void) /* {{{ */
-{
-  if (sysevent_thread_error != 0) {
-    ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
-          "it.",
-          sysevent_thread_error);
-
-    stop_thread(0);
-
-    start_thread();
+static void read_ring_buffer() {
+  pthread_mutex_lock(&sysevent_data_lock);
 
-    return (-1);
-  } /* if (sysevent_thread_error != 0) */
-
-  pthread_mutex_lock(&sysevent_lock);
+  // If there's currently nothing to read from the buffer,
+  // then wait
+  if (ring.head == ring.tail)
+    pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
 
   while (ring.head != ring.tail) {
-    long long unsigned int timestamp;
-    int is_match = 1;
-    char *match_str = NULL;
     int next = ring.tail + 1;
 
     if (next >= ring.maxLen)
@@ -874,15 +625,15 @@ static int sysevent_read(void) /* {{{ */
     DEBUG("sysevent plugin: reading from ring buffer: %s",
           ring.buffer[ring.tail]);
 
-    timestamp = ring.timestamp[ring.tail];
+    cdtime_t timestamp = ring.timestamp[ring.tail];
+    char *match_str = NULL;
 
 #if HAVE_YAJL_V2
     // Try to parse JSON, and if it fails, fall back to plain string
-    yajl_val node = NULL;
     char errbuf[1024];
     errbuf[0] = 0;
-    node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
-                           sizeof(errbuf));
+    yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
+                                    errbuf, sizeof(errbuf));
 
     if (node != NULL) {
       // JSON rsyslog data
@@ -890,10 +641,10 @@ static int sysevent_read(void) /* {{{ */
       // If we have any regex filters, we need to see if the message portion of
       // the data matches any of them (otherwise we're not interested)
       if (monitor_all_messages == 0) {
-        char json_val[listen_buffer_size];
         const char *path[] = {"@message", (const char *)0};
         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
 
+        char json_val[listen_buffer_size];
         memset(json_val, '\0', listen_buffer_size);
 
         snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
@@ -916,10 +667,10 @@ static int sysevent_read(void) /* {{{ */
       match_str = ring.buffer[ring.tail];
 #endif
 
+    int is_match = 1;
+
     // If we care about matching, do that comparison here
     if (match_str != NULL) {
-      is_match = 1;
-
       if (ignorelist_match(ignorelist, match_str) != 0)
         is_match = 0;
       else
@@ -940,27 +691,427 @@ static int sysevent_read(void) /* {{{ */
     ring.tail = next;
   }
 
-  pthread_mutex_unlock(&sysevent_lock);
+  pthread_mutex_unlock(&sysevent_data_lock);
+}
 
-  return (0);
-} /* }}} int sysevent_read */
+static void *sysevent_socket_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
 
-static int sysevent_shutdown(void) /* {{{ */
+  while (sysevent_socket_thread_loop > 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+
+    if (sock == -1)
+      return (void *)0;
+
+    int status = read_socket();
+
+    pthread_mutex_lock(&sysevent_thread_lock);
+
+    if (status < 0) {
+      WARNING("sysevent plugin: problem with socket thread (status: %d)",
+              status);
+      sysevent_socket_thread_error = 1;
+      break;
+    }
+  } /* while (sysevent_socket_thread_loop > 0) */
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *sysevent_socket_thread */
+
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *sysevent_dequeue_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  while (sysevent_dequeue_thread_loop > 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+
+    read_ring_buffer();
+
+    pthread_mutex_lock(&sysevent_thread_lock);
+  } /* while (sysevent_dequeue_thread_loop > 0) */
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *sysevent_dequeue_thread */
+
+static int start_socket_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_socket_thread_loop != 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return 0;
+  }
+
+  sysevent_socket_thread_loop = 1;
+  sysevent_socket_thread_error = 0;
+
+  DEBUG("sysevent plugin: starting socket thread");
+
+  int status = plugin_thread_create(&sysevent_socket_thread_id,
+                                    /* attr = */ NULL, sysevent_socket_thread,
+                                    /* arg = */ (void *)0, "sysevent");
+  if (status != 0) {
+    sysevent_socket_thread_loop = 0;
+    ERROR("sysevent plugin: starting socket thread failed.");
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return 0;
+} /* }}} int start_socket_thread */
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return 0;
+  }
+
+  sysevent_dequeue_thread_loop = 1;
+
+  int status = plugin_thread_create(&sysevent_dequeue_thread_id,
+                                    /* attr = */ NULL, sysevent_dequeue_thread,
+                                    /* arg = */ (void *)0, "ssyevent");
+  if (status != 0) {
+    sysevent_dequeue_thread_loop = 0;
+    ERROR("sysevent plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status = start_socket_thread();
+  int status2 = start_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_socket_thread(int shutdown) /* {{{ */
 {
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_socket_thread_loop == 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  sysevent_socket_thread_loop = 0;
+  pthread_cond_broadcast(&sysevent_cond);
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
   int status;
 
+  if (shutdown == 1) {
+    // Since the thread is blocking, calling pthread_join
+    // doesn't actually succeed in stopping it.  It will stick around
+    // until a message is received on the socket (at which
+    // it will realize that "sysevent_socket_thread_loop" is 0 and will
+    // break out of the read loop and be allowed to die).  This is
+    // fine when the process isn't supposed to be exiting, but in
+    // the case of a process shutdown, we don't want to have an
+    // idle thread hanging around.  Calling pthread_cancel here in
+    // the case of a shutdown is just assures that the thread is
+    // gone and that the process has been fully terminated.
+
+    DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
+
+    status = pthread_cancel(sysevent_socket_thread_id);
+
+    if (status != 0 && status != ESRCH) {
+      ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
+            STRERRNO);
+      status = -1;
+    } else
+      status = 0;
+  } else {
+    status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
+    if (status != 0 && status != ESRCH) {
+      ERROR("sysevent plugin: Stopping socket thread failed.");
+      status = -1;
+    } else
+      status = 0;
+  }
+
+  pthread_mutex_lock(&sysevent_thread_lock);
+  memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
+  sysevent_socket_thread_error = 0;
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  DEBUG("sysevent plugin: Finished requesting stop of socket thread");
+
+  return status;
+} /* }}} int stop_socket_thread */
+
+static int stop_dequeue_thread() /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  sysevent_dequeue_thread_loop = 0;
+  pthread_cond_broadcast(&sysevent_cond);
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  // Since the thread is blocking, calling pthread_join
+  // doesn't actually succeed in stopping it.  It will stick around
+  // until a message is received on the socket (at which
+  // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
+  // break out of the read loop and be allowed to die).  Since this
+  // function is called when the processing is exiting, we don't want to
+  // have an idle thread hanging around.  Calling pthread_cancel here
+  // just assures that the thread is gone and that the process has been
+  // fully terminated.
+
+  DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
+
+  int status = pthread_cancel(sysevent_dequeue_thread_id);
+
+  if (status != 0 && status != ESRCH) {
+    ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
+          STRERRNO);
+    status = -1;
+  } else
+    status = 0;
+
+  pthread_mutex_lock(&sysevent_thread_lock);
+  memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
+
+  return status;
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads() /* {{{ */
+{
+  int status = stop_socket_thread(1);
+  int status2 = stop_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
+
+static int sysevent_init(void) /* {{{ */
+{
+  ring.head = 0;
+  ring.tail = 0;
+  ring.maxLen = buffer_length;
+  ring.buffer = (char **)calloc(buffer_length, sizeof(char *));
+
+  if (ring.buffer == NULL) {
+    ERROR("sysevent plugin: sysevent_init calloc failed");
+    return -1;
+  }
+
+  for (int i = 0; i < buffer_length; i++) {
+    ring.buffer[i] = calloc(1, listen_buffer_size);
+  }
+
+  ring.timestamp = (cdtime_t *)calloc(buffer_length, sizeof(cdtime_t));
+
+  if (sock == -1) {
+    struct addrinfo hints = {
+        .ai_family = AF_UNSPEC,
+        .ai_socktype = SOCK_DGRAM,
+        .ai_protocol = 0,
+        .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
+    };
+    struct addrinfo *res = 0;
+
+    int err = getaddrinfo(listen_ip, listen_port, &hints, &res);
+
+    if (err != 0) {
+      ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
+            err);
+      freeaddrinfo(res);
+      return -1;
+    }
+
+    sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+    if (sock == -1) {
+      ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
+      freeaddrinfo(res);
+      return -1;
+    }
+
+    if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
+      ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
+      freeaddrinfo(res);
+      sock = -1;
+      return -1;
+    }
+
+    freeaddrinfo(res);
+  }
+
+  DEBUG("sysevent plugin: socket created and bound");
+
+  return start_threads();
+} /* }}} int sysevent_init */
+
+static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
+{
+  if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
+      ci->values[1].type != OCONFIG_TYPE_STRING) {
+    ERROR("sysevent plugin: The `%s' config option needs "
+          "two string arguments (ip and port).",
+          ci->key);
+    return -1;
+  }
+
+  listen_ip = strdup(ci->values[0].value.string);
+  listen_port = strdup(ci->values[1].value.string);
+
+  return 0;
+}
+
+static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
+{
+  int tmp = 0;
+
+  if (cf_util_get_int(ci, &tmp) != 0)
+    return -1;
+  else if ((tmp >= 1024) && (tmp <= 65535))
+    listen_buffer_size = tmp;
+  else {
+    WARNING(
+        "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
+    return -1;
+  }
+
+  return 0;
+}
+
+static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
+{
+  int tmp = 0;
+
+  if (cf_util_get_int(ci, &tmp) != 0)
+    return -1;
+  else if ((tmp >= 3) && (tmp <= 4096))
+    buffer_length = tmp;
+  else {
+    WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
+    return -1;
+  }
+
+  return 0;
+}
+
+static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
+{
+  if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
+    ERROR("sysevent plugin: The `%s' config option needs "
+          "one string argument, a regular expression.",
+          ci->key);
+    return -1;
+  }
+
+#if HAVE_REGEX_H
+  if (ignorelist == NULL)
+    ignorelist = ignorelist_create(/* invert = */ 1);
+
+  int status = ignorelist_add(ignorelist, ci->values[0].value.string);
+
+  if (status != 0) {
+    ERROR("sysevent plugin: invalid regular expression: %s",
+          ci->values[0].value.string);
+    return 1;
+  }
+
+  monitor_all_messages = 0;
+#else
+  WARNING("sysevent plugin: The plugin has been compiled without support "
+          "for the \"RegexFilter\" option.");
+#endif
+
+  return 0;
+}
+
+static int sysevent_config(oconfig_item_t *ci) /* {{{ */
+{
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Listen", child->key) == 0)
+      sysevent_config_add_listen(child);
+    else if (strcasecmp("BufferSize", child->key) == 0)
+      sysevent_config_add_buffer_size(child);
+    else if (strcasecmp("BufferLength", child->key) == 0)
+      sysevent_config_add_buffer_length(child);
+    else if (strcasecmp("RegexFilter", child->key) == 0)
+      sysevent_config_add_regex_filter(child);
+    else {
+      WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
+    }
+  }
+
+  return 0;
+} /* }}} int sysevent_config */
+
+static int sysevent_read(void) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_socket_thread_error != 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+
+    ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
+          "Restarting it.",
+          sysevent_socket_thread_error);
+
+    stop_threads();
+
+    start_threads();
+
+    return -1;
+  } /* if (sysevent_socket_thread_error != 0) */
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return 0;
+} /* }}} int sysevent_read */
+
+static int sysevent_shutdown(void) /* {{{ */
+{
   DEBUG("sysevent plugin: Shutting down thread.");
-  if (stop_thread(1) < 0)
-    return (-1);
+
+  int status = stop_threads();
+  int status2 = 0;
 
   if (sock != -1) {
-    status = close(sock);
-    if (status != 0) {
+    status2 = close(sock);
+    if (status2 != 0) {
       ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
-            strerror(errno));
-      return (-1);
-    } else
-      sock = -1;
+            STRERRNO);
+    }
+
+    sock = -1;
   }
 
   free(listen_ip);
@@ -973,7 +1124,10 @@ static int sysevent_shutdown(void) /* {{{ */
   free(ring.buffer);
   free(ring.timestamp);
 
-  return (0);
+  if (status != 0)
+    return status;
+  else
+    return status2;
 } /* }}} int sysevent_shutdown */
 
 void module_register(void) {