6b17290f32dbe08a9a4bb6406cbc8fef35efdd01
[collectd.git] / src / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005,2006  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23
24 #include <ltdl.h>
25
26 #if HAVE_PTHREAD_H
27 # include <pthread.h>
28 #endif
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "configfile.h"
33 #include "utils_llist.h"
34 #include "utils_cache.h"
35 #include "utils_threshold.h"
36
37 /*
38  * Private structures
39  */
40 struct read_func_s
41 {
42         int wait_time;
43         int wait_left;
44         int (*callback) (void);
45         enum { DONE = 0, TODO = 1, ACTIVE = 2 } needs_read;
46 };
47 typedef struct read_func_s read_func_t;
48
49 /*
50  * Private variables
51  */
52 static llist_t *list_init;
53 static llist_t *list_read;
54 static llist_t *list_write;
55 static llist_t *list_shutdown;
56 static llist_t *list_data_set;
57 static llist_t *list_log;
58 static llist_t *list_notification;
59
60 static char *plugindir = NULL;
61
62 static int             read_loop = 1;
63 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
64 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
65 static pthread_t      *read_threads = NULL;
66 static int             read_threads_num = 0;
67
68 /*
69  * Static functions
70  */
71 static const char *plugin_get_dir (void)
72 {
73         if (plugindir == NULL)
74                 return (PLUGINDIR);
75         else
76                 return (plugindir);
77 }
78
79 static int register_callback (llist_t **list, const char *name, void *callback)
80 {
81         llentry_t *le;
82
83         if ((*list == NULL)
84                         && ((*list = llist_create ()) == NULL))
85                 return (-1);
86
87         le = llist_search (*list, name);
88         if (le == NULL)
89         {
90                 le = llentry_create (name, callback);
91                 if (le == NULL)
92                         return (-1);
93
94                 llist_append (*list, le);
95         }
96         else
97         {
98                 le->value = callback;
99         }
100
101         return (0);
102 } /* int register_callback */
103
104 static int plugin_unregister (llist_t *list, const char *name)
105 {
106         llentry_t *e;
107
108         e = llist_search (list, name);
109
110         if (e == NULL)
111                 return (-1);
112
113         llist_remove (list, e);
114         llentry_destroy (e);
115
116         return (0);
117 } /* int plugin_unregister */
118
119 /*
120  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
121  * object, but it will bitch about a shared object not having a
122  * ``module_register'' symbol..
123  */
124 static int plugin_load_file (char *file)
125 {
126         lt_dlhandle dlh;
127         void (*reg_handle) (void);
128
129         DEBUG ("file = %s", file);
130
131         lt_dlinit ();
132         lt_dlerror (); /* clear errors */
133
134         if ((dlh = lt_dlopen (file)) == NULL)
135         {
136                 const char *error = lt_dlerror ();
137
138                 ERROR ("lt_dlopen failed: %s", error);
139                 fprintf (stderr, "lt_dlopen failed: %s\n", error);
140                 return (1);
141         }
142
143         if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL)
144         {
145                 WARNING ("Couldn't find symbol ``module_register'' in ``%s'': %s\n",
146                                 file, lt_dlerror ());
147                 lt_dlclose (dlh);
148                 return (-1);
149         }
150
151         (*reg_handle) ();
152
153         return (0);
154 }
155
156 static void *plugin_read_thread (void *args)
157 {
158         llentry_t   *le;
159         read_func_t *rf;
160         int          status;
161         int          done;
162
163         pthread_mutex_lock (&read_lock);
164
165         while (read_loop != 0)
166         {
167                 le = llist_head (list_read);
168                 done = 0;
169
170                 while ((read_loop != 0) && (le != NULL))
171                 {
172                         rf = (read_func_t *) le->value;
173
174                         if (rf->needs_read != TODO)
175                         {
176                                 le = le->next;
177                                 continue;
178                         }
179
180                         /* We will do this read function */
181                         rf->needs_read = ACTIVE;
182
183                         DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Handling %s",
184                                         (unsigned long int) pthread_self (), le->key);
185                         pthread_mutex_unlock (&read_lock);
186
187                         status = rf->callback ();
188                         done++;
189
190                         if (status != 0)
191                         {
192                                 if (rf->wait_time < interval_g)
193                                         rf->wait_time = interval_g;
194                                 rf->wait_left = rf->wait_time;
195                                 rf->wait_time = rf->wait_time * 2;
196                                 if (rf->wait_time > 86400)
197                                         rf->wait_time = 86400;
198
199                                 NOTICE ("read-function of plugin `%s' "
200                                                 "failed. Will suspend it for %i "
201                                                 "seconds.", le->key, rf->wait_left);
202                         }
203                         else
204                         {
205                                 rf->wait_left = 0;
206                                 rf->wait_time = interval_g;
207                         }
208
209                         pthread_mutex_lock (&read_lock);
210
211                         rf->needs_read = DONE;
212                         le = le->next;
213                 } /* while (le != NULL) */
214
215                 if ((read_loop != 0) && (done == 0))
216                 {
217                         DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Waiting on read_cond.",
218                                         (unsigned long int) pthread_self ());
219                         pthread_cond_wait (&read_cond, &read_lock);
220                 }
221         } /* while (read_loop) */
222
223         pthread_mutex_unlock (&read_lock);
224
225         pthread_exit (NULL);
226 } /* void *plugin_read_thread */
227
228 static void start_threads (int num)
229 {
230         int i;
231
232         if (read_threads != NULL)
233                 return;
234
235         read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
236         if (read_threads == NULL)
237         {
238                 ERROR ("plugin: start_threads: calloc failed.");
239                 return;
240         }
241
242         read_threads_num = 0;
243         for (i = 0; i < num; i++)
244         {
245                 if (pthread_create (read_threads + read_threads_num, NULL,
246                                         plugin_read_thread, NULL) == 0)
247                 {
248                         read_threads_num++;
249                 }
250                 else
251                 {
252                         ERROR ("plugin: start_threads: pthread_create failed.");
253                         return;
254                 }
255         } /* for (i) */
256 } /* void start_threads */
257
258 static void stop_threads (void)
259 {
260         int i;
261
262         pthread_mutex_lock (&read_lock);
263         read_loop = 0;
264         DEBUG ("plugin: stop_threads: Signalling `read_cond'");
265         pthread_cond_broadcast (&read_cond);
266         pthread_mutex_unlock (&read_lock);
267
268         for (i = 0; i < read_threads_num; i++)
269         {
270                 if (pthread_join (read_threads[i], NULL) != 0)
271                 {
272                         ERROR ("plugin: stop_threads: pthread_join failed.");
273                 }
274                 read_threads[i] = (pthread_t) 0;
275         }
276         sfree (read_threads);
277         read_threads_num = 0;
278 } /* void stop_threads */
279
280 /*
281  * Public functions
282  */
283 void plugin_set_dir (const char *dir)
284 {
285         if (plugindir != NULL)
286                 free (plugindir);
287
288         if (dir == NULL)
289                 plugindir = NULL;
290         else if ((plugindir = strdup (dir)) == NULL)
291         {
292                 char errbuf[1024];
293                 ERROR ("strdup failed: %s",
294                                 sstrerror (errno, errbuf, sizeof (errbuf)));
295         }
296 }
297
298 #define BUFSIZE 512
299 int plugin_load (const char *type)
300 {
301         DIR  *dh;
302         const char *dir;
303         char  filename[BUFSIZE];
304         char  typename[BUFSIZE];
305         int   typename_len;
306         int   ret;
307         struct stat    statbuf;
308         struct dirent *de;
309
310         DEBUG ("type = %s", type);
311
312         dir = plugin_get_dir ();
313         ret = 1;
314
315         /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
316          * type when matching the filename */
317         if (snprintf (typename, BUFSIZE, "%s.so", type) >= BUFSIZE)
318         {
319                 WARNING ("snprintf: truncated: `%s.so'", type);
320                 return (-1);
321         }
322         typename_len = strlen (typename);
323
324         if ((dh = opendir (dir)) == NULL)
325         {
326                 char errbuf[1024];
327                 ERROR ("opendir (%s): %s", dir,
328                                 sstrerror (errno, errbuf, sizeof (errbuf)));
329                 return (-1);
330         }
331
332         while ((de = readdir (dh)) != NULL)
333         {
334                 if (strncasecmp (de->d_name, typename, typename_len))
335                         continue;
336
337                 if (snprintf (filename, BUFSIZE, "%s/%s", dir, de->d_name) >= BUFSIZE)
338                 {
339                         WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
340                         continue;
341                 }
342
343                 if (lstat (filename, &statbuf) == -1)
344                 {
345                         char errbuf[1024];
346                         WARNING ("stat %s: %s", filename,
347                                         sstrerror (errno, errbuf, sizeof (errbuf)));
348                         continue;
349                 }
350                 else if (!S_ISREG (statbuf.st_mode))
351                 {
352                         /* don't follow symlinks */
353                         continue;
354                 }
355
356                 if (plugin_load_file (filename) == 0)
357                 {
358                         /* success */
359                         ret = 0;
360                         break;
361                 }
362                 else
363                 {
364                         fprintf (stderr, "Unable to load plugin %s.\n", type);
365                 }
366         }
367
368         closedir (dh);
369
370         return (ret);
371 }
372
373 /*
374  * The `register_*' functions follow
375  */
376 int plugin_register_config (const char *name,
377                 int (*callback) (const char *key, const char *val),
378                 const char **keys, int keys_num)
379 {
380         cf_register (name, callback, keys, keys_num);
381         return (0);
382 } /* int plugin_register_config */
383
384 int plugin_register_complex_config (const char *type,
385                 int (*callback) (oconfig_item_t *))
386 {
387         return (cf_register_complex (type, callback));
388 } /* int plugin_register_complex_config */
389
390 int plugin_register_init (const char *name,
391                 int (*callback) (void))
392 {
393         return (register_callback (&list_init, name, (void *) callback));
394 } /* plugin_register_init */
395
396 int plugin_register_read (const char *name,
397                 int (*callback) (void))
398 {
399         read_func_t *rf;
400
401         rf = (read_func_t *) malloc (sizeof (read_func_t));
402         if (rf == NULL)
403         {
404                 char errbuf[1024];
405                 ERROR ("plugin_register_read: malloc failed: %s",
406                                 sstrerror (errno, errbuf, sizeof (errbuf)));
407                 return (-1);
408         }
409
410         memset (rf, '\0', sizeof (read_func_t));
411         rf->wait_time = interval_g;
412         rf->wait_left = 0;
413         rf->callback = callback;
414         rf->needs_read = DONE;
415
416         return (register_callback (&list_read, name, (void *) rf));
417 } /* int plugin_register_read */
418
419 int plugin_register_write (const char *name,
420                 int (*callback) (const data_set_t *ds, const value_list_t *vl))
421 {
422         return (register_callback (&list_write, name, (void *) callback));
423 } /* int plugin_register_write */
424
425 int plugin_register_shutdown (char *name,
426                 int (*callback) (void))
427 {
428         return (register_callback (&list_shutdown, name, (void *) callback));
429 } /* int plugin_register_shutdown */
430
431 int plugin_register_data_set (const data_set_t *ds)
432 {
433         data_set_t *ds_copy;
434         int i;
435
436         if ((list_data_set != NULL)
437                         && (llist_search (list_data_set, ds->type) != NULL))
438         {
439                 NOTICE ("Replacing DS `%s' with another version.", ds->type);
440                 plugin_unregister_data_set (ds->type);
441         }
442
443         ds_copy = (data_set_t *) malloc (sizeof (data_set_t));
444         if (ds_copy == NULL)
445                 return (-1);
446         memcpy(ds_copy, ds, sizeof (data_set_t));
447
448         ds_copy->ds = (data_source_t *) malloc (sizeof (data_source_t)
449                         * ds->ds_num);
450         if (ds_copy->ds == NULL)
451         {
452                 free (ds_copy);
453                 return (-1);
454         }
455
456         for (i = 0; i < ds->ds_num; i++)
457                 memcpy (ds_copy->ds + i, ds->ds + i, sizeof (data_source_t));
458
459         return (register_callback (&list_data_set, ds->type, (void *) ds_copy));
460 } /* int plugin_register_data_set */
461
462 int plugin_register_log (char *name,
463                 void (*callback) (int priority, const char *msg))
464 {
465         return (register_callback (&list_log, name, (void *) callback));
466 } /* int plugin_register_log */
467
468 int plugin_register_notification (const char *name,
469                 int (*callback) (const notification_t *notif))
470 {
471         return (register_callback (&list_log, name, (void *) callback));
472 } /* int plugin_register_log */
473
474 int plugin_unregister_config (const char *name)
475 {
476         cf_unregister (name);
477         return (0);
478 } /* int plugin_unregister_config */
479
480 int plugin_unregister_complex_config (const char *name)
481 {
482         cf_unregister_complex (name);
483         return (0);
484 } /* int plugin_unregister_complex_config */
485
486 int plugin_unregister_init (const char *name)
487 {
488         return (plugin_unregister (list_init, name));
489 }
490
491 int plugin_unregister_read (const char *name)
492 {
493         llentry_t *e;
494
495         e = llist_search (list_read, name);
496
497         if (e == NULL)
498                 return (-1);
499
500         llist_remove (list_read, e);
501         free (e->value);
502         llentry_destroy (e);
503
504         return (0);
505 }
506
507 int plugin_unregister_write (const char *name)
508 {
509         return (plugin_unregister (list_write, name));
510 }
511
512 int plugin_unregister_shutdown (const char *name)
513 {
514         return (plugin_unregister (list_shutdown, name));
515 }
516
517 int plugin_unregister_data_set (const char *name)
518 {
519         llentry_t  *e;
520         data_set_t *ds;
521
522         if (list_data_set == NULL)
523                 return (-1);
524
525         e = llist_search (list_data_set, name);
526
527         if (e == NULL)
528                 return (-1);
529
530         llist_remove (list_data_set, e);
531         ds = (data_set_t *) e->value;
532         llentry_destroy (e);
533
534         sfree (ds->ds);
535         sfree (ds);
536
537         return (0);
538 } /* int plugin_unregister_data_set */
539
540 int plugin_unregister_log (const char *name)
541 {
542         return (plugin_unregister (list_log, name));
543 }
544
545 int plugin_unregister_notification (const char *name)
546 {
547         return (plugin_unregister (list_notification, name));
548 }
549
550 void plugin_init_all (void)
551 {
552         int (*callback) (void);
553         llentry_t *le;
554         int status;
555
556         /* Start read-threads */
557         if (list_read != NULL)
558         {
559                 const char *rt;
560                 int num;
561                 rt = global_option_get ("ReadThreads");
562                 num = atoi (rt);
563                 start_threads ((num > 0) ? num : 5);
564         }
565
566         /* Init the value cache */
567         uc_init ();
568
569         if (list_init == NULL)
570                 return;
571
572         le = llist_head (list_init);
573         while (le != NULL)
574         {
575                 callback = (int (*) (void)) le->value;
576                 status = (*callback) ();
577
578                 if (status != 0)
579                 {
580                         ERROR ("Initialization of plugin `%s' "
581                                         "failed with status %i. "
582                                         "Plugin will be unloaded.",
583                                         le->key, status);
584                         /* FIXME: Unload _all_ functions */
585                         plugin_unregister_read (le->key);
586                 }
587
588                 le = le->next;
589         }
590 } /* void plugin_init_all */
591
592 void plugin_read_all (const int *loop)
593 {
594         llentry_t   *le;
595         read_func_t *rf;
596
597         if (list_read == NULL)
598                 return;
599
600         pthread_mutex_lock (&read_lock);
601
602         le = llist_head (list_read);
603         while (le != NULL)
604         {
605                 rf = (read_func_t *) le->value;
606
607                 if (rf->needs_read != DONE)
608                 {
609                         le = le->next;
610                         continue;
611                 }
612
613                 if (rf->wait_left > 0)
614                         rf->wait_left -= interval_g;
615
616                 if (rf->wait_left <= 0)
617                 {
618                         rf->needs_read = TODO;
619                 }
620
621                 le = le->next;
622         }
623
624         DEBUG ("plugin: plugin_read_all: Signalling `read_cond'");
625         pthread_cond_broadcast (&read_cond);
626         pthread_mutex_unlock (&read_lock);
627 } /* void plugin_read_all */
628
629 void plugin_shutdown_all (void)
630 {
631         int (*callback) (void);
632         llentry_t *le;
633
634         stop_threads ();
635
636         if (list_shutdown == NULL)
637                 return;
638
639         le = llist_head (list_shutdown);
640         while (le != NULL)
641         {
642                 callback = (int (*) (void)) le->value;
643
644                 /* Advance the pointer before calling the callback allows
645                  * shutdown functions to unregister themselves. If done the
646                  * other way around the memory `le' points to will be freed
647                  * after callback returns. */
648                 le = le->next;
649
650                 (*callback) ();
651         }
652 } /* void plugin_shutdown_all */
653
654 int plugin_dispatch_values (const char *name, value_list_t *vl)
655 {
656         int (*callback) (const data_set_t *, const value_list_t *);
657         data_set_t *ds;
658         llentry_t *le;
659
660         if ((list_write == NULL) || (list_data_set == NULL))
661                 return (-1);
662
663         le = llist_search (list_data_set, name);
664         if (le == NULL)
665         {
666                 DEBUG ("No such dataset registered: %s", name);
667                 return (-1);
668         }
669
670         ds = (data_set_t *) le->value;
671
672         DEBUG ("plugin: plugin_dispatch_values: time = %u; interval = %i; "
673                         "host = %s; "
674                         "plugin = %s; plugin_instance = %s; "
675                         "type = %s; type_instance = %s;",
676                         (unsigned int) vl->time, vl->interval,
677                         vl->host,
678                         vl->plugin, vl->plugin_instance,
679                         ds->type, vl->type_instance);
680
681 #if COLLECT_DEBUG
682         assert (ds->ds_num == vl->values_len);
683 #else
684         if (ds->ds_num != vl->values_len)
685         {
686                 ERROR ("plugin: ds->type = %s: (ds->ds_num = %i) != "
687                                 "(vl->values_len = %i)",
688                                 ds->type, ds->ds_num, vl->values_len);
689                 return (-1);
690         }
691 #endif
692
693         escape_slashes (vl->host, sizeof (vl->host));
694         escape_slashes (vl->plugin, sizeof (vl->plugin));
695         escape_slashes (vl->plugin_instance, sizeof (vl->plugin_instance));
696         escape_slashes (vl->type_instance, sizeof (vl->type_instance));
697
698         /* Update the value cache */
699         uc_update (ds, vl);
700         ut_check_threshold (ds, vl);
701
702         le = llist_head (list_write);
703         while (le != NULL)
704         {
705                 callback = (int (*) (const data_set_t *, const value_list_t *)) le->value;
706                 (*callback) (ds, vl);
707
708                 le = le->next;
709         }
710
711         return (0);
712 } /* int plugin_dispatch_values */
713
714 int plugin_dispatch_notification (const notification_t *notif)
715 {
716         int (*callback) (const notification_t *);
717         llentry_t *le;
718         /* Possible TODO: Add flap detection here */
719
720         DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
721                         "time = %u; host = %s;",
722                         notif->severity, notif->message,
723                         (unsigned int) notif->time, notif->host);
724
725         /* Nobody cares for notifications */
726         if (list_notification == NULL)
727                 return (-1);
728
729         le = llist_head (list_notification);
730         while (le != NULL)
731         {
732                 callback = (int (*) (const notification_t *)) le->value;
733                 (*callback) (notif);
734
735                 le = le->next;
736         }
737
738         return (0);
739 } /* int plugin_dispatch_notification */
740
741 void plugin_log (int level, const char *format, ...)
742 {
743         char msg[512];
744         va_list ap;
745
746         void (*callback) (int, const char *);
747         llentry_t *le;
748
749         if (list_log == NULL)
750                 return;
751
752 #if !COLLECT_DEBUG
753         if (level >= LOG_DEBUG)
754                 return;
755 #endif
756
757         va_start (ap, format);
758         vsnprintf (msg, 512, format, ap);
759         msg[511] = '\0';
760         va_end (ap);
761
762         le = llist_head (list_log);
763         while (le != NULL)
764         {
765                 callback = (void (*) (int, const char *)) le->value;
766                 (*callback) (level, msg);
767
768                 le = le->next;
769         }
770 } /* void plugin_log */
771
772 void plugin_complain (int level, complain_t *c, const char *format, ...)
773 {
774         char message[512];
775         va_list ap;
776
777         if (c->delay > 0)
778         {
779                 c->delay--;
780                 return;
781         }
782
783         if (c->interval < interval_g)
784                 c->interval = interval_g;
785         else
786                 c->interval *= 2;
787
788         if (c->interval > 86400)
789                 c->interval = 86400;
790
791         c->delay = c->interval / interval_g;
792
793         va_start (ap, format);
794         vsnprintf (message, 512, format, ap);
795         message[511] = '\0';
796         va_end (ap);
797
798         plugin_log (level, message);
799 }
800
801 void plugin_relief (int level, complain_t *c, const char *format, ...)
802 {
803         char message[512];
804         va_list ap;
805
806         if (c->interval == 0)
807                 return;
808
809         c->interval = 0;
810
811         va_start (ap, format);
812         vsnprintf (message, 512, format, ap);
813         message[511] = '\0';
814         va_end (ap);
815
816         plugin_log (level, message);
817 }
818
819 const data_set_t *plugin_get_ds (const char *name)
820 {
821         data_set_t *ds;
822         llentry_t *le;
823
824         le = llist_search (list_data_set, name);
825         if (le == NULL)
826         {
827                 DEBUG ("No such dataset registered: %s", name);
828                 return (NULL);
829         }
830
831         ds = (data_set_t *) le->value;
832
833         return (ds);
834 } /* data_set_t *plugin_get_ds */