2 * collectd - src/procevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
30 #include "utils/common/common.h"
31 #include "utils/ignorelist/ignorelist.h"
32 #include "utils_complain.h"
38 #include <sys/socket.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
// Process state codes. read_event() stores one of these in the status slot
// of a ring buffer entry and read_ring_buffer() branches on them.
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
// Number of cdtime_t slots allocated for each ring buffer entry.
62 #define PROCEVENT_FIELDS 3 // pid, status, timestamp
// Directory scanned for per-PID entries and their comm files.
64 #define PROCDIR "/proc"
// Slot indices within one ring buffer entry.
65 #define RBUF_PROC_ID_INDEX 0
66 #define RBUF_PROC_STATUS_INDEX 1
67 #define RBUF_TIME_INDEX 2
// Keys and fixed values of the common event header written by
// gen_message_payload().
69 #define PROCEVENT_DOMAIN_FIELD "domain"
70 #define PROCEVENT_DOMAIN_VALUE "fault"
71 #define PROCEVENT_EVENT_ID_FIELD "eventId"
72 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
73 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
74 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
75 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
76 #define PROCEVENT_PRIORITY_FIELD "priority"
77 #define PROCEVENT_PRIORITY_VALUE "high"
78 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
79 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
80 #define PROCEVENT_SEQUENCE_FIELD "sequence"
81 #define PROCEVENT_SEQUENCE_VALUE "0"
82 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
83 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
84 #define PROCEVENT_VERSION_FIELD "version"
85 #define PROCEVENT_VERSION_VALUE "1.0"
// Keys and fixed values of the nested "faultFields" map written by
// gen_message_payload().
87 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
88 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
89 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
90 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
91 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
92 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
93 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
94 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
95 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
96 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
97 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
98 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
99 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
100 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
101 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
102 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
// Node of the singly linked list of tracked processes; nodes are chained
// through `next` and the list is rooted at processlist_head.
115 struct processlist_s {
// Next node in the list, or NULL at the end (list walks stop on NULL).
121 struct processlist_s *next;
123 typedef struct processlist_s processlist_t;
// Process-name filter; created lazily in procevent_config() and consulted by
// process_check().
128 static ignorelist_t *ignorelist = NULL;
// Run flags, error flag and ids of the two worker threads. The flags are read
// and written while holding procevent_thread_lock.
130 static int procevent_netlink_thread_loop = 0;
131 static int procevent_netlink_thread_error = 0;
132 static pthread_t procevent_netlink_thread_id;
133 static int procevent_dequeue_thread_loop = 0;
134 static pthread_t procevent_dequeue_thread_id;
135 static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
// Held around ring buffer and process list accesses.
136 static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
// Signalled by read_event() when the ring holds data; waited on by
// read_ring_buffer().
137 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
// Netlink connector socket descriptor; -1 until nl_connect() opens it.
138 static int nl_sock = -1;
// Ring size; set from the "BufferLength" option in procevent_config().
139 static int buffer_length;
140 static circbuf_t ring;
141 static processlist_t *processlist_head = NULL;
// Counter bumped once per payload generated by gen_message_payload().
142 static int event_id = 0;
144 static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"};
145 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
// Builds the JSON event document for a process state change with yajl and
// hands back a heap copy of it.
//
// state:     compared against 0 throughout; 0 selects the "down" / CRITICAL /
//            "Ready to terminate" values, any other value selects the "up" /
//            NORMAL / "Active" ones.
// pid:       embedded in the eventName and specificProblem strings.
// process:   process name; emitted as sourceName and alarmInterfaceA and
//            embedded in the generated message strings.
// timestamp: emitted as startEpochMicrosec after CDTIME_T_TO_US conversion
//            (lastEpochMicrosec uses the current cdtime() instead).
// buf:       out parameter; receives a strdup()'d copy of the generated JSON.
//
// Side effect: increments the file-level event_id counter on every call.
// strdup and JSON generation failures are logged via ERROR.
151 static int gen_message_payload(int state, long pid, char *process,
152 cdtime_t timestamp, char **buf) {
153 const unsigned char *buf2;
155 char json_str[DATA_MAX_NAME_LEN];
157 #if !defined(HAVE_YAJL_V2)
158 yajl_gen_config conf = {0};
163 g = yajl_gen_alloc(NULL);
164 yajl_gen_config(g, yajl_gen_beautify, 0);
167 g = yajl_gen_alloc(&conf, NULL);
172 // *** BEGIN common event header ***
174 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// domain
178 if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
179 strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
182 if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
183 strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
// eventId: the running counter, emitted as a JSON number
187 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
188 strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
191 event_id = event_id + 1;
192 if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
196 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// eventName: "process <name> (<pid>) down|up"
201 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
202 strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
205 if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
206 (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
207 : PROCEVENT_EVENT_NAME_UP_VALUE)) < 0) {
211 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
212 yajl_gen_status_ok) {
// lastEpochMicrosec: time of payload generation, not of the event itself
217 if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
218 strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
222 if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
223 CDTIME_T_TO_US(cdtime())) < 0) {
227 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// priority
232 if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
233 strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
236 if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
237 strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
240 // reportingEntityName
241 if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
242 strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
246 if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
247 strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
// sequence: fixed value, emitted as a JSON number
252 if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
253 strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
256 if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
257 strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
// sourceName: the process name
261 if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
262 strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
266 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
270 // startEpochMicrosec
271 if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
272 strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
276 if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
277 CDTIME_T_TO_US(timestamp)) < 0) {
281 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// version: emitted as a JSON number
286 if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
287 strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
290 if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
291 strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
294 // *** END common event header ***
296 // *** BEGIN fault fields ***
// faultFields is a nested map inside the header map
298 if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
299 strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
303 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// alarmCondition
307 if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
308 strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
312 if (snprintf(json_str, sizeof(json_str), "process %s (%ld) state change",
317 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
318 yajl_gen_status_ok) {
// alarmInterfaceA: the process name
323 if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
324 strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
328 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
// eventSeverity: CRITICAL when state == 0, NORMAL otherwise
333 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
334 strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
340 (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
341 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
342 strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
343 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
// eventSourceType
348 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
349 strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
353 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
354 strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
358 // faultFieldsVersion
359 if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
360 strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
364 if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
365 strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
// specificProblem: "process <name> (<pid>) down|up"
370 if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
371 strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
375 if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
376 (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
377 : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)) < 0) {
381 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
382 yajl_gen_status_ok) {
// vfStatus: "Ready to terminate" when state == 0, "Active" otherwise
387 if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
388 strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
393 (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
394 : PROCEVENT_VF_STATUS_NORMAL_VALUE),
395 strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
396 : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
400 // *** END fault fields ***
402 // close fault and header fields
403 if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
404 yajl_gen_map_close(g) != yajl_gen_status_ok)
// Fetch the generated text from yajl and give the caller its own copy of it
407 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
410 *buf = strdup((char *)buf2);
413 ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
424 ERROR("procevent plugin: gen_message_payload failed to generate JSON");
428 // Does /proc/<pid>/comm contain a process name we are interested in?
429 // NOTE: Caller MUST hold procevent_data_lock when calling this function
//
// Reads the first line of /proc/<pid>/comm, drops the process when
// ignorelist_match() reports non-zero for that name, and otherwise looks the
// name up in the processlist linked list. Depending on the outcome (see the
// case list further down) an existing node is reused or a new node is
// allocated and pushed onto the head of the list.
430 static processlist_t *process_check(long pid) {
433 int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
435 if ((len < 0) || (len >= BUFSIZE)) {
436 WARNING("procevent process_check: process name too large");
442 if (NULL == (fh = fopen(file, "r"))) {
443 // No /proc/<pid>/comm for this pid, just ignore
444 DEBUG("procevent plugin: no comm file available for pid %ld", pid);
448 char buffer[BUFSIZE];
// Read everything up to the first newline as the process name
449 int retval = fscanf(fh, "%[^\n]", buffer);
452 WARNING("procevent process_check: unable to read comm file for pid %ld",
458 // Now that we have the process name in the buffer, check if we are
459 // even interested in it
460 if (ignorelist_match(ignorelist, buffer) != 0) {
461 DEBUG("procevent process_check: ignoring process %s (%ld)", buffer, pid);
472 // Go through the processlist linked list and look for the process name
473 // in /proc/<pid>/comm. If found:
474 // 1. If pl->pid is -1, then set pl->pid to <pid> (and return that object)
475 // 2. If pl->pid is not -1, then another <process name> process was already
476 // found. If <pid> == pl->pid, this is an old match, so do nothing.
477 // If the <pid> is different, however, make a new processlist_t and
478 // associate <pid> with it (with the same process name as the existing).
481 processlist_t *match = NULL;
483 for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
485 int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
488 DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
490 if (pl->pid == pid) {
491 // this is a match, and we've already stored the exact pid/name combo
492 DEBUG("procevent plugin: found exact match with name %s, PID %ld for "
494 pl->process, pl->pid, pid);
497 } else if (pl->pid == -1) {
498 // this is a match, and we've found a candidate processlist_t to store
499 // this new pid/name combo
500 DEBUG("procevent plugin: reusing pl object with PID %ld for incoming "
506 } else if (pl->pid != -1) {
507 // this is a match, but another instance of this process has already
508 // claimed this pid/name combo,
510 DEBUG("procevent plugin: found pl object with matching name for "
511 "incoming PID %ld, but object is in use by PID %ld",
520 (match != NULL && match->pid != -1 && match->pid != pid)) {
521 // if there wasn't an existing match, OR
522 // if there was a match but the associated processlist_t object already
523 // contained a pid/name combo,
524 // then make a new one and add it to the linked list
526 DEBUG("procevent plugin: allocating new processlist_t object for PID %ld "
530 processlist_t *pl2 = calloc(1, sizeof(*pl2));
532 ERROR("procevent plugin: calloc failed during process_check: %s",
// The node keeps its own heap copy of the name read from comm
537 char *process = strdup(buffer);
538 if (process == NULL) {
540 ERROR("procevent plugin: strdup failed during process_check: %s",
545 pl2->process = process;
// Push the new node onto the front of the list
547 pl2->next = processlist_head;
548 processlist_head = pl2;
556 // Does our map have this PID or name?
557 // NOTE: Caller MUST hold procevent_data_lock when calling this function
//
// Walks the processlist linked list and tests each node against the query:
//   - pid > 0 and process == NULL: the PID flag alone decides
//   - pid < 0 and process != NULL: the name comparison alone decides
//   - pid > 0 and process != NULL: both must match
// Names are compared with strcmp() against pl->process.
// NOTE(review): presumably yields the first matching node — confirm against
// the return statements.
558 static processlist_t *process_map_check(long pid, char *process) {
559 for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
567 int match_process = 0;
569 if (process != NULL) {
570 if (strcmp(pl->process, process) == 0)
576 if ((pid > 0 && process == NULL && match_pid == 1) ||
577 (pid < 0 && process != NULL && match_process == 1) ||
578 (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)) {
// Scans PROCDIR and runs process_check() on the numeric directory entries so
// that their PID/name combinations can be stored in the process list.
// Each entry is checked for a leading '.', stat()'ed and tested with S_ISDIR,
// and its name is tested character by character with isdigit() before being
// converted with atoi(). process_check() is called with procevent_data_lock
// held, as that function requires.
590 static int process_map_refresh(void) {
592 DIR *proc = opendir(PROCDIR);
// NOTE(review): the message below says "fopen" although the failing call is
// opendir().
595 ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
601 struct dirent *dent = readdir(proc);
603 if (errno == 0) /* end of directory */
606 ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
612 if (dent->d_name[0] == '.')
617 int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
618 if ((len < 0) || (len >= BUFSIZE))
623 int status = stat(file, &statbuf);
625 WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
629 if (!S_ISDIR(statbuf.st_mode))
632 len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
633 if ((len < 0) || (len >= BUFSIZE))
// Test every character of the entry name for being a decimal digit
638 for (int i = 0; i < strlen(dent->d_name); i++) {
639 if (!isdigit(dent->d_name[i])) {
648 // Check if we need to store this pid/name combo in our processlist_t linked
650 int this_pid = atoi(dent->d_name);
651 pthread_mutex_lock(&procevent_data_lock);
652 processlist_t *pl = process_check(this_pid);
653 pthread_mutex_unlock(&procevent_data_lock);
656 DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
657 this_pid, pl->process);
// Opens the netlink connector socket that read_event() later receives from.
// Creates a PF_NETLINK / SOCK_DGRAM / NETLINK_CONNECTOR socket, stores the
// descriptor in the file-level nl_sock and binds it with
// nl_groups = CN_IDX_PROC. socket() and bind() failures are logged together
// with the errno value.
665 static int nl_connect() {
666 struct sockaddr_nl sa_nl = {
667 .nl_family = AF_NETLINK,
668 .nl_groups = CN_IDX_PROC,
672 nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
674 ERROR("procevent plugin: socket open failed: %d", errno);
678 int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
680 ERROR("procevent plugin: socket bind failed: %d", errno);
// Sends a proc connector control message over nl_sock.
// enable: true sends PROC_CN_MCAST_LISTEN, false sends PROC_CN_MCAST_IGNORE.
// The message is a zeroed nlmsghdr (type NLMSG_DONE, pid = getpid()) followed
// by a cn_msg addressed to CN_IDX_PROC / CN_VAL_PROC whose payload is the
// multicast operation. A failure is logged via ERROR.
689 static int set_proc_ev_listen(bool enable) {
690 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
691 struct nlmsghdr nl_hdr;
692 struct __attribute__((__packed__)) {
693 struct cn_msg cn_msg;
694 enum proc_cn_mcast_op cn_mcast;
698 memset(&nlcn_msg, 0, sizeof(nlcn_msg));
699 nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
700 nlcn_msg.nl_hdr.nlmsg_pid = getpid();
701 nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
703 nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
704 nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
// The connector payload is just the multicast operation code
705 nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
707 nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
709 int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
711 ERROR("procevent plugin: subscribing to netlink process events failed: %d",
719 // Read from netlink socket and write to ring buffer
//
// Receives start out non-blocking (MSG_DONTWAIT). When recv() reports
// EAGAIN/EWOULDBLOCK, the dequeue thread is signalled if the ring holds data.
// PROC_EVENT_EXEC and PROC_EVENT_EXIT messages are translated to
// PROCEVENT_STARTED / PROCEVENT_EXITED and stored, together with the PID and
// the current cdtime(), in the entry at ring.head while holding
// procevent_data_lock. Other event types are not queued.
720 static int read_event() {
721 int recv_flags = MSG_DONTWAIT;
722 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
723 struct nlmsghdr nl_hdr;
724 struct __attribute__((__packed__)) {
725 struct cn_msg cn_msg;
726 struct proc_event proc_ev;
// Check the thread's run flag under the thread lock before each receive
734 pthread_mutex_lock(&procevent_thread_lock);
736 if (procevent_netlink_thread_loop <= 0) {
737 pthread_mutex_unlock(&procevent_thread_lock);
741 pthread_mutex_unlock(&procevent_thread_lock);
743 int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
747 } else if (status < 0) {
748 if (errno == EAGAIN || errno == EWOULDBLOCK) {
749 pthread_mutex_lock(&procevent_data_lock);
751 // There was nothing more to receive for now, so...
752 // If ring head does not equal ring tail, then there is data
753 // in the ring buffer for the dequeue thread to read, so
755 if (ring.head != ring.tail)
756 pthread_cond_signal(&procevent_cond);
758 pthread_mutex_unlock(&procevent_data_lock);
760 // Since there was nothing to receive, set recv to block and
764 } else if (errno != EINTR) {
765 ERROR("procevent plugin: socket receive error: %d", errno);
768 // Interrupt, so just continue and try again
773 // We successfully received a message, so don't block on the next
774 // read in case there are more (and if there aren't, it will be
775 // handled above in the EWOULDBLOCK error-checking)
776 recv_flags = MSG_DONTWAIT;
// -1 marks an event type that is not queued
779 int proc_status = -1;
781 switch (nlcn_msg.proc_ev.what) {
782 case PROC_EVENT_EXEC:
783 proc_status = PROCEVENT_STARTED;
784 proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
786 case PROC_EVENT_EXIT:
787 proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
788 proc_status = PROCEVENT_EXITED;
791 // Otherwise not of interest
795 // If we're interested in this process status event, place the event
796 // in the ring buffer for consumption by the dequeue (dispatch) thread.
798 if (proc_status != -1) {
799 pthread_mutex_lock(&procevent_data_lock);
// Candidate position for the head after this insert
801 int next = ring.head + 1;
802 if (next >= ring.maxLen)
805 if (next == ring.tail) {
806 // Buffer is full, signal the dequeue thread to process the buffer
807 // and clean it out, and then sleep
808 WARNING("procevent plugin: ring buffer full");
810 pthread_cond_signal(&procevent_cond);
811 pthread_mutex_unlock(&procevent_data_lock);
816 DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
817 (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
818 (unsigned long long)cdtime());
// Fill the three slots of the entry at the current head
820 ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id;
821 ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status;
822 ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime();
827 pthread_mutex_unlock(&procevent_data_lock);
// Builds and dispatches a collectd notification for one process state change.
// pid:       forwarded to gen_message_payload().
// value:     1 yields severity NOTIF_OKAY, anything else NOTIF_FAILURE; also
//            passed to gen_message_payload() as the state.
// process:   process name; becomes the notification's plugin_instance.
// timestamp: forwarded to gen_message_payload().
// The generated JSON is attached to the notification as the "ves" metadata
// string before dispatching; the metadata is freed afterwards.
834 static void procevent_dispatch_notification(long pid, gauge_t value,
835 char *process, cdtime_t timestamp) {
838 .severity = (value == 1 ? NOTIF_OKAY : NOTIF_FAILURE),
840 .plugin = "procevent",
842 .type_instance = "process_status",
845 sstrncpy(n.host, hostname_g, sizeof(n.host));
846 sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
849 gen_message_payload(value, pid, process, timestamp, &buf);
851 int status = plugin_notification_meta_add_string(&n, "ves", buf);
855 ERROR("procevent plugin: unable to set notification VES metadata: %s",
860 DEBUG("procevent plugin: notification VES metadata: %s",
861 n.meta->nm_value.nm_string);
863 DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
866 plugin_dispatch_notification(&n);
867 plugin_notification_meta_free(n.meta);
869 // strdup'd in gen_message_payload
874 // Read from ring buffer and dispatch to write plugins
//
// Takes procevent_data_lock, waits on procevent_cond when the ring is empty,
// then drains entries starting at ring.tail until the tail meets ring.head.
//   - EXITED entries: the PID is looked up with process_map_check(); for a
//     known process the EXITED notification is dispatched and the node's
//     last_status is reset to -1.
//   - STARTED entries: process_check() decides whether the process is
//     monitored; a notification is dispatched only when the node's
//     last_status is not already PROCEVENT_STARTED.
875 static void read_ring_buffer() {
876 pthread_mutex_lock(&procevent_data_lock);
878 // If there's currently nothing to read from the buffer,
880 if (ring.head == ring.tail)
881 pthread_cond_wait(&procevent_cond, &procevent_data_lock);
883 while (ring.head != ring.tail) {
// Candidate position for the tail after consuming this entry
884 int next = ring.tail + 1;
886 if (next >= ring.maxLen)
889 if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == PROCEVENT_EXITED) {
// Look up by PID only (no name given)
890 processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
893 // This process is of interest to us, so publish its EXITED status
894 procevent_dispatch_notification(
895 ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
896 ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
897 ring.buffer[ring.tail][RBUF_TIME_INDEX]);
899 "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
901 pl->pid, pl->process);
903 pl->last_status = -1;
905 } else if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] ==
907 // a new process has started, so check if we should monitor it
908 processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
910 // If we had already seen this process name and pid combo before,
911 // and the last message was a "process started" message, don't send
912 // the notification again
914 if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
915 // This process is of interest to us, so publish its STARTED status
916 procevent_dispatch_notification(
917 ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
918 ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
919 ring.buffer[ring.tail][RBUF_TIME_INDEX]);
921 pl->last_status = PROCEVENT_STARTED;
923 DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
925 pl->pid, pl->process);
932 pthread_mutex_unlock(&procevent_data_lock);
935 // Entry point for thread responsible for listening
936 // to netlink socket and writing data to ring buffer
//
// Calls read_event() repeatedly while procevent_netlink_thread_loop > 0. The
// flag is tested with procevent_thread_lock held; the lock is released around
// the read_event() call. procevent_netlink_thread_error is set to 1 to report
// a problem, which procevent_read() later picks up.
// NOTE(review): presumably the error flag is set when read_event() fails —
// confirm against the status check.
937 static void *procevent_netlink_thread(void *arg) /* {{{ */
939 pthread_mutex_lock(&procevent_thread_lock);
941 while (procevent_netlink_thread_loop > 0) {
942 pthread_mutex_unlock(&procevent_thread_lock);
944 int status = read_event();
946 pthread_mutex_lock(&procevent_thread_lock);
949 procevent_netlink_thread_error = 1;
952 } /* while (procevent_netlink_thread_loop > 0) */
954 pthread_mutex_unlock(&procevent_thread_lock);
957 } /* }}} void *procevent_netlink_thread */
959 // Entry point for thread responsible for reading from
960 // ring buffer and dispatching notifications
//
// Loops while procevent_dequeue_thread_loop > 0. The flag is tested with
// procevent_thread_lock held and the lock is released for the duration of
// each iteration's work.
// NOTE(review): presumably each iteration calls read_ring_buffer() — confirm.
961 static void *procevent_dequeue_thread(void *arg) /* {{{ */
963 pthread_mutex_lock(&procevent_thread_lock);
965 while (procevent_dequeue_thread_loop > 0) {
966 pthread_mutex_unlock(&procevent_thread_lock);
970 pthread_mutex_lock(&procevent_thread_lock);
971 } /* while (procevent_dequeue_thread_loop > 0) */
973 pthread_mutex_unlock(&procevent_thread_lock);
976 } /* }}} void *procevent_dequeue_thread */
// Connects the netlink socket, subscribes to process events and starts the
// netlink reader thread, all while holding procevent_thread_lock.
// A non-zero procevent_netlink_thread_loop is treated as a separate early
// case and only releases the lock. Otherwise nl_connect() and
// set_proc_ev_listen(true) run first, the loop flag is set to 1 and the error
// flag cleared, and the thread is created with plugin_thread_create(). If
// thread creation fails, the loop flag is reset to 0, the failure is logged
// and nl_sock is closed.
978 static int start_netlink_thread(void) /* {{{ */
980 pthread_mutex_lock(&procevent_thread_lock);
982 if (procevent_netlink_thread_loop != 0) {
983 pthread_mutex_unlock(&procevent_thread_lock);
990 status = nl_connect();
993 pthread_mutex_unlock(&procevent_thread_lock);
997 status = set_proc_ev_listen(true);
999 pthread_mutex_unlock(&procevent_thread_lock);
1004 DEBUG("procevent plugin: socket created and bound");
// Mark the thread as running before it is created
1006 procevent_netlink_thread_loop = 1;
1007 procevent_netlink_thread_error = 0;
1009 status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
1010 procevent_netlink_thread,
1011 /* arg = */ (void *)0, "procevent");
1013 procevent_netlink_thread_loop = 0;
1014 ERROR("procevent plugin: Starting netlink thread failed.");
1015 pthread_mutex_unlock(&procevent_thread_lock);
1017 int status2 = close(nl_sock);
1020 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1029 pthread_mutex_unlock(&procevent_thread_lock);
1032 } /* }}} int start_netlink_thread */
// Starts the dequeue (dispatch) thread while holding procevent_thread_lock.
// A non-zero procevent_dequeue_thread_loop is treated as a separate early
// case and only releases the lock. Otherwise the loop flag is set to 1 and the
// thread is created with plugin_thread_create(); if creation fails, the flag
// is reset to 0 and the failure is logged.
1034 static int start_dequeue_thread(void) /* {{{ */
1036 pthread_mutex_lock(&procevent_thread_lock);
1038 if (procevent_dequeue_thread_loop != 0) {
1039 pthread_mutex_unlock(&procevent_thread_lock);
1043 procevent_dequeue_thread_loop = 1;
1045 int status = plugin_thread_create(&procevent_dequeue_thread_id,
1046 /* attr = */ NULL, procevent_dequeue_thread,
1047 /* arg = */ (void *)0, "procevent");
1049 procevent_dequeue_thread_loop = 0;
1050 ERROR("procevent plugin: Starting dequeue thread failed.");
1051 pthread_mutex_unlock(&procevent_thread_lock);
1055 pthread_mutex_unlock(&procevent_thread_lock);
1058 } /* }}} int start_dequeue_thread */
// Starts both worker threads: the netlink reader first, then the dequeue
// thread. Both start functions are always called; their results are kept in
// separate status variables.
1060 static int start_threads(void) /* {{{ */
1062 int status = start_netlink_thread();
1063 int status2 = start_dequeue_thread();
1069 } /* }}} int start_threads */
// Stops the netlink reader thread and closes the netlink socket.
// shutdown: when 1, the thread is additionally cancelled with
//           pthread_cancel().
// Sequence: close nl_sock if it is open, clear procevent_netlink_thread_loop
// under procevent_thread_lock, broadcast procevent_cond so waiters re-check
// their termination flag, cancel and/or join the thread, then clear the
// stored thread id and the error flag.
// Returns the socket close status when it is non-zero, otherwise the thread
// status.
1071 static int stop_netlink_thread(int shutdown) /* {{{ */
1075 if (nl_sock != -1) {
1076 socket_status = close(nl_sock);
1077 if (socket_status != 0) {
1078 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1079 socket_status, strerror(errno));
1086 pthread_mutex_lock(&procevent_thread_lock);
1088 if (procevent_netlink_thread_loop == 0) {
1089 pthread_mutex_unlock(&procevent_thread_lock);
1093 // Set thread termination status
1094 procevent_netlink_thread_loop = 0;
1095 pthread_mutex_unlock(&procevent_thread_lock);
1097 // Let threads waiting on access to the data know to move
1098 // on such that they'll see the thread's termination status
1099 pthread_cond_broadcast(&procevent_cond);
1103 if (shutdown == 1) {
1104 // Calling pthread_cancel here in
1105 // the case of a shutdown just assures that the thread is
1106 // gone and that the process has been fully terminated.
1108 DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
1110 thread_status = pthread_cancel(procevent_netlink_thread_id);
// ESRCH is not treated as an error here
1112 if (thread_status != 0 && thread_status != ESRCH) {
1113 ERROR("procevent plugin: Unable to cancel netlink thread: %d",
1120 pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
1121 if (thread_status != 0 && thread_status != ESRCH) {
1122 ERROR("procevent plugin: Stopping netlink thread failed.");
// Reset the bookkeeping for this thread
1128 pthread_mutex_lock(&procevent_thread_lock);
1129 memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
1130 procevent_netlink_thread_error = 0;
1131 pthread_mutex_unlock(&procevent_thread_lock);
1133 DEBUG("procevent plugin: Finished requesting stop of netlink thread");
// A socket close failure takes precedence over the thread status
1135 if (socket_status != 0)
1136 return socket_status;
1138 return thread_status;
1139 } /* }}} int stop_netlink_thread */
// Stops the dequeue (dispatch) thread.
// Clears procevent_dequeue_thread_loop under procevent_thread_lock,
// broadcasts procevent_cond so a waiting thread wakes up, cancels the thread
// with pthread_cancel() (ESRCH is not treated as an error) and finally zeroes
// the stored thread id. A loop flag that is already 0 is handled as a
// separate early case that only releases the lock.
1141 static int stop_dequeue_thread() /* {{{ */
1143 pthread_mutex_lock(&procevent_thread_lock);
1145 if (procevent_dequeue_thread_loop == 0) {
1146 pthread_mutex_unlock(&procevent_thread_lock);
1150 procevent_dequeue_thread_loop = 0;
1151 pthread_mutex_unlock(&procevent_thread_lock);
1153 pthread_cond_broadcast(&procevent_cond);
1155 // Calling pthread_cancel here just assures that the thread is
1156 // gone and that the process has been fully terminated.
1158 DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
1160 int status = pthread_cancel(procevent_dequeue_thread_id);
1162 if (status != 0 && status != ESRCH) {
1163 ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
1168 pthread_mutex_lock(&procevent_thread_lock);
1169 memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
1170 pthread_mutex_unlock(&procevent_thread_lock);
1172 DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
1175 } /* }}} int stop_dequeue_thread */
// Stops both worker threads: the netlink reader in shutdown mode (argument 1)
// first, then the dequeue thread. Both stop functions are always called;
// their results are kept in separate status variables.
1177 static int stop_threads() /* {{{ */
1179 int status = stop_netlink_thread(1);
1180 int status2 = stop_dequeue_thread();
1186 } /* }}} int stop_threads */
// Plugin init callback (registered in module_register()).
// Sizes the ring from buffer_length, allocating buffer_length entries of
// PROCEVENT_FIELDS cdtime_t slots each, builds the initial process map with
// process_map_refresh(), emits a NOTICE when no ignorelist was configured and
// finally returns the result of start_threads().
1188 static int procevent_init(void) /* {{{ */
1192 ring.maxLen = buffer_length;
1193 ring.buffer = (cdtime_t **)calloc(buffer_length, sizeof(cdtime_t *));
1195 for (int i = 0; i < buffer_length; i++) {
1196 ring.buffer[i] = (cdtime_t *)calloc(PROCEVENT_FIELDS, sizeof(cdtime_t));
1199 int status = process_map_refresh();
1202 ERROR("procevent plugin: Initial process mapping failed.");
1206 if (ignorelist == NULL) {
1207 NOTICE("procevent plugin: No processes have been configured.");
1211 return start_threads();
1212 } /* }}} int procevent_init */
// Plugin config callback (registered in module_register()).
// key / value: one configuration option and its string value. Keys are
// compared case-insensitively:
//   "BufferLength" - value converted with atoi() and stored in buffer_length
//   "Process"      - value added to the ignorelist
//   "ProcessRegex" - value added to the ignorelist; an invalid regular
//                    expression is logged as an error. A warning exists for
//                    builds without support for this option.
// The ignorelist is created on first use with invert = 1.
1214 static int procevent_config(const char *key, const char *value) /* {{{ */
1216 if (ignorelist == NULL)
1217 ignorelist = ignorelist_create(/* invert = */ 1);
1219 if (ignorelist == NULL) {
1223 if (strcasecmp(key, "BufferLength") == 0) {
1224 buffer_length = atoi(value);
1225 } else if (strcasecmp(key, "Process") == 0) {
1226 ignorelist_add(ignorelist, value);
1227 } else if (strcasecmp(key, "ProcessRegex") == 0) {
1229 int status = ignorelist_add(ignorelist, value);
1232 ERROR("procevent plugin: invalid regular expression: %s", value);
1236 WARNING("procevent plugin: The plugin has been compiled without support "
1237 "for the \"ProcessRegex\" option.");
1244 } /* }}} int procevent_config */
// Plugin read callback (registered in module_register()).
// Checks procevent_netlink_thread_error under procevent_thread_lock. When the
// netlink thread has flagged a problem, the lock is released, the problem is
// logged and the thread is restarted via stop_netlink_thread(0) followed by
// start_netlink_thread().
1246 static int procevent_read(void) /* {{{ */
1248 pthread_mutex_lock(&procevent_thread_lock);
1250 if (procevent_netlink_thread_error != 0) {
1252 pthread_mutex_unlock(&procevent_thread_lock);
1254 ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
// Restart without the shutdown-time cancel (argument 0)
1256 stop_netlink_thread(0);
1258 start_netlink_thread();
1261 } /* if (procevent_netlink_thread_error != 0) */
1263 pthread_mutex_unlock(&procevent_thread_lock);
1266 } /* }}} int procevent_read */
// Plugin shutdown callback (registered in module_register()).
// Stops both worker threads, frees each of the buffer_length ring entries,
// walks the process list from processlist_head and frees the ignorelist.
1268 static int procevent_shutdown(void) /* {{{ */
1270 DEBUG("procevent plugin: Shutting down threads.");
1272 int status = stop_threads();
1274 for (int i = 0; i < buffer_length; i++) {
1275 free(ring.buffer[i]);
1280 processlist_t *pl = processlist_head;
1281 while (pl != NULL) {
1282 processlist_t *pl_next;
1292 ignorelist_free(ignorelist);
1295 } /* }}} int procevent_shutdown */
// Plugin entry point: registers the config, init, read and shutdown callbacks
// under the plugin name "procevent".
1297 void module_register(void) {
1298 plugin_register_config("procevent", procevent_config, config_keys,
1300 plugin_register_init("procevent", procevent_init);
1301 plugin_register_read("procevent", procevent_read);
1302 plugin_register_shutdown("procevent", procevent_shutdown);
1303 } /* void module_register */