2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(&current_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 unsigned long *rra_current,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
112 unsigned long *skip_update,
113 int *schedule_smooth);
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
125 static int get_time_from_reading(
129 time_t *current_time,
130 unsigned long *current_time_usec,
133 static int update_pdp_prep(
136 rrd_value_t *pdp_new,
139 static int calculate_elapsed_steps(
141 unsigned long current_time,
142 unsigned long current_time_usec,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
164 unsigned long ds_idx,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *rra_current,
183 unsigned long *skip_update,
184 int *schedule_smooth);
186 static int do_schedule_smooth(
188 unsigned long rra_idx,
189 unsigned long elapsed_pdp_st);
191 static int update_cdp_prep(
193 unsigned long elapsed_pdp_st,
194 unsigned long start_pdp_offset,
195 unsigned long *rra_step_cnt,
197 rrd_value_t *pdp_temp,
198 rrd_value_t *last_seasonal_coef,
199 rrd_value_t *seasonal_coef,
202 static void update_cdp(
205 rrd_value_t pdp_temp_val,
206 unsigned long rra_step_cnt,
207 unsigned long elapsed_pdp_st,
208 unsigned long start_pdp_offset,
209 unsigned long pdp_cnt,
214 static void initialize_cdp_val(
217 rrd_value_t pdp_temp_val,
218 unsigned long elapsed_pdp_st,
219 unsigned long start_pdp_offset,
220 unsigned long pdp_cnt);
222 static void reset_cdp(
224 unsigned long elapsed_pdp_st,
225 rrd_value_t *pdp_temp,
226 rrd_value_t *last_seasonal_coef,
227 rrd_value_t *seasonal_coef,
231 enum cf_en current_cf);
233 static rrd_value_t initialize_average_carry_over(
234 rrd_value_t pdp_temp_val,
235 unsigned long elapsed_pdp_st,
236 unsigned long start_pdp_offset,
237 unsigned long pdp_cnt);
239 static rrd_value_t calculate_cdp_val(
241 rrd_value_t pdp_temp_val,
242 unsigned long elapsed_pdp_st,
247 static int update_aberrant_cdps(
249 rrd_file_t *rrd_file,
250 unsigned long rra_begin,
251 unsigned long *rra_current,
252 unsigned long elapsed_pdp_st,
253 rrd_value_t *pdp_temp,
254 rrd_value_t **seasonal_coef);
256 static int write_to_rras(
258 rrd_file_t *rrd_file,
259 unsigned long *rra_step_cnt,
260 unsigned long rra_begin,
261 unsigned long *rra_current,
263 unsigned long *skip_update,
264 rrd_info_t ** pcdp_summary);
266 static int write_RRA_row(
267 rrd_file_t *rrd_file,
269 unsigned long rra_idx,
270 unsigned long *rra_current,
271 unsigned short CDP_scratch_idx,
272 rrd_info_t ** pcdp_summary,
275 static int smooth_all_rras(
277 rrd_file_t *rrd_file,
278 unsigned long rra_begin);
281 static int write_changes_to_disk(
283 rrd_file_t *rrd_file,
288 * normalize time as returned by gettimeofday. usec part must
291 static inline void normalize_time(
294 if (t->tv_usec < 0) {
301 * Sets current_time and current_time_usec based on the current time.
302 * current_time_usec is set to 0 if the version number is 1 or 2.
304 static inline void initialize_time(
305 time_t *current_time,
306 unsigned long *current_time_usec,
309 struct timeval tmp_time; /* used for time conversion */
311 gettimeofday(&tmp_time, 0);
312 normalize_time(&tmp_time);
313 *current_time = tmp_time.tv_sec;
315 *current_time_usec = tmp_time.tv_usec;
317 *current_time_usec = 0;
/* IFDNAN(X,Y): return X unless it is NaN, in which case return Y.
 * Note: no trailing semicolon — the macro must expand to a pure
 * expression so it can be used inside larger expressions; the old
 * trailing ';' silently broke such uses. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
323 rrd_info_t *rrd_update_v(
328 rrd_info_t *result = NULL;
330 struct option long_options[] = {
331 {"template", required_argument, 0, 't'},
337 opterr = 0; /* initialize getopt */
340 int option_index = 0;
343 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
354 rrd_set_error("unknown option '%s'", argv[optind - 1]);
359 /* need at least 2 arguments: filename, data. */
360 if (argc - optind < 2) {
361 rrd_set_error("Not enough arguments");
365 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366 rc.u_int = _rrd_update(argv[optind], tmplt,
368 (const char **) (argv + optind + 1), result);
369 result->value.u_int = rc.u_int;
378 struct option long_options[] = {
379 {"template", required_argument, 0, 't'},
380 {"cache", optional_argument, 0, 'c'},
381 {"nocache", no_argument , 0, 'n'},
382 {"daemon", required_argument, 0, 'd'},
385 int option_index = 0;
392 opterr = 0; /* initialize getopt */
395 opt = getopt_long(argc, argv, "t:cnd:", long_options, &option_index);
402 tmplt = strdup(optarg);
408 daemon = strdup (optarg);
411 rrd_set_error("strdup failed.");
417 rrd_set_error("unknown option '%s'", argv[optind - 1]);
422 /* need at least 2 arguments: filename, data. */
423 if (argc - optind < 2) {
424 rrd_set_error("Not enough arguments");
428 if ((tmplt != NULL) && (daemon != NULL))
430 rrd_set_error("The caching daemon cannot be used together with "
439 status = rrdc_connect (daemon);
442 rrd_set_error("Unable to connect to daemon: %s",
445 : rrd_strerror (status));
449 status = rrdc_update (/* file = */ argv[optind],
450 /* values_num = */ argc - optind - 1,
451 /* values = */ (void *) (argv + optind + 1));
454 rrd_set_error("Failed sending the values to the daemon: %s",
457 : rrd_strerror (status));
462 } /* if (daemon != NULL) */
464 rc = rrd_update_r(argv[optind], tmplt,
465 argc - optind - 1, (const char **) (argv + optind + 1));
481 const char *filename,
486 return _rrd_update(filename, tmplt, argc, argv, NULL);
490 const char *filename,
494 rrd_info_t * pcdp_summary)
499 unsigned long rra_begin; /* byte pointer to the rra
500 * area in the rrd file. this
501 * pointer never changes value */
502 unsigned long rra_current; /* byte pointer to the current write
503 * spot in the rrd file. */
504 rrd_value_t *pdp_new; /* prepare the incoming data to be added
505 * to the existing entry */
506 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
507 * to the cdp values */
509 long *tmpl_idx; /* index representing the settings
510 * transported by the tmplt index */
511 unsigned long tmpl_cnt = 2; /* time and data */
513 time_t current_time = 0;
514 unsigned long current_time_usec = 0; /* microseconds part of current time */
516 int schedule_smooth = 0;
518 /* number of elapsed PDP steps since last update */
519 unsigned long *rra_step_cnt = NULL;
521 int version; /* rrd version */
522 rrd_file_t *rrd_file;
523 char *arg_copy; /* for processing the argv */
524 unsigned long *skip_update; /* RRAs to advance but not write */
526 /* need at least 1 arguments: data. */
528 rrd_set_error("Not enough arguments");
532 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
535 /* We are now at the beginning of the rra's */
536 rra_current = rra_begin = rrd_file->header_len;
538 version = atoi(rrd.stat_head->version);
540 initialize_time(&current_time, &current_time_usec, version);
542 /* get exclusive lock to whole file.
543 * lock gets removed when we close the file.
545 if (rrd_lock(rrd_file) != 0) {
546 rrd_set_error("could not lock RRD");
550 if (allocate_data_structures(&rrd, &updvals,
551 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552 &rra_step_cnt, &skip_update,
557 /* loop through the arguments. */
558 for (arg_i = 0; arg_i < argc; arg_i++) {
559 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560 rrd_set_error("failed duplication argv entry");
563 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
564 &current_time, &current_time_usec, pdp_temp, pdp_new,
565 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566 &pcdp_summary, version, skip_update,
567 &schedule_smooth) == -1) {
576 /* if we got here and if there is an error and if the file has not been
577 * written to, then close things up and return. */
578 if (rrd_test_error()) {
579 goto err_free_structures;
582 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
583 goto err_free_structures;
587 /* calling the smoothing code here guarantees at most one smoothing
588 * operation per rrd_update call. Unfortunately, it is possible with bulk
589 * updates, or a long-delayed update for smoothing to occur off-schedule.
590 * This really isn't critical except during the burn-in cycles. */
591 if (schedule_smooth) {
592 smooth_all_rras(&rrd, rrd_file, rra_begin);
595 /* rrd_dontneed(rrd_file,&rrd); */
621 * get exclusive lock to whole file.
622 * lock gets removed when we close the file
624 * returns 0 on success
632 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
635 if (_fstat(file->fd, &st) == 0) {
636 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
643 lock.l_type = F_WRLCK; /* exclusive write lock */
644 lock.l_len = 0; /* whole file */
645 lock.l_start = 0; /* start of file */
646 lock.l_whence = SEEK_SET; /* offsets relative to start of file */
648 rcstat = fcntl(file->fd, F_SETLK, &lock);
656 * Allocate some important arrays used, and initialize the template.
658 * When it returns, either all of the structures are allocated
659 * or none of them are.
661 * Returns 0 on success, -1 on error.
663 static int allocate_data_structures(
666 rrd_value_t **pdp_temp,
669 unsigned long *tmpl_cnt,
670 unsigned long **rra_step_cnt,
671 unsigned long **skip_update,
672 rrd_value_t **pdp_new)
675 if ((*updvals = (char **) malloc(sizeof(char *)
676 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
677 rrd_set_error("allocating updvals pointer array.");
680 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
681 * rrd->stat_head->ds_cnt)) ==
683 rrd_set_error("allocating pdp_temp.");
684 goto err_free_updvals;
686 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
688 rrd->stat_head->rra_cnt)) ==
690 rrd_set_error("allocating skip_update.");
691 goto err_free_pdp_temp;
693 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
694 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
695 rrd_set_error("allocating tmpl_idx.");
696 goto err_free_skip_update;
698 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
701 rra_cnt))) == NULL) {
702 rrd_set_error("allocating rra_step_cnt.");
703 goto err_free_tmpl_idx;
706 /* initialize tmplt redirector */
707 /* default config example (assume DS 1 is a CDEF DS)
708 tmpl_idx[0] -> 0; (time)
709 tmpl_idx[1] -> 1; (DS 0)
710 tmpl_idx[2] -> 3; (DS 2)
711 tmpl_idx[3] -> 4; (DS 3) */
712 (*tmpl_idx)[0] = 0; /* time */
713 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
714 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
715 (*tmpl_idx)[ii++] = i;
720 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
721 goto err_free_rra_step_cnt;
725 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
726 * rrd->stat_head->ds_cnt)) == NULL) {
727 rrd_set_error("allocating pdp_new.");
728 goto err_free_rra_step_cnt;
733 err_free_rra_step_cnt:
737 err_free_skip_update:
747 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
749 * Returns 0 on success.
751 static int parse_template(
754 unsigned long *tmpl_cnt,
757 char *dsname, *tmplt_copy;
758 unsigned int tmpl_len, i;
761 *tmpl_cnt = 1; /* the first entry is the time */
763 /* we should work on a writeable copy here */
764 if ((tmplt_copy = strdup(tmplt)) == NULL) {
765 rrd_set_error("error copying tmplt '%s'", tmplt);
771 tmpl_len = strlen(tmplt_copy);
772 for (i = 0; i <= tmpl_len; i++) {
773 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
774 tmplt_copy[i] = '\0';
775 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
776 rrd_set_error("tmplt contains more DS definitions than RRD");
778 goto out_free_tmpl_copy;
780 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
781 rrd_set_error("unknown DS name '%s'", dsname);
783 goto out_free_tmpl_copy;
785 /* go to the next entry on the tmplt_copy */
787 dsname = &tmplt_copy[i + 1];
797 * Parse an update string, updates the primary data points (PDPs)
798 * and consolidated data points (CDPs), and writes changes to the RRAs.
800 * Returns 0 on success, -1 on error.
802 static int process_arg(
805 rrd_file_t *rrd_file,
806 unsigned long rra_begin,
807 unsigned long *rra_current,
808 time_t *current_time,
809 unsigned long *current_time_usec,
810 rrd_value_t *pdp_temp,
811 rrd_value_t *pdp_new,
812 unsigned long *rra_step_cnt,
815 unsigned long tmpl_cnt,
816 rrd_info_t ** pcdp_summary,
818 unsigned long *skip_update,
819 int *schedule_smooth)
821 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
823 /* a vector of future Holt-Winters seasonal coefs */
824 unsigned long elapsed_pdp_st;
826 double interval, pre_int, post_int; /* interval between this and
828 unsigned long proc_pdp_cnt;
829 unsigned long rra_start;
831 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
832 current_time, current_time_usec, version) == -1) {
835 /* seek to the beginning of the rra's */
836 if (*rra_current != rra_begin) {
838 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
839 rrd_set_error("seek error in rrd");
843 *rra_current = rra_begin;
845 rra_start = rra_begin;
847 interval = (double) (*current_time - rrd->live_head->last_up)
848 + (double) ((long) *current_time_usec -
849 (long) rrd->live_head->last_up_usec) / 1e6f;
851 /* process the data sources and update the pdp_prep
852 * area accordingly */
853 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
857 elapsed_pdp_st = calculate_elapsed_steps(rrd,
859 *current_time_usec, interval,
863 /* has a pdp_st moment occurred since the last run ? */
864 if (elapsed_pdp_st == 0) {
865 /* no we have not passed a pdp_st moment. therefore update is simple */
866 simple_update(rrd, interval, pdp_new);
868 /* an pdp_st has occurred. */
869 if (process_all_pdp_st(rrd, interval,
871 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
874 if (update_all_cdp_prep(rrd, rra_step_cnt,
880 pdp_temp, rra_current,
881 skip_update, schedule_smooth) == -1) {
882 goto err_free_coefficients;
884 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
885 elapsed_pdp_st, pdp_temp,
886 &seasonal_coef) == -1) {
887 goto err_free_coefficients;
889 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
890 rra_current, *current_time, skip_update,
891 pcdp_summary) == -1) {
892 goto err_free_coefficients;
894 } /* endif a pdp_st has occurred */
895 rrd->live_head->last_up = *current_time;
896 rrd->live_head->last_up_usec = *current_time_usec;
899 *rrd->legacy_last_up = rrd->live_head->last_up;
902 free(last_seasonal_coef);
905 err_free_coefficients:
907 free(last_seasonal_coef);
912 * Parse a DS string (time + colon-separated values), storing the
913 * results in current_time, current_time_usec, and updvals.
915 * Returns 0 on success, -1 on error.
922 unsigned long tmpl_cnt,
923 time_t *current_time,
924 unsigned long *current_time_usec,
932 /* initialize all ds input to unknown except the first one
933 which has always got to be set */
934 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
937 /* separate all ds elements; first must be examined separately
938 due to alternate time syntax */
939 if ((p = strchr(input, '@')) != NULL) {
941 } else if ((p = strchr(input, ':')) != NULL) {
944 rrd_set_error("expected timestamp not found in data source from %s",
950 updvals[tmpl_idx[i++]] = p + 1;
955 updvals[tmpl_idx[i++]] = p + 1;
961 rrd_set_error("expected %lu data source readings (got %lu) from %s",
962 tmpl_cnt - 1, i, input);
966 if (get_time_from_reading(rrd, timesyntax, updvals,
967 current_time, current_time_usec,
975 * Parse the time in a DS string, store it in current_time and
976 * current_time_usec and verify that it's later than the last
977 * update for this DS.
979 * Returns 0 on success, -1 on error.
981 static int get_time_from_reading(
985 time_t *current_time,
986 unsigned long *current_time_usec,
990 char *parsetime_error = NULL;
992 rrd_time_value_t ds_tv;
993 struct timeval tmp_time; /* used for time conversion */
995 /* get the time from the reading ... handle N */
996 if (timesyntax == '@') { /* at-style */
997 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
998 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1001 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1002 ds_tv.type == RELATIVE_TO_START_TIME) {
1003 rrd_set_error("specifying time relative to the 'start' "
1004 "or 'end' makes no sense here: %s", updvals[0]);
1007 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1008 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1009 } else if (strcmp(updvals[0], "N") == 0) {
1010 gettimeofday(&tmp_time, 0);
1011 normalize_time(&tmp_time);
1012 *current_time = tmp_time.tv_sec;
1013 *current_time_usec = tmp_time.tv_usec;
1015 old_locale = setlocale(LC_NUMERIC, "C");
1016 tmp = strtod(updvals[0], 0);
1017 setlocale(LC_NUMERIC, old_locale);
1018 *current_time = floor(tmp);
1019 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1021 /* dont do any correction for old version RRDs */
1023 *current_time_usec = 0;
1025 if (*current_time < rrd->live_head->last_up ||
1026 (*current_time == rrd->live_head->last_up &&
1027 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1028 rrd_set_error("illegal attempt to update using time %ld when "
1029 "last update time is %ld (minimum one second step)",
1030 *current_time, rrd->live_head->last_up);
1037 * Update pdp_new by interpreting the updvals according to the DS type
1038 * (COUNTER, GAUGE, etc.).
1040 * Returns 0 on success, -1 on error.
1042 static int update_pdp_prep(
1045 rrd_value_t *pdp_new,
1048 unsigned long ds_idx;
1050 char *endptr; /* used in the conversion */
1053 enum dst_en dst_idx;
1055 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1056 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1058 /* make sure we do not build diffs with old last_ds values */
1059 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1060 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1061 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1064 /* NOTE: DST_CDEF should never enter this if block, because
1065 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1066 * accidently specified a value for the DST_CDEF. To handle this case,
1067 * an extra check is required. */
1069 if ((updvals[ds_idx + 1][0] != 'U') &&
1070 (dst_idx != DST_CDEF) &&
1071 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1074 /* pdp_new contains rate * time ... eg the bytes transferred during
1075 * the interval. Doing it this way saves a lot of math operations
1080 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1081 if ((updvals[ds_idx + 1][ii] < '0'
1082 || updvals[ds_idx + 1][ii] > '9')
1083 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1084 rrd_set_error("not a simple integer: '%s'",
1085 updvals[ds_idx + 1]);
1089 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1091 rrd_diff(updvals[ds_idx + 1],
1092 rrd->pdp_prep[ds_idx].last_ds);
1093 if (dst_idx == DST_COUNTER) {
1094 /* simple overflow catcher. This will fail
1095 * terribly for non 32 or 64 bit counters
1096 * ... are there any others in SNMP land?
1098 if (pdp_new[ds_idx] < (double) 0.0)
1099 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1100 if (pdp_new[ds_idx] < (double) 0.0)
1101 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1103 rate = pdp_new[ds_idx] / interval;
1105 pdp_new[ds_idx] = DNAN;
1109 old_locale = setlocale(LC_NUMERIC, "C");
1111 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1112 setlocale(LC_NUMERIC, old_locale);
1114 rrd_set_error("converting '%s' to float: %s",
1115 updvals[ds_idx + 1], rrd_strerror(errno));
1118 if (endptr[0] != '\0') {
1120 ("conversion of '%s' to float not complete: tail '%s'",
1121 updvals[ds_idx + 1], endptr);
1124 rate = pdp_new[ds_idx] / interval;
1128 old_locale = setlocale(LC_NUMERIC, "C");
1130 strtod(updvals[ds_idx + 1], &endptr) * interval;
1131 setlocale(LC_NUMERIC, old_locale);
1133 rrd_set_error("converting '%s' to float: %s",
1134 updvals[ds_idx + 1], rrd_strerror(errno));
1137 if (endptr[0] != '\0') {
1139 ("conversion of '%s' to float not complete: tail '%s'",
1140 updvals[ds_idx + 1], endptr);
1143 rate = pdp_new[ds_idx] / interval;
1146 rrd_set_error("rrd contains unknown DS type : '%s'",
1147 rrd->ds_def[ds_idx].dst);
1150 /* break out of this for loop if the error string is set */
1151 if (rrd_test_error()) {
1154 /* make sure pdp_temp is neither too large or too small
1155 * if any of these occur it becomes unknown ...
1156 * sorry folks ... */
1158 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1159 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1160 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1161 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1162 pdp_new[ds_idx] = DNAN;
1165 /* no news is news all the same */
1166 pdp_new[ds_idx] = DNAN;
1170 /* make a copy of the command line argument for the next run */
1172 fprintf(stderr, "prep ds[%lu]\t"
1176 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1179 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1181 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1187 * How many PDP steps have elapsed since the last update? Returns the answer,
1188 * and stores the time between the last update and the last PDP in pre_time,
1189 * and the time between the last PDP and the current time in post_int.
1191 static int calculate_elapsed_steps(
1193 unsigned long current_time,
1194 unsigned long current_time_usec,
1198 unsigned long *proc_pdp_cnt)
1200 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1201 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1203 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1204 * when it was last updated */
1205 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1207 /* when was the current pdp started */
1208 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1209 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1211 /* when did the last pdp_st occur */
1212 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1213 occu_pdp_st = current_time - occu_pdp_age;
1215 if (occu_pdp_st > proc_pdp_st) {
1216 /* OK we passed the pdp_st moment */
1217 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1218 * occurred before the latest
1220 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1221 *post_int = occu_pdp_age; /* how much after it */
1222 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1224 *pre_int = interval;
1228 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1231 printf("proc_pdp_age %lu\t"
1233 "occu_pfp_age %lu\t"
1237 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1238 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1241 /* compute the number of elapsed pdp_st moments */
1242 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1246 * Increment the PDP values by the values in pdp_new, or else initialize them.
1248 static void simple_update(
1251 rrd_value_t *pdp_new)
1255 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1256 if (isnan(pdp_new[i])) {
1257 /* this is not really accurate if we use subsecond data arrival time
1258 should have thought of it when going subsecond resolution ...
1259 sorry next format change we will have it! */
1260 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1263 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1264 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1266 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1275 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1276 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1282 * Call process_pdp_st for each DS.
1284 * Returns 0 on success, -1 on error.
1286 static int process_all_pdp_st(
1291 unsigned long elapsed_pdp_st,
1292 rrd_value_t *pdp_new,
1293 rrd_value_t *pdp_temp)
1295 unsigned long ds_idx;
1297 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1298 rate*seconds which occurred up to the last run.
1299 pdp_new[] contains rate*seconds from the latest run.
1300 pdp_temp[] will contain the rate for cdp */
1302 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1303 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1304 elapsed_pdp_st * rrd->stat_head->pdp_step,
1305 pdp_new, pdp_temp) == -1) {
1309 fprintf(stderr, "PDP UPD ds[%lu]\t"
1310 "elapsed_pdp_st %lu\t"
1313 "new_unkn_sec %5lu\n",
1317 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1318 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1325 * Process an update that occurs after one of the PDP moments.
1326 * Increments the PDP value, sets NAN if time greater than the
1327 * heartbeats have elapsed, processes CDEFs.
1329 * Returns 0 on success, -1 on error.
1331 static int process_pdp_st(
1333 unsigned long ds_idx,
1337 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1338 rrd_value_t *pdp_new,
1339 rrd_value_t *pdp_temp)
1343 /* update pdp_prep to the current pdp_st. */
1344 double pre_unknown = 0.0;
1345 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1346 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1348 rpnstack_t rpnstack; /* used for COMPUTE DS */
1350 rpnstack_init(&rpnstack);
1353 if (isnan(pdp_new[ds_idx])) {
1354 /* a final bit of unknown to be added before calculation
1355 we use a temporary variable for this so that we
1356 don't have to turn integer lines before using the value */
1357 pre_unknown = pre_int;
1359 if (isnan(scratch[PDP_val].u_val)) {
1360 scratch[PDP_val].u_val = 0;
1362 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1365 /* if too much of the pdp_prep is unknown we dump it */
1366 /* if the interval is larger thatn mrhb we get NAN */
1367 if ((interval > mrhb) ||
1368 (rrd->stat_head->pdp_step / 2.0 <
1369 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1370 pdp_temp[ds_idx] = DNAN;
1372 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1373 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1377 /* process CDEF data sources; remember each CDEF DS can
1378 * only reference other DS with a lower index number */
1379 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1383 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1384 /* substitute data values for OP_VARIABLE nodes */
1385 for (i = 0; rpnp[i].op != OP_END; i++) {
1386 if (rpnp[i].op == OP_VARIABLE) {
1387 rpnp[i].op = OP_NUMBER;
1388 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1391 /* run the rpn calculator */
1392 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1394 rpnstack_free(&rpnstack);
1399 /* make pdp_prep ready for the next run */
1400 if (isnan(pdp_new[ds_idx])) {
1401 /* this is not realy accurate if we use subsecond data arival time
1402 should have thought of it when going subsecond resolution ...
1403 sorry next format change we will have it! */
1404 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1405 scratch[PDP_val].u_val = DNAN;
1407 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1408 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1410 rpnstack_free(&rpnstack);
1415 * Iterate over all the RRAs for a given DS and:
1416 * 1. Decide whether to schedule a smooth later
1417 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1420 * Returns 0 on success, -1 on error
1422 static int update_all_cdp_prep(
1424 unsigned long *rra_step_cnt,
1425 unsigned long rra_begin,
1426 rrd_file_t *rrd_file,
1427 unsigned long elapsed_pdp_st,
1428 unsigned long proc_pdp_cnt,
1429 rrd_value_t **last_seasonal_coef,
1430 rrd_value_t **seasonal_coef,
1431 rrd_value_t *pdp_temp,
1432 unsigned long *rra_current,
1433 unsigned long *skip_update,
1434 int *schedule_smooth)
1436 unsigned long rra_idx;
1438 /* index into the CDP scratch array */
1439 enum cf_en current_cf;
1440 unsigned long rra_start;
1442 /* number of rows to be updated in an RRA for a data value. */
1443 unsigned long start_pdp_offset;
1445 rra_start = rra_begin;
1446 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1447 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1449 rrd->rra_def[rra_idx].pdp_cnt -
1450 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1451 skip_update[rra_idx] = 0;
1452 if (start_pdp_offset <= elapsed_pdp_st) {
1453 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1454 rrd->rra_def[rra_idx].pdp_cnt + 1;
1456 rra_step_cnt[rra_idx] = 0;
1459 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1460 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1461 * so that they will be correct for the next observed value; note that for
1462 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1463 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1464 if (rra_step_cnt[rra_idx] > 1) {
1465 skip_update[rra_idx] = 1;
1466 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1467 elapsed_pdp_st, last_seasonal_coef);
1468 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1469 elapsed_pdp_st + 1, seasonal_coef);
1471 /* periodically run a smoother for seasonal effects */
1472 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1475 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1476 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1477 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1480 *schedule_smooth = 1;
1482 *rra_current = rrd_tell(rrd_file);
1484 if (rrd_test_error())
1488 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1489 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1490 current_cf) == -1) {
1494 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1495 sizeof(rrd_value_t);
1501 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1503 static int do_schedule_smooth(
1505 unsigned long rra_idx,
1506 unsigned long elapsed_pdp_st)
1508 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1509 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1510 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1511 unsigned long seasonal_smooth_idx =
1512 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1513 unsigned long *init_seasonal =
1514 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1516 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1517 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1518 * really an RRA level, not a data source within RRA level parameter, but
1519 * the rra_def is read only for rrd_update (not flushed to disk). */
1520 if (*init_seasonal > BURNIN_CYCLES) {
1521 /* someone has no doubt invented a trick to deal with this wrap around,
1522 * but at least this code is clear. */
1523 if (seasonal_smooth_idx > cur_row) {
1524 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1525 * between PDP and CDP */
1526 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1528 /* can't rely on negative numbers because we are working with
1529 * unsigned values */
1530 return (cur_row + elapsed_pdp_st >= row_cnt
1531 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1533 /* mark off one of the burn-in cycles */
1534 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1538 * For a given RRA, iterate over the data sources and call the appropriate
1539 * consolidation function.
1541 * Returns 0 on success, -1 on error.
1543 static int update_cdp_prep(
1545 unsigned long elapsed_pdp_st,
1546 unsigned long start_pdp_offset,
1547 unsigned long *rra_step_cnt,
1549 rrd_value_t *pdp_temp,
1550 rrd_value_t *last_seasonal_coef,
1551 rrd_value_t *seasonal_coef,
1554 unsigned long ds_idx, cdp_idx;
1556 /* update CDP_PREP areas */
1557 /* loop over data soures within each RRA */
1558 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1560 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1562 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1563 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1564 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1565 elapsed_pdp_st, start_pdp_offset,
1566 rrd->rra_def[rra_idx].pdp_cnt,
1567 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1570 /* Nothing to consolidate if there's one PDP per CDP. However, if
1571 * we've missed some PDPs, let's update null counters etc. */
1572 if (elapsed_pdp_st > 2) {
1573 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1574 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1579 if (rrd_test_error())
1581 } /* endif data sources loop */
1586 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1587 * primary value, secondary value, and # of unknowns.
1589 static void update_cdp(
1592 rrd_value_t pdp_temp_val,
1593 unsigned long rra_step_cnt,
1594 unsigned long elapsed_pdp_st,
1595 unsigned long start_pdp_offset,
1596 unsigned long pdp_cnt,
1601 /* shorthand variables */
1602 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1603 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1604 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1605 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1608 /* If we are in this block, as least 1 CDP value will be written to
1609 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1610 * to be written, then the "fill in" value is the CDP_secondary_val
1612 if (isnan(pdp_temp_val)) {
1613 *cdp_unkn_pdp_cnt += start_pdp_offset;
1614 *cdp_secondary_val = DNAN;
1616 /* CDP_secondary value is the RRA "fill in" value for intermediary
1617 * CDP data entries. No matter the CF, the value is the same because
1618 * the average, max, min, and last of a list of identical values is
1619 * the same, namely, the value itself. */
1620 *cdp_secondary_val = pdp_temp_val;
1623 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1624 *cdp_primary_val = DNAN;
1625 if (current_cf == CF_AVERAGE) {
1627 initialize_average_carry_over(pdp_temp_val,
1629 start_pdp_offset, pdp_cnt);
1631 *cdp_val = pdp_temp_val;
1634 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1635 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1636 } /* endif meets xff value requirement for a valid value */
1637 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1638 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1639 if (isnan(pdp_temp_val))
1640 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1642 *cdp_unkn_pdp_cnt = 0;
1643 } else { /* rra_step_cnt[i] == 0 */
1646 if (isnan(*cdp_val)) {
1647 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1650 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1654 if (isnan(pdp_temp_val)) {
1655 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1658 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1665 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1666 * on the type of consolidation function.
1668 static void initialize_cdp_val(
1671 rrd_value_t pdp_temp_val,
1672 unsigned long elapsed_pdp_st,
1673 unsigned long start_pdp_offset,
1674 unsigned long pdp_cnt)
1676 rrd_value_t cum_val, cur_val;
1678 switch (current_cf) {
1680 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1681 cur_val = IFDNAN(pdp_temp_val, 0.0);
1682 scratch[CDP_primary_val].u_val =
1683 (cum_val + cur_val * start_pdp_offset) /
1684 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1685 scratch[CDP_val].u_val =
1686 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1687 start_pdp_offset, pdp_cnt);
1690 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1691 cur_val = IFDNAN(pdp_temp_val, -DINF);
1694 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1696 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1702 if (cur_val > cum_val)
1703 scratch[CDP_primary_val].u_val = cur_val;
1705 scratch[CDP_primary_val].u_val = cum_val;
1706 /* initialize carry over value */
1707 scratch[CDP_val].u_val = pdp_temp_val;
1710 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1711 cur_val = IFDNAN(pdp_temp_val, DINF);
1714 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1716 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1722 if (cur_val < cum_val)
1723 scratch[CDP_primary_val].u_val = cur_val;
1725 scratch[CDP_primary_val].u_val = cum_val;
1726 /* initialize carry over value */
1727 scratch[CDP_val].u_val = pdp_temp_val;
1731 scratch[CDP_primary_val].u_val = pdp_temp_val;
1732 /* initialize carry over value */
1733 scratch[CDP_val].u_val = pdp_temp_val;
1739 * Update the consolidation function for Holt-Winters functions as
1740 * well as other functions that don't actually consolidate multiple
1743 static void reset_cdp(
1745 unsigned long elapsed_pdp_st,
1746 rrd_value_t *pdp_temp,
1747 rrd_value_t *last_seasonal_coef,
1748 rrd_value_t *seasonal_coef,
1752 enum cf_en current_cf)
1754 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1756 switch (current_cf) {
1759 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1760 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1763 case CF_DEVSEASONAL:
1764 /* need to update cached seasonal values, so they are consistent
1765 * with the bulk update */
1766 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1767 * CDP_last_deviation are the same. */
1768 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1769 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1773 /* need to update the null_count and last_null_count.
1774 * even do this for non-DNAN pdp_temp because the
1775 * algorithm is not learning from batch updates. */
1776 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1777 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1780 scratch[CDP_primary_val].u_val = DNAN;
1781 scratch[CDP_secondary_val].u_val = DNAN;
1784 /* do not count missed bulk values as failures */
1785 scratch[CDP_primary_val].u_val = 0;
1786 scratch[CDP_secondary_val].u_val = 0;
1787 /* need to reset violations buffer.
1788 * could do this more carefully, but for now, just
1789 * assume a bulk update wipes away all violations. */
1790 erase_violations(rrd, cdp_idx, rra_idx);
1795 static rrd_value_t initialize_average_carry_over(
1796 rrd_value_t pdp_temp_val,
1797 unsigned long elapsed_pdp_st,
1798 unsigned long start_pdp_offset,
1799 unsigned long pdp_cnt)
1801 /* initialize carry over value */
1802 if (isnan(pdp_temp_val)) {
1805 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1809 * Update or initialize a CDP value based on the consolidation
1812 * Returns the new value.
1814 static rrd_value_t calculate_cdp_val(
1815 rrd_value_t cdp_val,
1816 rrd_value_t pdp_temp_val,
1817 unsigned long elapsed_pdp_st,
1828 if (isnan(cdp_val)) {
1829 if (current_cf == CF_AVERAGE) {
1830 pdp_temp_val *= elapsed_pdp_st;
1833 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1834 i, ii, pdp_temp_val);
1836 return pdp_temp_val;
1838 if (current_cf == CF_AVERAGE)
1839 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1840 if (current_cf == CF_MINIMUM)
1841 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1842 if (current_cf == CF_MAXIMUM)
1843 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1845 return pdp_temp_val;
1849 * For each RRA, update the seasonal values and then call update_aberrant_CF
1850 * for each data source.
1852 * Return 0 on success, -1 on error.
1854 static int update_aberrant_cdps(
1856 rrd_file_t *rrd_file,
1857 unsigned long rra_begin,
1858 unsigned long *rra_current,
1859 unsigned long elapsed_pdp_st,
1860 rrd_value_t *pdp_temp,
1861 rrd_value_t **seasonal_coef)
1863 unsigned long rra_idx, ds_idx, j;
1865 /* number of PDP steps since the last update that
1866 * are assigned to the first CDP to be generated
1867 * since the last update. */
1868 unsigned short scratch_idx;
1869 unsigned long rra_start;
1870 enum cf_en current_cf;
1872 /* this loop is only entered if elapsed_pdp_st < 3 */
1873 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1874 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1875 rra_start = rra_begin;
1876 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1877 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1878 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1879 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1880 if (scratch_idx == CDP_primary_val) {
1881 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1882 elapsed_pdp_st + 1, seasonal_coef);
1884 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1885 elapsed_pdp_st + 2, seasonal_coef);
1887 *rra_current = rrd_tell(rrd_file);
1889 if (rrd_test_error())
1891 /* loop over data soures within each RRA */
1892 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1893 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1894 rra_idx * (rrd->stat_head->ds_cnt) +
1895 ds_idx, rra_idx, ds_idx, scratch_idx,
1899 rra_start += rrd->rra_def[rra_idx].row_cnt
1900 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1907 * Move sequentially through the file, writing one RRA at a time. Note this
1908 * architecture divorces the computation of CDP with flushing updated RRA
1911 * Return 0 on success, -1 on error.
1913 static int write_to_rras(
1915 rrd_file_t *rrd_file,
1916 unsigned long *rra_step_cnt,
1917 unsigned long rra_begin,
1918 unsigned long *rra_current,
1919 time_t current_time,
1920 unsigned long *skip_update,
1921 rrd_info_t ** pcdp_summary)
1923 unsigned long rra_idx;
1924 unsigned long rra_start;
1925 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1926 time_t rra_time = 0; /* time of update for a RRA */
1928 /* Ready to write to disk */
1929 rra_start = rra_begin;
1930 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1931 /* skip unless there's something to write */
1932 if (rra_step_cnt[rra_idx]) {
1933 /* write the first row */
1935 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1937 rrd->rra_ptr[rra_idx].cur_row++;
1938 if (rrd->rra_ptr[rra_idx].cur_row >=
1939 rrd->rra_def[rra_idx].row_cnt)
1940 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1941 /* position on the first row */
1942 rra_pos_tmp = rra_start +
1943 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1944 sizeof(rrd_value_t);
1945 if (rra_pos_tmp != *rra_current) {
1946 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1947 rrd_set_error("seek error in rrd");
1950 *rra_current = rra_pos_tmp;
1953 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1955 if (!skip_update[rra_idx]) {
1956 if (*pcdp_summary != NULL) {
1957 rra_time = (current_time - current_time
1958 % (rrd->rra_def[rra_idx].pdp_cnt *
1959 rrd->stat_head->pdp_step))
1961 ((rra_step_cnt[rra_idx] -
1962 1) * rrd->rra_def[rra_idx].pdp_cnt *
1963 rrd->stat_head->pdp_step);
1966 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1967 pcdp_summary, rra_time) == -1)
1971 /* write other rows of the bulk update, if any */
1972 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1973 if (++rrd->rra_ptr[rra_idx].cur_row ==
1974 rrd->rra_def[rra_idx].row_cnt) {
1977 "Wraparound for RRA %s, %lu updates left\n",
1978 rrd->rra_def[rra_idx].cf_nam,
1979 rra_step_cnt[rra_idx] - 1);
1982 rrd->rra_ptr[rra_idx].cur_row = 0;
1983 /* seek back to beginning of current rra */
1984 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1985 rrd_set_error("seek error in rrd");
1989 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1992 *rra_current = rra_start;
1994 if (!skip_update[rra_idx]) {
1995 if (*pcdp_summary != NULL) {
1996 rra_time = (current_time - current_time
1997 % (rrd->rra_def[rra_idx].pdp_cnt *
1998 rrd->stat_head->pdp_step))
2000 ((rra_step_cnt[rra_idx] -
2001 2) * rrd->rra_def[rra_idx].pdp_cnt *
2002 rrd->stat_head->pdp_step);
2004 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2005 CDP_secondary_val, pcdp_summary,
2011 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2012 sizeof(rrd_value_t);
2019 * Write out one row of values (one value per DS) to the archive.
2021 * Returns 0 on success, -1 on error.
2023 static int write_RRA_row(
2024 rrd_file_t *rrd_file,
2026 unsigned long rra_idx,
2027 unsigned long *rra_current,
2028 unsigned short CDP_scratch_idx,
2029 rrd_info_t ** pcdp_summary,
2032 unsigned long ds_idx, cdp_idx;
2035 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2036 /* compute the cdp index */
2037 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2039 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2040 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2041 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2043 if (*pcdp_summary != NULL) {
2044 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2045 /* append info to the return hash */
2046 *pcdp_summary = rrd_info_push(*pcdp_summary,
2048 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2049 rrd->rra_def[rra_idx].cf_nam,
2050 rrd->rra_def[rra_idx].pdp_cnt,
2051 rrd->ds_def[ds_idx].ds_nam),
2054 if (rrd_write(rrd_file,
2055 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2056 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2057 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2060 *rra_current += sizeof(rrd_value_t);
2066 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2068 * Returns 0 on success, -1 otherwise
2070 static int smooth_all_rras(
2072 rrd_file_t *rrd_file,
2073 unsigned long rra_begin)
2075 unsigned long rra_start = rra_begin;
2076 unsigned long rra_idx;
2078 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2079 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2080 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2082 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2084 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2085 if (rrd_test_error())
2088 rra_start += rrd->rra_def[rra_idx].row_cnt
2089 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2096 * Flush changes to disk (unless we're using mmap)
2098 * Returns 0 on success, -1 otherwise
2100 static int write_changes_to_disk(
2102 rrd_file_t *rrd_file,
2105 /* we just need to write back the live header portion now */
2106 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2107 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2108 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2110 rrd_set_error("seek rrd for live header writeback");
2114 if (rrd_write(rrd_file, rrd->live_head,
2115 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2116 rrd_set_error("rrd_write live_head to rrd");
2120 if (rrd_write(rrd_file, rrd->legacy_last_up,
2121 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2122 rrd_set_error("rrd_write live_head to rrd");
2128 if (rrd_write(rrd_file, rrd->pdp_prep,
2129 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2130 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2131 rrd_set_error("rrd_write pdp_prep to rrd");
2135 if (rrd_write(rrd_file, rrd->cdp_prep,
2136 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2137 rrd->stat_head->ds_cnt)
2138 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2139 rrd->stat_head->ds_cnt)) {
2141 rrd_set_error("rrd_write cdp_prep to rrd");
2145 if (rrd_write(rrd_file, rrd->rra_ptr,
2146 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2147 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2148 rrd_set_error("rrd_write rra_ptr to rrd");