Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27
28 #include "common.h"
29 #include "plugin.h"
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
33
34 #include <rrd.h>
35
36 /*
37  * Private types
38  */
39 typedef struct rrd_cache_s {
40   int values_num;
41   char **values;
42   cdtime_t first_value;
43   cdtime_t last_value;
44   int64_t random_variation;
45   enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 } rrd_cache_t;
47
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
50
51 struct rrd_queue_s {
52   char *filename;
53   struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
56
57 /*
58  * Private variables
59  */
60 static const char *config_keys[] = {
61     "CacheTimeout", "CacheFlush",      "CreateFilesAsync", "DataDir",
62     "StepSize",     "HeartBeat",       "RRARows",          "RRATimespan",
63     "XFF",          "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
65
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67  * is zero a default, depending on the `interval' member of the value list is
68  * being used. */
69 static char *datadir = NULL;
70 static double write_rate = 0.0;
71 static rrdcreate_config_t rrdcreate_config = {
72     /* stepsize = */ 0,
73     /* heartbeat = */ 0,
74     /* rrarows = */ 1200,
75     /* xff = */ 0.1,
76
77     /* timespans = */ NULL,
78     /* timespans_num = */ 0,
79
80     /* consolidation_functions = */ NULL,
81     /* consolidation_functions_num = */ 0,
82
83     /* async = */ 0};
84
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86  * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout = 0;
88 static cdtime_t cache_flush_timeout = 0;
89 static cdtime_t random_timeout = 0;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache = NULL;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
93
94 static rrd_queue_t *queue_head = NULL;
95 static rrd_queue_t *queue_tail = NULL;
96 static rrd_queue_t *flushq_head = NULL;
97 static rrd_queue_t *flushq_tail = NULL;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
102
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
105 #endif
106
107 static int do_shutdown = 0;
108
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
111                        const char **argv) {
112   optind = 0; /* bug in librrd? */
113   rrd_clear_error();
114
115   int status = rrd_update_r(filename, template, argc, (void *)argv);
116   if (status != 0) {
117     WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
118             rrd_get_error());
119   }
120
121   return status;
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
124
125 #else  /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
127                        const char **argv) {
128   int status;
129
130   int new_argc;
131   char **new_argv;
132
133   assert(template == NULL);
134
135   new_argc = 2 + argc;
136   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137   if (new_argv == NULL) {
138     ERROR("rrdtool plugin: malloc failed.");
139     return -1;
140   }
141
142   new_argv[0] = "update";
143   new_argv[1] = filename;
144
145   memcpy(new_argv + 2, argv, argc * sizeof(char *));
146   new_argv[new_argc] = NULL;
147
148   pthread_mutex_lock(&librrd_lock);
149   optind = 0; /* bug in librrd? */
150   rrd_clear_error();
151
152   status = rrd_update(new_argc, new_argv);
153   pthread_mutex_unlock(&librrd_lock);
154
155   if (status != 0) {
156     WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
157             rrd_get_error());
158   }
159
160   sfree(new_argv);
161
162   return status;
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
165
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167                                          const data_set_t *ds,
168                                          const value_list_t *vl) {
169   int offset;
170   int status;
171   time_t tt;
172
173   memset(buffer, '\0', buffer_len);
174
175   tt = CDTIME_T_TO_TIME_T(vl->time);
176   status = snprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177   if ((status < 1) || (status >= buffer_len))
178     return -1;
179   offset = status;
180
181   for (size_t i = 0; i < ds->ds_num; i++) {
182     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183         (ds->ds[i].type != DS_TYPE_GAUGE) &&
184         (ds->ds[i].type != DS_TYPE_DERIVE) &&
185         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
186       return -1;
187
188     if (ds->ds[i].type == DS_TYPE_COUNTER)
189       status = snprintf(buffer + offset, buffer_len - offset, ":%llu",
190                         vl->values[i].counter);
191     else if (ds->ds[i].type == DS_TYPE_GAUGE)
192       status = snprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193                         vl->values[i].gauge);
194     else if (ds->ds[i].type == DS_TYPE_DERIVE)
195       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196                         vl->values[i].derive);
197     else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199                         vl->values[i].absolute);
200
201     if ((status < 1) || (status >= (buffer_len - offset)))
202       return -1;
203
204     offset += status;
205   } /* for ds->ds_num */
206
207   return 0;
208 } /* int value_list_to_string_multiple */
209
210 static int value_list_to_string(char *buffer, int buffer_len,
211                                 const data_set_t *ds, const value_list_t *vl) {
212   int status;
213   time_t tt;
214
215   if (ds->ds_num != 1)
216     return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
217
218   tt = CDTIME_T_TO_TIME_T(vl->time);
219   switch (ds->ds[0].type) {
220   case DS_TYPE_DERIVE:
221     status = snprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222                       vl->values[0].derive);
223     break;
224   case DS_TYPE_GAUGE:
225     status = snprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226                       vl->values[0].gauge);
227     break;
228   case DS_TYPE_COUNTER:
229     status = snprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
230                       vl->values[0].counter);
231     break;
232   case DS_TYPE_ABSOLUTE:
233     status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234                       vl->values[0].absolute);
235     break;
236   default:
237     return EINVAL;
238   }
239
240   if ((status < 1) || (status >= buffer_len))
241     return ENOMEM;
242
243   return 0;
244 } /* int value_list_to_string */
245
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247                                   value_list_t const *vl) {
248   char const suffix[] = ".rrd";
249   int status;
250   size_t len;
251
252   if (datadir != NULL) {
253     size_t datadir_len = strlen(datadir) + 1;
254
255     if (datadir_len >= buffer_size)
256       return ENOMEM;
257
258     sstrncpy(buffer, datadir, buffer_size);
259     buffer[datadir_len - 1] = '/';
260     buffer[datadir_len] = 0;
261
262     buffer += datadir_len;
263     buffer_size -= datadir_len;
264   }
265
266   status = FORMAT_VL(buffer, buffer_size, vl);
267   if (status != 0)
268     return status;
269
270   len = strlen(buffer);
271   assert(len < buffer_size);
272   buffer += len;
273   buffer_size -= len;
274
275   if (buffer_size <= sizeof(suffix))
276     return ENOMEM;
277
278   memcpy(buffer, suffix, sizeof(suffix));
279   return 0;
280 } /* int value_list_to_filename */
281
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283   struct timeval tv_next_update;
284   struct timeval tv_now;
285
286   gettimeofday(&tv_next_update, /* timezone = */ NULL);
287
288   while (42) {
289     rrd_queue_t *queue_entry;
290     rrd_cache_t *cache_entry;
291     char **values;
292     int values_num;
293     int status;
294
295     values = NULL;
296     values_num = 0;
297
298     pthread_mutex_lock(&queue_lock);
299     /* Wait for values to arrive */
300     while (42) {
301       struct timespec ts_wait;
302
303       while ((flushq_head == NULL) && (queue_head == NULL) &&
304              (do_shutdown == 0))
305         pthread_cond_wait(&queue_cond, &queue_lock);
306
307       if ((flushq_head == NULL) && (queue_head == NULL))
308         break;
309
310       /* Don't delay if there's something to flush */
311       if (flushq_head != NULL)
312         break;
313
314       /* Don't delay if we're shutting down */
315       if (do_shutdown != 0)
316         break;
317
318       /* Don't delay if no delay was configured. */
319       if (write_rate <= 0.0)
320         break;
321
322       gettimeofday(&tv_now, /* timezone = */ NULL);
323       status = timeval_cmp(tv_next_update, tv_now, NULL);
324       /* We're good to go */
325       if (status <= 0)
326         break;
327
328       /* We're supposed to wait a bit with this update, so we'll
329        * wait for the next addition to the queue or to the end of
330        * the wait period - whichever comes first. */
331       ts_wait.tv_sec = tv_next_update.tv_sec;
332       ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
333
334       status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335       if (status == ETIMEDOUT)
336         break;
337     } /* while (42) */
338
339     /* XXX: If you need to lock both, cache_lock and queue_lock, at
340      * the same time, ALWAYS lock `cache_lock' first! */
341
342     /* We're in the shutdown phase */
343     if ((flushq_head == NULL) && (queue_head == NULL)) {
344       pthread_mutex_unlock(&queue_lock);
345       break;
346     }
347
348     if (flushq_head != NULL) {
349       /* Dequeue the first flush entry */
350       queue_entry = flushq_head;
351       if (flushq_head == flushq_tail)
352         flushq_head = flushq_tail = NULL;
353       else
354         flushq_head = flushq_head->next;
355     } else /* if (queue_head != NULL) */
356     {
357       /* Dequeue the first regular entry */
358       queue_entry = queue_head;
359       if (queue_head == queue_tail)
360         queue_head = queue_tail = NULL;
361       else
362         queue_head = queue_head->next;
363     }
364
365     /* Unlock the queue again */
366     pthread_mutex_unlock(&queue_lock);
367
368     /* We now need the cache lock so the entry isn't updated while
369      * we make a copy of its values */
370     pthread_mutex_lock(&cache_lock);
371
372     status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
373
374     if (status == 0) {
375       values = cache_entry->values;
376       values_num = cache_entry->values_num;
377
378       cache_entry->values = NULL;
379       cache_entry->values_num = 0;
380       cache_entry->flags = FLAG_NONE;
381     }
382
383     pthread_mutex_unlock(&cache_lock);
384
385     if (status != 0) {
386       sfree(queue_entry->filename);
387       sfree(queue_entry);
388       continue;
389     }
390
391     /* Update `tv_next_update' */
392     if (write_rate > 0.0) {
393       gettimeofday(&tv_now, /* timezone = */ NULL);
394       tv_next_update.tv_sec = tv_now.tv_sec;
395       tv_next_update.tv_usec =
396           tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397       while (tv_next_update.tv_usec > 1000000) {
398         tv_next_update.tv_sec++;
399         tv_next_update.tv_usec -= 1000000;
400       }
401     }
402
403     /* Write the values to the RRD-file */
404     srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405     DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406           (values_num == 1) ? "" : "s", queue_entry->filename);
407
408     for (int i = 0; i < values_num; i++) {
409       sfree(values[i]);
410     }
411     sfree(values);
412     sfree(queue_entry->filename);
413     sfree(queue_entry);
414   } /* while (42) */
415
416   pthread_exit((void *)0);
417   return (void *)0;
418 } /* void *rrd_queue_thread */
419
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421                              rrd_queue_t **tail) {
422   rrd_queue_t *queue_entry;
423
424   queue_entry = malloc(sizeof(*queue_entry));
425   if (queue_entry == NULL)
426     return -1;
427
428   queue_entry->filename = strdup(filename);
429   if (queue_entry->filename == NULL) {
430     free(queue_entry);
431     return -1;
432   }
433
434   queue_entry->next = NULL;
435
436   pthread_mutex_lock(&queue_lock);
437
438   if (*tail == NULL)
439     *head = queue_entry;
440   else
441     (*tail)->next = queue_entry;
442   *tail = queue_entry;
443
444   pthread_cond_signal(&queue_cond);
445   pthread_mutex_unlock(&queue_lock);
446
447   return 0;
448 } /* int rrd_queue_enqueue */
449
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451                              rrd_queue_t **tail) {
452   rrd_queue_t *this;
453   rrd_queue_t *prev;
454
455   pthread_mutex_lock(&queue_lock);
456
457   prev = NULL;
458   this = *head;
459
460   while (this != NULL) {
461     if (strcmp(this->filename, filename) == 0)
462       break;
463
464     prev = this;
465     this = this->next;
466   }
467
468   if (this == NULL) {
469     pthread_mutex_unlock(&queue_lock);
470     return -1;
471   }
472
473   if (prev == NULL)
474     *head = this->next;
475   else
476     prev->next = this->next;
477
478   if (this->next == NULL)
479     *tail = prev;
480
481   pthread_mutex_unlock(&queue_lock);
482
483   sfree(this->filename);
484   sfree(this);
485
486   return 0;
487 } /* int rrd_queue_dequeue */
488
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
491   rrd_cache_t *rc;
492   cdtime_t now;
493
494   char **keys = NULL;
495   int keys_num = 0;
496
497   char *key;
498   c_avl_iterator_t *iter;
499
500   DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501         CDTIME_T_TO_DOUBLE(timeout));
502
503   now = cdtime();
504
505   /* Build a list of entries to be flushed */
506   iter = c_avl_get_iterator(cache);
507   while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508     if (rc->flags != FLAG_NONE)
509       continue;
510     /* timeout == 0  =>  flush everything */
511     else if ((timeout != 0) && ((now - rc->first_value) < timeout))
512       continue;
513     else if (rc->values_num > 0) {
514       int status;
515
516       status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
517       if (status == 0)
518         rc->flags = FLAG_QUEUED;
519     } else /* ancient and no values -> waste of memory */
520     {
521       char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
522       if (tmp == NULL) {
523         char errbuf[1024];
524         ERROR("rrdtool plugin: "
525               "realloc failed: %s",
526               sstrerror(errno, errbuf, sizeof(errbuf)));
527         c_avl_iterator_destroy(iter);
528         sfree(keys);
529         return;
530       }
531       keys = tmp;
532       keys[keys_num] = key;
533       keys_num++;
534     }
535   } /* while (c_avl_iterator_next) */
536   c_avl_iterator_destroy(iter);
537
538   for (int i = 0; i < keys_num; i++) {
539     if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
540       DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
541       continue;
542     }
543
544     assert(rc->values == NULL);
545     assert(rc->values_num == 0);
546
547     sfree(rc);
548     sfree(key);
549     keys[i] = NULL;
550   } /* for (i = 0..keys_num) */
551
552   sfree(keys);
553
554   cache_flush_last = now;
555 } /* void rrd_cache_flush */
556
557 static int rrd_cache_flush_identifier(cdtime_t timeout,
558                                       const char *identifier) {
559   rrd_cache_t *rc;
560   cdtime_t now;
561   int status;
562   char key[2048];
563
564   if (identifier == NULL) {
565     rrd_cache_flush(timeout);
566     return 0;
567   }
568
569   now = cdtime();
570
571   if (datadir == NULL)
572     snprintf(key, sizeof(key), "%s.rrd", identifier);
573   else
574     snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
575   key[sizeof(key) - 1] = 0;
576
577   status = c_avl_get(cache, key, (void *)&rc);
578   if (status != 0) {
579     INFO("rrdtool plugin: rrd_cache_flush_identifier: "
580          "c_avl_get (%s) failed. Does that file really exist?",
581          key);
582     return status;
583   }
584
585   if (rc->flags == FLAG_FLUSHQ) {
586     status = 0;
587   } else if (rc->flags == FLAG_QUEUED) {
588     rrd_queue_dequeue(key, &queue_head, &queue_tail);
589     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
590     if (status == 0)
591       rc->flags = FLAG_FLUSHQ;
592   } else if ((now - rc->first_value) < timeout) {
593     status = 0;
594   } else if (rc->values_num > 0) {
595     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
596     if (status == 0)
597       rc->flags = FLAG_FLUSHQ;
598   }
599
600   return status;
601 } /* int rrd_cache_flush_identifier */
602
603 static int64_t rrd_get_random_variation(void) {
604   if (random_timeout == 0)
605     return 0;
606
607   return (int64_t)cdrand_range(-random_timeout, random_timeout);
608 } /* int64_t rrd_get_random_variation */
609
610 static int rrd_cache_insert(const char *filename, const char *value,
611                             cdtime_t value_time) {
612   rrd_cache_t *rc = NULL;
613   int new_rc = 0;
614   char **values_new;
615
616   pthread_mutex_lock(&cache_lock);
617
618   /* This shouldn't happen, but it did happen at least once, so we'll be
619    * careful. */
620   if (cache == NULL) {
621     pthread_mutex_unlock(&cache_lock);
622     WARNING("rrdtool plugin: cache == NULL.");
623     return -1;
624   }
625
626   int status = c_avl_get(cache, filename, (void *)&rc);
627   if ((status != 0) || (rc == NULL)) {
628     rc = malloc(sizeof(*rc));
629     if (rc == NULL) {
630       pthread_mutex_unlock(&cache_lock);
631       return -1;
632     }
633     rc->values_num = 0;
634     rc->values = NULL;
635     rc->first_value = 0;
636     rc->last_value = 0;
637     rc->random_variation = rrd_get_random_variation();
638     rc->flags = FLAG_NONE;
639     new_rc = 1;
640   }
641
642   assert(value_time > 0); /* plugin_dispatch() ensures this. */
643   if (rc->last_value >= value_time) {
644     pthread_mutex_unlock(&cache_lock);
645     DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
646           ">= (value_time = %" PRIu64 ")",
647           rc->last_value, value_time);
648     return -1;
649   }
650
651   values_new =
652       realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
653   if (values_new == NULL) {
654     char errbuf[1024];
655     void *cache_key = NULL;
656
657     sstrerror(errno, errbuf, sizeof(errbuf));
658
659     c_avl_remove(cache, filename, &cache_key, NULL);
660     pthread_mutex_unlock(&cache_lock);
661
662     ERROR("rrdtool plugin: realloc failed: %s", errbuf);
663
664     sfree(cache_key);
665     sfree(rc->values);
666     sfree(rc);
667     return -1;
668   }
669   rc->values = values_new;
670
671   rc->values[rc->values_num] = strdup(value);
672   if (rc->values[rc->values_num] != NULL)
673     rc->values_num++;
674
675   if (rc->values_num == 1)
676     rc->first_value = value_time;
677   rc->last_value = value_time;
678
679   /* Insert if this is the first value */
680   if (new_rc == 1) {
681     void *cache_key = strdup(filename);
682
683     if (cache_key == NULL) {
684       char errbuf[1024];
685       sstrerror(errno, errbuf, sizeof(errbuf));
686
687       pthread_mutex_unlock(&cache_lock);
688
689       ERROR("rrdtool plugin: strdup failed: %s", errbuf);
690
691       sfree(rc->values[0]);
692       sfree(rc->values);
693       sfree(rc);
694       return -1;
695     }
696
697     c_avl_insert(cache, cache_key, rc);
698   }
699
700   DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
701         "values_num = %i; age = %.3f;",
702         filename, rc->values_num,
703         CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
704
705   if ((rc->last_value - rc->first_value) >=
706       (cache_timeout + rc->random_variation)) {
707     /* XXX: If you need to lock both, cache_lock and queue_lock, at
708      * the same time, ALWAYS lock `cache_lock' first! */
709     if (rc->flags == FLAG_NONE) {
710       int status;
711
712       status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
713       if (status == 0)
714         rc->flags = FLAG_QUEUED;
715
716       rc->random_variation = rrd_get_random_variation();
717     } else {
718       DEBUG("rrdtool plugin: `%s' is already queued.", filename);
719     }
720   }
721
722   if ((cache_timeout > 0) &&
723       ((cdtime() - cache_flush_last) > cache_flush_timeout))
724     rrd_cache_flush(cache_timeout + random_timeout);
725
726   pthread_mutex_unlock(&cache_lock);
727
728   return 0;
729 } /* int rrd_cache_insert */
730
731 static int rrd_cache_destroy(void) /* {{{ */
732 {
733   void *key = NULL;
734   void *value = NULL;
735
736   int non_empty = 0;
737
738   pthread_mutex_lock(&cache_lock);
739
740   if (cache == NULL) {
741     pthread_mutex_unlock(&cache_lock);
742     return 0;
743   }
744
745   while (c_avl_pick(cache, &key, &value) == 0) {
746     rrd_cache_t *rc;
747
748     sfree(key);
749     key = NULL;
750
751     rc = value;
752     value = NULL;
753
754     if (rc->values_num > 0)
755       non_empty++;
756
757     for (int i = 0; i < rc->values_num; i++)
758       sfree(rc->values[i]);
759     sfree(rc->values);
760     sfree(rc);
761   }
762
763   c_avl_destroy(cache);
764   cache = NULL;
765
766   if (non_empty > 0) {
767     INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
768          non_empty, (non_empty == 1) ? "entry" : "entries");
769   } else {
770     DEBUG("rrdtool plugin: No values have been lost "
771           "when destroying the cache.");
772   }
773
774   pthread_mutex_unlock(&cache_lock);
775   return 0;
776 } /* }}} int rrd_cache_destroy */
777
778 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
779   int a = *((int *)a_ptr);
780   int b = *((int *)b_ptr);
781
782   if (a < b)
783     return -1;
784   else if (a > b)
785     return 1;
786   else
787     return 0;
788 } /* int rrd_compare_numeric */
789
790 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
791                      user_data_t __attribute__((unused)) * user_data) {
792
793   if (do_shutdown)
794     return 0;
795
796   if (0 != strcmp(ds->type, vl->type)) {
797     ERROR("rrdtool plugin: DS type does not match value list type");
798     return -1;
799   }
800
801   char filename[PATH_MAX];
802   if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
803     return -1;
804
805   char values[32 * (ds->ds_num + 1)];
806   if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
807     return -1;
808
809   struct stat statbuf = {0};
810   if (stat(filename, &statbuf) == -1) {
811     if (errno == ENOENT) {
812       if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
813         return -1;
814       } else if (rrdcreate_config.async) {
815         return 0;
816       }
817     } else {
818       char errbuf[1024];
819       ERROR("rrdtool plugin: stat(%s) failed: %s", filename,
820             sstrerror(errno, errbuf, sizeof(errbuf)));
821       return -1;
822     }
823   } else if (!S_ISREG(statbuf.st_mode)) {
824     ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
825     return -1;
826   }
827
828   return rrd_cache_insert(filename, values, vl->time);
829 } /* int rrd_write */
830
831 static int rrd_flush(cdtime_t timeout, const char *identifier,
832                      __attribute__((unused)) user_data_t *user_data) {
833   pthread_mutex_lock(&cache_lock);
834
835   if (cache == NULL) {
836     pthread_mutex_unlock(&cache_lock);
837     return 0;
838   }
839
840   rrd_cache_flush_identifier(timeout, identifier);
841
842   pthread_mutex_unlock(&cache_lock);
843   return 0;
844 } /* int rrd_flush */
845
846 static int rrd_config(const char *key, const char *value) {
847   if (strcasecmp("CacheTimeout", key) == 0) {
848     double tmp = atof(value);
849     if (tmp < 0) {
850       fprintf(stderr, "rrdtool: `CacheTimeout' must "
851                       "be greater than 0.\n");
852       ERROR("rrdtool: `CacheTimeout' must "
853             "be greater than 0.\n");
854       return 1;
855     }
856     cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
857   } else if (strcasecmp("CacheFlush", key) == 0) {
858     double tmp = atof(value);
859     if (tmp < 0) {
860       fprintf(stderr, "rrdtool: `CacheFlush' must "
861                       "be greater than 0.\n");
862       ERROR("rrdtool: `CacheFlush' must "
863             "be greater than 0.\n");
864       return 1;
865     }
866     cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
867   } else if (strcasecmp("DataDir", key) == 0) {
868     char *tmp;
869     size_t len;
870
871     tmp = strdup(value);
872     if (tmp == NULL) {
873       ERROR("rrdtool plugin: strdup failed.");
874       return 1;
875     }
876
877     len = strlen(tmp);
878     while ((len > 0) && (tmp[len - 1] == '/')) {
879       len--;
880       tmp[len] = 0;
881     }
882
883     if (len == 0) {
884       ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
885       sfree(tmp);
886       return 1;
887     }
888
889     if (datadir != NULL) {
890       sfree(datadir);
891     }
892
893     datadir = tmp;
894   } else if (strcasecmp("StepSize", key) == 0) {
895     unsigned long temp = strtoul(value, NULL, 0);
896     if (temp > 0)
897       rrdcreate_config.stepsize = temp;
898   } else if (strcasecmp("HeartBeat", key) == 0) {
899     int temp = atoi(value);
900     if (temp > 0)
901       rrdcreate_config.heartbeat = temp;
902   } else if (strcasecmp("CreateFilesAsync", key) == 0) {
903     if (IS_TRUE(value))
904       rrdcreate_config.async = 1;
905     else
906       rrdcreate_config.async = 0;
907   } else if (strcasecmp("RRARows", key) == 0) {
908     int tmp = atoi(value);
909     if (tmp <= 0) {
910       fprintf(stderr, "rrdtool: `RRARows' must "
911                       "be greater than 0.\n");
912       ERROR("rrdtool: `RRARows' must "
913             "be greater than 0.\n");
914       return 1;
915     }
916     rrdcreate_config.rrarows = tmp;
917   } else if (strcasecmp("RRATimespan", key) == 0) {
918     char *saveptr = NULL;
919     char *dummy;
920     char *ptr;
921     char *value_copy;
922     int *tmp_alloc;
923
924     value_copy = strdup(value);
925     if (value_copy == NULL)
926       return 1;
927
928     dummy = value_copy;
929     while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
930       dummy = NULL;
931
932       tmp_alloc = realloc(rrdcreate_config.timespans,
933                           sizeof(int) * (rrdcreate_config.timespans_num + 1));
934       if (tmp_alloc == NULL) {
935         fprintf(stderr, "rrdtool: realloc failed.\n");
936         ERROR("rrdtool: realloc failed.\n");
937         free(value_copy);
938         return 1;
939       }
940       rrdcreate_config.timespans = tmp_alloc;
941       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
942       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
943         rrdcreate_config.timespans_num++;
944     } /* while (strtok_r) */
945
946     qsort(/* base = */ rrdcreate_config.timespans,
947           /* nmemb  = */ rrdcreate_config.timespans_num,
948           /* size   = */ sizeof(rrdcreate_config.timespans[0]),
949           /* compar = */ rrd_compare_numeric);
950
951     free(value_copy);
952   } else if (strcasecmp("XFF", key) == 0) {
953     double tmp = atof(value);
954     if ((tmp < 0.0) || (tmp >= 1.0)) {
955       fprintf(stderr, "rrdtool: `XFF' must "
956                       "be in the range 0 to 1 (exclusive).");
957       ERROR("rrdtool: `XFF' must "
958             "be in the range 0 to 1 (exclusive).");
959       return 1;
960     }
961     rrdcreate_config.xff = tmp;
962   } else if (strcasecmp("WritesPerSecond", key) == 0) {
963     double wps = atof(value);
964
965     if (wps < 0.0) {
966       fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
967                       "greater than or equal to zero.");
968       return 1;
969     } else if (wps == 0.0) {
970       write_rate = 0.0;
971     } else {
972       write_rate = 1.0 / wps;
973     }
974   } else if (strcasecmp("RandomTimeout", key) == 0) {
975     double tmp;
976
977     tmp = atof(value);
978     if (tmp < 0.0) {
979       fprintf(stderr, "rrdtool: `RandomTimeout' must "
980                       "be greater than or equal to zero.\n");
981       ERROR("rrdtool: `RandomTimeout' must "
982             "be greater then or equal to zero.");
983     } else {
984       random_timeout = DOUBLE_TO_CDTIME_T(tmp);
985     }
986   } else {
987     return -1;
988   }
989   return 0;
990 } /* int rrd_config */
991
992 static int rrd_shutdown(void) {
993   pthread_mutex_lock(&cache_lock);
994   rrd_cache_flush(0);
995   pthread_mutex_unlock(&cache_lock);
996
997   pthread_mutex_lock(&queue_lock);
998   do_shutdown = 1;
999   pthread_cond_signal(&queue_cond);
1000   pthread_mutex_unlock(&queue_lock);
1001
1002   if ((queue_thread_running != 0) &&
1003       ((queue_head != NULL) || (flushq_head != NULL))) {
1004     INFO("rrdtool plugin: Shutting down the queue thread. "
1005          "This may take a while.");
1006   } else if (queue_thread_running != 0) {
1007     INFO("rrdtool plugin: Shutting down the queue thread.");
1008   }
1009
1010   /* Wait for all the values to be written to disk before returning. */
1011   if (queue_thread_running != 0) {
1012     pthread_join(queue_thread, NULL);
1013     memset(&queue_thread, 0, sizeof(queue_thread));
1014     queue_thread_running = 0;
1015     DEBUG("rrdtool plugin: queue_thread exited.");
1016   }
1017
1018   rrd_cache_destroy();
1019
1020   return 0;
1021 } /* int rrd_shutdown */
1022
1023 static int rrd_init(void) {
1024   static int init_once = 0;
1025
1026   if (init_once != 0)
1027     return 0;
1028   init_once = 1;
1029
1030   if (rrdcreate_config.heartbeat <= 0)
1031     rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1032
1033   /* Set the cache up */
1034   pthread_mutex_lock(&cache_lock);
1035
1036   cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1037   if (cache == NULL) {
1038     pthread_mutex_unlock(&cache_lock);
1039     ERROR("rrdtool plugin: c_avl_create failed.");
1040     return -1;
1041   }
1042
1043   cache_flush_last = cdtime();
1044   if (cache_timeout == 0) {
1045     random_timeout = 0;
1046     cache_flush_timeout = 0;
1047   } else if (cache_flush_timeout < cache_timeout) {
1048     INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1049          "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1050          CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1051          CDTIME_T_TO_DOUBLE(cache_timeout),
1052          CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1053     cache_flush_timeout = 10 * cache_timeout;
1054   }
1055
1056   /* Assure that "cache_timeout + random_variation" is never negative. */
1057   if (random_timeout > cache_timeout) {
1058     INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1059          CDTIME_T_TO_DOUBLE(cache_timeout));
1060     random_timeout = cache_timeout;
1061   }
1062
1063   pthread_mutex_unlock(&cache_lock);
1064
1065   int status =
1066       plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1067                            /* args = */ NULL, "rrdtool queue");
1068   if (status != 0) {
1069     ERROR("rrdtool plugin: Cannot create queue-thread.");
1070     return -1;
1071   }
1072   queue_thread_running = 1;
1073
1074   DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1075         " heartbeat = %i; rrarows = %i; xff = %lf;",
1076         (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1077         rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1078         rrdcreate_config.xff);
1079
1080   return 0;
1081 } /* int rrd_init */
1082
1083 void module_register(void) {
1084   plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1085   plugin_register_init("rrdtool", rrd_init);
1086   plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1087   plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1088   plugin_register_shutdown("rrdtool", rrd_shutdown);
1089 }