rrdcached plugin: Add the `CollectStatistics' option.
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
26 #include "utils_rrdcreate.h"
27
28 #include <rrd.h>
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33
34 /*
35  * Private types
36  */
37 struct rrd_cache_s
38 {
39         int    values_num;
40         char **values;
41         time_t first_value;
42         time_t last_value;
43         enum
44         {
45                 FLAG_NONE   = 0x00,
46                 FLAG_QUEUED = 0x01
47         } flags;
48 };
49 typedef struct rrd_cache_s rrd_cache_t;
50
51 enum rrd_queue_dir_e
52 {
53   QUEUE_INSERT_FRONT,
54   QUEUE_INSERT_BACK
55 };
56 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
57
58 struct rrd_queue_s
59 {
60         char *filename;
61         struct rrd_queue_s *next;
62 };
63 typedef struct rrd_queue_s rrd_queue_t;
64
65 /*
66  * Private variables
67  */
68 static const char *config_keys[] =
69 {
70         "CacheTimeout",
71         "CacheFlush",
72         "DataDir",
73         "StepSize",
74         "HeartBeat",
75         "RRARows",
76         "RRATimespan",
77         "XFF"
78 };
79 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
80
81 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
82  * is zero a default, depending on the `interval' member of the value list is
83  * being used. */
84 static char   *datadir   = NULL;
85 static rrdcreate_config_t rrdcreate_config =
86 {
87         /* stepsize = */ 0,
88         /* heartbeat = */ 0,
89         /* rrarows = */ 1200,
90         /* xff = */ 0.1,
91
92         /* timespans = */ NULL,
93         /* timespans_num = */ 0,
94
95         /* consolidation_functions = */ NULL,
96         /* consolidation_functions_num = */ 0
97 };
98
99 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
100  * ALWAYS lock `cache_lock' first! */
101 static int         cache_timeout = 0;
102 static int         cache_flush_timeout = 0;
103 static time_t      cache_flush_last;
104 static c_avl_tree_t *cache = NULL;
105 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
106
107 static rrd_queue_t    *queue_head = NULL;
108 static rrd_queue_t    *queue_tail = NULL;
109 static pthread_t       queue_thread = 0;
110 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
111 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
112
113 #if !HAVE_THREADSAFE_LIBRRD
114 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
115 #endif
116
117 static int do_shutdown = 0;
118
119 #if HAVE_THREADSAFE_LIBRRD
120 static int srrd_update (char *filename, char *template,
121                 int argc, const char **argv)
122 {
123         int status;
124
125         optind = 0; /* bug in librrd? */
126         rrd_clear_error ();
127
128         status = rrd_update_r (filename, template, argc, (void *) argv);
129
130         if (status != 0)
131         {
132                 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
133                                 filename, rrd_get_error ());
134         }
135
136         return (status);
137 } /* int srrd_update */
138 /* #endif HAVE_THREADSAFE_LIBRRD */
139
140 #else /* !HAVE_THREADSAFE_LIBRRD */
141 static int srrd_update (char *filename, char *template,
142                 int argc, const char **argv)
143 {
144         int status;
145
146         int new_argc;
147         char **new_argv;
148
149         assert (template == NULL);
150
151         new_argc = 2 + argc;
152         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
153         if (new_argv == NULL)
154         {
155                 ERROR ("rrdtool plugin: malloc failed.");
156                 return (-1);
157         }
158
159         new_argv[0] = "update";
160         new_argv[1] = filename;
161
162         memcpy (new_argv + 2, argv, argc * sizeof (char *));
163         new_argv[new_argc] = NULL;
164
165         pthread_mutex_lock (&librrd_lock);
166         optind = 0; /* bug in librrd? */
167         rrd_clear_error ();
168
169         status = rrd_update (new_argc, new_argv);
170         pthread_mutex_unlock (&librrd_lock);
171
172         if (status != 0)
173         {
174                 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
175                                 argv[1], rrd_get_error ());
176         }
177
178         sfree (new_argv);
179
180         return (status);
181 } /* int srrd_update */
182 #endif /* !HAVE_THREADSAFE_LIBRRD */
183
184 static int value_list_to_string (char *buffer, int buffer_len,
185                 const data_set_t *ds, const value_list_t *vl)
186 {
187         int offset;
188         int status;
189         int i;
190
191         memset (buffer, '\0', buffer_len);
192
193         status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
194         if ((status < 1) || (status >= buffer_len))
195                 return (-1);
196         offset = status;
197
198         for (i = 0; i < ds->ds_num; i++)
199         {
200                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
201                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
202                         return (-1);
203
204                 if (ds->ds[i].type == DS_TYPE_COUNTER)
205                         status = ssnprintf (buffer + offset, buffer_len - offset,
206                                         ":%llu", vl->values[i].counter);
207                 else
208                         status = ssnprintf (buffer + offset, buffer_len - offset,
209                                         ":%lf", vl->values[i].gauge);
210
211                 if ((status < 1) || (status >= (buffer_len - offset)))
212                         return (-1);
213
214                 offset += status;
215         } /* for ds->ds_num */
216
217         return (0);
218 } /* int value_list_to_string */
219
220 static int value_list_to_filename (char *buffer, int buffer_len,
221                 const data_set_t *ds, const value_list_t *vl)
222 {
223         int offset = 0;
224         int status;
225
226         if (datadir != NULL)
227         {
228                 status = ssnprintf (buffer + offset, buffer_len - offset,
229                                 "%s/", datadir);
230                 if ((status < 1) || (status >= buffer_len - offset))
231                         return (-1);
232                 offset += status;
233         }
234
235         status = ssnprintf (buffer + offset, buffer_len - offset,
236                         "%s/", vl->host);
237         if ((status < 1) || (status >= buffer_len - offset))
238                 return (-1);
239         offset += status;
240
241         if (strlen (vl->plugin_instance) > 0)
242                 status = ssnprintf (buffer + offset, buffer_len - offset,
243                                 "%s-%s/", vl->plugin, vl->plugin_instance);
244         else
245                 status = ssnprintf (buffer + offset, buffer_len - offset,
246                                 "%s/", vl->plugin);
247         if ((status < 1) || (status >= buffer_len - offset))
248                 return (-1);
249         offset += status;
250
251         if (strlen (vl->type_instance) > 0)
252                 status = ssnprintf (buffer + offset, buffer_len - offset,
253                                 "%s-%s.rrd", vl->type, vl->type_instance);
254         else
255                 status = ssnprintf (buffer + offset, buffer_len - offset,
256                                 "%s.rrd", vl->type);
257         if ((status < 1) || (status >= buffer_len - offset))
258                 return (-1);
259         offset += status;
260
261         return (0);
262 } /* int value_list_to_filename */
263
264 static void *rrd_queue_thread (void *data)
265 {
266         while (42)
267         {
268                 rrd_queue_t *queue_entry;
269                 rrd_cache_t *cache_entry;
270                 char **values;
271                 int    values_num;
272                 int    i;
273
274                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
275                  * the same time, ALWAYS lock `cache_lock' first! */
276
277                 /* wait until an entry is available */
278                 pthread_mutex_lock (&queue_lock);
279                 while ((queue_head == NULL) && (do_shutdown == 0))
280                         pthread_cond_wait (&queue_cond, &queue_lock);
281
282                 /* We're in the shutdown phase */
283                 if (queue_head == NULL)
284                 {
285                         pthread_mutex_unlock (&queue_lock);
286                         break;
287                 }
288
289                 /* Dequeue the first entry */
290                 queue_entry = queue_head;
291                 if (queue_head == queue_tail)
292                         queue_head = queue_tail = NULL;
293                 else
294                         queue_head = queue_head->next;
295
296                 /* Unlock the queue again */
297                 pthread_mutex_unlock (&queue_lock);
298
299                 /* We now need the cache lock so the entry isn't updated while
300                  * we make a copy of it's values */
301                 pthread_mutex_lock (&cache_lock);
302
303                 c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
304
305                 values = cache_entry->values;
306                 values_num = cache_entry->values_num;
307
308                 cache_entry->values = NULL;
309                 cache_entry->values_num = 0;
310                 cache_entry->flags = FLAG_NONE;
311
312                 pthread_mutex_unlock (&cache_lock);
313
314                 /* Write the values to the RRD-file */
315                 srrd_update (queue_entry->filename, NULL,
316                                 values_num, (const char **)values);
317                 DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
318                                 values_num, queue_entry->filename);
319
320                 for (i = 0; i < values_num; i++)
321                 {
322                         sfree (values[i]);
323                 }
324                 sfree (values);
325                 sfree (queue_entry->filename);
326                 sfree (queue_entry);
327         } /* while (42) */
328
329         pthread_mutex_lock (&cache_lock);
330         c_avl_destroy (cache);
331         cache = NULL;
332         pthread_mutex_unlock (&cache_lock);
333
334         pthread_exit ((void *) 0);
335         return ((void *) 0);
336 } /* void *rrd_queue_thread */
337
338 static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir)
339 {
340   rrd_queue_t *queue_entry;
341
342   queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
343   if (queue_entry == NULL)
344     return (-1);
345
346   queue_entry->filename = strdup (filename);
347   if (queue_entry->filename == NULL)
348   {
349     free (queue_entry);
350     return (-1);
351   }
352
353   queue_entry->next = NULL;
354
355   pthread_mutex_lock (&queue_lock);
356   if (dir == QUEUE_INSERT_FRONT)
357   {
358     queue_entry->next = queue_head;
359     queue_head = queue_entry;
360     if (queue_tail == NULL)
361       queue_tail = queue_head;
362   }
363   else /* (dir == QUEUE_INSERT_BACK) */
364   {
365     if (queue_tail == NULL)
366       queue_head = queue_entry;
367     else
368       queue_tail->next = queue_entry;
369     queue_tail = queue_entry;
370   }
371   pthread_cond_signal (&queue_cond);
372   pthread_mutex_unlock (&queue_lock);
373
374   DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
375
376   return (0);
377 } /* int rrd_queue_cache_entry */
378
379 static int rrd_queue_move_to_front (const char *filename)
380 {
381   rrd_queue_t *this;
382   rrd_queue_t *prev;
383
384   this = NULL;
385   prev = NULL;
386   pthread_mutex_lock (&queue_lock);
387   for (this = queue_head; this != NULL; this = this->next)
388   {
389     if (strcmp (this->filename, filename) == 0)
390       break;
391     prev = this;
392   }
393
394   /* Check if we found the entry and if it is NOT the first entry. */
395   if ((this != NULL) && (prev != NULL))
396   {
397     prev->next = this->next;
398     this->next = queue_head;
399     queue_head = this;
400   }
401   pthread_mutex_unlock (&queue_lock);
402
403   return (0);
404 } /* int rrd_queue_move_to_front */
405
406 static void rrd_cache_flush (int timeout)
407 {
408         rrd_cache_t *rc;
409         time_t       now;
410
411         char **keys = NULL;
412         int    keys_num = 0;
413
414         char *key;
415         c_avl_iterator_t *iter;
416         int i;
417
418         DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
419
420         now = time (NULL);
421
422         /* Build a list of entries to be flushed */
423         iter = c_avl_get_iterator (cache);
424         while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
425         {
426                 if (rc->flags == FLAG_QUEUED)
427                         continue;
428                 else if ((now - rc->first_value) < timeout)
429                         continue;
430                 else if (rc->values_num > 0)
431                 {
432                         if (rrd_queue_cache_entry (key, QUEUE_INSERT_BACK) == 0)
433                                 rc->flags = FLAG_QUEUED;
434                 }
435                 else /* ancient and no values -> waste of memory */
436                 {
437                         char **tmp = (char **) realloc ((void *) keys,
438                                         (keys_num + 1) * sizeof (char *));
439                         if (tmp == NULL)
440                         {
441                                 char errbuf[1024];
442                                 ERROR ("rrdtool plugin: "
443                                                 "realloc failed: %s",
444                                                 sstrerror (errno, errbuf,
445                                                         sizeof (errbuf)));
446                                 c_avl_iterator_destroy (iter);
447                                 sfree (keys);
448                                 return;
449                         }
450                         keys = tmp;
451                         keys[keys_num] = key;
452                         keys_num++;
453                 }
454         } /* while (c_avl_iterator_next) */
455         c_avl_iterator_destroy (iter);
456         
457         for (i = 0; i < keys_num; i++)
458         {
459                 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
460                 {
461                         DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
462                         continue;
463                 }
464
465                 assert (rc->values == NULL);
466                 assert (rc->values_num == 0);
467
468                 sfree (rc);
469                 sfree (key);
470                 keys[i] = NULL;
471         } /* for (i = 0..keys_num) */
472
473         sfree (keys);
474
475         cache_flush_last = now;
476 } /* void rrd_cache_flush */
477
478 static int rrd_cache_flush_identifier (int timeout, const char *identifier)
479 {
480   rrd_cache_t *rc;
481   time_t now;
482   int status;
483   char key[2048];
484
485   if (identifier == NULL)
486   {
487     rrd_cache_flush (timeout);
488     return (0);
489   }
490
491   now = time (NULL);
492
493   if (datadir == NULL)
494           snprintf (key, sizeof (key), "%s.rrd",
495                           identifier);
496   else
497           snprintf (key, sizeof (key), "%s/%s.rrd",
498                           datadir, identifier);
499   key[sizeof (key) - 1] = 0;
500
501   status = c_avl_get (cache, key, (void *) &rc);
502   if (status != 0)
503   {
504     WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
505         "c_avl_get (%s) failed. Does that file really exist?",
506         key);
507     return (status);
508   }
509
510   if (rc->flags == FLAG_QUEUED)
511     status = rrd_queue_move_to_front (key);
512   else if ((now - rc->first_value) < timeout)
513     status = 0;
514   else if (rc->values_num > 0)
515   {
516     status = rrd_queue_cache_entry (key, QUEUE_INSERT_FRONT);
517     if (status == 0)
518       rc->flags = FLAG_QUEUED;
519   }
520
521   return (status);
522 } /* int rrd_cache_flush_identifier */
523
524 static int rrd_cache_insert (const char *filename,
525                 const char *value, time_t value_time)
526 {
527         rrd_cache_t *rc = NULL;
528         int new_rc = 0;
529         char **values_new;
530
531         pthread_mutex_lock (&cache_lock);
532
533         c_avl_get (cache, filename, (void *) &rc);
534
535         if (rc == NULL)
536         {
537                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
538                 if (rc == NULL)
539                         return (-1);
540                 rc->values_num = 0;
541                 rc->values = NULL;
542                 rc->first_value = 0;
543                 rc->last_value = 0;
544                 rc->flags = FLAG_NONE;
545                 new_rc = 1;
546         }
547
548         if (rc->last_value >= value_time)
549         {
550                 pthread_mutex_unlock (&cache_lock);
551                 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
552                                 (unsigned int) rc->last_value,
553                                 (unsigned int) value_time);
554                 return (-1);
555         }
556
557         values_new = (char **) realloc ((void *) rc->values,
558                         (rc->values_num + 1) * sizeof (char *));
559         if (values_new == NULL)
560         {
561                 char errbuf[1024];
562                 void *cache_key = NULL;
563
564                 sstrerror (errno, errbuf, sizeof (errbuf));
565
566                 c_avl_remove (cache, filename, &cache_key, NULL);
567                 pthread_mutex_unlock (&cache_lock);
568
569                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
570
571                 sfree (cache_key);
572                 sfree (rc->values);
573                 sfree (rc);
574                 return (-1);
575         }
576         rc->values = values_new;
577
578         rc->values[rc->values_num] = strdup (value);
579         if (rc->values[rc->values_num] != NULL)
580                 rc->values_num++;
581
582         if (rc->values_num == 1)
583                 rc->first_value = value_time;
584         rc->last_value = value_time;
585
586         /* Insert if this is the first value */
587         if (new_rc == 1)
588         {
589                 void *cache_key = strdup (filename);
590
591                 if (cache_key == NULL)
592                 {
593                         char errbuf[1024];
594                         sstrerror (errno, errbuf, sizeof (errbuf));
595
596                         pthread_mutex_unlock (&cache_lock);
597
598                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
599
600                         sfree (rc->values[0]);
601                         sfree (rc->values);
602                         sfree (rc);
603                         return (-1);
604                 }
605
606                 c_avl_insert (cache, cache_key, rc);
607         }
608
609         DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
610                         "values_num = %i; age = %lu;",
611                         filename, rc->values_num,
612                         (unsigned long)(rc->last_value - rc->first_value));
613
614         if ((rc->last_value - rc->first_value) >= cache_timeout)
615         {
616                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
617                  * the same time, ALWAYS lock `cache_lock' first! */
618                 if (rc->flags != FLAG_QUEUED)
619                 {
620                         if (rrd_queue_cache_entry (filename, QUEUE_INSERT_BACK) == 0)
621                                 rc->flags = FLAG_QUEUED;
622                 }
623                 else
624                 {
625                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
626                 }
627         }
628
629         if ((cache_timeout > 0) &&
630                         ((time (NULL) - cache_flush_last) > cache_flush_timeout))
631                 rrd_cache_flush (cache_flush_timeout);
632
633
634         pthread_mutex_unlock (&cache_lock);
635
636         return (0);
637 } /* int rrd_cache_insert */
638
639 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
640 {
641         int a = *((int *) a_ptr);
642         int b = *((int *) b_ptr);
643
644         if (a < b)
645                 return (-1);
646         else if (a > b)
647                 return (1);
648         else
649                 return (0);
650 } /* int rrd_compare_numeric */
651
652 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
653 {
654         struct stat  statbuf;
655         char         filename[512];
656         char         values[512];
657         int          status;
658
659         if (0 != strcmp (ds->type, vl->type)) {
660                 ERROR ("rrdtool plugin: DS type does not match value list type");
661                 return -1;
662         }
663
664         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
665                 return (-1);
666
667         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
668                 return (-1);
669
670         if (stat (filename, &statbuf) == -1)
671         {
672                 if (errno == ENOENT)
673                 {
674                         status = cu_rrd_create_file (filename,
675                                         ds, vl, &rrdcreate_config);
676                         if (status != 0)
677                                 return (-1);
678                 }
679                 else
680                 {
681                         char errbuf[1024];
682                         ERROR ("stat(%s) failed: %s", filename,
683                                         sstrerror (errno, errbuf,
684                                                 sizeof (errbuf)));
685                         return (-1);
686                 }
687         }
688         else if (!S_ISREG (statbuf.st_mode))
689         {
690                 ERROR ("stat(%s): Not a regular file!",
691                                 filename);
692                 return (-1);
693         }
694
695         status = rrd_cache_insert (filename, values, vl->time);
696
697         return (status);
698 } /* int rrd_write */
699
700 static int rrd_flush (int timeout, const char *identifier)
701 {
702         pthread_mutex_lock (&cache_lock);
703
704         if (cache == NULL) {
705                 pthread_mutex_unlock (&cache_lock);
706                 return (0);
707         }
708
709         rrd_cache_flush_identifier (timeout, identifier);
710
711         pthread_mutex_unlock (&cache_lock);
712         return (0);
713 } /* int rrd_flush */
714
715 static int rrd_config (const char *key, const char *value)
716 {
717         if (strcasecmp ("CacheTimeout", key) == 0)
718         {
719                 int tmp = atoi (value);
720                 if (tmp < 0)
721                 {
722                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
723                                         "be greater than 0.\n");
724                         return (1);
725                 }
726                 cache_timeout = tmp;
727         }
728         else if (strcasecmp ("CacheFlush", key) == 0)
729         {
730                 int tmp = atoi (value);
731                 if (tmp < 0)
732                 {
733                         fprintf (stderr, "rrdtool: `CacheFlush' must "
734                                         "be greater than 0.\n");
735                         return (1);
736                 }
737                 cache_flush_timeout = tmp;
738         }
739         else if (strcasecmp ("DataDir", key) == 0)
740         {
741                 if (datadir != NULL)
742                         free (datadir);
743                 datadir = strdup (value);
744                 if (datadir != NULL)
745                 {
746                         int len = strlen (datadir);
747                         while ((len > 0) && (datadir[len - 1] == '/'))
748                         {
749                                 len--;
750                                 datadir[len] = '\0';
751                         }
752                         if (len <= 0)
753                         {
754                                 free (datadir);
755                                 datadir = NULL;
756                         }
757                 }
758         }
759         else if (strcasecmp ("StepSize", key) == 0)
760         {
761                 int temp = atoi (value);
762                 if (temp > 0)
763                         rrdcreate_config.stepsize = temp;
764         }
765         else if (strcasecmp ("HeartBeat", key) == 0)
766         {
767                 int temp = atoi (value);
768                 if (temp > 0)
769                         rrdcreate_config.heartbeat = temp;
770         }
771         else if (strcasecmp ("RRARows", key) == 0)
772         {
773                 int tmp = atoi (value);
774                 if (tmp <= 0)
775                 {
776                         fprintf (stderr, "rrdtool: `RRARows' must "
777                                         "be greater than 0.\n");
778                         return (1);
779                 }
780                 rrdcreate_config.rrarows = tmp;
781         }
782         else if (strcasecmp ("RRATimespan", key) == 0)
783         {
784                 char *saveptr = NULL;
785                 char *dummy;
786                 char *ptr;
787                 char *value_copy;
788                 int *tmp_alloc;
789
790                 value_copy = strdup (value);
791                 if (value_copy == NULL)
792                         return (1);
793
794                 dummy = value_copy;
795                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
796                 {
797                         dummy = NULL;
798                         
799                         tmp_alloc = realloc (rrdcreate_config.timespans,
800                                         sizeof (int) * (rrdcreate_config.timespans_num + 1));
801                         if (tmp_alloc == NULL)
802                         {
803                                 fprintf (stderr, "rrdtool: realloc failed.\n");
804                                 free (value_copy);
805                                 return (1);
806                         }
807                         rrdcreate_config.timespans = tmp_alloc;
808                         rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr);
809                         if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
810                                 rrdcreate_config.timespans_num++;
811                 } /* while (strtok_r) */
812
813                 qsort (/* base = */ rrdcreate_config.timespans,
814                                 /* nmemb  = */ rrdcreate_config.timespans_num,
815                                 /* size   = */ sizeof (rrdcreate_config.timespans[0]),
816                                 /* compar = */ rrd_compare_numeric);
817
818                 free (value_copy);
819         }
820         else if (strcasecmp ("XFF", key) == 0)
821         {
822                 double tmp = atof (value);
823                 if ((tmp < 0.0) || (tmp >= 1.0))
824                 {
825                         fprintf (stderr, "rrdtool: `XFF' must "
826                                         "be in the range 0 to 1 (exclusive).");
827                         return (1);
828                 }
829                 rrdcreate_config.xff = tmp;
830         }
831         else
832         {
833                 return (-1);
834         }
835         return (0);
836 } /* int rrd_config */
837
838 static int rrd_shutdown (void)
839 {
840         pthread_mutex_lock (&cache_lock);
841         rrd_cache_flush (-1);
842         pthread_mutex_unlock (&cache_lock);
843
844         pthread_mutex_lock (&queue_lock);
845         do_shutdown = 1;
846         pthread_cond_signal (&queue_cond);
847         pthread_mutex_unlock (&queue_lock);
848
849         /* Wait for all the values to be written to disk before returning. */
850         if (queue_thread != 0)
851         {
852                 pthread_join (queue_thread, NULL);
853                 queue_thread = 0;
854                 DEBUG ("rrdtool plugin: queue_thread exited.");
855         }
856
857         return (0);
858 } /* int rrd_shutdown */
859
860 static int rrd_init (void)
861 {
862         int status;
863
864         if (rrdcreate_config.stepsize < 0)
865                 rrdcreate_config.stepsize = 0;
866         if (rrdcreate_config.heartbeat <= 0)
867                 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
868
869         if ((rrdcreate_config.heartbeat > 0)
870                         && (rrdcreate_config.heartbeat < interval_g))
871                 WARNING ("rrdtool plugin: Your `heartbeat' is "
872                                 "smaller than your `interval'. This will "
873                                 "likely cause problems.");
874         else if ((rrdcreate_config.stepsize > 0)
875                         && (rrdcreate_config.stepsize < interval_g))
876                 WARNING ("rrdtool plugin: Your `stepsize' is "
877                                 "smaller than your `interval'. This will "
878                                 "create needlessly big RRD-files.");
879
880         /* Set the cache up */
881         pthread_mutex_lock (&cache_lock);
882
883         cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
884         if (cache == NULL)
885         {
886                 ERROR ("rrdtool plugin: c_avl_create failed.");
887                 return (-1);
888         }
889
890         cache_flush_last = time (NULL);
891         if (cache_timeout < 2)
892         {
893                 cache_timeout = 0;
894                 cache_flush_timeout = 0;
895         }
896         else if (cache_flush_timeout < cache_timeout)
897                 cache_flush_timeout = 10 * cache_timeout;
898
899         pthread_mutex_unlock (&cache_lock);
900
901         status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
902         if (status != 0)
903         {
904                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
905                 return (-1);
906         }
907
908         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
909                         " heartbeat = %i; rrarows = %i; xff = %lf;",
910                         (datadir == NULL) ? "(null)" : datadir,
911                         rrdcreate_config.stepsize,
912                         rrdcreate_config.heartbeat,
913                         rrdcreate_config.rrarows,
914                         rrdcreate_config.xff);
915
916         return (0);
917 } /* int rrd_init */
918
919 void module_register (void)
920 {
921         plugin_register_config ("rrdtool", rrd_config,
922                         config_keys, config_keys_num);
923         plugin_register_init ("rrdtool", rrd_init);
924         plugin_register_write ("rrdtool", rrd_write);
925         plugin_register_flush ("rrdtool", rrd_flush);
926         plugin_register_shutdown ("rrdtool", rrd_shutdown);
927 }