plugin.c: prevent re-adding read functions
[collectd.git] / src / plugin.c
index 5ff21c6..52a5ea0 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005-2009  Florian octo Forster
+ * Copyright (C) 2005-2010  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -16,7 +16,7 @@
  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  *   Sebastian Harl <sh at tokkee.org>
  **/
 
@@ -36,7 +36,6 @@
 #include "utils_llist.h"
 #include "utils_heap.h"
 #include "utils_cache.h"
-#include "utils_threshold.h"
 #include "filter_chain.h"
 
 /*
@@ -59,6 +58,7 @@ struct read_func_s
 #define rf_callback rf_super.cf_callback
 #define rf_udata rf_super.cf_udata
        callback_func_t rf_super;
+       char rf_group[DATA_MAX_NAME_LEN];
        char rf_name[DATA_MAX_NAME_LEN];
        int rf_type;
        struct timespec rf_interval;
@@ -73,6 +73,7 @@ typedef struct read_func_s read_func_t;
 static llist_t *list_init;
 static llist_t *list_write;
 static llist_t *list_flush;
+static llist_t *list_missing;
 static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
@@ -335,7 +336,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        while (read_loop != 0)
        {
                read_func_t *rf;
-               struct timeval now;
+               cdtime_t now;
                int status;
                int rf_type;
                int rc;
@@ -346,10 +347,9 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                {
                        struct timespec abstime;
 
-                       gettimeofday (&now, /* timezone = */ NULL);
+                       now = cdtime ();
 
-                       abstime.tv_sec = now.tv_sec + interval_g;
-                       abstime.tv_nsec = 1000 * now.tv_usec;
+                       CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime);
 
                        pthread_mutex_lock (&read_lock);
                        pthread_cond_timedwait (&read_cond, &read_lock,
@@ -360,15 +360,13 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
 
                if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
                {
-                       gettimeofday (&now, /* timezone = */ NULL);
+                       now = cdtime ();
 
-                       rf->rf_interval.tv_sec = interval_g;
-                       rf->rf_interval.tv_nsec = 0;
+                       CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval);
 
                        rf->rf_effective_interval = rf->rf_interval;
 
-                       rf->rf_next_read.tv_sec = now.tv_sec;
-                       rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+                       CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
                }
 
                /* sleep until this entry is due,
@@ -379,12 +377,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                 * we need to re-evaluate the condition every time
                 * pthread_cond_timedwait returns. */
                rc = 0;
-               while (!timeout_reached(rf->rf_next_read) && rc == 0) {
+               while ((read_loop != 0)
+                               && !timeout_reached(rf->rf_next_read)
+                               && rc == 0)
+               {
                        rc = pthread_cond_timedwait (&read_cond, &read_lock,
                                &rf->rf_next_read);
                }
 
-               /* Must hold `real_lock' when accessing `rf->rf_type'. */
+               /* Must hold `read_lock' when accessing `rf->rf_type'. */
                rf_type = rf->rf_type;
                pthread_mutex_unlock (&read_lock);
 
@@ -455,7 +456,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                }
 
                /* update the ``next read due'' field */
-               gettimeofday (&now, /* timezone = */ NULL);
+               now = cdtime ();
 
                DEBUG ("plugin_read_thread: Effective interval of the "
                                "%s plugin is %i.%09i.",
@@ -472,15 +473,12 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                NORMALIZE_TIMESPEC (rf->rf_next_read);
 
                /* Check, if `rf_next_read' is in the past. */
-               if ((rf->rf_next_read.tv_sec < now.tv_sec)
-                               || ((rf->rf_next_read.tv_sec == now.tv_sec)
-                                       && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec))))
+               if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now)
                {
                        /* `rf_next_read' is in the past. Insert `now'
                         * so this value doesn't trail off into the
                         * past too much. */
-                       rf->rf_next_read.tv_sec = now.tv_sec;
-                       rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+                       CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
                }
 
                DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
@@ -730,26 +728,36 @@ static int plugin_insert_read (read_func_t *rf)
                }
        }
 
-       le = llentry_create (rf->rf_name, rf);
+       le = llist_search (read_list, rf->rf_name);
        if (le == NULL)
        {
-               pthread_mutex_unlock (&read_lock);
-               ERROR ("plugin_insert_read: llentry_create failed.");
-               return (-1);
-       }
+               le = llentry_create (rf->rf_name, rf);
+               if (le == NULL)
+               {
+                       pthread_mutex_unlock (&read_lock);
+                       ERROR ("plugin_insert_read: llentry_create failed.");
+                       return (-1);
+               }
+
+               status = c_heap_insert (read_heap, rf);
+               if (status != 0)
+               {
+                       pthread_mutex_unlock (&read_lock);
+                       ERROR ("plugin_insert_read: c_heap_insert failed.");
+                       llentry_destroy (le);
+                       return (-1);
+               }
 
