2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(¤t_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 unsigned long *rra_current,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
112 unsigned long *skip_update,
113 int *schedule_smooth);
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
125 static int get_time_from_reading(
129 time_t *current_time,
130 unsigned long *current_time_usec,
133 static int update_pdp_prep(
136 rrd_value_t *pdp_new,
139 static int calculate_elapsed_steps(
141 unsigned long current_time,
142 unsigned long current_time_usec,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
164 unsigned long ds_idx,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *rra_current,
183 unsigned long *skip_update,
184 int *schedule_smooth);
186 static int do_schedule_smooth(
188 unsigned long rra_idx,
189 unsigned long elapsed_pdp_st);
191 static int update_cdp_prep(
193 unsigned long elapsed_pdp_st,
194 unsigned long start_pdp_offset,
195 unsigned long *rra_step_cnt,
197 rrd_value_t *pdp_temp,
198 rrd_value_t *last_seasonal_coef,
199 rrd_value_t *seasonal_coef,
202 static void update_cdp(
205 rrd_value_t pdp_temp_val,
206 unsigned long rra_step_cnt,
207 unsigned long elapsed_pdp_st,
208 unsigned long start_pdp_offset,
209 unsigned long pdp_cnt,
214 static void initialize_cdp_val(
217 rrd_value_t pdp_temp_val,
218 unsigned long elapsed_pdp_st,
219 unsigned long start_pdp_offset,
220 unsigned long pdp_cnt);
222 static void reset_cdp(
224 unsigned long elapsed_pdp_st,
225 rrd_value_t *pdp_temp,
226 rrd_value_t *last_seasonal_coef,
227 rrd_value_t *seasonal_coef,
231 enum cf_en current_cf);
233 static rrd_value_t initialize_average_carry_over(
234 rrd_value_t pdp_temp_val,
235 unsigned long elapsed_pdp_st,
236 unsigned long start_pdp_offset,
237 unsigned long pdp_cnt);
239 static rrd_value_t calculate_cdp_val(
241 rrd_value_t pdp_temp_val,
242 unsigned long elapsed_pdp_st,
247 static int update_aberrant_cdps(
249 rrd_file_t *rrd_file,
250 unsigned long rra_begin,
251 unsigned long *rra_current,
252 unsigned long elapsed_pdp_st,
253 rrd_value_t *pdp_temp,
254 rrd_value_t **seasonal_coef);
256 static int write_to_rras(
258 rrd_file_t *rrd_file,
259 unsigned long *rra_step_cnt,
260 unsigned long rra_begin,
261 unsigned long *rra_current,
263 unsigned long *skip_update,
264 rrd_info_t ** pcdp_summary);
266 static int write_RRA_row(
267 rrd_file_t *rrd_file,
269 unsigned long rra_idx,
270 unsigned long *rra_current,
271 unsigned short CDP_scratch_idx,
272 rrd_info_t ** pcdp_summary,
275 static int smooth_all_rras(
277 rrd_file_t *rrd_file,
278 unsigned long rra_begin);
281 static int write_changes_to_disk(
283 rrd_file_t *rrd_file,
288 * normalize time as returned by gettimeofday. usec part must
291 static inline void normalize_time(
294 if (t->tv_usec < 0) {
301 * Sets current_time and current_time_usec based on the current time.
302 * current_time_usec is set to 0 if the version number is 1 or 2.
304 static inline void initialize_time(
305 time_t *current_time,
306 unsigned long *current_time_usec,
309 struct timeval tmp_time; /* used for time conversion */
311 gettimeofday(&tmp_time, 0);
312 normalize_time(&tmp_time);
313 *current_time = tmp_time.tv_sec;
315 *current_time_usec = tmp_time.tv_usec;
317 *current_time_usec = 0;
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
323 rrd_info_t *rrd_update_v(
328 rrd_info_t *result = NULL;
330 struct option long_options[] = {
331 {"template", required_argument, 0, 't'},
337 opterr = 0; /* initialize getopt */
340 int option_index = 0;
343 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
354 rrd_set_error("unknown option '%s'", argv[optind - 1]);
359 /* need at least 2 arguments: filename, data. */
360 if (argc - optind < 2) {
361 rrd_set_error("Not enough arguments");
365 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366 rc.u_int = _rrd_update(argv[optind], tmplt,
368 (const char **) (argv + optind + 1), result);
369 result->value.u_int = rc.u_int;
378 struct option long_options[] = {
379 {"template", required_argument, 0, 't'},
380 {"daemon", required_argument, 0, 'd'},
383 int option_index = 0;
390 opterr = 0; /* initialize getopt */
393 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
400 tmplt = strdup(optarg);
406 daemon = strdup (optarg);
409 rrd_set_error("strdup failed.");
415 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 /* need at least 2 arguments: filename, data. */
421 if (argc - optind < 2) {
422 rrd_set_error("Not enough arguments");
426 if ((tmplt != NULL) && (daemon != NULL))
428 rrd_set_error("The caching daemon cannot be used together with "
433 if ((tmplt == NULL) && (daemon == NULL))
437 temp = getenv (ENV_RRDCACHED_ADDRESS);
440 daemon = strdup (temp);
443 rrd_set_error("strdup failed.");
453 status = rrdc_connect (daemon);
456 rrd_set_error("Unable to connect to daemon: %s",
459 : rrd_strerror (status));
463 status = rrdc_update (/* file = */ argv[optind],
464 /* values_num = */ argc - optind - 1,
465 /* values = */ (void *) (argv + optind + 1));
468 rrd_set_error("Failed sending the values to the daemon: %s",
471 : rrd_strerror (status));
476 } /* if (daemon != NULL) */
478 rc = rrd_update_r(argv[optind], tmplt,
479 argc - optind - 1, (const char **) (argv + optind + 1));
495 const char *filename,
500 return _rrd_update(filename, tmplt, argc, argv, NULL);
504 const char *filename,
508 rrd_info_t * pcdp_summary)
513 unsigned long rra_begin; /* byte pointer to the rra
514 * area in the rrd file. this
515 * pointer never changes value */
516 unsigned long rra_current; /* byte pointer to the current write
517 * spot in the rrd file. */
518 rrd_value_t *pdp_new; /* prepare the incoming data to be added
519 * to the existing entry */
520 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
521 * to the cdp values */
523 long *tmpl_idx; /* index representing the settings
524 * transported by the tmplt index */
525 unsigned long tmpl_cnt = 2; /* time and data */
527 time_t current_time = 0;
528 unsigned long current_time_usec = 0; /* microseconds part of current time */
530 int schedule_smooth = 0;
532 /* number of elapsed PDP steps since last update */
533 unsigned long *rra_step_cnt = NULL;
535 int version; /* rrd version */
536 rrd_file_t *rrd_file;
537 char *arg_copy; /* for processing the argv */
538 unsigned long *skip_update; /* RRAs to advance but not write */
540 /* need at least 1 arguments: data. */
542 rrd_set_error("Not enough arguments");
546 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
549 /* We are now at the beginning of the rra's */
550 rra_current = rra_begin = rrd_file->header_len;
552 version = atoi(rrd.stat_head->version);
554 initialize_time(¤t_time, ¤t_time_usec, version);
556 /* get exclusive lock to whole file.
557 * lock gets removed when we close the file.
559 if (rrd_lock(rrd_file) != 0) {
560 rrd_set_error("could not lock RRD");
564 if (allocate_data_structures(&rrd, &updvals,
565 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
566 &rra_step_cnt, &skip_update,
571 /* loop through the arguments. */
572 for (arg_i = 0; arg_i < argc; arg_i++) {
573 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
574 rrd_set_error("failed duplication argv entry");
577 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
578 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
579 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
580 &pcdp_summary, version, skip_update,
581 &schedule_smooth) == -1) {
590 /* if we got here and if there is an error and if the file has not been
591 * written to, then close things up and return. */
592 if (rrd_test_error()) {
593 goto err_free_structures;
596 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
597 goto err_free_structures;
601 /* calling the smoothing code here guarantees at most one smoothing
602 * operation per rrd_update call. Unfortunately, it is possible with bulk
603 * updates, or a long-delayed update for smoothing to occur off-schedule.
604 * This really isn't critical except during the burn-in cycles. */
605 if (schedule_smooth) {
606 smooth_all_rras(&rrd, rrd_file, rra_begin);
609 /* rrd_dontneed(rrd_file,&rrd); */
635 * get exclusive lock to whole file.
636 * lock gets removed when we close the file
638 * returns 0 on success
646 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
649 if (_fstat(file->fd, &st) == 0) {
650 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
657 lock.l_type = F_WRLCK; /* exclusive write lock */
658 lock.l_len = 0; /* whole file */
659 lock.l_start = 0; /* start of file */
660 lock.l_whence = SEEK_SET; /* end of file */
662 rcstat = fcntl(file->fd, F_SETLK, &lock);
670 * Allocate some important arrays used, and initialize the template.
672 * When it returns, either all of the structures are allocated
673 * or none of them are.
675 * Returns 0 on success, -1 on error.
677 static int allocate_data_structures(
680 rrd_value_t **pdp_temp,
683 unsigned long *tmpl_cnt,
684 unsigned long **rra_step_cnt,
685 unsigned long **skip_update,
686 rrd_value_t **pdp_new)
689 if ((*updvals = (char **) malloc(sizeof(char *)
690 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
691 rrd_set_error("allocating updvals pointer array.");
694 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
695 * rrd->stat_head->ds_cnt)) ==
697 rrd_set_error("allocating pdp_temp.");
698 goto err_free_updvals;
700 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
702 rrd->stat_head->rra_cnt)) ==
704 rrd_set_error("allocating skip_update.");
705 goto err_free_pdp_temp;
707 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
708 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
709 rrd_set_error("allocating tmpl_idx.");
710 goto err_free_skip_update;
712 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
715 rra_cnt))) == NULL) {
716 rrd_set_error("allocating rra_step_cnt.");
717 goto err_free_tmpl_idx;
720 /* initialize tmplt redirector */
721 /* default config example (assume DS 1 is a CDEF DS)
722 tmpl_idx[0] -> 0; (time)
723 tmpl_idx[1] -> 1; (DS 0)
724 tmpl_idx[2] -> 3; (DS 2)
725 tmpl_idx[3] -> 4; (DS 3) */
726 (*tmpl_idx)[0] = 0; /* time */
727 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
728 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
729 (*tmpl_idx)[ii++] = i;
734 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
735 goto err_free_rra_step_cnt;
739 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
740 * rrd->stat_head->ds_cnt)) == NULL) {
741 rrd_set_error("allocating pdp_new.");
742 goto err_free_rra_step_cnt;
747 err_free_rra_step_cnt:
751 err_free_skip_update:
761 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
763 * Returns 0 on success.
765 static int parse_template(
768 unsigned long *tmpl_cnt,
771 char *dsname, *tmplt_copy;
772 unsigned int tmpl_len, i;
775 *tmpl_cnt = 1; /* the first entry is the time */
777 /* we should work on a writeable copy here */
778 if ((tmplt_copy = strdup(tmplt)) == NULL) {
779 rrd_set_error("error copying tmplt '%s'", tmplt);
785 tmpl_len = strlen(tmplt_copy);
786 for (i = 0; i <= tmpl_len; i++) {
787 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
788 tmplt_copy[i] = '\0';
789 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
790 rrd_set_error("tmplt contains more DS definitions than RRD");
792 goto out_free_tmpl_copy;
794 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
795 rrd_set_error("unknown DS name '%s'", dsname);
797 goto out_free_tmpl_copy;
799 /* go to the next entry on the tmplt_copy */
801 dsname = &tmplt_copy[i + 1];
811 * Parse an update string, updates the primary data points (PDPs)
812 * and consolidated data points (CDPs), and writes changes to the RRAs.
814 * Returns 0 on success, -1 on error.
816 static int process_arg(
819 rrd_file_t *rrd_file,
820 unsigned long rra_begin,
821 unsigned long *rra_current,
822 time_t *current_time,
823 unsigned long *current_time_usec,
824 rrd_value_t *pdp_temp,
825 rrd_value_t *pdp_new,
826 unsigned long *rra_step_cnt,
829 unsigned long tmpl_cnt,
830 rrd_info_t ** pcdp_summary,
832 unsigned long *skip_update,
833 int *schedule_smooth)
835 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
837 /* a vector of future Holt-Winters seasonal coefs */
838 unsigned long elapsed_pdp_st;
840 double interval, pre_int, post_int; /* interval between this and
842 unsigned long proc_pdp_cnt;
843 unsigned long rra_start;
845 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
846 current_time, current_time_usec, version) == -1) {
849 /* seek to the beginning of the rra's */
850 if (*rra_current != rra_begin) {
852 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
853 rrd_set_error("seek error in rrd");
857 *rra_current = rra_begin;
859 rra_start = rra_begin;
861 interval = (double) (*current_time - rrd->live_head->last_up)
862 + (double) ((long) *current_time_usec -
863 (long) rrd->live_head->last_up_usec) / 1e6f;
865 /* process the data sources and update the pdp_prep
866 * area accordingly */
867 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
871 elapsed_pdp_st = calculate_elapsed_steps(rrd,
873 *current_time_usec, interval,
877 /* has a pdp_st moment occurred since the last run ? */
878 if (elapsed_pdp_st == 0) {
879 /* no we have not passed a pdp_st moment. therefore update is simple */
880 simple_update(rrd, interval, pdp_new);
882 /* an pdp_st has occurred. */
883 if (process_all_pdp_st(rrd, interval,
885 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
888 if (update_all_cdp_prep(rrd, rra_step_cnt,
894 pdp_temp, rra_current,
895 skip_update, schedule_smooth) == -1) {
896 goto err_free_coefficients;
898 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
899 elapsed_pdp_st, pdp_temp,
900 &seasonal_coef) == -1) {
901 goto err_free_coefficients;
903 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
904 rra_current, *current_time, skip_update,
905 pcdp_summary) == -1) {
906 goto err_free_coefficients;
908 } /* endif a pdp_st has occurred */
909 rrd->live_head->last_up = *current_time;
910 rrd->live_head->last_up_usec = *current_time_usec;
913 *rrd->legacy_last_up = rrd->live_head->last_up;
916 free(last_seasonal_coef);
919 err_free_coefficients:
921 free(last_seasonal_coef);
926 * Parse a DS string (time + colon-separated values), storing the
927 * results in current_time, current_time_usec, and updvals.
929 * Returns 0 on success, -1 on error.
936 unsigned long tmpl_cnt,
937 time_t *current_time,
938 unsigned long *current_time_usec,
946 /* initialize all ds input to unknown except the first one
947 which has always got to be set */
948 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
951 /* separate all ds elements; first must be examined separately
952 due to alternate time syntax */
953 if ((p = strchr(input, '@')) != NULL) {
955 } else if ((p = strchr(input, ':')) != NULL) {
958 rrd_set_error("expected timestamp not found in data source from %s",
964 updvals[tmpl_idx[i++]] = p + 1;
969 updvals[tmpl_idx[i++]] = p + 1;
975 rrd_set_error("expected %lu data source readings (got %lu) from %s",
976 tmpl_cnt - 1, i, input);
980 if (get_time_from_reading(rrd, timesyntax, updvals,
981 current_time, current_time_usec,
989 * Parse the time in a DS string, store it in current_time and
990 * current_time_usec and verify that it's later than the last
991 * update for this DS.
993 * Returns 0 on success, -1 on error.
995 static int get_time_from_reading(
999 time_t *current_time,
1000 unsigned long *current_time_usec,
1004 char *parsetime_error = NULL;
1006 rrd_time_value_t ds_tv;
1007 struct timeval tmp_time; /* used for time conversion */
1009 /* get the time from the reading ... handle N */
1010 if (timesyntax == '@') { /* at-style */
1011 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1012 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1015 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1016 ds_tv.type == RELATIVE_TO_START_TIME) {
1017 rrd_set_error("specifying time relative to the 'start' "
1018 "or 'end' makes no sense here: %s", updvals[0]);
1021 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1022 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1023 } else if (strcmp(updvals[0], "N") == 0) {
1024 gettimeofday(&tmp_time, 0);
1025 normalize_time(&tmp_time);
1026 *current_time = tmp_time.tv_sec;
1027 *current_time_usec = tmp_time.tv_usec;
1029 old_locale = setlocale(LC_NUMERIC, "C");
1030 tmp = strtod(updvals[0], 0);
1031 setlocale(LC_NUMERIC, old_locale);
1032 *current_time = floor(tmp);
1033 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1035 /* dont do any correction for old version RRDs */
1037 *current_time_usec = 0;
1039 if (*current_time < rrd->live_head->last_up ||
1040 (*current_time == rrd->live_head->last_up &&
1041 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1042 rrd_set_error("illegal attempt to update using time %ld when "
1043 "last update time is %ld (minimum one second step)",
1044 *current_time, rrd->live_head->last_up);
1051 * Update pdp_new by interpreting the updvals according to the DS type
1052 * (COUNTER, GAUGE, etc.).
1054 * Returns 0 on success, -1 on error.
1056 static int update_pdp_prep(
1059 rrd_value_t *pdp_new,
1062 unsigned long ds_idx;
1064 char *endptr; /* used in the conversion */
1067 enum dst_en dst_idx;
1069 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1070 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1072 /* make sure we do not build diffs with old last_ds values */
1073 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1074 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1075 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1078 /* NOTE: DST_CDEF should never enter this if block, because
1079 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1080 * accidently specified a value for the DST_CDEF. To handle this case,
1081 * an extra check is required. */
1083 if ((updvals[ds_idx + 1][0] != 'U') &&
1084 (dst_idx != DST_CDEF) &&
1085 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1088 /* pdp_new contains rate * time ... eg the bytes transferred during
1089 * the interval. Doing it this way saves a lot of math operations
1094 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1095 if ((updvals[ds_idx + 1][ii] < '0'
1096 || updvals[ds_idx + 1][ii] > '9')
1097 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1098 rrd_set_error("not a simple integer: '%s'",
1099 updvals[ds_idx + 1]);
1103 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1105 rrd_diff(updvals[ds_idx + 1],
1106 rrd->pdp_prep[ds_idx].last_ds);
1107 if (dst_idx == DST_COUNTER) {
1108 /* simple overflow catcher. This will fail
1109 * terribly for non 32 or 64 bit counters
1110 * ... are there any others in SNMP land?
1112 if (pdp_new[ds_idx] < (double) 0.0)
1113 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1114 if (pdp_new[ds_idx] < (double) 0.0)
1115 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1117 rate = pdp_new[ds_idx] / interval;
1119 pdp_new[ds_idx] = DNAN;
1123 old_locale = setlocale(LC_NUMERIC, "C");
1125 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1126 setlocale(LC_NUMERIC, old_locale);
1128 rrd_set_error("converting '%s' to float: %s",
1129 updvals[ds_idx + 1], rrd_strerror(errno));
1132 if (endptr[0] != '\0') {
1134 ("conversion of '%s' to float not complete: tail '%s'",
1135 updvals[ds_idx + 1], endptr);
1138 rate = pdp_new[ds_idx] / interval;
1142 old_locale = setlocale(LC_NUMERIC, "C");
1144 strtod(updvals[ds_idx + 1], &endptr) * interval;
1145 setlocale(LC_NUMERIC, old_locale);
1147 rrd_set_error("converting '%s' to float: %s",
1148 updvals[ds_idx + 1], rrd_strerror(errno));
1151 if (endptr[0] != '\0') {
1153 ("conversion of '%s' to float not complete: tail '%s'",
1154 updvals[ds_idx + 1], endptr);
1157 rate = pdp_new[ds_idx] / interval;
1160 rrd_set_error("rrd contains unknown DS type : '%s'",
1161 rrd->ds_def[ds_idx].dst);
1164 /* break out of this for loop if the error string is set */
1165 if (rrd_test_error()) {
1168 /* make sure pdp_temp is neither too large or too small
1169 * if any of these occur it becomes unknown ...
1170 * sorry folks ... */
1172 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1173 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1174 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1175 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1176 pdp_new[ds_idx] = DNAN;
1179 /* no news is news all the same */
1180 pdp_new[ds_idx] = DNAN;
1184 /* make a copy of the command line argument for the next run */
1186 fprintf(stderr, "prep ds[%lu]\t"
1190 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1193 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1195 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1201 * How many PDP steps have elapsed since the last update? Returns the answer,
1202 * and stores the time between the last update and the last PDP in pre_time,
1203 * and the time between the last PDP and the current time in post_int.
1205 static int calculate_elapsed_steps(
1207 unsigned long current_time,
1208 unsigned long current_time_usec,
1212 unsigned long *proc_pdp_cnt)
1214 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1215 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1217 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1218 * when it was last updated */
1219 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1221 /* when was the current pdp started */
1222 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1223 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1225 /* when did the last pdp_st occur */
1226 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1227 occu_pdp_st = current_time - occu_pdp_age;
1229 if (occu_pdp_st > proc_pdp_st) {
1230 /* OK we passed the pdp_st moment */
1231 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1232 * occurred before the latest
1234 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1235 *post_int = occu_pdp_age; /* how much after it */
1236 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1238 *pre_int = interval;
1242 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1245 printf("proc_pdp_age %lu\t"
1247 "occu_pfp_age %lu\t"
1251 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1252 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1255 /* compute the number of elapsed pdp_st moments */
1256 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1260 * Increment the PDP values by the values in pdp_new, or else initialize them.
1262 static void simple_update(
1265 rrd_value_t *pdp_new)
1269 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1270 if (isnan(pdp_new[i])) {
1271 /* this is not really accurate if we use subsecond data arrival time
1272 should have thought of it when going subsecond resolution ...
1273 sorry next format change we will have it! */
1274 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1277 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1278 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1280 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1289 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1290 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1296 * Call process_pdp_st for each DS.
1298 * Returns 0 on success, -1 on error.
1300 static int process_all_pdp_st(
1305 unsigned long elapsed_pdp_st,
1306 rrd_value_t *pdp_new,
1307 rrd_value_t *pdp_temp)
1309 unsigned long ds_idx;
1311 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1312 rate*seconds which occurred up to the last run.
1313 pdp_new[] contains rate*seconds from the latest run.
1314 pdp_temp[] will contain the rate for cdp */
1316 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1317 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1318 elapsed_pdp_st * rrd->stat_head->pdp_step,
1319 pdp_new, pdp_temp) == -1) {
1323 fprintf(stderr, "PDP UPD ds[%lu]\t"
1324 "elapsed_pdp_st %lu\t"
1327 "new_unkn_sec %5lu\n",
1331 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1332 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1339 * Process an update that occurs after one of the PDP moments.
1340 * Increments the PDP value, sets NAN if time greater than the
1341 * heartbeats have elapsed, processes CDEFs.
1343 * Returns 0 on success, -1 on error.
1345 static int process_pdp_st(
1347 unsigned long ds_idx,
1351 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1352 rrd_value_t *pdp_new,
1353 rrd_value_t *pdp_temp)
1357 /* update pdp_prep to the current pdp_st. */
1358 double pre_unknown = 0.0;
1359 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1360 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1362 rpnstack_t rpnstack; /* used for COMPUTE DS */
1364 rpnstack_init(&rpnstack);
1367 if (isnan(pdp_new[ds_idx])) {
1368 /* a final bit of unknown to be added before calculation
1369 we use a temporary variable for this so that we
1370 don't have to turn integer lines before using the value */
1371 pre_unknown = pre_int;
1373 if (isnan(scratch[PDP_val].u_val)) {
1374 scratch[PDP_val].u_val = 0;
1376 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1379 /* if too much of the pdp_prep is unknown we dump it */
1380 /* if the interval is larger thatn mrhb we get NAN */
1381 if ((interval > mrhb) ||
1382 (rrd->stat_head->pdp_step / 2.0 <
1383 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1384 pdp_temp[ds_idx] = DNAN;
1386 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1387 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1391 /* process CDEF data sources; remember each CDEF DS can
1392 * only reference other DS with a lower index number */
1393 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1397 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1398 /* substitute data values for OP_VARIABLE nodes */
1399 for (i = 0; rpnp[i].op != OP_END; i++) {
1400 if (rpnp[i].op == OP_VARIABLE) {
1401 rpnp[i].op = OP_NUMBER;
1402 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1405 /* run the rpn calculator */
1406 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1408 rpnstack_free(&rpnstack);
1413 /* make pdp_prep ready for the next run */
1414 if (isnan(pdp_new[ds_idx])) {
1415 /* this is not realy accurate if we use subsecond data arival time
1416 should have thought of it when going subsecond resolution ...
1417 sorry next format change we will have it! */
1418 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1419 scratch[PDP_val].u_val = DNAN;
1421 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1422 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1424 rpnstack_free(&rpnstack);
1429 * Iterate over all the RRAs for a given DS and:
1430 * 1. Decide whether to schedule a smooth later
1431 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1434 * Returns 0 on success, -1 on error
1436 static int update_all_cdp_prep(
1438 unsigned long *rra_step_cnt,
1439 unsigned long rra_begin,
1440 rrd_file_t *rrd_file,
1441 unsigned long elapsed_pdp_st,
1442 unsigned long proc_pdp_cnt,
1443 rrd_value_t **last_seasonal_coef,
1444 rrd_value_t **seasonal_coef,
1445 rrd_value_t *pdp_temp,
1446 unsigned long *rra_current,
1447 unsigned long *skip_update,
1448 int *schedule_smooth)
1450 unsigned long rra_idx;
1452 /* index into the CDP scratch array */
1453 enum cf_en current_cf;
1454 unsigned long rra_start;
1456 /* number of rows to be updated in an RRA for a data value. */
1457 unsigned long start_pdp_offset;
1459 rra_start = rra_begin;
1460 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1461 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1463 rrd->rra_def[rra_idx].pdp_cnt -
1464 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1465 skip_update[rra_idx] = 0;
1466 if (start_pdp_offset <= elapsed_pdp_st) {
1467 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1468 rrd->rra_def[rra_idx].pdp_cnt + 1;
1470 rra_step_cnt[rra_idx] = 0;
1473 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1474 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1475 * so that they will be correct for the next observed value; note that for
1476 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1477 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1478 if (rra_step_cnt[rra_idx] > 1) {
1479 skip_update[rra_idx] = 1;
1480 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1481 elapsed_pdp_st, last_seasonal_coef);
1482 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1483 elapsed_pdp_st + 1, seasonal_coef);
1485 /* periodically run a smoother for seasonal effects */
1486 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1489 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1490 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1491 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1494 *schedule_smooth = 1;
1496 *rra_current = rrd_tell(rrd_file);
1498 if (rrd_test_error())
1502 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1503 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1504 current_cf) == -1) {
1508 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1509 sizeof(rrd_value_t);
1515 * Are we due for a smooth? Also increments our position in the burn-in cycle.
/*
 * do_schedule_smooth -- decide whether the seasonal smoother is due for the
 * RRA at rra_idx after advancing elapsed_pdp_st PDP steps.
 *
 * Returns non-zero (true) when the smoother should be scheduled, 0 otherwise.
 * While still in burn-in (*init_seasonal <= BURNIN_CYCLES) it increments the
 * CDP_init_seasonal counter as a side effect on a wrap-around.
 *
 * NOTE(review): this listing has elided lines (e.g. the rrd parameter line
 * and some braces); comments below describe only the visible code.
 */
1517 static int do_schedule_smooth(
1519 unsigned long rra_idx,
1520 unsigned long elapsed_pdp_st)
/* cdp_idx addresses the FIRST cdp_prep slot of this RRA (ds 0). */
1522 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1523 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1524 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1525 unsigned long seasonal_smooth_idx =
1526 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1528 unsigned long *init_seasonal =
1528 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1530 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1531 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1532 * really an RRA level, not a data source within RRA level parameter, but
1533 * the rra_def is read only for rrd_update (not flushed to disk). */
1534 if (*init_seasonal > BURNIN_CYCLES) {
1535 /* someone has no doubt invented a trick to deal with this wrap around,
1536 * but at least this code is clear. */
1537 if (seasonal_smooth_idx > cur_row) {
1538 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1539 * between PDP and CDP */
1540 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1542 /* can't rely on negative numbers because we are working with
1543 * unsigned values */
1544 return (cur_row + elapsed_pdp_st >= row_cnt
1545 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1547 /* mark off one of the burn-in cycles */
1548 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1552 * For a given RRA, iterate over the data sources and call the appropriate
1553 * consolidation function.
1555 * Returns 0 on success, -1 on error.
/*
 * update_cdp_prep -- for one RRA, dispatch each data source either to
 * update_cdp() (real consolidation, pdp_cnt > 1) or, on a bulk update
 * (elapsed_pdp_st > 2) with 1 PDP per CDP, to reset_cdp().
 * Errors are detected via rrd_test_error() after each call.
 *
 * NOTE(review): some parameter lines (rrd, rra_idx, current_cf) and braces
 * are elided in this listing; see the call sites for the full signature.
 */
1557 static int update_cdp_prep(
1559 unsigned long elapsed_pdp_st,
1560 unsigned long start_pdp_offset,
1561 unsigned long *rra_step_cnt,
1563 rrd_value_t *pdp_temp,
1564 rrd_value_t *last_seasonal_coef,
1565 rrd_value_t *seasonal_coef,
1568 unsigned long ds_idx, cdp_idx;
1570 /* update CDP_PREP areas */
1571 /* loop over data sources within each RRA */
1572 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* cdp_prep is laid out RRA-major: one slot per (rra, ds) pair. */
1574 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1576 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1577 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1578 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1579 elapsed_pdp_st, start_pdp_offset,
1580 rrd->rra_def[rra_idx].pdp_cnt,
1581 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1584 /* Nothing to consolidate if there's one PDP per CDP. However, if
1585 * we've missed some PDPs, let's update null counters etc. */
1586 if (elapsed_pdp_st > 2) {
1587 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1588 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1593 if (rrd_test_error())
1595 } /* endif data sources loop */
1600 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1601 * primary value, secondary value, and # of unknowns.
/*
 * update_cdp -- consolidate one new PDP reading into a cdp_prep scratch
 * area.  Two regimes (visible in the code below):
 *   - rra_step_cnt > 0: at least one CDP row will be flushed to disk; set
 *     CDP_primary_val (first row), CDP_secondary_val (fill-in rows for a
 *     bulk update), reinitialize CDP_val carry-over, and reset the unknown
 *     counter.  The xff test decides whether the primary value is DNAN.
 *   - rra_step_cnt == 0: no row is due; just accumulate into CDP_val (or
 *     bump CDP_unkn_pdp_cnt when the reading is DNAN).
 *
 * NOTE(review): the enclosing if (rra_step_cnt > 0) line and several braces
 * are elided in this listing; the split is inferred from the
 * "rra_step_cnt[i] == 0" else-comment below and should be confirmed.
 */
1603 static void update_cdp(
1606 rrd_value_t pdp_temp_val,
1607 unsigned long rra_step_cnt,
1608 unsigned long elapsed_pdp_st,
1609 unsigned long start_pdp_offset,
1610 unsigned long pdp_cnt,
1615 /* shorthand variables */
1616 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1617 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1618 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1619 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1622 /* If we are in this block, at least 1 CDP value will be written to
1623 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1624 * to be written, then the "fill in" value is the CDP_secondary_val
1626 if (isnan(pdp_temp_val)) {
1627 *cdp_unkn_pdp_cnt += start_pdp_offset;
1628 *cdp_secondary_val = DNAN;
1630 /* CDP_secondary value is the RRA "fill in" value for intermediary
1631 * CDP data entries. No matter the CF, the value is the same because
1632 * the average, max, min, and last of a list of identical values is
1633 * the same, namely, the value itself. */
1634 *cdp_secondary_val = pdp_temp_val;
/* xff test: too many unknown PDPs in this CDP -> primary value is unknown */
1637 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1638 *cdp_primary_val = DNAN;
1639 if (current_cf == CF_AVERAGE) {
1641 initialize_average_carry_over(pdp_temp_val,
1643 start_pdp_offset, pdp_cnt);
1645 *cdp_val = pdp_temp_val;
1648 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1649 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1650 } /* endif meets xff value requirement for a valid value */
1651 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after
1652 * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to compute
1652 * that value. */
1653 if (isnan(pdp_temp_val))
1654 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1656 *cdp_unkn_pdp_cnt = 0;
1657 } else { /* rra_step_cnt[i] == 0 */
/* debug-only trace of the pending carry-over value */
1660 if (isnan(*cdp_val)) {
1661 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1664 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1668 if (isnan(pdp_temp_val)) {
1669 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1672 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1679 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1680 * on the type of consolidation function.
/*
 * initialize_cdp_val -- compute CDP_primary_val (the finished CDP that will
 * be written) and re-seed CDP_val (the carry-over for the next CDP), per CF.
 * Visible cases: AVERAGE (weighted sum divided by known PDP count), then two
 * min/max-shaped cases using -DINF / DINF sentinels, then a default that
 * passes the reading straight through (LAST-style behaviour).
 *
 * NOTE(review): the case labels themselves and break statements are elided
 * in this listing; the CF attribution above is inferred from the sentinel
 * values and comparison directions and should be confirmed against the
 * full source.
 */
1682 static void initialize_cdp_val(
1685 rrd_value_t pdp_temp_val,
1686 unsigned long elapsed_pdp_st,
1687 unsigned long start_pdp_offset,
1688 unsigned long pdp_cnt)
1690 rrd_value_t cum_val, cur_val;
1692 switch (current_cf) {
/* AVERAGE: DNANs contribute 0 to the sum; divide by the known-PDP count. */
1694 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1695 cur_val = IFDNAN(pdp_temp_val, 0.0);
1696 scratch[CDP_primary_val].u_val =
1697 (cum_val + cur_val * start_pdp_offset) /
1698 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1699 scratch[CDP_val].u_val =
1700 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1701 start_pdp_offset, pdp_cnt);
/* maximum-style: -infinity sentinel so any real value wins the compare */
1704 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1705 cur_val = IFDNAN(pdp_temp_val, -DINF);
1708 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1710 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1716 if (cur_val > cum_val)
1717 scratch[CDP_primary_val].u_val = cur_val;
1719 scratch[CDP_primary_val].u_val = cum_val;
1720 /* initialize carry over value */
1721 scratch[CDP_val].u_val = pdp_temp_val;
/* minimum-style: +infinity sentinel so any real value wins the compare */
1724 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1725 cur_val = IFDNAN(pdp_temp_val, DINF);
1728 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1730 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1736 if (cur_val < cum_val)
1737 scratch[CDP_primary_val].u_val = cur_val;
1739 scratch[CDP_primary_val].u_val = cum_val;
1740 /* initialize carry over value */
1741 scratch[CDP_val].u_val = pdp_temp_val;
/* default: primary value is simply the latest reading */
1745 scratch[CDP_primary_val].u_val = pdp_temp_val;
1746 /* initialize carry over value */
1747 scratch[CDP_val].u_val = pdp_temp_val;
1753 * Update the consolidation function for Holt-Winters functions as
1754 * well as other functions that don't actually consolidate multiple
/*
 * reset_cdp -- handle a bulk update for RRAs with one PDP per CDP
 * (aberrant-behaviour / Holt-Winters RRAs and the plain pass-through case).
 * Per CF (case labels partially elided in this listing):
 *   - pass-through: primary and secondary values become the new reading;
 *   - SEASONAL/DEVSEASONAL: refresh the cached seasonal coefficients so
 *     they match the position after the bulk skip;
 *   - a null-count case: account the missed PDPs in CDP_null_count /
 *     CDP_last_null_count and mark primary/secondary DNAN;
 *   - FAILURES-style case: zero the row values and wipe the violations
 *     window via erase_violations().
 */
1757 static void reset_cdp(
1759 unsigned long elapsed_pdp_st,
1760 rrd_value_t *pdp_temp,
1761 rrd_value_t *last_seasonal_coef,
1762 rrd_value_t *seasonal_coef,
1766 enum cf_en current_cf)
1768 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1770 switch (current_cf) {
1773 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1774 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1777 case CF_DEVSEASONAL:
1778 /* need to update cached seasonal values, so they are consistent
1779 * with the bulk update */
1780 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1781 * CDP_last_deviation are the same. */
1782 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1783 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1787 /* need to update the null_count and last_null_count.
1788 * even do this for non-DNAN pdp_temp because the
1789 * algorithm is not learning from batch updates. */
1790 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1791 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1794 scratch[CDP_primary_val].u_val = DNAN;
1795 scratch[CDP_secondary_val].u_val = DNAN;
1798 /* do not count missed bulk values as failures */
1799 scratch[CDP_primary_val].u_val = 0;
1800 scratch[CDP_secondary_val].u_val = 0;
1801 /* need to reset violations buffer.
1802 * could do this more carefully, but for now, just
1803 * assume a bulk update wipes away all violations. */
1804 erase_violations(rrd, cdp_idx, rra_idx);
/*
 * initialize_average_carry_over -- seed the CDP_val carry-over for an
 * AVERAGE consolidation: the new reading weighted by the number of PDP
 * steps that spill into the *next* (unfinished) CDP interval.
 *
 * NOTE(review): the body of the isnan() branch is elided in this listing
 * (presumably it returns a DNAN/zero carry-over) — confirm against the
 * full source.
 */
1809 static rrd_value_t initialize_average_carry_over(
1810 rrd_value_t pdp_temp_val,
1811 unsigned long elapsed_pdp_st,
1812 unsigned long start_pdp_offset,
1813 unsigned long pdp_cnt)
1815 /* initialize carry over value */
1816 if (isnan(pdp_temp_val)) {
1819 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1823 * Update or initialize a CDP value based on the consolidation
1826 * Returns the new value.
/*
 * calculate_cdp_val -- fold a non-DNAN reading into the running CDP_val.
 * If the accumulator is still DNAN it is (re)initialized from the reading
 * (scaled by elapsed_pdp_st for AVERAGE); otherwise AVERAGE accumulates a
 * weighted sum, MINIMUM/MAXIMUM keep the extremum, and any other CF (LAST
 * style) just takes the newest reading.
 */
1828 static rrd_value_t calculate_cdp_val(
1829 rrd_value_t cdp_val,
1830 rrd_value_t pdp_temp_val,
1831 unsigned long elapsed_pdp_st,
1842 if (isnan(cdp_val)) {
1843 if (current_cf == CF_AVERAGE) {
/* weight by elapsed steps so the later division by PDP count is correct */
1844 pdp_temp_val *= elapsed_pdp_st;
1847 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1848 i, ii, pdp_temp_val);
1850 return pdp_temp_val;
1852 if (current_cf == CF_AVERAGE)
1853 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1854 if (current_cf == CF_MINIMUM)
1855 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1856 if (current_cf == CF_MAXIMUM)
1857 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
/* CF_LAST and anything else: the newest reading wins */
1859 return pdp_temp_val;
1863 * For each RRA, update the seasonal values and then call update_aberrant_CF
1864 * for each data source.
1866 * Return 0 on success, -1 on error.
/*
 * update_aberrant_cdps -- drive the aberrant-behaviour (Holt-Winters) CFs
 * for 1-PDP-per-CDP RRAs.  Runs at most twice (elapsed_pdp_st of 1 or 2):
 * the first pass fills CDP_primary_val, a second pass fills
 * CDP_secondary_val, refreshing the cached seasonal coefficients for
 * SEASONAL/DEVSEASONAL RRAs before updating each data source.
 */
1868 static int update_aberrant_cdps(
1870 rrd_file_t *rrd_file,
1871 unsigned long rra_begin,
1872 unsigned long *rra_current,
1873 unsigned long elapsed_pdp_st,
1874 rrd_value_t *pdp_temp,
1875 rrd_value_t **seasonal_coef)
1877 unsigned long rra_idx, ds_idx, j;
1879 /* number of PDP steps since the last update that
1880 * are assigned to the first CDP to be generated
1881 * since the last update. */
1882 unsigned short scratch_idx;
1883 unsigned long rra_start;
1884 enum cf_en current_cf;
1886 /* this loop is only entered if elapsed_pdp_st < 3 */
1887 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1888 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1889 rra_start = rra_begin;
1890 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1891 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1892 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1893 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
/* +1 / +2: look up coefficients for the row the pass will write */
1894 if (scratch_idx == CDP_primary_val) {
1895 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1896 elapsed_pdp_st + 1, seasonal_coef);
1898 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1899 elapsed_pdp_st + 2, seasonal_coef);
/* lookup_seasonal may have seeked; re-sync our file-position cache */
1901 *rra_current = rrd_tell(rrd_file);
1903 if (rrd_test_error())
1905 /* loop over data sources within each RRA */
1906 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1907 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1908 rra_idx * (rrd->stat_head->ds_cnt) +
1909 ds_idx, rra_idx, ds_idx, scratch_idx,
/* advance byte offset past this RRA regardless of whether it matched */
1913 rra_start += rrd->rra_def[rra_idx].row_cnt
1914 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1921 * Move sequentially through the file, writing one RRA at a time. Note this
1922 * architecture divorces the computation of CDP with flushing updated RRA
1925 * Return 0 on success, -1 on error.
/*
 * write_to_rras -- flush the consolidated rows computed earlier to disk.
 * For each RRA with pending rows (rra_step_cnt[rra_idx] > 0): advance
 * cur_row with wrap-around, seek only when the cached position
 * (*rra_current) is stale, write the first row from CDP_primary_val, then
 * write any additional bulk rows from CDP_secondary_val.  skip_update[]
 * suppresses writes for RRAs handled elsewhere (e.g. seasonal bulk skips);
 * when *pcdp_summary is non-NULL, rra_time is the wall-clock timestamp
 * attached to each reported row.
 *
 * NOTE(review): several braces/else lines and a write_RRA_row call line are
 * elided in this listing; comments describe only the visible code.
 */
1927 static int write_to_rras(
1929 rrd_file_t *rrd_file,
1930 unsigned long *rra_step_cnt,
1931 unsigned long rra_begin,
1932 unsigned long *rra_current,
1933 time_t current_time,
1934 unsigned long *skip_update,
1935 rrd_info_t ** pcdp_summary)
1937 unsigned long rra_idx;
1938 unsigned long rra_start;
1939 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1940 time_t rra_time = 0; /* time of update for a RRA */
1942 /* Ready to write to disk */
1943 rra_start = rra_begin;
1944 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1945 /* skip unless there's something to write */
1946 if (rra_step_cnt[rra_idx]) {
1947 /* write the first row */
1949 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1951 rrd->rra_ptr[rra_idx].cur_row++;
1952 if (rrd->rra_ptr[rra_idx].cur_row >=
1953 rrd->rra_def[rra_idx].row_cnt)
1954 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1955 /* position on the first row */
1956 rra_pos_tmp = rra_start +
1957 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1958 sizeof(rrd_value_t);
/* avoid a redundant seek when we are already at the target offset */
1959 if (rra_pos_tmp != *rra_current) {
1960 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1961 rrd_set_error("seek error in rrd");
1964 *rra_current = rra_pos_tmp;
1967 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1969 if (!skip_update[rra_idx]) {
1970 if (*pcdp_summary != NULL) {
/* timestamp of the FIRST pending row: aligned down to the CDP
 * boundary, backed off by the rows still to be written */
1971 rra_time = (current_time - current_time
1972 % (rrd->rra_def[rra_idx].pdp_cnt *
1973 rrd->stat_head->pdp_step))
1975 ((rra_step_cnt[rra_idx] -
1976 1) * rrd->rra_def[rra_idx].pdp_cnt *
1977 rrd->stat_head->pdp_step);
1980 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1981 pcdp_summary, rra_time) == -1)
1985 /* write other rows of the bulk update, if any */
1986 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1987 if (++rrd->rra_ptr[rra_idx].cur_row ==
1988 rrd->rra_def[rra_idx].row_cnt) {
1991 "Wraparound for RRA %s, %lu updates left\n",
1992 rrd->rra_def[rra_idx].cf_nam,
1993 rra_step_cnt[rra_idx] - 1);
1996 rrd->rra_ptr[rra_idx].cur_row = 0;
1997 /* seek back to beginning of current rra */
1998 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1999 rrd_set_error("seek error in rrd");
2003 fprintf(stderr, " -- Wraparound Postseek %ld\n",
2006 *rra_current = rra_start;
2008 if (!skip_update[rra_idx]) {
2009 if (*pcdp_summary != NULL) {
/* fill-in rows use CDP_secondary_val; note the "- 2" offset */
2010 rra_time = (current_time - current_time
2011 % (rrd->rra_def[rra_idx].pdp_cnt *
2012 rrd->stat_head->pdp_step))
2014 ((rra_step_cnt[rra_idx] -
2015 2) * rrd->rra_def[rra_idx].pdp_cnt *
2016 rrd->stat_head->pdp_step);
2018 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2019 CDP_secondary_val, pcdp_summary,
/* advance to the next RRA's byte range */
2025 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2026 sizeof(rrd_value_t);
2033 * Write out one row of values (one value per DS) to the archive.
2035 * Returns 0 on success, -1 on error.
/*
 * write_RRA_row -- write one consolidated row (ds_cnt values, taken from
 * each cdp_prep's scratch[CDP_scratch_idx]) at the current file position,
 * advancing *rra_current by sizeof(rrd_value_t) per value written.  When
 * *pcdp_summary is non-NULL, each value is also pushed onto the rrd_info
 * result list keyed "[time]RRA[cf][pdp_cnt]DS[name]".
 */
2037 static int write_RRA_row(
2038 rrd_file_t *rrd_file,
2040 unsigned long rra_idx,
2041 unsigned long *rra_current,
2042 unsigned short CDP_scratch_idx,
2043 rrd_info_t ** pcdp_summary,
2046 unsigned long ds_idx, cdp_idx;
2049 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2050 /* compute the cdp index */
2051 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2053 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2054 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2055 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2057 if (*pcdp_summary != NULL) {
2058 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2059 /* append info to the return hash */
2060 *pcdp_summary = rrd_info_push(*pcdp_summary,
2062 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2063 rrd->rra_def[rra_idx].cf_nam,
2064 rrd->rra_def[rra_idx].pdp_cnt,
2065 rrd->ds_def[ds_idx].ds_nam),
/* short write is an error: rrd_write must move exactly one value */
2068 if (rrd_write(rrd_file,
2069 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2070 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2071 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2074 *rra_current += sizeof(rrd_value_t);
2080 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2082 * Returns 0 on success, -1 otherwise
/*
 * smooth_all_rras -- walk every RRA, running apply_smoother() on those
 * whose CF is SEASONAL or DEVSEASONAL; rra_start tracks each RRA's byte
 * offset within the file starting from rra_begin.  Stops early (via
 * rrd_test_error) if a smoother reports an error.
 */
2084 static int smooth_all_rras(
2086 rrd_file_t *rrd_file,
2087 unsigned long rra_begin)
2089 unsigned long rra_start = rra_begin;
2090 unsigned long rra_idx;
2092 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2093 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2094 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2096 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2098 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2099 if (rrd_test_error())
/* every RRA, smoothed or not, advances the byte offset */
2102 rra_start += rrd->rra_def[rra_idx].row_cnt
2103 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2110 * Flush changes to disk (unless we're using mmap)
2112 * Returns 0 on success, -1 otherwise
2114 static int write_changes_to_disk(
2116 rrd_file_t *rrd_file,
2119 /* we just need to write back the live header portion now */
2120 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2121 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2122 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2124 rrd_set_error("seek rrd for live header writeback");
2128 if (rrd_write(rrd_file, rrd->live_head,
2129 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2130 rrd_set_error("rrd_write live_head to rrd");
2134 if (rrd_write(rrd_file, rrd->legacy_last_up,
2135 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2136 rrd_set_error("rrd_write live_head to rrd");
2142 if (rrd_write(rrd_file, rrd->pdp_prep,
2143 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2144 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2145 rrd_set_error("rrd_write pdp_prep to rrd");
2149 if (rrd_write(rrd_file, rrd->cdp_prep,
2150 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2151 rrd->stat_head->ds_cnt)
2152 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2153 rrd->stat_head->ds_cnt)) {
2155 rrd_set_error("rrd_write cdp_prep to rrd");
2159 if (rrd_write(rrd_file, rrd->rra_ptr,
2160 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2161 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2162 rrd_set_error("rrd_write rra_ptr to rrd");