2 /*****************************************************************************
3 * RRDtool 1.3.1 Copyright by Tobi Oetiker, 1997-2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
8 *****************************************************************************/
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 #include <sys/timeb.h>
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
45 static int gettimeofday(
47 struct __timezone *tz)
50 struct _timeb current_time;
52 _ftime(¤t_time);
54 t->tv_sec = current_time.time;
55 t->tv_usec = current_time.millitm * 1000;
62 /* FUNCTION PROTOTYPES */
76 static int allocate_data_structures(
79 rrd_value_t **pdp_temp,
82 unsigned long *tmpl_cnt,
83 unsigned long **rra_step_cnt,
84 unsigned long **skip_update,
85 rrd_value_t **pdp_new);
87 static int parse_template(
90 unsigned long *tmpl_cnt,
93 static int process_arg(
97 unsigned long rra_begin,
98 unsigned long *rra_current,
100 unsigned long *current_time_usec,
101 rrd_value_t *pdp_temp,
102 rrd_value_t *pdp_new,
103 unsigned long *rra_step_cnt,
106 unsigned long tmpl_cnt,
107 rrd_info_t ** pcdp_summary,
109 unsigned long *skip_update,
110 int *schedule_smooth);
117 unsigned long tmpl_cnt,
118 time_t *current_time,
119 unsigned long *current_time_usec,
122 static int get_time_from_reading(
126 time_t *current_time,
127 unsigned long *current_time_usec,
130 static int update_pdp_prep(
133 rrd_value_t *pdp_new,
136 static int calculate_elapsed_steps(
138 unsigned long current_time,
139 unsigned long current_time_usec,
143 unsigned long *proc_pdp_cnt);
145 static void simple_update(
148 rrd_value_t *pdp_new);
150 static int process_all_pdp_st(
155 unsigned long elapsed_pdp_st,
156 rrd_value_t *pdp_new,
157 rrd_value_t *pdp_temp);
159 static int process_pdp_st(
161 unsigned long ds_idx,
166 rrd_value_t *pdp_new,
167 rrd_value_t *pdp_temp);
169 static int update_all_cdp_prep(
171 unsigned long *rra_step_cnt,
172 unsigned long rra_begin,
173 rrd_file_t *rrd_file,
174 unsigned long elapsed_pdp_st,
175 unsigned long proc_pdp_cnt,
176 rrd_value_t **last_seasonal_coef,
177 rrd_value_t **seasonal_coef,
178 rrd_value_t *pdp_temp,
179 unsigned long *rra_current,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
199 static void update_cdp(
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
211 static void initialize_cdp_val(
214 rrd_value_t pdp_temp_val,
215 unsigned long elapsed_pdp_st,
216 unsigned long start_pdp_offset,
217 unsigned long pdp_cnt);
219 static void reset_cdp(
221 unsigned long elapsed_pdp_st,
222 rrd_value_t *pdp_temp,
223 rrd_value_t *last_seasonal_coef,
224 rrd_value_t *seasonal_coef,
228 enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
244 static int update_aberrant_cdps(
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long *rra_current,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
258 unsigned long *rra_current,
260 unsigned long *skip_update,
261 rrd_info_t ** pcdp_summary);
263 static int write_RRA_row(
264 rrd_file_t *rrd_file,
266 unsigned long rra_idx,
267 unsigned long *rra_current,
268 unsigned short CDP_scratch_idx,
269 rrd_info_t ** pcdp_summary,
272 static int smooth_all_rras(
274 rrd_file_t *rrd_file,
275 unsigned long rra_begin);
278 static int write_changes_to_disk(
280 rrd_file_t *rrd_file,
285 * normalize time as returned by gettimeofday. usec part must
288 static inline void normalize_time(
291 if (t->tv_usec < 0) {
298 * Sets current_time and current_time_usec based on the current time.
299 * current_time_usec is set to 0 if the version number is 1 or 2.
301 static inline void initialize_time(
302 time_t *current_time,
303 unsigned long *current_time_usec,
306 struct timeval tmp_time; /* used for time conversion */
308 gettimeofday(&tmp_time, 0);
309 normalize_time(&tmp_time);
310 *current_time = tmp_time.tv_sec;
312 *current_time_usec = tmp_time.tv_usec;
314 *current_time_usec = 0;
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
320 rrd_info_t *rrd_update_v(
325 rrd_info_t *result = NULL;
327 struct option long_options[] = {
328 {"template", required_argument, 0, 't'},
334 opterr = 0; /* initialize getopt */
337 int option_index = 0;
340 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
351 rrd_set_error("unknown option '%s'", argv[optind - 1]);
356 /* need at least 2 arguments: filename, data. */
357 if (argc - optind < 2) {
358 rrd_set_error("Not enough arguments");
362 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363 rc.u_int = _rrd_update(argv[optind], tmplt,
365 (const char **) (argv + optind + 1), result);
366 result->value.u_int = rc.u_int;
375 struct option long_options[] = {
376 {"template", required_argument, 0, 't'},
379 int option_index = 0;
385 opterr = 0; /* initialize getopt */
388 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
395 tmplt = strdup(optarg);
399 rrd_set_error("unknown option '%s'", argv[optind - 1]);
404 /* need at least 2 arguments: filename, data. */
405 if (argc - optind < 2) {
406 rrd_set_error("Not enough arguments");
410 rc = rrd_update_r(argv[optind], tmplt,
411 argc - optind - 1, (const char **) (argv + optind + 1));
418 const char *filename,
423 return _rrd_update(filename, tmplt, argc, argv, NULL);
427 const char *filename,
431 rrd_info_t * pcdp_summary)
436 unsigned long rra_begin; /* byte pointer to the rra
437 * area in the rrd file. this
438 * pointer never changes value */
439 unsigned long rra_current; /* byte pointer to the current write
440 * spot in the rrd file. */
441 rrd_value_t *pdp_new; /* prepare the incoming data to be added
442 * to the existing entry */
443 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
444 * to the cdp values */
446 long *tmpl_idx; /* index representing the settings
447 * transported by the tmplt index */
448 unsigned long tmpl_cnt = 2; /* time and data */
450 time_t current_time = 0;
451 unsigned long current_time_usec = 0; /* microseconds part of current time */
453 int schedule_smooth = 0;
455 /* number of elapsed PDP steps since last update */
456 unsigned long *rra_step_cnt = NULL;
458 int version; /* rrd version */
459 rrd_file_t *rrd_file;
460 char *arg_copy; /* for processing the argv */
461 unsigned long *skip_update; /* RRAs to advance but not write */
463 /* need at least 1 arguments: data. */
465 rrd_set_error("Not enough arguments");
469 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
472 /* We are now at the beginning of the rra's */
473 rra_current = rra_begin = rrd_file->header_len;
475 version = atoi(rrd.stat_head->version);
477 initialize_time(¤t_time, ¤t_time_usec, version);
479 /* get exclusive lock to whole file.
480 * lock gets removed when we close the file.
482 if (rrd_lock(rrd_file) != 0) {
483 rrd_set_error("could not lock RRD");
487 if (allocate_data_structures(&rrd, &updvals,
488 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489 &rra_step_cnt, &skip_update,
494 /* loop through the arguments. */
495 for (arg_i = 0; arg_i < argc; arg_i++) {
496 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497 rrd_set_error("failed duplication argv entry");
500 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
502 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503 &pcdp_summary, version, skip_update,
504 &schedule_smooth) == -1) {
505 if (rrd_test_error()) { /* Should have error string always here */
508 /* Prepend file name to error message */
509 if ((save_error = strdup(rrd_get_error())) != NULL) {
510 rrd_set_error("%s: %s", filename, save_error);
522 /* if we got here and if there is an error and if the file has not been
523 * written to, then close things up and return. */
524 if (rrd_test_error()) {
525 goto err_free_structures;
528 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
529 goto err_free_structures;
533 /* calling the smoothing code here guarantees at most one smoothing
534 * operation per rrd_update call. Unfortunately, it is possible with bulk
535 * updates, or a long-delayed update for smoothing to occur off-schedule.
536 * This really isn't critical except during the burn-in cycles. */
537 if (schedule_smooth) {
538 smooth_all_rras(&rrd, rrd_file, rra_begin);
541 /* rrd_dontneed(rrd_file,&rrd); */
567 * get exclusive lock to whole file.
568 * lock gets removed when we close the file
570 * returns 0 on success
578 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
581 if (_fstat(file->fd, &st) == 0) {
582 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
589 lock.l_type = F_WRLCK; /* exclusive write lock */
590 lock.l_len = 0; /* whole file */
591 lock.l_start = 0; /* start of file */
592 lock.l_whence = SEEK_SET; /* end of file */
594 rcstat = fcntl(file->fd, F_SETLK, &lock);
602 * Allocate some important arrays used, and initialize the template.
604 * When it returns, either all of the structures are allocated
605 * or none of them are.
607 * Returns 0 on success, -1 on error.
609 static int allocate_data_structures(
612 rrd_value_t **pdp_temp,
615 unsigned long *tmpl_cnt,
616 unsigned long **rra_step_cnt,
617 unsigned long **skip_update,
618 rrd_value_t **pdp_new)
621 if ((*updvals = (char **) malloc(sizeof(char *)
622 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
623 rrd_set_error("allocating updvals pointer array.");
626 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
627 * rrd->stat_head->ds_cnt)) ==
629 rrd_set_error("allocating pdp_temp.");
630 goto err_free_updvals;
632 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
634 rrd->stat_head->rra_cnt)) ==
636 rrd_set_error("allocating skip_update.");
637 goto err_free_pdp_temp;
639 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
640 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641 rrd_set_error("allocating tmpl_idx.");
642 goto err_free_skip_update;
644 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
647 rra_cnt))) == NULL) {
648 rrd_set_error("allocating rra_step_cnt.");
649 goto err_free_tmpl_idx;
652 /* initialize tmplt redirector */
653 /* default config example (assume DS 1 is a CDEF DS)
654 tmpl_idx[0] -> 0; (time)
655 tmpl_idx[1] -> 1; (DS 0)
656 tmpl_idx[2] -> 3; (DS 2)
657 tmpl_idx[3] -> 4; (DS 3) */
658 (*tmpl_idx)[0] = 0; /* time */
659 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
660 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
661 (*tmpl_idx)[ii++] = i;
666 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
667 goto err_free_rra_step_cnt;
671 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
672 * rrd->stat_head->ds_cnt)) == NULL) {
673 rrd_set_error("allocating pdp_new.");
674 goto err_free_rra_step_cnt;
679 err_free_rra_step_cnt:
683 err_free_skip_update:
693 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
695 * Returns 0 on success.
697 static int parse_template(
700 unsigned long *tmpl_cnt,
703 char *dsname, *tmplt_copy;
704 unsigned int tmpl_len, i;
707 *tmpl_cnt = 1; /* the first entry is the time */
709 /* we should work on a writeable copy here */
710 if ((tmplt_copy = strdup(tmplt)) == NULL) {
711 rrd_set_error("error copying tmplt '%s'", tmplt);
717 tmpl_len = strlen(tmplt_copy);
718 for (i = 0; i <= tmpl_len; i++) {
719 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
720 tmplt_copy[i] = '\0';
721 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
722 rrd_set_error("tmplt contains more DS definitions than RRD");
724 goto out_free_tmpl_copy;
726 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
727 rrd_set_error("unknown DS name '%s'", dsname);
729 goto out_free_tmpl_copy;
731 /* go to the next entry on the tmplt_copy */
733 dsname = &tmplt_copy[i + 1];
743 * Parse an update string, updates the primary data points (PDPs)
744 * and consolidated data points (CDPs), and writes changes to the RRAs.
746 * Returns 0 on success, -1 on error.
748 static int process_arg(
751 rrd_file_t *rrd_file,
752 unsigned long rra_begin,
753 unsigned long *rra_current,
754 time_t *current_time,
755 unsigned long *current_time_usec,
756 rrd_value_t *pdp_temp,
757 rrd_value_t *pdp_new,
758 unsigned long *rra_step_cnt,
761 unsigned long tmpl_cnt,
762 rrd_info_t ** pcdp_summary,
764 unsigned long *skip_update,
765 int *schedule_smooth)
767 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
769 /* a vector of future Holt-Winters seasonal coefs */
770 unsigned long elapsed_pdp_st;
772 double interval, pre_int, post_int; /* interval between this and
774 unsigned long proc_pdp_cnt;
775 unsigned long rra_start;
777 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
778 current_time, current_time_usec, version) == -1) {
781 /* seek to the beginning of the rra's */
782 if (*rra_current != rra_begin) {
783 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
784 rrd_set_error("seek error in rrd");
787 *rra_current = rra_begin;
789 rra_start = rra_begin;
791 interval = (double) (*current_time - rrd->live_head->last_up)
792 + (double) ((long) *current_time_usec -
793 (long) rrd->live_head->last_up_usec) / 1e6f;
795 /* process the data sources and update the pdp_prep
796 * area accordingly */
797 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
801 elapsed_pdp_st = calculate_elapsed_steps(rrd,
803 *current_time_usec, interval,
807 /* has a pdp_st moment occurred since the last run ? */
808 if (elapsed_pdp_st == 0) {
809 /* no we have not passed a pdp_st moment. therefore update is simple */
810 simple_update(rrd, interval, pdp_new);
812 /* an pdp_st has occurred. */
813 if (process_all_pdp_st(rrd, interval,
815 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
818 if (update_all_cdp_prep(rrd, rra_step_cnt,
824 pdp_temp, rra_current,
825 skip_update, schedule_smooth) == -1) {
826 goto err_free_coefficients;
828 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
829 elapsed_pdp_st, pdp_temp,
830 &seasonal_coef) == -1) {
831 goto err_free_coefficients;
833 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
834 rra_current, *current_time, skip_update,
835 pcdp_summary) == -1) {
836 goto err_free_coefficients;
838 } /* endif a pdp_st has occurred */
839 rrd->live_head->last_up = *current_time;
840 rrd->live_head->last_up_usec = *current_time_usec;
843 *rrd->legacy_last_up = rrd->live_head->last_up;
846 free(last_seasonal_coef);
849 err_free_coefficients:
851 free(last_seasonal_coef);
856 * Parse a DS string (time + colon-separated values), storing the
857 * results in current_time, current_time_usec, and updvals.
859 * Returns 0 on success, -1 on error.
866 unsigned long tmpl_cnt,
867 time_t *current_time,
868 unsigned long *current_time_usec,
876 /* initialize all ds input to unknown except the first one
877 which has always got to be set */
878 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
881 /* separate all ds elements; first must be examined separately
882 due to alternate time syntax */
883 if ((p = strchr(input, '@')) != NULL) {
885 } else if ((p = strchr(input, ':')) != NULL) {
888 rrd_set_error("expected timestamp not found in data source from %s",
894 updvals[tmpl_idx[i++]] = p + 1;
899 updvals[tmpl_idx[i++]] = p + 1;
905 rrd_set_error("expected %lu data source readings (got %lu) from %s",
906 tmpl_cnt - 1, i, input);
910 if (get_time_from_reading(rrd, timesyntax, updvals,
911 current_time, current_time_usec,
919 * Parse the time in a DS string, store it in current_time and
920 * current_time_usec and verify that it's later than the last
921 * update for this DS.
923 * Returns 0 on success, -1 on error.
925 static int get_time_from_reading(
929 time_t *current_time,
930 unsigned long *current_time_usec,
934 char *parsetime_error = NULL;
936 rrd_time_value_t ds_tv;
937 struct timeval tmp_time; /* used for time conversion */
939 /* get the time from the reading ... handle N */
940 if (timesyntax == '@') { /* at-style */
941 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
942 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
945 if (ds_tv.type == RELATIVE_TO_END_TIME ||
946 ds_tv.type == RELATIVE_TO_START_TIME) {
947 rrd_set_error("specifying time relative to the 'start' "
948 "or 'end' makes no sense here: %s", updvals[0]);
951 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
952 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
953 } else if (strcmp(updvals[0], "N") == 0) {
954 gettimeofday(&tmp_time, 0);
955 normalize_time(&tmp_time);
956 *current_time = tmp_time.tv_sec;
957 *current_time_usec = tmp_time.tv_usec;
959 old_locale = setlocale(LC_NUMERIC, "C");
960 tmp = strtod(updvals[0], 0);
961 setlocale(LC_NUMERIC, old_locale);
962 *current_time = floor(tmp);
963 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
965 /* dont do any correction for old version RRDs */
967 *current_time_usec = 0;
969 if (*current_time < rrd->live_head->last_up ||
970 (*current_time == rrd->live_head->last_up &&
971 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
972 rrd_set_error("illegal attempt to update using time %ld when "
973 "last update time is %ld (minimum one second step)",
974 *current_time, rrd->live_head->last_up);
981 * Update pdp_new by interpreting the updvals according to the DS type
982 * (COUNTER, GAUGE, etc.).
984 * Returns 0 on success, -1 on error.
986 static int update_pdp_prep(
989 rrd_value_t *pdp_new,
992 unsigned long ds_idx;
994 char *endptr; /* used in the conversion */
999 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1000 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1002 /* make sure we do not build diffs with old last_ds values */
1003 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1004 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1005 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1008 /* NOTE: DST_CDEF should never enter this if block, because
1009 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1010 * accidently specified a value for the DST_CDEF. To handle this case,
1011 * an extra check is required. */
1013 if ((updvals[ds_idx + 1][0] != 'U') &&
1014 (dst_idx != DST_CDEF) &&
1015 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1018 /* pdp_new contains rate * time ... eg the bytes transferred during
1019 * the interval. Doing it this way saves a lot of math operations
1024 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1025 if ((updvals[ds_idx + 1][ii] < '0'
1026 || updvals[ds_idx + 1][ii] > '9')
1027 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1028 rrd_set_error("not a simple integer: '%s'",
1029 updvals[ds_idx + 1]);
1033 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1035 rrd_diff(updvals[ds_idx + 1],
1036 rrd->pdp_prep[ds_idx].last_ds);
1037 if (dst_idx == DST_COUNTER) {
1038 /* simple overflow catcher. This will fail
1039 * terribly for non 32 or 64 bit counters
1040 * ... are there any others in SNMP land?
1042 if (pdp_new[ds_idx] < (double) 0.0)
1043 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1044 if (pdp_new[ds_idx] < (double) 0.0)
1045 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1047 rate = pdp_new[ds_idx] / interval;
1049 pdp_new[ds_idx] = DNAN;
1053 old_locale = setlocale(LC_NUMERIC, "C");
1055 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1056 setlocale(LC_NUMERIC, old_locale);
1058 rrd_set_error("converting '%s' to float: %s",
1059 updvals[ds_idx + 1], rrd_strerror(errno));
1062 if (endptr[0] != '\0') {
1064 ("conversion of '%s' to float not complete: tail '%s'",
1065 updvals[ds_idx + 1], endptr);
1068 rate = pdp_new[ds_idx] / interval;
1072 old_locale = setlocale(LC_NUMERIC, "C");
1074 strtod(updvals[ds_idx + 1], &endptr) * interval;
1075 setlocale(LC_NUMERIC, old_locale);
1077 rrd_set_error("converting '%s' to float: %s",
1078 updvals[ds_idx + 1], rrd_strerror(errno));
1081 if (endptr[0] != '\0') {
1083 ("conversion of '%s' to float not complete: tail '%s'",
1084 updvals[ds_idx + 1], endptr);
1087 rate = pdp_new[ds_idx] / interval;
1090 rrd_set_error("rrd contains unknown DS type : '%s'",
1091 rrd->ds_def[ds_idx].dst);
1094 /* break out of this for loop if the error string is set */
1095 if (rrd_test_error()) {
1098 /* make sure pdp_temp is neither too large or too small
1099 * if any of these occur it becomes unknown ...
1100 * sorry folks ... */
1102 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1103 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1104 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1105 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1106 pdp_new[ds_idx] = DNAN;
1109 /* no news is news all the same */
1110 pdp_new[ds_idx] = DNAN;
1114 /* make a copy of the command line argument for the next run */
1116 fprintf(stderr, "prep ds[%lu]\t"
1120 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1123 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1125 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1131 * How many PDP steps have elapsed since the last update? Returns the answer,
1132 * and stores the time between the last update and the last PDP in pre_time,
1133 * and the time between the last PDP and the current time in post_int.
1135 static int calculate_elapsed_steps(
1137 unsigned long current_time,
1138 unsigned long current_time_usec,
1142 unsigned long *proc_pdp_cnt)
1144 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1145 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1147 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1148 * when it was last updated */
1149 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1151 /* when was the current pdp started */
1152 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1153 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1155 /* when did the last pdp_st occur */
1156 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1157 occu_pdp_st = current_time - occu_pdp_age;
1159 if (occu_pdp_st > proc_pdp_st) {
1160 /* OK we passed the pdp_st moment */
1161 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1162 * occurred before the latest
1164 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1165 *post_int = occu_pdp_age; /* how much after it */
1166 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1168 *pre_int = interval;
1172 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1175 printf("proc_pdp_age %lu\t"
1177 "occu_pfp_age %lu\t"
1181 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1182 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1185 /* compute the number of elapsed pdp_st moments */
1186 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1190 * Increment the PDP values by the values in pdp_new, or else initialize them.
1192 static void simple_update(
1195 rrd_value_t *pdp_new)
1199 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1200 if (isnan(pdp_new[i])) {
1201 /* this is not really accurate if we use subsecond data arrival time
1202 should have thought of it when going subsecond resolution ...
1203 sorry next format change we will have it! */
1204 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1207 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1208 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1210 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1219 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1220 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1226 * Call process_pdp_st for each DS.
1228 * Returns 0 on success, -1 on error.
1230 static int process_all_pdp_st(
1235 unsigned long elapsed_pdp_st,
1236 rrd_value_t *pdp_new,
1237 rrd_value_t *pdp_temp)
1239 unsigned long ds_idx;
1241 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1242 rate*seconds which occurred up to the last run.
1243 pdp_new[] contains rate*seconds from the latest run.
1244 pdp_temp[] will contain the rate for cdp */
1246 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1247 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1248 elapsed_pdp_st * rrd->stat_head->pdp_step,
1249 pdp_new, pdp_temp) == -1) {
1253 fprintf(stderr, "PDP UPD ds[%lu]\t"
1254 "elapsed_pdp_st %lu\t"
1257 "new_unkn_sec %5lu\n",
1261 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1262 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1269 * Process an update that occurs after one of the PDP moments.
1270 * Increments the PDP value, sets NAN if time greater than the
1271 * heartbeats have elapsed, processes CDEFs.
1273 * Returns 0 on success, -1 on error.
1275 static int process_pdp_st(
1277 unsigned long ds_idx,
1281 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1282 rrd_value_t *pdp_new,
1283 rrd_value_t *pdp_temp)
1287 /* update pdp_prep to the current pdp_st. */
1288 double pre_unknown = 0.0;
1289 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1290 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1292 rpnstack_t rpnstack; /* used for COMPUTE DS */
1294 rpnstack_init(&rpnstack);
1297 if (isnan(pdp_new[ds_idx])) {
1298 /* a final bit of unknown to be added before calculation
1299 we use a temporary variable for this so that we
1300 don't have to turn integer lines before using the value */
1301 pre_unknown = pre_int;
1303 if (isnan(scratch[PDP_val].u_val)) {
1304 scratch[PDP_val].u_val = 0;
1306 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1309 /* if too much of the pdp_prep is unknown we dump it */
1310 /* if the interval is larger thatn mrhb we get NAN */
1311 if ((interval > mrhb) ||
1312 (rrd->stat_head->pdp_step / 2.0 <
1313 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1314 pdp_temp[ds_idx] = DNAN;
1316 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1317 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1321 /* process CDEF data sources; remember each CDEF DS can
1322 * only reference other DS with a lower index number */
1323 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1327 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1328 /* substitute data values for OP_VARIABLE nodes */
1329 for (i = 0; rpnp[i].op != OP_END; i++) {
1330 if (rpnp[i].op == OP_VARIABLE) {
1331 rpnp[i].op = OP_NUMBER;
1332 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1335 /* run the rpn calculator */
1336 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1338 rpnstack_free(&rpnstack);
1343 /* make pdp_prep ready for the next run */
1344 if (isnan(pdp_new[ds_idx])) {
1345 /* this is not realy accurate if we use subsecond data arival time
1346 should have thought of it when going subsecond resolution ...
1347 sorry next format change we will have it! */
1348 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1349 scratch[PDP_val].u_val = DNAN;
1351 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1352 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1354 rpnstack_free(&rpnstack);
1359 * Iterate over all the RRAs for a given DS and:
1360 * 1. Decide whether to schedule a smooth later
1361 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1364 * Returns 0 on success, -1 on error
1366 static int update_all_cdp_prep(
1368 unsigned long *rra_step_cnt,
1369 unsigned long rra_begin,
1370 rrd_file_t *rrd_file,
1371 unsigned long elapsed_pdp_st,
1372 unsigned long proc_pdp_cnt,
1373 rrd_value_t **last_seasonal_coef,
1374 rrd_value_t **seasonal_coef,
1375 rrd_value_t *pdp_temp,
1376 unsigned long *rra_current,
1377 unsigned long *skip_update,
1378 int *schedule_smooth)
1380 unsigned long rra_idx;
1382 /* index into the CDP scratch array */
1383 enum cf_en current_cf;
1384 unsigned long rra_start;
1386 /* number of rows to be updated in an RRA for a data value. */
1387 unsigned long start_pdp_offset;
1389 rra_start = rra_begin;
1390 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1391 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1393 rrd->rra_def[rra_idx].pdp_cnt -
1394 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1395 skip_update[rra_idx] = 0;
1396 if (start_pdp_offset <= elapsed_pdp_st) {
1397 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1398 rrd->rra_def[rra_idx].pdp_cnt + 1;
1400 rra_step_cnt[rra_idx] = 0;
1403 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1404 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1405 * so that they will be correct for the next observed value; note that for
1406 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1407 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1408 if (rra_step_cnt[rra_idx] > 1) {
1409 skip_update[rra_idx] = 1;
1410 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1411 elapsed_pdp_st, last_seasonal_coef);
1412 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1413 elapsed_pdp_st + 1, seasonal_coef);
1415 /* periodically run a smoother for seasonal effects */
1416 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1419 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1420 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1421 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1424 *schedule_smooth = 1;
1426 *rra_current = rrd_tell(rrd_file);
1428 if (rrd_test_error())
1432 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1433 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1434 current_cf) == -1) {
1438 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1439 sizeof(rrd_value_t);
1445 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1447 static int do_schedule_smooth(
1449 unsigned long rra_idx,
1450 unsigned long elapsed_pdp_st)
1452 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1453 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1454 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1455 unsigned long seasonal_smooth_idx =
1456 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1457 unsigned long *init_seasonal =
1458 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1460 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1461 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1462 * really an RRA level, not a data source within RRA level parameter, but
1463 * the rra_def is read only for rrd_update (not flushed to disk). */
1464 if (*init_seasonal > BURNIN_CYCLES) {
1465 /* someone has no doubt invented a trick to deal with this wrap around,
1466 * but at least this code is clear. */
1467 if (seasonal_smooth_idx > cur_row) {
1468 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1469 * between PDP and CDP */
1470 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1472 /* can't rely on negative numbers because we are working with
1473 * unsigned values */
1474 return (cur_row + elapsed_pdp_st >= row_cnt
1475 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1477 /* mark off one of the burn-in cycles */
1478 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1482 * For a given RRA, iterate over the data sources and call the appropriate
1483 * consolidation function.
1485 * Returns 0 on success, -1 on error.
1487 static int update_cdp_prep(
1489 unsigned long elapsed_pdp_st,
1490 unsigned long start_pdp_offset,
1491 unsigned long *rra_step_cnt,
1493 rrd_value_t *pdp_temp,
1494 rrd_value_t *last_seasonal_coef,
1495 rrd_value_t *seasonal_coef,
1498 unsigned long ds_idx, cdp_idx;
1500 /* update CDP_PREP areas */
1501 /* loop over data soures within each RRA */
1502 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1504 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1506 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1507 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1508 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1509 elapsed_pdp_st, start_pdp_offset,
1510 rrd->rra_def[rra_idx].pdp_cnt,
1511 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1514 /* Nothing to consolidate if there's one PDP per CDP. However, if
1515 * we've missed some PDPs, let's update null counters etc. */
1516 if (elapsed_pdp_st > 2) {
1517 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1518 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1523 if (rrd_test_error())
1525 } /* endif data sources loop */
1530 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1531 * primary value, secondary value, and # of unknowns.
1533 static void update_cdp(
1536 rrd_value_t pdp_temp_val,
1537 unsigned long rra_step_cnt,
1538 unsigned long elapsed_pdp_st,
1539 unsigned long start_pdp_offset,
1540 unsigned long pdp_cnt,
1545 /* shorthand variables */
1546 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1547 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1548 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1549 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1552 /* If we are in this block, as least 1 CDP value will be written to
1553 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1554 * to be written, then the "fill in" value is the CDP_secondary_val
1556 if (isnan(pdp_temp_val)) {
1557 *cdp_unkn_pdp_cnt += start_pdp_offset;
1558 *cdp_secondary_val = DNAN;
1560 /* CDP_secondary value is the RRA "fill in" value for intermediary
1561 * CDP data entries. No matter the CF, the value is the same because
1562 * the average, max, min, and last of a list of identical values is
1563 * the same, namely, the value itself. */
1564 *cdp_secondary_val = pdp_temp_val;
1567 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1568 *cdp_primary_val = DNAN;
1569 if (current_cf == CF_AVERAGE) {
1571 initialize_average_carry_over(pdp_temp_val,
1573 start_pdp_offset, pdp_cnt);
1575 *cdp_val = pdp_temp_val;
1578 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1579 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1580 } /* endif meets xff value requirement for a valid value */
1581 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1582 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1583 if (isnan(pdp_temp_val))
1584 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1586 *cdp_unkn_pdp_cnt = 0;
1587 } else { /* rra_step_cnt[i] == 0 */
1590 if (isnan(*cdp_val)) {
1591 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1594 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1598 if (isnan(pdp_temp_val)) {
1599 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1602 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1609 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1610 * on the type of consolidation function.
1612 static void initialize_cdp_val(
1615 rrd_value_t pdp_temp_val,
1616 unsigned long elapsed_pdp_st,
1617 unsigned long start_pdp_offset,
1618 unsigned long pdp_cnt)
1620 rrd_value_t cum_val, cur_val;
1622 switch (current_cf) {
1624 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1625 cur_val = IFDNAN(pdp_temp_val, 0.0);
1626 scratch[CDP_primary_val].u_val =
1627 (cum_val + cur_val * start_pdp_offset) /
1628 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1629 scratch[CDP_val].u_val =
1630 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1631 start_pdp_offset, pdp_cnt);
1634 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1635 cur_val = IFDNAN(pdp_temp_val, -DINF);
1638 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1640 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1646 if (cur_val > cum_val)
1647 scratch[CDP_primary_val].u_val = cur_val;
1649 scratch[CDP_primary_val].u_val = cum_val;
1650 /* initialize carry over value */
1651 scratch[CDP_val].u_val = pdp_temp_val;
1654 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1655 cur_val = IFDNAN(pdp_temp_val, DINF);
1658 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1660 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1666 if (cur_val < cum_val)
1667 scratch[CDP_primary_val].u_val = cur_val;
1669 scratch[CDP_primary_val].u_val = cum_val;
1670 /* initialize carry over value */
1671 scratch[CDP_val].u_val = pdp_temp_val;
1675 scratch[CDP_primary_val].u_val = pdp_temp_val;
1676 /* initialize carry over value */
1677 scratch[CDP_val].u_val = pdp_temp_val;
1683 * Update the consolidation function for Holt-Winters functions as
1684 * well as other functions that don't actually consolidate multiple
1687 static void reset_cdp(
1689 unsigned long elapsed_pdp_st,
1690 rrd_value_t *pdp_temp,
1691 rrd_value_t *last_seasonal_coef,
1692 rrd_value_t *seasonal_coef,
1696 enum cf_en current_cf)
1698 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1700 switch (current_cf) {
1703 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1704 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1707 case CF_DEVSEASONAL:
1708 /* need to update cached seasonal values, so they are consistent
1709 * with the bulk update */
1710 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1711 * CDP_last_deviation are the same. */
1712 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1713 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1717 /* need to update the null_count and last_null_count.
1718 * even do this for non-DNAN pdp_temp because the
1719 * algorithm is not learning from batch updates. */
1720 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1721 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1724 scratch[CDP_primary_val].u_val = DNAN;
1725 scratch[CDP_secondary_val].u_val = DNAN;
1728 /* do not count missed bulk values as failures */
1729 scratch[CDP_primary_val].u_val = 0;
1730 scratch[CDP_secondary_val].u_val = 0;
1731 /* need to reset violations buffer.
1732 * could do this more carefully, but for now, just
1733 * assume a bulk update wipes away all violations. */
1734 erase_violations(rrd, cdp_idx, rra_idx);
1739 static rrd_value_t initialize_average_carry_over(
1740 rrd_value_t pdp_temp_val,
1741 unsigned long elapsed_pdp_st,
1742 unsigned long start_pdp_offset,
1743 unsigned long pdp_cnt)
1745 /* initialize carry over value */
1746 if (isnan(pdp_temp_val)) {
1749 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1753 * Update or initialize a CDP value based on the consolidation
1756 * Returns the new value.
1758 static rrd_value_t calculate_cdp_val(
1759 rrd_value_t cdp_val,
1760 rrd_value_t pdp_temp_val,
1761 unsigned long elapsed_pdp_st,
1772 if (isnan(cdp_val)) {
1773 if (current_cf == CF_AVERAGE) {
1774 pdp_temp_val *= elapsed_pdp_st;
1777 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1778 i, ii, pdp_temp_val);
1780 return pdp_temp_val;
1782 if (current_cf == CF_AVERAGE)
1783 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1784 if (current_cf == CF_MINIMUM)
1785 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1786 if (current_cf == CF_MAXIMUM)
1787 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1789 return pdp_temp_val;
1793 * For each RRA, update the seasonal values and then call update_aberrant_CF
1794 * for each data source.
1796 * Return 0 on success, -1 on error.
1798 static int update_aberrant_cdps(
1800 rrd_file_t *rrd_file,
1801 unsigned long rra_begin,
1802 unsigned long *rra_current,
1803 unsigned long elapsed_pdp_st,
1804 rrd_value_t *pdp_temp,
1805 rrd_value_t **seasonal_coef)
1807 unsigned long rra_idx, ds_idx, j;
1809 /* number of PDP steps since the last update that
1810 * are assigned to the first CDP to be generated
1811 * since the last update. */
1812 unsigned short scratch_idx;
1813 unsigned long rra_start;
1814 enum cf_en current_cf;
1816 /* this loop is only entered if elapsed_pdp_st < 3 */
1817 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1818 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1819 rra_start = rra_begin;
1820 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1821 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1822 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1823 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1824 if (scratch_idx == CDP_primary_val) {
1825 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1826 elapsed_pdp_st + 1, seasonal_coef);
1828 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1829 elapsed_pdp_st + 2, seasonal_coef);
1831 *rra_current = rrd_tell(rrd_file);
1833 if (rrd_test_error())
1835 /* loop over data soures within each RRA */
1836 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1837 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1838 rra_idx * (rrd->stat_head->ds_cnt) +
1839 ds_idx, rra_idx, ds_idx, scratch_idx,
1843 rra_start += rrd->rra_def[rra_idx].row_cnt
1844 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1851 * Move sequentially through the file, writing one RRA at a time. Note this
1852 * architecture divorces the computation of CDP with flushing updated RRA
1855 * Return 0 on success, -1 on error.
1857 static int write_to_rras(
1859 rrd_file_t *rrd_file,
1860 unsigned long *rra_step_cnt,
1861 unsigned long rra_begin,
1862 unsigned long *rra_current,
1863 time_t current_time,
1864 unsigned long *skip_update,
1865 rrd_info_t ** pcdp_summary)
1867 unsigned long rra_idx;
1868 unsigned long rra_start;
1869 time_t rra_time = 0; /* time of update for a RRA */
1871 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1873 /* Ready to write to disk */
1874 rra_start = rra_begin;
1876 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1877 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1878 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1881 unsigned short scratch_idx;
1882 unsigned long step_subtract;
1884 for (scratch_idx = CDP_primary_val,
1886 rra_step_cnt[rra_idx] > 0;
1887 rra_step_cnt[rra_idx]--,
1888 scratch_idx = CDP_secondary_val,
1889 step_subtract = 2) {
1891 unsigned long rra_pos_new;
1893 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1895 /* increment, with wrap-around */
1896 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1897 rra_ptr->cur_row = 0;
1899 /* we know what our position should be */
1900 rra_pos_new = rra_start
1901 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1903 /* re-seek if the position is wrong or we wrapped around */
1904 if (rra_pos_new != *rra_current || rra_ptr->cur_row == 0) {
1905 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1906 rrd_set_error("seek error in rrd");
1909 *rra_current = rra_pos_new;
1912 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1915 if (skip_update[rra_idx])
1918 if (*pcdp_summary != NULL) {
1919 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1921 rra_time = (current_time - current_time % step_time)
1922 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1926 (rrd_file, rrd, rra_idx, rra_current, scratch_idx,
1927 pcdp_summary, rra_time) == -1)
1931 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1938 * Write out one row of values (one value per DS) to the archive.
1940 * Returns 0 on success, -1 on error.
1942 static int write_RRA_row(
1943 rrd_file_t *rrd_file,
1945 unsigned long rra_idx,
1946 unsigned long *rra_current,
1947 unsigned short CDP_scratch_idx,
1948 rrd_info_t ** pcdp_summary,
1951 unsigned long ds_idx, cdp_idx;
1954 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1955 /* compute the cdp index */
1956 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1958 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1959 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1960 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1962 if (*pcdp_summary != NULL) {
1963 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1964 /* append info to the return hash */
1965 *pcdp_summary = rrd_info_push(*pcdp_summary,
1967 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1968 rrd->rra_def[rra_idx].cf_nam,
1969 rrd->rra_def[rra_idx].pdp_cnt,
1970 rrd->ds_def[ds_idx].ds_nam),
1973 if (rrd_write(rrd_file,
1974 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1975 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1976 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1979 *rra_current += sizeof(rrd_value_t);
1985 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1987 * Returns 0 on success, -1 otherwise
1989 static int smooth_all_rras(
1991 rrd_file_t *rrd_file,
1992 unsigned long rra_begin)
1994 unsigned long rra_start = rra_begin;
1995 unsigned long rra_idx;
1997 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1998 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1999 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2001 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2003 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2004 if (rrd_test_error())
2007 rra_start += rrd->rra_def[rra_idx].row_cnt
2008 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2015 * Flush changes to disk (unless we're using mmap)
2017 * Returns 0 on success, -1 otherwise
2019 static int write_changes_to_disk(
2021 rrd_file_t *rrd_file,
2024 /* we just need to write back the live header portion now */
2025 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2026 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2027 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2029 rrd_set_error("seek rrd for live header writeback");
2033 if (rrd_write(rrd_file, rrd->live_head,
2034 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2035 rrd_set_error("rrd_write live_head to rrd");
2039 if (rrd_write(rrd_file, rrd->legacy_last_up,
2040 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2041 rrd_set_error("rrd_write live_head to rrd");
2047 if (rrd_write(rrd_file, rrd->pdp_prep,
2048 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2049 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2050 rrd_set_error("rrd_write pdp_prep to rrd");
2054 if (rrd_write(rrd_file, rrd->cdp_prep,
2055 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2056 rrd->stat_head->ds_cnt)
2057 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2058 rrd->stat_head->ds_cnt)) {
2060 rrd_set_error("rrd_write cdp_prep to rrd");
2064 if (rrd_write(rrd_file, rrd->rra_ptr,
2065 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2066 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2067 rrd_set_error("rrd_write rra_ptr to rrd");