2 * collectd - src/procevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
38 #include <sys/socket.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
// Process states: stored in slot [1] of a ring-buffer entry and later used as
// the "state" argument when the JSON payload is generated.
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
62 #define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
// Directory scanned for per-PID entries and their "comm" files.
64 #define PROCDIR "/proc"
// Number of regmatch_t slots handed to regexec().
65 #define PROCEVENT_REGEX_MATCHES 1
// Keys and fixed values of the common event header in the JSON payload.
67 #define PROCEVENT_DOMAIN_FIELD "domain"
68 #define PROCEVENT_DOMAIN_VALUE "fault"
69 #define PROCEVENT_EVENT_ID_FIELD "eventId"
70 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
71 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
72 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
73 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
74 #define PROCEVENT_PRIORITY_FIELD "priority"
75 #define PROCEVENT_PRIORITY_VALUE "high"
76 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
77 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
78 #define PROCEVENT_SEQUENCE_FIELD "sequence"
79 #define PROCEVENT_SEQUENCE_VALUE "0"
80 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
81 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
82 #define PROCEVENT_VERSION_FIELD "version"
83 #define PROCEVENT_VERSION_VALUE "1.0"
// Keys and fixed values of the nested "faultFields" map in the JSON payload.
85 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
86 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
87 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
88 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
89 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
90 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
91 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
92 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
93 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
94 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
95 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
96 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
97 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
98 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
99 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
100 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
110 long long unsigned int **buffer;
// One node of the singly linked list of monitored processes.
// NOTE(review): only some members are visible here; other code in this file
// also accesses process, process_regex, is_regex and pid on these nodes.
113 struct processlist_s {
// Compiled pattern for regex entries; built with regcomp(), released with
// regfree().
117 regex_t process_regex_obj;
// Next node in the list, or NULL at the end.
122 struct processlist_s *next;
124 typedef struct processlist_s processlist_t;
// Greater than zero while the listener thread should keep running; set to 1
// by start_thread() and reset to 0 by stop_thread().
130 static int procevent_thread_loop = 0;
// Set to 1 by the listener thread; procevent_read() checks it.
131 static int procevent_thread_error = 0;
132 static pthread_t procevent_thread_id;
// Taken around thread state changes and while procevent_read() drains the
// ring buffer.
133 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
// Broadcast by stop_thread() after it clears procevent_thread_loop.
134 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
// Taken while the processlist_head list is traversed or extended.
135 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
// Netlink connector socket; -1 until nl_connect() assigns it.
136 static int nl_sock = -1;
// Number of ring-buffer slots, taken from the "BufferLength" option.
137 static int buffer_length;
// Events written by read_event() and consumed by procevent_read().
138 static circbuf_t ring;
// Head of the list of configured / discovered processes.
139 static processlist_t *processlist_head = NULL;
// Counter incremented for every generated payload and emitted as "eventId".
140 static int event_id = 0;
// Configuration keys handled by procevent_config().
142 static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
143 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
// Generates the JSON payload describing one process state change.
//   state:     0 selects the "down" / "CRITICAL" / "Ready to terminate"
//              values, any other value selects "up" / "NORMAL" / "Active"
//   pid:       embedded in eventName, alarmCondition and specificProblem
//   process:   process name; also emitted as sourceName and alarmInterfaceA
//   timestamp: emitted as startEpochMicrosec
//   buf:       receives a malloc'd copy of the generated text; the caller is
//              expected to free it
// The visible yajl calls are checked against yajl_gen_status_ok, and the
// failure path logs "gen_message_payload failed to generate JSON".
// NOTE(review): the scratch strings below (event_id_str, event_name_str,
// last_epoch_microsec_str, start_epoch_microsec_str, alarm_condition_str,
// specific_problem_str) are passed to snprintf/memset immediately after
// malloc() without a NULL check.
149 static int gen_message_payload(int state, int pid, char *process,
150 long long unsigned int timestamp, char **buf) {
151 const unsigned char *buf2;
// The generator is allocated with a different yajl_gen_alloc() signature
// depending on whether HAVE_YAJL_V2 is defined.
154 #if !defined(HAVE_YAJL_V2)
155 yajl_gen_config conf = {};
162 g = yajl_gen_alloc(NULL);
163 yajl_gen_config(g, yajl_gen_beautify, 0);
166 g = yajl_gen_alloc(&conf, NULL);
171 // *** BEGIN common event header ***
173 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
177 if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
178 strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
181 if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
182 strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
186 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
187 strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
// eventId is a counter bumped once per generated payload and written as a
// JSON number.
190 event_id = event_id + 1;
191 int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
192 char *event_id_str = malloc(event_id_len);
193 snprintf(event_id_str, event_id_len, "%d", event_id);
195 if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
196 yajl_gen_status_ok) {
204 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
205 strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
// Size estimate for the text "process <name> (<pid>) <down|up>".
208 int event_name_len = 0;
209 event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
210 event_name_len = event_name_len + strlen(process); // process name
211 event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
212 event_name_len = event_name_len +
213 13; // "process", 3 spaces, 2 parentheses and null-terminator
214 char *event_name_str = malloc(event_name_len);
215 memset(event_name_str, '\0', event_name_len);
216 snprintf(event_name_str, event_name_len, "process %s (%d) %s", process, pid,
217 (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
218 : PROCEVENT_EVENT_NAME_UP_VALUE));
220 if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
221 yajl_gen_status_ok) {
222 sfree(event_name_str);
226 sfree(event_name_str);
229 if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
230 strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
// lastEpochMicrosec is the current time (cdtime()) converted to microseconds.
234 int last_epoch_microsec_len =
235 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
236 char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
237 snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
238 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
240 if (yajl_gen_number(g, last_epoch_microsec_str,
241 strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
242 sfree(last_epoch_microsec_str);
246 sfree(last_epoch_microsec_str);
249 if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
250 strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
253 if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
254 strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
257 // reportingEntityName
258 if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
259 strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
263 if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
264 strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
269 if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
270 strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
273 if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
274 strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
278 if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
279 strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
283 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
287 // startEpochMicrosec
288 if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
289 strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
// startEpochMicrosec is the caller-supplied timestamp, printed as a number.
293 int start_epoch_microsec_len =
294 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
295 char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
296 snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
297 (long long unsigned int)timestamp);
299 if (yajl_gen_number(g, start_epoch_microsec_str,
300 strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
301 sfree(start_epoch_microsec_str);
305 sfree(start_epoch_microsec_str);
308 if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
309 strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
// The version value is emitted with yajl_gen_number, i.e. as an unquoted
// JSON number rather than a string.
312 if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
313 strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
316 // *** END common event header ***
318 // *** BEGIN fault fields ***
320 if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
321 strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
325 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
329 if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
330 strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
// Size estimate for the text "process <name> (<pid>) state change".
334 int alarm_condition_len = 0;
335 alarm_condition_len =
336 alarm_condition_len + (sizeof(char) * sizeof(int) * 4); // pid
337 alarm_condition_len = alarm_condition_len + strlen(process); // process name
338 alarm_condition_len =
339 alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
340 // parentheses and null-terminator
341 char *alarm_condition_str = malloc(alarm_condition_len);
342 memset(alarm_condition_str, '\0', alarm_condition_len);
343 snprintf(alarm_condition_str, alarm_condition_len,
344 "process %s (%d) state change", process, pid);
346 if (yajl_gen_string(g, (u_char *)alarm_condition_str,
347 strlen(alarm_condition_str)) != yajl_gen_status_ok) {
348 sfree(alarm_condition_str);
352 sfree(alarm_condition_str);
355 if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
356 strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
360 if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
365 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
366 strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
371 g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
372 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
373 strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
374 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
379 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
380 strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
384 if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
385 strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
389 // faultFieldsVersion
390 if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
391 strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
395 if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
396 strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
401 if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
402 strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
// Same text layout as eventName: "process <name> (<pid>) <down|up>".
406 int specific_problem_len = 0;
407 specific_problem_len =
408 specific_problem_len + (sizeof(char) * sizeof(int) * 4); // pid
409 specific_problem_len = specific_problem_len + strlen(process); // process name
410 specific_problem_len =
411 specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
412 specific_problem_len =
413 specific_problem_len +
414 13; // "process", 3 spaces, 2 parentheses and null-terminator
415 char *specific_problem_str = malloc(specific_problem_len);
416 memset(specific_problem_str, '\0', specific_problem_len);
417 snprintf(specific_problem_str, specific_problem_len, "process %s (%d) %s",
418 process, pid, (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
419 : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
421 if (yajl_gen_string(g, (u_char *)specific_problem_str,
422 strlen(specific_problem_str)) != yajl_gen_status_ok) {
423 sfree(specific_problem_str);
427 sfree(specific_problem_str);
430 if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
431 strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
435 g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
436 : PROCEVENT_VF_STATUS_NORMAL_VALUE),
437 strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
438 : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
442 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
445 // *** END fault fields ***
447 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
// Fetch the generator's internal buffer and copy it into a freshly malloc'd
// string returned through *buf.
450 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
453 *buf = malloc(strlen((char *)buf2) + 1);
455 sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
463 ERROR("procevent plugin: gen_message_payload failed to generate JSON");
467 // Does /proc/<pid>/comm contain a process name we are interested in?
// Reads the first line of the comm file for <pid> and compares it with every
// list entry (regexec() for regex entries, strcmp() otherwise) while holding
// procevent_list_lock. As described in the comment further down, the pid is
// stored in a matching entry, or a new entry is allocated when the matching
// one already tracks a different pid.
// NOTE(review): the return statements are not visible here; callers read
// ->process and ->pid from the returned processlist_t pointer.
468 static processlist_t *process_check(int pid) {
469 int len, is_match, status, retval;
472 char buffer[BUFSIZE];
473 regmatch_t matches[PROCEVENT_REGEX_MATCHES];
475 len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid);
// NOTE(review): the truncation check compares against BUFSIZE while snprintf
// was bounded by sizeof(file); confirm both sizes agree.
477 if ((len < 0) || (len >= BUFSIZE)) {
478 WARNING("procevent process_check: process name too large");
482 if (NULL == (fh = fopen(file, "r"))) {
483 // No /proc/<pid>/comm for this pid, just ignore
484 DEBUG("procevent plugin: no comm file available for pid %d", pid);
// Read everything up to the first newline.
// NOTE(review): the scanset has no field width, so the read is not bounded
// by sizeof(buffer).
488 retval = fscanf(fh, "%[^\n]", buffer);
491 WARNING("procevent process_check: unable to read comm file for pid %d",
497 // Go through the processlist linked list and look for the process name
498 // in /proc/<pid>/comm. If found:
499 // 1. If pl->pid is -1, then set pl->pid to <pid>
500 // 2. If pl->pid is not -1, then another <process name> process was already
501 // found. If <pid> == pl->pid, this is an old match, so do nothing.
502 // If the <pid> is different, however, make a new processlist_t and
503 // associate <pid> with it (with the same process name as the existing).
506 pthread_mutex_lock(&procevent_list_lock);
509 processlist_t *match = NULL;
511 for (pl = processlist_head; pl != NULL; pl = pl->next) {
512 if (pl->is_regex != 0) {
513 is_match = (regexec(&pl->process_regex_obj, buffer,
514 PROCEVENT_REGEX_MATCHES, matches, 0) == 0
518 is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
522 DEBUG("procevent plugin: process %d name match (pattern: %s) for %s", pid,
523 (pl->is_regex == 0 ? pl->process : pl->process_regex), buffer);
525 if (pl->is_regex == 1) {
526 // If this is a regex name, copy the actual process name into the object
527 // for cleaner log reporting
529 if (pl->process != NULL)
531 pl->process = strdup(buffer);
532 if (pl->process == NULL) {
534 ERROR("procevent plugin: strdup failed during process_check: %s",
535 sstrerror(errno, errbuf, sizeof(errbuf)));
536 pthread_mutex_unlock(&procevent_list_lock);
541 if (pl->pid == pid) {
542 // this is a match, and we've already stored the exact pid/name combo
545 } else if (pl->pid == -1) {
546 // this is a match, and we've found a candidate processlist_t to store
547 // this new pid/name combo
// NOTE(review): this condition is always true when reached, because the
// pl->pid == -1 case is handled by the previous branch.
551 } else if (pl->pid != -1) {
552 // this is a match, but another instance of this process has already
553 // claimed this pid/name combo,
561 if (match != NULL && match->pid != -1 && match->pid != pid) {
562 // if there was a match but the associated processlist_t object already
563 // contained a pid/name combo,
564 // then make a new one and add it to the linked list
567 "procevent plugin: allocating new processlist_t object for PID %d (%s)",
568 pid, match->process);
574 pl2 = malloc(sizeof(*pl2));
577 ERROR("procevent plugin: malloc failed during process_check: %s",
578 sstrerror(errno, errbuf, sizeof(errbuf)));
579 pthread_mutex_unlock(&procevent_list_lock);
583 process = strdup(match->process);
584 if (process == NULL) {
587 ERROR("procevent plugin: strdup failed during process_check: %s",
588 sstrerror(errno, errbuf, sizeof(errbuf)));
589 pthread_mutex_unlock(&procevent_list_lock);
// The new node gets its own compiled copy of the pattern and its own copy of
// the pattern text.
// NOTE(review): unlike the malloc/strdup failure paths above, no unlock of
// procevent_list_lock is visible on the two failure paths below; confirm the
// lock is released before returning.
593 if (match->is_regex == 1) {
596 regcomp(&pl2->process_regex_obj, match->process_regex, REG_EXTENDED);
599 ERROR("procevent plugin: invalid regular expression: %s",
600 match->process_regex);
604 process_regex = strdup(match->process_regex);
605 if (process_regex == NULL) {
608 ERROR("procevent plugin: strdup failed during process_check: %s",
609 sstrerror(errno, errbuf, sizeof(errbuf)));
613 pl2->process_regex = process_regex;
616 pl2->process = process;
// Push the new node onto the front of the list.
618 pl2->next = processlist_head;
619 processlist_head = pl2;
624 pthread_mutex_unlock(&procevent_list_lock);
634 // Does our map have this PID or name?
// Walks the process list under procevent_list_lock. Which comparison decides
// depends on the arguments:
//   pid > 0, process == NULL  -> match_pid alone
//   pid < 0, process != NULL  -> match_process alone
//   pid > 0, process != NULL  -> both match_pid and match_process
// A pid of 0 satisfies none of the three conditions.
// NOTE(review): how match_pid is computed is not visible here (presumably
// pl->pid == pid); match_process comes from the strcmp() below.
635 static processlist_t *process_map_check(int pid, char *process) {
638 pthread_mutex_lock(&procevent_list_lock);
640 for (pl = processlist_head; pl != NULL; pl = pl->next) {
642 int match_process = 0;
650 if (process != NULL) {
651 if (strcmp(pl->process, process) == 0)
655 if (pid > 0 && process == NULL && match_pid == 1)
657 else if (pid < 0 && process != NULL && match_process == 1)
659 else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
// The lock is released in two places: once inside the loop and once after it.
663 pthread_mutex_unlock(&procevent_list_lock);
668 pthread_mutex_unlock(&procevent_list_lock);
// Scans PROCDIR and runs process_check() on every directory whose name
// consists of digits, so that matching processes get their PID recorded in
// the process list.
673 static int process_map_refresh(void) {
677 proc = opendir(PROCDIR);
// NOTE(review): the message says "fopen" although the failing call is
// opendir().
680 ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
681 sstrerror(errno, errbuf, sizeof(errbuf)));
695 dent = readdir(proc);
// errno == 0 after a failed readdir() is treated as end of directory.
// NOTE(review): this only works if errno is cleared before readdir(); that
// reset is not visible here, confirm.
699 if (errno == 0) /* end of directory */
702 ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
703 sstrerror(errno, errbuf, sizeof(errbuf)));
// Entries starting with '.' are tested separately.
708 if (dent->d_name[0] == '.')
711 len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
712 if ((len < 0) || (len >= BUFSIZE))
715 status = stat(file, &statbuf);
718 WARNING("procevent plugin: stat (%s) failed: %s", file,
719 sstrerror(errno, errbuf, sizeof(errbuf)));
723 if (!S_ISDIR(statbuf.st_mode))
726 len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
727 if ((len < 0) || (len >= BUFSIZE))
// Check that the directory name is purely numeric.
// NOTE(review): strlen() is re-evaluated on every iteration and compared with
// a signed int; isdigit() receives a plain char without a cast to
// unsigned char.
732 for (int i = 0; i < strlen(dent->d_name); i++) {
733 if (!isdigit(dent->d_name[i])) {
742 // Check if we need to store this pid/name combo in our processlist_t linked
744 int this_pid = atoi(dent->d_name);
745 processlist_t *pl = process_check(this_pid);
748 DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
749 this_pid, pl->process);
// Opens the netlink connector socket (stored in nl_sock) and binds it to the
// CN_IDX_PROC multicast group, using this process's PID as the netlink
// address. Failures of socket() and bind() are logged.
757 static int nl_connect() {
759 struct sockaddr_nl sa_nl;
761 nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
763 ERROR("procevent plugin: socket open failed.");
767 sa_nl.nl_family = AF_NETLINK;
768 sa_nl.nl_groups = CN_IDX_PROC;
769 sa_nl.nl_pid = getpid();
771 rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
773 ERROR("procevent plugin: socket bind failed.");
// Sends a proc-connector control message over nl_sock.
//   enable: true sends PROC_CN_MCAST_LISTEN, false sends PROC_CN_MCAST_IGNORE
// A failed send() is logged.
781 static int set_proc_ev_listen(bool enable) {
// Message layout: netlink header followed by a packed connector message whose
// payload is the multicast operation code.
783 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
784 struct nlmsghdr nl_hdr;
785 struct __attribute__((__packed__)) {
786 struct cn_msg cn_msg;
787 enum proc_cn_mcast_op cn_mcast;
791 memset(&nlcn_msg, 0, sizeof(nlcn_msg));
792 nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
793 nlcn_msg.nl_hdr.nlmsg_pid = getpid();
794 nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
// Address the message to the process-events connector.
796 nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
797 nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
798 nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
800 nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
802 rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
804 ERROR("procevent plugin: subscribing to netlink process events failed.");
// Receives one message from nl_sock and, for EXEC and EXIT events, stores
// {pid, status, extra, timestamp} in the next free ring-buffer slot.
// EXEC is recorded as PROCEVENT_STARTED, EXIT as PROCEVENT_EXITED with the
// exit code in the "extra" field.
811 static int read_event() {
815 int proc_status = -1;
817 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
818 struct nlmsghdr nl_hdr;
819 struct __attribute__((__packed__)) {
820 struct cn_msg cn_msg;
821 struct proc_event proc_ev;
828 status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
// A recv() failure with EINTR is not reported as an error.
832 } else if (status == -1) {
833 if (errno != EINTR) {
834 ERROR("procevent plugin: socket receive error: %d", errno);
839 switch (nlcn_msg.proc_ev.what) {
840 case PROC_EVENT_NONE:
841 case PROC_EVENT_FORK:
844 // Not of interest in current version
846 case PROC_EVENT_EXEC:
847 proc_status = PROCEVENT_STARTED;
848 proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
850 case PROC_EVENT_EXIT:
851 proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
852 proc_status = PROCEVENT_EXITED;
853 proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
859 // If we're interested in this process status event, place the event
860 // in the ring buffer for consumption by the main polling thread.
862 if (proc_status != -1) {
// NOTE(review): this call unlocks procevent_lock, but no acquisition is
// visible in this function and procevent_thread() releases the lock before
// calling read_event(). Together with the unlock at the end of this block it
// looks like pthread_mutex_lock was intended here; confirm.
863 pthread_mutex_unlock(&procevent_lock);
// Index the head would move to; wraps around at ring.maxLen.
865 int next = ring.head + 1;
866 if (next >= ring.maxLen)
// The buffer counts as full when advancing head would reach tail.
869 if (next == ring.tail) {
870 WARNING("procevent plugin: ring buffer full");
872 DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
873 (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
874 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
// Both branches store the same fields; only slot [2] differs (exit code for
// EXITED, 0 otherwise).
876 if (proc_status == PROCEVENT_EXITED) {
877 ring.buffer[ring.head][0] = proc_id;
878 ring.buffer[ring.head][1] = proc_status;
879 ring.buffer[ring.head][2] = proc_extra;
880 ring.buffer[ring.head][3] =
881 (long long unsigned int)CDTIME_T_TO_US(cdtime());
883 ring.buffer[ring.head][0] = proc_id;
884 ring.buffer[ring.head][1] = proc_status;
885 ring.buffer[ring.head][2] = 0;
886 ring.buffer[ring.head][3] =
887 (long long unsigned int)CDTIME_T_TO_US(cdtime());
893 pthread_mutex_unlock(&procevent_lock);
// Listener thread body: calls read_event() repeatedly for as long as
// procevent_thread_loop stays positive. procevent_lock is held while the loop
// flag is examined and released around each read_event() call.
899 static void *procevent_thread(void *arg) /* {{{ */
901 pthread_mutex_lock(&procevent_lock);
903 while (procevent_thread_loop > 0) {
// Drop the lock while reading from the socket.
906 pthread_mutex_unlock(&procevent_lock);
910 status = read_event();
912 pthread_mutex_lock(&procevent_lock);
// NOTE(review): the condition guarding this assignment is not visible here;
// presumably it is a failed read_event().
915 procevent_thread_error = 1;
919 if (procevent_thread_loop <= 0)
921 } /* while (procevent_thread_loop > 0) */
923 pthread_mutex_unlock(&procevent_lock);
926 } /* }}} void *procevent_thread */
// Connects the netlink socket, subscribes to process events and creates the
// listener thread, all while holding procevent_lock. A non-zero
// procevent_thread_loop on entry takes a separate path that releases the lock.
928 static int start_thread(void) /* {{{ */
932 pthread_mutex_lock(&procevent_lock);
934 if (procevent_thread_loop != 0) {
935 pthread_mutex_unlock(&procevent_lock);
940 status = nl_connect();
945 status = set_proc_ev_listen(true);
950 DEBUG("procevent plugin: socket created and bound");
// Mark the thread as running before it is created; reverted below if
// creation fails.
952 procevent_thread_loop = 1;
953 procevent_thread_error = 0;
955 status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
957 /* arg = */ (void *)0, "procevent");
959 procevent_thread_loop = 0;
960 ERROR("procevent plugin: Starting thread failed.");
961 pthread_mutex_unlock(&procevent_lock);
965 pthread_mutex_unlock(&procevent_lock);
967 } /* }}} int start_thread */
// Closes the netlink socket, clears procevent_thread_loop, wakes any waiter
// on procevent_cond, and then cancels and/or joins the listener thread.
//   shutdown: per the comment below, a process shutdown additionally cancels
//             the thread (the test on this argument is not visible here)
969 static int stop_thread(int shutdown) /* {{{ */
974 status = close(nl_sock);
976 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
977 status, strerror(errno));
983 pthread_mutex_lock(&procevent_lock);
// Nothing to stop if the loop flag is already 0.
985 if (procevent_thread_loop == 0) {
986 pthread_mutex_unlock(&procevent_lock);
990 procevent_thread_loop = 0;
991 pthread_cond_broadcast(&procevent_cond);
992 pthread_mutex_unlock(&procevent_lock);
995 // Calling pthread_cancel here in
996 // the case of a shutdown just assures that the thread is
997 // gone and that the process has been fully terminated.
999 DEBUG("procevent plugin: Canceling thread for process shutdown");
1001 status = pthread_cancel(procevent_thread_id);
1004 ERROR("procevent plugin: Unable to cancel thread: %d", status);
1008 status = pthread_join(procevent_thread_id, /* return = */ NULL);
1010 ERROR("procevent plugin: Stopping thread failed.");
// Reset the thread bookkeeping under the lock.
1015 pthread_mutex_lock(&procevent_lock);
1016 memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
1017 procevent_thread_error = 0;
1018 pthread_mutex_unlock(&procevent_lock);
1020 DEBUG("procevent plugin: Finished requesting stop of thread");
1023 } /* }}} int stop_thread */
// Plugin init callback: allocates the ring buffer (buffer_length rows of
// PROCEVENT_FIELDS values), builds the initial process map and starts the
// listener thread. Returns the result of start_thread() on the success path.
1025 static int procevent_init(void) /* {{{ */
1029 if (processlist_head == NULL) {
1030 NOTICE("procevent plugin: No processes have been configured.");
1036 ring.maxLen = buffer_length;
// NOTE(review): no NULL check of these malloc() results is visible here;
// confirm.
1037 ring.buffer = (long long unsigned int **)malloc(
1038 buffer_length * sizeof(long long unsigned int *));
1040 for (int i = 0; i < buffer_length; i++) {
1041 ring.buffer[i] = (long long unsigned int *)malloc(
1042 PROCEVENT_FIELDS * sizeof(long long unsigned int));
1045 status = process_map_refresh();
1048 ERROR("procevent plugin: Initial process mapping failed.");
1052 return (start_thread());
1053 } /* }}} int procevent_init */
// Plugin config callback.
//   "BufferLength": stored in buffer_length via atoi()
//   "Process" / "RegexProcess": allocates a processlist_t node holding a copy
//     of the value and pushes it onto the front of the process list; for
//     "RegexProcess" the value is also compiled with regcomp(REG_EXTENDED)
// Keys are compared case-insensitively.
1055 static int procevent_config(const char *key, const char *value) /* {{{ */
1059 if (strcasecmp(key, "BufferLength") == 0) {
// NOTE(review): atoi() reports no conversion errors, and no range check is
// visible here.
1060 buffer_length = atoi(value);
1061 } else if (strcasecmp(key, "Process") == 0 ||
1062 strcasecmp(key, "RegexProcess") == 0) {
1066 char *process_regex;
1068 pl = malloc(sizeof(*pl));
1071 ERROR("procevent plugin: malloc failed during procevent_config: %s",
1072 sstrerror(errno, errbuf, sizeof(errbuf)));
1076 process = strdup(value);
1077 if (process == NULL) {
1080 ERROR("procevent plugin: strdup failed during procevent_config: %s",
1081 sstrerror(errno, errbuf, sizeof(errbuf)));
1085 if (strcasecmp(key, "RegexProcess") == 0) {
1087 status = regcomp(&pl->process_regex_obj, value, REG_EXTENDED);
1090 ERROR("procevent plugin: invalid regular expression: %s", value);
// Keep a separate copy of the pattern text alongside the compiled form.
1094 process_regex = strdup(value);
1095 if (process_regex == NULL) {
1098 ERROR("procevent plugin: strdup failed during procevent_config: %s",
1099 sstrerror(errno, errbuf, sizeof(errbuf)));
1103 pl->process_regex = process_regex;
1108 pl->process = process;
// Push the new node onto the front of the list.
1110 pl->next = processlist_head;
1111 processlist_head = pl;
1117 } /* }}} int procevent_config */
// Builds a collectd notification for one process state change and dispatches
// it. The JSON text from gen_message_payload() is attached as a string meta
// entry named "ves".
//   pid, process, timestamp: forwarded to gen_message_payload()
//   value: process state; also forwarded as the payload's "state"
//   type:  not referenced in the lines visible here (the notification type is
//          set to the literal "gauge") -- confirm whether it is used
1119 static void procevent_dispatch_notification(int pid, const char *type, /* {{{ */
1120 gauge_t value, char *process,
1121 long long unsigned int timestamp) {
// The notification starts out as NOTIF_FAILURE; the line below switches it
// to NOTIF_OKAY (the governing condition is not visible here).
1123 notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "",
1127 n.severity = NOTIF_OKAY;
// NOTE(review): the gethostname() result is not checked.
1129 char hostname[1024];
1130 gethostname(hostname, sizeof(hostname));
1132 sstrncpy(n.host, hostname, sizeof(n.host));
1133 sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
1134 sstrncpy(n.type, "gauge", sizeof(n.type));
1135 sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
// NOTE(review): the return value is discarded, so a failed payload
// generation is not detected before buf is duplicated below.
1137 gen_message_payload(value, pid, process, timestamp, &buf);
1139 notification_meta_t *m = calloc(1, sizeof(*m));
1144 ERROR("procevent plugin: unable to allocate metadata: %s",
1145 sstrerror(errno, errbuf, sizeof(errbuf)));
1149 sstrncpy(m->name, "ves", sizeof(m->name));
1150 m->nm_value.nm_string = sstrdup(buf);
1151 m->type = NM_TYPE_STRING;
1154 DEBUG("procevent plugin: notification message: %s",
1155 n.meta->nm_value.nm_string);
1157 DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value,
1160 plugin_dispatch_notification(&n);
1161 plugin_notification_meta_free(n.meta);
1163 // malloc'd in gen_message_payload
// Plugin read callback: drains the ring buffer from tail to head while
// holding procevent_lock and dispatches a notification for every event that
// belongs to a monitored process.
//   EXITED events are looked up by PID with process_map_check()
//   STARTED events are matched by name with process_check()
1168 static int procevent_read(void) /* {{{ */
// A thread error flag triggers the restart path (the restart calls themselves
// are not visible here).
1170 if (procevent_thread_error != 0) {
1172 "procevent plugin: The interface thread had a problem. Restarting it.");
1179 } /* if (procevent_thread_error != 0) */
1181 pthread_mutex_lock(&procevent_lock);
1183 while (ring.head != ring.tail) {
// Index the tail would move to; wraps around at ring.maxLen.
1184 int next = ring.tail + 1;
1186 if (next >= ring.maxLen)
1189 if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
1190 processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
1193 // This process is of interest to us, so publish its EXITED status
1194 procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1195 ring.buffer[ring.tail][1], pl->process,
1196 ring.buffer[ring.tail][3]);
1197 DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process "
1199 pl->pid, pl->process);
1202 } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
1203 // a new process has started, so check if we should monitor it
1204 processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
1207 // This process is of interest to us, so publish its STARTED status
1208 procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1209 ring.buffer[ring.tail][1], pl->process,
1210 ring.buffer[ring.tail][3]);
1212 "procevent plugin: PID %d (%s) STARTED, adding PID to process list",
1213 pl->pid, pl->process);
1220 pthread_mutex_unlock(&procevent_lock);
1223 } /* }}} int procevent_read */
// Plugin shutdown callback: stops the listener thread (in shutdown mode),
// frees every ring-buffer row and walks the process list releasing each node;
// regex entries also get their pattern text and compiled regex released.
1225 static int procevent_shutdown(void) /* {{{ */
1229 DEBUG("procevent plugin: Shutting down thread.");
1231 if (stop_thread(1) < 0)
1234 for (int i = 0; i < buffer_length; i++) {
1235 free(ring.buffer[i]);
1240 pl = processlist_head;
1241 while (pl != NULL) {
1242 processlist_t *pl_next;
1246 if (pl->is_regex == 1) {
1247 sfree(pl->process_regex);
1248 regfree(&pl->process_regex_obj);
1258 } /* }}} int procevent_shutdown */
// Plugin entry point: registers the config, init, read and shutdown callbacks
// under the plugin name "procevent".
1260 void module_register(void) {
1261 plugin_register_config("procevent", procevent_config, config_keys,
1263 plugin_register_init("procevent", procevent_init);
1264 plugin_register_read("procevent", procevent_read);
1265 plugin_register_shutdown("procevent", procevent_shutdown);
1266 } /* void module_register */