Drain netlink socket before sleeping, then filter repeated process start msgs
[collectd.git] / src / procevent.c
1 /**
2  * collectd - src/procevent.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_complain.h"
32 #include "utils_ignorelist.h"
33
34 #include <errno.h>
35 #include <pthread.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <dirent.h>
42 #include <linux/cn_proc.h>
43 #include <linux/connector.h>
44 #include <linux/netlink.h>
45 #include <linux/rtnetlink.h>
46 #include <signal.h>
47 #include <stdbool.h>
48 #include <stdlib.h>
49 #include <string.h>
50
51 #include <yajl/yajl_common.h>
52 #include <yajl/yajl_gen.h>
53 #if HAVE_YAJL_YAJL_VERSION_H
54 #include <yajl/yajl_version.h>
55 #endif
56 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
57 #define HAVE_YAJL_V2 1
58 #endif
59
60 #define PROCEVENT_EXITED 0
61 #define PROCEVENT_STARTED 1
62 #define PROCEVENT_FIELDS 4 // pid, status, extra, timestamp
63 #define BUFSIZE 512
64 #define PROCDIR "/proc"
65
66 #define PROCEVENT_DOMAIN_FIELD "domain"
67 #define PROCEVENT_DOMAIN_VALUE "fault"
68 #define PROCEVENT_EVENT_ID_FIELD "eventId"
69 #define PROCEVENT_EVENT_NAME_FIELD "eventName"
70 #define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
71 #define PROCEVENT_EVENT_NAME_UP_VALUE "up"
72 #define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
73 #define PROCEVENT_PRIORITY_FIELD "priority"
74 #define PROCEVENT_PRIORITY_VALUE "high"
75 #define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
76 #define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
77 #define PROCEVENT_SEQUENCE_FIELD "sequence"
78 #define PROCEVENT_SEQUENCE_VALUE "0"
79 #define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
80 #define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
81 #define PROCEVENT_VERSION_FIELD "version"
82 #define PROCEVENT_VERSION_VALUE "1.0"
83
84 #define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
85 #define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
86 #define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
87 #define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
88 #define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
89 #define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
90 #define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
91 #define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
92 #define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
93 #define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
94 #define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
95 #define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
96 #define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
97 #define PROCEVENT_VF_STATUS_FIELD "vfStatus"
98 #define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
99 #define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
100
101 /*
102  * Private data types
103  */
104
105 typedef struct {
106   int head;
107   int tail;
108   int maxLen;
109   long long unsigned int **buffer;
110 } circbuf_t;
111
112 struct processlist_s {
113   char *process;
114
115   long pid;
116   int32_t last_status;
117
118   struct processlist_s *next;
119 };
120 typedef struct processlist_s processlist_t;
121
122 /*
123  * Private variables
124  */
125 static ignorelist_t *ignorelist = NULL;
126
127 static int procevent_thread_loop = 0;
128 static int procevent_thread_error = 0;
129 static pthread_t procevent_thread_id;
130 static pthread_mutex_t procevent_lock = PTHREAD_MUTEX_INITIALIZER;
131 static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
132 static pthread_mutex_t procevent_list_lock = PTHREAD_MUTEX_INITIALIZER;
133 static int nl_sock = -1;
134 static int buffer_length;
135 static circbuf_t ring;
136 static processlist_t *processlist_head = NULL;
137 static int event_id = 0;
138
139 static const char *config_keys[] = {"BufferLength", "Process", "RegexProcess"};
140 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
141
142 /*
143  * Private functions
144  */
145
146 static int gen_message_payload(int state, int pid, char *process,
147                                long long unsigned int timestamp, char **buf) {
148   const unsigned char *buf2;
149   yajl_gen g;
150   char json_str[DATA_MAX_NAME_LEN];
151
152 #if !defined(HAVE_YAJL_V2)
153   yajl_gen_config conf = {};
154
155   conf.beautify = 0;
156 #endif
157
158 #if HAVE_YAJL_V2
159   size_t len;
160   g = yajl_gen_alloc(NULL);
161   yajl_gen_config(g, yajl_gen_beautify, 0);
162 #else
163   unsigned int len;
164   g = yajl_gen_alloc(&conf, NULL);
165 #endif
166
167   yajl_gen_clear(g);
168
169   // *** BEGIN common event header ***
170
171   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
172     goto err;
173
174   // domain
175   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
176                       strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
177     goto err;
178
179   if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
180                       strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
181     goto err;
182
183   // eventId
184   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
185                       strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
186     goto err;
187
188   event_id = event_id + 1;
189   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
190   memset(json_str, '\0', DATA_MAX_NAME_LEN);
191   snprintf(json_str, event_id_len, "%d", event_id);
192
193   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
194     goto err;
195   }
196
197   // eventName
198   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
199                       strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
200     goto err;
201
202   int event_name_len = 0;
203   event_name_len = event_name_len + (sizeof(char) * sizeof(int) * 4); // pid
204   event_name_len = event_name_len + strlen(process);      // process name
205   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
206   event_name_len = event_name_len +
207                    13; // "process", 3 spaces, 2 parentheses and null-terminator
208   memset(json_str, '\0', DATA_MAX_NAME_LEN);
209   snprintf(json_str, event_name_len, "process %s (%d) %s", process, pid,
210            (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
211                        : PROCEVENT_EVENT_NAME_UP_VALUE));
212
213   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
214       yajl_gen_status_ok) {
215     goto err;
216   }
217
218   // lastEpochMicrosec
219   if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
220                       strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
221       yajl_gen_status_ok)
222     goto err;
223
224   int last_epoch_microsec_len =
225       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
226   memset(json_str, '\0', DATA_MAX_NAME_LEN);
227   snprintf(json_str, last_epoch_microsec_len, "%llu",
228            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
229
230   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
231     goto err;
232   }
233
234   // priority
235   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
236                       strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
237     goto err;
238
239   if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
240                       strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
241     goto err;
242
243   // reportingEntityName
244   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
245                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
246       yajl_gen_status_ok)
247     goto err;
248
249   if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
250                       strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
251       yajl_gen_status_ok)
252     goto err;
253
254   // sequence
255   if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
256                       strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
257     goto err;
258
259   if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
260                       strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
261     goto err;
262
263   // sourceName
264   if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
265                       strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
266       yajl_gen_status_ok)
267     goto err;
268
269   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
270       yajl_gen_status_ok)
271     goto err;
272
273   // startEpochMicrosec
274   if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
275                       strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
276       yajl_gen_status_ok)
277     goto err;
278
279   int start_epoch_microsec_len =
280       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
281   memset(json_str, '\0', DATA_MAX_NAME_LEN);
282   snprintf(json_str, start_epoch_microsec_len, "%llu",
283            (long long unsigned int)timestamp);
284
285   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
286     goto err;
287   }
288
289   // version
290   if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
291                       strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
292     goto err;
293
294   if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
295                       strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
296     goto err;
297
298   // *** END common event header ***
299
300   // *** BEGIN fault fields ***
301
302   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
303                       strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
304       yajl_gen_status_ok)
305     goto err;
306
307   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
308     goto err;
309
310   // alarmCondition
311   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
312                       strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
313       yajl_gen_status_ok)
314     goto err;
315
316   int alarm_condition_len = 0;
317   alarm_condition_len =
318       alarm_condition_len + (sizeof(char) * sizeof(int) * 4);  // pid
319   alarm_condition_len = alarm_condition_len + strlen(process); // process name
320   alarm_condition_len =
321       alarm_condition_len + 25; // "process", "state", "change", 4 spaces, 2
322                                 // parentheses and null-terminator
323   memset(json_str, '\0', DATA_MAX_NAME_LEN);
324   snprintf(json_str, alarm_condition_len, "process %s (%d) state change",
325            process, pid);
326
327   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
328       yajl_gen_status_ok) {
329     goto err;
330   }
331
332   // alarmInterfaceA
333   if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
334                       strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
335       yajl_gen_status_ok)
336     goto err;
337
338   if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
339       yajl_gen_status_ok)
340     goto err;
341
342   // eventSeverity
343   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
344                       strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
345       yajl_gen_status_ok)
346     goto err;
347
348   if (yajl_gen_string(
349           g,
350           (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
351                                 : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
352           strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
353                              : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
354       yajl_gen_status_ok)
355     goto err;
356
357   // eventSourceType
358   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
359                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
360       yajl_gen_status_ok)
361     goto err;
362
363   if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
364                       strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
365       yajl_gen_status_ok)
366     goto err;
367
368   // faultFieldsVersion
369   if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
370                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
371       yajl_gen_status_ok)
372     goto err;
373
374   if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
375                       strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
376       yajl_gen_status_ok)
377     goto err;
378
379   // specificProblem
380   if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
381                       strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
382       yajl_gen_status_ok)
383     goto err;
384
385   int specific_problem_len = 0;
386   specific_problem_len =
387       specific_problem_len + (sizeof(char) * sizeof(int) * 4);   // pid
388   specific_problem_len = specific_problem_len + strlen(process); // process name
389   specific_problem_len =
390       specific_problem_len + (state == 0 ? 4 : 2); // "down" or "up"
391   specific_problem_len =
392       specific_problem_len +
393       13; // "process", 3 spaces, 2 parentheses and null-terminator
394   memset(json_str, '\0', DATA_MAX_NAME_LEN);
395   snprintf(json_str, specific_problem_len, "process %s (%d) %s", process, pid,
396            (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
397                        : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE));
398
399   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
400       yajl_gen_status_ok) {
401     goto err;
402   }
403
404   // vfStatus
405   if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
406                       strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
407     goto err;
408
409   if (yajl_gen_string(
410           g,
411           (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
412                                 : PROCEVENT_VF_STATUS_NORMAL_VALUE),
413           strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
414                              : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
415       yajl_gen_status_ok)
416     goto err;
417
418   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
419     goto err;
420
421   // *** END fault fields ***
422
423   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
424     goto err;
425
426   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
427     goto err;
428
429   *buf = malloc(strlen((char *)buf2) + 1);
430
431   if (*buf == NULL) {
432     char errbuf[1024];
433     ERROR("procevent plugin: malloc failed during gen_message_payload: %s",
434           sstrerror(errno, errbuf, sizeof(errbuf)));
435     goto err;
436   }
437
438   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
439
440   yajl_gen_free(g);
441
442   return 0;
443
444 err:
445   yajl_gen_free(g);
446   ERROR("procevent plugin: gen_message_payload failed to generate JSON");
447   return -1;
448 }
449
450 // Does /proc/<pid>/comm contain a process name we are interested in?
451 static processlist_t *process_check(int pid) {
452   int len, is_match, retval;
453   char file[BUFSIZE];
454   FILE *fh;
455   char buffer[BUFSIZE];
456
457   len = snprintf(file, sizeof(file), PROCDIR "/%d/comm", pid);
458
459   if ((len < 0) || (len >= BUFSIZE)) {
460     WARNING("procevent process_check: process name too large");
461     return NULL;
462   }
463
464   if (NULL == (fh = fopen(file, "r"))) {
465     // No /proc/<pid>/comm for this pid, just ignore
466     DEBUG("procevent plugin: no comm file available for pid %d", pid);
467     return NULL;
468   }
469
470   retval = fscanf(fh, "%[^\n]", buffer);
471
472   if (retval < 0) {
473     WARNING("procevent process_check: unable to read comm file for pid %d",
474             pid);
475     fclose(fh);
476     return NULL;
477   }
478
479   // Now that we have the process name in the buffer, check if we are
480   // even interested in it
481   if (ignorelist_match(ignorelist, buffer) != 0) {
482     DEBUG("procevent process_check: ignoring process %s (%d)", buffer, pid);
483     fclose(fh);
484     return NULL;
485   }
486
487   if (fh != NULL) {
488     fclose(fh);
489     fh = NULL;
490   }
491
492   //
493   // Go through the processlist linked list and look for the process name
494   // in /proc/<pid>/comm.  If found:
495   // 1. If pl->pid is -1, then set pl->pid to <pid> (and return that object)
496   // 2. If pl->pid is not -1, then another <process name> process was already
497   //    found.  If <pid> == pl->pid, this is an old match, so do nothing.
498   //    If the <pid> is different, however, make a new processlist_t and
499   //    associate <pid> with it (with the same process name as the existing).
500   //
501
502   pthread_mutex_lock(&procevent_list_lock);
503
504   processlist_t *pl;
505   processlist_t *match = NULL;
506
507   for (pl = processlist_head; pl != NULL; pl = pl->next) {
508
509     is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
510
511     if (is_match == 1) {
512       DEBUG("procevent plugin: process %d name match for %s", pid, buffer);
513
514       if (pl->pid == pid) {
515         // this is a match, and we've already stored the exact pid/name combo
516         DEBUG("procevent plugin: found exact match with name %s, PID %ld for "
517               "incoming PID %d",
518               pl->process, pl->pid, pid);
519         match = pl;
520         break;
521       } else if (pl->pid == -1) {
522         // this is a match, and we've found a candidate processlist_t to store
523         // this new pid/name combo
524         DEBUG("procevent plugin: reusing pl object with PID %ld for incoming "
525               "PID %d",
526               pl->pid, pid);
527         pl->pid = pid;
528         match = pl;
529         break;
530       } else if (pl->pid != -1) {
531         // this is a match, but another instance of this process has already
532         // claimed this pid/name combo,
533         // so keep looking
534         DEBUG("procevent plugin: found pl object with matching name for "
535               "incoming PID %d, but object is in use by PID %ld",
536               pid, pl->pid);
537         match = pl;
538         continue;
539       }
540     }
541   }
542
543   if (match == NULL ||
544       (match != NULL && match->pid != -1 && match->pid != pid)) {
545     // if there wasn't an existing match, OR
546     // if there was a match but the associated processlist_t object already
547     // contained a pid/name combo,
548     // then make a new one and add it to the linked list
549
550     DEBUG(
551         "procevent plugin: allocating new processlist_t object for PID %d (%s)",
552         pid, buffer);
553
554     processlist_t *pl2;
555     char *process;
556
557     pl2 = malloc(sizeof(*pl2));
558     if (pl2 == NULL) {
559       char errbuf[1024];
560       ERROR("procevent plugin: malloc failed during process_check: %s",
561             sstrerror(errno, errbuf, sizeof(errbuf)));
562       pthread_mutex_unlock(&procevent_list_lock);
563       return NULL;
564     }
565
566     process = strdup(buffer);
567     if (process == NULL) {
568       char errbuf[1024];
569       sfree(pl2);
570       ERROR("procevent plugin: strdup failed during process_check: %s",
571             sstrerror(errno, errbuf, sizeof(errbuf)));
572       pthread_mutex_unlock(&procevent_list_lock);
573       return NULL;
574     }
575
576     pl2->process = process;
577     pl2->pid = pid;
578     pl2->next = processlist_head;
579     processlist_head = pl2;
580
581     match = pl2;
582   }
583
584   pthread_mutex_unlock(&procevent_list_lock);
585
586   return match;
587 }
588
589 // Does our map have this PID or name?
590 static processlist_t *process_map_check(int pid, char *process) {
591   processlist_t *pl;
592
593   pthread_mutex_lock(&procevent_list_lock);
594
595   for (pl = processlist_head; pl != NULL; pl = pl->next) {
596     int match_pid = 0;
597     int match_process = 0;
598     int match = 0;
599
600     if (pid > 0) {
601       if (pl->pid == pid)
602         match_pid = 1;
603     }
604
605     if (process != NULL) {
606       if (strcmp(pl->process, process) == 0)
607         match_process = 1;
608     }
609
610     if (pid > 0 && process == NULL && match_pid == 1)
611       match = 1;
612     else if (pid < 0 && process != NULL && match_process == 1)
613       match = 1;
614     else if (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)
615       match = 1;
616
617     if (match == 1) {
618       pthread_mutex_unlock(&procevent_list_lock);
619       return pl;
620     }
621   }
622
623   pthread_mutex_unlock(&procevent_list_lock);
624
625   return NULL;
626 }
627
628 static int process_map_refresh(void) {
629   DIR *proc;
630
631   errno = 0;
632   proc = opendir(PROCDIR);
633   if (proc == NULL) {
634     char errbuf[1024];
635     ERROR("procevent plugin: fopen (%s): %s", PROCDIR,
636           sstrerror(errno, errbuf, sizeof(errbuf)));
637     return -1;
638   }
639
640   while (42) {
641     struct dirent *dent;
642     int len;
643     char file[BUFSIZE];
644
645     struct stat statbuf;
646
647     int status;
648
649     errno = 0;
650     dent = readdir(proc);
651     if (dent == NULL) {
652       char errbuf[4096];
653
654       if (errno == 0) /* end of directory */
655         break;
656
657       ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
658             sstrerror(errno, errbuf, sizeof(errbuf)));
659       closedir(proc);
660       return -1;
661     }
662
663     if (dent->d_name[0] == '.')
664       continue;
665
666     len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
667     if ((len < 0) || (len >= BUFSIZE))
668       continue;
669
670     status = stat(file, &statbuf);
671     if (status != 0) {
672       char errbuf[4096];
673       WARNING("procevent plugin: stat (%s) failed: %s", file,
674               sstrerror(errno, errbuf, sizeof(errbuf)));
675       continue;
676     }
677
678     if (!S_ISDIR(statbuf.st_mode))
679       continue;
680
681     len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
682     if ((len < 0) || (len >= BUFSIZE))
683       continue;
684
685     int not_number = 0;
686
687     for (int i = 0; i < strlen(dent->d_name); i++) {
688       if (!isdigit(dent->d_name[i])) {
689         not_number = 1;
690         break;
691       }
692     }
693
694     if (not_number != 0)
695       continue;
696
697     // Check if we need to store this pid/name combo in our processlist_t linked
698     // list
699     int this_pid = atoi(dent->d_name);
700     processlist_t *pl = process_check(this_pid);
701
702     if (pl != NULL)
703       DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
704             this_pid, pl->process);
705   }
706
707   closedir(proc);
708
709   return 0;
710 }
711
712 static int nl_connect() {
713   int rc;
714   struct sockaddr_nl sa_nl;
715
716   nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
717   if (nl_sock == -1) {
718     ERROR("procevent plugin: socket open failed: %d", errno);
719     return -1;
720   }
721
722   sa_nl.nl_family = AF_NETLINK;
723   sa_nl.nl_groups = CN_IDX_PROC;
724   sa_nl.nl_pid = getpid();
725
726   rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
727   if (rc == -1) {
728     ERROR("procevent plugin: socket bind failed: %d", errno);
729     close(nl_sock);
730     return -1;
731   }
732
733   return 0;
734 }
735
736 static int set_proc_ev_listen(bool enable) {
737   int rc;
738   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
739     struct nlmsghdr nl_hdr;
740     struct __attribute__((__packed__)) {
741       struct cn_msg cn_msg;
742       enum proc_cn_mcast_op cn_mcast;
743     };
744   } nlcn_msg;
745
746   memset(&nlcn_msg, 0, sizeof(nlcn_msg));
747   nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
748   nlcn_msg.nl_hdr.nlmsg_pid = getpid();
749   nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
750
751   nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
752   nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
753   nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
754
755   nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
756
757   rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
758   if (rc == -1) {
759     ERROR("procevent plugin: subscribing to netlink process events failed: %d",
760           errno);
761     return -1;
762   }
763
764   return 0;
765 }
766
767 static int read_event() {
768   int status;
769   int ret = 0;
770   int proc_id = -1;
771   int proc_status = -1;
772   int proc_extra = -1;
773   struct __attribute__((aligned(NLMSG_ALIGNTO))) {
774     struct nlmsghdr nl_hdr;
775     struct __attribute__((__packed__)) {
776       struct cn_msg cn_msg;
777       struct proc_event proc_ev;
778     };
779   } nlcn_msg;
780
781   if (nl_sock == -1)
782     return ret;
783
784   while (42) {
785
786     pthread_mutex_lock(&procevent_lock);
787
788     if (procevent_thread_loop <= 0)
789       return ret;
790
791     pthread_mutex_unlock(&procevent_lock);
792
793     status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
794
795     if (status == 0) {
796       return 0;
797     } else if (status == -1) {
798       if (errno != EINTR) {
799         ERROR("procevent plugin: socket receive error: %d", errno);
800         return -1;
801       }
802     }
803
804     switch (nlcn_msg.proc_ev.what) {
805     case PROC_EVENT_NONE:
806     case PROC_EVENT_FORK:
807     case PROC_EVENT_UID:
808     case PROC_EVENT_GID:
809       // Not of interest in current version
810       break;
811     case PROC_EVENT_EXEC:
812       proc_status = PROCEVENT_STARTED;
813       proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
814       break;
815     case PROC_EVENT_EXIT:
816       proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
817       proc_status = PROCEVENT_EXITED;
818       proc_extra = nlcn_msg.proc_ev.event_data.exit.exit_code;
819       break;
820     default:
821       break;
822     }
823
824     // If we're interested in this process status event, place the event
825     // in the ring buffer for consumption by the main polling thread.
826
827     if (proc_status != -1) {
828       pthread_mutex_lock(&procevent_lock);
829
830       int next = ring.head + 1;
831       if (next >= ring.maxLen)
832         next = 0;
833
834       if (next == ring.tail) {
835         WARNING("procevent plugin: ring buffer full");
836       } else {
837         DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
838               (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
839               (long long unsigned int)CDTIME_T_TO_US(cdtime()));
840
841         if (proc_status == PROCEVENT_EXITED) {
842           ring.buffer[ring.head][0] = proc_id;
843           ring.buffer[ring.head][1] = proc_status;
844           ring.buffer[ring.head][2] = proc_extra;
845           ring.buffer[ring.head][3] =
846               (long long unsigned int)CDTIME_T_TO_US(cdtime());
847         } else {
848           ring.buffer[ring.head][0] = proc_id;
849           ring.buffer[ring.head][1] = proc_status;
850           ring.buffer[ring.head][2] = 0;
851           ring.buffer[ring.head][3] =
852               (long long unsigned int)CDTIME_T_TO_US(cdtime());
853         }
854
855         ring.head = next;
856       }
857
858       pthread_mutex_unlock(&procevent_lock);
859     }
860   }
861
862   return ret;
863 }
864
865 static void *procevent_thread(void *arg) /* {{{ */
866 {
867   pthread_mutex_lock(&procevent_lock);
868
869   while (procevent_thread_loop > 0) {
870     int status;
871
872     pthread_mutex_unlock(&procevent_lock);
873
874     usleep(1000);
875
876     status = read_event();
877
878     pthread_mutex_lock(&procevent_lock);
879
880     if (status < 0) {
881       procevent_thread_error = 1;
882       break;
883     }
884
885     if (procevent_thread_loop <= 0)
886       break;
887   } /* while (procevent_thread_loop > 0) */
888
889   pthread_mutex_unlock(&procevent_lock);
890
891   return ((void *)0);
892 } /* }}} void *procevent_thread */
893
894 static int start_thread(void) /* {{{ */
895 {
896   int status;
897
898   pthread_mutex_lock(&procevent_lock);
899
900   if (procevent_thread_loop != 0) {
901     pthread_mutex_unlock(&procevent_lock);
902     return (0);
903   }
904
905   if (nl_sock == -1) {
906     status = nl_connect();
907
908     if (status != 0)
909       return status;
910
911     status = set_proc_ev_listen(true);
912     if (status == -1)
913       return status;
914   }
915
916   DEBUG("procevent plugin: socket created and bound");
917
918   procevent_thread_loop = 1;
919   procevent_thread_error = 0;
920
921   status = plugin_thread_create(&procevent_thread_id, /* attr = */ NULL,
922                                 procevent_thread,
923                                 /* arg = */ (void *)0, "procevent");
924   if (status != 0) {
925     procevent_thread_loop = 0;
926     ERROR("procevent plugin: Starting thread failed.");
927     pthread_mutex_unlock(&procevent_lock);
928     return (-1);
929   }
930
931   pthread_mutex_unlock(&procevent_lock);
932   return (0);
933 } /* }}} int start_thread */
934
935 static int stop_thread(int shutdown) /* {{{ */
936 {
937   int status;
938
939   if (nl_sock != -1) {
940     status = close(nl_sock);
941     if (status != 0) {
942       ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
943             status, strerror(errno));
944       return (-1);
945     } else
946       nl_sock = -1;
947   }
948
949   pthread_mutex_lock(&procevent_lock);
950
951   if (procevent_thread_loop == 0) {
952     pthread_mutex_unlock(&procevent_lock);
953     return (-1);
954   }
955
956   procevent_thread_loop = 0;
957   pthread_cond_broadcast(&procevent_cond);
958   pthread_mutex_unlock(&procevent_lock);
959
960   if (shutdown == 1) {
961     // Calling pthread_cancel here in
962     // the case of a shutdown just assures that the thread is
963     // gone and that the process has been fully terminated.
964
965     DEBUG("procevent plugin: Canceling thread for process shutdown");
966
967     status = pthread_cancel(procevent_thread_id);
968
969     if (status != 0) {
970       ERROR("procevent plugin: Unable to cancel thread: %d", status);
971       status = -1;
972     }
973   } else {
974     status = pthread_join(procevent_thread_id, /* return = */ NULL);
975     if (status != 0) {
976       ERROR("procevent plugin: Stopping thread failed.");
977       status = -1;
978     }
979   }
980
981   pthread_mutex_lock(&procevent_lock);
982   memset(&procevent_thread_id, 0, sizeof(procevent_thread_id));
983   procevent_thread_error = 0;
984   pthread_mutex_unlock(&procevent_lock);
985
986   DEBUG("procevent plugin: Finished requesting stop of thread");
987
988   return (status);
989 } /* }}} int stop_thread */
990
991 static int procevent_init(void) /* {{{ */
992 {
993   int status;
994
995   ring.head = 0;
996   ring.tail = 0;
997   ring.maxLen = buffer_length;
998   ring.buffer = (long long unsigned int **)malloc(
999       buffer_length * sizeof(long long unsigned int *));
1000
1001   for (int i = 0; i < buffer_length; i++) {
1002     ring.buffer[i] = (long long unsigned int *)malloc(
1003         PROCEVENT_FIELDS * sizeof(long long unsigned int));
1004   }
1005
1006   status = process_map_refresh();
1007
1008   if (status == -1) {
1009     ERROR("procevent plugin: Initial process mapping failed.");
1010     return (-1);
1011   }
1012
1013   if (ignorelist == NULL) {
1014     NOTICE("procevent plugin: No processes have been configured.");
1015     return (-1);
1016   }
1017
1018   return (start_thread());
1019 } /* }}} int procevent_init */
1020
1021 static int procevent_config(const char *key, const char *value) /* {{{ */
1022 {
1023   int status;
1024
1025   if (ignorelist == NULL)
1026     ignorelist = ignorelist_create(/* invert = */ 1);
1027
1028   if (strcasecmp(key, "BufferLength") == 0) {
1029     buffer_length = atoi(value);
1030   } else if (strcasecmp(key, "Process") == 0) {
1031     ignorelist_add(ignorelist, value);
1032   } else if (strcasecmp(key, "RegexProcess") == 0) {
1033 #if HAVE_REGEX_H
1034     status = ignorelist_add(ignorelist, value);
1035
1036     if (status != 0) {
1037       ERROR("procevent plugin: invalid regular expression: %s", value);
1038       return (1);
1039     }
1040 #else
1041     WARNING("procevent plugin: The plugin has been compiled without support "
1042             "for the \"RegexProcess\" option.");
1043 #endif
1044   } else {
1045     return (-1);
1046   }
1047
1048   return (0);
1049 } /* }}} int procevent_config */
1050
1051 static void procevent_dispatch_notification(int pid, const char *type, /* {{{ */
1052                                             gauge_t value, char *process,
1053                                             long long unsigned int timestamp) {
1054   char *buf = NULL;
1055   notification_t n = {NOTIF_FAILURE, cdtime(), "", "", "procevent", "", "", "",
1056                       NULL};
1057
1058   if (value == 1)
1059     n.severity = NOTIF_OKAY;
1060
1061   sstrncpy(n.host, hostname_g, sizeof(n.host));
1062   sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
1063   sstrncpy(n.type, "gauge", sizeof(n.type));
1064   sstrncpy(n.type_instance, "process_status", sizeof(n.type_instance));
1065
1066   gen_message_payload(value, pid, process, timestamp, &buf);
1067
1068   notification_meta_t *m = calloc(1, sizeof(*m));
1069
1070   if (m == NULL) {
1071     char errbuf[1024];
1072     sfree(buf);
1073     ERROR("procevent plugin: unable to allocate metadata: %s",
1074           sstrerror(errno, errbuf, sizeof(errbuf)));
1075     return;
1076   }
1077
1078   sstrncpy(m->name, "ves", sizeof(m->name));
1079   m->nm_value.nm_string = sstrdup(buf);
1080   m->type = NM_TYPE_STRING;
1081   n.meta = m;
1082
1083   DEBUG("procevent plugin: notification message: %s",
1084         n.meta->nm_value.nm_string);
1085
1086   DEBUG("procevent plugin: dispatching state %d for PID %d (%s)", (int)value,
1087         pid, process);
1088
1089   plugin_dispatch_notification(&n);
1090   plugin_notification_meta_free(n.meta);
1091
1092   // malloc'd in gen_message_payload
1093   if (buf != NULL)
1094     sfree(buf);
1095 }
1096
1097 static int procevent_read(void) /* {{{ */
1098 {
1099   if (procevent_thread_error != 0) {
1100     ERROR(
1101         "procevent plugin: The interface thread had a problem. Restarting it.");
1102
1103     stop_thread(0);
1104
1105     start_thread();
1106
1107     return (-1);
1108   } /* if (procevent_thread_error != 0) */
1109
1110   pthread_mutex_lock(&procevent_lock);
1111
1112   while (ring.head != ring.tail) {
1113     int next = ring.tail + 1;
1114
1115     if (next >= ring.maxLen)
1116       next = 0;
1117
1118     if (ring.buffer[ring.tail][1] == PROCEVENT_EXITED) {
1119       processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
1120
1121       if (pl != NULL) {
1122         // This process is of interest to us, so publish its EXITED status
1123         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1124                                         ring.buffer[ring.tail][1], pl->process,
1125                                         ring.buffer[ring.tail][3]);
1126         DEBUG("procevent plugin: PID %d (%s) EXITED, removing PID from process "
1127               "list",
1128               pl->pid, pl->process);
1129         pl->pid = -1;
1130         pl->last_status = -1;
1131       }
1132     } else if (ring.buffer[ring.tail][1] == PROCEVENT_STARTED) {
1133       // a new process has started, so check if we should monitor it
1134       processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
1135
1136       // If we had already seen this process name and pid combo before,
1137       // and the last message was a "process started" message, don't send
1138       // the notfication again
1139
1140       if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
1141         // This process is of interest to us, so publish its STARTED status
1142         procevent_dispatch_notification(ring.buffer[ring.tail][0], "gauge",
1143                                         ring.buffer[ring.tail][1], pl->process,
1144                                         ring.buffer[ring.tail][3]);
1145
1146         pl->last_status = PROCEVENT_STARTED;
1147
1148         DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
1149               "list",
1150               pl->pid, pl->process);
1151       }
1152     }
1153
1154     ring.tail = next;
1155   }
1156
1157   pthread_mutex_unlock(&procevent_lock);
1158
1159   return (0);
1160 } /* }}} int procevent_read */
1161
1162 static int procevent_shutdown(void) /* {{{ */
1163 {
1164   processlist_t *pl;
1165
1166   DEBUG("procevent plugin: Shutting down thread.");
1167
1168   if (stop_thread(1) < 0)
1169     return (-1);
1170
1171   for (int i = 0; i < buffer_length; i++) {
1172     free(ring.buffer[i]);
1173   }
1174
1175   free(ring.buffer);
1176
1177   pl = processlist_head;
1178   while (pl != NULL) {
1179     processlist_t *pl_next;
1180
1181     pl_next = pl->next;
1182
1183     sfree(pl->process);
1184     sfree(pl);
1185
1186     pl = pl_next;
1187   }
1188
1189   return (0);
1190 } /* }}} int procevent_shutdown */
1191
1192 void module_register(void) {
1193   plugin_register_config("procevent", procevent_config, config_keys,
1194                          config_keys_num);
1195   plugin_register_init("procevent", procevent_init);
1196   plugin_register_read("procevent", procevent_read);
1197   plugin_register_shutdown("procevent", procevent_shutdown);
1198 } /* void module_register */