#include "collectd.h"
-#include "common.h"
#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
#include "utils_complain.h"
-#include "utils_ignorelist.h"
#include <errno.h>
#include <pthread.h>
#define PROCEVENT_EXITED 0
#define PROCEVENT_STARTED 1
-#define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
+#define PROCEVENT_FIELDS 3 // pid, status, timestamp
#define BUFSIZE 512
#define PROCDIR "/proc"
+#define RBUF_PROC_ID_INDEX 0
+#define RBUF_PROC_STATUS_INDEX 1
+#define RBUF_TIME_INDEX 2
#define PROCEVENT_DOMAIN_FIELD "domain"
#define PROCEVENT_DOMAIN_VALUE "fault"
int head;
int tail;
int maxLen;
- long long unsigned int **buffer;
+ cdtime_t **buffer;
} circbuf_t;
struct processlist_s {
static int procevent_netlink_thread_error = 0;
static pthread_t procevent_netlink_thread_id;
static int procevent_dequeue_thread_loop = 0;
-static int procevent_dequeue_thread_error = 0;
static pthread_t procevent_dequeue_thread_id;
-static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
static int nl_sock = -1;
static int buffer_length;
static circbuf_t ring;
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
- * Prototypes
- */
-
-static void procevent_dispatch_notification(long pid, const char *type,
- gauge_t value, char *process,
- long long unsigned int timestamp);
-
-/*
* Private functions
*/
static int gen_message_payload(int state, long pid, char *process,
- long long unsigned int timestamp, char **buf) {
+ cdtime_t timestamp, char **buf) {
const unsigned char *buf2;
yajl_gen g;
char json_str[DATA_MAX_NAME_LEN];
#if !defined(HAVE_YAJL_V2)
- yajl_gen_config conf = {};
-
- conf.beautify = 0;
+ yajl_gen_config conf = {0};
#endif
#if HAVE_YAJL_V2
goto err;
event_id = event_id + 1;
- int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, event_id_len, "%d", event_id);
+ if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
+ goto err;
+ }
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
goto err;
- int event_name_len = 0;
- event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
- event_name_len = event_name_len + strlen(process); // process name
- event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
- event_name_len = event_name_len +
- 13; // "process", 3 spaces, 2 parentheses and null-terminator
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, event_name_len, "process %s (%ld) %s", process, pid,
- (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
- : PROCEVENT_EVENT_NAME_UP_VALUE));
+ if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
+ (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
+ : PROCEVENT_EVENT_NAME_UP_VALUE)) < 0) {
+ goto err;
+ }
if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
yajl_gen_status_ok) {
yajl_gen_status_ok)
goto err;
- int last_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, last_epoch_microsec_len, "%llu",
- (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+ if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+ CDTIME_T_TO_US(cdtime())) < 0) {
+ goto err;
+ }
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
yajl_gen_status_ok)
goto err;
- int start_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, start_epoch_microsec_len, "%llu",
- (long long unsigned int)timestamp);
+ if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+ CDTIME_T_TO_US(timestamp)) < 0) {
+ goto err;
+ }
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
yajl_gen_status_ok)
goto err;
- int alarm_condition_len = 0;
- alarm_condition_len =
- alarm_condition_len + (sizeof(char) * sizeof(int) * 4); // pid
- alarm_condition_len = alarm_condition_len + strlen(process); // process name
- alarm_condition_len =
- alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
- // parentheses and null-terminator
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, alarm_condition_len, "process %s (%ld) state change",
- process, pid);
+ if (snprintf(json_str, sizeof(json_str), "process %s (%ld) state change",
+ process, pid) < 0) {
+ goto err;
+ }
if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
yajl_gen_status_ok) {
goto err;
if (yajl_gen_string(
- g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
- : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
+ g,
+ (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
+ : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
: PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
yajl_gen_status_ok)
yajl_gen_status_ok)
goto err;
- int specific_problem_len = 0;
- specific_problem_len =
- specific_problem_len + (sizeof(char) * sizeof(int) * 4); // pid
- specific_problem_len = specific_problem_len + strlen(process); // process name
- specific_problem_len =
- specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
- specific_problem_len =
- specific_problem_len +
- 13; // "process", 3 spaces, 2 parentheses and null-terminator
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, specific_problem_len, "process %s (%ld) %s", process, pid,
- (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
- : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
+ if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
+ (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
+ : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)) < 0) {
+ goto err;
+ }
if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
yajl_gen_status_ok) {
goto err;
if (yajl_gen_string(
- g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
- : PROCEVENT_VF_STATUS_NORMAL_VALUE),
+ g,
+ (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
+ : PROCEVENT_VF_STATUS_NORMAL_VALUE),
strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
: PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
yajl_gen_status_ok)
goto err;
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
- goto err;
-
// *** END fault fields ***
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+ // close fault and header fields
+ if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+ yajl_gen_map_close(g) != yajl_gen_status_ok)
goto err;
if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
goto err;
- *buf = malloc(strlen((char *)buf2) + 1);
+ *buf = strdup((char *)buf2);
if (*buf == NULL) {
- char errbuf[1024];
- ERROR("procevent plugin: malloc failed during gen_message_payload: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
+ STRERRNO);
goto err;
}
- sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
-
yajl_gen_free(g);
return 0;
}
// Does /proc/<pid>/comm contain a process name we are interested in?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
static processlist_t *process_check(long pid) {
- int len, is_match, retval;
char file[BUFSIZE];
- FILE *fh;
- char buffer[BUFSIZE];
- len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
+ int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
if ((len < 0) || (len >= BUFSIZE)) {
WARNING("procevent process_check: process name too large");
return NULL;
}
+ FILE *fh;
+
if (NULL == (fh = fopen(file, "r"))) {
// No /proc/<pid>/comm for this pid, just ignore
DEBUG("procevent plugin: no comm file available for pid %ld", pid);
return NULL;
}
- retval = fscanf(fh, "%[^\n]", buffer);
+ char buffer[BUFSIZE];
+ int retval = fscanf(fh, "%[^\n]", buffer);
if (retval < 0) {
WARNING("procevent process_check: unable to read comm file for pid %ld",
// associate <pid> with it (with the same process name as the existing).
//
- pthread_mutex_lock(&procevent_list_lock);
-
- processlist_t *pl;
processlist_t *match = NULL;
- for (pl = processlist_head; pl != NULL; pl = pl->next) {
+ for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
- is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
+ int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
if (is_match == 1) {
DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
"(%s)",
pid, buffer);
- processlist_t *pl2;
- char *process;
-
- pl2 = malloc(sizeof(*pl2));
+ processlist_t *pl2 = calloc(1, sizeof(*pl2));
if (pl2 == NULL) {
- char errbuf[1024];
- ERROR("procevent plugin: malloc failed during process_check: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- pthread_mutex_unlock(&procevent_list_lock);
+ ERROR("procevent plugin: calloc failed during process_check: %s",
+ STRERRNO);
return NULL;
}
- process = strdup(buffer);
+ char *process = strdup(buffer);
if (process == NULL) {
- char errbuf[1024];
sfree(pl2);
ERROR("procevent plugin: strdup failed during process_check: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- pthread_mutex_unlock(&procevent_list_lock);
+ STRERRNO);
return NULL;
}
match = pl2;
}
- pthread_mutex_unlock(&procevent_list_lock);
-
return match;
}
// Does our map have this PID or name?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
static processlist_t *process_map_check(long pid, char *process) {
- processlist_t *pl;
-
- pthread_mutex_lock(&procevent_list_lock);
-
- for (pl = processlist_head; pl != NULL; pl = pl->next) {
+ for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
int match_pid = 0;
- int match_process = 0;
- int match = 0;
if (pid > 0) {
if (pl->pid == pid)
match_pid = 1;
}
+ int match_process = 0;
+
if (process != NULL) {
if (strcmp(pl->process, process) == 0)
match_process = 1;
}
- if (pid > 0 && process == NULL && match_pid == 1)
- match = 1;
- else if (pid < 0 && process != NULL && match_process == 1)
- match = 1;
- else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
+ int match = 0;
+
+ if ((pid > 0 && process == NULL && match_pid == 1) ||
+ (pid < 0 && process != NULL && match_process == 1) ||
+ (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)) {
match = 1;
+ }
if (match == 1) {
- pthread_mutex_unlock(&procevent_list_lock);
return pl;
}
}
- pthread_mutex_unlock(&procevent_list_lock);
-
return NULL;
}
static int process_map_refresh(void) {
- DIR *proc;
-
errno = 0;
- proc = opendir(PROCDIR);
+ DIR *proc = opendir(PROCDIR);
+
if (proc == NULL) {
- char errbuf[1024];
- ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
return -1;
}
while (42) {
- struct dirent *dent;
- int len;
- char file[BUFSIZE];
-
- struct stat statbuf;
-
- int status;
-
errno = 0;
- dent = readdir(proc);
+ struct dirent *dent = readdir(proc);
if (dent == NULL) {
- char errbuf[4096];
-
if (errno == 0) /* end of directory */
break;
ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ STRERRNO);
closedir(proc);
return -1;
}
if (dent->d_name[0] == '.')
continue;
- len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
+ char file[BUFSIZE];
+
+ int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
if ((len < 0) || (len >= BUFSIZE))
continue;
- status = stat(file, &statbuf);
+ struct stat statbuf;
+
+ int status = stat(file, &statbuf);
if (status != 0) {
- char errbuf[4096];
- WARNING("procevent plugin: stat (%s) failed: %s", file,
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
continue;
}
// Check if we need to store this pid/name combo in our processlist_t linked
// list
int this_pid = atoi(dent->d_name);
+ pthread_mutex_lock(&procevent_data_lock);
processlist_t *pl = process_check(this_pid);
+ pthread_mutex_unlock(&procevent_data_lock);
if (pl != NULL)
DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
}
static int nl_connect() {
- int rc;
- struct sockaddr_nl sa_nl;
+ struct sockaddr_nl sa_nl = {
+ .nl_family = AF_NETLINK,
+ .nl_groups = CN_IDX_PROC,
+ .nl_pid = getpid(),
+ };
nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
if (nl_sock == -1) {
return -1;
}
- sa_nl.nl_family = AF_NETLINK;
- sa_nl.nl_groups = CN_IDX_PROC;
- sa_nl.nl_pid = getpid();
-
- rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+ int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
if (rc == -1) {
ERROR("procevent plugin: socket bind failed: %d", errno);
close(nl_sock);
+ nl_sock = -1;
return -1;
}
}
static int set_proc_ev_listen(bool enable) {
- int rc;
struct __attribute__((aligned(NLMSG_ALIGNTO))) {
struct nlmsghdr nl_hdr;
struct __attribute__((__packed__)) {
nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
- rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+ int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
if (rc == -1) {
ERROR("procevent plugin: subscribing to netlink process events failed: %d",
errno);
// Read from netlink socket and write to ring buffer
static int read_event() {
- int status;
- int ret = 0;
- int proc_id = -1;
- int proc_status = -1;
- int proc_extra = -1;
int recv_flags = MSG_DONTWAIT;
struct __attribute__((aligned(NLMSG_ALIGNTO))) {
struct nlmsghdr nl_hdr;
} nlcn_msg;
if (nl_sock == -1)
- return ret;
+ return 0;
while (42) {
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_loop <= 0) {
- pthread_mutex_unlock(&procevent_lock);
- return ret;
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return 0;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
+ int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
if (status == 0) {
return 0;
} else if (status < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_data_lock);
// There was nothing more to receive for now, so...
- // If ring head does not equal ring tail, there is data
+ // If ring head does not equal ring tail, then there is data
// in the ring buffer for the dequeue thread to read, so
// signal it
if (ring.head != ring.tail)
pthread_cond_signal(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
// Since there was nothing to receive, set recv to block and
// try again
} else if (errno != EINTR) {
ERROR("procevent plugin: socket receive error: %d", errno);
return -1;
+ } else {
+ // Interrupt, so just continue and try again
+ continue;
}
}
// We successfully received a message, so don't block on the next
// read in case there are more (and if there aren't, it will be
- // handled above in the error-checking)
+ // handled above in the EWOULDBLOCK error-checking)
recv_flags = MSG_DONTWAIT;
+ int proc_id = -1;
+ int proc_status = -1;
+
switch (nlcn_msg.proc_ev.what) {
- case PROC_EVENT_NONE:
- case PROC_EVENT_FORK:
- case PROC_EVENT_UID:
- case PROC_EVENT_GID:
- // Not of interest in current version
- break;
case PROC_EVENT_EXEC:
proc_status = PROCEVENT_STARTED;
proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
case PROC_EVENT_EXIT:
proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
proc_status = PROCEVENT_EXITED;
- proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
break;
default:
+ // Otherwise not of interest
break;
}
// If we're interested in this process status event, place the event
- // in the ring buffer for consumption by the main polling thread.
+ // in the ring buffer for consumption by the dequeue (dispatch) thread.
if (proc_status != -1) {
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_data_lock);
int next = ring.head + 1;
if (next >= ring.maxLen)
WARNING("procevent plugin: ring buffer full");
pthread_cond_signal(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
usleep(1000);
continue;
} else {
DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
(proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
- (long long unsigned int)CDTIME_T_TO_US(cdtime()));
-
- if (proc_status == PROCEVENT_EXITED) {
- ring.buffer[ring.head][0] = proc_id;
- ring.buffer[ring.head][1] = proc_status;
- ring.buffer[ring.head][2] = proc_extra;
- ring.buffer[ring.head][3] =
- (long long unsigned int)CDTIME_T_TO_US(cdtime());
- } else {
- ring.buffer[ring.head][0] = proc_id;
- ring.buffer[ring.head][1] = proc_status;
- ring.buffer[ring.head][2] = 0;
- ring.buffer[ring.head][3] =
- (long long unsigned int)CDTIME_T_TO_US(cdtime());
- }
+ (unsigned long long)cdtime());
+
+ ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id;
+ ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status;
+ ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime();
ring.head = next;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_data_lock);
}
}
- return ret;
+ return 0;
+}
+
+static void procevent_dispatch_notification(long pid, gauge_t value,
+ char *process, cdtime_t timestamp) {
+
+ notification_t n = {
+ .severity = (value == 1 ? NOTIF_OKAY : NOTIF_FAILURE),
+ .time = cdtime(),
+ .plugin = "procevent",
+ .type = "gauge",
+ .type_instance = "process_status",
+ };
+
+ sstrncpy(n.host, hostname_g, sizeof(n.host));
+ sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
+
+ char *buf = NULL;
+ gen_message_payload(value, pid, process, timestamp, &buf);
+
+ int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+ if (status < 0) {
+ sfree(buf);
+ ERROR("procevent plugin: unable to set notification VES metadata: %s",
+ STRERRNO);
+ return;
+ }
+
+ DEBUG("procevent plugin: notification VES metadata: %s",
+ n.meta->nm_value.nm_string);
+
+ DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
+ pid, process);
+
+ plugin_dispatch_notification(&n);
+ plugin_notification_meta_free(n.meta);
+
+ // strdup'd in gen_message_payload
+ if (buf != NULL)
+ sfree(buf);
}
// Read from ring buffer and dispatch to write plugins
-static int read_ring_buffer() {
- pthread_mutex_lock(&procevent_lock);
+static void read_ring_buffer() {
+ pthread_mutex_lock(&procevent_data_lock);
// If there's currently nothing to read from the buffer,
// then wait
if (ring.head == ring.tail)
- pthread_cond_wait(&procevent_cond, &procevent_lock);
+ pthread_cond_wait(&procevent_cond, &procevent_data_lock);
while (ring.head != ring.tail) {
int next = ring.tail + 1;
if (next >= ring.maxLen)
next = 0;
- if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
+ if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == PROCEVENT_EXITED) {
processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
if (pl != NULL) {
// This process is of interest to us, so publish its EXITED status
- procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
- ring.buffer[ring.tail][1], pl->process,
- ring.buffer[ring.tail][3]);
+ procevent_dispatch_notification(
+ ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
+ ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
+ ring.buffer[ring.tail][RBUF_TIME_INDEX]);
DEBUG(
"procevent plugin: PID %ld (%s) EXITED, removing PID from process "
"list",
pl->pid = -1;
pl->last_status = -1;
}
- } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
+ } else if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] ==
+ PROCEVENT_STARTED) {
// a new process has started, so check if we should monitor it
processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
// This process is of interest to us, so publish its STARTED status
- procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
- ring.buffer[ring.tail][1], pl->process,
- ring.buffer[ring.tail][3]);
+ procevent_dispatch_notification(
+ ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
+ ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
+ ring.buffer[ring.tail][RBUF_TIME_INDEX]);
pl->last_status = PROCEVENT_STARTED;
ring.tail = next;
}
- pthread_mutex_unlock(&procevent_lock);
-
- return 0;
+ pthread_mutex_unlock(&procevent_data_lock);
}
// Entry point for thread responsible for listening
// to netlink socket and writing data to ring buffer
static void *procevent_netlink_thread(void *arg) /* {{{ */
{
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
while (procevent_netlink_thread_loop > 0) {
- int status;
-
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- usleep(1000);
+ int status = read_event();
- status = read_event();
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (status < 0) {
procevent_netlink_thread_error = 1;
break;
}
-
- if (procevent_netlink_thread_loop <= 0)
- break;
} /* while (procevent_netlink_thread_loop > 0) */
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- return ((void *)0);
+ return (void *)0;
} /* }}} void *procevent_netlink_thread */
// Entry point for thread responsible for reading from
// ring buffer and dispatching notifications
static void *procevent_dequeue_thread(void *arg) /* {{{ */
{
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
while (procevent_dequeue_thread_loop > 0) {
- int status;
-
- pthread_mutex_unlock(&procevent_lock);
-
- status = read_ring_buffer();
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- if (status < 0) {
- procevent_dequeue_thread_error = 1;
- break;
- }
+ read_ring_buffer();
- if (procevent_dequeue_thread_loop <= 0)
- break;
+ pthread_mutex_lock(&procevent_thread_lock);
} /* while (procevent_dequeue_thread_loop > 0) */
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
- return ((void *)0);
+ return (void *)0;
} /* }}} void *procevent_dequeue_thread */
static int start_netlink_thread(void) /* {{{ */
{
- int status;
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_loop != 0) {
- pthread_mutex_unlock(&procevent_lock);
- return (0);
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return 0;
}
+ int status;
+
if (nl_sock == -1) {
status = nl_connect();
- if (status != 0)
+ if (status != 0) {
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
+ }
status = set_proc_ev_listen(true);
- if (status == -1)
+ if (status == -1) {
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
+ }
}
DEBUG("procevent plugin: socket created and bound");
if (status != 0) {
procevent_netlink_thread_loop = 0;
ERROR("procevent plugin: Starting netlink thread failed.");
- pthread_mutex_unlock(&procevent_lock);
- return (-1);
+ pthread_mutex_unlock(&procevent_thread_lock);
+
+ int status2 = close(nl_sock);
+
+ if (status2 != 0) {
+ ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
+ status2, STRERRNO);
+ }
+
+ nl_sock = -1;
+
+ return -1;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
} /* }}} int start_netlink_thread */
static int start_dequeue_thread(void) /* {{{ */
{
- int status;
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_dequeue_thread_loop != 0) {
- pthread_mutex_unlock(&procevent_lock);
- return (0);
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return 0;
}
procevent_dequeue_thread_loop = 1;
- procevent_dequeue_thread_error = 0;
- status = plugin_thread_create(&procevent_dequeue_thread_id, /* attr = */ NULL,
- procevent_dequeue_thread,
- /* arg = */ (void *)0, "procevent");
+ int status = plugin_thread_create(&procevent_dequeue_thread_id,
+ /* attr = */ NULL, procevent_dequeue_thread,
+ /* arg = */ (void *)0, "procevent");
if (status != 0) {
procevent_dequeue_thread_loop = 0;
ERROR("procevent plugin: Starting dequeue thread failed.");
- pthread_mutex_unlock(&procevent_lock);
- return (-1);
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return -1;
}
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
return status;
} /* }}} int start_dequeue_thread */
static int start_threads(void) /* {{{ */
{
- int status, status2;
-
- status = start_netlink_thread();
- status2 = start_dequeue_thread();
+ int status = start_netlink_thread();
+ int status2 = start_dequeue_thread();
- if (status < 0)
+ if (status != 0)
return status;
else
return status2;
static int stop_netlink_thread(int shutdown) /* {{{ */
{
- int status;
+ int socket_status;
if (nl_sock != -1) {
- status = close(nl_sock);
- if (status != 0) {
+ socket_status = close(nl_sock);
+ if (socket_status != 0) {
ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
- status, strerror(errno));
- return (-1);
- } else
- nl_sock = -1;
- }
+ socket_status, strerror(errno));
+ }
+
+ nl_sock = -1;
+ } else
+ socket_status = 0;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_loop == 0) {
- pthread_mutex_unlock(&procevent_lock);
- return (-1);
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return -1;
}
+ // Set thread termination status
procevent_netlink_thread_loop = 0;
+ pthread_mutex_unlock(&procevent_thread_lock);
+
+ // Let threads waiting on access to the data know to move
+ // on such that they'll see the thread's termination status
pthread_cond_broadcast(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
+
+ int thread_status;
if (shutdown == 1) {
// Calling pthread_cancel here in
DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
- status = pthread_cancel(procevent_netlink_thread_id);
+ thread_status = pthread_cancel(procevent_netlink_thread_id);
- if (status != 0 && status != ESRCH) {
- ERROR("procevent plugin: Unable to cancel netlink thread: %d", status);
- status = -1;
+ if (thread_status != 0 && thread_status != ESRCH) {
+ ERROR("procevent plugin: Unable to cancel netlink thread: %d",
+ thread_status);
+ thread_status = -1;
} else
- status = 0;
+ thread_status = 0;
} else {
- status = pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
- if (status != 0 && status != ESRCH) {
+ thread_status =
+ pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
+ if (thread_status != 0 && thread_status != ESRCH) {
ERROR("procevent plugin: Stopping netlink thread failed.");
- status = -1;
+ thread_status = -1;
} else
- status = 0;
+ thread_status = 0;
}
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
procevent_netlink_thread_error = 0;
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
DEBUG("procevent plugin: Finished requesting stop of netlink thread");
- return (status);
+ if (socket_status != 0)
+ return socket_status;
+ else
+ return thread_status;
} /* }}} int stop_netlink_thread */
-static int stop_dequeue_thread(int shutdown) /* {{{ */
+static int stop_dequeue_thread() /* {{{ */
{
- int status;
-
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_dequeue_thread_loop == 0) {
- pthread_mutex_unlock(&procevent_lock);
- return (-1);
+ pthread_mutex_unlock(&procevent_thread_lock);
+ return -1;
}
procevent_dequeue_thread_loop = 0;
+ pthread_mutex_unlock(&procevent_thread_lock);
+
pthread_cond_broadcast(&procevent_cond);
- pthread_mutex_unlock(&procevent_lock);
- if (shutdown == 1) {
- // Calling pthread_cancel here in
- // the case of a shutdown just assures that the thread is
- // gone and that the process has been fully terminated.
+ // Calling pthread_cancel here just assures that the thread is
+ // gone and that the process has been fully terminated.
- DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
+ DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
- status = pthread_cancel(procevent_dequeue_thread_id);
+ int status = pthread_cancel(procevent_dequeue_thread_id);
- if (status != 0 && status != ESRCH) {
- ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
- status = -1;
- } else
- status = 0;
- } else {
- status = pthread_join(procevent_dequeue_thread_id, /* return = */ NULL);
- if (status != 0 && status != ESRCH) {
- ERROR("procevent plugin: Stopping dequeue thread failed.");
- status = -1;
- } else
- status = 0;
- }
+ if (status != 0 && status != ESRCH) {
+ ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
+ status = -1;
+ } else
+ status = 0;
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
- procevent_dequeue_thread_error = 0;
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
- return (status);
+ return status;
} /* }}} int stop_dequeue_thread */
-static int stop_threads(int shutdown) /* {{{ */
+static int stop_threads() /* {{{ */
{
- int status, status2;
-
- status = stop_netlink_thread(shutdown);
- status2 = stop_dequeue_thread(shutdown);
+ int status = stop_netlink_thread(1);
+ int status2 = stop_dequeue_thread();
- if (status < 0)
+ if (status != 0)
return status;
else
return status2;
static int procevent_init(void) /* {{{ */
{
- int status;
-
ring.head = 0;
ring.tail = 0;
ring.maxLen = buffer_length;
- ring.buffer = (long long unsigned int **)malloc(
- buffer_length * sizeof(long long unsigned int *));
+ ring.buffer = (cdtime_t **)calloc(buffer_length, sizeof(cdtime_t *));
for (int i = 0; i < buffer_length; i++) {
- ring.buffer[i] = (long long unsigned int *)malloc(
- PROCEVENT_FIELDS * sizeof(long long unsigned int));
+ ring.buffer[i] = (cdtime_t *)calloc(PROCEVENT_FIELDS, sizeof(cdtime_t));
}
- status = process_map_refresh();
+ int status = process_map_refresh();
if (status == -1) {
ERROR("procevent plugin: Initial process mapping failed.");
- return (-1);
+ return -1;
}
if (ignorelist == NULL) {
NOTICE("procevent plugin: No processes have been configured.");
- return (-1);
+ return -1;
}
- return (start_threads());
+ return start_threads();
} /* }}} int procevent_init */
static int procevent_config(const char *key, const char *value) /* {{{ */
{
- int status;
-
if (ignorelist == NULL)
ignorelist = ignorelist_create(/* invert = */ 1);
+ if (ignorelist == NULL) {
+ return -1;
+ }
+
if (strcasecmp(key, "BufferLength") == 0) {
buffer_length = atoi(value);
} else if (strcasecmp(key, "Process") == 0) {
ignorelist_add(ignorelist, value);
} else if (strcasecmp(key, "ProcessRegex") == 0) {
#if HAVE_REGEX_H
- status = ignorelist_add(ignorelist, value);
+ int status = ignorelist_add(ignorelist, value);
if (status != 0) {
ERROR("procevent plugin: invalid regular expression: %s", value);
- return (1);
+ return 1;
}
#else
WARNING("procevent plugin: The plugin has been compiled without support "
"for the \"ProcessRegex\" option.");
#endif
} else {
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
} /* }}} int procevent_config */
-static void procevent_dispatch_notification(long pid,
- const char *type, /* {{{ */
- gauge_t value, char *process,
- long long unsigned int timestamp) {
- char *buf = NULL;
- notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "",
- NULL};
-
- if (value == 1)
- n.severity = NOTIF_OKAY;
-
- sstrncpy(n.host, hostname_g, sizeof(n.host));
- sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
- sstrncpy(n.type, "gauge", sizeof(n.type));
- sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
-
- gen_message_payload(value, pid, process, timestamp, &buf);
-
- notification_meta_t *m = calloc(1, sizeof(*m));
-
- if (m == NULL) {
- char errbuf[1024];
- sfree(buf);
- ERROR("procevent plugin: unable to allocate metadata: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- return;
- }
-
- sstrncpy(m->name, "ves", sizeof(m->name));
- m->nm_value.nm_string = sstrdup(buf);
- m->type = NM_TYPE_STRING;
- n.meta = m;
-
- DEBUG("procevent plugin: notification message: %s",
- n.meta->nm_value.nm_string);
-
- DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
- pid, process);
-
- plugin_dispatch_notification(&n);
- plugin_notification_meta_free(n.meta);
-
- // malloc'd in gen_message_payload
- if (buf != NULL)
- sfree(buf);
-}
-
static int procevent_read(void) /* {{{ */
{
- pthread_mutex_lock(&procevent_lock);
+ pthread_mutex_lock(&procevent_thread_lock);
if (procevent_netlink_thread_error != 0) {
- pthread_mutex_unlock(&procevent_lock);
+ pthread_mutex_unlock(&procevent_thread_lock);
ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
start_netlink_thread();
- return (-1);
+ return -1;
} /* if (procevent_netlink_thread_error != 0) */
- if (procevent_dequeue_thread_error != 0) {
-
- pthread_mutex_unlock(&procevent_lock);
-
- ERROR("procevent plugin: The dequeue thread had a problem. Restarting it.");
-
- stop_dequeue_thread(0);
-
- start_dequeue_thread();
+ pthread_mutex_unlock(&procevent_thread_lock);
- return (-1);
- } /* if (procevent_dequeue_thread_error != 0) */
-
- pthread_mutex_unlock(&procevent_lock);
-
- return (0);
+ return 0;
} /* }}} int procevent_read */
static int procevent_shutdown(void) /* {{{ */
{
- int status;
- processlist_t *pl;
-
DEBUG("procevent plugin: Shutting down threads.");
- status = stop_threads(1);
+ int status = stop_threads();
for (int i = 0; i < buffer_length; i++) {
free(ring.buffer[i]);
free(ring.buffer);
- pl = processlist_head;
+ processlist_t *pl = processlist_head;
while (pl != NULL) {
processlist_t *pl_next;
pl = pl_next;
}
+ ignorelist_free(ignorelist);
+
return status;
} /* }}} int procevent_shutdown */