2 * collectd - src/procevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
38 #include <sys/socket.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
// Process state codes. read_event() stores one of them in slot [1] of a ring
// buffer entry and read_ring_buffer() branches on it.
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
// Number of values allocated per ring buffer entry by procevent_init().
62 #define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
// Directory scanned by process_map_refresh(); process_check() reads the
// per-PID comm file beneath it.
64 #define PROCDIR "/proc"
// Keys and fixed values of the common event header that
// gen_message_payload() writes into the top-level JSON map.
// PROCEVENT_SEQUENCE_VALUE and PROCEVENT_VERSION_VALUE are handed to
// yajl_gen_number() rather than yajl_gen_string().
66 #define PROCEVENT_DOMAIN_FIELD "domain"
67 #define PROCEVENT_DOMAIN_VALUE "fault"
68 #define PROCEVENT_EVENT_ID_FIELD "eventId"
69 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
70 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
71 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
72 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
73 #define PROCEVENT_PRIORITY_FIELD "priority"
74 #define PROCEVENT_PRIORITY_VALUE "high"
75 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
76 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
77 #define PROCEVENT_SEQUENCE_FIELD "sequence"
78 #define PROCEVENT_SEQUENCE_VALUE "0"
79 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
80 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
81 #define PROCEVENT_VERSION_FIELD "version"
82 #define PROCEVENT_VERSION_VALUE "1.0"
// Keys and fixed values of the nested faultFields map written by
// gen_message_payload(). The CRITICAL and down variants are chosen when the
// state argument is 0, the NORMAL and up variants otherwise.
// PROCEVENT_FAULT_FIELDS_VERSION_VALUE is handed to yajl_gen_number().
84 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
85 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
86 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
87 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
88 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
89 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
90 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
91 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
92 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
93 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
94 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
95 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
96 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
97 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
98 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
99 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
// Backing storage of the event ring, indexed as buffer[slot][field];
// procevent_init() allocates PROCEVENT_FIELDS values for every slot.
// NOTE(review): presumably a member of circbuf_t next to head, tail and
// maxLen (all used through the ring variable); the enclosing declaration is
// not visible here.
109 long long unsigned int **buffer;
// Node of the singly linked list of known processes that starts at
// processlist_head. Besides next, the code in this file also reads and
// writes the members process, pid and last_status.
112 struct processlist_s {
// Following node in the list; the list walks stop at NULL.
118 struct processlist_s *next;
120 typedef struct processlist_s processlist_t;
// Names of the processes to report on; created and filled by
// procevent_config() and queried by process_check().
125 static ignorelist_t *ignorelist = NULL;
// Loop flag of the netlink thread (it keeps running while the flag is > 0)
// and the error flag that procevent_read() checks in order to restart that
// thread. Both are accessed under procevent_thread_lock.
127 static int procevent_netlink_thread_loop = 0;
128 static int procevent_netlink_thread_error = 0;
129 static pthread_t procevent_netlink_thread_id;
// Loop flag and handle of the dequeue thread.
130 static int procevent_dequeue_thread_loop = 0;
131 static pthread_t procevent_dequeue_thread_id;
// procevent_thread_lock protects the thread flags above;
// procevent_data_lock protects the ring buffer and the process list.
132 static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
133 static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
// Signalled by read_event() when the ring holds entries and broadcast by the
// stop functions; read_ring_buffer() waits on it while the ring is empty.
134 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
// Netlink connector socket; -1 is the initial, not-open value.
135 static int nl_sock = -1;
// Number of ring slots, assigned from the BufferLength option.
136 static int buffer_length;
// Event queue filled by read_event() and drained by read_ring_buffer().
137 static circbuf_t ring;
// First node of the list of known processes.
138 static processlist_t *processlist_head = NULL;
// Incremented for every payload built by gen_message_payload() and emitted
// as its eventId.
139 static int event_id = 0;
// Option names handled by procevent_config().
141 static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"};
142 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
// Forward declaration: read_ring_buffer() calls this function ahead of its
// definition further down in the file.
148 static void procevent_dispatch_notification(long pid, const char *type,
149 gauge_t value, char *process,
150 long long unsigned int timestamp);
// Builds the JSON event document for one process state change with yajl and
// stores a strdup'd copy of the generated text in *buf.
//   state     - 0 selects the down / CRITICAL wording, any other value the
//               up / NORMAL wording
//   pid       - process id, embedded in the generated text fields
//   process   - process name; also emitted as sourceName and alarmInterfaceA
//   timestamp - emitted as startEpochMicrosec
//   buf       - output: receives the duplicated JSON string
// NOTE(review): the return statements and the error branches are not visible
// here; the two ERROR calls at the end report a failed strdup and a failed
// JSON generation respectively.
// NOTE(review): every snprintf below is given a size computed from the
// expected text length instead of sizeof(json_str). For a process name long
// enough, that size can exceed DATA_MAX_NAME_LEN; confirm the name length is
// bounded by the callers.
156 static int gen_message_payload(int state, long pid, char *process,
157 long long unsigned int timestamp, char **buf) {
// Receives the address of the text held by the generator.
158 const unsigned char *buf2;
// Scratch space reused for every formatted value below.
160 char json_str[DATA_MAX_NAME_LEN];
// The generator is configured and allocated differently for yajl 1.x and 2.x.
162 #if !defined(HAVE_YAJL_V2)
163 yajl_gen_config conf = {};
170 g = yajl_gen_alloc(NULL);
171 yajl_gen_config(g, yajl_gen_beautify, 0);
174 g = yajl_gen_alloc(&conf, NULL);
179 // *** BEGIN common event header ***
// Open the top-level JSON map.
181 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// domain
185 if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
186 strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
189 if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
190 strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
// eventId
194 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
195 strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
// Each payload takes the next value of the file-wide counter, formatted as
// decimal text and emitted as a JSON number.
198 event_id = event_id + 1;
199 int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
200 memset(json_str, '\0', DATA_MAX_NAME_LEN);
201 snprintf(json_str, event_id_len, "%d", event_id);
203 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// eventName
208 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
209 strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
// Size budget for the text: process <name> (<pid>) <down|up>
212 int event_name_len = 0;
213 event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
214 event_name_len = event_name_len + strlen(process); // process name
215 event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
216 event_name_len = event_name_len +
217 13; // "process", 3 spaces, 2 parentheses and null-terminator
218 memset(json_str, '\0', DATA_MAX_NAME_LEN);
219 snprintf(json_str, event_name_len, "process %s (%ld) %s", process, pid,
220 (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
221 : PROCEVENT_EVENT_NAME_UP_VALUE));
223 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
224 yajl_gen_status_ok) {
// lastEpochMicrosec: taken from cdtime() at the moment the payload is built
229 if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
230 strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
234 int last_epoch_microsec_len =
235 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
236 memset(json_str, '\0', DATA_MAX_NAME_LEN);
237 snprintf(json_str, last_epoch_microsec_len, "%llu",
238 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
240 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// priority
245 if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
246 strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
249 if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
250 strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
253 // reportingEntityName
254 if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
255 strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
259 if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
260 strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
// sequence: a fixed value, emitted as a JSON number
265 if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
266 strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
269 if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
270 strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
// sourceName: the process name
274 if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
275 strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
279 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
283 // startEpochMicrosec
284 if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
285 strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
// The value is the timestamp argument supplied by the caller.
289 int start_epoch_microsec_len =
290 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
291 memset(json_str, '\0', DATA_MAX_NAME_LEN);
292 snprintf(json_str, start_epoch_microsec_len, "%llu",
293 (long long unsigned int)timestamp);
295 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// version: a fixed value, emitted through yajl_gen_number
300 if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
301 strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
304 if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
305 strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
308 // *** END common event header ***
310 // *** BEGIN fault fields ***
// faultFields: key followed by a nested map
312 if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
313 strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
317 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// alarmCondition
321 if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
322 strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
// Size budget for the text: process <name> (<pid>) state change
326 int alarm_condition_len = 0;
327 alarm_condition_len =
328 alarm_condition_len + (sizeof(char) * sizeof(int) * 4); // pid
329 alarm_condition_len = alarm_condition_len + strlen(process); // process name
330 alarm_condition_len =
331 alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
332 // parentheses and null-terminator
333 memset(json_str, '\0', DATA_MAX_NAME_LEN);
334 snprintf(json_str, alarm_condition_len, "process %s (%ld) state change",
337 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
338 yajl_gen_status_ok) {
// alarmInterfaceA: the process name
343 if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
344 strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
348 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
// eventSeverity: CRITICAL when state is 0, NORMAL otherwise
353 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
354 strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
359 g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
360 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
361 strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
362 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
// eventSourceType
367 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
368 strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
372 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
373 strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
377 // faultFieldsVersion
378 if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
379 strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
383 if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
384 strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
// specificProblem
389 if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
390 strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
// Same text layout and size budget as eventName above.
394 int specific_problem_len = 0;
395 specific_problem_len =
396 specific_problem_len + (sizeof(char) * sizeof(int) * 4); // pid
397 specific_problem_len = specific_problem_len + strlen(process); // process name
398 specific_problem_len =
399 specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
400 specific_problem_len =
401 specific_problem_len +
402 13; // "process", 3 spaces, 2 parentheses and null-terminator
403 memset(json_str, '\0', DATA_MAX_NAME_LEN);
404 snprintf(json_str, specific_problem_len, "process %s (%ld) %s", process, pid,
405 (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
406 : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
408 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
409 yajl_gen_status_ok) {
// vfStatus: selected by the same state == 0 test
414 if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
415 strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
419 g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
420 : PROCEVENT_VF_STATUS_NORMAL_VALUE),
421 strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
422 : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
// Close the faultFields map.
426 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
429 // *** END fault fields ***
// Close the top-level map.
431 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
// Fetch the generated text and give the caller a private copy of it.
434 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
437 *buf = strdup((char *)buf2);
440 ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
451 ERROR("procevent plugin: gen_message_payload failed to generate JSON");
455 // Does /proc/<pid>/comm contain a process name we are interested in?
456 // NOTE: Caller MUST hold procevent_data_lock when calling this function
//
// Reads the name stored in the comm file of the given pid, filters it
// through ignorelist and then searches the process list for that name,
// reusing a free entry or prepending a new one for this pid.
// Callers in this file treat the result as a processlist_t pointer that may
// be NULL. NOTE(review): the return statements are not visible here.
457 static processlist_t *process_check(long pid) {
// Build the path of the comm file for this pid.
460 int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
// NOTE(review): the bound tested is BUFSIZE while snprintf was given
// sizeof(file); the declaration of file is not visible, confirm both agree.
// The warning text mentions the process name although the path length is
// what is being tested.
462 if ((len < 0) || (len >= BUFSIZE)) {
463 WARNING("procevent process_check: process name too large");
469 if (NULL == (fh = fopen(file, "r"))) {
470 // No /proc/<pid>/comm for this pid, just ignore
471 DEBUG("procevent plugin: no comm file available for pid %ld", pid);
// Read everything up to the first newline of the comm file.
// NOTE(review): the scan conversion carries no field width, so nothing on
// this line limits the read to sizeof(buffer).
475 char buffer[BUFSIZE];
476 int retval = fscanf(fh, "%[^\n]", buffer);
479 WARNING("procevent process_check: unable to read comm file for pid %ld",
485 // Now that we have the process name in the buffer, check if we are
486 // even interested in it
487 if (ignorelist_match(ignorelist, buffer) != 0) {
488 DEBUG("procevent process_check: ignoring process %s (%ld)", buffer, pid);
499 // Go through the processlist linked list and look for the process name
500 // in /proc/<pid>/comm. If found:
501 // 1. If pl->pid is -1, then set pl->pid to <pid> (and return that object)
502 // 2. If pl->pid is not -1, then another <process name> process was already
503 // found. If <pid> == pl->pid, this is an old match, so do nothing.
504 // If the <pid> is different, however, make a new processlist_t and
505 // associate <pid> with it (with the same process name as the existing).
508 processlist_t *match = NULL;
510 for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
// Exact, case-sensitive comparison against the stored name.
512 int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
515 DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
517 if (pl->pid == pid) {
518 // this is a match, and we've already stored the exact pid/name combo
519 DEBUG("procevent plugin: found exact match with name %s, PID %ld for "
521 pl->process, pl->pid, pid);
524 } else if (pl->pid == -1) {
525 // this is a match, and we've found a candidate processlist_t to store
526 // this new pid/name combo
527 DEBUG("procevent plugin: reusing pl object with PID %ld for incoming "
533 } else if (pl->pid != -1) {
// NOTE(review): this condition is always true when reached, because the
// pid == -1 case is handled by the preceding branch.
534 // this is a match, but another instance of this process has already
535 // claimed this pid/name combo,
537 DEBUG("procevent plugin: found pl object with matching name for "
538 "incoming PID %ld, but object is in use by PID %ld",
547 (match != NULL && match->pid != -1 && match->pid != pid)) {
548 // if there wasn't an existing match, OR
549 // if there was a match but the associated processlist_t object already
550 // contained a pid/name combo,
551 // then make a new one and add it to the linked list
553 DEBUG("procevent plugin: allocating new processlist_t object for PID %ld "
// The node starts out zero-filled.
557 processlist_t *pl2 = calloc(1, sizeof(*pl2));
559 ERROR("procevent plugin: calloc failed during process_check: %s",
// The node keeps its own copy of the name.
564 char *process = strdup(buffer);
565 if (process == NULL) {
567 ERROR("procevent plugin: strdup failed during process_check: %s",
572 pl2->process = process;
// Prepend the new node to the list.
574 pl2->next = processlist_head;
575 processlist_head = pl2;
583 // Does our map have this PID or name?
584 // NOTE: Caller MUST hold procevent_data_lock when calling this function
//
// Walks the process list looking for an entry selected by pid, by name, or
// by both:
//   pid > 0 and process == NULL  - the pid must match
//   pid < 0 and process != NULL  - the name must match
//   pid > 0 and process != NULL  - both must match
// read_ring_buffer() uses the result as a processlist_t pointer.
// NOTE(review): the computation of match_pid and the return statements are
// not visible here.
585 static processlist_t *process_map_check(long pid, char *process) {
586 for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
594 int match_process = 0;
// A name is only compared when the caller supplied one.
596 if (process != NULL) {
597 if (strcmp(pl->process, process) == 0)
603 if (pid > 0 && process == NULL && match_pid == 1)
605 else if (pid < 0 && process != NULL && match_process == 1)
607 else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
// Walks PROCDIR and hands the numeric value of each qualifying directory
// name to process_check(), so that processes which are already running get
// recorded in the process list. Called from procevent_init(), which treats
// the result as a status code.
618 static int process_map_refresh(void) {
620 DIR *proc = opendir(PROCDIR);
// NOTE(review): the message names fopen although the call that failed is
// opendir.
623 ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
629 struct dirent *dent = readdir(proc);
// errno tells a regular end of directory apart from a read failure.
// NOTE(review): presumably errno is cleared before readdir is called; that
// statement is not visible here.
631 if (errno == 0) /* end of directory */
634 ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
// Tests for names that start with a dot.
640 if (dent->d_name[0] == '.')
645 int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
646 if ((len < 0) || (len >= BUFSIZE))
651 int status = stat(file, &statbuf);
653 WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
// Tests for entries that are not directories.
657 if (!S_ISDIR(statbuf.st_mode))
660 len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
661 if ((len < 0) || (len >= BUFSIZE))
// Look for a character in the name that is not a digit.
// NOTE(review): isdigit receives a plain char here; a cast to unsigned char
// is the usual precaution. strlen is also re-evaluated on every iteration.
666 for (int i = 0; i < strlen(dent->d_name); i++) {
667 if (!isdigit(dent->d_name[i])) {
676 // Check if we need to store this pid/name combo in our processlist_t linked
// NOTE(review): atoi yields an int while process_check takes a long.
678 int this_pid = atoi(dent->d_name);
// process_check requires procevent_data_lock to be held.
679 pthread_mutex_lock(&procevent_data_lock);
680 processlist_t *pl = process_check(this_pid);
681 pthread_mutex_unlock(&procevent_data_lock);
684 DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
685 this_pid, pl->process);
// Creates the NETLINK_CONNECTOR datagram socket, stores it in nl_sock and
// binds it with the CN_IDX_PROC group and the PID of this process as the
// netlink port id. Failures of socket and bind are logged with the numeric
// errno value.
693 static int nl_connect() {
694 struct sockaddr_nl sa_nl = {
695 .nl_family = AF_NETLINK, .nl_groups = CN_IDX_PROC, .nl_pid = getpid(),
698 nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
700 ERROR("procevent plugin: socket open failed: %d", errno);
704 int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
706 ERROR("procevent plugin: socket bind failed: %d", errno);
// Sends a control message over nl_sock that switches delivery of process
// events on (enable true: PROC_CN_MCAST_LISTEN) or off (enable false:
// PROC_CN_MCAST_IGNORE). A failed send is logged.
714 static int set_proc_ev_listen(bool enable) {
// Message layout: netlink header, then the connector message with the
// multicast operation as its payload.
715 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
716 struct nlmsghdr nl_hdr;
717 struct __attribute__((__packed__)) {
718 struct cn_msg cn_msg;
719 enum proc_cn_mcast_op cn_mcast;
723 memset(&nlcn_msg, 0, sizeof(nlcn_msg));
724 nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
725 nlcn_msg.nl_hdr.nlmsg_pid = getpid();
726 nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
// Address the process-events connector.
728 nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
729 nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
730 nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
732 nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
734 int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
736 ERROR("procevent plugin: subscribing to netlink process events failed: %d",
744 // Read from netlink socket and write to ring buffer
//
// Receives process connector messages from nl_sock. For exec and exit
// events the pid, the state code, the exit code (0 for exec) and a timestamp
// are written to the ring entry at ring.head under procevent_data_lock.
// procevent_netlink_thread() uses the result as a status code.
// NOTE(review): the return statements and some branches are not visible
// here.
745 static int read_event() {
// Start with a non-blocking receive.
746 int recv_flags = MSG_DONTWAIT;
// Receive buffer: netlink header, then the connector message carrying the
// process event.
747 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
748 struct nlmsghdr nl_hdr;
749 struct __attribute__((__packed__)) {
750 struct cn_msg cn_msg;
751 struct proc_event proc_ev;
// Check under the thread lock whether this thread has been asked to stop.
759 pthread_mutex_lock(&procevent_thread_lock);
761 if (procevent_netlink_thread_loop <= 0) {
762 pthread_mutex_unlock(&procevent_thread_lock);
766 pthread_mutex_unlock(&procevent_thread_lock);
768 int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
772 } else if (status < 0) {
773 if (errno == EAGAIN || errno == EWOULDBLOCK) {
774 pthread_mutex_lock(&procevent_data_lock);
776 // There was nothing more to receive for now, so...
777 // If ring head does not equal ring tail, there is data
778 // in the ring buffer for the dequeue thread to read, so
780 if (ring.head != ring.tail)
781 pthread_cond_signal(&procevent_cond);
783 pthread_mutex_unlock(&procevent_data_lock);
785 // Since there was nothing to receive, set recv to block and
789 } else if (errno != EINTR) {
790 ERROR("procevent plugin: socket receive error: %d", errno);
793 // Interrupt, so just return
798 // We successfully received a message, so don't block on the next
799 // read in case there are more (and if there aren't, it will be
800 // handled above in the error-checking)
801 recv_flags = MSG_DONTWAIT;
// -1 marks an event type that is not queued.
804 int proc_status = -1;
807 switch (nlcn_msg.proc_ev.what) {
808 case PROC_EVENT_NONE:
809 case PROC_EVENT_FORK:
812 // Not of interest in current version
814 case PROC_EVENT_EXEC:
815 proc_status = PROCEVENT_STARTED;
816 proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
818 case PROC_EVENT_EXIT:
819 proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
820 proc_status = PROCEVENT_EXITED;
821 proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
827 // If we're interested in this process status event, place the event
828 // in the ring buffer for consumption by the main polling thread.
830 if (proc_status != -1) {
831 pthread_mutex_lock(&procevent_data_lock);
// Slot that would follow head; the ring counts as full when that slot is
// the tail.
833 int next = ring.head + 1;
834 if (next >= ring.maxLen)
837 if (next == ring.tail) {
838 // Buffer is full, signal the dequeue thread to process the buffer
839 // and clean it out, and then sleep
840 WARNING("procevent plugin: ring buffer full");
842 pthread_cond_signal(&procevent_cond);
843 pthread_mutex_unlock(&procevent_data_lock);
848 DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
849 (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
850 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
// Entry layout: [0] pid, [1] state, [2] exit code (0 for a start event),
// [3] timestamp taken from cdtime().
852 if (proc_status == PROCEVENT_EXITED) {
853 ring.buffer[ring.head][0] = proc_id;
854 ring.buffer[ring.head][1] = proc_status;
855 ring.buffer[ring.head][2] = proc_extra;
856 ring.buffer[ring.head][3] =
857 (long long unsigned int)CDTIME_T_TO_US(cdtime());
859 ring.buffer[ring.head][0] = proc_id;
860 ring.buffer[ring.head][1] = proc_status;
861 ring.buffer[ring.head][2] = 0;
862 ring.buffer[ring.head][3] =
863 (long long unsigned int)CDTIME_T_TO_US(cdtime());
869 pthread_mutex_unlock(&procevent_data_lock);
876 // Read from ring buffer and dispatch to write plugins
//
// Takes procevent_data_lock, waits on procevent_cond if the ring is empty,
// then handles every entry between ring.tail and ring.head: exit events of
// known processes and start events of processes of interest are passed to
// procevent_dispatch_notification().
// NOTE(review): in the lines visible here the notification is dispatched
// while procevent_data_lock is held; confirm this is intended.
877 static void read_ring_buffer() {
878 pthread_mutex_lock(&procevent_data_lock);
880 // If there's currently nothing to read from the buffer,
882 if (ring.head == ring.tail)
883 pthread_cond_wait(&procevent_cond, &procevent_data_lock);
// A wake-up with nothing queued, such as the broadcast sent by the stop
// functions, simply skips the loop below.
885 while (ring.head != ring.tail) {
// Slot that follows the current tail.
886 int next = ring.tail + 1;
888 if (next >= ring.maxLen)
// Entry layout: [0] pid, [1] state, [3] timestamp.
891 if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
// Look the pid up in the process list without constraining the name.
892 processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
895 // This process is of interest to us, so publish its EXITED status
896 procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
897 ring.buffer[ring.tail][1], pl->process,
898 ring.buffer[ring.tail][3]);
900 "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
902 pl->pid, pl->process);
// Forget the previous state so that a later start is reported again.
904 pl->last_status = -1;
906 } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
907 // a new process has started, so check if we should monitor it
908 processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
910 // If we had already seen this process name and pid combo before,
911 // and the last message was a "process started" message, don't send
912 // the notification again
914 if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
915 // This process is of interest to us, so publish its STARTED status
916 procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
917 ring.buffer[ring.tail][1], pl->process,
918 ring.buffer[ring.tail][3]);
920 pl->last_status = PROCEVENT_STARTED;
922 DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
924 pl->pid, pl->process);
931 pthread_mutex_unlock(&procevent_data_lock);
934 // Entry point for thread responsible for listening
935 // to netlink socket and writing data to ring buffer
//
// Calls read_event() repeatedly for as long as
// procevent_netlink_thread_loop stays above 0. The flag is examined with
// procevent_thread_lock held and the lock is released around each
// read_event() call. The error flag set inside the loop is what
// procevent_read() later checks.
936 static void *procevent_netlink_thread(void *arg) /* {{{ */
938 pthread_mutex_lock(&procevent_thread_lock);
940 while (procevent_netlink_thread_loop > 0) {
941 pthread_mutex_unlock(&procevent_thread_lock);
943 int status = read_event();
945 pthread_mutex_lock(&procevent_thread_lock);
948 procevent_netlink_thread_error = 1;
951 } /* while (procevent_netlink_thread_loop > 0) */
953 pthread_mutex_unlock(&procevent_thread_lock);
956 } /* }}} void *procevent_netlink_thread */
958 // Entry point for thread responsible for reading from
959 // ring buffer and dispatching notifications
//
// Loops for as long as procevent_dequeue_thread_loop stays above 0. The flag
// is examined with procevent_thread_lock held and the lock is released for
// the body of each iteration.
960 static void *procevent_dequeue_thread(void *arg) /* {{{ */
962 pthread_mutex_lock(&procevent_thread_lock);
964 while (procevent_dequeue_thread_loop > 0) {
965 pthread_mutex_unlock(&procevent_thread_lock);
969 pthread_mutex_lock(&procevent_thread_lock);
970 } /* while (procevent_dequeue_thread_loop > 0) */
972 pthread_mutex_unlock(&procevent_thread_lock);
975 } /* }}} void *procevent_dequeue_thread */
// Starts the netlink thread while holding procevent_thread_lock: connects
// the netlink socket, subscribes to process events, raises the loop flag,
// clears the error flag and creates the thread. If thread creation fails the
// loop flag is reset and the socket is closed again.
977 static int start_netlink_thread(void) /* {{{ */
979 pthread_mutex_lock(&procevent_thread_lock);
// A non-zero loop flag means the thread has been started before.
981 if (procevent_netlink_thread_loop != 0) {
982 pthread_mutex_unlock(&procevent_thread_lock);
989 status = nl_connect();
992 pthread_mutex_unlock(&procevent_thread_lock);
996 status = set_proc_ev_listen(true);
998 pthread_mutex_unlock(&procevent_thread_lock);
1003 DEBUG("procevent plugin: socket created and bound");
// The flags are set before the thread exists so that its loop condition
// holds on entry.
1005 procevent_netlink_thread_loop = 1;
1006 procevent_netlink_thread_error = 0;
1008 status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
1009 procevent_netlink_thread,
1010 /* arg = */ (void *)0, "procevent");
1012 procevent_netlink_thread_loop = 0;
1013 ERROR("procevent plugin: Starting netlink thread failed.");
1014 pthread_mutex_unlock(&procevent_thread_lock);
1016 int status2 = close(nl_sock);
1019 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1027 pthread_mutex_unlock(&procevent_thread_lock);
1030 } /* }}} int start_netlink_thread */
// Starts the dequeue thread while holding procevent_thread_lock: raises the
// loop flag and creates the thread; the flag is reset if creation fails.
1032 static int start_dequeue_thread(void) /* {{{ */
1034 pthread_mutex_lock(&procevent_thread_lock);
// A non-zero loop flag means the thread has been started before.
1036 if (procevent_dequeue_thread_loop != 0) {
1037 pthread_mutex_unlock(&procevent_thread_lock);
1041 procevent_dequeue_thread_loop = 1;
1043 int status = plugin_thread_create(&procevent_dequeue_thread_id,
1044 /* attr = */ NULL, procevent_dequeue_thread,
1045 /* arg = */ (void *)0, "procevent");
1047 procevent_dequeue_thread_loop = 0;
1048 ERROR("procevent plugin: Starting dequeue thread failed.");
1049 pthread_mutex_unlock(&procevent_thread_lock);
1053 pthread_mutex_unlock(&procevent_thread_lock);
1056 } /* }}} int start_dequeue_thread */
// Starts the netlink thread and then the dequeue thread; both start
// functions are called regardless of the result of the first.
// NOTE(review): how the two status values are combined into the result is
// not visible here.
1058 static int start_threads(void) /* {{{ */
1060 int status = start_netlink_thread();
1061 int status2 = start_dequeue_thread();
1067 } /* }}} int start_threads */
// Stops the netlink thread: closes nl_sock if it is open, clears the loop
// flag, wakes all waiters on procevent_cond and then cancels the thread
// when shutdown is 1. Afterwards the stored thread id and the error flag
// are reset. A non-zero socket close status is returned in preference to
// the thread status.
1069 static int stop_netlink_thread(int shutdown) /* {{{ */
1073 if (nl_sock != -1) {
1074 socket_status = close(nl_sock);
1075 if (socket_status != 0) {
1076 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1077 socket_status, strerror(errno));
1084 pthread_mutex_lock(&procevent_thread_lock);
// A zero loop flag means there is no running thread to stop.
1086 if (procevent_netlink_thread_loop == 0) {
1087 pthread_mutex_unlock(&procevent_thread_lock);
1091 // Set thread termination status
1092 procevent_netlink_thread_loop = 0;
1093 pthread_mutex_unlock(&procevent_thread_lock);
1095 // Let threads waiting on access to the data know to move
1096 // on such that they'll see the thread's termination status
1097 pthread_cond_broadcast(&procevent_cond);
1101 if (shutdown == 1) {
1102 // Calling pthread_cancel here in
1103 // the case of a shutdown just assures that the thread is
1104 // gone and that the process has been fully terminated.
1106 DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
1108 thread_status = pthread_cancel(procevent_netlink_thread_id);
// ESRCH (no such thread) is tolerated.
1110 if (thread_status != 0 && thread_status != ESRCH) {
1111 ERROR("procevent plugin: Unable to cancel netlink thread: %d",
1118 pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
1119 if (thread_status != 0 && thread_status != ESRCH) {
1120 ERROR("procevent plugin: Stopping netlink thread failed.");
// Reset the bookkeeping so that the thread can be started again.
1126 pthread_mutex_lock(&procevent_thread_lock);
1127 memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
1128 procevent_netlink_thread_error = 0;
1129 pthread_mutex_unlock(&procevent_thread_lock);
1131 DEBUG("procevent plugin: Finished requesting stop of netlink thread");
1133 if (socket_status != 0)
1134 return socket_status;
1136 return thread_status;
1137 } /* }}} int stop_netlink_thread */
// Stops the dequeue thread: clears the loop flag, wakes all waiters on
// procevent_cond and then cancels the thread when shutdown is 1; a
// pthread_join call with its own error message is also visible. Afterwards
// the stored thread id is reset.
1139 static int stop_dequeue_thread(int shutdown) /* {{{ */
1143 pthread_mutex_lock(&procevent_thread_lock);
// A zero loop flag means there is no running thread to stop.
1145 if (procevent_dequeue_thread_loop == 0) {
1146 pthread_mutex_unlock(&procevent_thread_lock);
1150 procevent_dequeue_thread_loop = 0;
1151 pthread_mutex_unlock(&procevent_thread_lock);
// Wake the thread in case it is waiting for ring entries.
1153 pthread_cond_broadcast(&procevent_cond);
1155 if (shutdown == 1) {
1156 // Calling pthread_cancel here in
1157 // the case of a shutdown just assures that the thread is
1158 // gone and that the process has been fully terminated.
1160 DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
1162 status = pthread_cancel(procevent_dequeue_thread_id);
// ESRCH (no such thread) is tolerated.
1164 if (status != 0 && status != ESRCH) {
1165 ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
1170 status = pthread_join(procevent_dequeue_thread_id, /* return = */ NULL);
1171 if (status != 0 && status != ESRCH) {
1172 ERROR("procevent plugin: Stopping dequeue thread failed.");
// Reset the stored handle so that the thread can be started again.
1178 pthread_mutex_lock(&procevent_thread_lock);
1179 memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
1180 pthread_mutex_unlock(&procevent_thread_lock);
1182 DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
1185 } /* }}} int stop_dequeue_thread */
// Stops the netlink thread and then the dequeue thread, passing the
// shutdown argument through to both; the second call is made regardless of
// the result of the first.
// NOTE(review): how the two status values are combined into the result is
// not visible here.
1187 static int stop_threads(int shutdown) /* {{{ */
1189 int status = stop_netlink_thread(shutdown);
1190 int status2 = stop_dequeue_thread(shutdown);
1196 } /* }}} int stop_threads */
// Init callback registered in module_register(): allocates the ring buffer
// with buffer_length slots of PROCEVENT_FIELDS values each, records the
// processes that are already running and starts both worker threads.
// NOTE(review): in the lines visible here the calloc results are used
// without a NULL check, and buffer_length stays at its zero default unless
// the BufferLength option was given; confirm both cases are handled.
1198 static int procevent_init(void) /* {{{ */
1202 ring.maxLen = buffer_length;
// Outer array: one pointer per slot.
1203 ring.buffer = (long long unsigned int **)calloc(
1204 buffer_length, sizeof(long long unsigned int *));
// Inner arrays: the values of one event.
1206 for (int i = 0; i < buffer_length; i++) {
1207 ring.buffer[i] = (long long unsigned int *)calloc(
1208 PROCEVENT_FIELDS, sizeof(long long unsigned int));
1211 int status = process_map_refresh();
1214 ERROR("procevent plugin: Initial process mapping failed.");
// ignorelist is only created by procevent_config(), so NULL here means no
// option of this plugin was processed.
1218 if (ignorelist == NULL) {
1219 NOTICE("procevent plugin: No processes have been configured.");
1223 return start_threads();
1224 } /* }}} int procevent_init */
// Config callback registered in module_register(); key is matched without
// regard to case.
//   BufferLength  - number of ring slots
//   Process       - process name added to the match list
//   ProcessRegex  - pattern added to the match list; a warning is logged
//                   when the plugin was built without support for it
// NOTE(review): the return statements are not visible here.
1226 static int procevent_config(const char *key, const char *value) /* {{{ */
// The match list is created on the first option seen, with invert set to 1.
1228 if (ignorelist == NULL)
1229 ignorelist = ignorelist_create(/* invert = */ 1);
1231 if (strcasecmp(key, "BufferLength") == 0) {
// NOTE(review): atoi performs no validation, so a non-numeric or negative
// value is accepted as is.
1232 buffer_length = atoi(value);
1233 } else if (strcasecmp(key, "Process") == 0) {
// NOTE(review): the result of ignorelist_add is not used on this line.
1234 ignorelist_add(ignorelist, value);
1235 } else if (strcasecmp(key, "ProcessRegex") == 0) {
1237 int status = ignorelist_add(ignorelist, value);
1240 ERROR("procevent plugin: invalid regular expression: %s", value);
1244 WARNING("procevent plugin: The plugin has been compiled without support "
1245 "for the \"ProcessRegex\" option.");
1252 } /* }}} int procevent_config */
// Builds a notification for one process state change and dispatches it.
//   pid       - process id, forwarded to gen_message_payload()
//   type      - NOTE(review): not referenced in the lines visible here; the
//               notification type is set from a literal instead
//   value     - state code; 1 yields NOTIF_OKAY, anything else NOTIF_FAILURE
//   process   - process name, used as the plugin instance
//   timestamp - forwarded to gen_message_payload()
// The JSON text from gen_message_payload() is attached as a string meta
// entry named ves.
1254 static void procevent_dispatch_notification(long pid,
1255 const char *type, /* {{{ */
1256 gauge_t value, char *process,
1257 long long unsigned int timestamp) {
1259 notification_t n = {(value == 1 ? NOTIF_OKAY : NOTIF_FAILURE),
1268 sstrncpy(n.host, hostname_g, sizeof(n.host));
1269 sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
1270 sstrncpy(n.type, "gauge", sizeof(n.type));
1271 sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
// NOTE(review): the result of gen_message_payload appears to be unused
// here; confirm that a failed generation cannot leave buf unusable.
1274 gen_message_payload(value, pid, process, timestamp, &buf);
1276 notification_meta_t *m = calloc(1, sizeof(*m));
1280 ERROR("procevent plugin: unable to allocate metadata: %s", STRERRNO);
// The meta entry holds its own copy of the payload text.
1284 sstrncpy(m->name, "ves", sizeof(m->name));
1285 m->nm_value.nm_string = sstrdup(buf);
1286 m->type = NM_TYPE_STRING;
1289 DEBUG("procevent plugin: notification message: %s",
1290 n.meta->nm_value.nm_string);
1292 DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
// The meta list is released once the notification has been dispatched.
1295 plugin_dispatch_notification(&n);
1296 plugin_notification_meta_free(n.meta);
1298 // strdup'd in gen_message_payload
// Read callback registered in module_register(). It checks the error flag
// of the netlink thread under procevent_thread_lock and, if the flag is
// set, stops and restarts that thread.
1303 static int procevent_read(void) /* {{{ */
1305 pthread_mutex_lock(&procevent_thread_lock);
1307 if (procevent_netlink_thread_error != 0) {
// The lock is released first because the stop and start functions take
// procevent_thread_lock themselves.
1309 pthread_mutex_unlock(&procevent_thread_lock);
1311 ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
// 0: not a shutdown, so the thread is not cancelled.
1313 stop_netlink_thread(0);
1315 start_netlink_thread();
1318 } /* if (procevent_netlink_thread_error != 0) */
1320 pthread_mutex_unlock(&procevent_thread_lock);
1323 } /* }}} int procevent_read */
// Shutdown callback registered in module_register(): stops both threads in
// shutdown mode, frees every ring slot, walks the process list and frees
// the match list.
// NOTE(review): the statements that release the list nodes and the outer
// ring array are not visible here.
1325 static int procevent_shutdown(void) /* {{{ */
1327 DEBUG("procevent plugin: Shutting down threads.");
// 1 requests cancellation of the threads.
1329 int status = stop_threads(1);
1331 for (int i = 0; i < buffer_length; i++) {
1332 free(ring.buffer[i]);
1337 processlist_t *pl = processlist_head;
1338 while (pl != NULL) {
1339 processlist_t *pl_next;
1349 ignorelist_free(ignorelist);
1352 } /* }}} int procevent_shutdown */
// Plugin entry point: registers the config, init, read and shutdown
// callbacks of this file under the name procevent.
1354 void module_register(void) {
1355 plugin_register_config("procevent", procevent_config, config_keys,
1357 plugin_register_init("procevent", procevent_init);
1358 plugin_register_read("procevent", procevent_read);
1359 plugin_register_shutdown("procevent", procevent_shutdown);
1360 } /* void module_register */