Merge remote-tracking branch 'github/pr/2190'
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27
28 #include "common.h"
29 #include "plugin.h"
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
33
34 #include <rrd.h>
35
36 /*
37  * Private types
38  */
39 struct rrd_cache_s {
40   int values_num;
41   char **values;
42   cdtime_t first_value;
43   cdtime_t last_value;
44   int64_t random_variation;
45   enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 };
47 typedef struct rrd_cache_s rrd_cache_t;
48
49 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
50 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
51
52 struct rrd_queue_s {
53   char *filename;
54   struct rrd_queue_s *next;
55 };
56 typedef struct rrd_queue_s rrd_queue_t;
57
58 /*
59  * Private variables
60  */
61 static const char *config_keys[] = {
62     "CacheTimeout", "CacheFlush",      "CreateFilesAsync", "DataDir",
63     "StepSize",     "HeartBeat",       "RRARows",          "RRATimespan",
64     "XFF",          "WritesPerSecond", "RandomTimeout"};
65 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
66
67 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
68  * is zero a default, depending on the `interval' member of the value list is
69  * being used. */
70 static char *datadir = NULL;
71 static double write_rate = 0.0;
72 static rrdcreate_config_t rrdcreate_config = {
73     /* stepsize = */ 0,
74     /* heartbeat = */ 0,
75     /* rrarows = */ 1200,
76     /* xff = */ 0.1,
77
78     /* timespans = */ NULL,
79     /* timespans_num = */ 0,
80
81     /* consolidation_functions = */ NULL,
82     /* consolidation_functions_num = */ 0,
83
84     /* async = */ 0};
85
86 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
87  * ALWAYS lock `cache_lock' first! */
88 static cdtime_t cache_timeout = 0;
89 static cdtime_t cache_flush_timeout = 0;
90 static cdtime_t random_timeout = TIME_T_TO_CDTIME_T_STATIC(1);
91 static cdtime_t cache_flush_last;
92 static c_avl_tree_t *cache = NULL;
93 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
94
95 static rrd_queue_t *queue_head = NULL;
96 static rrd_queue_t *queue_tail = NULL;
97 static rrd_queue_t *flushq_head = NULL;
98 static rrd_queue_t *flushq_tail = NULL;
99 static pthread_t queue_thread;
100 static int queue_thread_running = 1;
101 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
102 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
103
104 #if !HAVE_THREADSAFE_LIBRRD
105 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
106 #endif
107
108 static int do_shutdown = 0;
109
110 #if HAVE_THREADSAFE_LIBRRD
111 static int srrd_update(char *filename, char *template, int argc,
112                        const char **argv) {
113   int status;
114
115   optind = 0; /* bug in librrd? */
116   rrd_clear_error();
117
118   status = rrd_update_r(filename, template, argc, (void *)argv);
119
120   if (status != 0) {
121     WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
122             rrd_get_error());
123   }
124
125   return status;
126 } /* int srrd_update */
127 /* #endif HAVE_THREADSAFE_LIBRRD */
128
129 #else  /* !HAVE_THREADSAFE_LIBRRD */
130 static int srrd_update(char *filename, char *template, int argc,
131                        const char **argv) {
132   int status;
133
134   int new_argc;
135   char **new_argv;
136
137   assert(template == NULL);
138
139   new_argc = 2 + argc;
140   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
141   if (new_argv == NULL) {
142     ERROR("rrdtool plugin: malloc failed.");
143     return -1;
144   }
145
146   new_argv[0] = "update";
147   new_argv[1] = filename;
148
149   memcpy(new_argv + 2, argv, argc * sizeof(char *));
150   new_argv[new_argc] = NULL;
151
152   pthread_mutex_lock(&librrd_lock);
153   optind = 0; /* bug in librrd? */
154   rrd_clear_error();
155
156   status = rrd_update(new_argc, new_argv);
157   pthread_mutex_unlock(&librrd_lock);
158
159   if (status != 0) {
160     WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
161             rrd_get_error());
162   }
163
164   sfree(new_argv);
165
166   return status;
167 } /* int srrd_update */
168 #endif /* !HAVE_THREADSAFE_LIBRRD */
169
170 static int value_list_to_string_multiple(char *buffer, int buffer_len,
171                                          const data_set_t *ds,
172                                          const value_list_t *vl) {
173   int offset;
174   int status;
175   time_t tt;
176
177   memset(buffer, '\0', buffer_len);
178
179   tt = CDTIME_T_TO_TIME_T(vl->time);
180   status = ssnprintf(buffer, buffer_len, "%u", (unsigned int)tt);
181   if ((status < 1) || (status >= buffer_len))
182     return -1;
183   offset = status;
184
185   for (size_t i = 0; i < ds->ds_num; i++) {
186     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
187         (ds->ds[i].type != DS_TYPE_GAUGE) &&
188         (ds->ds[i].type != DS_TYPE_DERIVE) &&
189         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
190       return -1;
191
192     if (ds->ds[i].type == DS_TYPE_COUNTER)
193       status = ssnprintf(buffer + offset, buffer_len - offset, ":%llu",
194                          vl->values[i].counter);
195     else if (ds->ds[i].type == DS_TYPE_GAUGE)
196       status = ssnprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
197                          vl->values[i].gauge);
198     else if (ds->ds[i].type == DS_TYPE_DERIVE)
199       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
200                          vl->values[i].derive);
201     else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
202       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
203                          vl->values[i].absolute);
204
205     if ((status < 1) || (status >= (buffer_len - offset)))
206       return -1;
207
208     offset += status;
209   } /* for ds->ds_num */
210
211   return 0;
212 } /* int value_list_to_string_multiple */
213
214 static int value_list_to_string(char *buffer, int buffer_len,
215                                 const data_set_t *ds, const value_list_t *vl) {
216   int status;
217   time_t tt;
218
219   if (ds->ds_num != 1)
220     return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
221
222   tt = CDTIME_T_TO_TIME_T(vl->time);
223   switch (ds->ds[0].type) {
224   case DS_TYPE_DERIVE:
225     status = ssnprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
226                        vl->values[0].derive);
227     break;
228   case DS_TYPE_GAUGE:
229     status = ssnprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
230                        vl->values[0].gauge);
231     break;
232   case DS_TYPE_COUNTER:
233     status = ssnprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
234                        vl->values[0].counter);
235     break;
236   case DS_TYPE_ABSOLUTE:
237     status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
238                        vl->values[0].absolute);
239     break;
240   default:
241     return EINVAL;
242   }
243
244   if ((status < 1) || (status >= buffer_len))
245     return ENOMEM;
246
247   return 0;
248 } /* int value_list_to_string */
249
250 static int value_list_to_filename(char *buffer, size_t buffer_size,
251                                   value_list_t const *vl) {
252   char const suffix[] = ".rrd";
253   int status;
254   size_t len;
255
256   if (datadir != NULL) {
257     size_t datadir_len = strlen(datadir) + 1;
258
259     if (datadir_len >= buffer_size)
260       return ENOMEM;
261
262     sstrncpy(buffer, datadir, buffer_size);
263     buffer[datadir_len - 1] = '/';
264     buffer[datadir_len] = 0;
265
266     buffer += datadir_len;
267     buffer_size -= datadir_len;
268   }
269
270   status = FORMAT_VL(buffer, buffer_size, vl);
271   if (status != 0)
272     return status;
273
274   len = strlen(buffer);
275   assert(len < buffer_size);
276   buffer += len;
277   buffer_size -= len;
278
279   if (buffer_size <= sizeof(suffix))
280     return ENOMEM;
281
282   memcpy(buffer, suffix, sizeof(suffix));
283   return 0;
284 } /* int value_list_to_filename */
285
286 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
287   struct timeval tv_next_update;
288   struct timeval tv_now;
289
290   gettimeofday(&tv_next_update, /* timezone = */ NULL);
291
292   while (42) {
293     rrd_queue_t *queue_entry;
294     rrd_cache_t *cache_entry;
295     char **values;
296     int values_num;
297     int status;
298
299     values = NULL;
300     values_num = 0;
301
302     pthread_mutex_lock(&queue_lock);
303     /* Wait for values to arrive */
304     while (42) {
305       struct timespec ts_wait;
306
307       while ((flushq_head == NULL) && (queue_head == NULL) &&
308              (do_shutdown == 0))
309         pthread_cond_wait(&queue_cond, &queue_lock);
310
311       if ((flushq_head == NULL) && (queue_head == NULL))
312         break;
313
314       /* Don't delay if there's something to flush */
315       if (flushq_head != NULL)
316         break;
317
318       /* Don't delay if we're shutting down */
319       if (do_shutdown != 0)
320         break;
321
322       /* Don't delay if no delay was configured. */
323       if (write_rate <= 0.0)
324         break;
325
326       gettimeofday(&tv_now, /* timezone = */ NULL);
327       status = timeval_cmp(tv_next_update, tv_now, NULL);
328       /* We're good to go */
329       if (status <= 0)
330         break;
331
332       /* We're supposed to wait a bit with this update, so we'll
333        * wait for the next addition to the queue or to the end of
334        * the wait period - whichever comes first. */
335       ts_wait.tv_sec = tv_next_update.tv_sec;
336       ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
337
338       status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
339       if (status == ETIMEDOUT)
340         break;
341     } /* while (42) */
342
343     /* XXX: If you need to lock both, cache_lock and queue_lock, at
344      * the same time, ALWAYS lock `cache_lock' first! */
345
346     /* We're in the shutdown phase */
347     if ((flushq_head == NULL) && (queue_head == NULL)) {
348       pthread_mutex_unlock(&queue_lock);
349       break;
350     }
351
352     if (flushq_head != NULL) {
353       /* Dequeue the first flush entry */
354       queue_entry = flushq_head;
355       if (flushq_head == flushq_tail)
356         flushq_head = flushq_tail = NULL;
357       else
358         flushq_head = flushq_head->next;
359     } else /* if (queue_head != NULL) */
360     {
361       /* Dequeue the first regular entry */
362       queue_entry = queue_head;
363       if (queue_head == queue_tail)
364         queue_head = queue_tail = NULL;
365       else
366         queue_head = queue_head->next;
367     }
368
369     /* Unlock the queue again */
370     pthread_mutex_unlock(&queue_lock);
371
372     /* We now need the cache lock so the entry isn't updated while
373      * we make a copy of its values */
374     pthread_mutex_lock(&cache_lock);
375
376     status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
377
378     if (status == 0) {
379       values = cache_entry->values;
380       values_num = cache_entry->values_num;
381
382       cache_entry->values = NULL;
383       cache_entry->values_num = 0;
384       cache_entry->flags = FLAG_NONE;
385     }
386
387     pthread_mutex_unlock(&cache_lock);
388
389     if (status != 0) {
390       sfree(queue_entry->filename);
391       sfree(queue_entry);
392       continue;
393     }
394
395     /* Update `tv_next_update' */
396     if (write_rate > 0.0) {
397       gettimeofday(&tv_now, /* timezone = */ NULL);
398       tv_next_update.tv_sec = tv_now.tv_sec;
399       tv_next_update.tv_usec =
400           tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
401       while (tv_next_update.tv_usec > 1000000) {
402         tv_next_update.tv_sec++;
403         tv_next_update.tv_usec -= 1000000;
404       }
405     }
406
407     /* Write the values to the RRD-file */
408     srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
409     DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
410           (values_num == 1) ? "" : "s", queue_entry->filename);
411
412     for (int i = 0; i < values_num; i++) {
413       sfree(values[i]);
414     }
415     sfree(values);
416     sfree(queue_entry->filename);
417     sfree(queue_entry);
418   } /* while (42) */
419
420   pthread_exit((void *)0);
421   return (void *)0;
422 } /* void *rrd_queue_thread */
423
424 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
425                              rrd_queue_t **tail) {
426   rrd_queue_t *queue_entry;
427
428   queue_entry = malloc(sizeof(*queue_entry));
429   if (queue_entry == NULL)
430     return -1;
431
432   queue_entry->filename = strdup(filename);
433   if (queue_entry->filename == NULL) {
434     free(queue_entry);
435     return -1;
436   }
437
438   queue_entry->next = NULL;
439
440   pthread_mutex_lock(&queue_lock);
441
442   if (*tail == NULL)
443     *head = queue_entry;
444   else
445     (*tail)->next = queue_entry;
446   *tail = queue_entry;
447
448   pthread_cond_signal(&queue_cond);
449   pthread_mutex_unlock(&queue_lock);
450
451   return 0;
452 } /* int rrd_queue_enqueue */
453
454 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
455                              rrd_queue_t **tail) {
456   rrd_queue_t *this;
457   rrd_queue_t *prev;
458
459   pthread_mutex_lock(&queue_lock);
460
461   prev = NULL;
462   this = *head;
463
464   while (this != NULL) {
465     if (strcmp(this->filename, filename) == 0)
466       break;
467
468     prev = this;
469     this = this->next;
470   }
471
472   if (this == NULL) {
473     pthread_mutex_unlock(&queue_lock);
474     return -1;
475   }
476
477   if (prev == NULL)
478     *head = this->next;
479   else
480     prev->next = this->next;
481
482   if (this->next == NULL)
483     *tail = prev;
484
485   pthread_mutex_unlock(&queue_lock);
486
487   sfree(this->filename);
488   sfree(this);
489
490   return 0;
491 } /* int rrd_queue_dequeue */
492
493 /* XXX: You must hold "cache_lock" when calling this function! */
494 static void rrd_cache_flush(cdtime_t timeout) {
495   rrd_cache_t *rc;
496   cdtime_t now;
497
498   char **keys = NULL;
499   int keys_num = 0;
500
501   char *key;
502   c_avl_iterator_t *iter;
503
504   DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
505         CDTIME_T_TO_DOUBLE(timeout));
506
507   now = cdtime();
508   timeout = TIME_T_TO_CDTIME_T(timeout);
509
510   /* Build a list of entries to be flushed */
511   iter = c_avl_get_iterator(cache);
512   while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
513     if (rc->flags != FLAG_NONE)
514       continue;
515     /* timeout == 0  =>  flush everything */
516     else if ((timeout != 0) && ((now - rc->first_value) < timeout))
517       continue;
518     else if (rc->values_num > 0) {
519       int status;
520
521       status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
522       if (status == 0)
523         rc->flags = FLAG_QUEUED;
524     } else /* ancient and no values -> waste of memory */
525     {
526       char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
527       if (tmp == NULL) {
528         char errbuf[1024];
529         ERROR("rrdtool plugin: "
530               "realloc failed: %s",
531               sstrerror(errno, errbuf, sizeof(errbuf)));
532         c_avl_iterator_destroy(iter);
533         sfree(keys);
534         return;
535       }
536       keys = tmp;
537       keys[keys_num] = key;
538       keys_num++;
539     }
540   } /* while (c_avl_iterator_next) */
541   c_avl_iterator_destroy(iter);
542
543   for (int i = 0; i < keys_num; i++) {
544     if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
545       DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
546       continue;
547     }
548
549     assert(rc->values == NULL);
550     assert(rc->values_num == 0);
551
552     sfree(rc);
553     sfree(key);
554     keys[i] = NULL;
555   } /* for (i = 0..keys_num) */
556
557   sfree(keys);
558
559   cache_flush_last = now;
560 } /* void rrd_cache_flush */
561
562 static int rrd_cache_flush_identifier(cdtime_t timeout,
563                                       const char *identifier) {
564   rrd_cache_t *rc;
565   cdtime_t now;
566   int status;
567   char key[2048];
568
569   if (identifier == NULL) {
570     rrd_cache_flush(timeout);
571     return 0;
572   }
573
574   now = cdtime();
575
576   if (datadir == NULL)
577     snprintf(key, sizeof(key), "%s.rrd", identifier);
578   else
579     snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
580   key[sizeof(key) - 1] = 0;
581
582   status = c_avl_get(cache, key, (void *)&rc);
583   if (status != 0) {
584     INFO("rrdtool plugin: rrd_cache_flush_identifier: "
585          "c_avl_get (%s) failed. Does that file really exist?",
586          key);
587     return status;
588   }
589
590   if (rc->flags == FLAG_FLUSHQ) {
591     status = 0;
592   } else if (rc->flags == FLAG_QUEUED) {
593     rrd_queue_dequeue(key, &queue_head, &queue_tail);
594     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
595     if (status == 0)
596       rc->flags = FLAG_FLUSHQ;
597   } else if ((now - rc->first_value) < timeout) {
598     status = 0;
599   } else if (rc->values_num > 0) {
600     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
601     if (status == 0)
602       rc->flags = FLAG_FLUSHQ;
603   }
604
605   return status;
606 } /* int rrd_cache_flush_identifier */
607
608 static int64_t rrd_get_random_variation(void) {
609   long min;
610   long max;
611
612   if (random_timeout == 0)
613     return 0;
614
615   /* Assure that "cache_timeout + random_variation" is never negative. */
616   if (random_timeout > cache_timeout) {
617     INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
618          CDTIME_T_TO_DOUBLE(cache_timeout));
619     random_timeout = cache_timeout;
620   }
621
622   max = (long)(random_timeout / 2);
623   min = max - ((long)random_timeout);
624
625   return (int64_t)cdrand_range(min, max);
626 } /* int64_t rrd_get_random_variation */
627
628 static int rrd_cache_insert(const char *filename, const char *value,
629                             cdtime_t value_time) {
630   rrd_cache_t *rc = NULL;
631   int new_rc = 0;
632   char **values_new;
633
634   pthread_mutex_lock(&cache_lock);
635
636   /* This shouldn't happen, but it did happen at least once, so we'll be
637    * careful. */
638   if (cache == NULL) {
639     pthread_mutex_unlock(&cache_lock);
640     WARNING("rrdtool plugin: cache == NULL.");
641     return -1;
642   }
643
644   c_avl_get(cache, filename, (void *)&rc);
645
646   if (rc == NULL) {
647     rc = malloc(sizeof(*rc));
648     if (rc == NULL) {
649       pthread_mutex_unlock(&cache_lock);
650       return -1;
651     }
652     rc->values_num = 0;
653     rc->values = NULL;
654     rc->first_value = 0;
655     rc->last_value = 0;
656     rc->random_variation = rrd_get_random_variation();
657     rc->flags = FLAG_NONE;
658     new_rc = 1;
659   }
660
661   assert(value_time > 0); /* plugin_dispatch() ensures this. */
662   if (rc->last_value >= value_time) {
663     pthread_mutex_unlock(&cache_lock);
664     DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
665           ">= (value_time = %" PRIu64 ")",
666           rc->last_value, value_time);
667     return -1;
668   }
669
670   values_new =
671       realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
672   if (values_new == NULL) {
673     char errbuf[1024];
674     void *cache_key = NULL;
675
676     sstrerror(errno, errbuf, sizeof(errbuf));
677
678     c_avl_remove(cache, filename, &cache_key, NULL);
679     pthread_mutex_unlock(&cache_lock);
680
681     ERROR("rrdtool plugin: realloc failed: %s", errbuf);
682
683     sfree(cache_key);
684     sfree(rc->values);
685     sfree(rc);
686     return -1;
687   }
688   rc->values = values_new;
689
690   rc->values[rc->values_num] = strdup(value);
691   if (rc->values[rc->values_num] != NULL)
692     rc->values_num++;
693
694   if (rc->values_num == 1)
695     rc->first_value = value_time;
696   rc->last_value = value_time;
697
698   /* Insert if this is the first value */
699   if (new_rc == 1) {
700     void *cache_key = strdup(filename);
701
702     if (cache_key == NULL) {
703       char errbuf[1024];
704       sstrerror(errno, errbuf, sizeof(errbuf));
705
706       pthread_mutex_unlock(&cache_lock);
707
708       ERROR("rrdtool plugin: strdup failed: %s", errbuf);
709
710       sfree(rc->values[0]);
711       sfree(rc->values);
712       sfree(rc);
713       return -1;
714     }
715
716     c_avl_insert(cache, cache_key, rc);
717   }
718
719   DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
720         "values_num = %i; age = %.3f;",
721         filename, rc->values_num,
722         CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
723
724   if ((rc->last_value - rc->first_value) >=
725       (cache_timeout + rc->random_variation)) {
726     /* XXX: If you need to lock both, cache_lock and queue_lock, at
727      * the same time, ALWAYS lock `cache_lock' first! */
728     if (rc->flags == FLAG_NONE) {
729       int status;
730
731       status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
732       if (status == 0)
733         rc->flags = FLAG_QUEUED;
734
735       rc->random_variation = rrd_get_random_variation();
736     } else {
737       DEBUG("rrdtool plugin: `%s' is already queued.", filename);
738     }
739   }
740
741   if ((cache_timeout > 0) &&
742       ((cdtime() - cache_flush_last) > cache_flush_timeout))
743     rrd_cache_flush(cache_flush_timeout);
744
745   pthread_mutex_unlock(&cache_lock);
746
747   return 0;
748 } /* int rrd_cache_insert */
749
750 static int rrd_cache_destroy(void) /* {{{ */
751 {
752   void *key = NULL;
753   void *value = NULL;
754
755   int non_empty = 0;
756
757   pthread_mutex_lock(&cache_lock);
758
759   if (cache == NULL) {
760     pthread_mutex_unlock(&cache_lock);
761     return 0;
762   }
763
764   while (c_avl_pick(cache, &key, &value) == 0) {
765     rrd_cache_t *rc;
766
767     sfree(key);
768     key = NULL;
769
770     rc = value;
771     value = NULL;
772
773     if (rc->values_num > 0)
774       non_empty++;
775
776     for (int i = 0; i < rc->values_num; i++)
777       sfree(rc->values[i]);
778     sfree(rc->values);
779     sfree(rc);
780   }
781
782   c_avl_destroy(cache);
783   cache = NULL;
784
785   if (non_empty > 0) {
786     INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
787          non_empty, (non_empty == 1) ? "entry" : "entries");
788   } else {
789     DEBUG("rrdtool plugin: No values have been lost "
790           "when destroying the cache.");
791   }
792
793   pthread_mutex_unlock(&cache_lock);
794   return 0;
795 } /* }}} int rrd_cache_destroy */
796
797 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
798   int a = *((int *)a_ptr);
799   int b = *((int *)b_ptr);
800
801   if (a < b)
802     return -1;
803   else if (a > b)
804     return 1;
805   else
806     return 0;
807 } /* int rrd_compare_numeric */
808
809 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
810                      user_data_t __attribute__((unused)) * user_data) {
811   struct stat statbuf;
812   char filename[512];
813   char values[512];
814   int status;
815
816   if (do_shutdown)
817     return 0;
818
819   if (0 != strcmp(ds->type, vl->type)) {
820     ERROR("rrdtool plugin: DS type does not match value list type");
821     return -1;
822   }
823
824   if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
825     return -1;
826
827   if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
828     return -1;
829
830   if (stat(filename, &statbuf) == -1) {
831     if (errno == ENOENT) {
832       status = cu_rrd_create_file(filename, ds, vl, &rrdcreate_config);
833       if (status != 0)
834         return -1;
835       else if (rrdcreate_config.async)
836         return 0;
837     } else {
838       char errbuf[1024];
839       ERROR("stat(%s) failed: %s", filename,
840             sstrerror(errno, errbuf, sizeof(errbuf)));
841       return -1;
842     }
843   } else if (!S_ISREG(statbuf.st_mode)) {
844     ERROR("stat(%s): Not a regular file!", filename);
845     return -1;
846   }
847
848   status = rrd_cache_insert(filename, values, vl->time);
849
850   return status;
851 } /* int rrd_write */
852
853 static int rrd_flush(cdtime_t timeout, const char *identifier,
854                      __attribute__((unused)) user_data_t *user_data) {
855   pthread_mutex_lock(&cache_lock);
856
857   if (cache == NULL) {
858     pthread_mutex_unlock(&cache_lock);
859     return 0;
860   }
861
862   rrd_cache_flush_identifier(timeout, identifier);
863
864   pthread_mutex_unlock(&cache_lock);
865   return 0;
866 } /* int rrd_flush */
867
868 static int rrd_config(const char *key, const char *value) {
869   if (strcasecmp("CacheTimeout", key) == 0) {
870     double tmp = atof(value);
871     if (tmp < 0) {
872       fprintf(stderr, "rrdtool: `CacheTimeout' must "
873                       "be greater than 0.\n");
874       ERROR("rrdtool: `CacheTimeout' must "
875             "be greater than 0.\n");
876       return 1;
877     }
878     cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
879   } else if (strcasecmp("CacheFlush", key) == 0) {
880     int tmp = atoi(value);
881     if (tmp < 0) {
882       fprintf(stderr, "rrdtool: `CacheFlush' must "
883                       "be greater than 0.\n");
884       ERROR("rrdtool: `CacheFlush' must "
885             "be greater than 0.\n");
886       return 1;
887     }
888     cache_flush_timeout = tmp;
889   } else if (strcasecmp("DataDir", key) == 0) {
890     char *tmp;
891     size_t len;
892
893     tmp = strdup(value);
894     if (tmp == NULL) {
895       ERROR("rrdtool plugin: strdup failed.");
896       return 1;
897     }
898
899     len = strlen(tmp);
900     while ((len > 0) && (tmp[len - 1] == '/')) {
901       len--;
902       tmp[len] = 0;
903     }
904
905     if (len == 0) {
906       ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
907       sfree(tmp);
908       return 1;
909     }
910
911     if (datadir != NULL) {
912       sfree(datadir);
913     }
914
915     datadir = tmp;
916   } else if (strcasecmp("StepSize", key) == 0) {
917     unsigned long temp = strtoul(value, NULL, 0);
918     if (temp > 0)
919       rrdcreate_config.stepsize = temp;
920   } else if (strcasecmp("HeartBeat", key) == 0) {
921     int temp = atoi(value);
922     if (temp > 0)
923       rrdcreate_config.heartbeat = temp;
924   } else if (strcasecmp("CreateFilesAsync", key) == 0) {
925     if (IS_TRUE(value))
926       rrdcreate_config.async = 1;
927     else
928       rrdcreate_config.async = 0;
929   } else if (strcasecmp("RRARows", key) == 0) {
930     int tmp = atoi(value);
931     if (tmp <= 0) {
932       fprintf(stderr, "rrdtool: `RRARows' must "
933                       "be greater than 0.\n");
934       ERROR("rrdtool: `RRARows' must "
935             "be greater than 0.\n");
936       return 1;
937     }
938     rrdcreate_config.rrarows = tmp;
939   } else if (strcasecmp("RRATimespan", key) == 0) {
940     char *saveptr = NULL;
941     char *dummy;
942     char *ptr;
943     char *value_copy;
944     int *tmp_alloc;
945
946     value_copy = strdup(value);
947     if (value_copy == NULL)
948       return 1;
949
950     dummy = value_copy;
951     while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
952       dummy = NULL;
953
954       tmp_alloc = realloc(rrdcreate_config.timespans,
955                           sizeof(int) * (rrdcreate_config.timespans_num + 1));
956       if (tmp_alloc == NULL) {
957         fprintf(stderr, "rrdtool: realloc failed.\n");
958         ERROR("rrdtool: realloc failed.\n");
959         free(value_copy);
960         return 1;
961       }
962       rrdcreate_config.timespans = tmp_alloc;
963       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
964       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
965         rrdcreate_config.timespans_num++;
966     } /* while (strtok_r) */
967
968     qsort(/* base = */ rrdcreate_config.timespans,
969           /* nmemb  = */ rrdcreate_config.timespans_num,
970           /* size   = */ sizeof(rrdcreate_config.timespans[0]),
971           /* compar = */ rrd_compare_numeric);
972
973     free(value_copy);
974   } else if (strcasecmp("XFF", key) == 0) {
975     double tmp = atof(value);
976     if ((tmp < 0.0) || (tmp >= 1.0)) {
977       fprintf(stderr, "rrdtool: `XFF' must "
978                       "be in the range 0 to 1 (exclusive).");
979       ERROR("rrdtool: `XFF' must "
980             "be in the range 0 to 1 (exclusive).");
981       return 1;
982     }
983     rrdcreate_config.xff = tmp;
984   } else if (strcasecmp("WritesPerSecond", key) == 0) {
985     double wps = atof(value);
986
987     if (wps < 0.0) {
988       fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
989                       "greater than or equal to zero.");
990       return 1;
991     } else if (wps == 0.0) {
992       write_rate = 0.0;
993     } else {
994       write_rate = 1.0 / wps;
995     }
996   } else if (strcasecmp("RandomTimeout", key) == 0) {
997     double tmp;
998
999     tmp = atof(value);
1000     if (tmp < 0.0) {
1001       fprintf(stderr, "rrdtool: `RandomTimeout' must "
1002                       "be greater than or equal to zero.\n");
1003       ERROR("rrdtool: `RandomTimeout' must "
1004             "be greater then or equal to zero.");
1005     } else {
1006       random_timeout = DOUBLE_TO_CDTIME_T(tmp);
1007     }
1008   } else {
1009     return -1;
1010   }
1011   return 0;
1012 } /* int rrd_config */
1013
1014 static int rrd_shutdown(void) {
1015   pthread_mutex_lock(&cache_lock);
1016   rrd_cache_flush(0);
1017   pthread_mutex_unlock(&cache_lock);
1018
1019   pthread_mutex_lock(&queue_lock);
1020   do_shutdown = 1;
1021   pthread_cond_signal(&queue_cond);
1022   pthread_mutex_unlock(&queue_lock);
1023
1024   if ((queue_thread_running != 0) &&
1025       ((queue_head != NULL) || (flushq_head != NULL))) {
1026     INFO("rrdtool plugin: Shutting down the queue thread. "
1027          "This may take a while.");
1028   } else if (queue_thread_running != 0) {
1029     INFO("rrdtool plugin: Shutting down the queue thread.");
1030   }
1031
1032   /* Wait for all the values to be written to disk before returning. */
1033   if (queue_thread_running != 0) {
1034     pthread_join(queue_thread, NULL);
1035     memset(&queue_thread, 0, sizeof(queue_thread));
1036     queue_thread_running = 0;
1037     DEBUG("rrdtool plugin: queue_thread exited.");
1038   }
1039
1040   rrd_cache_destroy();
1041
1042   return 0;
1043 } /* int rrd_shutdown */
1044
1045 static int rrd_init(void) {
1046   static int init_once = 0;
1047   int status;
1048
1049   if (init_once != 0)
1050     return 0;
1051   init_once = 1;
1052
1053   if (rrdcreate_config.heartbeat <= 0)
1054     rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1055
1056   /* Set the cache up */
1057   pthread_mutex_lock(&cache_lock);
1058
1059   cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1060   if (cache == NULL) {
1061     pthread_mutex_unlock(&cache_lock);
1062     ERROR("rrdtool plugin: c_avl_create failed.");
1063     return -1;
1064   }
1065
1066   cache_flush_last = cdtime();
1067   if (cache_timeout == 0) {
1068     cache_flush_timeout = 0;
1069   } else if (cache_flush_timeout < cache_timeout)
1070     cache_flush_timeout = 10 * cache_timeout;
1071
1072   pthread_mutex_unlock(&cache_lock);
1073
1074   status =
1075       plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1076                            /* args = */ NULL, "rrdtool queue");
1077   if (status != 0) {
1078     ERROR("rrdtool plugin: Cannot create queue-thread.");
1079     return -1;
1080   }
1081   queue_thread_running = 1;
1082
1083   DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1084         " heartbeat = %i; rrarows = %i; xff = %lf;",
1085         (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1086         rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1087         rrdcreate_config.xff);
1088
1089   return 0;
1090 } /* int rrd_init */
1091
1092 void module_register(void) {
1093   plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1094   plugin_register_init("rrdtool", rrd_init);
1095   plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1096   plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1097   plugin_register_shutdown("rrdtool", rrd_shutdown);
1098 }