From: Andrew Bays Date: Fri, 12 Oct 2018 17:47:43 +0000 (-0400) Subject: Drain netlink socket before sleeping, then filter repeated process start msgs X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=b5d7a870dea53f562ad275adf01040896fdbab8d Drain netlink socket before sleeping, then filter repeated process start msgs --- diff --git a/src/procevent.c b/src/procevent.c index aba66178..78c9182b 100644 --- a/src/procevent.c +++ b/src/procevent.c @@ -112,7 +112,8 @@ typedef struct { struct processlist_s { char *process; - uint32_t pid; + long pid; + int32_t last_status; struct processlist_s *next; }; @@ -345,8 +346,9 @@ static int gen_message_payload(int state, int pid, char *process, goto err; if (yajl_gen_string( - g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE - : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE), + g, + (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE + : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE), strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) != yajl_gen_status_ok) @@ -405,8 +407,9 @@ static int gen_message_payload(int state, int pid, char *process, goto err; if (yajl_gen_string( - g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE - : PROCEVENT_VF_STATUS_NORMAL_VALUE), + g, + (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE + : PROCEVENT_VF_STATUS_NORMAL_VALUE), strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE : PROCEVENT_VF_STATUS_NORMAL_VALUE))) != yajl_gen_status_ok) @@ -510,11 +513,17 @@ static processlist_t *process_check(int pid) { if (pl->pid == pid) { // this is a match, and we've already stored the exact pid/name combo + DEBUG("procevent plugin: found exact match with name %s, PID %ld for " + "incoming PID %d", + pl->process, pl->pid, pid); match = pl; break; } else if (pl->pid == -1) { // this is a match, and we've found a candidate processlist_t to store // this new pid/name combo + DEBUG("procevent plugin: reusing pl object with PID %ld for incoming " + "PID %d", + pl->pid, pid); pl->pid = pid; match = pl; break; @@ -522,6 +531,9 @@ static processlist_t *process_check(int pid) { // this is a match, but another instance of this process has already // claimed this pid/name combo, // so keep looking + DEBUG("procevent plugin: found pl object with matching name for " + "incoming PID %d, but object is in use by PID %ld", + pid, pl->pid); match = pl; continue; } @@ -703,7 +715,7 @@ static int nl_connect() { nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR); if (nl_sock == -1) { - ERROR("procevent plugin: socket open failed."); + ERROR("procevent plugin: socket open failed: %d", errno); return -1; } @@ -713,7 +725,7 @@ static int nl_connect() { rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); if (rc == -1) { - ERROR("procevent plugin: socket bind failed."); + ERROR("procevent plugin: socket bind failed: %d", errno); close(nl_sock); return -1; } @@ -744,7 +756,8 @@ static int set_proc_ev_listen(bool enable) { rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); if (rc == -1) { - ERROR("procevent plugin: subscribing to netlink process events failed."); + ERROR("procevent plugin: subscribing to netlink process events failed: %d", + errno); return -1; } @@ -768,72 +781,82 @@ static int read_event() { if (nl_sock == -1) return ret; - status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); + while (42) { - if (status == 0) { - return 0; - } else if (status == -1) { - if (errno != EINTR) { - ERROR("procevent plugin: socket receive error: %d", errno); - return -1; + pthread_mutex_lock(&procevent_lock); + + if (procevent_thread_loop <= 0) + return ret; + + pthread_mutex_unlock(&procevent_lock); + + status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0); + + if (status == 0) { + return 0; + } else if (status == -1) { + if (errno != EINTR) { + ERROR("procevent plugin: socket receive error: %d", errno); + return -1; + } } - } - switch (nlcn_msg.proc_ev.what) { - case PROC_EVENT_NONE: - case PROC_EVENT_FORK: - case PROC_EVENT_UID: - case PROC_EVENT_GID: - // Not of interest in current version - break; - case PROC_EVENT_EXEC: - proc_status = PROCEVENT_STARTED; - proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid; - break; - case PROC_EVENT_EXIT: - proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid; - proc_status = PROCEVENT_EXITED; - proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code; - break; - default: - break; - } + switch (nlcn_msg.proc_ev.what) { + case PROC_EVENT_NONE: + case PROC_EVENT_FORK: + case PROC_EVENT_UID: + case PROC_EVENT_GID: + // Not of interest in current version + break; + case PROC_EVENT_EXEC: + proc_status = PROCEVENT_STARTED; + proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid; + break; + case PROC_EVENT_EXIT: + proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid; + proc_status = PROCEVENT_EXITED; + proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code; + break; + default: + break; + } - // If we're interested in this process status event, place the event - // in the ring buffer for consumption by the main polling thread. + // If we're interested in this process status event, place the event + // in the ring buffer for consumption by the main polling thread. - if (proc_status != -1) { - pthread_mutex_lock(&procevent_lock); + if (proc_status != -1) { + pthread_mutex_lock(&procevent_lock); - int next = ring.head + 1; - if (next >= ring.maxLen) - next = 0; + int next = ring.head + 1; + if (next >= ring.maxLen) + next = 0; - if (next == ring.tail) { - WARNING("procevent plugin: ring buffer full"); - } else { - DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id, - (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"), - (long long unsigned int)CDTIME_T_TO_US(cdtime())); - - if (proc_status == PROCEVENT_EXITED) { - ring.buffer[ring.head][0] = proc_id; - ring.buffer[ring.head][1] = proc_status; - ring.buffer[ring.head][2] = proc_extra; - ring.buffer[ring.head][3] = - (long long unsigned int)CDTIME_T_TO_US(cdtime()); + if (next == ring.tail) { + WARNING("procevent plugin: ring buffer full"); } else { - ring.buffer[ring.head][0] = proc_id; - ring.buffer[ring.head][1] = proc_status; - ring.buffer[ring.head][2] = 0; - ring.buffer[ring.head][3] = - (long long unsigned int)CDTIME_T_TO_US(cdtime()); + DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id, + (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"), + (long long unsigned int)CDTIME_T_TO_US(cdtime())); + + if (proc_status == PROCEVENT_EXITED) { + ring.buffer[ring.head][0] = proc_id; + ring.buffer[ring.head][1] = proc_status; + ring.buffer[ring.head][2] = proc_extra; + ring.buffer[ring.head][3] = + (long long unsigned int)CDTIME_T_TO_US(cdtime()); + } else { + ring.buffer[ring.head][0] = proc_id; + ring.buffer[ring.head][1] = proc_status; + ring.buffer[ring.head][2] = 0; + ring.buffer[ring.head][3] = + (long long unsigned int)CDTIME_T_TO_US(cdtime()); + } + + ring.head = next; } - ring.head = next; + pthread_mutex_unlock(&procevent_lock); } - - pthread_mutex_unlock(&procevent_lock); } return ret; @@ -1104,19 +1127,27 @@ static int procevent_read(void) /* {{{ */ "list", pl->pid, pl->process); pl->pid = -1; + pl->last_status = -1; } } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) { // a new process has started, so check if we should monitor it processlist_t *pl = process_check(ring.buffer[ring.tail][0]); - if (pl != NULL) { + // If we had already seen this process name and pid combo before, + // and the last message was a "process started" message, don't send + // the notfication again + + if (pl != NULL && pl->last_status != PROCEVENT_STARTED) { // This process is of interest to us, so publish its STARTED status procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge", ring.buffer[ring.tail][1], pl->process, ring.buffer[ring.tail][3]); - DEBUG( - "procevent plugin: PID %d (%s) STARTED, adding PID to process list", - pl->pid, pl->process); + + pl->last_status = PROCEVENT_STARTED; + + DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process " + "list", + pl->pid, pl->process); } }