check_uptime: New plugin, based on new cache_event callback.
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30
31 #include "collectd.h"
32
33 #include "configfile.h"
34 #include "filter_chain.h"
35 #include "plugin.h"
36 #include "utils/avltree/avltree.h"
37 #include "utils/common/common.h"
38 #include "utils/heap/heap.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "utils_llist.h"
42 #include "utils_random.h"
43 #include "utils_time.h"
44
45 #ifdef WIN32
46 #define EXPORT __declspec(dllexport)
47 #include <sys/stat.h>
48 #include <unistd.h>
49 #else
50 #define EXPORT
51 #endif
52
53 #if HAVE_PTHREAD_NP_H
54 #include <pthread_np.h> /* for pthread_set_name_np(3) */
55 #endif
56
57 #include <dlfcn.h>
58
59 /*
60  * Private structures
61  */
62 struct callback_func_s {
63   void *cf_callback;
64   user_data_t cf_udata;
65   plugin_ctx_t cf_ctx;
66 };
67 typedef struct callback_func_s callback_func_t;
68
69 #define RF_SIMPLE 0
70 #define RF_COMPLEX 1
71 #define RF_REMOVE 65535
72 struct read_func_s {
73 /* `read_func_t' "inherits" from `callback_func_t'.
74  * The `rf_super' member MUST be the first one in this structure! */
75 #define rf_callback rf_super.cf_callback
76 #define rf_udata rf_super.cf_udata
77 #define rf_ctx rf_super.cf_ctx
78   callback_func_t rf_super;
79   char rf_group[DATA_MAX_NAME_LEN];
80   char *rf_name;
81   int rf_type;
82   cdtime_t rf_interval;
83   cdtime_t rf_effective_interval;
84   cdtime_t rf_next_read;
85 };
86 typedef struct read_func_s read_func_t;
87
88 struct cache_event_func_s {
89   plugin_cache_event_cb callback;
90   char *name;
91   user_data_t user_data;
92   plugin_ctx_t plugin_ctx;
93 };
94 typedef struct cache_event_func_s cache_event_func_t;
95
96 struct write_queue_s;
97 typedef struct write_queue_s write_queue_t;
98 struct write_queue_s {
99   value_list_t *vl;
100   plugin_ctx_t ctx;
101   write_queue_t *next;
102 };
103
104 struct flush_callback_s {
105   char *name;
106   cdtime_t timeout;
107 };
108 typedef struct flush_callback_s flush_callback_t;
109
110 /*
111  * Private variables
112  */
113 static c_avl_tree_t *plugins_loaded;
114
115 static llist_t *list_init;
116 static llist_t *list_write;
117 static llist_t *list_flush;
118 static llist_t *list_missing;
119 static llist_t *list_shutdown;
120 static llist_t *list_log;
121 static llist_t *list_notification;
122
123 static size_t list_cache_event_num;
124 static cache_event_func_t list_cache_event[32];
125
126 static fc_chain_t *pre_cache_chain;
127 static fc_chain_t *post_cache_chain;
128
129 static c_avl_tree_t *data_sets;
130
131 static char *plugindir;
132
133 #ifndef DEFAULT_MAX_READ_INTERVAL
134 #define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
135 #endif
136 static c_heap_t *read_heap;
137 static llist_t *read_list;
138 static int read_loop = 1;
139 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
140 static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
141 static pthread_t *read_threads;
142 static size_t read_threads_num;
143 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
144
145 static write_queue_t *write_queue_head;
146 static write_queue_t *write_queue_tail;
147 static long write_queue_length;
148 static bool write_loop = true;
149 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
150 static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
151 static pthread_t *write_threads;
152 static size_t write_threads_num;
153
154 static pthread_key_t plugin_ctx_key;
155 static bool plugin_ctx_key_initialized;
156
157 static long write_limit_high;
158 static long write_limit_low;
159
160 static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
161 static derive_t stats_values_dropped;
162 static bool record_statistics;
163
164 /*
165  * Static functions
166  */
167 static int plugin_dispatch_values_internal(value_list_t *vl);
168
169 static const char *plugin_get_dir(void) {
170   if (plugindir == NULL)
171     return PLUGINDIR;
172   else
173     return plugindir;
174 }
175
176 static int plugin_update_internal_statistics(void) { /* {{{ */
177   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
178
179   /* Initialize `vl' */
180   value_list_t vl = VALUE_LIST_INIT;
181   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
182   vl.interval = plugin_get_interval();
183
184   /* Write queue */
185   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
186
187   /* Write queue : queue length */
188   vl.values = &(value_t){.gauge = copy_write_queue_length};
189   vl.values_len = 1;
190   sstrncpy(vl.type, "queue_length", sizeof(vl.type));
191   vl.type_instance[0] = 0;
192   plugin_dispatch_values(&vl);
193
194   /* Write queue : Values dropped (queue length > low limit) */
195   vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
196   vl.values_len = 1;
197   sstrncpy(vl.type, "derive", sizeof(vl.type));
198   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
199   plugin_dispatch_values(&vl);
200
201   /* Cache */
202   sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
203
204   /* Cache : Nb entry in cache tree */
205   vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
206   vl.values_len = 1;
207   sstrncpy(vl.type, "cache_size", sizeof(vl.type));
208   vl.type_instance[0] = 0;
209   plugin_dispatch_values(&vl);
210
211   return 0;
212 } /* }}} int plugin_update_internal_statistics */
213
214 static void free_userdata(user_data_t const *ud) /* {{{ */
215 {
216   if (ud == NULL)
217     return;
218
219   if ((ud->data != NULL) && (ud->free_func != NULL)) {
220     ud->free_func(ud->data);
221   }
222 } /* }}} void free_userdata */
223
224 static void destroy_callback(callback_func_t *cf) /* {{{ */
225 {
226   if (cf == NULL)
227     return;
228   free_userdata(&cf->cf_udata);
229   sfree(cf);
230 } /* }}} void destroy_callback */
231
232 static void destroy_all_callbacks(llist_t **list) /* {{{ */
233 {
234   llentry_t *le;
235
236   if (*list == NULL)
237     return;
238
239   le = llist_head(*list);
240   while (le != NULL) {
241     llentry_t *le_next;
242
243     le_next = le->next;
244
245     sfree(le->key);
246     destroy_callback(le->value);
247     le->value = NULL;
248
249     le = le_next;
250   }
251
252   llist_destroy(*list);
253   *list = NULL;
254 } /* }}} void destroy_all_callbacks */
255
256 static void destroy_read_heap(void) /* {{{ */
257 {
258   if (read_heap == NULL)
259     return;
260
261   while (42) {
262     read_func_t *rf;
263
264     rf = c_heap_get_root(read_heap);
265     if (rf == NULL)
266       break;
267     sfree(rf->rf_name);
268     destroy_callback((callback_func_t *)rf);
269   }
270
271   c_heap_destroy(read_heap);
272   read_heap = NULL;
273 } /* }}} void destroy_read_heap */
274
275 static int register_callback(llist_t **list, /* {{{ */
276                              const char *name, callback_func_t *cf) {
277
278   if (*list == NULL) {
279     *list = llist_create();
280     if (*list == NULL) {
281       ERROR("plugin: register_callback: "
282             "llist_create failed.");
283       destroy_callback(cf);
284       return -1;
285     }
286   }
287
288   char *key = strdup(name);
289   if (key == NULL) {
290     ERROR("plugin: register_callback: strdup failed.");
291     destroy_callback(cf);
292     return -1;
293   }
294
295   llentry_t *le = llist_search(*list, name);
296   if (le == NULL) {
297     le = llentry_create(key, cf);
298     if (le == NULL) {
299       ERROR("plugin: register_callback: "
300             "llentry_create failed.");
301       sfree(key);
302       destroy_callback(cf);
303       return -1;
304     }
305
306     llist_append(*list, le);
307   } else {
308     callback_func_t *old_cf = le->value;
309     le->value = cf;
310
311     P_WARNING("register_callback: "
312               "a callback named `%s' already exists - "
313               "overwriting the old entry!",
314               name);
315
316     destroy_callback(old_cf);
317     sfree(key);
318   }
319
320   return 0;
321 } /* }}} int register_callback */
322
323 static void log_list_callbacks(llist_t **list, /* {{{ */
324                                const char *comment) {
325   char *str;
326   int len;
327   int i;
328   llentry_t *le;
329   int n;
330
331   n = llist_size(*list);
332   if (n == 0) {
333     INFO("%s: [none]", comment);
334     return;
335   }
336
337   char **keys = calloc(n, sizeof(*keys));
338   if (keys == NULL) {
339     ERROR("%s: failed to allocate memory for list of callbacks", comment);
340     return;
341   }
342
343   for (le = llist_head(*list), i = 0, len = 0; le != NULL; le = le->next, i++) {
344     keys[i] = le->key;
345     len += strlen(le->key) + 6;
346   }
347   str = malloc(len + 10);
348   if (str == NULL) {
349     ERROR("%s: failed to allocate memory for list of callbacks", comment);
350   } else {
351     *str = '\0';
352     strjoin(str, len, keys, n, "', '");
353     INFO("%s ['%s']", comment, str);
354     sfree(str);
355   }
356   sfree(keys);
357 } /* }}} void log_list_callbacks */
358
359 static int create_register_callback(llist_t **list, /* {{{ */
360                                     const char *name, void *callback,
361                                     user_data_t const *ud) {
362
363   if (name == NULL || callback == NULL)
364     return EINVAL;
365
366   callback_func_t *cf = calloc(1, sizeof(*cf));
367   if (cf == NULL) {
368     free_userdata(ud);
369     ERROR("plugin: create_register_callback: calloc failed.");
370     return ENOMEM;
371   }
372
373   cf->cf_callback = callback;
374   if (ud == NULL) {
375     cf->cf_udata = (user_data_t){
376         .data = NULL, .free_func = NULL,
377     };
378   } else {
379     cf->cf_udata = *ud;
380   }
381
382   cf->cf_ctx = plugin_get_ctx();
383
384   return register_callback(list, name, cf);
385 } /* }}} int create_register_callback */
386
387 static int plugin_unregister(llist_t *list, const char *name) /* {{{ */
388 {
389   llentry_t *e;
390
391   if (list == NULL)
392     return -1;
393
394   e = llist_search(list, name);
395   if (e == NULL)
396     return -1;
397
398   llist_remove(list, e);
399
400   sfree(e->key);
401   destroy_callback(e->value);
402
403   llentry_destroy(e);
404
405   return 0;
406 } /* }}} int plugin_unregister */
407
408 /* plugin_load_file loads the shared object "file" and calls its
409  * "module_register" function. Returns zero on success, non-zero otherwise. */
410 static int plugin_load_file(char const *file, bool global) {
411   int flags = RTLD_NOW;
412   if (global)
413     flags |= RTLD_GLOBAL;
414
415   void *dlh = dlopen(file, flags);
416   if (dlh == NULL) {
417     char errbuf[1024] = "";
418
419     snprintf(errbuf, sizeof(errbuf),
420              "dlopen(\"%s\") failed: %s. "
421              "The most common cause for this problem is missing dependencies. "
422              "Use ldd(1) to check the dependencies of the plugin / shared "
423              "object.",
424              file, dlerror());
425
426     /* This error is printed to STDERR unconditionally. If list_log is NULL,
427      * plugin_log() will also print to STDERR. We avoid duplicate output by
428      * checking that the list of log handlers, list_log, is not NULL. */
429     fprintf(stderr, "ERROR: %s\n", errbuf);
430     if (list_log != NULL) {
431       ERROR("%s", errbuf);
432     }
433
434     return ENOENT;
435   }
436
437   void (*reg_handle)(void) = dlsym(dlh, "module_register");
438   if (reg_handle == NULL) {
439     ERROR("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
440           dlerror());
441     dlclose(dlh);
442     return ENOENT;
443   }
444
445   (*reg_handle)();
446   return 0;
447 }
448
449 static void *plugin_read_thread(void __attribute__((unused)) * args) {
450   while (read_loop != 0) {
451     read_func_t *rf;
452     plugin_ctx_t old_ctx;
453     cdtime_t start;
454     cdtime_t now;
455     cdtime_t elapsed;
456     int status;
457     int rf_type;
458     int rc;
459
460     /* Get the read function that needs to be read next.
461      * We don't need to hold "read_lock" for the heap, but we need
462      * to call c_heap_get_root() and pthread_cond_wait() in the
463      * same protected block. */
464     pthread_mutex_lock(&read_lock);
465     rf = c_heap_get_root(read_heap);
466     if (rf == NULL) {
467       pthread_cond_wait(&read_cond, &read_lock);
468       pthread_mutex_unlock(&read_lock);
469       continue;
470     }
471     pthread_mutex_unlock(&read_lock);
472
473     if (rf->rf_interval == 0) {
474       /* this should not happen, because the interval is set
475        * for each plugin when loading it
476        * XXX: issue a warning? */
477       rf->rf_interval = plugin_get_interval();
478       rf->rf_effective_interval = rf->rf_interval;
479
480       rf->rf_next_read = cdtime();
481     }
482
483     /* sleep until this entry is due,
484      * using pthread_cond_timedwait */
485     pthread_mutex_lock(&read_lock);
486     /* In pthread_cond_timedwait, spurious wakeups are possible
487      * (and really happen, at least on NetBSD with > 1 CPU), thus
488      * we need to re-evaluate the condition every time
489      * pthread_cond_timedwait returns. */
490     rc = 0;
491     while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
492       rc = pthread_cond_timedwait(&read_cond, &read_lock,
493                                   &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
494     }
495
496     /* Must hold `read_lock' when accessing `rf->rf_type'. */
497     rf_type = rf->rf_type;
498     pthread_mutex_unlock(&read_lock);
499
500     /* Check if we're supposed to stop.. This may have interrupted
501      * the sleep, too. */
502     if (read_loop == 0) {
503       /* Insert `rf' again, so it can be free'd correctly */
504       c_heap_insert(read_heap, rf);
505       break;
506     }
507
508     /* The entry has been marked for deletion. The linked list
509      * entry has already been removed by `plugin_unregister_read'.
510      * All we have to do here is free the `read_func_t' and
511      * continue. */
512     if (rf_type == RF_REMOVE) {
513       DEBUG("plugin_read_thread: Destroying the `%s' "
514             "callback.",
515             rf->rf_name);
516       sfree(rf->rf_name);
517       destroy_callback((callback_func_t *)rf);
518       rf = NULL;
519       continue;
520     }
521
522     DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);
523
524     start = cdtime();
525
526     old_ctx = plugin_set_ctx(rf->rf_ctx);
527
528     if (rf_type == RF_SIMPLE) {
529       int (*callback)(void);
530
531       callback = rf->rf_callback;
532       status = (*callback)();
533     } else {
534       plugin_read_cb callback;
535
536       assert(rf_type == RF_COMPLEX);
537
538       callback = rf->rf_callback;
539       status = (*callback)(&rf->rf_udata);
540     }
541
542     plugin_set_ctx(old_ctx);
543
544     /* If the function signals failure, we will increase the
545      * intervals in which it will be called. */
546     if (status != 0) {
547       rf->rf_effective_interval *= 2;
548       if (rf->rf_effective_interval > max_read_interval)
549         rf->rf_effective_interval = max_read_interval;
550
551       NOTICE("read-function of plugin `%s' failed. "
552              "Will suspend it for %.3f seconds.",
553              rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
554     } else {
555       /* Success: Restore the interval, if it was changed. */
556       rf->rf_effective_interval = rf->rf_interval;
557     }
558
559     /* update the ``next read due'' field */
560     now = cdtime();
561
562     /* calculate the time spent in the read function */
563     elapsed = (now - start);
564
565     if (elapsed > rf->rf_effective_interval)
566       WARNING(
567           "plugin_read_thread: read-function of the `%s' plugin took %.3f "
568           "seconds, which is above its read interval (%.3f seconds). You might "
569           "want to adjust the `Interval' or `ReadThreads' settings.",
570           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
571           CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
572
573     DEBUG("plugin_read_thread: read-function of the `%s' plugin took "
574           "%.6f seconds.",
575           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
576
577     DEBUG("plugin_read_thread: Effective interval of the "
578           "`%s' plugin is %.3f seconds.",
579           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
580
581     /* Calculate the next (absolute) time at which this function
582      * should be called. */
583     rf->rf_next_read += rf->rf_effective_interval;
584
585     /* Check, if `rf_next_read' is in the past. */
586     if (rf->rf_next_read < now) {
587       /* `rf_next_read' is in the past. Insert `now'
588        * so this value doesn't trail off into the
589        * past too much. */
590       rf->rf_next_read = now;
591     }
592
593     DEBUG("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
594           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_next_read));
595
596     /* Re-insert this read function into the heap again. */
597     c_heap_insert(read_heap, rf);
598   } /* while (read_loop) */
599
600   pthread_exit(NULL);
601   return (void *)0;
602 } /* void *plugin_read_thread */
603
604 #ifdef PTHREAD_MAX_NAMELEN_NP
605 #define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
606 #else
607 #define THREAD_NAME_MAX 16
608 #endif
609
610 static void set_thread_name(pthread_t tid, char const *name) {
611 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
612
613   /* glibc limits the length of the name and fails if the passed string
614    * is too long, so we truncate it here. */
615   char n[THREAD_NAME_MAX];
616   if (strlen(name) >= THREAD_NAME_MAX)
617     WARNING("set_thread_name(\"%s\"): name too long", name);
618   sstrncpy(n, name, sizeof(n));
619
620 #if defined(HAVE_PTHREAD_SETNAME_NP)
621   int status = pthread_setname_np(tid, n);
622   if (status != 0) {
623     ERROR("set_thread_name(\"%s\"): %s", n, STRERROR(status));
624   }
625 #else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
626   pthread_set_name_np(tid, n);
627 #endif
628
629 #endif
630 }
631
632 static void start_read_threads(size_t num) /* {{{ */
633 {
634   if (read_threads != NULL)
635     return;
636
637   read_threads = calloc(num, sizeof(*read_threads));
638   if (read_threads == NULL) {
639     ERROR("plugin: start_read_threads: calloc failed.");
640     return;
641   }
642
643   read_threads_num = 0;
644   for (size_t i = 0; i < num; i++) {
645     int status = pthread_create(read_threads + read_threads_num,
646                                 /* attr = */ NULL, plugin_read_thread,
647                                 /* arg = */ NULL);
648     if (status != 0) {
649       ERROR("plugin: start_read_threads: pthread_create failed with status %i "
650             "(%s).",
651             status, STRERROR(status));
652       return;
653     }
654
655     char name[THREAD_NAME_MAX];
656     snprintf(name, sizeof(name), "reader#%" PRIu64, (uint64_t)read_threads_num);
657     set_thread_name(read_threads[read_threads_num], name);
658
659     read_threads_num++;
660   } /* for (i) */
661 } /* }}} void start_read_threads */
662
663 static void stop_read_threads(void) {
664   if (read_threads == NULL)
665     return;
666
667   INFO("collectd: Stopping %" PRIsz " read threads.", read_threads_num);
668
669   pthread_mutex_lock(&read_lock);
670   read_loop = 0;
671   DEBUG("plugin: stop_read_threads: Signalling `read_cond'");
672   pthread_cond_broadcast(&read_cond);
673   pthread_mutex_unlock(&read_lock);
674
675   for (size_t i = 0; i < read_threads_num; i++) {
676     if (pthread_join(read_threads[i], NULL) != 0) {
677       ERROR("plugin: stop_read_threads: pthread_join failed.");
678     }
679     read_threads[i] = (pthread_t)0;
680   }
681   sfree(read_threads);
682   read_threads_num = 0;
683 } /* void stop_read_threads */
684
685 static void plugin_value_list_free(value_list_t *vl) /* {{{ */
686 {
687   if (vl == NULL)
688     return;
689
690   meta_data_destroy(vl->meta);
691   sfree(vl->values);
692   sfree(vl);
693 } /* }}} void plugin_value_list_free */
694
695 static value_list_t *
696 plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
697 {
698   value_list_t *vl;
699
700   if (vl_orig == NULL)
701     return NULL;
702
703   vl = malloc(sizeof(*vl));
704   if (vl == NULL)
705     return NULL;
706   memcpy(vl, vl_orig, sizeof(*vl));
707
708   if (vl->host[0] == 0)
709     sstrncpy(vl->host, hostname_g, sizeof(vl->host));
710
711   vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
712   if (vl->values == NULL) {
713     plugin_value_list_free(vl);
714     return NULL;
715   }
716   memcpy(vl->values, vl_orig->values,
717          vl_orig->values_len * sizeof(*vl->values));
718
719   vl->meta = meta_data_clone(vl->meta);
720   if ((vl_orig->meta != NULL) && (vl->meta == NULL)) {
721     plugin_value_list_free(vl);
722     return NULL;
723   }
724
725   if (vl->time == 0)
726     vl->time = cdtime();
727
728   /* Fill in the interval from the thread context, if it is zero. */
729   if (vl->interval == 0)
730     vl->interval = plugin_get_interval();
731
732   return vl;
733 } /* }}} value_list_t *plugin_value_list_clone */
734
735 static int plugin_write_enqueue(value_list_t const *vl) /* {{{ */
736 {
737   write_queue_t *q;
738
739   q = malloc(sizeof(*q));
740   if (q == NULL)
741     return ENOMEM;
742   q->next = NULL;
743
744   q->vl = plugin_value_list_clone(vl);
745   if (q->vl == NULL) {
746     sfree(q);
747     return ENOMEM;
748   }
749
750   /* Store context of caller (read plugin); otherwise, it would not be
751    * available to the write plugins when actually dispatching the
752    * value-list later on. */
753   q->ctx = plugin_get_ctx();
754
755   pthread_mutex_lock(&write_lock);
756
757   if (write_queue_tail == NULL) {
758     write_queue_head = q;
759     write_queue_tail = q;
760     write_queue_length = 1;
761   } else {
762     write_queue_tail->next = q;
763     write_queue_tail = q;
764     write_queue_length += 1;
765   }
766
767   pthread_cond_signal(&write_cond);
768   pthread_mutex_unlock(&write_lock);
769
770   return 0;
771 } /* }}} int plugin_write_enqueue */
772
773 static value_list_t *plugin_write_dequeue(void) /* {{{ */
774 {
775   write_queue_t *q;
776   value_list_t *vl;
777
778   pthread_mutex_lock(&write_lock);
779
780   while (write_loop && (write_queue_head == NULL))
781     pthread_cond_wait(&write_cond, &write_lock);
782
783   if (write_queue_head == NULL) {
784     pthread_mutex_unlock(&write_lock);
785     return NULL;
786   }
787
788   q = write_queue_head;
789   write_queue_head = q->next;
790   write_queue_length -= 1;
791   if (write_queue_head == NULL) {
792     write_queue_tail = NULL;
793     assert(0 == write_queue_length);
794   }
795
796   pthread_mutex_unlock(&write_lock);
797
798   (void)plugin_set_ctx(q->ctx);
799
800   vl = q->vl;
801   sfree(q);
802   return vl;
803 } /* }}} value_list_t *plugin_write_dequeue */
804
805 static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
806 {
807   while (write_loop) {
808     value_list_t *vl = plugin_write_dequeue();
809     if (vl == NULL)
810       continue;
811
812     plugin_dispatch_values_internal(vl);
813
814     plugin_value_list_free(vl);
815   }
816
817   pthread_exit(NULL);
818   return (void *)0;
819 } /* }}} void *plugin_write_thread */
820
821 static void start_write_threads(size_t num) /* {{{ */
822 {
823   if (write_threads != NULL)
824     return;
825
826   write_threads = calloc(num, sizeof(*write_threads));
827   if (write_threads == NULL) {
828     ERROR("plugin: start_write_threads: calloc failed.");
829     return;
830   }
831
832   write_threads_num = 0;
833   for (size_t i = 0; i < num; i++) {
834     int status = pthread_create(write_threads + write_threads_num,
835                                 /* attr = */ NULL, plugin_write_thread,
836                                 /* arg = */ NULL);
837     if (status != 0) {
838       ERROR("plugin: start_write_threads: pthread_create failed with status %i "
839             "(%s).",
840             status, STRERROR(status));
841       return;
842     }
843
844     char name[THREAD_NAME_MAX];
845     snprintf(name, sizeof(name), "writer#%" PRIu64,
846              (uint64_t)write_threads_num);
847     set_thread_name(write_threads[write_threads_num], name);
848
849     write_threads_num++;
850   } /* for (i) */
851 } /* }}} void start_write_threads */
852
853 static void stop_write_threads(void) /* {{{ */
854 {
855   write_queue_t *q;
856   size_t i;
857
858   if (write_threads == NULL)
859     return;
860
861   INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
862
863   pthread_mutex_lock(&write_lock);
864   write_loop = false;
865   DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
866   pthread_cond_broadcast(&write_cond);
867   pthread_mutex_unlock(&write_lock);
868
869   for (i = 0; i < write_threads_num; i++) {
870     if (pthread_join(write_threads[i], NULL) != 0) {
871       ERROR("plugin: stop_write_threads: pthread_join failed.");
872     }
873     write_threads[i] = (pthread_t)0;
874   }
875   sfree(write_threads);
876   write_threads_num = 0;
877
878   pthread_mutex_lock(&write_lock);
879   i = 0;
880   for (q = write_queue_head; q != NULL;) {
881     write_queue_t *q1 = q;
882     plugin_value_list_free(q->vl);
883     q = q->next;
884     sfree(q1);
885     i++;
886   }
887   write_queue_head = NULL;
888   write_queue_tail = NULL;
889   write_queue_length = 0;
890   pthread_mutex_unlock(&write_lock);
891
892   if (i > 0) {
893     WARNING("plugin: %" PRIsz " value list%s left after shutting down "
894             "the write threads.",
895             i, (i == 1) ? " was" : "s were");
896   }
897 } /* }}} void stop_write_threads */
898
899 /*
900  * Public functions
901  */
902 void plugin_set_dir(const char *dir) {
903   sfree(plugindir);
904
905   if (dir == NULL) {
906     plugindir = NULL;
907     return;
908   }
909
910   plugindir = strdup(dir);
911   if (plugindir == NULL)
912     ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
913 }
914
915 static bool plugin_is_loaded(char const *name) {
916   int status;
917
918   if (plugins_loaded == NULL)
919     plugins_loaded =
920         c_avl_create((int (*)(const void *, const void *))strcasecmp);
921   assert(plugins_loaded != NULL);
922
923   status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
924   return status == 0;
925 }
926
927 static int plugin_mark_loaded(char const *name) {
928   char *name_copy;
929   int status;
930
931   name_copy = strdup(name);
932   if (name_copy == NULL)
933     return ENOMEM;
934
935   status = c_avl_insert(plugins_loaded,
936                         /* key = */ name_copy, /* value = */ NULL);
937   return status;
938 }
939
940 static void plugin_free_loaded(void) {
941   void *key;
942   void *value;
943
944   if (plugins_loaded == NULL)
945     return;
946
947   while (c_avl_pick(plugins_loaded, &key, &value) == 0) {
948     sfree(key);
949     assert(value == NULL);
950   }
951
952   c_avl_destroy(plugins_loaded);
953   plugins_loaded = NULL;
954 }
955
956 #define BUFSIZE 512
957 #ifdef WIN32
958 #define SHLIB_SUFFIX ".dll"
959 #else
960 #define SHLIB_SUFFIX ".so"
961 #endif
962 int plugin_load(char const *plugin_name, bool global) {
963   DIR *dh;
964   const char *dir;
965   char filename[BUFSIZE] = "";
966   char typename[BUFSIZE];
967   int ret;
968   struct stat statbuf;
969   struct dirent *de;
970   int status;
971
972   if (plugin_name == NULL)
973     return EINVAL;
974
975   /* Check if plugin is already loaded and don't do anything in this
976    * case. */
977   if (plugin_is_loaded(plugin_name))
978     return 0;
979
980   dir = plugin_get_dir();
981   ret = 1;
982
983   /*
984    * XXX: Magic at work:
985    *
986    * Some of the language bindings, for example the Python and Perl
987    * plugins, need to be able to export symbols to the scripts they run.
988    * For this to happen, the "Globals" flag needs to be set.
989    * Unfortunately, this technical detail is hard to explain to the
990    * average user and she shouldn't have to worry about this, ideally.
991    * So in order to save everyone's sanity use a different default for a
992    * handful of special plugins. --octo
993    */
994   if ((strcasecmp("perl", plugin_name) == 0) ||
995       (strcasecmp("python", plugin_name) == 0))
996     global = true;
997
998   /* `cpu' should not match `cpufreq'. To solve this we add SHLIB_SUFFIX to the
999    * type when matching the filename */
1000   status = snprintf(typename, sizeof(typename), "%s" SHLIB_SUFFIX, plugin_name);
1001   if ((status < 0) || ((size_t)status >= sizeof(typename))) {
1002     WARNING("plugin_load: Filename too long: \"%s" SHLIB_SUFFIX "\"",
1003             plugin_name);
1004     return -1;
1005   }
1006
1007   if ((dh = opendir(dir)) == NULL) {
1008     ERROR("plugin_load: opendir (%s) failed: %s", dir, STRERRNO);
1009     return -1;
1010   }
1011
1012   while ((de = readdir(dh)) != NULL) {
1013     if (strcasecmp(de->d_name, typename))
1014       continue;
1015
1016     status = snprintf(filename, sizeof(filename), "%s/%s", dir, de->d_name);
1017     if ((status < 0) || ((size_t)status >= sizeof(filename))) {
1018       WARNING("plugin_load: Filename too long: \"%s/%s\"", dir, de->d_name);
1019       continue;
1020     }
1021
1022     if (lstat(filename, &statbuf) == -1) {
1023       WARNING("plugin_load: stat (\"%s\") failed: %s", filename, STRERRNO);
1024       continue;
1025     } else if (!S_ISREG(statbuf.st_mode)) {
1026       /* don't follow symlinks */
1027       WARNING("plugin_load: %s is not a regular file.", filename);
1028       continue;
1029     }
1030
1031     status = plugin_load_file(filename, global);
1032     if (status == 0) {
1033       /* success */
1034       plugin_mark_loaded(plugin_name);
1035       ret = 0;
1036       INFO("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1037       break;
1038     } else {
1039       ERROR("plugin_load: Load plugin \"%s\" failed with "
1040             "status %i.",
1041             plugin_name, status);
1042     }
1043   }
1044
1045   closedir(dh);
1046
1047   if (filename[0] == 0)
1048     ERROR("plugin_load: Could not find plugin \"%s\" in %s", plugin_name, dir);
1049
1050   return ret;
1051 }
1052
1053 /*
1054  * The `register_*' functions follow
1055  */
1056 EXPORT int plugin_register_config(const char *name,
1057                                   int (*callback)(const char *key,
1058                                                   const char *val),
1059                                   const char **keys, int keys_num) {
1060   cf_register(name, callback, keys, keys_num);
1061   return 0;
1062 } /* int plugin_register_config */
1063
1064 EXPORT int plugin_register_complex_config(const char *type,
1065                                           int (*callback)(oconfig_item_t *)) {
1066   return cf_register_complex(type, callback);
1067 } /* int plugin_register_complex_config */
1068
1069 EXPORT int plugin_register_init(const char *name, int (*callback)(void)) {
1070   return create_register_callback(&list_init, name, (void *)callback, NULL);
1071 } /* plugin_register_init */
1072
1073 static int plugin_compare_read_func(const void *arg0, const void *arg1) {
1074   const read_func_t *rf0;
1075   const read_func_t *rf1;
1076
1077   rf0 = arg0;
1078   rf1 = arg1;
1079
1080   if (rf0->rf_next_read < rf1->rf_next_read)
1081     return -1;
1082   else if (rf0->rf_next_read > rf1->rf_next_read)
1083     return 1;
1084   else
1085     return 0;
1086 } /* int plugin_compare_read_func */
1087
1088 /* Add a read function to both, the heap and a linked list. The linked list if
1089  * used to look-up read functions, especially for the remove function. The heap
1090  * is used to determine which plugin to read next. */
1091 static int plugin_insert_read(read_func_t *rf) {
1092   int status;
1093   llentry_t *le;
1094
1095   rf->rf_next_read = cdtime();
1096   rf->rf_effective_interval = rf->rf_interval;
1097
1098   pthread_mutex_lock(&read_lock);
1099
1100   if (read_list == NULL) {
1101     read_list = llist_create();
1102     if (read_list == NULL) {
1103       pthread_mutex_unlock(&read_lock);
1104       ERROR("plugin_insert_read: read_list failed.");
1105       return -1;
1106     }
1107   }
1108
1109   if (read_heap == NULL) {
1110     read_heap = c_heap_create(plugin_compare_read_func);
1111     if (read_heap == NULL) {
1112       pthread_mutex_unlock(&read_lock);
1113       ERROR("plugin_insert_read: c_heap_create failed.");
1114       return -1;
1115     }
1116   }
1117
1118   le = llist_search(read_list, rf->rf_name);
1119   if (le != NULL) {
1120     pthread_mutex_unlock(&read_lock);
1121     P_WARNING("The read function \"%s\" is already registered. "
1122               "Check for duplicates in your configuration!",
1123               rf->rf_name);
1124     return EINVAL;
1125   }
1126
1127   le = llentry_create(rf->rf_name, rf);
1128   if (le == NULL) {
1129     pthread_mutex_unlock(&read_lock);
1130     ERROR("plugin_insert_read: llentry_create failed.");
1131     return -1;
1132   }
1133
1134   status = c_heap_insert(read_heap, rf);
1135   if (status != 0) {
1136     pthread_mutex_unlock(&read_lock);
1137     ERROR("plugin_insert_read: c_heap_insert failed.");
1138     llentry_destroy(le);
1139     return -1;
1140   }
1141
1142   /* This does not fail. */
1143   llist_append(read_list, le);
1144
1145   /* Wake up all the read threads. */
1146   pthread_cond_broadcast(&read_cond);
1147   pthread_mutex_unlock(&read_lock);
1148   return 0;
1149 } /* int plugin_insert_read */
1150
1151 EXPORT int plugin_register_read(const char *name, int (*callback)(void)) {
1152   read_func_t *rf;
1153   int status;
1154
1155   rf = calloc(1, sizeof(*rf));
1156   if (rf == NULL) {
1157     ERROR("plugin_register_read: calloc failed.");
1158     return ENOMEM;
1159   }
1160
1161   rf->rf_callback = (void *)callback;
1162   rf->rf_udata.data = NULL;
1163   rf->rf_udata.free_func = NULL;
1164   rf->rf_ctx = plugin_get_ctx();
1165   rf->rf_group[0] = '\0';
1166   rf->rf_name = strdup(name);
1167   rf->rf_type = RF_SIMPLE;
1168   rf->rf_interval = plugin_get_interval();
1169   rf->rf_ctx.interval = rf->rf_interval;
1170
1171   status = plugin_insert_read(rf);
1172   if (status != 0) {
1173     sfree(rf->rf_name);
1174     sfree(rf);
1175   }
1176
1177   return status;
1178 } /* int plugin_register_read */
1179
1180 EXPORT int plugin_register_complex_read(const char *group, const char *name,
1181                                         plugin_read_cb callback,
1182                                         cdtime_t interval,
1183                                         user_data_t const *user_data) {
1184   read_func_t *rf;
1185   int status;
1186
1187   rf = calloc(1, sizeof(*rf));
1188   if (rf == NULL) {
1189     free_userdata(user_data);
1190     ERROR("plugin_register_complex_read: calloc failed.");
1191     return ENOMEM;
1192   }
1193
1194   rf->rf_callback = (void *)callback;
1195   if (group != NULL)
1196     sstrncpy(rf->rf_group, group, sizeof(rf->rf_group));
1197   else
1198     rf->rf_group[0] = '\0';
1199   rf->rf_name = strdup(name);
1200   rf->rf_type = RF_COMPLEX;
1201   rf->rf_interval = (interval != 0) ? interval : plugin_get_interval();
1202
1203   /* Set user data */
1204   if (user_data == NULL) {
1205     rf->rf_udata.data = NULL;
1206     rf->rf_udata.free_func = NULL;
1207   } else {
1208     rf->rf_udata = *user_data;
1209   }
1210
1211   rf->rf_ctx = plugin_get_ctx();
1212   rf->rf_ctx.interval = rf->rf_interval;
1213
1214   status = plugin_insert_read(rf);
1215   if (status != 0) {
1216     free_userdata(&rf->rf_udata);
1217     sfree(rf->rf_name);
1218     sfree(rf);
1219   }
1220
1221   return status;
1222 } /* int plugin_register_complex_read */
1223
1224 EXPORT int plugin_register_write(const char *name, plugin_write_cb callback,
1225                                  user_data_t const *ud) {
1226   return create_register_callback(&list_write, name, (void *)callback, ud);
1227 } /* int plugin_register_write */
1228
1229 static int plugin_flush_timeout_callback(user_data_t *ud) {
1230   flush_callback_t *cb = ud->data;
1231
1232   return plugin_flush(cb->name, cb->timeout, NULL);
1233 } /* static int plugin_flush_callback */
1234
1235 static void plugin_flush_timeout_callback_free(void *data) {
1236   flush_callback_t *cb = data;
1237
1238   if (cb == NULL)
1239     return;
1240
1241   sfree(cb->name);
1242   sfree(cb);
1243 } /* static void plugin_flush_callback_free */
1244
1245 static char *plugin_flush_callback_name(const char *name) {
1246   const char *flush_prefix = "flush/";
1247   size_t prefix_size;
1248   char *flush_name;
1249   size_t name_size;
1250
1251   prefix_size = strlen(flush_prefix);
1252   name_size = strlen(name);
1253
1254   flush_name = malloc(name_size + prefix_size + 1);
1255   if (flush_name == NULL) {
1256     ERROR("plugin_flush_callback_name: malloc failed.");
1257     return NULL;
1258   }
1259
1260   sstrncpy(flush_name, flush_prefix, prefix_size + 1);
1261   sstrncpy(flush_name + prefix_size, name, name_size + 1);
1262
1263   return flush_name;
1264 } /* static char *plugin_flush_callback_name */
1265
1266 EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback,
1267                                  user_data_t const *ud) {
1268   int status;
1269   plugin_ctx_t ctx = plugin_get_ctx();
1270
1271   status = create_register_callback(&list_flush, name, (void *)callback, ud);
1272   if (status != 0)
1273     return status;
1274
1275   if (ctx.flush_interval != 0) {
1276     char *flush_name;
1277     flush_callback_t *cb;
1278
1279     flush_name = plugin_flush_callback_name(name);
1280     if (flush_name == NULL)
1281       return -1;
1282
1283     cb = malloc(sizeof(*cb));
1284     if (cb == NULL) {
1285       ERROR("plugin_register_flush: malloc failed.");
1286       sfree(flush_name);
1287       return -1;
1288     }
1289
1290     cb->name = strdup(name);
1291     if (cb->name == NULL) {
1292       ERROR("plugin_register_flush: strdup failed.");
1293       sfree(cb);
1294       sfree(flush_name);
1295       return -1;
1296     }
1297     cb->timeout = ctx.flush_timeout;
1298
1299     status = plugin_register_complex_read(
1300         /* group     = */ "flush",
1301         /* name      = */ flush_name,
1302         /* callback  = */ plugin_flush_timeout_callback,
1303         /* interval  = */ ctx.flush_interval,
1304         /* user data = */ &(user_data_t){
1305             .data = cb, .free_func = plugin_flush_timeout_callback_free,
1306         });
1307
1308     sfree(flush_name);
1309     return status;
1310   }
1311
1312   return 0;
1313 } /* int plugin_register_flush */
1314
1315 EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
1316                                    user_data_t const *ud) {
1317   return create_register_callback(&list_missing, name, (void *)callback, ud);
1318 } /* int plugin_register_missing */
1319
1320 EXPORT int plugin_register_cache_event(const char *name,
1321                                        plugin_cache_event_cb callback,
1322                                        user_data_t const *ud) {
1323
1324   if (name == NULL || callback == NULL)
1325     return EINVAL;
1326
1327   char *name_copy = strdup(name);
1328   if (name_copy == NULL) {
1329     P_ERROR("plugin_register_cache_event: strdup failed.");
1330     free_userdata(ud);
1331     return ENOMEM;
1332   }
1333
1334   if (list_cache_event_num >= 32) {
1335     P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
1336             "to be registered.");
1337     free_userdata(ud);
1338     return ENOMEM;
1339   }
1340
1341   for (size_t i = 0; i < list_cache_event_num; i++) {
1342     cache_event_func_t *cef = &list_cache_event[i];
1343     if (!cef->callback)
1344       continue;
1345
1346     if (strcmp(name, cef->name) == 0) {
1347       P_ERROR("plugin_register_cache_event: a callback named `%s' already "
1348               "registered!",
1349               name);
1350       free_userdata(ud);
1351       return -1;
1352     }
1353   }
1354
1355   user_data_t user_data;
1356   if (ud == NULL) {
1357     user_data = (user_data_t){
1358         .data = NULL, .free_func = NULL,
1359     };
1360   } else {
1361     user_data = *ud;
1362   }
1363
1364   list_cache_event[list_cache_event_num] =
1365       (cache_event_func_t){.callback = callback,
1366                            .name = name_copy,
1367                            .user_data = user_data,
1368                            .plugin_ctx = plugin_get_ctx()};
1369   list_cache_event_num++;
1370
1371   return 0;
1372 } /* int plugin_register_cache_event */
1373
1374 EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
1375   return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
1376 } /* int plugin_register_shutdown */
1377
1378 static void plugin_free_data_sets(void) {
1379   void *key;
1380   void *value;
1381
1382   if (data_sets == NULL)
1383     return;
1384
1385   while (c_avl_pick(data_sets, &key, &value) == 0) {
1386     data_set_t *ds = value;
1387     /* key is a pointer to ds->type */
1388
1389     sfree(ds->ds);
1390     sfree(ds);
1391   }
1392
1393   c_avl_destroy(data_sets);
1394   data_sets = NULL;
1395 } /* void plugin_free_data_sets */
1396
1397 EXPORT int plugin_register_data_set(const data_set_t *ds) {
1398   data_set_t *ds_copy;
1399
1400   if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
1401     NOTICE("Replacing DS `%s' with another version.", ds->type);
1402     plugin_unregister_data_set(ds->type);
1403   } else if (data_sets == NULL) {
1404     data_sets = c_avl_create((int (*)(const void *, const void *))strcmp);
1405     if (data_sets == NULL)
1406       return -1;
1407   }
1408
1409   ds_copy = malloc(sizeof(*ds_copy));
1410   if (ds_copy == NULL)
1411     return -1;
1412   memcpy(ds_copy, ds, sizeof(data_set_t));
1413
1414   ds_copy->ds = malloc(sizeof(*ds_copy->ds) * ds->ds_num);
1415   if (ds_copy->ds == NULL) {
1416     sfree(ds_copy);
1417     return -1;
1418   }
1419
1420   for (size_t i = 0; i < ds->ds_num; i++)
1421     memcpy(ds_copy->ds + i, ds->ds + i, sizeof(data_source_t));
1422
1423   return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy);
1424 } /* int plugin_register_data_set */
1425
1426 EXPORT int plugin_register_log(const char *name, plugin_log_cb callback,
1427                                user_data_t const *ud) {
1428   return create_register_callback(&list_log, name, (void *)callback, ud);
1429 } /* int plugin_register_log */
1430
1431 EXPORT int plugin_register_notification(const char *name,
1432                                         plugin_notification_cb callback,
1433                                         user_data_t const *ud) {
1434   return create_register_callback(&list_notification, name, (void *)callback,
1435                                   ud);
1436 } /* int plugin_register_log */
1437
1438 EXPORT int plugin_unregister_config(const char *name) {
1439   cf_unregister(name);
1440   return 0;
1441 } /* int plugin_unregister_config */
1442
1443 EXPORT int plugin_unregister_complex_config(const char *name) {
1444   cf_unregister_complex(name);
1445   return 0;
1446 } /* int plugin_unregister_complex_config */
1447
1448 EXPORT int plugin_unregister_init(const char *name) {
1449   return plugin_unregister(list_init, name);
1450 }
1451
1452 EXPORT int plugin_unregister_read(const char *name) /* {{{ */
1453 {
1454   llentry_t *le;
1455   read_func_t *rf;
1456
1457   if (name == NULL)
1458     return -ENOENT;
1459
1460   pthread_mutex_lock(&read_lock);
1461
1462   if (read_list == NULL) {
1463     pthread_mutex_unlock(&read_lock);
1464     return -ENOENT;
1465   }
1466
1467   le = llist_search(read_list, name);
1468   if (le == NULL) {
1469     pthread_mutex_unlock(&read_lock);
1470     WARNING("plugin_unregister_read: No such read function: %s", name);
1471     return -ENOENT;
1472   }
1473
1474   llist_remove(read_list, le);
1475
1476   rf = le->value;
1477   assert(rf != NULL);
1478   rf->rf_type = RF_REMOVE;
1479
1480   pthread_mutex_unlock(&read_lock);
1481
1482   llentry_destroy(le);
1483
1484   DEBUG("plugin_unregister_read: Marked `%s' for removal.", name);
1485
1486   return 0;
1487 } /* }}} int plugin_unregister_read */
1488
1489 EXPORT void plugin_log_available_writers(void) {
1490   log_list_callbacks(&list_write, "Available write targets:");
1491 }
1492
1493 static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
1494 {
1495   read_func_t *rf = e->value;
1496   char *group = ud;
1497
1498   return strcmp(rf->rf_group, (const char *)group);
1499 } /* }}} int compare_read_func_group */
1500
1501 EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */
1502 {
1503   llentry_t *le;
1504   read_func_t *rf;
1505
1506   int found = 0;
1507
1508   if (group == NULL)
1509     return -ENOENT;
1510
1511   pthread_mutex_lock(&read_lock);
1512
1513   if (read_list == NULL) {
1514     pthread_mutex_unlock(&read_lock);
1515     return -ENOENT;
1516   }
1517
1518   while (42) {
1519     le = llist_search_custom(read_list, compare_read_func_group, (void *)group);
1520
1521     if (le == NULL)
1522       break;
1523
1524     ++found;
1525
1526     llist_remove(read_list, le);
1527
1528     rf = le->value;
1529     assert(rf != NULL);
1530     rf->rf_type = RF_REMOVE;
1531
1532     llentry_destroy(le);
1533
1534     DEBUG("plugin_unregister_read_group: "
1535           "Marked `%s' (group `%s') for removal.",
1536           rf->rf_name, group);
1537   }
1538
1539   pthread_mutex_unlock(&read_lock);
1540
1541   if (found == 0) {
1542     WARNING("plugin_unregister_read_group: No such "
1543             "group of read function: %s",
1544             group);
1545     return -ENOENT;
1546   }
1547
1548   return 0;
1549 } /* }}} int plugin_unregister_read_group */
1550
1551 EXPORT int plugin_unregister_write(const char *name) {
1552   return plugin_unregister(list_write, name);
1553 }
1554
1555 EXPORT int plugin_unregister_flush(const char *name) {
1556   plugin_ctx_t ctx = plugin_get_ctx();
1557
1558   if (ctx.flush_interval != 0) {
1559     char *flush_name;
1560
1561     flush_name = plugin_flush_callback_name(name);
1562     if (flush_name != NULL) {
1563       plugin_unregister_read(flush_name);
1564       sfree(flush_name);
1565     }
1566   }
1567
1568   return plugin_unregister(list_flush, name);
1569 }
1570
1571 EXPORT int plugin_unregister_missing(const char *name) {
1572   return plugin_unregister(list_missing, name);
1573 }
1574
1575 EXPORT int plugin_unregister_cache_event(const char *name) {
1576   for (size_t i = 0; i < list_cache_event_num; i++) {
1577     cache_event_func_t *cef = &list_cache_event[i];
1578     if (!cef->callback)
1579       continue;
1580     if (strcmp(name, cef->name) == 0) {
1581       /* Mark callback as inactive, so mask in cache entries remains actual */
1582       cef->callback = NULL;
1583       sfree(cef->name);
1584       free_userdata(&cef->user_data);
1585     }
1586   }
1587   return 0;
1588 }
1589
1590 static void destroy_cache_event_callbacks() {
1591   for (size_t i = 0; i < list_cache_event_num; i++) {
1592     cache_event_func_t *cef = &list_cache_event[i];
1593     if (!cef->callback)
1594       continue;
1595     cef->callback = NULL;
1596     sfree(cef->name);
1597     free_userdata(&cef->user_data);
1598   }
1599 }
1600
1601 EXPORT int plugin_unregister_shutdown(const char *name) {
1602   return plugin_unregister(list_shutdown, name);
1603 }
1604
1605 EXPORT int plugin_unregister_data_set(const char *name) {
1606   data_set_t *ds;
1607
1608   if (data_sets == NULL)
1609     return -1;
1610
1611   if (c_avl_remove(data_sets, name, NULL, (void *)&ds) != 0)
1612     return -1;
1613
1614   sfree(ds->ds);
1615   sfree(ds);
1616
1617   return 0;
1618 } /* int plugin_unregister_data_set */
1619
1620 EXPORT int plugin_unregister_log(const char *name) {
1621   return plugin_unregister(list_log, name);
1622 }
1623
1624 EXPORT int plugin_unregister_notification(const char *name) {
1625   return plugin_unregister(list_notification, name);
1626 }
1627
1628 EXPORT int plugin_init_all(void) {
1629   char const *chain_name;
1630   llentry_t *le;
1631   int status;
1632   int ret = 0;
1633
1634   /* Init the value cache */
1635   uc_init();
1636
1637   if (IS_TRUE(global_option_get("CollectInternalStats"))) {
1638     record_statistics = true;
1639     plugin_register_read("collectd", plugin_update_internal_statistics);
1640   }
1641
1642   chain_name = global_option_get("PreCacheChain");
1643   pre_cache_chain = fc_chain_get_by_name(chain_name);
1644
1645   chain_name = global_option_get("PostCacheChain");
1646   post_cache_chain = fc_chain_get_by_name(chain_name);
1647
1648   write_limit_high = global_option_get_long("WriteQueueLimitHigh",
1649                                             /* default = */ 0);
1650   if (write_limit_high < 0) {
1651     ERROR("WriteQueueLimitHigh must be positive or zero.");
1652     write_limit_high = 0;
1653   }
1654
1655   write_limit_low =
1656       global_option_get_long("WriteQueueLimitLow",
1657                              /* default = */ write_limit_high / 2);
1658   if (write_limit_low < 0) {
1659     ERROR("WriteQueueLimitLow must be positive or zero.");
1660     write_limit_low = write_limit_high / 2;
1661   } else if (write_limit_low > write_limit_high) {
1662     ERROR("WriteQueueLimitLow must not be larger than "
1663           "WriteQueueLimitHigh.");
1664     write_limit_low = write_limit_high;
1665   }
1666
1667   write_threads_num = global_option_get_long("WriteThreads",
1668                                              /* default = */ 5);
1669   if (write_threads_num < 1) {
1670     ERROR("WriteThreads must be positive.");
1671     write_threads_num = 5;
1672   }
1673
1674   if ((list_init == NULL) && (read_heap == NULL))
1675     return ret;
1676
1677   /* Calling all init callbacks before checking if read callbacks
1678    * are available allows the init callbacks to register the read
1679    * callback. */
1680   le = llist_head(list_init);
1681   while (le != NULL) {
1682     callback_func_t *cf;
1683     plugin_init_cb callback;
1684     plugin_ctx_t old_ctx;
1685
1686     cf = le->value;
1687     old_ctx = plugin_set_ctx(cf->cf_ctx);
1688     callback = cf->cf_callback;
1689     status = (*callback)();
1690     plugin_set_ctx(old_ctx);
1691
1692     if (status != 0) {
1693       ERROR("Initialization of plugin `%s' "
1694             "failed with status %i. "
1695             "Plugin will be unloaded.",
1696             le->key, status);
1697       /* Plugins that register read callbacks from the init
1698        * callback should take care of appropriate error
1699        * handling themselves. */
1700       /* FIXME: Unload _all_ functions */
1701       plugin_unregister_read(le->key);
1702       ret = -1;
1703     }
1704
1705     le = le->next;
1706   }
1707
1708   start_write_threads((size_t)write_threads_num);
1709
1710   max_read_interval =
1711       global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
1712
1713   /* Start read-threads */
1714   if (read_heap != NULL) {
1715     const char *rt;
1716     int num;
1717
1718     rt = global_option_get("ReadThreads");
1719     num = atoi(rt);
1720     if (num != -1)
1721       start_read_threads((num > 0) ? ((size_t)num) : 5);
1722   }
1723   return ret;
1724 } /* void plugin_init_all */
1725
1726 /* TODO: Rename this function. */
1727 EXPORT void plugin_read_all(void) {
1728   uc_check_timeout();
1729
1730   return;
1731 } /* void plugin_read_all */
1732
1733 /* Read function called when the `-T' command line argument is given. */
1734 EXPORT int plugin_read_all_once(void) {
1735   int status;
1736   int return_status = 0;
1737
1738   if (read_heap == NULL) {
1739     NOTICE("No read-functions are registered.");
1740     return 0;
1741   }
1742
1743   while (42) {
1744     read_func_t *rf;
1745     plugin_ctx_t old_ctx;
1746
1747     rf = c_heap_get_root(read_heap);
1748     if (rf == NULL)
1749       break;
1750
1751     old_ctx = plugin_set_ctx(rf->rf_ctx);
1752
1753     if (rf->rf_type == RF_SIMPLE) {
1754       int (*callback)(void);
1755
1756       callback = rf->rf_callback;
1757       status = (*callback)();
1758     } else {
1759       plugin_read_cb callback;
1760
1761       callback = rf->rf_callback;
1762       status = (*callback)(&rf->rf_udata);
1763     }
1764
1765     plugin_set_ctx(old_ctx);
1766
1767     if (status != 0) {
1768       NOTICE("read-function of plugin `%s' failed.", rf->rf_name);
1769       return_status = -1;
1770     }
1771
1772     sfree(rf->rf_name);
1773     destroy_callback((void *)rf);
1774   }
1775
1776   return return_status;
1777 } /* int plugin_read_all_once */
1778
1779 EXPORT int plugin_write(const char *plugin, /* {{{ */
1780                         const data_set_t *ds, const value_list_t *vl) {
1781   llentry_t *le;
1782   int status;
1783
1784   if (vl == NULL)
1785     return EINVAL;
1786
1787   if (list_write == NULL)
1788     return ENOENT;
1789
1790   if (ds == NULL) {
1791     ds = plugin_get_ds(vl->type);
1792     if (ds == NULL) {
1793       ERROR("plugin_write: Unable to lookup type `%s'.", vl->type);
1794       return ENOENT;
1795     }
1796   }
1797
1798   if (plugin == NULL) {
1799     int success = 0;
1800     int failure = 0;
1801
1802     le = llist_head(list_write);
1803     while (le != NULL) {
1804       callback_func_t *cf = le->value;
1805       plugin_write_cb callback;
1806
1807       /* Keep the read plugin's interval and flush information but update the
1808        * plugin name. */
1809       plugin_ctx_t old_ctx = plugin_get_ctx();
1810       plugin_ctx_t ctx = old_ctx;
1811       ctx.name = cf->cf_ctx.name;
1812       plugin_set_ctx(ctx);
1813
1814       DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1815       callback = cf->cf_callback;
1816       status = (*callback)(ds, vl, &cf->cf_udata);
1817       if (status != 0)
1818         failure++;
1819       else
1820         success++;
1821
1822       plugin_set_ctx(old_ctx);
1823       le = le->next;
1824     }
1825
1826     if ((success == 0) && (failure != 0))
1827       status = -1;
1828     else
1829       status = 0;
1830   } else /* plugin != NULL */
1831   {
1832     callback_func_t *cf;
1833     plugin_write_cb callback;
1834
1835     le = llist_head(list_write);
1836     while (le != NULL) {
1837       if (strcasecmp(plugin, le->key) == 0)
1838         break;
1839
1840       le = le->next;
1841     }
1842
1843     if (le == NULL)
1844       return ENOENT;
1845
1846     cf = le->value;
1847
1848     /* do not switch plugin context; rather keep the context (interval)
1849      * information of the calling read plugin */
1850
1851     DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1852     callback = cf->cf_callback;
1853     status = (*callback)(ds, vl, &cf->cf_udata);
1854   }
1855
1856   return status;
1857 } /* }}} int plugin_write */
1858
1859 EXPORT int plugin_flush(const char *plugin, cdtime_t timeout,
1860                         const char *identifier) {
1861   llentry_t *le;
1862
1863   if (list_flush == NULL)
1864     return 0;
1865
1866   le = llist_head(list_flush);
1867   while (le != NULL) {
1868     callback_func_t *cf;
1869     plugin_flush_cb callback;
1870     plugin_ctx_t old_ctx;
1871
1872     if ((plugin != NULL) && (strcmp(plugin, le->key) != 0)) {
1873       le = le->next;
1874       continue;
1875     }
1876
1877     cf = le->value;
1878     old_ctx = plugin_set_ctx(cf->cf_ctx);
1879     callback = cf->cf_callback;
1880
1881     (*callback)(timeout, identifier, &cf->cf_udata);
1882
1883     plugin_set_ctx(old_ctx);
1884
1885     le = le->next;
1886   }
1887   return 0;
1888 } /* int plugin_flush */
1889
1890 EXPORT int plugin_shutdown_all(void) {
1891   llentry_t *le;
1892   int ret = 0; // Assume success.
1893
1894   destroy_all_callbacks(&list_init);
1895
1896   stop_read_threads();
1897
1898   pthread_mutex_lock(&read_lock);
1899   llist_destroy(read_list);
1900   read_list = NULL;
1901   pthread_mutex_unlock(&read_lock);
1902
1903   destroy_read_heap();
1904
1905   /* blocks until all write threads have shut down. */
1906   stop_write_threads();
1907
1908   /* ask all plugins to write out the state they kept. */
1909   plugin_flush(/* plugin = */ NULL,
1910                /* timeout = */ 0,
1911                /* identifier = */ NULL);
1912
1913   le = NULL;
1914   if (list_shutdown != NULL)
1915     le = llist_head(list_shutdown);
1916
1917   while (le != NULL) {
1918     callback_func_t *cf;
1919     plugin_shutdown_cb callback;
1920     plugin_ctx_t old_ctx;
1921
1922     cf = le->value;
1923     old_ctx = plugin_set_ctx(cf->cf_ctx);
1924     callback = cf->cf_callback;
1925
1926     /* Advance the pointer before calling the callback allows
1927      * shutdown functions to unregister themselves. If done the
1928      * other way around the memory `le' points to will be freed
1929      * after callback returns. */
1930     le = le->next;
1931
1932     if ((*callback)() != 0)
1933       ret = -1;
1934
1935     plugin_set_ctx(old_ctx);
1936   }
1937
1938   /* Write plugins which use the `user_data' pointer usually need the
1939    * same data available to the flush callback. If this is the case, set
1940    * the free_function to NULL when registering the flush callback and to
1941    * the real free function when registering the write callback. This way
1942    * the data isn't freed twice. */
1943   destroy_all_callbacks(&list_flush);
1944   destroy_all_callbacks(&list_missing);
1945   destroy_cache_event_callbacks();
1946   destroy_all_callbacks(&list_write);
1947
1948   destroy_all_callbacks(&list_notification);
1949   destroy_all_callbacks(&list_shutdown);
1950   destroy_all_callbacks(&list_log);
1951
1952   plugin_free_loaded();
1953   plugin_free_data_sets();
1954   return ret;
1955 } /* void plugin_shutdown_all */
1956
1957 EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
1958 {
1959   if (list_missing == NULL)
1960     return 0;
1961
1962   llentry_t *le = llist_head(list_missing);
1963   while (le != NULL) {
1964     callback_func_t *cf = le->value;
1965     plugin_ctx_t old_ctx = plugin_set_ctx(cf->cf_ctx);
1966     plugin_missing_cb callback = cf->cf_callback;
1967
1968     int status = (*callback)(vl, &cf->cf_udata);
1969     plugin_set_ctx(old_ctx);
1970     if (status != 0) {
1971       if (status < 0) {
1972         ERROR("plugin_dispatch_missing: Callback function \"%s\" "
1973               "failed with status %i.",
1974               le->key, status);
1975         return status;
1976       } else {
1977         return 0;
1978       }
1979     }
1980
1981     le = le->next;
1982   }
1983   return 0;
1984 } /* int }}} plugin_dispatch_missing */
1985
1986 void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
1987                                  unsigned long callbacks_mask, const char *name,
1988                                  const value_list_t *vl) {
1989   switch (event_type) {
1990   case CE_VALUE_NEW:
1991     callbacks_mask = 0;
1992     for (size_t i = 0; i < list_cache_event_num; i++) {
1993       cache_event_func_t *cef = &list_cache_event[i];
1994       plugin_cache_event_cb callback = cef->callback;
1995
1996       if (!callback)
1997         continue;
1998
1999       cache_event_t event = (cache_event_t){.type = event_type,
2000                                             .value_list = vl,
2001                                             .value_list_name = name,
2002                                             .ret = 0};
2003
2004       plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2005       int status = (*callback)(&event, &cef->user_data);
2006       plugin_set_ctx(old_ctx);
2007
2008       if (status != 0) {
2009         ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2010               "%i for event NEW.",
2011               cef->name, status);
2012       } else {
2013         if (event.ret) {
2014           DEBUG(
2015               "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
2016               cef->name, name);
2017           callbacks_mask |= (1 << (i));
2018         } else {
2019           DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
2020                 cef->name, name);
2021         }
2022       }
2023     }
2024
2025     if (callbacks_mask)
2026       uc_set_callbacks_mask(name, callbacks_mask);
2027
2028     break;
2029   case CE_VALUE_UPDATE:
2030   case CE_VALUE_EXPIRED:
2031     for (size_t i = 0; i < list_cache_event_num; i++) {
2032       cache_event_func_t *cef = &list_cache_event[i];
2033       plugin_cache_event_cb callback = cef->callback;
2034
2035       if (!callback)
2036         continue;
2037
2038       if (callbacks_mask && (1 << (i)) == 0)
2039         continue;
2040
2041       cache_event_t event = (cache_event_t){.type = event_type,
2042                                             .value_list = vl,
2043                                             .value_list_name = name,
2044                                             .ret = 0};
2045
2046       plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2047       int status = (*callback)(&event, &cef->user_data);
2048       plugin_set_ctx(old_ctx);
2049
2050       if (status != 0) {
2051         ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2052               "%i for event %s.",
2053               cef->name, status,
2054               ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
2055       }
2056     }
2057     break;
2058   }
2059   return;
2060 }
2061
2062 static int plugin_dispatch_values_internal(value_list_t *vl) {
2063   int status;
2064   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
2065
2066   bool free_meta_data = false;
2067
2068   assert(vl != NULL);
2069
2070   /* These fields are initialized by plugin_value_list_clone() if needed: */
2071   assert(vl->host[0] != 0);
2072   assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
2073   assert(vl->interval != 0);
2074
2075   if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
2076     ERROR("plugin_dispatch_values: Invalid value list "
2077           "from plugin %s.",
2078           vl->plugin);
2079     return -1;
2080   }
2081
2082   /* Free meta data only if the calling function didn't specify any. In
2083    * this case matches and targets may add some and the calling function
2084    * may not expect (and therefore free) that data. */
2085   if (vl->meta == NULL)
2086     free_meta_data = true;
2087
2088   if (list_write == NULL)
2089     c_complain_once(LOG_WARNING, &no_write_complaint,
2090                     "plugin_dispatch_values: No write callback has been "
2091                     "registered. Please load at least one output plugin, "
2092                     "if you want the collected data to be stored.");
2093
2094   if (data_sets == NULL) {
2095     ERROR("plugin_dispatch_values: No data sets registered. "
2096           "Could the types database be read? Check "
2097           "your `TypesDB' setting!");
2098     return -1;
2099   }
2100
2101   data_set_t *ds = NULL;
2102   if (c_avl_get(data_sets, vl->type, (void *)&ds) != 0) {
2103     char ident[6 * DATA_MAX_NAME_LEN];
2104
2105     FORMAT_VL(ident, sizeof(ident), vl);
2106     INFO("plugin_dispatch_values: Dataset not found: %s "
2107          "(from \"%s\"), check your types.db!",
2108          vl->type, ident);
2109     return -1;
2110   }
2111
2112   DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
2113         "host = %s; "
2114         "plugin = %s; plugin_instance = %s; "
2115         "type = %s; type_instance = %s;",
2116         CDTIME_T_TO_DOUBLE(vl->time), CDTIME_T_TO_DOUBLE(vl->interval),
2117         vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
2118
2119 #if COLLECT_DEBUG
2120   assert(0 == strcmp(ds->type, vl->type));
2121 #else
2122   if (0 != strcmp(ds->type, vl->type))
2123     WARNING("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
2124             ds->type, vl->type);
2125 #endif
2126
2127 #if COLLECT_DEBUG
2128   assert(ds->ds_num == vl->values_len);
2129 #else
2130   if (ds->ds_num != vl->values_len) {
2131     ERROR("plugin_dispatch_values: ds->type = %s: "
2132           "(ds->ds_num = %" PRIsz ") != "
2133           "(vl->values_len = %" PRIsz ")",
2134           ds->type, ds->ds_num, vl->values_len);
2135     return -1;
2136   }
2137 #endif
2138
2139   escape_slashes(vl->host, sizeof(vl->host));
2140   escape_slashes(vl->plugin, sizeof(vl->plugin));
2141   escape_slashes(vl->plugin_instance, sizeof(vl->plugin_instance));
2142   escape_slashes(vl->type, sizeof(vl->type));
2143   escape_slashes(vl->type_instance, sizeof(vl->type_instance));
2144
2145   if (pre_cache_chain != NULL) {
2146     status = fc_process_chain(ds, vl, pre_cache_chain);
2147     if (status < 0) {
2148       WARNING("plugin_dispatch_values: Running the "
2149               "pre-cache chain failed with "
2150               "status %i (%#x).",
2151               status, status);
2152     } else if (status == FC_TARGET_STOP)
2153       return 0;
2154   }
2155
2156   /* Update the value cache */
2157   uc_update(ds, vl);
2158
2159   if (post_cache_chain != NULL) {
2160     status = fc_process_chain(ds, vl, post_cache_chain);
2161     if (status < 0) {
2162       WARNING("plugin_dispatch_values: Running the "
2163               "post-cache chain failed with "
2164               "status %i (%#x).",
2165               status, status);
2166     }
2167   } else
2168     fc_default_action(ds, vl);
2169
2170   if ((free_meta_data == true) && (vl->meta != NULL)) {
2171     meta_data_destroy(vl->meta);
2172     vl->meta = NULL;
2173   }
2174
2175   return 0;
2176 } /* int plugin_dispatch_values_internal */
2177
2178 static double get_drop_probability(void) /* {{{ */
2179 {
2180   long pos;
2181   long size;
2182   long wql;
2183
2184   pthread_mutex_lock(&write_lock);
2185   wql = write_queue_length;
2186   pthread_mutex_unlock(&write_lock);
2187
2188   if (wql < write_limit_low)
2189     return 0.0;
2190   if (wql >= write_limit_high)
2191     return 1.0;
2192
2193   pos = 1 + wql - write_limit_low;
2194   size = 1 + write_limit_high - write_limit_low;
2195
2196   return (double)pos / (double)size;
2197 } /* }}} double get_drop_probability */
2198
2199 static bool check_drop_value(void) /* {{{ */
2200 {
2201   static cdtime_t last_message_time;
2202   static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2203
2204   double p;
2205   double q;
2206   int status;
2207
2208   if (write_limit_high == 0)
2209     return false;
2210
2211   p = get_drop_probability();
2212   if (p == 0.0)
2213     return false;
2214
2215   status = pthread_mutex_trylock(&last_message_lock);
2216   if (status == 0) {
2217     cdtime_t now;
2218
2219     now = cdtime();
2220     if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
2221       last_message_time = now;
2222       ERROR("plugin_dispatch_values: Low water mark "
2223             "reached. Dropping %.0f%% of metrics.",
2224             100.0 * p);
2225     }
2226     pthread_mutex_unlock(&last_message_lock);
2227   }
2228
2229   if (p == 1.0)
2230     return true;
2231
2232   q = cdrand_d();
2233   if (q > p)
2234     return true;
2235   else
2236     return false;
2237 } /* }}} bool check_drop_value */
2238
2239 EXPORT int plugin_dispatch_values(value_list_t const *vl) {
2240   int status;
2241
2242   if (check_drop_value()) {
2243     if (record_statistics) {
2244       pthread_mutex_lock(&statistics_lock);
2245       stats_values_dropped++;
2246       pthread_mutex_unlock(&statistics_lock);
2247     }
2248     return 0;
2249   }
2250
2251   status = plugin_write_enqueue(vl);
2252   if (status != 0) {
2253     ERROR("plugin_dispatch_values: plugin_write_enqueue failed with status %i "
2254           "(%s).",
2255           status, STRERROR(status));
2256     return status;
2257   }
2258
2259   return 0;
2260 }
2261
2262 __attribute__((sentinel)) int
2263 plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
2264                            bool store_percentage, int store_type, ...) {
2265   value_list_t *vl;
2266   int failed = 0;
2267   gauge_t sum = 0.0;
2268   va_list ap;
2269
2270   if (check_drop_value()) {
2271     if (record_statistics) {
2272       pthread_mutex_lock(&statistics_lock);
2273       stats_values_dropped++;
2274       pthread_mutex_unlock(&statistics_lock);
2275     }
2276     return 0;
2277   }
2278
2279   assert(template->values_len == 1);
2280
2281   /* Calculate sum for Gauge to calculate percent if needed */
2282   if (DS_TYPE_GAUGE == store_type) {
2283     va_start(ap, store_type);
2284     while (42) {
2285       char const *name;
2286       gauge_t value;
2287
2288       name = va_arg(ap, char const *);
2289       if (name == NULL)
2290         break;
2291
2292       value = va_arg(ap, gauge_t);
2293       if (!isnan(value))
2294         sum += value;
2295     }
2296     va_end(ap);
2297   }
2298
2299   vl = plugin_value_list_clone(template);
2300   /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2301   if (store_percentage)
2302     sstrncpy(vl->type, "percent", sizeof(vl->type));
2303
2304   va_start(ap, store_type);
2305   while (42) {
2306     char const *name;
2307     int status;
2308
2309     /* Set the type instance. */
2310     name = va_arg(ap, char const *);
2311     if (name == NULL)
2312       break;
2313     sstrncpy(vl->type_instance, name, sizeof(vl->type_instance));
2314
2315     /* Set the value. */
2316     switch (store_type) {
2317     case DS_TYPE_GAUGE:
2318       vl->values[0].gauge = va_arg(ap, gauge_t);
2319       if (store_percentage)
2320         vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2321       break;
2322     case DS_TYPE_ABSOLUTE:
2323       vl->values[0].absolute = va_arg(ap, absolute_t);
2324       break;
2325     case DS_TYPE_COUNTER:
2326       vl->values[0].counter = va_arg(ap, counter_t);
2327       break;
2328     case DS_TYPE_DERIVE:
2329       vl->values[0].derive = va_arg(ap, derive_t);
2330       break;
2331     default:
2332       ERROR("plugin_dispatch_multivalue: given store_type is incorrect.");
2333       failed++;
2334     }
2335
2336     status = plugin_write_enqueue(vl);
2337     if (status != 0)
2338       failed++;
2339   }
2340   va_end(ap);
2341
2342   plugin_value_list_free(vl);
2343   return failed;
2344 } /* }}} int plugin_dispatch_multivalue */
2345
2346 EXPORT int plugin_dispatch_notification(const notification_t *notif) {
2347   llentry_t *le;
2348   /* Possible TODO: Add flap detection here */
2349
2350   DEBUG("plugin_dispatch_notification: severity = %i; message = %s; "
2351         "time = %.3f; host = %s;",
2352         notif->severity, notif->message, CDTIME_T_TO_DOUBLE(notif->time),
2353         notif->host);
2354
2355   /* Nobody cares for notifications */
2356   if (list_notification == NULL)
2357     return -1;
2358
2359   le = llist_head(list_notification);
2360   while (le != NULL) {
2361     callback_func_t *cf;
2362     plugin_notification_cb callback;
2363     int status;
2364
2365     /* do not switch plugin context; rather keep the context
2366      * (interval) information of the calling plugin */
2367
2368     cf = le->value;
2369     callback = cf->cf_callback;
2370     status = (*callback)(notif, &cf->cf_udata);
2371     if (status != 0) {
2372       WARNING("plugin_dispatch_notification: Notification "
2373               "callback %s returned %i.",
2374               le->key, status);
2375     }
2376
2377     le = le->next;
2378   }
2379
2380   return 0;
2381 } /* int plugin_dispatch_notification */
2382
2383 EXPORT void plugin_log(int level, const char *format, ...) {
2384   char msg[1024];
2385   va_list ap;
2386   llentry_t *le;
2387
2388 #if !COLLECT_DEBUG
2389   if (level >= LOG_DEBUG)
2390     return;
2391 #endif
2392
2393   va_start(ap, format);
2394   vsnprintf(msg, sizeof(msg), format, ap);
2395   msg[sizeof(msg) - 1] = '\0';
2396   va_end(ap);
2397
2398   if (list_log == NULL) {
2399     fprintf(stderr, "%s\n", msg);
2400     return;
2401   }
2402
2403   le = llist_head(list_log);
2404   while (le != NULL) {
2405     callback_func_t *cf;
2406     plugin_log_cb callback;
2407
2408     cf = le->value;
2409     callback = cf->cf_callback;
2410
2411     /* do not switch plugin context; rather keep the context
2412      * (interval) information of the calling plugin */
2413
2414     (*callback)(level, msg, &cf->cf_udata);
2415
2416     le = le->next;
2417   }
2418 } /* void plugin_log */
2419
2420 void daemon_log(int level, const char *format, ...) {
2421   char msg[1024] = ""; // Size inherits from plugin_log()
2422
2423   char const *name = plugin_get_ctx().name;
2424   if (name == NULL)
2425     name = "UNKNOWN";
2426
2427   va_list ap;
2428   va_start(ap, format);
2429   vsnprintf(msg, sizeof(msg), format, ap);
2430   va_end(ap);
2431
2432   plugin_log(level, "%s plugin: %s", name, msg);
2433 } /* void daemon_log */
2434
2435 int parse_log_severity(const char *severity) {
2436   int log_level = -1;
2437
2438   if ((0 == strcasecmp(severity, "emerg")) ||
2439       (0 == strcasecmp(severity, "alert")) ||
2440       (0 == strcasecmp(severity, "crit")) || (0 == strcasecmp(severity, "err")))
2441     log_level = LOG_ERR;
2442   else if (0 == strcasecmp(severity, "warning"))
2443     log_level = LOG_WARNING;
2444   else if (0 == strcasecmp(severity, "notice"))
2445     log_level = LOG_NOTICE;
2446   else if (0 == strcasecmp(severity, "info"))
2447     log_level = LOG_INFO;
2448 #if COLLECT_DEBUG
2449   else if (0 == strcasecmp(severity, "debug"))
2450     log_level = LOG_DEBUG;
2451 #endif /* COLLECT_DEBUG */
2452
2453   return log_level;
2454 } /* int parse_log_severity */
2455
2456 EXPORT int parse_notif_severity(const char *severity) {
2457   int notif_severity = -1;
2458
2459   if (strcasecmp(severity, "FAILURE") == 0)
2460     notif_severity = NOTIF_FAILURE;
2461   else if (strcmp(severity, "OKAY") == 0)
2462     notif_severity = NOTIF_OKAY;
2463   else if ((strcmp(severity, "WARNING") == 0) ||
2464            (strcmp(severity, "WARN") == 0))
2465     notif_severity = NOTIF_WARNING;
2466
2467   return notif_severity;
2468 } /* int parse_notif_severity */
2469
2470 EXPORT const data_set_t *plugin_get_ds(const char *name) {
2471   data_set_t *ds;
2472
2473   if (data_sets == NULL) {
2474     P_ERROR("plugin_get_ds: No data sets are defined yet.");
2475     return NULL;
2476   }
2477
2478   if (c_avl_get(data_sets, name, (void *)&ds) != 0) {
2479     DEBUG("No such dataset registered: %s", name);
2480     return NULL;
2481   }
2482
2483   return ds;
2484 } /* data_set_t *plugin_get_ds */
2485
2486 static int plugin_notification_meta_add(notification_t *n, const char *name,
2487                                         enum notification_meta_type_e type,
2488                                         const void *value) {
2489   notification_meta_t *meta;
2490   notification_meta_t *tail;
2491
2492   if ((n == NULL) || (name == NULL) || (value == NULL)) {
2493     ERROR("plugin_notification_meta_add: A pointer is NULL!");
2494     return -1;
2495   }
2496
2497   meta = calloc(1, sizeof(*meta));
2498   if (meta == NULL) {
2499     ERROR("plugin_notification_meta_add: calloc failed.");
2500     return -1;
2501   }
2502
2503   sstrncpy(meta->name, name, sizeof(meta->name));
2504   meta->type = type;
2505
2506   switch (type) {
2507   case NM_TYPE_STRING: {
2508     meta->nm_value.nm_string = strdup((const char *)value);
2509     if (meta->nm_value.nm_string == NULL) {
2510       ERROR("plugin_notification_meta_add: strdup failed.");
2511       sfree(meta);
2512       return -1;
2513     }
2514     break;
2515   }
2516   case NM_TYPE_SIGNED_INT: {
2517     meta->nm_value.nm_signed_int = *((int64_t *)value);
2518     break;
2519   }
2520   case NM_TYPE_UNSIGNED_INT: {
2521     meta->nm_value.nm_unsigned_int = *((uint64_t *)value);
2522     break;
2523   }
2524   case NM_TYPE_DOUBLE: {
2525     meta->nm_value.nm_double = *((double *)value);
2526     break;
2527   }
2528   case NM_TYPE_BOOLEAN: {
2529     meta->nm_value.nm_boolean = *((bool *)value);
2530     break;
2531   }
2532   default: {
2533     ERROR("plugin_notification_meta_add: Unknown type: %i", type);
2534     sfree(meta);
2535     return -1;
2536   }
2537   } /* switch (type) */
2538
2539   meta->next = NULL;
2540   tail = n->meta;
2541   while ((tail != NULL) && (tail->next != NULL))
2542     tail = tail->next;
2543
2544   if (tail == NULL)
2545     n->meta = meta;
2546   else
2547     tail->next = meta;
2548
2549   return 0;
2550 } /* int plugin_notification_meta_add */
2551
2552 int plugin_notification_meta_add_string(notification_t *n, const char *name,
2553                                         const char *value) {
2554   return plugin_notification_meta_add(n, name, NM_TYPE_STRING, value);
2555 }
2556
2557 int plugin_notification_meta_add_signed_int(notification_t *n, const char *name,
2558                                             int64_t value) {
2559   return plugin_notification_meta_add(n, name, NM_TYPE_SIGNED_INT, &value);
2560 }
2561
2562 int plugin_notification_meta_add_unsigned_int(notification_t *n,
2563                                               const char *name,
2564                                               uint64_t value) {
2565   return plugin_notification_meta_add(n, name, NM_TYPE_UNSIGNED_INT, &value);
2566 }
2567
2568 int plugin_notification_meta_add_double(notification_t *n, const char *name,
2569                                         double value) {
2570   return plugin_notification_meta_add(n, name, NM_TYPE_DOUBLE, &value);
2571 }
2572
2573 int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
2574                                          bool value) {
2575   return plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value);
2576 }
2577
2578 int plugin_notification_meta_copy(notification_t *dst,
2579                                   const notification_t *src) {
2580   assert(dst != NULL);
2581   assert(src != NULL);
2582   assert(dst != src);
2583   assert((src->meta == NULL) || (src->meta != dst->meta));
2584
2585   for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next) {
2586     if (meta->type == NM_TYPE_STRING)
2587       plugin_notification_meta_add_string(dst, meta->name,
2588                                           meta->nm_value.nm_string);
2589     else if (meta->type == NM_TYPE_SIGNED_INT)
2590       plugin_notification_meta_add_signed_int(dst, meta->name,
2591                                               meta->nm_value.nm_signed_int);
2592     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2593       plugin_notification_meta_add_unsigned_int(dst, meta->name,
2594                                                 meta->nm_value.nm_unsigned_int);
2595     else if (meta->type == NM_TYPE_DOUBLE)
2596       plugin_notification_meta_add_double(dst, meta->name,
2597                                           meta->nm_value.nm_double);
2598     else if (meta->type == NM_TYPE_BOOLEAN)
2599       plugin_notification_meta_add_boolean(dst, meta->name,
2600                                            meta->nm_value.nm_boolean);
2601   }
2602
2603   return 0;
2604 } /* int plugin_notification_meta_copy */
2605
2606 int plugin_notification_meta_free(notification_meta_t *n) {
2607   notification_meta_t *this;
2608   notification_meta_t *next;
2609
2610   if (n == NULL) {
2611     ERROR("plugin_notification_meta_free: n == NULL!");
2612     return -1;
2613   }
2614
2615   this = n;
2616   while (this != NULL) {
2617     next = this->next;
2618
2619     if (this->type == NM_TYPE_STRING) {
2620       /* Assign to a temporary variable to work around nm_string's const
2621        * modifier. */
2622       void *tmp = (void *)this->nm_value.nm_string;
2623
2624       sfree(tmp);
2625       this->nm_value.nm_string = NULL;
2626     }
2627     sfree(this);
2628
2629     this = next;
2630   }
2631
2632   return 0;
2633 } /* int plugin_notification_meta_free */
2634
2635 static void plugin_ctx_destructor(void *ctx) {
2636   sfree(ctx);
2637 } /* void plugin_ctx_destructor */
2638
2639 static plugin_ctx_t ctx_init = {/* interval = */ 0};
2640
2641 static plugin_ctx_t *plugin_ctx_create(void) {
2642   plugin_ctx_t *ctx;
2643
2644   ctx = malloc(sizeof(*ctx));
2645   if (ctx == NULL) {
2646     ERROR("Failed to allocate plugin context: %s", STRERRNO);
2647     return NULL;
2648   }
2649
2650   *ctx = ctx_init;
2651   assert(plugin_ctx_key_initialized);
2652   pthread_setspecific(plugin_ctx_key, ctx);
2653   DEBUG("Created new plugin context.");
2654   return ctx;
2655 } /* int plugin_ctx_create */
2656
2657 EXPORT void plugin_init_ctx(void) {
2658   pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
2659   plugin_ctx_key_initialized = true;
2660 } /* void plugin_init_ctx */
2661
2662 EXPORT plugin_ctx_t plugin_get_ctx(void) {
2663   plugin_ctx_t *ctx;
2664
2665   assert(plugin_ctx_key_initialized);
2666   ctx = pthread_getspecific(plugin_ctx_key);
2667
2668   if (ctx == NULL) {
2669     ctx = plugin_ctx_create();
2670     /* this must no happen -- exit() instead? */
2671     if (ctx == NULL)
2672       return ctx_init;
2673   }
2674
2675   return *ctx;
2676 } /* plugin_ctx_t plugin_get_ctx */
2677
2678 EXPORT plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
2679   plugin_ctx_t *c;
2680   plugin_ctx_t old;
2681
2682   assert(plugin_ctx_key_initialized);
2683   c = pthread_getspecific(plugin_ctx_key);
2684
2685   if (c == NULL) {
2686     c = plugin_ctx_create();
2687     /* this must no happen -- exit() instead? */
2688     if (c == NULL)
2689       return ctx_init;
2690   }
2691
2692   old = *c;
2693   *c = ctx;
2694
2695   return old;
2696 } /* void plugin_set_ctx */
2697
2698 EXPORT cdtime_t plugin_get_interval(void) {
2699   cdtime_t interval;
2700
2701   interval = plugin_get_ctx().interval;
2702   if (interval > 0)
2703     return interval;
2704
2705   P_ERROR("plugin_get_interval: Unable to determine Interval from context.");
2706
2707   return cf_get_default_interval();
2708 } /* cdtime_t plugin_get_interval */
2709
2710 typedef struct {
2711   plugin_ctx_t ctx;
2712   void *(*start_routine)(void *);
2713   void *arg;
2714 } plugin_thread_t;
2715
2716 static void *plugin_thread_start(void *arg) {
2717   plugin_thread_t *plugin_thread = arg;
2718
2719   void *(*start_routine)(void *) = plugin_thread->start_routine;
2720   void *plugin_arg = plugin_thread->arg;
2721
2722   plugin_set_ctx(plugin_thread->ctx);
2723
2724   sfree(plugin_thread);
2725
2726   return start_routine(plugin_arg);
2727 } /* void *plugin_thread_start */
2728
2729 int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr,
2730                          void *(*start_routine)(void *), void *arg,
2731                          char const *name) {
2732   plugin_thread_t *plugin_thread;
2733
2734   plugin_thread = malloc(sizeof(*plugin_thread));
2735   if (plugin_thread == NULL)
2736     return ENOMEM;
2737
2738   plugin_thread->ctx = plugin_get_ctx();
2739   plugin_thread->start_routine = start_routine;
2740   plugin_thread->arg = arg;
2741
2742   int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread);
2743   if (ret != 0) {
2744     sfree(plugin_thread);
2745     return ret;
2746   }
2747
2748   if (name != NULL)
2749     set_thread_name(*thread, name);
2750
2751   return 0;
2752 } /* int plugin_thread_create */