302c777c864e082322e680754d09b3646fee713a
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "configfile.h"
35 #include "filter_chain.h"
36 #include "plugin.h"
37 #include "utils_avltree.h"
38 #include "utils_cache.h"
39 #include "utils_complain.h"
40 #include "utils_heap.h"
41 #include "utils_llist.h"
42 #include "utils_random.h"
43 #include "utils_time.h"
44
45 #if HAVE_PTHREAD_NP_H
46 #include <pthread_np.h> /* for pthread_set_name_np(3) */
47 #endif
48
49 #include <ltdl.h>
50
51 /*
52  * Private structures
53  */
54 struct callback_func_s {
55   void *cf_callback;
56   user_data_t cf_udata;
57   plugin_ctx_t cf_ctx;
58 };
59 typedef struct callback_func_s callback_func_t;
60
61 #define RF_SIMPLE 0
62 #define RF_COMPLEX 1
63 #define RF_REMOVE 65535
64 struct read_func_s {
65 /* `read_func_t' "inherits" from `callback_func_t'.
66  * The `rf_super' member MUST be the first one in this structure! */
67 #define rf_callback rf_super.cf_callback
68 #define rf_udata rf_super.cf_udata
69 #define rf_ctx rf_super.cf_ctx
70   callback_func_t rf_super;
71   char rf_group[DATA_MAX_NAME_LEN];
72   char *rf_name;
73   int rf_type;
74   cdtime_t rf_interval;
75   cdtime_t rf_effective_interval;
76   cdtime_t rf_next_read;
77 };
78 typedef struct read_func_s read_func_t;
79
80 struct write_queue_s;
81 typedef struct write_queue_s write_queue_t;
82 struct write_queue_s {
83   value_list_t *vl;
84   plugin_ctx_t ctx;
85   write_queue_t *next;
86 };
87
88 struct flush_callback_s {
89   char *name;
90   cdtime_t timeout;
91 };
92 typedef struct flush_callback_s flush_callback_t;
93
94 /*
95  * Private variables
96  */
97 static c_avl_tree_t *plugins_loaded = NULL;
98
99 static llist_t *list_init;
100 static llist_t *list_write;
101 static llist_t *list_flush;
102 static llist_t *list_missing;
103 static llist_t *list_shutdown;
104 static llist_t *list_log;
105 static llist_t *list_notification;
106
107 static fc_chain_t *pre_cache_chain = NULL;
108 static fc_chain_t *post_cache_chain = NULL;
109
110 static c_avl_tree_t *data_sets;
111
112 static char *plugindir = NULL;
113
114 #ifndef DEFAULT_MAX_READ_INTERVAL
115 #define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
116 #endif
117 static c_heap_t *read_heap = NULL;
118 static llist_t *read_list;
119 static int read_loop = 1;
120 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
121 static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
122 static pthread_t *read_threads = NULL;
123 static size_t read_threads_num = 0;
124 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
125
126 static write_queue_t *write_queue_head;
127 static write_queue_t *write_queue_tail;
128 static long write_queue_length = 0;
129 static _Bool write_loop = 1;
130 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
131 static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
132 static pthread_t *write_threads = NULL;
133 static size_t write_threads_num = 0;
134
135 static pthread_key_t plugin_ctx_key;
136 static _Bool plugin_ctx_key_initialized = 0;
137
138 static long write_limit_high = 0;
139 static long write_limit_low = 0;
140
141 static derive_t stats_values_dropped = 0;
142 static _Bool record_statistics = 0;
143
144 /*
145  * Static functions
146  */
147 static int plugin_dispatch_values_internal(value_list_t *vl);
148
149 static const char *plugin_get_dir(void) {
150   if (plugindir == NULL)
151     return (PLUGINDIR);
152   else
153     return (plugindir);
154 }
155
156 static int plugin_update_internal_statistics(void) { /* {{{ */
157   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
158
159   /* Initialize `vl' */
160   value_list_t vl = VALUE_LIST_INIT;
161   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
162
163   /* Write queue */
164   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
165
166   /* Write queue : queue length */
167   vl.values = &(value_t){.gauge = copy_write_queue_length};
168   vl.values_len = 1;
169   sstrncpy(vl.type, "queue_length", sizeof(vl.type));
170   vl.type_instance[0] = 0;
171   plugin_dispatch_values(&vl);
172
173   /* Write queue : Values dropped (queue length > low limit) */
174   vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
175   vl.values_len = 1;
176   sstrncpy(vl.type, "derive", sizeof(vl.type));
177   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
178   plugin_dispatch_values(&vl);
179
180   /* Cache */
181   sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
182
183   /* Cache : Nb entry in cache tree */
184   vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
185   vl.values_len = 1;
186   sstrncpy(vl.type, "cache_size", sizeof(vl.type));
187   vl.type_instance[0] = 0;
188   plugin_dispatch_values(&vl);
189
190   return 0;
191 } /* }}} int plugin_update_internal_statistics */
192
193 static void destroy_callback(callback_func_t *cf) /* {{{ */
194 {
195   if (cf == NULL)
196     return;
197
198   if ((cf->cf_udata.data != NULL) && (cf->cf_udata.free_func != NULL)) {
199     cf->cf_udata.free_func(cf->cf_udata.data);
200     cf->cf_udata.data = NULL;
201     cf->cf_udata.free_func = NULL;
202   }
203   sfree(cf);
204 } /* }}} void destroy_callback */
205
206 static void destroy_all_callbacks(llist_t **list) /* {{{ */
207 {
208   llentry_t *le;
209
210   if (*list == NULL)
211     return;
212
213   le = llist_head(*list);
214   while (le != NULL) {
215     llentry_t *le_next;
216
217     le_next = le->next;
218
219     sfree(le->key);
220     destroy_callback(le->value);
221     le->value = NULL;
222
223     le = le_next;
224   }
225
226   llist_destroy(*list);
227   *list = NULL;
228 } /* }}} void destroy_all_callbacks */
229
230 static void destroy_read_heap(void) /* {{{ */
231 {
232   if (read_heap == NULL)
233     return;
234
235   while (42) {
236     read_func_t *rf;
237
238     rf = c_heap_get_root(read_heap);
239     if (rf == NULL)
240       break;
241     sfree(rf->rf_name);
242     destroy_callback((callback_func_t *)rf);
243   }
244
245   c_heap_destroy(read_heap);
246   read_heap = NULL;
247 } /* }}} void destroy_read_heap */
248
249 static int register_callback(llist_t **list, /* {{{ */
250                              const char *name, callback_func_t *cf) {
251   llentry_t *le;
252   char *key;
253
254   if (*list == NULL) {
255     *list = llist_create();
256     if (*list == NULL) {
257       ERROR("plugin: register_callback: "
258             "llist_create failed.");
259       destroy_callback(cf);
260       return (-1);
261     }
262   }
263
264   key = strdup(name);
265   if (key == NULL) {
266     ERROR("plugin: register_callback: strdup failed.");
267     destroy_callback(cf);
268     return (-1);
269   }
270
271   le = llist_search(*list, name);
272   if (le == NULL) {
273     le = llentry_create(key, cf);
274     if (le == NULL) {
275       ERROR("plugin: register_callback: "
276             "llentry_create failed.");
277       sfree(key);
278       destroy_callback(cf);
279       return (-1);
280     }
281
282     llist_append(*list, le);
283   } else {
284     callback_func_t *old_cf;
285
286     old_cf = le->value;
287     le->value = cf;
288
289     WARNING("plugin: register_callback: "
290             "a callback named `%s' already exists - "
291             "overwriting the old entry!",
292             name);
293
294     destroy_callback(old_cf);
295     sfree(key);
296   }
297
298   return (0);
299 } /* }}} int register_callback */
300
301 static void log_list_callbacks(llist_t **list, /* {{{ */
302                                const char *comment) {
303   char *str;
304   int len;
305   int i;
306   llentry_t *le;
307   int n;
308   char **keys;
309
310   n = llist_size(*list);
311   if (n == 0) {
312     INFO("%s [none]", comment);
313     return;
314   }
315
316   keys = calloc(n, sizeof(char *));
317
318   if (keys == NULL) {
319     ERROR("%s: failed to allocate memory for list of callbacks", comment);
320
321     return;
322   }
323
324   for (le = llist_head(*list), i = 0, len = 0; le != NULL; le = le->next, i++) {
325     keys[i] = le->key;
326     len += strlen(le->key) + 6;
327   }
328   str = malloc(len + 10);
329   if (str == NULL) {
330     ERROR("%s: failed to allocate memory for list of callbacks", comment);
331   } else {
332     *str = '\0';
333     strjoin(str, len, keys, n, "', '");
334     INFO("%s ['%s']", comment, str);
335     sfree(str);
336   }
337   sfree(keys);
338 } /* }}} void log_list_callbacks */
339
340 static int create_register_callback(llist_t **list, /* {{{ */
341                                     const char *name, void *callback,
342                                     user_data_t const *ud) {
343   callback_func_t *cf;
344
345   cf = calloc(1, sizeof(*cf));
346   if (cf == NULL) {
347     ERROR("plugin: create_register_callback: calloc failed.");
348     return (-1);
349   }
350
351   cf->cf_callback = callback;
352   if (ud == NULL) {
353     cf->cf_udata.data = NULL;
354     cf->cf_udata.free_func = NULL;
355   } else {
356     cf->cf_udata = *ud;
357   }
358
359   cf->cf_ctx = plugin_get_ctx();
360
361   return (register_callback(list, name, cf));
362 } /* }}} int create_register_callback */
363
364 static int plugin_unregister(llist_t *list, const char *name) /* {{{ */
365 {
366   llentry_t *e;
367
368   if (list == NULL)
369     return (-1);
370
371   e = llist_search(list, name);
372   if (e == NULL)
373     return (-1);
374
375   llist_remove(list, e);
376
377   sfree(e->key);
378   destroy_callback(e->value);
379
380   llentry_destroy(e);
381
382   return (0);
383 } /* }}} int plugin_unregister */
384
385 /*
386  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
387  * object, but it will bitch about a shared object not having a
388  * ``module_register'' symbol..
389  */
390 static int plugin_load_file(char *file, uint32_t flags) {
391   lt_dlhandle dlh;
392   void (*reg_handle)(void);
393
394   lt_dlinit();
395   lt_dlerror(); /* clear errors */
396
397 #if LIBTOOL_VERSION == 2
398   if (flags & PLUGIN_FLAGS_GLOBAL) {
399     lt_dladvise advise;
400     lt_dladvise_init(&advise);
401     lt_dladvise_global(&advise);
402     dlh = lt_dlopenadvise(file, advise);
403     lt_dladvise_destroy(&advise);
404   } else {
405     dlh = lt_dlopen(file);
406   }
407 #else /* if LIBTOOL_VERSION == 1 */
408   if (flags & PLUGIN_FLAGS_GLOBAL)
409     WARNING("plugin_load_file: The global flag is not supported, "
410             "libtool 2 is required for this.");
411   dlh = lt_dlopen(file);
412 #endif
413
414   if (dlh == NULL) {
415     char errbuf[1024] = "";
416
417     ssnprintf(errbuf, sizeof(errbuf),
418               "lt_dlopen (\"%s\") failed: %s. "
419               "The most common cause for this problem is "
420               "missing dependencies. Use ldd(1) to check "
421               "the dependencies of the plugin "
422               "/ shared object.",
423               file, lt_dlerror());
424
425     ERROR("%s", errbuf);
426     /* Make sure this is printed to STDERR in any case, but also
427      * make sure it's printed only once. */
428     if (list_log != NULL)
429       fprintf(stderr, "ERROR: %s\n", errbuf);
430
431     return (1);
432   }
433
434   if ((reg_handle = (void (*)(void))lt_dlsym(dlh, "module_register")) == NULL) {
435     WARNING("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
436             lt_dlerror());
437     lt_dlclose(dlh);
438     return (-1);
439   }
440
441   (*reg_handle)();
442
443   return (0);
444 }
445
446 static void *plugin_read_thread(void __attribute__((unused)) * args) {
447   while (read_loop != 0) {
448     read_func_t *rf;
449     plugin_ctx_t old_ctx;
450     cdtime_t start;
451     cdtime_t now;
452     cdtime_t elapsed;
453     int status;
454     int rf_type;
455     int rc;
456
457     /* Get the read function that needs to be read next.
458      * We don't need to hold "read_lock" for the heap, but we need
459      * to call c_heap_get_root() and pthread_cond_wait() in the
460      * same protected block. */
461     pthread_mutex_lock(&read_lock);
462     rf = c_heap_get_root(read_heap);
463     if (rf == NULL) {
464       pthread_cond_wait(&read_cond, &read_lock);
465       pthread_mutex_unlock(&read_lock);
466       continue;
467     }
468     pthread_mutex_unlock(&read_lock);
469
470     if (rf->rf_interval == 0) {
471       /* this should not happen, because the interval is set
472        * for each plugin when loading it
473        * XXX: issue a warning? */
474       rf->rf_interval = plugin_get_interval();
475       rf->rf_effective_interval = rf->rf_interval;
476
477       rf->rf_next_read = cdtime();
478     }
479
480     /* sleep until this entry is due,
481      * using pthread_cond_timedwait */
482     pthread_mutex_lock(&read_lock);
483     /* In pthread_cond_timedwait, spurious wakeups are possible
484      * (and really happen, at least on NetBSD with > 1 CPU), thus
485      * we need to re-evaluate the condition every time
486      * pthread_cond_timedwait returns. */
487     rc = 0;
488     while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
489       rc = pthread_cond_timedwait(&read_cond, &read_lock,
490                                   &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
491     }
492
493     /* Must hold `read_lock' when accessing `rf->rf_type'. */
494     rf_type = rf->rf_type;
495     pthread_mutex_unlock(&read_lock);
496
497     /* Check if we're supposed to stop.. This may have interrupted
498      * the sleep, too. */
499     if (read_loop == 0) {
500       /* Insert `rf' again, so it can be free'd correctly */
501       c_heap_insert(read_heap, rf);
502       break;
503     }
504
505     /* The entry has been marked for deletion. The linked list
506      * entry has already been removed by `plugin_unregister_read'.
507      * All we have to do here is free the `read_func_t' and
508      * continue. */
509     if (rf_type == RF_REMOVE) {
510       DEBUG("plugin_read_thread: Destroying the `%s' "
511             "callback.",
512             rf->rf_name);
513       sfree(rf->rf_name);
514       destroy_callback((callback_func_t *)rf);
515       rf = NULL;
516       continue;
517     }
518
519     DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);
520
521     start = cdtime();
522
523     old_ctx = plugin_set_ctx(rf->rf_ctx);
524
525     if (rf_type == RF_SIMPLE) {
526       int (*callback)(void);
527
528       callback = rf->rf_callback;
529       status = (*callback)();
530     } else {
531       plugin_read_cb callback;
532
533       assert(rf_type == RF_COMPLEX);
534
535       callback = rf->rf_callback;
536       status = (*callback)(&rf->rf_udata);
537     }
538
539     plugin_set_ctx(old_ctx);
540
541     /* If the function signals failure, we will increase the
542      * intervals in which it will be called. */
543     if (status != 0) {
544       rf->rf_effective_interval *= 2;
545       if (rf->rf_effective_interval > max_read_interval)
546         rf->rf_effective_interval = max_read_interval;
547
548       NOTICE("read-function of plugin `%s' failed. "
549              "Will suspend it for %.3f seconds.",
550              rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
551     } else {
552       /* Success: Restore the interval, if it was changed. */
553       rf->rf_effective_interval = rf->rf_interval;
554     }
555
556     /* update the ``next read due'' field */
557     now = cdtime();
558
559     /* calculate the time spent in the read function */
560     elapsed = (now - start);
561
562     if (elapsed > rf->rf_effective_interval)
563       WARNING(
564           "plugin_read_thread: read-function of the `%s' plugin took %.3f "
565           "seconds, which is above its read interval (%.3f seconds). You might "
566           "want to adjust the `Interval' or `ReadThreads' settings.",
567           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
568           CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
569
570     DEBUG("plugin_read_thread: read-function of the `%s' plugin took "
571           "%.6f seconds.",
572           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
573
574     DEBUG("plugin_read_thread: Effective interval of the "
575           "`%s' plugin is %.3f seconds.",
576           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
577
578     /* Calculate the next (absolute) time at which this function
579      * should be called. */
580     rf->rf_next_read += rf->rf_effective_interval;
581
582     /* Check, if `rf_next_read' is in the past. */
583     if (rf->rf_next_read < now) {
584       /* `rf_next_read' is in the past. Insert `now'
585        * so this value doesn't trail off into the
586        * past too much. */
587       rf->rf_next_read = now;
588     }
589
590     DEBUG("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
591           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_next_read));
592
593     /* Re-insert this read function into the heap again. */
594     c_heap_insert(read_heap, rf);
595   } /* while (read_loop) */
596
597   pthread_exit(NULL);
598   return ((void *)0);
599 } /* void *plugin_read_thread */
600
601 #ifdef PTHREAD_MAX_NAMELEN_NP
602 #define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
603 #else
604 #define THREAD_NAME_MAX 16
605 #endif
606
607 static void set_thread_name(pthread_t tid, char const *name) {
608 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
609
610   /* glibc limits the length of the name and fails if the passed string
611    * is too long, so we truncate it here. */
612   char n[THREAD_NAME_MAX];
613   if (strlen(name) >= THREAD_NAME_MAX)
614     WARNING("set_thread_name(\"%s\"): name too long", name);
615   sstrncpy(n, name, sizeof(n));
616
617 #if defined(HAVE_PTHREAD_SETNAME_NP)
618   int status = pthread_setname_np(tid, n);
619   if (status != 0) {
620     char errbuf[1024];
621     ERROR("set_thread_name(\"%s\"): %s", n,
622           sstrerror(status, errbuf, sizeof(errbuf)));
623   }
624 #else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
625   pthread_set_name_np(tid, n);
626 #endif
627
628 #endif
629 }
630
631 static void start_read_threads(size_t num) /* {{{ */
632 {
633   if (read_threads != NULL)
634     return;
635
636   read_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
637   if (read_threads == NULL) {
638     ERROR("plugin: start_read_threads: calloc failed.");
639     return;
640   }
641
642   read_threads_num = 0;
643   for (size_t i = 0; i < num; i++) {
644     int status = pthread_create(read_threads + read_threads_num,
645                                 /* attr = */ NULL, plugin_read_thread,
646                                 /* arg = */ NULL);
647     if (status != 0) {
648       char errbuf[1024];
649       ERROR("plugin: start_read_threads: pthread_create failed "
650             "with status %i (%s).",
651             status, sstrerror(status, errbuf, sizeof(errbuf)));
652       return;
653     }
654
655     char name[THREAD_NAME_MAX];
656     ssnprintf(name, sizeof(name), "reader#%zu", read_threads_num);
657     set_thread_name(read_threads[read_threads_num], name);
658
659     read_threads_num++;
660   } /* for (i) */
661 } /* }}} void start_read_threads */
662
663 static void stop_read_threads(void) {
664   if (read_threads == NULL)
665     return;
666
667   INFO("collectd: Stopping %zu read threads.", read_threads_num);
668
669   pthread_mutex_lock(&read_lock);
670   read_loop = 0;
671   DEBUG("plugin: stop_read_threads: Signalling `read_cond'");
672   pthread_cond_broadcast(&read_cond);
673   pthread_mutex_unlock(&read_lock);
674
675   for (size_t i = 0; i < read_threads_num; i++) {
676     if (pthread_join(read_threads[i], NULL) != 0) {
677       ERROR("plugin: stop_read_threads: pthread_join failed.");
678     }
679     read_threads[i] = (pthread_t)0;
680   }
681   sfree(read_threads);
682   read_threads_num = 0;
683 } /* void stop_read_threads */
684
685 static void plugin_value_list_free(value_list_t *vl) /* {{{ */
686 {
687   if (vl == NULL)
688     return;
689
690   meta_data_destroy(vl->meta);
691   sfree(vl->values);
692   sfree(vl);
693 } /* }}} void plugin_value_list_free */
694
695 static value_list_t *
696 plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
697 {
698   value_list_t *vl;
699
700   if (vl_orig == NULL)
701     return (NULL);
702
703   vl = malloc(sizeof(*vl));
704   if (vl == NULL)
705     return (NULL);
706   memcpy(vl, vl_orig, sizeof(*vl));
707
708   if (vl->host[0] == 0)
709     sstrncpy(vl->host, hostname_g, sizeof(vl->host));
710
711   vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
712   if (vl->values == NULL) {
713     plugin_value_list_free(vl);
714     return (NULL);
715   }
716   memcpy(vl->values, vl_orig->values,
717          vl_orig->values_len * sizeof(*vl->values));
718
719   vl->meta = meta_data_clone(vl->meta);
720   if ((vl_orig->meta != NULL) && (vl->meta == NULL)) {
721     plugin_value_list_free(vl);
722     return (NULL);
723   }
724
725   if (vl->time == 0)
726     vl->time = cdtime();
727
728   /* Fill in the interval from the thread context, if it is zero. */
729   if (vl->interval == 0) {
730     plugin_ctx_t ctx = plugin_get_ctx();
731
732     if (ctx.interval != 0)
733       vl->interval = ctx.interval;
734     else {
735       char name[6 * DATA_MAX_NAME_LEN];
736       FORMAT_VL(name, sizeof(name), vl);
737       ERROR("plugin_value_list_clone: Unable to determine "
738             "interval from context for "
739             "value list \"%s\". "
740             "This indicates a broken plugin. "
741             "Please report this problem to the "
742             "collectd mailing list or at "
743             "<http://collectd.org/bugs/>.",
744             name);
745       vl->interval = cf_get_default_interval();
746     }
747   }
748
749   return (vl);
750 } /* }}} value_list_t *plugin_value_list_clone */
751
752 static int plugin_write_enqueue(value_list_t const *vl) /* {{{ */
753 {
754   write_queue_t *q;
755
756   q = malloc(sizeof(*q));
757   if (q == NULL)
758     return (ENOMEM);
759   q->next = NULL;
760
761   q->vl = plugin_value_list_clone(vl);
762   if (q->vl == NULL) {
763     sfree(q);
764     return (ENOMEM);
765   }
766
767   /* Store context of caller (read plugin); otherwise, it would not be
768    * available to the write plugins when actually dispatching the
769    * value-list later on. */
770   q->ctx = plugin_get_ctx();
771
772   pthread_mutex_lock(&write_lock);
773
774   if (write_queue_tail == NULL) {
775     write_queue_head = q;
776     write_queue_tail = q;
777     write_queue_length = 1;
778   } else {
779     write_queue_tail->next = q;
780     write_queue_tail = q;
781     write_queue_length += 1;
782   }
783
784   pthread_cond_signal(&write_cond);
785   pthread_mutex_unlock(&write_lock);
786
787   return (0);
788 } /* }}} int plugin_write_enqueue */
789
790 static value_list_t *plugin_write_dequeue(void) /* {{{ */
791 {
792   write_queue_t *q;
793   value_list_t *vl;
794
795   pthread_mutex_lock(&write_lock);
796
797   while (write_loop && (write_queue_head == NULL))
798     pthread_cond_wait(&write_cond, &write_lock);
799
800   if (write_queue_head == NULL) {
801     pthread_mutex_unlock(&write_lock);
802     return (NULL);
803   }
804
805   q = write_queue_head;
806   write_queue_head = q->next;
807   write_queue_length -= 1;
808   if (write_queue_head == NULL) {
809     write_queue_tail = NULL;
810     assert(0 == write_queue_length);
811   }
812
813   pthread_mutex_unlock(&write_lock);
814
815   (void)plugin_set_ctx(q->ctx);
816
817   vl = q->vl;
818   sfree(q);
819   return (vl);
820 } /* }}} value_list_t *plugin_write_dequeue */
821
822 static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
823 {
824   while (write_loop) {
825     value_list_t *vl = plugin_write_dequeue();
826     if (vl == NULL)
827       continue;
828
829     plugin_dispatch_values_internal(vl);
830
831     plugin_value_list_free(vl);
832   }
833
834   pthread_exit(NULL);
835   return ((void *)0);
836 } /* }}} void *plugin_write_thread */
837
838 static void start_write_threads(size_t num) /* {{{ */
839 {
840   if (write_threads != NULL)
841     return;
842
843   write_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
844   if (write_threads == NULL) {
845     ERROR("plugin: start_write_threads: calloc failed.");
846     return;
847   }
848
849   write_threads_num = 0;
850   for (size_t i = 0; i < num; i++) {
851     int status = pthread_create(write_threads + write_threads_num,
852                                 /* attr = */ NULL, plugin_write_thread,
853                                 /* arg = */ NULL);
854     if (status != 0) {
855       char errbuf[1024];
856       ERROR("plugin: start_write_threads: pthread_create failed "
857             "with status %i (%s).",
858             status, sstrerror(status, errbuf, sizeof(errbuf)));
859       return;
860     }
861
862     char name[THREAD_NAME_MAX];
863     ssnprintf(name, sizeof(name), "writer#%zu", write_threads_num);
864     set_thread_name(write_threads[write_threads_num], name);
865
866     write_threads_num++;
867   } /* for (i) */
868 } /* }}} void start_write_threads */
869
870 static void stop_write_threads(void) /* {{{ */
871 {
872   write_queue_t *q;
873   size_t i;
874
875   if (write_threads == NULL)
876     return;
877
878   INFO("collectd: Stopping %zu write threads.", write_threads_num);
879
880   pthread_mutex_lock(&write_lock);
881   write_loop = 0;
882   DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
883   pthread_cond_broadcast(&write_cond);
884   pthread_mutex_unlock(&write_lock);
885
886   for (i = 0; i < write_threads_num; i++) {
887     if (pthread_join(write_threads[i], NULL) != 0) {
888       ERROR("plugin: stop_write_threads: pthread_join failed.");
889     }
890     write_threads[i] = (pthread_t)0;
891   }
892   sfree(write_threads);
893   write_threads_num = 0;
894
895   pthread_mutex_lock(&write_lock);
896   i = 0;
897   for (q = write_queue_head; q != NULL;) {
898     write_queue_t *q1 = q;
899     plugin_value_list_free(q->vl);
900     q = q->next;
901     sfree(q1);
902     i++;
903   }
904   write_queue_head = NULL;
905   write_queue_tail = NULL;
906   write_queue_length = 0;
907   pthread_mutex_unlock(&write_lock);
908
909   if (i > 0) {
910     WARNING("plugin: %zu value list%s left after shutting down "
911             "the write threads.",
912             i, (i == 1) ? " was" : "s were");
913   }
914 } /* }}} void stop_write_threads */
915
916 /*
917  * Public functions
918  */
919 void plugin_set_dir(const char *dir) {
920   sfree(plugindir);
921
922   if (dir == NULL) {
923     plugindir = NULL;
924     return;
925   }
926
927   plugindir = strdup(dir);
928   if (plugindir == NULL)
929     ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
930 }
931
932 static _Bool plugin_is_loaded(char const *name) {
933   int status;
934
935   if (plugins_loaded == NULL)
936     plugins_loaded =
937         c_avl_create((int (*)(const void *, const void *))strcasecmp);
938   assert(plugins_loaded != NULL);
939
940   status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
941   return (status == 0);
942 }
943
944 static int plugin_mark_loaded(char const *name) {
945   char *name_copy;
946   int status;
947
948   name_copy = strdup(name);
949   if (name_copy == NULL)
950     return (ENOMEM);
951
952   status = c_avl_insert(plugins_loaded,
953                         /* key = */ name_copy, /* value = */ NULL);
954   return (status);
955 }
956
957 static void plugin_free_loaded(void) {
958   void *key;
959   void *value;
960
961   if (plugins_loaded == NULL)
962     return;
963
964   while (c_avl_pick(plugins_loaded, &key, &value) == 0) {
965     sfree(key);
966     assert(value == NULL);
967   }
968
969   c_avl_destroy(plugins_loaded);
970   plugins_loaded = NULL;
971 }
972
973 #define BUFSIZE 512
974 int plugin_load(char const *plugin_name, uint32_t flags) {
975   DIR *dh;
976   const char *dir;
977   char filename[BUFSIZE] = "";
978   char typename[BUFSIZE];
979   int ret;
980   struct stat statbuf;
981   struct dirent *de;
982   int status;
983
984   if (plugin_name == NULL)
985     return (EINVAL);
986
987   /* Check if plugin is already loaded and don't do anything in this
988    * case. */
989   if (plugin_is_loaded(plugin_name))
990     return (0);
991
992   dir = plugin_get_dir();
993   ret = 1;
994
995   /*
996    * XXX: Magic at work:
997    *
998    * Some of the language bindings, for example the Python and Perl
999    * plugins, need to be able to export symbols to the scripts they run.
1000    * For this to happen, the "Globals" flag needs to be set.
1001    * Unfortunately, this technical detail is hard to explain to the
1002    * average user and she shouldn't have to worry about this, ideally.
1003    * So in order to save everyone's sanity use a different default for a
1004    * handful of special plugins. --octo
1005    */
1006   if ((strcasecmp("perl", plugin_name) == 0) ||
1007       (strcasecmp("python", plugin_name) == 0))
1008     flags |= PLUGIN_FLAGS_GLOBAL;
1009
1010   /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
1011    * type when matching the filename */
1012   status = ssnprintf(typename, sizeof(typename), "%s.so", plugin_name);
1013   if ((status < 0) || ((size_t)status >= sizeof(typename))) {
1014     WARNING("plugin_load: Filename too long: \"%s.so\"", plugin_name);
1015     return (-1);
1016   }
1017
1018   if ((dh = opendir(dir)) == NULL) {
1019     char errbuf[1024];
1020     ERROR("plugin_load: opendir (%s) failed: %s", dir,
1021           sstrerror(errno, errbuf, sizeof(errbuf)));
1022     return (-1);
1023   }
1024
1025   while ((de = readdir(dh)) != NULL) {
1026     if (strcasecmp(de->d_name, typename))
1027       continue;
1028
1029     status = ssnprintf(filename, sizeof(filename), "%s/%s", dir, de->d_name);
1030     if ((status < 0) || ((size_t)status >= sizeof(filename))) {
1031       WARNING("plugin_load: Filename too long: \"%s/%s\"", dir, de->d_name);
1032       continue;
1033     }
1034
1035     if (lstat(filename, &statbuf) == -1) {
1036       char errbuf[1024];
1037       WARNING("plugin_load: stat (\"%s\") failed: %s", filename,
1038               sstrerror(errno, errbuf, sizeof(errbuf)));
1039       continue;
1040     } else if (!S_ISREG(statbuf.st_mode)) {
1041       /* don't follow symlinks */
1042       WARNING("plugin_load: %s is not a regular file.", filename);
1043       continue;
1044     }
1045
1046     status = plugin_load_file(filename, flags);
1047     if (status == 0) {
1048       /* success */
1049       plugin_mark_loaded(plugin_name);
1050       ret = 0;
1051       INFO("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1052       break;
1053     } else {
1054       ERROR("plugin_load: Load plugin \"%s\" failed with "
1055             "status %i.",
1056             plugin_name, status);
1057     }
1058   }
1059
1060   closedir(dh);
1061
1062   if (filename[0] == 0)
1063     ERROR("plugin_load: Could not find plugin \"%s\" in %s", plugin_name, dir);
1064
1065   return (ret);
1066 }
1067
1068 /*
1069  * The `register_*' functions follow
1070  */
1071 int plugin_register_config(const char *name,
1072                            int (*callback)(const char *key, const char *val),
1073                            const char **keys, int keys_num) {
1074   cf_register(name, callback, keys, keys_num);
1075   return (0);
1076 } /* int plugin_register_config */
1077
1078 int plugin_register_complex_config(const char *type,
1079                                    int (*callback)(oconfig_item_t *)) {
1080   return (cf_register_complex(type, callback));
1081 } /* int plugin_register_complex_config */
1082
1083 int plugin_register_init(const char *name, int (*callback)(void)) {
1084   return (create_register_callback(&list_init, name, (void *)callback,
1085                                    /* user_data = */ NULL));
1086 } /* plugin_register_init */
1087
1088 static int plugin_compare_read_func(const void *arg0, const void *arg1) {
1089   const read_func_t *rf0;
1090   const read_func_t *rf1;
1091
1092   rf0 = arg0;
1093   rf1 = arg1;
1094
1095   if (rf0->rf_next_read < rf1->rf_next_read)
1096     return (-1);
1097   else if (rf0->rf_next_read > rf1->rf_next_read)
1098     return (1);
1099   else
1100     return (0);
1101 } /* int plugin_compare_read_func */
1102
1103 /* Add a read function to both, the heap and a linked list. The linked list if
1104  * used to look-up read functions, especially for the remove function. The heap
1105  * is used to determine which plugin to read next. */
1106 static int plugin_insert_read(read_func_t *rf) {
1107   int status;
1108   llentry_t *le;
1109
1110   rf->rf_next_read = cdtime();
1111   rf->rf_effective_interval = rf->rf_interval;
1112
1113   pthread_mutex_lock(&read_lock);
1114
1115   if (read_list == NULL) {
1116     read_list = llist_create();
1117     if (read_list == NULL) {
1118       pthread_mutex_unlock(&read_lock);
1119       ERROR("plugin_insert_read: read_list failed.");
1120       return (-1);
1121     }
1122   }
1123
1124   if (read_heap == NULL) {
1125     read_heap = c_heap_create(plugin_compare_read_func);
1126     if (read_heap == NULL) {
1127       pthread_mutex_unlock(&read_lock);
1128       ERROR("plugin_insert_read: c_heap_create failed.");
1129       return (-1);
1130     }
1131   }
1132
1133   le = llist_search(read_list, rf->rf_name);
1134   if (le != NULL) {
1135     pthread_mutex_unlock(&read_lock);
1136     WARNING("The read function \"%s\" is already registered. "
1137             "Check for duplicate \"LoadPlugin\" lines "
1138             "in your configuration!",
1139             rf->rf_name);
1140     return (EINVAL);
1141   }
1142
1143   le = llentry_create(rf->rf_name, rf);
1144   if (le == NULL) {
1145     pthread_mutex_unlock(&read_lock);
1146     ERROR("plugin_insert_read: llentry_create failed.");
1147     return (-1);
1148   }
1149
1150   status = c_heap_insert(read_heap, rf);
1151   if (status != 0) {
1152     pthread_mutex_unlock(&read_lock);
1153     ERROR("plugin_insert_read: c_heap_insert failed.");
1154     llentry_destroy(le);
1155     return (-1);
1156   }
1157
1158   /* This does not fail. */
1159   llist_append(read_list, le);
1160
1161   /* Wake up all the read threads. */
1162   pthread_cond_broadcast(&read_cond);
1163   pthread_mutex_unlock(&read_lock);
1164   return (0);
1165 } /* int plugin_insert_read */
1166
1167 int plugin_register_read(const char *name, int (*callback)(void)) {
1168   read_func_t *rf;
1169   int status;
1170
1171   rf = calloc(1, sizeof(*rf));
1172   if (rf == NULL) {
1173     ERROR("plugin_register_read: calloc failed.");
1174     return (ENOMEM);
1175   }
1176
1177   rf->rf_callback = (void *)callback;
1178   rf->rf_udata.data = NULL;
1179   rf->rf_udata.free_func = NULL;
1180   rf->rf_ctx = plugin_get_ctx();
1181   rf->rf_group[0] = '\0';
1182   rf->rf_name = strdup(name);
1183   rf->rf_type = RF_SIMPLE;
1184   rf->rf_interval = plugin_get_interval();
1185
1186   status = plugin_insert_read(rf);
1187   if (status != 0) {
1188     sfree(rf->rf_name);
1189     sfree(rf);
1190   }
1191
1192   return (status);
1193 } /* int plugin_register_read */
1194
1195 int plugin_register_complex_read(const char *group, const char *name,
1196                                  plugin_read_cb callback, cdtime_t interval,
1197                                  user_data_t const *user_data) {
1198   read_func_t *rf;
1199   int status;
1200
1201   rf = calloc(1, sizeof(*rf));
1202   if (rf == NULL) {
1203     ERROR("plugin_register_complex_read: calloc failed.");
1204     return (ENOMEM);
1205   }
1206
1207   rf->rf_callback = (void *)callback;
1208   if (group != NULL)
1209     sstrncpy(rf->rf_group, group, sizeof(rf->rf_group));
1210   else
1211     rf->rf_group[0] = '\0';
1212   rf->rf_name = strdup(name);
1213   rf->rf_type = RF_COMPLEX;
1214   rf->rf_interval = (interval != 0) ? interval : plugin_get_interval();
1215
1216   /* Set user data */
1217   if (user_data == NULL) {
1218     rf->rf_udata.data = NULL;
1219     rf->rf_udata.free_func = NULL;
1220   } else {
1221     rf->rf_udata = *user_data;
1222   }
1223
1224   rf->rf_ctx = plugin_get_ctx();
1225
1226   status = plugin_insert_read(rf);
1227   if (status != 0) {
1228     sfree(rf->rf_name);
1229     sfree(rf);
1230   }
1231
1232   return (status);
1233 } /* int plugin_register_complex_read */
1234
1235 int plugin_register_write(const char *name, plugin_write_cb callback,
1236                           user_data_t const *ud) {
1237   return (create_register_callback(&list_write, name, (void *)callback, ud));
1238 } /* int plugin_register_write */
1239
1240 static int plugin_flush_timeout_callback(user_data_t *ud) {
1241   flush_callback_t *cb = ud->data;
1242
1243   return plugin_flush(cb->name, cb->timeout, /* identifier = */ NULL);
1244 } /* static int plugin_flush_callback */
1245
1246 static void plugin_flush_timeout_callback_free(void *data) {
1247   flush_callback_t *cb = data;
1248
1249   if (cb == NULL)
1250     return;
1251
1252   sfree(cb->name);
1253   sfree(cb);
1254 } /* static void plugin_flush_callback_free */
1255
1256 static char *plugin_flush_callback_name(const char *name) {
1257   const char *flush_prefix = "flush/";
1258   size_t prefix_size;
1259   char *flush_name;
1260   size_t name_size;
1261
1262   prefix_size = strlen(flush_prefix);
1263   name_size = strlen(name);
1264
1265   flush_name = malloc(name_size + prefix_size + 1);
1266   if (flush_name == NULL) {
1267     ERROR("plugin_flush_callback_name: malloc failed.");
1268     return (NULL);
1269   }
1270
1271   sstrncpy(flush_name, flush_prefix, prefix_size + 1);
1272   sstrncpy(flush_name + prefix_size, name, name_size + 1);
1273
1274   return flush_name;
1275 } /* static char *plugin_flush_callback_name */
1276
1277 int plugin_register_flush(const char *name, plugin_flush_cb callback,
1278                           user_data_t const *ud) {
1279   int status;
1280   plugin_ctx_t ctx = plugin_get_ctx();
1281
1282   status = create_register_callback(&list_flush, name, (void *)callback, ud);
1283   if (status != 0)
1284     return status;
1285
1286   if (ctx.flush_interval != 0) {
1287     char *flush_name;
1288     flush_callback_t *cb;
1289
1290     flush_name = plugin_flush_callback_name(name);
1291     if (flush_name == NULL)
1292       return (-1);
1293
1294     cb = malloc(sizeof(*cb));
1295     if (cb == NULL) {
1296       ERROR("plugin_register_flush: malloc failed.");
1297       sfree(flush_name);
1298       return (-1);
1299     }
1300
1301     cb->name = strdup(name);
1302     if (cb->name == NULL) {
1303       ERROR("plugin_register_flush: strdup failed.");
1304       sfree(cb);
1305       sfree(flush_name);
1306       return (-1);
1307     }
1308     cb->timeout = ctx.flush_timeout;
1309
1310     status = plugin_register_complex_read(
1311         /* group     = */ "flush",
1312         /* name      = */ flush_name,
1313         /* callback  = */ plugin_flush_timeout_callback,
1314         /* interval  = */ ctx.flush_interval,
1315         /* user data = */ &(user_data_t){
1316             .data = cb, .free_func = plugin_flush_timeout_callback_free,
1317         });
1318
1319     sfree(flush_name);
1320     if (status != 0) {
1321       sfree(cb->name);
1322       sfree(cb);
1323       return status;
1324     }
1325   }
1326
1327   return 0;
1328 } /* int plugin_register_flush */
1329
1330 int plugin_register_missing(const char *name, plugin_missing_cb callback,
1331                             user_data_t const *ud) {
1332   return (create_register_callback(&list_missing, name, (void *)callback, ud));
1333 } /* int plugin_register_missing */
1334
1335 int plugin_register_shutdown(const char *name, int (*callback)(void)) {
1336   return (create_register_callback(&list_shutdown, name, (void *)callback,
1337                                    /* user_data = */ NULL));
1338 } /* int plugin_register_shutdown */
1339
1340 static void plugin_free_data_sets(void) {
1341   void *key;
1342   void *value;
1343
1344   if (data_sets == NULL)
1345     return;
1346
1347   while (c_avl_pick(data_sets, &key, &value) == 0) {
1348     data_set_t *ds = value;
1349     /* key is a pointer to ds->type */
1350
1351     sfree(ds->ds);
1352     sfree(ds);
1353   }
1354
1355   c_avl_destroy(data_sets);
1356   data_sets = NULL;
1357 } /* void plugin_free_data_sets */
1358
1359 int plugin_register_data_set(const data_set_t *ds) {
1360   data_set_t *ds_copy;
1361
1362   if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
1363     NOTICE("Replacing DS `%s' with another version.", ds->type);
1364     plugin_unregister_data_set(ds->type);
1365   } else if (data_sets == NULL) {
1366     data_sets = c_avl_create((int (*)(const void *, const void *))strcmp);
1367     if (data_sets == NULL)
1368       return (-1);
1369   }
1370
1371   ds_copy = malloc(sizeof(*ds_copy));
1372   if (ds_copy == NULL)
1373     return (-1);
1374   memcpy(ds_copy, ds, sizeof(data_set_t));
1375
1376   ds_copy->ds = malloc(sizeof(*ds_copy->ds) * ds->ds_num);
1377   if (ds_copy->ds == NULL) {
1378     sfree(ds_copy);
1379     return (-1);
1380   }
1381
1382   for (size_t i = 0; i < ds->ds_num; i++)
1383     memcpy(ds_copy->ds + i, ds->ds + i, sizeof(data_source_t));
1384
1385   return (c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy));
1386 } /* int plugin_register_data_set */
1387
1388 int plugin_register_log(const char *name, plugin_log_cb callback,
1389                         user_data_t const *ud) {
1390   return (create_register_callback(&list_log, name, (void *)callback, ud));
1391 } /* int plugin_register_log */
1392
1393 int plugin_register_notification(const char *name,
1394                                  plugin_notification_cb callback,
1395                                  user_data_t const *ud) {
1396   return (
1397       create_register_callback(&list_notification, name, (void *)callback, ud));
1398 } /* int plugin_register_log */
1399
1400 int plugin_unregister_config(const char *name) {
1401   cf_unregister(name);
1402   return (0);
1403 } /* int plugin_unregister_config */
1404
1405 int plugin_unregister_complex_config(const char *name) {
1406   cf_unregister_complex(name);
1407   return (0);
1408 } /* int plugin_unregister_complex_config */
1409
1410 int plugin_unregister_init(const char *name) {
1411   return (plugin_unregister(list_init, name));
1412 }
1413
1414 int plugin_unregister_read(const char *name) /* {{{ */
1415 {
1416   llentry_t *le;
1417   read_func_t *rf;
1418
1419   if (name == NULL)
1420     return (-ENOENT);
1421
1422   pthread_mutex_lock(&read_lock);
1423
1424   if (read_list == NULL) {
1425     pthread_mutex_unlock(&read_lock);
1426     return (-ENOENT);
1427   }
1428
1429   le = llist_search(read_list, name);
1430   if (le == NULL) {
1431     pthread_mutex_unlock(&read_lock);
1432     WARNING("plugin_unregister_read: No such read function: %s", name);
1433     return (-ENOENT);
1434   }
1435
1436   llist_remove(read_list, le);
1437
1438   rf = le->value;
1439   assert(rf != NULL);
1440   rf->rf_type = RF_REMOVE;
1441
1442   pthread_mutex_unlock(&read_lock);
1443
1444   llentry_destroy(le);
1445
1446   DEBUG("plugin_unregister_read: Marked `%s' for removal.", name);
1447
1448   return (0);
1449 } /* }}} int plugin_unregister_read */
1450
1451 void plugin_log_available_writers(void) {
1452   log_list_callbacks(&list_write, "Available write targets:");
1453 }
1454
1455 static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
1456 {
1457   read_func_t *rf = e->value;
1458   char *group = ud;
1459
1460   return strcmp(rf->rf_group, (const char *)group);
1461 } /* }}} int compare_read_func_group */
1462
1463 int plugin_unregister_read_group(const char *group) /* {{{ */
1464 {
1465   llentry_t *le;
1466   read_func_t *rf;
1467
1468   int found = 0;
1469
1470   if (group == NULL)
1471     return (-ENOENT);
1472
1473   pthread_mutex_lock(&read_lock);
1474
1475   if (read_list == NULL) {
1476     pthread_mutex_unlock(&read_lock);
1477     return (-ENOENT);
1478   }
1479
1480   while (42) {
1481     le = llist_search_custom(read_list, compare_read_func_group, (void *)group);
1482
1483     if (le == NULL)
1484       break;
1485
1486     ++found;
1487
1488     llist_remove(read_list, le);
1489
1490     rf = le->value;
1491     assert(rf != NULL);
1492     rf->rf_type = RF_REMOVE;
1493
1494     llentry_destroy(le);
1495
1496     DEBUG("plugin_unregister_read_group: "
1497           "Marked `%s' (group `%s') for removal.",
1498           rf->rf_name, group);
1499   }
1500
1501   pthread_mutex_unlock(&read_lock);
1502
1503   if (found == 0) {
1504     WARNING("plugin_unregister_read_group: No such "
1505             "group of read function: %s",
1506             group);
1507     return (-ENOENT);
1508   }
1509
1510   return (0);
1511 } /* }}} int plugin_unregister_read_group */
1512
1513 int plugin_unregister_write(const char *name) {
1514   return (plugin_unregister(list_write, name));
1515 }
1516
1517 int plugin_unregister_flush(const char *name) {
1518   plugin_ctx_t ctx = plugin_get_ctx();
1519
1520   if (ctx.flush_interval != 0) {
1521     char *flush_name;
1522
1523     flush_name = plugin_flush_callback_name(name);
1524     if (flush_name != NULL) {
1525       plugin_unregister_read(flush_name);
1526       sfree(flush_name);
1527     }
1528   }
1529
1530   return plugin_unregister(list_flush, name);
1531 }
1532
1533 int plugin_unregister_missing(const char *name) {
1534   return (plugin_unregister(list_missing, name));
1535 }
1536
1537 int plugin_unregister_shutdown(const char *name) {
1538   return (plugin_unregister(list_shutdown, name));
1539 }
1540
1541 int plugin_unregister_data_set(const char *name) {
1542   data_set_t *ds;
1543
1544   if (data_sets == NULL)
1545     return (-1);
1546
1547   if (c_avl_remove(data_sets, name, NULL, (void *)&ds) != 0)
1548     return (-1);
1549
1550   sfree(ds->ds);
1551   sfree(ds);
1552
1553   return (0);
1554 } /* int plugin_unregister_data_set */
1555
1556 int plugin_unregister_log(const char *name) {
1557   return (plugin_unregister(list_log, name));
1558 }
1559
1560 int plugin_unregister_notification(const char *name) {
1561   return (plugin_unregister(list_notification, name));
1562 }
1563
1564 int plugin_init_all(void) {
1565   char const *chain_name;
1566   llentry_t *le;
1567   int status;
1568   int ret = 0;
1569
1570   /* Init the value cache */
1571   uc_init();
1572
1573   if (IS_TRUE(global_option_get("CollectInternalStats"))) {
1574     record_statistics = 1;
1575     plugin_register_read("collectd", plugin_update_internal_statistics);
1576   }
1577
1578   chain_name = global_option_get("PreCacheChain");
1579   pre_cache_chain = fc_chain_get_by_name(chain_name);
1580
1581   chain_name = global_option_get("PostCacheChain");
1582   post_cache_chain = fc_chain_get_by_name(chain_name);
1583
1584   write_limit_high = global_option_get_long("WriteQueueLimitHigh",
1585                                             /* default = */ 0);
1586   if (write_limit_high < 0) {
1587     ERROR("WriteQueueLimitHigh must be positive or zero.");
1588     write_limit_high = 0;
1589   }
1590
1591   write_limit_low =
1592       global_option_get_long("WriteQueueLimitLow",
1593                              /* default = */ write_limit_high / 2);
1594   if (write_limit_low < 0) {
1595     ERROR("WriteQueueLimitLow must be positive or zero.");
1596     write_limit_low = write_limit_high / 2;
1597   } else if (write_limit_low > write_limit_high) {
1598     ERROR("WriteQueueLimitLow must not be larger than "
1599           "WriteQueueLimitHigh.");
1600     write_limit_low = write_limit_high;
1601   }
1602
1603   write_threads_num = global_option_get_long("WriteThreads",
1604                                              /* default = */ 5);
1605   if (write_threads_num < 1) {
1606     ERROR("WriteThreads must be positive.");
1607     write_threads_num = 5;
1608   }
1609
1610   if ((list_init == NULL) && (read_heap == NULL))
1611     return ret;
1612
1613   /* Calling all init callbacks before checking if read callbacks
1614    * are available allows the init callbacks to register the read
1615    * callback. */
1616   le = llist_head(list_init);
1617   while (le != NULL) {
1618     callback_func_t *cf;
1619     plugin_init_cb callback;
1620     plugin_ctx_t old_ctx;
1621
1622     cf = le->value;
1623     old_ctx = plugin_set_ctx(cf->cf_ctx);
1624     callback = cf->cf_callback;
1625     status = (*callback)();
1626     plugin_set_ctx(old_ctx);
1627
1628     if (status != 0) {
1629       ERROR("Initialization of plugin `%s' "
1630             "failed with status %i. "
1631             "Plugin will be unloaded.",
1632             le->key, status);
1633       /* Plugins that register read callbacks from the init
1634        * callback should take care of appropriate error
1635        * handling themselves. */
1636       /* FIXME: Unload _all_ functions */
1637       plugin_unregister_read(le->key);
1638       ret = -1;
1639     }
1640
1641     le = le->next;
1642   }
1643
1644   start_write_threads((size_t)write_threads_num);
1645
1646   max_read_interval =
1647       global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
1648
1649   /* Start read-threads */
1650   if (read_heap != NULL) {
1651     const char *rt;
1652     int num;
1653
1654     rt = global_option_get("ReadThreads");
1655     num = atoi(rt);
1656     if (num != -1)
1657       start_read_threads((num > 0) ? ((size_t)num) : 5);
1658   }
1659   return ret;
1660 } /* void plugin_init_all */
1661
1662 /* TODO: Rename this function. */
1663 void plugin_read_all(void) {
1664   uc_check_timeout();
1665
1666   return;
1667 } /* void plugin_read_all */
1668
1669 /* Read function called when the `-T' command line argument is given. */
1670 int plugin_read_all_once(void) {
1671   int status;
1672   int return_status = 0;
1673
1674   if (read_heap == NULL) {
1675     NOTICE("No read-functions are registered.");
1676     return (0);
1677   }
1678
1679   while (42) {
1680     read_func_t *rf;
1681     plugin_ctx_t old_ctx;
1682
1683     rf = c_heap_get_root(read_heap);
1684     if (rf == NULL)
1685       break;
1686
1687     old_ctx = plugin_set_ctx(rf->rf_ctx);
1688
1689     if (rf->rf_type == RF_SIMPLE) {
1690       int (*callback)(void);
1691
1692       callback = rf->rf_callback;
1693       status = (*callback)();
1694     } else {
1695       plugin_read_cb callback;
1696
1697       callback = rf->rf_callback;
1698       status = (*callback)(&rf->rf_udata);
1699     }
1700
1701     plugin_set_ctx(old_ctx);
1702
1703     if (status != 0) {
1704       NOTICE("read-function of plugin `%s' failed.", rf->rf_name);
1705       return_status = -1;
1706     }
1707
1708     sfree(rf->rf_name);
1709     destroy_callback((void *)rf);
1710   }
1711
1712   return (return_status);
1713 } /* int plugin_read_all_once */
1714
1715 int plugin_write(const char *plugin, /* {{{ */
1716                  const data_set_t *ds, const value_list_t *vl) {
1717   llentry_t *le;
1718   int status;
1719
1720   if (vl == NULL)
1721     return (EINVAL);
1722
1723   if (list_write == NULL)
1724     return (ENOENT);
1725
1726   if (ds == NULL) {
1727     ds = plugin_get_ds(vl->type);
1728     if (ds == NULL) {
1729       ERROR("plugin_write: Unable to lookup type `%s'.", vl->type);
1730       return (ENOENT);
1731     }
1732   }
1733
1734   if (plugin == NULL) {
1735     int success = 0;
1736     int failure = 0;
1737
1738     le = llist_head(list_write);
1739     while (le != NULL) {
1740       callback_func_t *cf = le->value;
1741       plugin_write_cb callback;
1742
1743       /* do not switch plugin context; rather keep the context (interval)
1744        * information of the calling read plugin */
1745
1746       DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1747       callback = cf->cf_callback;
1748       status = (*callback)(ds, vl, &cf->cf_udata);
1749       if (status != 0)
1750         failure++;
1751       else
1752         success++;
1753
1754       le = le->next;
1755     }
1756
1757     if ((success == 0) && (failure != 0))
1758       status = -1;
1759     else
1760       status = 0;
1761   } else /* plugin != NULL */
1762   {
1763     callback_func_t *cf;
1764     plugin_write_cb callback;
1765
1766     le = llist_head(list_write);
1767     while (le != NULL) {
1768       if (strcasecmp(plugin, le->key) == 0)
1769         break;
1770
1771       le = le->next;
1772     }
1773
1774     if (le == NULL)
1775       return (ENOENT);
1776
1777     cf = le->value;
1778
1779     /* do not switch plugin context; rather keep the context (interval)
1780      * information of the calling read plugin */
1781
1782     DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1783     callback = cf->cf_callback;
1784     status = (*callback)(ds, vl, &cf->cf_udata);
1785   }
1786
1787   return (status);
1788 } /* }}} int plugin_write */
1789
1790 int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) {
1791   llentry_t *le;
1792
1793   if (list_flush == NULL)
1794     return (0);
1795
1796   le = llist_head(list_flush);
1797   while (le != NULL) {
1798     callback_func_t *cf;
1799     plugin_flush_cb callback;
1800     plugin_ctx_t old_ctx;
1801
1802     if ((plugin != NULL) && (strcmp(plugin, le->key) != 0)) {
1803       le = le->next;
1804       continue;
1805     }
1806
1807     cf = le->value;
1808     old_ctx = plugin_set_ctx(cf->cf_ctx);
1809     callback = cf->cf_callback;
1810
1811     (*callback)(timeout, identifier, &cf->cf_udata);
1812
1813     plugin_set_ctx(old_ctx);
1814
1815     le = le->next;
1816   }
1817   return (0);
1818 } /* int plugin_flush */
1819
1820 int plugin_shutdown_all(void) {
1821   llentry_t *le;
1822   int ret = 0; // Assume success.
1823
1824   destroy_all_callbacks(&list_init);
1825
1826   stop_read_threads();
1827
1828   pthread_mutex_lock(&read_lock);
1829   llist_destroy(read_list);
1830   read_list = NULL;
1831   pthread_mutex_unlock(&read_lock);
1832
1833   destroy_read_heap();
1834
1835   /* blocks until all write threads have shut down. */
1836   stop_write_threads();
1837
1838   /* ask all plugins to write out the state they kept. */
1839   plugin_flush(/* plugin = */ NULL,
1840                /* timeout = */ 0,
1841                /* identifier = */ NULL);
1842
1843   le = NULL;
1844   if (list_shutdown != NULL)
1845     le = llist_head(list_shutdown);
1846
1847   while (le != NULL) {
1848     callback_func_t *cf;
1849     plugin_shutdown_cb callback;
1850     plugin_ctx_t old_ctx;
1851
1852     cf = le->value;
1853     old_ctx = plugin_set_ctx(cf->cf_ctx);
1854     callback = cf->cf_callback;
1855
1856     /* Advance the pointer before calling the callback allows
1857      * shutdown functions to unregister themselves. If done the
1858      * other way around the memory `le' points to will be freed
1859      * after callback returns. */
1860     le = le->next;
1861
1862     if ((*callback)() != 0)
1863       ret = -1;
1864
1865     plugin_set_ctx(old_ctx);
1866   }
1867
1868   /* Write plugins which use the `user_data' pointer usually need the
1869    * same data available to the flush callback. If this is the case, set
1870    * the free_function to NULL when registering the flush callback and to
1871    * the real free function when registering the write callback. This way
1872    * the data isn't freed twice. */
1873   destroy_all_callbacks(&list_flush);
1874   destroy_all_callbacks(&list_missing);
1875   destroy_all_callbacks(&list_write);
1876
1877   destroy_all_callbacks(&list_notification);
1878   destroy_all_callbacks(&list_shutdown);
1879   destroy_all_callbacks(&list_log);
1880
1881   plugin_free_loaded();
1882   plugin_free_data_sets();
1883   return (ret);
1884 } /* void plugin_shutdown_all */
1885
1886 int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
1887 {
1888   llentry_t *le;
1889
1890   if (list_missing == NULL)
1891     return (0);
1892
1893   le = llist_head(list_missing);
1894   while (le != NULL) {
1895     callback_func_t *cf;
1896     plugin_missing_cb callback;
1897     plugin_ctx_t old_ctx;
1898     int status;
1899
1900     cf = le->value;
1901     old_ctx = plugin_set_ctx(cf->cf_ctx);
1902     callback = cf->cf_callback;
1903
1904     status = (*callback)(vl, &cf->cf_udata);
1905     plugin_set_ctx(old_ctx);
1906     if (status != 0) {
1907       if (status < 0) {
1908         ERROR("plugin_dispatch_missing: Callback function \"%s\" "
1909               "failed with status %i.",
1910               le->key, status);
1911         return (status);
1912       } else {
1913         return (0);
1914       }
1915     }
1916
1917     le = le->next;
1918   }
1919   return (0);
1920 } /* int }}} plugin_dispatch_missing */
1921
1922 static int plugin_dispatch_values_internal(value_list_t *vl) {
1923   int status;
1924   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
1925
1926   data_set_t *ds;
1927
1928   _Bool free_meta_data = 0;
1929
1930   assert(vl != NULL);
1931
1932   /* These fields are initialized by plugin_value_list_clone() if needed: */
1933   assert(vl->host[0] != 0);
1934   assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
1935   assert(vl->interval != 0);
1936
1937   if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
1938     ERROR("plugin_dispatch_values: Invalid value list "
1939           "from plugin %s.",
1940           vl->plugin);
1941     return (-1);
1942   }
1943
1944   /* Free meta data only if the calling function didn't specify any. In
1945    * this case matches and targets may add some and the calling function
1946    * may not expect (and therefore free) that data. */
1947   if (vl->meta == NULL)
1948     free_meta_data = 1;
1949
1950   if (list_write == NULL)
1951     c_complain_once(LOG_WARNING, &no_write_complaint,
1952                     "plugin_dispatch_values: No write callback has been "
1953                     "registered. Please load at least one output plugin, "
1954                     "if you want the collected data to be stored.");
1955
1956   if (data_sets == NULL) {
1957     ERROR("plugin_dispatch_values: No data sets registered. "
1958           "Could the types database be read? Check "
1959           "your `TypesDB' setting!");
1960     return (-1);
1961   }
1962
1963   if (c_avl_get(data_sets, vl->type, (void *)&ds) != 0) {
1964     char ident[6 * DATA_MAX_NAME_LEN];
1965
1966     FORMAT_VL(ident, sizeof(ident), vl);
1967     INFO("plugin_dispatch_values: Dataset not found: %s "
1968          "(from \"%s\"), check your types.db!",
1969          vl->type, ident);
1970     return (-1);
1971   }
1972
1973   DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
1974         "host = %s; "
1975         "plugin = %s; plugin_instance = %s; "
1976         "type = %s; type_instance = %s;",
1977         CDTIME_T_TO_DOUBLE(vl->time), CDTIME_T_TO_DOUBLE(vl->interval),
1978         vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1979
1980 #if COLLECT_DEBUG
1981   assert(0 == strcmp(ds->type, vl->type));
1982 #else
1983   if (0 != strcmp(ds->type, vl->type))
1984     WARNING("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
1985             ds->type, vl->type);
1986 #endif
1987
1988 #if COLLECT_DEBUG
1989   assert(ds->ds_num == vl->values_len);
1990 #else
1991   if (ds->ds_num != vl->values_len) {
1992     ERROR("plugin_dispatch_values: ds->type = %s: "
1993           "(ds->ds_num = %zu) != "
1994           "(vl->values_len = %zu)",
1995           ds->type, ds->ds_num, vl->values_len);
1996     return (-1);
1997   }
1998 #endif
1999
2000   escape_slashes(vl->host, sizeof(vl->host));
2001   escape_slashes(vl->plugin, sizeof(vl->plugin));
2002   escape_slashes(vl->plugin_instance, sizeof(vl->plugin_instance));
2003   escape_slashes(vl->type, sizeof(vl->type));
2004   escape_slashes(vl->type_instance, sizeof(vl->type_instance));
2005
2006   if (pre_cache_chain != NULL) {
2007     status = fc_process_chain(ds, vl, pre_cache_chain);
2008     if (status < 0) {
2009       WARNING("plugin_dispatch_values: Running the "
2010               "pre-cache chain failed with "
2011               "status %i (%#x).",
2012               status, status);
2013     } else if (status == FC_TARGET_STOP)
2014       return (0);
2015   }
2016
2017   /* Update the value cache */
2018   uc_update(ds, vl);
2019
2020   if (post_cache_chain != NULL) {
2021     status = fc_process_chain(ds, vl, post_cache_chain);
2022     if (status < 0) {
2023       WARNING("plugin_dispatch_values: Running the "
2024               "post-cache chain failed with "
2025               "status %i (%#x).",
2026               status, status);
2027     }
2028   } else
2029     fc_default_action(ds, vl);
2030
2031   if ((free_meta_data != 0) && (vl->meta != NULL)) {
2032     meta_data_destroy(vl->meta);
2033     vl->meta = NULL;
2034   }
2035
2036   return (0);
2037 } /* int plugin_dispatch_values_internal */
2038
2039 static double get_drop_probability(void) /* {{{ */
2040 {
2041   long pos;
2042   long size;
2043   long wql;
2044
2045   pthread_mutex_lock(&write_lock);
2046   wql = write_queue_length;
2047   pthread_mutex_unlock(&write_lock);
2048
2049   if (wql < write_limit_low)
2050     return (0.0);
2051   if (wql >= write_limit_high)
2052     return (1.0);
2053
2054   pos = 1 + wql - write_limit_low;
2055   size = 1 + write_limit_high - write_limit_low;
2056
2057   return (((double)pos) / ((double)size));
2058 } /* }}} double get_drop_probability */
2059
2060 static _Bool check_drop_value(void) /* {{{ */
2061 {
2062   static cdtime_t last_message_time = 0;
2063   static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2064
2065   double p;
2066   double q;
2067   int status;
2068
2069   if (write_limit_high == 0)
2070     return (0);
2071
2072   p = get_drop_probability();
2073   if (p == 0.0)
2074     return (0);
2075
2076   status = pthread_mutex_trylock(&last_message_lock);
2077   if (status == 0) {
2078     cdtime_t now;
2079
2080     now = cdtime();
2081     if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
2082       last_message_time = now;
2083       ERROR("plugin_dispatch_values: Low water mark "
2084             "reached. Dropping %.0f%% of metrics.",
2085             100.0 * p);
2086     }
2087     pthread_mutex_unlock(&last_message_lock);
2088   }
2089
2090   if (p == 1.0)
2091     return (1);
2092
2093   q = cdrand_d();
2094   if (q > p)
2095     return (1);
2096   else
2097     return (0);
2098 } /* }}} _Bool check_drop_value */
2099
2100 int plugin_dispatch_values(value_list_t const *vl) {
2101   int status;
2102   static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
2103
2104   if (check_drop_value()) {
2105     if (record_statistics) {
2106       pthread_mutex_lock(&statistics_lock);
2107       stats_values_dropped++;
2108       pthread_mutex_unlock(&statistics_lock);
2109     }
2110     return (0);
2111   }
2112
2113   status = plugin_write_enqueue(vl);
2114   if (status != 0) {
2115     char errbuf[1024];
2116     ERROR("plugin_dispatch_values: plugin_write_enqueue failed "
2117           "with status %i (%s).",
2118           status, sstrerror(status, errbuf, sizeof(errbuf)));
2119     return (status);
2120   }
2121
2122   return (0);
2123 }
2124
2125 __attribute__((sentinel)) int
2126 plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
2127                            _Bool store_percentage, int store_type, ...) {
2128   value_list_t *vl;
2129   int failed = 0;
2130   gauge_t sum = 0.0;
2131   va_list ap;
2132
2133   assert(template->values_len == 1);
2134
2135   /* Calculate sum for Gauge to calculate percent if needed */
2136   if (DS_TYPE_GAUGE == store_type) {
2137     va_start(ap, store_type);
2138     while (42) {
2139       char const *name;
2140       gauge_t value;
2141
2142       name = va_arg(ap, char const *);
2143       if (name == NULL)
2144         break;
2145
2146       value = va_arg(ap, gauge_t);
2147       if (!isnan(value))
2148         sum += value;
2149     }
2150     va_end(ap);
2151   }
2152
2153   vl = plugin_value_list_clone(template);
2154   /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2155   if (store_percentage)
2156     sstrncpy(vl->type, "percent", sizeof(vl->type));
2157
2158   va_start(ap, store_type);
2159   while (42) {
2160     char const *name;
2161     int status;
2162
2163     /* Set the type instance. */
2164     name = va_arg(ap, char const *);
2165     if (name == NULL)
2166       break;
2167     sstrncpy(vl->type_instance, name, sizeof(vl->type_instance));
2168
2169     /* Set the value. */
2170     switch (store_type) {
2171     case DS_TYPE_GAUGE:
2172       vl->values[0].gauge = va_arg(ap, gauge_t);
2173       if (store_percentage)
2174         vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2175       break;
2176     case DS_TYPE_ABSOLUTE:
2177       vl->values[0].absolute = va_arg(ap, absolute_t);
2178       break;
2179     case DS_TYPE_COUNTER:
2180       vl->values[0].counter = va_arg(ap, counter_t);
2181       break;
2182     case DS_TYPE_DERIVE:
2183       vl->values[0].derive = va_arg(ap, derive_t);
2184       break;
2185     default:
2186       ERROR("plugin_dispatch_multivalue: given store_type is incorrect.");
2187       failed++;
2188     }
2189
2190     status = plugin_write_enqueue(vl);
2191     if (status != 0)
2192       failed++;
2193   }
2194   va_end(ap);
2195
2196   plugin_value_list_free(vl);
2197   return (failed);
2198 } /* }}} int plugin_dispatch_multivalue */
2199
2200 int plugin_dispatch_notification(const notification_t *notif) {
2201   llentry_t *le;
2202   /* Possible TODO: Add flap detection here */
2203
2204   DEBUG("plugin_dispatch_notification: severity = %i; message = %s; "
2205         "time = %.3f; host = %s;",
2206         notif->severity, notif->message, CDTIME_T_TO_DOUBLE(notif->time),
2207         notif->host);
2208
2209   /* Nobody cares for notifications */
2210   if (list_notification == NULL)
2211     return (-1);
2212
2213   le = llist_head(list_notification);
2214   while (le != NULL) {
2215     callback_func_t *cf;
2216     plugin_notification_cb callback;
2217     int status;
2218
2219     /* do not switch plugin context; rather keep the context
2220      * (interval) information of the calling plugin */
2221
2222     cf = le->value;
2223     callback = cf->cf_callback;
2224     status = (*callback)(notif, &cf->cf_udata);
2225     if (status != 0) {
2226       WARNING("plugin_dispatch_notification: Notification "
2227               "callback %s returned %i.",
2228               le->key, status);
2229     }
2230
2231     le = le->next;
2232   }
2233
2234   return (0);
2235 } /* int plugin_dispatch_notification */
2236
2237 void plugin_log(int level, const char *format, ...) {
2238   char msg[1024];
2239   va_list ap;
2240   llentry_t *le;
2241
2242 #if !COLLECT_DEBUG
2243   if (level >= LOG_DEBUG)
2244     return;
2245 #endif
2246
2247   va_start(ap, format);
2248   vsnprintf(msg, sizeof(msg), format, ap);
2249   msg[sizeof(msg) - 1] = '\0';
2250   va_end(ap);
2251
2252   if (list_log == NULL) {
2253     fprintf(stderr, "%s\n", msg);
2254     return;
2255   }
2256
2257   le = llist_head(list_log);
2258   while (le != NULL) {
2259     callback_func_t *cf;
2260     plugin_log_cb callback;
2261
2262     cf = le->value;
2263     callback = cf->cf_callback;
2264
2265     /* do not switch plugin context; rather keep the context
2266      * (interval) information of the calling plugin */
2267
2268     (*callback)(level, msg, &cf->cf_udata);
2269
2270     le = le->next;
2271   }
2272 } /* void plugin_log */
2273
2274 int parse_log_severity(const char *severity) {
2275   int log_level = -1;
2276
2277   if ((0 == strcasecmp(severity, "emerg")) ||
2278       (0 == strcasecmp(severity, "alert")) ||
2279       (0 == strcasecmp(severity, "crit")) || (0 == strcasecmp(severity, "err")))
2280     log_level = LOG_ERR;
2281   else if (0 == strcasecmp(severity, "warning"))
2282     log_level = LOG_WARNING;
2283   else if (0 == strcasecmp(severity, "notice"))
2284     log_level = LOG_NOTICE;
2285   else if (0 == strcasecmp(severity, "info"))
2286     log_level = LOG_INFO;
2287 #if COLLECT_DEBUG
2288   else if (0 == strcasecmp(severity, "debug"))
2289     log_level = LOG_DEBUG;
2290 #endif /* COLLECT_DEBUG */
2291
2292   return (log_level);
2293 } /* int parse_log_severity */
2294
2295 int parse_notif_severity(const char *severity) {
2296   int notif_severity = -1;
2297
2298   if (strcasecmp(severity, "FAILURE") == 0)
2299     notif_severity = NOTIF_FAILURE;
2300   else if (strcmp(severity, "OKAY") == 0)
2301     notif_severity = NOTIF_OKAY;
2302   else if ((strcmp(severity, "WARNING") == 0) ||
2303            (strcmp(severity, "WARN") == 0))
2304     notif_severity = NOTIF_WARNING;
2305
2306   return (notif_severity);
2307 } /* int parse_notif_severity */
2308
2309 const data_set_t *plugin_get_ds(const char *name) {
2310   data_set_t *ds;
2311
2312   if (data_sets == NULL) {
2313     ERROR("plugin_get_ds: No data sets are defined yet.");
2314     return (NULL);
2315   }
2316
2317   if (c_avl_get(data_sets, name, (void *)&ds) != 0) {
2318     DEBUG("No such dataset registered: %s", name);
2319     return (NULL);
2320   }
2321
2322   return (ds);
2323 } /* data_set_t *plugin_get_ds */
2324
2325 static int plugin_notification_meta_add(notification_t *n, const char *name,
2326                                         enum notification_meta_type_e type,
2327                                         const void *value) {
2328   notification_meta_t *meta;
2329   notification_meta_t *tail;
2330
2331   if ((n == NULL) || (name == NULL) || (value == NULL)) {
2332     ERROR("plugin_notification_meta_add: A pointer is NULL!");
2333     return (-1);
2334   }
2335
2336   meta = calloc(1, sizeof(*meta));
2337   if (meta == NULL) {
2338     ERROR("plugin_notification_meta_add: calloc failed.");
2339     return (-1);
2340   }
2341
2342   sstrncpy(meta->name, name, sizeof(meta->name));
2343   meta->type = type;
2344
2345   switch (type) {
2346   case NM_TYPE_STRING: {
2347     meta->nm_value.nm_string = strdup((const char *)value);
2348     if (meta->nm_value.nm_string == NULL) {
2349       ERROR("plugin_notification_meta_add: strdup failed.");
2350       sfree(meta);
2351       return (-1);
2352     }
2353     break;
2354   }
2355   case NM_TYPE_SIGNED_INT: {
2356     meta->nm_value.nm_signed_int = *((int64_t *)value);
2357     break;
2358   }
2359   case NM_TYPE_UNSIGNED_INT: {
2360     meta->nm_value.nm_unsigned_int = *((uint64_t *)value);
2361     break;
2362   }
2363   case NM_TYPE_DOUBLE: {
2364     meta->nm_value.nm_double = *((double *)value);
2365     break;
2366   }
2367   case NM_TYPE_BOOLEAN: {
2368     meta->nm_value.nm_boolean = *((_Bool *)value);
2369     break;
2370   }
2371   default: {
2372     ERROR("plugin_notification_meta_add: Unknown type: %i", type);
2373     sfree(meta);
2374     return (-1);
2375   }
2376   } /* switch (type) */
2377
2378   meta->next = NULL;
2379   tail = n->meta;
2380   while ((tail != NULL) && (tail->next != NULL))
2381     tail = tail->next;
2382
2383   if (tail == NULL)
2384     n->meta = meta;
2385   else
2386     tail->next = meta;
2387
2388   return (0);
2389 } /* int plugin_notification_meta_add */
2390
2391 int plugin_notification_meta_add_string(notification_t *n, const char *name,
2392                                         const char *value) {
2393   return (plugin_notification_meta_add(n, name, NM_TYPE_STRING, value));
2394 }
2395
2396 int plugin_notification_meta_add_signed_int(notification_t *n, const char *name,
2397                                             int64_t value) {
2398   return (plugin_notification_meta_add(n, name, NM_TYPE_SIGNED_INT, &value));
2399 }
2400
2401 int plugin_notification_meta_add_unsigned_int(notification_t *n,
2402                                               const char *name,
2403                                               uint64_t value) {
2404   return (plugin_notification_meta_add(n, name, NM_TYPE_UNSIGNED_INT, &value));
2405 }
2406
2407 int plugin_notification_meta_add_double(notification_t *n, const char *name,
2408                                         double value) {
2409   return (plugin_notification_meta_add(n, name, NM_TYPE_DOUBLE, &value));
2410 }
2411
2412 int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
2413                                          _Bool value) {
2414   return (plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value));
2415 }
2416
2417 int plugin_notification_meta_copy(notification_t *dst,
2418                                   const notification_t *src) {
2419   assert(dst != NULL);
2420   assert(src != NULL);
2421   assert(dst != src);
2422   assert((src->meta == NULL) || (src->meta != dst->meta));
2423
2424   for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next) {
2425     if (meta->type == NM_TYPE_STRING)
2426       plugin_notification_meta_add_string(dst, meta->name,
2427                                           meta->nm_value.nm_string);
2428     else if (meta->type == NM_TYPE_SIGNED_INT)
2429       plugin_notification_meta_add_signed_int(dst, meta->name,
2430                                               meta->nm_value.nm_signed_int);
2431     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2432       plugin_notification_meta_add_unsigned_int(dst, meta->name,
2433                                                 meta->nm_value.nm_unsigned_int);
2434     else if (meta->type == NM_TYPE_DOUBLE)
2435       plugin_notification_meta_add_double(dst, meta->name,
2436                                           meta->nm_value.nm_double);
2437     else if (meta->type == NM_TYPE_BOOLEAN)
2438       plugin_notification_meta_add_boolean(dst, meta->name,
2439                                            meta->nm_value.nm_boolean);
2440   }
2441
2442   return (0);
2443 } /* int plugin_notification_meta_copy */
2444
2445 int plugin_notification_meta_free(notification_meta_t *n) {
2446   notification_meta_t *this;
2447   notification_meta_t *next;
2448
2449   if (n == NULL) {
2450     ERROR("plugin_notification_meta_free: n == NULL!");
2451     return (-1);
2452   }
2453
2454   this = n;
2455   while (this != NULL) {
2456     next = this->next;
2457
2458     if (this->type == NM_TYPE_STRING) {
2459       /* Assign to a temporary variable to work around nm_string's const
2460        * modifier. */
2461       void *tmp = (void *)this->nm_value.nm_string;
2462
2463       sfree(tmp);
2464       this->nm_value.nm_string = NULL;
2465     }
2466     sfree(this);
2467
2468     this = next;
2469   }
2470
2471   return (0);
2472 } /* int plugin_notification_meta_free */
2473
2474 static void plugin_ctx_destructor(void *ctx) {
2475   sfree(ctx);
2476 } /* void plugin_ctx_destructor */
2477
2478 static plugin_ctx_t ctx_init = {/* interval = */ 0};
2479
2480 static plugin_ctx_t *plugin_ctx_create(void) {
2481   plugin_ctx_t *ctx;
2482
2483   ctx = malloc(sizeof(*ctx));
2484   if (ctx == NULL) {
2485     char errbuf[1024];
2486     ERROR("Failed to allocate plugin context: %s",
2487           sstrerror(errno, errbuf, sizeof(errbuf)));
2488     return NULL;
2489   }
2490
2491   *ctx = ctx_init;
2492   assert(plugin_ctx_key_initialized);
2493   pthread_setspecific(plugin_ctx_key, ctx);
2494   DEBUG("Created new plugin context.");
2495   return (ctx);
2496 } /* int plugin_ctx_create */
2497
2498 void plugin_init_ctx(void) {
2499   pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
2500   plugin_ctx_key_initialized = 1;
2501 } /* void plugin_init_ctx */
2502
2503 plugin_ctx_t plugin_get_ctx(void) {
2504   plugin_ctx_t *ctx;
2505
2506   assert(plugin_ctx_key_initialized);
2507   ctx = pthread_getspecific(plugin_ctx_key);
2508
2509   if (ctx == NULL) {
2510     ctx = plugin_ctx_create();
2511     /* this must no happen -- exit() instead? */
2512     if (ctx == NULL)
2513       return ctx_init;
2514   }
2515
2516   return (*ctx);
2517 } /* plugin_ctx_t plugin_get_ctx */
2518
2519 plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
2520   plugin_ctx_t *c;
2521   plugin_ctx_t old;
2522
2523   assert(plugin_ctx_key_initialized);
2524   c = pthread_getspecific(plugin_ctx_key);
2525
2526   if (c == NULL) {
2527     c = plugin_ctx_create();
2528     /* this must no happen -- exit() instead? */
2529     if (c == NULL)
2530       return ctx_init;
2531   }
2532
2533   old = *c;
2534   *c = ctx;
2535
2536   return (old);
2537 } /* void plugin_set_ctx */
2538
2539 cdtime_t plugin_get_interval(void) {
2540   cdtime_t interval;
2541
2542   interval = plugin_get_ctx().interval;
2543   if (interval > 0)
2544     return interval;
2545
2546   return cf_get_default_interval();
2547 } /* cdtime_t plugin_get_interval */
2548
2549 typedef struct {
2550   plugin_ctx_t ctx;
2551   void *(*start_routine)(void *);
2552   void *arg;
2553 } plugin_thread_t;
2554
2555 static void *plugin_thread_start(void *arg) {
2556   plugin_thread_t *plugin_thread = arg;
2557
2558   void *(*start_routine)(void *) = plugin_thread->start_routine;
2559   void *plugin_arg = plugin_thread->arg;
2560
2561   plugin_set_ctx(plugin_thread->ctx);
2562
2563   sfree(plugin_thread);
2564
2565   return start_routine(plugin_arg);
2566 } /* void *plugin_thread_start */
2567
2568 int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr,
2569                          void *(*start_routine)(void *), void *arg,
2570                          char const *name) {
2571   plugin_thread_t *plugin_thread;
2572
2573   plugin_thread = malloc(sizeof(*plugin_thread));
2574   if (plugin_thread == NULL)
2575     return ENOMEM;
2576
2577   plugin_thread->ctx = plugin_get_ctx();
2578   plugin_thread->start_routine = start_routine;
2579   plugin_thread->arg = arg;
2580
2581   int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread);
2582   if (ret != 0) {
2583     sfree(plugin_thread);
2584     return ret;
2585   }
2586
2587   if (name != NULL)
2588     set_thread_name(*thread, name);
2589
2590   return 0;
2591 } /* int plugin_thread_create */
2592
2593 /* vim: set sw=8 ts=8 noet fdm=marker : */