2 * collectd - src/procevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
38 #include <sys/socket.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
// Status values that read_event() stores in field [1] of a ring-buffer entry
// and that procevent_read() later compares against.
51 #define PROCEVENT_EXITED 0
52 #define PROCEVENT_STARTED 1
// Number of ints allocated for each ring-buffer entry in procevent_init().
53 #define PROCEVENT_FIELDS 3 // pid, status, extra
// Directory that is scanned by process_map_refresh() and used as the prefix
// of the per-PID "comm" paths.
55 #define PROCDIR "/proc"
// Length of the regmatch_t array, and the nmatch argument, given to regexec().
56 #define PROCEVENT_REGEX_MATCHES 1
// One node of the singly linked list rooted at processlist_head.
// NOTE(review): only part of this declaration is visible in this view; code
// elsewhere in the file also accesses ->process, ->process_regex, ->is_regex
// and ->pid on these nodes.
69 struct processlist_s {
// Compiled pattern: written by regcomp(), read by regexec(), released by
// regfree().
73 regex_t process_regex_obj;
// Next node in the list; list walks stop when this is NULL.
78 struct processlist_s *next;
80 typedef struct processlist_s processlist_t;
// Run flag for the event thread: set to 1 by start_thread(), cleared by
// stop_thread(); procevent_thread() keeps looping while it is > 0.
86 static int procevent_thread_loop = 0;
// Set to 1 inside procevent_thread(); checked by procevent_read() and reset
// to 0 by start_thread() and stop_thread().
87 static int procevent_thread_error = 0;
// Handle of the thread created in start_thread(); joined in stop_thread().
88 static pthread_t procevent_thread_id;
// Taken by the thread start/stop/loop code around the two flags above, and by
// procevent_read() while it drains the ring buffer.
89 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
// Broadcast from stop_thread() after the run flag is cleared.
90 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
// Taken by process_check() and process_map_check() while they walk or extend
// the processlist_head list.
91 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
// Netlink socket descriptor; -1 until nl_connect() assigns it.
92 static int nl_sock = -1;
// Number of ring-buffer entries; set from the "BufferLength" config key.
93 static int buffer_length;
// Ring buffer written at ring.head by read_event() and consumed from
// ring.tail by procevent_read().
94 static circbuf_t ring;
// Head of the list of monitored processes; new nodes are pushed at the front.
95 static processlist_t *processlist_head = NULL;
// Configuration keys handled by procevent_config().
97 static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
98 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
104 // Does /proc/<pid>/comm contain a process name we are interested in?
// Reads the first line of PROCDIR "/<pid>/comm" and, while holding
// procevent_list_lock, compares it against every node of processlist_head
// (regexec() for regex nodes, strcmp() against pl->process otherwise). If the
// selected node already holds a different PID, a new processlist_t is
// allocated, filled from the matched node and pushed onto the list head.
// NOTE(review): several lines of this function (local declarations, return
// statements, closing braces) are not visible in this view, so the exact
// return values are not documented here. Callers use the result as a
// processlist_t pointer and read ->pid and ->process from it.
105 static processlist_t *process_check(int pid) {
106 int len, is_match, status, retval;
109 char buffer[BUFSIZE];
110 regmatch_t matches[PROCEVENT_REGEX_MATCHES];
112 len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid);
// NOTE(review): the truncation check uses BUFSIZE while snprintf() was given
// sizeof(file); the declaration of file is not visible here -- confirm the
// two sizes agree.
114 if ((len < 0) || (len >= BUFSIZE)) {
115 WARNING("procevent process_check: process name too large");
119 if (NULL == (fh = fopen(file, "r"))) {
120 // No /proc/<pid>/comm for this pid, just ignore
121 DEBUG("procevent plugin: no comm file available for pid %d", pid);
// Read everything up to the first newline into buffer.
// NOTE(review): the conversion has no field width, so nothing here bounds the
// write to sizeof(buffer) -- confirm the input is always shorter than BUFSIZE.
125 retval = fscanf(fh, "%[^\n]", buffer);
128 WARNING("procevent process_check: unable to read comm file for pid %d",
134 // Go through the processlist linked list and look for the process name
135 // in /proc/<pid>/comm. If found:
136 // 1. If pl->pid is -1, then set pl->pid to <pid>
137 // 2. If pl->pid is not -1, then another <process name> process was already
138 // found. If <pid> == pl->pid, this is an old match, so do nothing.
139 // If the <pid> is different, however, make a new processlist_t and
140 // associate <pid> with it (with the same process name as the existing).
143 pthread_mutex_lock(&procevent_list_lock);
146 processlist_t *match = NULL;
148 for (pl = processlist_head; pl != NULL; pl = pl->next) {
149 if (pl->is_regex != 0) {
// regexec() returns 0 on a successful match.
150 is_match = (regexec(&pl->process_regex_obj, buffer,
151 PROCEVENT_REGEX_MATCHES, matches, 0) == 0
155 is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
159 DEBUG("procevent plugin: process %d name match (pattern: %s) for %s", pid,
160 (pl->is_regex == 0 ? pl->process : pl->process_regex), buffer);
162 if (pl->is_regex == 1) {
163 // If this is a regex name, copy the actual process name into the object
164 // for cleaner log reporting
166 if (pl->process != NULL)
168 pl->process = strdup(buffer);
169 if (pl->process == NULL) {
171 ERROR("procevent plugin: strdup failed during process_check: %s",
172 sstrerror(errno, errbuf, sizeof(errbuf)));
173 pthread_mutex_unlock(&procevent_list_lock);
178 if (pl->pid == pid) {
179 // this is a match, and we've already stored the exact pid/name combo
182 } else if (pl->pid == -1) {
183 // this is a match, and we've found a candidate processlist_t to store
184 // this new pid/name combo
// NOTE(review): the condition below is always true at this point, because
// pl->pid == -1 was already handled by the previous branch.
188 } else if (pl->pid != -1) {
189 // this is a match, but another instance of this process has already
190 // claimed this pid/name combo,
198 if (match != NULL && match->pid != -1 && match->pid != pid) {
199 // if there was a match but the associated processlist_t object already
200 // contained a pid/name combo,
201 // then make a new one and add it to the linked list
204 "procevent plugin: allocating new processlist_t object for PID %d (%s)",
205 pid, match->process);
211 pl2 = malloc(sizeof(*pl2));
214 ERROR("procevent plugin: malloc failed during process_check: %s",
215 sstrerror(errno, errbuf, sizeof(errbuf)));
216 pthread_mutex_unlock(&procevent_list_lock);
// The new node gets its own copy of the process name.
220 process = strdup(match->process);
221 if (process == NULL) {
224 ERROR("procevent plugin: strdup failed during process_check: %s",
225 sstrerror(errno, errbuf, sizeof(errbuf)));
226 pthread_mutex_unlock(&procevent_list_lock);
230 if (match->is_regex == 1) {
// Recompile the pattern so the new node owns an independent regex_t.
233 regcomp(&pl2->process_regex_obj, match->process_regex, REG_EXTENDED);
236 ERROR("procevent plugin: invalid regular expression: %s",
237 match->process_regex);
241 process_regex = strdup(match->process_regex);
242 if (process_regex == NULL) {
// NOTE(review): unlike the failure paths above, no unlock of
// procevent_list_lock is visible on this path -- confirm against the full
// source.
245 ERROR("procevent plugin: strdup failed during process_check: %s",
246 sstrerror(errno, errbuf, sizeof(errbuf)));
250 pl2->process_regex = process_regex;
253 pl2->process = process;
// Push the new node onto the front of the list.
255 pl2->next = processlist_head;
256 processlist_head = pl2;
261 pthread_mutex_unlock(&procevent_list_lock);
271 // Does our map have this PID or name?
// Walks processlist_head under procevent_list_lock looking for a node that
// satisfies the lookup selected by the arguments:
//   pid > 0, process == NULL : PID match only
//   pid < 0, process != NULL : name match only (strcmp() on pl->process)
//   pid > 0, process != NULL : both PID and name must match
// NOTE(review): how match_pid is computed, what happens on a hit and the
// return statements are not visible in this view. The two unlock calls below
// suggest one exit inside the loop and one after it -- confirm.
272 static processlist_t *process_map_check(int pid, char *process) {
275 pthread_mutex_lock(&procevent_list_lock);
277 for (pl = processlist_head; pl != NULL; pl = pl->next) {
279 int match_process = 0;
287 if (process != NULL) {
288 if (strcmp(pl->process, process) == 0)
292 if (pid > 0 && process == NULL && match_pid == 1)
294 else if (pid < 0 && process != NULL && match_process == 1)
296 else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
300 pthread_mutex_unlock(&procevent_list_lock);
305 pthread_mutex_unlock(&procevent_list_lock);
// Iterates over the entries of PROCDIR; for each entry it stat()s the path,
// tests that it is a directory and that its name is made of digits, then
// passes the name converted with atoi() to process_check().
// NOTE(review): the statements taken when one of those tests fails, and the
// return values, are not visible in this view.
310 static int process_map_refresh(void) {
314 proc = opendir(PROCDIR);
// NOTE(review): the log text below says "fopen" although the call that failed
// is opendir().
317 ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
318 sstrerror(errno, errbuf, sizeof(errbuf)));
332 dent = readdir(proc);
336 if (errno == 0) /* end of directory */
339 ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
340 sstrerror(errno, errbuf, sizeof(errbuf)));
// Entries whose name starts with '.' are tested for here.
345 if (dent->d_name[0] == '.')
348 len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
349 if ((len < 0) || (len >= BUFSIZE))
352 status = stat(file, &statbuf);
355 WARNING("procevent plugin: stat (%s) failed: %s", file,
356 sstrerror(errno, errbuf, sizeof(errbuf)));
360 if (!S_ISDIR(statbuf.st_mode))
363 len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
364 if ((len < 0) || (len >= BUFSIZE))
// NOTE(review): strlen() is re-evaluated on every iteration, and isdigit() is
// handed a plain char; the C standard requires an unsigned char value or EOF.
369 for (int i = 0; i < strlen(dent->d_name); i++) {
370 if (!isdigit(dent->d_name[i])) {
379 // Check if we need to store this pid/name combo in our processlist_t linked
381 int this_pid = atoi(dent->d_name);
382 processlist_t *pl = process_check(this_pid);
385 DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
386 this_pid, pl->process);
// Creates a PF_NETLINK / SOCK_DGRAM / NETLINK_CONNECTOR socket in nl_sock and
// binds it with nl_groups = CN_IDX_PROC and nl_pid = getpid().
// Logs an error when socket() or bind() fails; the return values are not
// visible in this view.
394 static int nl_connect() {
396 struct sockaddr_nl sa_nl;
398 nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
400 ERROR("procevent plugin: socket open failed.");
// NOTE(review): no zero-initialisation of sa_nl is visible before these
// assignments -- confirm against the full source.
404 sa_nl.nl_family = AF_NETLINK;
405 sa_nl.nl_groups = CN_IDX_PROC;
406 sa_nl.nl_pid = getpid();
408 rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
410 ERROR("procevent plugin: socket bind failed.");
// Sends one message on nl_sock, addressed to connector id
// CN_IDX_PROC / CN_VAL_PROC, whose payload is PROC_CN_MCAST_LISTEN when
// enable is true and PROC_CN_MCAST_IGNORE otherwise.
// Logs an error when send() fails; return values are not visible in this view.
418 static int set_proc_ev_listen(bool enable) {
// Wire layout: netlink header followed by a packed connector message and the
// multicast operation.
420 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
421 struct nlmsghdr nl_hdr;
422 struct __attribute__((__packed__)) {
423 struct cn_msg cn_msg;
424 enum proc_cn_mcast_op cn_mcast;
428 memset(&nlcn_msg, 0, sizeof(nlcn_msg));
429 nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
430 nlcn_msg.nl_hdr.nlmsg_pid = getpid();
431 nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
433 nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
434 nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
// The connector payload length covers only the multicast operation.
435 nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
437 nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
439 rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
441 ERROR("procevent plugin: subscribing to netlink process events failed.");
// Receives one message from nl_sock and inspects proc_ev.what.
// PROC_EVENT_EXEC yields status PROCEVENT_STARTED; PROC_EVENT_EXIT yields
// PROCEVENT_EXITED plus the exit code as "extra". When a status was set, the
// {pid, status, extra} triple is written to ring.buffer[ring.head] unless the
// ring is full.
// NOTE(review): local declarations, the break/default lines of the switch, the
// update of ring.head and the return statements are not visible in this view.
448 static int read_event() {
// -1 means "event not of interest"; see the check before the ring-buffer
// write below.
452 int proc_status = -1;
454 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
455 struct nlmsghdr nl_hdr;
456 struct __attribute__((__packed__)) {
457 struct cn_msg cn_msg;
458 struct proc_event proc_ev;
465 status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
469 } else if (status == -1) {
// An interrupted call (EINTR) is not logged as an error.
470 if (errno != EINTR) {
471 ERROR("procevent plugin: socket receive error: %d", errno);
476 switch (nlcn_msg.proc_ev.what) {
477 case PROC_EVENT_NONE:
478 // printf("set mcast listen ok\n");
480 case PROC_EVENT_FORK:
481 // printf("fork: parent tid=%d pid=%d -> child tid=%d pid=%d\n",
482 // nlcn_msg.proc_ev.event_data.fork.parent_pid,
483 // nlcn_msg.proc_ev.event_data.fork.parent_tgid,
484 // nlcn_msg.proc_ev.event_data.fork.child_pid,
485 // nlcn_msg.proc_ev.event_data.fork.child_tgid);
486 // proc_status = PROCEVENT_STARTED;
487 // proc_id = nlcn_msg.proc_ev.event_data.fork.child_pid;
489 case PROC_EVENT_EXEC:
490 // printf("exec: tid=%d pid=%d\n",
491 // nlcn_msg.proc_ev.event_data.exec.process_pid,
492 // nlcn_msg.proc_ev.event_data.exec.process_tgid);
493 proc_status = PROCEVENT_STARTED;
494 proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
497 // printf("uid change: tid=%d pid=%d from %d to %d\n",
498 // nlcn_msg.proc_ev.event_data.id.process_pid,
499 // nlcn_msg.proc_ev.event_data.id.process_tgid,
500 // nlcn_msg.proc_ev.event_data.id.r.ruid,
501 // nlcn_msg.proc_ev.event_data.id.e.euid);
504 // printf("gid change: tid=%d pid=%d from %d to %d\n",
505 // nlcn_msg.proc_ev.event_data.id.process_pid,
506 // nlcn_msg.proc_ev.event_data.id.process_tgid,
507 // nlcn_msg.proc_ev.event_data.id.r.rgid,
508 // nlcn_msg.proc_ev.event_data.id.e.egid);
510 case PROC_EVENT_EXIT:
511 proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
512 proc_status = PROCEVENT_EXITED;
513 proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
519 // If we're interested in this process status event, place the event
520 // in the ring buffer for consumption by the main polling thread.
522 if (proc_status != -1) {
// NOTE(review): this looks like it was meant to be pthread_mutex_lock().
// procevent_thread() releases procevent_lock before calling read_event(), and
// a second unlock follows after the ring-buffer write, so as written the
// write appears to run without the lock held -- confirm and fix.
523 pthread_mutex_unlock(&procevent_lock);
// Index the head would advance to, wrapping at ring.maxLen.
525 int next = ring.head + 1;
526 if (next >= ring.maxLen)
// The ring is treated as full when advancing the head would reach the tail.
529 if (next == ring.tail) {
530 WARNING("procevent plugin: ring buffer full");
532 DEBUG("procevent plugin: Process %d status is now %s", proc_id,
533 (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"));
// The two branches differ only in field [2]: the exit code for EXITED, 0
// otherwise.
535 if (proc_status == PROCEVENT_EXITED) {
536 ring.buffer[ring.head][0] = proc_id;
537 ring.buffer[ring.head][1] = proc_status;
538 ring.buffer[ring.head][2] = proc_extra;
540 ring.buffer[ring.head][0] = proc_id;
541 ring.buffer[ring.head][1] = proc_status;
542 ring.buffer[ring.head][2] = 0;
548 pthread_mutex_unlock(&procevent_lock);
// Thread entry point. Loops while procevent_thread_loop > 0, releasing
// procevent_lock around each read_event() call and re-acquiring it afterwards
// to update or inspect the shared flags.
// NOTE(review): the condition guarding the error assignment and the return
// statement are not visible in this view.
554 static void *procevent_thread(void *arg) /* {{{ */
556 pthread_mutex_lock(&procevent_lock);
558 while (procevent_thread_loop > 0) {
// Drop the lock while waiting for the next event.
561 pthread_mutex_unlock(&procevent_lock);
565 status = read_event();
567 pthread_mutex_lock(&procevent_lock);
// Flag picked up by procevent_read().
570 procevent_thread_error = 1;
// Re-check the run flag, which stop_thread() may have cleared meanwhile.
574 if (procevent_thread_loop <= 0)
576 } /* while (procevent_thread_loop > 0) */
578 pthread_mutex_unlock(&procevent_lock);
581 } /* }}} void *procevent_thread */
// While holding procevent_lock: connects the netlink socket (nl_connect()),
// subscribes to process events (set_proc_ev_listen(true)), sets the run flag
// and creates the thread named "procevent" with plugin_thread_create().
// If procevent_thread_loop is already non-zero the lock is released without
// doing any of that. On thread-creation failure the run flag is cleared again
// and an error is logged.
// NOTE(review): the status checks and return values are not visible in this
// view.
583 static int start_thread(void) /* {{{ */
587 pthread_mutex_lock(&procevent_lock);
589 if (procevent_thread_loop != 0) {
590 pthread_mutex_unlock(&procevent_lock);
595 status = nl_connect();
600 status = set_proc_ev_listen(true);
605 DEBUG("procevent plugin: socket created and bound");
// Set the flags before the thread exists so its loop condition holds at once.
607 procevent_thread_loop = 1;
608 procevent_thread_error = 0;
610 status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
612 /* arg = */ (void *)0, "procevent");
614 procevent_thread_loop = 0;
615 ERROR("procevent plugin: Starting thread failed.");
616 pthread_mutex_unlock(&procevent_lock);
620 pthread_mutex_unlock(&procevent_lock);
622 } /* }}} int start_thread */
// Closes nl_sock, clears procevent_thread_loop under procevent_lock,
// broadcasts procevent_cond, then cancels and/or joins the event thread and
// finally zeroes procevent_thread_id and procevent_thread_error.
// If the run flag is already 0 the lock is released without signalling.
// shutdown: per the comment below, pthread_cancel() is used in the shutdown
// case. The branch that selects it and the return values are not visible in
// this view.
624 static int stop_thread(int shutdown) /* {{{ */
// NOTE(review): the socket is closed before the run flag is examined, and no
// reset of nl_sock is visible here -- confirm against the full source.
629 status = close(nl_sock);
631 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
632 status, strerror(errno));
638 pthread_mutex_lock(&procevent_lock);
640 if (procevent_thread_loop == 0) {
641 pthread_mutex_unlock(&procevent_lock);
645 procevent_thread_loop = 0;
646 pthread_cond_broadcast(&procevent_cond);
647 pthread_mutex_unlock(&procevent_lock);
650 // Calling pthread_cancel here in
651 // the case of a shutdown just assures that the thread is
652 // gone and that the process has been fully terminated.
654 DEBUG("procevent plugin: Canceling thread for process shutdown");
656 status = pthread_cancel(procevent_thread_id);
659 ERROR("procevent plugin: Unable to cancel thread: %d", status);
663 status = pthread_join(procevent_thread_id, /* return = */ NULL);
665 ERROR("procevent plugin: Stopping thread failed.");
// Reset the bookkeeping so a later start_thread() begins from a clean state.
670 pthread_mutex_lock(&procevent_lock);
671 memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
672 procevent_thread_error = 0;
673 pthread_mutex_unlock(&procevent_lock);
675 DEBUG("procevent plugin: Finished requesting stop of thread");
678 } /* }}} int stop_thread */
// Init callback: logs a notice when no process was configured, allocates the
// ring buffer (buffer_length entries of PROCEVENT_FIELDS ints), builds the
// initial process map with process_map_refresh() and returns the result of
// start_thread().
680 static int procevent_init(void) /* {{{ */
684 if (processlist_head == NULL) {
685 NOTICE("procevent plugin: No processes have been configured.");
691 ring.maxLen = buffer_length;
// NOTE(review): no NULL check is visible between these allocations and their
// first use, and buffer_length comes straight from atoi() in
// procevent_config() -- confirm against the full source.
692 ring.buffer = (int **)malloc(buffer_length * sizeof(int *));
694 for (int i = 0; i < buffer_length; i++) {
695 ring.buffer[i] = (int *)malloc(PROCEVENT_FIELDS * sizeof(int));
698 status = process_map_refresh();
701 ERROR("procevent plugin: Initial process mapping failed.");
705 return (start_thread());
706 } /* }}} int procevent_init */
// Config callback (keys are compared case-insensitively).
//   "BufferLength"             : stores atoi(value) in buffer_length.
//   "Process" / "RegexProcess" : allocates a processlist_t, stores a copy of
//                                value in pl->process and pushes the node onto
//                                the front of processlist_head.
// For "RegexProcess", value is additionally compiled with
// regcomp(REG_EXTENDED) into pl->process_regex_obj and copied into
// pl->process_regex.
// NOTE(review): atoi() cannot report malformed input and no range check on
// buffer_length is visible here. The cleanup on the failure paths and the
// return values are not visible in this view.
708 static int procevent_config(const char *key, const char *value) /* {{{ */
712 if (strcasecmp(key, "BufferLength") == 0) {
713 buffer_length = atoi(value);
714 } else if (strcasecmp(key, "Process") == 0 ||
715 strcasecmp(key, "RegexProcess") == 0) {
721 pl = malloc(sizeof(*pl));
724 ERROR("procevent plugin: malloc failed during procevent_config: %s",
725 sstrerror(errno, errbuf, sizeof(errbuf)));
729 process = strdup(value);
730 if (process == NULL) {
733 ERROR("procevent plugin: strdup failed during procevent_config: %s",
734 sstrerror(errno, errbuf, sizeof(errbuf)));
738 if (strcasecmp(key, "RegexProcess") == 0) {
740 status = regcomp(&pl->process_regex_obj, value, REG_EXTENDED);
743 ERROR("procevent plugin: invalid regular expression: %s", value);
747 process_regex = strdup(value);
748 if (process_regex == NULL) {
751 ERROR("procevent plugin: strdup failed during procevent_config: %s",
752 sstrerror(errno, errbuf, sizeof(errbuf)));
756 pl->process_regex = process_regex;
761 pl->process = process;
// Prepend the new node to the list.
763 pl->next = processlist_head;
764 processlist_head = pl;
770 } /* }}} int procevent_config */
// Builds a value_list_t holding one gauge value, with plugin "procevent",
// plugin_instance set to the process name and the caller-supplied type, fills
// a meta_data_t with "condition", "entity", "source" and "dest" strings, and
// hands the value list to plugin_dispatch_values().
// Two metadata variants exist: condition "process_up" with dest
// "process_down", and condition "process_down" with dest "process_up".
// NOTE(review): the condition that selects between them, and the line that
// attaches meta to vl, are not visible in this view.
772 static void submit(int pid, const char *type, /* {{{ */
773 gauge_t value, const char *process) {
774 value_list_t vl = VALUE_LIST_INIT;
777 vl.values = &(value_t){.gauge = value};
779 sstrncpy(vl.plugin, "procevent", sizeof(vl.plugin));
780 sstrncpy(vl.plugin_instance, process, sizeof(vl.plugin_instance));
781 sstrncpy(vl.type, type, sizeof(vl.type));
783 DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value,
786 // Create metadata to store JSON key-values
787 meta_data_t *meta = meta_data_create();
// The local host name is reported as the "source" of the event.
791 gethostname(hostname, sizeof(hostname));
794 meta_data_add_string(meta, "condition", "process_up");
795 meta_data_add_string(meta, "entity", process);
796 meta_data_add_string(meta, "source", hostname);
797 meta_data_add_string(meta, "dest", "process_down");
799 meta_data_add_string(meta, "condition", "process_down");
800 meta_data_add_string(meta, "entity", process);
801 meta_data_add_string(meta, "source", hostname);
802 meta_data_add_string(meta, "dest", "process_up");
805 plugin_dispatch_values(&vl);
806 } /* }}} void submit */
// Read callback. Drains the ring buffer from ring.tail up to ring.head while
// holding procevent_lock:
//   PROCEVENT_EXITED  : looks the PID up with process_map_check(pid, NULL) and
//                       submits a "gauge" value for known processes.
//   PROCEVENT_STARTED : runs process_check(pid) and submits a "gauge" value
//                       for processes of interest.
// The submitted value is the status field of the ring-buffer entry.
// NOTE(review): the NULL checks on pl, the update of ring.tail, the thread
// restart calls and the return statements are not visible in this view.
808 static int procevent_read(void) /* {{{ */
// NOTE(review): procevent_thread_error is read here before procevent_lock is
// taken, whereas the thread writes it under that lock.
810 if (procevent_thread_error != 0) {
812 "procevent plugin: The interface thread had a problem. Restarting it.");
819 } /* if (procevent_thread_error != 0) */
821 pthread_mutex_lock(&procevent_lock);
823 while (ring.head != ring.tail) {
// Index of the entry after the current tail, wrapping at ring.maxLen.
824 int next = ring.tail + 1;
826 if (next >= ring.maxLen)
// Entry layout: [0] = pid, [1] = status, [2] = extra.
829 if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
830 processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
833 // This process is of interest to us, so publish its EXITED status
834 submit(ring.buffer[ring.tail][0], "gauge", ring.buffer[ring.tail][1],
836 DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process "
838 pl->pid, pl->process);
841 } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
842 // a new process has started, so check if we should monitor it
843 processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
846 // This process is of interest to us, so publish its STARTED status
847 submit(ring.buffer[ring.tail][0], "gauge", ring.buffer[ring.tail][1],
850 "procevent plugin: PID %d (%s) STARTED, adding PID to process list",
851 pl->pid, pl->process);
858 pthread_mutex_unlock(&procevent_lock);
861 } /* }}} int procevent_read */
// Shutdown callback: stops the event thread with stop_thread(1), frees every
// ring-buffer entry and walks the process list releasing per-node resources.
// For regex nodes that includes the pattern string and the compiled regex.
// NOTE(review): the loop header over the list, the remaining frees and the
// return statements are not visible in this view.
863 static int procevent_shutdown(void) /* {{{ */
868 DEBUG("procevent plugin: Shutting down thread.");
870 if (stop_thread(1) < 0)
873 for (int i = 0; i < buffer_length; i++) {
874 free(ring.buffer[i]);
879 pl = processlist_head;
// Holds the successor so the walk can continue past the current node.
881 processlist_t *pl_next;
885 if (pl->is_regex == 1) {
886 sfree(pl->process_regex);
887 regfree(&pl->process_regex_obj);
897 } /* }}} int procevent_shutdown */
// Plugin entry point: registers the config, init, read and shutdown callbacks
// under the plugin name "procevent".
899 void module_register(void) {
900 plugin_register_config("procevent", procevent_config, config_keys,
902 plugin_register_init("procevent", procevent_init);
903 plugin_register_read("procevent", procevent_read);
904 plugin_register_shutdown("procevent", procevent_shutdown);
905 } /* void module_register */