Use a separate dequeue thread to dispatch notifications
authorAndrew Bays <abays@redhat.com>
Wed, 17 Oct 2018 14:20:39 +0000 (10:20 -0400)
committerAndrew Bays <andrew.bays@gmail.com>
Wed, 4 Sep 2019 19:50:44 +0000 (15:50 -0400)
src/procevent.c

index a7a0107..fc8e430 100644 (file)
@@ -124,9 +124,12 @@ typedef struct processlist_s processlist_t;
  */
 static ignorelist_t *ignorelist = NULL;
 
-static int procevent_thread_loop = 0;
-static int procevent_thread_error = 0;
-static pthread_t procevent_thread_id;
+static int procevent_netlink_thread_loop = 0;
+static int procevent_netlink_thread_error = 0;
+static pthread_t procevent_netlink_thread_id;
+static int procevent_dequeue_thread_loop = 0;
+static int procevent_dequeue_thread_error = 0;
+static pthread_t procevent_dequeue_thread_id;
 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -140,6 +143,14 @@ static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
 
 /*
+ * Prototypes
+ */
+
+static void procevent_dispatch_notification(long pid, const char *type,
+                                            gauge_t value, char *process,
+                                            long long unsigned int timestamp);
+
+/*
  * Private functions
  */
 
@@ -762,12 +773,14 @@ static int set_proc_ev_listen(bool enable) {
   return 0;
 }
 
