src/plugin.[ch]: Implemented `plugin_flush_one' which flushes only one specific plugin.
[collectd.git] / src / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  *   Sebastian Harl <sh at tokkee.org>
21  **/
22
23 #include "collectd.h"
24
25 #include <ltdl.h>
26
27 #if HAVE_PTHREAD_H
28 # include <pthread.h>
29 #endif
30
31 #include "common.h"
32 #include "plugin.h"
33 #include "configfile.h"
34 #include "utils_avltree.h"
35 #include "utils_llist.h"
36 #include "utils_cache.h"
37 #include "utils_threshold.h"
38
39 /*
40  * Private structures
41  */
42 struct read_func_s
43 {
44         int wait_time;
45         int wait_left;
46         int (*callback) (void);
47         enum { DONE = 0, TODO = 1, ACTIVE = 2 } needs_read;
48 };
49 typedef struct read_func_s read_func_t;
50
51 /*
52  * Private variables
53  */
54 static llist_t *list_init;
55 static llist_t *list_read;
56 static llist_t *list_write;
57 static llist_t *list_flush;
58 static llist_t *list_shutdown;
59 static llist_t *list_log;
60 static llist_t *list_notification;
61
62 static c_avl_tree_t *data_sets;
63
64 static char *plugindir = NULL;
65
66 static int             read_loop = 1;
67 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
68 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
69 static pthread_t      *read_threads = NULL;
70 static int             read_threads_num = 0;
71
72 /*
73  * Static functions
74  */
75 static const char *plugin_get_dir (void)
76 {
77         if (plugindir == NULL)
78                 return (PLUGINDIR);
79         else
80                 return (plugindir);
81 }
82
83 static int register_callback (llist_t **list, const char *name, void *callback)
84 {
85         llentry_t *le;
86         char *key;
87
88         if ((*list == NULL)
89                         && ((*list = llist_create ()) == NULL))
90                 return (-1);
91
92         le = llist_search (*list, name);
93         if (le == NULL)
94         {
95                 key = strdup (name);
96                 if (key == NULL)
97                         return (-1);
98
99                 le = llentry_create (key, callback);
100                 if (le == NULL)
101                 {
102                         free (key);
103                         return (-1);
104                 }
105
106                 llist_append (*list, le);
107         }
108         else
109         {
110                 le->value = callback;
111         }
112
113         return (0);
114 } /* int register_callback */
115
116 static int plugin_unregister (llist_t *list, const char *name)
117 {
118         llentry_t *e;
119
120         e = llist_search (list, name);
121
122         if (e == NULL)
123                 return (-1);
124
125         llist_remove (list, e);
126         free (e->key);
127         llentry_destroy (e);
128
129         return (0);
130 } /* int plugin_unregister */
131
132 /*
133  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
134  * object, but it will bitch about a shared object not having a
135  * ``module_register'' symbol..
136  */
137 static int plugin_load_file (char *file)
138 {
139         lt_dlhandle dlh;
140         void (*reg_handle) (void);
141
142         DEBUG ("file = %s", file);
143
144         lt_dlinit ();
145         lt_dlerror (); /* clear errors */
146
147         if ((dlh = lt_dlopen (file)) == NULL)
148         {
149                 const char *error = lt_dlerror ();
150
151                 ERROR ("lt_dlopen failed: %s", error);
152                 fprintf (stderr, "lt_dlopen failed: %s\n", error);
153                 return (1);
154         }
155
156         if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL)
157         {
158                 WARNING ("Couldn't find symbol ``module_register'' in ``%s'': %s\n",
159                                 file, lt_dlerror ());
160                 lt_dlclose (dlh);
161                 return (-1);
162         }
163
164         (*reg_handle) ();
165
166         return (0);
167 }
168
169 static void *plugin_read_thread (void *args)
170 {
171         llentry_t   *le;
172         read_func_t *rf;
173         int          status;
174         int          done;
175
176         pthread_mutex_lock (&read_lock);
177
178         while (read_loop != 0)
179         {
180                 le = llist_head (list_read);
181                 done = 0;
182
183                 while ((read_loop != 0) && (le != NULL))
184                 {
185                         rf = (read_func_t *) le->value;
186
187                         if (rf->needs_read != TODO)
188                         {
189                                 le = le->next;
190                                 continue;
191                         }
192
193                         /* We will do this read function */
194                         rf->needs_read = ACTIVE;
195
196                         DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Handling %s",
197                                         (unsigned long int) pthread_self (), le->key);
198                         pthread_mutex_unlock (&read_lock);
199
200                         status = rf->callback ();
201                         done++;
202
203                         if (status != 0)
204                         {
205                                 if (rf->wait_time < interval_g)
206                                         rf->wait_time = interval_g;
207                                 rf->wait_left = rf->wait_time;
208                                 rf->wait_time = rf->wait_time * 2;
209                                 if (rf->wait_time > 86400)
210                                         rf->wait_time = 86400;
211
212                                 NOTICE ("read-function of plugin `%s' "
213                                                 "failed. Will suspend it for %i "
214                                                 "seconds.", le->key, rf->wait_left);
215                         }
216                         else
217                         {
218                                 rf->wait_left = 0;
219                                 rf->wait_time = interval_g;
220                         }
221
222                         pthread_mutex_lock (&read_lock);
223
224                         rf->needs_read = DONE;
225                         le = le->next;
226                 } /* while (le != NULL) */
227
228                 if ((read_loop != 0) && (done == 0))
229                 {
230                         DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Waiting on read_cond.",
231                                         (unsigned long int) pthread_self ());
232                         pthread_cond_wait (&read_cond, &read_lock);
233                 }
234         } /* while (read_loop) */
235
236         pthread_mutex_unlock (&read_lock);
237
238         pthread_exit (NULL);
239 } /* void *plugin_read_thread */
240
241 static void start_threads (int num)
242 {
243         int i;
244
245         if (read_threads != NULL)
246                 return;
247
248         read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
249         if (read_threads == NULL)
250         {
251                 ERROR ("plugin: start_threads: calloc failed.");
252                 return;
253         }
254
255         read_threads_num = 0;
256         for (i = 0; i < num; i++)
257         {
258                 if (pthread_create (read_threads + read_threads_num, NULL,
259                                         plugin_read_thread, NULL) == 0)
260                 {
261                         read_threads_num++;
262                 }
263                 else
264                 {
265                         ERROR ("plugin: start_threads: pthread_create failed.");
266                         return;
267                 }
268         } /* for (i) */
269 } /* void start_threads */
270
271 static void stop_threads (void)
272 {
273         int i;
274
275         pthread_mutex_lock (&read_lock);
276         read_loop = 0;
277         DEBUG ("plugin: stop_threads: Signalling `read_cond'");
278         pthread_cond_broadcast (&read_cond);
279         pthread_mutex_unlock (&read_lock);
280
281         for (i = 0; i < read_threads_num; i++)
282         {
283                 if (pthread_join (read_threads[i], NULL) != 0)
284                 {
285                         ERROR ("plugin: stop_threads: pthread_join failed.");
286                 }
287                 read_threads[i] = (pthread_t) 0;
288         }
289         sfree (read_threads);
290         read_threads_num = 0;
291 } /* void stop_threads */
292
293 /*
294  * Public functions
295  */
296 void plugin_set_dir (const char *dir)
297 {
298         if (plugindir != NULL)
299                 free (plugindir);
300
301         if (dir == NULL)
302                 plugindir = NULL;
303         else if ((plugindir = strdup (dir)) == NULL)
304         {
305                 char errbuf[1024];
306                 ERROR ("strdup failed: %s",
307                                 sstrerror (errno, errbuf, sizeof (errbuf)));
308         }
309 }
310
311 #define BUFSIZE 512
312 int plugin_load (const char *type)
313 {
314         DIR  *dh;
315         const char *dir;
316         char  filename[BUFSIZE];
317         char  typename[BUFSIZE];
318         int   typename_len;
319         int   ret;
320         struct stat    statbuf;
321         struct dirent *de;
322
323         DEBUG ("type = %s", type);
324
325         dir = plugin_get_dir ();
326         ret = 1;
327
328         /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
329          * type when matching the filename */
330         if (snprintf (typename, BUFSIZE, "%s.so", type) >= BUFSIZE)
331         {
332                 WARNING ("snprintf: truncated: `%s.so'", type);
333                 return (-1);
334         }
335         typename_len = strlen (typename);
336
337         if ((dh = opendir (dir)) == NULL)
338         {
339                 char errbuf[1024];
340                 ERROR ("opendir (%s): %s", dir,
341                                 sstrerror (errno, errbuf, sizeof (errbuf)));
342                 return (-1);
343         }
344
345         while ((de = readdir (dh)) != NULL)
346         {
347                 if (strncasecmp (de->d_name, typename, typename_len))
348                         continue;
349
350                 if (snprintf (filename, BUFSIZE, "%s/%s", dir, de->d_name) >= BUFSIZE)
351                 {
352                         WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
353                         continue;
354                 }
355
356                 if (lstat (filename, &statbuf) == -1)
357                 {
358                         char errbuf[1024];
359                         WARNING ("stat %s: %s", filename,
360                                         sstrerror (errno, errbuf, sizeof (errbuf)));
361                         continue;
362                 }
363                 else if (!S_ISREG (statbuf.st_mode))
364                 {
365                         /* don't follow symlinks */
366                         continue;
367                 }
368
369                 if (plugin_load_file (filename) == 0)
370                 {
371                         /* success */
372                         ret = 0;
373                         break;
374                 }
375                 else
376                 {
377                         fprintf (stderr, "Unable to load plugin %s.\n", type);
378                 }
379         }
380
381         closedir (dh);
382
383         return (ret);
384 }
385
386 /*
387  * The `register_*' functions follow
388  */
389 int plugin_register_config (const char *name,
390                 int (*callback) (const char *key, const char *val),
391                 const char **keys, int keys_num)
392 {
393         cf_register (name, callback, keys, keys_num);
394         return (0);
395 } /* int plugin_register_config */
396
397 int plugin_register_complex_config (const char *type,
398                 int (*callback) (oconfig_item_t *))
399 {
400         return (cf_register_complex (type, callback));
401 } /* int plugin_register_complex_config */
402
403 int plugin_register_init (const char *name,
404                 int (*callback) (void))
405 {
406         return (register_callback (&list_init, name, (void *) callback));
407 } /* plugin_register_init */
408
409 int plugin_register_read (const char *name,
410                 int (*callback) (void))
411 {
412         read_func_t *rf;
413
414         rf = (read_func_t *) malloc (sizeof (read_func_t));
415         if (rf == NULL)
416         {
417                 char errbuf[1024];
418                 ERROR ("plugin_register_read: malloc failed: %s",
419                                 sstrerror (errno, errbuf, sizeof (errbuf)));
420                 return (-1);
421         }
422
423         memset (rf, '\0', sizeof (read_func_t));
424         rf->wait_time = interval_g;
425         rf->wait_left = 0;
426         rf->callback = callback;
427         rf->needs_read = DONE;
428
429         return (register_callback (&list_read, name, (void *) rf));
430 } /* int plugin_register_read */
431
432 int plugin_register_write (const char *name,
433                 int (*callback) (const data_set_t *ds, const value_list_t *vl))
434 {
435         return (register_callback (&list_write, name, (void *) callback));
436 } /* int plugin_register_write */
437
438 int plugin_register_flush (const char *name, int (*callback) (const int))
439 {
440         return (register_callback (&list_flush, name, (void *) callback));
441 } /* int plugin_register_flush */
442
443 int plugin_register_shutdown (char *name,
444                 int (*callback) (void))
445 {
446         return (register_callback (&list_shutdown, name, (void *) callback));
447 } /* int plugin_register_shutdown */
448
449 int plugin_register_data_set (const data_set_t *ds)
450 {
451         data_set_t *ds_copy;
452         int i;
453
454         if ((data_sets != NULL)
455                         && (c_avl_get (data_sets, ds->type, NULL) == 0))
456         {
457                 NOTICE ("Replacing DS `%s' with another version.", ds->type);
458                 plugin_unregister_data_set (ds->type);
459         }
460         else if (data_sets == NULL)
461         {
462                 data_sets = c_avl_create ((int (*) (const void *, const void *)) strcmp);
463                 if (data_sets == NULL)
464                         return (-1);
465         }
466
467         ds_copy = (data_set_t *) malloc (sizeof (data_set_t));
468         if (ds_copy == NULL)
469                 return (-1);
470         memcpy(ds_copy, ds, sizeof (data_set_t));
471
472         ds_copy->ds = (data_source_t *) malloc (sizeof (data_source_t)
473                         * ds->ds_num);
474         if (ds_copy->ds == NULL)
475         {
476                 free (ds_copy);
477                 return (-1);
478         }
479
480         for (i = 0; i < ds->ds_num; i++)
481                 memcpy (ds_copy->ds + i, ds->ds + i, sizeof (data_source_t));
482
483         return (c_avl_insert (data_sets, (void *) ds_copy->type, (void *) ds_copy));
484 } /* int plugin_register_data_set */
485
486 int plugin_register_log (char *name,
487                 void (*callback) (int priority, const char *msg))
488 {
489         return (register_callback (&list_log, name, (void *) callback));
490 } /* int plugin_register_log */
491
492 int plugin_register_notification (const char *name,
493                 int (*callback) (const notification_t *notif))
494 {
495         return (register_callback (&list_notification, name, (void *) callback));
496 } /* int plugin_register_log */
497
498 int plugin_unregister_config (const char *name)
499 {
500         cf_unregister (name);
501         return (0);
502 } /* int plugin_unregister_config */
503
504 int plugin_unregister_complex_config (const char *name)
505 {
506         cf_unregister_complex (name);
507         return (0);
508 } /* int plugin_unregister_complex_config */
509
510 int plugin_unregister_init (const char *name)
511 {
512         return (plugin_unregister (list_init, name));
513 }
514
515 int plugin_unregister_read (const char *name)
516 {
517         llentry_t *e;
518
519         e = llist_search (list_read, name);
520
521         if (e == NULL)
522                 return (-1);
523
524         llist_remove (list_read, e);
525         free (e->value);
526         free (e->key);
527         llentry_destroy (e);
528
529         return (0);
530 }
531
532 int plugin_unregister_write (const char *name)
533 {
534         return (plugin_unregister (list_write, name));
535 }
536
537 int plugin_unregister_flush (const char *name)
538 {
539         return (plugin_unregister (list_flush, name));
540 }
541
542 int plugin_unregister_shutdown (const char *name)
543 {
544         return (plugin_unregister (list_shutdown, name));
545 }
546
547 int plugin_unregister_data_set (const char *name)
548 {
549         data_set_t *ds;
550
551         if (data_sets == NULL)
552                 return (-1);
553
554         if (c_avl_remove (data_sets, name, NULL, (void *) &ds) != 0)
555                 return (-1);
556
557         sfree (ds->ds);
558         sfree (ds);
559
560         return (0);
561 } /* int plugin_unregister_data_set */
562
563 int plugin_unregister_log (const char *name)
564 {
565         return (plugin_unregister (list_log, name));
566 }
567
568 int plugin_unregister_notification (const char *name)
569 {
570         return (plugin_unregister (list_notification, name));
571 }
572
573 void plugin_init_all (void)
574 {
575         int (*callback) (void);
576         llentry_t *le;
577         int status;
578
579         /* Start read-threads */
580         if (list_read != NULL)
581         {
582                 const char *rt;
583                 int num;
584                 rt = global_option_get ("ReadThreads");
585                 num = atoi (rt);
586                 start_threads ((num > 0) ? num : 5);
587         }
588
589         /* Init the value cache */
590         uc_init ();
591
592         if (list_init == NULL)
593                 return;
594
595         le = llist_head (list_init);
596         while (le != NULL)
597         {
598                 callback = (int (*) (void)) le->value;
599                 status = (*callback) ();
600
601                 if (status != 0)
602                 {
603                         ERROR ("Initialization of plugin `%s' "
604                                         "failed with status %i. "
605                                         "Plugin will be unloaded.",
606                                         le->key, status);
607                         /* FIXME: Unload _all_ functions */
608                         plugin_unregister_read (le->key);
609                 }
610
611                 le = le->next;
612         }
613 } /* void plugin_init_all */
614
615 void plugin_read_all (void)
616 {
617         llentry_t   *le;
618         read_func_t *rf;
619
620         uc_check_timeout ();
621
622         if (list_read == NULL)
623                 return;
624
625         pthread_mutex_lock (&read_lock);
626
627         le = llist_head (list_read);
628         while (le != NULL)
629         {
630                 rf = (read_func_t *) le->value;
631
632                 if (rf->needs_read != DONE)
633                 {
634                         le = le->next;
635                         continue;
636                 }
637
638                 if (rf->wait_left > 0)
639                         rf->wait_left -= interval_g;
640
641                 if (rf->wait_left <= 0)
642                 {
643                         rf->needs_read = TODO;
644                 }
645
646                 le = le->next;
647         }
648
649         DEBUG ("plugin: plugin_read_all: Signalling `read_cond'");
650         pthread_cond_broadcast (&read_cond);
651         pthread_mutex_unlock (&read_lock);
652 } /* void plugin_read_all */
653
654 int plugin_flush_one (int timeout, const char *name)
655 {
656         int (*callback) (int);
657         llentry_t *le;
658         int status;
659
660         if (list_flush == NULL)
661                 return (-1);
662
663         le = llist_search (list_flush, name);
664         if (le == NULL)
665                 return (-1);
666         callback = (int (*) (int)) le->value;
667
668         status = (*callback) (timeout);
669
670         return (status);
671 } /* int plugin_flush_ont */
672
673 void plugin_flush_all (int timeout)
674 {
675         int (*callback) (int);
676         llentry_t *le;
677
678         if (list_flush == NULL)
679                 return;
680
681         le = llist_head (list_flush);
682         while (le != NULL)
683         {
684                 callback = (int (*) (int)) le->value;
685                 le = le->next;
686
687                 (*callback) (timeout);
688         }
689 } /* void plugin_flush_all */
690
691 void plugin_shutdown_all (void)
692 {
693         int (*callback) (void);
694         llentry_t *le;
695
696         stop_threads ();
697
698         if (list_shutdown == NULL)
699                 return;
700
701         le = llist_head (list_shutdown);
702         while (le != NULL)
703         {
704                 callback = (int (*) (void)) le->value;
705
706                 /* Advance the pointer before calling the callback allows
707                  * shutdown functions to unregister themselves. If done the
708                  * other way around the memory `le' points to will be freed
709                  * after callback returns. */
710                 le = le->next;
711
712                 (*callback) ();
713         }
714 } /* void plugin_shutdown_all */
715
716 int plugin_dispatch_values (const char *name, value_list_t *vl)
717 {
718         int (*callback) (const data_set_t *, const value_list_t *);
719         data_set_t *ds;
720         llentry_t *le;
721
722         if ((list_write == NULL) || (data_sets == NULL))
723                 return (-1);
724
725         if (c_avl_get (data_sets, name, (void *) &ds) != 0)
726         {
727                 DEBUG ("No such dataset registered: %s", name);
728                 return (-1);
729         }
730
731         DEBUG ("plugin: plugin_dispatch_values: time = %u; interval = %i; "
732                         "host = %s; "
733                         "plugin = %s; plugin_instance = %s; "
734                         "type = %s; type_instance = %s;",
735                         (unsigned int) vl->time, vl->interval,
736                         vl->host,
737                         vl->plugin, vl->plugin_instance,
738                         ds->type, vl->type_instance);
739
740 #if COLLECT_DEBUG
741         assert (ds->ds_num == vl->values_len);
742 #else
743         if (ds->ds_num != vl->values_len)
744         {
745                 ERROR ("plugin: ds->type = %s: (ds->ds_num = %i) != "
746                                 "(vl->values_len = %i)",
747                                 ds->type, ds->ds_num, vl->values_len);
748                 return (-1);
749         }
750 #endif
751
752         escape_slashes (vl->host, sizeof (vl->host));
753         escape_slashes (vl->plugin, sizeof (vl->plugin));
754         escape_slashes (vl->plugin_instance, sizeof (vl->plugin_instance));
755         escape_slashes (vl->type_instance, sizeof (vl->type_instance));
756
757         /* Update the value cache */
758         uc_update (ds, vl);
759         ut_check_threshold (ds, vl);
760
761         le = llist_head (list_write);
762         while (le != NULL)
763         {
764                 callback = (int (*) (const data_set_t *, const value_list_t *)) le->value;
765                 (*callback) (ds, vl);
766
767                 le = le->next;
768         }
769
770         return (0);
771 } /* int plugin_dispatch_values */
772
773 int plugin_dispatch_notification (const notification_t *notif)
774 {
775         int (*callback) (const notification_t *);
776         llentry_t *le;
777         /* Possible TODO: Add flap detection here */
778
779         DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
780                         "time = %u; host = %s;",
781                         notif->severity, notif->message,
782                         (unsigned int) notif->time, notif->host);
783
784         /* Nobody cares for notifications */
785         if (list_notification == NULL)
786                 return (-1);
787
788         le = llist_head (list_notification);
789         while (le != NULL)
790         {
791                 callback = (int (*) (const notification_t *)) le->value;
792                 (*callback) (notif);
793
794                 le = le->next;
795         }
796
797         return (0);
798 } /* int plugin_dispatch_notification */
799
800 void plugin_log (int level, const char *format, ...)
801 {
802         char msg[512];
803         va_list ap;
804
805         void (*callback) (int, const char *);
806         llentry_t *le;
807
808         if (list_log == NULL)
809                 return;
810
811 #if !COLLECT_DEBUG
812         if (level >= LOG_DEBUG)
813                 return;
814 #endif
815
816         va_start (ap, format);
817         vsnprintf (msg, 512, format, ap);
818         msg[511] = '\0';
819         va_end (ap);
820
821         le = llist_head (list_log);
822         while (le != NULL)
823         {
824                 callback = (void (*) (int, const char *)) le->value;
825                 (*callback) (level, msg);
826
827                 le = le->next;
828         }
829 } /* void plugin_log */
830
831 const data_set_t *plugin_get_ds (const char *name)
832 {
833         data_set_t *ds;
834
835         if (c_avl_get (data_sets, name, (void *) &ds) != 0)
836         {
837                 DEBUG ("No such dataset registered: %s", name);
838                 return (NULL);
839         }
840
841         return (ds);
842 } /* data_set_t *plugin_get_ds */