2 * collectd - src/procevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
38 #include <sys/socket.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
// Event kinds stored in slot [1] of a ring-buffer entry (see read_event()).
51 #define PROCEVENT_EXITED 0
52 #define PROCEVENT_STARTED 1
// Number of ints allocated for each ring-buffer entry in procevent_init().
53 #define PROCEVENT_FIELDS 3 // pid, status, extra
// Directory that is scanned for per-process entries and "<pid>/comm" files.
55 #define PROCDIR "/proc"
// Size of the regmatch_t array passed to regexec() in process_check().
56 #define PROCEVENT_REGEX_MATCHES 1
// One node of the singly linked list of monitored processes whose head is
// processlist_head. Besides the members visible below, other code in this
// file reads and writes the members process, process_regex, is_regex and pid
// of each node (their declarations are not visible in this excerpt).
69 struct processlist_s {
// Compiled pattern: filled by regcomp() and consulted by regexec() for nodes
// whose is_regex flag is set.
73 regex_t process_regex_obj;
// Next node in the list; list walks stop when this is NULL.
78 struct processlist_s *next;
80 typedef struct processlist_s processlist_t;
// Non-zero while the reader thread should keep running; set in start_thread()
// and cleared in stop_thread() under procevent_lock.
86 static int procevent_thread_loop = 0;
// Set to 1 inside procevent_thread(); checked at the top of procevent_read().
87 static int procevent_thread_error = 0;
// Thread handle filled by plugin_thread_create() in start_thread() and used
// by pthread_cancel()/pthread_join() in stop_thread().
88 static pthread_t procevent_thread_id;
// Guards the thread control flags; procevent_read() also holds it while it
// drains the ring buffer.
89 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
// Broadcast by stop_thread() after it clears procevent_thread_loop.
90 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
// Held by process_check() and process_map_check() while they walk or extend
// the processlist_head list.
91 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
// Netlink connector socket; stays -1 until nl_connect() assigns it.
92 static int nl_sock = -1;
// Ring capacity in entries, taken from the "BufferLength" config option.
93 static int buffer_length;
// Ring of {pid, status, extra} entries: filled at ring.head by read_event()
// and consumed from ring.tail by procevent_read().
94 static circbuf_t ring;
// Head of the list of monitored processes.
95 static processlist_t *processlist_head = NULL;
// Configuration keys handled by procevent_config().
97 static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
98 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
104 // Does /proc/<pid>/comm contain a process name we are interested in?
//
// Reads the name stored in PROCDIR/<pid>/comm and compares it against every
// processlist entry (strcmp for literal entries, regexec for regex entries)
// while holding procevent_list_lock. When the matching entry is already bound
// to a different pid, a new processlist_t is allocated, filled from the match
// and pushed onto the head of the list.
// NOTE(review): the return statements are not visible in this excerpt.
// Callers read pl->process from the result, so confirm what is returned on
// the no-match and error paths.
105 static processlist_t *process_check(int pid) {
106 int len, is_match, status;
109 char buffer[BUFSIZE];
110 regmatch_t matches[PROCEVENT_REGEX_MATCHES];
112 len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid);
// snprintf reports the length it wanted to write, so len >= BUFSIZE is
// treated as a truncated path.
114 if ((len < 0) || (len >= BUFSIZE)) {
115 WARNING("procevent process_check: process name too large");
119 if (NULL == (fh = fopen(file, "r"))) {
120 // No /proc/<pid>/comm for this pid, just ignore
121 DEBUG("procevent plugin: no comm file available for pid %d", pid);
// Read everything up to the first newline as the process name.
// NOTE(review): "%[^\n]" carries no field width, so nothing here limits the
// write to sizeof(buffer), and the fscanf() result is not checked, so buffer
// may be left unset when the read fails. Consider a bounded, checked read.
125 fscanf(fh, "%[^\n]", buffer);
128 // Go through the processlist linked list and look for the process name
129 // in /proc/<pid>/comm. If found:
130 // 1. If pl->pid is -1, then set pl->pid to <pid>
131 // 2. If pl->pid is not -1, then another <process name> process was already
132 // found. If <pid> == pl->pid, this is an old match, so do nothing.
133 // If the <pid> is different, however, make a new processlist_t and
134 // associate <pid> with it (with the same process name as the existing).
137 pthread_mutex_lock(&procevent_list_lock);
140 processlist_t *match = NULL;
142 for (pl = processlist_head; pl != NULL; pl = pl->next) {
// Regex entries are tested with the compiled pattern, literal entries with
// an exact string comparison.
143 if (pl->is_regex != 0) {
144 is_match = (regexec(&pl->process_regex_obj, buffer,
145 PROCEVENT_REGEX_MATCHES, matches, 0) == 0
149 is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
153 DEBUG("procevent plugin: process %d name match (pattern: %s) for %s", pid,
154 (pl->is_regex == 0 ? pl->process : pl->process_regex), buffer);
156 if (pl->is_regex == 1) {
157 // If this is a regex name, copy the actual process name into the object
158 // for cleaner log reporting
160 if (pl->process != NULL)
162 pl->process = strdup(buffer);
163 if (pl->process == NULL) {
// strdup failure: log it and release the list lock on this error path.
165 ERROR("procevent plugin: strdup failed during process_check: %s",
166 sstrerror(errno, errbuf, sizeof(errbuf)));
167 pthread_mutex_unlock(&procevent_list_lock);
172 if (pl->pid == pid) {
173 // this is a match, and we've already stored the exact pid/name combo
176 } else if (pl->pid == -1) {
177 // this is a match, and we've found a candidate processlist_t to store
178 // this new pid/name combo
// This condition is always true at this point: the pid == -1 case was taken
// by the branch above.
182 } else if (pl->pid != -1) {
183 // this is a match, but another instance of this process has already
184 // claimed this pid/name combo,
192 if (match != NULL && match->pid != -1 && match->pid != pid) {
193 // if there was a match but the associated processlist_t object already
194 // contained a pid/name combo,
195 // then make a new one and add it to the linked list
198 "procevent plugin: allocating new processlist_t object for PID %d (%s)",
199 pid, match->process);
205 pl2 = malloc(sizeof(*pl2));
208 ERROR("procevent plugin: malloc failed during process_check: %s",
209 sstrerror(errno, errbuf, sizeof(errbuf)));
210 pthread_mutex_unlock(&procevent_list_lock);
// The new node gets its own copy of the process name.
214 process = strdup(match->process);
215 if (process == NULL) {
218 ERROR("procevent plugin: strdup failed during process_check: %s",
219 sstrerror(errno, errbuf, sizeof(errbuf)));
220 pthread_mutex_unlock(&procevent_list_lock);
// For regex entries the pattern is compiled again into the new node and the
// pattern text is duplicated as well.
224 if (match->is_regex == 1) {
227 regcomp(&pl2->process_regex_obj, match->process_regex, REG_EXTENDED);
230 ERROR("procevent plugin: invalid regular expression: %s",
231 match->process_regex);
235 process_regex = strdup(match->process_regex);
236 if (process_regex == NULL) {
239 ERROR("procevent plugin: strdup failed during process_check: %s",
240 sstrerror(errno, errbuf, sizeof(errbuf)));
244 pl2->process_regex = process_regex;
247 pl2->process = process;
// Push the new node onto the front of the list.
249 pl2->next = processlist_head;
250 processlist_head = pl2;
255 pthread_mutex_unlock(&procevent_list_lock);
265 // Does our map have this PID or name?
//
// Walks the processlist while holding procevent_list_lock. The visible
// conditions select an entry by pid only (pid > 0, process == NULL), by name
// only (pid < 0, process != NULL), or by both (pid > 0, process != NULL).
// NOTE(review): the statements that set match_pid/match_process and the
// return statements are not visible in this excerpt.
266 static processlist_t *process_map_check(int pid, char *process) {
269 pthread_mutex_lock(&procevent_list_lock);
271 for (pl = processlist_head; pl != NULL; pl = pl->next) {
273 int match_process = 0;
// The name is only compared when the caller supplied one.
281 if (process != NULL) {
282 if (strcmp(pl->process, process) == 0)
286 if (pid > 0 && process == NULL && match_pid == 1)
288 else if (pid < 0 && process != NULL && match_process == 1)
290 else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
// Two unlock calls are visible: presumably one for the found path and one
// after the loop -- confirm against the full function.
294 pthread_mutex_unlock(&procevent_list_lock);
299 pthread_mutex_unlock(&procevent_list_lock);
// Scans PROCDIR and calls process_check() for every entry that is a directory
// with an all-digit name, so processes that are already running get matched
// against the configured process list.
304 static int process_map_refresh(void) {
308 proc = opendir(PROCDIR);
// NOTE(review): the message below says "fopen" although the call that failed
// is opendir().
311 ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
312 sstrerror(errno, errbuf, sizeof(errbuf)));
326 dent = readdir(proc);
// errno == 0 is used here to tell "end of directory" from a read error; this
// relies on errno being cleared before readdir() (not visible in this
// excerpt -- confirm).
330 if (errno == 0) /* end of directory */
333 ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
334 sstrerror(errno, errbuf, sizeof(errbuf)));
// Skip ".", ".." and any other entry whose name starts with a dot.
339 if (dent->d_name[0] == '.')
342 len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
343 if ((len < 0) || (len >= BUFSIZE))
346 status = stat(file, &statbuf);
349 WARNING("procevent plugin: stat (%s) failed: %s", file,
350 sstrerror(errno, errbuf, sizeof(errbuf)));
// Only directories are considered further.
354 if (!S_ISDIR(statbuf.st_mode))
357 len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
358 if ((len < 0) || (len >= BUFSIZE))
// Check every character of the entry name for being a digit.
// NOTE(review): strlen() is re-evaluated on each iteration and compared with
// a signed int, and isdigit() receives a plain char; a cast to unsigned char
// is the usual precaution.
363 for (int i = 0; i < strlen(dent->d_name); i++) {
364 if (!isdigit(dent->d_name[i])) {
373 // Check if we need to store this pid/name combo in our processlist_t linked
375 int this_pid = atoi(dent->d_name);
376 processlist_t *pl = process_check(this_pid);
379 DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
380 this_pid, pl->process);
// Opens the netlink connector socket, stores it in nl_sock, and binds it to
// the CN_IDX_PROC group using this process's pid as the netlink address.
388 static int nl_connect() {
// NOTE(review): sa_nl has no initializer and only three members are assigned
// in the visible code -- confirm the remaining members (e.g. padding) are
// zeroed before bind().
390 struct sockaddr_nl sa_nl;
392 nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
394 ERROR("procevent plugin: socket open failed.");
398 sa_nl.nl_family = AF_NETLINK;
399 sa_nl.nl_groups = CN_IDX_PROC;
400 sa_nl.nl_pid = getpid();
402 rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
404 ERROR("procevent plugin: socket bind failed.");
// Sends one control message over nl_sock to the proc connector:
// PROC_CN_MCAST_LISTEN when enable is true, PROC_CN_MCAST_IGNORE otherwise.
412 static int set_proc_ev_listen(bool enable) {
// Message layout: netlink header followed by a packed connector message whose
// payload is the multicast operation.
414 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
415 struct nlmsghdr nl_hdr;
416 struct __attribute__((__packed__)) {
417 struct cn_msg cn_msg;
418 enum proc_cn_mcast_op cn_mcast;
// Zero the whole message first so every field not set below is 0.
422 memset(&nlcn_msg, 0, sizeof(nlcn_msg));
423 nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
424 nlcn_msg.nl_hdr.nlmsg_pid = getpid();
425 nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
// Address the message to the process-events connector.
427 nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
428 nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
429 nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
431 nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
433 rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
435 ERROR("procevent plugin: subscribing to netlink process events failed.");
// Receives one proc connector message from nl_sock. For exec and exit events
// it records {pid, status, extra} in the ring-buffer entry at ring.head, where
// procevent_read() later picks it up. Other event types leave proc_status at
// -1 and are not recorded.
442 static int read_event() {
// -1 means "nothing to record" for the message just received.
446 int proc_status = -1;
// Receive buffer: netlink header followed by a packed connector message that
// carries one struct proc_event.
448 struct __attribute__((aligned(NLMSG_ALIGNTO))) {
449 struct nlmsghdr nl_hdr;
450 struct __attribute__((__packed__)) {
451 struct cn_msg cn_msg;
452 struct proc_event proc_ev;
459 status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
463 } else if (status == -1) {
// An interrupted recv() (EINTR) is not logged as an error.
464 if (errno != EINTR) {
465 ERROR("procevent plugin: socket receive error: %d", errno);
470 switch (nlcn_msg.proc_ev.what) {
471 case PROC_EVENT_NONE:
472 // printf("set mcast listen ok\n");
// Fork events: the assignments below are commented out, so nothing is
// recorded for them in the visible code.
474 case PROC_EVENT_FORK:
475 // printf("fork: parent tid=%d pid=%d -> child tid=%d pid=%d\n",
476 // nlcn_msg.proc_ev.event_data.fork.parent_pid,
477 // nlcn_msg.proc_ev.event_data.fork.parent_tgid,
478 // nlcn_msg.proc_ev.event_data.fork.child_pid,
479 // nlcn_msg.proc_ev.event_data.fork.child_tgid);
480 // proc_status = PROCEVENT_STARTED;
481 // proc_id = nlcn_msg.proc_ev.event_data.fork.child_pid;
// Exec events are recorded as STARTED for the exec'ing pid.
483 case PROC_EVENT_EXEC:
484 // printf("exec: tid=%d pid=%d\n",
485 // nlcn_msg.proc_ev.event_data.exec.process_pid,
486 // nlcn_msg.proc_ev.event_data.exec.process_tgid);
487 proc_status = PROCEVENT_STARTED;
488 proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
491 // printf("uid change: tid=%d pid=%d from %d to %d\n",
492 // nlcn_msg.proc_ev.event_data.id.process_pid,
493 // nlcn_msg.proc_ev.event_data.id.process_tgid,
494 // nlcn_msg.proc_ev.event_data.id.r.ruid,
495 // nlcn_msg.proc_ev.event_data.id.e.euid);
498 // printf("gid change: tid=%d pid=%d from %d to %d\n",
499 // nlcn_msg.proc_ev.event_data.id.process_pid,
500 // nlcn_msg.proc_ev.event_data.id.process_tgid,
501 // nlcn_msg.proc_ev.event_data.id.r.rgid,
502 // nlcn_msg.proc_ev.event_data.id.e.egid);
// Exit events are recorded as EXITED, with the exit code kept as the extra
// field.
504 case PROC_EVENT_EXIT:
505 proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
506 proc_status = PROCEVENT_EXITED;
507 proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
513 // If we're interested in this process status event, place the event
514 // in the ring buffer for consumption by the main polling thread.
516 if (proc_status != -1) {
// NOTE(review): this is an unlock, yet procevent_thread() releases
// procevent_lock before it calls read_event(), and a second unlock of the
// same mutex follows at the end of this section. A pthread_mutex_lock()
// looks intended here so the ring update is protected -- confirm.
517 pthread_mutex_unlock(&procevent_lock);
// Compute the slot after head, wrapping at ring.maxLen; if that slot is the
// tail, the ring is considered full and the event is only reported.
519 int next = ring.head + 1;
520 if (next >= ring.maxLen)
523 if (next == ring.tail) {
524 WARNING("procevent plugin: ring buffer full");
526 DEBUG("procevent plugin: Process %d status is now %s", proc_id,
527 (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"));
// Both branches store the pid and status; they differ only in slot [2],
// which carries proc_extra for exits and 0 otherwise.
529 if (proc_status == PROCEVENT_EXITED) {
530 ring.buffer[ring.head][0] = proc_id;
531 ring.buffer[ring.head][1] = proc_status;
532 ring.buffer[ring.head][2] = proc_extra;
534 ring.buffer[ring.head][0] = proc_id;
535 ring.buffer[ring.head][1] = proc_status;
536 ring.buffer[ring.head][2] = 0;
542 pthread_mutex_unlock(&procevent_lock);
// Thread body started by start_thread(): keeps calling read_event() while
// procevent_thread_loop is greater than zero. procevent_lock is held while
// the loop flag is tested and released around each read_event() call.
548 static void *procevent_thread(void *arg) /* {{{ */
550 pthread_mutex_lock(&procevent_lock);
552 while (procevent_thread_loop > 0) {
// Drop the lock while waiting for the next netlink message.
555 pthread_mutex_unlock(&procevent_lock);
559 status = read_event();
561 pthread_mutex_lock(&procevent_lock);
// Flag a problem for procevent_read() (the condition guarding this
// assignment is not visible in this excerpt).
564 procevent_thread_error = 1;
// Re-check the flag under the lock so a stop request ends the loop.
568 if (procevent_thread_loop <= 0)
570 } /* while (procevent_thread_loop > 0) */
572 pthread_mutex_unlock(&procevent_lock);
575 } /* }}} void *procevent_thread */
// Connects the netlink socket, enables delivery of process events and starts
// the reader thread, all while holding procevent_lock. When
// procevent_thread_loop is already non-zero the lock is released again
// without starting anything.
577 static int start_thread(void) /* {{{ */
581 pthread_mutex_lock(&procevent_lock);
// A non-zero loop flag means a thread was already started.
583 if (procevent_thread_loop != 0) {
584 pthread_mutex_unlock(&procevent_lock);
589 status = nl_connect();
594 status = set_proc_ev_listen(true);
599 DEBUG("procevent plugin: socket created and bound");
// Set the flags before the thread exists so its loop condition holds from
// the first iteration.
601 procevent_thread_loop = 1;
602 procevent_thread_error = 0;
604 status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
606 /* arg = */ (void *)0, "procevent");
// Thread creation failed: reset the loop flag, report, and release the lock.
608 procevent_thread_loop = 0;
609 ERROR("procevent plugin: Starting thread failed.");
610 pthread_mutex_unlock(&procevent_lock);
614 pthread_mutex_unlock(&procevent_lock);
616 } /* }}} int start_thread */
// Stops the reader thread: closes nl_sock, clears procevent_thread_loop,
// broadcasts procevent_cond, then cancels and/or joins the thread and resets
// the stored thread id and error flag.
// NOTE(review): how the shutdown argument selects between cancel and join is
// not visible in this excerpt.
618 static int stop_thread(int shutdown) /* {{{ */
623 status = close(nl_sock);
// NOTE(review): this message uses strerror(), whereas the other error
// messages in this file use sstrerror() with a caller-provided buffer.
625 ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
626 status, strerror(errno));
632 pthread_mutex_lock(&procevent_lock);
// Nothing to stop when the loop flag is already zero.
634 if (procevent_thread_loop == 0) {
635 pthread_mutex_unlock(&procevent_lock);
// Ask the thread to leave its loop.
// NOTE(review): no wait on procevent_cond is visible in this excerpt, so
// confirm the broadcast has a receiver.
639 procevent_thread_loop = 0;
640 pthread_cond_broadcast(&procevent_cond);
641 pthread_mutex_unlock(&procevent_lock);
644 // Calling pthread_cancel here in
645 // the case of a shutdown just assures that the thread is
646 // gone and that the process has been fully terminated.
648 DEBUG("procevent plugin: Canceling thread for process shutdown");
650 status = pthread_cancel(procevent_thread_id);
653 ERROR("procevent plugin: Unable to cancel thread: %d", status);
657 status = pthread_join(procevent_thread_id, /* return = */ NULL);
659 ERROR("procevent plugin: Stopping thread failed.");
// Clear the stored thread id and the error flag under the lock.
664 pthread_mutex_lock(&procevent_lock);
665 memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
666 procevent_thread_error = 0;
667 pthread_mutex_unlock(&procevent_lock);
669 DEBUG("procevent plugin: Finished requesting stop of thread");
672 } /* }}} int stop_thread */
// Init callback: reports when no processes are configured, allocates the ring
// buffer (buffer_length entries of PROCEVENT_FIELDS ints each), builds the
// initial process map and returns the result of start_thread().
674 static int procevent_init(void) /* {{{ */
678 if (processlist_head == NULL) {
679 NOTICE("procevent plugin: No processes have been configured.");
// NOTE(review): neither malloc() result is checked in the visible code, and
// buffer_length comes straight from atoi() in procevent_config() (it stays 0
// when "BufferLength" is never set) -- confirm both are validated.
685 ring.maxLen = buffer_length;
686 ring.buffer = (int **)malloc(buffer_length * sizeof(int *));
688 for (int i = 0; i < buffer_length; i++) {
689 ring.buffer[i] = (int *)malloc(PROCEVENT_FIELDS * sizeof(int));
// Match processes that are already running before events start arriving.
692 status = process_map_refresh();
695 ERROR("procevent plugin: Initial process mapping failed.");
699 return (start_thread());
700 } /* }}} int procevent_init */
// Config callback. "BufferLength" sets buffer_length; "Process" and
// "RegexProcess" each allocate a processlist_t, store a copy of the value as
// its process name (plus a compiled regex and a copy of the pattern for
// "RegexProcess") and push it onto the head of the process list. Key
// comparison is case-insensitive.
702 static int procevent_config(const char *key, const char *value) /* {{{ */
706 if (strcasecmp(key, "BufferLength") == 0) {
// NOTE(review): atoi() cannot report malformed or out-of-range input.
707 buffer_length = atoi(value);
708 } else if (strcasecmp(key, "Process") == 0 ||
709 strcasecmp(key, "RegexProcess") == 0) {
715 pl = malloc(sizeof(*pl));
718 ERROR("procevent plugin: malloc failed during procevent_config: %s",
719 sstrerror(errno, errbuf, sizeof(errbuf)));
// The node keeps its own copy of the configured value.
723 process = strdup(value);
724 if (process == NULL) {
727 ERROR("procevent plugin: strdup failed during procevent_config: %s",
728 sstrerror(errno, errbuf, sizeof(errbuf)));
// Regex entries additionally compile the value as an extended regular
// expression and keep a second copy of the pattern text.
732 if (strcasecmp(key, "RegexProcess") == 0) {
734 status = regcomp(&pl->process_regex_obj, value, REG_EXTENDED);
737 ERROR("procevent plugin: invalid regular expression: %s", value);
741 process_regex = strdup(value);
742 if (process_regex == NULL) {
745 ERROR("procevent plugin: strdup failed during procevent_config: %s",
746 sstrerror(errno, errbuf, sizeof(errbuf)));
750 pl->process_regex = process_regex;
755 pl->process = process;
// Push the new node onto the front of the list.
757 pl->next = processlist_head;
758 processlist_head = pl;
764 } /* }}} int procevent_config */
// Builds a value list for plugin "procevent" with the process name as plugin
// instance and the given type and gauge value, attaches key/value metadata
// describing the state change, and dispatches it.
766 static void submit(int pid, const char *type, /* {{{ */
767 gauge_t value, const char *process) {
768 value_list_t vl = VALUE_LIST_INIT;
// The single value lives in a compound literal local to this function.
771 vl.values = &(value_t){.gauge = value};
773 sstrncpy(vl.plugin, "procevent", sizeof(vl.plugin));
774 sstrncpy(vl.plugin_instance, process, sizeof(vl.plugin_instance));
775 sstrncpy(vl.type, type, sizeof(vl.type));
777 DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value,
780 // Create metadata to store JSON key-values
781 meta_data_t *meta = meta_data_create();
// The local host name is used as the "source" metadata value below.
785 gethostname(hostname, sizeof(hostname));
// Two metadata sets follow, one for "process_up" and one for "process_down";
// the condition that chooses between them is not visible in this excerpt.
788 meta_data_add_string(meta, "condition", "process_up");
789 meta_data_add_string(meta, "entity", process);
790 meta_data_add_string(meta, "source", hostname);
791 meta_data_add_string(meta, "dest", "process_down");
793 meta_data_add_string(meta, "condition", "process_down");
794 meta_data_add_string(meta, "entity", process);
795 meta_data_add_string(meta, "source", hostname);
796 meta_data_add_string(meta, "dest", "process_up");
799 plugin_dispatch_values(&vl);
800 } /* }}} void submit */
// Read callback: handles a reader thread that flagged an error, then drains
// the ring buffer from ring.tail up to ring.head while holding
// procevent_lock. EXITED entries are looked up by pid with
// process_map_check(); STARTED entries are matched by name with
// process_check(). Matching entries are published through submit().
802 static int procevent_read(void) /* {{{ */
// NOTE(review): the flag is read here before procevent_lock is taken.
804 if (procevent_thread_error != 0) {
806 "procevent plugin: The interface thread had a problem. Restarting it.");
813 } /* if (procevent_thread_error != 0) */
815 pthread_mutex_lock(&procevent_lock);
// head == tail means the ring is empty.
817 while (ring.head != ring.tail) {
// Index of the entry after tail, wrapping at ring.maxLen.
818 int next = ring.tail + 1;
820 if (next >= ring.maxLen)
823 if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
// Look the pid up in the process list (no name filter).
824 processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
827 // This process is of interest to us, so publish its EXITED status
828 submit(ring.buffer[ring.tail][0], "gauge", ring.buffer[ring.tail][1],
830 DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process "
832 pl->pid, pl->process);
835 } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
836 // a new process has started, so check if we should monitor it
837 processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
840 // This process is of interest to us, so publish its STARTED status
841 submit(ring.buffer[ring.tail][0], "gauge", ring.buffer[ring.tail][1],
844 "procevent plugin: PID %d (%s) STARTED, adding PID to process list",
845 pl->pid, pl->process);
852 pthread_mutex_unlock(&procevent_lock);
855 } /* }}} int procevent_read */
// Shutdown callback: stops the reader thread (stop_thread(1)), frees every
// ring-buffer entry and walks the process list releasing its nodes.
857 static int procevent_shutdown(void) /* {{{ */
862 DEBUG("procevent plugin: Shutting down thread.");
864 if (stop_thread(1) < 0)
// Release the per-entry arrays allocated in procevent_init().
867 for (int i = 0; i < buffer_length; i++) {
868 free(ring.buffer[i]);
873 pl = processlist_head;
875 processlist_t *pl_next;
// Regex nodes additionally own the pattern text and the compiled regex.
879 if (pl->is_regex == 1) {
880 sfree(pl->process_regex);
881 regfree(&pl->process_regex_obj);
891 } /* }}} int procevent_shutdown */
// Plugin entry point: registers the config, init, read and shutdown callbacks
// under the plugin name "procevent".
893 void module_register(void) {
894 plugin_register_config("procevent", procevent_config, config_keys,
896 plugin_register_init("procevent", procevent_init);
897 plugin_register_read("procevent", procevent_read);
898 plugin_register_shutdown("procevent", procevent_shutdown);
899 } /* void module_register */