+ if (procevent_netlink_thread_loop <= 0) {
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return 0;
+ }
+
+ pthread_mutex_unlock(&procevent_thread_lock);
+
+ int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
+
+ if (status == 0) {
+ return 0;
+ } else if (status < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ pthread_mutex_lock(&procevent_data_lock);
+
+ // There was nothing more to receive for now, so...
+ // If ring head does not equal ring tail, then there is data
+ // in the ring buffer for the dequeue thread to read, so
+ // signal it
+ if (ring.head != ring.tail)
+ pthread_cond_signal(&procevent_cond);
+
+ pthread_mutex_unlock(&procevent_data_lock);
+
+ // Since there was nothing to receive, set recv to block and
+ // try again
+ recv_flags = 0;
+ continue;
+ } else if (errno != EINTR) {
+ ERROR("procevent plugin: socket receive error: %d", errno);
+ return -1;
+ } else {
+ // Interrupt, so just continue and try again
+ continue;
+ }
+ }
+
+ // We successfully received a message, so don't block on the next
+ // read in case there are more (and if there aren't, it will be
+ // handled above in the EWOULDBLOCK error-checking)
+ recv_flags = MSG_DONTWAIT;
+
+ int proc_id = -1;
+ int proc_status = -1;
+
+ switch (nlcn_msg.proc_ev.what) {
+ case PROC_EVENT_EXEC:
+ proc_status = PROCEVENT_STARTED;
+ proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
+ break;
+ case PROC_EVENT_EXIT:
+ proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
+ proc_status = PROCEVENT_EXITED;
+ break;
+ default:
+ // Otherwise not of interest
+ break;
+ }
+
+ // If we're interested in this process status event, place the event
+ // in the ring buffer for consumption by the dequeue (dispatch) thread.
+
+ if (proc_status != -1) {
+ pthread_mutex_lock(&procevent_data_lock);
+
+ int next = ring.head + 1;
+ if (next >= ring.maxLen)
+ next = 0;
+
+ if (next == ring.tail) {
+ // Buffer is full, signal the dequeue thread to process the buffer
+ // and clean it out, and then sleep
+ WARNING("procevent plugin: ring buffer full");
+
+ pthread_cond_signal(&procevent_cond);
+ pthread_mutex_unlock(&procevent_data_lock);
+
+ usleep(1000);
+ continue;
+ } else {
+ DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
+ (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
+ (unsigned long long)cdtime());
+
+ ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id;
+ ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status;
+ ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime();
+
+ ring.head = next;
+ }
+
+ pthread_mutex_unlock(&procevent_data_lock);