X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fprocevent.c;h=ab000dbd00d331ef87952077a5e308f4b7198983;hp=a04a9c99d5f6f81a92fdeeab3d19aaa3d3b665b4;hb=48efd3deb4c9139fd060ff3d289896e9031bcc7c;hpb=9e9a23452121ac992424c65b0c743d2f7f148f31 diff --git a/src/procevent.c b/src/procevent.c index a04a9c99..ab000dbd 100644 --- a/src/procevent.c +++ b/src/procevent.c @@ -26,13 +26,13 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" +#include "utils/ignorelist/ignorelist.h" #include "utils_complain.h" #include #include -#include #include #include #include @@ -59,10 +59,12 @@ #define PROCEVENT_EXITED 0 #define PROCEVENT_STARTED 1 -#define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp +#define PROCEVENT_FIELDS 3 // pid, status, timestamp #define BUFSIZE 512 #define PROCDIR "/proc" -#define PROCEVENT_REGEX_MATCHES 1 +#define RBUF_PROC_ID_INDEX 0 +#define RBUF_PROC_STATUS_INDEX 1 +#define RBUF_TIME_INDEX 2 #define PROCEVENT_DOMAIN_FIELD "domain" #define PROCEVENT_DOMAIN_VALUE "fault" @@ -107,17 +109,14 @@ typedef struct { int head; int tail; int maxLen; - long long unsigned int **buffer; + cdtime_t **buffer; } circbuf_t; struct processlist_s { char *process; - char *process_regex; - regex_t process_regex_obj; - - uint32_t is_regex; - uint32_t pid; + long pid; + int32_t last_status; struct processlist_s *next; }; @@ -126,36 +125,37 @@ typedef struct processlist_s processlist_t; /* * Private variables */ - -static int procevent_thread_loop = 0; -static int procevent_thread_error = 0; -static pthread_t procevent_thread_id; -static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER; +static ignorelist_t *ignorelist = NULL; + +static int procevent_netlink_thread_loop = 0; +static int procevent_netlink_thread_error = 0; +static pthread_t procevent_netlink_thread_id; +static int procevent_dequeue_thread_loop = 0; +static pthread_t procevent_dequeue_thread_id; +static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER; static int nl_sock = -1; static int buffer_length; static circbuf_t ring; static processlist_t *processlist_head = NULL; static int event_id = 0; -static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"}; +static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* * Private functions */ -static int gen_message_payload(int state, int pid, char *process, - long long unsigned int timestamp, char **buf) { +static int gen_message_payload(int state, long pid, char *process, + cdtime_t timestamp, char **buf) { const unsigned char *buf2; yajl_gen g; char json_str[DATA_MAX_NAME_LEN]; #if !defined(HAVE_YAJL_V2) - yajl_gen_config conf = {}; - - conf.beautify = 0; + yajl_gen_config conf = {0}; #endif #if HAVE_YAJL_V2 @@ -189,9 +189,9 @@ static int gen_message_payload(int state, int pid, char *process, goto err; event_id = event_id + 1; - int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_id_len, "%d", event_id); + if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -202,16 +202,11 @@ static int gen_message_payload(int state, int pid, char *process, strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok) goto err; - int event_name_len = 0; - event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid - event_name_len = event_name_len + strlen(process); // process name - event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up" - event_name_len = event_name_len + - 13; // "process", 3 spaces, 2 parentheses and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_name_len, "process %s (%d) %s", process, pid, - (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE - : PROCEVENT_EVENT_NAME_UP_VALUE)); + if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid, + (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE + : PROCEVENT_EVENT_NAME_UP_VALUE)) < 0) { + goto err; + } if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -224,11 +219,10 @@ static int gen_message_payload(int state, int pid, char *process, yajl_gen_status_ok) goto err; - int last_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, last_epoch_microsec_len, "%llu", - (long long unsigned int)CDTIME_T_TO_US(cdtime())); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(cdtime())) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -279,11 +273,10 @@ static int gen_message_payload(int state, int pid, char *process, yajl_gen_status_ok) goto err; - int start_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, start_epoch_microsec_len, "%llu", - (long long unsigned int)timestamp); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(timestamp)) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -316,16 +309,10 @@ static int gen_message_payload(int state, int pid, char *process, yajl_gen_status_ok) goto err; - int alarm_condition_len = 0; - alarm_condition_len = - alarm_condition_len + (sizeof(char) * sizeof(int) * 4); // pid - alarm_condition_len = alarm_condition_len + strlen(process); // process name - alarm_condition_len = - alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2 - // parentheses and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, alarm_condition_len, "process %s (%d) state change", - process, pid); + if (snprintf(json_str, sizeof(json_str), "process %s (%ld) state change", + process, pid) < 0) { + goto err; + } if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -349,8 +336,9 @@ static int gen_message_payload(int state, int pid, char *process, goto err; if (yajl_gen_string( - g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE - : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE), + g, + (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE + : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE), strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) != yajl_gen_status_ok) @@ -384,19 +372,11 @@ static int gen_message_payload(int state, int pid, char *process, yajl_gen_status_ok) goto err; - int specific_problem_len = 0; - specific_problem_len = - specific_problem_len + (sizeof(char) * sizeof(int) * 4); // pid - specific_problem_len = specific_problem_len + strlen(process); // process name - specific_problem_len = - specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up" - specific_problem_len = - specific_problem_len + - 13; // "process", 3 spaces, 2 parentheses and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, specific_problem_len, "process %s (%d) %s", process, pid, - (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE - : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)); + if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid, + (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE + : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)) < 0) { + goto err; + } if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -409,27 +389,31 @@ static int gen_message_payload(int state, int pid, char *process, goto err; if (yajl_gen_string( - g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE - : PROCEVENT_VF_STATUS_NORMAL_VALUE), + g, + (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE + : PROCEVENT_VF_STATUS_NORMAL_VALUE), strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE : PROCEVENT_VF_STATUS_NORMAL_VALUE))) != yajl_gen_status_ok) goto err; - if (yajl_gen_map_close(g) != yajl_gen_status_ok) - goto err; - // *** END fault fields *** - if (yajl_gen_map_close(g) != yajl_gen_status_ok) + // close fault and header fields + if (yajl_gen_map_close(g) != yajl_gen_status_ok || + yajl_gen_map_close(g) != yajl_gen_status_ok) goto err; if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); + if (*buf == NULL) { + ERROR("procevent plugin: strdup failed during gen_message_payload: %s", + STRERRNO); + goto err; + } yajl_gen_free(g); @@ -442,86 +426,80 @@ err: } // Does /proc//comm contain a process name we are interested in? -static processlist_t *process_check(int pid) { - int len, is_match, status, retval; +// NOTE: Caller MUST hold procevent_data_lock when calling this function +static processlist_t *process_check(long pid) { char file[BUFSIZE]; - FILE *fh; - char buffer[BUFSIZE]; - regmatch_t matches[PROCEVENT_REGEX_MATCHES]; - len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid); + int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid); if ((len < 0) || (len >= BUFSIZE)) { WARNING("procevent process_check: process name too large"); return NULL; } + FILE *fh; + if (NULL == (fh = fopen(file, "r"))) { // No /proc//comm for this pid, just ignore - DEBUG("procevent plugin: no comm file available for pid %d", pid); + DEBUG("procevent plugin: no comm file available for pid %ld", pid); return NULL; } - retval = fscanf(fh, "%[^\n]", buffer); + char buffer[BUFSIZE]; + int retval = fscanf(fh, "%[^\n]", buffer); if (retval < 0) { - WARNING("procevent process_check: unable to read comm file for pid %d", + WARNING("procevent process_check: unable to read comm file for pid %ld", pid); + fclose(fh); return NULL; } + // Now that we have the process name in the buffer, check if we are + // even interested in it + if (ignorelist_match(ignorelist, buffer) != 0) { + DEBUG("procevent process_check: ignoring process %s (%ld)", buffer, pid); + fclose(fh); + return NULL; + } + + if (fh != NULL) { + fclose(fh); + fh = NULL; + } + // // Go through the processlist linked list and look for the process name // in /proc//comm. If found: - // 1. If pl->pid is -1, then set pl->pid to + // 1. If pl->pid is -1, then set pl->pid to (and return that object) // 2. If pl->pid is not -1, then another process was already // found. If == pl->pid, this is an old match, so do nothing. - // If the is different, however, make a new processlist_t and + // If the is different, however, make a new processlist_t and // associate with it (with the same process name as the existing). // - pthread_mutex_lock(&procevent_list_lock); - - processlist_t *pl; processlist_t *match = NULL; - for (pl = processlist_head; pl != NULL; pl = pl->next) { - if (pl->is_regex != 0) { - is_match = (regexec(&pl->process_regex_obj, buffer, - PROCEVENT_REGEX_MATCHES, matches, 0) == 0 - ? 1 - : 0); - } else { - is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0); - } + for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) { + + int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0); if (is_match == 1) { - DEBUG("procevent plugin: process %d name match (pattern: %s) for %s", pid, - (pl->is_regex == 0 ? pl->process : pl->process_regex), buffer); - - if (pl->is_regex == 1) { - // If this is a regex name, copy the actual process name into the object - // for cleaner log reporting - - if (pl->process != NULL) - sfree(pl->process); - pl->process = strdup(buffer); - if (pl->process == NULL) { - char errbuf[1024]; - ERROR("procevent plugin: strdup failed during process_check: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - pthread_mutex_unlock(&procevent_list_lock); - return NULL; - } - } + DEBUG("procevent plugin: process %ld name match for %s", pid, buffer); if (pl->pid == pid) { // this is a match, and we've already stored the exact pid/name combo + DEBUG("procevent plugin: found exact match with name %s, PID %ld for " + "incoming PID %ld", + pl->process, pl->pid, pid); match = pl; break; } else if (pl->pid == -1) { // this is a match, and we've found a candidate processlist_t to store // this new pid/name combo + DEBUG("procevent plugin: reusing pl object with PID %ld for incoming " + "PID %ld", + pl->pid, pid); pl->pid = pid; match = pl; break; @@ -529,70 +507,41 @@ static processlist_t *process_check(int pid) { // this is a match, but another instance of this process has already // claimed this pid/name combo, // so keep looking + DEBUG("procevent plugin: found pl object with matching name for " + "incoming PID %ld, but object is in use by PID %ld", + pid, pl->pid); match = pl; continue; } } } - if (match != NULL && match->pid != -1 && match->pid != pid) { + if (match == NULL || + (match != NULL && match->pid != -1 && match->pid != pid)) { + // if there wasn't an existing match, OR // if there was a match but the associated processlist_t object already // contained a pid/name combo, // then make a new one and add it to the linked list - DEBUG( - "procevent plugin: allocating new processlist_t object for PID %d (%s)", - pid, match->process); - - processlist_t *pl2; - char *process; - char *process_regex; + DEBUG("procevent plugin: allocating new processlist_t object for PID %ld " + "(%s)", + pid, buffer); - pl2 = malloc(sizeof(*pl2)); + processlist_t *pl2 = calloc(1, sizeof(*pl2)); if (pl2 == NULL) { - char errbuf[1024]; - ERROR("procevent plugin: malloc failed during process_check: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - pthread_mutex_unlock(&procevent_list_lock); + ERROR("procevent plugin: calloc failed during process_check: %s", + STRERRNO); return NULL; } - process = strdup(match->process); + char *process = strdup(buffer); if (process == NULL) { - char errbuf[1024]; sfree(pl2); ERROR("procevent plugin: strdup failed during process_check: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - pthread_mutex_unlock(&procevent_list_lock); + STRERRNO); return NULL; } - if (match->is_regex == 1) { - pl2->is_regex = 1; - status = - regcomp(&pl2->process_regex_obj, match->process_regex, REG_EXTENDED); - - if (status != 0) { - sfree(pl2); - sfree(process); - ERROR("procevent plugin: invalid regular expression: %s", - match->process_regex); - return NULL; - } - - process_regex = strdup(match->process_regex); - if (process_regex == NULL) { - char errbuf[1024]; - sfree(pl2); - sfree(process); - ERROR("procevent plugin: strdup failed during process_check: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return NULL; - } - - pl2->process_regex = process_regex; - } - pl2->process = process; pl2->pid = pid; pl2->next = processlist_head; @@ -601,86 +550,61 @@ static processlist_t *process_check(int pid) { match = pl2; } - pthread_mutex_unlock(&procevent_list_lock); - - if (fh != NULL) { - fclose(fh); - fh = NULL; - } - return match; } // Does our map have this PID or name? -static processlist_t *process_map_check(int pid, char *process) { - processlist_t *pl; - - pthread_mutex_lock(&procevent_list_lock); - - for (pl = processlist_head; pl != NULL; pl = pl->next) { +// NOTE: Caller MUST hold procevent_data_lock when calling this function +static processlist_t *process_map_check(long pid, char *process) { + for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) { int match_pid = 0; - int match_process = 0; - int match = 0; if (pid > 0) { if (pl->pid == pid) match_pid = 1; } + int match_process = 0; + if (process != NULL) { if (strcmp(pl->process, process) == 0) match_process = 1; } - if (pid > 0 && process == NULL && match_pid == 1) - match = 1; - else if (pid < 0 && process != NULL && match_process == 1) - match = 1; - else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1) + int match = 0; + + if ((pid > 0 && process == NULL && match_pid == 1) || + (pid < 0 && process != NULL && match_process == 1) || + (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)) { match = 1; + } if (match == 1) { - pthread_mutex_unlock(&procevent_list_lock); return pl; } } - pthread_mutex_unlock(&procevent_list_lock); - return NULL; } static int process_map_refresh(void) { - DIR *proc; - errno = 0; - proc = opendir(PROCDIR); + DIR *proc = opendir(PROCDIR); + if (proc == NULL) { - char errbuf[1024]; - ERROR("procevent plugin: fopen (%s): %s", PROCDIR, - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO); return -1; } while (42) { - struct dirent *dent; - int len; - char file[BUFSIZE]; - - struct stat statbuf; - - int status; - errno = 0; - dent = readdir(proc); + struct dirent *dent = readdir(proc); if (dent == NULL) { - char errbuf[4096]; - if (errno == 0) /* end of directory */ break; ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR, - sstrerror(errno, errbuf, sizeof(errbuf))); + STRERRNO); closedir(proc); return -1; } @@ -688,15 +612,17 @@ static int process_map_refresh(void) { if (dent->d_name[0] == '.') continue; - len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name); + char file[BUFSIZE]; + + int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name); if ((len < 0) || (len >= BUFSIZE)) continue; - status = stat(file, &statbuf); + struct stat statbuf; + + int status = stat(file, &statbuf); if (status != 0) { - char errbuf[4096]; - WARNING("procevent plugin: stat (%s) failed: %s", file, - sstrerror(errno, errbuf, sizeof(errbuf))); + WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO); continue; } @@ -722,7 +648,9 @@ static int process_map_refresh(void) { // Check if we need to store this pid/name combo in our processlist_t linked // list int this_pid = atoi(dent->d_name); + pthread_mutex_lock(&procevent_data_lock); processlist_t *pl = process_check(this_pid); + pthread_mutex_unlock(&procevent_data_lock); if (pl != NULL) DEBUG("procevent plugin: process map refreshed for PID %d and name %s", @@ -735,23 +663,23 @@ static int process_map_refresh(void) { } static int nl_connect() { - int rc; - struct sockaddr_nl sa_nl; + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, + .nl_groups = CN_IDX_PROC, + .nl_pid = getpid(), + }; nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR); if (nl_sock == -1) { - ERROR("procevent plugin: socket open failed."); + ERROR("procevent plugin: socket open failed: %d", errno); return -1; } - sa_nl.nl_family = AF_NETLINK; - sa_nl.nl_groups = CN_IDX_PROC; - sa_nl.nl_pid = getpid(); - - rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); if (rc == -1) { - ERROR("procevent plugin: socket bind failed."); + ERROR("procevent plugin: socket bind failed: %d", errno); close(nl_sock); + nl_sock = -1; return -1; } @@ -759,7 +687,6 @@ static int nl_connect() { } static int set_proc_ev_listen(bool enable) { - int rc; struct __attribute__((aligned(NLMSG_ALIGNTO))) { struct nlmsghdr nl_hdr; struct __attribute__((__packed__)) { @@ -779,21 +706,19 @@ static int set_proc_ev_listen(bool enable) { nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE; - rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); + int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); if (rc == -1) { - ERROR("procevent plugin: subscribing to netlink process events failed."); + ERROR("procevent plugin: subscribing to netlink process events failed: %d", + errno); return -1; } return 0; } +// Read from netlink socket and write to ring buffer static int read_event() { - int status; - int ret = 0; - int proc_id = -1; - int proc_status = -1; - int proc_extra = -1; + int recv_flags = MSG_DONTWAIT; struct __attribute__((aligned(NLMSG_ALIGNTO))) { struct nlmsghdr nl_hdr; struct __attribute__((__packed__)) { @@ -803,416 +728,548 @@ static int read_event() { } nlcn_msg; if (nl_sock == -1) - return ret; + return 0; - status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); + while (42) { + pthread_mutex_lock(&procevent_thread_lock); - if (status == 0) { - return 0; - } else if (status == -1) { - if (errno != EINTR) { - ERROR("procevent plugin: socket receive error: %d", errno); - return -1; + if (procevent_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&procevent_thread_lock); + return 0; + } + + pthread_mutex_unlock(&procevent_thread_lock); + + int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags); + + if (status == 0) { + return 0; + } else if (status < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + pthread_mutex_lock(&procevent_data_lock); + + // There was nothing more to receive for now, so... + // If ring head does not equal ring tail, then there is data + // in the ring buffer for the dequeue thread to read, so + // signal it + if (ring.head != ring.tail) + pthread_cond_signal(&procevent_cond); + + pthread_mutex_unlock(&procevent_data_lock); + + // Since there was nothing to receive, set recv to block and + // try again + recv_flags = 0; + continue; + } else if (errno != EINTR) { + ERROR("procevent plugin: socket receive error: %d", errno); + return -1; + } else { + // Interrupt, so just continue and try again + continue; + } + } + + // We successfully received a message, so don't block on the next + // read in case there are more (and if there aren't, it will be + // handled above in the EWOULDBLOCK error-checking) + recv_flags = MSG_DONTWAIT; + + int proc_id = -1; + int proc_status = -1; + + switch (nlcn_msg.proc_ev.what) { + case PROC_EVENT_EXEC: + proc_status = PROCEVENT_STARTED; + proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid; + break; + case PROC_EVENT_EXIT: + proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid; + proc_status = PROCEVENT_EXITED; + break; + default: + // Otherwise not of interest + break; + } + + // If we're interested in this process status event, place the event + // in the ring buffer for consumption by the dequeue (dispatch) thread. + + if (proc_status != -1) { + pthread_mutex_lock(&procevent_data_lock); + + int next = ring.head + 1; + if (next >= ring.maxLen) + next = 0; + + if (next == ring.tail) { + // Buffer is full, signal the dequeue thread to process the buffer + // and clean it out, and then sleep + WARNING("procevent plugin: ring buffer full"); + + pthread_cond_signal(&procevent_cond); + pthread_mutex_unlock(&procevent_data_lock); + + usleep(1000); + continue; + } else { + DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id, + (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"), + (unsigned long long)cdtime()); + + ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id; + ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status; + ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime(); + + ring.head = next; + } + + pthread_mutex_unlock(&procevent_data_lock); } } - switch (nlcn_msg.proc_ev.what) { - case PROC_EVENT_NONE: - case PROC_EVENT_FORK: - case PROC_EVENT_UID: - case PROC_EVENT_GID: - // Not of interest in current version - break; - case PROC_EVENT_EXEC: - proc_status = PROCEVENT_STARTED; - proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid; - break; - case PROC_EVENT_EXIT: - proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid; - proc_status = PROCEVENT_EXITED; - proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code; - break; - default: - break; + return 0; +} + +static void procevent_dispatch_notification(long pid, gauge_t value, + char *process, cdtime_t timestamp) { + + notification_t n = { + .severity = (value == 1 ? NOTIF_OKAY : NOTIF_FAILURE), + .time = cdtime(), + .plugin = "procevent", + .type = "gauge", + .type_instance = "process_status", + }; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); + sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance)); + + char *buf = NULL; + gen_message_payload(value, pid, process, timestamp, &buf); + + int status = plugin_notification_meta_add_string(&n, "ves", buf); + + if (status < 0) { + sfree(buf); + ERROR("procevent plugin: unable to set notification VES metadata: %s", + STRERRNO); + return; } - // If we're interested in this process status event, place the event - // in the ring buffer for consumption by the main polling thread. + DEBUG("procevent plugin: notification VES metadata: %s", + n.meta->nm_value.nm_string); + + DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value, + pid, process); + + plugin_dispatch_notification(&n); + plugin_notification_meta_free(n.meta); + + // strdup'd in gen_message_payload + if (buf != NULL) + sfree(buf); +} - if (proc_status != -1) { - pthread_mutex_unlock(&procevent_lock); +// Read from ring buffer and dispatch to write plugins +static void read_ring_buffer() { + pthread_mutex_lock(&procevent_data_lock); + + // If there's currently nothing to read from the buffer, + // then wait + if (ring.head == ring.tail) + pthread_cond_wait(&procevent_cond, &procevent_data_lock); + + while (ring.head != ring.tail) { + int next = ring.tail + 1; - int next = ring.head + 1; if (next >= ring.maxLen) next = 0; - if (next == ring.tail) { - WARNING("procevent plugin: ring buffer full"); - } else { - DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id, - (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"), - (long long unsigned int)CDTIME_T_TO_US(cdtime())); - - if (proc_status == PROCEVENT_EXITED) { - ring.buffer[ring.head][0] = proc_id; - ring.buffer[ring.head][1] = proc_status; - ring.buffer[ring.head][2] = proc_extra; - ring.buffer[ring.head][3] = - (long long unsigned int)CDTIME_T_TO_US(cdtime()); - } else { - ring.buffer[ring.head][0] = proc_id; - ring.buffer[ring.head][1] = proc_status; - ring.buffer[ring.head][2] = 0; - ring.buffer[ring.head][3] = - (long long unsigned int)CDTIME_T_TO_US(cdtime()); + if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == PROCEVENT_EXITED) { + processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL); + + if (pl != NULL) { + // This process is of interest to us, so publish its EXITED status + procevent_dispatch_notification( + ring.buffer[ring.tail][RBUF_PROC_ID_INDEX], + ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process, + ring.buffer[ring.tail][RBUF_TIME_INDEX]); + DEBUG( + "procevent plugin: PID %ld (%s) EXITED, removing PID from process " + "list", + pl->pid, pl->process); + pl->pid = -1; + pl->last_status = -1; } + } else if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == + PROCEVENT_STARTED) { + // a new process has started, so check if we should monitor it + processlist_t *pl = process_check(ring.buffer[ring.tail][0]); + + // If we had already seen this process name and pid combo before, + // and the last message was a "process started" message, don't send + // the notfication again + + if (pl != NULL && pl->last_status != PROCEVENT_STARTED) { + // This process is of interest to us, so publish its STARTED status + procevent_dispatch_notification( + ring.buffer[ring.tail][RBUF_PROC_ID_INDEX], + ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process, + ring.buffer[ring.tail][RBUF_TIME_INDEX]); - ring.head = next; + pl->last_status = PROCEVENT_STARTED; + + DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process " + "list", + pl->pid, pl->process); + } } - pthread_mutex_unlock(&procevent_lock); + ring.tail = next; } - return ret; + pthread_mutex_unlock(&procevent_data_lock); } -static void *procevent_thread(void *arg) /* {{{ */ +// Entry point for thread responsible for listening +// to netlink socket and writing data to ring buffer +static void *procevent_netlink_thread(void *arg) /* {{{ */ { - pthread_mutex_lock(&procevent_lock); - - while (procevent_thread_loop > 0) { - int status; - - pthread_mutex_unlock(&procevent_lock); + pthread_mutex_lock(&procevent_thread_lock); - usleep(1000); + while (procevent_netlink_thread_loop > 0) { + pthread_mutex_unlock(&procevent_thread_lock); - status = read_event(); + int status = read_event(); - pthread_mutex_lock(&procevent_lock); + pthread_mutex_lock(&procevent_thread_lock); if (status < 0) { - procevent_thread_error = 1; + procevent_netlink_thread_error = 1; break; } + } /* while (procevent_netlink_thread_loop > 0) */ - if (procevent_thread_loop <= 0) - break; - } /* while (procevent_thread_loop > 0) */ - - pthread_mutex_unlock(&procevent_lock); + pthread_mutex_unlock(&procevent_thread_lock); - return ((void *)0); -} /* }}} void *procevent_thread */ + return (void *)0; +} /* }}} void *procevent_netlink_thread */ -static int start_thread(void) /* {{{ */ +// Entry point for thread responsible for reading from +// ring buffer and dispatching notifications +static void *procevent_dequeue_thread(void *arg) /* {{{ */ { - int status; + pthread_mutex_lock(&procevent_thread_lock); + + while (procevent_dequeue_thread_loop > 0) { + pthread_mutex_unlock(&procevent_thread_lock); + + read_ring_buffer(); + + pthread_mutex_lock(&procevent_thread_lock); + } /* while (procevent_dequeue_thread_loop > 0) */ + + pthread_mutex_unlock(&procevent_thread_lock); - pthread_mutex_lock(&procevent_lock); + return (void *)0; +} /* }}} void *procevent_dequeue_thread */ - if (procevent_thread_loop != 0) { - pthread_mutex_unlock(&procevent_lock); - return (0); +static int start_netlink_thread(void) /* {{{ */ +{ + pthread_mutex_lock(&procevent_thread_lock); + + if (procevent_netlink_thread_loop != 0) { + pthread_mutex_unlock(&procevent_thread_lock); + return 0; } + int status; + if (nl_sock == -1) { status = nl_connect(); - if (status != 0) + if (status != 0) { + pthread_mutex_unlock(&procevent_thread_lock); return status; + } status = set_proc_ev_listen(true); - if (status == -1) + if (status == -1) { + pthread_mutex_unlock(&procevent_thread_lock); return status; + } } DEBUG("procevent plugin: socket created and bound"); - procevent_thread_loop = 1; - procevent_thread_error = 0; + procevent_netlink_thread_loop = 1; + procevent_netlink_thread_error = 0; - status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL, - procevent_thread, + status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL, + procevent_netlink_thread, /* arg = */ (void *)0, "procevent"); if (status != 0) { - procevent_thread_loop = 0; - ERROR("procevent plugin: Starting thread failed."); - pthread_mutex_unlock(&procevent_lock); - return (-1); + procevent_netlink_thread_loop = 0; + ERROR("procevent plugin: Starting netlink thread failed."); + pthread_mutex_unlock(&procevent_thread_lock); + + int status2 = close(nl_sock); + + if (status2 != 0) { + ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock, + status2, STRERRNO); + } + + nl_sock = -1; + + return -1; } - pthread_mutex_unlock(&procevent_lock); - return (0); -} /* }}} int start_thread */ + pthread_mutex_unlock(&procevent_thread_lock); -static int stop_thread(int shutdown) /* {{{ */ + return status; +} /* }}} int start_netlink_thread */ + +static int start_dequeue_thread(void) /* {{{ */ { - int status; + pthread_mutex_lock(&procevent_thread_lock); + + if (procevent_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&procevent_thread_lock); + return 0; + } + + procevent_dequeue_thread_loop = 1; + + int status = plugin_thread_create(&procevent_dequeue_thread_id, + /* attr = */ NULL, procevent_dequeue_thread, + /* arg = */ (void *)0, "procevent"); + if (status != 0) { + procevent_dequeue_thread_loop = 0; + ERROR("procevent plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&procevent_thread_lock); + return -1; + } + + pthread_mutex_unlock(&procevent_thread_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_netlink_thread(int shutdown) /* {{{ */ +{ + int socket_status; if (nl_sock != -1) { - status = close(nl_sock); - if (status != 0) { + socket_status = close(nl_sock); + if (socket_status != 0) { ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock, - status, strerror(errno)); - return (-1); - } else - nl_sock = -1; - } + socket_status, strerror(errno)); + } + + nl_sock = -1; + } else + socket_status = 0; - pthread_mutex_lock(&procevent_lock); + pthread_mutex_lock(&procevent_thread_lock); - if (procevent_thread_loop == 0) { - pthread_mutex_unlock(&procevent_lock); - return (-1); + if (procevent_netlink_thread_loop == 0) { + pthread_mutex_unlock(&procevent_thread_lock); + return -1; } - procevent_thread_loop = 0; + // Set thread termination status + procevent_netlink_thread_loop = 0; + pthread_mutex_unlock(&procevent_thread_lock); + + // Let threads waiting on access to the data know to move + // on such that they'll see the thread's termination status pthread_cond_broadcast(&procevent_cond); - pthread_mutex_unlock(&procevent_lock); + + int thread_status; if (shutdown == 1) { // Calling pthread_cancel here in // the case of a shutdown just assures that the thread is // gone and that the process has been fully terminated. - DEBUG("procevent plugin: Canceling thread for process shutdown"); + DEBUG("procevent plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(procevent_thread_id); + thread_status = pthread_cancel(procevent_netlink_thread_id); - if (status != 0) { - ERROR("procevent plugin: Unable to cancel thread: %d", status); - status = -1; - } + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("procevent plugin: Unable to cancel netlink thread: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; } else { - status = pthread_join(procevent_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("procevent plugin: Stopping thread failed."); - status = -1; - } + thread_status = + pthread_join(procevent_netlink_thread_id, /* return = */ NULL); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("procevent plugin: Stopping netlink thread failed."); + thread_status = -1; + } else + thread_status = 0; } - pthread_mutex_lock(&procevent_lock); - memset(&procevent_thread_id, 0, sizeof(procevent_thread_id)); - procevent_thread_error = 0; - pthread_mutex_unlock(&procevent_lock); + pthread_mutex_lock(&procevent_thread_lock); + memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id)); + procevent_netlink_thread_error = 0; + pthread_mutex_unlock(&procevent_thread_lock); - DEBUG("procevent plugin: Finished requesting stop of thread"); + DEBUG("procevent plugin: Finished requesting stop of netlink thread"); - return (status); -} /* }}} int stop_thread */ + if (socket_status != 0) + return socket_status; + else + return thread_status; +} /* }}} int stop_netlink_thread */ -static int procevent_init(void) /* {{{ */ +static int stop_dequeue_thread() /* {{{ */ { - int status; + pthread_mutex_lock(&procevent_thread_lock); - if (processlist_head == NULL) { - NOTICE("procevent plugin: No processes have been configured."); - return (-1); + if (procevent_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&procevent_thread_lock); + return -1; } + procevent_dequeue_thread_loop = 0; + pthread_mutex_unlock(&procevent_thread_lock); + + pthread_cond_broadcast(&procevent_cond); + + // Calling pthread_cancel here just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("procevent plugin: Canceling dequeue thread for process shutdown"); + + int status = pthread_cancel(procevent_dequeue_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; + + pthread_mutex_lock(&procevent_thread_lock); + memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id)); + pthread_mutex_unlock(&procevent_thread_lock); + + DEBUG("procevent plugin: Finished requesting stop of dequeue thread"); + + return status; +} /* }}} int stop_dequeue_thread */ + +static int stop_threads() /* {{{ */ +{ + int status = stop_netlink_thread(1); + int status2 = stop_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int stop_threads */ + +static int procevent_init(void) /* {{{ */ +{ ring.head = 0; ring.tail = 0; ring.maxLen = buffer_length; - ring.buffer = (long long unsigned int **)malloc( - buffer_length * sizeof(long long unsigned int *)); + ring.buffer = (cdtime_t **)calloc(buffer_length, sizeof(cdtime_t *)); for (int i = 0; i < buffer_length; i++) { - ring.buffer[i] = (long long unsigned int *)malloc( - PROCEVENT_FIELDS * sizeof(long long unsigned int)); + ring.buffer[i] = (cdtime_t *)calloc(PROCEVENT_FIELDS, sizeof(cdtime_t)); } - status = process_map_refresh(); + int status = process_map_refresh(); if (status == -1) { ERROR("procevent plugin: Initial process mapping failed."); - return (-1); + return -1; } - return (start_thread()); + if (ignorelist == NULL) { + NOTICE("procevent plugin: No processes have been configured."); + return -1; + } + + return start_threads(); } /* }}} int procevent_init */ static int procevent_config(const char *key, const char *value) /* {{{ */ { - int status; + if (ignorelist == NULL) + ignorelist = ignorelist_create(/* invert = */ 1); + + if (ignorelist == NULL) { + return -1; + } if (strcasecmp(key, "BufferLength") == 0) { buffer_length = atoi(value); - } else if (strcasecmp(key, "Process") == 0 || - strcasecmp(key, "RegexProcess") == 0) { - - processlist_t *pl; - char *process; - char *process_regex; - - pl = malloc(sizeof(*pl)); - if (pl == NULL) { - char errbuf[1024]; - ERROR("procevent plugin: malloc failed during procevent_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } - - process = strdup(value); - if (process == NULL) { - char errbuf[1024]; - sfree(pl); - ERROR("procevent plugin: strdup failed during procevent_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } + } else if (strcasecmp(key, "Process") == 0) { + ignorelist_add(ignorelist, value); + } else if (strcasecmp(key, "ProcessRegex") == 0) { +#if HAVE_REGEX_H + int status = ignorelist_add(ignorelist, value); - if (strcasecmp(key, "RegexProcess") == 0) { - pl->is_regex = 1; - status = regcomp(&pl->process_regex_obj, value, REG_EXTENDED); - - if (status != 0) { - sfree(pl); - sfree(process); - ERROR("procevent plugin: invalid regular expression: %s", value); - return (1); - } - - process_regex = strdup(value); - if (process_regex == NULL) { - char errbuf[1024]; - sfree(pl); - sfree(process); - ERROR("procevent plugin: strdup failed during procevent_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } - - pl->process_regex = process_regex; - } else { - pl->is_regex = 0; + if (status != 0) { + ERROR("procevent plugin: invalid regular expression: %s", value); + return 1; } - - pl->process = process; - pl->pid = -1; - pl->next = processlist_head; - processlist_head = pl; +#else + WARNING("procevent plugin: The plugin has been compiled without support " + "for the \"ProcessRegex\" option."); +#endif } else { - return (-1); + return -1; } - return (0); + return 0; } /* }}} int procevent_config */ -static void procevent_dispatch_notification(int pid, const char *type, /* {{{ */ - gauge_t value, char *process, - long long unsigned int timestamp) { - char *buf = NULL; - notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "", - NULL}; - - if (value == 1) - n.severity = NOTIF_OKAY; - - char hostname[1024]; - gethostname(hostname, sizeof(hostname)); - - sstrncpy(n.host, hostname, sizeof(n.host)); - sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance)); - sstrncpy(n.type, "gauge", sizeof(n.type)); - sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance)); - - gen_message_payload(value, pid, process, timestamp, &buf); - - notification_meta_t *m = calloc(1, sizeof(*m)); - - if (m == NULL) { - char errbuf[1024]; - sfree(buf); - ERROR("procevent plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return; - } - - sstrncpy(m->name, "ves", sizeof(m->name)); - m->nm_value.nm_string = sstrdup(buf); - m->type = NM_TYPE_STRING; - n.meta = m; - - DEBUG("procevent plugin: notification message: %s", - n.meta->nm_value.nm_string); - - DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value, - pid, process); - - plugin_dispatch_notification(&n); - plugin_notification_meta_free(n.meta); - - // malloc'd in gen_message_payload - if (buf != NULL) - sfree(buf); -} - static int procevent_read(void) /* {{{ */ { - if (procevent_thread_error != 0) { - ERROR( - "procevent plugin: The interface thread had a problem. Restarting it."); - - stop_thread(0); + pthread_mutex_lock(&procevent_thread_lock); - start_thread(); + if (procevent_netlink_thread_error != 0) { - return (-1); - } /* if (procevent_thread_error != 0) */ + pthread_mutex_unlock(&procevent_thread_lock); - pthread_mutex_lock(&procevent_lock); + ERROR("procevent plugin: The netlink thread had a problem. Restarting it."); - while (ring.head != ring.tail) { - int next = ring.tail + 1; - - if (next >= ring.maxLen) - next = 0; - - if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) { - processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL); - - if (pl != NULL) { - // This process is of interest to us, so publish its EXITED status - procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", - ring.buffer[ring.tail][1], pl->process, - ring.buffer[ring.tail][3]); - DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process " - "list", - pl->pid, pl->process); - pl->pid = -1; - } - } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) { - // a new process has started, so check if we should monitor it - processlist_t *pl = process_check(ring.buffer[ring.tail][0]); + stop_netlink_thread(0); - if (pl != NULL) { - // This process is of interest to us, so publish its STARTED status - procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", - ring.buffer[ring.tail][1], pl->process, - ring.buffer[ring.tail][3]); - DEBUG( - "procevent plugin: PID %d (%s) STARTED, adding PID to process list", - pl->pid, pl->process); - } - } + start_netlink_thread(); - ring.tail = next; - } + return -1; + } /* if (procevent_netlink_thread_error != 0) */ - pthread_mutex_unlock(&procevent_lock); + pthread_mutex_unlock(&procevent_thread_lock); - return (0); + return 0; } /* }}} int procevent_read */ static int procevent_shutdown(void) /* {{{ */ { - processlist_t *pl; + DEBUG("procevent plugin: Shutting down threads."); - DEBUG("procevent plugin: Shutting down thread."); - - if (stop_thread(1) < 0) - return (-1); + int status = stop_threads(); for (int i = 0; i < buffer_length; i++) { free(ring.buffer[i]); @@ -1220,24 +1277,21 @@ static int procevent_shutdown(void) /* {{{ */ free(ring.buffer); - pl = processlist_head; + processlist_t *pl = processlist_head; while (pl != NULL) { processlist_t *pl_next; pl_next = pl->next; - if (pl->is_regex == 1) { - sfree(pl->process_regex); - regfree(&pl->process_regex_obj); - } - sfree(pl->process); sfree(pl); pl = pl_next; } - return (0); + ignorelist_free(ignorelist); + + return status; } /* }}} int procevent_shutdown */ void module_register(void) {