payload logic clean-up + leak fixes
[collectd.git] / src / procevent.c
1 /**
2  * collectd - src/procevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32
33 #include <errno.h>
34 #include <pthread.h>
35 #include <regex.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <dirent.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
46 #include <signal.h>
47 #include <stdbool.h>
48 #include <stdlib.h>
49 #include <string.h>
50
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
55 #endif
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
58 #endif
59
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
62 #define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
63 #define BUFSIZE 512
64 #define PROCDIR "/proc"
65 #define PROCEVENT_REGEX_MATCHES 1
66
67 #define PROCEVENT_DOMAIN_FIELD "domain"
68 #define PROCEVENT_DOMAIN_VALUE "fault"
69 #define PROCEVENT_EVENT_ID_FIELD "eventId"
70 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
71 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
72 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
73 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
74 #define PROCEVENT_PRIORITY_FIELD "priority"
75 #define PROCEVENT_PRIORITY_VALUE "high"
76 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
77 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
78 #define PROCEVENT_SEQUENCE_FIELD "sequence"
79 #define PROCEVENT_SEQUENCE_VALUE "0"
80 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
81 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
82 #define PROCEVENT_VERSION_FIELD "version"
83 #define PROCEVENT_VERSION_VALUE "1.0"
84
85 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
86 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
87 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
88 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
89 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
90 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
91 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
92 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
93 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
94 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
95 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
96 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
97 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
98 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
99 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
100 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
101
102 /*
103  * Private data types
104  */
105
106 typedef struct {
107   int head;
108   int tail;
109   int maxLen;
110   long long unsigned int **buffer;
111 } circbuf_t;
112
113 struct processlist_s {
114   char *process;
115   char *process_regex;
116
117   regex_t process_regex_obj;
118
119   uint32_t is_regex;
120   uint32_t pid;
121
122   struct processlist_s *next;
123 };
124 typedef struct processlist_s processlist_t;
125
126 /*
127  * Private variables
128  */
129
130 static int procevent_thread_loop = 0;
131 static int procevent_thread_error = 0;
132 static pthread_t procevent_thread_id;
133 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
134 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
135 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
136 static int nl_sock = -1;
137 static int buffer_length;
138 static circbuf_t ring;
139 static processlist_t *processlist_head = NULL;
140 static int event_id = 0;
141
142 static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
143 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
144
145 /*
146  * Private functions
147  */
148
149 static int gen_message_payload(int state, int pid, char *process,
150                                long long unsigned int timestamp, char **buf) {
151   const unsigned char *buf2;
152   yajl_gen g;
153   char json_str[DATA_MAX_NAME_LEN];
154
155 #if !defined(HAVE_YAJL_V2)
156   yajl_gen_config conf = {};
157
158   conf.beautify = 0;
159 #endif
160
161 #if HAVE_YAJL_V2
162   size_t len;
163   g = yajl_gen_alloc(NULL);
164   yajl_gen_config(g, yajl_gen_beautify, 0);
165 #else
166   unsigned int len;
167   g = yajl_gen_alloc(&conf, NULL);
168 #endif
169
170   yajl_gen_clear(g);
171
172   // *** BEGIN common event header ***
173
174   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
175     goto err;
176
177   // domain
178   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
179                       strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
180     goto err;
181
182   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
183                       strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
184     goto err;
185
186   // eventId
187   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
188                       strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
189     goto err;
190
191   event_id = event_id + 1;
192   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
193   memset(json_str, '\0', DATA_MAX_NAME_LEN);
194   snprintf(json_str, event_id_len, "%d", event_id);
195
196   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
197     goto err;
198   }
199
200   // eventName
201   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
202                       strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
203     goto err;
204
205   int event_name_len = 0;
206   event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
207   event_name_len = event_name_len + strlen(process);      // process name
208   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
209   event_name_len = event_name_len +
210                    13; // "process", 3 spaces, 2 parentheses and null-terminator
211   memset(json_str, '\0', DATA_MAX_NAME_LEN);
212   snprintf(json_str, event_name_len, "process %s (%d) %s", process, pid,
213            (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
214                        : PROCEVENT_EVENT_NAME_UP_VALUE));
215
216   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
217       yajl_gen_status_ok) {
218     goto err;
219   }
220
221   // lastEpochMicrosec
222   if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
223                       strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
224       yajl_gen_status_ok)
225     goto err;
226
227   int last_epoch_microsec_len =
228       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
229   memset(json_str, '\0', DATA_MAX_NAME_LEN);
230   snprintf(json_str, last_epoch_microsec_len, "%llu",
231            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
232
233   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
234     goto err;
235   }
236
237   // priority
238   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
239                       strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
240     goto err;
241
242   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
243                       strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
244     goto err;
245
246   // reportingEntityName
247   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
248                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
249       yajl_gen_status_ok)
250     goto err;
251
252   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
253                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
254       yajl_gen_status_ok)
255     goto err;
256
257   // sequence
258   if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
259                       strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
260     goto err;
261
262   if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
263                       strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
264     goto err;
265
266   // sourceName
267   if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
268                       strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
269       yajl_gen_status_ok)
270     goto err;
271
272   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
273       yajl_gen_status_ok)
274     goto err;
275
276   // startEpochMicrosec
277   if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
278                       strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
279       yajl_gen_status_ok)
280     goto err;
281
282   int start_epoch_microsec_len =
283       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
284   memset(json_str, '\0', DATA_MAX_NAME_LEN);
285   snprintf(json_str, start_epoch_microsec_len, "%llu",
286            (long long unsigned int)timestamp);
287
288   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
289     goto err;
290   }
291
292   // version
293   if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
294                       strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
295     goto err;
296
297   if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
298                       strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
299     goto err;
300
301   // *** END common event header ***
302
303   // *** BEGIN fault fields ***
304
305   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
306                       strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
307       yajl_gen_status_ok)
308     goto err;
309
310   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
311     goto err;
312
313   // alarmCondition
314   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
315                       strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
316       yajl_gen_status_ok)
317     goto err;
318
319   int alarm_condition_len = 0;
320   alarm_condition_len =
321       alarm_condition_len + (sizeof(char) * sizeof(int) * 4);  // pid
322   alarm_condition_len = alarm_condition_len + strlen(process); // process name
323   alarm_condition_len =
324       alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
325                                 // parentheses and null-terminator
326   memset(json_str, '\0', DATA_MAX_NAME_LEN);
327   snprintf(json_str, alarm_condition_len, "process %s (%d) state change",
328            process, pid);
329
330   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
331       yajl_gen_status_ok) {
332     goto err;
333   }
334
335   // alarmInterfaceA
336   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
337                       strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
338       yajl_gen_status_ok)
339     goto err;
340
341   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
342       yajl_gen_status_ok)
343     goto err;
344
345   // eventSeverity
346   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
347                       strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
348       yajl_gen_status_ok)
349     goto err;
350
351   if (yajl_gen_string(
352           g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
353                                    : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
354           strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
355                              : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
356       yajl_gen_status_ok)
357     goto err;
358
359   // eventSourceType
360   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
361                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
362       yajl_gen_status_ok)
363     goto err;
364
365   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
366                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
367       yajl_gen_status_ok)
368     goto err;
369
370   // faultFieldsVersion
371   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
372                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
373       yajl_gen_status_ok)
374     goto err;
375
376   if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
377                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
378       yajl_gen_status_ok)
379     goto err;
380
381   // specificProblem
382   if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
383                       strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
384       yajl_gen_status_ok)
385     goto err;
386
387   int specific_problem_len = 0;
388   specific_problem_len =
389       specific_problem_len + (sizeof(char) * sizeof(int) * 4);   // pid
390   specific_problem_len = specific_problem_len + strlen(process); // process name
391   specific_problem_len =
392       specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
393   specific_problem_len =
394       specific_problem_len +
395       13; // "process", 3 spaces, 2 parentheses and null-terminator
396   memset(json_str, '\0', DATA_MAX_NAME_LEN);
397   snprintf(json_str, specific_problem_len, "process %s (%d) %s", process, pid,
398            (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
399                        : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
400
401   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
402       yajl_gen_status_ok) {
403     goto err;
404   }
405
406   // vfStatus
407   if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
408                       strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
409     goto err;
410
411   if (yajl_gen_string(
412           g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
413                                    : PROCEVENT_VF_STATUS_NORMAL_VALUE),
414           strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
415                              : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
416       yajl_gen_status_ok)
417     goto err;
418
419   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
420     goto err;
421
422   // *** END fault fields ***
423
424   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
425     goto err;
426
427   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
428     goto err;
429
430   *buf = malloc(strlen((char *)buf2) + 1);
431
432   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
433
434   yajl_gen_free(g);
435
436   return 0;
437
438 err:
439   yajl_gen_free(g);
440   ERROR("procevent plugin: gen_message_payload failed to generate JSON");
441   return -1;
442 }
443
444 // Does /proc/<pid>/comm contain a process name we are interested in?
445 static processlist_t *process_check(int pid) {
446   int len, is_match, status, retval;
447   char file[BUFSIZE];
448   FILE *fh;
449   char buffer[BUFSIZE];
450   regmatch_t matches[PROCEVENT_REGEX_MATCHES];
451
452   len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid);
453
454   if ((len < 0) || (len >= BUFSIZE)) {
455     WARNING("procevent process_check: process name too large");
456     return NULL;
457   }
458
459   if (NULL == (fh = fopen(file, "r"))) {
460     // No /proc/<pid>/comm for this pid, just ignore
461     DEBUG("procevent plugin: no comm file available for pid %d", pid);
462     return NULL;
463   }
464
465   retval = fscanf(fh, "%[^\n]", buffer);
466
467   if (retval < 0) {
468     WARNING("procevent process_check: unable to read comm file for pid %d",
469             pid);
470     return NULL;
471   }
472
473   //
474   // Go through the processlist linked list and look for the process name
475   // in /proc/<pid>/comm.  If found:
476   // 1. If pl->pid is -1, then set pl->pid to <pid>
477   // 2. If pl->pid is not -1, then another <process name> process was already
478   //    found.  If <pid> == pl->pid, this is an old match, so do nothing.
479   //    If the <pid> is different, however,  make a new processlist_t and
480   //    associate <pid> with it (with the same process name as the existing).
481   //
482
483   pthread_mutex_lock(&procevent_list_lock);
484
485   processlist_t *pl;
486   processlist_t *match = NULL;
487
488   for (pl = processlist_head; pl != NULL; pl = pl->next) {
489     if (pl->is_regex != 0) {
490       is_match = (regexec(&pl->process_regex_obj, buffer,
491                           PROCEVENT_REGEX_MATCHES, matches, 0) == 0
492                       ? 1
493                       : 0);
494     } else {
495       is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
496     }
497
498     if (is_match == 1) {
499       DEBUG("procevent plugin: process %d name match (pattern: %s) for %s", pid,
500             (pl->is_regex == 0 ? pl->process : pl->process_regex), buffer);
501
502       if (pl->is_regex == 1) {
503         // If this is a regex name, copy the actual process name into the object
504         // for cleaner log reporting
505
506         if (pl->process != NULL)
507           sfree(pl->process);
508         pl->process = strdup(buffer);
509         if (pl->process == NULL) {
510           char errbuf[1024];
511           ERROR("procevent plugin: strdup failed during process_check: %s",
512                 sstrerror(errno, errbuf, sizeof(errbuf)));
513           pthread_mutex_unlock(&procevent_list_lock);
514           return NULL;
515         }
516       }
517
518       if (pl->pid == pid) {
519         // this is a match, and we've already stored the exact pid/name combo
520         match = pl;
521         break;
522       } else if (pl->pid == -1) {
523         // this is a match, and we've found a candidate processlist_t to store
524         // this new pid/name combo
525         pl->pid = pid;
526         match = pl;
527         break;
528       } else if (pl->pid != -1) {
529         // this is a match, but another instance of this process has already
530         // claimed this pid/name combo,
531         // so keep looking
532         match = pl;
533         continue;
534       }
535     }
536   }
537
538   if (match != NULL && match->pid != -1 && match->pid != pid) {
539     // if there was a match but the associated processlist_t object already
540     // contained a pid/name combo,
541     // then make a new one and add it to the linked list
542
543     DEBUG(
544         "procevent plugin: allocating new processlist_t object for PID %d (%s)",
545         pid, match->process);
546
547     processlist_t *pl2;
548     char *process;
549     char *process_regex;
550
551     pl2 = malloc(sizeof(*pl2));
552     if (pl2 == NULL) {
553       char errbuf[1024];
554       ERROR("procevent plugin: malloc failed during process_check: %s",
555             sstrerror(errno, errbuf, sizeof(errbuf)));
556       pthread_mutex_unlock(&procevent_list_lock);
557       return NULL;
558     }
559
560     process = strdup(match->process);
561     if (process == NULL) {
562       char errbuf[1024];
563       sfree(pl2);
564       ERROR("procevent plugin: strdup failed during process_check: %s",
565             sstrerror(errno, errbuf, sizeof(errbuf)));
566       pthread_mutex_unlock(&procevent_list_lock);
567       return NULL;
568     }
569
570     if (match->is_regex == 1) {
571       pl2->is_regex = 1;
572       status =
573           regcomp(&pl2->process_regex_obj, match->process_regex, REG_EXTENDED);
574
575       if (status != 0) {
576         sfree(pl2);
577         sfree(process);
578         ERROR("procevent plugin: invalid regular expression: %s",
579               match->process_regex);
580         return NULL;
581       }
582
583       process_regex = strdup(match->process_regex);
584       if (process_regex == NULL) {
585         char errbuf[1024];
586         sfree(pl2);
587         sfree(process);
588         ERROR("procevent plugin: strdup failed during process_check: %s",
589               sstrerror(errno, errbuf, sizeof(errbuf)));
590         return NULL;
591       }
592
593       pl2->process_regex = process_regex;
594     }
595
596     pl2->process = process;
597     pl2->pid = pid;
598     pl2->next = processlist_head;
599     processlist_head = pl2;
600
601     match = pl2;
602   }
603
604   pthread_mutex_unlock(&procevent_list_lock);
605
606   if (fh != NULL) {
607     fclose(fh);
608     fh = NULL;
609   }
610
611   return match;
612 }
613
614 // Does our map have this PID or name?
615 static processlist_t *process_map_check(int pid, char *process) {
616   processlist_t *pl;
617
618   pthread_mutex_lock(&procevent_list_lock);
619
620   for (pl = processlist_head; pl != NULL; pl = pl->next) {
621     int match_pid = 0;
622     int match_process = 0;
623     int match = 0;
624
625     if (pid > 0) {
626       if (pl->pid == pid)
627         match_pid = 1;
628     }
629
630     if (process != NULL) {
631       if (strcmp(pl->process, process) == 0)
632         match_process = 1;
633     }
634
635     if (pid > 0 && process == NULL && match_pid == 1)
636       match = 1;
637     else if (pid < 0 && process != NULL && match_process == 1)
638       match = 1;
639     else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
640       match = 1;
641
642     if (match == 1) {
643       pthread_mutex_unlock(&procevent_list_lock);
644       return pl;
645     }
646   }
647
648   pthread_mutex_unlock(&procevent_list_lock);
649
650   return NULL;
651 }
652
653 static int process_map_refresh(void) {
654   DIR *proc;
655
656   errno = 0;
657   proc = opendir(PROCDIR);
658   if (proc == NULL) {
659     char errbuf[1024];
660     ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
661           sstrerror(errno, errbuf, sizeof(errbuf)));
662     return -1;
663   }
664
665   while (42) {
666     struct dirent *dent;
667     int len;
668     char file[BUFSIZE];
669
670     struct stat statbuf;
671
672     int status;
673
674     errno = 0;
675     dent = readdir(proc);
676     if (dent == NULL) {
677       char errbuf[4096];
678
679       if (errno == 0) /* end of directory */
680         break;
681
682       ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
683             sstrerror(errno, errbuf, sizeof(errbuf)));
684       closedir(proc);
685       return -1;
686     }
687
688     if (dent->d_name[0] == '.')
689       continue;
690
691     len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
692     if ((len < 0) || (len >= BUFSIZE))
693       continue;
694
695     status = stat(file, &statbuf);
696     if (status != 0) {
697       char errbuf[4096];
698       WARNING("procevent plugin: stat (%s) failed: %s", file,
699               sstrerror(errno, errbuf, sizeof(errbuf)));
700       continue;
701     }
702
703     if (!S_ISDIR(statbuf.st_mode))
704       continue;
705
706     len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
707     if ((len < 0) || (len >= BUFSIZE))
708       continue;
709
710     int not_number = 0;
711
712     for (int i = 0; i < strlen(dent->d_name); i++) {
713       if (!isdigit(dent->d_name[i])) {
714         not_number = 1;
715         break;
716       }
717     }
718
719     if (not_number != 0)
720       continue;
721
722     // Check if we need to store this pid/name combo in our processlist_t linked
723     // list
724     int this_pid = atoi(dent->d_name);
725     processlist_t *pl = process_check(this_pid);
726
727     if (pl != NULL)
728       DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
729             this_pid, pl->process);
730   }
731
732   closedir(proc);
733
734   return 0;
735 }
736
737 static int nl_connect() {
738   int rc;
739   struct sockaddr_nl sa_nl;
740
741   nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
742   if (nl_sock == -1) {
743     ERROR("procevent plugin: socket open failed.");
744     return -1;
745   }
746
747   sa_nl.nl_family = AF_NETLINK;
748   sa_nl.nl_groups = CN_IDX_PROC;
749   sa_nl.nl_pid = getpid();
750
751   rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
752   if (rc == -1) {
753     ERROR("procevent plugin: socket bind failed.");
754     close(nl_sock);
755     return -1;
756   }
757
758   return 0;
759 }
760
761 static int set_proc_ev_listen(bool enable) {
762   int rc;
763   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
764     struct nlmsghdr nl_hdr;
765     struct __attribute__((__packed__)) {
766       struct cn_msg cn_msg;
767       enum proc_cn_mcast_op cn_mcast;
768     };
769   } nlcn_msg;
770
771   memset(&nlcn_msg, 0, sizeof(nlcn_msg));
772   nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
773   nlcn_msg.nl_hdr.nlmsg_pid = getpid();
774   nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
775
776   nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
777   nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
778   nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
779
780   nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
781
782   rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
783   if (rc == -1) {
784     ERROR("procevent plugin: subscribing to netlink process events failed.");
785     return -1;
786   }
787
788   return 0;
789 }
790
791 static int read_event() {
792   int status;
793   int ret = 0;
794   int proc_id = -1;
795   int proc_status = -1;
796   int proc_extra = -1;
797   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
798     struct nlmsghdr nl_hdr;
799     struct __attribute__((__packed__)) {
800       struct cn_msg cn_msg;
801       struct proc_event proc_ev;
802     };
803   } nlcn_msg;
804
805   if (nl_sock == -1)
806     return ret;
807
808   status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
809
810   if (status == 0) {
811     return 0;
812   } else if (status == -1) {
813     if (errno != EINTR) {
814       ERROR("procevent plugin: socket receive error: %d", errno);
815       return -1;
816     }
817   }
818
819   switch (nlcn_msg.proc_ev.what) {
820   case PROC_EVENT_NONE:
821   case PROC_EVENT_FORK:
822   case PROC_EVENT_UID:
823   case PROC_EVENT_GID:
824     // Not of interest in current version
825     break;
826   case PROC_EVENT_EXEC:
827     proc_status = PROCEVENT_STARTED;
828     proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
829     break;
830   case PROC_EVENT_EXIT:
831     proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
832     proc_status = PROCEVENT_EXITED;
833     proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
834     break;
835   default:
836     break;
837   }
838
839   // If we're interested in this process status event, place the event
840   // in the ring buffer for consumption by the main polling thread.
841
842   if (proc_status != -1) {
843     pthread_mutex_unlock(&procevent_lock);
844
845     int next = ring.head + 1;
846     if (next >= ring.maxLen)
847       next = 0;
848
849     if (next == ring.tail) {
850       WARNING("procevent plugin: ring buffer full");
851     } else {
852       DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
853             (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
854             (long long unsigned int)CDTIME_T_TO_US(cdtime()));
855
856       if (proc_status == PROCEVENT_EXITED) {
857         ring.buffer[ring.head][0] = proc_id;
858         ring.buffer[ring.head][1] = proc_status;
859         ring.buffer[ring.head][2] = proc_extra;
860         ring.buffer[ring.head][3] =
861             (long long unsigned int)CDTIME_T_TO_US(cdtime());
862       } else {
863         ring.buffer[ring.head][0] = proc_id;
864         ring.buffer[ring.head][1] = proc_status;
865         ring.buffer[ring.head][2] = 0;
866         ring.buffer[ring.head][3] =
867             (long long unsigned int)CDTIME_T_TO_US(cdtime());
868       }
869
870       ring.head = next;
871     }
872
873     pthread_mutex_unlock(&procevent_lock);
874   }
875
876   return ret;
877 }
878
879 static void *procevent_thread(void *arg) /* {{{ */
880 {
881   pthread_mutex_lock(&procevent_lock);
882
883   while (procevent_thread_loop > 0) {
884     int status;
885
886     pthread_mutex_unlock(&procevent_lock);
887
888     usleep(1000);
889
890     status = read_event();
891
892     pthread_mutex_lock(&procevent_lock);
893
894     if (status < 0) {
895       procevent_thread_error = 1;
896       break;
897     }
898
899     if (procevent_thread_loop <= 0)
900       break;
901   } /* while (procevent_thread_loop > 0) */
902
903   pthread_mutex_unlock(&procevent_lock);
904
905   return ((void *)0);
906 } /* }}} void *procevent_thread */
907
908 static int start_thread(void) /* {{{ */
909 {
910   int status;
911
912   pthread_mutex_lock(&procevent_lock);
913
914   if (procevent_thread_loop != 0) {
915     pthread_mutex_unlock(&procevent_lock);
916     return (0);
917   }
918
919   if (nl_sock == -1) {
920     status = nl_connect();
921
922     if (status != 0)
923       return status;
924
925     status = set_proc_ev_listen(true);
926     if (status == -1)
927       return status;
928   }
929
930   DEBUG("procevent plugin: socket created and bound");
931
932   procevent_thread_loop = 1;
933   procevent_thread_error = 0;
934
935   status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
936                                 procevent_thread,
937                                 /* arg = */ (void *)0, "procevent");
938   if (status != 0) {
939     procevent_thread_loop = 0;
940     ERROR("procevent plugin: Starting thread failed.");
941     pthread_mutex_unlock(&procevent_lock);
942     return (-1);
943   }
944
945   pthread_mutex_unlock(&procevent_lock);
946   return (0);
947 } /* }}} int start_thread */
948
949 static int stop_thread(int shutdown) /* {{{ */
950 {
951   int status;
952
953   if (nl_sock != -1) {
954     status = close(nl_sock);
955     if (status != 0) {
956       ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
957             status, strerror(errno));
958       return (-1);
959     } else
960       nl_sock = -1;
961   }
962
963   pthread_mutex_lock(&procevent_lock);
964
965   if (procevent_thread_loop == 0) {
966     pthread_mutex_unlock(&procevent_lock);
967     return (-1);
968   }
969
970   procevent_thread_loop = 0;
971   pthread_cond_broadcast(&procevent_cond);
972   pthread_mutex_unlock(&procevent_lock);
973
974   if (shutdown == 1) {
975     // Calling pthread_cancel here in
976     // the case of a shutdown just assures that the thread is
977     // gone and that the process has been fully terminated.
978
979     DEBUG("procevent plugin: Canceling thread for process shutdown");
980
981     status = pthread_cancel(procevent_thread_id);
982
983     if (status != 0) {
984       ERROR("procevent plugin: Unable to cancel thread: %d", status);
985       status = -1;
986     }
987   } else {
988     status = pthread_join(procevent_thread_id, /* return = */ NULL);
989     if (status != 0) {
990       ERROR("procevent plugin: Stopping thread failed.");
991       status = -1;
992     }
993   }
994
995   pthread_mutex_lock(&procevent_lock);
996   memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
997   procevent_thread_error = 0;
998   pthread_mutex_unlock(&procevent_lock);
999
1000   DEBUG("procevent plugin: Finished requesting stop of thread");
1001
1002   return (status);
1003 } /* }}} int stop_thread */
1004
1005 static int procevent_init(void) /* {{{ */
1006 {
1007   int status;
1008
1009   if (processlist_head == NULL) {
1010     NOTICE("procevent plugin: No processes have been configured.");
1011     return (-1);
1012   }
1013
1014   ring.head = 0;
1015   ring.tail = 0;
1016   ring.maxLen = buffer_length;
1017   ring.buffer = (long long unsigned int **)malloc(
1018       buffer_length * sizeof(long long unsigned int *));
1019
1020   for (int i = 0; i < buffer_length; i++) {
1021     ring.buffer[i] = (long long unsigned int *)malloc(
1022         PROCEVENT_FIELDS * sizeof(long long unsigned int));
1023   }
1024
1025   status = process_map_refresh();
1026
1027   if (status == -1) {
1028     ERROR("procevent plugin: Initial process mapping failed.");
1029     return (-1);
1030   }
1031
1032   return (start_thread());
1033 } /* }}} int procevent_init */
1034
1035 static int procevent_config(const char *key, const char *value) /* {{{ */
1036 {
1037   int status;
1038
1039   if (strcasecmp(key, "BufferLength") == 0) {
1040     buffer_length = atoi(value);
1041   } else if (strcasecmp(key, "Process") == 0 ||
1042              strcasecmp(key, "RegexProcess") == 0) {
1043
1044     processlist_t *pl;
1045     char *process;
1046     char *process_regex;
1047
1048     pl = malloc(sizeof(*pl));
1049     if (pl == NULL) {
1050       char errbuf[1024];
1051       ERROR("procevent plugin: malloc failed during procevent_config: %s",
1052             sstrerror(errno, errbuf, sizeof(errbuf)));
1053       return (1);
1054     }
1055
1056     process = strdup(value);
1057     if (process == NULL) {
1058       char errbuf[1024];
1059       sfree(pl);
1060       ERROR("procevent plugin: strdup failed during procevent_config: %s",
1061             sstrerror(errno, errbuf, sizeof(errbuf)));
1062       return (1);
1063     }
1064
1065     if (strcasecmp(key, "RegexProcess") == 0) {
1066       pl->is_regex = 1;
1067       status = regcomp(&pl->process_regex_obj, value, REG_EXTENDED);
1068
1069       if (status != 0) {
1070         sfree(pl);
1071         sfree(process);
1072         ERROR("procevent plugin: invalid regular expression: %s", value);
1073         return (1);
1074       }
1075
1076       process_regex = strdup(value);
1077       if (process_regex == NULL) {
1078         char errbuf[1024];
1079         sfree(pl);
1080         sfree(process);
1081         ERROR("procevent plugin: strdup failed during procevent_config: %s",
1082               sstrerror(errno, errbuf, sizeof(errbuf)));
1083         return (1);
1084       }
1085
1086       pl->process_regex = process_regex;
1087     } else {
1088       pl->is_regex = 0;
1089     }
1090
1091     pl->process = process;
1092     pl->pid = -1;
1093     pl->next = processlist_head;
1094     processlist_head = pl;
1095   } else {
1096     return (-1);
1097   }
1098
1099   return (0);
1100 } /* }}} int procevent_config */
1101
1102 static void procevent_dispatch_notification(int pid, const char *type, /* {{{ */
1103                                             gauge_t value, char *process,
1104                                             long long unsigned int timestamp) {
1105   char *buf = NULL;
1106   notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "",
1107                       NULL};
1108
1109   if (value == 1)
1110     n.severity = NOTIF_OKAY;
1111
1112   char hostname[1024];
1113   gethostname(hostname, sizeof(hostname));
1114
1115   sstrncpy(n.host, hostname, sizeof(n.host));
1116   sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
1117   sstrncpy(n.type, "gauge", sizeof(n.type));
1118   sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
1119
1120   gen_message_payload(value, pid, process, timestamp, &buf);
1121
1122   notification_meta_t *m = calloc(1, sizeof(*m));
1123
1124   if (m == NULL) {
1125     char errbuf[1024];
1126     sfree(buf);
1127     ERROR("procevent plugin: unable to allocate metadata: %s",
1128           sstrerror(errno, errbuf, sizeof(errbuf)));
1129     return;
1130   }
1131
1132   sstrncpy(m->name, "ves", sizeof(m->name));
1133   m->nm_value.nm_string = sstrdup(buf);
1134   m->type = NM_TYPE_STRING;
1135   n.meta = m;
1136
1137   DEBUG("procevent plugin: notification message: %s",
1138         n.meta->nm_value.nm_string);
1139
1140   DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value,
1141         pid, process);
1142
1143   plugin_dispatch_notification(&n);
1144   plugin_notification_meta_free(n.meta);
1145
1146   // malloc'd in gen_message_payload
1147   if (buf != NULL)
1148     sfree(buf);
1149 }
1150
1151 static int procevent_read(void) /* {{{ */
1152 {
1153   if (procevent_thread_error != 0) {
1154     ERROR(
1155         "procevent plugin: The interface thread had a problem. Restarting it.");
1156
1157     stop_thread(0);
1158
1159     start_thread();
1160
1161     return (-1);
1162   } /* if (procevent_thread_error != 0) */
1163
1164   pthread_mutex_lock(&procevent_lock);
1165
1166   while (ring.head != ring.tail) {
1167     int next = ring.tail + 1;
1168
1169     if (next >= ring.maxLen)
1170       next = 0;
1171
1172     if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
1173       processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
1174
1175       if (pl != NULL) {
1176         // This process is of interest to us, so publish its EXITED status
1177         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1178                                         ring.buffer[ring.tail][1], pl->process,
1179                                         ring.buffer[ring.tail][3]);
1180         DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process "
1181               "list",
1182               pl->pid, pl->process);
1183         pl->pid = -1;
1184       }
1185     } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
1186       // a new process has started, so check if we should monitor it
1187       processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
1188
1189       if (pl != NULL) {
1190         // This process is of interest to us, so publish its STARTED status
1191         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1192                                         ring.buffer[ring.tail][1], pl->process,
1193                                         ring.buffer[ring.tail][3]);
1194         DEBUG(
1195             "procevent plugin: PID %d (%s) STARTED, adding PID to process list",
1196             pl->pid, pl->process);
1197       }
1198     }
1199
1200     ring.tail = next;
1201   }
1202
1203   pthread_mutex_unlock(&procevent_lock);
1204
1205   return (0);
1206 } /* }}} int procevent_read */
1207
1208 static int procevent_shutdown(void) /* {{{ */
1209 {
1210   processlist_t *pl;
1211
1212   DEBUG("procevent plugin: Shutting down thread.");
1213
1214   if (stop_thread(1) < 0)
1215     return (-1);
1216
1217   for (int i = 0; i < buffer_length; i++) {
1218     free(ring.buffer[i]);
1219   }
1220
1221   free(ring.buffer);
1222
1223   pl = processlist_head;
1224   while (pl != NULL) {
1225     processlist_t *pl_next;
1226
1227     pl_next = pl->next;
1228
1229     if (pl->is_regex == 1) {
1230       sfree(pl->process_regex);
1231       regfree(&pl->process_regex_obj);
1232     }
1233
1234     sfree(pl->process);
1235     sfree(pl);
1236
1237     pl = pl_next;
1238   }
1239
1240   return (0);
1241 } /* }}} int procevent_shutdown */
1242
1243 void module_register(void) {
1244   plugin_register_config("procevent", procevent_config, config_keys,
1245                          config_keys_num);
1246   plugin_register_init("procevent", procevent_init);
1247   plugin_register_read("procevent", procevent_read);
1248   plugin_register_shutdown("procevent", procevent_shutdown);
1249 } /* void module_register */