Remove LVM plugin
[collectd.git] / src / procevent.c
1 /**
2  * collectd - src/procevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/common/common.h"
31 #include "utils/ignorelist/ignorelist.h"
32 #include "utils_complain.h"
33
34 #include <errno.h>
35 #include <pthread.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <dirent.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
46 #include <signal.h>
47 #include <stdbool.h>
48 #include <stdlib.h>
49 #include <string.h>
50
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
55 #endif
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
58 #endif
59
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
62 #define PROCEVENT_FIELDS 3 // pid, status, timestamp
63 #define BUFSIZE 512
64 #define PROCDIR "/proc"
65 #define RBUF_PROC_ID_INDEX 0
66 #define RBUF_PROC_STATUS_INDEX 1
67 #define RBUF_TIME_INDEX 2
68
69 #define PROCEVENT_DOMAIN_FIELD "domain"
70 #define PROCEVENT_DOMAIN_VALUE "fault"
71 #define PROCEVENT_EVENT_ID_FIELD "eventId"
72 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
73 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
74 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
75 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
76 #define PROCEVENT_PRIORITY_FIELD "priority"
77 #define PROCEVENT_PRIORITY_VALUE "high"
78 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
79 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
80 #define PROCEVENT_SEQUENCE_FIELD "sequence"
81 #define PROCEVENT_SEQUENCE_VALUE "0"
82 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
83 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
84 #define PROCEVENT_VERSION_FIELD "version"
85 #define PROCEVENT_VERSION_VALUE "1.0"
86
87 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
88 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
89 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
90 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
91 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
92 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
93 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
94 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
95 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
96 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
97 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
98 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
99 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
100 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
101 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
102 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
103
104 /*
105  * Private data types
106  */
107
108 typedef struct {
109   int head;
110   int tail;
111   int maxLen;
112   cdtime_t **buffer;
113 } circbuf_t;
114
115 struct processlist_s {
116   char *process;
117
118   long pid;
119   int32_t last_status;
120
121   struct processlist_s *next;
122 };
123 typedef struct processlist_s processlist_t;
124
125 /*
126  * Private variables
127  */
128 static ignorelist_t *ignorelist = NULL;
129
130 static int procevent_netlink_thread_loop = 0;
131 static int procevent_netlink_thread_error = 0;
132 static pthread_t procevent_netlink_thread_id;
133 static int procevent_dequeue_thread_loop = 0;
134 static pthread_t procevent_dequeue_thread_id;
135 static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
136 static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
137 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
138 static int nl_sock = -1;
139 static int buffer_length;
140 static circbuf_t ring;
141 static processlist_t *processlist_head = NULL;
142 static int event_id = 0;
143
144 static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"};
145 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
146
147 /*
148  * Private functions
149  */
150
151 static int gen_message_payload(int state, long pid, char *process,
152                                cdtime_t timestamp, char **buf) {
153   const unsigned char *buf2;
154   yajl_gen g;
155   char json_str[DATA_MAX_NAME_LEN];
156
157 #if !defined(HAVE_YAJL_V2)
158   yajl_gen_config conf = {0};
159 #endif
160
161 #if HAVE_YAJL_V2
162   size_t len;
163   g = yajl_gen_alloc(NULL);
164   yajl_gen_config(g, yajl_gen_beautify, 0);
165 #else
166   unsigned int len;
167   g = yajl_gen_alloc(&conf, NULL);
168 #endif
169
170   yajl_gen_clear(g);
171
172   // *** BEGIN common event header ***
173
174   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
175     goto err;
176
177   // domain
178   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
179                       strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
180     goto err;
181
182   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
183                       strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
184     goto err;
185
186   // eventId
187   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
188                       strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
189     goto err;
190
191   event_id = event_id + 1;
192   if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
193     goto err;
194   }
195
196   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
197     goto err;
198   }
199
200   // eventName
201   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
202                       strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
203     goto err;
204
205   if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
206                (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
207                            : PROCEVENT_EVENT_NAME_UP_VALUE)) < 0) {
208     goto err;
209   }
210
211   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
212       yajl_gen_status_ok) {
213     goto err;
214   }
215
216   // lastEpochMicrosec
217   if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
218                       strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
219       yajl_gen_status_ok)
220     goto err;
221
222   if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
223                CDTIME_T_TO_US(cdtime())) < 0) {
224     goto err;
225   }
226
227   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
228     goto err;
229   }
230
231   // priority
232   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
233                       strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
234     goto err;
235
236   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
237                       strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
238     goto err;
239
240   // reportingEntityName
241   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
242                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
243       yajl_gen_status_ok)
244     goto err;
245
246   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
247                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
248       yajl_gen_status_ok)
249     goto err;
250
251   // sequence
252   if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
253                       strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
254     goto err;
255
256   if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
257                       strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
258     goto err;
259
260   // sourceName
261   if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
262                       strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
263       yajl_gen_status_ok)
264     goto err;
265
266   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
267       yajl_gen_status_ok)
268     goto err;
269
270   // startEpochMicrosec
271   if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
272                       strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
273       yajl_gen_status_ok)
274     goto err;
275
276   if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
277                CDTIME_T_TO_US(timestamp)) < 0) {
278     goto err;
279   }
280
281   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
282     goto err;
283   }
284
285   // version
286   if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
287                       strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
288     goto err;
289
290   if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
291                       strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
292     goto err;
293
294   // *** END common event header ***
295
296   // *** BEGIN fault fields ***
297
298   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
299                       strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
300       yajl_gen_status_ok)
301     goto err;
302
303   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
304     goto err;
305
306   // alarmCondition
307   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
308                       strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
309       yajl_gen_status_ok)
310     goto err;
311
312   if (snprintf(json_str, sizeof(json_str), "process %s (%ld) state change",
313                process, pid) < 0) {
314     goto err;
315   }
316
317   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
318       yajl_gen_status_ok) {
319     goto err;
320   }
321
322   // alarmInterfaceA
323   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
324                       strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
325       yajl_gen_status_ok)
326     goto err;
327
328   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
329       yajl_gen_status_ok)
330     goto err;
331
332   // eventSeverity
333   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
334                       strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
335       yajl_gen_status_ok)
336     goto err;
337
338   if (yajl_gen_string(
339           g,
340           (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
341                                 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
342           strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
343                              : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
344       yajl_gen_status_ok)
345     goto err;
346
347   // eventSourceType
348   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
349                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
350       yajl_gen_status_ok)
351     goto err;
352
353   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
354                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
355       yajl_gen_status_ok)
356     goto err;
357
358   // faultFieldsVersion
359   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
360                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
361       yajl_gen_status_ok)
362     goto err;
363
364   if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
365                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
366       yajl_gen_status_ok)
367     goto err;
368
369   // specificProblem
370   if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
371                       strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
372       yajl_gen_status_ok)
373     goto err;
374
375   if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
376                (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
377                            : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)) < 0) {
378     goto err;
379   }
380
381   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
382       yajl_gen_status_ok) {
383     goto err;
384   }
385
386   // vfStatus
387   if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
388                       strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
389     goto err;
390
391   if (yajl_gen_string(
392           g,
393           (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
394                                 : PROCEVENT_VF_STATUS_NORMAL_VALUE),
395           strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
396                              : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
397       yajl_gen_status_ok)
398     goto err;
399
400   // *** END fault fields ***
401
402   // close fault and header fields
403   if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
404       yajl_gen_map_close(g) != yajl_gen_status_ok)
405     goto err;
406
407   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
408     goto err;
409
410   *buf = strdup((char *)buf2);
411
412   if (*buf == NULL) {
413     ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
414           STRERRNO);
415     goto err;
416   }
417
418   yajl_gen_free(g);
419
420   return 0;
421
422 err:
423   yajl_gen_free(g);
424   ERROR("procevent plugin: gen_message_payload failed to generate JSON");
425   return -1;
426 }
427
428 // Does /proc/<pid>/comm contain a process name we are interested in?
429 // NOTE: Caller MUST hold procevent_data_lock when calling this function
430 static processlist_t *process_check(long pid) {
431   char file[BUFSIZE];
432
433   int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
434
435   if ((len < 0) || (len >= BUFSIZE)) {
436     WARNING("procevent process_check: process name too large");
437     return NULL;
438   }
439
440   FILE *fh;
441
442   if (NULL == (fh = fopen(file, "r"))) {
443     // No /proc/<pid>/comm for this pid, just ignore
444     DEBUG("procevent plugin: no comm file available for pid %ld", pid);
445     return NULL;
446   }
447
448   char buffer[BUFSIZE];
449   int retval = fscanf(fh, "%[^\n]", buffer);
450
451   if (retval < 0) {
452     WARNING("procevent process_check: unable to read comm file for pid %ld",
453             pid);
454     fclose(fh);
455     return NULL;
456   }
457
458   // Now that we have the process name in the buffer, check if we are
459   // even interested in it
460   if (ignorelist_match(ignorelist, buffer) != 0) {
461     DEBUG("procevent process_check: ignoring process %s (%ld)", buffer, pid);
462     fclose(fh);
463     return NULL;
464   }
465
466   if (fh != NULL) {
467     fclose(fh);
468     fh = NULL;
469   }
470
471   //
472   // Go through the processlist linked list and look for the process name
473   // in /proc/<pid>/comm.  If found:
474   // 1. If pl->pid is -1, then set pl->pid to <pid> (and return that object)
475   // 2. If pl->pid is not -1, then another <process name> process was already
476   //    found.  If <pid> == pl->pid, this is an old match, so do nothing.
477   //    If the <pid> is different, however, make a new processlist_t and
478   //    associate <pid> with it (with the same process name as the existing).
479   //
480
481   processlist_t *match = NULL;
482
483   for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
484
485     int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
486
487     if (is_match == 1) {
488       DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
489
490       if (pl->pid == pid) {
491         // this is a match, and we've already stored the exact pid/name combo
492         DEBUG("procevent plugin: found exact match with name %s, PID %ld for "
493               "incoming PID %ld",
494               pl->process, pl->pid, pid);
495         match = pl;
496         break;
497       } else if (pl->pid == -1) {
498         // this is a match, and we've found a candidate processlist_t to store
499         // this new pid/name combo
500         DEBUG("procevent plugin: reusing pl object with PID %ld for incoming "
501               "PID %ld",
502               pl->pid, pid);
503         pl->pid = pid;
504         match = pl;
505         break;
506       } else if (pl->pid != -1) {
507         // this is a match, but another instance of this process has already
508         // claimed this pid/name combo,
509         // so keep looking
510         DEBUG("procevent plugin: found pl object with matching name for "
511               "incoming PID %ld, but object is in use by PID %ld",
512               pid, pl->pid);
513         match = pl;
514         continue;
515       }
516     }
517   }
518
519   if (match == NULL ||
520       (match != NULL && match->pid != -1 && match->pid != pid)) {
521     // if there wasn't an existing match, OR
522     // if there was a match but the associated processlist_t object already
523     // contained a pid/name combo,
524     // then make a new one and add it to the linked list
525
526     DEBUG("procevent plugin: allocating new processlist_t object for PID %ld "
527           "(%s)",
528           pid, buffer);
529
530     processlist_t *pl2 = calloc(1, sizeof(*pl2));
531     if (pl2 == NULL) {
532       ERROR("procevent plugin: calloc failed during process_check: %s",
533             STRERRNO);
534       return NULL;
535     }
536
537     char *process = strdup(buffer);
538     if (process == NULL) {
539       sfree(pl2);
540       ERROR("procevent plugin: strdup failed during process_check: %s",
541             STRERRNO);
542       return NULL;
543     }
544
545     pl2->process = process;
546     pl2->pid = pid;
547     pl2->next = processlist_head;
548     processlist_head = pl2;
549
550     match = pl2;
551   }
552
553   return match;
554 }
555
556 // Does our map have this PID or name?
557 // NOTE: Caller MUST hold procevent_data_lock when calling this function
558 static processlist_t *process_map_check(long pid, char *process) {
559   for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
560     int match_pid = 0;
561
562     if (pid > 0) {
563       if (pl->pid == pid)
564         match_pid = 1;
565     }
566
567     int match_process = 0;
568
569     if (process != NULL) {
570       if (strcmp(pl->process, process) == 0)
571         match_process = 1;
572     }
573
574     int match = 0;
575
576     if ((pid > 0 && process == NULL && match_pid == 1) ||
577         (pid < 0 && process != NULL && match_process == 1) ||
578         (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)) {
579       match = 1;
580     }
581
582     if (match == 1) {
583       return pl;
584     }
585   }
586
587   return NULL;
588 }
589
590 static int process_map_refresh(void) {
591   errno = 0;
592   DIR *proc = opendir(PROCDIR);
593
594   if (proc == NULL) {
595     ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
596     return -1;
597   }
598
599   while (42) {
600     errno = 0;
601     struct dirent *dent = readdir(proc);
602     if (dent == NULL) {
603       if (errno == 0) /* end of directory */
604         break;
605
606       ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
607             STRERRNO);
608       closedir(proc);
609       return -1;
610     }
611
612     if (dent->d_name[0] == '.')
613       continue;
614
615     char file[BUFSIZE];
616
617     int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
618     if ((len < 0) || (len >= BUFSIZE))
619       continue;
620
621     struct stat statbuf;
622
623     int status = stat(file, &statbuf);
624     if (status != 0) {
625       WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
626       continue;
627     }
628
629     if (!S_ISDIR(statbuf.st_mode))
630       continue;
631
632     len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
633     if ((len < 0) || (len >= BUFSIZE))
634       continue;
635
636     int not_number = 0;
637
638     for (int i = 0; i < strlen(dent->d_name); i++) {
639       if (!isdigit(dent->d_name[i])) {
640         not_number = 1;
641         break;
642       }
643     }
644
645     if (not_number != 0)
646       continue;
647
648     // Check if we need to store this pid/name combo in our processlist_t linked
649     // list
650     int this_pid = atoi(dent->d_name);
651     pthread_mutex_lock(&procevent_data_lock);
652     processlist_t *pl = process_check(this_pid);
653     pthread_mutex_unlock(&procevent_data_lock);
654
655     if (pl != NULL)
656       DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
657             this_pid, pl->process);
658   }
659
660   closedir(proc);
661
662   return 0;
663 }
664
665 static int nl_connect() {
666   struct sockaddr_nl sa_nl = {
667       .nl_family = AF_NETLINK,
668       .nl_groups = CN_IDX_PROC,
669       .nl_pid = getpid(),
670   };
671
672   nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
673   if (nl_sock == -1) {
674     ERROR("procevent plugin: socket open failed: %d", errno);
675     return -1;
676   }
677
678   int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
679   if (rc == -1) {
680     ERROR("procevent plugin: socket bind failed: %d", errno);
681     close(nl_sock);
682     nl_sock = -1;
683     return -1;
684   }
685
686   return 0;
687 }
688
689 static int set_proc_ev_listen(bool enable) {
690   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
691     struct nlmsghdr nl_hdr;
692     struct __attribute__((__packed__)) {
693       struct cn_msg cn_msg;
694       enum proc_cn_mcast_op cn_mcast;
695     };
696   } nlcn_msg;
697
698   memset(&nlcn_msg, 0, sizeof(nlcn_msg));
699   nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
700   nlcn_msg.nl_hdr.nlmsg_pid = getpid();
701   nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
702
703   nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
704   nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
705   nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
706
707   nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
708
709   int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
710   if (rc == -1) {
711     ERROR("procevent plugin: subscribing to netlink process events failed: %d",
712           errno);
713     return -1;
714   }
715
716   return 0;
717 }
718
719 // Read from netlink socket and write to ring buffer
720 static int read_event() {
721   int recv_flags = MSG_DONTWAIT;
722   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
723     struct nlmsghdr nl_hdr;
724     struct __attribute__((__packed__)) {
725       struct cn_msg cn_msg;
726       struct proc_event proc_ev;
727     };
728   } nlcn_msg;
729
730   if (nl_sock == -1)
731     return 0;
732
733   while (42) {
734     pthread_mutex_lock(&procevent_thread_lock);
735
736     if (procevent_netlink_thread_loop <= 0) {
737       pthread_mutex_unlock(&procevent_thread_lock);
738       return 0;
739     }
740
741     pthread_mutex_unlock(&procevent_thread_lock);
742
743     int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
744
745     if (status == 0) {
746       return 0;
747     } else if (status < 0) {
748       if (errno == EAGAIN || errno == EWOULDBLOCK) {
749         pthread_mutex_lock(&procevent_data_lock);
750
751         // There was nothing more to receive for now, so...
752         // If ring head does not equal ring tail, then there is data
753         // in the ring buffer for the dequeue thread to read, so
754         // signal it
755         if (ring.head != ring.tail)
756           pthread_cond_signal(&procevent_cond);
757
758         pthread_mutex_unlock(&procevent_data_lock);
759
760         // Since there was nothing to receive, set recv to block and
761         // try again
762         recv_flags = 0;
763         continue;
764       } else if (errno != EINTR) {
765         ERROR("procevent plugin: socket receive error: %d", errno);
766         return -1;
767       } else {
768         // Interrupt, so just continue and try again
769         continue;
770       }
771     }
772
773     // We successfully received a message, so don't block on the next
774     // read in case there are more (and if there aren't, it will be
775     // handled above in the EWOULDBLOCK error-checking)
776     recv_flags = MSG_DONTWAIT;
777
778     int proc_id = -1;
779     int proc_status = -1;
780
781     switch (nlcn_msg.proc_ev.what) {
782     case PROC_EVENT_EXEC:
783       proc_status = PROCEVENT_STARTED;
784       proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
785       break;
786     case PROC_EVENT_EXIT:
787       proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
788       proc_status = PROCEVENT_EXITED;
789       break;
790     default:
791       // Otherwise not of interest
792       break;
793     }
794
795     // If we're interested in this process status event, place the event
796     // in the ring buffer for consumption by the dequeue (dispatch) thread.
797
798     if (proc_status != -1) {
799       pthread_mutex_lock(&procevent_data_lock);
800
801       int next = ring.head + 1;
802       if (next >= ring.maxLen)
803         next = 0;
804
805       if (next == ring.tail) {
806         // Buffer is full, signal the dequeue thread to process the buffer
807         // and clean it out, and then sleep
808         WARNING("procevent plugin: ring buffer full");
809
810         pthread_cond_signal(&procevent_cond);
811         pthread_mutex_unlock(&procevent_data_lock);
812
813         usleep(1000);
814         continue;
815       } else {
816         DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
817               (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
818               (unsigned long long)cdtime());
819
820         ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id;
821         ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status;
822         ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime();
823
824         ring.head = next;
825       }
826
827       pthread_mutex_unlock(&procevent_data_lock);
828     }
829   }
830
831   return 0;
832 }
833
834 static void procevent_dispatch_notification(long pid, gauge_t value,
835                                             char *process, cdtime_t timestamp) {
836
837   notification_t n = {
838       .severity = (value == 1 ? NOTIF_OKAY : NOTIF_FAILURE),
839       .time = cdtime(),
840       .plugin = "procevent",
841       .type = "gauge",
842       .type_instance = "process_status",
843   };
844
845   sstrncpy(n.host, hostname_g, sizeof(n.host));
846   sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
847
848   char *buf = NULL;
849   gen_message_payload(value, pid, process, timestamp, &buf);
850
851   int status = plugin_notification_meta_add_string(&n, "ves", buf);
852
853   if (status < 0) {
854     sfree(buf);
855     ERROR("procevent plugin: unable to set notification VES metadata: %s",
856           STRERRNO);
857     return;
858   }
859
860   DEBUG("procevent plugin: notification VES metadata: %s",
861         n.meta->nm_value.nm_string);
862
863   DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
864         pid, process);
865
866   plugin_dispatch_notification(&n);
867   plugin_notification_meta_free(n.meta);
868
869   // strdup'd in gen_message_payload
870   if (buf != NULL)
871     sfree(buf);
872 }
873
874 // Read from ring buffer and dispatch to write plugins
875 static void read_ring_buffer() {
876   pthread_mutex_lock(&procevent_data_lock);
877
878   // If there's currently nothing to read from the buffer,
879   // then wait
880   if (ring.head == ring.tail)
881     pthread_cond_wait(&procevent_cond, &procevent_data_lock);
882
883   while (ring.head != ring.tail) {
884     int next = ring.tail + 1;
885
886     if (next >= ring.maxLen)
887       next = 0;
888
889     if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == PROCEVENT_EXITED) {
890       processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
891
892       if (pl != NULL) {
893         // This process is of interest to us, so publish its EXITED status
894         procevent_dispatch_notification(
895             ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
896             ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
897             ring.buffer[ring.tail][RBUF_TIME_INDEX]);
898         DEBUG(
899             "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
900             "list",
901             pl->pid, pl->process);
902         pl->pid = -1;
903         pl->last_status = -1;
904       }
905     } else if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] ==
906                PROCEVENT_STARTED) {
907       // a new process has started, so check if we should monitor it
908       processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
909
910       // If we had already seen this process name and pid combo before,
911       // and the last message was a "process started" message, don't send
912       // the notfication again
913
914       if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
915         // This process is of interest to us, so publish its STARTED status
916         procevent_dispatch_notification(
917             ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
918             ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
919             ring.buffer[ring.tail][RBUF_TIME_INDEX]);
920
921         pl->last_status = PROCEVENT_STARTED;
922
923         DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
924               "list",
925               pl->pid, pl->process);
926       }
927     }
928
929     ring.tail = next;
930   }
931
932   pthread_mutex_unlock(&procevent_data_lock);
933 }
934
935 // Entry point for thread responsible for listening
936 // to netlink socket and writing data to ring buffer
937 static void *procevent_netlink_thread(void *arg) /* {{{ */
938 {
939   pthread_mutex_lock(&procevent_thread_lock);
940
941   while (procevent_netlink_thread_loop > 0) {
942     pthread_mutex_unlock(&procevent_thread_lock);
943
944     int status = read_event();
945
946     pthread_mutex_lock(&procevent_thread_lock);
947
948     if (status < 0) {
949       procevent_netlink_thread_error = 1;
950       break;
951     }
952   } /* while (procevent_netlink_thread_loop > 0) */
953
954   pthread_mutex_unlock(&procevent_thread_lock);
955
956   return (void *)0;
957 } /* }}} void *procevent_netlink_thread */
958
959 // Entry point for thread responsible for reading from
960 // ring buffer and dispatching notifications
961 static void *procevent_dequeue_thread(void *arg) /* {{{ */
962 {
963   pthread_mutex_lock(&procevent_thread_lock);
964
965   while (procevent_dequeue_thread_loop > 0) {
966     pthread_mutex_unlock(&procevent_thread_lock);
967
968     read_ring_buffer();
969
970     pthread_mutex_lock(&procevent_thread_lock);
971   } /* while (procevent_dequeue_thread_loop > 0) */
972
973   pthread_mutex_unlock(&procevent_thread_lock);
974
975   return (void *)0;
976 } /* }}} void *procevent_dequeue_thread */
977
978 static int start_netlink_thread(void) /* {{{ */
979 {
980   pthread_mutex_lock(&procevent_thread_lock);
981
982   if (procevent_netlink_thread_loop != 0) {
983     pthread_mutex_unlock(&procevent_thread_lock);
984     return 0;
985   }
986
987   int status;
988
989   if (nl_sock == -1) {
990     status = nl_connect();
991
992     if (status != 0) {
993       pthread_mutex_unlock(&procevent_thread_lock);
994       return status;
995     }
996
997     status = set_proc_ev_listen(true);
998     if (status == -1) {
999       pthread_mutex_unlock(&procevent_thread_lock);
1000       return status;
1001     }
1002   }
1003
1004   DEBUG("procevent plugin: socket created and bound");
1005
1006   procevent_netlink_thread_loop = 1;
1007   procevent_netlink_thread_error = 0;
1008
1009   status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
1010                                 procevent_netlink_thread,
1011                                 /* arg = */ (void *)0, "procevent");
1012   if (status != 0) {
1013     procevent_netlink_thread_loop = 0;
1014     ERROR("procevent plugin: Starting netlink thread failed.");
1015     pthread_mutex_unlock(&procevent_thread_lock);
1016
1017     int status2 = close(nl_sock);
1018
1019     if (status2 != 0) {
1020       ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1021             status2, STRERRNO);
1022     }
1023
1024     nl_sock = -1;
1025
1026     return -1;
1027   }
1028
1029   pthread_mutex_unlock(&procevent_thread_lock);
1030
1031   return status;
1032 } /* }}} int start_netlink_thread */
1033
1034 static int start_dequeue_thread(void) /* {{{ */
1035 {
1036   pthread_mutex_lock(&procevent_thread_lock);
1037
1038   if (procevent_dequeue_thread_loop != 0) {
1039     pthread_mutex_unlock(&procevent_thread_lock);
1040     return 0;
1041   }
1042
1043   procevent_dequeue_thread_loop = 1;
1044
1045   int status = plugin_thread_create(&procevent_dequeue_thread_id,
1046                                     /* attr = */ NULL, procevent_dequeue_thread,
1047                                     /* arg = */ (void *)0, "procevent");
1048   if (status != 0) {
1049     procevent_dequeue_thread_loop = 0;
1050     ERROR("procevent plugin: Starting dequeue thread failed.");
1051     pthread_mutex_unlock(&procevent_thread_lock);
1052     return -1;
1053   }
1054
1055   pthread_mutex_unlock(&procevent_thread_lock);
1056
1057   return status;
1058 } /* }}} int start_dequeue_thread */
1059
1060 static int start_threads(void) /* {{{ */
1061 {
1062   int status = start_netlink_thread();
1063   int status2 = start_dequeue_thread();
1064
1065   if (status != 0)
1066     return status;
1067   else
1068     return status2;
1069 } /* }}} int start_threads */
1070
1071 static int stop_netlink_thread(int shutdown) /* {{{ */
1072 {
1073   int socket_status;
1074
1075   if (nl_sock != -1) {
1076     socket_status = close(nl_sock);
1077     if (socket_status != 0) {
1078       ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1079             socket_status, strerror(errno));
1080     }
1081
1082     nl_sock = -1;
1083   } else
1084     socket_status = 0;
1085
1086   pthread_mutex_lock(&procevent_thread_lock);
1087
1088   if (procevent_netlink_thread_loop == 0) {
1089     pthread_mutex_unlock(&procevent_thread_lock);
1090     return -1;
1091   }
1092
1093   // Set thread termination status
1094   procevent_netlink_thread_loop = 0;
1095   pthread_mutex_unlock(&procevent_thread_lock);
1096
1097   // Let threads waiting on access to the data know to move
1098   // on such that they'll see the thread's termination status
1099   pthread_cond_broadcast(&procevent_cond);
1100
1101   int thread_status;
1102
1103   if (shutdown == 1) {
1104     // Calling pthread_cancel here in
1105     // the case of a shutdown just assures that the thread is
1106     // gone and that the process has been fully terminated.
1107
1108     DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
1109
1110     thread_status = pthread_cancel(procevent_netlink_thread_id);
1111
1112     if (thread_status != 0 && thread_status != ESRCH) {
1113       ERROR("procevent plugin: Unable to cancel netlink thread: %d",
1114             thread_status);
1115       thread_status = -1;
1116     } else
1117       thread_status = 0;
1118   } else {
1119     thread_status =
1120         pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
1121     if (thread_status != 0 && thread_status != ESRCH) {
1122       ERROR("procevent plugin: Stopping netlink thread failed.");
1123       thread_status = -1;
1124     } else
1125       thread_status = 0;
1126   }
1127
1128   pthread_mutex_lock(&procevent_thread_lock);
1129   memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
1130   procevent_netlink_thread_error = 0;
1131   pthread_mutex_unlock(&procevent_thread_lock);
1132
1133   DEBUG("procevent plugin: Finished requesting stop of netlink thread");
1134
1135   if (socket_status != 0)
1136     return socket_status;
1137   else
1138     return thread_status;
1139 } /* }}} int stop_netlink_thread */
1140
1141 static int stop_dequeue_thread() /* {{{ */
1142 {
1143   pthread_mutex_lock(&procevent_thread_lock);
1144
1145   if (procevent_dequeue_thread_loop == 0) {
1146     pthread_mutex_unlock(&procevent_thread_lock);
1147     return -1;
1148   }
1149
1150   procevent_dequeue_thread_loop = 0;
1151   pthread_mutex_unlock(&procevent_thread_lock);
1152
1153   pthread_cond_broadcast(&procevent_cond);
1154
1155   // Calling pthread_cancel here just assures that the thread is
1156   // gone and that the process has been fully terminated.
1157
1158   DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
1159
1160   int status = pthread_cancel(procevent_dequeue_thread_id);
1161
1162   if (status != 0 && status != ESRCH) {
1163     ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
1164     status = -1;
1165   } else
1166     status = 0;
1167
1168   pthread_mutex_lock(&procevent_thread_lock);
1169   memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
1170   pthread_mutex_unlock(&procevent_thread_lock);
1171
1172   DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
1173
1174   return status;
1175 } /* }}} int stop_dequeue_thread */
1176
1177 static int stop_threads() /* {{{ */
1178 {
1179   int status = stop_netlink_thread(1);
1180   int status2 = stop_dequeue_thread();
1181
1182   if (status != 0)
1183     return status;
1184   else
1185     return status2;
1186 } /* }}} int stop_threads */
1187
1188 static int procevent_init(void) /* {{{ */
1189 {
1190   ring.head = 0;
1191   ring.tail = 0;
1192   ring.maxLen = buffer_length;
1193   ring.buffer = (cdtime_t **)calloc(buffer_length, sizeof(cdtime_t *));
1194
1195   for (int i = 0; i < buffer_length; i++) {
1196     ring.buffer[i] = (cdtime_t *)calloc(PROCEVENT_FIELDS, sizeof(cdtime_t));
1197   }
1198
1199   int status = process_map_refresh();
1200
1201   if (status == -1) {
1202     ERROR("procevent plugin: Initial process mapping failed.");
1203     return -1;
1204   }
1205
1206   if (ignorelist == NULL) {
1207     NOTICE("procevent plugin: No processes have been configured.");
1208     return -1;
1209   }
1210
1211   return start_threads();
1212 } /* }}} int procevent_init */
1213
1214 static int procevent_config(const char *key, const char *value) /* {{{ */
1215 {
1216   if (ignorelist == NULL)
1217     ignorelist = ignorelist_create(/* invert = */ 1);
1218
1219   if (ignorelist == NULL) {
1220     return -1;
1221   }
1222
1223   if (strcasecmp(key, "BufferLength") == 0) {
1224     buffer_length = atoi(value);
1225   } else if (strcasecmp(key, "Process") == 0) {
1226     ignorelist_add(ignorelist, value);
1227   } else if (strcasecmp(key, "ProcessRegex") == 0) {
1228 #if HAVE_REGEX_H
1229     int status = ignorelist_add(ignorelist, value);
1230
1231     if (status != 0) {
1232       ERROR("procevent plugin: invalid regular expression: %s", value);
1233       return 1;
1234     }
1235 #else
1236     WARNING("procevent plugin: The plugin has been compiled without support "
1237             "for the \"ProcessRegex\" option.");
1238 #endif
1239   } else {
1240     return -1;
1241   }
1242
1243   return 0;
1244 } /* }}} int procevent_config */
1245
1246 static int procevent_read(void) /* {{{ */
1247 {
1248   pthread_mutex_lock(&procevent_thread_lock);
1249
1250   if (procevent_netlink_thread_error != 0) {
1251
1252     pthread_mutex_unlock(&procevent_thread_lock);
1253
1254     ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
1255
1256     stop_netlink_thread(0);
1257
1258     start_netlink_thread();
1259
1260     return -1;
1261   } /* if (procevent_netlink_thread_error != 0) */
1262
1263   pthread_mutex_unlock(&procevent_thread_lock);
1264
1265   return 0;
1266 } /* }}} int procevent_read */
1267
1268 static int procevent_shutdown(void) /* {{{ */
1269 {
1270   DEBUG("procevent plugin: Shutting down threads.");
1271
1272   int status = stop_threads();
1273
1274   for (int i = 0; i < buffer_length; i++) {
1275     free(ring.buffer[i]);
1276   }
1277
1278   free(ring.buffer);
1279
1280   processlist_t *pl = processlist_head;
1281   while (pl != NULL) {
1282     processlist_t *pl_next;
1283
1284     pl_next = pl->next;
1285
1286     sfree(pl->process);
1287     sfree(pl);
1288
1289     pl = pl_next;
1290   }
1291
1292   ignorelist_free(ignorelist);
1293
1294   return status;
1295 } /* }}} int procevent_shutdown */
1296
1297 void module_register(void) {
1298   plugin_register_config("procevent", procevent_config, config_keys,
1299                          config_keys_num);
1300   plugin_register_init("procevent", procevent_init);
1301   plugin_register_read("procevent", procevent_read);
1302   plugin_register_shutdown("procevent", procevent_shutdown);
1303 } /* void module_register */