+// Read from netlink socket and write to ring buffer
 static int read_event() {
   int status;
   int ret = 0;
   int proc_id = -1;
   int proc_status = -1;
   int proc_extra = -1;
+  int recv_flags = MSG_DONTWAIT;
   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
     struct nlmsghdr nl_hdr;
     struct __attribute__((__packed__)) {
@@ -783,22 +796,45 @@ static int read_event() {
 
     pthread_mutex_lock(&procevent_lock);
 
-    if (procevent_thread_loop <= 0)
+    if (procevent_netlink_thread_loop <= 0) {
+      pthread_mutex_unlock(&procevent_lock);
       return ret;
+    }
 
     pthread_mutex_unlock(&procevent_lock);
 
-    status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+    status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
 
     if (status == 0) {
       return 0;
-    } else if (status == -1) {
-      if (errno != EINTR) {
+    } else if (status < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        pthread_mutex_lock(&procevent_lock);
+
+        // There was nothing more to receive for now, so...
+        // If ring head does not equal ring tail, there is data
+        // in the ring buffer for the dequeue thread to read, so
+        // signal it
+        if (ring.head != ring.tail)
+          pthread_cond_signal(&procevent_cond);
+
+        pthread_mutex_unlock(&procevent_lock);
+
+        // Since there was nothing to receive, set recv to block and
+        // try again
+        recv_flags = 0;
+        continue;
+      } else if (errno != EINTR) {
         ERROR("procevent plugin: socket receive error: %d", errno);
         return -1;
       }
     }
 
+    // We successfully received a message, so don't block on the next
+    // read in case there are more (and if there aren't, it will be
+    // handled above in the error-checking)
+    recv_flags = MSG_DONTWAIT;
+
     switch (nlcn_msg.proc_ev.what) {
     case PROC_EVENT_NONE:
     case PROC_EVENT_FORK:
@@ -830,7 +866,15 @@ static int read_event() {
         next = 0;
 
       if (next == ring.tail) {
+        // Buffer is full, signal the dequeue thread to process the buffer
+        // and clean it out, and then sleep
         WARNING("procevent plugin: ring buffer full");
+
+        pthread_cond_signal(&procevent_cond);
+        pthread_mutex_unlock(&procevent_lock);
+
+        usleep(1000);
+        continue;
       } else {
         DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
               (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
@@ -860,11 +904,73 @@ static int read_event() {
   return ret;
 }
 
-static void *procevent_thread(void *arg) /* {{{ */
+// Read from ring buffer and dispatch to write plugins
+static int read_ring_buffer() {
+  pthread_mutex_lock(&procevent_lock);
+
+  // If there's currently nothing to read from the buffer,
+  // then wait
+  if (ring.head == ring.tail)
+    pthread_cond_wait(&procevent_cond, &procevent_lock);
+
+  while (ring.head != ring.tail) {
+    int next = ring.tail + 1;
+
+    if (next >= ring.maxLen)
+      next = 0;
+
+    if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
+      processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
+
+      if (pl != NULL) {
+        // This process is of interest to us, so publish its EXITED status
+        procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
+                                        ring.buffer[ring.tail][1], pl->process,
+                                        ring.buffer[ring.tail][3]);
+        DEBUG(
+            "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
+            "list",
+            pl->pid, pl->process);
+        pl->pid = -1;
+        pl->last_status = -1;
+      }
+    } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
+      // a new process has started, so check if we should monitor it
+      processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
+
+      // If we had already seen this process name and pid combo before,
+      // and the last message was a "process started" message, don't send
+      // the notfication again
+
+      if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
+        // This process is of interest to us, so publish its STARTED status
+        procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
+                                        ring.buffer[ring.tail][1], pl->process,
+                                        ring.buffer[ring.tail][3]);
+
+        pl->last_status = PROCEVENT_STARTED;
+
+        DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
+              "list",
+              pl->pid, pl->process);
+      }
+    }
+
+    ring.tail = next;
+  }
+
+  pthread_mutex_unlock(&procevent_lock);
+
+  return 0;
+}
+
+// Entry point for thread responsible for listening
+// to netlink socket and writing data to ring buffer
+static void *procevent_netlink_thread(void *arg) /* {{{ */
 {
   pthread_mutex_lock(&procevent_lock);
 
-  while (procevent_thread_loop > 0) {
+  while (procevent_netlink_thread_loop > 0) {
     int status;
 
     pthread_mutex_unlock(&procevent_lock);
@@ -876,26 +982,55 @@ static void *procevent_thread(void *arg) /* {{{ */
     pthread_mutex_lock(&procevent_lock);
 
     if (status < 0) {
-      procevent_thread_error = 1;
+      procevent_netlink_thread_error = 1;
+      break;
+    }
+
+    if (procevent_netlink_thread_loop <= 0)
+      break;
+  } /* while (procevent_netlink_thread_loop > 0) */
+
+  pthread_mutex_unlock(&procevent_lock);
+
+  return ((void *)0);
+} /* }}} void *procevent_netlink_thread */
+
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *procevent_dequeue_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_lock);
+
+  while (procevent_dequeue_thread_loop > 0) {
+    int status;
+
+    pthread_mutex_unlock(&procevent_lock);
+
+    status = read_ring_buffer();
+
+    pthread_mutex_lock(&procevent_lock);
+
+    if (status < 0) {
+      procevent_dequeue_thread_error = 1;
       break;
     }
 
-    if (procevent_thread_loop <= 0)
+    if (procevent_dequeue_thread_loop <= 0)
       break;
-  } /* while (procevent_thread_loop > 0) */
+  } /* while (procevent_dequeue_thread_loop > 0) */
 
   pthread_mutex_unlock(&procevent_lock);
 
   return ((void *)0);
-} /* }}} void *procevent_thread */
+} /* }}} void *procevent_dequeue_thread */
 
-static int start_thread(void) /* {{{ */
+static int start_netlink_thread(void) /* {{{ */
 {
   int status;
 
   pthread_mutex_lock(&procevent_lock);
 
-  if (procevent_thread_loop != 0) {
+  if (procevent_netlink_thread_loop != 0) {
     pthread_mutex_unlock(&procevent_lock);
     return (0);
   }
@@ -913,24 +1048,67 @@ static int start_thread(void) /* {{{ */
 
   DEBUG("procevent plugin: socket created and bound");
 
-  procevent_thread_loop = 1;
-  procevent_thread_error = 0;
+  procevent_netlink_thread_loop = 1;
+  procevent_netlink_thread_error = 0;
 
-  status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
-                                procevent_thread,
+  status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
+                                procevent_netlink_thread,
                                 /* arg = */ (void *)0, "procevent");
   if (status != 0) {
-    procevent_thread_loop = 0;
-    ERROR("procevent plugin: Starting thread failed.");
+    procevent_netlink_thread_loop = 0;
+    ERROR("procevent plugin: Starting netlink thread failed.");
     pthread_mutex_unlock(&procevent_lock);
     return (-1);
   }
 
   pthread_mutex_unlock(&procevent_lock);
-  return (0);
-} /* }}} int start_thread */
 
