Merge pull request #3159 from rpv-tomsk/collectd-usercache
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30
31 #include "collectd.h"
32
33 #include "configfile.h"
34 #include "filter_chain.h"
35 #include "plugin.h"
36 #include "utils/avltree/avltree.h"
37 #include "utils/common/common.h"
38 #include "utils/heap/heap.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "utils_llist.h"
42 #include "utils_random.h"
43 #include "utils_time.h"
44
45 #ifdef WIN32
46 #define EXPORT __declspec(dllexport)
47 #include <sys/stat.h>
48 #include <unistd.h>
49 #else
50 #define EXPORT
51 #endif
52
53 #if HAVE_PTHREAD_NP_H
54 #include <pthread_np.h> /* for pthread_set_name_np(3) */
55 #endif
56
57 #include <dlfcn.h>
58
59 /*
60  * Private structures
61  */
62 struct callback_func_s {
63   void *cf_callback;
64   user_data_t cf_udata;
65   plugin_ctx_t cf_ctx;
66 };
67 typedef struct callback_func_s callback_func_t;
68
69 #define RF_SIMPLE 0
70 #define RF_COMPLEX 1
71 #define RF_REMOVE 65535
72 struct read_func_s {
73 /* `read_func_t' "inherits" from `callback_func_t'.
74  * The `rf_super' member MUST be the first one in this structure! */
75 #define rf_callback rf_super.cf_callback
76 #define rf_udata rf_super.cf_udata
77 #define rf_ctx rf_super.cf_ctx
78   callback_func_t rf_super;
79   char rf_group[DATA_MAX_NAME_LEN];
80   char *rf_name;
81   int rf_type;
82   cdtime_t rf_interval;
83   cdtime_t rf_effective_interval;
84   cdtime_t rf_next_read;
85 };
86 typedef struct read_func_s read_func_t;
87
88 struct cache_event_func_s {
89   plugin_cache_event_cb callback;
90   char *name;
91   user_data_t user_data;
92   plugin_ctx_t plugin_ctx;
93 };
94 typedef struct cache_event_func_s cache_event_func_t;
95
96 struct write_queue_s;
97 typedef struct write_queue_s write_queue_t;
98 struct write_queue_s {
99   value_list_t *vl;
100   plugin_ctx_t ctx;
101   write_queue_t *next;
102 };
103
104 struct flush_callback_s {
105   char *name;
106   cdtime_t timeout;
107 };
108 typedef struct flush_callback_s flush_callback_t;
109
110 /*
111  * Private variables
112  */
113 static c_avl_tree_t *plugins_loaded;
114
115 static llist_t *list_init;
116 static llist_t *list_write;
117 static llist_t *list_flush;
118 static llist_t *list_missing;
119 static llist_t *list_shutdown;
120 static llist_t *list_log;
121 static llist_t *list_notification;
122
123 static size_t list_cache_event_num;
124 static cache_event_func_t list_cache_event[32];
125
126 static fc_chain_t *pre_cache_chain;
127 static fc_chain_t *post_cache_chain;
128
129 static c_avl_tree_t *data_sets;
130
131 static char *plugindir;
132
133 #ifndef DEFAULT_MAX_READ_INTERVAL
134 #define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
135 #endif
136 static c_heap_t *read_heap;
137 static llist_t *read_list;
138 static int read_loop = 1;
139 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
140 static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
141 static pthread_t *read_threads;
142 static size_t read_threads_num;
143 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
144
145 static write_queue_t *write_queue_head;
146 static write_queue_t *write_queue_tail;
147 static long write_queue_length;
148 static bool write_loop = true;
149 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
150 static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
151 static pthread_t *write_threads;
152 static size_t write_threads_num;
153
154 static pthread_key_t plugin_ctx_key;
155 static bool plugin_ctx_key_initialized;
156
157 static long write_limit_high;
158 static long write_limit_low;
159
160 static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
161 static derive_t stats_values_dropped;
162 static bool record_statistics;
163
164 /*
165  * Static functions
166  */
167 static int plugin_dispatch_values_internal(value_list_t *vl);
168
169 static const char *plugin_get_dir(void) {
170   if (plugindir == NULL)
171     return PLUGINDIR;
172   else
173     return plugindir;
174 }
175
176 static int plugin_update_internal_statistics(void) { /* {{{ */
177   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
178
179   /* Initialize `vl' */
180   value_list_t vl = VALUE_LIST_INIT;
181   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
182   vl.interval = plugin_get_interval();
183
184   /* Write queue */
185   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
186
187   /* Write queue : queue length */
188   vl.values = &(value_t){.gauge = copy_write_queue_length};
189   vl.values_len = 1;
190   sstrncpy(vl.type, "queue_length", sizeof(vl.type));
191   vl.type_instance[0] = 0;
192   plugin_dispatch_values(&vl);
193
194   /* Write queue : Values dropped (queue length > low limit) */
195   vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
196   vl.values_len = 1;
197   sstrncpy(vl.type, "derive", sizeof(vl.type));
198   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
199   plugin_dispatch_values(&vl);
200
201   /* Cache */
202   sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
203
204   /* Cache : Nb entry in cache tree */
205   vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
206   vl.values_len = 1;
207   sstrncpy(vl.type, "cache_size", sizeof(vl.type));
208   vl.type_instance[0] = 0;
209   plugin_dispatch_values(&vl);
210
211   return 0;
212 } /* }}} int plugin_update_internal_statistics */
213
214 static void free_userdata(user_data_t const *ud) /* {{{ */
215 {
216   if (ud == NULL)
217     return;
218
219   if ((ud->data != NULL) && (ud->free_func != NULL)) {
220     ud->free_func(ud->data);
221   }
222 } /* }}} void free_userdata */
223
224 static void destroy_callback(callback_func_t *cf) /* {{{ */
225 {
226   if (cf == NULL)
227     return;
228   free_userdata(&cf->cf_udata);
229   sfree(cf);
230 } /* }}} void destroy_callback */
231
232 static void destroy_all_callbacks(llist_t **list) /* {{{ */
233 {
234   llentry_t *le;
235
236   if (*list == NULL)
237     return;
238
239   le = llist_head(*list);
240   while (le != NULL) {
241     llentry_t *le_next;
242
243     le_next = le->next;
244
245     sfree(le->key);
246     destroy_callback(le->value);
247     le->value = NULL;
248
249     le = le_next;
250   }
251
252   llist_destroy(*list);
253   *list = NULL;
254 } /* }}} void destroy_all_callbacks */
255
256 static void destroy_read_heap(void) /* {{{ */
257 {
258   if (read_heap == NULL)
259     return;
260
261   while (42) {
262     read_func_t *rf;
263
264     rf = c_heap_get_root(read_heap);
265     if (rf == NULL)
266       break;
267     sfree(rf->rf_name);
268     destroy_callback((callback_func_t *)rf);
269   }
270
271   c_heap_destroy(read_heap);
272   read_heap = NULL;
273 } /* }}} void destroy_read_heap */
274
275 static int register_callback(llist_t **list, /* {{{ */
276                              const char *name, callback_func_t *cf) {
277
278   if (*list == NULL) {
279     *list = llist_create();
280     if (*list == NULL) {
281       ERROR("plugin: register_callback: "
282             "llist_create failed.");
283       destroy_callback(cf);
284       return -1;
285     }
286   }
287
288   char *key = strdup(name);
289   if (key == NULL) {
290     ERROR("plugin: register_callback: strdup failed.");
291     destroy_callback(cf);
292     return -1;
293   }
294
295   llentry_t *le = llist_search(*list, name);
296   if (le == NULL) {
297     le = llentry_create(key, cf);
298     if (le == NULL) {
299       ERROR("plugin: register_callback: "
300             "llentry_create failed.");
301       sfree(key);
302       destroy_callback(cf);
303       return -1;
304     }
305
306     llist_append(*list, le);
307   } else {
308     callback_func_t *old_cf = le->value;
309     le->value = cf;
310
311     P_WARNING("register_callback: "
312               "a callback named `%s' already exists - "
313               "overwriting the old entry!",
314               name);
315
316     destroy_callback(old_cf);
317     sfree(key);
318   }
319
320   return 0;
321 } /* }}} int register_callback */
322
323 static void log_list_callbacks(llist_t **list, /* {{{ */
324                                const char *comment) {
325   char *str;
326   int len;
327   int i;
328   llentry_t *le;
329   int n;
330
331   n = llist_size(*list);
332   if (n == 0) {
333     INFO("%s: [none]", comment);
334     return;
335   }
336
337   char **keys = calloc(n, sizeof(*keys));
338   if (keys == NULL) {
339     ERROR("%s: failed to allocate memory for list of callbacks", comment);
340     return;
341   }
342
343   for (le = llist_head(*list), i = 0, len = 0; le != NULL; le = le->next, i++) {
344     keys[i] = le->key;
345     len += strlen(le->key) + 6;
346   }
347   str = malloc(len + 10);
348   if (str == NULL) {
349     ERROR("%s: failed to allocate memory for list of callbacks", comment);
350   } else {
351     *str = '\0';
352     strjoin(str, len, keys, n, "', '");
353     INFO("%s ['%s']", comment, str);
354     sfree(str);
355   }
356   sfree(keys);
357 } /* }}} void log_list_callbacks */
358
359 static int create_register_callback(llist_t **list, /* {{{ */
360                                     const char *name, void *callback,
361                                     user_data_t const *ud) {
362
363   if (name == NULL || callback == NULL)
364     return EINVAL;
365
366   callback_func_t *cf = calloc(1, sizeof(*cf));
367   if (cf == NULL) {
368     free_userdata(ud);
369     ERROR("plugin: create_register_callback: calloc failed.");
370     return ENOMEM;
371   }
372
373   cf->cf_callback = callback;
374   if (ud == NULL) {
375     cf->cf_udata = (user_data_t){
376         .data = NULL,
377         .free_func = NULL,
378     };
379   } else {
380     cf->cf_udata = *ud;
381   }
382
383   cf->cf_ctx = plugin_get_ctx();
384
385   return register_callback(list, name, cf);
386 } /* }}} int create_register_callback */
387
388 static int plugin_unregister(llist_t *list, const char *name) /* {{{ */
389 {
390   llentry_t *e;
391
392   if (list == NULL)
393     return -1;
394
395   e = llist_search(list, name);
396   if (e == NULL)
397     return -1;
398
399   llist_remove(list, e);
400
401   sfree(e->key);
402   destroy_callback(e->value);
403
404   llentry_destroy(e);
405
406   return 0;
407 } /* }}} int plugin_unregister */
408
409 /* plugin_load_file loads the shared object "file" and calls its
410  * "module_register" function. Returns zero on success, non-zero otherwise. */
411 static int plugin_load_file(char const *file, bool global) {
412   int flags = RTLD_NOW;
413   if (global)
414     flags |= RTLD_GLOBAL;
415
416   void *dlh = dlopen(file, flags);
417   if (dlh == NULL) {
418     char errbuf[1024] = "";
419
420     snprintf(errbuf, sizeof(errbuf),
421              "dlopen(\"%s\") failed: %s. "
422              "The most common cause for this problem is missing dependencies. "
423              "Use ldd(1) to check the dependencies of the plugin / shared "
424              "object.",
425              file, dlerror());
426
427     /* This error is printed to STDERR unconditionally. If list_log is NULL,
428      * plugin_log() will also print to STDERR. We avoid duplicate output by
429      * checking that the list of log handlers, list_log, is not NULL. */
430     fprintf(stderr, "ERROR: %s\n", errbuf);
431     if (list_log != NULL) {
432       ERROR("%s", errbuf);
433     }
434
435     return ENOENT;
436   }
437
438   void (*reg_handle)(void) = dlsym(dlh, "module_register");
439   if (reg_handle == NULL) {
440     ERROR("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
441           dlerror());
442     dlclose(dlh);
443     return ENOENT;
444   }
445
446   (*reg_handle)();
447   return 0;
448 }
449
450 static void *plugin_read_thread(void __attribute__((unused)) * args) {
451   while (read_loop != 0) {
452     read_func_t *rf;
453     plugin_ctx_t old_ctx;
454     cdtime_t start;
455     cdtime_t now;
456     cdtime_t elapsed;
457     int status;
458     int rf_type;
459     int rc;
460
461     /* Get the read function that needs to be read next.
462      * We don't need to hold "read_lock" for the heap, but we need
463      * to call c_heap_get_root() and pthread_cond_wait() in the
464      * same protected block. */
465     pthread_mutex_lock(&read_lock);
466     rf = c_heap_get_root(read_heap);
467     if (rf == NULL) {
468       pthread_cond_wait(&read_cond, &read_lock);
469       pthread_mutex_unlock(&read_lock);
470       continue;
471     }
472     pthread_mutex_unlock(&read_lock);
473
474     if (rf->rf_interval == 0) {
475       /* this should not happen, because the interval is set
476        * for each plugin when loading it
477        * XXX: issue a warning? */
478       rf->rf_interval = plugin_get_interval();
479       rf->rf_effective_interval = rf->rf_interval;
480
481       rf->rf_next_read = cdtime();
482     }
483
484     /* sleep until this entry is due,
485      * using pthread_cond_timedwait */
486     pthread_mutex_lock(&read_lock);
487     /* In pthread_cond_timedwait, spurious wakeups are possible
488      * (and really happen, at least on NetBSD with > 1 CPU), thus
489      * we need to re-evaluate the condition every time
490      * pthread_cond_timedwait returns. */
491     rc = 0;
492     while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
493       rc = pthread_cond_timedwait(&read_cond, &read_lock,
494                                   &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
495     }
496
497     /* Must hold `read_lock' when accessing `rf->rf_type'. */
498     rf_type = rf->rf_type;
499     pthread_mutex_unlock(&read_lock);
500
501     /* Check if we're supposed to stop.. This may have interrupted
502      * the sleep, too. */
503     if (read_loop == 0) {
504       /* Insert `rf' again, so it can be free'd correctly */
505       c_heap_insert(read_heap, rf);
506       break;
507     }
508
509     /* The entry has been marked for deletion. The linked list
510      * entry has already been removed by `plugin_unregister_read'.
511      * All we have to do here is free the `read_func_t' and
512      * continue. */
513     if (rf_type == RF_REMOVE) {
514       DEBUG("plugin_read_thread: Destroying the `%s' "
515             "callback.",
516             rf->rf_name);
517       sfree(rf->rf_name);
518       destroy_callback((callback_func_t *)rf);
519       rf = NULL;
520       continue;
521     }
522
523     DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);
524
525     start = cdtime();
526
527     old_ctx = plugin_set_ctx(rf->rf_ctx);
528
529     if (rf_type == RF_SIMPLE) {
530       int (*callback)(void);
531
532       callback = rf->rf_callback;
533       status = (*callback)();
534     } else {
535       plugin_read_cb callback;
536
537       assert(rf_type == RF_COMPLEX);
538
539       callback = rf->rf_callback;
540       status = (*callback)(&rf->rf_udata);
541     }
542
543     plugin_set_ctx(old_ctx);
544
545     /* If the function signals failure, we will increase the
546      * intervals in which it will be called. */
547     if (status != 0) {
548       rf->rf_effective_interval *= 2;
549       if (rf->rf_effective_interval > max_read_interval)
550         rf->rf_effective_interval = max_read_interval;
551
552       NOTICE("read-function of plugin `%s' failed. "
553              "Will suspend it for %.3f seconds.",
554              rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
555     } else {
556       /* Success: Restore the interval, if it was changed. */
557       rf->rf_effective_interval = rf->rf_interval;
558     }
559
560     /* update the ``next read due'' field */
561     now = cdtime();
562
563     /* calculate the time spent in the read function */
564     elapsed = (now - start);
565
566     if (elapsed > rf->rf_effective_interval)
567       WARNING(
568           "plugin_read_thread: read-function of the `%s' plugin took %.3f "
569           "seconds, which is above its read interval (%.3f seconds). You might "
570           "want to adjust the `Interval' or `ReadThreads' settings.",
571           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
572           CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
573
574     DEBUG("plugin_read_thread: read-function of the `%s' plugin took "
575           "%.6f seconds.",
576           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
577
578     DEBUG("plugin_read_thread: Effective interval of the "
579           "`%s' plugin is %.3f seconds.",
580           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
581
582     /* Calculate the next (absolute) time at which this function
583      * should be called. */
584     rf->rf_next_read += rf->rf_effective_interval;
585
586     /* Check, if `rf_next_read' is in the past. */
587     if (rf->rf_next_read < now) {
588       /* `rf_next_read' is in the past. Insert `now'
589        * so this value doesn't trail off into the
590        * past too much. */
591       rf->rf_next_read = now;
592     }
593
594     DEBUG("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
595           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_next_read));
596
597     /* Re-insert this read function into the heap again. */
598     c_heap_insert(read_heap, rf);
599   } /* while (read_loop) */
600
601   pthread_exit(NULL);
602   return (void *)0;
603 } /* void *plugin_read_thread */
604
605 #ifdef PTHREAD_MAX_NAMELEN_NP
606 #define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
607 #else
608 #define THREAD_NAME_MAX 16
609 #endif
610
611 static void set_thread_name(pthread_t tid, char const *name) {
612 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
613
614   /* glibc limits the length of the name and fails if the passed string
615    * is too long, so we truncate it here. */
616   char n[THREAD_NAME_MAX];
617   if (strlen(name) >= THREAD_NAME_MAX)
618     WARNING("set_thread_name(\"%s\"): name too long", name);
619   sstrncpy(n, name, sizeof(n));
620
621 #if defined(HAVE_PTHREAD_SETNAME_NP)
622   int status = pthread_setname_np(tid, n);
623   if (status != 0) {
624     ERROR("set_thread_name(\"%s\"): %s", n, STRERROR(status));
625   }
626 #else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
627   pthread_set_name_np(tid, n);
628 #endif
629
630 #endif
631 }
632
633 static void start_read_threads(size_t num) /* {{{ */
634 {
635   if (read_threads != NULL)
636     return;
637
638   read_threads = calloc(num, sizeof(*read_threads));
639   if (read_threads == NULL) {
640     ERROR("plugin: start_read_threads: calloc failed.");
641     return;
642   }
643
644   read_threads_num = 0;
645   for (size_t i = 0; i < num; i++) {
646     int status = pthread_create(read_threads + read_threads_num,
647                                 /* attr = */ NULL, plugin_read_thread,
648                                 /* arg = */ NULL);
649     if (status != 0) {
650       ERROR("plugin: start_read_threads: pthread_create failed with status %i "
651             "(%s).",
652             status, STRERROR(status));
653       return;
654     }
655
656     char name[THREAD_NAME_MAX];
657     ssnprintf(name, sizeof(name), "reader#%" PRIu64,
658               (uint64_t)read_threads_num);
659     set_thread_name(read_threads[read_threads_num], name);
660
661     read_threads_num++;
662   } /* for (i) */
663 } /* }}} void start_read_threads */
664
665 static void stop_read_threads(void) {
666   if (read_threads == NULL)
667     return;
668
669   INFO("collectd: Stopping %" PRIsz " read threads.", read_threads_num);
670
671   pthread_mutex_lock(&read_lock);
672   read_loop = 0;
673   DEBUG("plugin: stop_read_threads: Signalling `read_cond'");
674   pthread_cond_broadcast(&read_cond);
675   pthread_mutex_unlock(&read_lock);
676
677   for (size_t i = 0; i < read_threads_num; i++) {
678     if (pthread_join(read_threads[i], NULL) != 0) {
679       ERROR("plugin: stop_read_threads: pthread_join failed.");
680     }
681     read_threads[i] = (pthread_t)0;
682   }
683   sfree(read_threads);
684   read_threads_num = 0;
685 } /* void stop_read_threads */
686
687 static void plugin_value_list_free(value_list_t *vl) /* {{{ */
688 {
689   if (vl == NULL)
690     return;
691
692   meta_data_destroy(vl->meta);
693   sfree(vl->values);
694   sfree(vl);
695 } /* }}} void plugin_value_list_free */
696
697 static value_list_t *
698 plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
699 {
700   value_list_t *vl;
701
702   if (vl_orig == NULL)
703     return NULL;
704
705   vl = malloc(sizeof(*vl));
706   if (vl == NULL)
707     return NULL;
708   memcpy(vl, vl_orig, sizeof(*vl));
709
710   if (vl->host[0] == 0)
711     sstrncpy(vl->host, hostname_g, sizeof(vl->host));
712
713   vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
714   if (vl->values == NULL) {
715     plugin_value_list_free(vl);
716     return NULL;
717   }
718   memcpy(vl->values, vl_orig->values,
719          vl_orig->values_len * sizeof(*vl->values));
720
721   vl->meta = meta_data_clone(vl->meta);
722   if ((vl_orig->meta != NULL) && (vl->meta == NULL)) {
723     plugin_value_list_free(vl);
724     return NULL;
725   }
726
727   if (vl->time == 0)
728     vl->time = cdtime();
729
730   /* Fill in the interval from the thread context, if it is zero. */
731   if (vl->interval == 0)
732     vl->interval = plugin_get_interval();
733
734   return vl;
735 } /* }}} value_list_t *plugin_value_list_clone */
736
737 static int plugin_write_enqueue(value_list_t const *vl) /* {{{ */
738 {
739   write_queue_t *q;
740
741   q = malloc(sizeof(*q));
742   if (q == NULL)
743     return ENOMEM;
744   q->next = NULL;
745
746   q->vl = plugin_value_list_clone(vl);
747   if (q->vl == NULL) {
748     sfree(q);
749     return ENOMEM;
750   }
751
752   /* Store context of caller (read plugin); otherwise, it would not be
753    * available to the write plugins when actually dispatching the
754    * value-list later on. */
755   q->ctx = plugin_get_ctx();
756
757   pthread_mutex_lock(&write_lock);
758
759   if (write_queue_tail == NULL) {
760     write_queue_head = q;
761     write_queue_tail = q;
762     write_queue_length = 1;
763   } else {
764     write_queue_tail->next = q;
765     write_queue_tail = q;
766     write_queue_length += 1;
767   }
768
769   pthread_cond_signal(&write_cond);
770   pthread_mutex_unlock(&write_lock);
771
772   return 0;
773 } /* }}} int plugin_write_enqueue */
774
775 static value_list_t *plugin_write_dequeue(void) /* {{{ */
776 {
777   write_queue_t *q;
778   value_list_t *vl;
779
780   pthread_mutex_lock(&write_lock);
781
782   while (write_loop && (write_queue_head == NULL))
783     pthread_cond_wait(&write_cond, &write_lock);
784
785   if (write_queue_head == NULL) {
786     pthread_mutex_unlock(&write_lock);
787     return NULL;
788   }
789
790   q = write_queue_head;
791   write_queue_head = q->next;
792   write_queue_length -= 1;
793   if (write_queue_head == NULL) {
794     write_queue_tail = NULL;
795     assert(0 == write_queue_length);
796   }
797
798   pthread_mutex_unlock(&write_lock);
799
800   (void)plugin_set_ctx(q->ctx);
801
802   vl = q->vl;
803   sfree(q);
804   return vl;
805 } /* }}} value_list_t *plugin_write_dequeue */
806
807 static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
808 {
809   while (write_loop) {
810     value_list_t *vl = plugin_write_dequeue();
811     if (vl == NULL)
812       continue;
813
814     plugin_dispatch_values_internal(vl);
815
816     plugin_value_list_free(vl);
817   }
818
819   pthread_exit(NULL);
820   return (void *)0;
821 } /* }}} void *plugin_write_thread */
822
823 static void start_write_threads(size_t num) /* {{{ */
824 {
825   if (write_threads != NULL)
826     return;
827
828   write_threads = calloc(num, sizeof(*write_threads));
829   if (write_threads == NULL) {
830     ERROR("plugin: start_write_threads: calloc failed.");
831     return;
832   }
833
834   write_threads_num = 0;
835   for (size_t i = 0; i < num; i++) {
836     int status = pthread_create(write_threads + write_threads_num,
837                                 /* attr = */ NULL, plugin_write_thread,
838                                 /* arg = */ NULL);
839     if (status != 0) {
840       ERROR("plugin: start_write_threads: pthread_create failed with status %i "
841             "(%s).",
842             status, STRERROR(status));
843       return;
844     }
845
846     char name[THREAD_NAME_MAX];
847     ssnprintf(name, sizeof(name), "writer#%" PRIu64,
848               (uint64_t)write_threads_num);
849     set_thread_name(write_threads[write_threads_num], name);
850
851     write_threads_num++;
852   } /* for (i) */
853 } /* }}} void start_write_threads */
854
855 static void stop_write_threads(void) /* {{{ */
856 {
857   write_queue_t *q;
858   size_t i;
859
860   if (write_threads == NULL)
861     return;
862
863   INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
864
865   pthread_mutex_lock(&write_lock);
866   write_loop = false;
867   DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
868   pthread_cond_broadcast(&write_cond);
869   pthread_mutex_unlock(&write_lock);
870
871   for (i = 0; i < write_threads_num; i++) {
872     if (pthread_join(write_threads[i], NULL) != 0) {
873       ERROR("plugin: stop_write_threads: pthread_join failed.");
874     }
875     write_threads[i] = (pthread_t)0;
876   }
877   sfree(write_threads);
878   write_threads_num = 0;
879
880   pthread_mutex_lock(&write_lock);
881   i = 0;
882   for (q = write_queue_head; q != NULL;) {
883     write_queue_t *q1 = q;
884     plugin_value_list_free(q->vl);
885     q = q->next;
886     sfree(q1);
887     i++;
888   }
889   write_queue_head = NULL;
890   write_queue_tail = NULL;
891   write_queue_length = 0;
892   pthread_mutex_unlock(&write_lock);
893
894   if (i > 0) {
895     WARNING("plugin: %" PRIsz " value list%s left after shutting down "
896             "the write threads.",
897             i, (i == 1) ? " was" : "s were");
898   }
899 } /* }}} void stop_write_threads */
900
901 /*
902  * Public functions
903  */
904 void plugin_set_dir(const char *dir) {
905   sfree(plugindir);
906
907   if (dir == NULL) {
908     plugindir = NULL;
909     return;
910   }
911
912   plugindir = strdup(dir);
913   if (plugindir == NULL)
914     ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
915 }
916
917 bool plugin_is_loaded(char const *name) {
918   if (plugins_loaded == NULL)
919     plugins_loaded =
920         c_avl_create((int (*)(const void *, const void *))strcasecmp);
921   assert(plugins_loaded != NULL);
922
923   int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
924   return status == 0;
925 }
926
927 static int plugin_mark_loaded(char const *name) {
928   char *name_copy;
929   int status;
930
931   name_copy = strdup(name);
932   if (name_copy == NULL)
933     return ENOMEM;
934
935   status = c_avl_insert(plugins_loaded,
936                         /* key = */ name_copy, /* value = */ NULL);
937   return status;
938 }
939
940 static void plugin_free_loaded(void) {
941   void *key;
942   void *value;
943
944   if (plugins_loaded == NULL)
945     return;
946
947   while (c_avl_pick(plugins_loaded, &key, &value) == 0) {
948     sfree(key);
949     assert(value == NULL);
950   }
951
952   c_avl_destroy(plugins_loaded);
953   plugins_loaded = NULL;
954 }
955
956 #define BUFSIZE 512
957 #ifdef WIN32
958 #define SHLIB_SUFFIX ".dll"
959 #else
960 #define SHLIB_SUFFIX ".so"
961 #endif
962 int plugin_load(char const *plugin_name, bool global) {
963   DIR *dh;
964   const char *dir;
965   char filename[BUFSIZE] = "";
966   char typename[BUFSIZE];
967   int ret;
968   struct stat statbuf;
969   struct dirent *de;
970   int status;
971
972   if (plugin_name == NULL)
973     return EINVAL;
974
975   /* Check if plugin is already loaded and don't do anything in this
976    * case. */
977   if (plugin_is_loaded(plugin_name))
978     return 0;
979
980   dir = plugin_get_dir();
981   ret = 1;
982
983   /*
984    * XXX: Magic at work:
985    *
986    * Some of the language bindings, for example the Python and Perl
987    * plugins, need to be able to export symbols to the scripts they run.
988    * For this to happen, the "Globals" flag needs to be set.
989    * Unfortunately, this technical detail is hard to explain to the
990    * average user and she shouldn't have to worry about this, ideally.
991    * So in order to save everyone's sanity use a different default for a
992    * handful of special plugins. --octo
993    */
994   if ((strcasecmp("perl", plugin_name) == 0) ||
995       (strcasecmp("python", plugin_name) == 0))
996     global = true;
997
998   /* `cpu' should not match `cpufreq'. To solve this we add SHLIB_SUFFIX to the
999    * type when matching the filename */
1000   status = snprintf(typename, sizeof(typename), "%s" SHLIB_SUFFIX, plugin_name);
1001   if ((status < 0) || ((size_t)status >= sizeof(typename))) {
1002     WARNING("plugin_load: Filename too long: \"%s" SHLIB_SUFFIX "\"",
1003             plugin_name);
1004     return -1;
1005   }
1006
1007   if ((dh = opendir(dir)) == NULL) {
1008     ERROR("plugin_load: opendir (%s) failed: %s", dir, STRERRNO);
1009     return -1;
1010   }
1011
1012   while ((de = readdir(dh)) != NULL) {
1013     if (strcasecmp(de->d_name, typename))
1014       continue;
1015
1016     status = snprintf(filename, sizeof(filename), "%s/%s", dir, de->d_name);
1017     if ((status < 0) || ((size_t)status >= sizeof(filename))) {
1018       WARNING("plugin_load: Filename too long: \"%s/%s\"", dir, de->d_name);
1019       continue;
1020     }
1021
1022     if (lstat(filename, &statbuf) == -1) {
1023       WARNING("plugin_load: stat (\"%s\") failed: %s", filename, STRERRNO);
1024       continue;
1025     } else if (!S_ISREG(statbuf.st_mode)) {
1026       /* don't follow symlinks */
1027       WARNING("plugin_load: %s is not a regular file.", filename);
1028       continue;
1029     }
1030
1031     status = plugin_load_file(filename, global);
1032     if (status == 0) {
1033       /* success */
1034       plugin_mark_loaded(plugin_name);
1035       ret = 0;
1036       INFO("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1037       break;
1038     } else {
1039       ERROR("plugin_load: Load plugin \"%s\" failed with "
1040             "status %i.",
1041             plugin_name, status);
1042     }
1043   }
1044
1045   closedir(dh);
1046
1047   if (filename[0] == 0)
1048     ERROR("plugin_load: Could not find plugin \"%s\" in %s", plugin_name, dir);
1049
1050   return ret;
1051 }
1052
1053 /*
1054  * The `register_*' functions follow
1055  */
1056 EXPORT int plugin_register_config(const char *name,
1057                                   int (*callback)(const char *key,
1058                                                   const char *val),
1059                                   const char **keys, int keys_num) {
1060   cf_register(name, callback, keys, keys_num);
1061   return 0;
1062 } /* int plugin_register_config */
1063
1064 EXPORT int plugin_register_complex_config(const char *type,
1065                                           int (*callback)(oconfig_item_t *)) {
1066   return cf_register_complex(type, callback);
1067 } /* int plugin_register_complex_config */
1068
1069 EXPORT int plugin_register_init(const char *name, int (*callback)(void)) {
1070   return create_register_callback(&list_init, name, (void *)callback, NULL);
1071 } /* plugin_register_init */
1072
1073 static int plugin_compare_read_func(const void *arg0, const void *arg1) {
1074   const read_func_t *rf0;
1075   const read_func_t *rf1;
1076
1077   rf0 = arg0;
1078   rf1 = arg1;
1079
1080   if (rf0->rf_next_read < rf1->rf_next_read)
1081     return -1;
1082   else if (rf0->rf_next_read > rf1->rf_next_read)
1083     return 1;
1084   else
1085     return 0;
1086 } /* int plugin_compare_read_func */
1087
1088 /* Add a read function to both, the heap and a linked list. The linked list if
1089  * used to look-up read functions, especially for the remove function. The heap
1090  * is used to determine which plugin to read next. */
1091 static int plugin_insert_read(read_func_t *rf) {
1092   int status;
1093   llentry_t *le;
1094
1095   rf->rf_next_read = cdtime();
1096   rf->rf_effective_interval = rf->rf_interval;
1097
1098   pthread_mutex_lock(&read_lock);
1099
1100   if (read_list == NULL) {
1101     read_list = llist_create();
1102     if (read_list == NULL) {
1103       pthread_mutex_unlock(&read_lock);
1104       ERROR("plugin_insert_read: read_list failed.");
1105       return -1;
1106     }
1107   }
1108
1109   if (read_heap == NULL) {
1110     read_heap = c_heap_create(plugin_compare_read_func);
1111     if (read_heap == NULL) {
1112       pthread_mutex_unlock(&read_lock);
1113       ERROR("plugin_insert_read: c_heap_create failed.");
1114       return -1;
1115     }
1116   }
1117
1118   le = llist_search(read_list, rf->rf_name);
1119   if (le != NULL) {
1120     pthread_mutex_unlock(&read_lock);
1121     P_WARNING("The read function \"%s\" is already registered. "
1122               "Check for duplicates in your configuration!",
1123               rf->rf_name);
1124     return EINVAL;
1125   }
1126
1127   le = llentry_create(rf->rf_name, rf);
1128   if (le == NULL) {
1129     pthread_mutex_unlock(&read_lock);
1130     ERROR("plugin_insert_read: llentry_create failed.");
1131     return -1;
1132   }
1133
1134   status = c_heap_insert(read_heap, rf);
1135   if (status != 0) {
1136     pthread_mutex_unlock(&read_lock);
1137     ERROR("plugin_insert_read: c_heap_insert failed.");
1138     llentry_destroy(le);
1139     return -1;
1140   }
1141
1142   /* This does not fail. */
1143   llist_append(read_list, le);
1144
1145   /* Wake up all the read threads. */
1146   pthread_cond_broadcast(&read_cond);
1147   pthread_mutex_unlock(&read_lock);
1148   return 0;
1149 } /* int plugin_insert_read */
1150
1151 EXPORT int plugin_register_read(const char *name, int (*callback)(void)) {
1152   read_func_t *rf;
1153   int status;
1154
1155   rf = calloc(1, sizeof(*rf));
1156   if (rf == NULL) {
1157     ERROR("plugin_register_read: calloc failed.");
1158     return ENOMEM;
1159   }
1160
1161   rf->rf_callback = (void *)callback;
1162   rf->rf_udata.data = NULL;
1163   rf->rf_udata.free_func = NULL;
1164   rf->rf_ctx = plugin_get_ctx();
1165   rf->rf_group[0] = '\0';
1166   rf->rf_name = strdup(name);
1167   rf->rf_type = RF_SIMPLE;
1168   rf->rf_interval = plugin_get_interval();
1169   rf->rf_ctx.interval = rf->rf_interval;
1170
1171   status = plugin_insert_read(rf);
1172   if (status != 0) {
1173     sfree(rf->rf_name);
1174     sfree(rf);
1175   }
1176
1177   return status;
1178 } /* int plugin_register_read */
1179
1180 EXPORT int plugin_register_complex_read(const char *group, const char *name,
1181                                         plugin_read_cb callback,
1182                                         cdtime_t interval,
1183                                         user_data_t const *user_data) {
1184   read_func_t *rf;
1185   int status;
1186
1187   rf = calloc(1, sizeof(*rf));
1188   if (rf == NULL) {
1189     free_userdata(user_data);
1190     ERROR("plugin_register_complex_read: calloc failed.");
1191     return ENOMEM;
1192   }
1193
1194   rf->rf_callback = (void *)callback;
1195   if (group != NULL)
1196     sstrncpy(rf->rf_group, group, sizeof(rf->rf_group));
1197   else
1198     rf->rf_group[0] = '\0';
1199   rf->rf_name = strdup(name);
1200   rf->rf_type = RF_COMPLEX;
1201   rf->rf_interval = (interval != 0) ? interval : plugin_get_interval();
1202
1203   /* Set user data */
1204   if (user_data == NULL) {
1205     rf->rf_udata.data = NULL;
1206     rf->rf_udata.free_func = NULL;
1207   } else {
1208     rf->rf_udata = *user_data;
1209   }
1210
1211   rf->rf_ctx = plugin_get_ctx();
1212   rf->rf_ctx.interval = rf->rf_interval;
1213
1214   status = plugin_insert_read(rf);
1215   if (status != 0) {
1216     free_userdata(&rf->rf_udata);
1217     sfree(rf->rf_name);
1218     sfree(rf);
1219   }
1220
1221   return status;
1222 } /* int plugin_register_complex_read */
1223
1224 EXPORT int plugin_register_write(const char *name, plugin_write_cb callback,
1225                                  user_data_t const *ud) {
1226   return create_register_callback(&list_write, name, (void *)callback, ud);
1227 } /* int plugin_register_write */
1228
1229 static int plugin_flush_timeout_callback(user_data_t *ud) {
1230   flush_callback_t *cb = ud->data;
1231
1232   return plugin_flush(cb->name, cb->timeout, NULL);
1233 } /* static int plugin_flush_callback */
1234
1235 static void plugin_flush_timeout_callback_free(void *data) {
1236   flush_callback_t *cb = data;
1237
1238   if (cb == NULL)
1239     return;
1240
1241   sfree(cb->name);
1242   sfree(cb);
1243 } /* static void plugin_flush_callback_free */
1244
1245 static char *plugin_flush_callback_name(const char *name) {
1246   const char *flush_prefix = "flush/";
1247   size_t prefix_size;
1248   char *flush_name;
1249   size_t name_size;
1250
1251   prefix_size = strlen(flush_prefix);
1252   name_size = strlen(name);
1253
1254   flush_name = malloc(name_size + prefix_size + 1);
1255   if (flush_name == NULL) {
1256     ERROR("plugin_flush_callback_name: malloc failed.");
1257     return NULL;
1258   }
1259
1260   sstrncpy(flush_name, flush_prefix, prefix_size + 1);
1261   sstrncpy(flush_name + prefix_size, name, name_size + 1);
1262
1263   return flush_name;
1264 } /* static char *plugin_flush_callback_name */
1265
1266 EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback,
1267                                  user_data_t const *ud) {
1268   int status;
1269   plugin_ctx_t ctx = plugin_get_ctx();
1270
1271   status = create_register_callback(&list_flush, name, (void *)callback, ud);
1272   if (status != 0)
1273     return status;
1274
1275   if (ctx.flush_interval != 0) {
1276     char *flush_name;
1277     flush_callback_t *cb;
1278
1279     flush_name = plugin_flush_callback_name(name);
1280     if (flush_name == NULL)
1281       return -1;
1282
1283     cb = malloc(sizeof(*cb));
1284     if (cb == NULL) {
1285       ERROR("plugin_register_flush: malloc failed.");
1286       sfree(flush_name);
1287       return -1;
1288     }
1289
1290     cb->name = strdup(name);
1291     if (cb->name == NULL) {
1292       ERROR("plugin_register_flush: strdup failed.");
1293       sfree(cb);
1294       sfree(flush_name);
1295       return -1;
1296     }
1297     cb->timeout = ctx.flush_timeout;
1298
1299     status = plugin_register_complex_read(
1300         /* group     = */ "flush",
1301         /* name      = */ flush_name,
1302         /* callback  = */ plugin_flush_timeout_callback,
1303         /* interval  = */ ctx.flush_interval,
1304         /* user data = */
1305         &(user_data_t){
1306             .data = cb,
1307             .free_func = plugin_flush_timeout_callback_free,
1308         });
1309
1310     sfree(flush_name);
1311     return status;
1312   }
1313
1314   return 0;
1315 } /* int plugin_register_flush */
1316
1317 EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
1318                                    user_data_t const *ud) {
1319   return create_register_callback(&list_missing, name, (void *)callback, ud);
1320 } /* int plugin_register_missing */
1321
1322 EXPORT int plugin_register_cache_event(const char *name,
1323                                        plugin_cache_event_cb callback,
1324                                        user_data_t const *ud) {
1325
1326   if (name == NULL || callback == NULL)
1327     return EINVAL;
1328
1329   char *name_copy = strdup(name);
1330   if (name_copy == NULL) {
1331     P_ERROR("plugin_register_cache_event: strdup failed.");
1332     free_userdata(ud);
1333     return ENOMEM;
1334   }
1335
1336   if (list_cache_event_num >= 32) {
1337     P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
1338             "to be registered.");
1339     free_userdata(ud);
1340     return ENOMEM;
1341   }
1342
1343   for (size_t i = 0; i < list_cache_event_num; i++) {
1344     cache_event_func_t *cef = &list_cache_event[i];
1345     if (!cef->callback)
1346       continue;
1347
1348     if (strcmp(name, cef->name) == 0) {
1349       P_ERROR("plugin_register_cache_event: a callback named `%s' already "
1350               "registered!",
1351               name);
1352       free_userdata(ud);
1353       return -1;
1354     }
1355   }
1356
1357   user_data_t user_data;
1358   if (ud == NULL) {
1359     user_data = (user_data_t){
1360         .data = NULL, .free_func = NULL,
1361     };
1362   } else {
1363     user_data = *ud;
1364   }
1365
1366   list_cache_event[list_cache_event_num] =
1367       (cache_event_func_t){.callback = callback,
1368                            .name = name_copy,
1369                            .user_data = user_data,
1370                            .plugin_ctx = plugin_get_ctx()};
1371   list_cache_event_num++;
1372
1373   return 0;
1374 } /* int plugin_register_cache_event */
1375
1376 EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
1377   return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
1378 } /* int plugin_register_shutdown */
1379
1380 static void plugin_free_data_sets(void) {
1381   void *key;
1382   void *value;
1383
1384   if (data_sets == NULL)
1385     return;
1386
1387   while (c_avl_pick(data_sets, &key, &value) == 0) {
1388     data_set_t *ds = value;
1389     /* key is a pointer to ds->type */
1390
1391     sfree(ds->ds);
1392     sfree(ds);
1393   }
1394
1395   c_avl_destroy(data_sets);
1396   data_sets = NULL;
1397 } /* void plugin_free_data_sets */
1398
1399 EXPORT int plugin_register_data_set(const data_set_t *ds) {
1400   data_set_t *ds_copy;
1401
1402   if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
1403     NOTICE("Replacing DS `%s' with another version.", ds->type);
1404     plugin_unregister_data_set(ds->type);
1405   } else if (data_sets == NULL) {
1406     data_sets = c_avl_create((int (*)(const void *, const void *))strcmp);
1407     if (data_sets == NULL)
1408       return -1;
1409   }
1410
1411   ds_copy = malloc(sizeof(*ds_copy));
1412   if (ds_copy == NULL)
1413     return -1;
1414   memcpy(ds_copy, ds, sizeof(data_set_t));
1415
1416   ds_copy->ds = malloc(sizeof(*ds_copy->ds) * ds->ds_num);
1417   if (ds_copy->ds == NULL) {
1418     sfree(ds_copy);
1419     return -1;
1420   }
1421
1422   for (size_t i = 0; i < ds->ds_num; i++)
1423     memcpy(ds_copy->ds + i, ds->ds + i, sizeof(data_source_t));
1424
1425   return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy);
1426 } /* int plugin_register_data_set */
1427
1428 EXPORT int plugin_register_log(const char *name, plugin_log_cb callback,
1429                                user_data_t const *ud) {
1430   return create_register_callback(&list_log, name, (void *)callback, ud);
1431 } /* int plugin_register_log */
1432
1433 EXPORT int plugin_register_notification(const char *name,
1434                                         plugin_notification_cb callback,
1435                                         user_data_t const *ud) {
1436   return create_register_callback(&list_notification, name, (void *)callback,
1437                                   ud);
1438 } /* int plugin_register_log */
1439
1440 EXPORT int plugin_unregister_config(const char *name) {
1441   cf_unregister(name);
1442   return 0;
1443 } /* int plugin_unregister_config */
1444
1445 EXPORT int plugin_unregister_complex_config(const char *name) {
1446   cf_unregister_complex(name);
1447   return 0;
1448 } /* int plugin_unregister_complex_config */
1449
1450 EXPORT int plugin_unregister_init(const char *name) {
1451   return plugin_unregister(list_init, name);
1452 }
1453
1454 EXPORT int plugin_unregister_read(const char *name) /* {{{ */
1455 {
1456   llentry_t *le;
1457   read_func_t *rf;
1458
1459   if (name == NULL)
1460     return -ENOENT;
1461
1462   pthread_mutex_lock(&read_lock);
1463
1464   if (read_list == NULL) {
1465     pthread_mutex_unlock(&read_lock);
1466     return -ENOENT;
1467   }
1468
1469   le = llist_search(read_list, name);
1470   if (le == NULL) {
1471     pthread_mutex_unlock(&read_lock);
1472     WARNING("plugin_unregister_read: No such read function: %s", name);
1473     return -ENOENT;
1474   }
1475
1476   llist_remove(read_list, le);
1477
1478   rf = le->value;
1479   assert(rf != NULL);
1480   rf->rf_type = RF_REMOVE;
1481
1482   pthread_mutex_unlock(&read_lock);
1483
1484   llentry_destroy(le);
1485
1486   DEBUG("plugin_unregister_read: Marked `%s' for removal.", name);
1487
1488   return 0;
1489 } /* }}} int plugin_unregister_read */
1490
1491 EXPORT void plugin_log_available_writers(void) {
1492   log_list_callbacks(&list_write, "Available write targets:");
1493 }
1494
1495 static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
1496 {
1497   read_func_t *rf = e->value;
1498   char *group = ud;
1499
1500   return strcmp(rf->rf_group, (const char *)group);
1501 } /* }}} int compare_read_func_group */
1502
1503 EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */
1504 {
1505   llentry_t *le;
1506   read_func_t *rf;
1507
1508   int found = 0;
1509
1510   if (group == NULL)
1511     return -ENOENT;
1512
1513   pthread_mutex_lock(&read_lock);
1514
1515   if (read_list == NULL) {
1516     pthread_mutex_unlock(&read_lock);
1517     return -ENOENT;
1518   }
1519
1520   while (42) {
1521     le = llist_search_custom(read_list, compare_read_func_group, (void *)group);
1522
1523     if (le == NULL)
1524       break;
1525
1526     ++found;
1527
1528     llist_remove(read_list, le);
1529
1530     rf = le->value;
1531     assert(rf != NULL);
1532     rf->rf_type = RF_REMOVE;
1533
1534     llentry_destroy(le);
1535
1536     DEBUG("plugin_unregister_read_group: "
1537           "Marked `%s' (group `%s') for removal.",
1538           rf->rf_name, group);
1539   }
1540
1541   pthread_mutex_unlock(&read_lock);
1542
1543   if (found == 0) {
1544     WARNING("plugin_unregister_read_group: No such "
1545             "group of read function: %s",
1546             group);
1547     return -ENOENT;
1548   }
1549
1550   return 0;
1551 } /* }}} int plugin_unregister_read_group */
1552
1553 EXPORT int plugin_unregister_write(const char *name) {
1554   return plugin_unregister(list_write, name);
1555 }
1556
1557 EXPORT int plugin_unregister_flush(const char *name) {
1558   plugin_ctx_t ctx = plugin_get_ctx();
1559
1560   if (ctx.flush_interval != 0) {
1561     char *flush_name;
1562
1563     flush_name = plugin_flush_callback_name(name);
1564     if (flush_name != NULL) {
1565       plugin_unregister_read(flush_name);
1566       sfree(flush_name);
1567     }
1568   }
1569
1570   return plugin_unregister(list_flush, name);
1571 }
1572
1573 EXPORT int plugin_unregister_missing(const char *name) {
1574   return plugin_unregister(list_missing, name);
1575 }
1576
1577 EXPORT int plugin_unregister_cache_event(const char *name) {
1578   for (size_t i = 0; i < list_cache_event_num; i++) {
1579     cache_event_func_t *cef = &list_cache_event[i];
1580     if (!cef->callback)
1581       continue;
1582     if (strcmp(name, cef->name) == 0) {
1583       /* Mark callback as inactive, so mask in cache entries remains actual */
1584       cef->callback = NULL;
1585       sfree(cef->name);
1586       free_userdata(&cef->user_data);
1587     }
1588   }
1589   return 0;
1590 }
1591
1592 static void destroy_cache_event_callbacks() {
1593   for (size_t i = 0; i < list_cache_event_num; i++) {
1594     cache_event_func_t *cef = &list_cache_event[i];
1595     if (!cef->callback)
1596       continue;
1597     cef->callback = NULL;
1598     sfree(cef->name);
1599     free_userdata(&cef->user_data);
1600   }
1601 }
1602
1603 EXPORT int plugin_unregister_shutdown(const char *name) {
1604   return plugin_unregister(list_shutdown, name);
1605 }
1606
1607 EXPORT int plugin_unregister_data_set(const char *name) {
1608   data_set_t *ds;
1609
1610   if (data_sets == NULL)
1611     return -1;
1612
1613   if (c_avl_remove(data_sets, name, NULL, (void *)&ds) != 0)
1614     return -1;
1615
1616   sfree(ds->ds);
1617   sfree(ds);
1618
1619   return 0;
1620 } /* int plugin_unregister_data_set */
1621
1622 EXPORT int plugin_unregister_log(const char *name) {
1623   return plugin_unregister(list_log, name);
1624 }
1625
1626 EXPORT int plugin_unregister_notification(const char *name) {
1627   return plugin_unregister(list_notification, name);
1628 }
1629
1630 EXPORT int plugin_init_all(void) {
1631   char const *chain_name;
1632   llentry_t *le;
1633   int status;
1634   int ret = 0;
1635
1636   /* Init the value cache */
1637   uc_init();
1638
1639   if (IS_TRUE(global_option_get("CollectInternalStats"))) {
1640     record_statistics = true;
1641     plugin_register_read("collectd", plugin_update_internal_statistics);
1642   }
1643
1644   chain_name = global_option_get("PreCacheChain");
1645   pre_cache_chain = fc_chain_get_by_name(chain_name);
1646
1647   chain_name = global_option_get("PostCacheChain");
1648   post_cache_chain = fc_chain_get_by_name(chain_name);
1649
1650   write_limit_high = global_option_get_long("WriteQueueLimitHigh",
1651                                             /* default = */ 0);
1652   if (write_limit_high < 0) {
1653     ERROR("WriteQueueLimitHigh must be positive or zero.");
1654     write_limit_high = 0;
1655   }
1656
1657   write_limit_low =
1658       global_option_get_long("WriteQueueLimitLow",
1659                              /* default = */ write_limit_high / 2);
1660   if (write_limit_low < 0) {
1661     ERROR("WriteQueueLimitLow must be positive or zero.");
1662     write_limit_low = write_limit_high / 2;
1663   } else if (write_limit_low > write_limit_high) {
1664     ERROR("WriteQueueLimitLow must not be larger than "
1665           "WriteQueueLimitHigh.");
1666     write_limit_low = write_limit_high;
1667   }
1668
1669   write_threads_num = global_option_get_long("WriteThreads",
1670                                              /* default = */ 5);
1671   if (write_threads_num < 1) {
1672     ERROR("WriteThreads must be positive.");
1673     write_threads_num = 5;
1674   }
1675
1676   if ((list_init == NULL) && (read_heap == NULL))
1677     return ret;
1678
1679   /* Calling all init callbacks before checking if read callbacks
1680    * are available allows the init callbacks to register the read
1681    * callback. */
1682   le = llist_head(list_init);
1683   while (le != NULL) {
1684     callback_func_t *cf;
1685     plugin_init_cb callback;
1686     plugin_ctx_t old_ctx;
1687
1688     cf = le->value;
1689     old_ctx = plugin_set_ctx(cf->cf_ctx);
1690     callback = cf->cf_callback;
1691     status = (*callback)();
1692     plugin_set_ctx(old_ctx);
1693
1694     if (status != 0) {
1695       ERROR("Initialization of plugin `%s' "
1696             "failed with status %i. "
1697             "Plugin will be unloaded.",
1698             le->key, status);
1699       /* Plugins that register read callbacks from the init
1700        * callback should take care of appropriate error
1701        * handling themselves. */
1702       /* FIXME: Unload _all_ functions */
1703       plugin_unregister_read(le->key);
1704       ret = -1;
1705     }
1706
1707     le = le->next;
1708   }
1709
1710   start_write_threads((size_t)write_threads_num);
1711
1712   max_read_interval =
1713       global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
1714
1715   /* Start read-threads */
1716   if (read_heap != NULL) {
1717     const char *rt;
1718     int num;
1719
1720     rt = global_option_get("ReadThreads");
1721     num = atoi(rt);
1722     if (num != -1)
1723       start_read_threads((num > 0) ? ((size_t)num) : 5);
1724   }
1725   return ret;
1726 } /* void plugin_init_all */
1727
1728 /* TODO: Rename this function. */
1729 EXPORT void plugin_read_all(void) {
1730   uc_check_timeout();
1731
1732   return;
1733 } /* void plugin_read_all */
1734
1735 /* Read function called when the `-T' command line argument is given. */
1736 EXPORT int plugin_read_all_once(void) {
1737   int status;
1738   int return_status = 0;
1739
1740   if (read_heap == NULL) {
1741     NOTICE("No read-functions are registered.");
1742     return 0;
1743   }
1744
1745   while (42) {
1746     read_func_t *rf;
1747     plugin_ctx_t old_ctx;
1748
1749     rf = c_heap_get_root(read_heap);
1750     if (rf == NULL)
1751       break;
1752
1753     old_ctx = plugin_set_ctx(rf->rf_ctx);
1754
1755     if (rf->rf_type == RF_SIMPLE) {
1756       int (*callback)(void);
1757
1758       callback = rf->rf_callback;
1759       status = (*callback)();
1760     } else {
1761       plugin_read_cb callback;
1762
1763       callback = rf->rf_callback;
1764       status = (*callback)(&rf->rf_udata);
1765     }
1766
1767     plugin_set_ctx(old_ctx);
1768
1769     if (status != 0) {
1770       NOTICE("read-function of plugin `%s' failed.", rf->rf_name);
1771       return_status = -1;
1772     }
1773
1774     sfree(rf->rf_name);
1775     destroy_callback((void *)rf);
1776   }
1777
1778   return return_status;
1779 } /* int plugin_read_all_once */
1780
1781 EXPORT int plugin_write(const char *plugin, /* {{{ */
1782                         const data_set_t *ds, const value_list_t *vl) {
1783   llentry_t *le;
1784   int status;
1785
1786   if (vl == NULL)
1787     return EINVAL;
1788
1789   if (list_write == NULL)
1790     return ENOENT;
1791
1792   if (ds == NULL) {
1793     ds = plugin_get_ds(vl->type);
1794     if (ds == NULL) {
1795       ERROR("plugin_write: Unable to lookup type `%s'.", vl->type);
1796       return ENOENT;
1797     }
1798   }
1799
1800   if (plugin == NULL) {
1801     int success = 0;
1802     int failure = 0;
1803
1804     le = llist_head(list_write);
1805     while (le != NULL) {
1806       callback_func_t *cf = le->value;
1807       plugin_write_cb callback;
1808
1809       /* Keep the read plugin's interval and flush information but update the
1810        * plugin name. */
1811       plugin_ctx_t old_ctx = plugin_get_ctx();
1812       plugin_ctx_t ctx = old_ctx;
1813       ctx.name = cf->cf_ctx.name;
1814       plugin_set_ctx(ctx);
1815
1816       DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1817       callback = cf->cf_callback;
1818       status = (*callback)(ds, vl, &cf->cf_udata);
1819       if (status != 0)
1820         failure++;
1821       else
1822         success++;
1823
1824       plugin_set_ctx(old_ctx);
1825       le = le->next;
1826     }
1827
1828     if ((success == 0) && (failure != 0))
1829       status = -1;
1830     else
1831       status = 0;
1832   } else /* plugin != NULL */
1833   {
1834     callback_func_t *cf;
1835     plugin_write_cb callback;
1836
1837     le = llist_head(list_write);
1838     while (le != NULL) {
1839       if (strcasecmp(plugin, le->key) == 0)
1840         break;
1841
1842       le = le->next;
1843     }
1844
1845     if (le == NULL)
1846       return ENOENT;
1847
1848     cf = le->value;
1849
1850     /* do not switch plugin context; rather keep the context (interval)
1851      * information of the calling read plugin */
1852
1853     DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1854     callback = cf->cf_callback;
1855     status = (*callback)(ds, vl, &cf->cf_udata);
1856   }
1857
1858   return status;
1859 } /* }}} int plugin_write */
1860
1861 EXPORT int plugin_flush(const char *plugin, cdtime_t timeout,
1862                         const char *identifier) {
1863   llentry_t *le;
1864
1865   if (list_flush == NULL)
1866     return 0;
1867
1868   le = llist_head(list_flush);
1869   while (le != NULL) {
1870     callback_func_t *cf;
1871     plugin_flush_cb callback;
1872     plugin_ctx_t old_ctx;
1873
1874     if ((plugin != NULL) && (strcmp(plugin, le->key) != 0)) {
1875       le = le->next;
1876       continue;
1877     }
1878
1879     cf = le->value;
1880     old_ctx = plugin_set_ctx(cf->cf_ctx);
1881     callback = cf->cf_callback;
1882
1883     (*callback)(timeout, identifier, &cf->cf_udata);
1884
1885     plugin_set_ctx(old_ctx);
1886
1887     le = le->next;
1888   }
1889   return 0;
1890 } /* int plugin_flush */
1891
1892 EXPORT int plugin_shutdown_all(void) {
1893   llentry_t *le;
1894   int ret = 0; // Assume success.
1895
1896   destroy_all_callbacks(&list_init);
1897
1898   stop_read_threads();
1899
1900   pthread_mutex_lock(&read_lock);
1901   llist_destroy(read_list);
1902   read_list = NULL;
1903   pthread_mutex_unlock(&read_lock);
1904
1905   destroy_read_heap();
1906
1907   /* blocks until all write threads have shut down. */
1908   stop_write_threads();
1909
1910   /* ask all plugins to write out the state they kept. */
1911   plugin_flush(/* plugin = */ NULL,
1912                /* timeout = */ 0,
1913                /* identifier = */ NULL);
1914
1915   le = NULL;
1916   if (list_shutdown != NULL)
1917     le = llist_head(list_shutdown);
1918
1919   while (le != NULL) {
1920     callback_func_t *cf;
1921     plugin_shutdown_cb callback;
1922     plugin_ctx_t old_ctx;
1923
1924     cf = le->value;
1925     old_ctx = plugin_set_ctx(cf->cf_ctx);
1926     callback = cf->cf_callback;
1927
1928     /* Advance the pointer before calling the callback allows
1929      * shutdown functions to unregister themselves. If done the
1930      * other way around the memory `le' points to will be freed
1931      * after callback returns. */
1932     le = le->next;
1933
1934     if ((*callback)() != 0)
1935       ret = -1;
1936
1937     plugin_set_ctx(old_ctx);
1938   }
1939
1940   /* Write plugins which use the `user_data' pointer usually need the
1941    * same data available to the flush callback. If this is the case, set
1942    * the free_function to NULL when registering the flush callback and to
1943    * the real free function when registering the write callback. This way
1944    * the data isn't freed twice. */
1945   destroy_all_callbacks(&list_flush);
1946   destroy_all_callbacks(&list_missing);
1947   destroy_cache_event_callbacks();
1948   destroy_all_callbacks(&list_write);
1949
1950   destroy_all_callbacks(&list_notification);
1951   destroy_all_callbacks(&list_shutdown);
1952   destroy_all_callbacks(&list_log);
1953
1954   plugin_free_loaded();
1955   plugin_free_data_sets();
1956   return ret;
1957 } /* void plugin_shutdown_all */
1958
1959 EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
1960 {
1961   if (list_missing == NULL)
1962     return 0;
1963
1964   llentry_t *le = llist_head(list_missing);
1965   while (le != NULL) {
1966     callback_func_t *cf = le->value;
1967     plugin_ctx_t old_ctx = plugin_set_ctx(cf->cf_ctx);
1968     plugin_missing_cb callback = cf->cf_callback;
1969
1970     int status = (*callback)(vl, &cf->cf_udata);
1971     plugin_set_ctx(old_ctx);
1972     if (status != 0) {
1973       if (status < 0) {
1974         ERROR("plugin_dispatch_missing: Callback function \"%s\" "
1975               "failed with status %i.",
1976               le->key, status);
1977         return status;
1978       } else {
1979         return 0;
1980       }
1981     }
1982
1983     le = le->next;
1984   }
1985   return 0;
1986 } /* int }}} plugin_dispatch_missing */
1987
1988 void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
1989                                  unsigned long callbacks_mask, const char *name,
1990                                  const value_list_t *vl) {
1991   switch (event_type) {
1992   case CE_VALUE_NEW:
1993     callbacks_mask = 0;
1994     for (size_t i = 0; i < list_cache_event_num; i++) {
1995       cache_event_func_t *cef = &list_cache_event[i];
1996       plugin_cache_event_cb callback = cef->callback;
1997
1998       if (!callback)
1999         continue;
2000
2001       cache_event_t event = (cache_event_t){.type = event_type,
2002                                             .value_list = vl,
2003                                             .value_list_name = name,
2004                                             .ret = 0};
2005
2006       plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2007       int status = (*callback)(&event, &cef->user_data);
2008       plugin_set_ctx(old_ctx);
2009
2010       if (status != 0) {
2011         ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2012               "%i for event NEW.",
2013               cef->name, status);
2014       } else {
2015         if (event.ret) {
2016           DEBUG(
2017               "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
2018               cef->name, name);
2019           callbacks_mask |= (1 << (i));
2020         } else {
2021           DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
2022                 cef->name, name);
2023         }
2024       }
2025     }
2026
2027     if (callbacks_mask)
2028       uc_set_callbacks_mask(name, callbacks_mask);
2029
2030     break;
2031   case CE_VALUE_UPDATE:
2032   case CE_VALUE_EXPIRED:
2033     for (size_t i = 0; i < list_cache_event_num; i++) {
2034       cache_event_func_t *cef = &list_cache_event[i];
2035       plugin_cache_event_cb callback = cef->callback;
2036
2037       if (!callback)
2038         continue;
2039
2040       if (callbacks_mask && (1 << (i)) == 0)
2041         continue;
2042
2043       cache_event_t event = (cache_event_t){.type = event_type,
2044                                             .value_list = vl,
2045                                             .value_list_name = name,
2046                                             .ret = 0};
2047
2048       plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2049       int status = (*callback)(&event, &cef->user_data);
2050       plugin_set_ctx(old_ctx);
2051
2052       if (status != 0) {
2053         ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2054               "%i for event %s.",
2055               cef->name, status,
2056               ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
2057       }
2058     }
2059     break;
2060   }
2061   return;
2062 }
2063
2064 static int plugin_dispatch_values_internal(value_list_t *vl) {
2065   int status;
2066   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
2067
2068   bool free_meta_data = false;
2069
2070   assert(vl != NULL);
2071
2072   /* These fields are initialized by plugin_value_list_clone() if needed: */
2073   assert(vl->host[0] != 0);
2074   assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
2075   assert(vl->interval != 0);
2076
2077   if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
2078     ERROR("plugin_dispatch_values: Invalid value list "
2079           "from plugin %s.",
2080           vl->plugin);
2081     return -1;
2082   }
2083
2084   /* Free meta data only if the calling function didn't specify any. In
2085    * this case matches and targets may add some and the calling function
2086    * may not expect (and therefore free) that data. */
2087   if (vl->meta == NULL)
2088     free_meta_data = true;
2089
2090   if (list_write == NULL)
2091     c_complain_once(LOG_WARNING, &no_write_complaint,
2092                     "plugin_dispatch_values: No write callback has been "
2093                     "registered. Please load at least one output plugin, "
2094                     "if you want the collected data to be stored.");
2095
2096   if (data_sets == NULL) {
2097     ERROR("plugin_dispatch_values: No data sets registered. "
2098           "Could the types database be read? Check "
2099           "your `TypesDB' setting!");
2100     return -1;
2101   }
2102
2103   data_set_t *ds = NULL;
2104   if (c_avl_get(data_sets, vl->type, (void *)&ds) != 0) {
2105     char ident[6 * DATA_MAX_NAME_LEN];
2106
2107     FORMAT_VL(ident, sizeof(ident), vl);
2108     INFO("plugin_dispatch_values: Dataset not found: %s "
2109          "(from \"%s\"), check your types.db!",
2110          vl->type, ident);
2111     return -1;
2112   }
2113
2114   DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
2115         "host = %s; "
2116         "plugin = %s; plugin_instance = %s; "
2117         "type = %s; type_instance = %s;",
2118         CDTIME_T_TO_DOUBLE(vl->time), CDTIME_T_TO_DOUBLE(vl->interval),
2119         vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
2120
2121 #if COLLECT_DEBUG
2122   assert(0 == strcmp(ds->type, vl->type));
2123 #else
2124   if (0 != strcmp(ds->type, vl->type))
2125     WARNING("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
2126             ds->type, vl->type);
2127 #endif
2128
2129 #if COLLECT_DEBUG
2130   assert(ds->ds_num == vl->values_len);
2131 #else
2132   if (ds->ds_num != vl->values_len) {
2133     ERROR("plugin_dispatch_values: ds->type = %s: "
2134           "(ds->ds_num = %" PRIsz ") != "
2135           "(vl->values_len = %" PRIsz ")",
2136           ds->type, ds->ds_num, vl->values_len);
2137     return -1;
2138   }
2139 #endif
2140
2141   escape_slashes(vl->host, sizeof(vl->host));
2142   escape_slashes(vl->plugin, sizeof(vl->plugin));
2143   escape_slashes(vl->plugin_instance, sizeof(vl->plugin_instance));
2144   escape_slashes(vl->type, sizeof(vl->type));
2145   escape_slashes(vl->type_instance, sizeof(vl->type_instance));
2146
2147   if (pre_cache_chain != NULL) {
2148     status = fc_process_chain(ds, vl, pre_cache_chain);
2149     if (status < 0) {
2150       WARNING("plugin_dispatch_values: Running the "
2151               "pre-cache chain failed with "
2152               "status %i (%#x).",
2153               status, status);
2154     } else if (status == FC_TARGET_STOP)
2155       return 0;
2156   }
2157
2158   /* Update the value cache */
2159   uc_update(ds, vl);
2160
2161   if (post_cache_chain != NULL) {
2162     status = fc_process_chain(ds, vl, post_cache_chain);
2163     if (status < 0) {
2164       WARNING("plugin_dispatch_values: Running the "
2165               "post-cache chain failed with "
2166               "status %i (%#x).",
2167               status, status);
2168     }
2169   } else
2170     fc_default_action(ds, vl);
2171
2172   if ((free_meta_data == true) && (vl->meta != NULL)) {
2173     meta_data_destroy(vl->meta);
2174     vl->meta = NULL;
2175   }
2176
2177   return 0;
2178 } /* int plugin_dispatch_values_internal */
2179
2180 static double get_drop_probability(void) /* {{{ */
2181 {
2182   long pos;
2183   long size;
2184   long wql;
2185
2186   pthread_mutex_lock(&write_lock);
2187   wql = write_queue_length;
2188   pthread_mutex_unlock(&write_lock);
2189
2190   if (wql < write_limit_low)
2191     return 0.0;
2192   if (wql >= write_limit_high)
2193     return 1.0;
2194
2195   pos = 1 + wql - write_limit_low;
2196   size = 1 + write_limit_high - write_limit_low;
2197
2198   return (double)pos / (double)size;
2199 } /* }}} double get_drop_probability */
2200
2201 static bool check_drop_value(void) /* {{{ */
2202 {
2203   static cdtime_t last_message_time;
2204   static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2205
2206   double p;
2207   double q;
2208   int status;
2209
2210   if (write_limit_high == 0)
2211     return false;
2212
2213   p = get_drop_probability();
2214   if (p == 0.0)
2215     return false;
2216
2217   status = pthread_mutex_trylock(&last_message_lock);
2218   if (status == 0) {
2219     cdtime_t now;
2220
2221     now = cdtime();
2222     if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
2223       last_message_time = now;
2224       ERROR("plugin_dispatch_values: Low water mark "
2225             "reached. Dropping %.0f%% of metrics.",
2226             100.0 * p);
2227     }
2228     pthread_mutex_unlock(&last_message_lock);
2229   }
2230
2231   if (p == 1.0)
2232     return true;
2233
2234   q = cdrand_d();
2235   if (q > p)
2236     return true;
2237   else
2238     return false;
2239 } /* }}} bool check_drop_value */
2240
2241 EXPORT int plugin_dispatch_values(value_list_t const *vl) {
2242   int status;
2243
2244   if (check_drop_value()) {
2245     if (record_statistics) {
2246       pthread_mutex_lock(&statistics_lock);
2247       stats_values_dropped++;
2248       pthread_mutex_unlock(&statistics_lock);
2249     }
2250     return 0;
2251   }
2252
2253   status = plugin_write_enqueue(vl);
2254   if (status != 0) {
2255     ERROR("plugin_dispatch_values: plugin_write_enqueue failed with status %i "
2256           "(%s).",
2257           status, STRERROR(status));
2258     return status;
2259   }
2260
2261   return 0;
2262 }
2263
2264 __attribute__((sentinel)) int
2265 plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
2266                            bool store_percentage, int store_type, ...) {
2267   value_list_t *vl;
2268   int failed = 0;
2269   gauge_t sum = 0.0;
2270   va_list ap;
2271
2272   if (check_drop_value()) {
2273     if (record_statistics) {
2274       pthread_mutex_lock(&statistics_lock);
2275       stats_values_dropped++;
2276       pthread_mutex_unlock(&statistics_lock);
2277     }
2278     return 0;
2279   }
2280
2281   assert(template->values_len == 1);
2282
2283   /* Calculate sum for Gauge to calculate percent if needed */
2284   if (DS_TYPE_GAUGE == store_type) {
2285     va_start(ap, store_type);
2286     while (42) {
2287       char const *name;
2288       gauge_t value;
2289
2290       name = va_arg(ap, char const *);
2291       if (name == NULL)
2292         break;
2293
2294       value = va_arg(ap, gauge_t);
2295       if (!isnan(value))
2296         sum += value;
2297     }
2298     va_end(ap);
2299   }
2300
2301   vl = plugin_value_list_clone(template);
2302   /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2303   if (store_percentage)
2304     sstrncpy(vl->type, "percent", sizeof(vl->type));
2305
2306   va_start(ap, store_type);
2307   while (42) {
2308     char const *name;
2309     int status;
2310
2311     /* Set the type instance. */
2312     name = va_arg(ap, char const *);
2313     if (name == NULL)
2314       break;
2315     sstrncpy(vl->type_instance, name, sizeof(vl->type_instance));
2316
2317     /* Set the value. */
2318     switch (store_type) {
2319     case DS_TYPE_GAUGE:
2320       vl->values[0].gauge = va_arg(ap, gauge_t);
2321       if (store_percentage)
2322         vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2323       break;
2324     case DS_TYPE_ABSOLUTE:
2325       vl->values[0].absolute = va_arg(ap, absolute_t);
2326       break;
2327     case DS_TYPE_COUNTER:
2328       vl->values[0].counter = va_arg(ap, counter_t);
2329       break;
2330     case DS_TYPE_DERIVE:
2331       vl->values[0].derive = va_arg(ap, derive_t);
2332       break;
2333     default:
2334       ERROR("plugin_dispatch_multivalue: given store_type is incorrect.");
2335       failed++;
2336     }
2337
2338     status = plugin_write_enqueue(vl);
2339     if (status != 0)
2340       failed++;
2341   }
2342   va_end(ap);
2343
2344   plugin_value_list_free(vl);
2345   return failed;
2346 } /* }}} int plugin_dispatch_multivalue */
2347
2348 EXPORT int plugin_dispatch_notification(const notification_t *notif) {
2349   llentry_t *le;
2350   /* Possible TODO: Add flap detection here */
2351
2352   DEBUG("plugin_dispatch_notification: severity = %i; message = %s; "
2353         "time = %.3f; host = %s;",
2354         notif->severity, notif->message, CDTIME_T_TO_DOUBLE(notif->time),
2355         notif->host);
2356
2357   /* Nobody cares for notifications */
2358   if (list_notification == NULL)
2359     return -1;
2360
2361   le = llist_head(list_notification);
2362   while (le != NULL) {
2363     callback_func_t *cf;
2364     plugin_notification_cb callback;
2365     int status;
2366
2367     /* do not switch plugin context; rather keep the context
2368      * (interval) information of the calling plugin */
2369
2370     cf = le->value;
2371     callback = cf->cf_callback;
2372     status = (*callback)(notif, &cf->cf_udata);
2373     if (status != 0) {
2374       WARNING("plugin_dispatch_notification: Notification "
2375               "callback %s returned %i.",
2376               le->key, status);
2377     }
2378
2379     le = le->next;
2380   }
2381
2382   return 0;
2383 } /* int plugin_dispatch_notification */
2384
2385 EXPORT void plugin_log(int level, const char *format, ...) {
2386   char msg[1024];
2387   va_list ap;
2388   llentry_t *le;
2389
2390 #if !COLLECT_DEBUG
2391   if (level >= LOG_DEBUG)
2392     return;
2393 #endif
2394
2395   va_start(ap, format);
2396   vsnprintf(msg, sizeof(msg), format, ap);
2397   msg[sizeof(msg) - 1] = '\0';
2398   va_end(ap);
2399
2400   if (list_log == NULL) {
2401     fprintf(stderr, "%s\n", msg);
2402     return;
2403   }
2404
2405   le = llist_head(list_log);
2406   while (le != NULL) {
2407     callback_func_t *cf;
2408     plugin_log_cb callback;
2409
2410     cf = le->value;
2411     callback = cf->cf_callback;
2412
2413     /* do not switch plugin context; rather keep the context
2414      * (interval) information of the calling plugin */
2415
2416     (*callback)(level, msg, &cf->cf_udata);
2417
2418     le = le->next;
2419   }
2420 } /* void plugin_log */
2421
2422 void daemon_log(int level, const char *format, ...) {
2423   char msg[1024] = ""; // Size inherits from plugin_log()
2424
2425   char const *name = plugin_get_ctx().name;
2426   if (name == NULL)
2427     name = "UNKNOWN";
2428
2429   va_list ap;
2430   va_start(ap, format);
2431   vsnprintf(msg, sizeof(msg), format, ap);
2432   va_end(ap);
2433
2434   plugin_log(level, "%s plugin: %s", name, msg);
2435 } /* void daemon_log */
2436
2437 int parse_log_severity(const char *severity) {
2438   int log_level = -1;
2439
2440   if ((0 == strcasecmp(severity, "emerg")) ||
2441       (0 == strcasecmp(severity, "alert")) ||
2442       (0 == strcasecmp(severity, "crit")) || (0 == strcasecmp(severity, "err")))
2443     log_level = LOG_ERR;
2444   else if (0 == strcasecmp(severity, "warning"))
2445     log_level = LOG_WARNING;
2446   else if (0 == strcasecmp(severity, "notice"))
2447     log_level = LOG_NOTICE;
2448   else if (0 == strcasecmp(severity, "info"))
2449     log_level = LOG_INFO;
2450 #if COLLECT_DEBUG
2451   else if (0 == strcasecmp(severity, "debug"))
2452     log_level = LOG_DEBUG;
2453 #endif /* COLLECT_DEBUG */
2454
2455   return log_level;
2456 } /* int parse_log_severity */
2457
2458 EXPORT int parse_notif_severity(const char *severity) {
2459   int notif_severity = -1;
2460
2461   if (strcasecmp(severity, "FAILURE") == 0)
2462     notif_severity = NOTIF_FAILURE;
2463   else if (strcmp(severity, "OKAY") == 0)
2464     notif_severity = NOTIF_OKAY;
2465   else if ((strcmp(severity, "WARNING") == 0) ||
2466            (strcmp(severity, "WARN") == 0))
2467     notif_severity = NOTIF_WARNING;
2468
2469   return notif_severity;
2470 } /* int parse_notif_severity */
2471
2472 EXPORT const data_set_t *plugin_get_ds(const char *name) {
2473   data_set_t *ds;
2474
2475   if (data_sets == NULL) {
2476     P_ERROR("plugin_get_ds: No data sets are defined yet.");
2477     return NULL;
2478   }
2479
2480   if (c_avl_get(data_sets, name, (void *)&ds) != 0) {
2481     DEBUG("No such dataset registered: %s", name);
2482     return NULL;
2483   }
2484
2485   return ds;
2486 } /* data_set_t *plugin_get_ds */
2487
2488 static int plugin_notification_meta_add(notification_t *n, const char *name,
2489                                         enum notification_meta_type_e type,
2490                                         const void *value) {
2491   notification_meta_t *meta;
2492   notification_meta_t *tail;
2493
2494   if ((n == NULL) || (name == NULL) || (value == NULL)) {
2495     ERROR("plugin_notification_meta_add: A pointer is NULL!");
2496     return -1;
2497   }
2498
2499   meta = calloc(1, sizeof(*meta));
2500   if (meta == NULL) {
2501     ERROR("plugin_notification_meta_add: calloc failed.");
2502     return -1;
2503   }
2504
2505   sstrncpy(meta->name, name, sizeof(meta->name));
2506   meta->type = type;
2507
2508   switch (type) {
2509   case NM_TYPE_STRING: {
2510     meta->nm_value.nm_string = strdup((const char *)value);
2511     if (meta->nm_value.nm_string == NULL) {
2512       ERROR("plugin_notification_meta_add: strdup failed.");
2513       sfree(meta);
2514       return -1;
2515     }
2516     break;
2517   }
2518   case NM_TYPE_SIGNED_INT: {
2519     meta->nm_value.nm_signed_int = *((int64_t *)value);
2520     break;
2521   }
2522   case NM_TYPE_UNSIGNED_INT: {
2523     meta->nm_value.nm_unsigned_int = *((uint64_t *)value);
2524     break;
2525   }
2526   case NM_TYPE_DOUBLE: {
2527     meta->nm_value.nm_double = *((double *)value);
2528     break;
2529   }
2530   case NM_TYPE_BOOLEAN: {
2531     meta->nm_value.nm_boolean = *((bool *)value);
2532     break;
2533   }
2534   default: {
2535     ERROR("plugin_notification_meta_add: Unknown type: %i", type);
2536     sfree(meta);
2537     return -1;
2538   }
2539   } /* switch (type) */
2540
2541   meta->next = NULL;
2542   tail = n->meta;
2543   while ((tail != NULL) && (tail->next != NULL))
2544     tail = tail->next;
2545
2546   if (tail == NULL)
2547     n->meta = meta;
2548   else
2549     tail->next = meta;
2550
2551   return 0;
2552 } /* int plugin_notification_meta_add */
2553
2554 int plugin_notification_meta_add_string(notification_t *n, const char *name,
2555                                         const char *value) {
2556   return plugin_notification_meta_add(n, name, NM_TYPE_STRING, value);
2557 }
2558
2559 int plugin_notification_meta_add_signed_int(notification_t *n, const char *name,
2560                                             int64_t value) {
2561   return plugin_notification_meta_add(n, name, NM_TYPE_SIGNED_INT, &value);
2562 }
2563
2564 int plugin_notification_meta_add_unsigned_int(notification_t *n,
2565                                               const char *name,
2566                                               uint64_t value) {
2567   return plugin_notification_meta_add(n, name, NM_TYPE_UNSIGNED_INT, &value);
2568 }
2569
2570 int plugin_notification_meta_add_double(notification_t *n, const char *name,
2571                                         double value) {
2572   return plugin_notification_meta_add(n, name, NM_TYPE_DOUBLE, &value);
2573 }
2574
2575 int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
2576                                          bool value) {
2577   return plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value);
2578 }
2579
2580 int plugin_notification_meta_copy(notification_t *dst,
2581                                   const notification_t *src) {
2582   assert(dst != NULL);
2583   assert(src != NULL);
2584   assert(dst != src);
2585   assert((src->meta == NULL) || (src->meta != dst->meta));
2586
2587   for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next) {
2588     if (meta->type == NM_TYPE_STRING)
2589       plugin_notification_meta_add_string(dst, meta->name,
2590                                           meta->nm_value.nm_string);
2591     else if (meta->type == NM_TYPE_SIGNED_INT)
2592       plugin_notification_meta_add_signed_int(dst, meta->name,
2593                                               meta->nm_value.nm_signed_int);
2594     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2595       plugin_notification_meta_add_unsigned_int(dst, meta->name,
2596                                                 meta->nm_value.nm_unsigned_int);
2597     else if (meta->type == NM_TYPE_DOUBLE)
2598       plugin_notification_meta_add_double(dst, meta->name,
2599                                           meta->nm_value.nm_double);
2600     else if (meta->type == NM_TYPE_BOOLEAN)
2601       plugin_notification_meta_add_boolean(dst, meta->name,
2602                                            meta->nm_value.nm_boolean);
2603   }
2604
2605   return 0;
2606 } /* int plugin_notification_meta_copy */
2607
2608 int plugin_notification_meta_free(notification_meta_t *n) {
2609   notification_meta_t *this;
2610   notification_meta_t *next;
2611
2612   if (n == NULL) {
2613     ERROR("plugin_notification_meta_free: n == NULL!");
2614     return -1;
2615   }
2616
2617   this = n;
2618   while (this != NULL) {
2619     next = this->next;
2620
2621     if (this->type == NM_TYPE_STRING) {
2622       /* Assign to a temporary variable to work around nm_string's const
2623        * modifier. */
2624       void *tmp = (void *)this->nm_value.nm_string;
2625
2626       sfree(tmp);
2627       this->nm_value.nm_string = NULL;
2628     }
2629     sfree(this);
2630
2631     this = next;
2632   }
2633
2634   return 0;
2635 } /* int plugin_notification_meta_free */
2636
2637 static void plugin_ctx_destructor(void *ctx) {
2638   sfree(ctx);
2639 } /* void plugin_ctx_destructor */
2640
2641 static plugin_ctx_t ctx_init = {/* interval = */ 0};
2642
2643 static plugin_ctx_t *plugin_ctx_create(void) {
2644   plugin_ctx_t *ctx;
2645
2646   ctx = malloc(sizeof(*ctx));
2647   if (ctx == NULL) {
2648     ERROR("Failed to allocate plugin context: %s", STRERRNO);
2649     return NULL;
2650   }
2651
2652   *ctx = ctx_init;
2653   assert(plugin_ctx_key_initialized);
2654   pthread_setspecific(plugin_ctx_key, ctx);
2655   DEBUG("Created new plugin context.");
2656   return ctx;
2657 } /* int plugin_ctx_create */
2658
2659 EXPORT void plugin_init_ctx(void) {
2660   pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
2661   plugin_ctx_key_initialized = true;
2662 } /* void plugin_init_ctx */
2663
2664 EXPORT plugin_ctx_t plugin_get_ctx(void) {
2665   plugin_ctx_t *ctx;
2666
2667   assert(plugin_ctx_key_initialized);
2668   ctx = pthread_getspecific(plugin_ctx_key);
2669
2670   if (ctx == NULL) {
2671     ctx = plugin_ctx_create();
2672     /* this must no happen -- exit() instead? */
2673     if (ctx == NULL)
2674       return ctx_init;
2675   }
2676
2677   return *ctx;
2678 } /* plugin_ctx_t plugin_get_ctx */
2679
2680 EXPORT plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
2681   plugin_ctx_t *c;
2682   plugin_ctx_t old;
2683
2684   assert(plugin_ctx_key_initialized);
2685   c = pthread_getspecific(plugin_ctx_key);
2686
2687   if (c == NULL) {
2688     c = plugin_ctx_create();
2689     /* this must no happen -- exit() instead? */
2690     if (c == NULL)
2691       return ctx_init;
2692   }
2693
2694   old = *c;
2695   *c = ctx;
2696
2697   return old;
2698 } /* void plugin_set_ctx */
2699
2700 EXPORT cdtime_t plugin_get_interval(void) {
2701   cdtime_t interval;
2702
2703   interval = plugin_get_ctx().interval;
2704   if (interval > 0)
2705     return interval;
2706
2707   P_ERROR("plugin_get_interval: Unable to determine Interval from context.");
2708
2709   return cf_get_default_interval();
2710 } /* cdtime_t plugin_get_interval */
2711
2712 typedef struct {
2713   plugin_ctx_t ctx;
2714   void *(*start_routine)(void *);
2715   void *arg;
2716 } plugin_thread_t;
2717
2718 static void *plugin_thread_start(void *arg) {
2719   plugin_thread_t *plugin_thread = arg;
2720
2721   void *(*start_routine)(void *) = plugin_thread->start_routine;
2722   void *plugin_arg = plugin_thread->arg;
2723
2724   plugin_set_ctx(plugin_thread->ctx);
2725
2726   sfree(plugin_thread);
2727
2728   return start_routine(plugin_arg);
2729 } /* void *plugin_thread_start */
2730
2731 int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr,
2732                          void *(*start_routine)(void *), void *arg,
2733                          char const *name) {
2734   plugin_thread_t *plugin_thread;
2735
2736   plugin_thread = malloc(sizeof(*plugin_thread));
2737   if (plugin_thread == NULL)
2738     return ENOMEM;
2739
2740   plugin_thread->ctx = plugin_get_ctx();
2741   plugin_thread->start_routine = start_routine;
2742   plugin_thread->arg = arg;
2743
2744   int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread);
2745   if (ret != 0) {
2746     sfree(plugin_thread);
2747     return ret;
2748   }
2749
2750   if (name != NULL)
2751     set_thread_name(*thread, name);
2752
2753   return 0;
2754 } /* int plugin_thread_create */