Replace zu with PRIu64 and llu with new macro, PRIsz, which will make it easier to...
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27
28 #include "common.h"
29 #include "plugin.h"
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
33
34 #include <rrd.h>
35
36 /*
37  * Private types
38  */
39 typedef struct rrd_cache_s {
40   int values_num;
41   char **values;
42   cdtime_t first_value;
43   cdtime_t last_value;
44   int64_t random_variation;
45   enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 } rrd_cache_t;
47
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
50
51 struct rrd_queue_s {
52   char *filename;
53   struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
56
57 /*
58  * Private variables
59  */
60 static const char *config_keys[] = {
61     "CacheTimeout", "CacheFlush",      "CreateFilesAsync", "DataDir",
62     "StepSize",     "HeartBeat",       "RRARows",          "RRATimespan",
63     "XFF",          "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
65
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67  * is zero a default, depending on the `interval' member of the value list is
68  * being used. */
69 static char *datadir = NULL;
70 static double write_rate = 0.0;
71 static rrdcreate_config_t rrdcreate_config = {
72     /* stepsize = */ 0,
73     /* heartbeat = */ 0,
74     /* rrarows = */ 1200,
75     /* xff = */ 0.1,
76
77     /* timespans = */ NULL,
78     /* timespans_num = */ 0,
79
80     /* consolidation_functions = */ NULL,
81     /* consolidation_functions_num = */ 0,
82
83     /* async = */ 0};
84
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86  * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout = 0;
88 static cdtime_t cache_flush_timeout = 0;
89 static cdtime_t random_timeout = 0;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache = NULL;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
93
94 static rrd_queue_t *queue_head = NULL;
95 static rrd_queue_t *queue_tail = NULL;
96 static rrd_queue_t *flushq_head = NULL;
97 static rrd_queue_t *flushq_tail = NULL;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
102
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
105 #endif
106
107 static int do_shutdown = 0;
108
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
111                        const char **argv) {
112   optind = 0; /* bug in librrd? */
113   rrd_clear_error();
114
115   int status = rrd_update_r(filename, template, argc, (void *)argv);
116   if (status != 0) {
117     WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
118             rrd_get_error());
119   }
120
121   return status;
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
124
125 #else  /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
127                        const char **argv) {
128   int status;
129
130   int new_argc;
131   char **new_argv;
132
133   assert(template == NULL);
134
135   new_argc = 2 + argc;
136   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137   if (new_argv == NULL) {
138     ERROR("rrdtool plugin: malloc failed.");
139     return -1;
140   }
141
142   new_argv[0] = "update";
143   new_argv[1] = filename;
144
145   memcpy(new_argv + 2, argv, argc * sizeof(char *));
146   new_argv[new_argc] = NULL;
147
148   pthread_mutex_lock(&librrd_lock);
149   optind = 0; /* bug in librrd? */
150   rrd_clear_error();
151
152   status = rrd_update(new_argc, new_argv);
153   pthread_mutex_unlock(&librrd_lock);
154
155   if (status != 0) {
156     WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
157             rrd_get_error());
158   }
159
160   sfree(new_argv);
161
162   return status;
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
165
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167                                          const data_set_t *ds,
168                                          const value_list_t *vl) {
169   int offset;
170   int status;
171   time_t tt;
172
173   memset(buffer, '\0', buffer_len);
174
175   tt = CDTIME_T_TO_TIME_T(vl->time);
176   status = snprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177   if ((status < 1) || (status >= buffer_len))
178     return -1;
179   offset = status;
180
181   for (size_t i = 0; i < ds->ds_num; i++) {
182     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183         (ds->ds[i].type != DS_TYPE_GAUGE) &&
184         (ds->ds[i].type != DS_TYPE_DERIVE) &&
185         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
186       return -1;
187
188     if (ds->ds[i].type == DS_TYPE_COUNTER)
189       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
190                         (uint64_t)vl->values[i].counter);
191     else if (ds->ds[i].type == DS_TYPE_GAUGE)
192       status = snprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193                         vl->values[i].gauge);
194     else if (ds->ds[i].type == DS_TYPE_DERIVE)
195       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196                         vl->values[i].derive);
197     else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199                         vl->values[i].absolute);
200
201     if ((status < 1) || (status >= (buffer_len - offset)))
202       return -1;
203
204     offset += status;
205   } /* for ds->ds_num */
206
207   return 0;
208 } /* int value_list_to_string_multiple */
209
210 static int value_list_to_string(char *buffer, int buffer_len,
211                                 const data_set_t *ds, const value_list_t *vl) {
212   int status;
213   time_t tt;
214
215   if (ds->ds_num != 1)
216     return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
217
218   tt = CDTIME_T_TO_TIME_T(vl->time);
219   switch (ds->ds[0].type) {
220   case DS_TYPE_DERIVE:
221     status = snprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222                       vl->values[0].derive);
223     break;
224   case DS_TYPE_GAUGE:
225     status = snprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226                       vl->values[0].gauge);
227     break;
228   case DS_TYPE_COUNTER:
229     status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
230                       (uint64_t)vl->values[0].counter);
231     break;
232   case DS_TYPE_ABSOLUTE:
233     status = snprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234                       vl->values[0].absolute);
235     break;
236   default:
237     return EINVAL;
238   }
239
240   if ((status < 1) || (status >= buffer_len))
241     return ENOMEM;
242
243   return 0;
244 } /* int value_list_to_string */
245
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247                                   value_list_t const *vl) {
248   char const suffix[] = ".rrd";
249   int status;
250   size_t len;
251
252   if (datadir != NULL) {
253     size_t datadir_len = strlen(datadir) + 1;
254
255     if (datadir_len >= buffer_size)
256       return ENOMEM;
257
258     sstrncpy(buffer, datadir, buffer_size);
259     buffer[datadir_len - 1] = '/';
260     buffer[datadir_len] = 0;
261
262     buffer += datadir_len;
263     buffer_size -= datadir_len;
264   }
265
266   status = FORMAT_VL(buffer, buffer_size, vl);
267   if (status != 0)
268     return status;
269
270   len = strlen(buffer);
271   assert(len < buffer_size);
272   buffer += len;
273   buffer_size -= len;
274
275   if (buffer_size <= sizeof(suffix))
276     return ENOMEM;
277
278   memcpy(buffer, suffix, sizeof(suffix));
279   return 0;
280 } /* int value_list_to_filename */
281
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283   struct timeval tv_next_update;
284   struct timeval tv_now;
285
286   gettimeofday(&tv_next_update, /* timezone = */ NULL);
287
288   while (42) {
289     rrd_queue_t *queue_entry;
290     rrd_cache_t *cache_entry;
291     char **values;
292     int values_num;
293     int status;
294
295     values = NULL;
296     values_num = 0;
297
298     pthread_mutex_lock(&queue_lock);
299     /* Wait for values to arrive */
300     while (42) {
301       struct timespec ts_wait;
302
303       while ((flushq_head == NULL) && (queue_head == NULL) &&
304              (do_shutdown == 0))
305         pthread_cond_wait(&queue_cond, &queue_lock);
306
307       if ((flushq_head == NULL) && (queue_head == NULL))
308         break;
309
310       /* Don't delay if there's something to flush */
311       if (flushq_head != NULL)
312         break;
313
314       /* Don't delay if we're shutting down */
315       if (do_shutdown != 0)
316         break;
317
318       /* Don't delay if no delay was configured. */
319       if (write_rate <= 0.0)
320         break;
321
322       gettimeofday(&tv_now, /* timezone = */ NULL);
323       status = timeval_cmp(tv_next_update, tv_now, NULL);
324       /* We're good to go */
325       if (status <= 0)
326         break;
327
328       /* We're supposed to wait a bit with this update, so we'll
329        * wait for the next addition to the queue or to the end of
330        * the wait period - whichever comes first. */
331       ts_wait.tv_sec = tv_next_update.tv_sec;
332       ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
333
334       status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335       if (status == ETIMEDOUT)
336         break;
337     } /* while (42) */
338
339     /* XXX: If you need to lock both, cache_lock and queue_lock, at
340      * the same time, ALWAYS lock `cache_lock' first! */
341
342     /* We're in the shutdown phase */
343     if ((flushq_head == NULL) && (queue_head == NULL)) {
344       pthread_mutex_unlock(&queue_lock);
345       break;
346     }
347
348     if (flushq_head != NULL) {
349       /* Dequeue the first flush entry */
350       queue_entry = flushq_head;
351       if (flushq_head == flushq_tail)
352         flushq_head = flushq_tail = NULL;
353       else
354         flushq_head = flushq_head->next;
355     } else /* if (queue_head != NULL) */
356     {
357       /* Dequeue the first regular entry */
358       queue_entry = queue_head;
359       if (queue_head == queue_tail)
360         queue_head = queue_tail = NULL;
361       else
362         queue_head = queue_head->next;
363     }
364
365     /* Unlock the queue again */
366     pthread_mutex_unlock(&queue_lock);
367
368     /* We now need the cache lock so the entry isn't updated while
369      * we make a copy of its values */
370     pthread_mutex_lock(&cache_lock);
371
372     status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
373
374     if (status == 0) {
375       values = cache_entry->values;
376       values_num = cache_entry->values_num;
377
378       cache_entry->values = NULL;
379       cache_entry->values_num = 0;
380       cache_entry->flags = FLAG_NONE;
381     }
382
383     pthread_mutex_unlock(&cache_lock);
384
385     if (status != 0) {
386       sfree(queue_entry->filename);
387       sfree(queue_entry);
388       continue;
389     }
390
391     /* Update `tv_next_update' */
392     if (write_rate > 0.0) {
393       gettimeofday(&tv_now, /* timezone = */ NULL);
394       tv_next_update.tv_sec = tv_now.tv_sec;
395       tv_next_update.tv_usec =
396           tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397       while (tv_next_update.tv_usec > 1000000) {
398         tv_next_update.tv_sec++;
399         tv_next_update.tv_usec -= 1000000;
400       }
401     }
402
403     /* Write the values to the RRD-file */
404     srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405     DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406           (values_num == 1) ? "" : "s", queue_entry->filename);
407
408     for (int i = 0; i < values_num; i++) {
409       sfree(values[i]);
410     }
411     sfree(values);
412     sfree(queue_entry->filename);
413     sfree(queue_entry);
414   } /* while (42) */
415
416   pthread_exit((void *)0);
417   return (void *)0;
418 } /* void *rrd_queue_thread */
419
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421                              rrd_queue_t **tail) {
422   rrd_queue_t *queue_entry;
423
424   queue_entry = malloc(sizeof(*queue_entry));
425   if (queue_entry == NULL)
426     return -1;
427
428   queue_entry->filename = strdup(filename);
429   if (queue_entry->filename == NULL) {
430     free(queue_entry);
431     return -1;
432   }
433
434   queue_entry->next = NULL;
435
436   pthread_mutex_lock(&queue_lock);
437
438   if (*tail == NULL)
439     *head = queue_entry;
440   else
441     (*tail)->next = queue_entry;
442   *tail = queue_entry;
443
444   pthread_cond_signal(&queue_cond);
445   pthread_mutex_unlock(&queue_lock);
446
447   return 0;
448 } /* int rrd_queue_enqueue */
449
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451                              rrd_queue_t **tail) {
452   rrd_queue_t *this;
453   rrd_queue_t *prev;
454
455   pthread_mutex_lock(&queue_lock);
456
457   prev = NULL;
458   this = *head;
459
460   while (this != NULL) {
461     if (strcmp(this->filename, filename) == 0)
462       break;
463
464     prev = this;
465     this = this->next;
466   }
467
468   if (this == NULL) {
469     pthread_mutex_unlock(&queue_lock);
470     return -1;
471   }
472
473   if (prev == NULL)
474     *head = this->next;
475   else
476     prev->next = this->next;
477
478   if (this->next == NULL)
479     *tail = prev;
480
481   pthread_mutex_unlock(&queue_lock);
482
483   sfree(this->filename);
484   sfree(this);
485
486   return 0;
487 } /* int rrd_queue_dequeue */
488
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
491   rrd_cache_t *rc;
492   cdtime_t now;
493
494   char **keys = NULL;
495   int keys_num = 0;
496
497   char *key;
498   c_avl_iterator_t *iter;
499
500   DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501         CDTIME_T_TO_DOUBLE(timeout));
502
503   now = cdtime();
504
505   /* Build a list of entries to be flushed */
506   iter = c_avl_get_iterator(cache);
507   while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508     if (rc->flags != FLAG_NONE)
509       continue;
510     /* timeout == 0  =>  flush everything */
511     else if ((timeout != 0) && ((now - rc->first_value) < timeout))
512       continue;
513     else if (rc->values_num > 0) {
514       int status;
515
516       status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
517       if (status == 0)
518         rc->flags = FLAG_QUEUED;
519     } else /* ancient and no values -> waste of memory */
520     {
521       char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
522       if (tmp == NULL) {
523         char errbuf[1024];
524         ERROR("rrdtool plugin: "
525               "realloc failed: %s",
526               sstrerror(errno, errbuf, sizeof(errbuf)));
527         c_avl_iterator_destroy(iter);
528         sfree(keys);
529         return;
530       }
531       keys = tmp;
532       keys[keys_num] = key;
533       keys_num++;
534     }
535   } /* while (c_avl_iterator_next) */
536   c_avl_iterator_destroy(iter);
537
538   for (int i = 0; i < keys_num; i++) {
539     if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
540       DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
541       continue;
542     }
543
544     assert(rc->values == NULL);
545     assert(rc->values_num == 0);
546
547     sfree(rc);
548     sfree(key);
549     keys[i] = NULL;
550   } /* for (i = 0..keys_num) */
551
552   sfree(keys);
553
554   cache_flush_last = now;
555 } /* void rrd_cache_flush */
556
557 static int rrd_cache_flush_identifier(cdtime_t timeout,
558                                       const char *identifier) {
559   rrd_cache_t *rc;
560   cdtime_t now;
561   int status;
562   char key[2048];
563
564   if (identifier == NULL) {
565     rrd_cache_flush(timeout);
566     return 0;
567   }
568
569   now = cdtime();
570
571   if (datadir == NULL)
572     snprintf(key, sizeof(key), "%s.rrd", identifier);
573   else
574     snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
575   key[sizeof(key) - 1] = 0;
576
577   status = c_avl_get(cache, key, (void *)&rc);
578   if (status != 0) {
579     INFO("rrdtool plugin: rrd_cache_flush_identifier: "
580          "c_avl_get (%s) failed. Does that file really exist?",
581          key);
582     return status;
583   }
584
585   if (rc->flags == FLAG_FLUSHQ) {
586     status = 0;
587   } else if (rc->flags == FLAG_QUEUED) {
588     rrd_queue_dequeue(key, &queue_head, &queue_tail);
589     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
590     if (status == 0)
591       rc->flags = FLAG_FLUSHQ;
592   } else if ((now - rc->first_value) < timeout) {
593     status = 0;
594   } else if (rc->values_num > 0) {
595     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
596     if (status == 0)
597       rc->flags = FLAG_FLUSHQ;
598   }
599
600   return status;
601 } /* int rrd_cache_flush_identifier */
602
603 static int64_t rrd_get_random_variation(void) {
604   if (random_timeout == 0)
605     return 0;
606
607   return (int64_t)cdrand_range(-random_timeout, random_timeout);
608 } /* int64_t rrd_get_random_variation */
609
610 static int rrd_cache_insert(const char *filename, const char *value,
611                             cdtime_t value_time) {
612   rrd_cache_t *rc = NULL;
613   int new_rc = 0;
614   char **values_new;
615
616   pthread_mutex_lock(&cache_lock);
617
618   /* This shouldn't happen, but it did happen at least once, so we'll be
619    * careful. */
620   if (cache == NULL) {
621     pthread_mutex_unlock(&cache_lock);
622     WARNING("rrdtool plugin: cache == NULL.");
623     return -1;
624   }
625
626   c_avl_get(cache, filename, (void *)&rc);
627
628   if (rc == NULL) {
629     rc = malloc(sizeof(*rc));
630     if (rc == NULL) {
631       pthread_mutex_unlock(&cache_lock);
632       return -1;
633     }
634     rc->values_num = 0;
635     rc->values = NULL;
636     rc->first_value = 0;
637     rc->last_value = 0;
638     rc->random_variation = rrd_get_random_variation();
639     rc->flags = FLAG_NONE;
640     new_rc = 1;
641   }
642
643   assert(value_time > 0); /* plugin_dispatch() ensures this. */
644   if (rc->last_value >= value_time) {
645     pthread_mutex_unlock(&cache_lock);
646     DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
647           ">= (value_time = %" PRIu64 ")",
648           rc->last_value, value_time);
649     return -1;
650   }
651
652   values_new =
653       realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
654   if (values_new == NULL) {
655     char errbuf[1024];
656     void *cache_key = NULL;
657
658     sstrerror(errno, errbuf, sizeof(errbuf));
659
660     c_avl_remove(cache, filename, &cache_key, NULL);
661     pthread_mutex_unlock(&cache_lock);
662
663     ERROR("rrdtool plugin: realloc failed: %s", errbuf);
664
665     sfree(cache_key);
666     sfree(rc->values);
667     sfree(rc);
668     return -1;
669   }
670   rc->values = values_new;
671
672   rc->values[rc->values_num] = strdup(value);
673   if (rc->values[rc->values_num] != NULL)
674     rc->values_num++;
675
676   if (rc->values_num == 1)
677     rc->first_value = value_time;
678   rc->last_value = value_time;
679
680   /* Insert if this is the first value */
681   if (new_rc == 1) {
682     void *cache_key = strdup(filename);
683
684     if (cache_key == NULL) {
685       char errbuf[1024];
686       sstrerror(errno, errbuf, sizeof(errbuf));
687
688       pthread_mutex_unlock(&cache_lock);
689
690       ERROR("rrdtool plugin: strdup failed: %s", errbuf);
691
692       sfree(rc->values[0]);
693       sfree(rc->values);
694       sfree(rc);
695       return -1;
696     }
697
698     c_avl_insert(cache, cache_key, rc);
699   }
700
701   DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
702         "values_num = %i; age = %.3f;",
703         filename, rc->values_num,
704         CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
705
706   if ((rc->last_value - rc->first_value) >=
707       (cache_timeout + rc->random_variation)) {
708     /* XXX: If you need to lock both, cache_lock and queue_lock, at
709      * the same time, ALWAYS lock `cache_lock' first! */
710     if (rc->flags == FLAG_NONE) {
711       int status;
712
713       status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
714       if (status == 0)
715         rc->flags = FLAG_QUEUED;
716
717       rc->random_variation = rrd_get_random_variation();
718     } else {
719       DEBUG("rrdtool plugin: `%s' is already queued.", filename);
720     }
721   }
722
723   if ((cache_timeout > 0) &&
724       ((cdtime() - cache_flush_last) > cache_flush_timeout))
725     rrd_cache_flush(cache_timeout + random_timeout);
726
727   pthread_mutex_unlock(&cache_lock);
728
729   return 0;
730 } /* int rrd_cache_insert */
731
732 static int rrd_cache_destroy(void) /* {{{ */
733 {
734   void *key = NULL;
735   void *value = NULL;
736
737   int non_empty = 0;
738
739   pthread_mutex_lock(&cache_lock);
740
741   if (cache == NULL) {
742     pthread_mutex_unlock(&cache_lock);
743     return 0;
744   }
745
746   while (c_avl_pick(cache, &key, &value) == 0) {
747     rrd_cache_t *rc;
748
749     sfree(key);
750     key = NULL;
751
752     rc = value;
753     value = NULL;
754
755     if (rc->values_num > 0)
756       non_empty++;
757
758     for (int i = 0; i < rc->values_num; i++)
759       sfree(rc->values[i]);
760     sfree(rc->values);
761     sfree(rc);
762   }
763
764   c_avl_destroy(cache);
765   cache = NULL;
766
767   if (non_empty > 0) {
768     INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
769          non_empty, (non_empty == 1) ? "entry" : "entries");
770   } else {
771     DEBUG("rrdtool plugin: No values have been lost "
772           "when destroying the cache.");
773   }
774
775   pthread_mutex_unlock(&cache_lock);
776   return 0;
777 } /* }}} int rrd_cache_destroy */
778
779 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
780   int a = *((int *)a_ptr);
781   int b = *((int *)b_ptr);
782
783   if (a < b)
784     return -1;
785   else if (a > b)
786     return 1;
787   else
788     return 0;
789 } /* int rrd_compare_numeric */
790
791 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
792                      user_data_t __attribute__((unused)) * user_data) {
793
794   if (do_shutdown)
795     return 0;
796
797   if (0 != strcmp(ds->type, vl->type)) {
798     ERROR("rrdtool plugin: DS type does not match value list type");
799     return -1;
800   }
801
802   char filename[PATH_MAX];
803   if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
804     return -1;
805
806   char values[32 * ds->ds_num];
807   if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
808     return -1;
809
810   struct stat statbuf = {0};
811   if (stat(filename, &statbuf) == -1) {
812     if (errno == ENOENT) {
813       if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
814         return -1;
815       } else if (rrdcreate_config.async) {
816         return 0;
817       }
818     } else {
819       char errbuf[1024];
820       ERROR("rrdtool plugin: stat(%s) failed: %s", filename,
821             sstrerror(errno, errbuf, sizeof(errbuf)));
822       return -1;
823     }
824   } else if (!S_ISREG(statbuf.st_mode)) {
825     ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
826     return -1;
827   }
828
829   return rrd_cache_insert(filename, values, vl->time);
830 } /* int rrd_write */
831
832 static int rrd_flush(cdtime_t timeout, const char *identifier,
833                      __attribute__((unused)) user_data_t *user_data) {
834   pthread_mutex_lock(&cache_lock);
835
836   if (cache == NULL) {
837     pthread_mutex_unlock(&cache_lock);
838     return 0;
839   }
840
841   rrd_cache_flush_identifier(timeout, identifier);
842
843   pthread_mutex_unlock(&cache_lock);
844   return 0;
845 } /* int rrd_flush */
846
847 static int rrd_config(const char *key, const char *value) {
848   if (strcasecmp("CacheTimeout", key) == 0) {
849     double tmp = atof(value);
850     if (tmp < 0) {
851       fprintf(stderr, "rrdtool: `CacheTimeout' must "
852                       "be greater than 0.\n");
853       ERROR("rrdtool: `CacheTimeout' must "
854             "be greater than 0.\n");
855       return 1;
856     }
857     cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
858   } else if (strcasecmp("CacheFlush", key) == 0) {
859     double tmp = atof(value);
860     if (tmp < 0) {
861       fprintf(stderr, "rrdtool: `CacheFlush' must "
862                       "be greater than 0.\n");
863       ERROR("rrdtool: `CacheFlush' must "
864             "be greater than 0.\n");
865       return 1;
866     }
867     cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
868   } else if (strcasecmp("DataDir", key) == 0) {
869     char *tmp;
870     size_t len;
871
872     tmp = strdup(value);
873     if (tmp == NULL) {
874       ERROR("rrdtool plugin: strdup failed.");
875       return 1;
876     }
877
878     len = strlen(tmp);
879     while ((len > 0) && (tmp[len - 1] == '/')) {
880       len--;
881       tmp[len] = 0;
882     }
883
884     if (len == 0) {
885       ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
886       sfree(tmp);
887       return 1;
888     }
889
890     if (datadir != NULL) {
891       sfree(datadir);
892     }
893
894     datadir = tmp;
895   } else if (strcasecmp("StepSize", key) == 0) {
896     unsigned long temp = strtoul(value, NULL, 0);
897     if (temp > 0)
898       rrdcreate_config.stepsize = temp;
899   } else if (strcasecmp("HeartBeat", key) == 0) {
900     int temp = atoi(value);
901     if (temp > 0)
902       rrdcreate_config.heartbeat = temp;
903   } else if (strcasecmp("CreateFilesAsync", key) == 0) {
904     if (IS_TRUE(value))
905       rrdcreate_config.async = 1;
906     else
907       rrdcreate_config.async = 0;
908   } else if (strcasecmp("RRARows", key) == 0) {
909     int tmp = atoi(value);
910     if (tmp <= 0) {
911       fprintf(stderr, "rrdtool: `RRARows' must "
912                       "be greater than 0.\n");
913       ERROR("rrdtool: `RRARows' must "
914             "be greater than 0.\n");
915       return 1;
916     }
917     rrdcreate_config.rrarows = tmp;
918   } else if (strcasecmp("RRATimespan", key) == 0) {
919     char *saveptr = NULL;
920     char *dummy;
921     char *ptr;
922     char *value_copy;
923     int *tmp_alloc;
924
925     value_copy = strdup(value);
926     if (value_copy == NULL)
927       return 1;
928
929     dummy = value_copy;
930     while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
931       dummy = NULL;
932
933       tmp_alloc = realloc(rrdcreate_config.timespans,
934                           sizeof(int) * (rrdcreate_config.timespans_num + 1));
935       if (tmp_alloc == NULL) {
936         fprintf(stderr, "rrdtool: realloc failed.\n");
937         ERROR("rrdtool: realloc failed.\n");
938         free(value_copy);
939         return 1;
940       }
941       rrdcreate_config.timespans = tmp_alloc;
942       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
943       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
944         rrdcreate_config.timespans_num++;
945     } /* while (strtok_r) */
946
947     qsort(/* base = */ rrdcreate_config.timespans,
948           /* nmemb  = */ rrdcreate_config.timespans_num,
949           /* size   = */ sizeof(rrdcreate_config.timespans[0]),
950           /* compar = */ rrd_compare_numeric);
951
952     free(value_copy);
953   } else if (strcasecmp("XFF", key) == 0) {
954     double tmp = atof(value);
955     if ((tmp < 0.0) || (tmp >= 1.0)) {
956       fprintf(stderr, "rrdtool: `XFF' must "
957                       "be in the range 0 to 1 (exclusive).");
958       ERROR("rrdtool: `XFF' must "
959             "be in the range 0 to 1 (exclusive).");
960       return 1;
961     }
962     rrdcreate_config.xff = tmp;
963   } else if (strcasecmp("WritesPerSecond", key) == 0) {
964     double wps = atof(value);
965
966     if (wps < 0.0) {
967       fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
968                       "greater than or equal to zero.");
969       return 1;
970     } else if (wps == 0.0) {
971       write_rate = 0.0;
972     } else {
973       write_rate = 1.0 / wps;
974     }
975   } else if (strcasecmp("RandomTimeout", key) == 0) {
976     double tmp;
977
978     tmp = atof(value);
979     if (tmp < 0.0) {
980       fprintf(stderr, "rrdtool: `RandomTimeout' must "
981                       "be greater than or equal to zero.\n");
982       ERROR("rrdtool: `RandomTimeout' must "
983             "be greater then or equal to zero.");
984     } else {
985       random_timeout = DOUBLE_TO_CDTIME_T(tmp);
986     }
987   } else {
988     return -1;
989   }
990   return 0;
991 } /* int rrd_config */
992
993 static int rrd_shutdown(void) {
994   pthread_mutex_lock(&cache_lock);
995   rrd_cache_flush(0);
996   pthread_mutex_unlock(&cache_lock);
997
998   pthread_mutex_lock(&queue_lock);
999   do_shutdown = 1;
1000   pthread_cond_signal(&queue_cond);
1001   pthread_mutex_unlock(&queue_lock);
1002
1003   if ((queue_thread_running != 0) &&
1004       ((queue_head != NULL) || (flushq_head != NULL))) {
1005     INFO("rrdtool plugin: Shutting down the queue thread. "
1006          "This may take a while.");
1007   } else if (queue_thread_running != 0) {
1008     INFO("rrdtool plugin: Shutting down the queue thread.");
1009   }
1010
1011   /* Wait for all the values to be written to disk before returning. */
1012   if (queue_thread_running != 0) {
1013     pthread_join(queue_thread, NULL);
1014     memset(&queue_thread, 0, sizeof(queue_thread));
1015     queue_thread_running = 0;
1016     DEBUG("rrdtool plugin: queue_thread exited.");
1017   }
1018
1019   rrd_cache_destroy();
1020
1021   return 0;
1022 } /* int rrd_shutdown */
1023
1024 static int rrd_init(void) {
1025   static int init_once = 0;
1026
1027   if (init_once != 0)
1028     return 0;
1029   init_once = 1;
1030
1031   if (rrdcreate_config.heartbeat <= 0)
1032     rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1033
1034   /* Set the cache up */
1035   pthread_mutex_lock(&cache_lock);
1036
1037   cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1038   if (cache == NULL) {
1039     pthread_mutex_unlock(&cache_lock);
1040     ERROR("rrdtool plugin: c_avl_create failed.");
1041     return -1;
1042   }
1043
1044   cache_flush_last = cdtime();
1045   if (cache_timeout == 0) {
1046     random_timeout = 0;
1047     cache_flush_timeout = 0;
1048   } else if (cache_flush_timeout < cache_timeout) {
1049     INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1050          "%.3f\". "
1051          "Ajusting \"CacheFlush\" to %.3f seconds.",
1052          CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1053          CDTIME_T_TO_DOUBLE(cache_timeout),
1054          CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1055     cache_flush_timeout = 10 * cache_timeout;
1056   }
1057
1058   /* Assure that "cache_timeout + random_variation" is never negative. */
1059   if (random_timeout > cache_timeout) {
1060     INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1061          CDTIME_T_TO_DOUBLE(cache_timeout));
1062     random_timeout = cache_timeout;
1063   }
1064
1065   pthread_mutex_unlock(&cache_lock);
1066
1067   int status =
1068       plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1069                            /* args = */ NULL, "rrdtool queue");
1070   if (status != 0) {
1071     ERROR("rrdtool plugin: Cannot create queue-thread.");
1072     return -1;
1073   }
1074   queue_thread_running = 1;
1075
1076   DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1077         " heartbeat = %i; rrarows = %i; xff = %lf;",
1078         (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1079         rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1080         rrdcreate_config.xff);
1081
1082   return 0;
1083 } /* int rrd_init */
1084
1085 void module_register(void) {
1086   plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1087   plugin_register_init("rrdtool", rrd_init);
1088   plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1089   plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1090   plugin_register_shutdown("rrdtool", rrd_shutdown);
1091 }