src/plugin.c: Improve error messages in plugin_load().
[collectd.git] / src / plugin.c
index bbede05..478ec52 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005-2011  Florian octo Forster
+ * Copyright (C) 2005-2013  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -69,6 +69,14 @@ struct read_func_s
 };
 typedef struct read_func_s read_func_t;
 
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+       value_list_t *vl;
+       write_queue_t *next;
+};
+
 /*
  * Private variables
  */
@@ -95,12 +103,22 @@ static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
 static int             read_threads_num = 0;
 
+static write_queue_t  *write_queue_head;
+static write_queue_t  *write_queue_tail;
+static _Bool           write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t      *write_threads = NULL;
+static size_t          write_threads_num = 0;
+
 static pthread_key_t   plugin_ctx_key;
 static _Bool           plugin_ctx_key_initialized = 0;
 
 /*
  * Static functions
  */
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
 static const char *plugin_get_dir (void)
 {
        if (plugindir == NULL)
@@ -573,6 +591,210 @@ static void stop_read_threads (void)
        read_threads_num = 0;
 } /* void stop_read_threads */
 
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+       if (vl == NULL)
+               return;
+
+       meta_data_destroy (vl->meta);
+       sfree (vl->values);
+       sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+       value_list_t *vl;
+
+       if (vl_orig == NULL)
+               return (NULL);
+
+       vl = malloc (sizeof (*vl));
+       if (vl == NULL)
+               return (NULL);
+       memcpy (vl, vl_orig, sizeof (*vl));
+
+       vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+       if (vl->values == NULL)
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+       memcpy (vl->values, vl_orig->values,
+                       vl_orig->values_len * sizeof (*vl->values));
+
+       vl->meta = meta_data_clone (vl->meta);
+       if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+
+       return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+       write_queue_t *q;
+
+       q = malloc (sizeof (*q));
+       if (q == NULL)
+               return (ENOMEM);
+       q->next = NULL;
+
+       q->vl = plugin_value_list_clone (vl);
+       if (q->vl == NULL)
+       {
+               sfree (q);
+               return (ENOMEM);
+       }
+
+       pthread_mutex_lock (&write_lock);
+
+       if (write_queue_tail == NULL)
+       {
+               write_queue_head = q;
+               write_queue_tail = q;
+       }
+       else
+       {
+               write_queue_tail->next = q;
+               write_queue_tail = q;
+       }
+
+       pthread_cond_signal (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+       write_queue_t *q;
+       value_list_t *vl;
+
+       pthread_mutex_lock (&write_lock);
+
+       while (write_loop && (write_queue_head == NULL))
+               pthread_cond_wait (&write_cond, &write_lock);
+
+       if (write_queue_head == NULL)
+       {
+               pthread_mutex_unlock (&write_lock);
+               return (NULL);
+       }
+
+       q = write_queue_head;
+       write_queue_head = q->next;
+       if (write_queue_head == NULL)
+               write_queue_tail = NULL;
+
+       pthread_mutex_unlock (&write_lock);
+
+       vl = q->vl;
+       sfree (q);
+       return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+       while (write_loop)
+       {
+               value_list_t *vl = plugin_write_dequeue ();
+               if (vl == NULL)
+                       continue;
+
+               plugin_dispatch_values_internal (vl);
+
+               plugin_value_list_free (vl);
+       }
+
+       pthread_exit (NULL);
+       return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+       size_t i;
+
+       if (write_threads != NULL)
+               return;
+
+       write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+       if (write_threads == NULL)
+       {
+               ERROR ("plugin: start_write_threads: calloc failed.");
+               return;
+       }
+
+       write_threads_num = 0;
+       for (i = 0; i < num; i++)
+       {
+               int status;
+
+               status = pthread_create (write_threads + write_threads_num,
+                               /* attr = */ NULL,
+                               plugin_write_thread,
+                               /* arg = */ NULL);
+               if (status != 0)
+               {
+                       char errbuf[1024];
+                       ERROR ("plugin: start_write_threads: pthread_create failed "
+                                       "with status %i (%s).", status,
+                                       sstrerror (status, errbuf, sizeof (errbuf)));
+                       return;
+               }
+
+               write_threads_num++;
+       } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+       write_queue_t *q;
+       int i;
+
+       if (write_threads == NULL)
+               return;
+
+       INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+       pthread_mutex_lock (&write_lock);
+       write_loop = 0;
+       DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+       pthread_cond_broadcast (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       for (i = 0; i < write_threads_num; i++)
+       {
+               if (pthread_join (write_threads[i], NULL) != 0)
+               {
+                       ERROR ("plugin: stop_write_threads: pthread_join failed.");
+               }
+               write_threads[i] = (pthread_t) 0;
+       }
+       sfree (write_threads);
+       write_threads_num = 0;
+
+       pthread_mutex_lock (&write_lock);
+       i = 0;
+       for (q = write_queue_head; q != NULL; q = q->next)
+       {
+               plugin_value_list_free (q->vl);
+               sfree (q);
+               i++;
+       }
+       write_queue_head = NULL;
+       write_queue_tail = NULL;
+       pthread_mutex_unlock (&write_lock);
+
+       if (i > 0)
+       {
+               WARNING ("plugin: %i value list%s left after shutting down "
+                               "the write threads.",
+                               i, (i == 1) ? " was" : "s were");
+       }
+} /* }}} void stop_write_threads */
+
 /*
  * Public functions
  */
@@ -604,8 +826,6 @@ int plugin_load (const char *type, uint32_t flags)
        struct dirent *de;
        int status;
 
-       DEBUG ("type = %s", type);
-
        dir = plugin_get_dir ();
        ret = 1;
 
@@ -614,7 +834,7 @@ int plugin_load (const char *type, uint32_t flags)
        status = ssnprintf (typename, sizeof (typename), "%s.so", type);
        if ((status < 0) || ((size_t) status >= sizeof (typename)))
        {
-               WARNING ("snprintf: truncated: `%s.so'", type);
+               WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
                return (-1);
        }
        typename_len = strlen (typename);
@@ -622,7 +842,7 @@ int plugin_load (const char *type, uint32_t flags)
        if ((dh = opendir (dir)) == NULL)
        {
                char errbuf[1024];
-               ERROR ("opendir (%s): %s", dir,
+               ERROR ("plugin_load: opendir (%s) failed: %s", dir,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }
@@ -636,25 +856,29 @@ int plugin_load (const char *type, uint32_t flags)
                                "%s/%s", dir, de->d_name);
                if ((status < 0) || ((size_t) status >= sizeof (filename)))
                {
-                       WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
+                       WARNING ("plugin_load: Filename too long: \"%s/%s\"",
+                                       dir, de->d_name);
                        continue;
                }
 
                if (lstat (filename, &statbuf) == -1)
                {
                        char errbuf[1024];
-                       WARNING ("stat %s: %s", filename,
+                       WARNING ("plugin_load: stat (\"%s\") failed: %s",
+                                       filename,
                                        sstrerror (errno, errbuf, sizeof (errbuf)));
                        continue;
                }
                else if (!S_ISREG (statbuf.st_mode))
                {
                        /* don't follow symlinks */
-                       WARNING ("stat %s: not a regular file", filename);
+                       WARNING ("plugin_load: %s is not a regular file.",
+                                       filename);
                        continue;
                }
 
-               if (plugin_load_file (filename, flags) == 0)
+               status = plugin_load_file (filename, flags);
+               if (status == 0)
                {
                        /* success */
                        ret = 0;
@@ -662,14 +886,16 @@ int plugin_load (const char *type, uint32_t flags)
                }
                else
                {
-                       fprintf (stderr, "Unable to load plugin %s.\n", type);
+                       ERROR ("plugin_load: Load plugin \"%s\" failed with "
+                                       "status %i.", type, status);
                }
        }
 
        closedir (dh);
 
-       if (filename[0] == '\0')
-               fprintf (stderr, "Could not find plugin %s.\n", type);
+       if (filename[0] == 0)
+               ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
+                               type, dir);
 
        return (ret);
 }
