Merge remote-tracking branch 'github/pr/734'
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 #include "collectd.h"
29 #include "common.h"
30 #include "plugin.h"
31 #include "configfile.h"
32 #include "filter_chain.h"
33 #include "utils_avltree.h"
34 #include "utils_cache.h"
35 #include "utils_complain.h"
36 #include "utils_llist.h"
37 #include "utils_heap.h"
38 #include "utils_time.h"
39 #include "utils_random.h"
40
41 #if HAVE_PTHREAD_H
42 # include <pthread.h>
43 #endif
44
45 #include <ltdl.h>
46
47 /*
48  * Private structures
49  */
50 struct callback_func_s
51 {
52         void *cf_callback;
53         user_data_t cf_udata;
54         plugin_ctx_t cf_ctx;
55 };
56 typedef struct callback_func_s callback_func_t;
57
58 #define RF_SIMPLE  0
59 #define RF_COMPLEX 1
60 #define RF_REMOVE  65535
61 struct read_func_s
62 {
63         /* `read_func_t' "inherits" from `callback_func_t'.
64          * The `rf_super' member MUST be the first one in this structure! */
65 #define rf_callback rf_super.cf_callback
66 #define rf_udata rf_super.cf_udata
67 #define rf_ctx rf_super.cf_ctx
68         callback_func_t rf_super;
69         char rf_group[DATA_MAX_NAME_LEN];
70         char *rf_name;
71         int rf_type;
72         cdtime_t rf_interval;
73         cdtime_t rf_effective_interval;
74         cdtime_t rf_next_read;
75 };
76 typedef struct read_func_s read_func_t;
77
78 struct write_queue_s;
79 typedef struct write_queue_s write_queue_t;
80 struct write_queue_s
81 {
82         value_list_t *vl;
83         plugin_ctx_t ctx;
84         write_queue_t *next;
85 };
86
87 /*
88  * Private variables
89  */
90 static c_avl_tree_t *plugins_loaded = NULL;
91
92 static llist_t *list_init;
93 static llist_t *list_write;
94 static llist_t *list_flush;
95 static llist_t *list_missing;
96 static llist_t *list_shutdown;
97 static llist_t *list_log;
98 static llist_t *list_notification;
99
100 static fc_chain_t *pre_cache_chain = NULL;
101 static fc_chain_t *post_cache_chain = NULL;
102
103 static c_avl_tree_t *data_sets;
104
105 static char *plugindir = NULL;
106
107 #ifndef DEFAULT_MAX_READ_INTERVAL
108 # define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
109 #endif
110 static c_heap_t       *read_heap = NULL;
111 static llist_t        *read_list;
112 static int             read_loop = 1;
113 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
114 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
115 static pthread_t      *read_threads = NULL;
116 static int             read_threads_num = 0;
117 static cdtime_t        max_read_interval = DEFAULT_MAX_READ_INTERVAL;
118
119 static write_queue_t  *write_queue_head;
120 static write_queue_t  *write_queue_tail;
121 static long            write_queue_length = 0;
122 static _Bool           write_loop = 1;
123 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
124 static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
125 static pthread_t      *write_threads = NULL;
126 static size_t          write_threads_num = 0;
127
128 static pthread_key_t   plugin_ctx_key;
129 static _Bool           plugin_ctx_key_initialized = 0;
130
131 static long            write_limit_high = 0;
132 static long            write_limit_low = 0;
133
134 static derive_t        stats_values_dropped = 0;
135 static _Bool           record_statistics = 0;
136
137 /*
138  * Static functions
139  */
140 static int plugin_dispatch_values_internal (value_list_t *vl);
141
142 static const char *plugin_get_dir (void)
143 {
144         if (plugindir == NULL)
145                 return (PLUGINDIR);
146         else
147                 return (plugindir);
148 }
149
150 static void plugin_update_internal_statistics (void) { /* {{{ */
151         derive_t copy_write_queue_length;
152         value_list_t vl = VALUE_LIST_INIT;
153         value_t values[2];
154
155         copy_write_queue_length = write_queue_length;
156
157         /* Initialize `vl' */
158         vl.values = values;
159         vl.values_len = 2;
160         vl.time = 0;
161         sstrncpy (vl.host, hostname_g, sizeof (vl.host));
162         sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
163
164         vl.type_instance[0] = 0;
165         vl.values_len = 1;
166
167         /* Write queue */
168         sstrncpy (vl.plugin_instance, "write_queue",
169                         sizeof (vl.plugin_instance));
170
171         /* Write queue : queue length */
172         vl.values[0].gauge = (gauge_t) copy_write_queue_length;
173         sstrncpy (vl.type, "queue_length", sizeof (vl.type));
174         vl.type_instance[0] = 0;
175         plugin_dispatch_values (&vl);
176
177         /* Write queue : Values dropped (queue length > low limit) */
178         vl.values[0].derive = (derive_t) stats_values_dropped;
179         sstrncpy (vl.type, "derive", sizeof (vl.type));
180         sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
181         plugin_dispatch_values (&vl);
182
183         /* Cache */
184         sstrncpy (vl.plugin_instance, "cache",
185                         sizeof (vl.plugin_instance));
186
187         /* Cache : Nb entry in cache tree */
188         vl.values[0].gauge = (gauge_t) uc_get_size();
189         sstrncpy (vl.type, "cache_size", sizeof (vl.type));
190         vl.type_instance[0] = 0;
191         plugin_dispatch_values (&vl);
192
193         return;
194 } /* }}} void plugin_update_internal_statistics */
195
196 static void destroy_callback (callback_func_t *cf) /* {{{ */
197 {
198         if (cf == NULL)
199                 return;
200
201         if ((cf->cf_udata.data != NULL) && (cf->cf_udata.free_func != NULL))
202         {
203                 cf->cf_udata.free_func (cf->cf_udata.data);
204                 cf->cf_udata.data = NULL;
205                 cf->cf_udata.free_func = NULL;
206         }
207         sfree (cf);
208 } /* }}} void destroy_callback */
209
210 static void destroy_all_callbacks (llist_t **list) /* {{{ */
211 {
212         llentry_t *le;
213
214         if (*list == NULL)
215                 return;
216
217         le = llist_head (*list);
218         while (le != NULL)
219         {
220                 llentry_t *le_next;
221
222                 le_next = le->next;
223
224                 sfree (le->key);
225                 destroy_callback (le->value);
226                 le->value = NULL;
227
228                 le = le_next;
229         }
230
231         llist_destroy (*list);
232         *list = NULL;
233 } /* }}} void destroy_all_callbacks */
234
235 static void destroy_read_heap (void) /* {{{ */
236 {
237         if (read_heap == NULL)
238                 return;
239
240         while (42)
241         {
242                 callback_func_t *cf;
243
244                 cf = c_heap_get_root (read_heap);
245                 if (cf == NULL)
246                         break;
247
248                 destroy_callback (cf);
249         }
250
251         c_heap_destroy (read_heap);
252         read_heap = NULL;
253 } /* }}} void destroy_read_heap */
254
255 static int register_callback (llist_t **list, /* {{{ */
256                 const char *name, callback_func_t *cf)
257 {
258         llentry_t *le;
259         char *key;
260
261         if (*list == NULL)
262         {
263                 *list = llist_create ();
264                 if (*list == NULL)
265                 {
266                         ERROR ("plugin: register_callback: "
267                                         "llist_create failed.");
268                         destroy_callback (cf);
269                         return (-1);
270                 }
271         }
272
273         key = strdup (name);
274         if (key == NULL)
275         {
276                 ERROR ("plugin: register_callback: strdup failed.");
277                 destroy_callback (cf);
278                 return (-1);
279         }
280
281         le = llist_search (*list, name);
282         if (le == NULL)
283         {
284                 le = llentry_create (key, cf);
285                 if (le == NULL)
286                 {
287                         ERROR ("plugin: register_callback: "
288                                         "llentry_create failed.");
289                         free (key);
290                         destroy_callback (cf);
291                         return (-1);
292                 }
293
294                 llist_append (*list, le);
295         }
296         else
297         {
298                 callback_func_t *old_cf;
299
300                 old_cf = le->value;
301                 le->value = cf;
302
303                 WARNING ("plugin: register_callback: "
304                                 "a callback named `%s' already exists - "
305                                 "overwriting the old entry!", name);
306
307                 destroy_callback (old_cf);
308                 sfree (key);
309         }
310
311         return (0);
312 } /* }}} int register_callback */
313
314 static int create_register_callback (llist_t **list, /* {{{ */
315                 const char *name, void *callback, user_data_t *ud)
316 {
317         callback_func_t *cf;
318
319         cf = (callback_func_t *) malloc (sizeof (*cf));
320         if (cf == NULL)
321         {
322                 ERROR ("plugin: create_register_callback: malloc failed.");
323                 return (-1);
324         }
325         memset (cf, 0, sizeof (*cf));
326
327         cf->cf_callback = callback;
328         if (ud == NULL)
329         {
330                 cf->cf_udata.data = NULL;
331                 cf->cf_udata.free_func = NULL;
332         }
333         else
334         {
335                 cf->cf_udata = *ud;
336         }
337
338         cf->cf_ctx = plugin_get_ctx ();
339
340         return (register_callback (list, name, cf));
341 } /* }}} int create_register_callback */
342
343 static int plugin_unregister (llist_t *list, const char *name) /* {{{ */
344 {
345         llentry_t *e;
346
347         if (list == NULL)
348                 return (-1);
349
350         e = llist_search (list, name);
351         if (e == NULL)
352                 return (-1);
353
354         llist_remove (list, e);
355
356         sfree (e->key);
357         destroy_callback (e->value);
358
359         llentry_destroy (e);
360
361         return (0);
362 } /* }}} int plugin_unregister */
363
364 /*
365  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
366  * object, but it will bitch about a shared object not having a
367  * ``module_register'' symbol..
368  */
369 static int plugin_load_file (char *file, uint32_t flags)
370 {
371         lt_dlhandle dlh;
372         void (*reg_handle) (void);
373
374         lt_dlinit ();
375         lt_dlerror (); /* clear errors */
376
377 #if LIBTOOL_VERSION == 2
378         if (flags & PLUGIN_FLAGS_GLOBAL) {
379                 lt_dladvise advise;
380                 lt_dladvise_init(&advise);
381                 lt_dladvise_global(&advise);
382                 dlh = lt_dlopenadvise(file, advise);
383                 lt_dladvise_destroy(&advise);
384         } else {
385                 dlh = lt_dlopen (file);
386         }
387 #else /* if LIBTOOL_VERSION == 1 */
388         if (flags & PLUGIN_FLAGS_GLOBAL)
389                 WARNING ("plugin_load_file: The global flag is not supported, "
390                                 "libtool 2 is required for this.");
391         dlh = lt_dlopen (file);
392 #endif
393
394         if (dlh == NULL)
395         {
396                 char errbuf[1024] = "";
397
398                 ssnprintf (errbuf, sizeof (errbuf),
399                                 "lt_dlopen (\"%s\") failed: %s. "
400                                 "The most common cause for this problem are "
401                                 "missing dependencies. Use ldd(1) to check "
402                                 "the dependencies of the plugin "
403                                 "/ shared object.",
404                                 file, lt_dlerror ());
405
406                 ERROR ("%s", errbuf);
407                 /* Make sure this is printed to STDERR in any case, but also
408                  * make sure it's printed only once. */
409                 if (list_log != NULL)
410                         fprintf (stderr, "ERROR: %s\n", errbuf);
411
412                 return (1);
413         }
414
415         if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL)
416         {
417                 WARNING ("Couldn't find symbol \"module_register\" in \"%s\": %s\n",
418                                 file, lt_dlerror ());
419                 lt_dlclose (dlh);
420                 return (-1);
421         }
422
423         (*reg_handle) ();
424
425         return (0);
426 }
427
428 static void *plugin_read_thread (void __attribute__((unused)) *args)
429 {
430         while (read_loop != 0)
431         {
432                 read_func_t *rf;
433                 plugin_ctx_t old_ctx;
434                 cdtime_t now;
435                 int status;
436                 int rf_type;
437                 int rc;
438
439                 /* Get the read function that needs to be read next.
440                  * We don't need to hold "read_lock" for the heap, but we need
441                  * to call c_heap_get_root() and pthread_cond_wait() in the
442                  * same protected block. */
443                 pthread_mutex_lock (&read_lock);
444                 rf = c_heap_get_root (read_heap);
445                 if (rf == NULL)
446                 {
447                         pthread_cond_wait (&read_cond, &read_lock);
448                         pthread_mutex_unlock (&read_lock);
449                         continue;
450                 }
451                 pthread_mutex_unlock (&read_lock);
452
453                 if (rf->rf_interval == 0)
454                 {
455                         /* this should not happen, because the interval is set
456                          * for each plugin when loading it
457                          * XXX: issue a warning? */
458                         rf->rf_interval = plugin_get_interval ();
459                         rf->rf_effective_interval = rf->rf_interval;
460
461                         rf->rf_next_read = cdtime ();
462                 }
463
464                 /* sleep until this entry is due,
465                  * using pthread_cond_timedwait */
466                 pthread_mutex_lock (&read_lock);
467                 /* In pthread_cond_timedwait, spurious wakeups are possible
468                  * (and really happen, at least on NetBSD with > 1 CPU), thus
469                  * we need to re-evaluate the condition every time
470                  * pthread_cond_timedwait returns. */
471                 rc = 0;
472                 while ((read_loop != 0)
473                                 && (cdtime () < rf->rf_next_read)
474                                 && rc == 0)
475                 {
476                         struct timespec ts = { 0 };
477
478                         CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
479
480                         rc = pthread_cond_timedwait (&read_cond, &read_lock,
481                                 &ts);
482                 }
483
484                 /* Must hold `read_lock' when accessing `rf->rf_type'. */
485                 rf_type = rf->rf_type;
486                 pthread_mutex_unlock (&read_lock);
487
488                 /* Check if we're supposed to stop.. This may have interrupted
489                  * the sleep, too. */
490                 if (read_loop == 0)
491                 {
492                         /* Insert `rf' again, so it can be free'd correctly */
493                         c_heap_insert (read_heap, rf);
494                         break;
495                 }
496
497                 /* The entry has been marked for deletion. The linked list
498                  * entry has already been removed by `plugin_unregister_read'.
499                  * All we have to do here is free the `read_func_t' and
500                  * continue. */
501                 if (rf_type == RF_REMOVE)
502                 {
503                         DEBUG ("plugin_read_thread: Destroying the `%s' "
504                                         "callback.", rf->rf_name);
505                         sfree (rf->rf_name);
506                         destroy_callback ((callback_func_t *) rf);
507                         rf = NULL;
508                         continue;
509                 }
510
511                 DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
512
513                 old_ctx = plugin_set_ctx (rf->rf_ctx);
514
515                 if (rf_type == RF_SIMPLE)
516                 {
517                         int (*callback) (void);
518
519                         callback = rf->rf_callback;
520                         status = (*callback) ();
521                 }
522                 else
523                 {
524                         plugin_read_cb callback;
525
526                         assert (rf_type == RF_COMPLEX);
527
528                         callback = rf->rf_callback;
529                         status = (*callback) (&rf->rf_udata);
530                 }
531
532                 plugin_set_ctx (old_ctx);
533
534                 /* If the function signals failure, we will increase the
535                  * intervals in which it will be called. */
536                 if (status != 0)
537                 {
538                         rf->rf_effective_interval *= 2;
539                         if (rf->rf_effective_interval > max_read_interval)
540                                 rf->rf_effective_interval = max_read_interval;
541
542                         NOTICE ("read-function of plugin `%s' failed. "
543                                         "Will suspend it for %.3f seconds.",
544                                         rf->rf_name,
545                                         CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
546                 }
547                 else
548                 {
549                         /* Success: Restore the interval, if it was changed. */
550                         rf->rf_effective_interval = rf->rf_interval;
551                 }
552
553                 /* update the ``next read due'' field */
554                 now = cdtime ();
555
556                 DEBUG ("plugin_read_thread: Effective interval of the "
557                                 "%s plugin is %.3f seconds.",
558                                 rf->rf_name,
559                                 CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
560
561                 /* Calculate the next (absolute) time at which this function
562                  * should be called. */
563                 rf->rf_next_read += rf->rf_effective_interval;
564
565                 /* Check, if `rf_next_read' is in the past. */
566                 if (rf->rf_next_read < now)
567                 {
568                         /* `rf_next_read' is in the past. Insert `now'
569                          * so this value doesn't trail off into the
570                          * past too much. */
571                         rf->rf_next_read = now;
572                 }
573
574                 DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.",
575                                 rf->rf_name,
576                                 CDTIME_T_TO_DOUBLE (rf->rf_next_read));
577
578                 /* Re-insert this read function into the heap again. */
579                 c_heap_insert (read_heap, rf);
580         } /* while (read_loop) */
581
582         pthread_exit (NULL);
583         return ((void *) 0);
584 } /* void *plugin_read_thread */
585
586 static void start_read_threads (int num)
587 {
588         int i;
589
590         if (read_threads != NULL)
591                 return;
592
593         read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
594         if (read_threads == NULL)
595         {
596                 ERROR ("plugin: start_read_threads: calloc failed.");
597                 return;
598         }
599
600         read_threads_num = 0;
601         for (i = 0; i < num; i++)
602         {
603                 if (pthread_create (read_threads + read_threads_num, NULL,
604                                         plugin_read_thread, NULL) == 0)
605                 {
606                         read_threads_num++;
607                 }
608                 else
609                 {
610                         ERROR ("plugin: start_read_threads: pthread_create failed.");
611                         return;
612                 }
613         } /* for (i) */
614 } /* void start_read_threads */
615
616 static void stop_read_threads (void)
617 {
618         int i;
619
620         if (read_threads == NULL)
621                 return;
622
623         INFO ("collectd: Stopping %i read threads.", read_threads_num);
624
625         pthread_mutex_lock (&read_lock);
626         read_loop = 0;
627         DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
628         pthread_cond_broadcast (&read_cond);
629         pthread_mutex_unlock (&read_lock);
630
631         for (i = 0; i < read_threads_num; i++)
632         {
633                 if (pthread_join (read_threads[i], NULL) != 0)
634                 {
635                         ERROR ("plugin: stop_read_threads: pthread_join failed.");
636                 }
637                 read_threads[i] = (pthread_t) 0;
638         }
639         sfree (read_threads);
640         read_threads_num = 0;
641 } /* void stop_read_threads */
642
643 static void plugin_value_list_free (value_list_t *vl) /* {{{ */
644 {
645         if (vl == NULL)
646                 return;
647
648         meta_data_destroy (vl->meta);
649         sfree (vl->values);
650         sfree (vl);
651 } /* }}} void plugin_value_list_free */
652
653 static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
654 {
655         value_list_t *vl;
656
657         if (vl_orig == NULL)
658                 return (NULL);
659
660         vl = malloc (sizeof (*vl));
661         if (vl == NULL)
662                 return (NULL);
663         memcpy (vl, vl_orig, sizeof (*vl));
664
665         vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
666         if (vl->values == NULL)
667         {
668                 plugin_value_list_free (vl);
669                 return (NULL);
670         }
671         memcpy (vl->values, vl_orig->values,
672                         vl_orig->values_len * sizeof (*vl->values));
673
674         vl->meta = meta_data_clone (vl->meta);
675         if ((vl_orig->meta != NULL) && (vl->meta == NULL))
676         {
677                 plugin_value_list_free (vl);
678                 return (NULL);
679         }
680
681         if (vl->time == 0)
682                 vl->time = cdtime ();
683
684         /* Fill in the interval from the thread context, if it is zero. */
685         if (vl->interval == 0)
686         {
687                 plugin_ctx_t ctx = plugin_get_ctx ();
688
689                 if (ctx.interval != 0)
690                         vl->interval = ctx.interval;
691                 else
692                 {
693                         char name[6 * DATA_MAX_NAME_LEN];
694                         FORMAT_VL (name, sizeof (name), vl);
695                         ERROR ("plugin_value_list_clone: Unable to determine "
696                                         "interval from context for "
697                                         "value list \"%s\". "
698                                         "This indicates a broken plugin. "
699                                         "Please report this problem to the "
700                                         "collectd mailing list or at "
701                                         "<http://collectd.org/bugs/>.", name);
702                         vl->interval = cf_get_default_interval ();
703                 }
704         }
705
706         return (vl);
707 } /* }}} value_list_t *plugin_value_list_clone */
708
709 static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
710 {
711         write_queue_t *q;
712
713         q = malloc (sizeof (*q));
714         if (q == NULL)
715                 return (ENOMEM);
716         q->next = NULL;
717
718         q->vl = plugin_value_list_clone (vl);
719         if (q->vl == NULL)
720         {
721                 sfree (q);
722                 return (ENOMEM);
723         }
724
725         /* Store context of caller (read plugin); otherwise, it would not be
726          * available to the write plugins when actually dispatching the
727          * value-list later on. */
728         q->ctx = plugin_get_ctx ();
729
730         pthread_mutex_lock (&write_lock);
731
732         if (write_queue_tail == NULL)
733         {
734                 write_queue_head = q;
735                 write_queue_tail = q;
736                 write_queue_length = 1;
737         }
738         else
739         {
740                 write_queue_tail->next = q;
741                 write_queue_tail = q;
742                 write_queue_length += 1;
743         }
744
745         pthread_cond_signal (&write_cond);
746         pthread_mutex_unlock (&write_lock);
747
748         return (0);
749 } /* }}} int plugin_write_enqueue */
750
751 static value_list_t *plugin_write_dequeue (void) /* {{{ */
752 {
753         write_queue_t *q;
754         value_list_t *vl;
755
756         pthread_mutex_lock (&write_lock);
757
758         while (write_loop && (write_queue_head == NULL))
759                 pthread_cond_wait (&write_cond, &write_lock);
760
761         if (write_queue_head == NULL)
762         {
763                 pthread_mutex_unlock (&write_lock);
764                 return (NULL);
765         }
766
767         q = write_queue_head;
768         write_queue_head = q->next;
769         write_queue_length -= 1;
770         if (write_queue_head == NULL) {
771                 write_queue_tail = NULL;
772                 assert(0 == write_queue_length);
773                 }
774
775         pthread_mutex_unlock (&write_lock);
776
777         (void) plugin_set_ctx (q->ctx);
778
779         vl = q->vl;
780         sfree (q);
781         return (vl);
782 } /* }}} value_list_t *plugin_write_dequeue */
783
784 static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
785 {
786         while (write_loop)
787         {
788                 value_list_t *vl = plugin_write_dequeue ();
789                 if (vl == NULL)
790                         continue;
791
792                 plugin_dispatch_values_internal (vl);
793
794                 plugin_value_list_free (vl);
795         }
796
797         pthread_exit (NULL);
798         return ((void *) 0);
799 } /* }}} void *plugin_write_thread */
800
801 static void start_write_threads (size_t num) /* {{{ */
802 {
803         size_t i;
804
805         if (write_threads != NULL)
806                 return;
807
808         write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
809         if (write_threads == NULL)
810         {
811                 ERROR ("plugin: start_write_threads: calloc failed.");
812                 return;
813         }
814
815         write_threads_num = 0;
816         for (i = 0; i < num; i++)
817         {
818                 int status;
819
820                 status = pthread_create (write_threads + write_threads_num,
821                                 /* attr = */ NULL,
822                                 plugin_write_thread,
823                                 /* arg = */ NULL);
824                 if (status != 0)
825                 {
826                         char errbuf[1024];
827                         ERROR ("plugin: start_write_threads: pthread_create failed "
828                                         "with status %i (%s).", status,
829                                         sstrerror (status, errbuf, sizeof (errbuf)));
830                         return;
831                 }
832
833                 write_threads_num++;
834         } /* for (i) */
835 } /* }}} void start_write_threads */
836
837 static void stop_write_threads (void) /* {{{ */
838 {
839         write_queue_t *q;
840         int i;
841
842         if (write_threads == NULL)
843                 return;
844
845         INFO ("collectd: Stopping %zu write threads.", write_threads_num);
846
847         pthread_mutex_lock (&write_lock);
848         write_loop = 0;
849         DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
850         pthread_cond_broadcast (&write_cond);
851         pthread_mutex_unlock (&write_lock);
852
853         for (i = 0; i < write_threads_num; i++)
854         {
855                 if (pthread_join (write_threads[i], NULL) != 0)
856                 {
857                         ERROR ("plugin: stop_write_threads: pthread_join failed.");
858                 }
859                 write_threads[i] = (pthread_t) 0;
860         }
861         sfree (write_threads);
862         write_threads_num = 0;
863
864         pthread_mutex_lock (&write_lock);
865         i = 0;
866         for (q = write_queue_head; q != NULL; )
867         {
868                 write_queue_t *q1 = q;
869                 plugin_value_list_free (q->vl);
870                 q = q->next;
871                 sfree (q1);
872                 i++;
873         }
874         write_queue_head = NULL;
875         write_queue_tail = NULL;
876         write_queue_length = 0;
877         pthread_mutex_unlock (&write_lock);
878
879         if (i > 0)
880         {
881                 WARNING ("plugin: %i value list%s left after shutting down "
882                                 "the write threads.",
883                                 i, (i == 1) ? " was" : "s were");
884         }
885 } /* }}} void stop_write_threads */
886
887 /*
888  * Public functions
889  */
890 void plugin_set_dir (const char *dir)
891 {
892         if (plugindir != NULL)
893                 free (plugindir);
894
895         if (dir == NULL)
896                 plugindir = NULL;
897         else if ((plugindir = strdup (dir)) == NULL)
898         {
899                 char errbuf[1024];
900                 ERROR ("strdup failed: %s",
901                                 sstrerror (errno, errbuf, sizeof (errbuf)));
902         }
903 }
904
905 static _Bool plugin_is_loaded (char const *name)
906 {
907         int status;
908
909         if (plugins_loaded == NULL)
910                 plugins_loaded = c_avl_create ((void *) strcasecmp);
911         assert (plugins_loaded != NULL);
912
913         status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
914         return (status == 0);
915 }
916
917 static int plugin_mark_loaded (char const *name)
918 {
919         char *name_copy;
920         int status;
921
922         name_copy = strdup (name);
923         if (name_copy == NULL)
924                 return (ENOMEM);
925
926         status = c_avl_insert (plugins_loaded,
927                         /* key = */ name_copy, /* value = */ NULL);
928         return (status);
929 }
930
931 static void plugin_free_loaded ()
932 {
933         void *key;
934         void *value;
935
936         if (plugins_loaded == NULL)
937                 return;
938
939         while (c_avl_pick (plugins_loaded, &key, &value) == 0)
940         {
941                 sfree (key);
942                 assert (value == NULL);
943         }
944
945         c_avl_destroy (plugins_loaded);
946         plugins_loaded = NULL;
947 }
948
949 #define BUFSIZE 512
950 int plugin_load (char const *plugin_name, uint32_t flags)
951 {
952         DIR  *dh;
953         const char *dir;
954         char  filename[BUFSIZE] = "";
955         char  typename[BUFSIZE];
956         int   typename_len;
957         int   ret;
958         struct stat    statbuf;
959         struct dirent *de;
960         int status;
961
962         if (plugin_name == NULL)
963                 return (EINVAL);
964
965         /* Check if plugin is already loaded and don't do anything in this
966          * case. */
967         if (plugin_is_loaded (plugin_name))
968                 return (0);
969
970         dir = plugin_get_dir ();
971         ret = 1;
972
973         /*
974          * XXX: Magic at work:
975          *
976          * Some of the language bindings, for example the Python and Perl
977          * plugins, need to be able to export symbols to the scripts they run.
978          * For this to happen, the "Globals" flag needs to be set.
979          * Unfortunately, this technical detail is hard to explain to the
980          * average user and she shouldn't have to worry about this, ideally.
981          * So in order to save everyone's sanity use a different default for a
982          * handful of special plugins. --octo
983          */
984         if ((strcasecmp ("perl", plugin_name) == 0)
985                         || (strcasecmp ("python", plugin_name) == 0))
986                 flags |= PLUGIN_FLAGS_GLOBAL;
987
988         /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
989          * type when matching the filename */
990         status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
991         if ((status < 0) || ((size_t) status >= sizeof (typename)))
992         {
993                 WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
994                 return (-1);
995         }
996         typename_len = strlen (typename);
997
998         if ((dh = opendir (dir)) == NULL)
999         {
1000                 char errbuf[1024];
1001                 ERROR ("plugin_load: opendir (%s) failed: %s", dir,
1002                                 sstrerror (errno, errbuf, sizeof (errbuf)));
1003                 return (-1);
1004         }
1005
1006         while ((de = readdir (dh)) != NULL)
1007         {
1008                 if (strncasecmp (de->d_name, typename, typename_len))
1009                         continue;
1010
1011                 status = ssnprintf (filename, sizeof (filename),
1012                                 "%s/%s", dir, de->d_name);
1013                 if ((status < 0) || ((size_t) status >= sizeof (filename)))
1014                 {
1015                         WARNING ("plugin_load: Filename too long: \"%s/%s\"",
1016                                         dir, de->d_name);
1017                         continue;
1018                 }
1019
1020                 if (lstat (filename, &statbuf) == -1)
1021                 {
1022                         char errbuf[1024];
1023                         WARNING ("plugin_load: stat (\"%s\") failed: %s",
1024                                         filename,
1025                                         sstrerror (errno, errbuf, sizeof (errbuf)));
1026                         continue;
1027                 }
1028                 else if (!S_ISREG (statbuf.st_mode))
1029                 {
1030                         /* don't follow symlinks */
1031                         WARNING ("plugin_load: %s is not a regular file.",
1032                                         filename);
1033                         continue;
1034                 }
1035
1036                 status = plugin_load_file (filename, flags);
1037                 if (status == 0)
1038                 {
1039                         /* success */
1040                         plugin_mark_loaded (plugin_name);
1041                         ret = 0;
1042                         break;
1043                 }
1044                 else
1045                 {
1046                         ERROR ("plugin_load: Load plugin \"%s\" failed with "
1047                                         "status %i.", plugin_name, status);
1048                 }
1049         }
1050
1051         closedir (dh);
1052
1053         if (filename[0] == 0)
1054                 ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
1055                                 plugin_name, dir);
1056
1057         return (ret);
1058 }
1059
1060 /*
1061  * The `register_*' functions follow
1062  */
1063 int plugin_register_config (const char *name,
1064                 int (*callback) (const char *key, const char *val),
1065                 const char **keys, int keys_num)
1066 {
1067         cf_register (name, callback, keys, keys_num);
1068         return (0);
1069 } /* int plugin_register_config */
1070
1071 int plugin_register_complex_config (const char *type,
1072                 int (*callback) (oconfig_item_t *))
1073 {
1074         return (cf_register_complex (type, callback));
1075 } /* int plugin_register_complex_config */
1076
1077 int plugin_register_init (const char *name,
1078                 int (*callback) (void))
1079 {
1080         return (create_register_callback (&list_init, name, (void *) callback,
1081                                 /* user_data = */ NULL));
1082 } /* plugin_register_init */
1083
1084 static int plugin_compare_read_func (const void *arg0, const void *arg1)
1085 {
1086         const read_func_t *rf0;
1087         const read_func_t *rf1;
1088
1089         rf0 = arg0;
1090         rf1 = arg1;
1091
1092         if (rf0->rf_next_read < rf1->rf_next_read)
1093                 return (-1);
1094         else if (rf0->rf_next_read > rf1->rf_next_read)
1095                 return (1);
1096         else
1097                 return (0);
1098 } /* int plugin_compare_read_func */
1099
1100 /* Add a read function to both, the heap and a linked list. The linked list if
1101  * used to look-up read functions, especially for the remove function. The heap
1102  * is used to determine which plugin to read next. */
1103 static int plugin_insert_read (read_func_t *rf)
1104 {
1105         int status;
1106         llentry_t *le;
1107
1108         rf->rf_next_read = cdtime ();
1109         rf->rf_effective_interval = rf->rf_interval;
1110
1111         pthread_mutex_lock (&read_lock);
1112
1113         if (read_list == NULL)
1114         {
1115                 read_list = llist_create ();
1116                 if (read_list == NULL)
1117                 {
1118                         pthread_mutex_unlock (&read_lock);
1119                         ERROR ("plugin_insert_read: read_list failed.");
1120                         return (-1);
1121                 }
1122         }
1123
1124         if (read_heap == NULL)
1125         {
1126                 read_heap = c_heap_create (plugin_compare_read_func);
1127                 if (read_heap == NULL)
1128                 {
1129                         pthread_mutex_unlock (&read_lock);
1130                         ERROR ("plugin_insert_read: c_heap_create failed.");
1131                         return (-1);
1132                 }
1133         }
1134
1135         le = llist_search (read_list, rf->rf_name);
1136         if (le != NULL)
1137         {
1138                 pthread_mutex_unlock (&read_lock);
1139                 WARNING ("The read function \"%s\" is already registered. "
1140                                 "Check for duplicate \"LoadPlugin\" lines "
1141                                 "in your configuration!",
1142                                 rf->rf_name);
1143                 return (EINVAL);
1144         }
1145
1146         le = llentry_create (rf->rf_name, rf);
1147         if (le == NULL)
1148         {
1149                 pthread_mutex_unlock (&read_lock);
1150                 ERROR ("plugin_insert_read: llentry_create failed.");
1151                 return (-1);
1152         }
1153
1154         status = c_heap_insert (read_heap, rf);
1155         if (status != 0)
1156         {
1157                 pthread_mutex_unlock (&read_lock);
1158                 ERROR ("plugin_insert_read: c_heap_insert failed.");
1159                 llentry_destroy (le);
1160                 return (-1);
1161         }
1162
1163         /* This does not fail. */
1164         llist_append (read_list, le);
1165
1166         /* Wake up all the read threads. */
1167         pthread_cond_broadcast (&read_cond);
1168         pthread_mutex_unlock (&read_lock);
1169         return (0);
1170 } /* int plugin_insert_read */
1171
1172 int plugin_register_read (const char *name,
1173                 int (*callback) (void))
1174 {
1175         read_func_t *rf;
1176         int status;
1177
1178         rf = malloc (sizeof (*rf));
1179         if (rf == NULL)
1180         {
1181                 ERROR ("plugin_register_read: malloc failed.");
1182                 return (ENOMEM);
1183         }
1184
1185         memset (rf, 0, sizeof (read_func_t));
1186         rf->rf_callback = (void *) callback;
1187         rf->rf_udata.data = NULL;
1188         rf->rf_udata.free_func = NULL;
1189         rf->rf_ctx = plugin_get_ctx ();
1190         rf->rf_group[0] = '\0';
1191         rf->rf_name = strdup (name);
1192         rf->rf_type = RF_SIMPLE;
1193         rf->rf_interval = plugin_get_interval ();
1194
1195         status = plugin_insert_read (rf);
1196         if (status != 0)
1197                 sfree (rf);
1198
1199         return (status);
1200 } /* int plugin_register_read */
1201
1202 int plugin_register_complex_read (const char *group, const char *name,
1203                 plugin_read_cb callback,
1204                 const struct timespec *interval,
1205                 user_data_t *user_data)
1206 {
1207         read_func_t *rf;
1208         int status;
1209
1210         rf = malloc (sizeof (*rf));
1211         if (rf == NULL)
1212         {
1213                 ERROR ("plugin_register_complex_read: malloc failed.");
1214                 return (ENOMEM);
1215         }
1216
1217         memset (rf, 0, sizeof (read_func_t));
1218         rf->rf_callback = (void *) callback;
1219         if (group != NULL)
1220                 sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
1221         else
1222                 rf->rf_group[0] = '\0';
1223         rf->rf_name = strdup (name);
1224         rf->rf_type = RF_COMPLEX;
1225         if (interval != NULL)
1226                 rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
1227         else
1228                 rf->rf_interval = plugin_get_interval ();
1229
1230         /* Set user data */
1231         if (user_data == NULL)
1232         {
1233                 rf->rf_udata.data = NULL;
1234                 rf->rf_udata.free_func = NULL;
1235         }
1236         else
1237         {
1238                 rf->rf_udata = *user_data;
1239         }
1240
1241         rf->rf_ctx = plugin_get_ctx ();
1242
1243         status = plugin_insert_read (rf);
1244         if (status != 0)
1245                 sfree (rf);
1246
1247         return (status);
1248 } /* int plugin_register_complex_read */
1249
1250 int plugin_register_write (const char *name,
1251                 plugin_write_cb callback, user_data_t *ud)
1252 {
1253         return (create_register_callback (&list_write, name,
1254                                 (void *) callback, ud));
1255 } /* int plugin_register_write */
1256
1257 int plugin_register_flush (const char *name,
1258                 plugin_flush_cb callback, user_data_t *ud)
1259 {
1260         return (create_register_callback (&list_flush, name,
1261                                 (void *) callback, ud));
1262 } /* int plugin_register_flush */
1263
1264 int plugin_register_missing (const char *name,
1265                 plugin_missing_cb callback, user_data_t *ud)
1266 {
1267         return (create_register_callback (&list_missing, name,
1268                                 (void *) callback, ud));
1269 } /* int plugin_register_missing */
1270
1271 int plugin_register_shutdown (const char *name,
1272                 int (*callback) (void))
1273 {
1274         return (create_register_callback (&list_shutdown, name,
1275                                 (void *) callback, /* user_data = */ NULL));
1276 } /* int plugin_register_shutdown */
1277
1278 static void plugin_free_data_sets (void)
1279 {
1280         void *key;
1281         void *value;
1282
1283         if (data_sets == NULL)
1284                 return;
1285
1286         while (c_avl_pick (data_sets, &key, &value) == 0)
1287         {
1288                 data_set_t *ds = value;
1289                 /* key is a pointer to ds->type */
1290
1291                 sfree (ds->ds);
1292                 sfree (ds);
1293         }
1294
1295         c_avl_destroy (data_sets);
1296         data_sets = NULL;
1297 } /* void plugin_free_data_sets */
1298
1299 int plugin_register_data_set (const data_set_t *ds)
1300 {
1301         data_set_t *ds_copy;
1302         int i;
1303
1304         if ((data_sets != NULL)
1305                         && (c_avl_get (data_sets, ds->type, NULL) == 0))
1306         {
1307                 NOTICE ("Replacing DS `%s' with another version.", ds->type);
1308                 plugin_unregister_data_set (ds->type);
1309         }
1310         else if (data_sets == NULL)
1311         {
1312                 data_sets = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1313                 if (data_sets == NULL)
1314                         return (-1);
1315         }
1316
1317         ds_copy = (data_set_t *) malloc (sizeof (data_set_t));
1318         if (ds_copy == NULL)
1319                 return (-1);
1320         memcpy(ds_copy, ds, sizeof (data_set_t));
1321
1322         ds_copy->ds = (data_source_t *) malloc (sizeof (data_source_t)
1323                         * ds->ds_num);
1324         if (ds_copy->ds == NULL)
1325         {
1326                 free (ds_copy);
1327                 return (-1);
1328         }
1329
1330         for (i = 0; i < ds->ds_num; i++)
1331                 memcpy (ds_copy->ds + i, ds->ds + i, sizeof (data_source_t));
1332
1333         return (c_avl_insert (data_sets, (void *) ds_copy->type, (void *) ds_copy));
1334 } /* int plugin_register_data_set */
1335
1336 int plugin_register_log (const char *name,
1337                 plugin_log_cb callback, user_data_t *ud)
1338 {
1339         return (create_register_callback (&list_log, name,
1340                                 (void *) callback, ud));
1341 } /* int plugin_register_log */
1342
1343 int plugin_register_notification (const char *name,
1344                 plugin_notification_cb callback, user_data_t *ud)
1345 {
1346         return (create_register_callback (&list_notification, name,
1347                                 (void *) callback, ud));
1348 } /* int plugin_register_log */
1349
1350 int plugin_unregister_config (const char *name)
1351 {
1352         cf_unregister (name);
1353         return (0);
1354 } /* int plugin_unregister_config */
1355
1356 int plugin_unregister_complex_config (const char *name)
1357 {
1358         cf_unregister_complex (name);
1359         return (0);
1360 } /* int plugin_unregister_complex_config */
1361
1362 int plugin_unregister_init (const char *name)
1363 {
1364         return (plugin_unregister (list_init, name));
1365 }
1366
1367 int plugin_unregister_read (const char *name) /* {{{ */
1368 {
1369         llentry_t *le;
1370         read_func_t *rf;
1371
1372         if (name == NULL)
1373                 return (-ENOENT);
1374
1375         pthread_mutex_lock (&read_lock);
1376
1377         if (read_list == NULL)
1378         {
1379                 pthread_mutex_unlock (&read_lock);
1380                 return (-ENOENT);
1381         }
1382
1383         le = llist_search (read_list, name);
1384         if (le == NULL)
1385         {
1386                 pthread_mutex_unlock (&read_lock);
1387                 WARNING ("plugin_unregister_read: No such read function: %s",
1388                                 name);
1389                 return (-ENOENT);
1390         }
1391
1392         llist_remove (read_list, le);
1393
1394         rf = le->value;
1395         assert (rf != NULL);
1396         rf->rf_type = RF_REMOVE;
1397
1398         pthread_mutex_unlock (&read_lock);
1399
1400         llentry_destroy (le);
1401
1402         DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
1403
1404         return (0);
1405 } /* }}} int plugin_unregister_read */
1406
1407 static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */
1408 {
1409         read_func_t *rf    = e->value;
1410         char        *group = ud;
1411
1412         return strcmp (rf->rf_group, (const char *)group);
1413 } /* }}} int compare_read_func_group */
1414
1415 int plugin_unregister_read_group (const char *group) /* {{{ */
1416 {
1417         llentry_t *le;
1418         read_func_t *rf;
1419
1420         int found = 0;
1421
1422         if (group == NULL)
1423                 return (-ENOENT);
1424
1425         pthread_mutex_lock (&read_lock);
1426
1427         if (read_list == NULL)
1428         {
1429                 pthread_mutex_unlock (&read_lock);
1430                 return (-ENOENT);
1431         }
1432
1433         while (42)
1434         {
1435                 le = llist_search_custom (read_list,
1436                                 compare_read_func_group, (void *)group);
1437
1438                 if (le == NULL)
1439                         break;
1440
1441                 ++found;
1442
1443                 llist_remove (read_list, le);
1444
1445                 rf = le->value;
1446                 assert (rf != NULL);
1447                 rf->rf_type = RF_REMOVE;
1448
1449                 llentry_destroy (le);
1450
1451                 DEBUG ("plugin_unregister_read_group: "
1452                                 "Marked `%s' (group `%s') for removal.",
1453                                 rf->rf_name, group);
1454         }
1455
1456         pthread_mutex_unlock (&read_lock);
1457
1458         if (found == 0)
1459         {
1460                 WARNING ("plugin_unregister_read_group: No such "
1461                                 "group of read function: %s", group);
1462                 return (-ENOENT);
1463         }
1464
1465         return (0);
1466 } /* }}} int plugin_unregister_read_group */
1467
1468 int plugin_unregister_write (const char *name)
1469 {
1470         return (plugin_unregister (list_write, name));
1471 }
1472
1473 int plugin_unregister_flush (const char *name)
1474 {
1475         return (plugin_unregister (list_flush, name));
1476 }
1477
1478 int plugin_unregister_missing (const char *name)
1479 {
1480         return (plugin_unregister (list_missing, name));
1481 }
1482
1483 int plugin_unregister_shutdown (const char *name)
1484 {
1485         return (plugin_unregister (list_shutdown, name));
1486 }
1487
1488 int plugin_unregister_data_set (const char *name)
1489 {
1490         data_set_t *ds;
1491
1492         if (data_sets == NULL)
1493                 return (-1);
1494
1495         if (c_avl_remove (data_sets, name, NULL, (void *) &ds) != 0)
1496                 return (-1);
1497
1498         sfree (ds->ds);
1499         sfree (ds);
1500
1501         return (0);
1502 } /* int plugin_unregister_data_set */
1503
1504 int plugin_unregister_log (const char *name)
1505 {
1506         return (plugin_unregister (list_log, name));
1507 }
1508
1509 int plugin_unregister_notification (const char *name)
1510 {
1511         return (plugin_unregister (list_notification, name));
1512 }
1513
1514 void plugin_init_all (void)
1515 {
1516         char const *chain_name;
1517         long write_threads_num;
1518         llentry_t *le;
1519         int status;
1520
1521         /* Init the value cache */
1522         uc_init ();
1523
1524         if (IS_TRUE (global_option_get ("CollectInternalStats")))
1525                 record_statistics = 1;
1526
1527         chain_name = global_option_get ("PreCacheChain");
1528         pre_cache_chain = fc_chain_get_by_name (chain_name);
1529
1530         chain_name = global_option_get ("PostCacheChain");
1531         post_cache_chain = fc_chain_get_by_name (chain_name);
1532
1533         write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
1534                         /* default = */ 0);
1535         if (write_limit_high < 0)
1536         {
1537                 ERROR ("WriteQueueLimitHigh must be positive or zero.");
1538                 write_limit_high = 0;
1539         }
1540
1541         write_limit_low = global_option_get_long ("WriteQueueLimitLow",
1542                         /* default = */ write_limit_high / 2);
1543         if (write_limit_low < 0)
1544         {
1545                 ERROR ("WriteQueueLimitLow must be positive or zero.");
1546                 write_limit_low = write_limit_high / 2;
1547         }
1548         else if (write_limit_low > write_limit_high)
1549         {
1550                 ERROR ("WriteQueueLimitLow must not be larger than "
1551                                 "WriteQueueLimitHigh.");
1552                 write_limit_low = write_limit_high;
1553         }
1554
1555         write_threads_num = global_option_get_long ("WriteThreads",
1556                         /* default = */ 5);
1557         if (write_threads_num < 1)
1558         {
1559                 ERROR ("WriteThreads must be positive.");
1560                 write_threads_num = 5;
1561         }
1562
1563         start_write_threads ((size_t) write_threads_num);
1564
1565         if ((list_init == NULL) && (read_heap == NULL))
1566                 return;
1567
1568         /* Calling all init callbacks before checking if read callbacks
1569          * are available allows the init callbacks to register the read
1570          * callback. */
1571         le = llist_head (list_init);
1572         while (le != NULL)
1573         {
1574                 callback_func_t *cf;
1575                 plugin_init_cb callback;
1576                 plugin_ctx_t old_ctx;
1577
1578                 cf = le->value;
1579                 old_ctx = plugin_set_ctx (cf->cf_ctx);
1580                 callback = cf->cf_callback;
1581                 status = (*callback) ();
1582                 plugin_set_ctx (old_ctx);
1583
1584                 if (status != 0)
1585                 {
1586                         ERROR ("Initialization of plugin `%s' "
1587                                         "failed with status %i. "
1588                                         "Plugin will be unloaded.",
1589                                         le->key, status);
1590                         /* Plugins that register read callbacks from the init
1591                          * callback should take care of appropriate error
1592                          * handling themselves. */
1593                         /* FIXME: Unload _all_ functions */
1594                         plugin_unregister_read (le->key);
1595                 }
1596
1597                 le = le->next;
1598         }
1599
1600         max_read_interval = global_option_get_time ("MaxReadInterval",
1601                         DEFAULT_MAX_READ_INTERVAL);
1602
1603         /* Start read-threads */
1604         if (read_heap != NULL)
1605         {
1606                 const char *rt;
1607                 int num;
1608
1609                 rt = global_option_get ("ReadThreads");
1610                 num = atoi (rt);
1611                 if (num != -1)
1612                         start_read_threads ((num > 0) ? num : 5);
1613         }
1614 } /* void plugin_init_all */
1615
1616 /* TODO: Rename this function. */
1617 void plugin_read_all (void)
1618 {
1619         if(record_statistics) {
1620                 plugin_update_internal_statistics ();
1621         }
1622         uc_check_timeout ();
1623
1624         return;
1625 } /* void plugin_read_all */
1626
1627 /* Read function called when the `-T' command line argument is given. */
1628 int plugin_read_all_once (void)
1629 {
1630         int status;
1631         int return_status = 0;
1632
1633         if (read_heap == NULL)
1634         {
1635                 NOTICE ("No read-functions are registered.");
1636                 return (0);
1637         }
1638
1639         while (42)
1640         {
1641                 read_func_t *rf;
1642                 plugin_ctx_t old_ctx;
1643
1644                 rf = c_heap_get_root (read_heap);
1645                 if (rf == NULL)
1646                         break;
1647
1648                 old_ctx = plugin_set_ctx (rf->rf_ctx);
1649
1650                 if (rf->rf_type == RF_SIMPLE)
1651                 {
1652                         int (*callback) (void);
1653
1654                         callback = rf->rf_callback;
1655                         status = (*callback) ();
1656                 }
1657                 else
1658                 {
1659                         plugin_read_cb callback;
1660
1661                         callback = rf->rf_callback;
1662                         status = (*callback) (&rf->rf_udata);
1663                 }
1664
1665                 plugin_set_ctx (old_ctx);
1666
1667                 if (status != 0)
1668                 {
1669                         NOTICE ("read-function of plugin `%s' failed.",
1670                                         rf->rf_name);
1671                         return_status = -1;
1672                 }
1673
1674                 destroy_callback ((void *) rf);
1675         }
1676
1677         return (return_status);
1678 } /* int plugin_read_all_once */
1679
1680 int plugin_write (const char *plugin, /* {{{ */
1681                 const data_set_t *ds, const value_list_t *vl)
1682 {
1683   llentry_t *le;
1684   int status;
1685
1686   if (vl == NULL)
1687     return (EINVAL);
1688
1689   if (list_write == NULL)
1690     return (ENOENT);
1691
1692   if (ds == NULL)
1693   {
1694     ds = plugin_get_ds (vl->type);
1695     if (ds == NULL)
1696     {
1697       ERROR ("plugin_write: Unable to lookup type `%s'.", vl->type);
1698       return (ENOENT);
1699     }
1700   }
1701
1702   if (plugin == NULL)
1703   {
1704     int success = 0;
1705     int failure = 0;
1706
1707     le = llist_head (list_write);
1708     while (le != NULL)
1709     {
1710       callback_func_t *cf = le->value;
1711       plugin_write_cb callback;
1712
1713       /* do not switch plugin context; rather keep the context (interval)
1714        * information of the calling read plugin */
1715
1716       DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
1717       callback = cf->cf_callback;
1718       status = (*callback) (ds, vl, &cf->cf_udata);
1719       if (status != 0)
1720         failure++;
1721       else
1722         success++;
1723
1724       le = le->next;
1725     }
1726
1727     if ((success == 0) && (failure != 0))
1728       status = -1;
1729     else
1730       status = 0;
1731   }
1732   else /* plugin != NULL */
1733   {
1734     callback_func_t *cf;
1735     plugin_write_cb callback;
1736
1737     le = llist_head (list_write);
1738     while (le != NULL)
1739     {
1740       if (strcasecmp (plugin, le->key) == 0)
1741         break;
1742
1743       le = le->next;
1744     }
1745
1746     if (le == NULL)
1747       return (ENOENT);
1748
1749     cf = le->value;
1750
1751     /* do not switch plugin context; rather keep the context (interval)
1752      * information of the calling read plugin */
1753
1754     DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
1755     callback = cf->cf_callback;
1756     status = (*callback) (ds, vl, &cf->cf_udata);
1757   }
1758
1759   return (status);
1760 } /* }}} int plugin_write */
1761
1762 int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
1763 {
1764   llentry_t *le;
1765
1766   if (list_flush == NULL)
1767     return (0);
1768
1769   le = llist_head (list_flush);
1770   while (le != NULL)
1771   {
1772     callback_func_t *cf;
1773     plugin_flush_cb callback;
1774     plugin_ctx_t old_ctx;
1775
1776     if ((plugin != NULL)
1777         && (strcmp (plugin, le->key) != 0))
1778     {
1779       le = le->next;
1780       continue;
1781     }
1782
1783     cf = le->value;
1784     old_ctx = plugin_set_ctx (cf->cf_ctx);
1785     callback = cf->cf_callback;
1786
1787     (*callback) (timeout, identifier, &cf->cf_udata);
1788
1789     plugin_set_ctx (old_ctx);
1790
1791     le = le->next;
1792   }
1793   return (0);
1794 } /* int plugin_flush */
1795
1796 void plugin_shutdown_all (void)
1797 {
1798         llentry_t *le;
1799
1800         stop_read_threads ();
1801
1802         destroy_all_callbacks (&list_init);
1803
1804         pthread_mutex_lock (&read_lock);
1805         llist_destroy (read_list);
1806         read_list = NULL;
1807         pthread_mutex_unlock (&read_lock);
1808
1809         destroy_read_heap ();
1810
1811         plugin_flush (/* plugin = */ NULL,
1812                         /* timeout = */ 0,
1813                         /* identifier = */ NULL);
1814
1815         le = NULL;
1816         if (list_shutdown != NULL)
1817                 le = llist_head (list_shutdown);
1818
1819         while (le != NULL)
1820         {
1821                 callback_func_t *cf;
1822                 plugin_shutdown_cb callback;
1823                 plugin_ctx_t old_ctx;
1824
1825                 cf = le->value;
1826                 old_ctx = plugin_set_ctx (cf->cf_ctx);
1827                 callback = cf->cf_callback;
1828
1829                 /* Advance the pointer before calling the callback allows
1830                  * shutdown functions to unregister themselves. If done the
1831                  * other way around the memory `le' points to will be freed
1832                  * after callback returns. */
1833                 le = le->next;
1834
1835                 (*callback) ();
1836
1837                 plugin_set_ctx (old_ctx);
1838         }
1839
1840         stop_write_threads ();
1841
1842         /* Write plugins which use the `user_data' pointer usually need the
1843          * same data available to the flush callback. If this is the case, set
1844          * the free_function to NULL when registering the flush callback and to
1845          * the real free function when registering the write callback. This way
1846          * the data isn't freed twice. */
1847         destroy_all_callbacks (&list_flush);
1848         destroy_all_callbacks (&list_missing);
1849         destroy_all_callbacks (&list_write);
1850
1851         destroy_all_callbacks (&list_notification);
1852         destroy_all_callbacks (&list_shutdown);
1853         destroy_all_callbacks (&list_log);
1854
1855         plugin_free_loaded ();
1856         plugin_free_data_sets ();
1857 } /* void plugin_shutdown_all */
1858
1859 int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
1860 {
1861   llentry_t *le;
1862
1863   if (list_missing == NULL)
1864     return (0);
1865
1866   le = llist_head (list_missing);
1867   while (le != NULL)
1868   {
1869     callback_func_t *cf;
1870     plugin_missing_cb callback;
1871     plugin_ctx_t old_ctx;
1872     int status;
1873
1874     cf = le->value;
1875     old_ctx = plugin_set_ctx (cf->cf_ctx);
1876     callback = cf->cf_callback;
1877
1878     status = (*callback) (vl, &cf->cf_udata);
1879     plugin_set_ctx (old_ctx);
1880     if (status != 0)
1881     {
1882       if (status < 0)
1883       {
1884         ERROR ("plugin_dispatch_missing: Callback function \"%s\" "
1885             "failed with status %i.",
1886             le->key, status);
1887         return (status);
1888       }
1889       else
1890       {
1891         return (0);
1892       }
1893     }
1894
1895     le = le->next;
1896   }
1897   return (0);
1898 } /* int }}} plugin_dispatch_missing */
1899
1900 static int plugin_dispatch_values_internal (value_list_t *vl)
1901 {
1902         int status;
1903         static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
1904
1905         value_t *saved_values;
1906         int      saved_values_len;
1907
1908         data_set_t *ds;
1909
1910         int free_meta_data = 0;
1911
1912         if ((vl == NULL) || (vl->type[0] == 0)
1913                         || (vl->values == NULL) || (vl->values_len < 1))
1914         {
1915                 ERROR ("plugin_dispatch_values: Invalid value list "
1916                                 "from plugin %s.", vl->plugin);
1917                 return (-1);
1918         }
1919
1920         /* Free meta data only if the calling function didn't specify any. In
1921          * this case matches and targets may add some and the calling function
1922          * may not expect (and therefore free) that data. */
1923         if (vl->meta == NULL)
1924                 free_meta_data = 1;
1925
1926         if (list_write == NULL)
1927                 c_complain_once (LOG_WARNING, &no_write_complaint,
1928                                 "plugin_dispatch_values: No write callback has been "
1929                                 "registered. Please load at least one output plugin, "
1930                                 "if you want the collected data to be stored.");
1931
1932         if (data_sets == NULL)
1933         {
1934                 ERROR ("plugin_dispatch_values: No data sets registered. "
1935                                 "Could the types database be read? Check "
1936                                 "your `TypesDB' setting!");
1937                 return (-1);
1938         }
1939
1940         if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0)
1941         {
1942                 char ident[6 * DATA_MAX_NAME_LEN];
1943
1944                 FORMAT_VL (ident, sizeof (ident), vl);
1945                 INFO ("plugin_dispatch_values: Dataset not found: %s "
1946                                 "(from \"%s\"), check your types.db!",
1947                                 vl->type, ident);
1948                 return (-1);
1949         }
1950
1951         /* Assured by plugin_value_list_clone(). The time is determined at
1952          * _enqueue_ time. */
1953         assert (vl->time != 0);
1954         assert (vl->interval != 0);
1955
1956         DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
1957                         "host = %s; "
1958                         "plugin = %s; plugin_instance = %s; "
1959                         "type = %s; type_instance = %s;",
1960                         CDTIME_T_TO_DOUBLE (vl->time),
1961                         CDTIME_T_TO_DOUBLE (vl->interval),
1962                         vl->host,
1963                         vl->plugin, vl->plugin_instance,
1964                         vl->type, vl->type_instance);
1965
1966 #if COLLECT_DEBUG
1967         assert (0 == strcmp (ds->type, vl->type));
1968 #else
1969         if (0 != strcmp (ds->type, vl->type))
1970                 WARNING ("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
1971                                 ds->type, vl->type);
1972 #endif
1973
1974 #if COLLECT_DEBUG
1975         assert (ds->ds_num == vl->values_len);
1976 #else
1977         if (ds->ds_num != vl->values_len)
1978         {
1979                 ERROR ("plugin_dispatch_values: ds->type = %s: "
1980                                 "(ds->ds_num = %i) != "
1981                                 "(vl->values_len = %i)",
1982                                 ds->type, ds->ds_num, vl->values_len);
1983                 return (-1);
1984         }
1985 #endif
1986
1987         escape_slashes (vl->host, sizeof (vl->host));
1988         escape_slashes (vl->plugin, sizeof (vl->plugin));
1989         escape_slashes (vl->plugin_instance, sizeof (vl->plugin_instance));
1990         escape_slashes (vl->type, sizeof (vl->type));
1991         escape_slashes (vl->type_instance, sizeof (vl->type_instance));
1992
1993         /* Copy the values. This way, we can assure `targets' that they get
1994          * dynamically allocated values, which they can free and replace if
1995          * they like. */
1996         if ((pre_cache_chain != NULL) || (post_cache_chain != NULL))
1997         {
1998                 saved_values     = vl->values;
1999                 saved_values_len = vl->values_len;
2000
2001                 vl->values = (value_t *) calloc (vl->values_len,
2002                                 sizeof (*vl->values));
2003                 if (vl->values == NULL)
2004                 {
2005                         ERROR ("plugin_dispatch_values: calloc failed.");
2006                         vl->values = saved_values;
2007                         return (-1);
2008                 }
2009                 memcpy (vl->values, saved_values,
2010                                 vl->values_len * sizeof (*vl->values));
2011         }
2012         else /* if ((pre == NULL) && (post == NULL)) */
2013         {
2014                 saved_values     = NULL;
2015                 saved_values_len = 0;
2016         }
2017
2018         if (pre_cache_chain != NULL)
2019         {
2020                 status = fc_process_chain (ds, vl, pre_cache_chain);
2021                 if (status < 0)
2022                 {
2023                         WARNING ("plugin_dispatch_values: Running the "
2024                                         "pre-cache chain failed with "
2025                                         "status %i (%#x).",
2026                                         status, status);
2027                 }
2028                 else if (status == FC_TARGET_STOP)
2029                 {
2030                         /* Restore the state of the value_list so that plugins
2031                          * don't get confused.. */
2032                         if (saved_values != NULL)
2033                         {
2034                                 free (vl->values);
2035                                 vl->values     = saved_values;
2036                                 vl->values_len = saved_values_len;
2037                         }
2038                         return (0);
2039                 }
2040         }
2041
2042         /* Update the value cache */
2043         uc_update (ds, vl);
2044
2045         if (post_cache_chain != NULL)
2046         {
2047                 status = fc_process_chain (ds, vl, post_cache_chain);
2048                 if (status < 0)
2049                 {
2050                         WARNING ("plugin_dispatch_values: Running the "
2051                                         "post-cache chain failed with "
2052                                         "status %i (%#x).",
2053                                         status, status);
2054                 }
2055         }
2056         else
2057                 fc_default_action (ds, vl);
2058
2059         /* Restore the state of the value_list so that plugins don't get
2060          * confused.. */
2061         if (saved_values != NULL)
2062         {
2063                 free (vl->values);
2064                 vl->values     = saved_values;
2065                 vl->values_len = saved_values_len;
2066         }
2067
2068         if ((free_meta_data != 0) && (vl->meta != NULL))
2069         {
2070                 meta_data_destroy (vl->meta);
2071                 vl->meta = NULL;
2072         }
2073
2074         return (0);
2075 } /* int plugin_dispatch_values_internal */
2076
2077 static double get_drop_probability (void) /* {{{ */
2078 {
2079         long pos;
2080         long size;
2081         long wql;
2082
2083         pthread_mutex_lock (&write_lock);
2084         wql = write_queue_length;
2085         pthread_mutex_unlock (&write_lock);
2086
2087         if (wql < write_limit_low)
2088                 return (0.0);
2089         if (wql >= write_limit_high)
2090                 return (1.0);
2091
2092         pos = 1 + wql - write_limit_low;
2093         size = 1 + write_limit_high - write_limit_low;
2094
2095         return (((double) pos) / ((double) size));
2096 } /* }}} double get_drop_probability */
2097
2098 static _Bool check_drop_value (void) /* {{{ */
2099 {
2100         static cdtime_t last_message_time = 0;
2101         static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2102
2103         double p;
2104         double q;
2105         int status;
2106
2107         if (write_limit_high == 0)
2108                 return (0);
2109
2110         p = get_drop_probability ();
2111         if (p == 0.0)
2112                 return (0);
2113
2114         status = pthread_mutex_trylock (&last_message_lock);
2115         if (status == 0)
2116         {
2117                 cdtime_t now;
2118
2119                 now = cdtime ();
2120                 if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
2121                 {
2122                         last_message_time = now;
2123                         ERROR ("plugin_dispatch_values: Low water mark "
2124                                         "reached. Dropping %.0f%% of metrics.",
2125                                         100.0 * p);
2126                 }
2127                 pthread_mutex_unlock (&last_message_lock);
2128         }
2129
2130         if (p == 1.0)
2131                 return (1);
2132
2133         q = cdrand_d ();
2134         if (q > p)
2135                 return (1);
2136         else
2137                 return (0);
2138 } /* }}} _Bool check_drop_value */
2139
2140 int plugin_dispatch_values (value_list_t const *vl)
2141 {
2142         int status;
2143         static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
2144
2145         if (check_drop_value ()) {
2146                 if(record_statistics) {
2147                         pthread_mutex_lock(&statistics_lock);
2148                         stats_values_dropped++;
2149                         pthread_mutex_unlock(&statistics_lock);
2150                 }
2151                 return (0);
2152         }
2153
2154         status = plugin_write_enqueue (vl);
2155         if (status != 0)
2156         {
2157                 char errbuf[1024];
2158                 ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
2159                                 "with status %i (%s).", status,
2160                                 sstrerror (status, errbuf, sizeof (errbuf)));
2161                 return (status);
2162         }
2163
2164         return (0);
2165 }
2166
2167 __attribute__((sentinel))
2168 int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */
2169                 _Bool store_percentage, ...)
2170 {
2171         value_list_t *vl;
2172         int failed = 0;
2173         gauge_t sum = 0.0;
2174         va_list ap;
2175
2176         assert (template->values_len == 1);
2177
2178         va_start (ap, store_percentage);
2179         while (42)
2180         {
2181                 char const *name;
2182                 gauge_t value;
2183
2184                 name = va_arg (ap, char const *);
2185                 if (name == NULL)
2186                         break;
2187
2188                 value = va_arg (ap, gauge_t);
2189                 if (!isnan (value))
2190                         sum += value;
2191         }
2192         va_end (ap);
2193
2194         vl = plugin_value_list_clone (template);
2195         /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2196         if (store_percentage)
2197                 sstrncpy (vl->type, "percent", sizeof (vl->type));
2198
2199         va_start (ap, store_percentage);
2200         while (42)
2201         {
2202                 char const *name;
2203                 int status;
2204
2205                 /* Set the type instance. */
2206                 name = va_arg (ap, char const *);
2207                 if (name == NULL)
2208                         break;
2209                 sstrncpy (vl->type_instance, name, sizeof (vl->type_instance));
2210
2211                 /* Set the value. */
2212                 vl->values[0].gauge = va_arg (ap, gauge_t);
2213                 if (store_percentage)
2214                         vl->values[0].gauge *= 100.0 / sum;
2215
2216                 status = plugin_write_enqueue (vl);
2217                 if (status != 0)
2218                         failed++;
2219         }
2220         va_end (ap);
2221
2222         plugin_value_list_free (vl);
2223         return (failed);
2224 } /* }}} int plugin_dispatch_multivalue */
2225
2226 int plugin_dispatch_notification (const notification_t *notif)
2227 {
2228         llentry_t *le;
2229         /* Possible TODO: Add flap detection here */
2230
2231         DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
2232                         "time = %.3f; host = %s;",
2233                         notif->severity, notif->message,
2234                         CDTIME_T_TO_DOUBLE (notif->time), notif->host);
2235
2236         /* Nobody cares for notifications */
2237         if (list_notification == NULL)
2238                 return (-1);
2239
2240         le = llist_head (list_notification);
2241         while (le != NULL)
2242         {
2243                 callback_func_t *cf;
2244                 plugin_notification_cb callback;
2245                 int status;
2246
2247                 /* do not switch plugin context; rather keep the context
2248                  * (interval) information of the calling plugin */
2249
2250                 cf = le->value;
2251                 callback = cf->cf_callback;
2252                 status = (*callback) (notif, &cf->cf_udata);
2253                 if (status != 0)
2254                 {
2255                         WARNING ("plugin_dispatch_notification: Notification "
2256                                         "callback %s returned %i.",
2257                                         le->key, status);
2258                 }
2259
2260                 le = le->next;
2261         }
2262
2263         return (0);
2264 } /* int plugin_dispatch_notification */
2265
2266 void plugin_log (int level, const char *format, ...)
2267 {
2268         char msg[1024];
2269         va_list ap;
2270         llentry_t *le;
2271
2272 #if !COLLECT_DEBUG
2273         if (level >= LOG_DEBUG)
2274                 return;
2275 #endif
2276
2277         va_start (ap, format);
2278         vsnprintf (msg, sizeof (msg), format, ap);
2279         msg[sizeof (msg) - 1] = '\0';
2280         va_end (ap);
2281
2282         if (list_log == NULL)
2283         {
2284                 fprintf (stderr, "%s\n", msg);
2285                 return;
2286         }
2287
2288         le = llist_head (list_log);
2289         while (le != NULL)
2290         {
2291                 callback_func_t *cf;
2292                 plugin_log_cb callback;
2293
2294                 cf = le->value;
2295                 callback = cf->cf_callback;
2296
2297                 /* do not switch plugin context; rather keep the context
2298                  * (interval) information of the calling plugin */
2299
2300                 (*callback) (level, msg, &cf->cf_udata);
2301
2302                 le = le->next;
2303         }
2304 } /* void plugin_log */
2305
2306 int parse_log_severity (const char *severity)
2307 {
2308         int log_level = -1;
2309
2310         if ((0 == strcasecmp (severity, "emerg"))
2311                         || (0 == strcasecmp (severity, "alert"))
2312                         || (0 == strcasecmp (severity, "crit"))
2313                         || (0 == strcasecmp (severity, "err")))
2314                 log_level = LOG_ERR;
2315         else if (0 == strcasecmp (severity, "warning"))
2316                 log_level = LOG_WARNING;
2317         else if (0 == strcasecmp (severity, "notice"))
2318                 log_level = LOG_NOTICE;
2319         else if (0 == strcasecmp (severity, "info"))
2320                 log_level = LOG_INFO;
2321 #if COLLECT_DEBUG
2322         else if (0 == strcasecmp (severity, "debug"))
2323                 log_level = LOG_DEBUG;
2324 #endif /* COLLECT_DEBUG */
2325
2326         return (log_level);
2327 } /* int parse_log_severity */
2328
2329 int parse_notif_severity (const char *severity)
2330 {
2331         int notif_severity = -1;
2332
2333         if (strcasecmp (severity, "FAILURE") == 0)
2334                 notif_severity = NOTIF_FAILURE;
2335         else if (strcmp (severity, "OKAY") == 0)
2336                 notif_severity = NOTIF_OKAY;
2337         else if ((strcmp (severity, "WARNING") == 0)
2338                         || (strcmp (severity, "WARN") == 0))
2339                 notif_severity = NOTIF_WARNING;
2340
2341         return (notif_severity);
2342 } /* int parse_notif_severity */
2343
2344 const data_set_t *plugin_get_ds (const char *name)
2345 {
2346         data_set_t *ds;
2347
2348         if (data_sets == NULL)
2349         {
2350                 ERROR ("plugin_get_ds: No data sets are defined yet.");
2351                 return (NULL);
2352         }
2353
2354         if (c_avl_get (data_sets, name, (void *) &ds) != 0)
2355         {
2356                 DEBUG ("No such dataset registered: %s", name);
2357                 return (NULL);
2358         }
2359
2360         return (ds);
2361 } /* data_set_t *plugin_get_ds */
2362
2363 static int plugin_notification_meta_add (notification_t *n,
2364     const char *name,
2365     enum notification_meta_type_e type,
2366     const void *value)
2367 {
2368   notification_meta_t *meta;
2369   notification_meta_t *tail;
2370
2371   if ((n == NULL) || (name == NULL) || (value == NULL))
2372   {
2373     ERROR ("plugin_notification_meta_add: A pointer is NULL!");
2374     return (-1);
2375   }
2376
2377   meta = (notification_meta_t *) malloc (sizeof (notification_meta_t));
2378   if (meta == NULL)
2379   {
2380     ERROR ("plugin_notification_meta_add: malloc failed.");
2381     return (-1);
2382   }
2383   memset (meta, 0, sizeof (notification_meta_t));
2384
2385   sstrncpy (meta->name, name, sizeof (meta->name));
2386   meta->type = type;
2387
2388   switch (type)
2389   {
2390     case NM_TYPE_STRING:
2391     {
2392       meta->nm_value.nm_string = strdup ((const char *) value);
2393       if (meta->nm_value.nm_string == NULL)
2394       {
2395         ERROR ("plugin_notification_meta_add: strdup failed.");
2396         sfree (meta);
2397         return (-1);
2398       }
2399       break;
2400     }
2401     case NM_TYPE_SIGNED_INT:
2402     {
2403       meta->nm_value.nm_signed_int = *((int64_t *) value);
2404       break;
2405     }
2406     case NM_TYPE_UNSIGNED_INT:
2407     {
2408       meta->nm_value.nm_unsigned_int = *((uint64_t *) value);
2409       break;
2410     }
2411     case NM_TYPE_DOUBLE:
2412     {
2413       meta->nm_value.nm_double = *((double *) value);
2414       break;
2415     }
2416     case NM_TYPE_BOOLEAN:
2417     {
2418       meta->nm_value.nm_boolean = *((_Bool *) value);
2419       break;
2420     }
2421     default:
2422     {
2423       ERROR ("plugin_notification_meta_add: Unknown type: %i", type);
2424       sfree (meta);
2425       return (-1);
2426     }
2427   } /* switch (type) */
2428
2429   meta->next = NULL;
2430   tail = n->meta;
2431   while ((tail != NULL) && (tail->next != NULL))
2432     tail = tail->next;
2433
2434   if (tail == NULL)
2435     n->meta = meta;
2436   else
2437     tail->next = meta;
2438
2439   return (0);
2440 } /* int plugin_notification_meta_add */
2441
2442 int plugin_notification_meta_add_string (notification_t *n,
2443     const char *name,
2444     const char *value)
2445 {
2446   return (plugin_notification_meta_add (n, name, NM_TYPE_STRING, value));
2447 }
2448
2449 int plugin_notification_meta_add_signed_int (notification_t *n,
2450     const char *name,
2451     int64_t value)
2452 {
2453   return (plugin_notification_meta_add (n, name, NM_TYPE_SIGNED_INT, &value));
2454 }
2455
2456 int plugin_notification_meta_add_unsigned_int (notification_t *n,
2457     const char *name,
2458     uint64_t value)
2459 {
2460   return (plugin_notification_meta_add (n, name, NM_TYPE_UNSIGNED_INT, &value));
2461 }
2462
2463 int plugin_notification_meta_add_double (notification_t *n,
2464     const char *name,
2465     double value)
2466 {
2467   return (plugin_notification_meta_add (n, name, NM_TYPE_DOUBLE, &value));
2468 }
2469
2470 int plugin_notification_meta_add_boolean (notification_t *n,
2471     const char *name,
2472     _Bool value)
2473 {
2474   return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
2475 }
2476
2477 int plugin_notification_meta_copy (notification_t *dst,
2478     const notification_t *src)
2479 {
2480   notification_meta_t *meta;
2481
2482   assert (dst != NULL);
2483   assert (src != NULL);
2484   assert (dst != src);
2485   assert ((src->meta == NULL) || (src->meta != dst->meta));
2486
2487   for (meta = src->meta; meta != NULL; meta = meta->next)
2488   {
2489     if (meta->type == NM_TYPE_STRING)
2490       plugin_notification_meta_add_string (dst, meta->name,
2491           meta->nm_value.nm_string);
2492     else if (meta->type == NM_TYPE_SIGNED_INT)
2493       plugin_notification_meta_add_signed_int (dst, meta->name,
2494           meta->nm_value.nm_signed_int);
2495     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2496       plugin_notification_meta_add_unsigned_int (dst, meta->name,
2497           meta->nm_value.nm_unsigned_int);
2498     else if (meta->type == NM_TYPE_DOUBLE)
2499       plugin_notification_meta_add_double (dst, meta->name,
2500           meta->nm_value.nm_double);
2501     else if (meta->type == NM_TYPE_BOOLEAN)
2502       plugin_notification_meta_add_boolean (dst, meta->name,
2503           meta->nm_value.nm_boolean);
2504   }
2505
2506   return (0);
2507 } /* int plugin_notification_meta_copy */
2508
2509 int plugin_notification_meta_free (notification_meta_t *n)
2510 {
2511   notification_meta_t *this;
2512   notification_meta_t *next;
2513
2514   if (n == NULL)
2515   {
2516     ERROR ("plugin_notification_meta_free: n == NULL!");
2517     return (-1);
2518   }
2519
2520   this = n;
2521   while (this != NULL)
2522   {
2523     next = this->next;
2524
2525     if (this->type == NM_TYPE_STRING)
2526     {
2527       free ((char *)this->nm_value.nm_string);
2528       this->nm_value.nm_string = NULL;
2529     }
2530     sfree (this);
2531
2532     this = next;
2533   }
2534
2535   return (0);
2536 } /* int plugin_notification_meta_free */
2537
2538 static void plugin_ctx_destructor (void *ctx)
2539 {
2540         sfree (ctx);
2541 } /* void plugin_ctx_destructor */
2542
2543 static plugin_ctx_t ctx_init = { /* interval = */ 0 };
2544
2545 static plugin_ctx_t *plugin_ctx_create (void)
2546 {
2547         plugin_ctx_t *ctx;
2548
2549         ctx = malloc (sizeof (*ctx));
2550         if (ctx == NULL) {
2551                 char errbuf[1024];
2552                 ERROR ("Failed to allocate plugin context: %s",
2553                                 sstrerror (errno, errbuf, sizeof (errbuf)));
2554                 return NULL;
2555         }
2556
2557         *ctx = ctx_init;
2558         assert (plugin_ctx_key_initialized);
2559         pthread_setspecific (plugin_ctx_key, ctx);
2560         DEBUG("Created new plugin context.");
2561         return (ctx);
2562 } /* int plugin_ctx_create */
2563
2564 void plugin_init_ctx (void)
2565 {
2566         pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
2567         plugin_ctx_key_initialized = 1;
2568 } /* void plugin_init_ctx */
2569
2570 plugin_ctx_t plugin_get_ctx (void)
2571 {
2572         plugin_ctx_t *ctx;
2573
2574         assert (plugin_ctx_key_initialized);
2575         ctx = pthread_getspecific (plugin_ctx_key);
2576
2577         if (ctx == NULL) {
2578                 ctx = plugin_ctx_create ();
2579                 /* this must no happen -- exit() instead? */
2580                 if (ctx == NULL)
2581                         return ctx_init;
2582         }
2583
2584         return (*ctx);
2585 } /* plugin_ctx_t plugin_get_ctx */
2586
2587 plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
2588 {
2589         plugin_ctx_t *c;
2590         plugin_ctx_t old;
2591
2592         assert (plugin_ctx_key_initialized);
2593         c = pthread_getspecific (plugin_ctx_key);
2594
2595         if (c == NULL) {
2596                 c = plugin_ctx_create ();
2597                 /* this must no happen -- exit() instead? */
2598                 if (c == NULL)
2599                         return ctx_init;
2600         }
2601
2602         old = *c;
2603         *c = ctx;
2604
2605         return (old);
2606 } /* void plugin_set_ctx */
2607
2608 cdtime_t plugin_get_interval (void)
2609 {
2610         cdtime_t interval;
2611
2612         interval = plugin_get_ctx().interval;
2613         if (interval > 0)
2614                 return interval;
2615
2616         return cf_get_default_interval ();
2617 } /* cdtime_t plugin_get_interval */
2618
2619 typedef struct {
2620         plugin_ctx_t ctx;
2621         void *(*start_routine) (void *);
2622         void *arg;
2623 } plugin_thread_t;
2624
2625 static void *plugin_thread_start (void *arg)
2626 {
2627         plugin_thread_t *plugin_thread = arg;
2628
2629         void *(*start_routine) (void *) = plugin_thread->start_routine;
2630         void *plugin_arg = plugin_thread->arg;
2631
2632         plugin_set_ctx (plugin_thread->ctx);
2633
2634         free (plugin_thread);
2635
2636         return start_routine (plugin_arg);
2637 } /* void *plugin_thread_start */
2638
2639 int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
2640                 void *(*start_routine) (void *), void *arg)
2641 {
2642         plugin_thread_t *plugin_thread;
2643
2644         plugin_thread = malloc (sizeof (*plugin_thread));
2645         if (plugin_thread == NULL)
2646                 return -1;
2647
2648         plugin_thread->ctx           = plugin_get_ctx ();
2649         plugin_thread->start_routine = start_routine;
2650         plugin_thread->arg           = arg;
2651
2652         return pthread_create (thread, attr,
2653                         plugin_thread_start, plugin_thread);
2654 } /* int plugin_thread_create */
2655
2656 /* vim: set sw=8 ts=8 noet fdm=marker : */