-static int stop_thread(int shutdown) /* {{{ */
+  return status;
+} /* }}} int start_netlink_thread */
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+  int status;
+
+  pthread_mutex_lock(&procevent_lock);
+
+  if (procevent_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&procevent_lock);
+    return (0);
+  }
+
+  procevent_dequeue_thread_loop = 1;
+  procevent_dequeue_thread_error = 0;
+
+  status = plugin_thread_create(&procevent_dequeue_thread_id, /* attr = */ NULL,
+                                procevent_dequeue_thread,
+                                /* arg = */ (void *)0, "procevent");
+  if (status != 0) {
+    procevent_dequeue_thread_loop = 0;
+    ERROR("procevent plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&procevent_lock);
+    return (-1);
+  }
+
+  pthread_mutex_unlock(&procevent_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status, status2;
+
+  status = start_netlink_thread();
+  status2 = start_dequeue_thread();
+
+  if (status < 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
 {
   int status;
 
@@ -946,12 +1124,12 @@ static int stop_thread(int shutdown) /* {{{ */
 
   pthread_mutex_lock(&procevent_lock);
 
-  if (procevent_thread_loop == 0) {
+  if (procevent_netlink_thread_loop == 0) {
     pthread_mutex_unlock(&procevent_lock);
     return (-1);
   }
 
-  procevent_thread_loop = 0;
+  procevent_netlink_thread_loop = 0;
   pthread_cond_broadcast(&procevent_cond);
   pthread_mutex_unlock(&procevent_lock);
 
@@ -960,31 +1138,94 @@ static int stop_thread(int shutdown) /* {{{ */
     // the case of a shutdown just assures that the thread is
     // gone and that the process has been fully terminated.
 
-    DEBUG("procevent plugin: Canceling thread for process shutdown");
+    DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
 
-    status = pthread_cancel(procevent_thread_id);
+    status = pthread_cancel(procevent_netlink_thread_id);
 
-    if (status != 0) {
-      ERROR("procevent plugin: Unable to cancel thread: %d", status);
+    if (status != 0 && status != ESRCH) {
+      ERROR("procevent plugin: Unable to cancel netlink thread: %d", status);
       status = -1;
-    }
+    } else
+      status = 0;
   } else {
-    status = pthread_join(procevent_thread_id, /* return = */ NULL);
-    if (status != 0) {
-      ERROR("procevent plugin: Stopping thread failed.");
+    status = pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
+    if (status != 0 && status != ESRCH) {
+      ERROR("procevent plugin: Stopping netlink thread failed.");
       status = -1;
-    }
+    } else
+      status = 0;
+  }
+
+  pthread_mutex_lock(&procevent_lock);
+  memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
+  procevent_netlink_thread_error = 0;
+  pthread_mutex_unlock(&procevent_lock);
+
+  DEBUG("procevent plugin: Finished requesting stop of netlink thread");
+
+  return (status);
+} /* }}} int stop_netlink_thread */
+
+static int stop_dequeue_thread(int shutdown) /* {{{ */
+{
+  int status;
+
+  pthread_mutex_lock(&procevent_lock);
+
+  if (procevent_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&procevent_lock);
+    return (-1);
+  }
+
+  procevent_dequeue_thread_loop = 0;
+  pthread_cond_broadcast(&procevent_cond);
+  pthread_mutex_unlock(&procevent_lock);
+
+  if (shutdown == 1) {
+    // Calling pthread_cancel here in
+    // the case of a shutdown just assures that the thread is
+    // gone and that the process has been fully terminated.
+
+    DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
+
+    status = pthread_cancel(procevent_dequeue_thread_id);
+
+    if (status != 0 && status != ESRCH) {
+      ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
+      status = -1;
+    } else
+      status = 0;
+  } else {
+    status = pthread_join(procevent_dequeue_thread_id, /* return = */ NULL);
+    if (status != 0 && status != ESRCH) {
+      ERROR("procevent plugin: Stopping dequeue thread failed.");
+      status = -1;
+    } else
+      status = 0;
   }
 
   pthread_mutex_lock(&procevent_lock);
-  memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
-  procevent_thread_error = 0;
+  memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
+  procevent_dequeue_thread_error = 0;
   pthread_mutex_unlock(&procevent_lock);
 
-  DEBUG("procevent plugin: Finished requesting stop of thread");
+  DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
 
   return (status);
-} /* }}} int stop_thread */
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads(int shutdown) /* {{{ */
+{
+  int status, status2;
+
+  status = stop_netlink_thread(shutdown);
+  status2 = stop_dequeue_thread(shutdown);
+
+  if (status < 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
 
 static int procevent_init(void) /* {{{ */
 {
@@ -1013,7 +1254,7 @@ static int procevent_init(void) /* {{{ */
     return (-1);
   }
 
-  return (start_thread());
+  return (start_threads());
 } /* }}} int procevent_init */
 
 static int procevent_config(const char *key, const char *value) /* {{{ */
@@ -1095,64 +1336,33 @@ static void procevent_dispatch_notification(long pid,
 
 static int procevent_read(void) /* {{{ */
 {
-  if (procevent_thread_error != 0) {
-    ERROR(
-        "procevent plugin: The interface thread had a problem. Restarting it.");
+  pthread_mutex_lock(&procevent_lock);
 
-    stop_thread(0);
+  if (procevent_netlink_thread_error != 0) {
 
-    start_thread();
+    pthread_mutex_unlock(&procevent_lock);
 
-    return (-1);
-  } /* if (procevent_thread_error != 0) */
+    ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
 
-  pthread_mutex_lock(&procevent_lock);
+    stop_netlink_thread(0);
 
-  while (ring.head != ring.tail) {
-    int next = ring.tail + 1;
-
-    if (next >= ring.maxLen)
-      next = 0;
+    start_netlink_thread();
 
-    if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
-      processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
+    return (-1);
+  } /* if (procevent_netlink_thread_error != 0) */
 
-      if (pl != NULL) {
-        // This process is of interest to us, so publish its EXITED status
-        procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
-                                        ring.buffer[ring.tail][1], pl->process,
-                                        ring.buffer[ring.tail][3]);
-        DEBUG(
-            "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
-            "list",
-            pl->pid, pl->process);
-        pl->pid = -1;
-        pl->last_status = -1;
-      }
-    } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
-      // a new process has started, so check if we should monitor it
-      processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
+  if (procevent_dequeue_thread_error != 0) {
 
-      // If we had already seen this process name and pid combo before,
-      // and the last message was a "process started" message, don't send
-      // the notfication again
+    pthread_mutex_unlock(&procevent_lock);
 
-      if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
-        // This process is of interest to us, so publish its STARTED status
-        procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
-                                        ring.buffer[ring.tail][1], pl->process,
-                                        ring.buffer[ring.tail][3]);
+    ERROR("procevent plugin: The dequeue thread had a problem. Restarting it.");
 
-        pl->last_status = PROCEVENT_STARTED;
+    stop_dequeue_thread(0);
 
-        DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
-              "list",
-              pl->pid, pl->process);
-      }
-    }
+    start_dequeue_thread();
 
-    ring.tail = next;
-  }
+    return (-1);
+  } /* if (procevent_dequeue_thread_error != 0) */
 
   pthread_mutex_unlock(&procevent_lock);
 
@@ -1161,12 +1371,12 @@ static int procevent_read(void) /* {{{ */
 
 static int procevent_shutdown(void) /* {{{ */
 {
+  int status;
   processlist_t *pl;
 
-  DEBUG("procevent plugin: Shutting down thread.");
+  DEBUG("procevent plugin: Shutting down threads.");
 
-  if (stop_thread(1) < 0)
-    return (-1);
+  status = stop_threads(1);
 
   for (int i = 0; i < buffer_length; i++) {
     free(ring.buffer[i]);
@@ -1186,7 +1396,7 @@ static int procevent_shutdown(void) /* {{{ */
     pl = pl_next;
   }
 
-  return (0);
+  return status;
 } /* }}} int procevent_shutdown */
 
 void module_register(void) {