00c55f96af42c465d3c1456bc42327ab52dece62
[collectd.git] / src / procevent.c
1 /**
2  * collectd - src/procevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32
33 #include <errno.h>
34 #include <pthread.h>
35 #include <regex.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <dirent.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
46 #include <signal.h>
47 #include <stdbool.h>
48 #include <stdlib.h>
49 #include <string.h>
50
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
55 #endif
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
58 #endif
59
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
62 #define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
63 #define BUFSIZE 512
64 #define PROCDIR "/proc"
65 #define PROCEVENT_REGEX_MATCHES 1
66
67 #define PROCEVENT_DOMAIN_FIELD "domain"
68 #define PROCEVENT_DOMAIN_VALUE "fault"
69 #define PROCEVENT_EVENT_ID_FIELD "eventId"
70 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
71 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
72 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
73 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
74 #define PROCEVENT_PRIORITY_FIELD "priority"
75 #define PROCEVENT_PRIORITY_VALUE "high"
76 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
77 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
78 #define PROCEVENT_SEQUENCE_FIELD "sequence"
79 #define PROCEVENT_SEQUENCE_VALUE "0"
80 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
81 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
82 #define PROCEVENT_VERSION_FIELD "version"
83 #define PROCEVENT_VERSION_VALUE "1.0"
84
85 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
86 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
87 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
88 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
89 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
90 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
91 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
92 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
93 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
94 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
95 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
96 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
97 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
98 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
99 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
100 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
101
102 /*
103  * Private data types
104  */
105
106 typedef struct {
107   int head;
108   int tail;
109   int maxLen;
110   long long unsigned int **buffer;
111 } circbuf_t;
112
113 struct processlist_s {
114   char *process;
115   char *process_regex;
116
117   regex_t process_regex_obj;
118
119   uint32_t is_regex;
120   uint32_t pid;
121
122   struct processlist_s *next;
123 };
124 typedef struct processlist_s processlist_t;
125
126 /*
127  * Private variables
128  */
129
130 static int procevent_thread_loop = 0;
131 static int procevent_thread_error = 0;
132 static pthread_t procevent_thread_id;
133 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
134 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
135 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
136 static int nl_sock = -1;
137 static int buffer_length;
138 static circbuf_t ring;
139 static processlist_t *processlist_head = NULL;
140 static int event_id = 0;
141
142 static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
143 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
144
145 /*
146  * Private functions
147  */
148
149 static int gen_message_payload(int state, int pid, char *process,
150                                long long unsigned int timestamp, char **buf) {
151   const unsigned char *buf2;
152   yajl_gen g;
153
154 #if !defined(HAVE_YAJL_V2)
155   yajl_gen_config conf = {};
156
157   conf.beautify = 0;
158 #endif
159
160 #if HAVE_YAJL_V2
161   size_t len;
162   g = yajl_gen_alloc(NULL);
163   yajl_gen_config(g, yajl_gen_beautify, 0);
164 #else
165   unsigned int len;
166   g = yajl_gen_alloc(&conf, NULL);
167 #endif
168
169   yajl_gen_clear(g);
170
171   // *** BEGIN common event header ***
172
173   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
174     goto err;
175
176   // domain
177   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
178                       strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
179     goto err;
180
181   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
182                       strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
183     goto err;
184
185   // eventId
186   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
187                       strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
188     goto err;
189
190   event_id = event_id + 1;
191   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
192   char *event_id_str = malloc(event_id_len);
193   snprintf(event_id_str, event_id_len, "%d", event_id);
194
195   if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
196       yajl_gen_status_ok) {
197     sfree(event_id_str);
198     goto err;
199   }
200
201   sfree(event_id_str);
202
203   // eventName
204   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
205                       strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
206     goto err;
207
208   int event_name_len = 0;
209   event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
210   event_name_len = event_name_len + strlen(process);      // process name
211   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
212   event_name_len = event_name_len +
213                    13; // "process", 3 spaces, 2 parentheses and null-terminator
214   char *event_name_str = malloc(event_name_len);
215   memset(event_name_str, '\0', event_name_len);
216   snprintf(event_name_str, event_name_len, "process %s (%d) %s", process, pid,
217            (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
218                        : PROCEVENT_EVENT_NAME_UP_VALUE));
219
220   if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
221       yajl_gen_status_ok) {
222     sfree(event_name_str);
223     goto err;
224   }
225
226   sfree(event_name_str);
227
228   // lastEpochMicrosec
229   if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
230                       strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
231       yajl_gen_status_ok)
232     goto err;
233
234   int last_epoch_microsec_len =
235       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
236   char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
237   snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
238            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
239
240   if (yajl_gen_number(g, last_epoch_microsec_str,
241                       strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
242     sfree(last_epoch_microsec_str);
243     goto err;
244   }
245
246   sfree(last_epoch_microsec_str);
247
248   // priority
249   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
250                       strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
251     goto err;
252
253   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
254                       strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
255     goto err;
256
257   // reportingEntityName
258   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
259                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
260       yajl_gen_status_ok)
261     goto err;
262
263   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
264                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
265       yajl_gen_status_ok)
266     goto err;
267
268   // sequence
269   if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
270                       strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
271     goto err;
272
273   if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
274                       strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
275     goto err;
276
277   // sourceName
278   if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
279                       strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
280       yajl_gen_status_ok)
281     goto err;
282
283   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
284       yajl_gen_status_ok)
285     goto err;
286
287   // startEpochMicrosec
288   if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
289                       strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
290       yajl_gen_status_ok)
291     goto err;
292
293   int start_epoch_microsec_len =
294       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
295   char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
296   snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
297            (long long unsigned int)timestamp);
298
299   if (yajl_gen_number(g, start_epoch_microsec_str,
300                       strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
301     sfree(start_epoch_microsec_str);
302     goto err;
303   }
304
305   sfree(start_epoch_microsec_str);
306
307   // version
308   if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
309                       strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
310     goto err;
311
312   if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
313                       strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
314     goto err;
315
316   // *** END common event header ***
317
318   // *** BEGIN fault fields ***
319
320   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
321                       strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
322       yajl_gen_status_ok)
323     goto err;
324
325   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
326     goto err;
327
328   // alarmCondition
329   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
330                       strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
331       yajl_gen_status_ok)
332     goto err;
333
334   int alarm_condition_len = 0;
335   alarm_condition_len =
336       alarm_condition_len + (sizeof(char) * sizeof(int) * 4);  // pid
337   alarm_condition_len = alarm_condition_len + strlen(process); // process name
338   alarm_condition_len =
339       alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
340                                 // parentheses and null-terminator
341   char *alarm_condition_str = malloc(alarm_condition_len);
342   memset(alarm_condition_str, '\0', alarm_condition_len);
343   snprintf(alarm_condition_str, alarm_condition_len,
344            "process %s (%d) state change", process, pid);
345
346   if (yajl_gen_string(g, (u_char *)alarm_condition_str,
347                       strlen(alarm_condition_str)) != yajl_gen_status_ok) {
348     sfree(alarm_condition_str);
349     goto err;
350   }
351
352   sfree(alarm_condition_str);
353
354   // alarmInterfaceA
355   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
356                       strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
357       yajl_gen_status_ok)
358     goto err;
359
360   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
361       yajl_gen_status_ok)
362     goto err;
363
364   // eventSeverity
365   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
366                       strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
367       yajl_gen_status_ok)
368     goto err;
369
370   if (yajl_gen_string(
371           g, (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
372                                    : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
373           strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
374                              : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
375       yajl_gen_status_ok)
376     goto err;
377
378   // eventSourceType
379   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
380                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
381       yajl_gen_status_ok)
382     goto err;
383
384   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
385                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
386       yajl_gen_status_ok)
387     goto err;
388
389   // faultFieldsVersion
390   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
391                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
392       yajl_gen_status_ok)
393     goto err;
394
395   if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
396                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
397       yajl_gen_status_ok)
398     goto err;
399
400   // specificProblem
401   if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
402                       strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
403       yajl_gen_status_ok)
404     goto err;
405
406   int specific_problem_len = 0;
407   specific_problem_len =
408       specific_problem_len + (sizeof(char) * sizeof(int) * 4);   // pid
409   specific_problem_len = specific_problem_len + strlen(process); // process name
410   specific_problem_len =
411       specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
412   specific_problem_len =
413       specific_problem_len +
414       13; // "process", 3 spaces, 2 parentheses and null-terminator
415   char *specific_problem_str = malloc(specific_problem_len);
416   memset(specific_problem_str, '\0', specific_problem_len);
417   snprintf(specific_problem_str, specific_problem_len, "process %s (%d) %s",
418            process, pid, (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
419                                      : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
420
421   if (yajl_gen_string(g, (u_char *)specific_problem_str,
422                       strlen(specific_problem_str)) != yajl_gen_status_ok) {
423     sfree(specific_problem_str);
424     goto err;
425   }
426
427   sfree(specific_problem_str);
428
429   // vfStatus
430   if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
431                       strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
432     goto err;
433
434   if (yajl_gen_string(
435           g, (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
436                                    : PROCEVENT_VF_STATUS_NORMAL_VALUE),
437           strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
438                              : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
439       yajl_gen_status_ok)
440     goto err;
441
442   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
443     goto err;
444
445   // *** END fault fields ***
446
447   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
448     goto err;
449
450   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
451     goto err;
452
453   *buf = malloc(strlen((char *)buf2) + 1);
454
455   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
456
457   yajl_gen_free(g);
458
459   return 0;
460
461 err:
462   yajl_gen_free(g);
463   ERROR("procevent plugin: gen_message_payload failed to generate JSON");
464   return -1;
465 }
466
467 // Does /proc/<pid>/comm contain a process name we are interested in?
468 static processlist_t *process_check(int pid) {
469   int len, is_match, status, retval;
470   char file[BUFSIZE];
471   FILE *fh;
472   char buffer[BUFSIZE];
473   regmatch_t matches[PROCEVENT_REGEX_MATCHES];
474
475   len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid);
476
477   if ((len < 0) || (len >= BUFSIZE)) {
478     WARNING("procevent process_check: process name too large");
479     return NULL;
480   }
481
482   if (NULL == (fh = fopen(file, "r"))) {
483     // No /proc/<pid>/comm for this pid, just ignore
484     DEBUG("procevent plugin: no comm file available for pid %d", pid);
485     return NULL;
486   }
487
488   retval = fscanf(fh, "%[^\n]", buffer);
489
490   if (retval < 0) {
491     WARNING("procevent process_check: unable to read comm file for pid %d",
492             pid);
493     return NULL;
494   }
495
496   //
497   // Go through the processlist linked list and look for the process name
498   // in /proc/<pid>/comm.  If found:
499   // 1. If pl->pid is -1, then set pl->pid to <pid>
500   // 2. If pl->pid is not -1, then another <process name> process was already
501   //    found.  If <pid> == pl->pid, this is an old match, so do nothing.
502   //    If the <pid> is different, however,  make a new processlist_t and
503   //    associate <pid> with it (with the same process name as the existing).
504   //
505
506   pthread_mutex_lock(&procevent_list_lock);
507
508   processlist_t *pl;
509   processlist_t *match = NULL;
510
511   for (pl = processlist_head; pl != NULL; pl = pl->next) {
512     if (pl->is_regex != 0) {
513       is_match = (regexec(&pl->process_regex_obj, buffer,
514                           PROCEVENT_REGEX_MATCHES, matches, 0) == 0
515                       ? 1
516                       : 0);
517     } else {
518       is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
519     }
520
521     if (is_match == 1) {
522       DEBUG("procevent plugin: process %d name match (pattern: %s) for %s", pid,
523             (pl->is_regex == 0 ? pl->process : pl->process_regex), buffer);
524
525       if (pl->is_regex == 1) {
526         // If this is a regex name, copy the actual process name into the object
527         // for cleaner log reporting
528
529         if (pl->process != NULL)
530           sfree(pl->process);
531         pl->process = strdup(buffer);
532         if (pl->process == NULL) {
533           char errbuf[1024];
534           ERROR("procevent plugin: strdup failed during process_check: %s",
535                 sstrerror(errno, errbuf, sizeof(errbuf)));
536           pthread_mutex_unlock(&procevent_list_lock);
537           return NULL;
538         }
539       }
540
541       if (pl->pid == pid) {
542         // this is a match, and we've already stored the exact pid/name combo
543         match = pl;
544         break;
545       } else if (pl->pid == -1) {
546         // this is a match, and we've found a candidate processlist_t to store
547         // this new pid/name combo
548         pl->pid = pid;
549         match = pl;
550         break;
551       } else if (pl->pid != -1) {
552         // this is a match, but another instance of this process has already
553         // claimed this pid/name combo,
554         // so keep looking
555         match = pl;
556         continue;
557       }
558     }
559   }
560
561   if (match != NULL && match->pid != -1 && match->pid != pid) {
562     // if there was a match but the associated processlist_t object already
563     // contained a pid/name combo,
564     // then make a new one and add it to the linked list
565
566     DEBUG(
567         "procevent plugin: allocating new processlist_t object for PID %d (%s)",
568         pid, match->process);
569
570     processlist_t *pl2;
571     char *process;
572     char *process_regex;
573
574     pl2 = malloc(sizeof(*pl2));
575     if (pl2 == NULL) {
576       char errbuf[1024];
577       ERROR("procevent plugin: malloc failed during process_check: %s",
578             sstrerror(errno, errbuf, sizeof(errbuf)));
579       pthread_mutex_unlock(&procevent_list_lock);
580       return NULL;
581     }
582
583     process = strdup(match->process);
584     if (process == NULL) {
585       char errbuf[1024];
586       sfree(pl2);
587       ERROR("procevent plugin: strdup failed during process_check: %s",
588             sstrerror(errno, errbuf, sizeof(errbuf)));
589       pthread_mutex_unlock(&procevent_list_lock);
590       return NULL;
591     }
592
593     if (match->is_regex == 1) {
594       pl2->is_regex = 1;
595       status =
596           regcomp(&pl2->process_regex_obj, match->process_regex, REG_EXTENDED);
597
598       if (status != 0) {
599         ERROR("procevent plugin: invalid regular expression: %s",
600               match->process_regex);
601         return NULL;
602       }
603
604       process_regex = strdup(match->process_regex);
605       if (process_regex == NULL) {
606         char errbuf[1024];
607         sfree(pl);
608         ERROR("procevent plugin: strdup failed during process_check: %s",
609               sstrerror(errno, errbuf, sizeof(errbuf)));
610         return NULL;
611       }
612
613       pl2->process_regex = process_regex;
614     }
615
616     pl2->process = process;
617     pl2->pid = pid;
618     pl2->next = processlist_head;
619     processlist_head = pl2;
620
621     match = pl2;
622   }
623
624   pthread_mutex_unlock(&procevent_list_lock);
625
626   if (fh != NULL) {
627     fclose(fh);
628     fh = NULL;
629   }
630
631   return match;
632 }
633
634 // Does our map have this PID or name?
635 static processlist_t *process_map_check(int pid, char *process) {
636   processlist_t *pl;
637
638   pthread_mutex_lock(&procevent_list_lock);
639
640   for (pl = processlist_head; pl != NULL; pl = pl->next) {
641     int match_pid = 0;
642     int match_process = 0;
643     int match = 0;
644
645     if (pid > 0) {
646       if (pl->pid == pid)
647         match_pid = 1;
648     }
649
650     if (process != NULL) {
651       if (strcmp(pl->process, process) == 0)
652         match_process = 1;
653     }
654
655     if (pid > 0 && process == NULL && match_pid == 1)
656       match = 1;
657     else if (pid < 0 && process != NULL && match_process == 1)
658       match = 1;
659     else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
660       match = 1;
661
662     if (match == 1) {
663       pthread_mutex_unlock(&procevent_list_lock);
664       return pl;
665     }
666   }
667
668   pthread_mutex_unlock(&procevent_list_lock);
669
670   return NULL;
671 }
672
673 static int process_map_refresh(void) {
674   DIR *proc;
675
676   errno = 0;
677   proc = opendir(PROCDIR);
678   if (proc == NULL) {
679     char errbuf[1024];
680     ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
681           sstrerror(errno, errbuf, sizeof(errbuf)));
682     return -1;
683   }
684
685   while (42) {
686     struct dirent *dent;
687     int len;
688     char file[BUFSIZE];
689
690     struct stat statbuf;
691
692     int status;
693
694     errno = 0;
695     dent = readdir(proc);
696     if (dent == NULL) {
697       char errbuf[4096];
698
699       if (errno == 0) /* end of directory */
700         break;
701
702       ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
703             sstrerror(errno, errbuf, sizeof(errbuf)));
704       closedir(proc);
705       return -1;
706     }
707
708     if (dent->d_name[0] == '.')
709       continue;
710
711     len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
712     if ((len < 0) || (len >= BUFSIZE))
713       continue;
714
715     status = stat(file, &statbuf);
716     if (status != 0) {
717       char errbuf[4096];
718       WARNING("procevent plugin: stat (%s) failed: %s", file,
719               sstrerror(errno, errbuf, sizeof(errbuf)));
720       continue;
721     }
722
723     if (!S_ISDIR(statbuf.st_mode))
724       continue;
725
726     len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
727     if ((len < 0) || (len >= BUFSIZE))
728       continue;
729
730     int not_number = 0;
731
732     for (int i = 0; i < strlen(dent->d_name); i++) {
733       if (!isdigit(dent->d_name[i])) {
734         not_number = 1;
735         break;
736       }
737     }
738
739     if (not_number != 0)
740       continue;
741
742     // Check if we need to store this pid/name combo in our processlist_t linked
743     // list
744     int this_pid = atoi(dent->d_name);
745     processlist_t *pl = process_check(this_pid);
746
747     if (pl != NULL)
748       DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
749             this_pid, pl->process);
750   }
751
752   closedir(proc);
753
754   return 0;
755 }
756
757 static int nl_connect() {
758   int rc;
759   struct sockaddr_nl sa_nl;
760
761   nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
762   if (nl_sock == -1) {
763     ERROR("procevent plugin: socket open failed.");
764     return -1;
765   }
766
767   sa_nl.nl_family = AF_NETLINK;
768   sa_nl.nl_groups = CN_IDX_PROC;
769   sa_nl.nl_pid = getpid();
770
771   rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
772   if (rc == -1) {
773     ERROR("procevent plugin: socket bind failed.");
774     close(nl_sock);
775     return -1;
776   }
777
778   return 0;
779 }
780
781 static int set_proc_ev_listen(bool enable) {
782   int rc;
783   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
784     struct nlmsghdr nl_hdr;
785     struct __attribute__((__packed__)) {
786       struct cn_msg cn_msg;
787       enum proc_cn_mcast_op cn_mcast;
788     };
789   } nlcn_msg;
790
791   memset(&nlcn_msg, 0, sizeof(nlcn_msg));
792   nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
793   nlcn_msg.nl_hdr.nlmsg_pid = getpid();
794   nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
795
796   nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
797   nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
798   nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
799
800   nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
801
802   rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
803   if (rc == -1) {
804     ERROR("procevent plugin: subscribing to netlink process events failed.");
805     return -1;
806   }
807
808   return 0;
809 }
810
811 static int read_event() {
812   int status;
813   int ret = 0;
814   int proc_id = -1;
815   int proc_status = -1;
816   int proc_extra = -1;
817   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
818     struct nlmsghdr nl_hdr;
819     struct __attribute__((__packed__)) {
820       struct cn_msg cn_msg;
821       struct proc_event proc_ev;
822     };
823   } nlcn_msg;
824
825   if (nl_sock == -1)
826     return ret;
827
828   status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
829
830   if (status == 0) {
831     return 0;
832   } else if (status == -1) {
833     if (errno != EINTR) {
834       ERROR("procevent plugin: socket receive error: %d", errno);
835       return -1;
836     }
837   }
838
839   switch (nlcn_msg.proc_ev.what) {
840   case PROC_EVENT_NONE:
841     // printf("set mcast listen ok\n");
842     break;
843   case PROC_EVENT_FORK:
844     // printf("fork: parent tid=%d pid=%d -> child tid=%d pid=%d\n",
845     //         nlcn_msg.proc_ev.event_data.fork.parent_pid,
846     //         nlcn_msg.proc_ev.event_data.fork.parent_tgid,
847     //         nlcn_msg.proc_ev.event_data.fork.child_pid,
848     //         nlcn_msg.proc_ev.event_data.fork.child_tgid);
849     // proc_status = PROCEVENT_STARTED;
850     // proc_id = nlcn_msg.proc_ev.event_data.fork.child_pid;
851     break;
852   case PROC_EVENT_EXEC:
853     // printf("exec: tid=%d pid=%d\n",
854     //         nlcn_msg.proc_ev.event_data.exec.process_pid,
855     //         nlcn_msg.proc_ev.event_data.exec.process_tgid);
856     proc_status = PROCEVENT_STARTED;
857     proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
858     break;
859   case PROC_EVENT_UID:
860     // printf("uid change: tid=%d pid=%d from %d to %d\n",
861     //         nlcn_msg.proc_ev.event_data.id.process_pid,
862     //         nlcn_msg.proc_ev.event_data.id.process_tgid,
863     //         nlcn_msg.proc_ev.event_data.id.r.ruid,
864     //         nlcn_msg.proc_ev.event_data.id.e.euid);
865     break;
866   case PROC_EVENT_GID:
867     // printf("gid change: tid=%d pid=%d from %d to %d\n",
868     //         nlcn_msg.proc_ev.event_data.id.process_pid,
869     //         nlcn_msg.proc_ev.event_data.id.process_tgid,
870     //         nlcn_msg.proc_ev.event_data.id.r.rgid,
871     //         nlcn_msg.proc_ev.event_data.id.e.egid);
872     break;
873   case PROC_EVENT_EXIT:
874     proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
875     proc_status = PROCEVENT_EXITED;
876     proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
877     break;
878   default:
879     break;
880   }
881
882   // If we're interested in this process status event, place the event
883   // in the ring buffer for consumption by the main polling thread.
884
885   if (proc_status != -1) {
886     pthread_mutex_unlock(&procevent_lock);
887
888     int next = ring.head + 1;
889     if (next >= ring.maxLen)
890       next = 0;
891
892     if (next == ring.tail) {
893       WARNING("procevent plugin: ring buffer full");
894     } else {
895       DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
896             (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
897             (long long unsigned int)CDTIME_T_TO_US(cdtime()));
898
899       if (proc_status == PROCEVENT_EXITED) {
900         ring.buffer[ring.head][0] = proc_id;
901         ring.buffer[ring.head][1] = proc_status;
902         ring.buffer[ring.head][2] = proc_extra;
903         ring.buffer[ring.head][3] =
904             (long long unsigned int)CDTIME_T_TO_US(cdtime());
905       } else {
906         ring.buffer[ring.head][0] = proc_id;
907         ring.buffer[ring.head][1] = proc_status;
908         ring.buffer[ring.head][2] = 0;
909         ring.buffer[ring.head][3] =
910             (long long unsigned int)CDTIME_T_TO_US(cdtime());
911       }
912
913       ring.head = next;
914     }
915
916     pthread_mutex_unlock(&procevent_lock);
917   }
918
919   return ret;
920 }
921
922 static void *procevent_thread(void *arg) /* {{{ */
923 {
924   pthread_mutex_lock(&procevent_lock);
925
926   while (procevent_thread_loop > 0) {
927     int status;
928
929     pthread_mutex_unlock(&procevent_lock);
930
931     usleep(1000);
932
933     status = read_event();
934
935     pthread_mutex_lock(&procevent_lock);
936
937     if (status < 0) {
938       procevent_thread_error = 1;
939       break;
940     }
941
942     if (procevent_thread_loop <= 0)
943       break;
944   } /* while (procevent_thread_loop > 0) */
945
946   pthread_mutex_unlock(&procevent_lock);
947
948   return ((void *)0);
949 } /* }}} void *procevent_thread */
950
951 static int start_thread(void) /* {{{ */
952 {
953   int status;
954
955   pthread_mutex_lock(&procevent_lock);
956
957   if (procevent_thread_loop != 0) {
958     pthread_mutex_unlock(&procevent_lock);
959     return (0);
960   }
961
962   if (nl_sock == -1) {
963     status = nl_connect();
964
965     if (status != 0)
966       return status;
967
968     status = set_proc_ev_listen(true);
969     if (status == -1)
970       return status;
971   }
972
973   DEBUG("procevent plugin: socket created and bound");
974
975   procevent_thread_loop = 1;
976   procevent_thread_error = 0;
977
978   status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
979                                 procevent_thread,
980                                 /* arg = */ (void *)0, "procevent");
981   if (status != 0) {
982     procevent_thread_loop = 0;
983     ERROR("procevent plugin: Starting thread failed.");
984     pthread_mutex_unlock(&procevent_lock);
985     return (-1);
986   }
987
988   pthread_mutex_unlock(&procevent_lock);
989   return (0);
990 } /* }}} int start_thread */
991
992 static int stop_thread(int shutdown) /* {{{ */
993 {
994   int status;
995
996   if (nl_sock != -1) {
997     status = close(nl_sock);
998     if (status != 0) {
999       ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
1000             status, strerror(errno));
1001       return (-1);
1002     } else
1003       nl_sock = -1;
1004   }
1005
1006   pthread_mutex_lock(&procevent_lock);
1007
1008   if (procevent_thread_loop == 0) {
1009     pthread_mutex_unlock(&procevent_lock);
1010     return (-1);
1011   }
1012
1013   procevent_thread_loop = 0;
1014   pthread_cond_broadcast(&procevent_cond);
1015   pthread_mutex_unlock(&procevent_lock);
1016
1017   if (shutdown == 1) {
1018     // Calling pthread_cancel here in
1019     // the case of a shutdown just assures that the thread is
1020     // gone and that the process has been fully terminated.
1021
1022     DEBUG("procevent plugin: Canceling thread for process shutdown");
1023
1024     status = pthread_cancel(procevent_thread_id);
1025
1026     if (status != 0) {
1027       ERROR("procevent plugin: Unable to cancel thread: %d", status);
1028       status = -1;
1029     }
1030   } else {
1031     status = pthread_join(procevent_thread_id, /* return = */ NULL);
1032     if (status != 0) {
1033       ERROR("procevent plugin: Stopping thread failed.");
1034       status = -1;
1035     }
1036   }
1037
1038   pthread_mutex_lock(&procevent_lock);
1039   memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
1040   procevent_thread_error = 0;
1041   pthread_mutex_unlock(&procevent_lock);
1042
1043   DEBUG("procevent plugin: Finished requesting stop of thread");
1044
1045   return (status);
1046 } /* }}} int stop_thread */
1047
1048 static int procevent_init(void) /* {{{ */
1049 {
1050   int status;
1051
1052   if (processlist_head == NULL) {
1053     NOTICE("procevent plugin: No processes have been configured.");
1054     return (-1);
1055   }
1056
1057   ring.head = 0;
1058   ring.tail = 0;
1059   ring.maxLen = buffer_length;
1060   ring.buffer = (long long unsigned int **)malloc(
1061       buffer_length * sizeof(long long unsigned int *));
1062
1063   for (int i = 0; i < buffer_length; i++) {
1064     ring.buffer[i] = (long long unsigned int *)malloc(
1065         PROCEVENT_FIELDS * sizeof(long long unsigned int));
1066   }
1067
1068   status = process_map_refresh();
1069
1070   if (status == -1) {
1071     ERROR("procevent plugin: Initial process mapping failed.");
1072     return (-1);
1073   }
1074
1075   return (start_thread());
1076 } /* }}} int procevent_init */
1077
1078 static int procevent_config(const char *key, const char *value) /* {{{ */
1079 {
1080   int status;
1081
1082   if (strcasecmp(key, "BufferLength") == 0) {
1083     buffer_length = atoi(value);
1084   } else if (strcasecmp(key, "Process") == 0 ||
1085              strcasecmp(key, "RegexProcess") == 0) {
1086
1087     processlist_t *pl;
1088     char *process;
1089     char *process_regex;
1090
1091     pl = malloc(sizeof(*pl));
1092     if (pl == NULL) {
1093       char errbuf[1024];
1094       ERROR("procevent plugin: malloc failed during procevent_config: %s",
1095             sstrerror(errno, errbuf, sizeof(errbuf)));
1096       return (1);
1097     }
1098
1099     process = strdup(value);
1100     if (process == NULL) {
1101       char errbuf[1024];
1102       sfree(pl);
1103       ERROR("procevent plugin: strdup failed during procevent_config: %s",
1104             sstrerror(errno, errbuf, sizeof(errbuf)));
1105       return (1);
1106     }
1107
1108     if (strcasecmp(key, "RegexProcess") == 0) {
1109       pl->is_regex = 1;
1110       status = regcomp(&pl->process_regex_obj, value, REG_EXTENDED);
1111
1112       if (status != 0) {
1113         ERROR("procevent plugin: invalid regular expression: %s", value);
1114         return (1);
1115       }
1116
1117       process_regex = strdup(value);
1118       if (process_regex == NULL) {
1119         char errbuf[1024];
1120         sfree(pl);
1121         ERROR("procevent plugin: strdup failed during procevent_config: %s",
1122               sstrerror(errno, errbuf, sizeof(errbuf)));
1123         return (1);
1124       }
1125
1126       pl->process_regex = process_regex;
1127     } else {
1128       pl->is_regex = 0;
1129     }
1130
1131     pl->process = process;
1132     pl->pid = -1;
1133     pl->next = processlist_head;
1134     processlist_head = pl;
1135   } else {
1136     return (-1);
1137   }
1138
1139   return (0);
1140 } /* }}} int procevent_config */
1141
1142 static void procevent_dispatch_notification(int pid, const char *type, /* {{{ */
1143                                             gauge_t value, char *process,
1144                                             long long unsigned int timestamp) {
1145   char *buf = NULL;
1146   notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "",
1147                       NULL};
1148
1149   if (value == 1)
1150     n.severity = NOTIF_OKAY;
1151
1152   char hostname[1024];
1153   gethostname(hostname, sizeof(hostname));
1154
1155   sstrncpy(n.host, hostname, sizeof(n.host));
1156   sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
1157   sstrncpy(n.type, "gauge", sizeof(n.type));
1158   sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
1159
1160   gen_message_payload(value, pid, process, timestamp, &buf);
1161
1162   notification_meta_t *m = calloc(1, sizeof(*m));
1163
1164   if (m == NULL) {
1165     char errbuf[1024];
1166     sfree(buf);
1167     ERROR("procevent plugin: unable to allocate metadata: %s",
1168           sstrerror(errno, errbuf, sizeof(errbuf)));
1169     return;
1170   }
1171
1172   sstrncpy(m->name, "ves", sizeof(m->name));
1173   m->nm_value.nm_string = sstrdup(buf);
1174   m->type = NM_TYPE_STRING;
1175   n.meta = m;
1176
1177   DEBUG("procevent plugin: notification message: %s",
1178         n.meta->nm_value.nm_string);
1179
1180   DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value,
1181         pid, process);
1182
1183   plugin_dispatch_notification(&n);
1184   plugin_notification_meta_free(n.meta);
1185
1186   // malloc'd in gen_message_payload
1187   if (buf != NULL)
1188     sfree(buf);
1189 }
1190
1191 static int procevent_read(void) /* {{{ */
1192 {
1193   if (procevent_thread_error != 0) {
1194     ERROR(
1195         "procevent plugin: The interface thread had a problem. Restarting it.");
1196
1197     stop_thread(0);
1198
1199     start_thread();
1200
1201     return (-1);
1202   } /* if (procevent_thread_error != 0) */
1203
1204   pthread_mutex_lock(&procevent_lock);
1205
1206   while (ring.head != ring.tail) {
1207     int next = ring.tail + 1;
1208
1209     if (next >= ring.maxLen)
1210       next = 0;
1211
1212     if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
1213       processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
1214
1215       if (pl != NULL) {
1216         // This process is of interest to us, so publish its EXITED status
1217         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1218                                         ring.buffer[ring.tail][1], pl->process,
1219                                         ring.buffer[ring.tail][3]);
1220         DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process "
1221               "list",
1222               pl->pid, pl->process);
1223         pl->pid = -1;
1224       }
1225     } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
1226       // a new process has started, so check if we should monitor it
1227       processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
1228
1229       if (pl != NULL) {
1230         // This process is of interest to us, so publish its STARTED status
1231         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1232                                         ring.buffer[ring.tail][1], pl->process,
1233                                         ring.buffer[ring.tail][3]);
1234         DEBUG(
1235             "procevent plugin: PID %d (%s) STARTED, adding PID to process list",
1236             pl->pid, pl->process);
1237       }
1238     }
1239
1240     ring.tail = next;
1241   }
1242
1243   pthread_mutex_unlock(&procevent_lock);
1244
1245   return (0);
1246 } /* }}} int procevent_read */
1247
1248 static int procevent_shutdown(void) /* {{{ */
1249 {
1250   // int status = 0;
1251   processlist_t *pl;
1252
1253   DEBUG("procevent plugin: Shutting down thread.");
1254
1255   if (stop_thread(1) < 0)
1256     return (-1);
1257
1258   for (int i = 0; i < buffer_length; i++) {
1259     free(ring.buffer[i]);
1260   }
1261
1262   free(ring.buffer);
1263
1264   pl = processlist_head;
1265   while (pl != NULL) {
1266     processlist_t *pl_next;
1267
1268     pl_next = pl->next;
1269
1270     if (pl->is_regex == 1) {
1271       sfree(pl->process_regex);
1272       regfree(&pl->process_regex_obj);
1273     }
1274
1275     sfree(pl->process);
1276     sfree(pl);
1277
1278     pl = pl_next;
1279   }
1280
1281   return (0);
1282 } /* }}} int procevent_shutdown */
1283
1284 void module_register(void) {
1285   plugin_register_config("procevent", procevent_config, config_keys,
1286                          config_keys_num);
1287   plugin_register_init("procevent", procevent_init);
1288   plugin_register_read("procevent", procevent_read);
1289   plugin_register_shutdown("procevent", procevent_shutdown);
1290 } /* void module_register */