2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(&current_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
111 unsigned long *skip_update,
112 int *schedule_smooth);
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
124 static int get_time_from_reading(
128 time_t *current_time,
129 unsigned long *current_time_usec,
132 static int update_pdp_prep(
135 rrd_value_t *pdp_new,
138 static int calculate_elapsed_steps(
140 unsigned long current_time,
141 unsigned long current_time_usec,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
163 unsigned long ds_idx,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
200 static void update_cdp(
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
212 static void initialize_cdp_val(
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
245 static int update_aberrant_cdps(
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
270 static int smooth_all_rras(
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
276 static int write_changes_to_disk(
278 rrd_file_t *rrd_file,
283 * normalize time as returned by gettimeofday. usec part must
286 static inline void normalize_time(
289 if (t->tv_usec < 0) {
296 * Sets current_time and current_time_usec based on the current time.
297 * current_time_usec is set to 0 if the version number is 1 or 2.
299 static inline void initialize_time(
300 time_t *current_time,
301 unsigned long *current_time_usec,
304 struct timeval tmp_time; /* used for time conversion */
306 gettimeofday(&tmp_time, 0);
307 normalize_time(&tmp_time);
308 *current_time = tmp_time.tv_sec;
310 *current_time_usec = tmp_time.tv_usec;
312 *current_time_usec = 0;
/* IFDNAN(X, Y): evaluate to X unless X is NaN, in which case evaluate to Y.
 * NOTE: X is evaluated twice and Y conditionally -- avoid side effects in
 * the arguments. The old definition carried a stray trailing semicolon,
 * which made the macro break when used inside a larger expression
 * (e.g. `a + IFDNAN(x, y)`); statement-position callers that supply their
 * own semicolon are unaffected by its removal. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
318 rrd_info_t *rrd_update_v(
323 rrd_info_t *result = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
332 opterr = 0; /* initialize getopt */
335 int option_index = 0;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
354 /* need at least 2 arguments: filename, data. */
355 if (argc - optind < 2) {
356 rrd_set_error("Not enough arguments");
360 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
361 rc.u_int = _rrd_update(argv[optind], tmplt,
363 (const char **) (argv + optind + 1), result);
364 result->value.u_int = rc.u_int;
373 struct option long_options[] = {
374 {"template", required_argument, 0, 't'},
375 {"daemon", required_argument, 0, 'd'},
378 int option_index = 0;
382 char *opt_daemon = NULL;
385 opterr = 0; /* initialize getopt */
388 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
395 tmplt = strdup(optarg);
399 if (opt_daemon != NULL)
401 opt_daemon = strdup (optarg);
402 if (opt_daemon == NULL)
404 rrd_set_error("strdup failed.");
410 rrd_set_error("unknown option '%s'", argv[optind - 1]);
415 /* need at least 2 arguments: filename, data. */
416 if (argc - optind < 2) {
417 rrd_set_error("Not enough arguments");
421 if ((tmplt != NULL) && (opt_daemon != NULL))
423 rrd_set_error("The caching opt_daemon cannot be used together with "
428 if ((tmplt == NULL) && (opt_daemon == NULL))
432 temp = getenv (ENV_RRDCACHED_ADDRESS);
435 opt_daemon = strdup (temp);
436 if (opt_daemon == NULL)
438 rrd_set_error("strdup failed.");
444 if (opt_daemon != NULL)
448 status = rrdc_connect (opt_daemon);
451 rrd_set_error("Unable to connect to opt_daemon: %s",
454 : rrd_strerror (status));
458 status = rrdc_update (/* file = */ argv[optind],
459 /* values_num = */ argc - optind - 1,
460 /* values = */ (void *) (argv + optind + 1));
463 rrd_set_error("Failed sending the values to the opt_daemon: %s",
466 : rrd_strerror (status));
475 } /* if (opt_daemon != NULL) */
477 rc = rrd_update_r(argv[optind], tmplt,
478 argc - optind - 1, (const char **) (argv + optind + 1));
485 if (opt_daemon != NULL)
494 const char *filename,
499 return _rrd_update(filename, tmplt, argc, argv, NULL);
503 const char *filename,
507 rrd_info_t * pcdp_summary)
512 unsigned long rra_begin; /* byte pointer to the rra
513 * area in the rrd file. this
514 * pointer never changes value */
515 rrd_value_t *pdp_new; /* prepare the incoming data to be added
516 * to the existing entry */
517 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
518 * to the cdp values */
520 long *tmpl_idx; /* index representing the settings
521 * transported by the tmplt index */
522 unsigned long tmpl_cnt = 2; /* time and data */
524 time_t current_time = 0;
525 unsigned long current_time_usec = 0; /* microseconds part of current time */
527 int schedule_smooth = 0;
529 /* number of elapsed PDP steps since last update */
530 unsigned long *rra_step_cnt = NULL;
532 int version; /* rrd version */
533 rrd_file_t *rrd_file;
534 char *arg_copy; /* for processing the argv */
535 unsigned long *skip_update; /* RRAs to advance but not write */
537 /* need at least 1 arguments: data. */
539 rrd_set_error("Not enough arguments");
543 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
546 /* We are now at the beginning of the rra's */
547 rra_begin = rrd_file->header_len;
549 version = atoi(rrd.stat_head->version);
551 initialize_time(&current_time, &current_time_usec, version);
553 /* get exclusive lock to whole file.
554 * lock gets removed when we close the file.
556 if (rrd_lock(rrd_file) != 0) {
557 rrd_set_error("could not lock RRD");
561 if (allocate_data_structures(&rrd, &updvals,
562 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
563 &rra_step_cnt, &skip_update,
568 /* loop through the arguments. */
569 for (arg_i = 0; arg_i < argc; arg_i++) {
570 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
571 rrd_set_error("failed duplication argv entry");
574 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
575 &current_time, &current_time_usec, pdp_temp, pdp_new,
576 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
577 &pcdp_summary, version, skip_update,
578 &schedule_smooth) == -1) {
579 if (rrd_test_error()) { /* Should have error string always here */
582 /* Prepend file name to error message */
583 if ((save_error = strdup(rrd_get_error())) != NULL) {
584 rrd_set_error("%s: %s", filename, save_error);
596 /* if we got here and if there is an error and if the file has not been
597 * written to, then close things up and return. */
598 if (rrd_test_error()) {
599 goto err_free_structures;
602 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
603 goto err_free_structures;
607 /* calling the smoothing code here guarantees at most one smoothing
608 * operation per rrd_update call. Unfortunately, it is possible with bulk
609 * updates, or a long-delayed update for smoothing to occur off-schedule.
610 * This really isn't critical except during the burn-in cycles. */
611 if (schedule_smooth) {
612 smooth_all_rras(&rrd, rrd_file, rra_begin);
615 /* rrd_dontneed(rrd_file,&rrd); */
641 * get exclusive lock to whole file.
642 * lock gets removed when we close the file
644 * returns 0 on success
652 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
655 if (_fstat(file->fd, &st) == 0) {
656 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
663 lock.l_type = F_WRLCK; /* exclusive write lock */
664 lock.l_len = 0; /* whole file */
665 lock.l_start = 0; /* start of file */
666 lock.l_whence = SEEK_SET; /* offset relative to start of file */
668 rcstat = fcntl(file->fd, F_SETLK, &lock);
676 * Allocate some important arrays used, and initialize the template.
678 * When it returns, either all of the structures are allocated
679 * or none of them are.
681 * Returns 0 on success, -1 on error.
683 static int allocate_data_structures(
686 rrd_value_t **pdp_temp,
689 unsigned long *tmpl_cnt,
690 unsigned long **rra_step_cnt,
691 unsigned long **skip_update,
692 rrd_value_t **pdp_new)
695 if ((*updvals = (char **) malloc(sizeof(char *)
696 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
697 rrd_set_error("allocating updvals pointer array.");
700 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
701 * rrd->stat_head->ds_cnt)) ==
703 rrd_set_error("allocating pdp_temp.");
704 goto err_free_updvals;
706 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
708 rrd->stat_head->rra_cnt)) ==
710 rrd_set_error("allocating skip_update.");
711 goto err_free_pdp_temp;
713 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
714 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
715 rrd_set_error("allocating tmpl_idx.");
716 goto err_free_skip_update;
718 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
721 rra_cnt))) == NULL) {
722 rrd_set_error("allocating rra_step_cnt.");
723 goto err_free_tmpl_idx;
726 /* initialize tmplt redirector */
727 /* default config example (assume DS 1 is a CDEF DS)
728 tmpl_idx[0] -> 0; (time)
729 tmpl_idx[1] -> 1; (DS 0)
730 tmpl_idx[2] -> 3; (DS 2)
731 tmpl_idx[3] -> 4; (DS 3) */
732 (*tmpl_idx)[0] = 0; /* time */
733 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
734 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
735 (*tmpl_idx)[ii++] = i;
740 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
741 goto err_free_rra_step_cnt;
745 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
746 * rrd->stat_head->ds_cnt)) == NULL) {
747 rrd_set_error("allocating pdp_new.");
748 goto err_free_rra_step_cnt;
753 err_free_rra_step_cnt:
757 err_free_skip_update:
767 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
769 * Returns 0 on success.
771 static int parse_template(
774 unsigned long *tmpl_cnt,
777 char *dsname, *tmplt_copy;
778 unsigned int tmpl_len, i;
781 *tmpl_cnt = 1; /* the first entry is the time */
783 /* we should work on a writeable copy here */
784 if ((tmplt_copy = strdup(tmplt)) == NULL) {
785 rrd_set_error("error copying tmplt '%s'", tmplt);
791 tmpl_len = strlen(tmplt_copy);
792 for (i = 0; i <= tmpl_len; i++) {
793 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
794 tmplt_copy[i] = '\0';
795 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
796 rrd_set_error("tmplt contains more DS definitions than RRD");
798 goto out_free_tmpl_copy;
800 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
801 rrd_set_error("unknown DS name '%s'", dsname);
803 goto out_free_tmpl_copy;
805 /* go to the next entry on the tmplt_copy */
807 dsname = &tmplt_copy[i + 1];
817 * Parse an update string, updates the primary data points (PDPs)
818 * and consolidated data points (CDPs), and writes changes to the RRAs.
820 * Returns 0 on success, -1 on error.
822 static int process_arg(
825 rrd_file_t *rrd_file,
826 unsigned long rra_begin,
827 time_t *current_time,
828 unsigned long *current_time_usec,
829 rrd_value_t *pdp_temp,
830 rrd_value_t *pdp_new,
831 unsigned long *rra_step_cnt,
834 unsigned long tmpl_cnt,
835 rrd_info_t ** pcdp_summary,
837 unsigned long *skip_update,
838 int *schedule_smooth)
840 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
842 /* a vector of future Holt-Winters seasonal coefs */
843 unsigned long elapsed_pdp_st;
845 double interval, pre_int, post_int; /* interval between this and
847 unsigned long proc_pdp_cnt;
849 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
850 current_time, current_time_usec, version) == -1) {
854 interval = (double) (*current_time - rrd->live_head->last_up)
855 + (double) ((long) *current_time_usec -
856 (long) rrd->live_head->last_up_usec) / 1e6f;
858 /* process the data sources and update the pdp_prep
859 * area accordingly */
860 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
864 elapsed_pdp_st = calculate_elapsed_steps(rrd,
866 *current_time_usec, interval,
870 /* has a pdp_st moment occurred since the last run ? */
871 if (elapsed_pdp_st == 0) {
872 /* no we have not passed a pdp_st moment. therefore update is simple */
873 simple_update(rrd, interval, pdp_new);
875 /* an pdp_st has occurred. */
876 if (process_all_pdp_st(rrd, interval,
878 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
881 if (update_all_cdp_prep(rrd, rra_step_cnt,
888 skip_update, schedule_smooth) == -1) {
889 goto err_free_coefficients;
891 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
892 elapsed_pdp_st, pdp_temp,
893 &seasonal_coef) == -1) {
894 goto err_free_coefficients;
896 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
897 *current_time, skip_update,
898 pcdp_summary) == -1) {
899 goto err_free_coefficients;
901 } /* endif a pdp_st has occurred */
902 rrd->live_head->last_up = *current_time;
903 rrd->live_head->last_up_usec = *current_time_usec;
906 *rrd->legacy_last_up = rrd->live_head->last_up;
909 free(last_seasonal_coef);
912 err_free_coefficients:
914 free(last_seasonal_coef);
919 * Parse a DS string (time + colon-separated values), storing the
920 * results in current_time, current_time_usec, and updvals.
922 * Returns 0 on success, -1 on error.
929 unsigned long tmpl_cnt,
930 time_t *current_time,
931 unsigned long *current_time_usec,
939 /* initialize all ds input to unknown except the first one
940 which has always got to be set */
941 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
944 /* separate all ds elements; first must be examined separately
945 due to alternate time syntax */
946 if ((p = strchr(input, '@')) != NULL) {
948 } else if ((p = strchr(input, ':')) != NULL) {
951 rrd_set_error("expected timestamp not found in data source from %s",
957 updvals[tmpl_idx[i++]] = p + 1;
962 updvals[tmpl_idx[i++]] = p + 1;
968 rrd_set_error("expected %lu data source readings (got %lu) from %s",
969 tmpl_cnt - 1, i, input);
973 if (get_time_from_reading(rrd, timesyntax, updvals,
974 current_time, current_time_usec,
982 * Parse the time in a DS string, store it in current_time and
983 * current_time_usec and verify that it's later than the last
984 * update for this DS.
986 * Returns 0 on success, -1 on error.
988 static int get_time_from_reading(
992 time_t *current_time,
993 unsigned long *current_time_usec,
997 char *parsetime_error = NULL;
999 rrd_time_value_t ds_tv;
1000 struct timeval tmp_time; /* used for time conversion */
1002 /* get the time from the reading ... handle N */
1003 if (timesyntax == '@') { /* at-style */
1004 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1005 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1008 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1009 ds_tv.type == RELATIVE_TO_START_TIME) {
1010 rrd_set_error("specifying time relative to the 'start' "
1011 "or 'end' makes no sense here: %s", updvals[0]);
1014 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1015 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1016 } else if (strcmp(updvals[0], "N") == 0) {
1017 gettimeofday(&tmp_time, 0);
1018 normalize_time(&tmp_time);
1019 *current_time = tmp_time.tv_sec;
1020 *current_time_usec = tmp_time.tv_usec;
1022 old_locale = setlocale(LC_NUMERIC, "C");
1023 tmp = strtod(updvals[0], 0);
1024 setlocale(LC_NUMERIC, old_locale);
1025 *current_time = floor(tmp);
1026 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1028 /* dont do any correction for old version RRDs */
1030 *current_time_usec = 0;
1032 if (*current_time < rrd->live_head->last_up ||
1033 (*current_time == rrd->live_head->last_up &&
1034 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1035 rrd_set_error("illegal attempt to update using time %ld when "
1036 "last update time is %ld (minimum one second step)",
1037 *current_time, rrd->live_head->last_up);
1044 * Update pdp_new by interpreting the updvals according to the DS type
1045 * (COUNTER, GAUGE, etc.).
1047 * Returns 0 on success, -1 on error.
1049 static int update_pdp_prep(
1052 rrd_value_t *pdp_new,
1055 unsigned long ds_idx;
1057 char *endptr; /* used in the conversion */
1060 enum dst_en dst_idx;
1062 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1063 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1065 /* make sure we do not build diffs with old last_ds values */
1066 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1067 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1068 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1071 /* NOTE: DST_CDEF should never enter this if block, because
1072 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1073 * accidently specified a value for the DST_CDEF. To handle this case,
1074 * an extra check is required. */
1076 if ((updvals[ds_idx + 1][0] != 'U') &&
1077 (dst_idx != DST_CDEF) &&
1078 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1081 /* pdp_new contains rate * time ... eg the bytes transferred during
1082 * the interval. Doing it this way saves a lot of math operations
1087 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1088 if ((updvals[ds_idx + 1][ii] < '0'
1089 || updvals[ds_idx + 1][ii] > '9')
1090 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1091 rrd_set_error("not a simple integer: '%s'",
1092 updvals[ds_idx + 1]);
1096 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1098 rrd_diff(updvals[ds_idx + 1],
1099 rrd->pdp_prep[ds_idx].last_ds);
1100 if (dst_idx == DST_COUNTER) {
1101 /* simple overflow catcher. This will fail
1102 * terribly for non 32 or 64 bit counters
1103 * ... are there any others in SNMP land?
1105 if (pdp_new[ds_idx] < (double) 0.0)
1106 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1107 if (pdp_new[ds_idx] < (double) 0.0)
1108 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1110 rate = pdp_new[ds_idx] / interval;
1112 pdp_new[ds_idx] = DNAN;
1116 old_locale = setlocale(LC_NUMERIC, "C");
1118 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1119 setlocale(LC_NUMERIC, old_locale);
1121 rrd_set_error("converting '%s' to float: %s",
1122 updvals[ds_idx + 1], rrd_strerror(errno));
1125 if (endptr[0] != '\0') {
1127 ("conversion of '%s' to float not complete: tail '%s'",
1128 updvals[ds_idx + 1], endptr);
1131 rate = pdp_new[ds_idx] / interval;
1135 old_locale = setlocale(LC_NUMERIC, "C");
1137 strtod(updvals[ds_idx + 1], &endptr) * interval;
1138 setlocale(LC_NUMERIC, old_locale);
1140 rrd_set_error("converting '%s' to float: %s",
1141 updvals[ds_idx + 1], rrd_strerror(errno));
1144 if (endptr[0] != '\0') {
1146 ("conversion of '%s' to float not complete: tail '%s'",
1147 updvals[ds_idx + 1], endptr);
1150 rate = pdp_new[ds_idx] / interval;
1153 rrd_set_error("rrd contains unknown DS type : '%s'",
1154 rrd->ds_def[ds_idx].dst);
1157 /* break out of this for loop if the error string is set */
1158 if (rrd_test_error()) {
1161 /* make sure pdp_temp is neither too large or too small
1162 * if any of these occur it becomes unknown ...
1163 * sorry folks ... */
1165 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1166 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1167 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1168 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1169 pdp_new[ds_idx] = DNAN;
1172 /* no news is news all the same */
1173 pdp_new[ds_idx] = DNAN;
1177 /* make a copy of the command line argument for the next run */
1179 fprintf(stderr, "prep ds[%lu]\t"
1183 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1186 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1188 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1194 * How many PDP steps have elapsed since the last update? Returns the answer,
1195 * and stores the time between the last update and the last PDP in pre_time,
1196 * and the time between the last PDP and the current time in post_int.
1198 static int calculate_elapsed_steps(
1200 unsigned long current_time,
1201 unsigned long current_time_usec,
1205 unsigned long *proc_pdp_cnt)
1207 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1208 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1210 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1211 * when it was last updated */
1212 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1214 /* when was the current pdp started */
1215 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1216 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1218 /* when did the last pdp_st occur */
1219 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1220 occu_pdp_st = current_time - occu_pdp_age;
1222 if (occu_pdp_st > proc_pdp_st) {
1223 /* OK we passed the pdp_st moment */
1224 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1225 * occurred before the latest
1227 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1228 *post_int = occu_pdp_age; /* how much after it */
1229 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1231 *pre_int = interval;
1235 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1238 printf("proc_pdp_age %lu\t"
1240 "occu_pfp_age %lu\t"
1244 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1245 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1248 /* compute the number of elapsed pdp_st moments */
1249 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1253 * Increment the PDP values by the values in pdp_new, or else initialize them.
1255 static void simple_update(
1258 rrd_value_t *pdp_new)
1262 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1263 if (isnan(pdp_new[i])) {
1264 /* this is not really accurate if we use subsecond data arrival time
1265 should have thought of it when going subsecond resolution ...
1266 sorry next format change we will have it! */
1267 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1270 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1271 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1273 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1282 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1283 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1289 * Call process_pdp_st for each DS.
1291 * Returns 0 on success, -1 on error.
1293 static int process_all_pdp_st(
1298 unsigned long elapsed_pdp_st,
1299 rrd_value_t *pdp_new,
1300 rrd_value_t *pdp_temp)
1302 unsigned long ds_idx;
1304 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1305 rate*seconds which occurred up to the last run.
1306 pdp_new[] contains rate*seconds from the latest run.
1307 pdp_temp[] will contain the rate for cdp */
1309 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1310 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1311 elapsed_pdp_st * rrd->stat_head->pdp_step,
1312 pdp_new, pdp_temp) == -1) {
1316 fprintf(stderr, "PDP UPD ds[%lu]\t"
1317 "elapsed_pdp_st %lu\t"
1320 "new_unkn_sec %5lu\n",
1324 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1325 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1332 * Process an update that occurs after one of the PDP moments.
1333 * Increments the PDP value, sets NAN if time greater than the
1334 * heartbeats have elapsed, processes CDEFs.
1336 * Returns 0 on success, -1 on error.
1338 static int process_pdp_st(
1340 unsigned long ds_idx,
1344 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1345 rrd_value_t *pdp_new,
1346 rrd_value_t *pdp_temp)
1350 /* update pdp_prep to the current pdp_st. */
1351 double pre_unknown = 0.0;
1352 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1353 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1355 rpnstack_t rpnstack; /* used for COMPUTE DS */
1357 rpnstack_init(&rpnstack);
1360 if (isnan(pdp_new[ds_idx])) {
1361 /* a final bit of unknown to be added before calculation
1362 we use a temporary variable for this so that we
1363 don't have to turn integer lines before using the value */
1364 pre_unknown = pre_int;
1366 if (isnan(scratch[PDP_val].u_val)) {
1367 scratch[PDP_val].u_val = 0;
1369 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1372 /* if too much of the pdp_prep is unknown we dump it */
1373 /* if the interval is larger than mrhb we get NAN */
1374 if ((interval > mrhb) ||
1375 (rrd->stat_head->pdp_step / 2.0 <
1376 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1377 pdp_temp[ds_idx] = DNAN;
1379 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1380 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1384 /* process CDEF data sources; remember each CDEF DS can
1385 * only reference other DS with a lower index number */
1386 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1390 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1391 /* substitute data values for OP_VARIABLE nodes */
1392 for (i = 0; rpnp[i].op != OP_END; i++) {
1393 if (rpnp[i].op == OP_VARIABLE) {
1394 rpnp[i].op = OP_NUMBER;
1395 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1398 /* run the rpn calculator */
1399 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1401 rpnstack_free(&rpnstack);
1406 /* make pdp_prep ready for the next run */
1407 if (isnan(pdp_new[ds_idx])) {
1408 /* this is not really accurate if we use subsecond data arrival time
1409 should have thought of it when going subsecond resolution ...
1410 sorry next format change we will have it! */
1411 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1412 scratch[PDP_val].u_val = DNAN;
1414 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1415 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1417 rpnstack_free(&rpnstack);
1422 * Iterate over all the RRAs for a given DS and:
1423 * 1. Decide whether to schedule a smooth later
1424 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1427 * Returns 0 on success, -1 on error
1429 static int update_all_cdp_prep(
1431 unsigned long *rra_step_cnt,
1432 unsigned long rra_begin,
1433 rrd_file_t *rrd_file,
1434 unsigned long elapsed_pdp_st,
1435 unsigned long proc_pdp_cnt,
1436 rrd_value_t **last_seasonal_coef,
1437 rrd_value_t **seasonal_coef,
1438 rrd_value_t *pdp_temp,
1439 unsigned long *skip_update,
1440 int *schedule_smooth)
1442 unsigned long rra_idx;
1444 /* index into the CDP scratch array */
1445 enum cf_en current_cf;
1446 unsigned long rra_start;
1448 /* number of rows to be updated in an RRA for a data value. */
1449 unsigned long start_pdp_offset;
1451 rra_start = rra_begin;
1452 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1453 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1455 rrd->rra_def[rra_idx].pdp_cnt -
1456 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1457 skip_update[rra_idx] = 0;
1458 if (start_pdp_offset <= elapsed_pdp_st) {
1459 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1460 rrd->rra_def[rra_idx].pdp_cnt + 1;
1462 rra_step_cnt[rra_idx] = 0;
1465 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1466 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1467 * so that they will be correct for the next observed value; note that for
1468 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1469 * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1470 if (rra_step_cnt[rra_idx] > 1) {
1471 skip_update[rra_idx] = 1;
1472 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1473 elapsed_pdp_st, last_seasonal_coef);
1474 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1475 elapsed_pdp_st + 1, seasonal_coef);
1477 /* periodically run a smoother for seasonal effects */
1478 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1481 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1482 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1483 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1486 *schedule_smooth = 1;
1489 if (rrd_test_error())
1493 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1494 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1495 current_cf) == -1) {
1499 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1500 sizeof(rrd_value_t);
1506 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1508 static int do_schedule_smooth(
1510 unsigned long rra_idx,
1511 unsigned long elapsed_pdp_st)
1513 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1514 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1515 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1516 unsigned long seasonal_smooth_idx =
1517 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1518 unsigned long *init_seasonal =
1519 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1521 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1522 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1523 * really an RRA level, not a data source within RRA level parameter, but
1524 * the rra_def is read only for rrd_update (not flushed to disk). */
1525 if (*init_seasonal > BURNIN_CYCLES) {
1526 /* someone has no doubt invented a trick to deal with this wrap around,
1527 * but at least this code is clear. */
1528 if (seasonal_smooth_idx > cur_row) {
1529 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1530 * between PDP and CDP */
1531 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1533 /* can't rely on negative numbers because we are working with
1534 * unsigned values */
1535 return (cur_row + elapsed_pdp_st >= row_cnt
1536 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1538 /* mark off one of the burn-in cycles */
1539 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1543 * For a given RRA, iterate over the data sources and call the appropriate
1544 * consolidation function.
1546 * Returns 0 on success, -1 on error.
1548 static int update_cdp_prep(
1550 unsigned long elapsed_pdp_st,
1551 unsigned long start_pdp_offset,
1552 unsigned long *rra_step_cnt,
1554 rrd_value_t *pdp_temp,
1555 rrd_value_t *last_seasonal_coef,
1556 rrd_value_t *seasonal_coef,
1559 unsigned long ds_idx, cdp_idx;
1561 /* update CDP_PREP areas */
1562 /* loop over data soures within each RRA */
1563 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1565 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1567 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1568 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1569 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1570 elapsed_pdp_st, start_pdp_offset,
1571 rrd->rra_def[rra_idx].pdp_cnt,
1572 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1575 /* Nothing to consolidate if there's one PDP per CDP. However, if
1576 * we've missed some PDPs, let's update null counters etc. */
1577 if (elapsed_pdp_st > 2) {
1578 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1579 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1584 if (rrd_test_error())
1586 } /* endif data sources loop */
1591 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1592 * primary value, secondary value, and # of unknowns.
1594 static void update_cdp(
1597 rrd_value_t pdp_temp_val,
1598 unsigned long rra_step_cnt,
1599 unsigned long elapsed_pdp_st,
1600 unsigned long start_pdp_offset,
1601 unsigned long pdp_cnt,
1606 /* shorthand variables */
1607 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1608 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1609 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1610 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1613 /* If we are in this block, as least 1 CDP value will be written to
1614 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1615 * to be written, then the "fill in" value is the CDP_secondary_val
1617 if (isnan(pdp_temp_val)) {
1618 *cdp_unkn_pdp_cnt += start_pdp_offset;
1619 *cdp_secondary_val = DNAN;
1621 /* CDP_secondary value is the RRA "fill in" value for intermediary
1622 * CDP data entries. No matter the CF, the value is the same because
1623 * the average, max, min, and last of a list of identical values is
1624 * the same, namely, the value itself. */
1625 *cdp_secondary_val = pdp_temp_val;
1628 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1629 *cdp_primary_val = DNAN;
1630 if (current_cf == CF_AVERAGE) {
1632 initialize_average_carry_over(pdp_temp_val,
1634 start_pdp_offset, pdp_cnt);
1636 *cdp_val = pdp_temp_val;
1639 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1640 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1641 } /* endif meets xff value requirement for a valid value */
1642 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1643 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1644 if (isnan(pdp_temp_val))
1645 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1647 *cdp_unkn_pdp_cnt = 0;
1648 } else { /* rra_step_cnt[i] == 0 */
1651 if (isnan(*cdp_val)) {
1652 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1655 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1659 if (isnan(pdp_temp_val)) {
1660 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1663 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1670 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1671 * on the type of consolidation function.
1673 static void initialize_cdp_val(
1676 rrd_value_t pdp_temp_val,
1677 unsigned long elapsed_pdp_st,
1678 unsigned long start_pdp_offset,
1679 unsigned long pdp_cnt)
1681 rrd_value_t cum_val, cur_val;
1683 switch (current_cf) {
1685 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1686 cur_val = IFDNAN(pdp_temp_val, 0.0);
1687 scratch[CDP_primary_val].u_val =
1688 (cum_val + cur_val * start_pdp_offset) /
1689 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1690 scratch[CDP_val].u_val =
1691 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1692 start_pdp_offset, pdp_cnt);
1695 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1696 cur_val = IFDNAN(pdp_temp_val, -DINF);
1699 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1701 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1707 if (cur_val > cum_val)
1708 scratch[CDP_primary_val].u_val = cur_val;
1710 scratch[CDP_primary_val].u_val = cum_val;
1711 /* initialize carry over value */
1712 scratch[CDP_val].u_val = pdp_temp_val;
1715 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1716 cur_val = IFDNAN(pdp_temp_val, DINF);
1719 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1721 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1727 if (cur_val < cum_val)
1728 scratch[CDP_primary_val].u_val = cur_val;
1730 scratch[CDP_primary_val].u_val = cum_val;
1731 /* initialize carry over value */
1732 scratch[CDP_val].u_val = pdp_temp_val;
1736 scratch[CDP_primary_val].u_val = pdp_temp_val;
1737 /* initialize carry over value */
1738 scratch[CDP_val].u_val = pdp_temp_val;
1744 * Update the consolidation function for Holt-Winters functions as
1745 * well as other functions that don't actually consolidate multiple
1748 static void reset_cdp(
1750 unsigned long elapsed_pdp_st,
1751 rrd_value_t *pdp_temp,
1752 rrd_value_t *last_seasonal_coef,
1753 rrd_value_t *seasonal_coef,
1757 enum cf_en current_cf)
1759 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1761 switch (current_cf) {
1764 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1765 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1768 case CF_DEVSEASONAL:
1769 /* need to update cached seasonal values, so they are consistent
1770 * with the bulk update */
1771 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1772 * CDP_last_deviation are the same. */
1773 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1774 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1778 /* need to update the null_count and last_null_count.
1779 * even do this for non-DNAN pdp_temp because the
1780 * algorithm is not learning from batch updates. */
1781 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1782 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1785 scratch[CDP_primary_val].u_val = DNAN;
1786 scratch[CDP_secondary_val].u_val = DNAN;
1789 /* do not count missed bulk values as failures */
1790 scratch[CDP_primary_val].u_val = 0;
1791 scratch[CDP_secondary_val].u_val = 0;
1792 /* need to reset violations buffer.
1793 * could do this more carefully, but for now, just
1794 * assume a bulk update wipes away all violations. */
1795 erase_violations(rrd, cdp_idx, rra_idx);
1800 static rrd_value_t initialize_average_carry_over(
1801 rrd_value_t pdp_temp_val,
1802 unsigned long elapsed_pdp_st,
1803 unsigned long start_pdp_offset,
1804 unsigned long pdp_cnt)
1806 /* initialize carry over value */
1807 if (isnan(pdp_temp_val)) {
1810 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1814 * Update or initialize a CDP value based on the consolidation
1817 * Returns the new value.
1819 static rrd_value_t calculate_cdp_val(
1820 rrd_value_t cdp_val,
1821 rrd_value_t pdp_temp_val,
1822 unsigned long elapsed_pdp_st,
1833 if (isnan(cdp_val)) {
1834 if (current_cf == CF_AVERAGE) {
1835 pdp_temp_val *= elapsed_pdp_st;
1838 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1839 i, ii, pdp_temp_val);
1841 return pdp_temp_val;
1843 if (current_cf == CF_AVERAGE)
1844 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1845 if (current_cf == CF_MINIMUM)
1846 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1847 if (current_cf == CF_MAXIMUM)
1848 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1850 return pdp_temp_val;
1854 * For each RRA, update the seasonal values and then call update_aberrant_CF
1855 * for each data source.
1857 * Return 0 on success, -1 on error.
1859 static int update_aberrant_cdps(
1861 rrd_file_t *rrd_file,
1862 unsigned long rra_begin,
1863 unsigned long elapsed_pdp_st,
1864 rrd_value_t *pdp_temp,
1865 rrd_value_t **seasonal_coef)
1867 unsigned long rra_idx, ds_idx, j;
1869 /* number of PDP steps since the last update that
1870 * are assigned to the first CDP to be generated
1871 * since the last update. */
1872 unsigned short scratch_idx;
1873 unsigned long rra_start;
1874 enum cf_en current_cf;
1876 /* this loop is only entered if elapsed_pdp_st < 3 */
1877 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1878 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1879 rra_start = rra_begin;
1880 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1881 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1882 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1883 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1884 if (scratch_idx == CDP_primary_val) {
1885 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1886 elapsed_pdp_st + 1, seasonal_coef);
1888 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1889 elapsed_pdp_st + 2, seasonal_coef);
1892 if (rrd_test_error())
1894 /* loop over data soures within each RRA */
1895 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1896 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1897 rra_idx * (rrd->stat_head->ds_cnt) +
1898 ds_idx, rra_idx, ds_idx, scratch_idx,
1902 rra_start += rrd->rra_def[rra_idx].row_cnt
1903 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1910 * Move sequentially through the file, writing one RRA at a time. Note this
1911 * architecture divorces the computation of CDP with flushing updated RRA
1914 * Return 0 on success, -1 on error.
1916 static int write_to_rras(
1918 rrd_file_t *rrd_file,
1919 unsigned long *rra_step_cnt,
1920 unsigned long rra_begin,
1921 time_t current_time,
1922 unsigned long *skip_update,
1923 rrd_info_t ** pcdp_summary)
1925 unsigned long rra_idx;
1926 unsigned long rra_start;
1927 time_t rra_time = 0; /* time of update for a RRA */
1929 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1931 /* Ready to write to disk */
1932 rra_start = rra_begin;
1934 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1935 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1936 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1939 unsigned short scratch_idx;
1940 unsigned long step_subtract;
1942 for (scratch_idx = CDP_primary_val,
1944 rra_step_cnt[rra_idx] > 0;
1945 rra_step_cnt[rra_idx]--,
1946 scratch_idx = CDP_secondary_val,
1947 step_subtract = 2) {
1951 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1953 /* increment, with wrap-around */
1954 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1955 rra_ptr->cur_row = 0;
1957 /* we know what our position should be */
1958 rra_pos_new = rra_start
1959 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1961 /* re-seek if the position is wrong or we wrapped around */
1962 if (rra_pos_new != rrd_file->pos) {
1963 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1964 rrd_set_error("seek error in rrd");
1969 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1972 if (skip_update[rra_idx])
1975 if (*pcdp_summary != NULL) {
1976 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1978 rra_time = (current_time - current_time % step_time)
1979 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1983 (rrd_file, rrd, rra_idx, scratch_idx,
1984 pcdp_summary, rra_time) == -1)
1988 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1995 * Write out one row of values (one value per DS) to the archive.
1997 * Returns 0 on success, -1 on error.
1999 static int write_RRA_row(
2000 rrd_file_t *rrd_file,
2002 unsigned long rra_idx,
2003 unsigned short CDP_scratch_idx,
2004 rrd_info_t ** pcdp_summary,
2007 unsigned long ds_idx, cdp_idx;
2010 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2011 /* compute the cdp index */
2012 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2014 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2015 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2016 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2018 if (*pcdp_summary != NULL) {
2019 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2020 /* append info to the return hash */
2021 *pcdp_summary = rrd_info_push(*pcdp_summary,
2023 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2024 rrd->rra_def[rra_idx].cf_nam,
2025 rrd->rra_def[rra_idx].pdp_cnt,
2026 rrd->ds_def[ds_idx].ds_nam),
2029 if (rrd_write(rrd_file,
2030 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2031 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2032 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2040 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2042 * Returns 0 on success, -1 otherwise
2044 static int smooth_all_rras(
2046 rrd_file_t *rrd_file,
2047 unsigned long rra_begin)
2049 unsigned long rra_start = rra_begin;
2050 unsigned long rra_idx;
2052 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2053 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2054 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2056 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2058 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2059 if (rrd_test_error())
2062 rra_start += rrd->rra_def[rra_idx].row_cnt
2063 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2070 * Flush changes to disk (unless we're using mmap)
2072 * Returns 0 on success, -1 otherwise
2074 static int write_changes_to_disk(
2076 rrd_file_t *rrd_file,
2079 /* we just need to write back the live header portion now */
2080 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2081 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2082 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2084 rrd_set_error("seek rrd for live header writeback");
2088 if (rrd_write(rrd_file, rrd->live_head,
2089 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2090 rrd_set_error("rrd_write live_head to rrd");
2094 if (rrd_write(rrd_file, rrd->legacy_last_up,
2095 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2096 rrd_set_error("rrd_write live_head to rrd");
2102 if (rrd_write(rrd_file, rrd->pdp_prep,
2103 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2104 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2105 rrd_set_error("rrd_write pdp_prep to rrd");
2109 if (rrd_write(rrd_file, rrd->cdp_prep,
2110 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2111 rrd->stat_head->ds_cnt)
2112 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2113 rrd->stat_head->ds_cnt)) {
2115 rrd_set_error("rrd_write cdp_prep to rrd");
2119 if (rrd_write(rrd_file, rrd->rra_ptr,
2120 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2121 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2122 rrd_set_error("rrd_write rra_ptr to rrd");