Merge branch 'collectd-4.0' into collectd-4.1
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006,2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
26
27 #if HAVE_PTHREAD_H
28 # include <pthread.h>
29 #endif
30
31 /*
32  * Private types
33  */
34 struct rrd_cache_s
35 {
36         int    values_num;
37         char **values;
38         time_t first_value;
39         time_t last_value;
40         enum
41         {
42                 FLAG_NONE   = 0x00,
43                 FLAG_QUEUED = 0x01
44         } flags;
45 };
46 typedef struct rrd_cache_s rrd_cache_t;
47
48 struct rrd_queue_s
49 {
50         char *filename;
51         struct rrd_queue_s *next;
52 };
53 typedef struct rrd_queue_s rrd_queue_t;
54
55 /*
56  * Private variables
57  */
58 static int rra_timespans[] =
59 {
60         3600,
61         86400,
62         604800,
63         2678400,
64         31622400
65 };
66 static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
67
68 static int *rra_timespans_custom = NULL;
69 static int rra_timespans_custom_num = 0;
70
71 static char *rra_types[] =
72 {
73         "AVERAGE",
74         "MIN",
75         "MAX"
76 };
77 static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
78
79 static const char *config_keys[] =
80 {
81         "CacheTimeout",
82         "CacheFlush",
83         "DataDir",
84         "StepSize",
85         "HeartBeat",
86         "RRARows",
87         "RRATimespan",
88         "XFF"
89 };
90 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
91
92 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
93  * is zero a default, depending on the `interval' member of the value list is
94  * being used. */
95 static char   *datadir   = NULL;
96 static int     stepsize  = 0;
97 static int     heartbeat = 0;
98 static int     rrarows   = 1200;
99 static double  xff       = 0.1;
100
101 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
102  * ALWAYS lock `cache_lock' first! */
103 static int         cache_timeout = 0;
104 static int         cache_flush_timeout = 0;
105 static time_t      cache_flush_last;
106 static avl_tree_t *cache = NULL;
107 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
108
109 static rrd_queue_t    *queue_head = NULL;
110 static rrd_queue_t    *queue_tail = NULL;
111 static pthread_t       queue_thread = 0;
112 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
113 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
114
115 static int do_shutdown = 0;
116
117 /* * * * * * * * * *
118  * WARNING:  Magic *
119  * * * * * * * * * */
120
121 static void rra_free (int rra_num, char **rra_def)
122 {
123         int i;
124
125         for (i = 0; i < rra_num; i++)
126         {
127                 sfree (rra_def[i]);
128         }
129         sfree (rra_def);
130 } /* void rra_free */
131
132 static int rra_get (char ***ret, const value_list_t *vl)
133 {
134         char **rra_def;
135         int rra_num;
136
137         int *rts;
138         int  rts_num;
139
140         int rra_max;
141
142         int span;
143
144         int cdp_num;
145         int cdp_len;
146         int i, j;
147
148         char buffer[64];
149
150         /* The stepsize we use here: If it is user-set, use it. If not, use the
151          * interval of the value-list. */
152         int ss;
153
154         if (rrarows <= 0)
155         {
156                 *ret = NULL;
157                 return (-1);
158         }
159
160         ss = (stepsize > 0) ? stepsize : vl->interval;
161         if (ss <= 0)
162         {
163                 *ret = NULL;
164                 return (-1);
165         }
166
167         /* Use the configured timespans or fall back to the built-in defaults */
168         if (rra_timespans_custom_num != 0)
169         {
170                 rts = rra_timespans_custom;
171                 rts_num = rra_timespans_custom_num;
172         }
173         else
174         {
175                 rts = rra_timespans;
176                 rts_num = rra_timespans_num;
177         }
178
179         rra_max = rts_num * rra_types_num;
180
181         if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
182                 return (-1);
183         memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
184         rra_num = 0;
185
186         cdp_len = 0;
187         for (i = 0; i < rts_num; i++)
188         {
189                 span = rts[i];
190
191                 if ((span / ss) < rrarows)
192                         continue;
193
194                 if (cdp_len == 0)
195                         cdp_len = 1;
196                 else
197                         cdp_len = (int) floor (((double) span)
198                                         / ((double) (rrarows * ss)));
199
200                 cdp_num = (int) ceil (((double) span)
201                                 / ((double) (cdp_len * ss)));
202
203                 for (j = 0; j < rra_types_num; j++)
204                 {
205                         if (rra_num >= rra_max)
206                                 break;
207
208                         if (snprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
209                                                 rra_types[j], xff,
210                                                 cdp_len, cdp_num) >= sizeof (buffer))
211                         {
212                                 ERROR ("rra_get: Buffer would have been truncated.");
213                                 continue;
214                         }
215
216                         rra_def[rra_num++] = sstrdup (buffer);
217                 }
218         }
219
220 #if COLLECT_DEBUG
221         DEBUG ("rra_num = %i", rra_num);
222         for (i = 0; i < rra_num; i++)
223                 DEBUG ("  %s", rra_def[i]);
224 #endif
225
226         *ret = rra_def;
227         return (rra_num);
228 } /* int rra_get */
229
230 static void ds_free (int ds_num, char **ds_def)
231 {
232         int i;
233
234         for (i = 0; i < ds_num; i++)
235                 if (ds_def[i] != NULL)
236                         free (ds_def[i]);
237         free (ds_def);
238 }
239
240 static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
241 {
242         char **ds_def;
243         int ds_num;
244
245         char min[32];
246         char max[32];
247         char buffer[128];
248
249         DEBUG ("ds->ds_num = %i", ds->ds_num);
250
251         ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
252         if (ds_def == NULL)
253         {
254                 char errbuf[1024];
255                 ERROR ("rrdtool plugin: malloc failed: %s",
256                                 sstrerror (errno, errbuf, sizeof (errbuf)));
257                 return (-1);
258         }
259         memset (ds_def, '\0', ds->ds_num * sizeof (char *));
260
261         for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
262         {
263                 data_source_t *d = ds->ds + ds_num;
264                 char *type;
265                 int status;
266
267                 ds_def[ds_num] = NULL;
268
269                 if (d->type == DS_TYPE_COUNTER)
270                         type = "COUNTER";
271                 else if (d->type == DS_TYPE_GAUGE)
272                         type = "GAUGE";
273                 else
274                 {
275                         ERROR ("rrdtool plugin: Unknown DS type: %i",
276                                         d->type);
277                         break;
278                 }
279
280                 if (isnan (d->min))
281                 {
282                         strcpy (min, "U");
283                 }
284                 else
285                 {
286                         snprintf (min, sizeof (min), "%lf", d->min);
287                         min[sizeof (min) - 1] = '\0';
288                 }
289
290                 if (isnan (d->max))
291                 {
292                         strcpy (max, "U");
293                 }
294                 else
295                 {
296                         snprintf (max, sizeof (max), "%lf", d->max);
297                         max[sizeof (max) - 1] = '\0';
298                 }
299
300                 status = snprintf (buffer, sizeof (buffer),
301                                 "DS:%s:%s:%i:%s:%s",
302                                 d->name, type,
303                                 (heartbeat > 0) ? heartbeat : (2 * vl->interval),
304                                 min, max);
305                 if ((status < 1) || (status >= sizeof (buffer)))
306                         break;
307
308                 ds_def[ds_num] = sstrdup (buffer);
309         } /* for ds_num = 0 .. ds->ds_num */
310
311 #if COLLECT_DEBUG
312 {
313         int i;
314         DEBUG ("ds_num = %i", ds_num);
315         for (i = 0; i < ds_num; i++)
316                 DEBUG ("  %s", ds_def[i]);
317 }
318 #endif
319
320         if (ds_num != ds->ds_num)
321         {
322                 ds_free (ds_num, ds_def);
323                 return (-1);
324         }
325
326         *ret = ds_def;
327         return (ds_num);
328 }
329
330 static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
331 {
332         char **argv;
333         int argc;
334         char **rra_def;
335         int rra_num;
336         char **ds_def;
337         int ds_num;
338         int i, j;
339         char stepsize_str[16];
340         char begin_str[16];
341         int status = 0;
342
343         if (check_create_dir (filename))
344                 return (-1);
345
346         if ((rra_num = rra_get (&rra_def, vl)) < 1)
347         {
348                 ERROR ("rrd_create_file failed: Could not calculate RRAs");
349                 return (-1);
350         }
351
352         if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
353         {
354                 ERROR ("rrd_create_file failed: Could not calculate DSes");
355                 return (-1);
356         }
357
358         argc = ds_num + rra_num + 6;
359
360         if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
361         {
362                 char errbuf[1024];
363                 ERROR ("rrd_create failed: %s",
364                                 sstrerror (errno, errbuf, sizeof (errbuf)));
365                 return (-1);
366         }
367
368         status = snprintf (stepsize_str, sizeof (stepsize_str),
369                         "%i", (stepsize > 0) ? stepsize : vl->interval);
370         if ((status < 1) || (status >= sizeof (stepsize_str)))
371         {
372                 ERROR ("rrdtool plugin: snprintf failed.");
373                 free (argv);
374                 ds_free (ds_num, ds_def);
375                 rra_free (rra_num, rra_def);
376                 return (-1);
377         }
378
379         assert (vl->time > 10);
380         status = snprintf (begin_str, sizeof (begin_str),
381                         "%llu", (unsigned long long) (vl->time - 10));
382         if ((status < 1) || (status >= sizeof (begin_str)))
383         {
384                 ERROR ("rrdtool plugin: snprintf failed.");
385                 return (-1);
386         }
387
388         argv[0] = "create";
389         argv[1] = filename;
390         argv[2] = "-b";
391         argv[3] = begin_str;
392         argv[4] = "-s";
393         argv[5] = stepsize_str;
394
395         j = 6;
396         for (i = 0; i < ds_num; i++)
397                 argv[j++] = ds_def[i];
398         for (i = 0; i < rra_num; i++)
399                 argv[j++] = rra_def[i];
400         argv[j] = NULL;
401
402         optind = 0; /* bug in librrd? */
403         rrd_clear_error ();
404         if (rrd_create (argc, argv) == -1)
405         {
406                 ERROR ("rrd_create failed: %s: %s", filename, rrd_get_error ());
407                 status = -1;
408         }
409
410         free (argv);
411         ds_free (ds_num, ds_def);
412         rra_free (rra_num, rra_def);
413
414         return (status);
415 }
416
417 static int value_list_to_string (char *buffer, int buffer_len,
418                 const data_set_t *ds, const value_list_t *vl)
419 {
420         int offset;
421         int status;
422         int i;
423
424         memset (buffer, '\0', buffer_len);
425
426         status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
427         if ((status < 1) || (status >= buffer_len))
428                 return (-1);
429         offset = status;
430
431         for (i = 0; i < ds->ds_num; i++)
432         {
433                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
434                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
435                         return (-1);
436
437                 if (ds->ds[i].type == DS_TYPE_COUNTER)
438                         status = snprintf (buffer + offset, buffer_len - offset,
439                                         ":%llu", vl->values[i].counter);
440                 else
441                         status = snprintf (buffer + offset, buffer_len - offset,
442                                         ":%lf", vl->values[i].gauge);
443
444                 if ((status < 1) || (status >= (buffer_len - offset)))
445                         return (-1);
446
447                 offset += status;
448         } /* for ds->ds_num */
449
450         return (0);
451 } /* int value_list_to_string */
452
453 static int value_list_to_filename (char *buffer, int buffer_len,
454                 const data_set_t *ds, const value_list_t *vl)
455 {
456         int offset = 0;
457         int status;
458
459         if (datadir != NULL)
460         {
461                 status = snprintf (buffer + offset, buffer_len - offset,
462                                 "%s/", datadir);
463                 if ((status < 1) || (status >= buffer_len - offset))
464                         return (-1);
465                 offset += status;
466         }
467
468         status = snprintf (buffer + offset, buffer_len - offset,
469                         "%s/", vl->host);
470         if ((status < 1) || (status >= buffer_len - offset))
471                 return (-1);
472         offset += status;
473
474         if (strlen (vl->plugin_instance) > 0)
475                 status = snprintf (buffer + offset, buffer_len - offset,
476                                 "%s-%s/", vl->plugin, vl->plugin_instance);
477         else
478                 status = snprintf (buffer + offset, buffer_len - offset,
479                                 "%s/", vl->plugin);
480         if ((status < 1) || (status >= buffer_len - offset))
481                 return (-1);
482         offset += status;
483
484         if (strlen (vl->type_instance) > 0)
485                 status = snprintf (buffer + offset, buffer_len - offset,
486                                 "%s-%s.rrd", ds->type, vl->type_instance);
487         else
488                 status = snprintf (buffer + offset, buffer_len - offset,
489                                 "%s.rrd", ds->type);
490         if ((status < 1) || (status >= buffer_len - offset))
491                 return (-1);
492         offset += status;
493
494         return (0);
495 } /* int value_list_to_filename */
496
497 static int rrd_write_to_file (char *filename, char **values, int values_num)
498 {
499         char **argv;
500         int argc;
501         int status;
502
503         if (values_num < 1)
504                 return (0);
505
506         argc = values_num + 2;
507         argv = (char **) malloc ((argc + 1) * sizeof (char *));
508         if (argv == NULL)
509                 return (-1);
510
511         argv[0] = "update";
512         argv[1] = filename;
513         memcpy (argv + 2, values, values_num * sizeof (char *));
514         argv[argc] = NULL;
515
516         DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv);
517
518         optind = 0; /* bug in librrd? */
519         rrd_clear_error ();
520         status = rrd_update (argc, argv);
521         if (status != 0)
522         {
523                 WARNING ("rrd_update failed: %s: %s",
524                                 filename, rrd_get_error ());
525                 status = -1;
526         }
527
528         sfree (argv);
529
530         return (status);
531 } /* int rrd_write_cache_entry */
532
533 static void *rrd_queue_thread (void *data)
534 {
535         while (42)
536         {
537                 rrd_queue_t *queue_entry;
538                 rrd_cache_t *cache_entry;
539                 char **values;
540                 int    values_num;
541                 int    i;
542
543                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
544                  * the same time, ALWAYS lock `cache_lock' first! */
545
546                 /* wait until an entry is available */
547                 pthread_mutex_lock (&queue_lock);
548                 while ((queue_head == NULL) && (do_shutdown == 0))
549                         pthread_cond_wait (&queue_cond, &queue_lock);
550
551                 /* We're in the shutdown phase */
552                 if (queue_head == NULL)
553                 {
554                         pthread_mutex_unlock (&queue_lock);
555                         break;
556                 }
557
558                 /* Dequeue the first entry */
559                 queue_entry = queue_head;
560                 if (queue_head == queue_tail)
561                         queue_head = queue_tail = NULL;
562                 else
563                         queue_head = queue_head->next;
564
565                 /* Unlock the queue again */
566                 pthread_mutex_unlock (&queue_lock);
567
568                 /* We now need the cache lock so the entry isn't updated while
569                  * we make a copy of it's values */
570                 pthread_mutex_lock (&cache_lock);
571
572                 avl_get (cache, queue_entry->filename, (void *) &cache_entry);
573
574                 values = cache_entry->values;
575                 values_num = cache_entry->values_num;
576
577                 cache_entry->values = NULL;
578                 cache_entry->values_num = 0;
579                 cache_entry->flags = FLAG_NONE;
580
581                 pthread_mutex_unlock (&cache_lock);
582
583                 /* Write the values to the RRD-file */
584                 rrd_write_to_file (queue_entry->filename, values, values_num);
585
586                 for (i = 0; i < values_num; i++)
587                 {
588                         sfree (values[i]);
589                 }
590                 sfree (values);
591                 sfree (queue_entry->filename);
592                 sfree (queue_entry);
593         } /* while (42) */
594
595         pthread_mutex_lock (&cache_lock);
596         avl_destroy (cache);
597         cache = NULL;
598         pthread_mutex_unlock (&cache_lock);
599
600         pthread_exit ((void *) 0);
601         return ((void *) 0);
602 } /* void *rrd_queue_thread */
603
604 static int rrd_queue_cache_entry (const char *filename)
605 {
606         rrd_queue_t *queue_entry;
607
608         queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
609         if (queue_entry == NULL)
610                 return (-1);
611
612         queue_entry->filename = strdup (filename);
613         if (queue_entry->filename == NULL)
614         {
615                 free (queue_entry);
616                 return (-1);
617         }
618
619         queue_entry->next = NULL;
620
621         pthread_mutex_lock (&queue_lock);
622         if (queue_tail == NULL)
623                 queue_head = queue_entry;
624         else
625                 queue_tail->next = queue_entry;
626         queue_tail = queue_entry;
627         pthread_cond_signal (&queue_cond);
628         pthread_mutex_unlock (&queue_lock);
629
630         DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
631
632         return (0);
633 } /* int rrd_queue_cache_entry */
634
635 static void rrd_cache_flush (int timeout)
636 {
637         rrd_cache_t *rc;
638         time_t       now;
639
640         char **keys = NULL;
641         int    keys_num = 0;
642
643         char *key;
644         avl_iterator_t *iter;
645         int i;
646
647         DEBUG ("Flushing cache, timeout = %i", timeout);
648
649         now = time (NULL);
650
651         /* Build a list of entries to be flushed */
652         iter = avl_get_iterator (cache);
653         while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
654         {
655                 DEBUG ("key = %s; age = %i;", key, now - rc->first_value);
656
657                 if (rc->flags == FLAG_QUEUED)
658                         continue;
659                 else if ((now - rc->first_value) < timeout)
660                         continue;
661                 else if (rc->values_num > 0)
662                 {
663                         if (rrd_queue_cache_entry (key) == 0)
664                                 rc->flags = FLAG_QUEUED;
665                 }
666                 else /* ancient and no values -> waste of memory */
667                 {
668                         keys = (char **) realloc ((void *) keys,
669                                         (keys_num + 1) * sizeof (char *));
670                         if (keys == NULL)
671                         {
672                                 char errbuf[1024];
673                                 ERROR ("rrdtool plugin: "
674                                                 "realloc failed: %s",
675                                                 sstrerror (errno, errbuf,
676                                                         sizeof (errbuf)));
677                                 avl_iterator_destroy (iter);
678                                 return;
679                         }
680                         keys[keys_num] = key;
681                         keys_num++;
682                 }
683         } /* while (avl_iterator_next) */
684         avl_iterator_destroy (iter);
685         
686         for (i = 0; i < keys_num; i++)
687         {
688                 if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
689                 {
690                         DEBUG ("avl_remove (%s) failed.", keys[i]);
691                         continue;
692                 }
693
694                 assert (rc->values == NULL);
695                 assert (rc->values_num == 0);
696
697                 sfree (rc);
698                 sfree (key);
699                 keys[i] = NULL;
700         } /* for (i = 0..keys_num) */
701
702         free (keys);
703         DEBUG ("Flushed %i value(s)", keys_num);
704
705         cache_flush_last = now;
706 } /* void rrd_cache_flush */
707
708 static int rrd_cache_insert (const char *filename,
709                 const char *value, time_t value_time)
710 {
711         rrd_cache_t *rc = NULL;
712         int new_rc = 0;
713         char **values_new;
714
715         pthread_mutex_lock (&cache_lock);
716
717         avl_get (cache, filename, (void *) &rc);
718
719         if (rc == NULL)
720         {
721                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
722                 if (rc == NULL)
723                         return (-1);
724                 rc->values_num = 0;
725                 rc->values = NULL;
726                 rc->first_value = 0;
727                 rc->last_value = 0;
728                 rc->flags = FLAG_NONE;
729                 new_rc = 1;
730         }
731
732         if (rc->last_value >= value_time)
733         {
734                 pthread_mutex_unlock (&cache_lock);
735                 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
736                                 (unsigned int) rc->last_value,
737                                 (unsigned int) value_time);
738                 return (-1);
739         }
740
741         values_new = (char **) realloc ((void *) rc->values,
742                         (rc->values_num + 1) * sizeof (char *));
743         if (values_new == NULL)
744         {
745                 char errbuf[1024];
746                 void *cache_key = NULL;
747
748                 sstrerror (errno, errbuf, sizeof (errbuf));
749
750                 avl_remove (cache, filename, &cache_key, NULL);
751                 pthread_mutex_unlock (&cache_lock);
752
753                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
754
755                 sfree (cache_key);
756                 sfree (rc->values);
757                 sfree (rc);
758                 return (-1);
759         }
760         rc->values = values_new;
761
762         rc->values[rc->values_num] = strdup (value);
763         if (rc->values[rc->values_num] != NULL)
764                 rc->values_num++;
765
766         if (rc->values_num == 1)
767                 rc->first_value = value_time;
768         rc->last_value = value_time;
769
770         /* Insert if this is the first value */
771         if (new_rc == 1)
772         {
773                 void *cache_key = strdup (filename);
774
775                 if (cache_key == NULL)
776                 {
777                         char errbuf[1024];
778                         sstrerror (errno, errbuf, sizeof (errbuf));
779
780                         pthread_mutex_unlock (&cache_lock);
781
782                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
783
784                         sfree (rc->values[0]);
785                         sfree (rc->values);
786                         sfree (rc);
787                         return (-1);
788                 }
789
790                 avl_insert (cache, cache_key, rc);
791         }
792
793         DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value,
794                         (unsigned int) value_time, (void *) rc);
795
796         if ((rc->last_value - rc->first_value) >= cache_timeout)
797         {
798                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
799                  * the same time, ALWAYS lock `cache_lock' first! */
800                 if (rc->flags != FLAG_QUEUED)
801                 {
802                         if (rrd_queue_cache_entry (filename) == 0)
803                                 rc->flags = FLAG_QUEUED;
804                 }
805                 else
806                 {
807                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
808                 }
809         }
810
811         if ((cache_timeout > 0) &&
812                         ((time (NULL) - cache_flush_last) > cache_flush_timeout))
813                 rrd_cache_flush (cache_flush_timeout);
814
815
816         pthread_mutex_unlock (&cache_lock);
817
818         return (0);
819 } /* int rrd_cache_insert */
820
821 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
822 {
823         struct stat  statbuf;
824         char         filename[512];
825         char         values[512];
826         int          status;
827
828         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
829                 return (-1);
830
831         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
832                 return (-1);
833
834         if (stat (filename, &statbuf) == -1)
835         {
836                 if (errno == ENOENT)
837                 {
838                         if (rrd_create_file (filename, ds, vl))
839                                 return (-1);
840                 }
841                 else
842                 {
843                         char errbuf[1024];
844                         ERROR ("stat(%s) failed: %s", filename,
845                                         sstrerror (errno, errbuf,
846                                                 sizeof (errbuf)));
847                         return (-1);
848                 }
849         }
850         else if (!S_ISREG (statbuf.st_mode))
851         {
852                 ERROR ("stat(%s): Not a regular file!",
853                                 filename);
854                 return (-1);
855         }
856
857         status = rrd_cache_insert (filename, values, vl->time);
858
859         return (status);
860 } /* int rrd_write */
861
862 static int rrd_config (const char *key, const char *value)
863 {
864         if (strcasecmp ("CacheTimeout", key) == 0)
865         {
866                 int tmp = atoi (value);
867                 if (tmp < 0)
868                 {
869                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
870                                         "be greater than 0.\n");
871                         return (1);
872                 }
873                 cache_timeout = tmp;
874         }
875         else if (strcasecmp ("CacheFlush", key) == 0)
876         {
877                 int tmp = atoi (value);
878                 if (tmp < 0)
879                 {
880                         fprintf (stderr, "rrdtool: `CacheFlush' must "
881                                         "be greater than 0.\n");
882                         return (1);
883                 }
884                 cache_flush_timeout = tmp;
885         }
886         else if (strcasecmp ("DataDir", key) == 0)
887         {
888                 if (datadir != NULL)
889                         free (datadir);
890                 datadir = strdup (value);
891                 if (datadir != NULL)
892                 {
893                         int len = strlen (datadir);
894                         while ((len > 0) && (datadir[len - 1] == '/'))
895                         {
896                                 len--;
897                                 datadir[len] = '\0';
898                         }
899                         if (len <= 0)
900                         {
901                                 free (datadir);
902                                 datadir = NULL;
903                         }
904                 }
905         }
906         else if (strcasecmp ("StepSize", key) == 0)
907         {
908                 stepsize = atoi (value);
909                 if (stepsize < 0)
910                         stepsize = 0;
911         }
912         else if (strcasecmp ("HeartBeat", key) == 0)
913         {
914                 heartbeat = atoi (value);
915                 if (heartbeat < 0)
916                         heartbeat = 0;
917         }
918         else if (strcasecmp ("RRARows", key) == 0)
919         {
920                 int tmp = atoi (value);
921                 if (tmp <= 0)
922                 {
923                         fprintf (stderr, "rrdtool: `RRARows' must "
924                                         "be greater than 0.\n");
925                         return (1);
926                 }
927                 rrarows = tmp;
928         }
929         else if (strcasecmp ("RRATimespan", key) == 0)
930         {
931                 char *saveptr = NULL;
932                 char *dummy;
933                 char *ptr;
934                 char *value_copy;
935                 int *tmp_alloc;
936
937                 value_copy = strdup (value);
938                 if (value_copy == NULL)
939                         return (1);
940
941                 dummy = value_copy;
942                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
943                 {
944                         dummy = NULL;
945                         
946                         tmp_alloc = realloc (rra_timespans_custom,
947                                         sizeof (int) * (rra_timespans_custom_num + 1));
948                         if (tmp_alloc == NULL)
949                         {
950                                 fprintf (stderr, "rrdtool: realloc failed.\n");
951                                 free (value_copy);
952                                 return (1);
953                         }
954                         rra_timespans_custom = tmp_alloc;
955                         rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
956                         if (rra_timespans_custom[rra_timespans_custom_num] != 0)
957                                 rra_timespans_custom_num++;
958                 } /* while (strtok_r) */
959                 free (value_copy);
960         }
961         else if (strcasecmp ("XFF", key) == 0)
962         {
963                 double tmp = atof (value);
964                 if ((tmp < 0.0) || (tmp >= 1.0))
965                 {
966                         fprintf (stderr, "rrdtool: `XFF' must "
967                                         "be in the range 0 to 1 (exclusive).");
968                         return (1);
969                 }
970                 xff = tmp;
971         }
972         else
973         {
974                 return (-1);
975         }
976         return (0);
977 } /* int rrd_config */
978
979 static int rrd_shutdown (void)
980 {
981         pthread_mutex_lock (&cache_lock);
982         rrd_cache_flush (-1);
983         pthread_mutex_unlock (&cache_lock);
984
985         pthread_mutex_lock (&queue_lock);
986         do_shutdown = 1;
987         pthread_cond_signal (&queue_cond);
988         pthread_mutex_unlock (&queue_lock);
989
990         return (0);
991 } /* int rrd_shutdown */
992
993 static int rrd_init (void)
994 {
995         int status;
996
997         if (stepsize < 0)
998                 stepsize = 0;
999         if (heartbeat <= 0)
1000         {
1001                 if (stepsize > 0)
1002                         heartbeat = 2 * stepsize;
1003                 else
1004                         heartbeat = 0;
1005         }
1006
1007         if ((heartbeat > 0) && (heartbeat < interval_g))
1008                 WARNING ("rrdtool plugin: Your `heartbeat' is "
1009                                 "smaller than your `interval'. This will "
1010                                 "likely cause problems.");
1011         else if ((stepsize > 0) && (stepsize < interval_g))
1012                 WARNING ("rrdtool plugin: Your `stepsize' is "
1013                                 "smaller than your `interval'. This will "
1014                                 "create needlessly big RRD-files.");
1015
1016         /* Set the cache up */
1017         pthread_mutex_lock (&cache_lock);
1018
1019         cache = avl_create ((int (*) (const void *, const void *)) strcmp);
1020         if (cache == NULL)
1021         {
1022                 ERROR ("rrdtool plugin: avl_create failed.");
1023                 return (-1);
1024         }
1025
1026         cache_flush_last = time (NULL);
1027         if (cache_timeout < 2)
1028         {
1029                 cache_timeout = 0;
1030                 cache_flush_timeout = 0;
1031         }
1032         else if (cache_flush_timeout < cache_timeout)
1033                 cache_flush_timeout = 10 * cache_timeout;
1034
1035         pthread_mutex_unlock (&cache_lock);
1036
1037         status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
1038         if (status != 0)
1039         {
1040                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1041                 return (-1);
1042         }
1043
1044         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
1045                         " heartbeat = %i; rrarows = %i; xff = %lf;",
1046                         (datadir == NULL) ? "(null)" : datadir,
1047                         stepsize, heartbeat, rrarows, xff);
1048
1049         return (0);
1050 } /* int rrd_init */
1051
1052 void module_register (void)
1053 {
1054         plugin_register_config ("rrdtool", rrd_config,
1055                         config_keys, config_keys_num);
1056         plugin_register_init ("rrdtool", rrd_init);
1057         plugin_register_write ("rrdtool", rrd_write);
1058         plugin_register_shutdown ("rrdtool", rrd_shutdown);
1059 }