@@ -1166,6 +1392,15 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       {
+               char const *tmp = global_option_get ("WriteThreads");
+               int num = atoi (tmp);
+
+               if (num < 1)
+                       num = 5;
+
+               start_write_threads ((size_t) num);
+       }
 
        if ((list_init == NULL) && (read_heap == NULL))
                return;
@@ -1435,6 +1670,8 @@ void plugin_shutdown_all (void)
                plugin_set_ctx (old_ctx);
        }
 
+       stop_write_threads ();
+
        /* Write plugins which use the `user_data' pointer usually need the
         * same data available to the flush callback. If this is the case, set
         * the free_function to NULL when registering the flush callback and to
@@ -1490,7 +1727,7 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
   return (0);
 } /* int }}} plugin_dispatch_missing */
 
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
 {
        int status;
        static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
@@ -1684,53 +1921,24 @@ int plugin_dispatch_values (value_list_t *vl)
        }
 
        return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
 
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
 {
-  value_list_t vl_copy;
-  int status;
-
-  if (vl == NULL)
-    return EINVAL;
-
-  memcpy (&vl_copy, vl, sizeof (vl_copy));
-
-  /* Write callbacks must not change the values and meta pointers, so we can
-   * savely skip copying those and make this more efficient. */
-  if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
-    return (plugin_dispatch_values (&vl_copy));
-
-  /* Set pointers to NULL, just to be on the save side. */
-  vl_copy.values = NULL;
-  vl_copy.meta = NULL;
-
-  vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
-  if (vl_copy.values == NULL)
-  {
-    ERROR ("plugin_dispatch_values_secure: malloc failed.");
-    return (ENOMEM);
-  }
-  memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
-  if (vl->meta != NULL)
-  {
-    vl_copy.meta = meta_data_clone (vl->meta);
-    if (vl_copy.meta == NULL)
-    {
-      ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
-      free (vl_copy.values);
-      return (ENOMEM);
-    }
-  } /* if (vl->meta) */
-
-  status = plugin_dispatch_values (&vl_copy);
+       int status;
 
-  meta_data_destroy (vl_copy.meta);
-  free (vl_copy.values);
+       status = plugin_write_enqueue (vl);
+       if (status != 0)
+       {
+               char errbuf[1024];
+               ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+                               "with status %i (%s).", status,
+                               sstrerror (status, errbuf, sizeof (errbuf)));
+               return (status);
+       }
 
-  return (status);
-} /* int plugin_dispatch_values_secure */
+       return (0);
+}
 
 int plugin_dispatch_notification (const notification_t *notif)
 {