Merge branch 'collectd-4.6' into collectd-4.7
[collectd.git] / src / rrdtool.c
index 698f89f..243a8c8 100644 (file)
@@ -111,7 +111,8 @@ static rrd_queue_t    *queue_head = NULL;
 static rrd_queue_t    *queue_tail = NULL;
 static rrd_queue_t    *flushq_head = NULL;
 static rrd_queue_t    *flushq_tail = NULL;
-static pthread_t       queue_thread = 0;
+static pthread_t       queue_thread;
+static int             queue_thread_running = 1;
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
 
@@ -279,14 +280,17 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                rrd_cache_t *cache_entry;
                char **values;
                int    values_num;
+               int    status;
                int    i;
 
+               values = NULL;
+               values_num = 0;
+
                 pthread_mutex_lock (&queue_lock);
                 /* Wait for values to arrive */
                 while (true)
                 {
                   struct timespec ts_wait;
-                  int status;
 
                   while ((flushq_head == NULL) && (queue_head == NULL)
                       && (do_shutdown == 0))
@@ -361,17 +365,28 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                 * we make a copy of it's values */
                pthread_mutex_lock (&cache_lock);
 
-               c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
+               status = c_avl_get (cache, queue_entry->filename,
+                               (void *) &cache_entry);
 
-               values = cache_entry->values;
-               values_num = cache_entry->values_num;
+               if (status == 0)
+               {
+                       values = cache_entry->values;
+                       values_num = cache_entry->values_num;
 
-               cache_entry->values = NULL;
-               cache_entry->values_num = 0;
-               cache_entry->flags = FLAG_NONE;
+                       cache_entry->values = NULL;
+                       cache_entry->values_num = 0;
+                       cache_entry->flags = FLAG_NONE;
+               }
 
                pthread_mutex_unlock (&cache_lock);
 
+               if (status != 0)
+               {
+                       sfree (queue_entry->filename);
+                       sfree (queue_entry);
+                       continue;
+               }
+
                /* Update `tv_next_update' */
                if (write_rate > 0.0) 
                 {
@@ -389,8 +404,9 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                /* Write the values to the RRD-file */
                srrd_update (queue_entry->filename, NULL,
                                values_num, (const char **)values);
-               DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
-                               values_num, queue_entry->filename);
+               DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s",
+                               values_num, (values_num == 1) ? "" : "s",
+                               queue_entry->filename);
 
                for (i = 0; i < values_num; i++)
                {
@@ -585,7 +601,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier)
   status = c_avl_get (cache, key, (void *) &rc);
   if (status != 0)
   {
-    WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
+    INFO ("rrdtool plugin: rrd_cache_flush_identifier: "
         "c_avl_get (%s) failed. Does that file really exist?",
         key);
     return (status);
@@ -625,6 +641,15 @@ static int rrd_cache_insert (const char *filename,
 
        pthread_mutex_lock (&cache_lock);
 
+       /* This shouldn't happen, but it did happen at least once, so we'll be
+        * careful. */
+       if (cache == NULL)
+       {
+               pthread_mutex_unlock (&cache_lock);
+               WARNING ("rrdtool plugin: cache == NULL.");
+               return (-1);
+       }
+
        c_avl_get (cache, filename, (void *) &rc);
 
        if (rc == NULL)
@@ -795,7 +820,8 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl,
        return (status);
 } /* int rrd_write */
 
-static int rrd_flush (int timeout, const char *identifier)
+static int rrd_flush (int timeout, const char *identifier,
+               user_data_t __attribute__((unused)) *user_data)
 {
        pthread_mutex_lock (&cache_lock);
 
@@ -972,21 +998,40 @@ static int rrd_shutdown (void)
        pthread_cond_signal (&queue_cond);
        pthread_mutex_unlock (&queue_lock);
 
+       if ((queue_thread_running != 0)
+                       && ((queue_head != NULL) || (flushq_head != NULL)))
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread. "
+                               "This may take a while.");
+       }
+       else if (queue_thread_running != 0)
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread.");
+       }
+
        /* Wait for all the values to be written to disk before returning. */
-       if (queue_thread != 0)
+       if (queue_thread_running != 0)
        {
                pthread_join (queue_thread, NULL);
-               queue_thread = 0;
+               memset (&queue_thread, 0, sizeof (queue_thread));
+               queue_thread_running = 0;
                DEBUG ("rrdtool plugin: queue_thread exited.");
        }
 
+       /* TODO: Maybe it'd be a good idea to free the cache here.. */
+
        return (0);
 } /* int rrd_shutdown */
 
 static int rrd_init (void)
 {
+       static int init_once = 0;
        int status;
 
+       if (init_once != 0)
+               return (0);
+       init_once = 1;
+
        if (rrdcreate_config.stepsize < 0)
                rrdcreate_config.stepsize = 0;
        if (rrdcreate_config.heartbeat <= 0)
@@ -1024,12 +1069,14 @@ static int rrd_init (void)
 
        pthread_mutex_unlock (&cache_lock);
 
-       status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
+       status = pthread_create (&queue_thread, /* attr = */ NULL,
+                       rrd_queue_thread, /* args = */ NULL);
        if (status != 0)
        {
                ERROR ("rrdtool plugin: Cannot create queue-thread.");
                return (-1);
        }
+       queue_thread_running = 1;
 
        DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
                        " heartbeat = %i; rrarows = %i; xff = %lf;",
@@ -1048,6 +1095,6 @@ void module_register (void)
                        config_keys, config_keys_num);
        plugin_register_init ("rrdtool", rrd_init);
        plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL);
-       plugin_register_flush ("rrdtool", rrd_flush);
+       plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL);
        plugin_register_shutdown ("rrdtool", rrd_shutdown);
 }