2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(¤t_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 unsigned long *rra_current,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
112 unsigned long *skip_update,
113 int *schedule_smooth);
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
125 static int get_time_from_reading(
129 time_t *current_time,
130 unsigned long *current_time_usec,
133 static int update_pdp_prep(
136 rrd_value_t *pdp_new,
139 static int calculate_elapsed_steps(
141 unsigned long current_time,
142 unsigned long current_time_usec,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
164 unsigned long ds_idx,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *rra_current,
183 unsigned long *skip_update,
184 int *schedule_smooth);
186 static int do_schedule_smooth(
188 unsigned long rra_idx,
189 unsigned long elapsed_pdp_st);
191 static int update_cdp_prep(
193 unsigned long elapsed_pdp_st,
194 unsigned long start_pdp_offset,
195 unsigned long *rra_step_cnt,
197 rrd_value_t *pdp_temp,
198 rrd_value_t *last_seasonal_coef,
199 rrd_value_t *seasonal_coef,
202 static void update_cdp(
205 rrd_value_t pdp_temp_val,
206 unsigned long rra_step_cnt,
207 unsigned long elapsed_pdp_st,
208 unsigned long start_pdp_offset,
209 unsigned long pdp_cnt,
214 static void initialize_cdp_val(
217 rrd_value_t pdp_temp_val,
218 unsigned long elapsed_pdp_st,
219 unsigned long start_pdp_offset,
220 unsigned long pdp_cnt);
222 static void reset_cdp(
224 unsigned long elapsed_pdp_st,
225 rrd_value_t *pdp_temp,
226 rrd_value_t *last_seasonal_coef,
227 rrd_value_t *seasonal_coef,
231 enum cf_en current_cf);
233 static rrd_value_t initialize_average_carry_over(
234 rrd_value_t pdp_temp_val,
235 unsigned long elapsed_pdp_st,
236 unsigned long start_pdp_offset,
237 unsigned long pdp_cnt);
239 static rrd_value_t calculate_cdp_val(
241 rrd_value_t pdp_temp_val,
242 unsigned long elapsed_pdp_st,
247 static int update_aberrant_cdps(
249 rrd_file_t *rrd_file,
250 unsigned long rra_begin,
251 unsigned long *rra_current,
252 unsigned long elapsed_pdp_st,
253 rrd_value_t *pdp_temp,
254 rrd_value_t **seasonal_coef);
256 static int write_to_rras(
258 rrd_file_t *rrd_file,
259 unsigned long *rra_step_cnt,
260 unsigned long rra_begin,
261 unsigned long *rra_current,
263 unsigned long *skip_update,
264 rrd_info_t ** pcdp_summary);
266 static int write_RRA_row(
267 rrd_file_t *rrd_file,
269 unsigned long rra_idx,
270 unsigned long *rra_current,
271 unsigned short CDP_scratch_idx,
272 rrd_info_t ** pcdp_summary,
275 static int smooth_all_rras(
277 rrd_file_t *rrd_file,
278 unsigned long rra_begin);
281 static int write_changes_to_disk(
283 rrd_file_t *rrd_file,
/*
 * normalize time as returned by gettimeofday. usec part must
 * always be >= 0; when it is negative, borrow one second from
 * the tv_sec field.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
/*
 * Sets current_time and current_time_usec based on the current time.
 * current_time_usec is set to 0 if the version number is 1 or 2
 * (those formats have no sub-second resolution).
 */
static inline void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval tmp_time;    /* used for time conversion */

    gettimeofday(&tmp_time, 0);
    normalize_time(&tmp_time);
    *current_time = tmp_time.tv_sec;
    if (version >= 3) {
        *current_time_usec = tmp_time.tv_usec;
    } else {
        *current_time_usec = 0;
    }
}
/* Return Y when X is NaN, otherwise X.  Note: no trailing semicolon, so
 * the macro can be used safely inside larger expressions; existing
 * statement-level uses are unaffected. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
323 rrd_info_t *rrd_update_v(
328 rrd_info_t *result = NULL;
330 struct option long_options[] = {
331 {"template", required_argument, 0, 't'},
337 opterr = 0; /* initialize getopt */
340 int option_index = 0;
343 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
354 rrd_set_error("unknown option '%s'", argv[optind - 1]);
359 /* need at least 2 arguments: filename, data. */
360 if (argc - optind < 2) {
361 rrd_set_error("Not enough arguments");
365 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366 rc.u_int = _rrd_update(argv[optind], tmplt,
368 (const char **) (argv + optind + 1), result);
369 result->value.u_int = rc.u_int;
378 struct option long_options[] = {
379 {"template", required_argument, 0, 't'},
380 {"daemon", required_argument, 0, 'd'},
383 int option_index = 0;
387 char *opt_daemon = NULL;
390 opterr = 0; /* initialize getopt */
393 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
400 tmplt = strdup(optarg);
404 if (opt_daemon != NULL)
406 opt_daemon = strdup (optarg);
407 if (opt_daemon == NULL)
409 rrd_set_error("strdup failed.");
415 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 /* need at least 2 arguments: filename, data. */
421 if (argc - optind < 2) {
422 rrd_set_error("Not enough arguments");
426 if ((tmplt != NULL) && (opt_daemon != NULL))
428 rrd_set_error("The caching opt_daemon cannot be used together with "
433 if ((tmplt == NULL) && (opt_daemon == NULL))
437 temp = getenv (ENV_RRDCACHED_ADDRESS);
440 opt_daemon = strdup (temp);
441 if (opt_daemon == NULL)
443 rrd_set_error("strdup failed.");
449 if (opt_daemon != NULL)
453 status = rrdc_connect (opt_daemon);
456 rrd_set_error("Unable to connect to opt_daemon: %s",
459 : rrd_strerror (status));
463 status = rrdc_update (/* file = */ argv[optind],
464 /* values_num = */ argc - optind - 1,
465 /* values = */ (void *) (argv + optind + 1));
468 rrd_set_error("Failed sending the values to the opt_daemon: %s",
471 : rrd_strerror (status));
480 } /* if (opt_daemon != NULL) */
482 rc = rrd_update_r(argv[optind], tmplt,
483 argc - optind - 1, (const char **) (argv + optind + 1));
490 if (opt_daemon != NULL)
499 const char *filename,
504 return _rrd_update(filename, tmplt, argc, argv, NULL);
508 const char *filename,
512 rrd_info_t * pcdp_summary)
517 unsigned long rra_begin; /* byte pointer to the rra
518 * area in the rrd file. this
519 * pointer never changes value */
520 unsigned long rra_current; /* byte pointer to the current write
521 * spot in the rrd file. */
522 rrd_value_t *pdp_new; /* prepare the incoming data to be added
523 * to the existing entry */
524 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
525 * to the cdp values */
527 long *tmpl_idx; /* index representing the settings
528 * transported by the tmplt index */
529 unsigned long tmpl_cnt = 2; /* time and data */
531 time_t current_time = 0;
532 unsigned long current_time_usec = 0; /* microseconds part of current time */
534 int schedule_smooth = 0;
536 /* number of elapsed PDP steps since last update */
537 unsigned long *rra_step_cnt = NULL;
539 int version; /* rrd version */
540 rrd_file_t *rrd_file;
541 char *arg_copy; /* for processing the argv */
542 unsigned long *skip_update; /* RRAs to advance but not write */
544 /* need at least 1 arguments: data. */
546 rrd_set_error("Not enough arguments");
550 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
553 /* We are now at the beginning of the rra's */
554 rra_current = rra_begin = rrd_file->header_len;
556 version = atoi(rrd.stat_head->version);
initialize_time(&current_time, &current_time_usec, version);
560 /* get exclusive lock to whole file.
561 * lock gets removed when we close the file.
563 if (rrd_lock(rrd_file) != 0) {
564 rrd_set_error("could not lock RRD");
568 if (allocate_data_structures(&rrd, &updvals,
569 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
570 &rra_step_cnt, &skip_update,
575 /* loop through the arguments. */
576 for (arg_i = 0; arg_i < argc; arg_i++) {
577 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
578 rrd_set_error("failed duplication argv entry");
581 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
&current_time, &current_time_usec, pdp_temp, pdp_new,
583 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
584 &pcdp_summary, version, skip_update,
585 &schedule_smooth) == -1) {
586 if (rrd_test_error()) { /* Should have error string always here */
589 /* Prepend file name to error message */
590 if ((save_error = strdup(rrd_get_error())) != NULL) {
591 rrd_set_error("%s: %s", filename, save_error);
603 /* if we got here and if there is an error and if the file has not been
604 * written to, then close things up and return. */
605 if (rrd_test_error()) {
606 goto err_free_structures;
609 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
610 goto err_free_structures;
614 /* calling the smoothing code here guarantees at most one smoothing
615 * operation per rrd_update call. Unfortunately, it is possible with bulk
616 * updates, or a long-delayed update for smoothing to occur off-schedule.
617 * This really isn't critical except during the burn-in cycles. */
618 if (schedule_smooth) {
619 smooth_all_rras(&rrd, rrd_file, rra_begin);
622 /* rrd_dontneed(rrd_file,&rrd); */
648 * get exclusive lock to whole file.
649 * lock gets removed when we close the file
651 * returns 0 on success
659 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
662 if (_fstat(file->fd, &st) == 0) {
663 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
670 lock.l_type = F_WRLCK; /* exclusive write lock */
671 lock.l_len = 0; /* whole file */
672 lock.l_start = 0; /* start of file */
lock.l_whence = SEEK_SET; /* offsets are relative to start of file */
675 rcstat = fcntl(file->fd, F_SETLK, &lock);
683 * Allocate some important arrays used, and initialize the template.
685 * When it returns, either all of the structures are allocated
686 * or none of them are.
688 * Returns 0 on success, -1 on error.
690 static int allocate_data_structures(
693 rrd_value_t **pdp_temp,
696 unsigned long *tmpl_cnt,
697 unsigned long **rra_step_cnt,
698 unsigned long **skip_update,
699 rrd_value_t **pdp_new)
702 if ((*updvals = (char **) malloc(sizeof(char *)
703 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
704 rrd_set_error("allocating updvals pointer array.");
707 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
708 * rrd->stat_head->ds_cnt)) ==
710 rrd_set_error("allocating pdp_temp.");
711 goto err_free_updvals;
713 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
715 rrd->stat_head->rra_cnt)) ==
717 rrd_set_error("allocating skip_update.");
718 goto err_free_pdp_temp;
720 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
721 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
722 rrd_set_error("allocating tmpl_idx.");
723 goto err_free_skip_update;
725 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
728 rra_cnt))) == NULL) {
729 rrd_set_error("allocating rra_step_cnt.");
730 goto err_free_tmpl_idx;
733 /* initialize tmplt redirector */
734 /* default config example (assume DS 1 is a CDEF DS)
735 tmpl_idx[0] -> 0; (time)
736 tmpl_idx[1] -> 1; (DS 0)
737 tmpl_idx[2] -> 3; (DS 2)
738 tmpl_idx[3] -> 4; (DS 3) */
739 (*tmpl_idx)[0] = 0; /* time */
740 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
741 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
742 (*tmpl_idx)[ii++] = i;
747 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
748 goto err_free_rra_step_cnt;
752 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
753 * rrd->stat_head->ds_cnt)) == NULL) {
754 rrd_set_error("allocating pdp_new.");
755 goto err_free_rra_step_cnt;
760 err_free_rra_step_cnt:
764 err_free_skip_update:
774 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
776 * Returns 0 on success.
778 static int parse_template(
781 unsigned long *tmpl_cnt,
784 char *dsname, *tmplt_copy;
785 unsigned int tmpl_len, i;
788 *tmpl_cnt = 1; /* the first entry is the time */
790 /* we should work on a writeable copy here */
791 if ((tmplt_copy = strdup(tmplt)) == NULL) {
792 rrd_set_error("error copying tmplt '%s'", tmplt);
798 tmpl_len = strlen(tmplt_copy);
799 for (i = 0; i <= tmpl_len; i++) {
800 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
801 tmplt_copy[i] = '\0';
802 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
803 rrd_set_error("tmplt contains more DS definitions than RRD");
805 goto out_free_tmpl_copy;
807 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
808 rrd_set_error("unknown DS name '%s'", dsname);
810 goto out_free_tmpl_copy;
812 /* go to the next entry on the tmplt_copy */
814 dsname = &tmplt_copy[i + 1];
824 * Parse an update string, updates the primary data points (PDPs)
825 * and consolidated data points (CDPs), and writes changes to the RRAs.
827 * Returns 0 on success, -1 on error.
829 static int process_arg(
832 rrd_file_t *rrd_file,
833 unsigned long rra_begin,
834 unsigned long *rra_current,
835 time_t *current_time,
836 unsigned long *current_time_usec,
837 rrd_value_t *pdp_temp,
838 rrd_value_t *pdp_new,
839 unsigned long *rra_step_cnt,
842 unsigned long tmpl_cnt,
843 rrd_info_t ** pcdp_summary,
845 unsigned long *skip_update,
846 int *schedule_smooth)
848 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
850 /* a vector of future Holt-Winters seasonal coefs */
851 unsigned long elapsed_pdp_st;
853 double interval, pre_int, post_int; /* interval between this and
855 unsigned long proc_pdp_cnt;
856 unsigned long rra_start;
858 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
859 current_time, current_time_usec, version) == -1) {
862 /* seek to the beginning of the rra's */
863 if (*rra_current != rra_begin) {
865 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
866 rrd_set_error("seek error in rrd");
870 *rra_current = rra_begin;
872 rra_start = rra_begin;
874 interval = (double) (*current_time - rrd->live_head->last_up)
875 + (double) ((long) *current_time_usec -
876 (long) rrd->live_head->last_up_usec) / 1e6f;
878 /* process the data sources and update the pdp_prep
879 * area accordingly */
880 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
884 elapsed_pdp_st = calculate_elapsed_steps(rrd,
886 *current_time_usec, interval,
890 /* has a pdp_st moment occurred since the last run ? */
891 if (elapsed_pdp_st == 0) {
892 /* no we have not passed a pdp_st moment. therefore update is simple */
893 simple_update(rrd, interval, pdp_new);
895 /* an pdp_st has occurred. */
896 if (process_all_pdp_st(rrd, interval,
898 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
901 if (update_all_cdp_prep(rrd, rra_step_cnt,
907 pdp_temp, rra_current,
908 skip_update, schedule_smooth) == -1) {
909 goto err_free_coefficients;
911 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
912 elapsed_pdp_st, pdp_temp,
913 &seasonal_coef) == -1) {
914 goto err_free_coefficients;
916 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
917 rra_current, *current_time, skip_update,
918 pcdp_summary) == -1) {
919 goto err_free_coefficients;
921 } /* endif a pdp_st has occurred */
922 rrd->live_head->last_up = *current_time;
923 rrd->live_head->last_up_usec = *current_time_usec;
926 *rrd->legacy_last_up = rrd->live_head->last_up;
929 free(last_seasonal_coef);
932 err_free_coefficients:
934 free(last_seasonal_coef);
939 * Parse a DS string (time + colon-separated values), storing the
940 * results in current_time, current_time_usec, and updvals.
942 * Returns 0 on success, -1 on error.
949 unsigned long tmpl_cnt,
950 time_t *current_time,
951 unsigned long *current_time_usec,
959 /* initialize all ds input to unknown except the first one
960 which has always got to be set */
961 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
964 /* separate all ds elements; first must be examined separately
965 due to alternate time syntax */
966 if ((p = strchr(input, '@')) != NULL) {
968 } else if ((p = strchr(input, ':')) != NULL) {
971 rrd_set_error("expected timestamp not found in data source from %s",
977 updvals[tmpl_idx[i++]] = p + 1;
982 updvals[tmpl_idx[i++]] = p + 1;
988 rrd_set_error("expected %lu data source readings (got %lu) from %s",
989 tmpl_cnt - 1, i, input);
993 if (get_time_from_reading(rrd, timesyntax, updvals,
994 current_time, current_time_usec,
1002 * Parse the time in a DS string, store it in current_time and
1003 * current_time_usec and verify that it's later than the last
1004 * update for this DS.
1006 * Returns 0 on success, -1 on error.
1008 static int get_time_from_reading(
1012 time_t *current_time,
1013 unsigned long *current_time_usec,
1017 char *parsetime_error = NULL;
1019 rrd_time_value_t ds_tv;
1020 struct timeval tmp_time; /* used for time conversion */
1022 /* get the time from the reading ... handle N */
1023 if (timesyntax == '@') { /* at-style */
1024 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1025 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1028 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1029 ds_tv.type == RELATIVE_TO_START_TIME) {
1030 rrd_set_error("specifying time relative to the 'start' "
1031 "or 'end' makes no sense here: %s", updvals[0]);
1034 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1035 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1036 } else if (strcmp(updvals[0], "N") == 0) {
1037 gettimeofday(&tmp_time, 0);
1038 normalize_time(&tmp_time);
1039 *current_time = tmp_time.tv_sec;
1040 *current_time_usec = tmp_time.tv_usec;
1042 old_locale = setlocale(LC_NUMERIC, "C");
1043 tmp = strtod(updvals[0], 0);
1044 setlocale(LC_NUMERIC, old_locale);
1045 *current_time = floor(tmp);
1046 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1048 /* dont do any correction for old version RRDs */
1050 *current_time_usec = 0;
1052 if (*current_time < rrd->live_head->last_up ||
1053 (*current_time == rrd->live_head->last_up &&
1054 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1055 rrd_set_error("illegal attempt to update using time %ld when "
1056 "last update time is %ld (minimum one second step)",
1057 *current_time, rrd->live_head->last_up);
1064 * Update pdp_new by interpreting the updvals according to the DS type
1065 * (COUNTER, GAUGE, etc.).
1067 * Returns 0 on success, -1 on error.
1069 static int update_pdp_prep(
1072 rrd_value_t *pdp_new,
1075 unsigned long ds_idx;
1077 char *endptr; /* used in the conversion */
1080 enum dst_en dst_idx;
1082 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1083 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1085 /* make sure we do not build diffs with old last_ds values */
1086 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1087 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1088 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1091 /* NOTE: DST_CDEF should never enter this if block, because
1092 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1093 * accidently specified a value for the DST_CDEF. To handle this case,
1094 * an extra check is required. */
1096 if ((updvals[ds_idx + 1][0] != 'U') &&
1097 (dst_idx != DST_CDEF) &&
1098 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1101 /* pdp_new contains rate * time ... eg the bytes transferred during
1102 * the interval. Doing it this way saves a lot of math operations
1107 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1108 if ((updvals[ds_idx + 1][ii] < '0'
1109 || updvals[ds_idx + 1][ii] > '9')
1110 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1111 rrd_set_error("not a simple integer: '%s'",
1112 updvals[ds_idx + 1]);
1116 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1118 rrd_diff(updvals[ds_idx + 1],
1119 rrd->pdp_prep[ds_idx].last_ds);
1120 if (dst_idx == DST_COUNTER) {
1121 /* simple overflow catcher. This will fail
1122 * terribly for non 32 or 64 bit counters
1123 * ... are there any others in SNMP land?
1125 if (pdp_new[ds_idx] < (double) 0.0)
1126 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1127 if (pdp_new[ds_idx] < (double) 0.0)
1128 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1130 rate = pdp_new[ds_idx] / interval;
1132 pdp_new[ds_idx] = DNAN;
1136 old_locale = setlocale(LC_NUMERIC, "C");
1138 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1139 setlocale(LC_NUMERIC, old_locale);
1141 rrd_set_error("converting '%s' to float: %s",
1142 updvals[ds_idx + 1], rrd_strerror(errno));
1145 if (endptr[0] != '\0') {
1147 ("conversion of '%s' to float not complete: tail '%s'",
1148 updvals[ds_idx + 1], endptr);
1151 rate = pdp_new[ds_idx] / interval;
1155 old_locale = setlocale(LC_NUMERIC, "C");
1157 strtod(updvals[ds_idx + 1], &endptr) * interval;
1158 setlocale(LC_NUMERIC, old_locale);
1160 rrd_set_error("converting '%s' to float: %s",
1161 updvals[ds_idx + 1], rrd_strerror(errno));
1164 if (endptr[0] != '\0') {
1166 ("conversion of '%s' to float not complete: tail '%s'",
1167 updvals[ds_idx + 1], endptr);
1170 rate = pdp_new[ds_idx] / interval;
1173 rrd_set_error("rrd contains unknown DS type : '%s'",
1174 rrd->ds_def[ds_idx].dst);
1177 /* break out of this for loop if the error string is set */
1178 if (rrd_test_error()) {
1181 /* make sure pdp_temp is neither too large or too small
1182 * if any of these occur it becomes unknown ...
1183 * sorry folks ... */
1185 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1186 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1187 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1188 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1189 pdp_new[ds_idx] = DNAN;
1192 /* no news is news all the same */
1193 pdp_new[ds_idx] = DNAN;
1197 /* make a copy of the command line argument for the next run */
1199 fprintf(stderr, "prep ds[%lu]\t"
1203 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1206 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1208 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1214 * How many PDP steps have elapsed since the last update? Returns the answer,
1215 * and stores the time between the last update and the last PDP in pre_time,
1216 * and the time between the last PDP and the current time in post_int.
1218 static int calculate_elapsed_steps(
1220 unsigned long current_time,
1221 unsigned long current_time_usec,
1225 unsigned long *proc_pdp_cnt)
1227 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1228 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1230 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1231 * when it was last updated */
1232 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1234 /* when was the current pdp started */
1235 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1236 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1238 /* when did the last pdp_st occur */
1239 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1240 occu_pdp_st = current_time - occu_pdp_age;
1242 if (occu_pdp_st > proc_pdp_st) {
1243 /* OK we passed the pdp_st moment */
1244 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1245 * occurred before the latest
1247 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1248 *post_int = occu_pdp_age; /* how much after it */
1249 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1251 *pre_int = interval;
1255 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1258 printf("proc_pdp_age %lu\t"
1260 "occu_pfp_age %lu\t"
1264 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1265 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1268 /* compute the number of elapsed pdp_st moments */
1269 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1273 * Increment the PDP values by the values in pdp_new, or else initialize them.
1275 static void simple_update(
1278 rrd_value_t *pdp_new)
1282 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1283 if (isnan(pdp_new[i])) {
1284 /* this is not really accurate if we use subsecond data arrival time
1285 should have thought of it when going subsecond resolution ...
1286 sorry next format change we will have it! */
1287 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1290 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1291 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1293 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1302 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1303 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1309 * Call process_pdp_st for each DS.
1311 * Returns 0 on success, -1 on error.
1313 static int process_all_pdp_st(
1318 unsigned long elapsed_pdp_st,
1319 rrd_value_t *pdp_new,
1320 rrd_value_t *pdp_temp)
1322 unsigned long ds_idx;
1324 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1325 rate*seconds which occurred up to the last run.
1326 pdp_new[] contains rate*seconds from the latest run.
1327 pdp_temp[] will contain the rate for cdp */
1329 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1330 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1331 elapsed_pdp_st * rrd->stat_head->pdp_step,
1332 pdp_new, pdp_temp) == -1) {
1336 fprintf(stderr, "PDP UPD ds[%lu]\t"
1337 "elapsed_pdp_st %lu\t"
1340 "new_unkn_sec %5lu\n",
1344 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1345 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1352 * Process an update that occurs after one of the PDP moments.
1353 * Increments the PDP value, sets NAN if time greater than the
1354 * heartbeats have elapsed, processes CDEFs.
1356 * Returns 0 on success, -1 on error.
1358 static int process_pdp_st(
1360 unsigned long ds_idx,
1364 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1365 rrd_value_t *pdp_new,
1366 rrd_value_t *pdp_temp)
1370 /* update pdp_prep to the current pdp_st. */
1371 double pre_unknown = 0.0;
1372 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1373 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1375 rpnstack_t rpnstack; /* used for COMPUTE DS */
1377 rpnstack_init(&rpnstack);
1380 if (isnan(pdp_new[ds_idx])) {
1381 /* a final bit of unknown to be added before calculation
1382 we use a temporary variable for this so that we
1383 don't have to turn integer lines before using the value */
1384 pre_unknown = pre_int;
1386 if (isnan(scratch[PDP_val].u_val)) {
1387 scratch[PDP_val].u_val = 0;
1389 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1392 /* if too much of the pdp_prep is unknown we dump it */
1393 /* if the interval is larger thatn mrhb we get NAN */
1394 if ((interval > mrhb) ||
1395 (rrd->stat_head->pdp_step / 2.0 <
1396 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1397 pdp_temp[ds_idx] = DNAN;
1399 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1400 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1404 /* process CDEF data sources; remember each CDEF DS can
1405 * only reference other DS with a lower index number */
1406 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1410 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1411 /* substitute data values for OP_VARIABLE nodes */
1412 for (i = 0; rpnp[i].op != OP_END; i++) {
1413 if (rpnp[i].op == OP_VARIABLE) {
1414 rpnp[i].op = OP_NUMBER;
1415 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1418 /* run the rpn calculator */
1419 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1421 rpnstack_free(&rpnstack);
1426 /* make pdp_prep ready for the next run */
1427 if (isnan(pdp_new[ds_idx])) {
1428 /* this is not realy accurate if we use subsecond data arival time
1429 should have thought of it when going subsecond resolution ...
1430 sorry next format change we will have it! */
1431 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1432 scratch[PDP_val].u_val = DNAN;
1434 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1435 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1437 rpnstack_free(&rpnstack);
1442 * Iterate over all the RRAs for a given DS and:
1443 * 1. Decide whether to schedule a smooth later
1444 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1447 * Returns 0 on success, -1 on error
1449 static int update_all_cdp_prep(
1451 unsigned long *rra_step_cnt,
1452 unsigned long rra_begin,
1453 rrd_file_t *rrd_file,
1454 unsigned long elapsed_pdp_st,
1455 unsigned long proc_pdp_cnt,
1456 rrd_value_t **last_seasonal_coef,
1457 rrd_value_t **seasonal_coef,
1458 rrd_value_t *pdp_temp,
1459 unsigned long *rra_current,
1460 unsigned long *skip_update,
1461 int *schedule_smooth)
1463 unsigned long rra_idx;
1465 /* index into the CDP scratch array */
1466 enum cf_en current_cf;
1467 unsigned long rra_start;
1469 /* number of rows to be updated in an RRA for a data value. */
1470 unsigned long start_pdp_offset;
1472 rra_start = rra_begin;
1473 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1474 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1476 rrd->rra_def[rra_idx].pdp_cnt -
1477 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1478 skip_update[rra_idx] = 0;
1479 if (start_pdp_offset <= elapsed_pdp_st) {
1480 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1481 rrd->rra_def[rra_idx].pdp_cnt + 1;
1483 rra_step_cnt[rra_idx] = 0;
1486 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1487 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1488 * so that they will be correct for the next observed value; note that for
1489 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1490 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1491 if (rra_step_cnt[rra_idx] > 1) {
1492 skip_update[rra_idx] = 1;
1493 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1494 elapsed_pdp_st, last_seasonal_coef);
1495 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1496 elapsed_pdp_st + 1, seasonal_coef);
1498 /* periodically run a smoother for seasonal effects */
1499 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1502 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1503 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1504 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1507 *schedule_smooth = 1;
1509 *rra_current = rrd_tell(rrd_file);
1511 if (rrd_test_error())
1515 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1516 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1517 current_cf) == -1) {
1521 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1522 sizeof(rrd_value_t);
1528 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1530 static int do_schedule_smooth(
1532 unsigned long rra_idx,
1533 unsigned long elapsed_pdp_st)
1535 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1536 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1537 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1538 unsigned long seasonal_smooth_idx =
1539 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1540 unsigned long *init_seasonal =
1541 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1543 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1544 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1545 * really an RRA level, not a data source within RRA level parameter, but
1546 * the rra_def is read only for rrd_update (not flushed to disk). */
1547 if (*init_seasonal > BURNIN_CYCLES) {
1548 /* someone has no doubt invented a trick to deal with this wrap around,
1549 * but at least this code is clear. */
1550 if (seasonal_smooth_idx > cur_row) {
1551 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1552 * between PDP and CDP */
1553 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1555 /* can't rely on negative numbers because we are working with
1556 * unsigned values */
1557 return (cur_row + elapsed_pdp_st >= row_cnt
1558 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1560 /* mark off one of the burn-in cycles */
1561 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1565 * For a given RRA, iterate over the data sources and call the appropriate
1566 * consolidation function.
1568 * Returns 0 on success, -1 on error.
1570 static int update_cdp_prep(
1572 unsigned long elapsed_pdp_st,
1573 unsigned long start_pdp_offset,
1574 unsigned long *rra_step_cnt,
1576 rrd_value_t *pdp_temp,
1577 rrd_value_t *last_seasonal_coef,
1578 rrd_value_t *seasonal_coef,
1581 unsigned long ds_idx, cdp_idx;
1583 /* update CDP_PREP areas */
1584 /* loop over data soures within each RRA */
1585 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1587 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1589 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1590 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1591 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1592 elapsed_pdp_st, start_pdp_offset,
1593 rrd->rra_def[rra_idx].pdp_cnt,
1594 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1597 /* Nothing to consolidate if there's one PDP per CDP. However, if
1598 * we've missed some PDPs, let's update null counters etc. */
1599 if (elapsed_pdp_st > 2) {
1600 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1601 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1606 if (rrd_test_error())
1608 } /* endif data sources loop */
1613 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1614 * primary value, secondary value, and # of unknowns.
1616 static void update_cdp(
1619 rrd_value_t pdp_temp_val,
1620 unsigned long rra_step_cnt,
1621 unsigned long elapsed_pdp_st,
1622 unsigned long start_pdp_offset,
1623 unsigned long pdp_cnt,
1628 /* shorthand variables */
1629 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1630 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1631 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1632 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1635 /* If we are in this block, as least 1 CDP value will be written to
1636 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1637 * to be written, then the "fill in" value is the CDP_secondary_val
1639 if (isnan(pdp_temp_val)) {
1640 *cdp_unkn_pdp_cnt += start_pdp_offset;
1641 *cdp_secondary_val = DNAN;
1643 /* CDP_secondary value is the RRA "fill in" value for intermediary
1644 * CDP data entries. No matter the CF, the value is the same because
1645 * the average, max, min, and last of a list of identical values is
1646 * the same, namely, the value itself. */
1647 *cdp_secondary_val = pdp_temp_val;
1650 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1651 *cdp_primary_val = DNAN;
1652 if (current_cf == CF_AVERAGE) {
1654 initialize_average_carry_over(pdp_temp_val,
1656 start_pdp_offset, pdp_cnt);
1658 *cdp_val = pdp_temp_val;
1661 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1662 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1663 } /* endif meets xff value requirement for a valid value */
1664 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1665 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1666 if (isnan(pdp_temp_val))
1667 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1669 *cdp_unkn_pdp_cnt = 0;
1670 } else { /* rra_step_cnt[i] == 0 */
1673 if (isnan(*cdp_val)) {
1674 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1677 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1681 if (isnan(pdp_temp_val)) {
1682 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1685 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1692 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1693 * on the type of consolidation function.
1695 static void initialize_cdp_val(
1698 rrd_value_t pdp_temp_val,
1699 unsigned long elapsed_pdp_st,
1700 unsigned long start_pdp_offset,
1701 unsigned long pdp_cnt)
1703 rrd_value_t cum_val, cur_val;
1705 switch (current_cf) {
1707 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1708 cur_val = IFDNAN(pdp_temp_val, 0.0);
1709 scratch[CDP_primary_val].u_val =
1710 (cum_val + cur_val * start_pdp_offset) /
1711 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1712 scratch[CDP_val].u_val =
1713 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1714 start_pdp_offset, pdp_cnt);
1717 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1718 cur_val = IFDNAN(pdp_temp_val, -DINF);
1721 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1723 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1729 if (cur_val > cum_val)
1730 scratch[CDP_primary_val].u_val = cur_val;
1732 scratch[CDP_primary_val].u_val = cum_val;
1733 /* initialize carry over value */
1734 scratch[CDP_val].u_val = pdp_temp_val;
1737 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1738 cur_val = IFDNAN(pdp_temp_val, DINF);
1741 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1743 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1749 if (cur_val < cum_val)
1750 scratch[CDP_primary_val].u_val = cur_val;
1752 scratch[CDP_primary_val].u_val = cum_val;
1753 /* initialize carry over value */
1754 scratch[CDP_val].u_val = pdp_temp_val;
1758 scratch[CDP_primary_val].u_val = pdp_temp_val;
1759 /* initialize carry over value */
1760 scratch[CDP_val].u_val = pdp_temp_val;
1766 * Update the consolidation function for Holt-Winters functions as
1767 * well as other functions that don't actually consolidate multiple
1770 static void reset_cdp(
1772 unsigned long elapsed_pdp_st,
1773 rrd_value_t *pdp_temp,
1774 rrd_value_t *last_seasonal_coef,
1775 rrd_value_t *seasonal_coef,
1779 enum cf_en current_cf)
1781 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1783 switch (current_cf) {
1786 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1787 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1790 case CF_DEVSEASONAL:
1791 /* need to update cached seasonal values, so they are consistent
1792 * with the bulk update */
1793 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1794 * CDP_last_deviation are the same. */
1795 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1796 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1800 /* need to update the null_count and last_null_count.
1801 * even do this for non-DNAN pdp_temp because the
1802 * algorithm is not learning from batch updates. */
1803 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1804 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1807 scratch[CDP_primary_val].u_val = DNAN;
1808 scratch[CDP_secondary_val].u_val = DNAN;
1811 /* do not count missed bulk values as failures */
1812 scratch[CDP_primary_val].u_val = 0;
1813 scratch[CDP_secondary_val].u_val = 0;
1814 /* need to reset violations buffer.
1815 * could do this more carefully, but for now, just
1816 * assume a bulk update wipes away all violations. */
1817 erase_violations(rrd, cdp_idx, rra_idx);
1822 static rrd_value_t initialize_average_carry_over(
1823 rrd_value_t pdp_temp_val,
1824 unsigned long elapsed_pdp_st,
1825 unsigned long start_pdp_offset,
1826 unsigned long pdp_cnt)
1828 /* initialize carry over value */
1829 if (isnan(pdp_temp_val)) {
1832 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1836 * Update or initialize a CDP value based on the consolidation
1839 * Returns the new value.
1841 static rrd_value_t calculate_cdp_val(
1842 rrd_value_t cdp_val,
1843 rrd_value_t pdp_temp_val,
1844 unsigned long elapsed_pdp_st,
1855 if (isnan(cdp_val)) {
1856 if (current_cf == CF_AVERAGE) {
1857 pdp_temp_val *= elapsed_pdp_st;
1860 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1861 i, ii, pdp_temp_val);
1863 return pdp_temp_val;
1865 if (current_cf == CF_AVERAGE)
1866 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1867 if (current_cf == CF_MINIMUM)
1868 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1869 if (current_cf == CF_MAXIMUM)
1870 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1872 return pdp_temp_val;
1876 * For each RRA, update the seasonal values and then call update_aberrant_CF
1877 * for each data source.
1879 * Return 0 on success, -1 on error.
1881 static int update_aberrant_cdps(
1883 rrd_file_t *rrd_file,
1884 unsigned long rra_begin,
1885 unsigned long *rra_current,
1886 unsigned long elapsed_pdp_st,
1887 rrd_value_t *pdp_temp,
1888 rrd_value_t **seasonal_coef)
1890 unsigned long rra_idx, ds_idx, j;
1892 /* number of PDP steps since the last update that
1893 * are assigned to the first CDP to be generated
1894 * since the last update. */
1895 unsigned short scratch_idx;
1896 unsigned long rra_start;
1897 enum cf_en current_cf;
1899 /* this loop is only entered if elapsed_pdp_st < 3 */
1900 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1901 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1902 rra_start = rra_begin;
1903 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1904 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1905 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1906 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1907 if (scratch_idx == CDP_primary_val) {
1908 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1909 elapsed_pdp_st + 1, seasonal_coef);
1911 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1912 elapsed_pdp_st + 2, seasonal_coef);
1914 *rra_current = rrd_tell(rrd_file);
1916 if (rrd_test_error())
1918 /* loop over data soures within each RRA */
1919 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1920 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1921 rra_idx * (rrd->stat_head->ds_cnt) +
1922 ds_idx, rra_idx, ds_idx, scratch_idx,
1926 rra_start += rrd->rra_def[rra_idx].row_cnt
1927 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1934 * Move sequentially through the file, writing one RRA at a time. Note this
1935 * architecture divorces the computation of CDP with flushing updated RRA
1938 * Return 0 on success, -1 on error.
1940 static int write_to_rras(
1942 rrd_file_t *rrd_file,
1943 unsigned long *rra_step_cnt,
1944 unsigned long rra_begin,
1945 unsigned long *rra_current,
1946 time_t current_time,
1947 unsigned long *skip_update,
1948 rrd_info_t ** pcdp_summary)
1950 unsigned long rra_idx;
1951 unsigned long rra_start;
1952 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1953 time_t rra_time = 0; /* time of update for a RRA */
1955 /* Ready to write to disk */
1956 rra_start = rra_begin;
1957 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1958 /* skip unless there's something to write */
1959 if (rra_step_cnt[rra_idx]) {
1960 /* write the first row */
1962 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1964 rrd->rra_ptr[rra_idx].cur_row++;
1965 if (rrd->rra_ptr[rra_idx].cur_row >=
1966 rrd->rra_def[rra_idx].row_cnt)
1967 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1968 /* position on the first row */
1969 rra_pos_tmp = rra_start +
1970 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1971 sizeof(rrd_value_t);
1972 if (rra_pos_tmp != *rra_current) {
1973 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1974 rrd_set_error("seek error in rrd");
1977 *rra_current = rra_pos_tmp;
1980 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1982 if (!skip_update[rra_idx]) {
1983 if (*pcdp_summary != NULL) {
1984 rra_time = (current_time - current_time
1985 % (rrd->rra_def[rra_idx].pdp_cnt *
1986 rrd->stat_head->pdp_step))
1988 ((rra_step_cnt[rra_idx] -
1989 1) * rrd->rra_def[rra_idx].pdp_cnt *
1990 rrd->stat_head->pdp_step);
1993 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1994 pcdp_summary, rra_time) == -1)
1998 /* write other rows of the bulk update, if any */
1999 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
2000 if (++rrd->rra_ptr[rra_idx].cur_row ==
2001 rrd->rra_def[rra_idx].row_cnt) {
2004 "Wraparound for RRA %s, %lu updates left\n",
2005 rrd->rra_def[rra_idx].cf_nam,
2006 rra_step_cnt[rra_idx] - 1);
2009 rrd->rra_ptr[rra_idx].cur_row = 0;
2010 /* seek back to beginning of current rra */
2011 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
2012 rrd_set_error("seek error in rrd");
2016 fprintf(stderr, " -- Wraparound Postseek %ld\n",
2019 *rra_current = rra_start;
2021 if (!skip_update[rra_idx]) {
2022 if (*pcdp_summary != NULL) {
2023 rra_time = (current_time - current_time
2024 % (rrd->rra_def[rra_idx].pdp_cnt *
2025 rrd->stat_head->pdp_step))
2027 ((rra_step_cnt[rra_idx] -
2028 2) * rrd->rra_def[rra_idx].pdp_cnt *
2029 rrd->stat_head->pdp_step);
2031 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2032 CDP_secondary_val, pcdp_summary,
2038 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2039 sizeof(rrd_value_t);
2046 * Write out one row of values (one value per DS) to the archive.
2048 * Returns 0 on success, -1 on error.
2050 static int write_RRA_row(
2051 rrd_file_t *rrd_file,
2053 unsigned long rra_idx,
2054 unsigned long *rra_current,
2055 unsigned short CDP_scratch_idx,
2056 rrd_info_t ** pcdp_summary,
2059 unsigned long ds_idx, cdp_idx;
2062 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2063 /* compute the cdp index */
2064 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2066 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2067 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2068 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2070 if (*pcdp_summary != NULL) {
2071 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2072 /* append info to the return hash */
2073 *pcdp_summary = rrd_info_push(*pcdp_summary,
2075 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2076 rrd->rra_def[rra_idx].cf_nam,
2077 rrd->rra_def[rra_idx].pdp_cnt,
2078 rrd->ds_def[ds_idx].ds_nam),
2081 if (rrd_write(rrd_file,
2082 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2083 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2084 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2087 *rra_current += sizeof(rrd_value_t);
2093 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2095 * Returns 0 on success, -1 otherwise
2097 static int smooth_all_rras(
2099 rrd_file_t *rrd_file,
2100 unsigned long rra_begin)
2102 unsigned long rra_start = rra_begin;
2103 unsigned long rra_idx;
2105 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2106 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2107 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2109 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2111 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2112 if (rrd_test_error())
2115 rra_start += rrd->rra_def[rra_idx].row_cnt
2116 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2123 * Flush changes to disk (unless we're using mmap)
2125 * Returns 0 on success, -1 otherwise
2127 static int write_changes_to_disk(
2129 rrd_file_t *rrd_file,
2132 /* we just need to write back the live header portion now */
2133 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2134 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2135 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2137 rrd_set_error("seek rrd for live header writeback");
2141 if (rrd_write(rrd_file, rrd->live_head,
2142 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2143 rrd_set_error("rrd_write live_head to rrd");
2147 if (rrd_write(rrd_file, rrd->legacy_last_up,
2148 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2149 rrd_set_error("rrd_write live_head to rrd");
2155 if (rrd_write(rrd_file, rrd->pdp_prep,
2156 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2157 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2158 rrd_set_error("rrd_write pdp_prep to rrd");
2162 if (rrd_write(rrd_file, rrd->cdp_prep,
2163 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2164 rrd->stat_head->ds_cnt)
2165 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2166 rrd->stat_head->ds_cnt)) {
2168 rrd_set_error("rrd_write cdp_prep to rrd");
2172 if (rrd_write(rrd_file, rrd->rra_ptr,
2173 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2174 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2175 rrd_set_error("rrd_write rra_ptr to rrd");