Merge branch 'collectd-5.8'
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27
28 #include "common.h"
29 #include "plugin.h"
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
33
34 #include <rrd.h>
35
36 /*
37  * Private types
38  */
39 typedef struct rrd_cache_s {
40   int values_num;
41   char **values;
42   cdtime_t first_value;
43   cdtime_t last_value;
44   int64_t random_variation;
45   enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 } rrd_cache_t;
47
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
50
51 struct rrd_queue_s {
52   char *filename;
53   struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
56
57 /*
58  * Private variables
59  */
60 static const char *config_keys[] = {
61     "CacheTimeout", "CacheFlush",      "CreateFilesAsync", "DataDir",
62     "StepSize",     "HeartBeat",       "RRARows",          "RRATimespan",
63     "XFF",          "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
65
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67  * is zero a default, depending on the `interval' member of the value list is
68  * being used. */
69 static char *datadir = NULL;
70 static double write_rate = 0.0;
71 static rrdcreate_config_t rrdcreate_config = {
72     /* stepsize = */ 0,
73     /* heartbeat = */ 0,
74     /* rrarows = */ 1200,
75     /* xff = */ 0.1,
76
77     /* timespans = */ NULL,
78     /* timespans_num = */ 0,
79
80     /* consolidation_functions = */ NULL,
81     /* consolidation_functions_num = */ 0,
82
83     /* async = */ 0};
84
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86  * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout = 0;
88 static cdtime_t cache_flush_timeout = 0;
89 static cdtime_t random_timeout = 0;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache = NULL;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
93
94 static rrd_queue_t *queue_head = NULL;
95 static rrd_queue_t *queue_tail = NULL;
96 static rrd_queue_t *flushq_head = NULL;
97 static rrd_queue_t *flushq_tail = NULL;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
102
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
105 #endif
106
107 static int do_shutdown = 0;
108
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
111                        const char **argv) {
112   optind = 0; /* bug in librrd? */
113   rrd_clear_error();
114
115   int status = rrd_update_r(filename, template, argc, (void *)argv);
116   if (status != 0) {
117     WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
118             rrd_get_error());
119   }
120
121   return status;
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
124
125 #else  /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
127                        const char **argv) {
128   int status;
129
130   int new_argc;
131   char **new_argv;
132
133   assert(template == NULL);
134
135   new_argc = 2 + argc;
136   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137   if (new_argv == NULL) {
138     ERROR("rrdtool plugin: malloc failed.");
139     return -1;
140   }
141
142   new_argv[0] = "update";
143   new_argv[1] = filename;
144
145   memcpy(new_argv + 2, argv, argc * sizeof(char *));
146   new_argv[new_argc] = NULL;
147
148   pthread_mutex_lock(&librrd_lock);
149   optind = 0; /* bug in librrd? */
150   rrd_clear_error();
151
152   status = rrd_update(new_argc, new_argv);
153   pthread_mutex_unlock(&librrd_lock);
154
155   if (status != 0) {
156     WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
157             rrd_get_error());
158   }
159
160   sfree(new_argv);
161
162   return status;
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
165
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167                                          const data_set_t *ds,
168                                          const value_list_t *vl) {
169   int offset;
170   int status;
171   time_t tt;
172
173   memset(buffer, '\0', buffer_len);
174
175   tt = CDTIME_T_TO_TIME_T(vl->time);
176   status = snprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177   if ((status < 1) || (status >= buffer_len))
178     return -1;
179   offset = status;
180
181   for (size_t i = 0; i < ds->ds_num; i++) {
182     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183         (ds->ds[i].type != DS_TYPE_GAUGE) &&
184         (ds->ds[i].type != DS_TYPE_DERIVE) &&
185         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
186       return -1;
187
188     if (ds->ds[i].type == DS_TYPE_COUNTER)
189       status = snprintf(buffer + offset, buffer_len - offset, ":%llu",
190                         vl->values[i].counter);
191     else if (ds->ds[i].type == DS_TYPE_GAUGE)
192       status = snprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193                         vl->values[i].gauge);
194     else if (ds->ds[i].type == DS_TYPE_DERIVE)
195       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196                         vl->values[i].derive);
197     else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199                         vl->values[i].absolute);
200
201     if ((status < 1) || (status >= (buffer_len - offset)))
202       return -1;
203
204     offset += status;
205   } /* for ds->ds_num */
206
207   return 0;
208 } /* int value_list_to_string_multiple */
209
210 static int value_list_to_string(char *buffer, int buffer_len,
211                                 const data_set_t *ds, const value_list_t *vl) {
212   int status;
213   time_t tt;
214
215   if (ds->ds_num != 1)
216     return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
217
218   tt = CDTIME_T_TO_TIME_T(vl->time);
219   switch (ds->ds[0].type) {
220   case DS_TYPE_DERIVE:
221     status = snprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222                       vl->values[0].derive);
223     break;
224   case DS_TYPE_GAUGE:
225     status = snprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226                       vl->values[0].gauge);
227     break;
228   case DS_TYPE_COUNTER:
229     status = snprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
230                       vl->values[0].counter);
231     break;
232   case DS_TYPE_ABSOLUTE:
233     status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234                       vl->values[0].absolute);
235     break;
236   default:
237     return EINVAL;
238   }
239
240   if ((status < 1) || (status >= buffer_len))
241     return ENOMEM;
242
243   return 0;
244 } /* int value_list_to_string */
245
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247                                   value_list_t const *vl) {
248   char const suffix[] = ".rrd";
249   int status;
250   size_t len;
251
252   if (datadir != NULL) {
253     size_t datadir_len = strlen(datadir) + 1;
254
255     if (datadir_len >= buffer_size)
256       return ENOMEM;
257
258     sstrncpy(buffer, datadir, buffer_size);
259     buffer[datadir_len - 1] = '/';
260     buffer[datadir_len] = 0;
261
262     buffer += datadir_len;
263     buffer_size -= datadir_len;
264   }
265
266   status = FORMAT_VL(buffer, buffer_size, vl);
267   if (status != 0)
268     return status;
269
270   len = strlen(buffer);
271   assert(len < buffer_size);
272   buffer += len;
273   buffer_size -= len;
274
275   if (buffer_size <= sizeof(suffix))
276     return ENOMEM;
277
278   memcpy(buffer, suffix, sizeof(suffix));
279   return 0;
280 } /* int value_list_to_filename */
281
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283   struct timeval tv_next_update;
284   struct timeval tv_now;
285
286   gettimeofday(&tv_next_update, /* timezone = */ NULL);
287
288   while (42) {
289     rrd_queue_t *queue_entry;
290     rrd_cache_t *cache_entry;
291     char **values;
292     int values_num;
293     int status;
294
295     values = NULL;
296     values_num = 0;
297
298     pthread_mutex_lock(&queue_lock);
299     /* Wait for values to arrive */
300     while (42) {
301       struct timespec ts_wait;
302
303       while ((flushq_head == NULL) && (queue_head == NULL) &&
304              (do_shutdown == 0))
305         pthread_cond_wait(&queue_cond, &queue_lock);
306
307       if ((flushq_head == NULL) && (queue_head == NULL))
308         break;
309
310       /* Don't delay if there's something to flush */
311       if (flushq_head != NULL)
312         break;
313
314       /* Don't delay if we're shutting down */
315       if (do_shutdown != 0)
316         break;
317
318       /* Don't delay if no delay was configured. */
319       if (write_rate <= 0.0)
320         break;
321
322       gettimeofday(&tv_now, /* timezone = */ NULL);
323       status = timeval_cmp(tv_next_update, tv_now, NULL);
324       /* We're good to go */
325       if (status <= 0)
326         break;
327
328       /* We're supposed to wait a bit with this update, so we'll
329        * wait for the next addition to the queue or to the end of
330        * the wait period - whichever comes first. */
331       ts_wait.tv_sec = tv_next_update.tv_sec;
332       ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
333
334       status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335       if (status == ETIMEDOUT)
336         break;
337     } /* while (42) */
338
339     /* XXX: If you need to lock both, cache_lock and queue_lock, at
340      * the same time, ALWAYS lock `cache_lock' first! */
341
342     /* We're in the shutdown phase */
343     if ((flushq_head == NULL) && (queue_head == NULL)) {
344       pthread_mutex_unlock(&queue_lock);
345       break;
346     }
347
348     if (flushq_head != NULL) {
349       /* Dequeue the first flush entry */
350       queue_entry = flushq_head;
351       if (flushq_head == flushq_tail)
352         flushq_head = flushq_tail = NULL;
353       else
354         flushq_head = flushq_head->next;
355     } else /* if (queue_head != NULL) */
356     {
357       /* Dequeue the first regular entry */
358       queue_entry = queue_head;
359       if (queue_head == queue_tail)
360         queue_head = queue_tail = NULL;
361       else
362         queue_head = queue_head->next;
363     }
364
365     /* Unlock the queue again */
366     pthread_mutex_unlock(&queue_lock);
367
368     /* We now need the cache lock so the entry isn't updated while
369      * we make a copy of its values */
370     pthread_mutex_lock(&cache_lock);
371
372     status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
373
374     if (status == 0) {
375       values = cache_entry->values;
376       values_num = cache_entry->values_num;
377
378       cache_entry->values = NULL;
379       cache_entry->values_num = 0;
380       cache_entry->flags = FLAG_NONE;
381     }
382
383     pthread_mutex_unlock(&cache_lock);
384
385     if (status != 0) {
386       sfree(queue_entry->filename);
387       sfree(queue_entry);
388       continue;
389     }
390
391     /* Update `tv_next_update' */
392     if (write_rate > 0.0) {
393       gettimeofday(&tv_now, /* timezone = */ NULL);
394       tv_next_update.tv_sec = tv_now.tv_sec;
395       tv_next_update.tv_usec =
396           tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397       while (tv_next_update.tv_usec > 1000000) {
398         tv_next_update.tv_sec++;
399         tv_next_update.tv_usec -= 1000000;
400       }
401     }
402
403     /* Write the values to the RRD-file */
404     srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405     DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406           (values_num == 1) ? "" : "s", queue_entry->filename);
407
408     for (int i = 0; i < values_num; i++) {
409       sfree(values[i]);
410     }
411     sfree(values);
412     sfree(queue_entry->filename);
413     sfree(queue_entry);
414   } /* while (42) */
415
416   pthread_exit((void *)0);
417   return (void *)0;
418 } /* void *rrd_queue_thread */
419
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421                              rrd_queue_t **tail) {
422   rrd_queue_t *queue_entry;
423
424   queue_entry = malloc(sizeof(*queue_entry));
425   if (queue_entry == NULL)
426     return -1;
427
428   queue_entry->filename = strdup(filename);
429   if (queue_entry->filename == NULL) {
430     free(queue_entry);
431     return -1;
432   }
433
434   queue_entry->next = NULL;
435
436   pthread_mutex_lock(&queue_lock);
437
438   if (*tail == NULL)
439     *head = queue_entry;
440   else
441     (*tail)->next = queue_entry;
442   *tail = queue_entry;
443
444   pthread_cond_signal(&queue_cond);
445   pthread_mutex_unlock(&queue_lock);
446
447   return 0;
448 } /* int rrd_queue_enqueue */
449
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451                              rrd_queue_t **tail) {
452   rrd_queue_t *this;
453   rrd_queue_t *prev;
454
455   pthread_mutex_lock(&queue_lock);
456
457   prev = NULL;
458   this = *head;
459
460   while (this != NULL) {
461     if (strcmp(this->filename, filename) == 0)
462       break;
463
464     prev = this;
465     this = this->next;
466   }
467
468   if (this == NULL) {
469     pthread_mutex_unlock(&queue_lock);
470     return -1;
471   }
472
473   if (prev == NULL)
474     *head = this->next;
475   else
476     prev->next = this->next;
477
478   if (this->next == NULL)
479     *tail = prev;
480
481   pthread_mutex_unlock(&queue_lock);
482
483   sfree(this->filename);
484   sfree(this);
485
486   return 0;
487 } /* int rrd_queue_dequeue */
488
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
491   rrd_cache_t *rc;
492   cdtime_t now;
493
494   char **keys = NULL;
495   int keys_num = 0;
496
497   char *key;
498   c_avl_iterator_t *iter;
499
500   DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501         CDTIME_T_TO_DOUBLE(timeout));
502
503   now = cdtime();
504
505   /* Build a list of entries to be flushed */
506   iter = c_avl_get_iterator(cache);
507   while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508     if (rc->flags != FLAG_NONE)
509       continue;
510     /* timeout == 0  =>  flush everything */
511     else if ((timeout != 0) && ((now - rc->first_value) < timeout))
512       continue;
513     else if (rc->values_num > 0) {
514       int status;
515
516       status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
517       if (status == 0)
518         rc->flags = FLAG_QUEUED;
519     } else /* ancient and no values -> waste of memory */
520     {
521       char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
522       if (tmp == NULL) {
523         ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
524         c_avl_iterator_destroy(iter);
525         sfree(keys);
526         return;
527       }
528       keys = tmp;
529       keys[keys_num] = key;
530       keys_num++;
531     }
532   } /* while (c_avl_iterator_next) */
533   c_avl_iterator_destroy(iter);
534
535   for (int i = 0; i < keys_num; i++) {
536     if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
537       DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
538       continue;
539     }
540
541     assert(rc->values == NULL);
542     assert(rc->values_num == 0);
543
544     sfree(rc);
545     sfree(key);
546     keys[i] = NULL;
547   } /* for (i = 0..keys_num) */
548
549   sfree(keys);
550
551   cache_flush_last = now;
552 } /* void rrd_cache_flush */
553
554 static int rrd_cache_flush_identifier(cdtime_t timeout,
555                                       const char *identifier) {
556   rrd_cache_t *rc;
557   cdtime_t now;
558   int status;
559   char key[2048];
560
561   if (identifier == NULL) {
562     rrd_cache_flush(timeout);
563     return 0;
564   }
565
566   now = cdtime();
567
568   if (datadir == NULL)
569     snprintf(key, sizeof(key), "%s.rrd", identifier);
570   else
571     snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
572   key[sizeof(key) - 1] = 0;
573
574   status = c_avl_get(cache, key, (void *)&rc);
575   if (status != 0) {
576     INFO("rrdtool plugin: rrd_cache_flush_identifier: "
577          "c_avl_get (%s) failed. Does that file really exist?",
578          key);
579     return status;
580   }
581
582   if (rc->flags == FLAG_FLUSHQ) {
583     status = 0;
584   } else if (rc->flags == FLAG_QUEUED) {
585     rrd_queue_dequeue(key, &queue_head, &queue_tail);
586     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
587     if (status == 0)
588       rc->flags = FLAG_FLUSHQ;
589   } else if ((now - rc->first_value) < timeout) {
590     status = 0;
591   } else if (rc->values_num > 0) {
592     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
593     if (status == 0)
594       rc->flags = FLAG_FLUSHQ;
595   }
596
597   return status;
598 } /* int rrd_cache_flush_identifier */
599
600 static int64_t rrd_get_random_variation(void) {
601   if (random_timeout == 0)
602     return 0;
603
604   return (int64_t)cdrand_range(-random_timeout, random_timeout);
605 } /* int64_t rrd_get_random_variation */
606
607 static int rrd_cache_insert(const char *filename, const char *value,
608                             cdtime_t value_time) {
609   rrd_cache_t *rc = NULL;
610   int new_rc = 0;
611   char **values_new;
612
613   pthread_mutex_lock(&cache_lock);
614
615   /* This shouldn't happen, but it did happen at least once, so we'll be
616    * careful. */
617   if (cache == NULL) {
618     pthread_mutex_unlock(&cache_lock);
619     WARNING("rrdtool plugin: cache == NULL.");
620     return -1;
621   }
622
623   int status = c_avl_get(cache, filename, (void *)&rc);
624   if ((status != 0) || (rc == NULL)) {
625     rc = malloc(sizeof(*rc));
626     if (rc == NULL) {
627       pthread_mutex_unlock(&cache_lock);
628       return -1;
629     }
630     rc->values_num = 0;
631     rc->values = NULL;
632     rc->first_value = 0;
633     rc->last_value = 0;
634     rc->random_variation = rrd_get_random_variation();
635     rc->flags = FLAG_NONE;
636     new_rc = 1;
637   }
638
639   assert(value_time > 0); /* plugin_dispatch() ensures this. */
640   if (rc->last_value >= value_time) {
641     pthread_mutex_unlock(&cache_lock);
642     DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
643           ">= (value_time = %" PRIu64 ")",
644           rc->last_value, value_time);
645     return -1;
646   }
647
648   values_new =
649       realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
650   if (values_new == NULL) {
651     void *cache_key = NULL;
652
653     c_avl_remove(cache, filename, &cache_key, NULL);
654     pthread_mutex_unlock(&cache_lock);
655
656     ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
657
658     sfree(cache_key);
659     sfree(rc->values);
660     sfree(rc);
661     return -1;
662   }
663   rc->values = values_new;
664
665   rc->values[rc->values_num] = strdup(value);
666   if (rc->values[rc->values_num] != NULL)
667     rc->values_num++;
668
669   if (rc->values_num == 1)
670     rc->first_value = value_time;
671   rc->last_value = value_time;
672
673   /* Insert if this is the first value */
674   if (new_rc == 1) {
675     void *cache_key = strdup(filename);
676
677     if (cache_key == NULL) {
678       pthread_mutex_unlock(&cache_lock);
679
680       ERROR("rrdtool plugin: strdup failed: %s", STRERRNO);
681
682       sfree(rc->values[0]);
683       sfree(rc->values);
684       sfree(rc);
685       return -1;
686     }
687
688     c_avl_insert(cache, cache_key, rc);
689   }
690
691   DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
692         "values_num = %i; age = %.3f;",
693         filename, rc->values_num,
694         CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
695
696   if ((rc->last_value - rc->first_value) >=
697       (cache_timeout + rc->random_variation)) {
698     /* XXX: If you need to lock both, cache_lock and queue_lock, at
699      * the same time, ALWAYS lock `cache_lock' first! */
700     if (rc->flags == FLAG_NONE) {
701       int status;
702
703       status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
704       if (status == 0)
705         rc->flags = FLAG_QUEUED;
706
707       rc->random_variation = rrd_get_random_variation();
708     } else {
709       DEBUG("rrdtool plugin: `%s' is already queued.", filename);
710     }
711   }
712
713   if ((cache_timeout > 0) &&
714       ((cdtime() - cache_flush_last) > cache_flush_timeout))
715     rrd_cache_flush(cache_timeout + random_timeout);
716
717   pthread_mutex_unlock(&cache_lock);
718
719   return 0;
720 } /* int rrd_cache_insert */
721
722 static int rrd_cache_destroy(void) /* {{{ */
723 {
724   void *key = NULL;
725   void *value = NULL;
726
727   int non_empty = 0;
728
729   pthread_mutex_lock(&cache_lock);
730
731   if (cache == NULL) {
732     pthread_mutex_unlock(&cache_lock);
733     return 0;
734   }
735
736   while (c_avl_pick(cache, &key, &value) == 0) {
737     rrd_cache_t *rc;
738
739     sfree(key);
740     key = NULL;
741
742     rc = value;
743     value = NULL;
744
745     if (rc->values_num > 0)
746       non_empty++;
747
748     for (int i = 0; i < rc->values_num; i++)
749       sfree(rc->values[i]);
750     sfree(rc->values);
751     sfree(rc);
752   }
753
754   c_avl_destroy(cache);
755   cache = NULL;
756
757   if (non_empty > 0) {
758     INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
759          non_empty, (non_empty == 1) ? "entry" : "entries");
760   } else {
761     DEBUG("rrdtool plugin: No values have been lost "
762           "when destroying the cache.");
763   }
764
765   pthread_mutex_unlock(&cache_lock);
766   return 0;
767 } /* }}} int rrd_cache_destroy */
768
769 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
770   int a = *((int *)a_ptr);
771   int b = *((int *)b_ptr);
772
773   if (a < b)
774     return -1;
775   else if (a > b)
776     return 1;
777   else
778     return 0;
779 } /* int rrd_compare_numeric */
780
781 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
782                      user_data_t __attribute__((unused)) * user_data) {
783
784   if (do_shutdown)
785     return 0;
786
787   if (0 != strcmp(ds->type, vl->type)) {
788     ERROR("rrdtool plugin: DS type does not match value list type");
789     return -1;
790   }
791
792   char filename[PATH_MAX];
793   if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
794     return -1;
795
796   char values[32 * (ds->ds_num + 1)];
797   if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
798     return -1;
799
800   struct stat statbuf = {0};
801   if (stat(filename, &statbuf) == -1) {
802     if (errno == ENOENT) {
803       if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
804         return -1;
805       } else if (rrdcreate_config.async) {
806         return 0;
807       }
808     } else {
809       ERROR("rrdtool plugin: stat(%s) failed: %s", filename, STRERRNO);
810       return -1;
811     }
812   } else if (!S_ISREG(statbuf.st_mode)) {
813     ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
814     return -1;
815   }
816
817   return rrd_cache_insert(filename, values, vl->time);
818 } /* int rrd_write */
819
820 static int rrd_flush(cdtime_t timeout, const char *identifier,
821                      __attribute__((unused)) user_data_t *user_data) {
822   pthread_mutex_lock(&cache_lock);
823
824   if (cache == NULL) {
825     pthread_mutex_unlock(&cache_lock);
826     return 0;
827   }
828
829   rrd_cache_flush_identifier(timeout, identifier);
830
831   pthread_mutex_unlock(&cache_lock);
832   return 0;
833 } /* int rrd_flush */
834
835 static int rrd_config(const char *key, const char *value) {
836   if (strcasecmp("CacheTimeout", key) == 0) {
837     double tmp = atof(value);
838     if (tmp < 0) {
839       fprintf(stderr, "rrdtool: `CacheTimeout' must "
840                       "be greater than 0.\n");
841       ERROR("rrdtool: `CacheTimeout' must "
842             "be greater than 0.\n");
843       return 1;
844     }
845     cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
846   } else if (strcasecmp("CacheFlush", key) == 0) {
847     double tmp = atof(value);
848     if (tmp < 0) {
849       fprintf(stderr, "rrdtool: `CacheFlush' must "
850                       "be greater than 0.\n");
851       ERROR("rrdtool: `CacheFlush' must "
852             "be greater than 0.\n");
853       return 1;
854     }
855     cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
856   } else if (strcasecmp("DataDir", key) == 0) {
857     char *tmp;
858     size_t len;
859
860     tmp = strdup(value);
861     if (tmp == NULL) {
862       ERROR("rrdtool plugin: strdup failed.");
863       return 1;
864     }
865
866     len = strlen(tmp);
867     while ((len > 0) && (tmp[len - 1] == '/')) {
868       len--;
869       tmp[len] = 0;
870     }
871
872     if (len == 0) {
873       ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
874       sfree(tmp);
875       return 1;
876     }
877
878     if (datadir != NULL) {
879       sfree(datadir);
880     }
881
882     datadir = tmp;
883   } else if (strcasecmp("StepSize", key) == 0) {
884     unsigned long temp = strtoul(value, NULL, 0);
885     if (temp > 0)
886       rrdcreate_config.stepsize = temp;
887   } else if (strcasecmp("HeartBeat", key) == 0) {
888     int temp = atoi(value);
889     if (temp > 0)
890       rrdcreate_config.heartbeat = temp;
891   } else if (strcasecmp("CreateFilesAsync", key) == 0) {
892     if (IS_TRUE(value))
893       rrdcreate_config.async = 1;
894     else
895       rrdcreate_config.async = 0;
896   } else if (strcasecmp("RRARows", key) == 0) {
897     int tmp = atoi(value);
898     if (tmp <= 0) {
899       fprintf(stderr, "rrdtool: `RRARows' must "
900                       "be greater than 0.\n");
901       ERROR("rrdtool: `RRARows' must "
902             "be greater than 0.\n");
903       return 1;
904     }
905     rrdcreate_config.rrarows = tmp;
906   } else if (strcasecmp("RRATimespan", key) == 0) {
907     char *saveptr = NULL;
908     char *dummy;
909     char *ptr;
910     char *value_copy;
911     int *tmp_alloc;
912
913     value_copy = strdup(value);
914     if (value_copy == NULL)
915       return 1;
916
917     dummy = value_copy;
918     while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
919       dummy = NULL;
920
921       tmp_alloc = realloc(rrdcreate_config.timespans,
922                           sizeof(int) * (rrdcreate_config.timespans_num + 1));
923       if (tmp_alloc == NULL) {
924         fprintf(stderr, "rrdtool: realloc failed.\n");
925         ERROR("rrdtool: realloc failed.\n");
926         free(value_copy);
927         return 1;
928       }
929       rrdcreate_config.timespans = tmp_alloc;
930       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
931       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
932         rrdcreate_config.timespans_num++;
933     } /* while (strtok_r) */
934
935     qsort(/* base = */ rrdcreate_config.timespans,
936           /* nmemb  = */ rrdcreate_config.timespans_num,
937           /* size   = */ sizeof(rrdcreate_config.timespans[0]),
938           /* compar = */ rrd_compare_numeric);
939
940     free(value_copy);
941   } else if (strcasecmp("XFF", key) == 0) {
942     double tmp = atof(value);
943     if ((tmp < 0.0) || (tmp >= 1.0)) {
944       fprintf(stderr, "rrdtool: `XFF' must "
945                       "be in the range 0 to 1 (exclusive).");
946       ERROR("rrdtool: `XFF' must "
947             "be in the range 0 to 1 (exclusive).");
948       return 1;
949     }
950     rrdcreate_config.xff = tmp;
951   } else if (strcasecmp("WritesPerSecond", key) == 0) {
952     double wps = atof(value);
953
954     if (wps < 0.0) {
955       fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
956                       "greater than or equal to zero.");
957       return 1;
958     } else if (wps == 0.0) {
959       write_rate = 0.0;
960     } else {
961       write_rate = 1.0 / wps;
962     }
963   } else if (strcasecmp("RandomTimeout", key) == 0) {
964     double tmp;
965
966     tmp = atof(value);
967     if (tmp < 0.0) {
968       fprintf(stderr, "rrdtool: `RandomTimeout' must "
969                       "be greater than or equal to zero.\n");
970       ERROR("rrdtool: `RandomTimeout' must "
971             "be greater then or equal to zero.");
972     } else {
973       random_timeout = DOUBLE_TO_CDTIME_T(tmp);
974     }
975   } else {
976     return -1;
977   }
978   return 0;
979 } /* int rrd_config */
980
981 static int rrd_shutdown(void) {
982   pthread_mutex_lock(&cache_lock);
983   rrd_cache_flush(0);
984   pthread_mutex_unlock(&cache_lock);
985
986   pthread_mutex_lock(&queue_lock);
987   do_shutdown = 1;
988   pthread_cond_signal(&queue_cond);
989   pthread_mutex_unlock(&queue_lock);
990
991   if ((queue_thread_running != 0) &&
992       ((queue_head != NULL) || (flushq_head != NULL))) {
993     INFO("rrdtool plugin: Shutting down the queue thread. "
994          "This may take a while.");
995   } else if (queue_thread_running != 0) {
996     INFO("rrdtool plugin: Shutting down the queue thread.");
997   }
998
999   /* Wait for all the values to be written to disk before returning. */
1000   if (queue_thread_running != 0) {
1001     pthread_join(queue_thread, NULL);
1002     memset(&queue_thread, 0, sizeof(queue_thread));
1003     queue_thread_running = 0;
1004     DEBUG("rrdtool plugin: queue_thread exited.");
1005   }
1006
1007   rrd_cache_destroy();
1008
1009   return 0;
1010 } /* int rrd_shutdown */
1011
1012 static int rrd_init(void) {
1013   static int init_once = 0;
1014
1015   if (init_once != 0)
1016     return 0;
1017   init_once = 1;
1018
1019   if (rrdcreate_config.heartbeat <= 0)
1020     rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1021
1022   /* Set the cache up */
1023   pthread_mutex_lock(&cache_lock);
1024
1025   cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1026   if (cache == NULL) {
1027     pthread_mutex_unlock(&cache_lock);
1028     ERROR("rrdtool plugin: c_avl_create failed.");
1029     return -1;
1030   }
1031
1032   cache_flush_last = cdtime();
1033   if (cache_timeout == 0) {
1034     random_timeout = 0;
1035     cache_flush_timeout = 0;
1036   } else if (cache_flush_timeout < cache_timeout) {
1037     INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1038          "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1039          CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1040          CDTIME_T_TO_DOUBLE(cache_timeout),
1041          CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1042     cache_flush_timeout = 10 * cache_timeout;
1043   }
1044
1045   /* Assure that "cache_timeout + random_variation" is never negative. */
1046   if (random_timeout > cache_timeout) {
1047     INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1048          CDTIME_T_TO_DOUBLE(cache_timeout));
1049     random_timeout = cache_timeout;
1050   }
1051
1052   pthread_mutex_unlock(&cache_lock);
1053
1054   int status =
1055       plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1056                            /* args = */ NULL, "rrdtool queue");
1057   if (status != 0) {
1058     ERROR("rrdtool plugin: Cannot create queue-thread.");
1059     return -1;
1060   }
1061   queue_thread_running = 1;
1062
1063   DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1064         " heartbeat = %i; rrarows = %i; xff = %lf;",
1065         (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1066         rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1067         rrdcreate_config.xff);
1068
1069   return 0;
1070 } /* int rrd_init */
1071
1072 void module_register(void) {
1073   plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1074   plugin_register_init("rrdtool", rrd_init);
1075   plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1076   plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1077   plugin_register_shutdown("rrdtool", rrd_shutdown);
1078 }