src/plugin.[ch]: Implement `plugin_dispatch_values_async'.
[collectd.git] / src / plugin.c
index 4d503f7..6fd74e2 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005-2008  Florian octo Forster
+ * Copyright (C) 2005-2009  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -36,6 +36,7 @@
 #include "utils_llist.h"
 #include "utils_cache.h"
 #include "utils_threshold.h"
+#include "filter_chain.h"
 
 /*
  * Private structures
@@ -49,27 +50,45 @@ struct read_func_s
 };
 typedef struct read_func_s read_func_t;
 
+struct dispatch_queue_s;
+typedef struct dispatch_queue_s dispatch_queue_t;
+struct dispatch_queue_s
+{
+       value_list_t *vl;
+       dispatch_queue_t *next;
+};
+
 /*
  * Private variables
  */
 static llist_t *list_init;
 static llist_t *list_read;
 static llist_t *list_write;
-static llist_t *list_filter;
 static llist_t *list_flush;
 static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
 
+static fc_chain_t *pre_cache_chain = NULL;
+static fc_chain_t *post_cache_chain = NULL;
+
 static c_avl_tree_t *data_sets;
 
 static char *plugindir = NULL;
 
-static int             read_loop = 1;
+static int             read_loop = 0;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
-static int             read_threads_num = 0;
+static size_t          read_threads_num = 0;
+
+static int               dispatch_loop = 0;
+static pthread_mutex_t   dispatch_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t    dispatch_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t        *dispatch_threads = NULL;
+static size_t            dispatch_threads_num = 0;
+static dispatch_queue_t *dispatch_head;
+static dispatch_queue_t *dispatch_tail;
 
 /*
  * Static functions
@@ -168,7 +187,7 @@ static int plugin_load_file (char *file)
        return (0);
 }
 
-static void *plugin_read_thread (void *args)
+static void *plugin_read_thread (void __attribute__((unused)) *args)
 {
        llentry_t   *le;
        read_func_t *rf;
@@ -241,57 +260,138 @@ static void *plugin_read_thread (void *args)
        return ((void *) 0);
 } /* void *plugin_read_thread */
 
-static void start_threads (int num)
+static void *plugin_dispatch_thread (void *arg)
 {
-       int i;
+       pthread_mutex_lock (&dispatch_lock);
 
-       if (read_threads != NULL)
-               return;
+       while ((dispatch_loop != 0) || (dispatch_head != NULL))
+       {
+               dispatch_queue_t *qi;
+
+               if (dispatch_head == NULL)
+                       pthread_cond_wait (&dispatch_cond, &dispatch_lock);
+
+               if (dispatch_head == NULL)
+                       continue;
+
+               qi = dispatch_head;
+
+               if (dispatch_head == dispatch_tail)
+               {
+                       dispatch_head = NULL;
+                       dispatch_tail = NULL;
+               }
+               else
+               {
+                       dispatch_head = qi->next;
+               }
+
+               pthread_mutex_unlock (&dispatch_lock);
+
+               plugin_dispatch_values (qi->vl);
+               sfree (qi->vl->values);
+               sfree (qi->vl);
+               sfree (qi);
+
+               pthread_mutex_lock (&dispatch_lock);
+       } /* while */
 
-       read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
-       if (read_threads == NULL)
+       pthread_mutex_unlock (&dispatch_lock);
+
+       return ((void *) 0);
+} /* void *plugin_dispatch_thread */
+
+static int plugin_start_threads (size_t num,
+               pthread_t **tlist, size_t *tlist_num,
+               void *(*thread_main) (void *))
+{
+       size_t i;
+       pthread_t *new_tlist;
+       size_t new_tlist_num;
+
+       if (*tlist != NULL)
+               return (-1);
+
+       new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist));
+       if (new_tlist == NULL)
        {
-               ERROR ("plugin: start_threads: calloc failed.");
-               return;
+               ERROR ("plugin: plugin_start_threads: calloc failed.");
+               return (-1);
        }
 
-       read_threads_num = 0;
+       new_tlist_num = 0;
        for (i = 0; i < num; i++)
        {
-               if (pthread_create (read_threads + read_threads_num, NULL,
-                                       plugin_read_thread, NULL) == 0)
-               {
-                       read_threads_num++;
-               }
-               else
+               int status;
+
+               status = pthread_create (new_tlist + new_tlist_num,
+                               /* attr = */ NULL, thread_main,
+                               /* arg = */ (void *) 0);
+               if (status != 0)
                {
-                       ERROR ("plugin: start_threads: pthread_create failed.");
-                       return;
+                       ERROR ("plugin: plugin_start_threads: "
+                                       "pthread_create failed.");
+                       continue;
                }
+
+               new_tlist_num++;
        } /* for (i) */
-} /* void start_threads */
 
-static void stop_threads (void)
+       if (new_tlist_num < 1)
+       {
+               ERROR ("plugin: plugin_start_threads: "
+                               "Creating threads failed.");
+               sfree (new_tlist);
+               return (-1);
+       }
+
+       *tlist     = new_tlist;
+       *tlist_num = new_tlist_num;
+
+       return (0);
+} /* int plugin_start_threads */
+
+static int plugin_stop_threads (int *loop,
+               pthread_mutex_t *lock, pthread_cond_t *cond,
+               pthread_t **ret_tlist, size_t *ret_tlist_len)
 {
-       int i;
+       pthread_t *tlist;
+       size_t tlist_len;
+       size_t i;
 
-       pthread_mutex_lock (&read_lock);
-       read_loop = 0;
-       DEBUG ("plugin: stop_threads: Signalling `read_cond'");
-       pthread_cond_broadcast (&read_cond);
-       pthread_mutex_unlock (&read_lock);
+       tlist     = *ret_tlist;
+       tlist_len = *ret_tlist_len;
+
+       if (tlist == NULL)
+               return (0);
+
+       DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len);
+
+       pthread_mutex_lock (lock);
+
+       *loop = 0;
 
