Merge branch 'collectd-5.5' into collectd-5.6
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "configfile.h"
33 #include "filter_chain.h"
34 #include "utils_avltree.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "utils_llist.h"
38 #include "utils_heap.h"
39 #include "utils_time.h"
40 #include "utils_random.h"
41
42 #include <ltdl.h>
43
44 /*
45  * Private structures
46  */
47 struct callback_func_s
48 {
49         void *cf_callback;
50         user_data_t cf_udata;
51         plugin_ctx_t cf_ctx;
52 };
53 typedef struct callback_func_s callback_func_t;
54
55 #define RF_SIMPLE  0
56 #define RF_COMPLEX 1
57 #define RF_REMOVE  65535
58 struct read_func_s
59 {
60         /* `read_func_t' "inherits" from `callback_func_t'.
61          * The `rf_super' member MUST be the first one in this structure! */
62 #define rf_callback rf_super.cf_callback
63 #define rf_udata rf_super.cf_udata
64 #define rf_ctx rf_super.cf_ctx
65         callback_func_t rf_super;
66         char rf_group[DATA_MAX_NAME_LEN];
67         char *rf_name;
68         int rf_type;
69         cdtime_t rf_interval;
70         cdtime_t rf_effective_interval;
71         cdtime_t rf_next_read;
72 };
73 typedef struct read_func_s read_func_t;
74
75 struct write_queue_s;
76 typedef struct write_queue_s write_queue_t;
77 struct write_queue_s
78 {
79         value_list_t *vl;
80         plugin_ctx_t ctx;
81         write_queue_t *next;
82 };
83
84 struct flush_callback_s {
85         char *name;
86         cdtime_t timeout;
87 };
88 typedef struct flush_callback_s flush_callback_t;
89
90 /*
91  * Private variables
92  */
93 static c_avl_tree_t *plugins_loaded = NULL;
94
95 static llist_t *list_init;
96 static llist_t *list_write;
97 static llist_t *list_flush;
98 static llist_t *list_missing;
99 static llist_t *list_shutdown;
100 static llist_t *list_log;
101 static llist_t *list_notification;
102
103 static fc_chain_t *pre_cache_chain = NULL;
104 static fc_chain_t *post_cache_chain = NULL;
105
106 static c_avl_tree_t *data_sets;
107
108 static char *plugindir = NULL;
109
110 #ifndef DEFAULT_MAX_READ_INTERVAL
111 # define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
112 #endif
113 static c_heap_t       *read_heap = NULL;
114 static llist_t        *read_list;
115 static int             read_loop = 1;
116 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
117 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
118 static pthread_t      *read_threads = NULL;
119 static int             read_threads_num = 0;
120 static cdtime_t        max_read_interval = DEFAULT_MAX_READ_INTERVAL;
121
122 static write_queue_t  *write_queue_head;
123 static write_queue_t  *write_queue_tail;
124 static long            write_queue_length = 0;
125 static _Bool           write_loop = 1;
126 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
127 static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
128 static pthread_t      *write_threads = NULL;
129 static size_t          write_threads_num = 0;
130
131 static pthread_key_t   plugin_ctx_key;
132 static _Bool           plugin_ctx_key_initialized = 0;
133
134 static long            write_limit_high = 0;
135 static long            write_limit_low = 0;
136
137 static derive_t        stats_values_dropped = 0;
138 static _Bool           record_statistics = 0;
139
140 /*
141  * Static functions
142  */
143 static int plugin_dispatch_values_internal (value_list_t *vl);
144
145 static const char *plugin_get_dir (void)
146 {
147         if (plugindir == NULL)
148                 return (PLUGINDIR);
149         else
150                 return (plugindir);
151 }
152
153 static void plugin_update_internal_statistics (void) { /* {{{ */
154         derive_t copy_write_queue_length;
155         value_list_t vl = VALUE_LIST_INIT;
156         value_t values[2];
157
158         copy_write_queue_length = write_queue_length;
159
160         /* Initialize `vl' */
161         vl.values = values;
162         vl.values_len = 2;
163         vl.time = 0;
164         sstrncpy (vl.host, hostname_g, sizeof (vl.host));
165         sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
166
167         vl.type_instance[0] = 0;
168         vl.values_len = 1;
169
170         /* Write queue */
171         sstrncpy (vl.plugin_instance, "write_queue",
172                         sizeof (vl.plugin_instance));
173
174         /* Write queue : queue length */
175         vl.values[0].gauge = (gauge_t) copy_write_queue_length;
176         sstrncpy (vl.type, "queue_length", sizeof (vl.type));
177         vl.type_instance[0] = 0;
178         plugin_dispatch_values (&vl);
179
180         /* Write queue : Values dropped (queue length > low limit) */
181         vl.values[0].derive = (derive_t) stats_values_dropped;
182         sstrncpy (vl.type, "derive", sizeof (vl.type));
183         sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
184         plugin_dispatch_values (&vl);
185
186         /* Cache */
187         sstrncpy (vl.plugin_instance, "cache",
188                         sizeof (vl.plugin_instance));
189
190         /* Cache : Nb entry in cache tree */
191         vl.values[0].gauge = (gauge_t) uc_get_size();
192         sstrncpy (vl.type, "cache_size", sizeof (vl.type));
193         vl.type_instance[0] = 0;
194         plugin_dispatch_values (&vl);
195
196         return;
197 } /* }}} void plugin_update_internal_statistics */
198
199 static void destroy_callback (callback_func_t *cf) /* {{{ */
200 {
201         if (cf == NULL)
202                 return;
203
204         if ((cf->cf_udata.data != NULL) && (cf->cf_udata.free_func != NULL))
205         {
206                 cf->cf_udata.free_func (cf->cf_udata.data);
207                 cf->cf_udata.data = NULL;
208                 cf->cf_udata.free_func = NULL;
209         }
210         sfree (cf);
211 } /* }}} void destroy_callback */
212
213 static void destroy_all_callbacks (llist_t **list) /* {{{ */
214 {
215         llentry_t *le;
216
217         if (*list == NULL)
218                 return;
219
220         le = llist_head (*list);
221         while (le != NULL)
222         {
223                 llentry_t *le_next;
224
225                 le_next = le->next;
226
227                 sfree (le->key);
228                 destroy_callback (le->value);
229                 le->value = NULL;
230
231                 le = le_next;
232         }
233
234         llist_destroy (*list);
235         *list = NULL;
236 } /* }}} void destroy_all_callbacks */
237
238 static void destroy_read_heap (void) /* {{{ */
239 {
240         if (read_heap == NULL)
241                 return;
242
243         while (42)
244         {
245                 read_func_t *rf;
246
247                 rf = c_heap_get_root (read_heap);
248                 if (rf == NULL)
249                         break;
250                 sfree (rf->rf_name);
251                 destroy_callback ((callback_func_t *) rf);
252         }
253
254         c_heap_destroy (read_heap);
255         read_heap = NULL;
256 } /* }}} void destroy_read_heap */
257
258 static int register_callback (llist_t **list, /* {{{ */
259                 const char *name, callback_func_t *cf)
260 {
261         llentry_t *le;
262         char *key;
263
264         if (*list == NULL)
265         {
266                 *list = llist_create ();
267                 if (*list == NULL)
268                 {
269                         ERROR ("plugin: register_callback: "
270                                         "llist_create failed.");
271                         destroy_callback (cf);
272                         return (-1);
273                 }
274         }
275
276         key = strdup (name);
277         if (key == NULL)
278         {
279                 ERROR ("plugin: register_callback: strdup failed.");
280                 destroy_callback (cf);
281                 return (-1);
282         }
283
284         le = llist_search (*list, name);
285         if (le == NULL)
286         {
287                 le = llentry_create (key, cf);
288                 if (le == NULL)
289                 {
290                         ERROR ("plugin: register_callback: "
291                                         "llentry_create failed.");
292                         sfree (key);
293                         destroy_callback (cf);
294                         return (-1);
295                 }
296
297                 llist_append (*list, le);
298         }
299         else
300         {
301                 callback_func_t *old_cf;
302
303                 old_cf = le->value;
304                 le->value = cf;
305
306                 WARNING ("plugin: register_callback: "
307                                 "a callback named `%s' already exists - "
308                                 "overwriting the old entry!", name);
309
310                 destroy_callback (old_cf);
311                 sfree (key);
312         }
313
314         return (0);
315 } /* }}} int register_callback */
316
317 static void log_list_callbacks (llist_t **list, /* {{{ */
318                                 const char *comment)
319 {
320         char *str;
321         int len;
322         int i;
323         llentry_t *le;
324         int n;
325         char **keys;
326
327         n = llist_size(*list);
328         if (n == 0)
329         {
330                 INFO("%s [none]", comment);
331                 return;
332         }
333
334         keys = calloc(n, sizeof(char*));
335
336         if (keys == NULL)
337         {
338                 ERROR("%s: failed to allocate memory for list of callbacks",
339                       comment);
340
341                 return;
342         }
343
344         for (le = llist_head (*list), i = 0, len = 0;
345              le != NULL;
346              le = le->next, i++)
347         {
348                 keys[i] = le->key;
349                 len += strlen(le->key) + 6;
350         }
351         str = malloc(len + 10);
352         if (str == NULL)
353         {
354                 ERROR("%s: failed to allocate memory for list of callbacks",
355                       comment);
356         }
357         else
358         {
359                 *str = '\0';
360                 strjoin(str, len, keys, n, "', '");
361                 INFO("%s ['%s']", comment, str);
362                 sfree (str);
363         }
364         sfree (keys);
365 } /* }}} void log_list_callbacks */
366
367 static int create_register_callback (llist_t **list, /* {{{ */
368                 const char *name, void *callback, user_data_t *ud)
369 {
370         callback_func_t *cf;
371
372         cf = calloc (1, sizeof (*cf));
373         if (cf == NULL)
374         {
375                 ERROR ("plugin: create_register_callback: calloc failed.");
376                 return (-1);
377         }
378
379         cf->cf_callback = callback;
380         if (ud == NULL)
381         {
382                 cf->cf_udata.data = NULL;
383                 cf->cf_udata.free_func = NULL;
384         }
385         else
386         {
387                 cf->cf_udata = *ud;
388         }
389
390         cf->cf_ctx = plugin_get_ctx ();
391
392         return (register_callback (list, name, cf));
393 } /* }}} int create_register_callback */
394
395 static int plugin_unregister (llist_t *list, const char *name) /* {{{ */
396 {
397         llentry_t *e;
398
399         if (list == NULL)
400                 return (-1);
401
402         e = llist_search (list, name);
403         if (e == NULL)
404                 return (-1);
405
406         llist_remove (list, e);
407
408         sfree (e->key);
409         destroy_callback (e->value);
410
411         llentry_destroy (e);
412
413         return (0);
414 } /* }}} int plugin_unregister */
415
416 /*
417  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
418  * object, but it will bitch about a shared object not having a
419  * ``module_register'' symbol..
420  */
421 static int plugin_load_file (char *file, uint32_t flags)
422 {
423         lt_dlhandle dlh;
424         void (*reg_handle) (void);
425
426         lt_dlinit ();
427         lt_dlerror (); /* clear errors */
428
429 #if LIBTOOL_VERSION == 2
430         if (flags & PLUGIN_FLAGS_GLOBAL) {
431                 lt_dladvise advise;
432                 lt_dladvise_init(&advise);
433                 lt_dladvise_global(&advise);
434                 dlh = lt_dlopenadvise(file, advise);
435                 lt_dladvise_destroy(&advise);
436         } else {
437                 dlh = lt_dlopen (file);
438         }
439 #else /* if LIBTOOL_VERSION == 1 */
440         if (flags & PLUGIN_FLAGS_GLOBAL)
441                 WARNING ("plugin_load_file: The global flag is not supported, "
442                                 "libtool 2 is required for this.");
443         dlh = lt_dlopen (file);
444 #endif
445
446         if (dlh == NULL)
447         {
448                 char errbuf[1024] = "";
449
450                 ssnprintf (errbuf, sizeof (errbuf),
451                                 "lt_dlopen (\"%s\") failed: %s. "
452                                 "The most common cause for this problem is "
453                                 "missing dependencies. Use ldd(1) to check "
454                                 "the dependencies of the plugin "
455                                 "/ shared object.",
456                                 file, lt_dlerror ());
457
458                 ERROR ("%s", errbuf);
459                 /* Make sure this is printed to STDERR in any case, but also
460                  * make sure it's printed only once. */
461                 if (list_log != NULL)
462                         fprintf (stderr, "ERROR: %s\n", errbuf);
463
464                 return (1);
465         }
466
467         if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL)
468         {
469                 WARNING ("Couldn't find symbol \"module_register\" in \"%s\": %s\n",
470                                 file, lt_dlerror ());
471                 lt_dlclose (dlh);
472                 return (-1);
473         }
474
475         (*reg_handle) ();
476
477         return (0);
478 }
479
480 static void *plugin_read_thread (void __attribute__((unused)) *args)
481 {
482         while (read_loop != 0)
483         {
484                 read_func_t *rf;
485                 plugin_ctx_t old_ctx;
486                 cdtime_t start;
487                 cdtime_t now;
488                 cdtime_t elapsed;
489                 int status;
490                 int rf_type;
491                 int rc;
492
493                 /* Get the read function that needs to be read next.
494                  * We don't need to hold "read_lock" for the heap, but we need
495                  * to call c_heap_get_root() and pthread_cond_wait() in the
496                  * same protected block. */
497                 pthread_mutex_lock (&read_lock);
498                 rf = c_heap_get_root (read_heap);
499                 if (rf == NULL)
500                 {
501                         pthread_cond_wait (&read_cond, &read_lock);
502                         pthread_mutex_unlock (&read_lock);
503                         continue;
504                 }
505                 pthread_mutex_unlock (&read_lock);
506
507                 if (rf->rf_interval == 0)
508                 {
509                         /* this should not happen, because the interval is set
510                          * for each plugin when loading it
511                          * XXX: issue a warning? */
512                         rf->rf_interval = plugin_get_interval ();
513                         rf->rf_effective_interval = rf->rf_interval;
514
515                         rf->rf_next_read = cdtime ();
516                 }
517
518                 /* sleep until this entry is due,
519                  * using pthread_cond_timedwait */
520                 pthread_mutex_lock (&read_lock);
521                 /* In pthread_cond_timedwait, spurious wakeups are possible
522                  * (and really happen, at least on NetBSD with > 1 CPU), thus
523                  * we need to re-evaluate the condition every time
524                  * pthread_cond_timedwait returns. */
525                 rc = 0;
526                 while ((read_loop != 0)
527                                 && (cdtime () < rf->rf_next_read)
528                                 && rc == 0)
529                 {
530                         struct timespec ts = { 0 };
531
532                         CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
533
534                         rc = pthread_cond_timedwait (&read_cond, &read_lock,
535                                 &ts);
536                 }
537
538                 /* Must hold `read_lock' when accessing `rf->rf_type'. */
539                 rf_type = rf->rf_type;
540                 pthread_mutex_unlock (&read_lock);
541
542                 /* Check if we're supposed to stop.. This may have interrupted
543                  * the sleep, too. */
544                 if (read_loop == 0)
545                 {
546                         /* Insert `rf' again, so it can be free'd correctly */
547                         c_heap_insert (read_heap, rf);
548                         break;
549                 }
550
551                 /* The entry has been marked for deletion. The linked list
552                  * entry has already been removed by `plugin_unregister_read'.
553                  * All we have to do here is free the `read_func_t' and
554                  * continue. */
555                 if (rf_type == RF_REMOVE)
556                 {
557                         DEBUG ("plugin_read_thread: Destroying the `%s' "
558                                         "callback.", rf->rf_name);
559                         sfree (rf->rf_name);
560                         destroy_callback ((callback_func_t *) rf);
561                         rf = NULL;
562                         continue;
563                 }
564
565                 DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
566
567                 start = cdtime ();
568
569                 old_ctx = plugin_set_ctx (rf->rf_ctx);
570
571                 if (rf_type == RF_SIMPLE)
572                 {
573                         int (*callback) (void);
574
575                         callback = rf->rf_callback;
576                         status = (*callback) ();
577                 }
578                 else
579                 {
580                         plugin_read_cb callback;
581
582                         assert (rf_type == RF_COMPLEX);
583
584                         callback = rf->rf_callback;
585                         status = (*callback) (&rf->rf_udata);
586                 }
587
588                 plugin_set_ctx (old_ctx);
589
590                 /* If the function signals failure, we will increase the
591                  * intervals in which it will be called. */
592                 if (status != 0)
593                 {
594                         rf->rf_effective_interval *= 2;
595                         if (rf->rf_effective_interval > max_read_interval)
596                                 rf->rf_effective_interval = max_read_interval;
597
598                         NOTICE ("read-function of plugin `%s' failed. "
599                                         "Will suspend it for %.3f seconds.",
600                                         rf->rf_name,
601                                         CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
602                 }
603                 else
604                 {
605                         /* Success: Restore the interval, if it was changed. */
606                         rf->rf_effective_interval = rf->rf_interval;
607                 }
608
609                 /* update the ``next read due'' field */
610                 now = cdtime ();
611
612                 /* calculate the time spent in the read function */
613                 elapsed = (now - start);
614
615                 if (elapsed > rf->rf_effective_interval)
616                         WARNING ("plugin_read_thread: read-function of the `%s' plugin took %.3f "
617                                 "seconds, which is above its read interval (%.3f seconds). You might "
618                                 "want to adjust the `Interval' or `ReadThreads' settings.",
619                                 rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
620                                 CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
621
622                 DEBUG ("plugin_read_thread: read-function of the `%s' plugin took "
623                                 "%.6f seconds.",
624                                 rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
625
626                 DEBUG ("plugin_read_thread: Effective interval of the "
627                                 "`%s' plugin is %.3f seconds.",
628                                 rf->rf_name,
629                                 CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
630
631                 /* Calculate the next (absolute) time at which this function
632                  * should be called. */
633                 rf->rf_next_read += rf->rf_effective_interval;
634
635                 /* Check, if `rf_next_read' is in the past. */
636                 if (rf->rf_next_read < now)
637                 {
638                         /* `rf_next_read' is in the past. Insert `now'
639                          * so this value doesn't trail off into the
640                          * past too much. */
641                         rf->rf_next_read = now;
642                 }
643
644                 DEBUG ("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
645                                 rf->rf_name,
646                                 CDTIME_T_TO_DOUBLE (rf->rf_next_read));
647
648                 /* Re-insert this read function into the heap again. */
649                 c_heap_insert (read_heap, rf);
650         } /* while (read_loop) */
651
652         pthread_exit (NULL);
653         return ((void *) 0);
654 } /* void *plugin_read_thread */
655
656 static void start_read_threads (int num)
657 {
658         if (read_threads != NULL)
659                 return;
660
661         read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
662         if (read_threads == NULL)
663         {
664                 ERROR ("plugin: start_read_threads: calloc failed.");
665                 return;
666         }
667
668         read_threads_num = 0;
669         for (int i = 0; i < num; i++)
670         {
671                 if (pthread_create (read_threads + read_threads_num, NULL,
672                                         plugin_read_thread, NULL) == 0)
673                 {
674                         read_threads_num++;
675                 }
676                 else
677                 {
678                         ERROR ("plugin: start_read_threads: pthread_create failed.");
679                         return;
680                 }
681         } /* for (i) */
682 } /* void start_read_threads */
683
684 static void stop_read_threads (void)
685 {
686         if (read_threads == NULL)
687                 return;
688
689         INFO ("collectd: Stopping %i read threads.", read_threads_num);
690
691         pthread_mutex_lock (&read_lock);
692         read_loop = 0;
693         DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
694         pthread_cond_broadcast (&read_cond);
695         pthread_mutex_unlock (&read_lock);
696
697         for (int i = 0; i < read_threads_num; i++)
698         {
699                 if (pthread_join (read_threads[i], NULL) != 0)
700                 {
701                         ERROR ("plugin: stop_read_threads: pthread_join failed.");
702                 }
703                 read_threads[i] = (pthread_t) 0;
704         }
705         sfree (read_threads);
706         read_threads_num = 0;
707 } /* void stop_read_threads */
708
709 static void plugin_value_list_free (value_list_t *vl) /* {{{ */
710 {
711         if (vl == NULL)
712                 return;
713
714         meta_data_destroy (vl->meta);
715         sfree (vl->values);
716         sfree (vl);
717 } /* }}} void plugin_value_list_free */
718
719 static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
720 {
721         value_list_t *vl;
722
723         if (vl_orig == NULL)
724                 return (NULL);
725
726         vl = malloc (sizeof (*vl));
727         if (vl == NULL)
728                 return (NULL);
729         memcpy (vl, vl_orig, sizeof (*vl));
730
731         vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
732         if (vl->values == NULL)
733         {
734                 plugin_value_list_free (vl);
735                 return (NULL);
736         }
737         memcpy (vl->values, vl_orig->values,
738                         vl_orig->values_len * sizeof (*vl->values));
739
740         vl->meta = meta_data_clone (vl->meta);
741         if ((vl_orig->meta != NULL) && (vl->meta == NULL))
742         {
743                 plugin_value_list_free (vl);
744                 return (NULL);
745         }
746
747         if (vl->time == 0)
748                 vl->time = cdtime ();
749
750         /* Fill in the interval from the thread context, if it is zero. */
751         if (vl->interval == 0)
752         {
753                 plugin_ctx_t ctx = plugin_get_ctx ();
754
755                 if (ctx.interval != 0)
756                         vl->interval = ctx.interval;
757                 else
758                 {
759                         char name[6 * DATA_MAX_NAME_LEN];
760                         FORMAT_VL (name, sizeof (name), vl);
761                         ERROR ("plugin_value_list_clone: Unable to determine "
762                                         "interval from context for "
763                                         "value list \"%s\". "
764                                         "This indicates a broken plugin. "
765                                         "Please report this problem to the "
766                                         "collectd mailing list or at "
767                                         "<http://collectd.org/bugs/>.", name);
768                         vl->interval = cf_get_default_interval ();
769                 }
770         }
771
772         return (vl);
773 } /* }}} value_list_t *plugin_value_list_clone */
774
775 static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
776 {
777         write_queue_t *q;
778
779         q = malloc (sizeof (*q));
780         if (q == NULL)
781                 return (ENOMEM);
782         q->next = NULL;
783
784         q->vl = plugin_value_list_clone (vl);
785         if (q->vl == NULL)
786         {
787                 sfree (q);
788                 return (ENOMEM);
789         }
790
791         /* Store context of caller (read plugin); otherwise, it would not be
792          * available to the write plugins when actually dispatching the
793          * value-list later on. */
794         q->ctx = plugin_get_ctx ();
795
796         pthread_mutex_lock (&write_lock);
797
798         if (write_queue_tail == NULL)
799         {
800                 write_queue_head = q;
801                 write_queue_tail = q;
802                 write_queue_length = 1;
803         }
804         else
805         {
806                 write_queue_tail->next = q;
807                 write_queue_tail = q;
808                 write_queue_length += 1;
809         }
810
811         pthread_cond_signal (&write_cond);
812         pthread_mutex_unlock (&write_lock);
813
814         return (0);
815 } /* }}} int plugin_write_enqueue */
816
817 static value_list_t *plugin_write_dequeue (void) /* {{{ */
818 {
819         write_queue_t *q;
820         value_list_t *vl;
821
822         pthread_mutex_lock (&write_lock);
823
824         while (write_loop && (write_queue_head == NULL))
825                 pthread_cond_wait (&write_cond, &write_lock);
826
827         if (write_queue_head == NULL)
828         {
829                 pthread_mutex_unlock (&write_lock);
830                 return (NULL);
831         }
832
833         q = write_queue_head;
834         write_queue_head = q->next;
835         write_queue_length -= 1;
836         if (write_queue_head == NULL) {
837                 write_queue_tail = NULL;
838                 assert(0 == write_queue_length);
839                 }
840
841         pthread_mutex_unlock (&write_lock);
842
843         (void) plugin_set_ctx (q->ctx);
844
845         vl = q->vl;
846         sfree (q);
847         return (vl);
848 } /* }}} value_list_t *plugin_write_dequeue */
849
850 static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
851 {
852         while (write_loop)
853         {
854                 value_list_t *vl = plugin_write_dequeue ();
855                 if (vl == NULL)
856                         continue;
857
858                 plugin_dispatch_values_internal (vl);
859
860                 plugin_value_list_free (vl);
861         }
862
863         pthread_exit (NULL);
864         return ((void *) 0);
865 } /* }}} void *plugin_write_thread */
866
867 static void start_write_threads (size_t num) /* {{{ */
868 {
869         if (write_threads != NULL)
870                 return;
871
872         write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
873         if (write_threads == NULL)
874         {
875                 ERROR ("plugin: start_write_threads: calloc failed.");
876                 return;
877         }
878
879         write_threads_num = 0;
880         for (size_t i = 0; i < num; i++)
881         {
882                 int status;
883
884                 status = pthread_create (write_threads + write_threads_num,
885                                 /* attr = */ NULL,
886                                 plugin_write_thread,
887                                 /* arg = */ NULL);
888                 if (status != 0)
889                 {
890                         char errbuf[1024];
891                         ERROR ("plugin: start_write_threads: pthread_create failed "
892                                         "with status %i (%s).", status,
893                                         sstrerror (status, errbuf, sizeof (errbuf)));
894                         return;
895                 }
896
897                 write_threads_num++;
898         } /* for (i) */
899 } /* }}} void start_write_threads */
900
901 static void stop_write_threads (void) /* {{{ */
902 {
903         write_queue_t *q;
904         size_t i;
905
906         if (write_threads == NULL)
907                 return;
908
909         INFO ("collectd: Stopping %zu write threads.", write_threads_num);
910
911         pthread_mutex_lock (&write_lock);
912         write_loop = 0;
913         DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
914         pthread_cond_broadcast (&write_cond);
915         pthread_mutex_unlock (&write_lock);
916
917         for (i = 0; i < write_threads_num; i++)
918         {
919                 if (pthread_join (write_threads[i], NULL) != 0)
920                 {
921                         ERROR ("plugin: stop_write_threads: pthread_join failed.");
922                 }
923                 write_threads[i] = (pthread_t) 0;
924         }
925         sfree (write_threads);
926         write_threads_num = 0;
927
928         pthread_mutex_lock (&write_lock);
929         i = 0;
930         for (q = write_queue_head; q != NULL; )
931         {
932                 write_queue_t *q1 = q;
933                 plugin_value_list_free (q->vl);
934                 q = q->next;
935                 sfree (q1);
936                 i++;
937         }
938         write_queue_head = NULL;
939         write_queue_tail = NULL;
940         write_queue_length = 0;
941         pthread_mutex_unlock (&write_lock);
942
943         if (i > 0)
944         {
945                 WARNING ("plugin: %zu value list%s left after shutting down "
946                                 "the write threads.",
947                                 i, (i == 1) ? " was" : "s were");
948         }
949 } /* }}} void stop_write_threads */
950
951 /*
952  * Public functions
953  */
954 void plugin_set_dir (const char *dir)
955 {
956         sfree (plugindir);
957
958         if (dir == NULL)
959         {
960                 plugindir = NULL;
961                 return;
962         }
963
964         plugindir = strdup (dir);
965         if (plugindir == NULL)
966                 ERROR ("plugin_set_dir: strdup(\"%s\") failed", dir);
967 }
968
969 static _Bool plugin_is_loaded (char const *name)
970 {
971         int status;
972
973         if (plugins_loaded == NULL)
974                 plugins_loaded = c_avl_create ((int (*) (const void *, const void *)) strcasecmp);
975         assert (plugins_loaded != NULL);
976
977         status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
978         return (status == 0);
979 }
980
981 static int plugin_mark_loaded (char const *name)
982 {
983         char *name_copy;
984         int status;
985
986         name_copy = strdup (name);
987         if (name_copy == NULL)
988                 return (ENOMEM);
989
990         status = c_avl_insert (plugins_loaded,
991                         /* key = */ name_copy, /* value = */ NULL);
992         return (status);
993 }
994
995 static void plugin_free_loaded (void)
996 {
997         void *key;
998         void *value;
999
1000         if (plugins_loaded == NULL)
1001                 return;
1002
1003         while (c_avl_pick (plugins_loaded, &key, &value) == 0)
1004         {
1005                 sfree (key);
1006                 assert (value == NULL);
1007         }
1008
1009         c_avl_destroy (plugins_loaded);
1010         plugins_loaded = NULL;
1011 }
1012
1013 #define BUFSIZE 512
1014 int plugin_load (char const *plugin_name, uint32_t flags)
1015 {
1016         DIR  *dh;
1017         const char *dir;
1018         char  filename[BUFSIZE] = "";
1019         char  typename[BUFSIZE];
1020         int   ret;
1021         struct stat    statbuf;
1022         struct dirent *de;
1023         int status;
1024
1025         if (plugin_name == NULL)
1026                 return (EINVAL);
1027
1028         /* Check if plugin is already loaded and don't do anything in this
1029          * case. */
1030         if (plugin_is_loaded (plugin_name))
1031                 return (0);
1032
1033         dir = plugin_get_dir ();
1034         ret = 1;
1035
1036         /*
1037          * XXX: Magic at work:
1038          *
1039          * Some of the language bindings, for example the Python and Perl
1040          * plugins, need to be able to export symbols to the scripts they run.
1041          * For this to happen, the "Globals" flag needs to be set.
1042          * Unfortunately, this technical detail is hard to explain to the
1043          * average user and she shouldn't have to worry about this, ideally.
1044          * So in order to save everyone's sanity use a different default for a
1045          * handful of special plugins. --octo
1046          */
1047         if ((strcasecmp ("perl", plugin_name) == 0)
1048                         || (strcasecmp ("python", plugin_name) == 0))
1049                 flags |= PLUGIN_FLAGS_GLOBAL;
1050
1051         /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
1052          * type when matching the filename */
1053         status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
1054         if ((status < 0) || ((size_t) status >= sizeof (typename)))
1055         {
1056                 WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
1057                 return (-1);
1058         }
1059
1060         if ((dh = opendir (dir)) == NULL)
1061         {
1062                 char errbuf[1024];
1063                 ERROR ("plugin_load: opendir (%s) failed: %s", dir,
1064                                 sstrerror (errno, errbuf, sizeof (errbuf)));
1065                 return (-1);
1066         }
1067
1068         while ((de = readdir (dh)) != NULL)
1069         {
1070                 if (strcasecmp (de->d_name, typename))
1071                         continue;
1072
1073                 status = ssnprintf (filename, sizeof (filename),
1074                                 "%s/%s", dir, de->d_name);
1075                 if ((status < 0) || ((size_t) status >= sizeof (filename)))
1076                 {
1077                         WARNING ("plugin_load: Filename too long: \"%s/%s\"",
1078                                         dir, de->d_name);
1079                         continue;
1080                 }
1081
1082                 if (lstat (filename, &statbuf) == -1)
1083                 {
1084                         char errbuf[1024];
1085                         WARNING ("plugin_load: stat (\"%s\") failed: %s",
1086                                         filename,
1087                                         sstrerror (errno, errbuf, sizeof (errbuf)));
1088                         continue;
1089                 }
1090                 else if (!S_ISREG (statbuf.st_mode))
1091                 {
1092                         /* don't follow symlinks */
1093                         WARNING ("plugin_load: %s is not a regular file.",
1094                                         filename);
1095                         continue;
1096                 }
1097
1098                 status = plugin_load_file (filename, flags);
1099                 if (status == 0)
1100                 {
1101                         /* success */
1102                         plugin_mark_loaded (plugin_name);
1103                         ret = 0;
1104                         INFO ("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1105                         break;
1106                 }
1107                 else
1108                 {
1109                         ERROR ("plugin_load: Load plugin \"%s\" failed with "
1110                                         "status %i.", plugin_name, status);
1111                 }
1112         }
1113
1114         closedir (dh);
1115
1116         if (filename[0] == 0)
1117                 ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
1118                                 plugin_name, dir);
1119
1120         return (ret);
1121 }
1122
1123 /*
1124  * The `register_*' functions follow
1125  */
1126 int plugin_register_config (const char *name,
1127                 int (*callback) (const char *key, const char *val),
1128                 const char **keys, int keys_num)
1129 {
1130         cf_register (name, callback, keys, keys_num);
1131         return (0);
1132 } /* int plugin_register_config */
1133
1134 int plugin_register_complex_config (const char *type,
1135                 int (*callback) (oconfig_item_t *))
1136 {
1137         return (cf_register_complex (type, callback));
1138 } /* int plugin_register_complex_config */
1139
1140 int plugin_register_init (const char *name,
1141                 int (*callback) (void))
1142 {
1143         return (create_register_callback (&list_init, name, (void *) callback,
1144                                 /* user_data = */ NULL));
1145 } /* plugin_register_init */
1146
1147 static int plugin_compare_read_func (const void *arg0, const void *arg1)
1148 {
1149         const read_func_t *rf0;
1150         const read_func_t *rf1;
1151
1152         rf0 = arg0;
1153         rf1 = arg1;
1154
1155         if (rf0->rf_next_read < rf1->rf_next_read)
1156                 return (-1);
1157         else if (rf0->rf_next_read > rf1->rf_next_read)
1158                 return (1);
1159         else
1160                 return (0);
1161 } /* int plugin_compare_read_func */
1162
1163 /* Add a read function to both, the heap and a linked list. The linked list if
1164  * used to look-up read functions, especially for the remove function. The heap
1165  * is used to determine which plugin to read next. */
1166 static int plugin_insert_read (read_func_t *rf)
1167 {
1168         int status;
1169         llentry_t *le;
1170
1171         rf->rf_next_read = cdtime ();
1172         rf->rf_effective_interval = rf->rf_interval;
1173
1174         pthread_mutex_lock (&read_lock);
1175
1176         if (read_list == NULL)
1177         {
1178                 read_list = llist_create ();
1179                 if (read_list == NULL)
1180                 {
1181                         pthread_mutex_unlock (&read_lock);
1182                         ERROR ("plugin_insert_read: read_list failed.");
1183                         return (-1);
1184                 }
1185         }
1186
1187         if (read_heap == NULL)
1188         {
1189                 read_heap = c_heap_create (plugin_compare_read_func);
1190                 if (read_heap == NULL)
1191                 {
1192                         pthread_mutex_unlock (&read_lock);
1193                         ERROR ("plugin_insert_read: c_heap_create failed.");
1194                         return (-1);
1195                 }
1196         }
1197
1198         le = llist_search (read_list, rf->rf_name);
1199         if (le != NULL)
1200         {
1201                 pthread_mutex_unlock (&read_lock);
1202                 WARNING ("The read function \"%s\" is already registered. "
1203                                 "Check for duplicate \"LoadPlugin\" lines "
1204                                 "in your configuration!",
1205                                 rf->rf_name);
1206                 return (EINVAL);
1207         }
1208
1209         le = llentry_create (rf->rf_name, rf);
1210         if (le == NULL)
1211         {
1212                 pthread_mutex_unlock (&read_lock);
1213                 ERROR ("plugin_insert_read: llentry_create failed.");
1214                 return (-1);
1215         }
1216
1217         status = c_heap_insert (read_heap, rf);
1218         if (status != 0)
1219         {
1220                 pthread_mutex_unlock (&read_lock);
1221                 ERROR ("plugin_insert_read: c_heap_insert failed.");
1222                 llentry_destroy (le);
1223                 return (-1);
1224         }
1225
1226         /* This does not fail. */
1227         llist_append (read_list, le);
1228
1229         /* Wake up all the read threads. */
1230         pthread_cond_broadcast (&read_cond);
1231         pthread_mutex_unlock (&read_lock);
1232         return (0);
1233 } /* int plugin_insert_read */
1234
1235 int plugin_register_read (const char *name,
1236                 int (*callback) (void))
1237 {
1238         read_func_t *rf;
1239         int status;
1240
1241         rf = calloc (1, sizeof (*rf));
1242         if (rf == NULL)
1243         {
1244                 ERROR ("plugin_register_read: calloc failed.");
1245                 return (ENOMEM);
1246         }
1247
1248         rf->rf_callback = (void *) callback;
1249         rf->rf_udata.data = NULL;
1250         rf->rf_udata.free_func = NULL;
1251         rf->rf_ctx = plugin_get_ctx ();
1252         rf->rf_group[0] = '\0';
1253         rf->rf_name = strdup (name);
1254         rf->rf_type = RF_SIMPLE;
1255         rf->rf_interval = plugin_get_interval ();
1256
1257         status = plugin_insert_read (rf);
1258         if (status != 0) {
1259                 sfree (rf->rf_name);
1260                 sfree (rf);
1261         }
1262
1263         return (status);
1264 } /* int plugin_register_read */
1265
1266 int plugin_register_complex_read (const char *group, const char *name,
1267                 plugin_read_cb callback,
1268                 cdtime_t interval,
1269                 user_data_t *user_data)
1270 {
1271         read_func_t *rf;
1272         int status;
1273
1274         rf = calloc (1,sizeof (*rf));
1275         if (rf == NULL)
1276         {
1277                 ERROR ("plugin_register_complex_read: calloc failed.");
1278                 return (ENOMEM);
1279         }
1280
1281         rf->rf_callback = (void *) callback;
1282         if (group != NULL)
1283                 sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
1284         else
1285                 rf->rf_group[0] = '\0';
1286         rf->rf_name = strdup (name);
1287         rf->rf_type = RF_COMPLEX;
1288         rf->rf_interval = (interval != 0) ? interval : plugin_get_interval ();
1289
1290         /* Set user data */
1291         if (user_data == NULL)
1292         {
1293                 rf->rf_udata.data = NULL;
1294                 rf->rf_udata.free_func = NULL;
1295         }
1296         else
1297         {
1298                 rf->rf_udata = *user_data;
1299         }
1300
1301         rf->rf_ctx = plugin_get_ctx ();
1302
1303         status = plugin_insert_read (rf);
1304         if (status != 0) {
1305                 sfree (rf->rf_name);
1306                 sfree (rf);
1307         }
1308
1309         return (status);
1310 } /* int plugin_register_complex_read */
1311
1312 int plugin_register_write (const char *name,
1313                 plugin_write_cb callback, user_data_t *ud)
1314 {
1315         return (create_register_callback (&list_write, name,
1316                                 (void *) callback, ud));
1317 } /* int plugin_register_write */
1318
1319 static int plugin_flush_timeout_callback (user_data_t *ud)
1320 {
1321         flush_callback_t *cb = ud->data;
1322
1323         return plugin_flush (cb->name, cb->timeout, /* identifier = */ NULL);
1324 } /* static int plugin_flush_callback */
1325
1326 static void plugin_flush_timeout_callback_free (void *data)
1327 {
1328         flush_callback_t *cb = data;
1329
1330         if (cb == NULL) return;
1331
1332         sfree (cb->name);
1333         sfree (cb);
1334 } /* static void plugin_flush_callback_free */
1335
1336 static char *plugin_flush_callback_name (const char *name)
1337 {
1338         const char *flush_prefix = "flush/";
1339         size_t prefix_size;
1340         char *flush_name;
1341         size_t name_size;
1342
1343         prefix_size = strlen(flush_prefix);
1344         name_size = strlen(name);
1345
1346         flush_name = malloc (name_size + prefix_size + 1);
1347         if (flush_name == NULL)
1348         {
1349                 ERROR ("plugin_flush_callback_name: malloc failed.");
1350                 return (NULL);
1351         }
1352
1353         sstrncpy (flush_name, flush_prefix, prefix_size + 1);
1354         sstrncpy (flush_name + prefix_size, name, name_size + 1);
1355
1356         return flush_name;
1357 } /* static char *plugin_flush_callback_name */
1358
1359 int plugin_register_flush (const char *name,
1360                 plugin_flush_cb callback, user_data_t *ud)
1361 {
1362         int status;
1363         plugin_ctx_t ctx = plugin_get_ctx ();
1364
1365         status = create_register_callback (&list_flush, name,
1366                 (void *) callback, ud);
1367         if (status != 0)
1368                 return status;
1369
1370         if (ctx.flush_interval != 0)
1371         {
1372                 char *flush_name;
1373                 flush_callback_t *cb;
1374
1375                 flush_name = plugin_flush_callback_name (name);
1376                 if (flush_name == NULL)
1377                         return (-1);
1378
1379                 cb = malloc(sizeof (*cb));
1380                 if (cb == NULL)
1381                 {
1382                         ERROR ("plugin_register_flush: malloc failed.");
1383                         sfree (flush_name);
1384                         return (-1);
1385                 }
1386
1387                 cb->name = strdup (name);
1388                 if (cb->name == NULL)
1389                 {
1390                         ERROR ("plugin_register_flush: strdup failed.");
1391                         sfree (cb);
1392                         sfree (flush_name);
1393                         return (-1);
1394                 }
1395                 cb->timeout = ctx.flush_timeout;
1396
1397                 ud->data = cb;
1398                 ud->free_func = plugin_flush_timeout_callback_free;
1399
1400                 status = plugin_register_complex_read (
1401                         /* group     = */ "flush",
1402                         /* name      = */ flush_name,
1403                         /* callback  = */ plugin_flush_timeout_callback,
1404                         /* interval  = */ ctx.flush_interval,
1405                         /* user data = */ ud);
1406
1407                 sfree (flush_name);
1408                 if (status != 0)
1409                 {
1410                         sfree (cb->name);
1411                         sfree (cb);
1412                         return status;
1413                 }
1414         }
1415
1416         return 0;
1417 } /* int plugin_register_flush */
1418
1419 int plugin_register_missing (const char *name,
1420                 plugin_missing_cb callback, user_data_t *ud)
1421 {
1422         return (create_register_callback (&list_missing, name,
1423                                 (void *) callback, ud));
1424 } /* int plugin_register_missing */
1425
1426 int plugin_register_shutdown (const char *name,
1427                 int (*callback) (void))
1428 {
1429         return (create_register_callback (&list_shutdown, name,
1430                                 (void *) callback, /* user_data = */ NULL));
1431 } /* int plugin_register_shutdown */
1432
1433 static void plugin_free_data_sets (void)
1434 {
1435         void *key;
1436         void *value;
1437
1438         if (data_sets == NULL)
1439                 return;
1440
1441         while (c_avl_pick (data_sets, &key, &value) == 0)
1442         {
1443                 data_set_t *ds = value;
1444                 /* key is a pointer to ds->type */
1445
1446                 sfree (ds->ds);
1447                 sfree (ds);
1448         }
1449
1450         c_avl_destroy (data_sets);
1451         data_sets = NULL;
1452 } /* void plugin_free_data_sets */
1453
1454 int plugin_register_data_set (const data_set_t *ds)
1455 {
1456         data_set_t *ds_copy;
1457
1458         if ((data_sets != NULL)
1459                         && (c_avl_get (data_sets, ds->type, NULL) == 0))
1460         {
1461                 NOTICE ("Replacing DS `%s' with another version.", ds->type);
1462                 plugin_unregister_data_set (ds->type);
1463         }
1464         else if (data_sets == NULL)
1465         {
1466                 data_sets = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1467                 if (data_sets == NULL)
1468                         return (-1);
1469         }
1470
1471         ds_copy = malloc (sizeof (*ds_copy));
1472         if (ds_copy == NULL)
1473                 return (-1);
1474         memcpy(ds_copy, ds, sizeof (data_set_t));
1475
1476         ds_copy->ds = malloc (sizeof (*ds_copy->ds)
1477                         * ds->ds_num);
1478         if (ds_copy->ds == NULL)
1479         {
1480                 sfree (ds_copy);
1481                 return (-1);
1482         }
1483
1484         for (size_t i = 0; i < ds->ds_num; i++)
1485                 memcpy (ds_copy->ds + i, ds->ds + i, sizeof (data_source_t));
1486
1487         return (c_avl_insert (data_sets, (void *) ds_copy->type, (void *) ds_copy));
1488 } /* int plugin_register_data_set */
1489
1490 int plugin_register_log (const char *name,
1491                 plugin_log_cb callback, user_data_t *ud)
1492 {
1493         return (create_register_callback (&list_log, name,
1494                                 (void *) callback, ud));
1495 } /* int plugin_register_log */
1496
1497 int plugin_register_notification (const char *name,
1498                 plugin_notification_cb callback, user_data_t *ud)
1499 {
1500         return (create_register_callback (&list_notification, name,
1501                                 (void *) callback, ud));
1502 } /* int plugin_register_log */
1503
1504 int plugin_unregister_config (const char *name)
1505 {
1506         cf_unregister (name);
1507         return (0);
1508 } /* int plugin_unregister_config */
1509
1510 int plugin_unregister_complex_config (const char *name)
1511 {
1512         cf_unregister_complex (name);
1513         return (0);
1514 } /* int plugin_unregister_complex_config */
1515
1516 int plugin_unregister_init (const char *name)
1517 {
1518         return (plugin_unregister (list_init, name));
1519 }
1520
1521 int plugin_unregister_read (const char *name) /* {{{ */
1522 {
1523         llentry_t *le;
1524         read_func_t *rf;
1525
1526         if (name == NULL)
1527                 return (-ENOENT);
1528
1529         pthread_mutex_lock (&read_lock);
1530
1531         if (read_list == NULL)
1532         {
1533                 pthread_mutex_unlock (&read_lock);
1534                 return (-ENOENT);
1535         }
1536
1537         le = llist_search (read_list, name);
1538         if (le == NULL)
1539         {
1540                 pthread_mutex_unlock (&read_lock);
1541                 WARNING ("plugin_unregister_read: No such read function: %s",
1542                                 name);
1543                 return (-ENOENT);
1544         }
1545
1546         llist_remove (read_list, le);
1547
1548         rf = le->value;
1549         assert (rf != NULL);
1550         rf->rf_type = RF_REMOVE;
1551
1552         pthread_mutex_unlock (&read_lock);
1553
1554         llentry_destroy (le);
1555
1556         DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
1557
1558         return (0);
1559 } /* }}} int plugin_unregister_read */
1560
1561 void plugin_log_available_writers (void)
1562 {
1563         log_list_callbacks (&list_write, "Available write targets:");
1564 }
1565
1566 static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */
1567 {
1568         read_func_t *rf    = e->value;
1569         char        *group = ud;
1570
1571         return strcmp (rf->rf_group, (const char *)group);
1572 } /* }}} int compare_read_func_group */
1573
1574 int plugin_unregister_read_group (const char *group) /* {{{ */
1575 {
1576         llentry_t *le;
1577         read_func_t *rf;
1578
1579         int found = 0;
1580
1581         if (group == NULL)
1582                 return (-ENOENT);
1583
1584         pthread_mutex_lock (&read_lock);
1585
1586         if (read_list == NULL)
1587         {
1588                 pthread_mutex_unlock (&read_lock);
1589                 return (-ENOENT);
1590         }
1591
1592         while (42)
1593         {
1594                 le = llist_search_custom (read_list,
1595                                 compare_read_func_group, (void *)group);
1596
1597                 if (le == NULL)
1598                         break;
1599
1600                 ++found;
1601
1602                 llist_remove (read_list, le);
1603
1604                 rf = le->value;
1605                 assert (rf != NULL);
1606                 rf->rf_type = RF_REMOVE;
1607
1608                 llentry_destroy (le);
1609
1610                 DEBUG ("plugin_unregister_read_group: "
1611                                 "Marked `%s' (group `%s') for removal.",
1612                                 rf->rf_name, group);
1613         }
1614
1615         pthread_mutex_unlock (&read_lock);
1616
1617         if (found == 0)
1618         {
1619                 WARNING ("plugin_unregister_read_group: No such "
1620                                 "group of read function: %s", group);
1621                 return (-ENOENT);
1622         }
1623
1624         return (0);
1625 } /* }}} int plugin_unregister_read_group */
1626
1627 int plugin_unregister_write (const char *name)
1628 {
1629         return (plugin_unregister (list_write, name));
1630 }
1631
1632 int plugin_unregister_flush (const char *name)
1633 {
1634         plugin_ctx_t ctx = plugin_get_ctx ();
1635
1636         if (ctx.flush_interval != 0)
1637         {
1638                 char *flush_name;
1639
1640                 flush_name = plugin_flush_callback_name (name);
1641                 if (flush_name != NULL)
1642                 {
1643                         plugin_unregister_read(flush_name);
1644                         sfree (flush_name);
1645                 }
1646         }
1647
1648         return plugin_unregister (list_flush, name);
1649 }
1650
1651 int plugin_unregister_missing (const char *name)
1652 {
1653         return (plugin_unregister (list_missing, name));
1654 }
1655
1656 int plugin_unregister_shutdown (const char *name)
1657 {
1658         return (plugin_unregister (list_shutdown, name));
1659 }
1660
1661 int plugin_unregister_data_set (const char *name)
1662 {
1663         data_set_t *ds;
1664
1665         if (data_sets == NULL)
1666                 return (-1);
1667
1668         if (c_avl_remove (data_sets, name, NULL, (void *) &ds) != 0)
1669                 return (-1);
1670
1671         sfree (ds->ds);
1672         sfree (ds);
1673
1674         return (0);
1675 } /* int plugin_unregister_data_set */
1676
1677 int plugin_unregister_log (const char *name)
1678 {
1679         return (plugin_unregister (list_log, name));
1680 }
1681
1682 int plugin_unregister_notification (const char *name)
1683 {
1684         return (plugin_unregister (list_notification, name));
1685 }
1686
1687 int plugin_init_all (void)
1688 {
1689         char const *chain_name;
1690         llentry_t *le;
1691         int status;
1692         int ret = 0;
1693
1694         /* Init the value cache */
1695         uc_init ();
1696
1697         if (IS_TRUE (global_option_get ("CollectInternalStats")))
1698                 record_statistics = 1;
1699
1700         chain_name = global_option_get ("PreCacheChain");
1701         pre_cache_chain = fc_chain_get_by_name (chain_name);
1702
1703         chain_name = global_option_get ("PostCacheChain");
1704         post_cache_chain = fc_chain_get_by_name (chain_name);
1705
1706         write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
1707                         /* default = */ 0);
1708         if (write_limit_high < 0)
1709         {
1710                 ERROR ("WriteQueueLimitHigh must be positive or zero.");
1711                 write_limit_high = 0;
1712         }
1713
1714         write_limit_low = global_option_get_long ("WriteQueueLimitLow",
1715                         /* default = */ write_limit_high / 2);
1716         if (write_limit_low < 0)
1717         {
1718                 ERROR ("WriteQueueLimitLow must be positive or zero.");
1719                 write_limit_low = write_limit_high / 2;
1720         }
1721         else if (write_limit_low > write_limit_high)
1722         {
1723                 ERROR ("WriteQueueLimitLow must not be larger than "
1724                                 "WriteQueueLimitHigh.");
1725                 write_limit_low = write_limit_high;
1726         }
1727
1728         write_threads_num = global_option_get_long ("WriteThreads",
1729                         /* default = */ 5);
1730         if (write_threads_num < 1)
1731         {
1732                 ERROR ("WriteThreads must be positive.");
1733                 write_threads_num = 5;
1734         }
1735
1736         if ((list_init == NULL) && (read_heap == NULL))
1737                 return ret;
1738
1739         /* Calling all init callbacks before checking if read callbacks
1740          * are available allows the init callbacks to register the read
1741          * callback. */
1742         le = llist_head (list_init);
1743         while (le != NULL)
1744         {
1745                 callback_func_t *cf;
1746                 plugin_init_cb callback;
1747                 plugin_ctx_t old_ctx;
1748
1749                 cf = le->value;
1750                 old_ctx = plugin_set_ctx (cf->cf_ctx);
1751                 callback = cf->cf_callback;
1752                 status = (*callback) ();
1753                 plugin_set_ctx (old_ctx);
1754
1755                 if (status != 0)
1756                 {
1757                         ERROR ("Initialization of plugin `%s' "
1758                                         "failed with status %i. "
1759                                         "Plugin will be unloaded.",
1760                                         le->key, status);
1761                         /* Plugins that register read callbacks from the init
1762                          * callback should take care of appropriate error
1763                          * handling themselves. */
1764                         /* FIXME: Unload _all_ functions */
1765                         plugin_unregister_read (le->key);
1766                         ret = -1;
1767                 }
1768
1769                 le = le->next;
1770         }
1771
1772         start_write_threads ((size_t) write_threads_num);
1773
1774         max_read_interval = global_option_get_time ("MaxReadInterval",
1775                         DEFAULT_MAX_READ_INTERVAL);
1776
1777         /* Start read-threads */
1778         if (read_heap != NULL)
1779         {
1780                 const char *rt;
1781                 int num;
1782
1783                 rt = global_option_get ("ReadThreads");
1784                 num = atoi (rt);
1785                 if (num != -1)
1786                         start_read_threads ((num > 0) ? num : 5);
1787         }
1788         return ret;
1789 } /* void plugin_init_all */
1790
1791 /* TODO: Rename this function. */
1792 void plugin_read_all (void)
1793 {
1794         if(record_statistics) {
1795                 plugin_update_internal_statistics ();
1796         }
1797         uc_check_timeout ();
1798
1799         return;
1800 } /* void plugin_read_all */
1801
1802 /* Read function called when the `-T' command line argument is given. */
1803 int plugin_read_all_once (void)
1804 {
1805         int status;
1806         int return_status = 0;
1807
1808         if (read_heap == NULL)
1809         {
1810                 NOTICE ("No read-functions are registered.");
1811                 return (0);
1812         }
1813
1814         while (42)
1815         {
1816                 read_func_t *rf;
1817                 plugin_ctx_t old_ctx;
1818
1819                 rf = c_heap_get_root (read_heap);
1820                 if (rf == NULL)
1821                         break;
1822
1823                 old_ctx = plugin_set_ctx (rf->rf_ctx);
1824
1825                 if (rf->rf_type == RF_SIMPLE)
1826                 {
1827                         int (*callback) (void);
1828
1829                         callback = rf->rf_callback;
1830                         status = (*callback) ();
1831                 }
1832                 else
1833                 {
1834                         plugin_read_cb callback;
1835
1836                         callback = rf->rf_callback;
1837                         status = (*callback) (&rf->rf_udata);
1838                 }
1839
1840                 plugin_set_ctx (old_ctx);
1841
1842                 if (status != 0)
1843                 {
1844                         NOTICE ("read-function of plugin `%s' failed.",
1845                                         rf->rf_name);
1846                         return_status = -1;
1847                 }
1848
1849                 sfree (rf->rf_name);
1850                 destroy_callback ((void *) rf);
1851         }
1852
1853         return (return_status);
1854 } /* int plugin_read_all_once */
1855
1856 int plugin_write (const char *plugin, /* {{{ */
1857                 const data_set_t *ds, const value_list_t *vl)
1858 {
1859   llentry_t *le;
1860   int status;
1861
1862   if (vl == NULL)
1863     return (EINVAL);
1864
1865   if (list_write == NULL)
1866     return (ENOENT);
1867
1868   if (ds == NULL)
1869   {
1870     ds = plugin_get_ds (vl->type);
1871     if (ds == NULL)
1872     {
1873       ERROR ("plugin_write: Unable to lookup type `%s'.", vl->type);
1874       return (ENOENT);
1875     }
1876   }
1877
1878   if (plugin == NULL)
1879   {
1880     int success = 0;
1881     int failure = 0;
1882
1883     le = llist_head (list_write);
1884     while (le != NULL)
1885     {
1886       callback_func_t *cf = le->value;
1887       plugin_write_cb callback;
1888
1889       /* do not switch plugin context; rather keep the context (interval)
1890        * information of the calling read plugin */
1891
1892       DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
1893       callback = cf->cf_callback;
1894       status = (*callback) (ds, vl, &cf->cf_udata);
1895       if (status != 0)
1896         failure++;
1897       else
1898         success++;
1899
1900       le = le->next;
1901     }
1902
1903     if ((success == 0) && (failure != 0))
1904       status = -1;
1905     else
1906       status = 0;
1907   }
1908   else /* plugin != NULL */
1909   {
1910     callback_func_t *cf;
1911     plugin_write_cb callback;
1912
1913     le = llist_head (list_write);
1914     while (le != NULL)
1915     {
1916       if (strcasecmp (plugin, le->key) == 0)
1917         break;
1918
1919       le = le->next;
1920     }
1921
1922     if (le == NULL)
1923       return (ENOENT);
1924
1925     cf = le->value;
1926
1927     /* do not switch plugin context; rather keep the context (interval)
1928      * information of the calling read plugin */
1929
1930     DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
1931     callback = cf->cf_callback;
1932     status = (*callback) (ds, vl, &cf->cf_udata);
1933   }
1934
1935   return (status);
1936 } /* }}} int plugin_write */
1937
1938 int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
1939 {
1940   llentry_t *le;
1941
1942   if (list_flush == NULL)
1943     return (0);
1944
1945   le = llist_head (list_flush);
1946   while (le != NULL)
1947   {
1948     callback_func_t *cf;
1949     plugin_flush_cb callback;
1950     plugin_ctx_t old_ctx;
1951
1952     if ((plugin != NULL)
1953         && (strcmp (plugin, le->key) != 0))
1954     {
1955       le = le->next;
1956       continue;
1957     }
1958
1959     cf = le->value;
1960     old_ctx = plugin_set_ctx (cf->cf_ctx);
1961     callback = cf->cf_callback;
1962
1963     (*callback) (timeout, identifier, &cf->cf_udata);
1964
1965     plugin_set_ctx (old_ctx);
1966
1967     le = le->next;
1968   }
1969   return (0);
1970 } /* int plugin_flush */
1971
1972 int plugin_shutdown_all (void)
1973 {
1974         llentry_t *le;
1975         int ret = 0;  // Assume success.
1976
1977         destroy_all_callbacks (&list_init);
1978
1979         stop_read_threads ();
1980
1981         pthread_mutex_lock (&read_lock);
1982         llist_destroy (read_list);
1983         read_list = NULL;
1984         pthread_mutex_unlock (&read_lock);
1985
1986         destroy_read_heap ();
1987
1988         /* blocks until all write threads have shut down. */
1989         stop_write_threads ();
1990
1991         /* ask all plugins to write out the state they kept. */
1992         plugin_flush (/* plugin = */ NULL,
1993                         /* timeout = */ 0,
1994                         /* identifier = */ NULL);
1995
1996         le = NULL;
1997         if (list_shutdown != NULL)
1998                 le = llist_head (list_shutdown);
1999
2000         while (le != NULL)
2001         {
2002                 callback_func_t *cf;
2003                 plugin_shutdown_cb callback;
2004                 plugin_ctx_t old_ctx;
2005
2006                 cf = le->value;
2007                 old_ctx = plugin_set_ctx (cf->cf_ctx);
2008                 callback = cf->cf_callback;
2009
2010                 /* Advance the pointer before calling the callback allows
2011                  * shutdown functions to unregister themselves. If done the
2012                  * other way around the memory `le' points to will be freed
2013                  * after callback returns. */
2014                 le = le->next;
2015
2016                 if ((*callback) () != 0)
2017                         ret = -1;
2018
2019                 plugin_set_ctx (old_ctx);
2020         }
2021
2022         /* Write plugins which use the `user_data' pointer usually need the
2023          * same data available to the flush callback. If this is the case, set
2024          * the free_function to NULL when registering the flush callback and to
2025          * the real free function when registering the write callback. This way
2026          * the data isn't freed twice. */
2027         destroy_all_callbacks (&list_flush);
2028         destroy_all_callbacks (&list_missing);
2029         destroy_all_callbacks (&list_write);
2030
2031         destroy_all_callbacks (&list_notification);
2032         destroy_all_callbacks (&list_shutdown);
2033         destroy_all_callbacks (&list_log);
2034
2035         plugin_free_loaded ();
2036         plugin_free_data_sets ();
2037         return (ret);
2038 } /* void plugin_shutdown_all */
2039
2040 int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
2041 {
2042   llentry_t *le;
2043
2044   if (list_missing == NULL)
2045     return (0);
2046
2047   le = llist_head (list_missing);
2048   while (le != NULL)
2049   {
2050     callback_func_t *cf;
2051     plugin_missing_cb callback;
2052     plugin_ctx_t old_ctx;
2053     int status;
2054
2055     cf = le->value;
2056     old_ctx = plugin_set_ctx (cf->cf_ctx);
2057     callback = cf->cf_callback;
2058
2059     status = (*callback) (vl, &cf->cf_udata);
2060     plugin_set_ctx (old_ctx);
2061     if (status != 0)
2062     {
2063       if (status < 0)
2064       {
2065         ERROR ("plugin_dispatch_missing: Callback function \"%s\" "
2066             "failed with status %i.",
2067             le->key, status);
2068         return (status);
2069       }
2070       else
2071       {
2072         return (0);
2073       }
2074     }
2075
2076     le = le->next;
2077   }
2078   return (0);
2079 } /* int }}} plugin_dispatch_missing */
2080
2081 static int plugin_dispatch_values_internal (value_list_t *vl)
2082 {
2083         int status;
2084         static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
2085
2086         value_t *saved_values;
2087         int      saved_values_len;
2088
2089         data_set_t *ds;
2090
2091         int free_meta_data = 0;
2092
2093         assert(vl);
2094         assert(vl->plugin);
2095
2096         if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1)
2097         {
2098                 ERROR ("plugin_dispatch_values: Invalid value list "
2099                                 "from plugin %s.", vl->plugin);
2100                 return (-1);
2101         }
2102
2103         /* Free meta data only if the calling function didn't specify any. In
2104          * this case matches and targets may add some and the calling function
2105          * may not expect (and therefore free) that data. */
2106         if (vl->meta == NULL)
2107                 free_meta_data = 1;
2108
2109         if (list_write == NULL)
2110                 c_complain_once (LOG_WARNING, &no_write_complaint,
2111                                 "plugin_dispatch_values: No write callback has been "
2112                                 "registered. Please load at least one output plugin, "
2113                                 "if you want the collected data to be stored.");
2114
2115         if (data_sets == NULL)
2116         {
2117                 ERROR ("plugin_dispatch_values: No data sets registered. "
2118                                 "Could the types database be read? Check "
2119                                 "your `TypesDB' setting!");
2120                 return (-1);
2121         }
2122
2123         if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0)
2124         {
2125                 char ident[6 * DATA_MAX_NAME_LEN];
2126
2127                 FORMAT_VL (ident, sizeof (ident), vl);
2128                 INFO ("plugin_dispatch_values: Dataset not found: %s "
2129                                 "(from \"%s\"), check your types.db!",
2130                                 vl->type, ident);
2131                 return (-1);
2132         }
2133
2134         /* Assured by plugin_value_list_clone(). The time is determined at
2135          * _enqueue_ time. */
2136         assert (vl->time != 0);
2137         assert (vl->interval != 0);
2138
2139         DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
2140                         "host = %s; "
2141                         "plugin = %s; plugin_instance = %s; "
2142                         "type = %s; type_instance = %s;",
2143                         CDTIME_T_TO_DOUBLE (vl->time),
2144                         CDTIME_T_TO_DOUBLE (vl->interval),
2145                         vl->host,
2146                         vl->plugin, vl->plugin_instance,
2147                         vl->type, vl->type_instance);
2148
2149 #if COLLECT_DEBUG
2150         assert (0 == strcmp (ds->type, vl->type));
2151 #else
2152         if (0 != strcmp (ds->type, vl->type))
2153                 WARNING ("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
2154                                 ds->type, vl->type);
2155 #endif
2156
2157 #if COLLECT_DEBUG
2158         assert (ds->ds_num == vl->values_len);
2159 #else
2160         if (ds->ds_num != vl->values_len)
2161         {
2162                 ERROR ("plugin_dispatch_values: ds->type = %s: "
2163                                 "(ds->ds_num = %zu) != "
2164                                 "(vl->values_len = %zu)",
2165                                 ds->type, ds->ds_num, vl->values_len);
2166                 return (-1);
2167         }
2168 #endif
2169
2170         escape_slashes (vl->host, sizeof (vl->host));
2171         escape_slashes (vl->plugin, sizeof (vl->plugin));
2172         escape_slashes (vl->plugin_instance, sizeof (vl->plugin_instance));
2173         escape_slashes (vl->type, sizeof (vl->type));
2174         escape_slashes (vl->type_instance, sizeof (vl->type_instance));
2175
2176         /* Copy the values. This way, we can assure `targets' that they get
2177          * dynamically allocated values, which they can free and replace if
2178          * they like. */
2179         if ((pre_cache_chain != NULL) || (post_cache_chain != NULL))
2180         {
2181                 saved_values     = vl->values;
2182                 saved_values_len = vl->values_len;
2183
2184                 vl->values = (value_t *) calloc (vl->values_len,
2185                                 sizeof (*vl->values));
2186                 if (vl->values == NULL)
2187                 {
2188                         ERROR ("plugin_dispatch_values: calloc failed.");
2189                         vl->values = saved_values;
2190                         return (-1);
2191                 }
2192                 memcpy (vl->values, saved_values,
2193                                 vl->values_len * sizeof (*vl->values));
2194         }
2195         else /* if ((pre == NULL) && (post == NULL)) */
2196         {
2197                 saved_values     = NULL;
2198                 saved_values_len = 0;
2199         }
2200
2201         if (pre_cache_chain != NULL)
2202         {
2203                 status = fc_process_chain (ds, vl, pre_cache_chain);
2204                 if (status < 0)
2205                 {
2206                         WARNING ("plugin_dispatch_values: Running the "
2207                                         "pre-cache chain failed with "
2208                                         "status %i (%#x).",
2209                                         status, status);
2210                 }
2211                 else if (status == FC_TARGET_STOP)
2212                 {
2213                         /* Restore the state of the value_list so that plugins
2214                          * don't get confused.. */
2215                         if (saved_values != NULL)
2216                         {
2217                                 sfree (vl->values);
2218                                 vl->values     = saved_values;
2219                                 vl->values_len = saved_values_len;
2220                         }
2221                         return (0);
2222                 }
2223         }
2224
2225         /* Update the value cache */
2226         uc_update (ds, vl);
2227
2228         if (post_cache_chain != NULL)
2229         {
2230                 status = fc_process_chain (ds, vl, post_cache_chain);
2231                 if (status < 0)
2232                 {
2233                         WARNING ("plugin_dispatch_values: Running the "
2234                                         "post-cache chain failed with "
2235                                         "status %i (%#x).",
2236                                         status, status);
2237                 }
2238         }
2239         else
2240                 fc_default_action (ds, vl);
2241
2242         /* Restore the state of the value_list so that plugins don't get
2243          * confused.. */
2244         if (saved_values != NULL)
2245         {
2246                 sfree (vl->values);
2247                 vl->values     = saved_values;
2248                 vl->values_len = saved_values_len;
2249         }
2250
2251         if ((free_meta_data != 0) && (vl->meta != NULL))
2252         {
2253                 meta_data_destroy (vl->meta);
2254                 vl->meta = NULL;
2255         }
2256
2257         return (0);
2258 } /* int plugin_dispatch_values_internal */
2259
2260 static double get_drop_probability (void) /* {{{ */
2261 {
2262         long pos;
2263         long size;
2264         long wql;
2265
2266         pthread_mutex_lock (&write_lock);
2267         wql = write_queue_length;
2268         pthread_mutex_unlock (&write_lock);
2269
2270         if (wql < write_limit_low)
2271                 return (0.0);
2272         if (wql >= write_limit_high)
2273                 return (1.0);
2274
2275         pos = 1 + wql - write_limit_low;
2276         size = 1 + write_limit_high - write_limit_low;
2277
2278         return (((double) pos) / ((double) size));
2279 } /* }}} double get_drop_probability */
2280
2281 static _Bool check_drop_value (void) /* {{{ */
2282 {
2283         static cdtime_t last_message_time = 0;
2284         static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2285
2286         double p;
2287         double q;
2288         int status;
2289
2290         if (write_limit_high == 0)
2291                 return (0);
2292
2293         p = get_drop_probability ();
2294         if (p == 0.0)
2295                 return (0);
2296
2297         status = pthread_mutex_trylock (&last_message_lock);
2298         if (status == 0)
2299         {
2300                 cdtime_t now;
2301
2302                 now = cdtime ();
2303                 if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
2304                 {
2305                         last_message_time = now;
2306                         ERROR ("plugin_dispatch_values: Low water mark "
2307                                         "reached. Dropping %.0f%% of metrics.",
2308                                         100.0 * p);
2309                 }
2310                 pthread_mutex_unlock (&last_message_lock);
2311         }
2312
2313         if (p == 1.0)
2314                 return (1);
2315
2316         q = cdrand_d ();
2317         if (q > p)
2318                 return (1);
2319         else
2320                 return (0);
2321 } /* }}} _Bool check_drop_value */
2322
2323 int plugin_dispatch_values (value_list_t const *vl)
2324 {
2325         int status;
2326         static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
2327
2328         if (check_drop_value ()) {
2329                 if(record_statistics) {
2330                         pthread_mutex_lock(&statistics_lock);
2331                         stats_values_dropped++;
2332                         pthread_mutex_unlock(&statistics_lock);
2333                 }
2334                 return (0);
2335         }
2336
2337         status = plugin_write_enqueue (vl);
2338         if (status != 0)
2339         {
2340                 char errbuf[1024];
2341                 ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
2342                                 "with status %i (%s).", status,
2343                                 sstrerror (status, errbuf, sizeof (errbuf)));
2344                 return (status);
2345         }
2346
2347         return (0);
2348 }
2349
2350 __attribute__((sentinel))
2351 int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */
2352                 _Bool store_percentage, int store_type, ...)
2353 {
2354         value_list_t *vl;
2355         int failed = 0;
2356         gauge_t sum = 0.0;
2357         va_list ap;
2358
2359         assert (template->values_len == 1);
2360
2361         /* Calculate sum for Gauge to calculate percent if needed */
2362         if (DS_TYPE_GAUGE == store_type)        {
2363                 va_start (ap, store_type);
2364                 while (42)
2365                 {
2366                         char const *name;
2367                         gauge_t value;
2368
2369                         name = va_arg (ap, char const *);
2370                         if (name == NULL)
2371                                 break;
2372
2373                         value = va_arg (ap, gauge_t);
2374                         if (!isnan (value))
2375                                 sum += value;
2376                 }
2377                 va_end (ap);
2378         }
2379
2380
2381         vl = plugin_value_list_clone (template);
2382         /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2383         if (store_percentage)
2384                 sstrncpy (vl->type, "percent", sizeof (vl->type));
2385
2386         va_start (ap, store_type);
2387         while (42)
2388         {
2389                 char const *name;
2390                 int status;
2391
2392                 /* Set the type instance. */
2393                 name = va_arg (ap, char const *);
2394                 if (name == NULL)
2395                         break;
2396                 sstrncpy (vl->type_instance, name, sizeof (vl->type_instance));
2397
2398                 /* Set the value. */
2399                 switch (store_type)
2400                 {
2401                 case DS_TYPE_GAUGE:
2402                         vl->values[0].gauge = va_arg (ap, gauge_t);
2403                         if (store_percentage)
2404                                 vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2405                         break;
2406                 case DS_TYPE_ABSOLUTE:
2407                         vl->values[0].absolute = va_arg (ap, absolute_t);
2408                         break;
2409                 case DS_TYPE_COUNTER:
2410                         vl->values[0].counter  = va_arg (ap, counter_t);
2411                         break;
2412                 case DS_TYPE_DERIVE:
2413                         vl->values[0].derive   = va_arg (ap, derive_t);
2414                         break;
2415                 default:
2416                         ERROR ("plugin_dispatch_multivalue: given store_type is incorrect.");
2417                         failed++;
2418                 }
2419
2420
2421                 status = plugin_write_enqueue (vl);
2422                 if (status != 0)
2423                         failed++;
2424         }
2425         va_end (ap);
2426
2427         plugin_value_list_free (vl);
2428         return (failed);
2429 } /* }}} int plugin_dispatch_multivalue */
2430
2431 int plugin_dispatch_notification (const notification_t *notif)
2432 {
2433         llentry_t *le;
2434         /* Possible TODO: Add flap detection here */
2435
2436         DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
2437                         "time = %.3f; host = %s;",
2438                         notif->severity, notif->message,
2439                         CDTIME_T_TO_DOUBLE (notif->time), notif->host);
2440
2441         /* Nobody cares for notifications */
2442         if (list_notification == NULL)
2443                 return (-1);
2444
2445         le = llist_head (list_notification);
2446         while (le != NULL)
2447         {
2448                 callback_func_t *cf;
2449                 plugin_notification_cb callback;
2450                 int status;
2451
2452                 /* do not switch plugin context; rather keep the context
2453                  * (interval) information of the calling plugin */
2454
2455                 cf = le->value;
2456                 callback = cf->cf_callback;
2457                 status = (*callback) (notif, &cf->cf_udata);
2458                 if (status != 0)
2459                 {
2460                         WARNING ("plugin_dispatch_notification: Notification "
2461                                         "callback %s returned %i.",
2462                                         le->key, status);
2463                 }
2464
2465                 le = le->next;
2466         }
2467
2468         return (0);
2469 } /* int plugin_dispatch_notification */
2470
2471 void plugin_log (int level, const char *format, ...)
2472 {
2473         char msg[1024];
2474         va_list ap;
2475         llentry_t *le;
2476
2477 #if !COLLECT_DEBUG
2478         if (level >= LOG_DEBUG)
2479                 return;
2480 #endif
2481
2482         va_start (ap, format);
2483         vsnprintf (msg, sizeof (msg), format, ap);
2484         msg[sizeof (msg) - 1] = '\0';
2485         va_end (ap);
2486
2487         if (list_log == NULL)
2488         {
2489                 fprintf (stderr, "%s\n", msg);
2490                 return;
2491         }
2492
2493         le = llist_head (list_log);
2494         while (le != NULL)
2495         {
2496                 callback_func_t *cf;
2497                 plugin_log_cb callback;
2498
2499                 cf = le->value;
2500                 callback = cf->cf_callback;
2501
2502                 /* do not switch plugin context; rather keep the context
2503                  * (interval) information of the calling plugin */
2504
2505                 (*callback) (level, msg, &cf->cf_udata);
2506
2507                 le = le->next;
2508         }
2509 } /* void plugin_log */
2510
2511 int parse_log_severity (const char *severity)
2512 {
2513         int log_level = -1;
2514
2515         if ((0 == strcasecmp (severity, "emerg"))
2516                         || (0 == strcasecmp (severity, "alert"))
2517                         || (0 == strcasecmp (severity, "crit"))
2518                         || (0 == strcasecmp (severity, "err")))
2519                 log_level = LOG_ERR;
2520         else if (0 == strcasecmp (severity, "warning"))
2521                 log_level = LOG_WARNING;
2522         else if (0 == strcasecmp (severity, "notice"))
2523                 log_level = LOG_NOTICE;
2524         else if (0 == strcasecmp (severity, "info"))
2525                 log_level = LOG_INFO;
2526 #if COLLECT_DEBUG
2527         else if (0 == strcasecmp (severity, "debug"))
2528                 log_level = LOG_DEBUG;
2529 #endif /* COLLECT_DEBUG */
2530
2531         return (log_level);
2532 } /* int parse_log_severity */
2533
2534 int parse_notif_severity (const char *severity)
2535 {
2536         int notif_severity = -1;
2537
2538         if (strcasecmp (severity, "FAILURE") == 0)
2539                 notif_severity = NOTIF_FAILURE;
2540         else if (strcmp (severity, "OKAY") == 0)
2541                 notif_severity = NOTIF_OKAY;
2542         else if ((strcmp (severity, "WARNING") == 0)
2543                         || (strcmp (severity, "WARN") == 0))
2544                 notif_severity = NOTIF_WARNING;
2545
2546         return (notif_severity);
2547 } /* int parse_notif_severity */
2548
2549 const data_set_t *plugin_get_ds (const char *name)
2550 {
2551         data_set_t *ds;
2552
2553         if (data_sets == NULL)
2554         {
2555                 ERROR ("plugin_get_ds: No data sets are defined yet.");
2556                 return (NULL);
2557         }
2558
2559         if (c_avl_get (data_sets, name, (void *) &ds) != 0)
2560         {
2561                 DEBUG ("No such dataset registered: %s", name);
2562                 return (NULL);
2563         }
2564
2565         return (ds);
2566 } /* data_set_t *plugin_get_ds */
2567
2568 static int plugin_notification_meta_add (notification_t *n,
2569     const char *name,
2570     enum notification_meta_type_e type,
2571     const void *value)
2572 {
2573   notification_meta_t *meta;
2574   notification_meta_t *tail;
2575
2576   if ((n == NULL) || (name == NULL) || (value == NULL))
2577   {
2578     ERROR ("plugin_notification_meta_add: A pointer is NULL!");
2579     return (-1);
2580   }
2581
2582   meta = calloc (1, sizeof (*meta));
2583   if (meta == NULL)
2584   {
2585     ERROR ("plugin_notification_meta_add: calloc failed.");
2586     return (-1);
2587   }
2588
2589   sstrncpy (meta->name, name, sizeof (meta->name));
2590   meta->type = type;
2591
2592   switch (type)
2593   {
2594     case NM_TYPE_STRING:
2595     {
2596       meta->nm_value.nm_string = strdup ((const char *) value);
2597       if (meta->nm_value.nm_string == NULL)
2598       {
2599         ERROR ("plugin_notification_meta_add: strdup failed.");
2600         sfree (meta);
2601         return (-1);
2602       }
2603       break;
2604     }
2605     case NM_TYPE_SIGNED_INT:
2606     {
2607       meta->nm_value.nm_signed_int = *((int64_t *) value);
2608       break;
2609     }
2610     case NM_TYPE_UNSIGNED_INT:
2611     {
2612       meta->nm_value.nm_unsigned_int = *((uint64_t *) value);
2613       break;
2614     }
2615     case NM_TYPE_DOUBLE:
2616     {
2617       meta->nm_value.nm_double = *((double *) value);
2618       break;
2619     }
2620     case NM_TYPE_BOOLEAN:
2621     {
2622       meta->nm_value.nm_boolean = *((_Bool *) value);
2623       break;
2624     }
2625     default:
2626     {
2627       ERROR ("plugin_notification_meta_add: Unknown type: %i", type);
2628       sfree (meta);
2629       return (-1);
2630     }
2631   } /* switch (type) */
2632
2633   meta->next = NULL;
2634   tail = n->meta;
2635   while ((tail != NULL) && (tail->next != NULL))
2636     tail = tail->next;
2637
2638   if (tail == NULL)
2639     n->meta = meta;
2640   else
2641     tail->next = meta;
2642
2643   return (0);
2644 } /* int plugin_notification_meta_add */
2645
2646 int plugin_notification_meta_add_string (notification_t *n,
2647     const char *name,
2648     const char *value)
2649 {
2650   return (plugin_notification_meta_add (n, name, NM_TYPE_STRING, value));
2651 }
2652
2653 int plugin_notification_meta_add_signed_int (notification_t *n,
2654     const char *name,
2655     int64_t value)
2656 {
2657   return (plugin_notification_meta_add (n, name, NM_TYPE_SIGNED_INT, &value));
2658 }
2659
2660 int plugin_notification_meta_add_unsigned_int (notification_t *n,
2661     const char *name,
2662     uint64_t value)
2663 {
2664   return (plugin_notification_meta_add (n, name, NM_TYPE_UNSIGNED_INT, &value));
2665 }
2666
2667 int plugin_notification_meta_add_double (notification_t *n,
2668     const char *name,
2669     double value)
2670 {
2671   return (plugin_notification_meta_add (n, name, NM_TYPE_DOUBLE, &value));
2672 }
2673
2674 int plugin_notification_meta_add_boolean (notification_t *n,
2675     const char *name,
2676     _Bool value)
2677 {
2678   return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
2679 }
2680
2681 int plugin_notification_meta_copy (notification_t *dst,
2682     const notification_t *src)
2683 {
2684   assert (dst != NULL);
2685   assert (src != NULL);
2686   assert (dst != src);
2687   assert ((src->meta == NULL) || (src->meta != dst->meta));
2688
2689   for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next)
2690   {
2691     if (meta->type == NM_TYPE_STRING)
2692       plugin_notification_meta_add_string (dst, meta->name,
2693           meta->nm_value.nm_string);
2694     else if (meta->type == NM_TYPE_SIGNED_INT)
2695       plugin_notification_meta_add_signed_int (dst, meta->name,
2696           meta->nm_value.nm_signed_int);
2697     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2698       plugin_notification_meta_add_unsigned_int (dst, meta->name,
2699           meta->nm_value.nm_unsigned_int);
2700     else if (meta->type == NM_TYPE_DOUBLE)
2701       plugin_notification_meta_add_double (dst, meta->name,
2702           meta->nm_value.nm_double);
2703     else if (meta->type == NM_TYPE_BOOLEAN)
2704       plugin_notification_meta_add_boolean (dst, meta->name,
2705           meta->nm_value.nm_boolean);
2706   }
2707
2708   return (0);
2709 } /* int plugin_notification_meta_copy */
2710
2711 int plugin_notification_meta_free (notification_meta_t *n)
2712 {
2713   notification_meta_t *this;
2714   notification_meta_t *next;
2715
2716   if (n == NULL)
2717   {
2718     ERROR ("plugin_notification_meta_free: n == NULL!");
2719     return (-1);
2720   }
2721
2722   this = n;
2723   while (this != NULL)
2724   {
2725     next = this->next;
2726
2727     if (this->type == NM_TYPE_STRING)
2728     {
2729       /* Assign to a temporary variable to work around nm_string's const
2730        * modifier. */
2731       void *tmp = (void *) this->nm_value.nm_string;
2732
2733       sfree (tmp);
2734       this->nm_value.nm_string = NULL;
2735     }
2736     sfree (this);
2737
2738     this = next;
2739   }
2740
2741   return (0);
2742 } /* int plugin_notification_meta_free */
2743
2744 static void plugin_ctx_destructor (void *ctx)
2745 {
2746         sfree (ctx);
2747 } /* void plugin_ctx_destructor */
2748
2749 static plugin_ctx_t ctx_init = { /* interval = */ 0 };
2750
2751 static plugin_ctx_t *plugin_ctx_create (void)
2752 {
2753         plugin_ctx_t *ctx;
2754
2755         ctx = malloc (sizeof (*ctx));
2756         if (ctx == NULL) {
2757                 char errbuf[1024];
2758                 ERROR ("Failed to allocate plugin context: %s",
2759                                 sstrerror (errno, errbuf, sizeof (errbuf)));
2760                 return NULL;
2761         }
2762
2763         *ctx = ctx_init;
2764         assert (plugin_ctx_key_initialized);
2765         pthread_setspecific (plugin_ctx_key, ctx);
2766         DEBUG("Created new plugin context.");
2767         return (ctx);
2768 } /* int plugin_ctx_create */
2769
2770 void plugin_init_ctx (void)
2771 {
2772         pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
2773         plugin_ctx_key_initialized = 1;
2774 } /* void plugin_init_ctx */
2775
2776 plugin_ctx_t plugin_get_ctx (void)
2777 {
2778         plugin_ctx_t *ctx;
2779
2780         assert (plugin_ctx_key_initialized);
2781         ctx = pthread_getspecific (plugin_ctx_key);
2782
2783         if (ctx == NULL) {
2784                 ctx = plugin_ctx_create ();
2785                 /* this must no happen -- exit() instead? */
2786                 if (ctx == NULL)
2787                         return ctx_init;
2788         }
2789
2790         return (*ctx);
2791 } /* plugin_ctx_t plugin_get_ctx */
2792
2793 plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
2794 {
2795         plugin_ctx_t *c;
2796         plugin_ctx_t old;
2797
2798         assert (plugin_ctx_key_initialized);
2799         c = pthread_getspecific (plugin_ctx_key);
2800
2801         if (c == NULL) {
2802                 c = plugin_ctx_create ();
2803                 /* this must no happen -- exit() instead? */
2804                 if (c == NULL)
2805                         return ctx_init;
2806         }
2807
2808         old = *c;
2809         *c = ctx;
2810
2811         return (old);
2812 } /* void plugin_set_ctx */
2813
2814 cdtime_t plugin_get_interval (void)
2815 {
2816         cdtime_t interval;
2817
2818         interval = plugin_get_ctx().interval;
2819         if (interval > 0)
2820                 return interval;
2821
2822         return cf_get_default_interval ();
2823 } /* cdtime_t plugin_get_interval */
2824
2825 typedef struct {
2826         plugin_ctx_t ctx;
2827         void *(*start_routine) (void *);
2828         void *arg;
2829 } plugin_thread_t;
2830
2831 static void *plugin_thread_start (void *arg)
2832 {
2833         plugin_thread_t *plugin_thread = arg;
2834
2835         void *(*start_routine) (void *) = plugin_thread->start_routine;
2836         void *plugin_arg = plugin_thread->arg;
2837
2838         plugin_set_ctx (plugin_thread->ctx);
2839
2840         sfree (plugin_thread);
2841
2842         return start_routine (plugin_arg);
2843 } /* void *plugin_thread_start */
2844
2845 int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
2846                 void *(*start_routine) (void *), void *arg)
2847 {
2848         plugin_thread_t *plugin_thread;
2849
2850         plugin_thread = malloc (sizeof (*plugin_thread));
2851         if (plugin_thread == NULL)
2852                 return -1;
2853
2854         plugin_thread->ctx           = plugin_get_ctx ();
2855         plugin_thread->start_routine = start_routine;
2856         plugin_thread->arg           = arg;
2857
2858         return pthread_create (thread, attr,
2859                         plugin_thread_start, plugin_thread);
2860 } /* int plugin_thread_create */
2861
2862 /* vim: set sw=8 ts=8 noet fdm=marker : */