-       status = c_heap_insert (read_heap, rf);
-       if (status != 0)
+               /* This does not fail. */
+               llist_append (read_list, le);
+       }
+       else
        {
-               pthread_mutex_unlock (&read_lock);
-               ERROR ("plugin_insert_read: c_heap_insert failed.");
-               llentry_destroy (le);
-               return (-1);
+               INFO ("plugin: plugin_insert_read: "
+                               "read function for plugin `%s' already added.",
+                               rf->rf_name);
        }
 
-       /* This does not fail. */
-       llist_append (read_list, le);
-
        pthread_mutex_unlock (&read_lock);
        return (0);
 } /* int plugin_insert_read */
@@ -772,6 +780,7 @@ int plugin_register_read (const char *name,
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
+       rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_SIMPLE;
        rf->rf_interval.tv_sec = 0;
@@ -781,7 +790,7 @@ int plugin_register_read (const char *name,
        return (plugin_insert_read (rf));
 } /* int plugin_register_read */
 
-int plugin_register_complex_read (const char *name,
+int plugin_register_complex_read (const char *group, const char *name,
                plugin_read_cb callback,
                const struct timespec *interval,
                user_data_t *user_data)
@@ -797,6 +806,10 @@ int plugin_register_complex_read (const char *name,
 
        memset (rf, 0, sizeof (read_func_t));
        rf->rf_callback = (void *) callback;
+       if (group != NULL)
+               sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
+       else
+               rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_COMPLEX;
        if (interval != NULL)
@@ -833,6 +846,13 @@ int plugin_register_flush (const char *name,
                                (void *) callback, ud));
 } /* int plugin_register_flush */
 
+int plugin_register_missing (const char *name,
+               plugin_missing_cb callback, user_data_t *ud)
+{
+       return (create_register_callback (&list_missing, name,
+                               (void *) callback, ud));
+} /* int plugin_register_missing */
+
 int plugin_register_shutdown (char *name,
                int (*callback) (void))
 {
@@ -948,6 +968,67 @@ int plugin_unregister_read (const char *name) /* {{{ */
        return (0);
 } /* }}} int plugin_unregister_read */
 
+static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */
+{
+       read_func_t *rf    = e->value;
+       char        *group = ud;
+
+       return strcmp (rf->rf_group, (const char *)group);
+} /* }}} int compare_read_func_group */
+
+int plugin_unregister_read_group (const char *group) /* {{{ */
+{
+       llentry_t *le;
+       read_func_t *rf;
+
+       int found = 0;
+
+       if (group == NULL)
+               return (-ENOENT);
+
+       pthread_mutex_lock (&read_lock);
+
+       if (read_list == NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               return (-ENOENT);
+       }
+
+       while (42)
+       {
+               le = llist_search_custom (read_list,
+                               compare_read_func_group, (void *)group);
+
+               if (le == NULL)
+                       break;
+
+               ++found;
+
+               llist_remove (read_list, le);
+
+               rf = le->value;
+               assert (rf != NULL);
+               rf->rf_type = RF_REMOVE;
+
+               llentry_destroy (le);
+
+               DEBUG ("plugin_unregister_read_group: "
+                               "Marked `%s' (group `%s') for removal.",
+                               rf->rf_name, group);
+       }
+
+       pthread_mutex_unlock (&read_lock);
+
+       if (found == 0)
+       {
+               WARNING ("plugin_unregister_read_group: No such "
+                               "group of read function: %s", group);
+               return (-ENOENT);
+       }
+
+       return (0);
+} /* }}} int plugin_unregister_read_group */
+
 int plugin_unregister_write (const char *name)
 {
        return (plugin_unregister (list_write, name));
@@ -958,6 +1039,11 @@ int plugin_unregister_flush (const char *name)
        return (plugin_unregister (list_flush, name));
 }
 
+int plugin_unregister_missing (const char *name)
+{
+       return (plugin_unregister (list_missing, name));
+}
+
 int plugin_unregister_shutdown (const char *name)
 {
        return (plugin_unregister (list_shutdown, name));
@@ -1181,7 +1267,7 @@ int plugin_write (const char *plugin, /* {{{ */
   return (status);
 } /* }}} int plugin_write */
 
-int plugin_flush (const char *plugin, int timeout, const char *identifier)
+int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
 {
   llentry_t *le;
 
@@ -1226,7 +1312,8 @@ void plugin_shutdown_all (void)
 
        destroy_read_heap ();
 
-       plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
+       plugin_flush (/* plugin = */ NULL,
+                       /* timeout = */ 0,
                        /* identifier = */ NULL);
 
        le = NULL;
@@ -1256,6 +1343,7 @@ void plugin_shutdown_all (void)
         * the real free function when registering the write callback. This way
         * the data isn't freed twice. */
        destroy_all_callbacks (&list_flush);
+       destroy_all_callbacks (&list_missing);
        destroy_all_callbacks (&list_write);
 
        destroy_all_callbacks (&list_notification);
@@ -1263,6 +1351,44 @@ void plugin_shutdown_all (void)
        destroy_all_callbacks (&list_log);
 } /* void plugin_shutdown_all */
 
+int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
+{
+  llentry_t *le;
+
+  if (list_missing == NULL)
+    return (0);
+
+  le = llist_head (list_missing);
+  while (le != NULL)
+  {
+    callback_func_t *cf;
+    plugin_missing_cb callback;
+    int status;
+
+    cf = le->value;
+    callback = cf->cf_callback;
+
+    status = (*callback) (vl, &cf->cf_udata);
+    if (status != 0)
+    {
+      if (status < 0)
+      {
+        ERROR ("plugin_dispatch_missing: Callback function \"%s\" "
+            "failed with status %i.",
+            le->key, status);
+        return (status);
+      }
+      else
+      {
+        return (0);
+      }
+    }
+
+    le = le->next;
+  }
+  return (0);
+} /* int }}} plugin_dispatch_missing */
+
 int plugin_dispatch_values (value_list_t *vl)
 {
        int status;
@@ -1304,21 +1430,27 @@ int plugin_dispatch_values (value_list_t *vl)
 
        if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0)
        {
-               INFO ("plugin_dispatch_values: Dataset not found: %s", vl->type);
+               char ident[6 * DATA_MAX_NAME_LEN];
+
+               FORMAT_VL (ident, sizeof (ident), vl);
+               INFO ("plugin_dispatch_values: Dataset not found: %s "
+                               "(from \"%s\"), check your types.db!",
+                               vl->type, ident);
                return (-1);
        }
 
        if (vl->time == 0)
-               vl->time = time (NULL);
+               vl->time = cdtime ();
 
        if (vl->interval <= 0)
                vl->interval = interval_g;
 
-       DEBUG ("plugin_dispatch_values: time = %u; interval = %i; "
+       DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
                        "host = %s; "
                        "plugin = %s; plugin_instance = %s; "
                        "type = %s; type_instance = %s;",
-                       (unsigned int) vl->time, vl->interval,
+                       CDTIME_T_TO_DOUBLE (vl->time),
+                       CDTIME_T_TO_DOUBLE (vl->interval),
                        vl->host,
                        vl->plugin, vl->plugin_instance,
                        vl->type, vl->type_instance);
@@ -1402,9 +1534,6 @@ int plugin_dispatch_values (value_list_t *vl)
        /* Update the value cache */
        uc_update (ds, vl);
 
-       /* Initiate threshold checking */
-       ut_check_threshold (ds, vl);
-
        if (post_cache_chain != NULL)
        {
                status = fc_process_chain (ds, vl, post_cache_chain);
@@ -1443,9 +1572,9 @@ int plugin_dispatch_notification (const notification_t *notif)
        /* Possible TODO: Add flap detection here */
 
        DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
-                       "time = %u; host = %s;",
+                       "time = %.3f; host = %s;",
                        notif->severity, notif->message,
-                       (unsigned int) notif->time, notif->host);
+                       CDTIME_T_TO_DOUBLE (notif->time), notif->host);
 
        /* Nobody cares for notifications */
        if (list_notification == NULL)
@@ -1579,7 +1708,7 @@ static int plugin_notification_meta_add (notification_t *n,
     }
     case NM_TYPE_BOOLEAN:
     {
-      meta->nm_value.nm_boolean = *((bool *) value);
+      meta->nm_value.nm_boolean = *((_Bool *) value);
       break;
     }
     default:
@@ -1633,7 +1762,7 @@ int plugin_notification_meta_add_double (notification_t *n,
 
 int plugin_notification_meta_add_boolean (notification_t *n,
     const char *name,
-    bool value)
+    _Bool value)
 {
   return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
 }