-       for (i = 0; i < read_threads_num; i++)
+       pthread_cond_broadcast (cond);
+       pthread_mutex_unlock (lock);
+
+       for (i = 0; i < tlist_len; i++)
        {
-               if (pthread_join (read_threads[i], NULL) != 0)
+               int status;
+
+               status = pthread_join (tlist[i], NULL);
+               if (status != 0)
                {
-                       ERROR ("pluginstop_threads: pthread_join failed.");
+                       ERROR ("plugin_stop_threads: pthread_join failed.");
                }
-               read_threads[i] = (pthread_t) 0;
        }
-       sfree (read_threads);
-       read_threads_num = 0;
-} /* void stop_threads */
+
+       sfree (tlist);
+
+       *ret_tlist = NULL;
+       *ret_tlist_len = 0;
+
+       return (0);
+} /* int plugin_stop_threads */
 
 /*
  * Public functions
@@ -322,6 +422,7 @@ int plugin_load (const char *type)
        int   ret;
        struct stat    statbuf;
        struct dirent *de;
+       int status;
 
        DEBUG ("type = %s", type);
 
@@ -330,8 +431,8 @@ int plugin_load (const char *type)
 
        /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
         * type when matching the filename */
-       if (ssnprintf (typename, sizeof (typename),
-                       "%s.so", type) >= sizeof (typename))
+       status = ssnprintf (typename, sizeof (typename), "%s.so", type);
+       if ((status < 0) || ((size_t) status >= sizeof (typename)))
        {
                WARNING ("snprintf: truncated: `%s.so'", type);
                return (-1);
@@ -351,8 +452,9 @@ int plugin_load (const char *type)
                if (strncasecmp (de->d_name, typename, typename_len))
                        continue;
 
-               if (ssnprintf (filename, sizeof (filename),
-                               "%s/%s", dir, de->d_name) >= sizeof (filename))
+               status = ssnprintf (filename, sizeof (filename),
+                               "%s/%s", dir, de->d_name);
+               if ((status < 0) || ((size_t) status >= sizeof (filename)))
                {
                        WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
                        continue;
@@ -443,12 +545,6 @@ int plugin_register_write (const char *name,
        return (register_callback (&list_write, name, (void *) callback));
 } /* int plugin_register_write */
 
-int plugin_register_filter (const char *name,
-               int (*callback) (const data_set_t *ds, value_list_t *vl))
-{
-       return (register_callback (&list_filter, name, (void *) callback));
-} /* int plugin_register_filter */
-
 int plugin_register_flush (const char *name,
                int (*callback) (const int timeout, const char *identifier))
 {
@@ -549,11 +645,6 @@ int plugin_unregister_write (const char *name)
        return (plugin_unregister (list_write, name));
 }
 
-int plugin_unregister_filter (const char *name)
-{
-       return (plugin_unregister (list_filter, name));
-}
-
 int plugin_unregister_flush (const char *name)
 {
        return (plugin_unregister (list_flush, name));
@@ -592,6 +683,7 @@ int plugin_unregister_notification (const char *name)
 
 void plugin_init_all (void)
 {
+       const char *chain_name;
        int (*callback) (void);
        llentry_t *le;
        int status;
@@ -599,6 +691,13 @@ void plugin_init_all (void)
        /* Init the value cache */
        uc_init ();
 
+       chain_name = global_option_get ("PreCacheChain");
+       pre_cache_chain = fc_chain_get_by_name (chain_name);
+
+       chain_name = global_option_get ("PostCacheChain");
+       post_cache_chain = fc_chain_get_by_name (chain_name);
+
+
        if ((list_init == NULL) && (list_read == NULL))
                return;
 
@@ -634,8 +733,19 @@ void plugin_init_all (void)
                int num;
                rt = global_option_get ("ReadThreads");
                num = atoi (rt);
-               start_threads ((num > 0) ? num : 5);
+               if (num != -1)
+               {
+                       read_loop = 1;
+                       plugin_start_threads ((num > 0) ? ((size_t ) num) : 5,
+                                       &read_threads, &read_threads_num,
+                                       plugin_read_thread);
+               }
        }
+
+       dispatch_loop = 1;
+       plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */
+                       &dispatch_threads, &dispatch_threads_num,
+                       plugin_dispatch_thread);
 } /* void plugin_init_all */
 
 void plugin_read_all (void)
@@ -677,6 +787,104 @@ void plugin_read_all (void)
        pthread_mutex_unlock (&read_lock);
 } /* void plugin_read_all */
 
