Merge branch 'collectd-5.8'
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "configfile.h"
35 #include "filter_chain.h"
36 #include "plugin.h"
37 #include "utils_avltree.h"
38 #include "utils_cache.h"
39 #include "utils_complain.h"
40 #include "utils_heap.h"
41 #include "utils_llist.h"
42 #include "utils_random.h"
43 #include "utils_time.h"
44
45 #if HAVE_PTHREAD_NP_H
46 #include <pthread_np.h> /* for pthread_set_name_np(3) */
47 #endif
48
49 #include <dlfcn.h>
50
51 /*
52  * Private structures
53  */
54 struct callback_func_s {
55   void *cf_callback;
56   user_data_t cf_udata;
57   plugin_ctx_t cf_ctx;
58 };
59 typedef struct callback_func_s callback_func_t;
60
61 #define RF_SIMPLE 0
62 #define RF_COMPLEX 1
63 #define RF_REMOVE 65535
64 struct read_func_s {
65 /* `read_func_t' "inherits" from `callback_func_t'.
66  * The `rf_super' member MUST be the first one in this structure! */
67 #define rf_callback rf_super.cf_callback
68 #define rf_udata rf_super.cf_udata
69 #define rf_ctx rf_super.cf_ctx
70   callback_func_t rf_super;
71   char rf_group[DATA_MAX_NAME_LEN];
72   char *rf_name;
73   int rf_type;
74   cdtime_t rf_interval;
75   cdtime_t rf_effective_interval;
76   cdtime_t rf_next_read;
77 };
78 typedef struct read_func_s read_func_t;
79
80 struct write_queue_s;
81 typedef struct write_queue_s write_queue_t;
82 struct write_queue_s {
83   value_list_t *vl;
84   plugin_ctx_t ctx;
85   write_queue_t *next;
86 };
87
88 struct flush_callback_s {
89   char *name;
90   cdtime_t timeout;
91 };
92 typedef struct flush_callback_s flush_callback_t;
93
94 /*
95  * Private variables
96  */
97 static c_avl_tree_t *plugins_loaded;
98
99 static llist_t *list_init;
100 static llist_t *list_write;
101 static llist_t *list_flush;
102 static llist_t *list_missing;
103 static llist_t *list_shutdown;
104 static llist_t *list_log;
105 static llist_t *list_notification;
106
107 static fc_chain_t *pre_cache_chain;
108 static fc_chain_t *post_cache_chain;
109
110 static c_avl_tree_t *data_sets;
111
112 static char *plugindir;
113
114 #ifndef DEFAULT_MAX_READ_INTERVAL
115 #define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
116 #endif
117 static c_heap_t *read_heap;
118 static llist_t *read_list;
119 static int read_loop = 1;
120 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
121 static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
122 static pthread_t *read_threads;
123 static size_t read_threads_num;
124 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
125
126 static write_queue_t *write_queue_head;
127 static write_queue_t *write_queue_tail;
128 static long write_queue_length;
129 static bool write_loop = true;
130 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
131 static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
132 static pthread_t *write_threads;
133 static size_t write_threads_num;
134
135 static pthread_key_t plugin_ctx_key;
136 static bool plugin_ctx_key_initialized;
137
138 static long write_limit_high;
139 static long write_limit_low;
140
141 static derive_t stats_values_dropped;
142 static bool record_statistics;
143
144 /*
145  * Static functions
146  */
147 static int plugin_dispatch_values_internal(value_list_t *vl);
148
149 static const char *plugin_get_dir(void) {
150   if (plugindir == NULL)
151     return PLUGINDIR;
152   else
153     return plugindir;
154 }
155
156 static int plugin_update_internal_statistics(void) { /* {{{ */
157   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
158
159   /* Initialize `vl' */
160   value_list_t vl = VALUE_LIST_INIT;
161   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
162   vl.interval = plugin_get_interval();
163
164   /* Write queue */
165   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
166
167   /* Write queue : queue length */
168   vl.values = &(value_t){.gauge = copy_write_queue_length};
169   vl.values_len = 1;
170   sstrncpy(vl.type, "queue_length", sizeof(vl.type));
171   vl.type_instance[0] = 0;
172   plugin_dispatch_values(&vl);
173
174   /* Write queue : Values dropped (queue length > low limit) */
175   vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
176   vl.values_len = 1;
177   sstrncpy(vl.type, "derive", sizeof(vl.type));
178   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
179   plugin_dispatch_values(&vl);
180
181   /* Cache */
182   sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
183
184   /* Cache : Nb entry in cache tree */
185   vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
186   vl.values_len = 1;
187   sstrncpy(vl.type, "cache_size", sizeof(vl.type));
188   vl.type_instance[0] = 0;
189   plugin_dispatch_values(&vl);
190
191   return 0;
192 } /* }}} int plugin_update_internal_statistics */
193
194 static void free_userdata(user_data_t const *ud) /* {{{ */
195 {
196   if (ud == NULL)
197     return;
198
199   if ((ud->data != NULL) && (ud->free_func != NULL)) {
200     ud->free_func(ud->data);
201   }
202 } /* }}} void free_userdata */
203
204 static void destroy_callback(callback_func_t *cf) /* {{{ */
205 {
206   if (cf == NULL)
207     return;
208   free_userdata(&cf->cf_udata);
209   sfree(cf);
210 } /* }}} void destroy_callback */
211
212 static void destroy_all_callbacks(llist_t **list) /* {{{ */
213 {
214   llentry_t *le;
215
216   if (*list == NULL)
217     return;
218
219   le = llist_head(*list);
220   while (le != NULL) {
221     llentry_t *le_next;
222
223     le_next = le->next;
224
225     sfree(le->key);
226     destroy_callback(le->value);
227     le->value = NULL;
228
229     le = le_next;
230   }
231
232   llist_destroy(*list);
233   *list = NULL;
234 } /* }}} void destroy_all_callbacks */
235
236 static void destroy_read_heap(void) /* {{{ */
237 {
238   if (read_heap == NULL)
239     return;
240
241   while (42) {
242     read_func_t *rf;
243
244     rf = c_heap_get_root(read_heap);
245     if (rf == NULL)
246       break;
247     sfree(rf->rf_name);
248     destroy_callback((callback_func_t *)rf);
249   }
250
251   c_heap_destroy(read_heap);
252   read_heap = NULL;
253 } /* }}} void destroy_read_heap */
254
255 static int register_callback(llist_t **list, /* {{{ */
256                              const char *name, callback_func_t *cf) {
257   llentry_t *le;
258   char *key;
259
260   if (*list == NULL) {
261     *list = llist_create();
262     if (*list == NULL) {
263       ERROR("plugin: register_callback: "
264             "llist_create failed.");
265       destroy_callback(cf);
266       return -1;
267     }
268   }
269
270   key = strdup(name);
271   if (key == NULL) {
272     ERROR("plugin: register_callback: strdup failed.");
273     destroy_callback(cf);
274     return -1;
275   }
276
277   le = llist_search(*list, name);
278   if (le == NULL) {
279     le = llentry_create(key, cf);
280     if (le == NULL) {
281       ERROR("plugin: register_callback: "
282             "llentry_create failed.");
283       sfree(key);
284       destroy_callback(cf);
285       return -1;
286     }
287
288     llist_append(*list, le);
289   } else {
290     callback_func_t *old_cf;
291
292     old_cf = le->value;
293     le->value = cf;
294
295     WARNING("plugin: register_callback: "
296             "a callback named `%s' already exists - "
297             "overwriting the old entry!",
298             name);
299
300     destroy_callback(old_cf);
301     sfree(key);
302   }
303
304   return 0;
305 } /* }}} int register_callback */
306
307 static void log_list_callbacks(llist_t **list, /* {{{ */
308                                const char *comment) {
309   char *str;
310   int len;
311   int i;
312   llentry_t *le;
313   int n;
314   char **keys;
315
316   n = llist_size(*list);
317   if (n == 0) {
318     INFO("%s [none]", comment);
319     return;
320   }
321
322   keys = calloc(n, sizeof(char *));
323
324   if (keys == NULL) {
325     ERROR("%s: failed to allocate memory for list of callbacks", comment);
326
327     return;
328   }
329
330   for (le = llist_head(*list), i = 0, len = 0; le != NULL; le = le->next, i++) {
331     keys[i] = le->key;
332     len += strlen(le->key) + 6;
333   }
334   str = malloc(len + 10);
335   if (str == NULL) {
336     ERROR("%s: failed to allocate memory for list of callbacks", comment);
337   } else {
338     *str = '\0';
339     strjoin(str, len, keys, n, "', '");
340     INFO("%s ['%s']", comment, str);
341     sfree(str);
342   }
343   sfree(keys);
344 } /* }}} void log_list_callbacks */
345
346 static int create_register_callback(llist_t **list, /* {{{ */
347                                     const char *name, void *callback,
348                                     user_data_t const *ud) {
349   callback_func_t *cf;
350
351   cf = calloc(1, sizeof(*cf));
352   if (cf == NULL) {
353     free_userdata(ud);
354     ERROR("plugin: create_register_callback: calloc failed.");
355     return -1;
356   }
357
358   cf->cf_callback = callback;
359   if (ud == NULL) {
360     cf->cf_udata.data = NULL;
361     cf->cf_udata.free_func = NULL;
362   } else {
363     cf->cf_udata = *ud;
364   }
365
366   cf->cf_ctx = plugin_get_ctx();
367
368   return register_callback(list, name, cf);
369 } /* }}} int create_register_callback */
370
371 static int plugin_unregister(llist_t *list, const char *name) /* {{{ */
372 {
373   llentry_t *e;
374
375   if (list == NULL)
376     return -1;
377
378   e = llist_search(list, name);
379   if (e == NULL)
380     return -1;
381
382   llist_remove(list, e);
383
384   sfree(e->key);
385   destroy_callback(e->value);
386
387   llentry_destroy(e);
388
389   return 0;
390 } /* }}} int plugin_unregister */
391
392 /* plugin_load_file loads the shared object "file" and calls its
393  * "module_register" function. Returns zero on success, non-zero otherwise. */
394 static int plugin_load_file(char const *file, bool global) {
395   int flags = RTLD_NOW;
396   if (global)
397     flags |= RTLD_GLOBAL;
398
399   void *dlh = dlopen(file, flags);
400   if (dlh == NULL) {
401     char errbuf[1024] = "";
402
403     snprintf(errbuf, sizeof(errbuf),
404              "dlopen(\"%s\") failed: %s. "
405              "The most common cause for this problem is missing dependencies. "
406              "Use ldd(1) to check the dependencies of the plugin / shared "
407              "object.",
408              file, dlerror());
409
410     /* This error is printed to STDERR unconditionally. If list_log is NULL,
411      * plugin_log() will also print to STDERR. We avoid duplicate output by
412      * checking that the list of log handlers, list_log, is not NULL. */
413     fprintf(stderr, "ERROR: %s\n", errbuf);
414     if (list_log != NULL) {
415       ERROR("%s", errbuf);
416     }
417
418     return ENOENT;
419   }
420
421   void (*reg_handle)(void) = dlsym(dlh, "module_register");
422   if (reg_handle == NULL) {
423     ERROR("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
424           dlerror());
425     dlclose(dlh);
426     return ENOENT;
427   }
428
429   (*reg_handle)();
430   return 0;
431 }
432
433 static void *plugin_read_thread(void __attribute__((unused)) * args) {
434   while (read_loop != 0) {
435     read_func_t *rf;
436     plugin_ctx_t old_ctx;
437     cdtime_t start;
438     cdtime_t now;
439     cdtime_t elapsed;
440     int status;
441     int rf_type;
442     int rc;
443
444     /* Get the read function that needs to be read next.
445      * We don't need to hold "read_lock" for the heap, but we need
446      * to call c_heap_get_root() and pthread_cond_wait() in the
447      * same protected block. */
448     pthread_mutex_lock(&read_lock);
449     rf = c_heap_get_root(read_heap);
450     if (rf == NULL) {
451       pthread_cond_wait(&read_cond, &read_lock);
452       pthread_mutex_unlock(&read_lock);
453       continue;
454     }
455     pthread_mutex_unlock(&read_lock);
456
457     if (rf->rf_interval == 0) {
458       /* this should not happen, because the interval is set
459        * for each plugin when loading it
460        * XXX: issue a warning? */
461       rf->rf_interval = plugin_get_interval();
462       rf->rf_effective_interval = rf->rf_interval;
463
464       rf->rf_next_read = cdtime();
465     }
466
467     /* sleep until this entry is due,
468      * using pthread_cond_timedwait */
469     pthread_mutex_lock(&read_lock);
470     /* In pthread_cond_timedwait, spurious wakeups are possible
471      * (and really happen, at least on NetBSD with > 1 CPU), thus
472      * we need to re-evaluate the condition every time
473      * pthread_cond_timedwait returns. */
474     rc = 0;
475     while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
476       rc = pthread_cond_timedwait(&read_cond, &read_lock,
477                                   &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
478     }
479
480     /* Must hold `read_lock' when accessing `rf->rf_type'. */
481     rf_type = rf->rf_type;
482     pthread_mutex_unlock(&read_lock);
483
484     /* Check if we're supposed to stop.. This may have interrupted
485      * the sleep, too. */
486     if (read_loop == 0) {
487       /* Insert `rf' again, so it can be free'd correctly */
488       c_heap_insert(read_heap, rf);
489       break;
490     }
491
492     /* The entry has been marked for deletion. The linked list
493      * entry has already been removed by `plugin_unregister_read'.
494      * All we have to do here is free the `read_func_t' and
495      * continue. */
496     if (rf_type == RF_REMOVE) {
497       DEBUG("plugin_read_thread: Destroying the `%s' "
498             "callback.",
499             rf->rf_name);
500       sfree(rf->rf_name);
501       destroy_callback((callback_func_t *)rf);
502       rf = NULL;
503       continue;
504     }
505
506     DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);
507
508     start = cdtime();
509
510     old_ctx = plugin_set_ctx(rf->rf_ctx);
511
512     if (rf_type == RF_SIMPLE) {
513       int (*callback)(void);
514
515       callback = rf->rf_callback;
516       status = (*callback)();
517     } else {
518       plugin_read_cb callback;
519
520       assert(rf_type == RF_COMPLEX);
521
522       callback = rf->rf_callback;
523       status = (*callback)(&rf->rf_udata);
524     }
525
526     plugin_set_ctx(old_ctx);
527
528     /* If the function signals failure, we will increase the
529      * intervals in which it will be called. */
530     if (status != 0) {
531       rf->rf_effective_interval *= 2;
532       if (rf->rf_effective_interval > max_read_interval)
533         rf->rf_effective_interval = max_read_interval;
534
535       NOTICE("read-function of plugin `%s' failed. "
536              "Will suspend it for %.3f seconds.",
537              rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
538     } else {
539       /* Success: Restore the interval, if it was changed. */
540       rf->rf_effective_interval = rf->rf_interval;
541     }
542
543     /* update the ``next read due'' field */
544     now = cdtime();
545
546     /* calculate the time spent in the read function */
547     elapsed = (now - start);
548
549     if (elapsed > rf->rf_effective_interval)
550       WARNING(
551           "plugin_read_thread: read-function of the `%s' plugin took %.3f "
552           "seconds, which is above its read interval (%.3f seconds). You might "
553           "want to adjust the `Interval' or `ReadThreads' settings.",
554           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
555           CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
556
557     DEBUG("plugin_read_thread: read-function of the `%s' plugin took "
558           "%.6f seconds.",
559           rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
560
561     DEBUG("plugin_read_thread: Effective interval of the "
562           "`%s' plugin is %.3f seconds.",
563           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
564
565     /* Calculate the next (absolute) time at which this function
566      * should be called. */
567     rf->rf_next_read += rf->rf_effective_interval;
568
569     /* Check, if `rf_next_read' is in the past. */
570     if (rf->rf_next_read < now) {
571       /* `rf_next_read' is in the past. Insert `now'
572        * so this value doesn't trail off into the
573        * past too much. */
574       rf->rf_next_read = now;
575     }
576
577     DEBUG("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
578           rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_next_read));
579
580     /* Re-insert this read function into the heap again. */
581     c_heap_insert(read_heap, rf);
582   } /* while (read_loop) */
583
584   pthread_exit(NULL);
585   return (void *)0;
586 } /* void *plugin_read_thread */
587
588 #ifdef PTHREAD_MAX_NAMELEN_NP
589 #define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
590 #else
591 #define THREAD_NAME_MAX 16
592 #endif
593
594 static void set_thread_name(pthread_t tid, char const *name) {
595 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
596
597   /* glibc limits the length of the name and fails if the passed string
598    * is too long, so we truncate it here. */
599   char n[THREAD_NAME_MAX];
600   if (strlen(name) >= THREAD_NAME_MAX)
601     WARNING("set_thread_name(\"%s\"): name too long", name);
602   sstrncpy(n, name, sizeof(n));
603
604 #if defined(HAVE_PTHREAD_SETNAME_NP)
605   int status = pthread_setname_np(tid, n);
606   if (status != 0) {
607     ERROR("set_thread_name(\"%s\"): %s", n, STRERROR(status));
608   }
609 #else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
610   pthread_set_name_np(tid, n);
611 #endif
612
613 #endif
614 }
615
616 static void start_read_threads(size_t num) /* {{{ */
617 {
618   if (read_threads != NULL)
619     return;
620
621   read_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
622   if (read_threads == NULL) {
623     ERROR("plugin: start_read_threads: calloc failed.");
624     return;
625   }
626
627   read_threads_num = 0;
628   for (size_t i = 0; i < num; i++) {
629     int status = pthread_create(read_threads + read_threads_num,
630                                 /* attr = */ NULL, plugin_read_thread,
631                                 /* arg = */ NULL);
632     if (status != 0) {
633       ERROR("plugin: start_read_threads: pthread_create failed with status %i "
634             "(%s).",
635             status, STRERROR(status));
636       return;
637     }
638
639     char name[THREAD_NAME_MAX];
640     snprintf(name, sizeof(name), "reader#%" PRIsz, read_threads_num);
641     set_thread_name(read_threads[read_threads_num], name);
642
643     read_threads_num++;
644   } /* for (i) */
645 } /* }}} void start_read_threads */
646
647 static void stop_read_threads(void) {
648   if (read_threads == NULL)
649     return;
650
651   INFO("collectd: Stopping %" PRIsz " read threads.", read_threads_num);
652
653   pthread_mutex_lock(&read_lock);
654   read_loop = 0;
655   DEBUG("plugin: stop_read_threads: Signalling `read_cond'");
656   pthread_cond_broadcast(&read_cond);
657   pthread_mutex_unlock(&read_lock);
658
659   for (size_t i = 0; i < read_threads_num; i++) {
660     if (pthread_join(read_threads[i], NULL) != 0) {
661       ERROR("plugin: stop_read_threads: pthread_join failed.");
662     }
663     read_threads[i] = (pthread_t)0;
664   }
665   sfree(read_threads);
666   read_threads_num = 0;
667 } /* void stop_read_threads */
668
669 static void plugin_value_list_free(value_list_t *vl) /* {{{ */
670 {
671   if (vl == NULL)
672     return;
673
674   meta_data_destroy(vl->meta);
675   sfree(vl->values);
676   sfree(vl);
677 } /* }}} void plugin_value_list_free */
678
679 static value_list_t *
680 plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
681 {
682   value_list_t *vl;
683
684   if (vl_orig == NULL)
685     return NULL;
686
687   vl = malloc(sizeof(*vl));
688   if (vl == NULL)
689     return NULL;
690   memcpy(vl, vl_orig, sizeof(*vl));
691
692   if (vl->host[0] == 0)
693     sstrncpy(vl->host, hostname_g, sizeof(vl->host));
694
695   vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
696   if (vl->values == NULL) {
697     plugin_value_list_free(vl);
698     return NULL;
699   }
700   memcpy(vl->values, vl_orig->values,
701          vl_orig->values_len * sizeof(*vl->values));
702
703   vl->meta = meta_data_clone(vl->meta);
704   if ((vl_orig->meta != NULL) && (vl->meta == NULL)) {
705     plugin_value_list_free(vl);
706     return NULL;
707   }
708
709   if (vl->time == 0)
710     vl->time = cdtime();
711
712   /* Fill in the interval from the thread context, if it is zero. */
713   if (vl->interval == 0) {
714     plugin_ctx_t ctx = plugin_get_ctx();
715
716     if (ctx.interval != 0)
717       vl->interval = ctx.interval;
718     else {
719       char name[6 * DATA_MAX_NAME_LEN];
720       FORMAT_VL(name, sizeof(name), vl);
721       ERROR("plugin_value_list_clone: Unable to determine "
722             "interval from context for "
723             "value list \"%s\". "
724             "This indicates a broken plugin. "
725             "Please report this problem to the "
726             "collectd mailing list or at "
727             "<http://collectd.org/bugs/>.",
728             name);
729       vl->interval = cf_get_default_interval();
730     }
731   }
732
733   return vl;
734 } /* }}} value_list_t *plugin_value_list_clone */
735
736 static int plugin_write_enqueue(value_list_t const *vl) /* {{{ */
737 {
738   write_queue_t *q;
739
740   q = malloc(sizeof(*q));
741   if (q == NULL)
742     return ENOMEM;
743   q->next = NULL;
744
745   q->vl = plugin_value_list_clone(vl);
746   if (q->vl == NULL) {
747     sfree(q);
748     return ENOMEM;
749   }
750
751   /* Store context of caller (read plugin); otherwise, it would not be
752    * available to the write plugins when actually dispatching the
753    * value-list later on. */
754   q->ctx = plugin_get_ctx();
755
756   pthread_mutex_lock(&write_lock);
757
758   if (write_queue_tail == NULL) {
759     write_queue_head = q;
760     write_queue_tail = q;
761     write_queue_length = 1;
762   } else {
763     write_queue_tail->next = q;
764     write_queue_tail = q;
765     write_queue_length += 1;
766   }
767
768   pthread_cond_signal(&write_cond);
769   pthread_mutex_unlock(&write_lock);
770
771   return 0;
772 } /* }}} int plugin_write_enqueue */
773
774 static value_list_t *plugin_write_dequeue(void) /* {{{ */
775 {
776   write_queue_t *q;
777   value_list_t *vl;
778
779   pthread_mutex_lock(&write_lock);
780
781   while (write_loop && (write_queue_head == NULL))
782     pthread_cond_wait(&write_cond, &write_lock);
783
784   if (write_queue_head == NULL) {
785     pthread_mutex_unlock(&write_lock);
786     return NULL;
787   }
788
789   q = write_queue_head;
790   write_queue_head = q->next;
791   write_queue_length -= 1;
792   if (write_queue_head == NULL) {
793     write_queue_tail = NULL;
794     assert(0 == write_queue_length);
795   }
796
797   pthread_mutex_unlock(&write_lock);
798
799   (void)plugin_set_ctx(q->ctx);
800
801   vl = q->vl;
802   sfree(q);
803   return vl;
804 } /* }}} value_list_t *plugin_write_dequeue */
805
806 static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
807 {
808   while (write_loop) {
809     value_list_t *vl = plugin_write_dequeue();
810     if (vl == NULL)
811       continue;
812
813     plugin_dispatch_values_internal(vl);
814
815     plugin_value_list_free(vl);
816   }
817
818   pthread_exit(NULL);
819   return (void *)0;
820 } /* }}} void *plugin_write_thread */
821
822 static void start_write_threads(size_t num) /* {{{ */
823 {
824   if (write_threads != NULL)
825     return;
826
827   write_threads = (pthread_t *)calloc(num, sizeof(pthread_t));
828   if (write_threads == NULL) {
829     ERROR("plugin: start_write_threads: calloc failed.");
830     return;
831   }
832
833   write_threads_num = 0;
834   for (size_t i = 0; i < num; i++) {
835     int status = pthread_create(write_threads + write_threads_num,
836                                 /* attr = */ NULL, plugin_write_thread,
837                                 /* arg = */ NULL);
838     if (status != 0) {
839       ERROR("plugin: start_write_threads: pthread_create failed with status %i "
840             "(%s).",
841             status, STRERROR(status));
842       return;
843     }
844
845     char name[THREAD_NAME_MAX];
846     snprintf(name, sizeof(name), "writer#%" PRIsz, write_threads_num);
847     set_thread_name(write_threads[write_threads_num], name);
848
849     write_threads_num++;
850   } /* for (i) */
851 } /* }}} void start_write_threads */
852
853 static void stop_write_threads(void) /* {{{ */
854 {
855   write_queue_t *q;
856   size_t i;
857
858   if (write_threads == NULL)
859     return;
860
861   INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
862
863   pthread_mutex_lock(&write_lock);
864   write_loop = false;
865   DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
866   pthread_cond_broadcast(&write_cond);
867   pthread_mutex_unlock(&write_lock);
868
869   for (i = 0; i < write_threads_num; i++) {
870     if (pthread_join(write_threads[i], NULL) != 0) {
871       ERROR("plugin: stop_write_threads: pthread_join failed.");
872     }
873     write_threads[i] = (pthread_t)0;
874   }
875   sfree(write_threads);
876   write_threads_num = 0;
877
878   pthread_mutex_lock(&write_lock);
879   i = 0;
880   for (q = write_queue_head; q != NULL;) {
881     write_queue_t *q1 = q;
882     plugin_value_list_free(q->vl);
883     q = q->next;
884     sfree(q1);
885     i++;
886   }
887   write_queue_head = NULL;
888   write_queue_tail = NULL;
889   write_queue_length = 0;
890   pthread_mutex_unlock(&write_lock);
891
892   if (i > 0) {
893     WARNING("plugin: %" PRIsz " value list%s left after shutting down "
894             "the write threads.",
895             i, (i == 1) ? " was" : "s were");
896   }
897 } /* }}} void stop_write_threads */
898
899 /*
900  * Public functions
901  */
902 void plugin_set_dir(const char *dir) {
903   sfree(plugindir);
904
905   if (dir == NULL) {
906     plugindir = NULL;
907     return;
908   }
909
910   plugindir = strdup(dir);
911   if (plugindir == NULL)
912     ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
913 }
914
915 static bool plugin_is_loaded(char const *name) {
916   int status;
917
918   if (plugins_loaded == NULL)
919     plugins_loaded =
920         c_avl_create((int (*)(const void *, const void *))strcasecmp);
921   assert(plugins_loaded != NULL);
922
923   status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
924   return status == 0;
925 }
926
927 static int plugin_mark_loaded(char const *name) {
928   char *name_copy;
929   int status;
930
931   name_copy = strdup(name);
932   if (name_copy == NULL)
933     return ENOMEM;
934
935   status = c_avl_insert(plugins_loaded,
936                         /* key = */ name_copy, /* value = */ NULL);
937   return status;
938 }
939
940 static void plugin_free_loaded(void) {
941   void *key;
942   void *value;
943
944   if (plugins_loaded == NULL)
945     return;
946
947   while (c_avl_pick(plugins_loaded, &key, &value) == 0) {
948     sfree(key);
949     assert(value == NULL);
950   }
951
952   c_avl_destroy(plugins_loaded);
953   plugins_loaded = NULL;
954 }
955
956 #define BUFSIZE 512
957 int plugin_load(char const *plugin_name, bool global) {
958   DIR *dh;
959   const char *dir;
960   char filename[BUFSIZE] = "";
961   char typename[BUFSIZE];
962   int ret;
963   struct stat statbuf;
964   struct dirent *de;
965   int status;
966
967   if (plugin_name == NULL)
968     return EINVAL;
969
970   /* Check if plugin is already loaded and don't do anything in this
971    * case. */
972   if (plugin_is_loaded(plugin_name))
973     return 0;
974
975   dir = plugin_get_dir();
976   ret = 1;
977
978   /*
979    * XXX: Magic at work:
980    *
981    * Some of the language bindings, for example the Python and Perl
982    * plugins, need to be able to export symbols to the scripts they run.
983    * For this to happen, the "Globals" flag needs to be set.
984    * Unfortunately, this technical detail is hard to explain to the
985    * average user and she shouldn't have to worry about this, ideally.
986    * So in order to save everyone's sanity use a different default for a
987    * handful of special plugins. --octo
988    */
989   if ((strcasecmp("perl", plugin_name) == 0) ||
990       (strcasecmp("python", plugin_name) == 0))
991     global = true;
992
993   /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
994    * type when matching the filename */
995   status = snprintf(typename, sizeof(typename), "%s.so", plugin_name);
996   if ((status < 0) || ((size_t)status >= sizeof(typename))) {
997     WARNING("plugin_load: Filename too long: \"%s.so\"", plugin_name);
998     return -1;
999   }
1000
1001   if ((dh = opendir(dir)) == NULL) {
1002     ERROR("plugin_load: opendir (%s) failed: %s", dir, STRERRNO);
1003     return -1;
1004   }
1005
1006   while ((de = readdir(dh)) != NULL) {
1007     if (strcasecmp(de->d_name, typename))
1008       continue;
1009
1010     status = snprintf(filename, sizeof(filename), "%s/%s", dir, de->d_name);
1011     if ((status < 0) || ((size_t)status >= sizeof(filename))) {
1012       WARNING("plugin_load: Filename too long: \"%s/%s\"", dir, de->d_name);
1013       continue;
1014     }
1015
1016     if (lstat(filename, &statbuf) == -1) {
1017       WARNING("plugin_load: stat (\"%s\") failed: %s", filename, STRERRNO);
1018       continue;
1019     } else if (!S_ISREG(statbuf.st_mode)) {
1020       /* don't follow symlinks */
1021       WARNING("plugin_load: %s is not a regular file.", filename);
1022       continue;
1023     }
1024
1025     status = plugin_load_file(filename, global);
1026     if (status == 0) {
1027       /* success */
1028       plugin_mark_loaded(plugin_name);
1029       ret = 0;
1030       INFO("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1031       break;
1032     } else {
1033       ERROR("plugin_load: Load plugin \"%s\" failed with "
1034             "status %i.",
1035             plugin_name, status);
1036     }
1037   }
1038
1039   closedir(dh);
1040
1041   if (filename[0] == 0)
1042     ERROR("plugin_load: Could not find plugin \"%s\" in %s", plugin_name, dir);
1043
1044   return ret;
1045 }
1046
1047 /*
1048  * The `register_*' functions follow
1049  */
1050 int plugin_register_config(const char *name,
1051                            int (*callback)(const char *key, const char *val),
1052                            const char **keys, int keys_num) {
1053   cf_register(name, callback, keys, keys_num);
1054   return 0;
1055 } /* int plugin_register_config */
1056
1057 int plugin_register_complex_config(const char *type,
1058                                    int (*callback)(oconfig_item_t *)) {
1059   return cf_register_complex(type, callback);
1060 } /* int plugin_register_complex_config */
1061
1062 int plugin_register_init(const char *name, int (*callback)(void)) {
1063   return create_register_callback(&list_init, name, (void *)callback, NULL);
1064 } /* plugin_register_init */
1065
1066 static int plugin_compare_read_func(const void *arg0, const void *arg1) {
1067   const read_func_t *rf0;
1068   const read_func_t *rf1;
1069
1070   rf0 = arg0;
1071   rf1 = arg1;
1072
1073   if (rf0->rf_next_read < rf1->rf_next_read)
1074     return -1;
1075   else if (rf0->rf_next_read > rf1->rf_next_read)
1076     return 1;
1077   else
1078     return 0;
1079 } /* int plugin_compare_read_func */
1080
1081 /* Add a read function to both, the heap and a linked list. The linked list if
1082  * used to look-up read functions, especially for the remove function. The heap
1083  * is used to determine which plugin to read next. */
1084 static int plugin_insert_read(read_func_t *rf) {
1085   int status;
1086   llentry_t *le;
1087
1088   rf->rf_next_read = cdtime();
1089   rf->rf_effective_interval = rf->rf_interval;
1090
1091   pthread_mutex_lock(&read_lock);
1092
1093   if (read_list == NULL) {
1094     read_list = llist_create();
1095     if (read_list == NULL) {
1096       pthread_mutex_unlock(&read_lock);
1097       ERROR("plugin_insert_read: read_list failed.");
1098       return -1;
1099     }
1100   }
1101
1102   if (read_heap == NULL) {
1103     read_heap = c_heap_create(plugin_compare_read_func);
1104     if (read_heap == NULL) {
1105       pthread_mutex_unlock(&read_lock);
1106       ERROR("plugin_insert_read: c_heap_create failed.");
1107       return -1;
1108     }
1109   }
1110
1111   le = llist_search(read_list, rf->rf_name);
1112   if (le != NULL) {
1113     pthread_mutex_unlock(&read_lock);
1114     WARNING("The read function \"%s\" is already registered. "
1115             "Check for duplicates in your configuration!",
1116             rf->rf_name);
1117     return EINVAL;
1118   }
1119
1120   le = llentry_create(rf->rf_name, rf);
1121   if (le == NULL) {
1122     pthread_mutex_unlock(&read_lock);
1123     ERROR("plugin_insert_read: llentry_create failed.");
1124     return -1;
1125   }
1126
1127   status = c_heap_insert(read_heap, rf);
1128   if (status != 0) {
1129     pthread_mutex_unlock(&read_lock);
1130     ERROR("plugin_insert_read: c_heap_insert failed.");
1131     llentry_destroy(le);
1132     return -1;
1133   }
1134
1135   /* This does not fail. */
1136   llist_append(read_list, le);
1137
1138   /* Wake up all the read threads. */
1139   pthread_cond_broadcast(&read_cond);
1140   pthread_mutex_unlock(&read_lock);
1141   return 0;
1142 } /* int plugin_insert_read */
1143
1144 int plugin_register_read(const char *name, int (*callback)(void)) {
1145   read_func_t *rf;
1146   int status;
1147
1148   rf = calloc(1, sizeof(*rf));
1149   if (rf == NULL) {
1150     ERROR("plugin_register_read: calloc failed.");
1151     return ENOMEM;
1152   }
1153
1154   rf->rf_callback = (void *)callback;
1155   rf->rf_udata.data = NULL;
1156   rf->rf_udata.free_func = NULL;
1157   rf->rf_ctx = plugin_get_ctx();
1158   rf->rf_group[0] = '\0';
1159   rf->rf_name = strdup(name);
1160   rf->rf_type = RF_SIMPLE;
1161   rf->rf_interval = plugin_get_interval();
1162
1163   status = plugin_insert_read(rf);
1164   if (status != 0) {
1165     sfree(rf->rf_name);
1166     sfree(rf);
1167   }
1168
1169   return status;
1170 } /* int plugin_register_read */
1171
1172 int plugin_register_complex_read(const char *group, const char *name,
1173                                  plugin_read_cb callback, cdtime_t interval,
1174                                  user_data_t const *user_data) {
1175   read_func_t *rf;
1176   int status;
1177
1178   rf = calloc(1, sizeof(*rf));
1179   if (rf == NULL) {
1180     free_userdata(user_data);
1181     ERROR("plugin_register_complex_read: calloc failed.");
1182     return ENOMEM;
1183   }
1184
1185   rf->rf_callback = (void *)callback;
1186   if (group != NULL)
1187     sstrncpy(rf->rf_group, group, sizeof(rf->rf_group));
1188   else
1189     rf->rf_group[0] = '\0';
1190   rf->rf_name = strdup(name);
1191   rf->rf_type = RF_COMPLEX;
1192   rf->rf_interval = (interval != 0) ? interval : plugin_get_interval();
1193
1194   /* Set user data */
1195   if (user_data == NULL) {
1196     rf->rf_udata.data = NULL;
1197     rf->rf_udata.free_func = NULL;
1198   } else {
1199     rf->rf_udata = *user_data;
1200   }
1201
1202   rf->rf_ctx = plugin_get_ctx();
1203
1204   status = plugin_insert_read(rf);
1205   if (status != 0) {
1206     free_userdata(&rf->rf_udata);
1207     sfree(rf->rf_name);
1208     sfree(rf);
1209   }
1210
1211   return status;
1212 } /* int plugin_register_complex_read */
1213
1214 int plugin_register_write(const char *name, plugin_write_cb callback,
1215                           user_data_t const *ud) {
1216   return create_register_callback(&list_write, name, (void *)callback, ud);
1217 } /* int plugin_register_write */
1218
1219 static int plugin_flush_timeout_callback(user_data_t *ud) {
1220   flush_callback_t *cb = ud->data;
1221
1222   return plugin_flush(cb->name, cb->timeout, NULL);
1223 } /* static int plugin_flush_callback */
1224
1225 static void plugin_flush_timeout_callback_free(void *data) {
1226   flush_callback_t *cb = data;
1227
1228   if (cb == NULL)
1229     return;
1230
1231   sfree(cb->name);
1232   sfree(cb);
1233 } /* static void plugin_flush_callback_free */
1234
1235 static char *plugin_flush_callback_name(const char *name) {
1236   const char *flush_prefix = "flush/";
1237   size_t prefix_size;
1238   char *flush_name;
1239   size_t name_size;
1240
1241   prefix_size = strlen(flush_prefix);
1242   name_size = strlen(name);
1243
1244   flush_name = malloc(name_size + prefix_size + 1);
1245   if (flush_name == NULL) {
1246     ERROR("plugin_flush_callback_name: malloc failed.");
1247     return NULL;
1248   }
1249
1250   sstrncpy(flush_name, flush_prefix, prefix_size + 1);
1251   sstrncpy(flush_name + prefix_size, name, name_size + 1);
1252
1253   return flush_name;
1254 } /* static char *plugin_flush_callback_name */
1255
1256 int plugin_register_flush(const char *name, plugin_flush_cb callback,
1257                           user_data_t const *ud) {
1258   int status;
1259   plugin_ctx_t ctx = plugin_get_ctx();
1260
1261   status = create_register_callback(&list_flush, name, (void *)callback, ud);
1262   if (status != 0)
1263     return status;
1264
1265   if (ctx.flush_interval != 0) {
1266     char *flush_name;
1267     flush_callback_t *cb;
1268
1269     flush_name = plugin_flush_callback_name(name);
1270     if (flush_name == NULL)
1271       return -1;
1272
1273     cb = malloc(sizeof(*cb));
1274     if (cb == NULL) {
1275       ERROR("plugin_register_flush: malloc failed.");
1276       sfree(flush_name);
1277       return -1;
1278     }
1279
1280     cb->name = strdup(name);
1281     if (cb->name == NULL) {
1282       ERROR("plugin_register_flush: strdup failed.");
1283       sfree(cb);
1284       sfree(flush_name);
1285       return -1;
1286     }
1287     cb->timeout = ctx.flush_timeout;
1288
1289     status = plugin_register_complex_read(
1290         /* group     = */ "flush",
1291         /* name      = */ flush_name,
1292         /* callback  = */ plugin_flush_timeout_callback,
1293         /* interval  = */ ctx.flush_interval,
1294         /* user data = */ &(user_data_t){
1295             .data = cb, .free_func = plugin_flush_timeout_callback_free,
1296         });
1297
1298     sfree(flush_name);
1299     return status;
1300   }
1301
1302   return 0;
1303 } /* int plugin_register_flush */
1304
1305 int plugin_register_missing(const char *name, plugin_missing_cb callback,
1306                             user_data_t const *ud) {
1307   return create_register_callback(&list_missing, name, (void *)callback, ud);
1308 } /* int plugin_register_missing */
1309
1310 int plugin_register_shutdown(const char *name, int (*callback)(void)) {
1311   return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
1312 } /* int plugin_register_shutdown */
1313
1314 static void plugin_free_data_sets(void) {
1315   void *key;
1316   void *value;
1317
1318   if (data_sets == NULL)
1319     return;
1320
1321   while (c_avl_pick(data_sets, &key, &value) == 0) {
1322     data_set_t *ds = value;
1323     /* key is a pointer to ds->type */
1324
1325     sfree(ds->ds);
1326     sfree(ds);
1327   }
1328
1329   c_avl_destroy(data_sets);
1330   data_sets = NULL;
1331 } /* void plugin_free_data_sets */
1332
1333 int plugin_register_data_set(const data_set_t *ds) {
1334   data_set_t *ds_copy;
1335
1336   if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
1337     NOTICE("Replacing DS `%s' with another version.", ds->type);
1338     plugin_unregister_data_set(ds->type);
1339   } else if (data_sets == NULL) {
1340     data_sets = c_avl_create((int (*)(const void *, const void *))strcmp);
1341     if (data_sets == NULL)
1342       return -1;
1343   }
1344
1345   ds_copy = malloc(sizeof(*ds_copy));
1346   if (ds_copy == NULL)
1347     return -1;
1348   memcpy(ds_copy, ds, sizeof(data_set_t));
1349
1350   ds_copy->ds = malloc(sizeof(*ds_copy->ds) * ds->ds_num);
1351   if (ds_copy->ds == NULL) {
1352     sfree(ds_copy);
1353     return -1;
1354   }
1355
1356   for (size_t i = 0; i < ds->ds_num; i++)
1357     memcpy(ds_copy->ds + i, ds->ds + i, sizeof(data_source_t));
1358
1359   return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy);
1360 } /* int plugin_register_data_set */
1361
1362 int plugin_register_log(const char *name, plugin_log_cb callback,
1363                         user_data_t const *ud) {
1364   return create_register_callback(&list_log, name, (void *)callback, ud);
1365 } /* int plugin_register_log */
1366
1367 int plugin_register_notification(const char *name,
1368                                  plugin_notification_cb callback,
1369                                  user_data_t const *ud) {
1370   return create_register_callback(&list_notification, name, (void *)callback,
1371                                   ud);
1372 } /* int plugin_register_log */
1373
1374 int plugin_unregister_config(const char *name) {
1375   cf_unregister(name);
1376   return 0;
1377 } /* int plugin_unregister_config */
1378
1379 int plugin_unregister_complex_config(const char *name) {
1380   cf_unregister_complex(name);
1381   return 0;
1382 } /* int plugin_unregister_complex_config */
1383
1384 int plugin_unregister_init(const char *name) {
1385   return plugin_unregister(list_init, name);
1386 }
1387
1388 int plugin_unregister_read(const char *name) /* {{{ */
1389 {
1390   llentry_t *le;
1391   read_func_t *rf;
1392
1393   if (name == NULL)
1394     return -ENOENT;
1395
1396   pthread_mutex_lock(&read_lock);
1397
1398   if (read_list == NULL) {
1399     pthread_mutex_unlock(&read_lock);
1400     return -ENOENT;
1401   }
1402
1403   le = llist_search(read_list, name);
1404   if (le == NULL) {
1405     pthread_mutex_unlock(&read_lock);
1406     WARNING("plugin_unregister_read: No such read function: %s", name);
1407     return -ENOENT;
1408   }
1409
1410   llist_remove(read_list, le);
1411
1412   rf = le->value;
1413   assert(rf != NULL);
1414   rf->rf_type = RF_REMOVE;
1415
1416   pthread_mutex_unlock(&read_lock);
1417
1418   llentry_destroy(le);
1419
1420   DEBUG("plugin_unregister_read: Marked `%s' for removal.", name);
1421
1422   return 0;
1423 } /* }}} int plugin_unregister_read */
1424
1425 void plugin_log_available_writers(void) {
1426   log_list_callbacks(&list_write, "Available write targets:");
1427 }
1428
1429 static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
1430 {
1431   read_func_t *rf = e->value;
1432   char *group = ud;
1433
1434   return strcmp(rf->rf_group, (const char *)group);
1435 } /* }}} int compare_read_func_group */
1436
1437 int plugin_unregister_read_group(const char *group) /* {{{ */
1438 {
1439   llentry_t *le;
1440   read_func_t *rf;
1441
1442   int found = 0;
1443
1444   if (group == NULL)
1445     return -ENOENT;
1446
1447   pthread_mutex_lock(&read_lock);
1448
1449   if (read_list == NULL) {
1450     pthread_mutex_unlock(&read_lock);
1451     return -ENOENT;
1452   }
1453
1454   while (42) {
1455     le = llist_search_custom(read_list, compare_read_func_group, (void *)group);
1456
1457     if (le == NULL)
1458       break;
1459
1460     ++found;
1461
1462     llist_remove(read_list, le);
1463
1464     rf = le->value;
1465     assert(rf != NULL);
1466     rf->rf_type = RF_REMOVE;
1467
1468     llentry_destroy(le);
1469
1470     DEBUG("plugin_unregister_read_group: "
1471           "Marked `%s' (group `%s') for removal.",
1472           rf->rf_name, group);
1473   }
1474
1475   pthread_mutex_unlock(&read_lock);
1476
1477   if (found == 0) {
1478     WARNING("plugin_unregister_read_group: No such "
1479             "group of read function: %s",
1480             group);
1481     return -ENOENT;
1482   }
1483
1484   return 0;
1485 } /* }}} int plugin_unregister_read_group */
1486
1487 int plugin_unregister_write(const char *name) {
1488   return plugin_unregister(list_write, name);
1489 }
1490
1491 int plugin_unregister_flush(const char *name) {
1492   plugin_ctx_t ctx = plugin_get_ctx();
1493
1494   if (ctx.flush_interval != 0) {
1495     char *flush_name;
1496
1497     flush_name = plugin_flush_callback_name(name);
1498     if (flush_name != NULL) {
1499       plugin_unregister_read(flush_name);
1500       sfree(flush_name);
1501     }
1502   }
1503
1504   return plugin_unregister(list_flush, name);
1505 }
1506
1507 int plugin_unregister_missing(const char *name) {
1508   return plugin_unregister(list_missing, name);
1509 }
1510
1511 int plugin_unregister_shutdown(const char *name) {
1512   return plugin_unregister(list_shutdown, name);
1513 }
1514
1515 int plugin_unregister_data_set(const char *name) {
1516   data_set_t *ds;
1517
1518   if (data_sets == NULL)
1519     return -1;
1520
1521   if (c_avl_remove(data_sets, name, NULL, (void *)&ds) != 0)
1522     return -1;
1523
1524   sfree(ds->ds);
1525   sfree(ds);
1526
1527   return 0;
1528 } /* int plugin_unregister_data_set */
1529
1530 int plugin_unregister_log(const char *name) {
1531   return plugin_unregister(list_log, name);
1532 }
1533
1534 int plugin_unregister_notification(const char *name) {
1535   return plugin_unregister(list_notification, name);
1536 }
1537
1538 int plugin_init_all(void) {
1539   char const *chain_name;
1540   llentry_t *le;
1541   int status;
1542   int ret = 0;
1543
1544   /* Init the value cache */
1545   uc_init();
1546
1547   if (IS_TRUE(global_option_get("CollectInternalStats"))) {
1548     record_statistics = true;
1549     plugin_register_read("collectd", plugin_update_internal_statistics);
1550   }
1551
1552   chain_name = global_option_get("PreCacheChain");
1553   pre_cache_chain = fc_chain_get_by_name(chain_name);
1554
1555   chain_name = global_option_get("PostCacheChain");
1556   post_cache_chain = fc_chain_get_by_name(chain_name);
1557
1558   write_limit_high = global_option_get_long("WriteQueueLimitHigh",
1559                                             /* default = */ 0);
1560   if (write_limit_high < 0) {
1561     ERROR("WriteQueueLimitHigh must be positive or zero.");
1562     write_limit_high = 0;
1563   }
1564
1565   write_limit_low =
1566       global_option_get_long("WriteQueueLimitLow",
1567                              /* default = */ write_limit_high / 2);
1568   if (write_limit_low < 0) {
1569     ERROR("WriteQueueLimitLow must be positive or zero.");
1570     write_limit_low = write_limit_high / 2;
1571   } else if (write_limit_low > write_limit_high) {
1572     ERROR("WriteQueueLimitLow must not be larger than "
1573           "WriteQueueLimitHigh.");
1574     write_limit_low = write_limit_high;
1575   }
1576
1577   write_threads_num = global_option_get_long("WriteThreads",
1578                                              /* default = */ 5);
1579   if (write_threads_num < 1) {
1580     ERROR("WriteThreads must be positive.");
1581     write_threads_num = 5;
1582   }
1583
1584   if ((list_init == NULL) && (read_heap == NULL))
1585     return ret;
1586
1587   /* Calling all init callbacks before checking if read callbacks
1588    * are available allows the init callbacks to register the read
1589    * callback. */
1590   le = llist_head(list_init);
1591   while (le != NULL) {
1592     callback_func_t *cf;
1593     plugin_init_cb callback;
1594     plugin_ctx_t old_ctx;
1595
1596     cf = le->value;
1597     old_ctx = plugin_set_ctx(cf->cf_ctx);
1598     callback = cf->cf_callback;
1599     status = (*callback)();
1600     plugin_set_ctx(old_ctx);
1601
1602     if (status != 0) {
1603       ERROR("Initialization of plugin `%s' "
1604             "failed with status %i. "
1605             "Plugin will be unloaded.",
1606             le->key, status);
1607       /* Plugins that register read callbacks from the init
1608        * callback should take care of appropriate error
1609        * handling themselves. */
1610       /* FIXME: Unload _all_ functions */
1611       plugin_unregister_read(le->key);
1612       ret = -1;
1613     }
1614
1615     le = le->next;
1616   }
1617
1618   start_write_threads((size_t)write_threads_num);
1619
1620   max_read_interval =
1621       global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
1622
1623   /* Start read-threads */
1624   if (read_heap != NULL) {
1625     const char *rt;
1626     int num;
1627
1628     rt = global_option_get("ReadThreads");
1629     num = atoi(rt);
1630     if (num != -1)
1631       start_read_threads((num > 0) ? ((size_t)num) : 5);
1632   }
1633   return ret;
1634 } /* void plugin_init_all */
1635
1636 /* TODO: Rename this function. */
1637 void plugin_read_all(void) {
1638   uc_check_timeout();
1639
1640   return;
1641 } /* void plugin_read_all */
1642
1643 /* Read function called when the `-T' command line argument is given. */
1644 int plugin_read_all_once(void) {
1645   int status;
1646   int return_status = 0;
1647
1648   if (read_heap == NULL) {
1649     NOTICE("No read-functions are registered.");
1650     return 0;
1651   }
1652
1653   while (42) {
1654     read_func_t *rf;
1655     plugin_ctx_t old_ctx;
1656
1657     rf = c_heap_get_root(read_heap);
1658     if (rf == NULL)
1659       break;
1660
1661     old_ctx = plugin_set_ctx(rf->rf_ctx);
1662
1663     if (rf->rf_type == RF_SIMPLE) {
1664       int (*callback)(void);
1665
1666       callback = rf->rf_callback;
1667       status = (*callback)();
1668     } else {
1669       plugin_read_cb callback;
1670
1671       callback = rf->rf_callback;
1672       status = (*callback)(&rf->rf_udata);
1673     }
1674
1675     plugin_set_ctx(old_ctx);
1676
1677     if (status != 0) {
1678       NOTICE("read-function of plugin `%s' failed.", rf->rf_name);
1679       return_status = -1;
1680     }
1681
1682     sfree(rf->rf_name);
1683     destroy_callback((void *)rf);
1684   }
1685
1686   return return_status;
1687 } /* int plugin_read_all_once */
1688
1689 int plugin_write(const char *plugin, /* {{{ */
1690                  const data_set_t *ds, const value_list_t *vl) {
1691   llentry_t *le;
1692   int status;
1693
1694   if (vl == NULL)
1695     return EINVAL;
1696
1697   if (list_write == NULL)
1698     return ENOENT;
1699
1700   if (ds == NULL) {
1701     ds = plugin_get_ds(vl->type);
1702     if (ds == NULL) {
1703       ERROR("plugin_write: Unable to lookup type `%s'.", vl->type);
1704       return ENOENT;
1705     }
1706   }
1707
1708   if (plugin == NULL) {
1709     int success = 0;
1710     int failure = 0;
1711
1712     le = llist_head(list_write);
1713     while (le != NULL) {
1714       callback_func_t *cf = le->value;
1715       plugin_write_cb callback;
1716
1717       /* do not switch plugin context; rather keep the context (interval)
1718        * information of the calling read plugin */
1719
1720       DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1721       callback = cf->cf_callback;
1722       status = (*callback)(ds, vl, &cf->cf_udata);
1723       if (status != 0)
1724         failure++;
1725       else
1726         success++;
1727
1728       le = le->next;
1729     }
1730
1731     if ((success == 0) && (failure != 0))
1732       status = -1;
1733     else
1734       status = 0;
1735   } else /* plugin != NULL */
1736   {
1737     callback_func_t *cf;
1738     plugin_write_cb callback;
1739
1740     le = llist_head(list_write);
1741     while (le != NULL) {
1742       if (strcasecmp(plugin, le->key) == 0)
1743         break;
1744
1745       le = le->next;
1746     }
1747
1748     if (le == NULL)
1749       return ENOENT;
1750
1751     cf = le->value;
1752
1753     /* do not switch plugin context; rather keep the context (interval)
1754      * information of the calling read plugin */
1755
1756     DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1757     callback = cf->cf_callback;
1758     status = (*callback)(ds, vl, &cf->cf_udata);
1759   }
1760
1761   return status;
1762 } /* }}} int plugin_write */
1763
1764 int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier) {
1765   llentry_t *le;
1766
1767   if (list_flush == NULL)
1768     return 0;
1769
1770   le = llist_head(list_flush);
1771   while (le != NULL) {
1772     callback_func_t *cf;
1773     plugin_flush_cb callback;
1774     plugin_ctx_t old_ctx;
1775
1776     if ((plugin != NULL) && (strcmp(plugin, le->key) != 0)) {
1777       le = le->next;
1778       continue;
1779     }
1780
1781     cf = le->value;
1782     old_ctx = plugin_set_ctx(cf->cf_ctx);
1783     callback = cf->cf_callback;
1784
1785     (*callback)(timeout, identifier, &cf->cf_udata);
1786
1787     plugin_set_ctx(old_ctx);
1788
1789     le = le->next;
1790   }
1791   return 0;
1792 } /* int plugin_flush */
1793
1794 int plugin_shutdown_all(void) {
1795   llentry_t *le;
1796   int ret = 0; // Assume success.
1797
1798   destroy_all_callbacks(&list_init);
1799
1800   stop_read_threads();
1801
1802   pthread_mutex_lock(&read_lock);
1803   llist_destroy(read_list);
1804   read_list = NULL;
1805   pthread_mutex_unlock(&read_lock);
1806
1807   destroy_read_heap();
1808
1809   /* blocks until all write threads have shut down. */
1810   stop_write_threads();
1811
1812   /* ask all plugins to write out the state they kept. */
1813   plugin_flush(/* plugin = */ NULL,
1814                /* timeout = */ 0,
1815                /* identifier = */ NULL);
1816
1817   le = NULL;
1818   if (list_shutdown != NULL)
1819     le = llist_head(list_shutdown);
1820
1821   while (le != NULL) {
1822     callback_func_t *cf;
1823     plugin_shutdown_cb callback;
1824     plugin_ctx_t old_ctx;
1825
1826     cf = le->value;
1827     old_ctx = plugin_set_ctx(cf->cf_ctx);
1828     callback = cf->cf_callback;
1829
1830     /* Advance the pointer before calling the callback allows
1831      * shutdown functions to unregister themselves. If done the
1832      * other way around the memory `le' points to will be freed
1833      * after callback returns. */
1834     le = le->next;
1835
1836     if ((*callback)() != 0)
1837       ret = -1;
1838
1839     plugin_set_ctx(old_ctx);
1840   }
1841
1842   /* Write plugins which use the `user_data' pointer usually need the
1843    * same data available to the flush callback. If this is the case, set
1844    * the free_function to NULL when registering the flush callback and to
1845    * the real free function when registering the write callback. This way
1846    * the data isn't freed twice. */
1847   destroy_all_callbacks(&list_flush);
1848   destroy_all_callbacks(&list_missing);
1849   destroy_all_callbacks(&list_write);
1850
1851   destroy_all_callbacks(&list_notification);
1852   destroy_all_callbacks(&list_shutdown);
1853   destroy_all_callbacks(&list_log);
1854
1855   plugin_free_loaded();
1856   plugin_free_data_sets();
1857   return ret;
1858 } /* void plugin_shutdown_all */
1859
1860 int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
1861 {
1862   if (list_missing == NULL)
1863     return 0;
1864
1865   llentry_t *le = llist_head(list_missing);
1866   while (le != NULL) {
1867     callback_func_t *cf = le->value;
1868     plugin_ctx_t old_ctx = plugin_set_ctx(cf->cf_ctx);
1869     plugin_missing_cb callback = cf->cf_callback;
1870
1871     int status = (*callback)(vl, &cf->cf_udata);
1872     plugin_set_ctx(old_ctx);
1873     if (status != 0) {
1874       if (status < 0) {
1875         ERROR("plugin_dispatch_missing: Callback function \"%s\" "
1876               "failed with status %i.",
1877               le->key, status);
1878         return status;
1879       } else {
1880         return 0;
1881       }
1882     }
1883
1884     le = le->next;
1885   }
1886   return 0;
1887 } /* int }}} plugin_dispatch_missing */
1888
1889 static int plugin_dispatch_values_internal(value_list_t *vl) {
1890   int status;
1891   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
1892
1893   bool free_meta_data = false;
1894
1895   assert(vl != NULL);
1896
1897   /* These fields are initialized by plugin_value_list_clone() if needed: */
1898   assert(vl->host[0] != 0);
1899   assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
1900   assert(vl->interval != 0);
1901
1902   if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
1903     ERROR("plugin_dispatch_values: Invalid value list "
1904           "from plugin %s.",
1905           vl->plugin);
1906     return -1;
1907   }
1908
1909   /* Free meta data only if the calling function didn't specify any. In
1910    * this case matches and targets may add some and the calling function
1911    * may not expect (and therefore free) that data. */
1912   if (vl->meta == NULL)
1913     free_meta_data = true;
1914
1915   if (list_write == NULL)
1916     c_complain_once(LOG_WARNING, &no_write_complaint,
1917                     "plugin_dispatch_values: No write callback has been "
1918                     "registered. Please load at least one output plugin, "
1919                     "if you want the collected data to be stored.");
1920
1921   if (data_sets == NULL) {
1922     ERROR("plugin_dispatch_values: No data sets registered. "
1923           "Could the types database be read? Check "
1924           "your `TypesDB' setting!");
1925     return -1;
1926   }
1927
1928   data_set_t *ds = NULL;
1929   if (c_avl_get(data_sets, vl->type, (void *)&ds) != 0) {
1930     char ident[6 * DATA_MAX_NAME_LEN];
1931
1932     FORMAT_VL(ident, sizeof(ident), vl);
1933     INFO("plugin_dispatch_values: Dataset not found: %s "
1934          "(from \"%s\"), check your types.db!",
1935          vl->type, ident);
1936     return -1;
1937   }
1938
1939   DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
1940         "host = %s; "
1941         "plugin = %s; plugin_instance = %s; "
1942         "type = %s; type_instance = %s;",
1943         CDTIME_T_TO_DOUBLE(vl->time), CDTIME_T_TO_DOUBLE(vl->interval),
1944         vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1945
1946 #if COLLECT_DEBUG
1947   assert(0 == strcmp(ds->type, vl->type));
1948 #else
1949   if (0 != strcmp(ds->type, vl->type))
1950     WARNING("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
1951             ds->type, vl->type);
1952 #endif
1953
1954 #if COLLECT_DEBUG
1955   assert(ds->ds_num == vl->values_len);
1956 #else
1957   if (ds->ds_num != vl->values_len) {
1958     ERROR("plugin_dispatch_values: ds->type = %s: "
1959           "(ds->ds_num = %" PRIsz ") != "
1960           "(vl->values_len = %" PRIsz ")",
1961           ds->type, ds->ds_num, vl->values_len);
1962     return -1;
1963   }
1964 #endif
1965
1966   escape_slashes(vl->host, sizeof(vl->host));
1967   escape_slashes(vl->plugin, sizeof(vl->plugin));
1968   escape_slashes(vl->plugin_instance, sizeof(vl->plugin_instance));
1969   escape_slashes(vl->type, sizeof(vl->type));
1970   escape_slashes(vl->type_instance, sizeof(vl->type_instance));
1971
1972   if (pre_cache_chain != NULL) {
1973     status = fc_process_chain(ds, vl, pre_cache_chain);
1974     if (status < 0) {
1975       WARNING("plugin_dispatch_values: Running the "
1976               "pre-cache chain failed with "
1977               "status %i (%#x).",
1978               status, status);
1979     } else if (status == FC_TARGET_STOP)
1980       return 0;
1981   }
1982
1983   /* Update the value cache */
1984   uc_update(ds, vl);
1985
1986   if (post_cache_chain != NULL) {
1987     status = fc_process_chain(ds, vl, post_cache_chain);
1988     if (status < 0) {
1989       WARNING("plugin_dispatch_values: Running the "
1990               "post-cache chain failed with "
1991               "status %i (%#x).",
1992               status, status);
1993     }
1994   } else
1995     fc_default_action(ds, vl);
1996
1997   if ((free_meta_data == true) && (vl->meta != NULL)) {
1998     meta_data_destroy(vl->meta);
1999     vl->meta = NULL;
2000   }
2001
2002   return 0;
2003 } /* int plugin_dispatch_values_internal */
2004
2005 static double get_drop_probability(void) /* {{{ */
2006 {
2007   long pos;
2008   long size;
2009   long wql;
2010
2011   pthread_mutex_lock(&write_lock);
2012   wql = write_queue_length;
2013   pthread_mutex_unlock(&write_lock);
2014
2015   if (wql < write_limit_low)
2016     return 0.0;
2017   if (wql >= write_limit_high)
2018     return 1.0;
2019
2020   pos = 1 + wql - write_limit_low;
2021   size = 1 + write_limit_high - write_limit_low;
2022
2023   return (double)pos / (double)size;
2024 } /* }}} double get_drop_probability */
2025
2026 static bool check_drop_value(void) /* {{{ */
2027 {
2028   static cdtime_t last_message_time;
2029   static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2030
2031   double p;
2032   double q;
2033   int status;
2034
2035   if (write_limit_high == 0)
2036     return false;
2037
2038   p = get_drop_probability();
2039   if (p == 0.0)
2040     return false;
2041
2042   status = pthread_mutex_trylock(&last_message_lock);
2043   if (status == 0) {
2044     cdtime_t now;
2045
2046     now = cdtime();
2047     if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
2048       last_message_time = now;
2049       ERROR("plugin_dispatch_values: Low water mark "
2050             "reached. Dropping %.0f%% of metrics.",
2051             100.0 * p);
2052     }
2053     pthread_mutex_unlock(&last_message_lock);
2054   }
2055
2056   if (p == 1.0)
2057     return true;
2058
2059   q = cdrand_d();
2060   if (q > p)
2061     return true;
2062   else
2063     return false;
2064 } /* }}} bool check_drop_value */
2065
2066 int plugin_dispatch_values(value_list_t const *vl) {
2067   int status;
2068   static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
2069
2070   if (check_drop_value()) {
2071     if (record_statistics) {
2072       pthread_mutex_lock(&statistics_lock);
2073       stats_values_dropped++;
2074       pthread_mutex_unlock(&statistics_lock);
2075     }
2076     return 0;
2077   }
2078
2079   status = plugin_write_enqueue(vl);
2080   if (status != 0) {
2081     ERROR("plugin_dispatch_values: plugin_write_enqueue failed with status %i "
2082           "(%s).",
2083           status, STRERROR(status));
2084     return status;
2085   }
2086
2087   return 0;
2088 }
2089
2090 __attribute__((sentinel)) int
2091 plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
2092                            bool store_percentage, int store_type, ...) {
2093   value_list_t *vl;
2094   int failed = 0;
2095   gauge_t sum = 0.0;
2096   va_list ap;
2097
2098   assert(template->values_len == 1);
2099
2100   /* Calculate sum for Gauge to calculate percent if needed */
2101   if (DS_TYPE_GAUGE == store_type) {
2102     va_start(ap, store_type);
2103     while (42) {
2104       char const *name;
2105       gauge_t value;
2106
2107       name = va_arg(ap, char const *);
2108       if (name == NULL)
2109         break;
2110
2111       value = va_arg(ap, gauge_t);
2112       if (!isnan(value))
2113         sum += value;
2114     }
2115     va_end(ap);
2116   }
2117
2118   vl = plugin_value_list_clone(template);
2119   /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2120   if (store_percentage)
2121     sstrncpy(vl->type, "percent", sizeof(vl->type));
2122
2123   va_start(ap, store_type);
2124   while (42) {
2125     char const *name;
2126     int status;
2127
2128     /* Set the type instance. */
2129     name = va_arg(ap, char const *);
2130     if (name == NULL)
2131       break;
2132     sstrncpy(vl->type_instance, name, sizeof(vl->type_instance));
2133
2134     /* Set the value. */
2135     switch (store_type) {
2136     case DS_TYPE_GAUGE:
2137       vl->values[0].gauge = va_arg(ap, gauge_t);
2138       if (store_percentage)
2139         vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2140       break;
2141     case DS_TYPE_ABSOLUTE:
2142       vl->values[0].absolute = va_arg(ap, absolute_t);
2143       break;
2144     case DS_TYPE_COUNTER:
2145       vl->values[0].counter = va_arg(ap, counter_t);
2146       break;
2147     case DS_TYPE_DERIVE:
2148       vl->values[0].derive = va_arg(ap, derive_t);
2149       break;
2150     default:
2151       ERROR("plugin_dispatch_multivalue: given store_type is incorrect.");
2152       failed++;
2153     }
2154
2155     status = plugin_write_enqueue(vl);
2156     if (status != 0)
2157       failed++;
2158   }
2159   va_end(ap);
2160
2161   plugin_value_list_free(vl);
2162   return failed;
2163 } /* }}} int plugin_dispatch_multivalue */
2164
2165 int plugin_dispatch_notification(const notification_t *notif) {
2166   llentry_t *le;
2167   /* Possible TODO: Add flap detection here */
2168
2169   DEBUG("plugin_dispatch_notification: severity = %i; message = %s; "
2170         "time = %.3f; host = %s;",
2171         notif->severity, notif->message, CDTIME_T_TO_DOUBLE(notif->time),
2172         notif->host);
2173
2174   /* Nobody cares for notifications */
2175   if (list_notification == NULL)
2176     return -1;
2177
2178   le = llist_head(list_notification);
2179   while (le != NULL) {
2180     callback_func_t *cf;
2181     plugin_notification_cb callback;
2182     int status;
2183
2184     /* do not switch plugin context; rather keep the context
2185      * (interval) information of the calling plugin */
2186
2187     cf = le->value;
2188     callback = cf->cf_callback;
2189     status = (*callback)(notif, &cf->cf_udata);
2190     if (status != 0) {
2191       WARNING("plugin_dispatch_notification: Notification "
2192               "callback %s returned %i.",
2193               le->key, status);
2194     }
2195
2196     le = le->next;
2197   }
2198
2199   return 0;
2200 } /* int plugin_dispatch_notification */
2201
2202 void plugin_log(int level, const char *format, ...) {
2203   char msg[1024];
2204   va_list ap;
2205   llentry_t *le;
2206
2207 #if !COLLECT_DEBUG
2208   if (level >= LOG_DEBUG)
2209     return;
2210 #endif
2211
2212   va_start(ap, format);
2213   vsnprintf(msg, sizeof(msg), format, ap);
2214   msg[sizeof(msg) - 1] = '\0';
2215   va_end(ap);
2216
2217   if (list_log == NULL) {
2218     fprintf(stderr, "%s\n", msg);
2219     return;
2220   }
2221
2222   le = llist_head(list_log);
2223   while (le != NULL) {
2224     callback_func_t *cf;
2225     plugin_log_cb callback;
2226
2227     cf = le->value;
2228     callback = cf->cf_callback;
2229
2230     /* do not switch plugin context; rather keep the context
2231      * (interval) information of the calling plugin */
2232
2233     (*callback)(level, msg, &cf->cf_udata);
2234
2235     le = le->next;
2236   }
2237 } /* void plugin_log */
2238
2239 int parse_log_severity(const char *severity) {
2240   int log_level = -1;
2241
2242   if ((0 == strcasecmp(severity, "emerg")) ||
2243       (0 == strcasecmp(severity, "alert")) ||
2244       (0 == strcasecmp(severity, "crit")) || (0 == strcasecmp(severity, "err")))
2245     log_level = LOG_ERR;
2246   else if (0 == strcasecmp(severity, "warning"))
2247     log_level = LOG_WARNING;
2248   else if (0 == strcasecmp(severity, "notice"))
2249     log_level = LOG_NOTICE;
2250   else if (0 == strcasecmp(severity, "info"))
2251     log_level = LOG_INFO;
2252 #if COLLECT_DEBUG
2253   else if (0 == strcasecmp(severity, "debug"))
2254     log_level = LOG_DEBUG;
2255 #endif /* COLLECT_DEBUG */
2256
2257   return log_level;
2258 } /* int parse_log_severity */
2259
2260 int parse_notif_severity(const char *severity) {
2261   int notif_severity = -1;
2262
2263   if (strcasecmp(severity, "FAILURE") == 0)
2264     notif_severity = NOTIF_FAILURE;
2265   else if (strcmp(severity, "OKAY") == 0)
2266     notif_severity = NOTIF_OKAY;
2267   else if ((strcmp(severity, "WARNING") == 0) ||
2268            (strcmp(severity, "WARN") == 0))
2269     notif_severity = NOTIF_WARNING;
2270
2271   return notif_severity;
2272 } /* int parse_notif_severity */
2273
2274 const data_set_t *plugin_get_ds(const char *name) {
2275   data_set_t *ds;
2276
2277   if (data_sets == NULL) {
2278     ERROR("plugin_get_ds: No data sets are defined yet.");
2279     return NULL;
2280   }
2281
2282   if (c_avl_get(data_sets, name, (void *)&ds) != 0) {
2283     DEBUG("No such dataset registered: %s", name);
2284     return NULL;
2285   }
2286
2287   return ds;
2288 } /* data_set_t *plugin_get_ds */
2289
2290 static int plugin_notification_meta_add(notification_t *n, const char *name,
2291                                         enum notification_meta_type_e type,
2292                                         const void *value) {
2293   notification_meta_t *meta;
2294   notification_meta_t *tail;
2295
2296   if ((n == NULL) || (name == NULL) || (value == NULL)) {
2297     ERROR("plugin_notification_meta_add: A pointer is NULL!");
2298     return -1;
2299   }
2300
2301   meta = calloc(1, sizeof(*meta));
2302   if (meta == NULL) {
2303     ERROR("plugin_notification_meta_add: calloc failed.");
2304     return -1;
2305   }
2306
2307   sstrncpy(meta->name, name, sizeof(meta->name));
2308   meta->type = type;
2309
2310   switch (type) {
2311   case NM_TYPE_STRING: {
2312     meta->nm_value.nm_string = strdup((const char *)value);
2313     if (meta->nm_value.nm_string == NULL) {
2314       ERROR("plugin_notification_meta_add: strdup failed.");
2315       sfree(meta);
2316       return -1;
2317     }
2318     break;
2319   }
2320   case NM_TYPE_SIGNED_INT: {
2321     meta->nm_value.nm_signed_int = *((int64_t *)value);
2322     break;
2323   }
2324   case NM_TYPE_UNSIGNED_INT: {
2325     meta->nm_value.nm_unsigned_int = *((uint64_t *)value);
2326     break;
2327   }
2328   case NM_TYPE_DOUBLE: {
2329     meta->nm_value.nm_double = *((double *)value);
2330     break;
2331   }
2332   case NM_TYPE_BOOLEAN: {
2333     meta->nm_value.nm_boolean = *((bool *)value);
2334     break;
2335   }
2336   default: {
2337     ERROR("plugin_notification_meta_add: Unknown type: %i", type);
2338     sfree(meta);
2339     return -1;
2340   }
2341   } /* switch (type) */
2342
2343   meta->next = NULL;
2344   tail = n->meta;
2345   while ((tail != NULL) && (tail->next != NULL))
2346     tail = tail->next;
2347
2348   if (tail == NULL)
2349     n->meta = meta;
2350   else
2351     tail->next = meta;
2352
2353   return 0;
2354 } /* int plugin_notification_meta_add */
2355
2356 int plugin_notification_meta_add_string(notification_t *n, const char *name,
2357                                         const char *value) {
2358   return plugin_notification_meta_add(n, name, NM_TYPE_STRING, value);
2359 }
2360
2361 int plugin_notification_meta_add_signed_int(notification_t *n, const char *name,
2362                                             int64_t value) {
2363   return plugin_notification_meta_add(n, name, NM_TYPE_SIGNED_INT, &value);
2364 }
2365
2366 int plugin_notification_meta_add_unsigned_int(notification_t *n,
2367                                               const char *name,
2368                                               uint64_t value) {
2369   return plugin_notification_meta_add(n, name, NM_TYPE_UNSIGNED_INT, &value);
2370 }
2371
2372 int plugin_notification_meta_add_double(notification_t *n, const char *name,
2373                                         double value) {
2374   return plugin_notification_meta_add(n, name, NM_TYPE_DOUBLE, &value);
2375 }
2376
2377 int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
2378                                          bool value) {
2379   return plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value);
2380 }
2381
2382 int plugin_notification_meta_copy(notification_t *dst,
2383                                   const notification_t *src) {
2384   assert(dst != NULL);
2385   assert(src != NULL);
2386   assert(dst != src);
2387   assert((src->meta == NULL) || (src->meta != dst->meta));
2388
2389   for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next) {
2390     if (meta->type == NM_TYPE_STRING)
2391       plugin_notification_meta_add_string(dst, meta->name,
2392                                           meta->nm_value.nm_string);
2393     else if (meta->type == NM_TYPE_SIGNED_INT)
2394       plugin_notification_meta_add_signed_int(dst, meta->name,
2395                                               meta->nm_value.nm_signed_int);
2396     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2397       plugin_notification_meta_add_unsigned_int(dst, meta->name,
2398                                                 meta->nm_value.nm_unsigned_int);
2399     else if (meta->type == NM_TYPE_DOUBLE)
2400       plugin_notification_meta_add_double(dst, meta->name,
2401                                           meta->nm_value.nm_double);
2402     else if (meta->type == NM_TYPE_BOOLEAN)
2403       plugin_notification_meta_add_boolean(dst, meta->name,
2404                                            meta->nm_value.nm_boolean);
2405   }
2406
2407   return 0;
2408 } /* int plugin_notification_meta_copy */
2409
2410 int plugin_notification_meta_free(notification_meta_t *n) {
2411   notification_meta_t *this;
2412   notification_meta_t *next;
2413
2414   if (n == NULL) {
2415     ERROR("plugin_notification_meta_free: n == NULL!");
2416     return -1;
2417   }
2418
2419   this = n;
2420   while (this != NULL) {
2421     next = this->next;
2422
2423     if (this->type == NM_TYPE_STRING) {
2424       /* Assign to a temporary variable to work around nm_string's const
2425        * modifier. */
2426       void *tmp = (void *)this->nm_value.nm_string;
2427
2428       sfree(tmp);
2429       this->nm_value.nm_string = NULL;
2430     }
2431     sfree(this);
2432
2433     this = next;
2434   }
2435
2436   return 0;
2437 } /* int plugin_notification_meta_free */
2438
2439 static void plugin_ctx_destructor(void *ctx) {
2440   sfree(ctx);
2441 } /* void plugin_ctx_destructor */
2442
2443 static plugin_ctx_t ctx_init = {/* interval = */ 0};
2444
2445 static plugin_ctx_t *plugin_ctx_create(void) {
2446   plugin_ctx_t *ctx;
2447
2448   ctx = malloc(sizeof(*ctx));
2449   if (ctx == NULL) {
2450     ERROR("Failed to allocate plugin context: %s", STRERRNO);
2451     return NULL;
2452   }
2453
2454   *ctx = ctx_init;
2455   assert(plugin_ctx_key_initialized);
2456   pthread_setspecific(plugin_ctx_key, ctx);
2457   DEBUG("Created new plugin context.");
2458   return ctx;
2459 } /* int plugin_ctx_create */
2460
2461 void plugin_init_ctx(void) {
2462   pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
2463   plugin_ctx_key_initialized = true;
2464 } /* void plugin_init_ctx */
2465
2466 plugin_ctx_t plugin_get_ctx(void) {
2467   plugin_ctx_t *ctx;
2468
2469   assert(plugin_ctx_key_initialized);
2470   ctx = pthread_getspecific(plugin_ctx_key);
2471
2472   if (ctx == NULL) {
2473     ctx = plugin_ctx_create();
2474     /* this must no happen -- exit() instead? */
2475     if (ctx == NULL)
2476       return ctx_init;
2477   }
2478
2479   return *ctx;
2480 } /* plugin_ctx_t plugin_get_ctx */
2481
2482 plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
2483   plugin_ctx_t *c;
2484   plugin_ctx_t old;
2485
2486   assert(plugin_ctx_key_initialized);
2487   c = pthread_getspecific(plugin_ctx_key);
2488
2489   if (c == NULL) {
2490     c = plugin_ctx_create();
2491     /* this must no happen -- exit() instead? */
2492     if (c == NULL)
2493       return ctx_init;
2494   }
2495
2496   old = *c;
2497   *c = ctx;
2498
2499   return old;
2500 } /* void plugin_set_ctx */
2501
2502 cdtime_t plugin_get_interval(void) {
2503   cdtime_t interval;
2504
2505   interval = plugin_get_ctx().interval;
2506   if (interval > 0)
2507     return interval;
2508
2509   return cf_get_default_interval();
2510 } /* cdtime_t plugin_get_interval */
2511
2512 typedef struct {
2513   plugin_ctx_t ctx;
2514   void *(*start_routine)(void *);
2515   void *arg;
2516 } plugin_thread_t;
2517
2518 static void *plugin_thread_start(void *arg) {
2519   plugin_thread_t *plugin_thread = arg;
2520
2521   void *(*start_routine)(void *) = plugin_thread->start_routine;
2522   void *plugin_arg = plugin_thread->arg;
2523
2524   plugin_set_ctx(plugin_thread->ctx);
2525
2526   sfree(plugin_thread);
2527
2528   return start_routine(plugin_arg);
2529 } /* void *plugin_thread_start */
2530
2531 int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr,
2532                          void *(*start_routine)(void *), void *arg,
2533                          char const *name) {
2534   plugin_thread_t *plugin_thread;
2535
2536   plugin_thread = malloc(sizeof(*plugin_thread));
2537   if (plugin_thread == NULL)
2538     return ENOMEM;
2539
2540   plugin_thread->ctx = plugin_get_ctx();
2541   plugin_thread->start_routine = start_routine;
2542   plugin_thread->arg = arg;
2543
2544   int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread);
2545   if (ret != 0) {
2546     sfree(plugin_thread);
2547     return ret;
2548   }
2549
2550   if (name != NULL)
2551     set_thread_name(*thread, name);
2552
2553   return 0;
2554 } /* int plugin_thread_create */