Move the variable declaration closer to use
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27
28 #include "plugin.h"
29 #include "utils/avltree/avltree.h"
30 #include "utils/common/common.h"
31 #include "utils/rrdcreate/rrdcreate.h"
32 #include "utils_random.h"
33
34 #include <rrd.h>
35
36 /*
37  * Private types
38  */
39 typedef struct rrd_cache_s {
40   int values_num;
41   char **values;
42   cdtime_t first_value;
43   cdtime_t last_value;
44   int64_t random_variation;
45   enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 } rrd_cache_t;
47
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
50
51 struct rrd_queue_s {
52   char *filename;
53   struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
56
57 /*
58  * Private variables
59  */
60 static const char *config_keys[] = {
61     "CacheTimeout", "CacheFlush",      "CreateFilesAsync", "DataDir",
62     "StepSize",     "HeartBeat",       "RRARows",          "RRATimespan",
63     "XFF",          "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
65
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67  * is zero a default, depending on the `interval' member of the value list is
68  * being used. */
69 static char *datadir;
70 static double write_rate;
71 static rrdcreate_config_t rrdcreate_config = {
72     /* stepsize = */ 0,
73     /* heartbeat = */ 0,
74     /* rrarows = */ 1200,
75     /* xff = */ 0.1,
76
77     /* timespans = */ NULL,
78     /* timespans_num = */ 0,
79
80     /* consolidation_functions = */ NULL,
81     /* consolidation_functions_num = */ 0,
82
83     /* async = */ 0};
84
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86  * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout;
88 static cdtime_t cache_flush_timeout;
89 static cdtime_t random_timeout;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
93
94 static rrd_queue_t *queue_head;
95 static rrd_queue_t *queue_tail;
96 static rrd_queue_t *flushq_head;
97 static rrd_queue_t *flushq_tail;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
102
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
105 #endif
106
107 static int do_shutdown;
108
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
111                        const char **argv) {
112   optind = 0; /* bug in librrd? */
113   rrd_clear_error();
114
115   int status = rrd_update_r(filename, template, argc, (void *)argv);
116   if (status != 0) {
117     WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
118             rrd_get_error());
119   }
120
121   return status;
122 } /* int srrd_update */
123   /* #endif HAVE_THREADSAFE_LIBRRD */
124
125 #else  /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
127                        const char **argv) {
128   int status;
129
130   int new_argc;
131   char **new_argv;
132
133   assert(template == NULL);
134
135   new_argc = 2 + argc;
136   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137   if (new_argv == NULL) {
138     ERROR("rrdtool plugin: malloc failed.");
139     return -1;
140   }
141
142   new_argv[0] = "update";
143   new_argv[1] = filename;
144
145   memcpy(new_argv + 2, argv, argc * sizeof(char *));
146   new_argv[new_argc] = NULL;
147
148   pthread_mutex_lock(&librrd_lock);
149   optind = 0; /* bug in librrd? */
150   rrd_clear_error();
151
152   status = rrd_update(new_argc, new_argv);
153   pthread_mutex_unlock(&librrd_lock);
154
155   if (status != 0) {
156     WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
157             rrd_get_error());
158   }
159
160   sfree(new_argv);
161
162   return status;
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
165
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167                                          const data_set_t *ds,
168                                          const value_list_t *vl) {
169   int offset;
170   int status;
171   time_t tt;
172
173   memset(buffer, '\0', buffer_len);
174
175   tt = CDTIME_T_TO_TIME_T(vl->time);
176   status = ssnprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177   if ((status < 1) || (status >= buffer_len))
178     return -1;
179   offset = status;
180
181   for (size_t i = 0; i < ds->ds_num; i++) {
182     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183         (ds->ds[i].type != DS_TYPE_GAUGE) &&
184         (ds->ds[i].type != DS_TYPE_DERIVE) &&
185         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
186       return -1;
187
188     if (ds->ds[i].type == DS_TYPE_COUNTER)
189       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
190                          (uint64_t)vl->values[i].counter);
191     else if (ds->ds[i].type == DS_TYPE_GAUGE)
192       status = ssnprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193                          vl->values[i].gauge);
194     else if (ds->ds[i].type == DS_TYPE_DERIVE)
195       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196                          vl->values[i].derive);
197     else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199                          vl->values[i].absolute);
200
201     if ((status < 1) || (status >= (buffer_len - offset)))
202       return -1;
203
204     offset += status;
205   } /* for ds->ds_num */
206
207   return 0;
208 } /* int value_list_to_string_multiple */
209
210 static int value_list_to_string(char *buffer, int buffer_len,
211                                 const data_set_t *ds, const value_list_t *vl) {
212   int status;
213   time_t tt;
214
215   if (ds->ds_num != 1)
216     return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
217
218   tt = CDTIME_T_TO_TIME_T(vl->time);
219   switch (ds->ds[0].type) {
220   case DS_TYPE_DERIVE:
221     status = ssnprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222                        vl->values[0].derive);
223     break;
224   case DS_TYPE_GAUGE:
225     status = ssnprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226                        vl->values[0].gauge);
227     break;
228   case DS_TYPE_COUNTER:
229     status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
230                        (uint64_t)vl->values[0].counter);
231     break;
232   case DS_TYPE_ABSOLUTE:
233     status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234                        vl->values[0].absolute);
235     break;
236   default:
237     return EINVAL;
238   }
239
240   if ((status < 1) || (status >= buffer_len))
241     return ENOMEM;
242
243   return 0;
244 } /* int value_list_to_string */
245
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247                                   value_list_t const *vl) {
248   char const suffix[] = ".rrd";
249   int status;
250   size_t len;
251
252   if (datadir != NULL) {
253     size_t datadir_len = strlen(datadir) + 1;
254
255     if (datadir_len >= buffer_size)
256       return ENOMEM;
257
258     sstrncpy(buffer, datadir, buffer_size);
259     buffer[datadir_len - 1] = '/';
260     buffer[datadir_len] = 0;
261
262     buffer += datadir_len;
263     buffer_size -= datadir_len;
264   }
265
266   status = FORMAT_VL(buffer, buffer_size, vl);
267   if (status != 0)
268     return status;
269
270   len = strlen(buffer);
271   assert(len < buffer_size);
272   buffer += len;
273   buffer_size -= len;
274
275   if (buffer_size <= sizeof(suffix))
276     return ENOMEM;
277
278   memcpy(buffer, suffix, sizeof(suffix));
279   return 0;
280 } /* int value_list_to_filename */
281
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283   struct timeval tv_next_update;
284   struct timeval tv_now;
285
286   gettimeofday(&tv_next_update, /* timezone = */ NULL);
287
288   while (42) {
289     rrd_queue_t *queue_entry;
290     rrd_cache_t *cache_entry;
291     char **values;
292     int values_num;
293     int status;
294
295     values = NULL;
296     values_num = 0;
297
298     pthread_mutex_lock(&queue_lock);
299     /* Wait for values to arrive */
300     while (42) {
301       struct timespec ts_wait;
302
303       while ((flushq_head == NULL) && (queue_head == NULL) &&
304              (do_shutdown == 0))
305         pthread_cond_wait(&queue_cond, &queue_lock);
306
307       if ((flushq_head == NULL) && (queue_head == NULL))
308         break;
309
310       /* Don't delay if there's something to flush */
311       if (flushq_head != NULL)
312         break;
313
314       /* Don't delay if we're shutting down */
315       if (do_shutdown != 0)
316         break;
317
318       /* Don't delay if no delay was configured. */
319       if (write_rate <= 0.0)
320         break;
321
322       gettimeofday(&tv_now, /* timezone = */ NULL);
323       status = timeval_cmp(tv_next_update, tv_now, NULL);
324       /* We're good to go */
325       if (status <= 0)
326         break;
327
328       /* We're supposed to wait a bit with this update, so we'll
329        * wait for the next addition to the queue or to the end of
330        * the wait period - whichever comes first. */
331       ts_wait.tv_sec = tv_next_update.tv_sec;
332       ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
333
334       status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335       if (status == ETIMEDOUT)
336         break;
337     } /* while (42) */
338
339     /* XXX: If you need to lock both, cache_lock and queue_lock, at
340      * the same time, ALWAYS lock `cache_lock' first! */
341
342     /* We're in the shutdown phase */
343     if ((flushq_head == NULL) && (queue_head == NULL)) {
344       pthread_mutex_unlock(&queue_lock);
345       break;
346     }
347
348     if (flushq_head != NULL) {
349       /* Dequeue the first flush entry */
350       queue_entry = flushq_head;
351       if (flushq_head == flushq_tail)
352         flushq_head = flushq_tail = NULL;
353       else
354         flushq_head = flushq_head->next;
355     } else /* if (queue_head != NULL) */
356     {
357       /* Dequeue the first regular entry */
358       queue_entry = queue_head;
359       if (queue_head == queue_tail)
360         queue_head = queue_tail = NULL;
361       else
362         queue_head = queue_head->next;
363     }
364
365     /* Unlock the queue again */
366     pthread_mutex_unlock(&queue_lock);
367
368     /* We now need the cache lock so the entry isn't updated while
369      * we make a copy of its values */
370     pthread_mutex_lock(&cache_lock);
371
372     status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
373
374     if (status == 0) {
375       values = cache_entry->values;
376       values_num = cache_entry->values_num;
377
378       cache_entry->values = NULL;
379       cache_entry->values_num = 0;
380       cache_entry->flags = FLAG_NONE;
381     }
382
383     pthread_mutex_unlock(&cache_lock);
384
385     if (status != 0) {
386       sfree(queue_entry->filename);
387       sfree(queue_entry);
388       continue;
389     }
390
391     /* Update `tv_next_update' */
392     if (write_rate > 0.0) {
393       gettimeofday(&tv_now, /* timezone = */ NULL);
394       tv_next_update.tv_sec = tv_now.tv_sec;
395       tv_next_update.tv_usec =
396           tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397       while (tv_next_update.tv_usec > 1000000) {
398         tv_next_update.tv_sec++;
399         tv_next_update.tv_usec -= 1000000;
400       }
401     }
402
403     /* Write the values to the RRD-file */
404     srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405     DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406           (values_num == 1) ? "" : "s", queue_entry->filename);
407
408     for (int i = 0; i < values_num; i++) {
409       sfree(values[i]);
410     }
411     sfree(values);
412     sfree(queue_entry->filename);
413     sfree(queue_entry);
414   } /* while (42) */
415
416   pthread_exit((void *)0);
417   return (void *)0;
418 } /* void *rrd_queue_thread */
419
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421                              rrd_queue_t **tail) {
422   rrd_queue_t *queue_entry;
423
424   queue_entry = malloc(sizeof(*queue_entry));
425   if (queue_entry == NULL)
426     return -1;
427
428   queue_entry->filename = strdup(filename);
429   if (queue_entry->filename == NULL) {
430     free(queue_entry);
431     return -1;
432   }
433
434   queue_entry->next = NULL;
435
436   pthread_mutex_lock(&queue_lock);
437
438   if (*tail == NULL)
439     *head = queue_entry;
440   else
441     (*tail)->next = queue_entry;
442   *tail = queue_entry;
443
444   pthread_cond_signal(&queue_cond);
445   pthread_mutex_unlock(&queue_lock);
446
447   return 0;
448 } /* int rrd_queue_enqueue */
449
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451                              rrd_queue_t **tail) {
452   rrd_queue_t *this;
453   rrd_queue_t *prev;
454
455   pthread_mutex_lock(&queue_lock);
456
457   prev = NULL;
458   this = *head;
459
460   while (this != NULL) {
461     if (strcmp(this->filename, filename) == 0)
462       break;
463
464     prev = this;
465     this = this->next;
466   }
467
468   if (this == NULL) {
469     pthread_mutex_unlock(&queue_lock);
470     return -1;
471   }
472
473   if (prev == NULL)
474     *head = this->next;
475   else
476     prev->next = this->next;
477
478   if (this->next == NULL)
479     *tail = prev;
480
481   pthread_mutex_unlock(&queue_lock);
482
483   sfree(this->filename);
484   sfree(this);
485
486   return 0;
487 } /* int rrd_queue_dequeue */
488
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
491   rrd_cache_t *rc;
492   cdtime_t now;
493
494   char **keys = NULL;
495   int keys_num = 0;
496
497   char *key;
498   c_avl_iterator_t *iter;
499
500   DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501         CDTIME_T_TO_DOUBLE(timeout));
502
503   now = cdtime();
504
505   /* Build a list of entries to be flushed */
506   iter = c_avl_get_iterator(cache);
507   while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508     if (rc->flags != FLAG_NONE)
509       continue;
510     /* timeout == 0  =>  flush everything */
511     else if ((timeout != 0) && ((now - rc->first_value) < timeout))
512       continue;
513     else if (rc->values_num > 0) {
514       int status;
515
516       status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
517       if (status == 0)
518         rc->flags = FLAG_QUEUED;
519     } else /* ancient and no values -> waste of memory */
520     {
521       char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
522       if (tmp == NULL) {
523         ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
524         c_avl_iterator_destroy(iter);
525         sfree(keys);
526         return;
527       }
528       keys = tmp;
529       keys[keys_num] = key;
530       keys_num++;
531     }
532   } /* while (c_avl_iterator_next) */
533   c_avl_iterator_destroy(iter);
534
535   for (int i = 0; i < keys_num; i++) {
536     if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
537       DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
538       continue;
539     }
540
541     assert(rc->values == NULL);
542     assert(rc->values_num == 0);
543
544     sfree(rc);
545     sfree(key);
546     keys[i] = NULL;
547   } /* for (i = 0..keys_num) */
548
549   sfree(keys);
550
551   cache_flush_last = now;
552 } /* void rrd_cache_flush */
553
554 static int rrd_cache_flush_identifier(cdtime_t timeout,
555                                       const char *identifier) {
556   rrd_cache_t *rc;
557   cdtime_t now;
558   int status;
559   char key[2048];
560
561   if (identifier == NULL) {
562     rrd_cache_flush(timeout);
563     return 0;
564   }
565
566   now = cdtime();
567
568   if (datadir == NULL)
569     ssnprintf(key, sizeof(key), "%s.rrd", identifier);
570   else
571     ssnprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
572   key[sizeof(key) - 1] = '\0';
573
574   status = c_avl_get(cache, key, (void *)&rc);
575   if (status != 0) {
576     INFO("rrdtool plugin: rrd_cache_flush_identifier: "
577          "c_avl_get (%s) failed. Does that file really exist?",
578          key);
579     return status;
580   }
581
582   if (rc->flags == FLAG_FLUSHQ) {
583     status = 0;
584   } else if (rc->flags == FLAG_QUEUED) {
585     rrd_queue_dequeue(key, &queue_head, &queue_tail);
586     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
587     if (status == 0)
588       rc->flags = FLAG_FLUSHQ;
589   } else if ((now - rc->first_value) < timeout) {
590     status = 0;
591   } else if (rc->values_num > 0) {
592     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
593     if (status == 0)
594       rc->flags = FLAG_FLUSHQ;
595   }
596
597   return status;
598 } /* int rrd_cache_flush_identifier */
599
600 static int64_t rrd_get_random_variation(void) {
601   if (random_timeout == 0)
602     return 0;
603
604   return (int64_t)cdrand_range(-random_timeout, random_timeout);
605 } /* int64_t rrd_get_random_variation */
606
607 static int rrd_cache_insert(const char *filename, const char *value,
608                             cdtime_t value_time) {
609   rrd_cache_t *rc = NULL;
610   int new_rc = 0;
611   char **values_new;
612
613   pthread_mutex_lock(&cache_lock);
614
615   /* This shouldn't happen, but it did happen at least once, so we'll be
616    * careful. */
617   if (cache == NULL) {
618     pthread_mutex_unlock(&cache_lock);
619     WARNING("rrdtool plugin: cache == NULL.");
620     return -1;
621   }
622
623   int status = c_avl_get(cache, filename, (void *)&rc);
624   if ((status != 0) || (rc == NULL)) {
625     rc = malloc(sizeof(*rc));
626     if (rc == NULL) {
627       ERROR("rrdtool plugin: malloc failed: %s", STRERRNO);
628       pthread_mutex_unlock(&cache_lock);
629       return -1;
630     }
631     rc->values_num = 0;
632     rc->values = NULL;
633     rc->first_value = 0;
634     rc->last_value = 0;
635     rc->random_variation = rrd_get_random_variation();
636     rc->flags = FLAG_NONE;
637     new_rc = 1;
638   }
639
640   assert(value_time > 0); /* plugin_dispatch() ensures this. */
641   if (rc->last_value >= value_time) {
642     pthread_mutex_unlock(&cache_lock);
643     DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
644           ">= (value_time = %" PRIu64 ")",
645           rc->last_value, value_time);
646     return -1;
647   }
648
649   values_new =
650       realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
651   if (values_new == NULL) {
652     void *cache_key = NULL;
653
654     c_avl_remove(cache, filename, &cache_key, NULL);
655     pthread_mutex_unlock(&cache_lock);
656
657     ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
658
659     sfree(cache_key);
660     sfree(rc->values);
661     sfree(rc);
662     return -1;
663   }
664   rc->values = values_new;
665
666   rc->values[rc->values_num] = strdup(value);
667   if (rc->values[rc->values_num] != NULL)
668     rc->values_num++;
669
670   if (rc->values_num == 1)
671     rc->first_value = value_time;
672   rc->last_value = value_time;
673
674   /* Insert if this is the first value */
675   if (new_rc == 1) {
676     void *cache_key = strdup(filename);
677
678     if (cache_key == NULL) {
679       pthread_mutex_unlock(&cache_lock);
680
681       ERROR("rrdtool plugin: strdup failed: %s", STRERRNO);
682
683       sfree(rc->values[0]);
684       sfree(rc->values);
685       sfree(rc);
686       return -1;
687     }
688
689     c_avl_insert(cache, cache_key, rc);
690   }
691
692   DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
693         "values_num = %i; age = %.3f;",
694         filename, rc->values_num,
695         CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
696
697   if ((rc->last_value - rc->first_value) >=
698       (cache_timeout + rc->random_variation)) {
699     /* XXX: If you need to lock both, cache_lock and queue_lock, at
700      * the same time, ALWAYS lock `cache_lock' first! */
701     if (rc->flags == FLAG_NONE) {
702       int status;
703
704       status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
705       if (status == 0)
706         rc->flags = FLAG_QUEUED;
707
708       rc->random_variation = rrd_get_random_variation();
709     } else {
710       DEBUG("rrdtool plugin: `%s' is already queued.", filename);
711     }
712   }
713
714   if ((cache_timeout > 0) &&
715       ((cdtime() - cache_flush_last) > cache_flush_timeout))
716     rrd_cache_flush(cache_timeout + random_timeout);
717
718   pthread_mutex_unlock(&cache_lock);
719
720   return 0;
721 } /* int rrd_cache_insert */
722
723 static int rrd_cache_destroy(void) /* {{{ */
724 {
725   void *key = NULL;
726   void *value = NULL;
727
728   int non_empty = 0;
729
730   pthread_mutex_lock(&cache_lock);
731
732   if (cache == NULL) {
733     pthread_mutex_unlock(&cache_lock);
734     return 0;
735   }
736
737   while (c_avl_pick(cache, &key, &value) == 0) {
738     rrd_cache_t *rc;
739
740     sfree(key);
741     key = NULL;
742
743     rc = value;
744     value = NULL;
745
746     if (rc->values_num > 0)
747       non_empty++;
748
749     for (int i = 0; i < rc->values_num; i++)
750       sfree(rc->values[i]);
751     sfree(rc->values);
752     sfree(rc);
753   }
754
755   c_avl_destroy(cache);
756   cache = NULL;
757
758   if (non_empty > 0) {
759     INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
760          non_empty, (non_empty == 1) ? "entry" : "entries");
761   } else {
762     DEBUG("rrdtool plugin: No values have been lost "
763           "when destroying the cache.");
764   }
765
766   pthread_mutex_unlock(&cache_lock);
767   return 0;
768 } /* }}} int rrd_cache_destroy */
769
770 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
771   int a = *((int *)a_ptr);
772   int b = *((int *)b_ptr);
773
774   if (a < b)
775     return -1;
776   else if (a > b)
777     return 1;
778   else
779     return 0;
780 } /* int rrd_compare_numeric */
781
782 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
783                      user_data_t __attribute__((unused)) * user_data) {
784
785   if (do_shutdown)
786     return 0;
787
788   if (0 != strcmp(ds->type, vl->type)) {
789     ERROR("rrdtool plugin: DS type does not match value list type");
790     return -1;
791   }
792
793   char filename[PATH_MAX];
794   if (value_list_to_filename(filename, sizeof(filename), vl) != 0) {
795     ERROR("rrdtool plugin: failed to build filename");
796     return -1;
797   }
798
799   char values[32 * (ds->ds_num + 1)];
800   if (value_list_to_string(values, sizeof(values), ds, vl) != 0) {
801     ERROR("rrdtool plugin: failed to build values string");
802     return -1;
803   }
804
805   struct stat statbuf = {0};
806   if (stat(filename, &statbuf) == -1) {
807     if (errno == ENOENT) {
808       if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
809         ERROR("rrdtool plugin: cu_rrd_create_file (%s) failed.", filename);
810         return -1;
811       } else if (rrdcreate_config.async) {
812         return 0;
813       }
814     } else {
815       ERROR("rrdtool plugin: stat(%s) failed: %s", filename, STRERRNO);
816       return -1;
817     }
818   } else if (!S_ISREG(statbuf.st_mode)) {
819     ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
820     return -1;
821   }
822
823   return rrd_cache_insert(filename, values, vl->time);
824 } /* int rrd_write */
825
826 static int rrd_flush(cdtime_t timeout, const char *identifier,
827                      __attribute__((unused)) user_data_t *user_data) {
828   pthread_mutex_lock(&cache_lock);
829
830   if (cache == NULL) {
831     pthread_mutex_unlock(&cache_lock);
832     return 0;
833   }
834
835   rrd_cache_flush_identifier(timeout, identifier);
836
837   pthread_mutex_unlock(&cache_lock);
838   return 0;
839 } /* int rrd_flush */
840
841 static int rrd_config(const char *key, const char *value) {
842   if (strcasecmp("CacheTimeout", key) == 0) {
843     double tmp = atof(value);
844     if (tmp < 0) {
845       fprintf(stderr, "rrdtool: `CacheTimeout' must "
846                       "be greater than 0.\n");
847       ERROR("rrdtool: `CacheTimeout' must "
848             "be greater than 0.\n");
849       return 1;
850     }
851     cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
852   } else if (strcasecmp("CacheFlush", key) == 0) {
853     double tmp = atof(value);
854     if (tmp < 0) {
855       fprintf(stderr, "rrdtool: `CacheFlush' must "
856                       "be greater than 0.\n");
857       ERROR("rrdtool: `CacheFlush' must "
858             "be greater than 0.\n");
859       return 1;
860     }
861     cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
862   } else if (strcasecmp("DataDir", key) == 0) {
863     char *tmp;
864     size_t len;
865
866     tmp = strdup(value);
867     if (tmp == NULL) {
868       ERROR("rrdtool plugin: strdup failed.");
869       return 1;
870     }
871
872     len = strlen(tmp);
873     while ((len > 0) && (tmp[len - 1] == '/')) {
874       len--;
875       tmp[len] = 0;
876     }
877
878     if (len == 0) {
879       ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
880       sfree(tmp);
881       return 1;
882     }
883
884     if (datadir != NULL) {
885       sfree(datadir);
886     }
887
888     datadir = tmp;
889   } else if (strcasecmp("StepSize", key) == 0) {
890     unsigned long temp = strtoul(value, NULL, 0);
891     if (temp > 0)
892       rrdcreate_config.stepsize = temp;
893   } else if (strcasecmp("HeartBeat", key) == 0) {
894     int temp = atoi(value);
895     if (temp > 0)
896       rrdcreate_config.heartbeat = temp;
897   } else if (strcasecmp("CreateFilesAsync", key) == 0) {
898     if (IS_TRUE(value))
899       rrdcreate_config.async = 1;
900     else
901       rrdcreate_config.async = 0;
902   } else if (strcasecmp("RRARows", key) == 0) {
903     int tmp = atoi(value);
904     if (tmp <= 0) {
905       fprintf(stderr, "rrdtool: `RRARows' must "
906                       "be greater than 0.\n");
907       ERROR("rrdtool: `RRARows' must "
908             "be greater than 0.\n");
909       return 1;
910     }
911     rrdcreate_config.rrarows = tmp;
912   } else if (strcasecmp("RRATimespan", key) == 0) {
913     char *saveptr = NULL;
914     char *dummy;
915     char *ptr;
916     char *value_copy;
917     int *tmp_alloc;
918
919     value_copy = strdup(value);
920     if (value_copy == NULL)
921       return 1;
922
923     dummy = value_copy;
924     while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
925       dummy = NULL;
926
927       tmp_alloc = realloc(rrdcreate_config.timespans,
928                           sizeof(int) * (rrdcreate_config.timespans_num + 1));
929       if (tmp_alloc == NULL) {
930         fprintf(stderr, "rrdtool: realloc failed.\n");
931         ERROR("rrdtool: realloc failed.\n");
932         free(value_copy);
933         return 1;
934       }
935       rrdcreate_config.timespans = tmp_alloc;
936       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
937       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
938         rrdcreate_config.timespans_num++;
939     } /* while (strtok_r) */
940
941     qsort(/* base = */ rrdcreate_config.timespans,
942           /* nmemb  = */ rrdcreate_config.timespans_num,
943           /* size   = */ sizeof(rrdcreate_config.timespans[0]),
944           /* compar = */ rrd_compare_numeric);
945
946     free(value_copy);
947   } else if (strcasecmp("XFF", key) == 0) {
948     double tmp = atof(value);
949     if ((tmp < 0.0) || (tmp >= 1.0)) {
950       fprintf(stderr, "rrdtool: `XFF' must "
951                       "be in the range 0 to 1 (exclusive).");
952       ERROR("rrdtool: `XFF' must "
953             "be in the range 0 to 1 (exclusive).");
954       return 1;
955     }
956     rrdcreate_config.xff = tmp;
957   } else if (strcasecmp("WritesPerSecond", key) == 0) {
958     double wps = atof(value);
959
960     if (wps < 0.0) {
961       fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
962                       "greater than or equal to zero.");
963       return 1;
964     } else if (wps == 0.0) {
965       write_rate = 0.0;
966     } else {
967       write_rate = 1.0 / wps;
968     }
969   } else if (strcasecmp("RandomTimeout", key) == 0) {
970     double tmp;
971
972     tmp = atof(value);
973     if (tmp < 0.0) {
974       fprintf(stderr, "rrdtool: `RandomTimeout' must "
975                       "be greater than or equal to zero.\n");
976       ERROR("rrdtool: `RandomTimeout' must "
977             "be greater then or equal to zero.");
978     } else {
979       random_timeout = DOUBLE_TO_CDTIME_T(tmp);
980     }
981   } else {
982     return -1;
983   }
984   return 0;
985 } /* int rrd_config */
986
987 static int rrd_shutdown(void) {
988   pthread_mutex_lock(&cache_lock);
989   rrd_cache_flush(0);
990   pthread_mutex_unlock(&cache_lock);
991
992   pthread_mutex_lock(&queue_lock);
993   do_shutdown = 1;
994   pthread_cond_signal(&queue_cond);
995   pthread_mutex_unlock(&queue_lock);
996
997   if ((queue_thread_running != 0) &&
998       ((queue_head != NULL) || (flushq_head != NULL))) {
999     INFO("rrdtool plugin: Shutting down the queue thread. "
1000          "This may take a while.");
1001   } else if (queue_thread_running != 0) {
1002     INFO("rrdtool plugin: Shutting down the queue thread.");
1003   }
1004
1005   /* Wait for all the values to be written to disk before returning. */
1006   if (queue_thread_running != 0) {
1007     pthread_join(queue_thread, NULL);
1008     memset(&queue_thread, 0, sizeof(queue_thread));
1009     queue_thread_running = 0;
1010     DEBUG("rrdtool plugin: queue_thread exited.");
1011   }
1012
1013   rrd_cache_destroy();
1014
1015   return 0;
1016 } /* int rrd_shutdown */
1017
1018 static int rrd_init(void) {
1019   static int init_once;
1020
1021   if (init_once != 0)
1022     return 0;
1023   init_once = 1;
1024
1025   if (rrdcreate_config.heartbeat <= 0)
1026     rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1027
1028   /* Set the cache up */
1029   pthread_mutex_lock(&cache_lock);
1030
1031   cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1032   if (cache == NULL) {
1033     pthread_mutex_unlock(&cache_lock);
1034     ERROR("rrdtool plugin: c_avl_create failed.");
1035     return -1;
1036   }
1037
1038   cache_flush_last = cdtime();
1039   if (cache_timeout == 0) {
1040     random_timeout = 0;
1041     cache_flush_timeout = 0;
1042   } else if (cache_flush_timeout < cache_timeout) {
1043     INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1044          "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1045          CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1046          CDTIME_T_TO_DOUBLE(cache_timeout),
1047          CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1048     cache_flush_timeout = 10 * cache_timeout;
1049   }
1050
1051   /* Assure that "cache_timeout + random_variation" is never negative. */
1052   if (random_timeout > cache_timeout) {
1053     INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1054          CDTIME_T_TO_DOUBLE(cache_timeout));
1055     random_timeout = cache_timeout;
1056   }
1057
1058   pthread_mutex_unlock(&cache_lock);
1059
1060   int status =
1061       plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1062                            /* args = */ NULL, "rrdtool queue");
1063   if (status != 0) {
1064     ERROR("rrdtool plugin: Cannot create queue-thread.");
1065     return -1;
1066   }
1067   queue_thread_running = 1;
1068
1069   DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1070         " heartbeat = %i; rrarows = %i; xff = %lf;",
1071         (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1072         rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1073         rrdcreate_config.xff);
1074
1075   return 0;
1076 } /* int rrd_init */
1077
1078 void module_register(void) {
1079   plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1080   plugin_register_init("rrdtool", rrd_init);
1081   plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1082   plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1083   plugin_register_shutdown("rrdtool", rrd_shutdown);
1084 }