e7c48c0e7ee7baf1b2fbed4a7dc668e50acfcb70
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2008  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at verplant.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27 #include "plugin.h"
28 #include "common.h"
29 #include "utils_avltree.h"
30 #include "utils_rrdcreate.h"
31
32 #include <rrd.h>
33
34 #if HAVE_PTHREAD_H
35 # include <pthread.h>
36 #endif
37
38 /*
39  * Private types
40  */
41 struct rrd_cache_s
42 {
43         int      values_num;
44         char   **values;
45         cdtime_t first_value;
46         cdtime_t last_value;
47         int64_t  random_variation;
48         enum
49         {
50                 FLAG_NONE   = 0x00,
51                 FLAG_QUEUED = 0x01,
52                 FLAG_FLUSHQ = 0x02
53         } flags;
54 };
55 typedef struct rrd_cache_s rrd_cache_t;
56
57 enum rrd_queue_dir_e
58 {
59   QUEUE_INSERT_FRONT,
60   QUEUE_INSERT_BACK
61 };
62 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
63
64 struct rrd_queue_s
65 {
66         char *filename;
67         struct rrd_queue_s *next;
68 };
69 typedef struct rrd_queue_s rrd_queue_t;
70
71 /*
72  * Private variables
73  */
74 static const char *config_keys[] =
75 {
76         "CacheTimeout",
77         "CacheFlush",
78         "CreateFilesAsync",
79         "DataDir",
80         "StepSize",
81         "HeartBeat",
82         "RRARows",
83         "RRATimespan",
84         "XFF",
85         "WritesPerSecond",
86         "RandomTimeout"
87 };
88 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
89
90 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
91  * is zero a default, depending on the `interval' member of the value list is
92  * being used. */
93 static char *datadir   = NULL;
94 static double write_rate = 0.0;
95 static rrdcreate_config_t rrdcreate_config =
96 {
97         /* stepsize = */ 0,
98         /* heartbeat = */ 0,
99         /* rrarows = */ 1200,
100         /* xff = */ 0.1,
101
102         /* timespans = */ NULL,
103         /* timespans_num = */ 0,
104
105         /* consolidation_functions = */ NULL,
106         /* consolidation_functions_num = */ 0,
107
108         /* async = */ 0
109 };
110
111 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
112  * ALWAYS lock `cache_lock' first! */
113 static cdtime_t    cache_timeout = 0;
114 static cdtime_t    cache_flush_timeout = 0;
115 static cdtime_t    random_timeout = TIME_T_TO_CDTIME_T (1);
116 static cdtime_t    cache_flush_last;
117 static c_avl_tree_t *cache = NULL;
118 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
119
120 static rrd_queue_t    *queue_head = NULL;
121 static rrd_queue_t    *queue_tail = NULL;
122 static rrd_queue_t    *flushq_head = NULL;
123 static rrd_queue_t    *flushq_tail = NULL;
124 static pthread_t       queue_thread;
125 static int             queue_thread_running = 1;
126 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
127 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
128
129 #if !HAVE_THREADSAFE_LIBRRD
130 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
131 #endif
132
133 static int do_shutdown = 0;
134
135 #if HAVE_THREADSAFE_LIBRRD
136 static int srrd_update (char *filename, char *template,
137                 int argc, const char **argv)
138 {
139         int status;
140
141         optind = 0; /* bug in librrd? */
142         rrd_clear_error ();
143
144         status = rrd_update_r (filename, template, argc, (void *) argv);
145
146         if (status != 0)
147         {
148                 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
149                                 filename, rrd_get_error ());
150         }
151
152         return (status);
153 } /* int srrd_update */
154 /* #endif HAVE_THREADSAFE_LIBRRD */
155
156 #else /* !HAVE_THREADSAFE_LIBRRD */
157 static int srrd_update (char *filename, char *template,
158                 int argc, const char **argv)
159 {
160         int status;
161
162         int new_argc;
163         char **new_argv;
164
165         assert (template == NULL);
166
167         new_argc = 2 + argc;
168         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
169         if (new_argv == NULL)
170         {
171                 ERROR ("rrdtool plugin: malloc failed.");
172                 return (-1);
173         }
174
175         new_argv[0] = "update";
176         new_argv[1] = filename;
177
178         memcpy (new_argv + 2, argv, argc * sizeof (char *));
179         new_argv[new_argc] = NULL;
180
181         pthread_mutex_lock (&librrd_lock);
182         optind = 0; /* bug in librrd? */
183         rrd_clear_error ();
184
185         status = rrd_update (new_argc, new_argv);
186         pthread_mutex_unlock (&librrd_lock);
187
188         if (status != 0)
189         {
190                 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
191                                 filename, rrd_get_error ());
192         }
193
194         sfree (new_argv);
195
196         return (status);
197 } /* int srrd_update */
198 #endif /* !HAVE_THREADSAFE_LIBRRD */
199
200 static int value_list_to_string (char *buffer, int buffer_len,
201                 const data_set_t *ds, const value_list_t *vl)
202 {
203         int offset;
204         int status;
205         time_t tt;
206         int i;
207
208         memset (buffer, '\0', buffer_len);
209
210         tt = CDTIME_T_TO_TIME_T (vl->time);
211         status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) tt);
212         if ((status < 1) || (status >= buffer_len))
213                 return (-1);
214         offset = status;
215
216         for (i = 0; i < ds->ds_num; i++)
217         {
218                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
219                                 && (ds->ds[i].type != DS_TYPE_GAUGE)
220                                 && (ds->ds[i].type != DS_TYPE_DERIVE)
221                                 && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
222                         return (-1);
223
224                 if (ds->ds[i].type == DS_TYPE_COUNTER)
225                         status = ssnprintf (buffer + offset, buffer_len - offset,
226                                         ":%llu", vl->values[i].counter);
227                 else if (ds->ds[i].type == DS_TYPE_GAUGE)
228                         status = ssnprintf (buffer + offset, buffer_len - offset,
229                                         ":%lf", vl->values[i].gauge);
230                 else if (ds->ds[i].type == DS_TYPE_DERIVE)
231                         status = ssnprintf (buffer + offset, buffer_len - offset,
232                                         ":%"PRIi64, vl->values[i].derive);
233                 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
234                         status = ssnprintf (buffer + offset, buffer_len - offset,
235                                         ":%"PRIu64, vl->values[i].absolute);
236
237                 if ((status < 1) || (status >= (buffer_len - offset)))
238                         return (-1);
239
240                 offset += status;
241         } /* for ds->ds_num */
242
243         return (0);
244 } /* int value_list_to_string */
245
246 static int value_list_to_filename (char *buffer, int buffer_len,
247                 const data_set_t __attribute__((unused)) *ds, const value_list_t *vl)
248 {
249         int offset = 0;
250         int status;
251
252         if (datadir != NULL)
253         {
254                 status = ssnprintf (buffer + offset, buffer_len - offset,
255                                 "%s/", datadir);
256                 if ((status < 1) || (status >= buffer_len - offset))
257                         return (-1);
258                 offset += status;
259         }
260
261         status = ssnprintf (buffer + offset, buffer_len - offset,
262                         "%s/", vl->host);
263         if ((status < 1) || (status >= buffer_len - offset))
264                 return (-1);
265         offset += status;
266
267         if (strlen (vl->plugin_instance) > 0)
268                 status = ssnprintf (buffer + offset, buffer_len - offset,
269                                 "%s-%s/", vl->plugin, vl->plugin_instance);
270         else
271                 status = ssnprintf (buffer + offset, buffer_len - offset,
272                                 "%s/", vl->plugin);
273         if ((status < 1) || (status >= buffer_len - offset))
274                 return (-1);
275         offset += status;
276
277         if (strlen (vl->type_instance) > 0)
278                 status = ssnprintf (buffer + offset, buffer_len - offset,
279                                 "%s-%s.rrd", vl->type, vl->type_instance);
280         else
281                 status = ssnprintf (buffer + offset, buffer_len - offset,
282                                 "%s.rrd", vl->type);
283         if ((status < 1) || (status >= buffer_len - offset))
284                 return (-1);
285         offset += status;
286
287         return (0);
288 } /* int value_list_to_filename */
289
290 static void *rrd_queue_thread (void __attribute__((unused)) *data)
291 {
292         struct timeval tv_next_update;
293         struct timeval tv_now;
294
295         gettimeofday (&tv_next_update, /* timezone = */ NULL);
296
297         while (42)
298         {
299                 rrd_queue_t *queue_entry;
300                 rrd_cache_t *cache_entry;
301                 char **values;
302                 int    values_num;
303                 int    status;
304                 int    i;
305
306                 values = NULL;
307                 values_num = 0;
308
309                 pthread_mutex_lock (&queue_lock);
310                 /* Wait for values to arrive */
311                 while (42)
312                 {
313                   struct timespec ts_wait;
314
315                   while ((flushq_head == NULL) && (queue_head == NULL)
316                       && (do_shutdown == 0))
317                     pthread_cond_wait (&queue_cond, &queue_lock);
318
319                   if ((flushq_head == NULL) && (queue_head == NULL))
320                     break;
321
322                   /* Don't delay if there's something to flush */
323                   if (flushq_head != NULL)
324                     break;
325
326                   /* Don't delay if we're shutting down */
327                   if (do_shutdown != 0)
328                     break;
329
330                   /* Don't delay if no delay was configured. */
331                   if (write_rate <= 0.0)
332                     break;
333
334                   gettimeofday (&tv_now, /* timezone = */ NULL);
335                   status = timeval_cmp (tv_next_update, tv_now, NULL);
336                   /* We're good to go */
337                   if (status <= 0)
338                     break;
339
340                   /* We're supposed to wait a bit with this update, so we'll
341                    * wait for the next addition to the queue or to the end of
342                    * the wait period - whichever comes first. */
343                   ts_wait.tv_sec = tv_next_update.tv_sec;
344                   ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
345
346                   status = pthread_cond_timedwait (&queue_cond, &queue_lock,
347                       &ts_wait);
348                   if (status == ETIMEDOUT)
349                     break;
350                 } /* while (42) */
351
352                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
353                  * the same time, ALWAYS lock `cache_lock' first! */
354
355                 /* We're in the shutdown phase */
356                 if ((flushq_head == NULL) && (queue_head == NULL))
357                 {
358                   pthread_mutex_unlock (&queue_lock);
359                   break;
360                 }
361
362                 if (flushq_head != NULL)
363                 {
364                   /* Dequeue the first flush entry */
365                   queue_entry = flushq_head;
366                   if (flushq_head == flushq_tail)
367                     flushq_head = flushq_tail = NULL;
368                   else
369                     flushq_head = flushq_head->next;
370                 }
371                 else /* if (queue_head != NULL) */
372                 {
373                   /* Dequeue the first regular entry */
374                   queue_entry = queue_head;
375                   if (queue_head == queue_tail)
376                     queue_head = queue_tail = NULL;
377                   else
378                     queue_head = queue_head->next;
379                 }
380
381                 /* Unlock the queue again */
382                 pthread_mutex_unlock (&queue_lock);
383
384                 /* We now need the cache lock so the entry isn't updated while
385                  * we make a copy of it's values */
386                 pthread_mutex_lock (&cache_lock);
387
388                 status = c_avl_get (cache, queue_entry->filename,
389                                 (void *) &cache_entry);
390
391                 if (status == 0)
392                 {
393                         values = cache_entry->values;
394                         values_num = cache_entry->values_num;
395
396                         cache_entry->values = NULL;
397                         cache_entry->values_num = 0;
398                         cache_entry->flags = FLAG_NONE;
399                 }
400
401                 pthread_mutex_unlock (&cache_lock);
402
403                 if (status != 0)
404                 {
405                         sfree (queue_entry->filename);
406                         sfree (queue_entry);
407                         continue;
408                 }
409
410                 /* Update `tv_next_update' */
411                 if (write_rate > 0.0) 
412                 {
413                   gettimeofday (&tv_now, /* timezone = */ NULL);
414                   tv_next_update.tv_sec = tv_now.tv_sec;
415                   tv_next_update.tv_usec = tv_now.tv_usec
416                     + ((suseconds_t) (1000000 * write_rate));
417                   while (tv_next_update.tv_usec > 1000000)
418                   {
419                     tv_next_update.tv_sec++;
420                     tv_next_update.tv_usec -= 1000000;
421                   }
422                 }
423
424                 /* Write the values to the RRD-file */
425                 srrd_update (queue_entry->filename, NULL,
426                                 values_num, (const char **)values);
427                 DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s",
428                                 values_num, (values_num == 1) ? "" : "s",
429                                 queue_entry->filename);
430
431                 for (i = 0; i < values_num; i++)
432                 {
433                         sfree (values[i]);
434                 }
435                 sfree (values);
436                 sfree (queue_entry->filename);
437                 sfree (queue_entry);
438         } /* while (42) */
439
440         pthread_exit ((void *) 0);
441         return ((void *) 0);
442 } /* void *rrd_queue_thread */
443
444 static int rrd_queue_enqueue (const char *filename,
445     rrd_queue_t **head, rrd_queue_t **tail)
446 {
447   rrd_queue_t *queue_entry;
448
449   queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
450   if (queue_entry == NULL)
451     return (-1);
452
453   queue_entry->filename = strdup (filename);
454   if (queue_entry->filename == NULL)
455   {
456     free (queue_entry);
457     return (-1);
458   }
459
460   queue_entry->next = NULL;
461
462   pthread_mutex_lock (&queue_lock);
463
464   if (*tail == NULL)
465     *head = queue_entry;
466   else
467     (*tail)->next = queue_entry;
468   *tail = queue_entry;
469
470   pthread_cond_signal (&queue_cond);
471   pthread_mutex_unlock (&queue_lock);
472
473   return (0);
474 } /* int rrd_queue_enqueue */
475
476 static int rrd_queue_dequeue (const char *filename,
477     rrd_queue_t **head, rrd_queue_t **tail)
478 {
479   rrd_queue_t *this;
480   rrd_queue_t *prev;
481
482   pthread_mutex_lock (&queue_lock);
483
484   prev = NULL;
485   this = *head;
486
487   while (this != NULL)
488   {
489     if (strcmp (this->filename, filename) == 0)
490       break;
491     
492     prev = this;
493     this = this->next;
494   }
495
496   if (this == NULL)
497   {
498     pthread_mutex_unlock (&queue_lock);
499     return (-1);
500   }
501
502   if (prev == NULL)
503     *head = this->next;
504   else
505     prev->next = this->next;
506
507   if (this->next == NULL)
508     *tail = prev;
509
510   pthread_mutex_unlock (&queue_lock);
511
512   sfree (this->filename);
513   sfree (this);
514
515   return (0);
516 } /* int rrd_queue_dequeue */
517
518 /* XXX: You must hold "cache_lock" when calling this function! */
519 static void rrd_cache_flush (cdtime_t timeout)
520 {
521         rrd_cache_t *rc;
522         cdtime_t     now;
523
524         char **keys = NULL;
525         int    keys_num = 0;
526
527         char *key;
528         c_avl_iterator_t *iter;
529         int i;
530
531         DEBUG ("rrdtool plugin: Flushing cache, timeout = %.3f",
532                         CDTIME_T_TO_DOUBLE (timeout));
533
534         now = cdtime ();
535         timeout = TIME_T_TO_CDTIME_T (timeout);
536
537         /* Build a list of entries to be flushed */
538         iter = c_avl_get_iterator (cache);
539         while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
540         {
541                 if (rc->flags != FLAG_NONE)
542                         continue;
543                 /* timeout == 0  =>  flush everything */
544                 else if ((timeout != 0)
545                                 && ((now - rc->first_value) < timeout))
546                         continue;
547                 else if (rc->values_num > 0)
548                 {
549                         int status;
550
551                         status = rrd_queue_enqueue (key, &queue_head,  &queue_tail);
552                         if (status == 0)
553                                 rc->flags = FLAG_QUEUED;
554                 }
555                 else /* ancient and no values -> waste of memory */
556                 {
557                         char **tmp = (char **) realloc ((void *) keys,
558                                         (keys_num + 1) * sizeof (char *));
559                         if (tmp == NULL)
560                         {
561                                 char errbuf[1024];
562                                 ERROR ("rrdtool plugin: "
563                                                 "realloc failed: %s",
564                                                 sstrerror (errno, errbuf,
565                                                         sizeof (errbuf)));
566                                 c_avl_iterator_destroy (iter);
567                                 sfree (keys);
568                                 return;
569                         }
570                         keys = tmp;
571                         keys[keys_num] = key;
572                         keys_num++;
573                 }
574         } /* while (c_avl_iterator_next) */
575         c_avl_iterator_destroy (iter);
576         
577         for (i = 0; i < keys_num; i++)
578         {
579                 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
580                 {
581                         DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
582                         continue;
583                 }
584
585                 assert (rc->values == NULL);
586                 assert (rc->values_num == 0);
587
588                 sfree (rc);
589                 sfree (key);
590                 keys[i] = NULL;
591         } /* for (i = 0..keys_num) */
592
593         sfree (keys);
594
595         cache_flush_last = now;
596 } /* void rrd_cache_flush */
597
598 static int rrd_cache_flush_identifier (cdtime_t timeout,
599     const char *identifier)
600 {
601   rrd_cache_t *rc;
602   cdtime_t now;
603   int status;
604   char key[2048];
605
606   if (identifier == NULL)
607   {
608     rrd_cache_flush (timeout);
609     return (0);
610   }
611
612   now = cdtime ();
613
614   if (datadir == NULL)
615     snprintf (key, sizeof (key), "%s.rrd",
616         identifier);
617   else
618     snprintf (key, sizeof (key), "%s/%s.rrd",
619         datadir, identifier);
620   key[sizeof (key) - 1] = 0;
621
622   status = c_avl_get (cache, key, (void *) &rc);
623   if (status != 0)
624   {
625     INFO ("rrdtool plugin: rrd_cache_flush_identifier: "
626         "c_avl_get (%s) failed. Does that file really exist?",
627         key);
628     return (status);
629   }
630
631   if (rc->flags == FLAG_FLUSHQ)
632   {
633     status = 0;
634   }
635   else if (rc->flags == FLAG_QUEUED)
636   {
637     rrd_queue_dequeue (key, &queue_head, &queue_tail);
638     status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
639     if (status == 0)
640       rc->flags = FLAG_FLUSHQ;
641   }
642   else if ((now - rc->first_value) < timeout)
643   {
644     status = 0;
645   }
646   else if (rc->values_num > 0)
647   {
648     status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
649     if (status == 0)
650       rc->flags = FLAG_FLUSHQ;
651   }
652
653   return (status);
654 } /* int rrd_cache_flush_identifier */
655
656 static int64_t rrd_get_random_variation (void)
657 {
658   double dbl_timeout;
659   cdtime_t ctm_timeout;
660   double rand_fact;
661   _Bool negative;
662   int64_t ret;
663
664   if (random_timeout <= 0)
665     return (0);
666
667   /* Assure that "cache_timeout + random_variation" is never negative. */
668   if (random_timeout > cache_timeout)
669   {
670           INFO ("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
671                           CDTIME_T_TO_DOUBLE (cache_timeout));
672           random_timeout = cache_timeout;
673   }
674
675   /* This seems a bit complicated, but "random_timeout" is likely larger than
676    * RAND_MAX, so we can't simply use modulo here. */
677   dbl_timeout = CDTIME_T_TO_DOUBLE (random_timeout);
678   rand_fact = ((double) random ())
679     / ((double) RAND_MAX);
680   negative = (_Bool) (random () % 2);
681
682   ctm_timeout = DOUBLE_TO_CDTIME_T (dbl_timeout * rand_fact);
683
684   ret = (int64_t) ctm_timeout;
685   if (negative)
686     ret *= -1;
687
688   return (ret);
689 } /* int64_t rrd_get_random_variation */
690
691 static int rrd_cache_insert (const char *filename,
692                 const char *value, cdtime_t value_time)
693 {
694         rrd_cache_t *rc = NULL;
695         int new_rc = 0;
696         char **values_new;
697
698         pthread_mutex_lock (&cache_lock);
699
700         /* This shouldn't happen, but it did happen at least once, so we'll be
701          * careful. */
702         if (cache == NULL)
703         {
704                 pthread_mutex_unlock (&cache_lock);
705                 WARNING ("rrdtool plugin: cache == NULL.");
706                 return (-1);
707         }
708
709         c_avl_get (cache, filename, (void *) &rc);
710
711         if (rc == NULL)
712         {
713                 rc = malloc (sizeof (*rc));
714                 if (rc == NULL)
715                         return (-1);
716                 rc->values_num = 0;
717                 rc->values = NULL;
718                 rc->first_value = 0;
719                 rc->last_value = 0;
720                 rc->random_variation = rrd_get_random_variation ();
721                 rc->flags = FLAG_NONE;
722                 new_rc = 1;
723         }
724
725         if (rc->last_value >= value_time)
726         {
727                 pthread_mutex_unlock (&cache_lock);
728                 DEBUG ("rrdtool plugin: (rc->last_value = %"PRIu64") "
729                                 ">= (value_time = %"PRIu64")",
730                                 rc->last_value, value_time);
731                 return (-1);
732         }
733
734         values_new = (char **) realloc ((void *) rc->values,
735                         (rc->values_num + 1) * sizeof (char *));
736         if (values_new == NULL)
737         {
738                 char errbuf[1024];
739                 void *cache_key = NULL;
740
741                 sstrerror (errno, errbuf, sizeof (errbuf));
742
743                 c_avl_remove (cache, filename, &cache_key, NULL);
744                 pthread_mutex_unlock (&cache_lock);
745
746                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
747
748                 sfree (cache_key);
749                 sfree (rc->values);
750                 sfree (rc);
751                 return (-1);
752         }
753         rc->values = values_new;
754
755         rc->values[rc->values_num] = strdup (value);
756         if (rc->values[rc->values_num] != NULL)
757                 rc->values_num++;
758
759         if (rc->values_num == 1)
760                 rc->first_value = value_time;
761         rc->last_value = value_time;
762
763         /* Insert if this is the first value */
764         if (new_rc == 1)
765         {
766                 void *cache_key = strdup (filename);
767
768                 if (cache_key == NULL)
769                 {
770                         char errbuf[1024];
771                         sstrerror (errno, errbuf, sizeof (errbuf));
772
773                         pthread_mutex_unlock (&cache_lock);
774
775                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
776
777                         sfree (rc->values[0]);
778                         sfree (rc->values);
779                         sfree (rc);
780                         return (-1);
781                 }
782
783                 c_avl_insert (cache, cache_key, rc);
784         }
785
786         DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
787                         "values_num = %i; age = %.3f;",
788                         filename, rc->values_num,
789                         CDTIME_T_TO_DOUBLE (rc->last_value - rc->first_value));
790
791         if ((rc->last_value - rc->first_value) >= (cache_timeout + rc->random_variation))
792         {
793                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
794                  * the same time, ALWAYS lock `cache_lock' first! */
795                 if (rc->flags == FLAG_NONE)
796                 {
797                         int status;
798
799                         status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
800                         if (status == 0)
801                                 rc->flags = FLAG_QUEUED;
802
803                         rc->random_variation = rrd_get_random_variation ();
804                 }
805                 else
806                 {
807                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
808                 }
809         }
810
811         if ((cache_timeout > 0) &&
812                         ((cdtime () - cache_flush_last) > cache_flush_timeout))
813                 rrd_cache_flush (cache_flush_timeout);
814
815         pthread_mutex_unlock (&cache_lock);
816
817         return (0);
818 } /* int rrd_cache_insert */
819
820 static int rrd_cache_destroy (void) /* {{{ */
821 {
822   void *key = NULL;
823   void *value = NULL;
824
825   int non_empty = 0;
826
827   pthread_mutex_lock (&cache_lock);
828
829   if (cache == NULL)
830   {
831     pthread_mutex_unlock (&cache_lock);
832     return (0);
833   }
834
835   while (c_avl_pick (cache, &key, &value) == 0)
836   {
837     rrd_cache_t *rc;
838     int i;
839
840     sfree (key);
841     key = NULL;
842
843     rc = value;
844     value = NULL;
845
846     if (rc->values_num > 0)
847       non_empty++;
848
849     for (i = 0; i < rc->values_num; i++)
850       sfree (rc->values[i]);
851     sfree (rc->values);
852     sfree (rc);
853   }
854
855   c_avl_destroy (cache);
856   cache = NULL;
857
858   if (non_empty > 0)
859   {
860     INFO ("rrdtool plugin: %i cache %s had values when destroying the cache.",
861         non_empty, (non_empty == 1) ? "entry" : "entries");
862   }
863   else
864   {
865     DEBUG ("rrdtool plugin: No values have been lost "
866         "when destroying the cache.");
867   }
868
869   pthread_mutex_unlock (&cache_lock);
870   return (0);
871 } /* }}} int rrd_cache_destroy */
872
873 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
874 {
875         int a = *((int *) a_ptr);
876         int b = *((int *) b_ptr);
877
878         if (a < b)
879                 return (-1);
880         else if (a > b)
881                 return (1);
882         else
883                 return (0);
884 } /* int rrd_compare_numeric */
885
886 static int rrd_write (const data_set_t *ds, const value_list_t *vl,
887                 user_data_t __attribute__((unused)) *user_data)
888 {
889         struct stat  statbuf;
890         char         filename[512];
891         char         values[512];
892         int          status;
893
894         if (do_shutdown)
895                 return (0);
896
897         if (0 != strcmp (ds->type, vl->type)) {
898                 ERROR ("rrdtool plugin: DS type does not match value list type");
899                 return -1;
900         }
901
902         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
903                 return (-1);
904
905         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
906                 return (-1);
907
908         if (stat (filename, &statbuf) == -1)
909         {
910                 if (errno == ENOENT)
911                 {
912                         status = cu_rrd_create_file (filename,
913                                         ds, vl, &rrdcreate_config);
914                         if (status != 0)
915                                 return (-1);
916                         else if (rrdcreate_config.async)
917                                 return (0);
918                 }
919                 else
920                 {
921                         char errbuf[1024];
922                         ERROR ("stat(%s) failed: %s", filename,
923                                         sstrerror (errno, errbuf,
924                                                 sizeof (errbuf)));
925                         return (-1);
926                 }
927         }
928         else if (!S_ISREG (statbuf.st_mode))
929         {
930                 ERROR ("stat(%s): Not a regular file!",
931                                 filename);
932                 return (-1);
933         }
934
935         status = rrd_cache_insert (filename, values, vl->time);
936
937         return (status);
938 } /* int rrd_write */
939
940 static int rrd_flush (cdtime_t timeout, const char *identifier,
941                 __attribute__((unused)) user_data_t *user_data)
942 {
943         pthread_mutex_lock (&cache_lock);
944
945         if (cache == NULL) {
946                 pthread_mutex_unlock (&cache_lock);
947                 return (0);
948         }
949
950         rrd_cache_flush_identifier (timeout, identifier);
951
952         pthread_mutex_unlock (&cache_lock);
953         return (0);
954 } /* int rrd_flush */
955
956 static int rrd_config (const char *key, const char *value)
957 {
958         if (strcasecmp ("CacheTimeout", key) == 0)
959         {
960                 double tmp = atof (value);
961                 if (tmp < 0)
962                 {
963                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
964                                         "be greater than 0.\n");
965                         ERROR ("rrdtool: `CacheTimeout' must "
966                                         "be greater than 0.\n");
967                         return (1);
968                 }
969                 cache_timeout = DOUBLE_TO_CDTIME_T (tmp);
970         }
971         else if (strcasecmp ("CacheFlush", key) == 0)
972         {
973                 int tmp = atoi (value);
974                 if (tmp < 0)
975                 {
976                         fprintf (stderr, "rrdtool: `CacheFlush' must "
977                                         "be greater than 0.\n");
978                         ERROR ("rrdtool: `CacheFlush' must "
979                                         "be greater than 0.\n");
980                         return (1);
981                 }
982                 cache_flush_timeout = tmp;
983         }
984         else if (strcasecmp ("DataDir", key) == 0)
985         {
986                 if (datadir != NULL)
987                         free (datadir);
988                 datadir = strdup (value);
989                 if (datadir != NULL)
990                 {
991                         int len = strlen (datadir);
992                         while ((len > 0) && (datadir[len - 1] == '/'))
993                         {
994                                 len--;
995                                 datadir[len] = '\0';
996                         }
997                         if (len <= 0)
998                         {
999                                 free (datadir);
1000                                 datadir = NULL;
1001                         }
1002                 }
1003         }
1004         else if (strcasecmp ("StepSize", key) == 0)
1005         {
1006                 unsigned long temp = strtoul (value, NULL, 0);
1007                 if (temp > 0)
1008                         rrdcreate_config.stepsize = temp;
1009         }
1010         else if (strcasecmp ("HeartBeat", key) == 0)
1011         {
1012                 int temp = atoi (value);
1013                 if (temp > 0)
1014                         rrdcreate_config.heartbeat = temp;
1015         }
1016         else if (strcasecmp ("CreateFilesAsync", key) == 0)
1017         {
1018                 if (IS_TRUE (value))
1019                         rrdcreate_config.async = 1;
1020                 else
1021                         rrdcreate_config.async = 0;
1022         }
1023         else if (strcasecmp ("RRARows", key) == 0)
1024         {
1025                 int tmp = atoi (value);
1026                 if (tmp <= 0)
1027                 {
1028                         fprintf (stderr, "rrdtool: `RRARows' must "
1029                                         "be greater than 0.\n");
1030                         ERROR ("rrdtool: `RRARows' must "
1031                                         "be greater than 0.\n");
1032                         return (1);
1033                 }
1034                 rrdcreate_config.rrarows = tmp;
1035         }
1036         else if (strcasecmp ("RRATimespan", key) == 0)
1037         {
1038                 char *saveptr = NULL;
1039                 char *dummy;
1040                 char *ptr;
1041                 char *value_copy;
1042                 int *tmp_alloc;
1043
1044                 value_copy = strdup (value);
1045                 if (value_copy == NULL)
1046                         return (1);
1047
1048                 dummy = value_copy;
1049                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
1050                 {
1051                         dummy = NULL;
1052                         
1053                         tmp_alloc = realloc (rrdcreate_config.timespans,
1054                                         sizeof (int) * (rrdcreate_config.timespans_num + 1));
1055                         if (tmp_alloc == NULL)
1056                         {
1057                                 fprintf (stderr, "rrdtool: realloc failed.\n");
1058                                 ERROR ("rrdtool: realloc failed.\n");
1059                                 free (value_copy);
1060                                 return (1);
1061                         }
1062                         rrdcreate_config.timespans = tmp_alloc;
1063                         rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr);
1064                         if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
1065                                 rrdcreate_config.timespans_num++;
1066                 } /* while (strtok_r) */
1067
1068                 qsort (/* base = */ rrdcreate_config.timespans,
1069                                 /* nmemb  = */ rrdcreate_config.timespans_num,
1070                                 /* size   = */ sizeof (rrdcreate_config.timespans[0]),
1071                                 /* compar = */ rrd_compare_numeric);
1072
1073                 free (value_copy);
1074         }
1075         else if (strcasecmp ("XFF", key) == 0)
1076         {
1077                 double tmp = atof (value);
1078                 if ((tmp < 0.0) || (tmp >= 1.0))
1079                 {
1080                         fprintf (stderr, "rrdtool: `XFF' must "
1081                                         "be in the range 0 to 1 (exclusive).");
1082                         ERROR ("rrdtool: `XFF' must "
1083                                         "be in the range 0 to 1 (exclusive).");
1084                         return (1);
1085                 }
1086                 rrdcreate_config.xff = tmp;
1087         }
1088         else if (strcasecmp ("WritesPerSecond", key) == 0)
1089         {
1090                 double wps = atof (value);
1091
1092                 if (wps < 0.0)
1093                 {
1094                         fprintf (stderr, "rrdtool: `WritesPerSecond' must be "
1095                                         "greater than or equal to zero.");
1096                         return (1);
1097                 }
1098                 else if (wps == 0.0)
1099                 {
1100                         write_rate = 0.0;
1101                 }
1102                 else
1103                 {
1104                         write_rate = 1.0 / wps;
1105                 }
1106         }
1107         else if (strcasecmp ("RandomTimeout", key) == 0)
1108         {
1109                 double tmp;
1110
1111                 tmp = atof (value);
1112                 if (tmp < 0.0)
1113                 {
1114                         fprintf (stderr, "rrdtool: `RandomTimeout' must "
1115                                         "be greater than or equal to zero.\n");
1116                         ERROR ("rrdtool: `RandomTimeout' must "
1117                                         "be greater then or equal to zero.");
1118                 }
1119                 else
1120                 {
1121                         random_timeout = DOUBLE_TO_CDTIME_T (tmp);
1122                 }
1123         }
1124         else
1125         {
1126                 return (-1);
1127         }
1128         return (0);
1129 } /* int rrd_config */
1130
1131 static int rrd_shutdown (void)
1132 {
1133         pthread_mutex_lock (&cache_lock);
1134         rrd_cache_flush (0);
1135         pthread_mutex_unlock (&cache_lock);
1136
1137         pthread_mutex_lock (&queue_lock);
1138         do_shutdown = 1;
1139         pthread_cond_signal (&queue_cond);
1140         pthread_mutex_unlock (&queue_lock);
1141
1142         if ((queue_thread_running != 0)
1143                         && ((queue_head != NULL) || (flushq_head != NULL)))
1144         {
1145                 INFO ("rrdtool plugin: Shutting down the queue thread. "
1146                                 "This may take a while.");
1147         }
1148         else if (queue_thread_running != 0)
1149         {
1150                 INFO ("rrdtool plugin: Shutting down the queue thread.");
1151         }
1152
1153         /* Wait for all the values to be written to disk before returning. */
1154         if (queue_thread_running != 0)
1155         {
1156                 pthread_join (queue_thread, NULL);
1157                 memset (&queue_thread, 0, sizeof (queue_thread));
1158                 queue_thread_running = 0;
1159                 DEBUG ("rrdtool plugin: queue_thread exited.");
1160         }
1161
1162         rrd_cache_destroy ();
1163
1164         return (0);
1165 } /* int rrd_shutdown */
1166
1167 static int rrd_init (void)
1168 {
1169         static int init_once = 0;
1170         int status;
1171
1172         if (init_once != 0)
1173                 return (0);
1174         init_once = 1;
1175
1176         if (rrdcreate_config.heartbeat <= 0)
1177                 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1178
1179         /* Set the cache up */
1180         pthread_mutex_lock (&cache_lock);
1181
1182         cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1183         if (cache == NULL)
1184         {
1185                 ERROR ("rrdtool plugin: c_avl_create failed.");
1186                 return (-1);
1187         }
1188
1189         cache_flush_last = cdtime ();
1190         if (cache_timeout == 0)
1191         {
1192                 cache_flush_timeout = 0;
1193         }
1194         else if (cache_flush_timeout < cache_timeout)
1195                 cache_flush_timeout = 10 * cache_timeout;
1196
1197         pthread_mutex_unlock (&cache_lock);
1198
1199         status = plugin_thread_create (&queue_thread, /* attr = */ NULL,
1200                         rrd_queue_thread, /* args = */ NULL);
1201         if (status != 0)
1202         {
1203                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1204                 return (-1);
1205         }
1206         queue_thread_running = 1;
1207
1208         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1209                         " heartbeat = %i; rrarows = %i; xff = %lf;",
1210                         (datadir == NULL) ? "(null)" : datadir,
1211                         rrdcreate_config.stepsize,
1212                         rrdcreate_config.heartbeat,
1213                         rrdcreate_config.rrarows,
1214                         rrdcreate_config.xff);
1215
1216         return (0);
1217 } /* int rrd_init */
1218
1219 void module_register (void)
1220 {
1221         plugin_register_config ("rrdtool", rrd_config,
1222                         config_keys, config_keys_num);
1223         plugin_register_init ("rrdtool", rrd_init);
1224         plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL);
1225         plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL);
1226         plugin_register_shutdown ("rrdtool", rrd_shutdown);
1227 }