2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(&current_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 unsigned long *rra_current,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
112 unsigned long *skip_update,
113 int *schedule_smooth);
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
125 static int get_time_from_reading(
129 time_t *current_time,
130 unsigned long *current_time_usec,
133 static int update_pdp_prep(
136 rrd_value_t *pdp_new,
139 static int calculate_elapsed_steps(
141 unsigned long current_time,
142 unsigned long current_time_usec,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
164 unsigned long ds_idx,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *rra_current,
183 unsigned long *skip_update,
184 int *schedule_smooth);
186 static int do_schedule_smooth(
188 unsigned long rra_idx,
189 unsigned long elapsed_pdp_st);
191 static int update_cdp_prep(
193 unsigned long elapsed_pdp_st,
194 unsigned long start_pdp_offset,
195 unsigned long *rra_step_cnt,
197 rrd_value_t *pdp_temp,
198 rrd_value_t *last_seasonal_coef,
199 rrd_value_t *seasonal_coef,
202 static void update_cdp(
205 rrd_value_t pdp_temp_val,
206 unsigned long rra_step_cnt,
207 unsigned long elapsed_pdp_st,
208 unsigned long start_pdp_offset,
209 unsigned long pdp_cnt,
214 static void initialize_cdp_val(
217 rrd_value_t pdp_temp_val,
218 unsigned long elapsed_pdp_st,
219 unsigned long start_pdp_offset,
220 unsigned long pdp_cnt);
222 static void reset_cdp(
224 unsigned long elapsed_pdp_st,
225 rrd_value_t *pdp_temp,
226 rrd_value_t *last_seasonal_coef,
227 rrd_value_t *seasonal_coef,
231 enum cf_en current_cf);
233 static rrd_value_t initialize_average_carry_over(
234 rrd_value_t pdp_temp_val,
235 unsigned long elapsed_pdp_st,
236 unsigned long start_pdp_offset,
237 unsigned long pdp_cnt);
239 static rrd_value_t calculate_cdp_val(
241 rrd_value_t pdp_temp_val,
242 unsigned long elapsed_pdp_st,
247 static int update_aberrant_cdps(
249 rrd_file_t *rrd_file,
250 unsigned long rra_begin,
251 unsigned long *rra_current,
252 unsigned long elapsed_pdp_st,
253 rrd_value_t *pdp_temp,
254 rrd_value_t **seasonal_coef);
256 static int write_to_rras(
258 rrd_file_t *rrd_file,
259 unsigned long *rra_step_cnt,
260 unsigned long rra_begin,
261 unsigned long *rra_current,
263 unsigned long *skip_update,
264 rrd_info_t ** pcdp_summary);
266 static int write_RRA_row(
267 rrd_file_t *rrd_file,
269 unsigned long rra_idx,
270 unsigned long *rra_current,
271 unsigned short CDP_scratch_idx,
272 rrd_info_t ** pcdp_summary,
275 static int smooth_all_rras(
277 rrd_file_t *rrd_file,
278 unsigned long rra_begin);
281 static int write_changes_to_disk(
283 rrd_file_t *rrd_file,
288 * normalize time as returned by gettimeofday. usec part must
291 static inline void normalize_time(
294 if (t->tv_usec < 0) {
301 * Sets current_time and current_time_usec based on the current time.
302 * current_time_usec is set to 0 if the version number is 1 or 2.
304 static inline void initialize_time(
305 time_t *current_time,
306 unsigned long *current_time_usec,
309 struct timeval tmp_time; /* used for time conversion */
311 gettimeofday(&tmp_time, 0);
312 normalize_time(&tmp_time);
313 *current_time = tmp_time.tv_sec;
315 *current_time_usec = tmp_time.tv_usec;
317 *current_time_usec = 0;
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
323 rrd_info_t *rrd_update_v(
328 rrd_info_t *result = NULL;
330 struct option long_options[] = {
331 {"template", required_argument, 0, 't'},
337 opterr = 0; /* initialize getopt */
340 int option_index = 0;
343 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
354 rrd_set_error("unknown option '%s'", argv[optind - 1]);
359 /* need at least 2 arguments: filename, data. */
360 if (argc - optind < 2) {
361 rrd_set_error("Not enough arguments");
365 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366 rc.u_int = _rrd_update(argv[optind], tmplt,
368 (const char **) (argv + optind + 1), result);
369 result->value.u_int = rc.u_int;
378 struct option long_options[] = {
379 {"template", required_argument, 0, 't'},
380 {"daemon", required_argument, 0, 'd'},
383 int option_index = 0;
390 opterr = 0; /* initialize getopt */
393 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
400 tmplt = strdup(optarg);
406 daemon = strdup (optarg);
409 rrd_set_error("strdup failed.");
415 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 /* need at least 2 arguments: filename, data. */
421 if (argc - optind < 2) {
422 rrd_set_error("Not enough arguments");
426 if ((tmplt != NULL) && (daemon != NULL))
428 rrd_set_error("The caching daemon cannot be used together with "
433 if ((tmplt == NULL) && (daemon == NULL))
437 temp = getenv (ENV_RRDCACHED_ADDRESS);
440 daemon = strdup (temp);
443 rrd_set_error("strdup failed.");
453 status = rrdc_connect (daemon);
456 rrd_set_error("Unable to connect to daemon: %s",
459 : rrd_strerror (status));
463 status = rrdc_update (/* file = */ argv[optind],
464 /* values_num = */ argc - optind - 1,
465 /* values = */ (void *) (argv + optind + 1));
468 rrd_set_error("Failed sending the values to the daemon: %s",
471 : rrd_strerror (status));
476 } /* if (daemon != NULL) */
478 rc = rrd_update_r(argv[optind], tmplt,
479 argc - optind - 1, (const char **) (argv + optind + 1));
495 const char *filename,
500 return _rrd_update(filename, tmplt, argc, argv, NULL);
504 const char *filename,
508 rrd_info_t * pcdp_summary)
513 unsigned long rra_begin; /* byte pointer to the rra
514 * area in the rrd file. this
515 * pointer never changes value */
516 unsigned long rra_current; /* byte pointer to the current write
517 * spot in the rrd file. */
518 rrd_value_t *pdp_new; /* prepare the incoming data to be added
519 * to the existing entry */
520 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
521 * to the cdp values */
523 long *tmpl_idx; /* index representing the settings
524 * transported by the tmplt index */
525 unsigned long tmpl_cnt = 2; /* time and data */
527 time_t current_time = 0;
528 unsigned long current_time_usec = 0; /* microseconds part of current time */
530 int schedule_smooth = 0;
532 /* number of elapsed PDP steps since last update */
533 unsigned long *rra_step_cnt = NULL;
535 int version; /* rrd version */
536 rrd_file_t *rrd_file;
537 char *arg_copy; /* for processing the argv */
538 unsigned long *skip_update; /* RRAs to advance but not write */
540 /* need at least 1 arguments: data. */
542 rrd_set_error("Not enough arguments");
546 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
549 /* We are now at the beginning of the rra's */
550 rra_current = rra_begin = rrd_file->header_len;
552 version = atoi(rrd.stat_head->version);
554 initialize_time(&current_time, &current_time_usec, version);
556 /* get exclusive lock to whole file.
557 * lock gets removed when we close the file.
559 if (rrd_lock(rrd_file) != 0) {
560 rrd_set_error("could not lock RRD");
564 if (allocate_data_structures(&rrd, &updvals,
565 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
566 &rra_step_cnt, &skip_update,
571 /* loop through the arguments. */
572 for (arg_i = 0; arg_i < argc; arg_i++) {
573 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
574 rrd_set_error("failed duplication argv entry");
577 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
578 &current_time, &current_time_usec, pdp_temp, pdp_new,
579 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
580 &pcdp_summary, version, skip_update,
581 &schedule_smooth) == -1) {
582 if (rrd_test_error()) { /* Should have error string always here */
585 /* Prepend file name to error message */
586 if ((save_error = strdup(rrd_get_error())) != NULL) {
587 rrd_set_error("%s: %s", filename, save_error);
599 /* if we got here and if there is an error and if the file has not been
600 * written to, then close things up and return. */
601 if (rrd_test_error()) {
602 goto err_free_structures;
605 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
606 goto err_free_structures;
610 /* calling the smoothing code here guarantees at most one smoothing
611 * operation per rrd_update call. Unfortunately, it is possible with bulk
612 * updates, or a long-delayed update for smoothing to occur off-schedule.
613 * This really isn't critical except during the burn-in cycles. */
614 if (schedule_smooth) {
615 smooth_all_rras(&rrd, rrd_file, rra_begin);
618 /* rrd_dontneed(rrd_file,&rrd); */
644 * get exclusive lock to whole file.
645 * lock gets removed when we close the file
647 * returns 0 on success
655 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
658 if (_fstat(file->fd, &st) == 0) {
659 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
666 lock.l_type = F_WRLCK; /* exclusive write lock */
667 lock.l_len = 0; /* whole file */
668 lock.l_start = 0; /* start of file */
669 lock.l_whence = SEEK_SET; /* offsets relative to start of file */
671 rcstat = fcntl(file->fd, F_SETLK, &lock);
679 * Allocate some important arrays used, and initialize the template.
681 * When it returns, either all of the structures are allocated
682 * or none of them are.
684 * Returns 0 on success, -1 on error.
686 static int allocate_data_structures(
689 rrd_value_t **pdp_temp,
692 unsigned long *tmpl_cnt,
693 unsigned long **rra_step_cnt,
694 unsigned long **skip_update,
695 rrd_value_t **pdp_new)
698 if ((*updvals = (char **) malloc(sizeof(char *)
699 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
700 rrd_set_error("allocating updvals pointer array.");
703 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
704 * rrd->stat_head->ds_cnt)) ==
706 rrd_set_error("allocating pdp_temp.");
707 goto err_free_updvals;
709 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
711 rrd->stat_head->rra_cnt)) ==
713 rrd_set_error("allocating skip_update.");
714 goto err_free_pdp_temp;
716 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
717 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
718 rrd_set_error("allocating tmpl_idx.");
719 goto err_free_skip_update;
721 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
724 rra_cnt))) == NULL) {
725 rrd_set_error("allocating rra_step_cnt.");
726 goto err_free_tmpl_idx;
729 /* initialize tmplt redirector */
730 /* default config example (assume DS 1 is a CDEF DS)
731 tmpl_idx[0] -> 0; (time)
732 tmpl_idx[1] -> 1; (DS 0)
733 tmpl_idx[2] -> 3; (DS 2)
734 tmpl_idx[3] -> 4; (DS 3) */
735 (*tmpl_idx)[0] = 0; /* time */
736 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
737 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
738 (*tmpl_idx)[ii++] = i;
743 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
744 goto err_free_rra_step_cnt;
748 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
749 * rrd->stat_head->ds_cnt)) == NULL) {
750 rrd_set_error("allocating pdp_new.");
751 goto err_free_rra_step_cnt;
756 err_free_rra_step_cnt:
760 err_free_skip_update:
770 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
772 * Returns 0 on success.
774 static int parse_template(
777 unsigned long *tmpl_cnt,
780 char *dsname, *tmplt_copy;
781 unsigned int tmpl_len, i;
784 *tmpl_cnt = 1; /* the first entry is the time */
786 /* we should work on a writeable copy here */
787 if ((tmplt_copy = strdup(tmplt)) == NULL) {
788 rrd_set_error("error copying tmplt '%s'", tmplt);
794 tmpl_len = strlen(tmplt_copy);
795 for (i = 0; i <= tmpl_len; i++) {
796 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
797 tmplt_copy[i] = '\0';
798 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
799 rrd_set_error("tmplt contains more DS definitions than RRD");
801 goto out_free_tmpl_copy;
803 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
804 rrd_set_error("unknown DS name '%s'", dsname);
806 goto out_free_tmpl_copy;
808 /* go to the next entry on the tmplt_copy */
810 dsname = &tmplt_copy[i + 1];
820 * Parse an update string, updates the primary data points (PDPs)
821 * and consolidated data points (CDPs), and writes changes to the RRAs.
823 * Returns 0 on success, -1 on error.
825 static int process_arg(
828 rrd_file_t *rrd_file,
829 unsigned long rra_begin,
830 unsigned long *rra_current,
831 time_t *current_time,
832 unsigned long *current_time_usec,
833 rrd_value_t *pdp_temp,
834 rrd_value_t *pdp_new,
835 unsigned long *rra_step_cnt,
838 unsigned long tmpl_cnt,
839 rrd_info_t ** pcdp_summary,
841 unsigned long *skip_update,
842 int *schedule_smooth)
844 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
846 /* a vector of future Holt-Winters seasonal coefs */
847 unsigned long elapsed_pdp_st;
849 double interval, pre_int, post_int; /* interval between this and
851 unsigned long proc_pdp_cnt;
852 unsigned long rra_start;
854 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
855 current_time, current_time_usec, version) == -1) {
858 /* seek to the beginning of the rra's */
859 if (*rra_current != rra_begin) {
861 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
862 rrd_set_error("seek error in rrd");
866 *rra_current = rra_begin;
868 rra_start = rra_begin;
870 interval = (double) (*current_time - rrd->live_head->last_up)
871 + (double) ((long) *current_time_usec -
872 (long) rrd->live_head->last_up_usec) / 1e6f;
874 /* process the data sources and update the pdp_prep
875 * area accordingly */
876 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
880 elapsed_pdp_st = calculate_elapsed_steps(rrd,
882 *current_time_usec, interval,
886 /* has a pdp_st moment occurred since the last run ? */
887 if (elapsed_pdp_st == 0) {
888 /* no we have not passed a pdp_st moment. therefore update is simple */
889 simple_update(rrd, interval, pdp_new);
891 /* a pdp_st has occurred. */
892 if (process_all_pdp_st(rrd, interval,
894 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
897 if (update_all_cdp_prep(rrd, rra_step_cnt,
903 pdp_temp, rra_current,
904 skip_update, schedule_smooth) == -1) {
905 goto err_free_coefficients;
907 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
908 elapsed_pdp_st, pdp_temp,
909 &seasonal_coef) == -1) {
910 goto err_free_coefficients;
912 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
913 rra_current, *current_time, skip_update,
914 pcdp_summary) == -1) {
915 goto err_free_coefficients;
917 } /* endif a pdp_st has occurred */
918 rrd->live_head->last_up = *current_time;
919 rrd->live_head->last_up_usec = *current_time_usec;
922 *rrd->legacy_last_up = rrd->live_head->last_up;
925 free(last_seasonal_coef);
928 err_free_coefficients:
930 free(last_seasonal_coef);
935 * Parse a DS string (time + colon-separated values), storing the
936 * results in current_time, current_time_usec, and updvals.
938 * Returns 0 on success, -1 on error.
945 unsigned long tmpl_cnt,
946 time_t *current_time,
947 unsigned long *current_time_usec,
955 /* initialize all ds input to unknown except the first one
956 which has always got to be set */
957 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
960 /* separate all ds elements; first must be examined separately
961 due to alternate time syntax */
962 if ((p = strchr(input, '@')) != NULL) {
964 } else if ((p = strchr(input, ':')) != NULL) {
967 rrd_set_error("expected timestamp not found in data source from %s",
973 updvals[tmpl_idx[i++]] = p + 1;
978 updvals[tmpl_idx[i++]] = p + 1;
984 rrd_set_error("expected %lu data source readings (got %lu) from %s",
985 tmpl_cnt - 1, i, input);
989 if (get_time_from_reading(rrd, timesyntax, updvals,
990 current_time, current_time_usec,
998 * Parse the time in a DS string, store it in current_time and
999 * current_time_usec and verify that it's later than the last
1000 * update for this DS.
1002 * Returns 0 on success, -1 on error.
1004 static int get_time_from_reading(
1008 time_t *current_time,
1009 unsigned long *current_time_usec,
1013 char *parsetime_error = NULL;
1015 rrd_time_value_t ds_tv;
1016 struct timeval tmp_time; /* used for time conversion */
1018 /* get the time from the reading ... handle N */
1019 if (timesyntax == '@') { /* at-style */
1020 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1021 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1024 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1025 ds_tv.type == RELATIVE_TO_START_TIME) {
1026 rrd_set_error("specifying time relative to the 'start' "
1027 "or 'end' makes no sense here: %s", updvals[0]);
1030 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1031 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1032 } else if (strcmp(updvals[0], "N") == 0) {
1033 gettimeofday(&tmp_time, 0);
1034 normalize_time(&tmp_time);
1035 *current_time = tmp_time.tv_sec;
1036 *current_time_usec = tmp_time.tv_usec;
1038 old_locale = setlocale(LC_NUMERIC, "C");
1039 tmp = strtod(updvals[0], 0);
1040 setlocale(LC_NUMERIC, old_locale);
1041 *current_time = floor(tmp);
1042 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1044 /* don't do any correction for old version RRDs */
1046 *current_time_usec = 0;
1048 if (*current_time < rrd->live_head->last_up ||
1049 (*current_time == rrd->live_head->last_up &&
1050 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1051 rrd_set_error("illegal attempt to update using time %ld when "
1052 "last update time is %ld (minimum one second step)",
1053 *current_time, rrd->live_head->last_up);
1060 * Update pdp_new by interpreting the updvals according to the DS type
1061 * (COUNTER, GAUGE, etc.).
1063 * Returns 0 on success, -1 on error.
1065 static int update_pdp_prep(
1068 rrd_value_t *pdp_new,
1071 unsigned long ds_idx;
1073 char *endptr; /* used in the conversion */
1076 enum dst_en dst_idx;
1078 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1079 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1081 /* make sure we do not build diffs with old last_ds values */
1082 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1083 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1084 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1087 /* NOTE: DST_CDEF should never enter this if block, because
1088 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1089 * accidently specified a value for the DST_CDEF. To handle this case,
1090 * an extra check is required. */
1092 if ((updvals[ds_idx + 1][0] != 'U') &&
1093 (dst_idx != DST_CDEF) &&
1094 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1097 /* pdp_new contains rate * time ... eg the bytes transferred during
1098 * the interval. Doing it this way saves a lot of math operations
1103 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1104 if ((updvals[ds_idx + 1][ii] < '0'
1105 || updvals[ds_idx + 1][ii] > '9')
1106 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1107 rrd_set_error("not a simple integer: '%s'",
1108 updvals[ds_idx + 1]);
1112 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1114 rrd_diff(updvals[ds_idx + 1],
1115 rrd->pdp_prep[ds_idx].last_ds);
1116 if (dst_idx == DST_COUNTER) {
1117 /* simple overflow catcher. This will fail
1118 * terribly for non 32 or 64 bit counters
1119 * ... are there any others in SNMP land?
1121 if (pdp_new[ds_idx] < (double) 0.0)
1122 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1123 if (pdp_new[ds_idx] < (double) 0.0)
1124 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1126 rate = pdp_new[ds_idx] / interval;
1128 pdp_new[ds_idx] = DNAN;
1132 old_locale = setlocale(LC_NUMERIC, "C");
1134 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1135 setlocale(LC_NUMERIC, old_locale);
1137 rrd_set_error("converting '%s' to float: %s",
1138 updvals[ds_idx + 1], rrd_strerror(errno));
1141 if (endptr[0] != '\0') {
1143 ("conversion of '%s' to float not complete: tail '%s'",
1144 updvals[ds_idx + 1], endptr);
1147 rate = pdp_new[ds_idx] / interval;
1151 old_locale = setlocale(LC_NUMERIC, "C");
1153 strtod(updvals[ds_idx + 1], &endptr) * interval;
1154 setlocale(LC_NUMERIC, old_locale);
1156 rrd_set_error("converting '%s' to float: %s",
1157 updvals[ds_idx + 1], rrd_strerror(errno));
1160 if (endptr[0] != '\0') {
1162 ("conversion of '%s' to float not complete: tail '%s'",
1163 updvals[ds_idx + 1], endptr);
1166 rate = pdp_new[ds_idx] / interval;
1169 rrd_set_error("rrd contains unknown DS type : '%s'",
1170 rrd->ds_def[ds_idx].dst);
1173 /* break out of this for loop if the error string is set */
1174 if (rrd_test_error()) {
1177 /* make sure pdp_temp is neither too large or too small
1178 * if any of these occur it becomes unknown ...
1179 * sorry folks ... */
1181 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1182 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1183 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1184 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1185 pdp_new[ds_idx] = DNAN;
1188 /* no news is news all the same */
1189 pdp_new[ds_idx] = DNAN;
1193 /* make a copy of the command line argument for the next run */
1195 fprintf(stderr, "prep ds[%lu]\t"
1199 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1202 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1204 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1210 * How many PDP steps have elapsed since the last update? Returns the answer,
1211 * and stores the time between the last update and the last PDP in pre_time,
1212 * and the time between the last PDP and the current time in post_int.
1214 static int calculate_elapsed_steps(
1216 unsigned long current_time,
1217 unsigned long current_time_usec,
1221 unsigned long *proc_pdp_cnt)
1223 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1224 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1226 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1227 * when it was last updated */
1228 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1230 /* when was the current pdp started */
1231 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1232 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1234 /* when did the last pdp_st occur */
1235 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1236 occu_pdp_st = current_time - occu_pdp_age;
1238 if (occu_pdp_st > proc_pdp_st) {
1239 /* OK we passed the pdp_st moment */
1240 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1241 * occurred before the latest
1243 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1244 *post_int = occu_pdp_age; /* how much after it */
1245 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1247 *pre_int = interval;
1251 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1254 printf("proc_pdp_age %lu\t"
1256 "occu_pfp_age %lu\t"
1260 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1261 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1264 /* compute the number of elapsed pdp_st moments */
1265 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1269 * Increment the PDP values by the values in pdp_new, or else initialize them.
1271 static void simple_update(
1274 rrd_value_t *pdp_new)
1278 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1279 if (isnan(pdp_new[i])) {
1280 /* this is not really accurate if we use subsecond data arrival time
1281 should have thought of it when going subsecond resolution ...
1282 sorry next format change we will have it! */
1283 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1286 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1287 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1289 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1298 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1299 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1305 * Call process_pdp_st for each DS.
1307 * Returns 0 on success, -1 on error.
1309 static int process_all_pdp_st(
1314 unsigned long elapsed_pdp_st,
1315 rrd_value_t *pdp_new,
1316 rrd_value_t *pdp_temp)
1318 unsigned long ds_idx;
1320 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1321 rate*seconds which occurred up to the last run.
1322 pdp_new[] contains rate*seconds from the latest run.
1323 pdp_temp[] will contain the rate for cdp */
1325 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1326 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1327 elapsed_pdp_st * rrd->stat_head->pdp_step,
1328 pdp_new, pdp_temp) == -1) {
1332 fprintf(stderr, "PDP UPD ds[%lu]\t"
1333 "elapsed_pdp_st %lu\t"
1336 "new_unkn_sec %5lu\n",
1340 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1341 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1348 * Process an update that occurs after one of the PDP moments.
1349 * Increments the PDP value, sets NAN if time greater than the
1350 * heartbeats have elapsed, processes CDEFs.
1352 * Returns 0 on success, -1 on error.
1354 static int process_pdp_st(
1356 unsigned long ds_idx,
1360 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1361 rrd_value_t *pdp_new,
1362 rrd_value_t *pdp_temp)
1366 /* update pdp_prep to the current pdp_st. */
1367 double pre_unknown = 0.0;
1368 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1369 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1371 rpnstack_t rpnstack; /* used for COMPUTE DS */
1373 rpnstack_init(&rpnstack);
1376 if (isnan(pdp_new[ds_idx])) {
1377 /* a final bit of unknown to be added before calculation
1378 we use a temporary variable for this so that we
1379 don't have to turn integer lines before using the value */
1380 pre_unknown = pre_int;
1382 if (isnan(scratch[PDP_val].u_val)) {
1383 scratch[PDP_val].u_val = 0;
1385 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1388 /* if too much of the pdp_prep is unknown we dump it */
1389 /* if the interval is larger than mrhb we get NAN */
1390 if ((interval > mrhb) ||
1391 (rrd->stat_head->pdp_step / 2.0 <
1392 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1393 pdp_temp[ds_idx] = DNAN;
1395 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1396 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1400 /* process CDEF data sources; remember each CDEF DS can
1401 * only reference other DS with a lower index number */
1402 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1406 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1407 /* substitute data values for OP_VARIABLE nodes */
1408 for (i = 0; rpnp[i].op != OP_END; i++) {
1409 if (rpnp[i].op == OP_VARIABLE) {
1410 rpnp[i].op = OP_NUMBER;
1411 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1414 /* run the rpn calculator */
1415 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1417 rpnstack_free(&rpnstack);
1422 /* make pdp_prep ready for the next run */
1423 if (isnan(pdp_new[ds_idx])) {
1424 /* this is not really accurate if we use subsecond data arrival time
1425 should have thought of it when going subsecond resolution ...
1426 sorry next format change we will have it! */
1427 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1428 scratch[PDP_val].u_val = DNAN;
1430 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1431 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1433 rpnstack_free(&rpnstack);
1438 * Iterate over all the RRAs for a given DS and:
1439 * 1. Decide whether to schedule a smooth later
1440 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1443 * Returns 0 on success, -1 on error
1445 static int update_all_cdp_prep(
1447 unsigned long *rra_step_cnt,
1448 unsigned long rra_begin,
1449 rrd_file_t *rrd_file,
1450 unsigned long elapsed_pdp_st,
1451 unsigned long proc_pdp_cnt,
1452 rrd_value_t **last_seasonal_coef,
1453 rrd_value_t **seasonal_coef,
1454 rrd_value_t *pdp_temp,
1455 unsigned long *rra_current,
1456 unsigned long *skip_update,
1457 int *schedule_smooth)
1459 unsigned long rra_idx;
1461 /* index into the CDP scratch array */
1462 enum cf_en current_cf;
1463 unsigned long rra_start;
1465 /* number of rows to be updated in an RRA for a data value. */
1466 unsigned long start_pdp_offset;
1468 rra_start = rra_begin;
1469 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1470 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1472 rrd->rra_def[rra_idx].pdp_cnt -
1473 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1474 skip_update[rra_idx] = 0;
1475 if (start_pdp_offset <= elapsed_pdp_st) {
1476 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1477 rrd->rra_def[rra_idx].pdp_cnt + 1;
1479 rra_step_cnt[rra_idx] = 0;
1482 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1483 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1484 * so that they will be correct for the next observed value; note that for
1485 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1486 * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1487 if (rra_step_cnt[rra_idx] > 1) {
1488 skip_update[rra_idx] = 1;
1489 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1490 elapsed_pdp_st, last_seasonal_coef);
1491 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1492 elapsed_pdp_st + 1, seasonal_coef);
1494 /* periodically run a smoother for seasonal effects */
1495 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1498 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1499 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1500 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1503 *schedule_smooth = 1;
1505 *rra_current = rrd_tell(rrd_file);
1507 if (rrd_test_error())
1511 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1512 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1513 current_cf) == -1) {
1517 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1518 sizeof(rrd_value_t);
/*
 * do_schedule_smooth -- decide whether a seasonal smoothing pass is due for
 * the RRA at rra_idx after elapsed_pdp_st PDP steps.  Returns non-zero when
 * the smoother should be scheduled.
 * NOTE(review): this listing elides some lines (a leading `rrd_t *rrd`
 * parameter is implied by the body's use of rrd->...) -- confirm against the
 * full source before editing.
 */
1524 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1526 static int do_schedule_smooth(
1528 unsigned long rra_idx,
1529 unsigned long elapsed_pdp_st)
/* cdp_idx addresses the first CDP scratch slot of this RRA (DS 0). */
1531 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1532 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1533 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1534 unsigned long seasonal_smooth_idx =
1535 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1536 unsigned long *init_seasonal =
1537 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1539 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1540 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1541 * really an RRA level, not a data source within RRA level parameter, but
1542 * the rra_def is read only for rrd_update (not flushed to disk). */
/* Past burn-in: smooth when the row pointer is about to pass the
 * per-RRA smoothing index (two cases to handle unsigned wrap-around). */
1543 if (*init_seasonal > BURNIN_CYCLES) {
1544 /* someone has no doubt invented a trick to deal with this wrap around,
1545 * but at least this code is clear. */
1546 if (seasonal_smooth_idx > cur_row) {
1547 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1548 * between PDP and CDP */
1549 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1551 /* can't rely on negative numbers because we are working with
1552 * unsigned values */
1553 return (cur_row + elapsed_pdp_st >= row_cnt
1554 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1556 /* mark off one of the burn-in cycles */
/* During burn-in: smooth once per full cycle of the RRA; the ++ also
 * advances the burn-in counter as a side effect of the test. */
1557 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1561 * For a given RRA, iterate over the data sources and call the appropriate
1562 * consolidation function.
1564 * Returns 0 on success, -1 on error.
/* NOTE(review): several parameter lines (rrd, rra_idx, current_cf) and the
 * tail arguments of the update_cdp()/reset_cdp() calls are elided in this
 * listing -- consult the full source before modifying. */
1566 static int update_cdp_prep(
1568 unsigned long elapsed_pdp_st,
1569 unsigned long start_pdp_offset,
1570 unsigned long *rra_step_cnt,
1572 rrd_value_t *pdp_temp,
1573 rrd_value_t *last_seasonal_coef,
1574 rrd_value_t *seasonal_coef,
1577 unsigned long ds_idx, cdp_idx;
1579 /* update CDP_PREP areas */
1580 /* loop over data sources within each RRA */
1581 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
/* CDP scratch areas are laid out RRA-major, DS-minor. */
1583 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
/* Multiple PDPs per CDP: run the normal consolidation path. */
1585 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1586 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1587 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1588 elapsed_pdp_st, start_pdp_offset,
1589 rrd->rra_def[rra_idx].pdp_cnt,
1590 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1593 /* Nothing to consolidate if there's one PDP per CDP. However, if
1594 * we've missed some PDPs, let's update null counters etc. */
1595 if (elapsed_pdp_st > 2) {
1596 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1597 seasonal_coef, rra_idx, ds_idx, cdp_idx,
/* Abort the DS loop on the first recorded rrd error. */
1602 if (rrd_test_error())
1604 } /* endif data sources loop */
1609 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1610 * primary value, secondary value, and # of unknowns.
/* NOTE(review): parameter lines for the scratch array, current_cf and xff,
 * plus the `if (rra_step_cnt > 0)` guard and DEBUG #ifdefs, are elided in
 * this listing -- confirm against the full source. */
1612 static void update_cdp(
1615 rrd_value_t pdp_temp_val,
1616 unsigned long rra_step_cnt,
1617 unsigned long elapsed_pdp_st,
1618 unsigned long start_pdp_offset,
1619 unsigned long pdp_cnt,
1624 /* shorthand variables */
1625 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1626 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1627 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1628 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1631 /* If we are in this block, as least 1 CDP value will be written to
1632 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1633 * to be written, then the "fill in" value is the CDP_secondary_val
/* Unknown reading: the missed steps count toward the unknown-PDP total. */
1635 if (isnan(pdp_temp_val)) {
1636 *cdp_unkn_pdp_cnt += start_pdp_offset;
1637 *cdp_secondary_val = DNAN;
1639 /* CDP_secondary value is the RRA "fill in" value for intermediary
1640 * CDP data entries. No matter the CF, the value is the same because
1641 * the average, max, min, and last of a list of identical values is
1642 * the same, namely, the value itself. */
1643 *cdp_secondary_val = pdp_temp_val;
/* xff test: too many unknown PDPs makes the consolidated value unknown. */
1646 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1647 *cdp_primary_val = DNAN;
1648 if (current_cf == CF_AVERAGE) {
1650 initialize_average_carry_over(pdp_temp_val,
1652 start_pdp_offset, pdp_cnt);
1654 *cdp_val = pdp_temp_val;
1657 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1658 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1659 } /* endif meets xff value requirement for a valid value */
1660 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1661 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1662 if (isnan(pdp_temp_val))
1663 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1665 *cdp_unkn_pdp_cnt = 0;
1666 } else { /* rra_step_cnt[i] == 0 */
/* No CDP boundary crossed: just accumulate into the carry-over value.
 * NOTE(review): the fprintf calls below are presumably inside an elided
 * #ifdef DEBUG block (they reference names i/ii not visible here). */
1669 if (isnan(*cdp_val)) {
1670 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1673 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1677 if (isnan(pdp_temp_val)) {
1678 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1681 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1688 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1689 * on the type of consolidation function.
/* NOTE(review): case labels (CF_AVERAGE/CF_MAXIMUM/CF_MINIMUM/default),
 * break statements, and the scratch/current_cf parameter lines are elided
 * in this listing. */
1691 static void initialize_cdp_val(
1694 rrd_value_t pdp_temp_val,
1695 unsigned long elapsed_pdp_st,
1696 unsigned long start_pdp_offset,
1697 unsigned long pdp_cnt)
1699 rrd_value_t cum_val, cur_val;
1701 switch (current_cf) {
/* CF_AVERAGE: unknowns are treated as 0 and excluded from the divisor. */
1703 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1704 cur_val = IFDNAN(pdp_temp_val, 0.0);
1705 scratch[CDP_primary_val].u_val =
1706 (cum_val + cur_val * start_pdp_offset) /
1707 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1708 scratch[CDP_val].u_val =
1709 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1710 start_pdp_offset, pdp_cnt);
/* CF_MAXIMUM: unknowns map to -infinity so any real value wins. */
1713 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1714 cur_val = IFDNAN(pdp_temp_val, -DINF);
/* NOTE(review): `pdp_temp` is not a parameter of this function (the
 * parameter is pdp_temp_val); this check is presumably inside an elided
 * #ifdef DEBUG block and looks like a latent name bug -- verify. */
1717 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1719 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1725 if (cur_val > cum_val)
1726 scratch[CDP_primary_val].u_val = cur_val;
1728 scratch[CDP_primary_val].u_val = cum_val;
1729 /* initialize carry over value */
1730 scratch[CDP_val].u_val = pdp_temp_val;
/* CF_MINIMUM: symmetric to maximum; unknowns map to +infinity. */
1733 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1734 cur_val = IFDNAN(pdp_temp_val, DINF);
/* NOTE(review): same out-of-scope `pdp_temp`/`i` names as above --
 * presumably elided #ifdef DEBUG; verify in the full source. */
1737 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1739 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1745 if (cur_val < cum_val)
1746 scratch[CDP_primary_val].u_val = cur_val;
1748 scratch[CDP_primary_val].u_val = cum_val;
1749 /* initialize carry over value */
1750 scratch[CDP_val].u_val = pdp_temp_val;
/* Default (e.g. CF_LAST): the newest reading is both result and carry. */
1754 scratch[CDP_primary_val].u_val = pdp_temp_val;
1755 /* initialize carry over value */
1756 scratch[CDP_val].u_val = pdp_temp_val;
1762 * Update the consolidation function for Holt-Winters functions as
1763 * well as other functions that don't actually consolidate multiple
/* NOTE(review): parameter lines (rrd, rra_idx, ds_idx, cdp_idx) and several
 * case labels/breaks are elided in this listing. */
1766 static void reset_cdp(
1768 unsigned long elapsed_pdp_st,
1769 rrd_value_t *pdp_temp,
1770 rrd_value_t *last_seasonal_coef,
1771 rrd_value_t *seasonal_coef,
1775 enum cf_en current_cf)
1777 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1779 switch (current_cf) {
/* Plain CFs with pdp_cnt == 1: both rows get the latest reading. */
1782 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1783 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1786 case CF_DEVSEASONAL:
1787 /* need to update cached seasonal values, so they are consistent
1788 * with the bulk update */
1789 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1790 * CDP_last_deviation are the same. */
1791 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1792 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1796 /* need to update the null_count and last_null_count.
1797 * even do this for non-DNAN pdp_temp because the
1798 * algorithm is not learning from batch updates. */
1799 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1800 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
/* Prediction RRAs (HWPREDICT/DEVPREDICT) emit unknown during a bulk gap. */
1803 scratch[CDP_primary_val].u_val = DNAN;
1804 scratch[CDP_secondary_val].u_val = DNAN;
1807 /* do not count missed bulk values as failures */
1808 scratch[CDP_primary_val].u_val = 0;
1809 scratch[CDP_secondary_val].u_val = 0;
1810 /* need to reset violations buffer.
1811 * could do this more carefully, but for now, just
1812 * assume a bulk update wipes away all violations. */
1813 erase_violations(rrd, cdp_idx, rra_idx);
/*
 * Compute the carry-over CDP_val for CF_AVERAGE: the part of the new
 * reading that belongs to the *next* (not yet complete) CDP interval.
 * NOTE(review): the body of the isnan() branch is elided in this listing;
 * it presumably returns DNAN -- confirm against the full source.
 */
1818 static rrd_value_t initialize_average_carry_over(
1819 rrd_value_t pdp_temp_val,
1820 unsigned long elapsed_pdp_st,
1821 unsigned long start_pdp_offset,
1822 unsigned long pdp_cnt)
1824 /* initialize carry over value */
1825 if (isnan(pdp_temp_val)) {
/* Weight the reading by the PDP steps left over after the last full CDP. */
1828 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1832 * Update or initialize a CDP value based on the consolidation
1835 * Returns the new value.
/* NOTE(review): trailing parameters (current_cf and, judging by the
 * fprintf below, debug indices i/ii inside an #ifdef DEBUG block) are
 * elided in this listing. */
1837 static rrd_value_t calculate_cdp_val(
1838 rrd_value_t cdp_val,
1839 rrd_value_t pdp_temp_val,
1840 unsigned long elapsed_pdp_st,
/* First valid reading for this CDP: initialize rather than combine. */
1851 if (isnan(cdp_val)) {
1852 if (current_cf == CF_AVERAGE) {
/* AVERAGE accumulates a sum weighted by the number of PDP steps. */
1853 pdp_temp_val *= elapsed_pdp_st;
1856 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1857 i, ii, pdp_temp_val);
1859 return pdp_temp_val;
1861 if (current_cf == CF_AVERAGE)
1862 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1863 if (current_cf == CF_MINIMUM)
1864 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1865 if (current_cf == CF_MAXIMUM)
1866 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
/* Fallthrough (e.g. CF_LAST): newest reading wins. */
1868 return pdp_temp_val;
1872 * For each RRA, update the seasonal values and then call update_aberrant_CF
1873 * for each data source.
1875 * Return 0 on success, -1 on error.
/* NOTE(review): the leading rrd_t * parameter line and some else/closing
 * lines are elided in this listing. */
1877 static int update_aberrant_cdps(
1879 rrd_file_t *rrd_file,
1880 unsigned long rra_begin,
1881 unsigned long *rra_current,
1882 unsigned long elapsed_pdp_st,
1883 rrd_value_t *pdp_temp,
1884 rrd_value_t **seasonal_coef)
1886 unsigned long rra_idx, ds_idx, j;
1888 /* number of PDP steps since the last update that
1889 * are assigned to the first CDP to be generated
1890 * since the last update. */
1891 unsigned short scratch_idx;
1892 unsigned long rra_start;
1893 enum cf_en current_cf;
1895 /* this loop is only entered if elapsed_pdp_st < 3 */
/* First pass writes CDP_primary_val, second (if any) CDP_secondary_val. */
1896 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1897 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1898 rra_start = rra_begin;
1899 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
/* Aberrant CFs always use one PDP per CDP. */
1900 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1901 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1902 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
/* Look one (or two, for the secondary pass) steps ahead in the
 * seasonal arrays so the coefficients match the row being filled. */
1903 if (scratch_idx == CDP_primary_val) {
1904 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1905 elapsed_pdp_st + 1, seasonal_coef);
1907 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1908 elapsed_pdp_st + 2, seasonal_coef);
/* Track the file position after lookup_seasonal's seeks. */
1910 *rra_current = rrd_tell(rrd_file);
1912 if (rrd_test_error())
1914 /* loop over data sources within each RRA */
1915 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1916 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1917 rra_idx * (rrd->stat_head->ds_cnt) +
1918 ds_idx, rra_idx, ds_idx, scratch_idx,
/* Advance to the next RRA's byte offset in the file. */
1922 rra_start += rrd->rra_def[rra_idx].row_cnt
1923 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1930 * Move sequentially through the file, writing one RRA at a time. Note this
1931 * architecture divorces the computation of CDP with flushing updated RRA
1934 * Return 0 on success, -1 on error.
/* NOTE(review): the leading rrd_t * parameter line, several #ifdef DEBUG
 * wrappers, and some else/closing lines are elided in this listing. */
1936 static int write_to_rras(
1938 rrd_file_t *rrd_file,
1939 unsigned long *rra_step_cnt,
1940 unsigned long rra_begin,
1941 unsigned long *rra_current,
1942 time_t current_time,
1943 unsigned long *skip_update,
1944 rrd_info_t ** pcdp_summary)
1946 unsigned long rra_idx;
1947 unsigned long rra_start;
1948 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1949 time_t rra_time = 0; /* time of update for a RRA */
1951 /* Ready to write to disk */
1952 rra_start = rra_begin;
1953 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1954 /* skip unless there's something to write */
1955 if (rra_step_cnt[rra_idx]) {
1956 /* write the first row */
1958 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
/* Advance the circular row pointer, wrapping at row_cnt. */
1960 rrd->rra_ptr[rra_idx].cur_row++;
1961 if (rrd->rra_ptr[rra_idx].cur_row >=
1962 rrd->rra_def[rra_idx].row_cnt)
1963 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1964 /* position on the first row */
1965 rra_pos_tmp = rra_start +
1966 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1967 sizeof(rrd_value_t);
/* Only seek when the file cursor is not already at the target row. */
1968 if (rra_pos_tmp != *rra_current) {
1969 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1970 rrd_set_error("seek error in rrd");
1973 *rra_current = rra_pos_tmp;
1976 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1978 if (!skip_update[rra_idx]) {
1979 if (*pcdp_summary != NULL) {
/* Timestamp of the first row of this bulk update: current_time
 * truncated to a CDP boundary, minus the rows still to come. */
1980 rra_time = (current_time - current_time
1981 % (rrd->rra_def[rra_idx].pdp_cnt *
1982 rrd->stat_head->pdp_step))
1984 ((rra_step_cnt[rra_idx] -
1985 1) * rrd->rra_def[rra_idx].pdp_cnt *
1986 rrd->stat_head->pdp_step);
1989 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1990 pcdp_summary, rra_time) == -1)
1994 /* write other rows of the bulk update, if any */
/* Remaining rows of a bulk update are filled with CDP_secondary_val. */
1995 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1996 if (++rrd->rra_ptr[rra_idx].cur_row ==
1997 rrd->rra_def[rra_idx].row_cnt) {
2000 "Wraparound for RRA %s, %lu updates left\n",
2001 rrd->rra_def[rra_idx].cf_nam,
2002 rra_step_cnt[rra_idx] - 1);
2005 rrd->rra_ptr[rra_idx].cur_row = 0;
2006 /* seek back to beginning of current rra */
2007 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
2008 rrd_set_error("seek error in rrd");
2012 fprintf(stderr, " -- Wraparound Postseek %ld\n",
2015 *rra_current = rra_start;
2017 if (!skip_update[rra_idx]) {
2018 if (*pcdp_summary != NULL) {
/* Same boundary computation as above, offset by one extra row. */
2019 rra_time = (current_time - current_time
2020 % (rrd->rra_def[rra_idx].pdp_cnt *
2021 rrd->stat_head->pdp_step))
2023 ((rra_step_cnt[rra_idx] -
2024 2) * rrd->rra_def[rra_idx].pdp_cnt *
2025 rrd->stat_head->pdp_step);
2027 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2028 CDP_secondary_val, pcdp_summary,
/* Advance to the next RRA's byte offset in the file. */
2034 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2035 sizeof(rrd_value_t);
2042 * Write out one row of values (one value per DS) to the archive.
2044 * Returns 0 on success, -1 on error.
/* NOTE(review): the rrd_t * parameter, the rra_time parameter line, the
 * local `unival iv`, and the #ifdef DEBUG wrapper are elided in this
 * listing. */
2046 static int write_RRA_row(
2047 rrd_file_t *rrd_file,
2049 unsigned long rra_idx,
2050 unsigned long *rra_current,
2051 unsigned short CDP_scratch_idx,
2052 rrd_info_t ** pcdp_summary,
2055 unsigned long ds_idx, cdp_idx;
2058 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2059 /* compute the cdp index */
2060 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2062 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2063 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2064 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
/* Optionally report each written value to the caller's info list. */
2066 if (*pcdp_summary != NULL) {
2067 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2068 /* append info to the return hash */
2069 *pcdp_summary = rrd_info_push(*pcdp_summary,
2071 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2072 rrd->rra_def[rra_idx].cf_nam,
2073 rrd->rra_def[rra_idx].pdp_cnt,
2074 rrd->ds_def[ds_idx].ds_nam),
/* Write one rrd_value_t; partial writes are treated as errors. */
2077 if (rrd_write(rrd_file,
2078 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2079 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2080 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
/* Keep the caller's byte-position mirror in sync with the file. */
2083 *rra_current += sizeof(rrd_value_t);
2089 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2091 * Returns 0 on success, -1 otherwise
/* NOTE(review): the leading rrd_t * parameter line and the trailing
 * return are elided in this listing. */
2093 static int smooth_all_rras(
2095 rrd_file_t *rrd_file,
2096 unsigned long rra_begin)
2098 unsigned long rra_start = rra_begin;
2099 unsigned long rra_idx;
2101 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
/* Only the two seasonal-coefficient RRA types are smoothed. */
2102 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2103 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2105 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2107 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
/* apply_smoother reports failure through the rrd error state. */
2108 if (rrd_test_error())
/* Advance to the next RRA's byte offset in the file. */
2111 rra_start += rrd->rra_def[rra_idx].row_cnt
2112 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2119 * Flush changes to disk (unless we're using mmap)
2121 * Returns 0 on success, -1 otherwise
2123 static int write_changes_to_disk(
2125 rrd_file_t *rrd_file,
2128 /* we just need to write back the live header portion now */
2129 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2130 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2131 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2133 rrd_set_error("seek rrd for live header writeback");
2137 if (rrd_write(rrd_file, rrd->live_head,
2138 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2139 rrd_set_error("rrd_write live_head to rrd");
2143 if (rrd_write(rrd_file, rrd->legacy_last_up,
2144 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2145 rrd_set_error("rrd_write live_head to rrd");
2151 if (rrd_write(rrd_file, rrd->pdp_prep,
2152 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2153 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2154 rrd_set_error("rrd_write pdp_prep to rrd");
2158 if (rrd_write(rrd_file, rrd->cdp_prep,
2159 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2160 rrd->stat_head->ds_cnt)
2161 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2162 rrd->stat_head->ds_cnt)) {
2164 rrd_set_error("rrd_write cdp_prep to rrd");
2168 if (rrd_write(rrd_file, rrd->rra_ptr,
2169 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2170 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2171 rrd_set_error("rrd_write rra_ptr to rrd");