Merge branch 'collectd-4.4'
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006,2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
26
27 #include <rrd.h>
28
29 #if HAVE_PTHREAD_H
30 # include <pthread.h>
31 #endif
32
33 /*
34  * Private types
35  */
36 struct rrd_cache_s
37 {
38         int    values_num;
39         char **values;
40         time_t first_value;
41         time_t last_value;
42         enum
43         {
44                 FLAG_NONE   = 0x00,
45                 FLAG_QUEUED = 0x01
46         } flags;
47 };
48 typedef struct rrd_cache_s rrd_cache_t;
49
50 struct rrd_queue_s
51 {
52         char *filename;
53         struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
56
57 /*
58  * Private variables
59  */
60 static int rra_timespans[] =
61 {
62         3600,
63         86400,
64         604800,
65         2678400,
66         31622400
67 };
68 static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
69
70 static int *rra_timespans_custom = NULL;
71 static int rra_timespans_custom_num = 0;
72
73 static char *rra_types[] =
74 {
75         "AVERAGE",
76         "MIN",
77         "MAX"
78 };
79 static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
80
81 static const char *config_keys[] =
82 {
83         "CacheTimeout",
84         "CacheFlush",
85         "DataDir",
86         "StepSize",
87         "HeartBeat",
88         "RRARows",
89         "RRATimespan",
90         "XFF"
91 };
92 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
93
94 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
95  * is zero a default, depending on the `interval' member of the value list is
96  * being used. */
97 static char   *datadir   = NULL;
98 static int     stepsize  = 0;
99 static int     heartbeat = 0;
100 static int     rrarows   = 1200;
101 static double  xff       = 0.1;
102
103 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
104  * ALWAYS lock `cache_lock' first! */
105 static int         cache_timeout = 0;
106 static int         cache_flush_timeout = 0;
107 static time_t      cache_flush_last;
108 static c_avl_tree_t *cache = NULL;
109 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
110
111 static rrd_queue_t    *queue_head = NULL;
112 static rrd_queue_t    *queue_tail = NULL;
113 static pthread_t       queue_thread = 0;
114 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
115 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
116
117 #if !HAVE_THREADSAFE_LIBRRD
118 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
119 #endif
120
121 static int do_shutdown = 0;
122
123 /* * * * * * * * * *
124  * WARNING:  Magic *
125  * * * * * * * * * */
126
127 static void rra_free (int rra_num, char **rra_def)
128 {
129         int i;
130
131         for (i = 0; i < rra_num; i++)
132         {
133                 sfree (rra_def[i]);
134         }
135         sfree (rra_def);
136 } /* void rra_free */
137
138 static int rra_get (char ***ret, const value_list_t *vl)
139 {
140         char **rra_def;
141         int rra_num;
142
143         int *rts;
144         int  rts_num;
145
146         int rra_max;
147
148         int span;
149
150         int cdp_num;
151         int cdp_len;
152         int i, j;
153
154         char buffer[64];
155
156         /* The stepsize we use here: If it is user-set, use it. If not, use the
157          * interval of the value-list. */
158         int ss;
159
160         if (rrarows <= 0)
161         {
162                 *ret = NULL;
163                 return (-1);
164         }
165
166         ss = (stepsize > 0) ? stepsize : vl->interval;
167         if (ss <= 0)
168         {
169                 *ret = NULL;
170                 return (-1);
171         }
172
173         /* Use the configured timespans or fall back to the built-in defaults */
174         if (rra_timespans_custom_num != 0)
175         {
176                 rts = rra_timespans_custom;
177                 rts_num = rra_timespans_custom_num;
178         }
179         else
180         {
181                 rts = rra_timespans;
182                 rts_num = rra_timespans_num;
183         }
184
185         rra_max = rts_num * rra_types_num;
186
187         if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
188                 return (-1);
189         memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
190         rra_num = 0;
191
192         cdp_len = 0;
193         for (i = 0; i < rts_num; i++)
194         {
195                 span = rts[i];
196
197                 if ((span / ss) < rrarows)
198                         span = ss * rrarows;
199
200                 if (cdp_len == 0)
201                         cdp_len = 1;
202                 else
203                         cdp_len = (int) floor (((double) span)
204                                         / ((double) (rrarows * ss)));
205
206                 cdp_num = (int) ceil (((double) span)
207                                 / ((double) (cdp_len * ss)));
208
209                 for (j = 0; j < rra_types_num; j++)
210                 {
211                         if (rra_num >= rra_max)
212                                 break;
213
214                         if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
215                                                 rra_types[j], xff,
216                                                 cdp_len, cdp_num) >= sizeof (buffer))
217                         {
218                                 ERROR ("rra_get: Buffer would have been truncated.");
219                                 continue;
220                         }
221
222                         rra_def[rra_num++] = sstrdup (buffer);
223                 }
224         }
225
226 #if COLLECT_DEBUG
227         DEBUG ("rra_num = %i", rra_num);
228         for (i = 0; i < rra_num; i++)
229                 DEBUG ("  %s", rra_def[i]);
230 #endif
231
232         *ret = rra_def;
233         return (rra_num);
234 } /* int rra_get */
235
236 static void ds_free (int ds_num, char **ds_def)
237 {
238         int i;
239
240         for (i = 0; i < ds_num; i++)
241                 if (ds_def[i] != NULL)
242                         free (ds_def[i]);
243         free (ds_def);
244 }
245
246 static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
247 {
248         char **ds_def;
249         int ds_num;
250
251         char min[32];
252         char max[32];
253         char buffer[128];
254
255         DEBUG ("ds->ds_num = %i", ds->ds_num);
256
257         ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
258         if (ds_def == NULL)
259         {
260                 char errbuf[1024];
261                 ERROR ("rrdtool plugin: malloc failed: %s",
262                                 sstrerror (errno, errbuf, sizeof (errbuf)));
263                 return (-1);
264         }
265         memset (ds_def, '\0', ds->ds_num * sizeof (char *));
266
267         for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
268         {
269                 data_source_t *d = ds->ds + ds_num;
270                 char *type;
271                 int status;
272
273                 ds_def[ds_num] = NULL;
274
275                 if (d->type == DS_TYPE_COUNTER)
276                         type = "COUNTER";
277                 else if (d->type == DS_TYPE_GAUGE)
278                         type = "GAUGE";
279                 else
280                 {
281                         ERROR ("rrdtool plugin: Unknown DS type: %i",
282                                         d->type);
283                         break;
284                 }
285
286                 if (isnan (d->min))
287                         strcpy (min, "U");
288                 else
289                         ssnprintf (min, sizeof (min), "%lf", d->min);
290
291                 if (isnan (d->max))
292                         strcpy (max, "U");
293                 else
294                         ssnprintf (max, sizeof (max), "%lf", d->max);
295
296                 status = ssnprintf (buffer, sizeof (buffer),
297                                 "DS:%s:%s:%i:%s:%s",
298                                 d->name, type,
299                                 (heartbeat > 0) ? heartbeat : (2 * vl->interval),
300                                 min, max);
301                 if ((status < 1) || (status >= sizeof (buffer)))
302                         break;
303
304                 ds_def[ds_num] = sstrdup (buffer);
305         } /* for ds_num = 0 .. ds->ds_num */
306
307 #if COLLECT_DEBUG
308 {
309         int i;
310         DEBUG ("ds_num = %i", ds_num);
311         for (i = 0; i < ds_num; i++)
312                 DEBUG ("  %s", ds_def[i]);
313 }
314 #endif
315
316         if (ds_num != ds->ds_num)
317         {
318                 ds_free (ds_num, ds_def);
319                 return (-1);
320         }
321
322         *ret = ds_def;
323         return (ds_num);
324 }
325
326 #if HAVE_THREADSAFE_LIBRRD
327 static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
328                 int argc, char **argv)
329 {
330         int status;
331
332         optind = 0; /* bug in librrd? */
333         rrd_clear_error ();
334
335         status = rrd_create_r (filename, pdp_step, last_up, argc, argv);
336
337         if (status != 0)
338         {
339                 WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s",
340                                 filename, rrd_get_error ());
341         }
342
343         return (status);
344 } /* int srrd_create */
345
346 static int srrd_update (char *filename, char *template, int argc, char **argv)
347 {
348         int status;
349
350         optind = 0; /* bug in librrd? */
351         rrd_clear_error ();
352
353         status = rrd_update_r (filename, template, argc, argv);
354
355         if (status != 0)
356         {
357                 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
358                                 filename, rrd_get_error ());
359         }
360
361         return (status);
362 } /* int srrd_update */
363 /* #endif HAVE_THREADSAFE_LIBRRD */
364
365 #else /* !HAVE_THREADSAFE_LIBRRD */
366 static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
367                 int argc, char **argv)
368 {
369         int status;
370
371         int new_argc;
372         char **new_argv;
373
374         char pdp_step_str[16];
375         char last_up_str[16];
376
377         new_argc = 6 + argc;
378         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
379         if (new_argv == NULL)
380         {
381                 ERROR ("rrdtool plugin: malloc failed.");
382                 return (-1);
383         }
384
385         if (last_up == 0)
386                 last_up = time (NULL) - 10;
387
388         ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step);
389         ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up);
390
391         new_argv[0] = "create";
392         new_argv[1] = filename;
393         new_argv[2] = "-s";
394         new_argv[3] = pdp_step_str;
395         new_argv[4] = "-b";
396         new_argv[5] = last_up_str;
397
398         memcpy (new_argv + 6, argv, argc * sizeof (char *));
399         new_argv[new_argc] = NULL;
400         
401         pthread_mutex_lock (&librrd_lock);
402         optind = 0; /* bug in librrd? */
403         rrd_clear_error ();
404
405         status = rrd_create (new_argc, new_argv);
406         pthread_mutex_unlock (&librrd_lock);
407
408         if (status != 0)
409         {
410                 WARNING ("rrdtool plugin: rrd_create (%s) failed: %s",
411                                 filename, rrd_get_error ());
412         }
413
414         sfree (new_argv);
415
416         return (status);
417 } /* int srrd_create */
418
419 static int srrd_update (char *filename, char *template, int argc, char **argv)
420 {
421         int status;
422
423         int new_argc;
424         char **new_argv;
425
426         assert (template == NULL);
427
428         new_argc = 2 + argc;
429         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
430         if (new_argv == NULL)
431         {
432                 ERROR ("rrdtool plugin: malloc failed.");
433                 return (-1);
434         }
435
436         new_argv[0] = "update";
437         new_argv[1] = filename;
438
439         memcpy (new_argv + 2, argv, argc * sizeof (char *));
440         new_argv[new_argc] = NULL;
441
442         pthread_mutex_lock (&librrd_lock);
443         optind = 0; /* bug in librrd? */
444         rrd_clear_error ();
445
446         status = rrd_update (new_argc, new_argv);
447         pthread_mutex_unlock (&librrd_lock);
448
449         if (status != 0)
450         {
451                 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
452                                 argv[1], rrd_get_error ());
453         }
454
455         sfree (new_argv);
456
457         return (status);
458 } /* int srrd_update */
459 #endif /* !HAVE_THREADSAFE_LIBRRD */
460
461 static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
462 {
463         char **argv;
464         int argc;
465         char **rra_def;
466         int rra_num;
467         char **ds_def;
468         int ds_num;
469         int status = 0;
470
471         if (check_create_dir (filename))
472                 return (-1);
473
474         if ((rra_num = rra_get (&rra_def, vl)) < 1)
475         {
476                 ERROR ("rrd_create_file failed: Could not calculate RRAs");
477                 return (-1);
478         }
479
480         if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
481         {
482                 ERROR ("rrd_create_file failed: Could not calculate DSes");
483                 return (-1);
484         }
485
486         argc = ds_num + rra_num;
487
488         if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
489         {
490                 char errbuf[1024];
491                 ERROR ("rrd_create failed: %s",
492                                 sstrerror (errno, errbuf, sizeof (errbuf)));
493                 return (-1);
494         }
495
496         memcpy (argv, ds_def, ds_num * sizeof (char *));
497         memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *));
498         argv[ds_num + rra_num] = NULL;
499
500         assert (vl->time > 10);
501         status = srrd_create (filename,
502                         (stepsize > 0) ? stepsize : vl->interval,
503                         vl->time - 10,
504                         argc, argv);
505
506         free (argv);
507         ds_free (ds_num, ds_def);
508         rra_free (rra_num, rra_def);
509
510         return (status);
511 }
512
513 static int value_list_to_string (char *buffer, int buffer_len,
514                 const data_set_t *ds, const value_list_t *vl)
515 {
516         int offset;
517         int status;
518         int i;
519
520         memset (buffer, '\0', buffer_len);
521
522         status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
523         if ((status < 1) || (status >= buffer_len))
524                 return (-1);
525         offset = status;
526
527         for (i = 0; i < ds->ds_num; i++)
528         {
529                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
530                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
531                         return (-1);
532
533                 if (ds->ds[i].type == DS_TYPE_COUNTER)
534                         status = ssnprintf (buffer + offset, buffer_len - offset,
535                                         ":%llu", vl->values[i].counter);
536                 else
537                         status = ssnprintf (buffer + offset, buffer_len - offset,
538                                         ":%lf", vl->values[i].gauge);
539
540                 if ((status < 1) || (status >= (buffer_len - offset)))
541                         return (-1);
542
543                 offset += status;
544         } /* for ds->ds_num */
545
546         return (0);
547 } /* int value_list_to_string */
548
549 static int value_list_to_filename (char *buffer, int buffer_len,
550                 const data_set_t *ds, const value_list_t *vl)
551 {
552         int offset = 0;
553         int status;
554
555         if (datadir != NULL)
556         {
557                 status = ssnprintf (buffer + offset, buffer_len - offset,
558                                 "%s/", datadir);
559                 if ((status < 1) || (status >= buffer_len - offset))
560                         return (-1);
561                 offset += status;
562         }
563
564         status = ssnprintf (buffer + offset, buffer_len - offset,
565                         "%s/", vl->host);
566         if ((status < 1) || (status >= buffer_len - offset))
567                 return (-1);
568         offset += status;
569
570         if (strlen (vl->plugin_instance) > 0)
571                 status = ssnprintf (buffer + offset, buffer_len - offset,
572                                 "%s-%s/", vl->plugin, vl->plugin_instance);
573         else
574                 status = ssnprintf (buffer + offset, buffer_len - offset,
575                                 "%s/", vl->plugin);
576         if ((status < 1) || (status >= buffer_len - offset))
577                 return (-1);
578         offset += status;
579
580         if (strlen (vl->type_instance) > 0)
581                 status = ssnprintf (buffer + offset, buffer_len - offset,
582                                 "%s-%s.rrd", vl->type, vl->type_instance);
583         else
584                 status = ssnprintf (buffer + offset, buffer_len - offset,
585                                 "%s.rrd", vl->type);
586         if ((status < 1) || (status >= buffer_len - offset))
587                 return (-1);
588         offset += status;
589
590         return (0);
591 } /* int value_list_to_filename */
592
593 static void *rrd_queue_thread (void *data)
594 {
595         while (42)
596         {
597                 rrd_queue_t *queue_entry;
598                 rrd_cache_t *cache_entry;
599                 char **values;
600                 int    values_num;
601                 int    i;
602
603                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
604                  * the same time, ALWAYS lock `cache_lock' first! */
605
606                 /* wait until an entry is available */
607                 pthread_mutex_lock (&queue_lock);
608                 while ((queue_head == NULL) && (do_shutdown == 0))
609                         pthread_cond_wait (&queue_cond, &queue_lock);
610
611                 /* We're in the shutdown phase */
612                 if (queue_head == NULL)
613                 {
614                         pthread_mutex_unlock (&queue_lock);
615                         break;
616                 }
617
618                 /* Dequeue the first entry */
619                 queue_entry = queue_head;
620                 if (queue_head == queue_tail)
621                         queue_head = queue_tail = NULL;
622                 else
623                         queue_head = queue_head->next;
624
625                 /* Unlock the queue again */
626                 pthread_mutex_unlock (&queue_lock);
627
628                 /* We now need the cache lock so the entry isn't updated while
629                  * we make a copy of it's values */
630                 pthread_mutex_lock (&cache_lock);
631
632                 c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
633
634                 values = cache_entry->values;
635                 values_num = cache_entry->values_num;
636
637                 cache_entry->values = NULL;
638                 cache_entry->values_num = 0;
639                 cache_entry->flags = FLAG_NONE;
640
641                 pthread_mutex_unlock (&cache_lock);
642
643                 /* Write the values to the RRD-file */
644                 srrd_update (queue_entry->filename, NULL, values_num, values);
645                 DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
646                                 values_num, queue_entry->filename);
647
648                 for (i = 0; i < values_num; i++)
649                 {
650                         sfree (values[i]);
651                 }
652                 sfree (values);
653                 sfree (queue_entry->filename);
654                 sfree (queue_entry);
655         } /* while (42) */
656
657         pthread_mutex_lock (&cache_lock);
658         c_avl_destroy (cache);
659         cache = NULL;
660         pthread_mutex_unlock (&cache_lock);
661
662         pthread_exit ((void *) 0);
663         return ((void *) 0);
664 } /* void *rrd_queue_thread */
665
666 static int rrd_queue_cache_entry (const char *filename)
667 {
668         rrd_queue_t *queue_entry;
669
670         queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
671         if (queue_entry == NULL)
672                 return (-1);
673
674         queue_entry->filename = strdup (filename);
675         if (queue_entry->filename == NULL)
676         {
677                 free (queue_entry);
678                 return (-1);
679         }
680
681         queue_entry->next = NULL;
682
683         pthread_mutex_lock (&queue_lock);
684         if (queue_tail == NULL)
685                 queue_head = queue_entry;
686         else
687                 queue_tail->next = queue_entry;
688         queue_tail = queue_entry;
689         pthread_cond_signal (&queue_cond);
690         pthread_mutex_unlock (&queue_lock);
691
692         DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
693
694         return (0);
695 } /* int rrd_queue_cache_entry */
696
697 static void rrd_cache_flush (int timeout)
698 {
699         rrd_cache_t *rc;
700         time_t       now;
701
702         char **keys = NULL;
703         int    keys_num = 0;
704
705         char *key;
706         c_avl_iterator_t *iter;
707         int i;
708
709         DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
710
711         now = time (NULL);
712
713         /* Build a list of entries to be flushed */
714         iter = c_avl_get_iterator (cache);
715         while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
716         {
717                 if (rc->flags == FLAG_QUEUED)
718                         continue;
719                 else if ((now - rc->first_value) < timeout)
720                         continue;
721                 else if (rc->values_num > 0)
722                 {
723                         if (rrd_queue_cache_entry (key) == 0)
724                                 rc->flags = FLAG_QUEUED;
725                 }
726                 else /* ancient and no values -> waste of memory */
727                 {
728                         char **tmp = (char **) realloc ((void *) keys,
729                                         (keys_num + 1) * sizeof (char *));
730                         if (tmp == NULL)
731                         {
732                                 char errbuf[1024];
733                                 ERROR ("rrdtool plugin: "
734                                                 "realloc failed: %s",
735                                                 sstrerror (errno, errbuf,
736                                                         sizeof (errbuf)));
737                                 c_avl_iterator_destroy (iter);
738                                 sfree (keys);
739                                 return;
740                         }
741                         keys = tmp;
742                         keys[keys_num] = key;
743                         keys_num++;
744                 }
745         } /* while (c_avl_iterator_next) */
746         c_avl_iterator_destroy (iter);
747         
748         for (i = 0; i < keys_num; i++)
749         {
750                 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
751                 {
752                         DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
753                         continue;
754                 }
755
756                 assert (rc->values == NULL);
757                 assert (rc->values_num == 0);
758
759                 sfree (rc);
760                 sfree (key);
761                 keys[i] = NULL;
762         } /* for (i = 0..keys_num) */
763
764         sfree (keys);
765
766         cache_flush_last = now;
767 } /* void rrd_cache_flush */
768
769 static int rrd_cache_insert (const char *filename,
770                 const char *value, time_t value_time)
771 {
772         rrd_cache_t *rc = NULL;
773         int new_rc = 0;
774         char **values_new;
775
776         pthread_mutex_lock (&cache_lock);
777
778         c_avl_get (cache, filename, (void *) &rc);
779
780         if (rc == NULL)
781         {
782                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
783                 if (rc == NULL)
784                         return (-1);
785                 rc->values_num = 0;
786                 rc->values = NULL;
787                 rc->first_value = 0;
788                 rc->last_value = 0;
789                 rc->flags = FLAG_NONE;
790                 new_rc = 1;
791         }
792
793         if (rc->last_value >= value_time)
794         {
795                 pthread_mutex_unlock (&cache_lock);
796                 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
797                                 (unsigned int) rc->last_value,
798                                 (unsigned int) value_time);
799                 return (-1);
800         }
801
802         values_new = (char **) realloc ((void *) rc->values,
803                         (rc->values_num + 1) * sizeof (char *));
804         if (values_new == NULL)
805         {
806                 char errbuf[1024];
807                 void *cache_key = NULL;
808
809                 sstrerror (errno, errbuf, sizeof (errbuf));
810
811                 c_avl_remove (cache, filename, &cache_key, NULL);
812                 pthread_mutex_unlock (&cache_lock);
813
814                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
815
816                 sfree (cache_key);
817                 sfree (rc->values);
818                 sfree (rc);
819                 return (-1);
820         }
821         rc->values = values_new;
822
823         rc->values[rc->values_num] = strdup (value);
824         if (rc->values[rc->values_num] != NULL)
825                 rc->values_num++;
826
827         if (rc->values_num == 1)
828                 rc->first_value = value_time;
829         rc->last_value = value_time;
830
831         /* Insert if this is the first value */
832         if (new_rc == 1)
833         {
834                 void *cache_key = strdup (filename);
835
836                 if (cache_key == NULL)
837                 {
838                         char errbuf[1024];
839                         sstrerror (errno, errbuf, sizeof (errbuf));
840
841                         pthread_mutex_unlock (&cache_lock);
842
843                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
844
845                         sfree (rc->values[0]);
846                         sfree (rc->values);
847                         sfree (rc);
848                         return (-1);
849                 }
850
851                 c_avl_insert (cache, cache_key, rc);
852         }
853
854         DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
855                         "values_num = %i; age = %u;",
856                         filename, rc->values_num,
857                         rc->last_value - rc->first_value);
858
859         if ((rc->last_value - rc->first_value) >= cache_timeout)
860         {
861                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
862                  * the same time, ALWAYS lock `cache_lock' first! */
863                 if (rc->flags != FLAG_QUEUED)
864                 {
865                         if (rrd_queue_cache_entry (filename) == 0)
866                                 rc->flags = FLAG_QUEUED;
867                 }
868                 else
869                 {
870                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
871                 }
872         }
873
874         if ((cache_timeout > 0) &&
875                         ((time (NULL) - cache_flush_last) > cache_flush_timeout))
876                 rrd_cache_flush (cache_flush_timeout);
877
878
879         pthread_mutex_unlock (&cache_lock);
880
881         return (0);
882 } /* int rrd_cache_insert */
883
884 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
885 {
886         int a = *((int *) a_ptr);
887         int b = *((int *) b_ptr);
888
889         if (a < b)
890                 return (-1);
891         else if (a > b)
892                 return (1);
893         else
894                 return (0);
895 } /* int rrd_compare_numeric */
896
897 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
898 {
899         struct stat  statbuf;
900         char         filename[512];
901         char         values[512];
902         int          status;
903
904         if (0 != strcmp (ds->type, vl->type)) {
905                 ERROR ("rrdtool plugin: DS type does not match value list type");
906                 return -1;
907         }
908
909         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
910                 return (-1);
911
912         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
913                 return (-1);
914
915         if (stat (filename, &statbuf) == -1)
916         {
917                 if (errno == ENOENT)
918                 {
919                         if (rrd_create_file (filename, ds, vl))
920                                 return (-1);
921                 }
922                 else
923                 {
924                         char errbuf[1024];
925                         ERROR ("stat(%s) failed: %s", filename,
926                                         sstrerror (errno, errbuf,
927                                                 sizeof (errbuf)));
928                         return (-1);
929                 }
930         }
931         else if (!S_ISREG (statbuf.st_mode))
932         {
933                 ERROR ("stat(%s): Not a regular file!",
934                                 filename);
935                 return (-1);
936         }
937
938         status = rrd_cache_insert (filename, values, vl->time);
939
940         return (status);
941 } /* int rrd_write */
942
943 static int rrd_flush (const int timeout)
944 {
945         pthread_mutex_lock (&cache_lock);
946
947         if (cache == NULL) {
948                 pthread_mutex_unlock (&cache_lock);
949                 return (0);
950         }
951
952         rrd_cache_flush (timeout);
953         pthread_mutex_unlock (&cache_lock);
954         return (0);
955 } /* int rrd_flush */
956
957 static int rrd_config (const char *key, const char *value)
958 {
959         if (strcasecmp ("CacheTimeout", key) == 0)
960         {
961                 int tmp = atoi (value);
962                 if (tmp < 0)
963                 {
964                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
965                                         "be greater than 0.\n");
966                         return (1);
967                 }
968                 cache_timeout = tmp;
969         }
970         else if (strcasecmp ("CacheFlush", key) == 0)
971         {
972                 int tmp = atoi (value);
973                 if (tmp < 0)
974                 {
975                         fprintf (stderr, "rrdtool: `CacheFlush' must "
976                                         "be greater than 0.\n");
977                         return (1);
978                 }
979                 cache_flush_timeout = tmp;
980         }
981         else if (strcasecmp ("DataDir", key) == 0)
982         {
983                 if (datadir != NULL)
984                         free (datadir);
985                 datadir = strdup (value);
986                 if (datadir != NULL)
987                 {
988                         int len = strlen (datadir);
989                         while ((len > 0) && (datadir[len - 1] == '/'))
990                         {
991                                 len--;
992                                 datadir[len] = '\0';
993                         }
994                         if (len <= 0)
995                         {
996                                 free (datadir);
997                                 datadir = NULL;
998                         }
999                 }
1000         }
1001         else if (strcasecmp ("StepSize", key) == 0)
1002         {
1003                 stepsize = atoi (value);
1004                 if (stepsize < 0)
1005                         stepsize = 0;
1006         }
1007         else if (strcasecmp ("HeartBeat", key) == 0)
1008         {
1009                 heartbeat = atoi (value);
1010                 if (heartbeat < 0)
1011                         heartbeat = 0;
1012         }
1013         else if (strcasecmp ("RRARows", key) == 0)
1014         {
1015                 int tmp = atoi (value);
1016                 if (tmp <= 0)
1017                 {
1018                         fprintf (stderr, "rrdtool: `RRARows' must "
1019                                         "be greater than 0.\n");
1020                         return (1);
1021                 }
1022                 rrarows = tmp;
1023         }
1024         else if (strcasecmp ("RRATimespan", key) == 0)
1025         {
1026                 char *saveptr = NULL;
1027                 char *dummy;
1028                 char *ptr;
1029                 char *value_copy;
1030                 int *tmp_alloc;
1031
1032                 value_copy = strdup (value);
1033                 if (value_copy == NULL)
1034                         return (1);
1035
1036                 dummy = value_copy;
1037                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
1038                 {
1039                         dummy = NULL;
1040                         
1041                         tmp_alloc = realloc (rra_timespans_custom,
1042                                         sizeof (int) * (rra_timespans_custom_num + 1));
1043                         if (tmp_alloc == NULL)
1044                         {
1045                                 fprintf (stderr, "rrdtool: realloc failed.\n");
1046                                 free (value_copy);
1047                                 return (1);
1048                         }
1049                         rra_timespans_custom = tmp_alloc;
1050                         rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
1051                         if (rra_timespans_custom[rra_timespans_custom_num] != 0)
1052                                 rra_timespans_custom_num++;
1053                 } /* while (strtok_r) */
1054
1055                 qsort (/* base = */ rra_timespans_custom,
1056                                 /* nmemb  = */ rra_timespans_custom_num,
1057                                 /* size   = */ sizeof (rra_timespans_custom[0]),
1058                                 /* compar = */ rrd_compare_numeric);
1059
1060                 free (value_copy);
1061         }
1062         else if (strcasecmp ("XFF", key) == 0)
1063         {
1064                 double tmp = atof (value);
1065                 if ((tmp < 0.0) || (tmp >= 1.0))
1066                 {
1067                         fprintf (stderr, "rrdtool: `XFF' must "
1068                                         "be in the range 0 to 1 (exclusive).");
1069                         return (1);
1070                 }
1071                 xff = tmp;
1072         }
1073         else
1074         {
1075                 return (-1);
1076         }
1077         return (0);
1078 } /* int rrd_config */
1079
1080 static int rrd_shutdown (void)
1081 {
1082         pthread_mutex_lock (&cache_lock);
1083         rrd_cache_flush (-1);
1084         pthread_mutex_unlock (&cache_lock);
1085
1086         pthread_mutex_lock (&queue_lock);
1087         do_shutdown = 1;
1088         pthread_cond_signal (&queue_cond);
1089         pthread_mutex_unlock (&queue_lock);
1090
1091         /* Wait for all the values to be written to disk before returning. */
1092         if (queue_thread != 0)
1093         {
1094                 pthread_join (queue_thread, NULL);
1095                 queue_thread = 0;
1096                 DEBUG ("rrdtool plugin: queue_thread exited.");
1097         }
1098
1099         return (0);
1100 } /* int rrd_shutdown */
1101
1102 static int rrd_init (void)
1103 {
1104         int status;
1105
1106         if (stepsize < 0)
1107                 stepsize = 0;
1108         if (heartbeat <= 0)
1109                 heartbeat = 2 * stepsize;
1110
1111         if ((heartbeat > 0) && (heartbeat < interval_g))
1112                 WARNING ("rrdtool plugin: Your `heartbeat' is "
1113                                 "smaller than your `interval'. This will "
1114                                 "likely cause problems.");
1115         else if ((stepsize > 0) && (stepsize < interval_g))
1116                 WARNING ("rrdtool plugin: Your `stepsize' is "
1117                                 "smaller than your `interval'. This will "
1118                                 "create needlessly big RRD-files.");
1119
1120         /* Set the cache up */
1121         pthread_mutex_lock (&cache_lock);
1122
1123         cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1124         if (cache == NULL)
1125         {
1126                 ERROR ("rrdtool plugin: c_avl_create failed.");
1127                 return (-1);
1128         }
1129
1130         cache_flush_last = time (NULL);
1131         if (cache_timeout < 2)
1132         {
1133                 cache_timeout = 0;
1134                 cache_flush_timeout = 0;
1135         }
1136         else if (cache_flush_timeout < cache_timeout)
1137                 cache_flush_timeout = 10 * cache_timeout;
1138
1139         pthread_mutex_unlock (&cache_lock);
1140
1141         status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
1142         if (status != 0)
1143         {
1144                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1145                 return (-1);
1146         }
1147
1148         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
1149                         " heartbeat = %i; rrarows = %i; xff = %lf;",
1150                         (datadir == NULL) ? "(null)" : datadir,
1151                         stepsize, heartbeat, rrarows, xff);
1152
1153         return (0);
1154 } /* int rrd_init */
1155
1156 void module_register (void)
1157 {
1158         plugin_register_config ("rrdtool", rrd_config,
1159                         config_keys, config_keys_num);
1160         plugin_register_init ("rrdtool", rrd_init);
1161         plugin_register_write ("rrdtool", rrd_write);
1162         plugin_register_flush ("rrdtool", rrd_flush);
1163         plugin_register_shutdown ("rrdtool", rrd_shutdown);
1164 }