2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2013 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at collectd.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
29 #include "utils/avltree/avltree.h"
30 #include "utils/common/common.h"
31 #include "utils/rrdcreate/rrdcreate.h"
32 #include "utils_random.h"
39 typedef struct rrd_cache_s {
44 int64_t random_variation;
45 enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
53 struct rrd_queue_s *next;
55 typedef struct rrd_queue_s rrd_queue_t;
60 static const char *config_keys[] = {
61 "CacheTimeout", "CacheFlush", "CreateFilesAsync", "DataDir",
62 "StepSize", "HeartBeat", "RRARows", "RRATimespan",
63 "XFF", "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/* If datadir is NULL, the daemon's basedir is used. If stepsize or heartbeat
 * is zero, a default that depends on the `interval' member of the value list
 * is used. */
70 static double write_rate;
71 static rrdcreate_config_t rrdcreate_config = {
77 /* timespans = */ NULL,
78 /* timespans_num = */ 0,
80 /* consolidation_functions = */ NULL,
81 /* consolidation_functions_num = */ 0,
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86 * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout;
88 static cdtime_t cache_flush_timeout;
89 static cdtime_t random_timeout;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
94 static rrd_queue_t *queue_head;
95 static rrd_queue_t *queue_tail;
96 static rrd_queue_t *flushq_head;
97 static rrd_queue_t *flushq_tail;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
107 static int do_shutdown;
109 #if HAVE_THREADSAFE_LIBRRD
110 static int srrd_update(char *filename, char *template, int argc,
112 optind = 0; /* bug in librrd? */
115 int status = rrd_update_r(filename, template, argc, (void *)argv);
117 WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
125 #else /* !HAVE_THREADSAFE_LIBRRD */
126 static int srrd_update(char *filename, char *template, int argc,
133 assert(template == NULL);
136 new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137 if (new_argv == NULL) {
138 ERROR("rrdtool plugin: malloc failed.");
142 new_argv[0] = "update";
143 new_argv[1] = filename;
145 memcpy(new_argv + 2, argv, argc * sizeof(char *));
146 new_argv[new_argc] = NULL;
148 pthread_mutex_lock(&librrd_lock);
149 optind = 0; /* bug in librrd? */
152 status = rrd_update(new_argc, new_argv);
153 pthread_mutex_unlock(&librrd_lock);
156 WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167 const data_set_t *ds,
168 const value_list_t *vl) {
173 memset(buffer, '\0', buffer_len);
175 tt = CDTIME_T_TO_TIME_T(vl->time);
176 status = ssnprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177 if ((status < 1) || (status >= buffer_len))
181 for (size_t i = 0; i < ds->ds_num; i++) {
182 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183 (ds->ds[i].type != DS_TYPE_GAUGE) &&
184 (ds->ds[i].type != DS_TYPE_DERIVE) &&
185 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
188 if (ds->ds[i].type == DS_TYPE_COUNTER)
189 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
190 (uint64_t)vl->values[i].counter);
191 else if (ds->ds[i].type == DS_TYPE_GAUGE)
192 status = ssnprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193 vl->values[i].gauge);
194 else if (ds->ds[i].type == DS_TYPE_DERIVE)
195 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196 vl->values[i].derive);
197 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199 vl->values[i].absolute);
201 if ((status < 1) || (status >= (buffer_len - offset)))
205 } /* for ds->ds_num */
208 } /* int value_list_to_string_multiple */
210 static int value_list_to_string(char *buffer, int buffer_len,
211 const data_set_t *ds, const value_list_t *vl) {
216 return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
218 tt = CDTIME_T_TO_TIME_T(vl->time);
219 switch (ds->ds[0].type) {
221 status = ssnprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222 vl->values[0].derive);
225 status = ssnprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226 vl->values[0].gauge);
228 case DS_TYPE_COUNTER:
229 status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
230 (uint64_t)vl->values[0].counter);
232 case DS_TYPE_ABSOLUTE:
233 status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234 vl->values[0].absolute);
240 if ((status < 1) || (status >= buffer_len))
244 } /* int value_list_to_string */
246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247 value_list_t const *vl) {
248 char const suffix[] = ".rrd";
252 if (datadir != NULL) {
253 size_t datadir_len = strlen(datadir) + 1;
255 if (datadir_len >= buffer_size)
258 sstrncpy(buffer, datadir, buffer_size);
259 buffer[datadir_len - 1] = '/';
260 buffer[datadir_len] = 0;
262 buffer += datadir_len;
263 buffer_size -= datadir_len;
266 status = FORMAT_VL(buffer, buffer_size, vl);
270 len = strlen(buffer);
271 assert(len < buffer_size);
275 if (buffer_size <= sizeof(suffix))
278 memcpy(buffer, suffix, sizeof(suffix));
280 } /* int value_list_to_filename */
282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283 struct timeval tv_next_update;
284 struct timeval tv_now;
286 gettimeofday(&tv_next_update, /* timezone = */ NULL);
289 rrd_queue_t *queue_entry;
290 rrd_cache_t *cache_entry;
298 pthread_mutex_lock(&queue_lock);
299 /* Wait for values to arrive */
301 struct timespec ts_wait;
303 while ((flushq_head == NULL) && (queue_head == NULL) &&
305 pthread_cond_wait(&queue_cond, &queue_lock);
307 if ((flushq_head == NULL) && (queue_head == NULL))
310 /* Don't delay if there's something to flush */
311 if (flushq_head != NULL)
314 /* Don't delay if we're shutting down */
315 if (do_shutdown != 0)
318 /* Don't delay if no delay was configured. */
319 if (write_rate <= 0.0)
322 gettimeofday(&tv_now, /* timezone = */ NULL);
323 status = timeval_cmp(tv_next_update, tv_now, NULL);
324 /* We're good to go */
328 /* We're supposed to wait a bit with this update, so we'll
329 * wait for the next addition to the queue or to the end of
330 * the wait period - whichever comes first. */
331 ts_wait.tv_sec = tv_next_update.tv_sec;
332 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
334 status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335 if (status == ETIMEDOUT)
339 /* XXX: If you need to lock both, cache_lock and queue_lock, at
340 * the same time, ALWAYS lock `cache_lock' first! */
342 /* We're in the shutdown phase */
343 if ((flushq_head == NULL) && (queue_head == NULL)) {
344 pthread_mutex_unlock(&queue_lock);
348 if (flushq_head != NULL) {
349 /* Dequeue the first flush entry */
350 queue_entry = flushq_head;
351 if (flushq_head == flushq_tail)
352 flushq_head = flushq_tail = NULL;
354 flushq_head = flushq_head->next;
355 } else /* if (queue_head != NULL) */
357 /* Dequeue the first regular entry */
358 queue_entry = queue_head;
359 if (queue_head == queue_tail)
360 queue_head = queue_tail = NULL;
362 queue_head = queue_head->next;
365 /* Unlock the queue again */
366 pthread_mutex_unlock(&queue_lock);
368 /* We now need the cache lock so the entry isn't updated while
369 * we make a copy of its values */
370 pthread_mutex_lock(&cache_lock);
372 status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
375 values = cache_entry->values;
376 values_num = cache_entry->values_num;
378 cache_entry->values = NULL;
379 cache_entry->values_num = 0;
380 cache_entry->flags = FLAG_NONE;
383 pthread_mutex_unlock(&cache_lock);
386 sfree(queue_entry->filename);
391 /* Update `tv_next_update' */
392 if (write_rate > 0.0) {
393 gettimeofday(&tv_now, /* timezone = */ NULL);
394 tv_next_update.tv_sec = tv_now.tv_sec;
395 tv_next_update.tv_usec =
396 tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397 while (tv_next_update.tv_usec > 1000000) {
398 tv_next_update.tv_sec++;
399 tv_next_update.tv_usec -= 1000000;
403 /* Write the values to the RRD-file */
404 srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405 DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406 (values_num == 1) ? "" : "s", queue_entry->filename);
408 for (int i = 0; i < values_num; i++) {
412 sfree(queue_entry->filename);
416 pthread_exit((void *)0);
418 } /* void *rrd_queue_thread */
420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421 rrd_queue_t **tail) {
422 rrd_queue_t *queue_entry;
424 queue_entry = malloc(sizeof(*queue_entry));
425 if (queue_entry == NULL)
428 queue_entry->filename = strdup(filename);
429 if (queue_entry->filename == NULL) {
434 queue_entry->next = NULL;
436 pthread_mutex_lock(&queue_lock);
441 (*tail)->next = queue_entry;
444 pthread_cond_signal(&queue_cond);
445 pthread_mutex_unlock(&queue_lock);
448 } /* int rrd_queue_enqueue */
450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451 rrd_queue_t **tail) {
455 pthread_mutex_lock(&queue_lock);
460 while (this != NULL) {
461 if (strcmp(this->filename, filename) == 0)
469 pthread_mutex_unlock(&queue_lock);
476 prev->next = this->next;
478 if (this->next == NULL)
481 pthread_mutex_unlock(&queue_lock);
483 sfree(this->filename);
487 } /* int rrd_queue_dequeue */
489 /* XXX: You must hold "cache_lock" when calling this function! */
490 static void rrd_cache_flush(cdtime_t timeout) {
498 c_avl_iterator_t *iter;
500 DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501 CDTIME_T_TO_DOUBLE(timeout));
505 /* Build a list of entries to be flushed */
506 iter = c_avl_get_iterator(cache);
507 while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508 if (rc->flags != FLAG_NONE)
510 /* timeout == 0 => flush everything */
511 else if ((timeout != 0) && ((now - rc->first_value) < timeout))
513 else if (rc->values_num > 0) {
516 status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
518 rc->flags = FLAG_QUEUED;
519 } else /* ancient and no values -> waste of memory */
521 char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
523 ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
524 c_avl_iterator_destroy(iter);
529 keys[keys_num] = key;
532 } /* while (c_avl_iterator_next) */
533 c_avl_iterator_destroy(iter);
535 for (int i = 0; i < keys_num; i++) {
536 if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
537 DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
541 assert(rc->values == NULL);
542 assert(rc->values_num == 0);
547 } /* for (i = 0..keys_num) */
551 cache_flush_last = now;
552 } /* void rrd_cache_flush */
554 static int rrd_cache_flush_identifier(cdtime_t timeout,
555 const char *identifier) {
561 if (identifier == NULL) {
562 rrd_cache_flush(timeout);
569 ssnprintf(key, sizeof(key), "%s.rrd", identifier);
571 ssnprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
572 key[sizeof(key) - 1] = '\0';
574 status = c_avl_get(cache, key, (void *)&rc);
576 INFO("rrdtool plugin: rrd_cache_flush_identifier: "
577 "c_avl_get (%s) failed. Does that file really exist?",
582 if (rc->flags == FLAG_FLUSHQ) {
584 } else if (rc->flags == FLAG_QUEUED) {
585 rrd_queue_dequeue(key, &queue_head, &queue_tail);
586 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
588 rc->flags = FLAG_FLUSHQ;
589 } else if ((now - rc->first_value) < timeout) {
591 } else if (rc->values_num > 0) {
592 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
594 rc->flags = FLAG_FLUSHQ;
598 } /* int rrd_cache_flush_identifier */
600 static int64_t rrd_get_random_variation(void) {
601 if (random_timeout == 0)
604 return (int64_t)cdrand_range(-random_timeout, random_timeout);
605 } /* int64_t rrd_get_random_variation */
607 static int rrd_cache_insert(const char *filename, const char *value,
608 cdtime_t value_time) {
609 rrd_cache_t *rc = NULL;
613 pthread_mutex_lock(&cache_lock);
615 /* This shouldn't happen, but it did happen at least once, so we'll be
618 pthread_mutex_unlock(&cache_lock);
619 WARNING("rrdtool plugin: cache == NULL.");
623 int status = c_avl_get(cache, filename, (void *)&rc);
624 if ((status != 0) || (rc == NULL)) {
625 rc = malloc(sizeof(*rc));
627 ERROR("rrdtool plugin: malloc failed: %s", STRERRNO);
628 pthread_mutex_unlock(&cache_lock);
635 rc->random_variation = rrd_get_random_variation();
636 rc->flags = FLAG_NONE;
640 assert(value_time > 0); /* plugin_dispatch() ensures this. */
641 if (rc->last_value >= value_time) {
642 pthread_mutex_unlock(&cache_lock);
643 DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
644 ">= (value_time = %" PRIu64 ")",
645 rc->last_value, value_time);
650 realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
651 if (values_new == NULL) {
652 void *cache_key = NULL;
654 c_avl_remove(cache, filename, &cache_key, NULL);
655 pthread_mutex_unlock(&cache_lock);
657 ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
664 rc->values = values_new;
666 rc->values[rc->values_num] = strdup(value);
667 if (rc->values[rc->values_num] != NULL)
670 if (rc->values_num == 1)
671 rc->first_value = value_time;
672 rc->last_value = value_time;
674 /* Insert if this is the first value */
676 void *cache_key = strdup(filename);
678 if (cache_key == NULL) {
679 pthread_mutex_unlock(&cache_lock);
681 ERROR("rrdtool plugin: strdup failed: %s", STRERRNO);
683 sfree(rc->values[0]);
689 c_avl_insert(cache, cache_key, rc);
692 DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
693 "values_num = %i; age = %.3f;",
694 filename, rc->values_num,
695 CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
697 if ((rc->last_value - rc->first_value) >=
698 (cache_timeout + rc->random_variation)) {
699 /* XXX: If you need to lock both, cache_lock and queue_lock, at
700 * the same time, ALWAYS lock `cache_lock' first! */
701 if (rc->flags == FLAG_NONE) {
704 status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
706 rc->flags = FLAG_QUEUED;
708 rc->random_variation = rrd_get_random_variation();
710 DEBUG("rrdtool plugin: `%s' is already queued.", filename);
714 if ((cache_timeout > 0) &&
715 ((cdtime() - cache_flush_last) > cache_flush_timeout))
716 rrd_cache_flush(cache_timeout + random_timeout);
718 pthread_mutex_unlock(&cache_lock);
721 } /* int rrd_cache_insert */
723 static int rrd_cache_destroy(void) /* {{{ */
730 pthread_mutex_lock(&cache_lock);
733 pthread_mutex_unlock(&cache_lock);
737 while (c_avl_pick(cache, &key, &value) == 0) {
746 if (rc->values_num > 0)
749 for (int i = 0; i < rc->values_num; i++)
750 sfree(rc->values[i]);
755 c_avl_destroy(cache);
759 INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
760 non_empty, (non_empty == 1) ? "entry" : "entries");
762 DEBUG("rrdtool plugin: No values have been lost "
763 "when destroying the cache.");
766 pthread_mutex_unlock(&cache_lock);
768 } /* }}} int rrd_cache_destroy */
/*
 * rrd_compare_numeric - qsort() comparison callback for arrays of int.
 *
 * a_ptr, b_ptr: pointers to the two ints to compare.
 *
 * Returns -1, 0 or 1 if *a_ptr is less than, equal to or greater than
 * *b_ptr. The operands are compared rather than subtracted, so the result
 * cannot overflow.
 */
static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
  /* Keep the const qualifier of the callback arguments. */
  const int a = *(const int *)a_ptr;
  const int b = *(const int *)b_ptr;

  return (a > b) - (a < b);
} /* int rrd_compare_numeric */
782 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
783 user_data_t __attribute__((unused)) * user_data) {
788 if (0 != strcmp(ds->type, vl->type)) {
789 ERROR("rrdtool plugin: DS type does not match value list type");
793 char filename[PATH_MAX];
794 if (value_list_to_filename(filename, sizeof(filename), vl) != 0) {
795 ERROR("rrdtool plugin: failed to build filename");
799 char values[32 * (ds->ds_num + 1)];
800 if (value_list_to_string(values, sizeof(values), ds, vl) != 0) {
801 ERROR("rrdtool plugin: failed to build values string");
805 struct stat statbuf = {0};
806 if (stat(filename, &statbuf) == -1) {
807 if (errno == ENOENT) {
808 if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
809 ERROR("rrdtool plugin: cu_rrd_create_file (%s) failed.", filename);
811 } else if (rrdcreate_config.async) {
815 ERROR("rrdtool plugin: stat(%s) failed: %s", filename, STRERRNO);
818 } else if (!S_ISREG(statbuf.st_mode)) {
819 ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
823 return rrd_cache_insert(filename, values, vl->time);
824 } /* int rrd_write */
826 static int rrd_flush(cdtime_t timeout, const char *identifier,
827 __attribute__((unused)) user_data_t *user_data) {
828 pthread_mutex_lock(&cache_lock);
831 pthread_mutex_unlock(&cache_lock);
835 rrd_cache_flush_identifier(timeout, identifier);
837 pthread_mutex_unlock(&cache_lock);
839 } /* int rrd_flush */
841 static int rrd_config(const char *key, const char *value) {
842 if (strcasecmp("CacheTimeout", key) == 0) {
843 double tmp = atof(value);
845 fprintf(stderr, "rrdtool: `CacheTimeout' must "
846 "be greater than 0.\n");
847 ERROR("rrdtool: `CacheTimeout' must "
848 "be greater than 0.\n");
851 cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
852 } else if (strcasecmp("CacheFlush", key) == 0) {
853 double tmp = atof(value);
855 fprintf(stderr, "rrdtool: `CacheFlush' must "
856 "be greater than 0.\n");
857 ERROR("rrdtool: `CacheFlush' must "
858 "be greater than 0.\n");
861 cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
862 } else if (strcasecmp("DataDir", key) == 0) {
868 ERROR("rrdtool plugin: strdup failed.");
873 while ((len > 0) && (tmp[len - 1] == '/')) {
879 ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
884 if (datadir != NULL) {
889 } else if (strcasecmp("StepSize", key) == 0) {
890 unsigned long temp = strtoul(value, NULL, 0);
892 rrdcreate_config.stepsize = temp;
893 } else if (strcasecmp("HeartBeat", key) == 0) {
894 int temp = atoi(value);
896 rrdcreate_config.heartbeat = temp;
897 } else if (strcasecmp("CreateFilesAsync", key) == 0) {
899 rrdcreate_config.async = 1;
901 rrdcreate_config.async = 0;
902 } else if (strcasecmp("RRARows", key) == 0) {
903 int tmp = atoi(value);
905 fprintf(stderr, "rrdtool: `RRARows' must "
906 "be greater than 0.\n");
907 ERROR("rrdtool: `RRARows' must "
908 "be greater than 0.\n");
911 rrdcreate_config.rrarows = tmp;
912 } else if (strcasecmp("RRATimespan", key) == 0) {
913 char *saveptr = NULL;
919 value_copy = strdup(value);
920 if (value_copy == NULL)
924 while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
927 tmp_alloc = realloc(rrdcreate_config.timespans,
928 sizeof(int) * (rrdcreate_config.timespans_num + 1));
929 if (tmp_alloc == NULL) {
930 fprintf(stderr, "rrdtool: realloc failed.\n");
931 ERROR("rrdtool: realloc failed.\n");
935 rrdcreate_config.timespans = tmp_alloc;
936 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
937 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
938 rrdcreate_config.timespans_num++;
939 } /* while (strtok_r) */
941 qsort(/* base = */ rrdcreate_config.timespans,
942 /* nmemb = */ rrdcreate_config.timespans_num,
943 /* size = */ sizeof(rrdcreate_config.timespans[0]),
944 /* compar = */ rrd_compare_numeric);
947 } else if (strcasecmp("XFF", key) == 0) {
948 double tmp = atof(value);
949 if ((tmp < 0.0) || (tmp >= 1.0)) {
950 fprintf(stderr, "rrdtool: `XFF' must "
951 "be in the range 0 to 1 (exclusive).");
952 ERROR("rrdtool: `XFF' must "
953 "be in the range 0 to 1 (exclusive).");
956 rrdcreate_config.xff = tmp;
957 } else if (strcasecmp("WritesPerSecond", key) == 0) {
958 double wps = atof(value);
961 fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
962 "greater than or equal to zero.");
964 } else if (wps == 0.0) {
967 write_rate = 1.0 / wps;
969 } else if (strcasecmp("RandomTimeout", key) == 0) {
974 fprintf(stderr, "rrdtool: `RandomTimeout' must "
975 "be greater than or equal to zero.\n");
976 ERROR("rrdtool: `RandomTimeout' must "
977 "be greater then or equal to zero.");
979 random_timeout = DOUBLE_TO_CDTIME_T(tmp);
985 } /* int rrd_config */
987 static int rrd_shutdown(void) {
988 pthread_mutex_lock(&cache_lock);
990 pthread_mutex_unlock(&cache_lock);
992 pthread_mutex_lock(&queue_lock);
994 pthread_cond_signal(&queue_cond);
995 pthread_mutex_unlock(&queue_lock);
997 if ((queue_thread_running != 0) &&
998 ((queue_head != NULL) || (flushq_head != NULL))) {
999 INFO("rrdtool plugin: Shutting down the queue thread. "
1000 "This may take a while.");
1001 } else if (queue_thread_running != 0) {
1002 INFO("rrdtool plugin: Shutting down the queue thread.");
1005 /* Wait for all the values to be written to disk before returning. */
1006 if (queue_thread_running != 0) {
1007 pthread_join(queue_thread, NULL);
1008 memset(&queue_thread, 0, sizeof(queue_thread));
1009 queue_thread_running = 0;
1010 DEBUG("rrdtool plugin: queue_thread exited.");
1013 rrd_cache_destroy();
1016 } /* int rrd_shutdown */
1018 static int rrd_init(void) {
1019 static int init_once;
1025 if (rrdcreate_config.heartbeat <= 0)
1026 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1028 /* Set the cache up */
1029 pthread_mutex_lock(&cache_lock);
1031 cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1032 if (cache == NULL) {
1033 pthread_mutex_unlock(&cache_lock);
1034 ERROR("rrdtool plugin: c_avl_create failed.");
1038 cache_flush_last = cdtime();
1039 if (cache_timeout == 0) {
1041 cache_flush_timeout = 0;
1042 } else if (cache_flush_timeout < cache_timeout) {
1043 INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1044 "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1045 CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1046 CDTIME_T_TO_DOUBLE(cache_timeout),
1047 CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1048 cache_flush_timeout = 10 * cache_timeout;
1051 /* Assure that "cache_timeout + random_variation" is never negative. */
1052 if (random_timeout > cache_timeout) {
1053 INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1054 CDTIME_T_TO_DOUBLE(cache_timeout));
1055 random_timeout = cache_timeout;
1058 pthread_mutex_unlock(&cache_lock);
1061 plugin_thread_create(&queue_thread, /* attr = */ NULL, rrd_queue_thread,
1062 /* args = */ NULL, "rrdtool queue");
1064 ERROR("rrdtool plugin: Cannot create queue-thread.");
1067 queue_thread_running = 1;
1069 DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1070 " heartbeat = %i; rrarows = %i; xff = %lf;",
1071 (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1072 rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1073 rrdcreate_config.xff);
1076 } /* int rrd_init */
1078 void module_register(void) {
1079 plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1080 plugin_register_init("rrdtool", rrd_init);
1081 plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1082 plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1083 plugin_register_shutdown("rrdtool", rrd_shutdown);