Merge branch 'collectd-4.0'
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006,2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
26
27 #if HAVE_PTHREAD_H
28 # include <pthread.h>
29 #endif
30
31 /*
32  * Private types
33  */
34 struct rrd_cache_s
35 {
36         int    values_num;
37         char **values;
38         time_t first_value;
39         time_t last_value;
40         enum
41         {
42                 FLAG_NONE   = 0x00,
43                 FLAG_QUEUED = 0x01
44         } flags;
45 };
46 typedef struct rrd_cache_s rrd_cache_t;
47
48 struct rrd_queue_s
49 {
50         char *filename;
51         struct rrd_queue_s *next;
52 };
53 typedef struct rrd_queue_s rrd_queue_t;
54
55 /*
56  * Private variables
57  */
58 static int rra_timespans[] =
59 {
60         3600,
61         86400,
62         604800,
63         2678400,
64         31622400
65 };
66 static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
67
68 static int *rra_timespans_custom = NULL;
69 static int rra_timespans_custom_num = 0;
70
71 static char *rra_types[] =
72 {
73         "AVERAGE",
74         "MIN",
75         "MAX"
76 };
77 static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
78
79 static const char *config_keys[] =
80 {
81         "CacheTimeout",
82         "CacheFlush",
83         "DataDir",
84         "StepSize",
85         "HeartBeat",
86         "RRARows",
87         "RRATimespan",
88         "XFF"
89 };
90 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
91
92 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
93  * is zero a default, depending on the `interval' member of the value list is
94  * being used. */
95 static char   *datadir   = NULL;
96 static int     stepsize  = 0;
97 static int     heartbeat = 0;
98 static int     rrarows   = 1200;
99 static double  xff       = 0.1;
100
101 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
102  * ALWAYS lock `cache_lock' first! */
103 static int         cache_timeout = 0;
104 static int         cache_flush_timeout = 0;
105 static time_t      cache_flush_last;
106 static avl_tree_t *cache = NULL;
107 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
108
109 static rrd_queue_t    *queue_head = NULL;
110 static rrd_queue_t    *queue_tail = NULL;
111 static pthread_t       queue_thread = 0;
112 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
113 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
114
115 static int do_shutdown = 0;
116
117 /* * * * * * * * * *
118  * WARNING:  Magic *
119  * * * * * * * * * */
120
121 static void rra_free (int rra_num, char **rra_def)
122 {
123         int i;
124
125         for (i = 0; i < rra_num; i++)
126         {
127                 sfree (rra_def[i]);
128         }
129         sfree (rra_def);
130 } /* void rra_free */
131
132 static int rra_get (char ***ret, const value_list_t *vl)
133 {
134         char **rra_def;
135         int rra_num;
136
137         int *rts;
138         int  rts_num;
139
140         int rra_max;
141
142         int span;
143
144         int cdp_num;
145         int cdp_len;
146         int i, j;
147
148         char buffer[64];
149
150         /* The stepsize we use here: If it is user-set, use it. If not, use the
151          * interval of the value-list. */
152         int ss;
153
154         if (rrarows <= 0)
155         {
156                 *ret = NULL;
157                 return (-1);
158         }
159
160         ss = (stepsize > 0) ? stepsize : vl->interval;
161         if (ss <= 0)
162         {
163                 *ret = NULL;
164                 return (-1);
165         }
166
167         /* Use the configured timespans or fall back to the built-in defaults */
168         if (rra_timespans_custom_num != 0)
169         {
170                 rts = rra_timespans_custom;
171                 rts_num = rra_timespans_custom_num;
172         }
173         else
174         {
175                 rts = rra_timespans;
176                 rts_num = rra_timespans_num;
177         }
178
179         rra_max = rts_num * rra_types_num;
180
181         if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
182                 return (-1);
183         memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
184         rra_num = 0;
185
186         cdp_len = 0;
187         for (i = 0; i < rts_num; i++)
188         {
189                 span = rts[i];
190
191                 if ((span / ss) < rrarows)
192                         continue;
193
194                 if (cdp_len == 0)
195                         cdp_len = 1;
196                 else
197                         cdp_len = (int) floor (((double) span)
198                                         / ((double) (rrarows * ss)));
199
200                 cdp_num = (int) ceil (((double) span)
201                                 / ((double) (cdp_len * ss)));
202
203                 for (j = 0; j < rra_types_num; j++)
204                 {
205                         if (rra_num >= rra_max)
206                                 break;
207
208                         if (snprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
209                                                 rra_types[j], xff,
210                                                 cdp_len, cdp_num) >= sizeof (buffer))
211                         {
212                                 ERROR ("rra_get: Buffer would have been truncated.");
213                                 continue;
214                         }
215
216                         rra_def[rra_num++] = sstrdup (buffer);
217                 }
218         }
219
220 #if COLLECT_DEBUG
221         DEBUG ("rra_num = %i", rra_num);
222         for (i = 0; i < rra_num; i++)
223                 DEBUG ("  %s", rra_def[i]);
224 #endif
225
226         *ret = rra_def;
227         return (rra_num);
228 } /* int rra_get */
229
230 static void ds_free (int ds_num, char **ds_def)
231 {
232         int i;
233
234         for (i = 0; i < ds_num; i++)
235                 if (ds_def[i] != NULL)
236                         free (ds_def[i]);
237         free (ds_def);
238 }
239
240 static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
241 {
242         char **ds_def;
243         int ds_num;
244
245         char min[32];
246         char max[32];
247         char buffer[128];
248
249         DEBUG ("ds->ds_num = %i", ds->ds_num);
250
251         ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
252         if (ds_def == NULL)
253         {
254                 char errbuf[1024];
255                 ERROR ("rrdtool plugin: malloc failed: %s",
256                                 sstrerror (errno, errbuf, sizeof (errbuf)));
257                 return (-1);
258         }
259         memset (ds_def, '\0', ds->ds_num * sizeof (char *));
260
261         for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
262         {
263                 data_source_t *d = ds->ds + ds_num;
264                 char *type;
265                 int status;
266
267                 ds_def[ds_num] = NULL;
268
269                 if (d->type == DS_TYPE_COUNTER)
270                         type = "COUNTER";
271                 else if (d->type == DS_TYPE_GAUGE)
272                         type = "GAUGE";
273                 else
274                 {
275                         ERROR ("rrdtool plugin: Unknown DS type: %i",
276                                         d->type);
277                         break;
278                 }
279
280                 if (isnan (d->min))
281                 {
282                         strcpy (min, "U");
283                 }
284                 else
285                 {
286                         snprintf (min, sizeof (min), "%lf", d->min);
287                         min[sizeof (min) - 1] = '\0';
288                 }
289
290                 if (isnan (d->max))
291                 {
292                         strcpy (max, "U");
293                 }
294                 else
295                 {
296                         snprintf (max, sizeof (max), "%lf", d->max);
297                         max[sizeof (max) - 1] = '\0';
298                 }
299
300                 status = snprintf (buffer, sizeof (buffer),
301                                 "DS:%s:%s:%i:%s:%s",
302                                 d->name, type,
303                                 (heartbeat > 0) ? heartbeat : (2 * vl->interval),
304                                 min, max);
305                 if ((status < 1) || (status >= sizeof (buffer)))
306                         break;
307
308                 ds_def[ds_num] = sstrdup (buffer);
309         } /* for ds_num = 0 .. ds->ds_num */
310
311 #if COLLECT_DEBUG
312 {
313         int i;
314         DEBUG ("ds_num = %i", ds_num);
315         for (i = 0; i < ds_num; i++)
316                 DEBUG ("  %s", ds_def[i]);
317 }
318 #endif
319
320         if (ds_num != ds->ds_num)
321         {
322                 ds_free (ds_num, ds_def);
323                 return (-1);
324         }
325
326         *ret = ds_def;
327         return (ds_num);
328 }
329
330 static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
331 {
332         char **argv;
333         int argc;
334         char **rra_def;
335         int rra_num;
336         char **ds_def;
337         int ds_num;
338         int i, j;
339         char stepsize_str[16];
340         int status = 0;
341
342         if (check_create_dir (filename))
343                 return (-1);
344
345         if ((rra_num = rra_get (&rra_def, vl)) < 1)
346         {
347                 ERROR ("rrd_create_file failed: Could not calculate RRAs");
348                 return (-1);
349         }
350
351         if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
352         {
353                 ERROR ("rrd_create_file failed: Could not calculate DSes");
354                 return (-1);
355         }
356
357         argc = ds_num + rra_num + 4;
358
359         if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
360         {
361                 char errbuf[1024];
362                 ERROR ("rrd_create failed: %s",
363                                 sstrerror (errno, errbuf, sizeof (errbuf)));
364                 return (-1);
365         }
366
367         status = snprintf (stepsize_str, sizeof (stepsize_str),
368                         "%i", (stepsize > 0) ? stepsize : vl->interval);
369         if ((status < 1) || (status >= sizeof (stepsize_str)))
370         {
371                 ERROR ("rrdtool plugin: snprintf failed.");
372                 free (argv);
373                 ds_free (ds_num, ds_def);
374                 rra_free (rra_num, rra_def);
375                 return (-1);
376         }
377
378         argv[0] = "create";
379         argv[1] = filename;
380         argv[2] = "-s";
381         argv[3] = stepsize_str;
382
383         j = 4;
384         for (i = 0; i < ds_num; i++)
385                 argv[j++] = ds_def[i];
386         for (i = 0; i < rra_num; i++)
387                 argv[j++] = rra_def[i];
388         argv[j] = NULL;
389
390         optind = 0; /* bug in librrd? */
391         rrd_clear_error ();
392         if (rrd_create (argc, argv) == -1)
393         {
394                 ERROR ("rrd_create failed: %s: %s", filename, rrd_get_error ());
395                 status = -1;
396         }
397
398         free (argv);
399         ds_free (ds_num, ds_def);
400         rra_free (rra_num, rra_def);
401
402         return (status);
403 }
404
405 static int value_list_to_string (char *buffer, int buffer_len,
406                 const data_set_t *ds, const value_list_t *vl)
407 {
408         int offset;
409         int status;
410         int i;
411
412         memset (buffer, '\0', buffer_len);
413
414         status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
415         if ((status < 1) || (status >= buffer_len))
416                 return (-1);
417         offset = status;
418
419         for (i = 0; i < ds->ds_num; i++)
420         {
421                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
422                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
423                         return (-1);
424
425                 if (ds->ds[i].type == DS_TYPE_COUNTER)
426                         status = snprintf (buffer + offset, buffer_len - offset,
427                                         ":%llu", vl->values[i].counter);
428                 else
429                         status = snprintf (buffer + offset, buffer_len - offset,
430                                         ":%lf", vl->values[i].gauge);
431
432                 if ((status < 1) || (status >= (buffer_len - offset)))
433                         return (-1);
434
435                 offset += status;
436         } /* for ds->ds_num */
437
438         return (0);
439 } /* int value_list_to_string */
440
441 static int value_list_to_filename (char *buffer, int buffer_len,
442                 const data_set_t *ds, const value_list_t *vl)
443 {
444         int offset = 0;
445         int status;
446
447         if (datadir != NULL)
448         {
449                 status = snprintf (buffer + offset, buffer_len - offset,
450                                 "%s/", datadir);
451                 if ((status < 1) || (status >= buffer_len - offset))
452                         return (-1);
453                 offset += status;
454         }
455
456         status = snprintf (buffer + offset, buffer_len - offset,
457                         "%s/", vl->host);
458         if ((status < 1) || (status >= buffer_len - offset))
459                 return (-1);
460         offset += status;
461
462         if (strlen (vl->plugin_instance) > 0)
463                 status = snprintf (buffer + offset, buffer_len - offset,
464                                 "%s-%s/", vl->plugin, vl->plugin_instance);
465         else
466                 status = snprintf (buffer + offset, buffer_len - offset,
467                                 "%s/", vl->plugin);
468         if ((status < 1) || (status >= buffer_len - offset))
469                 return (-1);
470         offset += status;
471
472         if (strlen (vl->type_instance) > 0)
473                 status = snprintf (buffer + offset, buffer_len - offset,
474                                 "%s-%s.rrd", ds->type, vl->type_instance);
475         else
476                 status = snprintf (buffer + offset, buffer_len - offset,
477                                 "%s.rrd", ds->type);
478         if ((status < 1) || (status >= buffer_len - offset))
479                 return (-1);
480         offset += status;
481
482         return (0);
483 } /* int value_list_to_filename */
484
485 static int rrd_write_to_file (char *filename, char **values, int values_num)
486 {
487         char **argv;
488         int argc;
489         int status;
490
491         if (values_num < 1)
492                 return (0);
493
494         argc = values_num + 2;
495         argv = (char **) malloc ((argc + 1) * sizeof (char *));
496         if (argv == NULL)
497                 return (-1);
498
499         argv[0] = "update";
500         argv[1] = filename;
501         memcpy (argv + 2, values, values_num * sizeof (char *));
502         argv[argc] = NULL;
503
504         DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv);
505
506         optind = 0; /* bug in librrd? */
507         rrd_clear_error ();
508         status = rrd_update (argc, argv);
509         if (status != 0)
510         {
511                 WARNING ("rrd_update failed: %s: %s",
512                                 filename, rrd_get_error ());
513                 status = -1;
514         }
515
516         sfree (argv);
517
518         return (status);
519 } /* int rrd_write_cache_entry */
520
521 static void *rrd_queue_thread (void *data)
522 {
523         while (42)
524         {
525                 rrd_queue_t *queue_entry;
526                 rrd_cache_t *cache_entry;
527                 char **values;
528                 int    values_num;
529                 int    i;
530
531                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
532                  * the same time, ALWAYS lock `cache_lock' first! */
533
534                 /* wait until an entry is available */
535                 pthread_mutex_lock (&queue_lock);
536                 while ((queue_head == NULL) && (do_shutdown == 0))
537                         pthread_cond_wait (&queue_cond, &queue_lock);
538
539                 /* We're in the shutdown phase */
540                 if (queue_head == NULL)
541                 {
542                         pthread_mutex_unlock (&queue_lock);
543                         break;
544                 }
545
546                 /* Dequeue the first entry */
547                 queue_entry = queue_head;
548                 if (queue_head == queue_tail)
549                         queue_head = queue_tail = NULL;
550                 else
551                         queue_head = queue_head->next;
552
553                 /* Unlock the queue again */
554                 pthread_mutex_unlock (&queue_lock);
555
556                 /* We now need the cache lock so the entry isn't updated while
557                  * we make a copy of it's values */
558                 pthread_mutex_lock (&cache_lock);
559
560                 avl_get (cache, queue_entry->filename, (void *) &cache_entry);
561
562                 values = cache_entry->values;
563                 values_num = cache_entry->values_num;
564
565                 cache_entry->values = NULL;
566                 cache_entry->values_num = 0;
567                 cache_entry->flags = FLAG_NONE;
568
569                 pthread_mutex_unlock (&cache_lock);
570
571                 /* Write the values to the RRD-file */
572                 rrd_write_to_file (queue_entry->filename, values, values_num);
573
574                 for (i = 0; i < values_num; i++)
575                 {
576                         sfree (values[i]);
577                 }
578                 sfree (values);
579                 sfree (queue_entry->filename);
580                 sfree (queue_entry);
581         } /* while (42) */
582
583         pthread_mutex_lock (&cache_lock);
584         avl_destroy (cache);
585         cache = NULL;
586         pthread_mutex_unlock (&cache_lock);
587
588         pthread_exit ((void *) 0);
589         return ((void *) 0);
590 } /* void *rrd_queue_thread */
591
592 static int rrd_queue_cache_entry (const char *filename)
593 {
594         rrd_queue_t *queue_entry;
595
596         queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
597         if (queue_entry == NULL)
598                 return (-1);
599
600         queue_entry->filename = strdup (filename);
601         if (queue_entry->filename == NULL)
602         {
603                 free (queue_entry);
604                 return (-1);
605         }
606
607         queue_entry->next = NULL;
608
609         pthread_mutex_lock (&queue_lock);
610         if (queue_tail == NULL)
611                 queue_head = queue_entry;
612         else
613                 queue_tail->next = queue_entry;
614         queue_tail = queue_entry;
615         pthread_cond_signal (&queue_cond);
616         pthread_mutex_unlock (&queue_lock);
617
618         DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
619
620         return (0);
621 } /* int rrd_queue_cache_entry */
622
623 static void rrd_cache_flush (int timeout)
624 {
625         rrd_cache_t *rc;
626         time_t       now;
627
628         char **keys = NULL;
629         int    keys_num = 0;
630
631         char *key;
632         avl_iterator_t *iter;
633         int i;
634
635         DEBUG ("Flushing cache, timeout = %i", timeout);
636
637         now = time (NULL);
638
639         /* Build a list of entries to be flushed */
640         iter = avl_get_iterator (cache);
641         while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
642         {
643                 DEBUG ("key = %s; age = %i;", key, now - rc->first_value);
644
645                 if (rc->flags == FLAG_QUEUED)
646                         continue;
647                 else if ((now - rc->first_value) < timeout)
648                         continue;
649                 else if (rc->values_num > 0)
650                 {
651                         if (rrd_queue_cache_entry (key) == 0)
652                                 rc->flags = FLAG_QUEUED;
653                 }
654                 else /* ancient and no values -> waste of memory */
655                 {
656                         keys = (char **) realloc ((void *) keys,
657                                         (keys_num + 1) * sizeof (char *));
658                         if (keys == NULL)
659                         {
660                                 char errbuf[1024];
661                                 ERROR ("rrdtool plugin: "
662                                                 "realloc failed: %s",
663                                                 sstrerror (errno, errbuf,
664                                                         sizeof (errbuf)));
665                                 avl_iterator_destroy (iter);
666                                 return;
667                         }
668                         keys[keys_num] = key;
669                         keys_num++;
670                 }
671         } /* while (avl_iterator_next) */
672         avl_iterator_destroy (iter);
673         
674         for (i = 0; i < keys_num; i++)
675         {
676                 if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
677                 {
678                         DEBUG ("avl_remove (%s) failed.", keys[i]);
679                         continue;
680                 }
681
682                 assert (rc->values == NULL);
683                 assert (rc->values_num == 0);
684
685                 sfree (rc);
686                 sfree (key);
687                 keys[i] = NULL;
688         } /* for (i = 0..keys_num) */
689
690         free (keys);
691         DEBUG ("Flushed %i value(s)", keys_num);
692
693         cache_flush_last = now;
694 } /* void rrd_cache_flush */
695
696 static int rrd_cache_insert (const char *filename,
697                 const char *value, time_t value_time)
698 {
699         rrd_cache_t *rc = NULL;
700         int new_rc = 0;
701         char **values_new;
702
703         pthread_mutex_lock (&cache_lock);
704
705         avl_get (cache, filename, (void *) &rc);
706
707         if (rc == NULL)
708         {
709                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
710                 if (rc == NULL)
711                         return (-1);
712                 rc->values_num = 0;
713                 rc->values = NULL;
714                 rc->first_value = 0;
715                 rc->last_value = 0;
716                 rc->flags = FLAG_NONE;
717                 new_rc = 1;
718         }
719
720         if (rc->last_value >= value_time)
721         {
722                 pthread_mutex_unlock (&cache_lock);
723                 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
724                                 (unsigned int) rc->last_value,
725                                 (unsigned int) value_time);
726                 return (-1);
727         }
728
729         values_new = (char **) realloc ((void *) rc->values,
730                         (rc->values_num + 1) * sizeof (char *));
731         if (values_new == NULL)
732         {
733                 char errbuf[1024];
734                 void *cache_key = NULL;
735
736                 sstrerror (errno, errbuf, sizeof (errbuf));
737
738                 avl_remove (cache, filename, &cache_key, NULL);
739                 pthread_mutex_unlock (&cache_lock);
740
741                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
742
743                 sfree (cache_key);
744                 sfree (rc->values);
745                 sfree (rc);
746                 return (-1);
747         }
748         rc->values = values_new;
749
750         rc->values[rc->values_num] = strdup (value);
751         if (rc->values[rc->values_num] != NULL)
752                 rc->values_num++;
753
754         if (rc->values_num == 1)
755                 rc->first_value = value_time;
756         rc->last_value = value_time;
757
758         /* Insert if this is the first value */
759         if (new_rc == 1)
760         {
761                 void *cache_key = strdup (filename);
762
763                 if (cache_key == NULL)
764                 {
765                         char errbuf[1024];
766                         sstrerror (errno, errbuf, sizeof (errbuf));
767
768                         pthread_mutex_unlock (&cache_lock);
769
770                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
771
772                         sfree (rc->values[0]);
773                         sfree (rc->values);
774                         sfree (rc);
775                         return (-1);
776                 }
777
778                 avl_insert (cache, cache_key, rc);
779         }
780
781         DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value,
782                         (unsigned int) value_time, (void *) rc);
783
784         if ((rc->last_value - rc->first_value) >= cache_timeout)
785         {
786                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
787                  * the same time, ALWAYS lock `cache_lock' first! */
788                 if (rc->flags != FLAG_QUEUED)
789                 {
790                         if (rrd_queue_cache_entry (filename) == 0)
791                                 rc->flags = FLAG_QUEUED;
792                 }
793                 else
794                 {
795                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
796                 }
797         }
798
799         if ((cache_timeout > 0) &&
800                         ((time (NULL) - cache_flush_last) > cache_flush_timeout))
801                 rrd_cache_flush (cache_flush_timeout);
802
803
804         pthread_mutex_unlock (&cache_lock);
805
806         return (0);
807 } /* int rrd_cache_insert */
808
809 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
810 {
811         struct stat  statbuf;
812         char         filename[512];
813         char         values[512];
814         int          status;
815
816         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
817                 return (-1);
818
819         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
820                 return (-1);
821
822         if (stat (filename, &statbuf) == -1)
823         {
824                 if (errno == ENOENT)
825                 {
826                         if (rrd_create_file (filename, ds, vl))
827                                 return (-1);
828                 }
829                 else
830                 {
831                         char errbuf[1024];
832                         ERROR ("stat(%s) failed: %s", filename,
833                                         sstrerror (errno, errbuf,
834                                                 sizeof (errbuf)));
835                         return (-1);
836                 }
837         }
838         else if (!S_ISREG (statbuf.st_mode))
839         {
840                 ERROR ("stat(%s): Not a regular file!",
841                                 filename);
842                 return (-1);
843         }
844
845         status = rrd_cache_insert (filename, values, vl->time);
846
847         return (status);
848 } /* int rrd_write */
849
850 static int rrd_config (const char *key, const char *value)
851 {
852         if (strcasecmp ("CacheTimeout", key) == 0)
853         {
854                 int tmp = atoi (value);
855                 if (tmp < 0)
856                 {
857                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
858                                         "be greater than 0.\n");
859                         return (1);
860                 }
861                 cache_timeout = tmp;
862         }
863         else if (strcasecmp ("CacheFlush", key) == 0)
864         {
865                 int tmp = atoi (value);
866                 if (tmp < 0)
867                 {
868                         fprintf (stderr, "rrdtool: `CacheFlush' must "
869                                         "be greater than 0.\n");
870                         return (1);
871                 }
872                 cache_flush_timeout = tmp;
873         }
874         else if (strcasecmp ("DataDir", key) == 0)
875         {
876                 if (datadir != NULL)
877                         free (datadir);
878                 datadir = strdup (value);
879                 if (datadir != NULL)
880                 {
881                         int len = strlen (datadir);
882                         while ((len > 0) && (datadir[len - 1] == '/'))
883                         {
884                                 len--;
885                                 datadir[len] = '\0';
886                         }
887                         if (len <= 0)
888                         {
889                                 free (datadir);
890                                 datadir = NULL;
891                         }
892                 }
893         }
894         else if (strcasecmp ("StepSize", key) == 0)
895         {
896                 stepsize = atoi (value);
897                 if (stepsize < 0)
898                         stepsize = 0;
899         }
900         else if (strcasecmp ("HeartBeat", key) == 0)
901         {
902                 heartbeat = atoi (value);
903                 if (heartbeat < 0)
904                         heartbeat = 0;
905         }
906         else if (strcasecmp ("RRARows", key) == 0)
907         {
908                 int tmp = atoi (value);
909                 if (tmp <= 0)
910                 {
911                         fprintf (stderr, "rrdtool: `RRARows' must "
912                                         "be greater than 0.\n");
913                         return (1);
914                 }
915                 rrarows = tmp;
916         }
917         else if (strcasecmp ("RRATimespan", key) == 0)
918         {
919                 char *saveptr = NULL;
920                 char *dummy;
921                 char *ptr;
922                 char *value_copy;
923                 int *tmp_alloc;
924
925                 value_copy = strdup (value);
926                 if (value_copy == NULL)
927                         return (1);
928
929                 dummy = value_copy;
930                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
931                 {
932                         dummy = NULL;
933                         
934                         tmp_alloc = realloc (rra_timespans_custom,
935                                         sizeof (int) * (rra_timespans_custom_num + 1));
936                         if (tmp_alloc == NULL)
937                         {
938                                 fprintf (stderr, "rrdtool: realloc failed.\n");
939                                 free (value_copy);
940                                 return (1);
941                         }
942                         rra_timespans_custom = tmp_alloc;
943                         rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
944                         if (rra_timespans_custom[rra_timespans_custom_num] != 0)
945                                 rra_timespans_custom_num++;
946                 } /* while (strtok_r) */
947                 free (value_copy);
948         }
949         else if (strcasecmp ("XFF", key) == 0)
950         {
951                 double tmp = atof (value);
952                 if ((tmp < 0.0) || (tmp >= 1.0))
953                 {
954                         fprintf (stderr, "rrdtool: `XFF' must "
955                                         "be in the range 0 to 1 (exclusive).");
956                         return (1);
957                 }
958                 xff = tmp;
959         }
960         else
961         {
962                 return (-1);
963         }
964         return (0);
965 } /* int rrd_config */
966
967 static int rrd_shutdown (void)
968 {
969         pthread_mutex_lock (&cache_lock);
970         rrd_cache_flush (-1);
971         pthread_mutex_unlock (&cache_lock);
972
973         pthread_mutex_lock (&queue_lock);
974         do_shutdown = 1;
975         pthread_cond_signal (&queue_cond);
976         pthread_mutex_unlock (&queue_lock);
977
978         return (0);
979 } /* int rrd_shutdown */
980
981 static int rrd_init (void)
982 {
983         int status;
984
985         if (stepsize < 0)
986                 stepsize = 0;
987         if (heartbeat <= 0)
988         {
989                 if (stepsize > 0)
990                         heartbeat = 2 * stepsize;
991                 else
992                         heartbeat = 0;
993         }
994
995         if ((heartbeat > 0) && (heartbeat < interval_g))
996                 WARNING ("rrdtool plugin: Your `heartbeat' is "
997                                 "smaller than your `interval'. This will "
998                                 "likely cause problems.");
999         else if ((stepsize > 0) && (stepsize < interval_g))
1000                 WARNING ("rrdtool plugin: Your `stepsize' is "
1001                                 "smaller than your `interval'. This will "
1002                                 "create needlessly big RRD-files.");
1003
1004         /* Set the cache up */
1005         pthread_mutex_lock (&cache_lock);
1006
1007         cache = avl_create ((int (*) (const void *, const void *)) strcmp);
1008         if (cache == NULL)
1009         {
1010                 ERROR ("rrdtool plugin: avl_create failed.");
1011                 return (-1);
1012         }
1013
1014         cache_flush_last = time (NULL);
1015         if (cache_timeout < 2)
1016         {
1017                 cache_timeout = 0;
1018                 cache_flush_timeout = 0;
1019         }
1020         else if (cache_flush_timeout < cache_timeout)
1021                 cache_flush_timeout = 10 * cache_timeout;
1022
1023         pthread_mutex_unlock (&cache_lock);
1024
1025         status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
1026         if (status != 0)
1027         {
1028                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1029                 return (-1);
1030         }
1031
1032         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
1033                         " heartbeat = %i; rrarows = %i; xff = %lf;",
1034                         (datadir == NULL) ? "(null)" : datadir,
1035                         stepsize, heartbeat, rrarows, xff);
1036
1037         return (0);
1038 } /* int rrd_init */
1039
1040 void module_register (void)
1041 {
1042         plugin_register_config ("rrdtool", rrd_config,
1043                         config_keys, config_keys_num);
1044         plugin_register_init ("rrdtool", rrd_init);
1045         plugin_register_write ("rrdtool", rrd_write);
1046         plugin_register_shutdown ("rrdtool", rrd_shutdown);
1047 }