Merge pull request #3329 from efuss/fix-3311
[collectd.git] / src / procevent.c
index 69174d0..ab000db 100644 (file)
 
 #include "collectd.h"
 
-#include "common.h"
 #include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
 #include "utils_complain.h"
-#include "utils_ignorelist.h"
 
 #include <errno.h>
 #include <pthread.h>
 
 #define PROCEVENT_EXITED 0
 #define PROCEVENT_STARTED 1
-#define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
+#define PROCEVENT_FIELDS 3 // pid, status, timestamp
 #define BUFSIZE 512
 #define PROCDIR "/proc"
+#define RBUF_PROC_ID_INDEX 0
+#define RBUF_PROC_STATUS_INDEX 1
+#define RBUF_TIME_INDEX 2
 
 #define PROCEVENT_DOMAIN_FIELD "domain"
 #define PROCEVENT_DOMAIN_VALUE "fault"
@@ -106,7 +109,7 @@ typedef struct {
   int head;
   int tail;
   int maxLen;
-  long long unsigned int **buffer;
+  cdtime_t **buffer;
 } circbuf_t;
 
 struct processlist_s {
@@ -124,19 +127,21 @@ typedef struct processlist_s processlist_t;
  */
 static ignorelist_t *ignorelist = NULL;
 
-static int procevent_thread_loop = 0;
-static int procevent_thread_error = 0;
-static pthread_t procevent_thread_id;
-static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
+static int procevent_netlink_thread_loop = 0;
+static int procevent_netlink_thread_error = 0;
+static pthread_t procevent_netlink_thread_id;
+static int procevent_dequeue_thread_loop = 0;
+static pthread_t procevent_dequeue_thread_id;
+static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
 static int nl_sock = -1;
 static int buffer_length;
 static circbuf_t ring;
 static processlist_t *processlist_head = NULL;
 static int event_id = 0;
 
-static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
+static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
 
 /*
@@ -144,15 +149,13 @@ static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
  */
 
 static int gen_message_payload(int state, long pid, char *process,
-                               long long unsigned int timestamp, char **buf) {
+                               cdtime_t timestamp, char **buf) {
   const unsigned char *buf2;
   yajl_gen g;
   char json_str[DATA_MAX_NAME_LEN];
 
 #if !defined(HAVE_YAJL_V2)
-  yajl_gen_config conf = {};
-
-  conf.beautify = 0;
+  yajl_gen_config conf = {0};
 #endif
 
 #if HAVE_YAJL_V2
@@ -186,9 +189,9 @@ static int gen_message_payload(int state, long pid, char *process,
     goto err;
 
   event_id = event_id + 1;
-  int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, event_id_len, "%d", event_id);
+  if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -199,16 +202,11 @@ static int gen_message_payload(int state, long pid, char *process,
                       strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
     goto err;
 
-  int event_name_len = 0;
-  event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
-  event_name_len = event_name_len + strlen(process);      // process name
-  event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
-  event_name_len = event_name_len +
-                   13; // "process", 3 spaces, 2 parentheses and null-terminator
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, event_name_len, "process %s (%ld) %s", process, pid,
-           (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
-                       : PROCEVENT_EVENT_NAME_UP_VALUE));
+  if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
+               (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
+                           : PROCEVENT_EVENT_NAME_UP_VALUE)) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
       yajl_gen_status_ok) {
@@ -221,11 +219,10 @@ static int gen_message_payload(int state, long pid, char *process,
       yajl_gen_status_ok)
     goto err;
 
-  int last_epoch_microsec_len =
-      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, last_epoch_microsec_len, "%llu",
-           (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(cdtime())) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -276,11 +273,10 @@ static int gen_message_payload(int state, long pid, char *process,
       yajl_gen_status_ok)
     goto err;
 
-  int start_epoch_microsec_len =
-      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, start_epoch_microsec_len, "%llu",
-           (long long unsigned int)timestamp);
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(timestamp)) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -313,16 +309,10 @@ static int gen_message_payload(int state, long pid, char *process,
       yajl_gen_status_ok)
     goto err;
 