+/* Read function called when the `-T' command line argument is given. */
+int plugin_read_all_once (void)
+{
+       llentry_t   *le;
+       read_func_t *rf;
+       int status;
+       int return_status = 0;
+
+       if (list_read == NULL)
+       {
+               NOTICE ("No read-functions are registered.");
+               return (0);
+       }
+
+       for (le = llist_head (list_read);
+            le != NULL;
+            le = le->next)
+       {
+               rf = (read_func_t *) le->value;
+               status = rf->callback ();
+               if (status != 0)
+               {
+                       NOTICE ("read-function of plugin `%s' failed.",
+                               le->key);
+                       return_status = -1;
+               }
+       }
+
+       return (return_status);
+} /* int plugin_read_all_once */
+
+int plugin_write (const char *plugin, /* {{{ */
+               const data_set_t *ds, const value_list_t *vl)
+{
+  int (*callback) (const data_set_t *ds, const value_list_t *vl);
+  llentry_t *le;
+  int status;
+
+  if (vl == NULL)
+    return (EINVAL);
+
+  if (list_write == NULL)
+    return (ENOENT);
+
+  if (ds == NULL)
+  {
+    ds = plugin_get_ds (vl->type);
+    if (ds == NULL)
+    {
+      ERROR ("plugin_write: Unable to lookup type `%s'.", vl->type);
+      return (ENOENT);
+    }
+  }
+
+  if (plugin == NULL)
+  {
+    int success = 0;
+    int failure = 0;
+
+    le = llist_head (list_write);
+    while (le != NULL)
+    {
+      callback = le->value;
+      status = (*callback) (ds, vl);
+      if (status != 0)
+        failure++;
+      else
+        success++;
+
+      le = le->next;
+    }
+
+    if ((success == 0) && (failure != 0))
+      status = -1;
+    else
+      status = 0;
+  }
+  else /* plugin != NULL */
+  {
+    le = llist_head (list_write);
+    while (le != NULL)
+    {
+      if (strcasecmp (plugin, le->key) == 0)
+        break;
+
+      le = le->next;
+    }
+
+    if (le == NULL)
+      return (ENOENT);
+
+    callback = le->value;
+    status = (*callback) (ds, vl);
+  }
+
+  return (status);
+} /* }}} int plugin_write */
+
 int plugin_flush (const char *plugin, int timeout, const char *identifier)
 {
   int (*callback) (int timeout, const char *identifier);
@@ -708,7 +916,11 @@ void plugin_shutdown_all (void)
        int (*callback) (void);
        llentry_t *le;
 
-       stop_threads ();
+       plugin_stop_threads (&read_loop, &read_lock, &read_cond,
+                       &read_threads, &read_threads_num);
+
+       plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond,
+                       &dispatch_threads, &dispatch_threads_num);
 
        if (list_shutdown == NULL)
                return;
