1 /*****************************************************************************
2 * RRDtool 1.4.3 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
8 *****************************************************************************/
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(
49 struct __timezone *tz)
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
64 /* FUNCTION PROTOTYPES */
78 static int allocate_data_structures(
81 rrd_value_t **pdp_temp,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
92 unsigned long *tmpl_cnt,
95 static int process_arg(
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
110 unsigned long *skip_update,
111 int *schedule_smooth);
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
123 static int get_time_from_reading(
127 time_t *current_time,
128 unsigned long *current_time_usec,
131 static int update_pdp_prep(
134 rrd_value_t *pdp_new,
137 static int calculate_elapsed_steps(
139 unsigned long current_time,
140 unsigned long current_time_usec,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
162 unsigned long ds_idx,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
199 static void update_cdp(
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
211 static void initialize_cdp_val(
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
244 static int update_aberrant_cdps(
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
269 static int smooth_all_rras(
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
275 static int write_changes_to_disk(
277 rrd_file_t *rrd_file,
/*
 * normalize time as returned by gettimeofday. usec part must
 * always be >= 0 so that later second/microsecond arithmetic is uniform.
 *
 * NOTE(review): reconstructed from an extracted listing whose interior
 * lines were elided; only the usec < 0 branch needs handling.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        /* borrow one second so the microsecond part becomes non-negative */
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
/*
 * Sets current_time and current_time_usec based on the current time.
 * current_time_usec is set to 0 if the version number is 1 or 2 (those
 * file formats have no sub-second resolution).
 *
 * NOTE(review): reconstructed — the extracted listing elided interior
 * lines; both branches of the version test were visible in the fragment.
 */
static void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval tmp_time;    /* used for time conversion */

    gettimeofday(&tmp_time, 0);
    normalize_time(&tmp_time);
    *current_time = tmp_time.tv_sec;
    if (version >= 3) {
        *current_time_usec = tmp_time.tv_usec;
    } else {
        *current_time_usec = 0;
    }
}
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
/*
 * rrd_update_v(): public entry point that performs an update and returns
 * per-update details as an rrd_info_t list (a "return_value" entry plus any
 * CDP summary filled in by _rrd_update).  Refuses to run when the
 * ENV_RRDCACHED_ADDRESS environment variable is set, because the info
 * return path cannot go through rrdcached.
 *
 * NOTE(review): the extracted listing elided interior lines of this
 * function (original line numbers jump); visible code kept verbatim.
 */
317 rrd_info_t *rrd_update_v(
322 rrd_info_t *result = NULL;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
/* only -t/--template is accepted here; -d/--daemon is rejected above */
332 opterr = 0; /* initialize getopt */
335 int option_index = 0;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
/* the return_value info node is created first, then patched with rc below */
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
/*
 * Body of the daemon-aware public update entry point (its signature was
 * elided by extraction).  Parses -t/--template and -d/--daemon, then either
 * forwards the update to rrdcached via rrdc_update() or performs it locally
 * through rrd_update_r().  Template updates cannot be combined with the
 * caching daemon.
 *
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.
 */
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
387 int option_index = 0;
391 char *opt_daemon = NULL;
394 opterr = 0; /* initialize getopt */
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
404 tmplt = strdup(optarg);
/* a repeated --daemon replaces the previous value (freed in elided code) */
408 if (opt_daemon != NULL)
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
413 rrd_set_error("strdup failed.");
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) return status;
435 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437 rrd_set_error("The caching daemon cannot be used together with "
442 if (! rrdc_is_connected(opt_daemon))
444 rc = rrd_update_r(argv[optind], tmplt,
445 argc - optind - 1, (const char **) (argv + optind + 1));
447 else /* we are connected */
449 rc = rrdc_update (argv[optind], /* file */
450 argc - optind - 1, /* values_num */
451 (const char *const *) (argv + optind + 1)); /* values */
453 rrd_set_error("Failed sending the values to rrdcached: %s",
463 if (opt_daemon != NULL)
/* rrd_update_r(): thread-safe update without info output — delegates to
 * _rrd_update with a NULL pcdp_summary.  (Signature partially elided.) */
472 const char *filename,
477 return _rrd_update(filename, tmplt, argc, argv, NULL);
/* rrd_update_v_r(): thread-safe update that also fills the caller-supplied
 * pcdp_summary info list.  (Signature partially elided by extraction.) */
481 const char *filename,
485 rrd_info_t * pcdp_summary)
487 return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
/*
 * _rrd_update(): the core worker.  Opens and locks the RRD file, allocates
 * the scratch structures, processes each argv update string via
 * process_arg(), writes changes back to disk, and runs the Holt-Winters
 * smoother at most once per call if scheduled.
 *
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  The "¤t_time" tokens below are mojibake for
 * "&current_time" ("&curren" eaten as an HTML entity) — restore the "&"
 * when repairing the file.
 */
491 const char *filename,
495 rrd_info_t * pcdp_summary)
500 unsigned long rra_begin; /* byte pointer to the rra
501 * area in the rrd file. this
502 * pointer never changes value */
503 rrd_value_t *pdp_new; /* prepare the incoming data to be added
504 * to the existing entry */
505 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
506 * to the cdp values */
508 long *tmpl_idx; /* index representing the settings
509 * transported by the tmplt index */
510 unsigned long tmpl_cnt = 2; /* time and data */
512 time_t current_time = 0;
513 unsigned long current_time_usec = 0; /* microseconds part of current time */
515 int schedule_smooth = 0;
517 /* number of elapsed PDP steps since last update */
518 unsigned long *rra_step_cnt = NULL;
520 int version; /* rrd version */
521 rrd_file_t *rrd_file;
522 char *arg_copy; /* for processing the argv */
523 unsigned long *skip_update; /* RRAs to advance but not write */
525 /* need at least 1 arguments: data. */
527 rrd_set_error("Not enough arguments");
532 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
535 /* We are now at the beginning of the rra's */
536 rra_begin = rrd_file->header_len;
538 version = atoi(rrd.stat_head->version);
/* NOTE(review): mojibake — should read initialize_time(&current_time, ...) */
540 initialize_time(¤t_time, ¤t_time_usec, version);
542 /* get exclusive lock to whole file.
543 * lock gets removed when we close the file.
545 if (rrd_lock(rrd_file) != 0) {
546 rrd_set_error("could not lock RRD");
550 if (allocate_data_structures(&rrd, &updvals,
551 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552 &rra_step_cnt, &skip_update,
557 /* loop through the arguments. */
558 for (arg_i = 0; arg_i < argc; arg_i++) {
559 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560 rrd_set_error("failed duplication argv entry");
563 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
/* NOTE(review): mojibake — should read &current_time, &current_time_usec */
564 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
565 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566 &pcdp_summary, version, skip_update,
567 &schedule_smooth) == -1) {
568 if (rrd_test_error()) { /* Should have error string always here */
571 /* Prepend file name to error message */
572 if ((save_error = strdup(rrd_get_error())) != NULL) {
573 rrd_set_error("%s: %s", filename, save_error);
585 /* if we got here and if there is an error and if the file has not been
586 * written to, then close things up and return. */
587 if (rrd_test_error()) {
588 goto err_free_structures;
591 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
592 goto err_free_structures;
596 /* calling the smoothing code here guarantees at most one smoothing
597 * operation per rrd_update call. Unfortunately, it is possible with bulk
598 * updates, or a long-delayed update for smoothing to occur off-schedule.
599 * This really isn't critical except during the burn-in cycles. */
600 if (schedule_smooth) {
601 smooth_all_rras(&rrd, rrd_file, rra_begin);
604 /* rrd_dontneed(rrd_file,&rrd); */
/*
 * NOTE(review): interior lines elided by extraction (original line numbers
 * jump); visible code kept verbatim.  The goto labels implement the usual
 * C reverse-order cleanup chain so that on failure nothing leaks.
 */
630 * Allocate some important arrays used, and initialize the template.
632 * When it returns, either all of the structures are allocated
633 * or none of them are.
635 * Returns 0 on success, -1 on error.
637 static int allocate_data_structures(
640 rrd_value_t **pdp_temp,
643 unsigned long *tmpl_cnt,
644 unsigned long **rra_step_cnt,
645 unsigned long **skip_update,
646 rrd_value_t **pdp_new)
649 if ((*updvals = (char **) malloc(sizeof(char *)
650 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
651 rrd_set_error("allocating updvals pointer array.");
654 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
655 * rrd->stat_head->ds_cnt)) ==
657 rrd_set_error("allocating pdp_temp.");
658 goto err_free_updvals;
660 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
662 rrd->stat_head->rra_cnt)) ==
664 rrd_set_error("allocating skip_update.");
665 goto err_free_pdp_temp;
667 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
668 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
669 rrd_set_error("allocating tmpl_idx.");
670 goto err_free_skip_update;
672 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
675 rra_cnt))) == NULL) {
676 rrd_set_error("allocating rra_step_cnt.");
677 goto err_free_tmpl_idx;
680 /* initialize tmplt redirector */
681 /* default config example (assume DS 1 is a CDEF DS)
682 tmpl_idx[0] -> 0; (time)
683 tmpl_idx[1] -> 1; (DS 0)
684 tmpl_idx[2] -> 3; (DS 2)
685 tmpl_idx[3] -> 4; (DS 3) */
686 (*tmpl_idx)[0] = 0; /* time */
687 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
688 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
689 (*tmpl_idx)[ii++] = i;
694 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
695 goto err_free_rra_step_cnt;
699 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700 * rrd->stat_head->ds_cnt)) == NULL) {
701 rrd_set_error("allocating pdp_new.");
702 goto err_free_rra_step_cnt;
707 err_free_rra_step_cnt:
711 err_free_skip_update:
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Splits the colon-separated tmplt string on a writable copy
 * and maps each DS name to its 1-based index via ds_match().
 */
721 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
723 * Returns 0 on success.
725 static int parse_template(
728 unsigned long *tmpl_cnt,
731 char *dsname, *tmplt_copy;
732 unsigned int tmpl_len, i;
735 *tmpl_cnt = 1; /* the first entry is the time */
737 /* we should work on a writeable copy here */
738 if ((tmplt_copy = strdup(tmplt)) == NULL) {
739 rrd_set_error("error copying tmplt '%s'", tmplt);
745 tmpl_len = strlen(tmplt_copy);
/* i <= tmpl_len so the final '\0' is handled like a ':' separator */
746 for (i = 0; i <= tmpl_len; i++) {
747 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
748 tmplt_copy[i] = '\0';
749 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
750 rrd_set_error("tmplt contains more DS definitions than RRD");
752 goto out_free_tmpl_copy;
/* ds_match returns -1 for an unknown DS, hence the +1 == 0 test */
754 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
755 rrd_set_error("unknown DS name '%s'", dsname);
757 goto out_free_tmpl_copy;
759 /* go to the next entry on the tmplt_copy */
761 dsname = &tmplt_copy[i + 1];
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Pipeline: parse_ds -> update_pdp_prep -> (if a PDP boundary
 * was crossed) process_all_pdp_st -> update_all_cdp_prep ->
 * update_aberrant_cdps -> write_to_rras, then advance last_up.
 */
771 * Parse an update string, updates the primary data points (PDPs)
772 * and consolidated data points (CDPs), and writes changes to the RRAs.
774 * Returns 0 on success, -1 on error.
776 static int process_arg(
779 rrd_file_t *rrd_file,
780 unsigned long rra_begin,
781 time_t *current_time,
782 unsigned long *current_time_usec,
783 rrd_value_t *pdp_temp,
784 rrd_value_t *pdp_new,
785 unsigned long *rra_step_cnt,
788 unsigned long tmpl_cnt,
789 rrd_info_t ** pcdp_summary,
791 unsigned long *skip_update,
792 int *schedule_smooth)
794 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
796 /* a vector of future Holt-Winters seasonal coefs */
797 unsigned long elapsed_pdp_st;
799 double interval, pre_int, post_int; /* interval between this and
801 unsigned long proc_pdp_cnt;
803 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
804 current_time, current_time_usec, version) == -1) {
/* sub-second-aware interval since the last stored update */
808 interval = (double) (*current_time - rrd->live_head->last_up)
809 + (double) ((long) *current_time_usec -
810 (long) rrd->live_head->last_up_usec) / 1e6f;
812 /* process the data sources and update the pdp_prep
813 * area accordingly */
814 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
818 elapsed_pdp_st = calculate_elapsed_steps(rrd,
820 *current_time_usec, interval,
824 /* has a pdp_st moment occurred since the last run ? */
825 if (elapsed_pdp_st == 0) {
826 /* no we have not passed a pdp_st moment. therefore update is simple */
827 simple_update(rrd, interval, pdp_new);
829 /* an pdp_st has occurred. */
830 if (process_all_pdp_st(rrd, interval,
832 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
835 if (update_all_cdp_prep(rrd, rra_step_cnt,
842 skip_update, schedule_smooth) == -1) {
843 goto err_free_coefficients;
845 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
846 elapsed_pdp_st, pdp_temp,
847 &seasonal_coef) == -1) {
848 goto err_free_coefficients;
850 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
851 *current_time, skip_update,
852 pcdp_summary) == -1) {
853 goto err_free_coefficients;
855 } /* endif a pdp_st has occurred */
856 rrd->live_head->last_up = *current_time;
857 rrd->live_head->last_up_usec = *current_time_usec;
860 *rrd->legacy_last_up = rrd->live_head->last_up;
863 free(last_seasonal_coef);
866 err_free_coefficients:
868 free(last_seasonal_coef);
/*
 * NOTE(review): this is parse_ds() (name confirmed by the call site in
 * process_arg); its signature and interior lines were partly elided by
 * extraction.  Visible code kept verbatim.
 */
873 * Parse a DS string (time + colon-separated values), storing the
874 * results in current_time, current_time_usec, and updvals.
876 * Returns 0 on success, -1 on error.
883 unsigned long tmpl_cnt,
884 time_t *current_time,
885 unsigned long *current_time_usec,
893 /* initialize all ds input to unknown except the first one
894 which has always got to be set */
895 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
898 /* separate all ds elements; first must be examined separately
899 due to alternate time syntax */
900 if ((p = strchr(input, '@')) != NULL) {
902 } else if ((p = strchr(input, ':')) != NULL) {
905 rrd_set_error("expected timestamp not found in data source from %s",
911 updvals[tmpl_idx[i++]] = p + 1;
916 updvals[tmpl_idx[i++]] = p + 1;
919 rrd_set_error("found extra data on update argument: %s",p+1);
/* template count includes the time slot, hence the -1 on both sides */
926 rrd_set_error("expected %lu data source readings (got %lu) from %s",
927 tmpl_cnt - 1, i - 1, input);
931 if (get_time_from_reading(rrd, timesyntax, updvals,
932 current_time, current_time_usec,
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Three time syntaxes are handled: '@' (at-style via
 * rrd_parsetime), "N" (now, from gettimeofday), and a plain numeric
 * timestamp parsed with strtod under the "C" numeric locale.
 */
940 * Parse the time in a DS string, store it in current_time and
941 * current_time_usec and verify that it's later than the last
942 * update for this DS.
944 * Returns 0 on success, -1 on error.
946 static int get_time_from_reading(
950 time_t *current_time,
951 unsigned long *current_time_usec,
955 char *parsetime_error = NULL;
957 rrd_time_value_t ds_tv;
958 struct timeval tmp_time; /* used for time conversion */
960 /* get the time from the reading ... handle N */
961 if (timesyntax == '@') { /* at-style */
962 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
963 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
966 if (ds_tv.type == RELATIVE_TO_END_TIME ||
967 ds_tv.type == RELATIVE_TO_START_TIME) {
968 rrd_set_error("specifying time relative to the 'start' "
969 "or 'end' makes no sense here: %s", updvals[0]);
972 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
973 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
974 } else if (strcmp(updvals[0], "N") == 0) {
975 gettimeofday(&tmp_time, 0);
976 normalize_time(&tmp_time);
977 *current_time = tmp_time.tv_sec;
978 *current_time_usec = tmp_time.tv_usec;
/* numeric timestamp path: force "C" locale so '.' is the decimal point */
980 old_locale = setlocale(LC_NUMERIC, NULL);
981 setlocale(LC_NUMERIC, "C");
983 tmp = strtod(updvals[0], 0);
985 rrd_set_error("converting '%s' to float: %s",
986 updvals[0], rrd_strerror(errno));
989 setlocale(LC_NUMERIC, old_locale);
991 gettimeofday(&tmp_time, 0);
992 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
995 *current_time = floor(tmp);
996 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
998 /* dont do any correction for old version RRDs */
1000 *current_time_usec = 0;
/* reject non-monotonic updates: time must strictly advance */
1002 if (*current_time < rrd->live_head->last_up ||
1003 (*current_time == rrd->live_head->last_up &&
1004 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1005 rrd_set_error("illegal attempt to update using time %ld when "
1006 "last update time is %ld (minimum one second step)",
1007 *current_time, rrd->live_head->last_up);
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Per DS: validate the raw reading, convert it to rate*seconds
 * in pdp_new[] according to the DS type (COUNTER/DERIVE diff against
 * last_ds with 32/64-bit wrap correction, ABSOLUTE/GAUGE via strtod),
 * clamp against DS min/max, and remember the reading in last_ds.
 */
1014 * Update pdp_new by interpreting the updvals according to the DS type
1015 * (COUNTER, GAUGE, etc.).
1017 * Returns 0 on success, -1 on error.
1019 static int update_pdp_prep(
1022 rrd_value_t *pdp_new,
1025 unsigned long ds_idx;
1027 char *endptr; /* used in the conversion */
1030 enum dst_en dst_idx;
1032 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1033 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1035 /* make sure we do not build diffs with old last_ds values */
1036 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1037 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1038 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1041 /* NOTE: DST_CDEF should never enter this if block, because
1042 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1043 * accidently specified a value for the DST_CDEF. To handle this case,
1044 * an extra check is required. */
1046 if ((updvals[ds_idx + 1][0] != 'U') &&
1047 (dst_idx != DST_CDEF) &&
1048 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1051 /* pdp_new contains rate * time ... eg the bytes transferred during
1052 * the interval. Doing it this way saves a lot of math operations
1057 /* Check if this is a valid integer. `U' is already handled in
1058 * another branch. */
1059 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1060 if ((ii == 0) && (dst_idx == DST_DERIVE)
1061 && (updvals[ds_idx + 1][ii] == '-'))
1064 if ((updvals[ds_idx + 1][ii] < '0')
1065 || (updvals[ds_idx + 1][ii] > '9')) {
1066 rrd_set_error("not a simple %s integer: '%s'",
1067 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1068 updvals[ds_idx + 1]);
1071 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1073 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1075 rrd_diff(updvals[ds_idx + 1],
1076 rrd->pdp_prep[ds_idx].last_ds);
1077 if (dst_idx == DST_COUNTER) {
1078 /* simple overflow catcher. This will fail
1079 * terribly for non 32 or 64 bit counters
1080 * ... are there any others in SNMP land?
1082 if (pdp_new[ds_idx] < (double) 0.0)
1083 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1084 if (pdp_new[ds_idx] < (double) 0.0)
1085 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1087 rate = pdp_new[ds_idx] / interval;
1089 pdp_new[ds_idx] = DNAN;
/* GAUGE-style path: parse under "C" numeric locale, restore afterwards */
1093 old_locale = setlocale(LC_NUMERIC, NULL);
1094 setlocale(LC_NUMERIC, "C");
1096 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1098 rrd_set_error("converting '%s' to float: %s",
1099 updvals[ds_idx + 1], rrd_strerror(errno));
1102 setlocale(LC_NUMERIC, old_locale);
1103 if (endptr[0] != '\0') {
1105 ("conversion of '%s' to float not complete: tail '%s'",
1106 updvals[ds_idx + 1], endptr);
1109 rate = pdp_new[ds_idx] / interval;
1112 old_locale = setlocale(LC_NUMERIC, NULL);
1113 setlocale(LC_NUMERIC, "C");
1116 strtod(updvals[ds_idx + 1], &endptr) * interval;
1118 rrd_set_error("converting '%s' to float: %s",
1119 updvals[ds_idx + 1], rrd_strerror(errno));
1122 setlocale(LC_NUMERIC, old_locale);
1123 if (endptr[0] != '\0') {
1125 ("conversion of '%s' to float not complete: tail '%s'",
1126 updvals[ds_idx + 1], endptr);
1129 rate = pdp_new[ds_idx] / interval;
1132 rrd_set_error("rrd contains unknown DS type : '%s'",
1133 rrd->ds_def[ds_idx].dst);
1136 /* break out of this for loop if the error string is set */
1137 if (rrd_test_error()) {
1140 /* make sure pdp_temp is neither too large or too small
1141 * if any of these occur it becomes unknown ...
1142 * sorry folks ... */
1144 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1145 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1146 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1147 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1148 pdp_new[ds_idx] = DNAN;
1151 /* no news is news all the same */
1152 pdp_new[ds_idx] = DNAN;
1156 /* make a copy of the command line argument for the next run */
1158 fprintf(stderr, "prep ds[%lu]\t"
1162 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1165 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1167 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Computes the last-processed and last-occurred pdp_st moments
 * by rounding last_up / current_time down to pdp_step boundaries, and
 * splits the update interval into the pre/post-boundary parts.
 */
1173 * How many PDP steps have elapsed since the last update? Returns the answer,
1174 * and stores the time between the last update and the last PDP in pre_time,
1175 * and the time between the last PDP and the current time in post_int.
1177 static int calculate_elapsed_steps(
1179 unsigned long current_time,
1180 unsigned long current_time_usec,
1184 unsigned long *proc_pdp_cnt)
1186 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1187 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1189 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1190 * when it was last updated */
1191 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1193 /* when was the current pdp started */
1194 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1195 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1197 /* when did the last pdp_st occur */
1198 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1199 occu_pdp_st = current_time - occu_pdp_age;
1201 if (occu_pdp_st > proc_pdp_st) {
1202 /* OK we passed the pdp_st moment */
1203 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1204 * occurred before the latest
1206 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1207 *post_int = occu_pdp_age; /* how much after it */
1208 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1210 *pre_int = interval;
1214 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1217 printf("proc_pdp_age %lu\t"
1219 "occu_pfp_age %lu\t"
1223 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1224 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1227 /* compute the number of elapsed pdp_st moments */
1228 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Used when no pdp_step boundary was crossed: NAN readings add
 * to the unknown-seconds counter, otherwise the rate*seconds value is
 * accumulated (or initializes a previously-NAN accumulator).
 */
1232 * Increment the PDP values by the values in pdp_new, or else initialize them.
1234 static void simple_update(
1237 rrd_value_t *pdp_new)
1241 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1242 if (isnan(pdp_new[i])) {
1243 /* this is not really accurate if we use subsecond data arrival time
1244 should have thought of it when going subsecond resolution ...
1245 sorry next format change we will have it! */
1246 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1249 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1250 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1252 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1261 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1262 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Simple per-DS fan-out to process_pdp_st, converting
 * elapsed_pdp_st into seconds for the diff_pdp_st argument.
 */
1268 * Call process_pdp_st for each DS.
1270 * Returns 0 on success, -1 on error.
1272 static int process_all_pdp_st(
1277 unsigned long elapsed_pdp_st,
1278 rrd_value_t *pdp_new,
1279 rrd_value_t *pdp_temp)
1281 unsigned long ds_idx;
1283 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1284 rate*seconds which occurred up to the last run.
1285 pdp_new[] contains rate*seconds from the latest run.
1286 pdp_temp[] will contain the rate for cdp */
1288 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1289 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1290 elapsed_pdp_st * rrd->stat_head->pdp_step,
1291 pdp_new, pdp_temp) == -1) {
1295 fprintf(stderr, "PDP UPD ds[%lu]\t"
1296 "elapsed_pdp_st %lu\t"
1299 "new_unkn_sec %5lu\n",
1303 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1304 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Finalizes one PDP for one DS: folds the pre-boundary slice
 * into the accumulator, derives the consolidated rate (or DNAN if too much
 * of the step was unknown / heartbeat exceeded), evaluates COMPUTE (CDEF)
 * data sources via the RPN calculator, then seeds the accumulator with the
 * post-boundary slice for the next step.
 */
1311 * Process an update that occurs after one of the PDP moments.
1312 * Increments the PDP value, sets NAN if time greater than the
1313 * heartbeats have elapsed, processes CDEFs.
1315 * Returns 0 on success, -1 on error.
1317 static int process_pdp_st(
1319 unsigned long ds_idx,
1323 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1324 rrd_value_t *pdp_new,
1325 rrd_value_t *pdp_temp)
1329 /* update pdp_prep to the current pdp_st. */
1330 double pre_unknown = 0.0;
1331 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1332 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1334 rpnstack_t rpnstack; /* used for COMPUTE DS */
1336 rpnstack_init(&rpnstack);
1339 if (isnan(pdp_new[ds_idx])) {
1340 /* a final bit of unknown to be added before calculation
1341 we use a temporary variable for this so that we
1342 don't have to turn integer lines before using the value */
1343 pre_unknown = pre_int;
1345 if (isnan(scratch[PDP_val].u_val)) {
1346 scratch[PDP_val].u_val = 0;
1348 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1351 /* if too much of the pdp_prep is unknown we dump it */
1352 /* if the interval is larger thatn mrhb we get NAN */
1353 if ((interval > mrhb) ||
1354 (rrd->stat_head->pdp_step / 2.0 <
1355 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1356 pdp_temp[ds_idx] = DNAN;
1358 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1359 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1363 /* process CDEF data sources; remember each CDEF DS can
1364 * only reference other DS with a lower index number */
1365 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1369 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1371 rpnstack_free(&rpnstack);
1374 /* substitute data values for OP_VARIABLE nodes */
1375 for (i = 0; rpnp[i].op != OP_END; i++) {
1376 if (rpnp[i].op == OP_VARIABLE) {
1377 rpnp[i].op = OP_NUMBER;
1378 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1381 /* run the rpn calculator */
1382 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1384 rpnstack_free(&rpnstack);
1390 /* make pdp_prep ready for the next run */
1391 if (isnan(pdp_new[ds_idx])) {
1392 /* this is not realy accurate if we use subsecond data arival time
1393 should have thought of it when going subsecond resolution ...
1394 sorry next format change we will have it! */
1395 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1396 scratch[PDP_val].u_val = DNAN;
1398 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1399 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1401 rpnstack_free(&rpnstack);
/*
 * NOTE(review): interior lines elided by extraction; visible code kept
 * verbatim.  Per RRA: computes how many rows this update advances
 * (rra_step_cnt), marks SEASONAL/DEVSEASONAL RRAs to be skipped on bulk
 * updates (pre-fetching their coefficient rows instead), decides whether a
 * smoothing pass is due, and delegates per-DS consolidation to
 * update_cdp_prep.
 */
1406 * Iterate over all the RRAs for a given DS and:
1407 * 1. Decide whether to schedule a smooth later
1408 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1411 * Returns 0 on success, -1 on error
1413 static int update_all_cdp_prep(
1415 unsigned long *rra_step_cnt,
1416 unsigned long rra_begin,
1417 rrd_file_t *rrd_file,
1418 unsigned long elapsed_pdp_st,
1419 unsigned long proc_pdp_cnt,
1420 rrd_value_t **last_seasonal_coef,
1421 rrd_value_t **seasonal_coef,
1422 rrd_value_t *pdp_temp,
1423 unsigned long *skip_update,
1424 int *schedule_smooth)
1426 unsigned long rra_idx;
1428 /* index into the CDP scratch array */
1429 enum cf_en current_cf;
1430 unsigned long rra_start;
1432 /* number of rows to be updated in an RRA for a data value. */
1433 unsigned long start_pdp_offset;
1435 rra_start = rra_begin;
1436 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1437 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1439 rrd->rra_def[rra_idx].pdp_cnt -
1440 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1441 skip_update[rra_idx] = 0;
1442 if (start_pdp_offset <= elapsed_pdp_st) {
1443 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1444 rrd->rra_def[rra_idx].pdp_cnt + 1;
1446 rra_step_cnt[rra_idx] = 0;
1449 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1450 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1451 * so that they will be correct for the next observed value; note that for
1452 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1453 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1454 if (rra_step_cnt[rra_idx] > 1) {
1455 skip_update[rra_idx] = 1;
1456 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1457 elapsed_pdp_st, last_seasonal_coef);
1458 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1459 elapsed_pdp_st + 1, seasonal_coef);
1461 /* periodically run a smoother for seasonal effects */
1462 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1465 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1466 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1467 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1470 *schedule_smooth = 1;
1473 if (rrd_test_error())
1477 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1478 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1479 current_cf) == -1) {
/* advance rra_start past this RRA's data area for the next iteration */
1483 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1484 sizeof(rrd_value_t);
1490 * Are we due for a smooth? Also increments our position in the burn-in cycle.
/* Decide whether the periodic seasonal smoother is due for RRA rra_idx now
 * that elapsed_pdp_st PDPs have passed.  During burn-in this also advances
 * the burn-in cycle counter (CDP_init_seasonal) as a side effect.
 * Returns non-zero when a smooth should be scheduled. */
1492 static int do_schedule_smooth(
1494 unsigned long rra_idx,
1495 unsigned long elapsed_pdp_st)
/* index of the first cdp_prep entry belonging to this RRA (DS 0) */
1497 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1498 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1499 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
/* row index at which the smoother is scheduled to fire */
1500 unsigned long seasonal_smooth_idx =
1501 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
/* burn-in cycle counter, persisted in DS 0's cdp_prep scratch area */
1502 unsigned long *init_seasonal =
1503 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1505 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1506 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1507 * really an RRA level, not a data source within RRA level parameter, but
1508 * the rra_def is read only for rrd_update (not flushed to disk). */
1509 if (*init_seasonal > BURNIN_CYCLES) {
1510 /* someone has no doubt invented a trick to deal with this wrap around,
1511 * but at least this code is clear. */
1512 if (seasonal_smooth_idx > cur_row) {
1513 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1514 * between PDP and CDP */
1515 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1517 /* can't rely on negative numbers because we are working with
1518 * unsigned values */
/* smooth index already passed this cycle: due only after wrapping past
 * the end of the RRA and reaching the smooth index again */
1519 return (cur_row + elapsed_pdp_st >= row_cnt
1520 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1522 /* mark off one of the burn-in cycles */
1523 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1527 * For a given RRA, iterate over the data sources and call the appropriate
1528 * consolidation function.
1530 * Returns 0 on success, -1 on error.
/* For one RRA, walk every data source and bring its cdp_prep scratch area
 * up to date: consolidate when the RRA spans multiple PDPs per CDP, or
 * reset per-DS state after a bulk (multi-PDP) update when it does not.
 * Returns 0 on success, -1 if an rrd error was raised. */
1532 static int update_cdp_prep(
1534 unsigned long elapsed_pdp_st,
1535 unsigned long start_pdp_offset,
1536 unsigned long *rra_step_cnt,
1538 rrd_value_t *pdp_temp,
1539 rrd_value_t *last_seasonal_coef,
1540 rrd_value_t *seasonal_coef,
1543 unsigned long ds_idx, cdp_idx;
1545 /* update CDP_PREP areas */
1546 /* loop over data sources within each RRA */
1547 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* cdp_prep is laid out RRA-major: one entry per (RRA, DS) pair */
1549 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1551 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1552 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1553 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1554 elapsed_pdp_st, start_pdp_offset,
1555 rrd->rra_def[rra_idx].pdp_cnt,
1556 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1559 /* Nothing to consolidate if there's one PDP per CDP. However, if
1560 * we've missed some PDPs, let's update null counters etc. */
1561 if (elapsed_pdp_st > 2) {
1562 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1563 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1564 (enum cf_en)current_cf);
1568 if (rrd_test_error())
1570 } /* endif data sources loop */
1575 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1576 * primary value, secondary value, and # of unknowns.
/* Given the new reading (pdp_temp_val), update or initialize the CDP value,
 * primary value, secondary value, and unknown-PDP counter held in the
 * per-(RRA,DS) scratch array.  rra_step_cnt > 0 means at least one CDP row
 * will be flushed to disk this update; 0 means we only accumulate. */
1578 static void update_cdp(
1581 rrd_value_t pdp_temp_val,
1582 unsigned long rra_step_cnt,
1583 unsigned long elapsed_pdp_st,
1584 unsigned long start_pdp_offset,
1585 unsigned long pdp_cnt,
1590 /* shorthand variables */
1591 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1592 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1593 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1594 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1597 /* If we are in this block, at least 1 CDP value will be written to
1598 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1599 * to be written, then the "fill in" value is the CDP_secondary_val
1601 if (isnan(pdp_temp_val)) {
/* unknown reading: count the PDPs it covers toward the xff budget */
1602 *cdp_unkn_pdp_cnt += start_pdp_offset;
1603 *cdp_secondary_val = DNAN;
1605 /* CDP_secondary value is the RRA "fill in" value for intermediary
1606 * CDP data entries. No matter the CF, the value is the same because
1607 * the average, max, min, and last of a list of identical values is
1608 * the same, namely, the value itself. */
1609 *cdp_secondary_val = pdp_temp_val;
/* too many unknown PDPs in this CDP interval: the row becomes unknown */
1612 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1613 *cdp_primary_val = DNAN;
1615 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1616 start_pdp_offset, pdp_cnt);
/* seed CDP_val with the portion of the reading that spills into the
 * next (not yet complete) CDP interval */
1619 initialize_carry_over(pdp_temp_val,current_cf,
1621 start_pdp_offset, pdp_cnt);
1622 /* endif meets xff value requirement for a valid value */
1623 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1624 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1625 if (isnan(pdp_temp_val))
1626 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1628 *cdp_unkn_pdp_cnt = 0;
1629 } else { /* rra_step_cnt[i] == 0 */
1632 if (isnan(*cdp_val)) {
1633 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1636 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1640 if (isnan(pdp_temp_val)) {
/* no CDP row due yet: just accumulate the unknown PDPs */
1641 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1644 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1651 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1652 * on the type of consolidation function.
/* Set CDP_primary_val to the appropriate consolidated value for the CDP row
 * about to be written, combining the carried-over CDP_val with the new
 * reading weighted by start_pdp_offset, according to the consolidation
 * function (AVERAGE / MAX / MIN / default LAST-style). */
1654 static void initialize_cdp_val(
1657 rrd_value_t pdp_temp_val,
1658 unsigned long start_pdp_offset,
1659 unsigned long pdp_cnt)
1661 rrd_value_t cum_val, cur_val;
1663 switch (current_cf) {
/* AVERAGE: unknowns contribute 0 to the sum and are excluded from the
 * divisor via CDP_unkn_pdp_cnt */
1665 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1666 cur_val = IFDNAN(pdp_temp_val, 0.0);
1667 scratch[CDP_primary_val].u_val =
1668 (cum_val + cur_val * start_pdp_offset) /
1669 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
/* MAXIMUM: treat unknowns as -infinity so any known value wins */
1672 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1673 cur_val = IFDNAN(pdp_temp_val, -DINF);
1677 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1679 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1685 if (cur_val > cum_val)
1686 scratch[CDP_primary_val].u_val = cur_val;
1688 scratch[CDP_primary_val].u_val = cum_val;
/* MINIMUM: symmetric case, unknowns become +infinity */
1691 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1692 cur_val = IFDNAN(pdp_temp_val, DINF);
1695 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1697 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1703 if (cur_val < cum_val)
1704 scratch[CDP_primary_val].u_val = cur_val;
1706 scratch[CDP_primary_val].u_val = cum_val;
/* default (e.g. LAST): the newest reading is the consolidated value */
1710 scratch[CDP_primary_val].u_val = pdp_temp_val;
1716 * Update the consolidation function for Holt-Winters functions as
1717 * well as other functions that don't actually consolidate multiple
/* Bring per-DS cdp_prep state back into a consistent shape after a bulk
 * update for RRAs with one PDP per CDP (Holt-Winters family and other CFs
 * that do not consolidate multiple PDPs).  No real consolidation happens
 * here; cached coefficients and counters are resynchronized instead. */
1720 static void reset_cdp(
1722 unsigned long elapsed_pdp_st,
1723 rrd_value_t *pdp_temp,
1724 rrd_value_t *last_seasonal_coef,
1725 rrd_value_t *seasonal_coef,
1729 enum cf_en current_cf)
1731 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1733 switch (current_cf) {
/* plain CFs: primary and fill-in rows both take the new reading */
1736 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1737 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1740 case CF_DEVSEASONAL:
1741 /* need to update cached seasonal values, so they are consistent
1742 * with the bulk update */
1743 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1744 * CDP_last_deviation are the same. */
1745 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1746 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1750 /* need to update the null_count and last_null_count.
1751 * even do this for non-DNAN pdp_temp because the
1752 * algorithm is not learning from batch updates. */
1753 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1754 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
/* prediction rows for the skipped interval are unknown by design */
1757 scratch[CDP_primary_val].u_val = DNAN;
1758 scratch[CDP_secondary_val].u_val = DNAN;
1761 /* do not count missed bulk values as failures */
1762 scratch[CDP_primary_val].u_val = 0;
1763 scratch[CDP_secondary_val].u_val = 0;
1764 /* need to reset violations buffer.
1765 * could do this more carefully, but for now, just
1766 * assume a bulk update wipes away all violations. */
1767 erase_violations(rrd, cdp_idx, rra_idx);
/* Compute the carry-over CDP_val for the partially-filled CDP interval that
 * follows the row(s) just written: the portion of the new reading covering
 * pdp_into_cdp_cnt PDPs, weighted per the consolidation function. */
1772 static rrd_value_t initialize_carry_over(
1773 rrd_value_t pdp_temp_val,
1775 unsigned long elapsed_pdp_st,
1776 unsigned long start_pdp_offset,
1777 unsigned long pdp_cnt)
/* number of PDPs of the new reading that spill into the next CDP */
1779 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1780 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1781 switch (current_cf) {
1793 switch (current_cf) {
/* AVERAGE carries a weighted sum; other CFs carry the value itself */
1795 return pdp_temp_val * pdp_into_cdp_cnt ;
1797 return pdp_temp_val;
1803 * Update or initialize a CDP value based on the consolidation
1806 * Returns the new value.
/* Update (or initialize, if cdp_val is DNAN) the running CDP accumulator
 * with a new reading spanning elapsed_pdp_st PDPs, according to the
 * consolidation function.  Returns the new accumulator value. */
1808 static rrd_value_t calculate_cdp_val(
1809 rrd_value_t cdp_val,
1810 rrd_value_t pdp_temp_val,
1811 unsigned long elapsed_pdp_st,
1822 if (isnan(cdp_val)) {
/* first known reading for this CDP: AVERAGE weights it by PDP count */
1823 if (current_cf == CF_AVERAGE) {
1824 pdp_temp_val *= elapsed_pdp_st;
1827 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1828 i, ii, pdp_temp_val);
1830 return pdp_temp_val;
1832 if (current_cf == CF_AVERAGE)
1833 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1834 if (current_cf == CF_MINIMUM)
1835 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1836 if (current_cf == CF_MAXIMUM)
1837 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
/* remaining CFs (e.g. LAST): newest reading replaces the accumulator */
1839 return pdp_temp_val;
1843 * For each RRA, update the seasonal values and then call update_aberrant_CF
1844 * for each data source.
1846 * Return 0 on success, -1 on error.
/* For each 1-PDP-per-CDP RRA, refresh cached seasonal coefficients and run
 * update_aberrant_CF for every data source, once per pending CDP row
 * (primary, then secondary).  Returns 0 on success, -1 on error. */
1848 static int update_aberrant_cdps(
1850 rrd_file_t *rrd_file,
1851 unsigned long rra_begin,
1852 unsigned long elapsed_pdp_st,
1853 rrd_value_t *pdp_temp,
1854 rrd_value_t **seasonal_coef)
1856 unsigned long rra_idx, ds_idx, j;
1858 /* number of PDP steps since the last update that
1859 * are assigned to the first CDP to be generated
1860 * since the last update. */
1861 unsigned short scratch_idx;
1862 unsigned long rra_start;
1863 enum cf_en current_cf;
1865 /* this loop is only entered if elapsed_pdp_st < 3 */
1866 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1867 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1868 rra_start = rra_begin;
1869 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1870 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1871 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1872 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
/* primary row uses the next seasonal slot, secondary the one after */
1873 if (scratch_idx == CDP_primary_val) {
1874 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1875 elapsed_pdp_st + 1, seasonal_coef);
1877 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1878 elapsed_pdp_st + 2, seasonal_coef);
1881 if (rrd_test_error())
1883 /* loop over data sources within each RRA */
1884 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1885 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1886 rra_idx * (rrd->stat_head->ds_cnt) +
1887 ds_idx, rra_idx, ds_idx, scratch_idx,
/* advance to the next RRA's byte offset within the file */
1891 rra_start += rrd->rra_def[rra_idx].row_cnt
1892 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1899 * Move sequentially through the file, writing one RRA at a time. Note this
1900 * architecture divorces the computation of CDP with flushing updated RRA
1903 * Return 0 on success, -1 on error.
/* Flush the pending CDP rows of every RRA to disk, advancing each RRA's
 * circular write pointer.  Writes the primary value first, then repeats
 * with the secondary "fill in" value for any additional rows owed after a
 * bulk update.  Returns 0 on success, -1 on error. */
1905 static int write_to_rras(
1907 rrd_file_t *rrd_file,
1908 unsigned long *rra_step_cnt,
1909 unsigned long rra_begin,
1910 time_t current_time,
1911 unsigned long *skip_update,
1912 rrd_info_t ** pcdp_summary)
1914 unsigned long rra_idx;
1915 unsigned long rra_start;
1916 time_t rra_time = 0; /* time of update for a RRA */
1918 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1920 /* Ready to write to disk */
1921 rra_start = rra_begin;
1923 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1924 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1925 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1928 unsigned short scratch_idx;
1929 unsigned long step_subtract;
/* one iteration per pending row: first pass writes CDP_primary_val,
 * all later passes write CDP_secondary_val */
1931 for (scratch_idx = CDP_primary_val,
1933 rra_step_cnt[rra_idx] > 0;
1934 rra_step_cnt[rra_idx]--,
1935 scratch_idx = CDP_secondary_val,
1936 step_subtract = 2) {
1940 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1942 /* increment, with wrap-around */
1943 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1944 rra_ptr->cur_row = 0;
1946 /* we know what our position should be */
1947 rra_pos_new = rra_start
1948 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1950 /* re-seek if the position is wrong or we wrapped around */
1951 if ((size_t)rra_pos_new != rrd_file->pos) {
1952 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1953 rrd_set_error("seek error in rrd");
1958 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
/* SEASONAL/DEVSEASONAL rows handled by the smoother are not written */
1961 if (skip_update[rra_idx])
1964 if (*pcdp_summary != NULL) {
1965 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
/* timestamp of this row: aligned to the RRA step, backed off by the
 * number of rows still pending after this one */
1967 rra_time = (current_time - current_time % step_time)
1968 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1972 (rrd_file, rrd, rra_idx, scratch_idx,
1973 pcdp_summary, rra_time) == -1)
1976 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1979 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1986 * Write out one row of values (one value per DS) to the archive.
1988 * Returns 0 on success, -1 on error.
/* Write out one row of values (one value per DS) to the archive at the
 * file's current position, optionally appending each value to the
 * pcdp_summary info chain.  Returns 0 on success, -1 on write error. */
1990 static int write_RRA_row(
1991 rrd_file_t *rrd_file,
1993 unsigned long rra_idx,
1994 unsigned short CDP_scratch_idx,
1995 rrd_info_t ** pcdp_summary,
1998 unsigned long ds_idx, cdp_idx;
2001 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2002 /* compute the cdp index */
2003 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2005 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2006 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2007 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2009 if (*pcdp_summary != NULL) {
2010 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2011 /* append info to the return hash */
2012 *pcdp_summary = rrd_info_push(*pcdp_summary,
/* key format: [timestamp]RRA[cf][pdp_cnt]DS[name] */
2014 ("[%lli]RRA[%s][%lu]DS[%s]",
2015 (long long)rra_time,
2016 rrd->rra_def[rra_idx].cf_nam,
2017 rrd->rra_def[rra_idx].pdp_cnt,
2018 rrd->ds_def[ds_idx].ds_nam),
2022 if (rrd_write(rrd_file,
2023 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2024 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2025 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2033 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2035 * Returns 0 on success, -1 otherwise
/* Run apply_smoother over every DEVSEASONAL and SEASONAL RRA in the file.
 * Returns 0 on success, -1 if the smoother raised an rrd error. */
2037 static int smooth_all_rras(
2039 rrd_file_t *rrd_file,
2040 unsigned long rra_begin)
2042 unsigned long rra_start = rra_begin;
2043 unsigned long rra_idx;
2045 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2046 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2047 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2049 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2051 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2052 if (rrd_test_error())
/* advance to the byte offset of the next RRA */
2055 rra_start += rrd->rra_def[rra_idx].row_cnt
2056 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2063 * Flush changes to disk (unless we're using mmap)
2065 * Returns 0 on success, -1 otherwise
2067 static int write_changes_to_disk(
2069 rrd_file_t *rrd_file,
2072 /* we just need to write back the live header portion now */
2073 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2074 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2075 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2077 rrd_set_error("seek rrd for live header writeback");
2081 if (rrd_write(rrd_file, rrd->live_head,
2082 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2083 rrd_set_error("rrd_write live_head to rrd");
2087 if (rrd_write(rrd_file, rrd->legacy_last_up,
2088 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2089 rrd_set_error("rrd_write live_head to rrd");
2095 if (rrd_write(rrd_file, rrd->pdp_prep,
2096 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2097 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2098 rrd_set_error("rrd_write pdp_prep to rrd");
2102 if (rrd_write(rrd_file, rrd->cdp_prep,
2103 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2104 rrd->stat_head->ds_cnt)
2105 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2106 rrd->stat_head->ds_cnt)) {
2108 rrd_set_error("rrd_write cdp_prep to rrd");
2112 if (rrd_write(rrd_file, rrd->rra_ptr,
2113 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2114 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2115 rrd_set_error("rrd_write rra_ptr to rrd");