-  int alarm_condition_len = 0;
-  alarm_condition_len =
-      alarm_condition_len + (sizeof(char) * sizeof(int) * 4);  // pid
-  alarm_condition_len = alarm_condition_len + strlen(process); // process name
-  alarm_condition_len =
-      alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
-                                // parentheses and null-terminator
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, alarm_condition_len, "process %s (%ld) state change",
-           process, pid);
+  if (snprintf(json_str, sizeof(json_str), "process %s (%ld) state change",
+               process, pid) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
       yajl_gen_status_ok) {
@@ -346,8 +336,9 @@ static int gen_message_payload(int state, long pid, char *process,
     goto err;
 
   if (yajl_gen_string(
-          g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
-                                   : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
+          g,
+          (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
+                                : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
           strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
                              : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
       yajl_gen_status_ok)
@@ -381,19 +372,11 @@ static int gen_message_payload(int state, long pid, char *process,
       yajl_gen_status_ok)
     goto err;
 
-  int specific_problem_len = 0;
-  specific_problem_len =
-      specific_problem_len + (sizeof(char) * sizeof(int) * 4);   // pid
-  specific_problem_len = specific_problem_len + strlen(process); // process name
-  specific_problem_len =
-      specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
-  specific_problem_len =
-      specific_problem_len +
-      13; // "process", 3 spaces, 2 parentheses and null-terminator
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, specific_problem_len, "process %s (%ld) %s", process, pid,
-           (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
-                       : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
+  if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
+               (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
+                           : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
       yajl_gen_status_ok) {
@@ -406,35 +389,32 @@ static int gen_message_payload(int state, long pid, char *process,
     goto err;
 
   if (yajl_gen_string(
-          g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
-                                   : PROCEVENT_VF_STATUS_NORMAL_VALUE),
+          g,
+          (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
+                                : PROCEVENT_VF_STATUS_NORMAL_VALUE),
           strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
                              : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
       yajl_gen_status_ok)
     goto err;
 
-  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
-    goto err;
-
   // *** END fault fields ***
 
-  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+  // close fault and header fields
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+      yajl_gen_map_close(g) != yajl_gen_status_ok)
     goto err;
 
   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
     goto err;
 
-  *buf = malloc(strlen((char *)buf2) + 1);
+  *buf = strdup((char *)buf2);
 
   if (*buf == NULL) {
-    char errbuf[1024];
-    ERROR("procevent plugin: malloc failed during gen_message_payload: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+    ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
+          STRERRNO);
     goto err;
   }
 
-  sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
-
   yajl_gen_free(g);
 
   return 0;
@@ -446,26 +426,27 @@ err:
 }
 
 // Does /proc/<pid>/comm contain a process name we are interested in?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
 static processlist_t *process_check(long pid) {
-  int len, is_match, retval;
   char file[BUFSIZE];
-  FILE *fh;
-  char buffer[BUFSIZE];
 
-  len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
+  int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
 
   if ((len < 0) || (len >= BUFSIZE)) {
     WARNING("procevent process_check: process name too large");
     return NULL;
   }
 
+  FILE *fh;
+
   if (NULL == (fh = fopen(file, "r"))) {
     // No /proc/<pid>/comm for this pid, just ignore
     DEBUG("procevent plugin: no comm file available for pid %ld", pid);
     return NULL;
   }
 
-  retval = fscanf(fh, "%[^\n]", buffer);
+  char buffer[BUFSIZE];
+  int retval = fscanf(fh, "%[^\n]", buffer);
 
   if (retval < 0) {
     WARNING("procevent process_check: unable to read comm file for pid %ld",
@@ -497,14 +478,11 @@ static processlist_t *process_check(long pid) {
   //    associate <pid> with it (with the same process name as the existing).
   //
 
-  pthread_mutex_lock(&procevent_list_lock);
-
-  processlist_t *pl;
   processlist_t *match = NULL;
 
-  for (pl = processlist_head; pl != NULL; pl = pl->next) {
+  for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
 
-    is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
+    int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
 
     if (is_match == 1) {
       DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
@@ -549,25 +527,18 @@ static processlist_t *process_check(long pid) {
           "(%s)",
           pid, buffer);
 
-    processlist_t *pl2;
-    char *process;
-
-    pl2 = malloc(sizeof(*pl2));
+    processlist_t *pl2 = calloc(1, sizeof(*pl2));
     if (pl2 == NULL) {
-      char errbuf[1024];
-      ERROR("procevent plugin: malloc failed during process_check: %s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
-      pthread_mutex_unlock(&procevent_list_lock);
+      ERROR("procevent plugin: calloc failed during process_check: %s",
+            STRERRNO);
       return NULL;
     }
 
-    process = strdup(buffer);
+    char *process = strdup(buffer);
     if (process == NULL) {
-      char errbuf[1024];
       sfree(pl2);
       ERROR("procevent plugin: strdup failed during process_check: %s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
-      pthread_mutex_unlock(&procevent_list_lock);
+            STRERRNO);
       return NULL;
     }
 
@@ -579,81 +550,61 @@ static processlist_t *process_check(long pid) {
     match = pl2;
   }
 
-  pthread_mutex_unlock(&procevent_list_lock);
-
   return match;
 }
 
 // Does our map have this PID or name?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
 static processlist_t *process_map_check(long pid, char *process) {
-  processlist_t *pl;
-
-  pthread_mutex_lock(&procevent_list_lock);
-
-  for (pl = processlist_head; pl != NULL; pl = pl->next) {
+  for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
     int match_pid = 0;
-    int match_process = 0;
-    int match = 0;
 
     if (pid > 0) {
       if (pl->pid == pid)
         match_pid = 1;
     }
 
+    int match_process = 0;
+
     if (process != NULL) {
       if (strcmp(pl->process, process) == 0)
         match_process = 1;
     }
 
-    if (pid > 0 && process == NULL && match_pid == 1)
-      match = 1;
-    else if (pid < 0 && process != NULL && match_process == 1)
-      match = 1;
-    else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
+    int match = 0;
+
+    if ((pid > 0 && process == NULL && match_pid == 1) ||
+        (pid < 0 && process != NULL && match_process == 1) ||
+        (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)) {
       match = 1;
+    }
 
     if (match == 1) {
-      pthread_mutex_unlock(&procevent_list_lock);
       return pl;
     }
   }
 
-  pthread_mutex_unlock(&procevent_list_lock);
-
   return NULL;
 }
 
 static int process_map_refresh(void) {
-  DIR *proc;
-
   errno = 0;
-  proc = opendir(PROCDIR);
+  DIR *proc = opendir(PROCDIR);
+
   if (proc == NULL) {
-    char errbuf[1024];
-    ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+    ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
     return -1;
   }
 
   while (42) {
-    struct dirent *dent;
-    int len;
-    char file[BUFSIZE];
-
-    struct stat statbuf;
-
-    int status;
-
     errno = 0;
-    dent = readdir(proc);
+    struct dirent *dent = readdir(proc);
     if (dent == NULL) {
-      char errbuf[4096];
-
       if (errno == 0) /* end of directory */
         break;
 
       ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
-            sstrerror(errno, errbuf, sizeof(errbuf)));
+            STRERRNO);
       closedir(proc);
       return -1;
     }
@@ -661,15 +612,17 @@ static int process_map_refresh(void) {
     if (dent->d_name[0] == '.')
       continue;
 
-    len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
+    char file[BUFSIZE];
+
+    int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
     if ((len < 0) || (len >= BUFSIZE))
       continue;
 
-    status = stat(file, &statbuf);
+    struct stat statbuf;
+
+    int status = stat(file, &statbuf);
     if (status != 0) {
-      char errbuf[4096];
-      WARNING("procevent plugin: stat (%s) failed: %s", file,
-              sstrerror(errno, errbuf, sizeof(errbuf)));
+      WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
       continue;
     }
 
@@ -695,7 +648,9 @@ static int process_map_refresh(void) {
     // Check if we need to store this pid/name combo in our processlist_t linked
     // list
     int this_pid = atoi(dent->d_name);
+    pthread_mutex_lock(&procevent_data_lock);
     processlist_t *pl = process_check(this_pid);
+    pthread_mutex_unlock(&procevent_data_lock);
 
     if (pl != NULL)
       DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
@@ -708,8 +663,11 @@ static int process_map_refresh(void) {
 }
 
 static int nl_connect() {
-  int rc;
-  struct sockaddr_nl sa_nl;
+  struct sockaddr_nl sa_nl = {
+      .nl_family = AF_NETLINK,
+      .nl_groups = CN_IDX_PROC,
+      .nl_pid = getpid(),
+  };
 
   nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
   if (nl_sock == -1) {
@@ -717,14 +675,11 @@ static int nl_connect() {
     return -1;
   }
 
-  sa_nl.nl_family = AF_NETLINK;
-  sa_nl.nl_groups = CN_IDX_PROC;
-  sa_nl.nl_pid = getpid();
-
-  rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+  int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
   if (rc == -1) {
     ERROR("procevent plugin: socket bind failed: %d", errno);
     close(nl_sock);
+    nl_sock = -1;
     return -1;
   }
 
@@ -732,7 +687,6 @@ static int nl_connect() {
 }
 
 static int set_proc_ev_listen(bool enable) {
-  int rc;
   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
     struct nlmsghdr nl_hdr;
     struct __attribute__((__packed__)) {
@@ -752,7 +706,7 @@ static int set_proc_ev_listen(bool enable) {
 
   nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
 
-  rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+  int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
   if (rc == -1) {
     ERROR("procevent plugin: subscribing to netlink process events failed: %d",
           errno);
@@ -762,12 +716,9 @@ static int set_proc_ev_listen(bool enable) {
   return 0;
 }
 
+// Read from netlink socket and write to ring buffer
 static int read_event() {
-  int status;
-  int ret = 0;
-  int proc_id = -1;
-  int proc_status = -1;
-  int proc_extra = -1;
+  int recv_flags = MSG_DONTWAIT;
   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
     struct nlmsghdr nl_hdr;
     struct __attribute__((__packed__)) {
@@ -777,35 +728,57 @@ static int read_event() {
   } nlcn_msg;
 
   if (nl_sock == -1)
-    return ret;
+    return 0;
 
   while (42) {
+    pthread_mutex_lock(&procevent_thread_lock);
 
-    pthread_mutex_lock(&procevent_lock);
-
-    if (procevent_thread_loop <= 0)
-      return ret;
+    if (procevent_netlink_thread_loop <= 0) {
+      pthread_mutex_unlock(&procevent_thread_lock);
+      return 0;
+    }
 
-    pthread_mutex_unlock(&procevent_lock);
+    pthread_mutex_unlock(&procevent_thread_lock);
 
-    status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+    int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
 
     if (status == 0) {
       return 0;
-    } else if (status == -1) {
-      if (errno != EINTR) {
+    } else if (status < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        pthread_mutex_lock(&procevent_data_lock);
+
+        // There was nothing more to receive for now, so...
+        // If ring head does not equal ring tail, then there is data
+        // in the ring buffer for the dequeue thread to read, so
+        // signal it
+        if (ring.head != ring.tail)
+          pthread_cond_signal(&procevent_cond);
+
+        pthread_mutex_unlock(&procevent_data_lock);
+
+        // Since there was nothing to receive, set recv to block and
+        // try again
+        recv_flags = 0;
+        continue;
+      } else if (errno != EINTR) {
         ERROR("procevent plugin: socket receive error: %d", errno);
         return -1;
+      } else {
+        // Interrupt, so just continue and try again
+        continue;
       }
     }
 
+    // We successfully received a message, so don't block on the next
+    // read in case there are more (and if there aren't, it will be
+    // handled above in the EWOULDBLOCK error-checking)
+    recv_flags = MSG_DONTWAIT;
+
+    int proc_id = -1;
+    int proc_status = -1;
+
     switch (nlcn_msg.proc_ev.what) {
-    case PROC_EVENT_NONE:
-    case PROC_EVENT_FORK:
-    case PROC_EVENT_UID:
-    case PROC_EVENT_GID:
-      // Not of interest in current version
-      break;
     case PROC_EVENT_EXEC:
       proc_status = PROCEVENT_STARTED;
       proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
@@ -813,360 +786,490 @@ static int read_event() {
     case PROC_EVENT_EXIT:
       proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
       proc_status = PROCEVENT_EXITED;
-      proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
       break;
     default:
+      // Otherwise not of interest
       break;
     }
 
     // If we're interested in this process status event, place the event
-    // in the ring buffer for consumption by the main polling thread.
+    // in the ring buffer for consumption by the dequeue (dispatch) thread.
 
     if (proc_status != -1) {
-      pthread_mutex_lock(&procevent_lock);
+      pthread_mutex_lock(&procevent_data_lock);
 
       int next = ring.head + 1;
       if (next >= ring.maxLen)
         next = 0;
 
       if (next == ring.tail) {
+        // Buffer is full, signal the dequeue thread to process the buffer
+        // and clean it out, and then sleep
         WARNING("procevent plugin: ring buffer full");
+
+        pthread_cond_signal(&procevent_cond);
+        pthread_mutex_unlock(&procevent_data_lock);
+
+        usleep(1000);
+        continue;
       } else {
         DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
               (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
-              (long long unsigned int)CDTIME_T_TO_US(cdtime()));
-
-        if (proc_status == PROCEVENT_EXITED) {
-          ring.buffer[ring.head][0] = proc_id;
-          ring.buffer[ring.head][1] = proc_status;
-          ring.buffer[ring.head][2] = proc_extra;
-          ring.buffer[ring.head][3] =
-              (long long unsigned int)CDTIME_T_TO_US(cdtime());
-        } else {
-          ring.buffer[ring.head][0] = proc_id;
-          ring.buffer[ring.head][1] = proc_status;
-          ring.buffer[ring.head][2] = 0;
-          ring.buffer[ring.head][3] =
-              (long long unsigned int)CDTIME_T_TO_US(cdtime());
-        }
+              (unsigned long long)cdtime());
+
+        ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id;
+        ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status;
+        ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime();
 
         ring.head = next;
       }
 
-      pthread_mutex_unlock(&procevent_lock);
+      pthread_mutex_unlock(&procevent_data_lock);
     }
   }
 
-  return ret;
+  return 0;
 }
 
-static void *procevent_thread(void *arg) /* {{{ */
-{
-  pthread_mutex_lock(&procevent_lock);
+static void procevent_dispatch_notification(long pid, gauge_t value,
+                                            char *process, cdtime_t timestamp) {
 
-  while (procevent_thread_loop > 0) {
-    int status;
+  notification_t n = {
+      .severity = (value == 1 ? NOTIF_OKAY : NOTIF_FAILURE),
+      .time = cdtime(),
+      .plugin = "procevent",
+      .type = "gauge",
+      .type_instance = "process_status",
+  };
 
-    pthread_mutex_unlock(&procevent_lock);
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
 
-    usleep(1000);
+  char *buf = NULL;
+  gen_message_payload(value, pid, process, timestamp, &buf);
 
-    status = read_event();
+  int status = plugin_notification_meta_add_string(&n, "ves", buf);
 
-    pthread_mutex_lock(&procevent_lock);
+  if (status < 0) {
+    sfree(buf);
+    ERROR("procevent plugin: unable to set notification VES metadata: %s",
+          STRERRNO);
+    return;
+  }
 
-    if (status < 0) {
-      procevent_thread_error = 1;
-      break;
+  DEBUG("procevent plugin: notification VES metadata: %s",
+        n.meta->nm_value.nm_string);
+
+  DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
+        pid, process);
+
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // strdup'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
+
+// Read from ring buffer and dispatch to write plugins
+static void read_ring_buffer() {
+  pthread_mutex_lock(&procevent_data_lock);
+
+  // If there's currently nothing to read from the buffer,
+  // then wait
+  if (ring.head == ring.tail)
+    pthread_cond_wait(&procevent_cond, &procevent_data_lock);
+
+  while (ring.head != ring.tail) {
+    int next = ring.tail + 1;
+
+    if (next >= ring.maxLen)
+      next = 0;
+
+    if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == PROCEVENT_EXITED) {
+      processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
+
+      if (pl != NULL) {
+        // This process is of interest to us, so publish its EXITED status
+        procevent_dispatch_notification(
+            ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
+            ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
+            ring.buffer[ring.tail][RBUF_TIME_INDEX]);
+        DEBUG(
+            "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
+            "list",
+            pl->pid, pl->process);
+        pl->pid = -1;
+        pl->last_status = -1;
+      }
+    } else if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] ==
+               PROCEVENT_STARTED) {
+      // a new process has started, so check if we should monitor it
+      processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
+
+      // If we had already seen this process name and pid combo before,
+      // and the last message was a "process started" message, don't send
+      // the notfication again
+
+      if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
+        // This process is of interest to us, so publish its STARTED status
+        procevent_dispatch_notification(
+            ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
+            ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
+            ring.buffer[ring.tail][RBUF_TIME_INDEX]);
+
+        pl->last_status = PROCEVENT_STARTED;
+
+        DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
+              "list",
+              pl->pid, pl->process);
+      }
     }
 
-    if (procevent_thread_loop <= 0)
+    ring.tail = next;
+  }
+
+  pthread_mutex_unlock(&procevent_data_lock);
+}
+
+// Entry point for thread responsible for listening
+// to netlink socket and writing data to ring buffer
+static void *procevent_netlink_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  while (procevent_netlink_thread_loop > 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    int status = read_event();
+
+    pthread_mutex_lock(&procevent_thread_lock);
+
+    if (status < 0) {
+      procevent_netlink_thread_error = 1;
       break;
-  } /* while (procevent_thread_loop > 0) */
+    }
+  } /* while (procevent_netlink_thread_loop > 0) */
 
-  pthread_mutex_unlock(&procevent_lock);
+  pthread_mutex_unlock(&procevent_thread_lock);
 
-  return ((void *)0);
-} /* }}} void *procevent_thread */
+  return (void *)0;
+} /* }}} void *procevent_netlink_thread */
 
-static int start_thread(void) /* {{{ */
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *procevent_dequeue_thread(void *arg) /* {{{ */
 {
-  int status;
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  while (procevent_dequeue_thread_loop > 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    read_ring_buffer();
+
+    pthread_mutex_lock(&procevent_thread_lock);
+  } /* while (procevent_dequeue_thread_loop > 0) */
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *procevent_dequeue_thread */
 
-  pthread_mutex_lock(&procevent_lock);
+static int start_netlink_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
 
-  if (procevent_thread_loop != 0) {
-    pthread_mutex_unlock(&procevent_lock);
-    return (0);
+  if (procevent_netlink_thread_loop != 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return 0;
   }
 
+  int status;
+
   if (nl_sock == -1) {
     status = nl_connect();
 
-    if (status != 0)
+    if (status != 0) {
+      pthread_mutex_unlock(&procevent_thread_lock);
       return status;
+    }
 
     status = set_proc_ev_listen(true);
-    if (status == -1)
+    if (status == -1) {
+      pthread_mutex_unlock(&procevent_thread_lock);
       return status;
+    }
   }
 
   DEBUG("procevent plugin: socket created and bound");
 
-  procevent_thread_loop = 1;
-  procevent_thread_error = 0;
+  procevent_netlink_thread_loop = 1;
+  procevent_netlink_thread_error = 0;
 
-  status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
-                                procevent_thread,
+  status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
+                                procevent_netlink_thread,
                                 /* arg = */ (void *)0, "procevent");
   if (status != 0) {
-    procevent_thread_loop = 0;
-    ERROR("procevent plugin: Starting thread failed.");
-    pthread_mutex_unlock(&procevent_lock);
-    return (-1);
+    procevent_netlink_thread_loop = 0;
+    ERROR("procevent plugin: Starting netlink thread failed.");
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    int status2 = close(nl_sock);
+
+    if (status2 != 0) {
+      ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
+            status2, STRERRNO);
+    }
+
+    nl_sock = -1;
+
+    return -1;
   }
 
-  pthread_mutex_unlock(&procevent_lock);
-  return (0);
-} /* }}} int start_thread */
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return status;
+} /* }}} int start_netlink_thread */
 
-static int stop_thread(int shutdown) /* {{{ */
+static int start_dequeue_thread(void) /* {{{ */
 {
-  int status;
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return 0;
+  }
+
+  procevent_dequeue_thread_loop = 1;
+
+  int status = plugin_thread_create(&procevent_dequeue_thread_id,
+                                    /* attr = */ NULL, procevent_dequeue_thread,
+                                    /* arg = */ (void *)0, "procevent");
+  if (status != 0) {
+    procevent_dequeue_thread_loop = 0;
+    ERROR("procevent plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status = start_netlink_thread();
+  int status2 = start_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
+{
+  int socket_status;
 
   if (nl_sock != -1) {
-    status = close(nl_sock);
-    if (status != 0) {
+    socket_status = close(nl_sock);
+    if (socket_status != 0) {
       ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
-            status, strerror(errno));
-      return (-1);
-    } else
-      nl_sock = -1;
-  }
+            socket_status, strerror(errno));
+    }
 
-  pthread_mutex_lock(&procevent_lock);
+    nl_sock = -1;
+  } else
+    socket_status = 0;
 
-  if (procevent_thread_loop == 0) {
-    pthread_mutex_unlock(&procevent_lock);
-    return (-1);
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_netlink_thread_loop == 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return -1;
   }
 
-  procevent_thread_loop = 0;
+  // Set thread termination status
+  procevent_netlink_thread_loop = 0;
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  // Let threads waiting on access to the data know to move
+  // on such that they'll see the thread's termination status
   pthread_cond_broadcast(&procevent_cond);
-  pthread_mutex_unlock(&procevent_lock);
+
+  int thread_status;
 
   if (shutdown == 1) {
     // Calling pthread_cancel here in
     // the case of a shutdown just assures that the thread is
     // gone and that the process has been fully terminated.
 
-    DEBUG("procevent plugin: Canceling thread for process shutdown");
+    DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
 
-    status = pthread_cancel(procevent_thread_id);
+    thread_status = pthread_cancel(procevent_netlink_thread_id);
 
-    if (status != 0) {
-      ERROR("procevent plugin: Unable to cancel thread: %d", status);
-      status = -1;
-    }
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("procevent plugin: Unable to cancel netlink thread: %d",
+            thread_status);
+      thread_status = -1;
+    } else
+      thread_status = 0;
   } else {
-    status = pthread_join(procevent_thread_id, /* return = */ NULL);
-    if (status != 0) {
-      ERROR("procevent plugin: Stopping thread failed.");
-      status = -1;
-    }
+    thread_status =
+        pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("procevent plugin: Stopping netlink thread failed.");
+      thread_status = -1;
+    } else
+      thread_status = 0;
   }
 
-  pthread_mutex_lock(&procevent_lock);
-  memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
-  procevent_thread_error = 0;
-  pthread_mutex_unlock(&procevent_lock);
+  pthread_mutex_lock(&procevent_thread_lock);
+  memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
+  procevent_netlink_thread_error = 0;
+  pthread_mutex_unlock(&procevent_thread_lock);
 
-  DEBUG("procevent plugin: Finished requesting stop of thread");
+  DEBUG("procevent plugin: Finished requesting stop of netlink thread");
 
-  return (status);
-} /* }}} int stop_thread */
+  if (socket_status != 0)
+    return socket_status;
+  else
+    return thread_status;
+} /* }}} int stop_netlink_thread */
 
-static int procevent_init(void) /* {{{ */
+static int stop_dequeue_thread() /* {{{ */
 {
-  int status;
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return -1;
+  }
+
+  procevent_dequeue_thread_loop = 0;
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  pthread_cond_broadcast(&procevent_cond);
+
+  // Calling pthread_cancel here just assures that the thread is
+  // gone and that the process has been fully terminated.
+
+  DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
 
+  int status = pthread_cancel(procevent_dequeue_thread_id);
+
+  if (status != 0 && status != ESRCH) {
+    ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
+    status = -1;
+  } else
+    status = 0;
+
+  pthread_mutex_lock(&procevent_thread_lock);
+  memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
+
+  return status;
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads() /* {{{ */
+{
+  int status = stop_netlink_thread(1);
+  int status2 = stop_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
+
+static int procevent_init(void) /* {{{ */
+{
   ring.head = 0;
   ring.tail = 0;
   ring.maxLen = buffer_length;
-  ring.buffer = (long long unsigned int **)malloc(
-      buffer_length * sizeof(long long unsigned int *));
+  ring.buffer = (cdtime_t **)calloc(buffer_length, sizeof(cdtime_t *));
 
   for (int i = 0; i < buffer_length; i++) {
-    ring.buffer[i] = (long long unsigned int *)malloc(
-        PROCEVENT_FIELDS * sizeof(long long unsigned int));
+    ring.buffer[i] = (cdtime_t *)calloc(PROCEVENT_FIELDS, sizeof(cdtime_t));
   }
 
-  status = process_map_refresh();
+  int status = process_map_refresh();
 
   if (status == -1) {
     ERROR("procevent plugin: Initial process mapping failed.");
-    return (-1);
+    return -1;
   }
 
   if (ignorelist == NULL) {
     NOTICE("procevent plugin: No processes have been configured.");
-    return (-1);
+    return -1;
   }
 
-  return (start_thread());
+  return start_threads();
 } /* }}} int procevent_init */
 
 static int procevent_config(const char *key, const char *value) /* {{{ */
 {
-  int status;
-
   if (ignorelist == NULL)
     ignorelist = ignorelist_create(/* invert = */ 1);
 
+  if (ignorelist == NULL) {
+    return -1;
+  }
+
   if (strcasecmp(key, "BufferLength") == 0) {
     buffer_length = atoi(value);
   } else if (strcasecmp(key, "Process") == 0) {
     ignorelist_add(ignorelist, value);
-  } else if (strcasecmp(key, "RegexProcess") == 0) {
+  } else if (strcasecmp(key, "ProcessRegex") == 0) {
 #if HAVE_REGEX_H
-    status = ignorelist_add(ignorelist, value);
+    int status = ignorelist_add(ignorelist, value);
 
     if (status != 0) {
       ERROR("procevent plugin: invalid regular expression: %s", value);
-      return (1);
+      return 1;
     }
 #else
     WARNING("procevent plugin: The plugin has been compiled without support "
-            "for the \"RegexProcess\" option.");
+            "for the \"ProcessRegex\" option.");
 #endif
   } else {
-    return (-1);
+    return -1;
   }
 
-  return (0);
+  return 0;
 } /* }}} int procevent_config */
 
-static void procevent_dispatch_notification(long pid,
-                                            const char *type, /* {{{ */
-                                            gauge_t value, char *process,
-                                            long long unsigned int timestamp) {
-  char *buf = NULL;
-  notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "",
-                      NULL};
-
-  if (value == 1)
-    n.severity = NOTIF_OKAY;
-
-  sstrncpy(n.host, hostname_g, sizeof(n.host));
-  sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
-  sstrncpy(n.type, "gauge", sizeof(n.type));
-  sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
-
-  gen_message_payload(value, pid, process, timestamp, &buf);
-
-  notification_meta_t *m = calloc(1, sizeof(*m));
-
-  if (m == NULL) {
-    char errbuf[1024];
-    sfree(buf);
-    ERROR("procevent plugin: unable to allocate metadata: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
-    return;
-  }
-
-  sstrncpy(m->name, "ves", sizeof(m->name));
-  m->nm_value.nm_string = sstrdup(buf);
-  m->type = NM_TYPE_STRING;
-  n.meta = m;
-
-  DEBUG("procevent plugin: notification message: %s",
-        n.meta->nm_value.nm_string);
-
-  DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
-        pid, process);
-
-  plugin_dispatch_notification(&n);
-  plugin_notification_meta_free(n.meta);
-
-  // malloc'd in gen_message_payload
-  if (buf != NULL)
-    sfree(buf);
-}
-
 static int procevent_read(void) /* {{{ */
 {
-  if (procevent_thread_error != 0) {
-    ERROR(
-        "procevent plugin: The interface thread had a problem. Restarting it.");
+  pthread_mutex_lock(&procevent_thread_lock);
 
-    stop_thread(0);
+  if (procevent_netlink_thread_error != 0) {
 
-    start_thread();
+    pthread_mutex_unlock(&procevent_thread_lock);
 
-    return (-1);
-  } /* if (procevent_thread_error != 0) */
+    ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
 
-  pthread_mutex_lock(&procevent_lock);
-
-  while (ring.head != ring.tail) {
-    int next = ring.tail + 1;
+    stop_netlink_thread(0);
 
-    if (next >= ring.maxLen)
-      next = 0;
+    start_netlink_thread();
 
-    if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
-      processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
-
-      if (pl != NULL) {
-        // This process is of interest to us, so publish its EXITED status
-        procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
-                                        ring.buffer[ring.tail][1], pl->process,
-                                        ring.buffer[ring.tail][3]);
-        DEBUG(
-            "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
-            "list",
-            pl->pid, pl->process);
-        pl->pid = -1;
-        pl->last_status = -1;
-      }
-    } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
-      // a new process has started, so check if we should monitor it
-      processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
-
-      // If we had already seen this process name and pid combo before,
-      // and the last message was a "process started" message, don't send
-      // the notfication again
-
-      if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
-        // This process is of interest to us, so publish its STARTED status
-        procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
-                                        ring.buffer[ring.tail][1], pl->process,
-                                        ring.buffer[ring.tail][3]);
-
-        pl->last_status = PROCEVENT_STARTED;
-
-        DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
-              "list",
-              pl->pid, pl->process);
-      }
-    }
-
-    ring.tail = next;
-  }
+    return -1;
+  } /* if (procevent_netlink_thread_error != 0) */
 
-  pthread_mutex_unlock(&procevent_lock);
+  pthread_mutex_unlock(&procevent_thread_lock);
 
-  return (0);
+  return 0;
 } /* }}} int procevent_read */
 
 static int procevent_shutdown(void) /* {{{ */
 {
-  processlist_t *pl;
+  DEBUG("procevent plugin: Shutting down threads.");
 
-  DEBUG("procevent plugin: Shutting down thread.");
-
-  if (stop_thread(1) < 0)
-    return (-1);
+  int status = stop_threads();
 
   for (int i = 0; i < buffer_length; i++) {
     free(ring.buffer[i]);
@@ -1174,7 +1277,7 @@ static int procevent_shutdown(void) /* {{{ */
 
   free(ring.buffer);
 
-  pl = processlist_head;
+  processlist_t *pl = processlist_head;
   while (pl != NULL) {
     processlist_t *pl_next;
 
@@ -1186,7 +1289,9 @@ static int procevent_shutdown(void) /* {{{ */
     pl = pl_next;
   }
 
-  return (0);
+  ignorelist_free(ignorelist);
+
+  return status;
 } /* }}} int procevent_shutdown */
 
 void module_register(void) {