@@ -730,12 +942,10 @@ void plugin_shutdown_all (void)
 
 int plugin_dispatch_values (value_list_t *vl)
 {
+       int status;
        static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
 
        data_set_t *ds;
-       llentry_t *le;
-
-       int filter = 0;
 
        if ((vl == NULL) || (*vl->type == '\0')) {
                ERROR ("plugin_dispatch_values: Invalid value list.");
@@ -762,6 +972,9 @@ int plugin_dispatch_values (value_list_t *vl)
                return (-1);
        }
 
+       if (vl->time == 0)
+               vl->time = time (NULL);
+
        DEBUG ("plugin_dispatch_values: time = %u; interval = %i; "
                        "host = %s; "
                        "plugin = %s; plugin_instance = %s; "
@@ -798,42 +1011,109 @@ int plugin_dispatch_values (value_list_t *vl)
        escape_slashes (vl->type, sizeof (vl->type));
        escape_slashes (vl->type_instance, sizeof (vl->type_instance));
 
-       le = llist_head (list_filter);
-       while (le != NULL)
+       if (pre_cache_chain != NULL)
        {
-               int (*filter_callback) (const data_set_t *, value_list_t *) =
-                               (int (*) (const data_set_t *, value_list_t *)) le->value;
-
-               filter |= (*filter_callback) (ds, vl);
-
-               if (filter == FILTER_IGNORE)
-                       return (-1);
-
-               le = le->next;
+               status = fc_process_chain (ds, vl, pre_cache_chain);
+               if (status < 0)
+               {
+                       WARNING ("plugin_dispatch_values: Running the "
+                                       "pre-cache chain failed with "
+                                       "status %i (%#x).",
+                                       status, status);
+               }
+               else if (status == FC_TARGET_STOP)
+                       return (0);
        }
 
        /* Update the value cache */
        uc_update (ds, vl);
 
-       if ((filter & FILTER_NOTHRESHOLD_CHECK) == 0)
-               ut_check_threshold (ds, vl);
+       if (post_cache_chain != NULL)
+       {
+               status = fc_process_chain (ds, vl, post_cache_chain);
+               if (status < 0)
+               {
+                       WARNING ("plugin_dispatch_values: Running the "
+                                       "post-cache chain failed with "
+                                       "status %i (%#x).",
+                                       status, status);
+               }
+       }
+       else
+               fc_default_action (ds, vl);
 
-       if (filter & FILTER_NOWRITE)
-               return (0);
+       return (0);
+} /* int plugin_dispatch_values */
 
-       le = llist_head (list_write);
-       while (le != NULL)
+int plugin_dispatch_values_async (const value_list_t *vl)
+{
+       dispatch_queue_t *qi;
+       int i;
+
+       if (vl == NULL)
+               return (-EINVAL);
+
+       if (dispatch_threads_num < 1)
        {
-               int (*write_callback) (const data_set_t *, const value_list_t *) =
-                               (int (*) (const data_set_t *, const value_list_t *)) le->value;
+               ERROR ("plugin_dispatch_values_async: "
+                               "No dispatch threads have been started!");
+#ifdef ENOTCONN
+               return (-ENOTCONN);
+#else
+               return (-1);
+#endif
+       }
+
+       qi = (dispatch_queue_t *) malloc (sizeof (*qi));
+       if (qi == NULL)
+       {
+               ERROR ("plugin_dispatch_values_async: malloc failed.");
+               return (-ENOMEM);
+       }
+       memset (qi, 0, sizeof (*qi));
+       qi->next = NULL;
 
-               (*write_callback) (ds, vl);
+       qi->vl = (value_list_t *) malloc (sizeof (value_list_t));
+       if (qi->vl == NULL)
+       {
+               ERROR ("plugin_dispatch_values_async: malloc failed.");
+               sfree (qi);
+               return (-ENOMEM);
+       }
+       memcpy (qi->vl, vl, sizeof (value_list_t));
+       qi->vl->values = NULL;
 
-               le = le->next;
+       qi->vl->values = (value_t *) calloc (qi->vl->values_len,
+                       sizeof (value_t));
+       if (qi->vl->values == NULL)
+       {
+               ERROR ("plugin_dispatch_values_async: malloc failed.");
+               sfree (qi->vl);
+               sfree (qi);
+               return (-ENOMEM);
        }
 
+       for (i = 0; i < vl->values_len; i++)
+               qi->vl->values[i] = vl->values[i];
+
+       pthread_mutex_lock (&dispatch_lock);
+
+       if (dispatch_tail == NULL)
+       {
+               dispatch_head = qi;
+               dispatch_tail = qi;
+       }
+       else
+       {
+               dispatch_tail->next = qi;
+               dispatch_tail = qi;
+       }
+
+       pthread_cond_signal (&dispatch_cond);
+       pthread_mutex_unlock (&dispatch_lock);
+
        return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_async */
 
 int plugin_dispatch_notification (const notification_t *notif)
 {
@@ -935,8 +1215,8 @@ static int plugin_notification_meta_add (notification_t *n,
   {
     case NM_TYPE_STRING:
     {
-      meta->value_string = strdup ((const char *) value);
-      if (meta->value_string == NULL)
+      meta->nm_value.nm_string = strdup ((const char *) value);
+      if (meta->nm_value.nm_string == NULL)
       {
         ERROR ("plugin_notification_meta_add: strdup failed.");
         sfree (meta);
@@ -946,22 +1226,22 @@ static int plugin_notification_meta_add (notification_t *n,
     }
     case NM_TYPE_SIGNED_INT:
     {
-      meta->value_signed_int = *((int64_t *) value);
+      meta->nm_value.nm_signed_int = *((int64_t *) value);
       break;
     }
     case NM_TYPE_UNSIGNED_INT:
     {
-      meta->value_unsigned_int = *((uint64_t *) value);
+      meta->nm_value.nm_unsigned_int = *((uint64_t *) value);
       break;
     }
     case NM_TYPE_DOUBLE:
     {
-      meta->value_double = *((double *) value);
+      meta->nm_value.nm_double = *((double *) value);
       break;
     }
     case NM_TYPE_BOOLEAN:
     {
-      meta->value_boolean = *((bool *) value);
+      meta->nm_value.nm_boolean = *((bool *) value);
       break;
     }
     default:
@@ -1034,25 +1314,25 @@ int plugin_notification_meta_copy (notification_t *dst,
   {
     if (meta->type == NM_TYPE_STRING)
       plugin_notification_meta_add_string (dst, meta->name,
-          meta->value_string);
+          meta->nm_value.nm_string);
     else if (meta->type == NM_TYPE_SIGNED_INT)
       plugin_notification_meta_add_signed_int (dst, meta->name,
-          meta->value_signed_int);
+          meta->nm_value.nm_signed_int);
     else if (meta->type == NM_TYPE_UNSIGNED_INT)
       plugin_notification_meta_add_unsigned_int (dst, meta->name,
-          meta->value_unsigned_int);
+          meta->nm_value.nm_unsigned_int);
     else if (meta->type == NM_TYPE_DOUBLE)
       plugin_notification_meta_add_double (dst, meta->name,
-          meta->value_double);
+          meta->nm_value.nm_double);
     else if (meta->type == NM_TYPE_BOOLEAN)
       plugin_notification_meta_add_boolean (dst, meta->name,
-          meta->value_boolean);
+          meta->nm_value.nm_boolean);
   }
 
   return (0);
 } /* int plugin_notification_meta_copy */
 
-int plugin_notification_meta_free (notification_t *n)
+int plugin_notification_meta_free (notification_meta_t *n)
 {
   notification_meta_t *this;
   notification_meta_t *next;
@@ -1063,16 +1343,15 @@ int plugin_notification_meta_free (notification_t *n)
     return (-1);
   }
 
-  this = n->meta;
-  n->meta = NULL;
+  this = n;
   while (this != NULL)
   {
     next = this->next;
 
     if (this->type == NM_TYPE_STRING)
     {
-      free ((char *)this->value_string);
-      this->value_string = NULL;
+      free ((char *)this->nm_value.nm_string);
+      this->nm_value.nm_string = NULL;
     }
     sfree (this);