2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
/* (review) Win32 replacement for gettimeofday(): fills seconds/microseconds
 * from _ftime()'s millisecond-resolution clock.  Several lines of this
 * function are missing from this extraction (first parameter, braces,
 * return statement) — confirm against the full source.
 * FIX: "_ftime(¤t_time)" was mojibake — the "&curren" of "&current_time"
 * had been swallowed by the HTML entity &curren; (= ¤).  The address-of
 * argument is restored below. */
48 static int gettimeofday(
50     struct __timezone *tz)
53     struct _timeb current_time;
55     _ftime(&current_time);
57     t->tv_sec = current_time.time;
58     t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 unsigned long *rra_current,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
112 unsigned long *skip_update,
113 int *schedule_smooth);
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
125 static int get_time_from_reading(
129 time_t *current_time,
130 unsigned long *current_time_usec,
133 static int update_pdp_prep(
136 rrd_value_t *pdp_new,
139 static int calculate_elapsed_steps(
141 unsigned long current_time,
142 unsigned long current_time_usec,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
164 unsigned long ds_idx,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *rra_current,
183 unsigned long *skip_update,
184 int *schedule_smooth);
186 static int do_schedule_smooth(
188 unsigned long rra_idx,
189 unsigned long elapsed_pdp_st);
191 static int update_cdp_prep(
193 unsigned long elapsed_pdp_st,
194 unsigned long start_pdp_offset,
195 unsigned long *rra_step_cnt,
197 rrd_value_t *pdp_temp,
198 rrd_value_t *last_seasonal_coef,
199 rrd_value_t *seasonal_coef,
202 static void update_cdp(
205 rrd_value_t pdp_temp_val,
206 unsigned long rra_step_cnt,
207 unsigned long elapsed_pdp_st,
208 unsigned long start_pdp_offset,
209 unsigned long pdp_cnt,
214 static void initialize_cdp_val(
217 rrd_value_t pdp_temp_val,
218 unsigned long elapsed_pdp_st,
219 unsigned long start_pdp_offset,
220 unsigned long pdp_cnt);
222 static void reset_cdp(
224 unsigned long elapsed_pdp_st,
225 rrd_value_t *pdp_temp,
226 rrd_value_t *last_seasonal_coef,
227 rrd_value_t *seasonal_coef,
231 enum cf_en current_cf);
233 static rrd_value_t initialize_average_carry_over(
234 rrd_value_t pdp_temp_val,
235 unsigned long elapsed_pdp_st,
236 unsigned long start_pdp_offset,
237 unsigned long pdp_cnt);
239 static rrd_value_t calculate_cdp_val(
241 rrd_value_t pdp_temp_val,
242 unsigned long elapsed_pdp_st,
247 static int update_aberrant_cdps(
249 rrd_file_t *rrd_file,
250 unsigned long rra_begin,
251 unsigned long *rra_current,
252 unsigned long elapsed_pdp_st,
253 rrd_value_t *pdp_temp,
254 rrd_value_t **seasonal_coef);
256 static int write_to_rras(
258 rrd_file_t *rrd_file,
259 unsigned long *rra_step_cnt,
260 unsigned long rra_begin,
261 unsigned long *rra_current,
263 unsigned long *skip_update,
264 rrd_info_t ** pcdp_summary);
266 static int write_RRA_row(
267 rrd_file_t *rrd_file,
269 unsigned long rra_idx,
270 unsigned long *rra_current,
271 unsigned short CDP_scratch_idx,
272 rrd_info_t ** pcdp_summary,
275 static int smooth_all_rras(
277 rrd_file_t *rrd_file,
278 unsigned long rra_begin);
281 static int write_changes_to_disk(
283 rrd_file_t *rrd_file,
288  * normalize time as returned by gettimeofday. usec part must
/* (review) the remainder of this comment and the function body are missing
 * from this extraction; the visible check handles a negative tv_usec,
 * presumably by borrowing one second from tv_sec — confirm against the
 * full source. */
291 static inline void normalize_time(
294     if (t->tv_usec < 0) {
301  * Sets current_time and current_time_usec based on the current time.
302  * current_time_usec is set to 0 if the version number is 1 or 2.
/* (review) lines are missing from this extraction (the version parameter,
 * braces, and the version check that selects between the two
 * *current_time_usec assignments below) — confirm against the full source. */
304 static inline void initialize_time(
305     time_t *current_time,
306     unsigned long *current_time_usec,
309     struct timeval tmp_time;    /* used for time conversion */
311     gettimeofday(&tmp_time, 0);
312     normalize_time(&tmp_time);
313     *current_time = tmp_time.tv_sec;
315     *current_time_usec = tmp_time.tv_usec;
317     *current_time_usec = 0;
/* (review) Forward the update values to a running rrdcached daemon via
 * rrdc_connect()/rrdc_update().  When `silent` is set the error strings are
 * presumably suppressed — the branches and return paths are missing from
 * this extraction, so confirm against the full source. */
321 static int send_values_to_daemon (const char *addr, const char *file,
322     int values_num, const char * const *values, int silent)
326     status = rrdc_connect ((addr != NULL) ? addr : RRDD_SOCK_PATH);
331         rrd_set_error("Unable to connect to daemon: %s",
334             : rrd_strerror (status));
339     status = rrdc_update (file, values_num, values);
344         rrd_set_error("Failed sending the values to the daemon: %s",
347             : rrd_strerror (status));
355 } /* int send_values_to_daemon */
/* IFDNAN(X, Y): evaluate to X unless X is NaN, in which case Y.
 * FIX: the original definition carried a stray trailing semicolon, which
 * turns every expansion into two statements and breaks the macro inside
 * expressions or unbraced if/else bodies; it has been removed.  (The
 * stray "357" line-number prefix from the corrupted listing is dropped
 * as well.) */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/* (review) Public entry point: like rrd_update() but returns an rrd_info_t
 * chain whose first node ("return_value") carries _rrd_update()'s return
 * code.  Option parsing handles only --template/-t here.  Many lines
 * (parameters, switch cases, error paths, return) are missing from this
 * extraction — confirm against the full source. */
359 rrd_info_t *rrd_update_v(
364     rrd_info_t *result = NULL;
366     struct option long_options[] = {
367         {"template", required_argument, 0, 't'},
373     opterr = 0;         /* initialize getopt */
376         int option_index = 0;
379         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
390             rrd_set_error("unknown option '%s'", argv[optind - 1]);
395     /* need at least 2 arguments: filename, data. */
396     if (argc - optind < 2) {
397         rrd_set_error("Not enough arguments");
/* (review) the "return_value" node is pushed first so it heads the info
 * chain, then patched with the real rc after _rrd_update() runs. */
401     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
402     rc.u_int = _rrd_update(argv[optind], tmplt,
404                            (const char **) (argv + optind + 1), result);
405     result->value.u_int = rc.u_int;
/* (review) Body of the public rrd_update() entry point (its signature and
 * several option-handling lines are missing from this extraction).  Parses
 * --template/--cache/--nocache/--daemon, optionally hands the values to a
 * running rrdcached via send_values_to_daemon(), and otherwise falls back
 * to the local rrd_update_r() path.  force_cache: 0 = default; -1 = force
 * no cache; 1 = force cache (see the inline comment below). */
414     struct option long_options[] = {
415         {"template", required_argument, 0, 't'},
416         {"cache", optional_argument, 0, 'c'},
417         {"nocache", no_argument , 0, 'n'},
418         {"daemon", required_argument, 0, 'd'},
421     int option_index = 0;
425     /* force_cache: 0 = default; -1 = force no cache; 1 = force cache */
427     char *daemon_address = NULL;
430     opterr = 0;  /* initialize getopt */
433         opt = getopt_long(argc, argv, "t:cnd:", long_options, &option_index);
440             tmplt = strdup(optarg);
446             if (strcmp ("no", optarg) == 0)
448             else if (strcmp ("yes", optarg) == 0)
452                 rrd_set_error ("option 'cache' must be "
453                     "either 'yes' or 'no'.");
/* (review) a repeated --daemon replaces the previous address. */
466             if (daemon_address != NULL)
467                 free (daemon_address);
468             daemon_address = strdup (optarg);
469             if (daemon_address == NULL)
471                 rrd_set_error("strdup failed.");
477             rrd_set_error("unknown option '%s'", argv[optind - 1]);
482     /* need at least 2 arguments: filename, data. */
483     if (argc - optind < 2) {
484         rrd_set_error("Not enough arguments");
488     if ((tmplt != NULL) && (force_cache > 0))
490         rrd_set_error("The caching daemon cannot be used together with "
495     if ((tmplt == NULL) && (force_cache >= 0))
499         status = send_values_to_daemon (daemon_address,
500             /* file = */ argv[optind],
501             /* values_num = */ argc - optind - 1,
502             /* values = */ (void *) (argv + optind + 1),
503             /* silent = */ (force_cache > 0) ? 0 : 1);
504         if ((status == 0) || (force_cache > 0))
506     } /* if ((tmplt == NULL) && (force_cache >= 0)) */
508     rc = rrd_update_r(argv[optind], tmplt,
509                       argc - optind - 1, (const char **) (argv + optind + 1));
516     if (daemon_address != NULL)
518         free (daemon_address);
519     daemon_address = NULL;
/* (review) tail of rrd_update_r() — the thread-safe entry point; it simply
 * delegates to _rrd_update() with no pcdp_summary.  Its signature lines
 * are missing from this extraction. */
525     const char *filename,
530     return _rrd_update(filename, tmplt, argc, argv, NULL);
/* (review) _rrd_update(): core implementation behind rrd_update(),
 * rrd_update_v() and rrd_update_r().  Opens and write-locks the RRD file,
 * allocates the working buffers, applies each "time:value[:value...]"
 * argument via process_arg(), writes the result back to disk, and may
 * schedule one seasonal-smoothing pass per call.  Many interior lines
 * (parts of the signature, braces, the error-cleanup labels) are missing
 * from this extraction.
 * FIX: "&current_time" / "&current_time_usec" had been mangled into
 * "¤t_time" / "¤t_time_usec" by the HTML entity &curren; (= ¤) at two
 * call sites (initialize_time and process_arg); the address-of arguments
 * are restored below.  All other lines are unchanged. */
534 const char *filename,
538     rrd_info_t * pcdp_summary)
543     unsigned long rra_begin; /* byte pointer to the rra
544                               * area in the rrd file. this
545                               * pointer never changes value */
546     unsigned long rra_current; /* byte pointer to the current write
547                                 * spot in the rrd file. */
548     rrd_value_t *pdp_new; /* prepare the incoming data to be added
549                            * to the existing entry */
550     rrd_value_t *pdp_temp; /* prepare the pdp values to be added
551                             * to the cdp values */
553     long *tmpl_idx; /* index representing the settings
554                      * transported by the tmplt index */
555     unsigned long tmpl_cnt = 2; /* time and data */
557     time_t current_time = 0;
558     unsigned long current_time_usec = 0; /* microseconds part of current time */
560     int schedule_smooth = 0;
562     /* number of elapsed PDP steps since last update */
563     unsigned long *rra_step_cnt = NULL;
565     int version; /* rrd version */
566     rrd_file_t *rrd_file;
567     char *arg_copy; /* for processing the argv */
568     unsigned long *skip_update; /* RRAs to advance but not write */
570     /* need at least 1 arguments: data. */
572         rrd_set_error("Not enough arguments");
576     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
579     /* We are now at the beginning of the rra's */
580     rra_current = rra_begin = rrd_file->header_len;
582     version = atoi(rrd.stat_head->version);
584     initialize_time(&current_time, &current_time_usec, version);
586     /* get exclusive lock to whole file.
587      * lock gets removed when we close the file.
589     if (rrd_lock(rrd_file) != 0) {
590         rrd_set_error("could not lock RRD");
594     if (allocate_data_structures(&rrd, &updvals,
595                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
596                                  &rra_step_cnt, &skip_update,
601     /* loop through the arguments. */
602     for (arg_i = 0; arg_i < argc; arg_i++) {
603         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
604             rrd_set_error("failed duplication argv entry");
607         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
608                         &current_time, &current_time_usec, pdp_temp, pdp_new,
609                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
610                         &pcdp_summary, version, skip_update,
611                         &schedule_smooth) == -1) {
620     /* if we got here and if there is an error and if the file has not been
621      * written to, then close things up and return. */
622     if (rrd_test_error()) {
623         goto err_free_structures;
626     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
627         goto err_free_structures;
631     /* calling the smoothing code here guarantees at most one smoothing
632      * operation per rrd_update call. Unfortunately, it is possible with bulk
633      * updates, or a long-delayed update for smoothing to occur off-schedule.
634      * This really isn't critical except during the burn-in cycles. */
635     if (schedule_smooth) {
636         smooth_all_rras(&rrd, rrd_file, rra_begin);
639     /* rrd_dontneed(rrd_file,&rrd); */
665  * get exclusive lock to whole file.
666  * lock gets removed when we close the file
668  * returns 0 on success
/* (review) the function signature and the non-win32 declaration lines are
 * missing from this extraction.  Win32 path uses _locking() over the whole
 * file size; POSIX path uses a non-blocking fcntl(F_SETLK) write lock. */
676 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
679     if (_fstat(file->fd, &st) == 0) {
680         rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
687     lock.l_type = F_WRLCK;    /* exclusive write lock */
688     lock.l_len = 0;   /* whole file */
689     lock.l_start = 0; /* start of file */
690     lock.l_whence = SEEK_SET;   /* offsets are relative to start of file (the original "end of file" comment was wrong) */
692     rcstat = fcntl(file->fd, F_SETLK, &lock);
700  * Allocate some important arrays used, and initialize the template.
702  * When it returns, either all of the structures are allocated
703  * or none of them are.
705  * Returns 0 on success, -1 on error.
/* (review) classic goto-cleanup ladder: each allocation failure unwinds the
 * earlier ones.  Several lines (parameters, closing braces, the remaining
 * error labels) are missing from this extraction. */
707 static int allocate_data_structures(
710     rrd_value_t **pdp_temp,
713     unsigned long *tmpl_cnt,
714     unsigned long **rra_step_cnt,
715     unsigned long **skip_update,
716     rrd_value_t **pdp_new)
719     if ((*updvals = (char **) malloc(sizeof(char *)
720         * (rrd->stat_head->ds_cnt + 1))) == NULL) {
721         rrd_set_error("allocating updvals pointer array.");
724     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
725         * rrd->stat_head->ds_cnt)) ==
727         rrd_set_error("allocating pdp_temp.");
728         goto err_free_updvals;
730     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
732         rrd->stat_head->rra_cnt)) ==
734         rrd_set_error("allocating skip_update.");
735         goto err_free_pdp_temp;
/* (review) note: allocates a `long *` array with sizeof(unsigned long) —
 * same size on all supported platforms, but the types are inconsistent. */
737     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
738         * (rrd->stat_head->ds_cnt + 1))) == NULL) {
739         rrd_set_error("allocating tmpl_idx.");
740         goto err_free_skip_update;
742     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
745         rra_cnt))) == NULL) {
746         rrd_set_error("allocating rra_step_cnt.");
747         goto err_free_tmpl_idx;
750     /* initialize tmplt redirector */
751     /* default config example (assume DS 1 is a CDEF DS)
752        tmpl_idx[0] -> 0; (time)
753        tmpl_idx[1] -> 1; (DS 0)
754        tmpl_idx[2] -> 3; (DS 2)
755        tmpl_idx[3] -> 4; (DS 3) */
756     (*tmpl_idx)[0] = 0; /* time */
757     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
758         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
759             (*tmpl_idx)[ii++] = i;
764     if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
765         goto err_free_rra_step_cnt;
769     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
770         * rrd->stat_head->ds_cnt)) == NULL) {
771         rrd_set_error("allocating pdp_new.");
772         goto err_free_rra_step_cnt;
777   err_free_rra_step_cnt:
781   err_free_skip_update:
791  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
793  * Returns 0 on success.
/* (review) walks a writable copy of the colon-separated template, replacing
 * each ':' with NUL so every DS name can be matched with ds_match().  Index
 * 0 of tmpl_idx is reserved for the time field, hence the +1 offsets.
 * Several lines (parameters, braces, the cleanup label) are missing from
 * this extraction. */
795 static int parse_template(
798     unsigned long *tmpl_cnt,
801     char *dsname, *tmplt_copy;
802     unsigned int tmpl_len, i;
805     *tmpl_cnt = 1;  /* the first entry is the time */
807     /* we should work on a writeable copy here */
808     if ((tmplt_copy = strdup(tmplt)) == NULL) {
809         rrd_set_error("error copying tmplt '%s'", tmplt);
/* (review) i <= tmpl_len on purpose: the terminating NUL closes the last
 * template entry. */
815     tmpl_len = strlen(tmplt_copy);
816     for (i = 0; i <= tmpl_len; i++) {
817         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
818             tmplt_copy[i] = '\0';
819             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
820                 rrd_set_error("tmplt contains more DS definitions than RRD");
822                 goto out_free_tmpl_copy;
824             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
825                 rrd_set_error("unknown DS name '%s'", dsname);
827                 goto out_free_tmpl_copy;
829             /* go to the next entry on the tmplt_copy */
831             dsname = &tmplt_copy[i + 1];
841  * Parse an update string, updates the primary data points (PDPs)
842  * and consolidated data points (CDPs), and writes changes to the RRAs.
844  * Returns 0 on success, -1 on error.
/* (review) per-argument driver: parse_ds -> update_pdp_prep ->
 * calculate_elapsed_steps, then either simple_update (no PDP boundary
 * crossed) or the full CDP/RRA pipeline.  Several lines (some parameters,
 * braces, return statements) are missing from this extraction. */
846 static int process_arg(
849     rrd_file_t *rrd_file,
850     unsigned long rra_begin,
851     unsigned long *rra_current,
852     time_t *current_time,
853     unsigned long *current_time_usec,
854     rrd_value_t *pdp_temp,
855     rrd_value_t *pdp_new,
856     unsigned long *rra_step_cnt,
859     unsigned long tmpl_cnt,
860     rrd_info_t ** pcdp_summary,
862     unsigned long *skip_update,
863     int *schedule_smooth)
865     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
867     /* a vector of future Holt-Winters seasonal coefs */
868     unsigned long elapsed_pdp_st;
870     double interval, pre_int, post_int; /* interval between this and
872     unsigned long proc_pdp_cnt;
873     unsigned long rra_start;
875     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
876                  current_time, current_time_usec, version) == -1) {
879     /* seek to the beginning of the rra's */
880     if (*rra_current != rra_begin) {
882         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
883             rrd_set_error("seek error in rrd");
887         *rra_current = rra_begin;
889     rra_start = rra_begin;
/* (review) interval is the (possibly fractional) seconds since last_up. */
891     interval = (double) (*current_time - rrd->live_head->last_up)
892         + (double) ((long) *current_time_usec -
893                     (long) rrd->live_head->last_up_usec) / 1e6f;
895     /* process the data sources and update the pdp_prep
896      * area accordingly */
897     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
901     elapsed_pdp_st = calculate_elapsed_steps(rrd,
903                                              *current_time_usec, interval,
907     /* has a pdp_st moment occurred since the last run ? */
908     if (elapsed_pdp_st == 0) {
909         /* no we have not passed a pdp_st moment. therefore update is simple */
910         simple_update(rrd, interval, pdp_new);
912         /* an pdp_st has occurred. */
913         if (process_all_pdp_st(rrd, interval,
915                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
918         if (update_all_cdp_prep(rrd, rra_step_cnt,
924                                 pdp_temp, rra_current,
925                                 skip_update, schedule_smooth) == -1) {
926             goto err_free_coefficients;
928         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
929                                  elapsed_pdp_st, pdp_temp,
930                                  &seasonal_coef) == -1) {
931             goto err_free_coefficients;
933         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
934                           rra_current, *current_time, skip_update,
935                           pcdp_summary) == -1) {
936             goto err_free_coefficients;
938     } /* endif a pdp_st has occurred */
939     rrd->live_head->last_up = *current_time;
940     rrd->live_head->last_up_usec = *current_time_usec;
943         *rrd->legacy_last_up = rrd->live_head->last_up;
/* (review) both success and error paths free the Holt-Winters coefficient
 * vectors (free(NULL) is a no-op when none were allocated). */
946     free(last_seasonal_coef);
949   err_free_coefficients:
951     free(last_seasonal_coef);
956  * Parse a DS string (time + colon-separated values), storing the
957  * results in current_time, current_time_usec, and updvals.
959  * Returns 0 on success, -1 on error.
/* (review) the function signature and several body lines are missing from
 * this extraction.  The first field may be separated by '@' (at-style time)
 * or ':' (plain timestamp); subsequent fields are routed through tmpl_idx
 * so template-reordered values land on the right DS. */
966     unsigned long tmpl_cnt,
967     time_t *current_time,
968     unsigned long *current_time_usec,
976     /* initialize all ds input to unknown except the first one
977        which has always got to be set */
978     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
981     /* separate all ds elements; first must be examined separately
982        due to alternate time syntax */
983     if ((p = strchr(input, '@')) != NULL) {
985     } else if ((p = strchr(input, ':')) != NULL) {
988         rrd_set_error("expected timestamp not found in data source from %s",
994     updvals[tmpl_idx[i++]] = p + 1;
999             updvals[tmpl_idx[i++]] = p + 1;
/* (review) every template slot must have received a value. */
1004     if (i != tmpl_cnt) {
1005         rrd_set_error("expected %lu data source readings (got %lu) from %s",
1006                       tmpl_cnt - 1, i, input);
1010     if (get_time_from_reading(rrd, timesyntax, updvals,
1011                               current_time, current_time_usec,
1019  * Parse the time in a DS string, store it in current_time and
1020  * current_time_usec and verify that it's later than the last
1021  * update for this DS.
1023  * Returns 0 on success, -1 on error.
/* (review) three time syntaxes: at-style ('@', via rrd_parsetime), the
 * literal "N" (now, from gettimeofday), and a plain numeric timestamp
 * (strtod under the "C" locale so '.' is always the decimal point).
 * Some lines (parameters, braces, returns) are missing from this
 * extraction. */
1025 static int get_time_from_reading(
1029     time_t *current_time,
1030     unsigned long *current_time_usec,
1034     char *parsetime_error = NULL;
1036     rrd_time_value_t ds_tv;
1037     struct timeval tmp_time;    /* used for time conversion */
1039     /* get the time from the reading ... handle N */
1040     if (timesyntax == '@') {    /* at-style */
1041         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1042             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1045         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1046             ds_tv.type == RELATIVE_TO_START_TIME) {
1047             rrd_set_error("specifying time relative to the 'start' "
1048                           "or 'end' makes no sense here: %s", updvals[0]);
1051         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1052         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1053     } else if (strcmp(updvals[0], "N") == 0) {
1054         gettimeofday(&tmp_time, 0);
1055         normalize_time(&tmp_time);
1056         *current_time = tmp_time.tv_sec;
1057         *current_time_usec = tmp_time.tv_usec;
1059         old_locale = setlocale(LC_NUMERIC, "C");
1060         tmp = strtod(updvals[0], 0);
1061         setlocale(LC_NUMERIC, old_locale);
1062         *current_time = floor(tmp);
1063         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1065     /* dont do any correction for old version RRDs */
1067         *current_time_usec = 0;
/* (review) enforce strictly increasing update times (sub-second ties with
 * the previous update are also rejected). */
1069     if (*current_time < rrd->live_head->last_up ||
1070         (*current_time == rrd->live_head->last_up &&
1071          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1072         rrd_set_error("illegal attempt to update using time %ld when "
1073                       "last update time is %ld (minimum one second step)",
1074                       *current_time, rrd->live_head->last_up);
1081  * Update pdp_new by interpreting the updvals according to the DS type
1082  * (COUNTER, GAUGE, etc.).
1084  * Returns 0 on success, -1 on error.
/* (review) per-DS switch on the DS type: COUNTER/DERIVE difference against
 * last_ds with 32/64-bit overflow correction, ABSOLUTE/GAUGE via strtod
 * under the "C" locale.  The switch/case framing lines and several braces
 * are missing from this extraction. */
1086 static int update_pdp_prep(
1089     rrd_value_t *pdp_new,
1092     unsigned long ds_idx;
1094     char *endptr;   /* used in the conversion */
1097     enum dst_en dst_idx;
1099     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1100         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1102         /* make sure we do not build diffs with old last_ds values */
1103         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1104             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1105             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1108         /* NOTE: DST_CDEF should never enter this if block, because
1109          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1110          * accidently specified a value for the DST_CDEF. To handle this case,
1111          * an extra check is required. */
1113         if ((updvals[ds_idx + 1][0] != 'U') &&
1114             (dst_idx != DST_CDEF) &&
1115             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1118             /* pdp_new contains rate * time ... eg the bytes transferred during
1119              * the interval. Doing it this way saves a lot of math operations
/* (review) COUNTER/DERIVE input must be a plain (optionally negative)
 * integer; anything else is rejected before the diff. */
1124                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1125                     if ((updvals[ds_idx + 1][ii] < '0'
1126                          || updvals[ds_idx + 1][ii] > '9')
1127                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1128                         rrd_set_error("not a simple integer: '%s'",
1129                                       updvals[ds_idx + 1]);
1133                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1135                         rrd_diff(updvals[ds_idx + 1],
1136                                  rrd->pdp_prep[ds_idx].last_ds);
1137                     if (dst_idx == DST_COUNTER) {
1138                         /* simple overflow catcher. This will fail
1139                          * terribly for non 32 or 64 bit counters
1140                          * ... are there any others in SNMP land?
1142                         if (pdp_new[ds_idx] < (double) 0.0)
1143                             pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1144                         if (pdp_new[ds_idx] < (double) 0.0)
1145                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1147                     rate = pdp_new[ds_idx] / interval;
1149                     pdp_new[ds_idx] = DNAN;
1153                 old_locale = setlocale(LC_NUMERIC, "C");
1155                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1156                 setlocale(LC_NUMERIC, old_locale);
1158                     rrd_set_error("converting '%s' to float: %s",
1159                                   updvals[ds_idx + 1], rrd_strerror(errno));
1162                 if (endptr[0] != '\0') {
1164                         ("conversion of '%s' to float not complete: tail '%s'",
1165                          updvals[ds_idx + 1], endptr);
1168                 rate = pdp_new[ds_idx] / interval;
1172                 old_locale = setlocale(LC_NUMERIC, "C");
1174                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1175                 setlocale(LC_NUMERIC, old_locale);
1177                     rrd_set_error("converting '%s' to float: %s",
1178                                   updvals[ds_idx + 1], rrd_strerror(errno));
1181                 if (endptr[0] != '\0') {
1183                         ("conversion of '%s' to float not complete: tail '%s'",
1184                          updvals[ds_idx + 1], endptr);
1187                 rate = pdp_new[ds_idx] / interval;
1190                 rrd_set_error("rrd contains unknown DS type : '%s'",
1191                               rrd->ds_def[ds_idx].dst);
1194             /* break out of this for loop if the error string is set */
1195             if (rrd_test_error()) {
1198             /* make sure pdp_temp is neither too large or too small
1199              * if any of these occur it becomes unknown ...
1200              * sorry folks ... */
1202                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1203                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1204                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1205                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1206                 pdp_new[ds_idx] = DNAN;
1209             /* no news is news all the same */
1210             pdp_new[ds_idx] = DNAN;
1214         /* make a copy of the command line argument for the next run */
1216             fprintf(stderr, "prep ds[%lu]\t"
1220                     ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
/* (review) explicit NUL-termination below compensates for strncpy not
 * guaranteeing it. */
1223         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1225         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1231  * How many PDP steps have elapsed since the last update? Returns the answer,
1232  * and stores the time between the last update and the last PDP in pre_time,
1233  * and the time between the last PDP and the current time in post_int.
/* (review) some lines (parameters, braces, the else branch's post_int
 * assignment, the debug #ifdef) are missing from this extraction. */
1235 static int calculate_elapsed_steps(
1237     unsigned long current_time,
1238     unsigned long current_time_usec,
1242     unsigned long *proc_pdp_cnt)
1244     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1245     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1247     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1248                                  * when it was last updated */
1249     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1251     /* when was the current pdp started */
1252     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1253     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1255     /* when did the last pdp_st occur */
1256     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1257     occu_pdp_st = current_time - occu_pdp_age;
1259     if (occu_pdp_st > proc_pdp_st) {
1260         /* OK we passed the pdp_st moment */
1261         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1262                                                                   * occurred before the latest
1264         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1265         *post_int = occu_pdp_age; /* how much after it */
1266         *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1268         *pre_int = interval;
1272     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1275     printf("proc_pdp_age %lu\t"
1277            "occu_pfp_age %lu\t"
1281            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1282            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1285     /* compute the number of elapsed pdp_st moments */
1286     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1290  * Increment the PDP values by the values in pdp_new, or else initialize them.
/* (review) taken when no PDP boundary was crossed: either accumulate the
 * new rate*time product into PDP_val, or add the whole interval to the
 * unknown-seconds counter when the value is NaN.  Some lines (parameters,
 * braces, the debug print header) are missing from this extraction. */
1292 static void simple_update(
1295     rrd_value_t *pdp_new)
1299     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1300         if (isnan(pdp_new[i])) {
1301             /* this is not really accurate if we use subsecond data arrival time
1302                should have thought of it when going subsecond resolution ...
1303                sorry next format change we will have it! */
1304             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1307             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1308                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1310                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1319                    rrd->pdp_prep[i].scratch[PDP_val].u_val,
1320                    rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1326  * Call process_pdp_st for each DS.
1328  * Returns 0 on success, -1 on error.
/* (review) some lines (parameters, braces, the debug print arguments) are
 * missing from this extraction. */
1330 static int process_all_pdp_st(
1335     unsigned long elapsed_pdp_st,
1336     rrd_value_t *pdp_new,
1337     rrd_value_t *pdp_temp)
1339     unsigned long ds_idx;
1341     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1342        rate*seconds which occurred up to the last run.
1343        pdp_new[] contains rate*seconds from the latest run.
1344        pdp_temp[] will contain the rate for cdp */
1346     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* (review) elapsed steps are converted to seconds (steps * pdp_step)
 * before being handed to process_pdp_st as diff_pdp_st. */
1347         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1348                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1349                            pdp_new, pdp_temp) == -1) {
1353         fprintf(stderr, "PDP UPD ds[%lu]\t"
1354                 "elapsed_pdp_st %lu\t"
1357                 "new_unkn_sec %5lu\n",
1361                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1362                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1369  * Process an update that occurs after one of the PDP moments.
1370  * Increments the PDP value, sets NAN if time greater than the
1371  * heartbeats have elapsed, processes CDEFs.
1373  * Returns 0 on success, -1 on error.
/* (review) some lines (parameters, braces, error paths) are missing from
 * this extraction. */
1375 static int process_pdp_st(
1377     unsigned long ds_idx,
1381     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1382     rrd_value_t *pdp_new,
1383     rrd_value_t *pdp_temp)
1387     /* update pdp_prep to the current pdp_st. */
1388     double pre_unknown = 0.0;
1389     unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1390     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1392     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1394     rpnstack_init(&rpnstack);
1397     if (isnan(pdp_new[ds_idx])) {
1398         /* a final bit of unknown to be added before calculation
1399            we use a temporary variable for this so that we
1400            don't have to turn integer lines before using the value */
1401         pre_unknown = pre_int;
1403         if (isnan(scratch[PDP_val].u_val)) {
1404             scratch[PDP_val].u_val = 0;
/* (review) pdp_new holds rate*interval; dividing by interval and scaling
 * by pre_int credits only the pre-boundary fraction of the sample. */
1406         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1409     /* if too much of the pdp_prep is unknown we dump it */
1410     /* if the interval is larger thatn mrhb we get NAN */
1411     if ((interval > mrhb) ||
1412         (rrd->stat_head->pdp_step / 2.0 <
1413          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1414         pdp_temp[ds_idx] = DNAN;
1416         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1417             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1421     /* process CDEF data sources; remember each CDEF DS can
1422      * only reference other DS with a lower index number */
1423     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1427             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1428         /* substitute data values for OP_VARIABLE nodes */
1429         for (i = 0; rpnp[i].op != OP_END; i++) {
1430             if (rpnp[i].op == OP_VARIABLE) {
1431                 rpnp[i].op = OP_NUMBER;
1432                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1435         /* run the rpn calculator */
1436         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1438             rpnstack_free(&rpnstack);
1443     /* make pdp_prep ready for the next run */
1444     if (isnan(pdp_new[ds_idx])) {
1445         /* this is not realy accurate if we use subsecond data arival time
1446            should have thought of it when going subsecond resolution ...
1447            sorry next format change we will have it! */
1448         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1449         scratch[PDP_val].u_val = DNAN;
1451         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1452         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1454     rpnstack_free(&rpnstack);
1459  * Iterate over all the RRAs for a given DS and:
1460  * 1. Decide whether to schedule a smooth later
1461  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1464  * Returns 0 on success, -1 on error
/* (review) some lines (parameters, braces, the update_cdp_prep call frame,
 * the debug print, error paths) are missing from this extraction. */
1466 static int update_all_cdp_prep(
1468     unsigned long *rra_step_cnt,
1469     unsigned long rra_begin,
1470     rrd_file_t *rrd_file,
1471     unsigned long elapsed_pdp_st,
1472     unsigned long proc_pdp_cnt,
1473     rrd_value_t **last_seasonal_coef,
1474     rrd_value_t **seasonal_coef,
1475     rrd_value_t *pdp_temp,
1476     unsigned long *rra_current,
1477     unsigned long *skip_update,
1478     int *schedule_smooth)
1480     unsigned long rra_idx;
1482     /* index into the CDP scratch array */
1483     enum cf_en current_cf;
1484     unsigned long rra_start;
1486     /* number of rows to be updated in an RRA for a data value. */
1487     unsigned long start_pdp_offset;
1489     rra_start = rra_begin;
1490     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1491         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
/* (review) start_pdp_offset = PDPs remaining until this RRA's next CDP
 * boundary; if elapsed_pdp_st reaches it, one or more rows are due. */
1493             rrd->rra_def[rra_idx].pdp_cnt -
1494             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1495         skip_update[rra_idx] = 0;
1496         if (start_pdp_offset <= elapsed_pdp_st) {
1497             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1498                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1500             rra_step_cnt[rra_idx] = 0;
1503         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1504             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1505              * so that they will be correct for the next observed value; note that for
1506              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1507              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1508             if (rra_step_cnt[rra_idx] > 1) {
1509                 skip_update[rra_idx] = 1;
1510                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1511                                 elapsed_pdp_st, last_seasonal_coef);
1512                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1513                                 elapsed_pdp_st + 1, seasonal_coef);
1515             /* periodically run a smoother for seasonal effects */
1516             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1519                     "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1520                     rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1521                     rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1524                 *schedule_smooth = 1;
1526             *rra_current = rrd_tell(rrd_file);
1528         if (rrd_test_error())
1532             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1533              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1534              current_cf) == -1) {
/* (review) advance rra_start past this RRA's data area for the next
 * iteration. */
1538             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1539             sizeof(rrd_value_t);
1545 * Are we due for a smooth? Also increments our position in the burn-in cycle.
/*
 * do_schedule_smooth
 *
 * Decide whether a seasonal smoothing pass is due for RRA rra_idx after
 * advancing elapsed_pdp_st rows, accounting for row-pointer wraparound.
 * Also counts off burn-in cycles via CDP_init_seasonal (incremented as a
 * side effect until it exceeds BURNIN_CYCLES).
 * Returns non-zero when a smooth should be scheduled.
 *
 * NOTE(review): sampled listing -- the rrd_t * parameter implied by the
 * rrd-> accesses and the function's braces are elided from this excerpt.
 */
1547 static int do_schedule_smooth(
1549 unsigned long rra_idx,
1550 unsigned long elapsed_pdp_st)
/* cdp_idx indexes the first data source's CDP area of this RRA */
1552 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1553 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1554 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1555 unsigned long seasonal_smooth_idx =
1556 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1557 unsigned long *init_seasonal =
1558 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1560 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1561 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1562 * really an RRA level, not a data source within RRA level parameter, but
1563 * the rra_def is read only for rrd_update (not flushed to disk). */
1564 if (*init_seasonal > BURNIN_CYCLES) {
1565 /* someone has no doubt invented a trick to deal with this wrap around,
1566 * but at least this code is clear. */
1567 if (seasonal_smooth_idx > cur_row) {
1568 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1569 * between PDP and CDP */
1570 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1572 /* can't rely on negative numbers because we are working with
1573 * unsigned values */
1574 return (cur_row + elapsed_pdp_st >= row_cnt
1575 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1577 /* mark off one of the burn-in cycles */
1578 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1582 * For a given RRA, iterate over the data sources and call the appropriate
1583 * consolidation function.
1585 * Returns 0 on success, -1 on error.
/*
 * update_cdp_prep
 *
 * For one RRA, loop over all data sources and consolidate the new PDP
 * value into each DS's CDP_prep scratch area: update_cdp() when the RRA
 * consolidates multiple PDPs per CDP, reset_cdp() to fix up counters for
 * 1:1 RRAs after a bulk update.  Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled listing -- the rrd_t *, rra_idx and current_cf
 * parameters implied by the uses below are elided from this excerpt, as
 * are the trailing arguments of the update_cdp()/reset_cdp() calls.
 */
1587 static int update_cdp_prep(
1589 unsigned long elapsed_pdp_st,
1590 unsigned long start_pdp_offset,
1591 unsigned long *rra_step_cnt,
1593 rrd_value_t *pdp_temp,
1594 rrd_value_t *last_seasonal_coef,
1595 rrd_value_t *seasonal_coef,
1598 unsigned long ds_idx, cdp_idx;
1600 /* update CDP_PREP areas */
1601 /* loop over data sources within each RRA */
1602 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* CDP areas are laid out RRA-major: one slot per (RRA, DS) pair */
1604 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1606 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1607 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1608 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1609 elapsed_pdp_st, start_pdp_offset,
1610 rrd->rra_def[rra_idx].pdp_cnt,
1611 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1614 /* Nothing to consolidate if there's one PDP per CDP. However, if
1615 * we've missed some PDPs, let's update null counters etc. */
1616 if (elapsed_pdp_st > 2) {
1617 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1618 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1623 if (rrd_test_error())
1625 } /* endif data sources loop */
1630 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1631 * primary value, secondary value, and # of unknowns.
/*
 * update_cdp
 *
 * Fold the new reading (pdp_temp_val) into one (RRA, DS) scratch area.
 * Two regimes (the branch condition on rra_step_cnt is elided from this
 * excerpt):
 *  - rows due (rra_step_cnt > 0): compute CDP_primary_val (first row to
 *    write), CDP_secondary_val (the "fill in" value for any additional
 *    bulk rows), and re-initialize the carry-over CDP_val and unknown
 *    counter for the next interval;
 *  - no row due (rra_step_cnt == 0): just accumulate pdp_temp_val into
 *    CDP_val / CDP_unkn_pdp_cnt via calculate_cdp_val().
 *
 * NOTE(review): sampled listing -- the scratch/current_cf/xff parameters
 * and several braces, else-keywords and DEBUG guards are elided.
 */
1633 static void update_cdp(
1636 rrd_value_t pdp_temp_val,
1637 unsigned long rra_step_cnt,
1638 unsigned long elapsed_pdp_st,
1639 unsigned long start_pdp_offset,
1640 unsigned long pdp_cnt,
1645 /* shorthand variables */
1646 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1647 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1648 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1649 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1652 /* If we are in this block, at least 1 CDP value will be written to
1653 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1654 * to be written, then the "fill in" value is the CDP_secondary_val
1656 if (isnan(pdp_temp_val)) {
1657 *cdp_unkn_pdp_cnt += start_pdp_offset;
1658 *cdp_secondary_val = DNAN;
1660 /* CDP_secondary value is the RRA "fill in" value for intermediary
1661 * CDP data entries. No matter the CF, the value is the same because
1662 * the average, max, min, and last of a list of identical values is
1663 * the same, namely, the value itself. */
1664 *cdp_secondary_val = pdp_temp_val;
/* xff check: too many unknown PDPs invalidate this CDP */
1667 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1668 *cdp_primary_val = DNAN;
1669 if (current_cf == CF_AVERAGE) {
1671 initialize_average_carry_over(pdp_temp_val,
1673 start_pdp_offset, pdp_cnt);
1675 *cdp_val = pdp_temp_val;
1678 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1679 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1680 } /* endif meets xff value requirement for a valid value */
1681 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after
1682 * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to
1683 * compute that value. */
1683 if (isnan(pdp_temp_val))
1684 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1686 *cdp_unkn_pdp_cnt = 0;
1687 } else { /* rra_step_cnt[i] == 0 */
/* debug trace of the pending carry-over value (guard elided) */
1690 if (isnan(*cdp_val)) {
1691 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1694 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1698 if (isnan(pdp_temp_val)) {
1699 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1702 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1709 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1710 * on the type of consolidation function.
/*
 * initialize_cdp_val
 *
 * Compute CDP_primary_val (the value to be written for the first row of
 * this consolidation interval) and re-seed the CDP_val carry-over,
 * per consolidation function: AVERAGE weights the partial interval,
 * MAXIMUM/MINIMUM compare against +/-DINF-defaulted accumulators, and
 * the default case (e.g. LAST) simply propagates pdp_temp_val.
 *
 * NOTE(review): sampled listing -- the scratch/current_cf parameters,
 * the case labels and break statements are elided.  The isnan(pdp_temp)
 * checks below reference a name not visible in this function's scope;
 * in the upstream source they sit inside disabled debug code -- confirm
 * before assuming they are compiled.
 */
1712 static void initialize_cdp_val(
1715 rrd_value_t pdp_temp_val,
1716 unsigned long elapsed_pdp_st,
1717 unsigned long start_pdp_offset,
1718 unsigned long pdp_cnt)
1720 rrd_value_t cum_val, cur_val;
1722 switch (current_cf) {
/* CF_AVERAGE: unknowns count as 0, divide by the known-PDP count */
1724 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1725 cur_val = IFDNAN(pdp_temp_val, 0.0);
1726 scratch[CDP_primary_val].u_val =
1727 (cum_val + cur_val * start_pdp_offset) /
1728 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1729 scratch[CDP_val].u_val =
1730 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1731 start_pdp_offset, pdp_cnt);
/* CF_MAXIMUM: unknowns default to -infinity so any real value wins */
1734 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1735 cur_val = IFDNAN(pdp_temp_val, -DINF);
1738 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1740 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1746 if (cur_val > cum_val)
1747 scratch[CDP_primary_val].u_val = cur_val;
1749 scratch[CDP_primary_val].u_val = cum_val;
1750 /* initialize carry over value */
1751 scratch[CDP_val].u_val = pdp_temp_val;
/* CF_MINIMUM: unknowns default to +infinity so any real value wins */
1754 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1755 cur_val = IFDNAN(pdp_temp_val, DINF);
1758 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1760 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1766 if (cur_val < cum_val)
1767 scratch[CDP_primary_val].u_val = cur_val;
1769 scratch[CDP_primary_val].u_val = cum_val;
1770 /* initialize carry over value */
1771 scratch[CDP_val].u_val = pdp_temp_val;
/* default (e.g. CF_LAST): primary value is simply the new reading */
1775 scratch[CDP_primary_val].u_val = pdp_temp_val;
1776 /* initialize carry over value */
1777 scratch[CDP_val].u_val = pdp_temp_val;
1783 * Update the consolidation function for Holt-Winters functions as
1784 * well as other functions that don't actually consolidate multiple
/*
 * reset_cdp
 *
 * Fix up the CDP scratch area of one (RRA, DS) pair after a bulk update
 * for RRAs that do not consolidate multiple PDPs (1:1 Holt-Winters and
 * related CFs): propagate the reading for plain CFs, refresh cached
 * seasonal coefficients, bump null counters, DNAN-out predictions, and
 * clear the FAILURES violation history.
 *
 * NOTE(review): sampled listing -- the rrd_idx/ds_idx/cdp_idx parameters
 * implied by the uses below, plus the case labels and breaks, are elided.
 */
1787 static void reset_cdp(
1789 unsigned long elapsed_pdp_st,
1790 rrd_value_t *pdp_temp,
1791 rrd_value_t *last_seasonal_coef,
1792 rrd_value_t *seasonal_coef,
1796 enum cf_en current_cf)
1798 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1800 switch (current_cf) {
/* plain CF: both the first row and the fill-in rows get the reading */
1803 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1804 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1807 case CF_DEVSEASONAL:
1808 /* need to update cached seasonal values, so they are consistent
1809 * with the bulk update */
1810 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1811 * CDP_last_deviation are the same. */
1812 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1813 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1817 /* need to update the null_count and last_null_count.
1818 * even do this for non-DNAN pdp_temp because the
1819 * algorithm is not learning from batch updates. */
1820 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1821 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
/* predictions are meaningless across a bulk gap */
1824 scratch[CDP_primary_val].u_val = DNAN;
1825 scratch[CDP_secondary_val].u_val = DNAN;
1828 /* do not count missed bulk values as failures */
1829 scratch[CDP_primary_val].u_val = 0;
1830 scratch[CDP_secondary_val].u_val = 0;
1831 /* need to reset violations buffer.
1832 * could do this more carefully, but for now, just
1833 * assume a bulk update wipes away all violations. */
1834 erase_violations(rrd, cdp_idx, rra_idx);
/*
 * initialize_average_carry_over
 *
 * Seed the CF_AVERAGE carry-over (CDP_val) for the new partial interval:
 * the reading weighted by the PDP steps that spill past the last CDP
 * boundary.  NOTE(review): sampled listing -- the body of the isnan()
 * branch (presumably returning DNAN) is elided from this excerpt.
 */
1839 static rrd_value_t initialize_average_carry_over(
1840 rrd_value_t pdp_temp_val,
1841 unsigned long elapsed_pdp_st,
1842 unsigned long start_pdp_offset,
1843 unsigned long pdp_cnt)
1845 /* initialize carry over value */
1846 if (isnan(pdp_temp_val)) {
1849 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1853 * Update or initialize a CDP value based on the consolidation
1856 * Returns the new value.
/*
 * calculate_cdp_val
 *
 * Accumulate pdp_temp_val into the in-progress CDP value cdp_val and
 * return the new accumulator: weighted sum for AVERAGE, running min/max
 * for MINIMUM/MAXIMUM, and the latest reading otherwise (CF_LAST).  A
 * DNAN cdp_val means the accumulator is uninitialized and is (re)seeded.
 *
 * NOTE(review): sampled listing -- the current_cf parameter and DEBUG
 * guards are elided; the fprintf below references i/ii, which are not
 * visible in this function's scope (debug-only in upstream -- confirm).
 */
1858 static rrd_value_t calculate_cdp_val(
1859 rrd_value_t cdp_val,
1860 rrd_value_t pdp_temp_val,
1861 unsigned long elapsed_pdp_st,
1872 if (isnan(cdp_val)) {
1873 if (current_cf == CF_AVERAGE) {
/* AVERAGE accumulates a PDP-weighted sum, divided out later */
1874 pdp_temp_val *= elapsed_pdp_st;
1877 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1878 i, ii, pdp_temp_val);
1880 return pdp_temp_val;
1882 if (current_cf == CF_AVERAGE)
1883 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1884 if (current_cf == CF_MINIMUM)
1885 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1886 if (current_cf == CF_MAXIMUM)
1887 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
/* default: CF_LAST keeps the most recent reading */
1889 return pdp_temp_val;
1893 * For each RRA, update the seasonal values and then call update_aberrant_CF
1894 * for each data source.
1896 * Return 0 on success, -1 on error.
/*
 * update_aberrant_cdps
 *
 * For each 1:1 (pdp_cnt == 1) RRA, refresh the seasonal coefficients and
 * run update_aberrant_CF() for every data source, once per elapsed PDP
 * step (at most two: CDP_primary_val then CDP_secondary_val).
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled listing -- the rrd_t * parameter implied by the
 * rrd-> accesses, the trailing update_aberrant_CF() arguments, braces
 * and the final return are elided from this excerpt.
 */
1898 static int update_aberrant_cdps(
1900 rrd_file_t *rrd_file,
1901 unsigned long rra_begin,
1902 unsigned long *rra_current,
1903 unsigned long elapsed_pdp_st,
1904 rrd_value_t *pdp_temp,
1905 rrd_value_t **seasonal_coef)
1907 unsigned long rra_idx, ds_idx, j;
1909 /* number of PDP steps since the last update that
1910 * are assigned to the first CDP to be generated
1911 * since the last update. */
1912 unsigned short scratch_idx;
1913 unsigned long rra_start;
1914 enum cf_en current_cf;
1916 /* this loop is only entered if elapsed_pdp_st < 3 */
1917 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1918 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1919 rra_start = rra_begin;
1920 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1921 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1922 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1923 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
/* primary pass looks one step ahead, secondary pass two */
1924 if (scratch_idx == CDP_primary_val) {
1925 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1926 elapsed_pdp_st + 1, seasonal_coef);
1928 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1929 elapsed_pdp_st + 2, seasonal_coef);
/* remember file position; lookup_seasonal() may have moved it */
1931 *rra_current = rrd_tell(rrd_file);
1933 if (rrd_test_error())
1935 /* loop over data sources within each RRA */
1936 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1937 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1938 rra_idx * (rrd->stat_head->ds_cnt) +
1939 ds_idx, rra_idx, ds_idx, scratch_idx,
/* advance rra_start past this RRA's data area */
1943 rra_start += rrd->rra_def[rra_idx].row_cnt
1944 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1951 * Move sequentially through the file, writing one RRA at a time. Note this
1952 * architecture divorces the computation of CDP with flushing updated RRA
1955 * Return 0 on success, -1 on error.
/*
 * write_to_rras
 *
 * Walk the file sequentially, one RRA at a time, and flush the rows that
 * this update produced: the first row from CDP_primary_val and, for bulk
 * updates, rra_step_cnt-1 fill-in rows from CDP_secondary_val.  Handles
 * cur_row wraparound and, when *pcdp_summary is non-NULL, computes the
 * per-row timestamp for the info chain.  Returns 0 on success, -1 on
 * error.
 *
 * NOTE(review): sampled listing -- the rrd_t * parameter implied by the
 * rrd-> accesses, DEBUG guards, several braces/returns and parts of the
 * write_RRA_row() call around original line 2010 are elided.
 */
1957 static int write_to_rras(
1959 rrd_file_t *rrd_file,
1960 unsigned long *rra_step_cnt,
1961 unsigned long rra_begin,
1962 unsigned long *rra_current,
1963 time_t current_time,
1964 unsigned long *skip_update,
1965 rrd_info_t ** pcdp_summary)
1967 unsigned long rra_idx;
1968 unsigned long rra_start;
1969 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1970 time_t rra_time = 0; /* time of update for a RRA */
1972 /* Ready to write to disk */
1973 rra_start = rra_begin;
1974 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1975 /* skip unless there's something to write */
1976 if (rra_step_cnt[rra_idx]) {
1977 /* write the first row */
1979 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1981 rrd->rra_ptr[rra_idx].cur_row++;
1982 if (rrd->rra_ptr[rra_idx].cur_row >=
1983 rrd->rra_def[rra_idx].row_cnt)
1984 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1985 /* position on the first row */
1986 rra_pos_tmp = rra_start +
1987 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1988 sizeof(rrd_value_t);
/* only seek when we are not already at the target position */
1989 if (rra_pos_tmp != *rra_current) {
1990 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1991 rrd_set_error("seek error in rrd");
1994 *rra_current = rra_pos_tmp;
1997 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1999 if (!skip_update[rra_idx]) {
2000 if (*pcdp_summary != NULL) {
/* timestamp of the first row: current time snapped down to this
 * RRA's step, minus the steps still pending after it */
2001 rra_time = (current_time - current_time
2002 % (rrd->rra_def[rra_idx].pdp_cnt *
2003 rrd->stat_head->pdp_step))
2005 ((rra_step_cnt[rra_idx] -
2006 1) * rrd->rra_def[rra_idx].pdp_cnt *
2007 rrd->stat_head->pdp_step);
/* (elided) write_RRA_row(...) call -- these are its arguments */
2010 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
2011 pcdp_summary, rra_time) == -1)
2015 /* write other rows of the bulk update, if any */
2016 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
2017 if (++rrd->rra_ptr[rra_idx].cur_row ==
2018 rrd->rra_def[rra_idx].row_cnt) {
2021 "Wraparound for RRA %s, %lu updates left\n",
2022 rrd->rra_def[rra_idx].cf_nam,
2023 rra_step_cnt[rra_idx] - 1);
2026 rrd->rra_ptr[rra_idx].cur_row = 0;
2027 /* seek back to beginning of current rra */
2028 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
2029 rrd_set_error("seek error in rrd");
2033 fprintf(stderr, " -- Wraparound Postseek %ld\n",
2036 *rra_current = rra_start;
2038 if (!skip_update[rra_idx]) {
2039 if (*pcdp_summary != NULL) {
/* fill-in rows use the secondary value; note the -2 offset here
 * versus -1 for the primary row above */
2040 rra_time = (current_time - current_time
2041 % (rrd->rra_def[rra_idx].pdp_cnt *
2042 rrd->stat_head->pdp_step))
2044 ((rra_step_cnt[rra_idx] -
2045 2) * rrd->rra_def[rra_idx].pdp_cnt *
2046 rrd->stat_head->pdp_step);
2048 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2049 CDP_secondary_val, pcdp_summary,
/* advance rra_start past this RRA's data area */
2055 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2056 sizeof(rrd_value_t);
2063 * Write out one row of values (one value per DS) to the archive.
2065 * Returns 0 on success, -1 on error.
/*
 * write_RRA_row
 *
 * Write one row (one rrd_value_t per data source) of the given RRA from
 * the chosen CDP scratch slot (CDP_primary_val or CDP_secondary_val) at
 * the current file position, advancing *rra_current by each value
 * written.  When *pcdp_summary is non-NULL, each value is also appended
 * to the info chain keyed "[time]RRA[cf][pdp_cnt]DS[name]".
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): sampled listing -- the rrd_t * and rra_time parameters
 * implied by the uses below, the iv declaration, DEBUG guards and the
 * sprintf-style key builder around original line 2092 are elided.
 */
2067 static int write_RRA_row(
2068 rrd_file_t *rrd_file,
2070 unsigned long rra_idx,
2071 unsigned long *rra_current,
2072 unsigned short CDP_scratch_idx,
2073 rrd_info_t ** pcdp_summary,
2076 unsigned long ds_idx, cdp_idx;
2079 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2080 /* compute the cdp index */
2081 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2083 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2084 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2085 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2087 if (*pcdp_summary != NULL) {
2088 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2089 /* append info to the return hash */
2090 *pcdp_summary = rrd_info_push(*pcdp_summary,
2092 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2093 rrd->rra_def[rra_idx].cf_nam,
2094 rrd->rra_def[rra_idx].pdp_cnt,
2095 rrd->ds_def[ds_idx].ds_nam),
/* a short write is an error; errno carries the cause */
2098 if (rrd_write(rrd_file,
2099 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2100 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2101 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2104 *rra_current += sizeof(rrd_value_t);
2110 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2112 * Returns 0 on success, -1 otherwise
/*
 * smooth_all_rras
 *
 * Run apply_smoother() over every SEASONAL and DEVSEASONAL RRA in the
 * file, tracking each RRA's starting byte offset.  Returns 0 on success,
 * -1 when apply_smoother() raised an rrd error.
 *
 * NOTE(review): sampled listing -- the rrd_t * parameter implied by the
 * rrd-> accesses, DEBUG guard, braces and the final return are elided.
 */
2114 static int smooth_all_rras(
2116 rrd_file_t *rrd_file,
2117 unsigned long rra_begin)
2119 unsigned long rra_start = rra_begin;
2120 unsigned long rra_idx;
2122 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2123 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2124 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2126 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2128 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2129 if (rrd_test_error())
/* advance rra_start past this RRA's data area */
2132 rra_start += rrd->rra_def[rra_idx].row_cnt
2133 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2140 * Flush changes to disk (unless we're using mmap)
2142 * Returns 0 on success, -1 otherwise
2144 static int write_changes_to_disk(
2146 rrd_file_t *rrd_file,
2149 /* we just need to write back the live header portion now */
2150 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2151 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2152 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2154 rrd_set_error("seek rrd for live header writeback");
2158 if (rrd_write(rrd_file, rrd->live_head,
2159 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2160 rrd_set_error("rrd_write live_head to rrd");
2164 if (rrd_write(rrd_file, rrd->legacy_last_up,
2165 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2166 rrd_set_error("rrd_write live_head to rrd");
2172 if (rrd_write(rrd_file, rrd->pdp_prep,
2173 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2174 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2175 rrd_set_error("rrd_write pdp_prep to rrd");
2179 if (rrd_write(rrd_file, rrd->cdp_prep,
2180 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2181 rrd->stat_head->ds_cnt)
2182 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2183 rrd->stat_head->ds_cnt)) {
2185 rrd_set_error("rrd_write cdp_prep to rrd");
2189 if (rrd_write(rrd_file, rrd->rra_ptr,
2190 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2191 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2192 rrd_set_error("rrd_write rra_ptr to rrd");