Merge branch 'collectd-4.5' into collectd-4.6
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
26 #include "utils_rrdcreate.h"
27
28 #include <rrd.h>
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33
34 /*
35  * Private types
36  */
37 struct rrd_cache_s
38 {
39         int    values_num;
40         char **values;
41         time_t first_value;
42         time_t last_value;
43         enum
44         {
45                 FLAG_NONE   = 0x00,
46                 FLAG_QUEUED = 0x01,
47                 FLAG_FLUSHQ = 0x02
48         } flags;
49 };
50 typedef struct rrd_cache_s rrd_cache_t;
51
52 enum rrd_queue_dir_e
53 {
54   QUEUE_INSERT_FRONT,
55   QUEUE_INSERT_BACK
56 };
57 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
58
59 struct rrd_queue_s
60 {
61         char *filename;
62         struct rrd_queue_s *next;
63 };
64 typedef struct rrd_queue_s rrd_queue_t;
65
66 /*
67  * Private variables
68  */
69 static const char *config_keys[] =
70 {
71         "CacheTimeout",
72         "CacheFlush",
73         "DataDir",
74         "StepSize",
75         "HeartBeat",
76         "RRARows",
77         "RRATimespan",
78         "XFF",
79         "WritesPerSecond"
80 };
81 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
82
83 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
84  * is zero a default, depending on the `interval' member of the value list is
85  * being used. */
86 static char *datadir   = NULL;
87 static double write_rate = 0.0;
88 static rrdcreate_config_t rrdcreate_config =
89 {
90         /* stepsize = */ 0,
91         /* heartbeat = */ 0,
92         /* rrarows = */ 1200,
93         /* xff = */ 0.1,
94
95         /* timespans = */ NULL,
96         /* timespans_num = */ 0,
97
98         /* consolidation_functions = */ NULL,
99         /* consolidation_functions_num = */ 0
100 };
101
102 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
103  * ALWAYS lock `cache_lock' first! */
104 static int         cache_timeout = 0;
105 static int         cache_flush_timeout = 0;
106 static time_t      cache_flush_last;
107 static c_avl_tree_t *cache = NULL;
108 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
109
110 static rrd_queue_t    *queue_head = NULL;
111 static rrd_queue_t    *queue_tail = NULL;
112 static rrd_queue_t    *flushq_head = NULL;
113 static rrd_queue_t    *flushq_tail = NULL;
114 static pthread_t       queue_thread = 0;
115 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
116 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
117
118 #if !HAVE_THREADSAFE_LIBRRD
119 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
120 #endif
121
122 static int do_shutdown = 0;
123
124 #if HAVE_THREADSAFE_LIBRRD
125 static int srrd_update (char *filename, char *template,
126                 int argc, const char **argv)
127 {
128         int status;
129
130         optind = 0; /* bug in librrd? */
131         rrd_clear_error ();
132
133         status = rrd_update_r (filename, template, argc, (void *) argv);
134
135         if (status != 0)
136         {
137                 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
138                                 filename, rrd_get_error ());
139         }
140
141         return (status);
142 } /* int srrd_update */
143 /* #endif HAVE_THREADSAFE_LIBRRD */
144
145 #else /* !HAVE_THREADSAFE_LIBRRD */
146 static int srrd_update (char *filename, char *template,
147                 int argc, const char **argv)
148 {
149         int status;
150
151         int new_argc;
152         char **new_argv;
153
154         assert (template == NULL);
155
156         new_argc = 2 + argc;
157         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
158         if (new_argv == NULL)
159         {
160                 ERROR ("rrdtool plugin: malloc failed.");
161                 return (-1);
162         }
163
164         new_argv[0] = "update";
165         new_argv[1] = filename;
166
167         memcpy (new_argv + 2, argv, argc * sizeof (char *));
168         new_argv[new_argc] = NULL;
169
170         pthread_mutex_lock (&librrd_lock);
171         optind = 0; /* bug in librrd? */
172         rrd_clear_error ();
173
174         status = rrd_update (new_argc, new_argv);
175         pthread_mutex_unlock (&librrd_lock);
176
177         if (status != 0)
178         {
179                 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
180                                 argv[1], rrd_get_error ());
181         }
182
183         sfree (new_argv);
184
185         return (status);
186 } /* int srrd_update */
187 #endif /* !HAVE_THREADSAFE_LIBRRD */
188
189 static int value_list_to_string (char *buffer, int buffer_len,
190                 const data_set_t *ds, const value_list_t *vl)
191 {
192         int offset;
193         int status;
194         int i;
195
196         memset (buffer, '\0', buffer_len);
197
198         status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
199         if ((status < 1) || (status >= buffer_len))
200                 return (-1);
201         offset = status;
202
203         for (i = 0; i < ds->ds_num; i++)
204         {
205                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
206                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
207                         return (-1);
208
209                 if (ds->ds[i].type == DS_TYPE_COUNTER)
210                         status = ssnprintf (buffer + offset, buffer_len - offset,
211                                         ":%llu", vl->values[i].counter);
212                 else
213                         status = ssnprintf (buffer + offset, buffer_len - offset,
214                                         ":%lf", vl->values[i].gauge);
215
216                 if ((status < 1) || (status >= (buffer_len - offset)))
217                         return (-1);
218
219                 offset += status;
220         } /* for ds->ds_num */
221
222         return (0);
223 } /* int value_list_to_string */
224
225 static int value_list_to_filename (char *buffer, int buffer_len,
226                 const data_set_t __attribute__((unused)) *ds, const value_list_t *vl)
227 {
228         int offset = 0;
229         int status;
230
231         if (datadir != NULL)
232         {
233                 status = ssnprintf (buffer + offset, buffer_len - offset,
234                                 "%s/", datadir);
235                 if ((status < 1) || (status >= buffer_len - offset))
236                         return (-1);
237                 offset += status;
238         }
239
240         status = ssnprintf (buffer + offset, buffer_len - offset,
241                         "%s/", vl->host);
242         if ((status < 1) || (status >= buffer_len - offset))
243                 return (-1);
244         offset += status;
245
246         if (strlen (vl->plugin_instance) > 0)
247                 status = ssnprintf (buffer + offset, buffer_len - offset,
248                                 "%s-%s/", vl->plugin, vl->plugin_instance);
249         else
250                 status = ssnprintf (buffer + offset, buffer_len - offset,
251                                 "%s/", vl->plugin);
252         if ((status < 1) || (status >= buffer_len - offset))
253                 return (-1);
254         offset += status;
255
256         if (strlen (vl->type_instance) > 0)
257                 status = ssnprintf (buffer + offset, buffer_len - offset,
258                                 "%s-%s.rrd", vl->type, vl->type_instance);
259         else
260                 status = ssnprintf (buffer + offset, buffer_len - offset,
261                                 "%s.rrd", vl->type);
262         if ((status < 1) || (status >= buffer_len - offset))
263                 return (-1);
264         offset += status;
265
266         return (0);
267 } /* int value_list_to_filename */
268
269 static void *rrd_queue_thread (void __attribute__((unused)) *data)
270 {
271         struct timeval tv_next_update;
272         struct timeval tv_now;
273
274         gettimeofday (&tv_next_update, /* timezone = */ NULL);
275
276         while (42)
277         {
278                 rrd_queue_t *queue_entry;
279                 rrd_cache_t *cache_entry;
280                 char **values;
281                 int    values_num;
282                 int    status;
283                 int    i;
284
285                 values = NULL;
286                 values_num = 0;
287
288                 pthread_mutex_lock (&queue_lock);
289                 /* Wait for values to arrive */
290                 while (true)
291                 {
292                   struct timespec ts_wait;
293
294                   while ((flushq_head == NULL) && (queue_head == NULL)
295                       && (do_shutdown == 0))
296                     pthread_cond_wait (&queue_cond, &queue_lock);
297
298                   if ((flushq_head == NULL) && (queue_head == NULL))
299                     break;
300
301                   /* Don't delay if there's something to flush */
302                   if (flushq_head != NULL)
303                     break;
304
305                   /* Don't delay if we're shutting down */
306                   if (do_shutdown != 0)
307                     break;
308
309                   /* Don't delay if no delay was configured. */
310                   if (write_rate <= 0.0)
311                     break;
312
313                   gettimeofday (&tv_now, /* timezone = */ NULL);
314                   status = timeval_cmp (tv_next_update, tv_now, NULL);
315                   /* We're good to go */
316                   if (status <= 0)
317                     break;
318
319                   /* We're supposed to wait a bit with this update, so we'll
320                    * wait for the next addition to the queue or to the end of
321                    * the wait period - whichever comes first. */
322                   ts_wait.tv_sec = tv_next_update.tv_sec;
323                   ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
324
325                   status = pthread_cond_timedwait (&queue_cond, &queue_lock,
326                       &ts_wait);
327                   if (status == ETIMEDOUT)
328                     break;
329                 } /* while (true) */
330
331                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
332                  * the same time, ALWAYS lock `cache_lock' first! */
333
334                 /* We're in the shutdown phase */
335                 if ((flushq_head == NULL) && (queue_head == NULL))
336                 {
337                   pthread_mutex_unlock (&queue_lock);
338                   break;
339                 }
340
341                 if (flushq_head != NULL)
342                 {
343                   /* Dequeue the first flush entry */
344                   queue_entry = flushq_head;
345                   if (flushq_head == flushq_tail)
346                     flushq_head = flushq_tail = NULL;
347                   else
348                     flushq_head = flushq_head->next;
349                 }
350                 else /* if (queue_head != NULL) */
351                 {
352                   /* Dequeue the first regular entry */
353                   queue_entry = queue_head;
354                   if (queue_head == queue_tail)
355                     queue_head = queue_tail = NULL;
356                   else
357                     queue_head = queue_head->next;
358                 }
359
360                 /* Unlock the queue again */
361                 pthread_mutex_unlock (&queue_lock);
362
363                 /* We now need the cache lock so the entry isn't updated while
364                  * we make a copy of it's values */
365                 pthread_mutex_lock (&cache_lock);
366
367                 status = c_avl_get (cache, queue_entry->filename,
368                                 (void *) &cache_entry);
369
370                 if (status == 0)
371                 {
372                         values = cache_entry->values;
373                         values_num = cache_entry->values_num;
374
375                         cache_entry->values = NULL;
376                         cache_entry->values_num = 0;
377                         cache_entry->flags = FLAG_NONE;
378                 }
379
380                 pthread_mutex_unlock (&cache_lock);
381
382                 if (status != 0)
383                 {
384                         sfree (queue_entry->filename);
385                         sfree (queue_entry);
386                         continue;
387                 }
388
389                 /* Update `tv_next_update' */
390                 if (write_rate > 0.0) 
391                 {
392                   gettimeofday (&tv_now, /* timezone = */ NULL);
393                   tv_next_update.tv_sec = tv_now.tv_sec;
394                   tv_next_update.tv_usec = tv_now.tv_usec
395                     + ((suseconds_t) (1000000 * write_rate));
396                   while (tv_next_update.tv_usec > 1000000)
397                   {
398                     tv_next_update.tv_sec++;
399                     tv_next_update.tv_usec -= 1000000;
400                   }
401                 }
402
403                 /* Write the values to the RRD-file */
404                 srrd_update (queue_entry->filename, NULL,
405                                 values_num, (const char **)values);
406                 DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
407                                 values_num, queue_entry->filename);
408
409                 for (i = 0; i < values_num; i++)
410                 {
411                         sfree (values[i]);
412                 }
413                 sfree (values);
414                 sfree (queue_entry->filename);
415                 sfree (queue_entry);
416         } /* while (42) */
417
418         pthread_mutex_lock (&cache_lock);
419         c_avl_destroy (cache);
420         cache = NULL;
421         pthread_mutex_unlock (&cache_lock);
422
423         pthread_exit ((void *) 0);
424         return ((void *) 0);
425 } /* void *rrd_queue_thread */
426
427 static int rrd_queue_enqueue (const char *filename,
428     rrd_queue_t **head, rrd_queue_t **tail)
429 {
430   rrd_queue_t *queue_entry;
431
432   queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
433   if (queue_entry == NULL)
434     return (-1);
435
436   queue_entry->filename = strdup (filename);
437   if (queue_entry->filename == NULL)
438   {
439     free (queue_entry);
440     return (-1);
441   }
442
443   queue_entry->next = NULL;
444
445   pthread_mutex_lock (&queue_lock);
446
447   if (*tail == NULL)
448     *head = queue_entry;
449   else
450     (*tail)->next = queue_entry;
451   *tail = queue_entry;
452
453   pthread_cond_signal (&queue_cond);
454   pthread_mutex_unlock (&queue_lock);
455
456   return (0);
457 } /* int rrd_queue_enqueue */
458
459 static int rrd_queue_dequeue (const char *filename,
460     rrd_queue_t **head, rrd_queue_t **tail)
461 {
462   rrd_queue_t *this;
463   rrd_queue_t *prev;
464
465   pthread_mutex_lock (&queue_lock);
466
467   prev = NULL;
468   this = *head;
469
470   while (this != NULL)
471   {
472     if (strcmp (this->filename, filename) == 0)
473       break;
474     
475     prev = this;
476     this = this->next;
477   }
478
479   if (this == NULL)
480   {
481     pthread_mutex_unlock (&queue_lock);
482     return (-1);
483   }
484
485   if (prev == NULL)
486     *head = this->next;
487   else
488     prev->next = this->next;
489
490   if (this->next == NULL)
491     *tail = prev;
492
493   pthread_mutex_unlock (&queue_lock);
494
495   sfree (this->filename);
496   sfree (this);
497
498   return (0);
499 } /* int rrd_queue_dequeue */
500
501 static void rrd_cache_flush (int timeout)
502 {
503         rrd_cache_t *rc;
504         time_t       now;
505
506         char **keys = NULL;
507         int    keys_num = 0;
508
509         char *key;
510         c_avl_iterator_t *iter;
511         int i;
512
513         DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
514
515         now = time (NULL);
516
517         /* Build a list of entries to be flushed */
518         iter = c_avl_get_iterator (cache);
519         while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
520         {
521                 if (rc->flags != FLAG_NONE)
522                         continue;
523                 else if ((now - rc->first_value) < timeout)
524                         continue;
525                 else if (rc->values_num > 0)
526                 {
527                         int status;
528
529                         status = rrd_queue_enqueue (key, &queue_head,  &queue_tail);
530                         if (status == 0)
531                                 rc->flags = FLAG_QUEUED;
532                 }
533                 else /* ancient and no values -> waste of memory */
534                 {
535                         char **tmp = (char **) realloc ((void *) keys,
536                                         (keys_num + 1) * sizeof (char *));
537                         if (tmp == NULL)
538                         {
539                                 char errbuf[1024];
540                                 ERROR ("rrdtool plugin: "
541                                                 "realloc failed: %s",
542                                                 sstrerror (errno, errbuf,
543                                                         sizeof (errbuf)));
544                                 c_avl_iterator_destroy (iter);
545                                 sfree (keys);
546                                 return;
547                         }
548                         keys = tmp;
549                         keys[keys_num] = key;
550                         keys_num++;
551                 }
552         } /* while (c_avl_iterator_next) */
553         c_avl_iterator_destroy (iter);
554         
555         for (i = 0; i < keys_num; i++)
556         {
557                 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
558                 {
559                         DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
560                         continue;
561                 }
562
563                 assert (rc->values == NULL);
564                 assert (rc->values_num == 0);
565
566                 sfree (rc);
567                 sfree (key);
568                 keys[i] = NULL;
569         } /* for (i = 0..keys_num) */
570
571         sfree (keys);
572
573         cache_flush_last = now;
574 } /* void rrd_cache_flush */
575
576 static int rrd_cache_flush_identifier (int timeout, const char *identifier)
577 {
578   rrd_cache_t *rc;
579   time_t now;
580   int status;
581   char key[2048];
582
583   if (identifier == NULL)
584   {
585     rrd_cache_flush (timeout);
586     return (0);
587   }
588
589   now = time (NULL);
590
591   if (datadir == NULL)
592     snprintf (key, sizeof (key), "%s.rrd",
593         identifier);
594   else
595     snprintf (key, sizeof (key), "%s/%s.rrd",
596         datadir, identifier);
597   key[sizeof (key) - 1] = 0;
598
599   status = c_avl_get (cache, key, (void *) &rc);
600   if (status != 0)
601   {
602     WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
603         "c_avl_get (%s) failed. Does that file really exist?",
604         key);
605     return (status);
606   }
607
608   if (rc->flags == FLAG_FLUSHQ)
609   {
610     status = 0;
611   }
612   else if (rc->flags == FLAG_QUEUED)
613   {
614     rrd_queue_dequeue (key, &queue_head, &queue_tail);
615     status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
616     if (status == 0)
617       rc->flags = FLAG_FLUSHQ;
618   }
619   else if ((now - rc->first_value) < timeout)
620   {
621     status = 0;
622   }
623   else if (rc->values_num > 0)
624   {
625     status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
626     if (status == 0)
627       rc->flags = FLAG_FLUSHQ;
628   }
629
630   return (status);
631 } /* int rrd_cache_flush_identifier */
632
633 static int rrd_cache_insert (const char *filename,
634                 const char *value, time_t value_time)
635 {
636         rrd_cache_t *rc = NULL;
637         int new_rc = 0;
638         char **values_new;
639
640         pthread_mutex_lock (&cache_lock);
641
642         /* This shouldn't happen, but it did happen at least once, so we'll be
643          * careful. */
644         if (cache == NULL)
645         {
646                 pthread_mutex_unlock (&cache_lock);
647                 WARNING ("rrdtool plugin: cache == NULL.");
648                 return (-1);
649         }
650
651         c_avl_get (cache, filename, (void *) &rc);
652
653         if (rc == NULL)
654         {
655                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
656                 if (rc == NULL)
657                         return (-1);
658                 rc->values_num = 0;
659                 rc->values = NULL;
660                 rc->first_value = 0;
661                 rc->last_value = 0;
662                 rc->flags = FLAG_NONE;
663                 new_rc = 1;
664         }
665
666         if (rc->last_value >= value_time)
667         {
668                 pthread_mutex_unlock (&cache_lock);
669                 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
670                                 (unsigned int) rc->last_value,
671                                 (unsigned int) value_time);
672                 return (-1);
673         }
674
675         values_new = (char **) realloc ((void *) rc->values,
676                         (rc->values_num + 1) * sizeof (char *));
677         if (values_new == NULL)
678         {
679                 char errbuf[1024];
680                 void *cache_key = NULL;
681
682                 sstrerror (errno, errbuf, sizeof (errbuf));
683
684                 c_avl_remove (cache, filename, &cache_key, NULL);
685                 pthread_mutex_unlock (&cache_lock);
686
687                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
688
689                 sfree (cache_key);
690                 sfree (rc->values);
691                 sfree (rc);
692                 return (-1);
693         }
694         rc->values = values_new;
695
696         rc->values[rc->values_num] = strdup (value);
697         if (rc->values[rc->values_num] != NULL)
698                 rc->values_num++;
699
700         if (rc->values_num == 1)
701                 rc->first_value = value_time;
702         rc->last_value = value_time;
703
704         /* Insert if this is the first value */
705         if (new_rc == 1)
706         {
707                 void *cache_key = strdup (filename);
708
709                 if (cache_key == NULL)
710                 {
711                         char errbuf[1024];
712                         sstrerror (errno, errbuf, sizeof (errbuf));
713
714                         pthread_mutex_unlock (&cache_lock);
715
716                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
717
718                         sfree (rc->values[0]);
719                         sfree (rc->values);
720                         sfree (rc);
721                         return (-1);
722                 }
723
724                 c_avl_insert (cache, cache_key, rc);
725         }
726
727         DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
728                         "values_num = %i; age = %lu;",
729                         filename, rc->values_num,
730                         (unsigned long)(rc->last_value - rc->first_value));
731
732         if ((rc->last_value - rc->first_value) >= cache_timeout)
733         {
734                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
735                  * the same time, ALWAYS lock `cache_lock' first! */
736                 if (rc->flags == FLAG_NONE)
737                 {
738                         int status;
739
740                         status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
741                         if (status == 0)
742                                 rc->flags = FLAG_QUEUED;
743                 }
744                 else
745                 {
746                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
747                 }
748         }
749
750         if ((cache_timeout > 0) &&
751                         ((time (NULL) - cache_flush_last) > cache_flush_timeout))
752                 rrd_cache_flush (cache_flush_timeout);
753
754         pthread_mutex_unlock (&cache_lock);
755
756         return (0);
757 } /* int rrd_cache_insert */
758
759 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
760 {
761         int a = *((int *) a_ptr);
762         int b = *((int *) b_ptr);
763
764         if (a < b)
765                 return (-1);
766         else if (a > b)
767                 return (1);
768         else
769                 return (0);
770 } /* int rrd_compare_numeric */
771
772 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
773 {
774         struct stat  statbuf;
775         char         filename[512];
776         char         values[512];
777         int          status;
778
779         if (0 != strcmp (ds->type, vl->type)) {
780                 ERROR ("rrdtool plugin: DS type does not match value list type");
781                 return -1;
782         }
783
784         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
785                 return (-1);
786
787         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
788                 return (-1);
789
790         if (stat (filename, &statbuf) == -1)
791         {
792                 if (errno == ENOENT)
793                 {
794                         status = cu_rrd_create_file (filename,
795                                         ds, vl, &rrdcreate_config);
796                         if (status != 0)
797                                 return (-1);
798                 }
799                 else
800                 {
801                         char errbuf[1024];
802                         ERROR ("stat(%s) failed: %s", filename,
803                                         sstrerror (errno, errbuf,
804                                                 sizeof (errbuf)));
805                         return (-1);
806                 }
807         }
808         else if (!S_ISREG (statbuf.st_mode))
809         {
810                 ERROR ("stat(%s): Not a regular file!",
811                                 filename);
812                 return (-1);
813         }
814
815         status = rrd_cache_insert (filename, values, vl->time);
816
817         return (status);
818 } /* int rrd_write */
819
820 static int rrd_flush (int timeout, const char *identifier)
821 {
822         pthread_mutex_lock (&cache_lock);
823
824         if (cache == NULL) {
825                 pthread_mutex_unlock (&cache_lock);
826                 return (0);
827         }
828
829         rrd_cache_flush_identifier (timeout, identifier);
830
831         pthread_mutex_unlock (&cache_lock);
832         return (0);
833 } /* int rrd_flush */
834
835 static int rrd_config (const char *key, const char *value)
836 {
837         if (strcasecmp ("CacheTimeout", key) == 0)
838         {
839                 int tmp = atoi (value);
840                 if (tmp < 0)
841                 {
842                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
843                                         "be greater than 0.\n");
844                         ERROR ("rrdtool: `CacheTimeout' must "
845                                         "be greater than 0.\n");
846                         return (1);
847                 }
848                 cache_timeout = tmp;
849         }
850         else if (strcasecmp ("CacheFlush", key) == 0)
851         {
852                 int tmp = atoi (value);
853                 if (tmp < 0)
854                 {
855                         fprintf (stderr, "rrdtool: `CacheFlush' must "
856                                         "be greater than 0.\n");
857                         ERROR ("rrdtool: `CacheFlush' must "
858                                         "be greater than 0.\n");
859                         return (1);
860                 }
861                 cache_flush_timeout = tmp;
862         }
863         else if (strcasecmp ("DataDir", key) == 0)
864         {
865                 if (datadir != NULL)
866                         free (datadir);
867                 datadir = strdup (value);
868                 if (datadir != NULL)
869                 {
870                         int len = strlen (datadir);
871                         while ((len > 0) && (datadir[len - 1] == '/'))
872                         {
873                                 len--;
874                                 datadir[len] = '\0';
875                         }
876                         if (len <= 0)
877                         {
878                                 free (datadir);
879                                 datadir = NULL;
880                         }
881                 }
882         }
883         else if (strcasecmp ("StepSize", key) == 0)
884         {
885                 int temp = atoi (value);
886                 if (temp > 0)
887                         rrdcreate_config.stepsize = temp;
888         }
889         else if (strcasecmp ("HeartBeat", key) == 0)
890         {
891                 int temp = atoi (value);
892                 if (temp > 0)
893                         rrdcreate_config.heartbeat = temp;
894         }
895         else if (strcasecmp ("RRARows", key) == 0)
896         {
897                 int tmp = atoi (value);
898                 if (tmp <= 0)
899                 {
900                         fprintf (stderr, "rrdtool: `RRARows' must "
901                                         "be greater than 0.\n");
902                         ERROR ("rrdtool: `RRARows' must "
903                                         "be greater than 0.\n");
904                         return (1);
905                 }
906                 rrdcreate_config.rrarows = tmp;
907         }
908         else if (strcasecmp ("RRATimespan", key) == 0)
909         {
910                 char *saveptr = NULL;
911                 char *dummy;
912                 char *ptr;
913                 char *value_copy;
914                 int *tmp_alloc;
915
916                 value_copy = strdup (value);
917                 if (value_copy == NULL)
918                         return (1);
919
920                 dummy = value_copy;
921                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
922                 {
923                         dummy = NULL;
924                         
925                         tmp_alloc = realloc (rrdcreate_config.timespans,
926                                         sizeof (int) * (rrdcreate_config.timespans_num + 1));
927                         if (tmp_alloc == NULL)
928                         {
929                                 fprintf (stderr, "rrdtool: realloc failed.\n");
930                                 ERROR ("rrdtool: realloc failed.\n");
931                                 free (value_copy);
932                                 return (1);
933                         }
934                         rrdcreate_config.timespans = tmp_alloc;
935                         rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr);
936                         if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
937                                 rrdcreate_config.timespans_num++;
938                 } /* while (strtok_r) */
939
940                 qsort (/* base = */ rrdcreate_config.timespans,
941                                 /* nmemb  = */ rrdcreate_config.timespans_num,
942                                 /* size   = */ sizeof (rrdcreate_config.timespans[0]),
943                                 /* compar = */ rrd_compare_numeric);
944
945                 free (value_copy);
946         }
947         else if (strcasecmp ("XFF", key) == 0)
948         {
949                 double tmp = atof (value);
950                 if ((tmp < 0.0) || (tmp >= 1.0))
951                 {
952                         fprintf (stderr, "rrdtool: `XFF' must "
953                                         "be in the range 0 to 1 (exclusive).");
954                         ERROR ("rrdtool: `XFF' must "
955                                         "be in the range 0 to 1 (exclusive).");
956                         return (1);
957                 }
958                 rrdcreate_config.xff = tmp;
959         }
960         else if (strcasecmp ("WritesPerSecond", key) == 0)
961         {
962                 double wps = atof (value);
963
964                 if (wps < 0.0)
965                 {
966                         fprintf (stderr, "rrdtool: `WritesPerSecond' must be "
967                                         "greater than or equal to zero.");
968                         return (1);
969                 }
970                 else if (wps == 0.0)
971                 {
972                         write_rate = 0.0;
973                 }
974                 else
975                 {
976                         write_rate = 1.0 / wps;
977                 }
978         }
979         else
980         {
981                 return (-1);
982         }
983         return (0);
984 } /* int rrd_config */
985
986 static int rrd_shutdown (void)
987 {
988         pthread_mutex_lock (&cache_lock);
989         rrd_cache_flush (-1);
990         pthread_mutex_unlock (&cache_lock);
991
992         pthread_mutex_lock (&queue_lock);
993         do_shutdown = 1;
994         pthread_cond_signal (&queue_cond);
995         pthread_mutex_unlock (&queue_lock);
996
997         /* Wait for all the values to be written to disk before returning. */
998         if (queue_thread != 0)
999         {
1000                 pthread_join (queue_thread, NULL);
1001                 queue_thread = 0;
1002                 DEBUG ("rrdtool plugin: queue_thread exited.");
1003         }
1004
1005         return (0);
1006 } /* int rrd_shutdown */
1007
1008 static int rrd_init (void)
1009 {
1010         int status;
1011
1012         if (rrdcreate_config.stepsize < 0)
1013                 rrdcreate_config.stepsize = 0;
1014         if (rrdcreate_config.heartbeat <= 0)
1015                 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1016
1017         if ((rrdcreate_config.heartbeat > 0)
1018                         && (rrdcreate_config.heartbeat < interval_g))
1019                 WARNING ("rrdtool plugin: Your `heartbeat' is "
1020                                 "smaller than your `interval'. This will "
1021                                 "likely cause problems.");
1022         else if ((rrdcreate_config.stepsize > 0)
1023                         && (rrdcreate_config.stepsize < interval_g))
1024                 WARNING ("rrdtool plugin: Your `stepsize' is "
1025                                 "smaller than your `interval'. This will "
1026                                 "create needlessly big RRD-files.");
1027
1028         /* Set the cache up */
1029         pthread_mutex_lock (&cache_lock);
1030
1031         cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1032         if (cache == NULL)
1033         {
1034                 ERROR ("rrdtool plugin: c_avl_create failed.");
1035                 return (-1);
1036         }
1037
1038         cache_flush_last = time (NULL);
1039         if (cache_timeout < 2)
1040         {
1041                 cache_timeout = 0;
1042                 cache_flush_timeout = 0;
1043         }
1044         else if (cache_flush_timeout < cache_timeout)
1045                 cache_flush_timeout = 10 * cache_timeout;
1046
1047         pthread_mutex_unlock (&cache_lock);
1048
1049         status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
1050         if (status != 0)
1051         {
1052                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1053                 return (-1);
1054         }
1055
1056         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
1057                         " heartbeat = %i; rrarows = %i; xff = %lf;",
1058                         (datadir == NULL) ? "(null)" : datadir,
1059                         rrdcreate_config.stepsize,
1060                         rrdcreate_config.heartbeat,
1061                         rrdcreate_config.rrarows,
1062                         rrdcreate_config.xff);
1063
1064         return (0);
1065 } /* int rrd_init */
1066
1067 void module_register (void)
1068 {
1069         plugin_register_config ("rrdtool", rrd_config,
1070                         config_keys, config_keys_num);
1071         plugin_register_init ("rrdtool", rrd_init);
1072         plugin_register_write ("rrdtool", rrd_write);
1073         plugin_register_flush ("rrdtool", rrd_flush);
1074         plugin_register_shutdown ("rrdtool", rrd_shutdown);
1075 }