rrdtool plugin: Check return value of c_avl_get().
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  * Copyright (C) 2008-2008  Sebastian Harl
5  * Copyright (C) 2009       Mariusz Gronczewski
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Sebastian Harl <sh at tokkee.org>
23  *   Mariusz Gronczewski <xani666 at gmail.com>
24  **/
25
26 #include "collectd.h"
27
28 #include "common.h"
29 #include "plugin.h"
30 #include "utils_avltree.h"
31 #include "utils_random.h"
32 #include "utils_rrdcreate.h"
33
34 #include <rrd.h>
35
36 /*
37  * Private types
38  */
39 struct rrd_cache_s {
40   int values_num;
41   char **values;
42   cdtime_t first_value;
43   cdtime_t last_value;
44   int64_t random_variation;
45   enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 };
47 typedef struct rrd_cache_s rrd_cache_t;
48
49 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
50 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
51
52 struct rrd_queue_s {
53   char *filename;
54   struct rrd_queue_s *next;
55 };
56 typedef struct rrd_queue_s rrd_queue_t;
57
58 /*
59  * Private variables
60  */
61 static const char *config_keys[] = {
62     "CacheTimeout", "CacheFlush",      "CreateFilesAsync", "DataDir",
63     "StepSize",     "HeartBeat",       "RRARows",          "RRATimespan",
64     "XFF",          "WritesPerSecond", "RandomTimeout"};
65 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
66
67 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
68  * is zero a default, depending on the `interval' member of the value list is
69  * being used. */
70 static char *datadir = NULL;
71 static double write_rate = 0.0;
72 static rrdcreate_config_t rrdcreate_config = {
73     /* stepsize = */ 0,
74     /* heartbeat = */ 0,
75     /* rrarows = */ 1200,
76     /* xff = */ 0.1,
77
78     /* timespans = */ NULL,
79     /* timespans_num = */ 0,
80
81     /* consolidation_functions = */ NULL,
82     /* consolidation_functions_num = */ 0,
83
84     /* async = */ 0};
85
86 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
87  * ALWAYS lock `cache_lock' first! */
88 static cdtime_t cache_timeout = 0;
89 static cdtime_t cache_flush_timeout = 0;
90 static cdtime_t random_timeout = 0;
91 static cdtime_t cache_flush_last;
92 static c_avl_tree_t *cache = NULL;
93 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
94
95 static rrd_queue_t *queue_head = NULL;
96 static rrd_queue_t *queue_tail = NULL;
97 static rrd_queue_t *flushq_head = NULL;
98 static rrd_queue_t *flushq_tail = NULL;
99 static pthread_t queue_thread;
100 static int queue_thread_running = 1;
101 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
102 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
103
104 #if !HAVE_THREADSAFE_LIBRRD
105 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
106 #endif
107
108 static int do_shutdown = 0;
109
110 #if HAVE_THREADSAFE_LIBRRD
111 static int srrd_update(char *filename, char *template, int argc,
112                        const char **argv) {
113   int status;
114
115   optind = 0; /* bug in librrd? */
116   rrd_clear_error();
117
118   status = rrd_update_r(filename, template, argc, (void *)argv);
119
120   if (status != 0) {
121     WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
122             rrd_get_error());
123   }
124
125   return (status);
126 } /* int srrd_update */
127 /* #endif HAVE_THREADSAFE_LIBRRD */
128
129 #else  /* !HAVE_THREADSAFE_LIBRRD */
130 static int srrd_update(char *filename, char *template, int argc,
131                        const char **argv) {
132   int status;
133
134   int new_argc;
135   char **new_argv;
136
137   assert(template == NULL);
138
139   new_argc = 2 + argc;
140   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
141   if (new_argv == NULL) {
142     ERROR("rrdtool plugin: malloc failed.");
143     return (-1);
144   }
145
146   new_argv[0] = "update";
147   new_argv[1] = filename;
148
149   memcpy(new_argv + 2, argv, argc * sizeof(char *));
150   new_argv[new_argc] = NULL;
151
152   pthread_mutex_lock(&librrd_lock);
153   optind = 0; /* bug in librrd? */
154   rrd_clear_error();
155
156   status = rrd_update(new_argc, new_argv);
157   pthread_mutex_unlock(&librrd_lock);
158
159   if (status != 0) {
160     WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
161             rrd_get_error());
162   }
163
164   sfree(new_argv);
165
166   return (status);
167 } /* int srrd_update */
168 #endif /* !HAVE_THREADSAFE_LIBRRD */
169
170 static int value_list_to_string_multiple(char *buffer, int buffer_len,
171                                          const data_set_t *ds,
172                                          const value_list_t *vl) {
173   int offset;
174   int status;
175   time_t tt;
176
177   memset(buffer, '\0', buffer_len);
178
179   tt = CDTIME_T_TO_TIME_T(vl->time);
180   status = ssnprintf(buffer, buffer_len, "%u", (unsigned int)tt);
181   if ((status < 1) || (status >= buffer_len))
182     return (-1);
183   offset = status;
184
185   for (size_t i = 0; i < ds->ds_num; i++) {
186     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
187         (ds->ds[i].type != DS_TYPE_GAUGE) &&
188         (ds->ds[i].type != DS_TYPE_DERIVE) &&
189         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
190       return (-1);
191
192     if (ds->ds[i].type == DS_TYPE_COUNTER)
193       status = ssnprintf(buffer + offset, buffer_len - offset, ":%llu",
194                          vl->values[i].counter);
195     else if (ds->ds[i].type == DS_TYPE_GAUGE)
196       status = ssnprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
197                          vl->values[i].gauge);
198     else if (ds->ds[i].type == DS_TYPE_DERIVE)
199       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
200                          vl->values[i].derive);
201     else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
202       status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
203                          vl->values[i].absolute);
204
205     if ((status < 1) || (status >= (buffer_len - offset)))
206       return (-1);
207
208     offset += status;
209   } /* for ds->ds_num */
210
211   return (0);
212 } /* int value_list_to_string_multiple */
213
214 static int value_list_to_string(char *buffer, int buffer_len,
215                                 const data_set_t *ds, const value_list_t *vl) {
216   int status;
217   time_t tt;
218
219   if (ds->ds_num != 1)
220     return (value_list_to_string_multiple(buffer, buffer_len, ds, vl));
221
222   tt = CDTIME_T_TO_TIME_T(vl->time);
223   switch (ds->ds[0].type) {
224   case DS_TYPE_DERIVE:
225     status = ssnprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
226                        vl->values[0].derive);
227     break;
228   case DS_TYPE_GAUGE:
229     status = ssnprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
230                        vl->values[0].gauge);
231     break;
232   case DS_TYPE_COUNTER:
233     status = ssnprintf(buffer, buffer_len, "%u:%llu", (unsigned)tt,
234                        vl->values[0].counter);
235     break;
236   case DS_TYPE_ABSOLUTE:
237     status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
238                        vl->values[0].absolute);
239     break;
240   default:
241     return (EINVAL);
242   }
243
244   if ((status < 1) || (status >= buffer_len))
245     return (ENOMEM);
246
247   return (0);
248 } /* int value_list_to_string */
249
250 static int value_list_to_filename(char *buffer, size_t buffer_size,
251                                   value_list_t const *vl) {
252   char const suffix[] = ".rrd";
253   int status;
254   size_t len;
255
256   if (datadir != NULL) {
257     size_t datadir_len = strlen(datadir) + 1;
258
259     if (datadir_len >= buffer_size)
260       return (ENOMEM);
261
262     sstrncpy(buffer, datadir, buffer_size);
263     buffer[datadir_len - 1] = '/';
264     buffer[datadir_len] = 0;
265
266     buffer += datadir_len;
267     buffer_size -= datadir_len;
268   }
269
270   status = FORMAT_VL(buffer, buffer_size, vl);
271   if (status != 0)
272     return (status);
273
274   len = strlen(buffer);
275   assert(len < buffer_size);
276   buffer += len;
277   buffer_size -= len;
278
279   if (buffer_size <= sizeof(suffix))
280     return (ENOMEM);
281
282   memcpy(buffer, suffix, sizeof(suffix));
283   return (0);
284 } /* int value_list_to_filename */
285
286 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
287   struct timeval tv_next_update;
288   struct timeval tv_now;
289
290   gettimeofday(&tv_next_update, /* timezone = */ NULL);
291
292   while (42) {
293     rrd_queue_t *queue_entry;
294     rrd_cache_t *cache_entry;
295     char **values;
296     int values_num;
297     int status;
298
299     values = NULL;
300     values_num = 0;
301
302     pthread_mutex_lock(&queue_lock);
303     /* Wait for values to arrive */
304     while (42) {
305       struct timespec ts_wait;
306
307       while ((flushq_head == NULL) && (queue_head == NULL) &&
308              (do_shutdown == 0))
309         pthread_cond_wait(&queue_cond, &queue_lock);
310
311       if ((flushq_head == NULL) && (queue_head == NULL))
312         break;
313
314       /* Don't delay if there's something to flush */
315       if (flushq_head != NULL)
316         break;
317
318       /* Don't delay if we're shutting down */
319       if (do_shutdown != 0)
320         break;
321
322       /* Don't delay if no delay was configured. */
323       if (write_rate <= 0.0)
324         break;
325
326       gettimeofday(&tv_now, /* timezone = */ NULL);
327       status = timeval_cmp(tv_next_update, tv_now, NULL);
328       /* We're good to go */
329       if (status <= 0)
330         break;
331
332       /* We're supposed to wait a bit with this update, so we'll
333        * wait for the next addition to the queue or to the end of
334        * the wait period - whichever comes first. */
335       ts_wait.tv_sec = tv_next_update.tv_sec;
336       ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
337
338       status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
339       if (status == ETIMEDOUT)
340         break;
341     } /* while (42) */
342
343     /* XXX: If you need to lock both, cache_lock and queue_lock, at
344      * the same time, ALWAYS lock `cache_lock' first! */
345
346     /* We're in the shutdown phase */
347     if ((flushq_head == NULL) && (queue_head == NULL)) {
348       pthread_mutex_unlock(&queue_lock);
349       break;
350     }
351
352     if (flushq_head != NULL) {
353       /* Dequeue the first flush entry */
354       queue_entry = flushq_head;
355       if (flushq_head == flushq_tail)
356         flushq_head = flushq_tail = NULL;
357       else
358         flushq_head = flushq_head->next;
359     } else /* if (queue_head != NULL) */
360     {
361       /* Dequeue the first regular entry */
362       queue_entry = queue_head;
363       if (queue_head == queue_tail)
364         queue_head = queue_tail = NULL;
365       else
366         queue_head = queue_head->next;
367     }
368
369     /* Unlock the queue again */
370     pthread_mutex_unlock(&queue_lock);
371
372     /* We now need the cache lock so the entry isn't updated while
373      * we make a copy of its values */
374     pthread_mutex_lock(&cache_lock);
375
376     status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
377
378     if (status == 0) {
379       values = cache_entry->values;
380       values_num = cache_entry->values_num;
381
382       cache_entry->values = NULL;
383       cache_entry->values_num = 0;
384       cache_entry->flags = FLAG_NONE;
385     }
386
387     pthread_mutex_unlock(&cache_lock);
388
389     if (status != 0) {
390       sfree(queue_entry->filename);
391       sfree(queue_entry);
392       continue;
393     }
394
395     /* Update `tv_next_update' */
396     if (write_rate > 0.0) {
397       gettimeofday(&tv_now, /* timezone = */ NULL);
398       tv_next_update.tv_sec = tv_now.tv_sec;
399       tv_next_update.tv_usec =
400           tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
401       while (tv_next_update.tv_usec > 1000000) {
402         tv_next_update.tv_sec++;
403         tv_next_update.tv_usec -= 1000000;
404       }
405     }
406
407     /* Write the values to the RRD-file */
408     srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
409     DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
410           (values_num == 1) ? "" : "s", queue_entry->filename);
411
412     for (int i = 0; i < values_num; i++) {
413       sfree(values[i]);
414     }
415     sfree(values);
416     sfree(queue_entry->filename);
417     sfree(queue_entry);
418   } /* while (42) */
419
420   pthread_exit((void *)0);
421   return ((void *)0);
422 } /* void *rrd_queue_thread */
423
424 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
425                              rrd_queue_t **tail) {
426   rrd_queue_t *queue_entry;
427
428   queue_entry = malloc(sizeof(*queue_entry));
429   if (queue_entry == NULL)
430     return (-1);
431
432   queue_entry->filename = strdup(filename);
433   if (queue_entry->filename == NULL) {
434     free(queue_entry);
435     return (-1);
436   }
437
438   queue_entry->next = NULL;
439
440   pthread_mutex_lock(&queue_lock);
441
442   if (*tail == NULL)
443     *head = queue_entry;
444   else
445     (*tail)->next = queue_entry;
446   *tail = queue_entry;
447
448   pthread_cond_signal(&queue_cond);
449   pthread_mutex_unlock(&queue_lock);
450
451   return (0);
452 } /* int rrd_queue_enqueue */
453
454 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
455                              rrd_queue_t **tail) {
456   rrd_queue_t *this;
457   rrd_queue_t *prev;
458
459   pthread_mutex_lock(&queue_lock);
460
461   prev = NULL;
462   this = *head;
463
464   while (this != NULL) {
465     if (strcmp(this->filename, filename) == 0)
466       break;
467
468     prev = this;
469     this = this->next;
470   }
471
472   if (this == NULL) {
473     pthread_mutex_unlock(&queue_lock);
474     return (-1);
475   }
476
477   if (prev == NULL)
478     *head = this->next;
479   else
480     prev->next = this->next;
481
482   if (this->next == NULL)
483     *tail = prev;
484
485   pthread_mutex_unlock(&queue_lock);
486
487   sfree(this->filename);
488   sfree(this);
489
490   return (0);
491 } /* int rrd_queue_dequeue */
492
493 /* XXX: You must hold "cache_lock" when calling this function! */
494 static void rrd_cache_flush(cdtime_t timeout) {
495   rrd_cache_t *rc;
496   cdtime_t now;
497
498   char **keys = NULL;
499   int keys_num = 0;
500
501   char *key;
502   c_avl_iterator_t *iter;
503
504   DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
505         CDTIME_T_TO_DOUBLE(timeout));
506
507   now = cdtime();
508
509   /* Build a list of entries to be flushed */
510   iter = c_avl_get_iterator(cache);
511   while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
512     if (rc->flags != FLAG_NONE)
513       continue;
514     /* timeout == 0  =>  flush everything */
515     else if ((timeout != 0) && ((now - rc->first_value) < timeout))
516       continue;
517     else if (rc->values_num > 0) {
518       int status;
519
520       status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
521       if (status == 0)
522         rc->flags = FLAG_QUEUED;
523     } else /* ancient and no values -> waste of memory */
524     {
525       char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
526       if (tmp == NULL) {
527         char errbuf[1024];
528         ERROR("rrdtool plugin: "
529               "realloc failed: %s",
530               sstrerror(errno, errbuf, sizeof(errbuf)));
531         c_avl_iterator_destroy(iter);
532         sfree(keys);
533         return;
534       }
535       keys = tmp;
536       keys[keys_num] = key;
537       keys_num++;
538     }
539   } /* while (c_avl_iterator_next) */
540   c_avl_iterator_destroy(iter);
541
542   for (int i = 0; i < keys_num; i++) {
543     if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
544       DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
545       continue;
546     }
547
548     assert(rc->values == NULL);
549     assert(rc->values_num == 0);
550
551     sfree(rc);
552     sfree(key);
553     keys[i] = NULL;
554   } /* for (i = 0..keys_num) */
555
556   sfree(keys);
557
558   cache_flush_last = now;
559 } /* void rrd_cache_flush */
560
561 static int rrd_cache_flush_identifier(cdtime_t timeout,
562                                       const char *identifier) {
563   rrd_cache_t *rc;
564   cdtime_t now;
565   int status;
566   char key[2048];
567
568   if (identifier == NULL) {
569     rrd_cache_flush(timeout);
570     return (0);
571   }
572
573   now = cdtime();
574
575   if (datadir == NULL)
576     snprintf(key, sizeof(key), "%s.rrd", identifier);
577   else
578     snprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
579   key[sizeof(key) - 1] = 0;
580
581   status = c_avl_get(cache, key, (void *)&rc);
582   if (status != 0) {
583     INFO("rrdtool plugin: rrd_cache_flush_identifier: "
584          "c_avl_get (%s) failed. Does that file really exist?",
585          key);
586     return (status);
587   }
588
589   if (rc->flags == FLAG_FLUSHQ) {
590     status = 0;
591   } else if (rc->flags == FLAG_QUEUED) {
592     rrd_queue_dequeue(key, &queue_head, &queue_tail);
593     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
594     if (status == 0)
595       rc->flags = FLAG_FLUSHQ;
596   } else if ((now - rc->first_value) < timeout) {
597     status = 0;
598   } else if (rc->values_num > 0) {
599     status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
600     if (status == 0)
601       rc->flags = FLAG_FLUSHQ;
602   }
603
604   return (status);
605 } /* int rrd_cache_flush_identifier */
606
607 static int64_t rrd_get_random_variation(void) {
608   if (random_timeout == 0)
609     return (0);
610
611   return (int64_t)cdrand_range(-random_timeout, random_timeout);
612 } /* int64_t rrd_get_random_variation */
613
614 static int rrd_cache_insert(const char *filename, const char *value,
615                             cdtime_t value_time) {
616   rrd_cache_t *rc = NULL;
617   int new_rc = 0;
618   char **values_new;
619
620   pthread_mutex_lock(&cache_lock);
621
622   /* This shouldn't happen, but it did happen at least once, so we'll be
623    * careful. */
624   if (cache == NULL) {
625     pthread_mutex_unlock(&cache_lock);
626     WARNING("rrdtool plugin: cache == NULL.");
627     return (-1);
628   }
629
630   int status = c_avl_get(cache, filename, (void *)&rc);
631   if ((status != 0) || (rc == NULL)) {
632     rc = malloc(sizeof(*rc));
633     if (rc == NULL) {
634       pthread_mutex_unlock(&cache_lock);
635       return (-1);
636     }
637     rc->values_num = 0;
638     rc->values = NULL;
639     rc->first_value = 0;
640     rc->last_value = 0;
641     rc->random_variation = rrd_get_random_variation();
642     rc->flags = FLAG_NONE;
643     new_rc = 1;
644   }
645
646   assert(value_time > 0); /* plugin_dispatch() ensures this. */
647   if (rc->last_value >= value_time) {
648     pthread_mutex_unlock(&cache_lock);
649     DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
650           ">= (value_time = %" PRIu64 ")",
651           rc->last_value, value_time);
652     return (-1);
653   }
654
655   values_new =
656       realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
657   if (values_new == NULL) {
658     char errbuf[1024];
659     void *cache_key = NULL;
660
661     sstrerror(errno, errbuf, sizeof(errbuf));
662
663     c_avl_remove(cache, filename, &cache_key, NULL);
664     pthread_mutex_unlock(&cache_lock);
665
666     ERROR("rrdtool plugin: realloc failed: %s", errbuf);
667
668     sfree(cache_key);
669     sfree(rc->values);
670     sfree(rc);
671     return (-1);
672   }
673   rc->values = values_new;
674
675   rc->values[rc->values_num] = strdup(value);
676   if (rc->values[rc->values_num] != NULL)
677     rc->values_num++;
678
679   if (rc->values_num == 1)
680     rc->first_value = value_time;
681   rc->last_value = value_time;
682
683   /* Insert if this is the first value */
684   if (new_rc == 1) {
685     void *cache_key = strdup(filename);
686
687     if (cache_key == NULL) {
688       char errbuf[1024];
689       sstrerror(errno, errbuf, sizeof(errbuf));
690
691       pthread_mutex_unlock(&cache_lock);
692
693       ERROR("rrdtool plugin: strdup failed: %s", errbuf);
694
695       sfree(rc->values[0]);
696       sfree(rc->values);
697       sfree(rc);
698       return (-1);
699     }
700
701     c_avl_insert(cache, cache_key, rc);
702   }
703
704   DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
705         "values_num = %i; age = %.3f;",
706         filename, rc->values_num,
707         CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
708
709   if ((rc->last_value - rc->first_value) >=
710       (cache_timeout + rc->random_variation)) {
711     /* XXX: If you need to lock both, cache_lock and queue_lock, at
712      * the same time, ALWAYS lock `cache_lock' first! */
713     if (rc->flags == FLAG_NONE) {
714       int status;
715
716       status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
717       if (status == 0)
718         rc->flags = FLAG_QUEUED;
719
720       rc->random_variation = rrd_get_random_variation();
721     } else {
722       DEBUG("rrdtool plugin: `%s' is already queued.", filename);
723     }
724   }
725
726   if ((cache_timeout > 0) &&
727       ((cdtime() - cache_flush_last) > cache_flush_timeout))
728     rrd_cache_flush(cache_timeout + random_timeout);
729
730   pthread_mutex_unlock(&cache_lock);
731
732   return (0);
733 } /* int rrd_cache_insert */
734
735 static int rrd_cache_destroy(void) /* {{{ */
736 {
737   void *key = NULL;
738   void *value = NULL;
739
740   int non_empty = 0;
741
742   pthread_mutex_lock(&cache_lock);
743
744   if (cache == NULL) {
745     pthread_mutex_unlock(&cache_lock);
746     return (0);
747   }
748
749   while (c_avl_pick(cache, &key, &value) == 0) {
750     rrd_cache_t *rc;
751
752     sfree(key);
753     key = NULL;
754
755     rc = value;
756     value = NULL;
757
758     if (rc->values_num > 0)
759       non_empty++;
760
761     for (int i = 0; i < rc->values_num; i++)
762       sfree(rc->values[i]);
763     sfree(rc->values);
764     sfree(rc);
765   }
766
767   c_avl_destroy(cache);
768   cache = NULL;
769
770   if (non_empty > 0) {
771     INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
772          non_empty, (non_empty == 1) ? "entry" : "entries");
773   } else {
774     DEBUG("rrdtool plugin: No values have been lost "
775           "when destroying the cache.");
776   }
777
778   pthread_mutex_unlock(&cache_lock);
779   return (0);
780 } /* }}} int rrd_cache_destroy */
781
782 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
783   int a = *((int *)a_ptr);
784   int b = *((int *)b_ptr);
785
786   if (a < b)
787     return (-1);
788   else if (a > b)
789     return (1);
790   else
791     return (0);
792 } /* int rrd_compare_numeric */
793
794 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
795                      user_data_t __attribute__((unused)) * user_data) {
796   struct stat statbuf;
797   char filename[512];
798   char values[512];
799   int status;
800
801   if (do_shutdown)
802     return (0);
803
804   if (0 != strcmp(ds->type, vl->type)) {
805     ERROR("rrdtool plugin: DS type does not match value list type");
806     return -1;
807   }
808
809   if (value_list_to_filename(filename, sizeof(filename), vl) != 0)
810     return (-1);
811
812   if (value_list_to_string(values, sizeof(values), ds, vl) != 0)
813     return (-1);
814
815   if (stat(filename, &statbuf) == -1) {
816     if (errno == ENOENT) {
817       status = cu_rrd_create_file(filename, ds, vl, &rrdcreate_config);
818       if (status != 0)
819         return (-1);
820       else if (rrdcreate_config.async)
821         return (0);
822     } else {
823       char errbuf[1024];
824       ERROR("stat(%s) failed: %s", filename,
825             sstrerror(errno, errbuf, sizeof(errbuf)));
826       return (-1);
827     }
828   } else if (!S_ISREG(statbuf.st_mode)) {
829     ERROR("stat(%s): Not a regular file!", filename);
830     return (-1);
831   }
832
833   status = rrd_cache_insert(filename, values, vl->time);
834
835   return (status);
836 } /* int rrd_write */
837
838 static int rrd_flush(cdtime_t timeout, const char *identifier,
839                      __attribute__((unused)) user_data_t *user_data) {
840   pthread_mutex_lock(&cache_lock);
841
842   if (cache == NULL) {
843     pthread_mutex_unlock(&cache_lock);
844     return (0);
845   }
846
847   rrd_cache_flush_identifier(timeout, identifier);
848
849   pthread_mutex_unlock(&cache_lock);
850   return (0);
851 } /* int rrd_flush */
852
853 static int rrd_config(const char *key, const char *value) {
854   if (strcasecmp("CacheTimeout", key) == 0) {
855     double tmp = atof(value);
856     if (tmp < 0) {
857       fprintf(stderr, "rrdtool: `CacheTimeout' must "
858                       "be greater than 0.\n");
859       ERROR("rrdtool: `CacheTimeout' must "
860             "be greater than 0.\n");
861       return (1);
862     }
863     cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
864   } else if (strcasecmp("CacheFlush", key) == 0) {
865     double tmp = atof(value);
866     if (tmp < 0) {
867       fprintf(stderr, "rrdtool: `CacheFlush' must "
868                       "be greater than 0.\n");
869       ERROR("rrdtool: `CacheFlush' must "
870             "be greater than 0.\n");
871       return (1);
872     }
873     cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
874   } else if (strcasecmp("DataDir", key) == 0) {
875     char *tmp;
876     size_t len;
877
878     tmp = strdup(value);
879     if (tmp == NULL) {
880       ERROR("rrdtool plugin: strdup failed.");
881       return (1);
882     }
883
884     len = strlen(tmp);
885     while ((len > 0) && (tmp[len - 1] == '/')) {
886       len--;
887       tmp[len] = 0;
888     }
889
890     if (len == 0) {
891       ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
892       sfree(tmp);
893       return (1);
894     }
895
896     if (datadir != NULL) {
897       sfree(datadir);
898     }
899
900     datadir = tmp;
901   } else if (strcasecmp("StepSize", key) == 0) {
902     unsigned long temp = strtoul(value, NULL, 0);
903     if (temp > 0)
904       rrdcreate_config.stepsize = temp;
905   } else if (strcasecmp("HeartBeat", key) == 0) {
906     int temp = atoi(value);
907     if (temp > 0)
908       rrdcreate_config.heartbeat = temp;
909   } else if (strcasecmp("CreateFilesAsync", key) == 0) {
910     if (IS_TRUE(value))
911       rrdcreate_config.async = 1;
912     else
913       rrdcreate_config.async = 0;
914   } else if (strcasecmp("RRARows", key) == 0) {
915     int tmp = atoi(value);
916     if (tmp <= 0) {
917       fprintf(stderr, "rrdtool: `RRARows' must "
918                       "be greater than 0.\n");
919       ERROR("rrdtool: `RRARows' must "
920             "be greater than 0.\n");
921       return (1);
922     }
923     rrdcreate_config.rrarows = tmp;
924   } else if (strcasecmp("RRATimespan", key) == 0) {
925     char *saveptr = NULL;
926     char *dummy;
927     char *ptr;
928     char *value_copy;
929     int *tmp_alloc;
930
931     value_copy = strdup(value);
932     if (value_copy == NULL)
933       return (1);
934
935     dummy = value_copy;
936     while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
937       dummy = NULL;
938
939       tmp_alloc = realloc(rrdcreate_config.timespans,
940                           sizeof(int) * (rrdcreate_config.timespans_num + 1));
941       if (tmp_alloc == NULL) {
942         fprintf(stderr, "rrdtool: realloc failed.\n");
943         ERROR("rrdtool: realloc failed.\n");
944         free(value_copy);
945         return (1);
946       }
947       rrdcreate_config.timespans = tmp_alloc;
948       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
949       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
950         rrdcreate_config.timespans_num++;
951     } /* while (strtok_r) */
952
953     qsort(/* base = */ rrdcreate_config.timespans,
954           /* nmemb  = */ rrdcreate_config.timespans_num,
955           /* size   = */ sizeof(rrdcreate_config.timespans[0]),
956           /* compar = */ rrd_compare_numeric);
957
958     free(value_copy);
959   } else if (strcasecmp("XFF", key) == 0) {
960     double tmp = atof(value);
961     if ((tmp < 0.0) || (tmp >= 1.0)) {
962       fprintf(stderr, "rrdtool: `XFF' must "
963                       "be in the range 0 to 1 (exclusive).");
964       ERROR("rrdtool: `XFF' must "
965             "be in the range 0 to 1 (exclusive).");
966       return (1);
967     }
968     rrdcreate_config.xff = tmp;
969   } else if (strcasecmp("WritesPerSecond", key) == 0) {
970     double wps = atof(value);
971
972     if (wps < 0.0) {
973       fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
974                       "greater than or equal to zero.");
975       return (1);
976     } else if (wps == 0.0) {
977       write_rate = 0.0;
978     } else {
979       write_rate = 1.0 / wps;
980     }
981   } else if (strcasecmp("RandomTimeout", key) == 0) {
982     double tmp;
983
984     tmp = atof(value);
985     if (tmp < 0.0) {
986       fprintf(stderr, "rrdtool: `RandomTimeout' must "
987                       "be greater than or equal to zero.\n");
988       ERROR("rrdtool: `RandomTimeout' must "
989             "be greater then or equal to zero.");
990     } else {
991       random_timeout = DOUBLE_TO_CDTIME_T(tmp);
992     }
993   } else {
994     return (-1);
995   }
996   return (0);
997 } /* int rrd_config */
998
999 static int rrd_shutdown(void) {
1000   pthread_mutex_lock(&cache_lock);
1001   rrd_cache_flush(0);
1002   pthread_mutex_unlock(&cache_lock);
1003
1004   pthread_mutex_lock(&queue_lock);
1005   do_shutdown = 1;
1006   pthread_cond_signal(&queue_cond);
1007   pthread_mutex_unlock(&queue_lock);
1008
1009   if ((queue_thread_running != 0) &&
1010       ((queue_head != NULL) || (flushq_head != NULL))) {
1011     INFO("rrdtool plugin: Shutting down the queue thread. "
1012          "This may take a while.");
1013   } else if (queue_thread_running != 0) {
1014     INFO("rrdtool plugin: Shutting down the queue thread.");
1015   }
1016
1017   /* Wait for all the values to be written to disk before returning. */
1018   if (queue_thread_running != 0) {
1019     pthread_join(queue_thread, NULL);
1020     memset(&queue_thread, 0, sizeof(queue_thread));
1021     queue_thread_running = 0;
1022     DEBUG("rrdtool plugin: queue_thread exited.");
1023   }
1024
1025   rrd_cache_destroy();
1026
1027   return (0);
1028 } /* int rrd_shutdown */
1029
1030 static int rrd_init(void) {
1031   static int init_once = 0;
1032   int status;
1033
1034   if (init_once != 0)
1035     return (0);
1036   init_once = 1;
1037
1038   if (rrdcreate_config.heartbeat <= 0)
1039     rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1040
1041   /* Set the cache up */
1042   pthread_mutex_lock(&cache_lock);
1043
1044   cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1045   if (cache == NULL) {
1046     pthread_mutex_unlock(&cache_lock);
1047     ERROR("rrdtool plugin: c_avl_create failed.");
1048     return (-1);
1049   }
1050
1051   cache_flush_last = cdtime();
1052   if (cache_timeout == 0) {
1053     random_timeout = 0;
1054     cache_flush_timeout = 0;
1055   } else if (cache_flush_timeout < cache_timeout) {
1056     INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1057          "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1058          CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1059          CDTIME_T_TO_DOUBLE(cache_timeout),
1060          CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1061     cache_flush_timeout = 10 * cache_timeout;
1062   }
1063
1064   /* Assure that "cache_timeout + random_variation" is never negative. */
1065   if (random_timeout > cache_timeout) {
1066     INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1067          CDTIME_T_TO_DOUBLE(cache_timeout));
1068     random_timeout = cache_timeout;
1069   }
1070
1071   pthread_mutex_unlock(&cache_lock);
1072
1073   status =
1074       plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1075                            /* args = */ NULL, "rrdtool queue");
1076   if (status != 0) {
1077     ERROR("rrdtool plugin: Cannot create queue-thread.");
1078     return (-1);
1079   }
1080   queue_thread_running = 1;
1081
1082   DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1083         " heartbeat = %i; rrarows = %i; xff = %lf;",
1084         (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1085         rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1086         rrdcreate_config.xff);
1087
1088   return (0);
1089 } /* int rrd_init */
1090
1091 void module_register(void) {
1092   plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1093   plugin_register_init("rrdtool", rrd_init);
1094   plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1095   plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1096   plugin_register_shutdown("rrdtool", rrd_shutdown);
1097 }