2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
9 *****************************************************************************/
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
34 #include <sys/timeb.h>
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
48 static int gettimeofday(
50 struct __timezone *tz)
53 struct _timeb current_time;
55 _ftime(&current_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
65 /* FUNCTION PROTOTYPES */
79 static int allocate_data_structures(
82 rrd_value_t **pdp_temp,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
93 unsigned long *tmpl_cnt,
96 static int process_arg(
100 unsigned long rra_begin,
101 unsigned long *rra_current,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
112 unsigned long *skip_update,
113 int *schedule_smooth);
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
125 static int get_time_from_reading(
129 time_t *current_time,
130 unsigned long *current_time_usec,
133 static int update_pdp_prep(
136 rrd_value_t *pdp_new,
139 static int calculate_elapsed_steps(
141 unsigned long current_time,
142 unsigned long current_time_usec,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
164 unsigned long ds_idx,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *rra_current,
183 unsigned long *skip_update,
184 int *schedule_smooth);
186 static int do_schedule_smooth(
188 unsigned long rra_idx,
189 unsigned long elapsed_pdp_st);
191 static int update_cdp_prep(
193 unsigned long elapsed_pdp_st,
194 unsigned long start_pdp_offset,
195 unsigned long *rra_step_cnt,
197 rrd_value_t *pdp_temp,
198 rrd_value_t *last_seasonal_coef,
199 rrd_value_t *seasonal_coef,
202 static void update_cdp(
205 rrd_value_t pdp_temp_val,
206 unsigned long rra_step_cnt,
207 unsigned long elapsed_pdp_st,
208 unsigned long start_pdp_offset,
209 unsigned long pdp_cnt,
214 static void initialize_cdp_val(
217 rrd_value_t pdp_temp_val,
218 unsigned long elapsed_pdp_st,
219 unsigned long start_pdp_offset,
220 unsigned long pdp_cnt);
222 static void reset_cdp(
224 unsigned long elapsed_pdp_st,
225 rrd_value_t *pdp_temp,
226 rrd_value_t *last_seasonal_coef,
227 rrd_value_t *seasonal_coef,
231 enum cf_en current_cf);
233 static rrd_value_t initialize_average_carry_over(
234 rrd_value_t pdp_temp_val,
235 unsigned long elapsed_pdp_st,
236 unsigned long start_pdp_offset,
237 unsigned long pdp_cnt);
239 static rrd_value_t calculate_cdp_val(
241 rrd_value_t pdp_temp_val,
242 unsigned long elapsed_pdp_st,
247 static int update_aberrant_cdps(
249 rrd_file_t *rrd_file,
250 unsigned long rra_begin,
251 unsigned long *rra_current,
252 unsigned long elapsed_pdp_st,
253 rrd_value_t *pdp_temp,
254 rrd_value_t **seasonal_coef);
256 static int write_to_rras(
258 rrd_file_t *rrd_file,
259 unsigned long *rra_step_cnt,
260 unsigned long rra_begin,
261 unsigned long *rra_current,
263 unsigned long *skip_update,
264 rrd_info_t ** pcdp_summary);
266 static int write_RRA_row(
267 rrd_file_t *rrd_file,
269 unsigned long rra_idx,
270 unsigned long *rra_current,
271 unsigned short CDP_scratch_idx,
272 rrd_info_t ** pcdp_summary,
275 static int smooth_all_rras(
277 rrd_file_t *rrd_file,
278 unsigned long rra_begin);
281 static int write_changes_to_disk(
283 rrd_file_t *rrd_file,
288 * normalize time as returned by gettimeofday. usec part must
291 static inline void normalize_time(
294 if (t->tv_usec < 0) {
301 * Sets current_time and current_time_usec based on the current time.
302 * current_time_usec is set to 0 if the version number is 1 or 2.
304 static inline void initialize_time(
305 time_t *current_time,
306 unsigned long *current_time_usec,
309 struct timeval tmp_time; /* used for time conversion */
311 gettimeofday(&tmp_time, 0);
312 normalize_time(&tmp_time);
313 *current_time = tmp_time.tv_sec;
315 *current_time_usec = tmp_time.tv_usec;
317 *current_time_usec = 0;
321 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
323 rrd_info_t *rrd_update_v(
328 rrd_info_t *result = NULL;
330 struct option long_options[] = {
331 {"template", required_argument, 0, 't'},
337 opterr = 0; /* initialize getopt */
340 int option_index = 0;
343 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
354 rrd_set_error("unknown option '%s'", argv[optind - 1]);
359 /* need at least 2 arguments: filename, data. */
360 if (argc - optind < 2) {
361 rrd_set_error("Not enough arguments");
365 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
366 rc.u_int = _rrd_update(argv[optind], tmplt,
368 (const char **) (argv + optind + 1), result);
369 result->value.u_int = rc.u_int;
378 struct option long_options[] = {
379 {"template", required_argument, 0, 't'},
380 {"daemon", required_argument, 0, 'd'},
383 int option_index = 0;
390 opterr = 0; /* initialize getopt */
393 opt = getopt_long(argc, argv, "t:cnd:", long_options, &option_index);
400 tmplt = strdup(optarg);
406 daemon = strdup (optarg);
409 rrd_set_error("strdup failed.");
415 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 /* need at least 2 arguments: filename, data. */
421 if (argc - optind < 2) {
422 rrd_set_error("Not enough arguments");
426 if ((tmplt != NULL) && (daemon != NULL))
428 rrd_set_error("The caching daemon cannot be used together with "
437 status = rrdc_connect (daemon);
440 rrd_set_error("Unable to connect to daemon: %s",
443 : rrd_strerror (status));
447 status = rrdc_update (/* file = */ argv[optind],
448 /* values_num = */ argc - optind - 1,
449 /* values = */ (void *) (argv + optind + 1));
452 rrd_set_error("Failed sending the values to the daemon: %s",
455 : rrd_strerror (status));
460 } /* if (daemon != NULL) */
462 rc = rrd_update_r(argv[optind], tmplt,
463 argc - optind - 1, (const char **) (argv + optind + 1));
479 const char *filename,
484 return _rrd_update(filename, tmplt, argc, argv, NULL);
488 const char *filename,
492 rrd_info_t * pcdp_summary)
497 unsigned long rra_begin; /* byte pointer to the rra
498 * area in the rrd file. this
499 * pointer never changes value */
500 unsigned long rra_current; /* byte pointer to the current write
501 * spot in the rrd file. */
502 rrd_value_t *pdp_new; /* prepare the incoming data to be added
503 * to the existing entry */
504 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
505 * to the cdp values */
507 long *tmpl_idx; /* index representing the settings
508 * transported by the tmplt index */
509 unsigned long tmpl_cnt = 2; /* time and data */
511 time_t current_time = 0;
512 unsigned long current_time_usec = 0; /* microseconds part of current time */
514 int schedule_smooth = 0;
516 /* number of elapsed PDP steps since last update */
517 unsigned long *rra_step_cnt = NULL;
519 int version; /* rrd version */
520 rrd_file_t *rrd_file;
521 char *arg_copy; /* for processing the argv */
522 unsigned long *skip_update; /* RRAs to advance but not write */
524 /* need at least 1 arguments: data. */
526 rrd_set_error("Not enough arguments");
530 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
533 /* We are now at the beginning of the rra's */
534 rra_current = rra_begin = rrd_file->header_len;
536 version = atoi(rrd.stat_head->version);
538 initialize_time(&current_time, &current_time_usec, version);
540 /* get exclusive lock to whole file.
541 * lock gets removed when we close the file.
543 if (rrd_lock(rrd_file) != 0) {
544 rrd_set_error("could not lock RRD");
548 if (allocate_data_structures(&rrd, &updvals,
549 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
550 &rra_step_cnt, &skip_update,
555 /* loop through the arguments. */
556 for (arg_i = 0; arg_i < argc; arg_i++) {
557 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
558 rrd_set_error("failed duplication argv entry");
561 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
562 &current_time, &current_time_usec, pdp_temp, pdp_new,
563 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
564 &pcdp_summary, version, skip_update,
565 &schedule_smooth) == -1) {
574 /* if we got here and if there is an error and if the file has not been
575 * written to, then close things up and return. */
576 if (rrd_test_error()) {
577 goto err_free_structures;
580 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
581 goto err_free_structures;
585 /* calling the smoothing code here guarantees at most one smoothing
586 * operation per rrd_update call. Unfortunately, it is possible with bulk
587 * updates, or a long-delayed update for smoothing to occur off-schedule.
588 * This really isn't critical except during the burn-in cycles. */
589 if (schedule_smooth) {
590 smooth_all_rras(&rrd, rrd_file, rra_begin);
593 /* rrd_dontneed(rrd_file,&rrd); */
619 * get exclusive lock to whole file.
620 * lock gets removed when we close the file
622 * returns 0 on success
630 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
633 if (_fstat(file->fd, &st) == 0) {
634 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
641 lock.l_type = F_WRLCK; /* exclusive write lock */
642 lock.l_len = 0; /* whole file */
643 lock.l_start = 0; /* start of file */
644 lock.l_whence = SEEK_SET; /* offsets are relative to start of file */
646 rcstat = fcntl(file->fd, F_SETLK, &lock);
654 * Allocate some important arrays used, and initialize the template.
656 * When it returns, either all of the structures are allocated
657 * or none of them are.
659 * Returns 0 on success, -1 on error.
661 static int allocate_data_structures(
664 rrd_value_t **pdp_temp,
667 unsigned long *tmpl_cnt,
668 unsigned long **rra_step_cnt,
669 unsigned long **skip_update,
670 rrd_value_t **pdp_new)
673 if ((*updvals = (char **) malloc(sizeof(char *)
674 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
675 rrd_set_error("allocating updvals pointer array.");
678 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
679 * rrd->stat_head->ds_cnt)) ==
681 rrd_set_error("allocating pdp_temp.");
682 goto err_free_updvals;
684 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
686 rrd->stat_head->rra_cnt)) ==
688 rrd_set_error("allocating skip_update.");
689 goto err_free_pdp_temp;
691 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
692 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
693 rrd_set_error("allocating tmpl_idx.");
694 goto err_free_skip_update;
696 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
699 rra_cnt))) == NULL) {
700 rrd_set_error("allocating rra_step_cnt.");
701 goto err_free_tmpl_idx;
704 /* initialize tmplt redirector */
705 /* default config example (assume DS 1 is a CDEF DS)
706 tmpl_idx[0] -> 0; (time)
707 tmpl_idx[1] -> 1; (DS 0)
708 tmpl_idx[2] -> 3; (DS 2)
709 tmpl_idx[3] -> 4; (DS 3) */
710 (*tmpl_idx)[0] = 0; /* time */
711 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
712 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
713 (*tmpl_idx)[ii++] = i;
718 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
719 goto err_free_rra_step_cnt;
723 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
724 * rrd->stat_head->ds_cnt)) == NULL) {
725 rrd_set_error("allocating pdp_new.");
726 goto err_free_rra_step_cnt;
731 err_free_rra_step_cnt:
735 err_free_skip_update:
745 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
747 * Returns 0 on success.
749 static int parse_template(
752 unsigned long *tmpl_cnt,
755 char *dsname, *tmplt_copy;
756 unsigned int tmpl_len, i;
759 *tmpl_cnt = 1; /* the first entry is the time */
761 /* we should work on a writeable copy here */
762 if ((tmplt_copy = strdup(tmplt)) == NULL) {
763 rrd_set_error("error copying tmplt '%s'", tmplt);
769 tmpl_len = strlen(tmplt_copy);
770 for (i = 0; i <= tmpl_len; i++) {
771 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
772 tmplt_copy[i] = '\0';
773 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
774 rrd_set_error("tmplt contains more DS definitions than RRD");
776 goto out_free_tmpl_copy;
778 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
779 rrd_set_error("unknown DS name '%s'", dsname);
781 goto out_free_tmpl_copy;
783 /* go to the next entry on the tmplt_copy */
785 dsname = &tmplt_copy[i + 1];
795 * Parse an update string, updates the primary data points (PDPs)
796 * and consolidated data points (CDPs), and writes changes to the RRAs.
798 * Returns 0 on success, -1 on error.
800 static int process_arg(
803 rrd_file_t *rrd_file,
804 unsigned long rra_begin,
805 unsigned long *rra_current,
806 time_t *current_time,
807 unsigned long *current_time_usec,
808 rrd_value_t *pdp_temp,
809 rrd_value_t *pdp_new,
810 unsigned long *rra_step_cnt,
813 unsigned long tmpl_cnt,
814 rrd_info_t ** pcdp_summary,
816 unsigned long *skip_update,
817 int *schedule_smooth)
819 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
821 /* a vector of future Holt-Winters seasonal coefs */
822 unsigned long elapsed_pdp_st;
824 double interval, pre_int, post_int; /* interval between this and
826 unsigned long proc_pdp_cnt;
827 unsigned long rra_start;
829 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
830 current_time, current_time_usec, version) == -1) {
833 /* seek to the beginning of the rra's */
834 if (*rra_current != rra_begin) {
836 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
837 rrd_set_error("seek error in rrd");
841 *rra_current = rra_begin;
843 rra_start = rra_begin;
845 interval = (double) (*current_time - rrd->live_head->last_up)
846 + (double) ((long) *current_time_usec -
847 (long) rrd->live_head->last_up_usec) / 1e6f;
849 /* process the data sources and update the pdp_prep
850 * area accordingly */
851 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
855 elapsed_pdp_st = calculate_elapsed_steps(rrd,
857 *current_time_usec, interval,
861 /* has a pdp_st moment occurred since the last run ? */
862 if (elapsed_pdp_st == 0) {
863 /* no we have not passed a pdp_st moment. therefore update is simple */
864 simple_update(rrd, interval, pdp_new);
866 /* an pdp_st has occurred. */
867 if (process_all_pdp_st(rrd, interval,
869 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
872 if (update_all_cdp_prep(rrd, rra_step_cnt,
878 pdp_temp, rra_current,
879 skip_update, schedule_smooth) == -1) {
880 goto err_free_coefficients;
882 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
883 elapsed_pdp_st, pdp_temp,
884 &seasonal_coef) == -1) {
885 goto err_free_coefficients;
887 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
888 rra_current, *current_time, skip_update,
889 pcdp_summary) == -1) {
890 goto err_free_coefficients;
892 } /* endif a pdp_st has occurred */
893 rrd->live_head->last_up = *current_time;
894 rrd->live_head->last_up_usec = *current_time_usec;
897 *rrd->legacy_last_up = rrd->live_head->last_up;
900 free(last_seasonal_coef);
903 err_free_coefficients:
905 free(last_seasonal_coef);
910 * Parse a DS string (time + colon-separated values), storing the
911 * results in current_time, current_time_usec, and updvals.
913 * Returns 0 on success, -1 on error.
920 unsigned long tmpl_cnt,
921 time_t *current_time,
922 unsigned long *current_time_usec,
930 /* initialize all ds input to unknown except the first one
931 which has always got to be set */
932 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
935 /* separate all ds elements; first must be examined separately
936 due to alternate time syntax */
937 if ((p = strchr(input, '@')) != NULL) {
939 } else if ((p = strchr(input, ':')) != NULL) {
942 rrd_set_error("expected timestamp not found in data source from %s",
948 updvals[tmpl_idx[i++]] = p + 1;
953 updvals[tmpl_idx[i++]] = p + 1;
959 rrd_set_error("expected %lu data source readings (got %lu) from %s",
960 tmpl_cnt - 1, i, input);
964 if (get_time_from_reading(rrd, timesyntax, updvals,
965 current_time, current_time_usec,
973 * Parse the time in a DS string, store it in current_time and
974 * current_time_usec and verify that it's later than the last
975 * update for this DS.
977 * Returns 0 on success, -1 on error.
979 static int get_time_from_reading(
983 time_t *current_time,
984 unsigned long *current_time_usec,
988 char *parsetime_error = NULL;
990 rrd_time_value_t ds_tv;
991 struct timeval tmp_time; /* used for time conversion */
993 /* get the time from the reading ... handle N */
994 if (timesyntax == '@') { /* at-style */
995 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
996 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
999 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1000 ds_tv.type == RELATIVE_TO_START_TIME) {
1001 rrd_set_error("specifying time relative to the 'start' "
1002 "or 'end' makes no sense here: %s", updvals[0]);
1005 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1006 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1007 } else if (strcmp(updvals[0], "N") == 0) {
1008 gettimeofday(&tmp_time, 0);
1009 normalize_time(&tmp_time);
1010 *current_time = tmp_time.tv_sec;
1011 *current_time_usec = tmp_time.tv_usec;
1013 old_locale = setlocale(LC_NUMERIC, "C");
1014 tmp = strtod(updvals[0], 0);
1015 setlocale(LC_NUMERIC, old_locale);
1016 *current_time = floor(tmp);
1017 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1019 /* dont do any correction for old version RRDs */
1021 *current_time_usec = 0;
1023 if (*current_time < rrd->live_head->last_up ||
1024 (*current_time == rrd->live_head->last_up &&
1025 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1026 rrd_set_error("illegal attempt to update using time %ld when "
1027 "last update time is %ld (minimum one second step)",
1028 *current_time, rrd->live_head->last_up);
1035 * Update pdp_new by interpreting the updvals according to the DS type
1036 * (COUNTER, GAUGE, etc.).
1038 * Returns 0 on success, -1 on error.
1040 static int update_pdp_prep(
1043 rrd_value_t *pdp_new,
1046 unsigned long ds_idx;
1048 char *endptr; /* used in the conversion */
1051 enum dst_en dst_idx;
1053 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1054 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1056 /* make sure we do not build diffs with old last_ds values */
1057 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1058 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1059 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1062 /* NOTE: DST_CDEF should never enter this if block, because
1063 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1064 * accidently specified a value for the DST_CDEF. To handle this case,
1065 * an extra check is required. */
1067 if ((updvals[ds_idx + 1][0] != 'U') &&
1068 (dst_idx != DST_CDEF) &&
1069 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1072 /* pdp_new contains rate * time ... eg the bytes transferred during
1073 * the interval. Doing it this way saves a lot of math operations
1078 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1079 if ((updvals[ds_idx + 1][ii] < '0'
1080 || updvals[ds_idx + 1][ii] > '9')
1081 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1082 rrd_set_error("not a simple integer: '%s'",
1083 updvals[ds_idx + 1]);
1087 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1089 rrd_diff(updvals[ds_idx + 1],
1090 rrd->pdp_prep[ds_idx].last_ds);
1091 if (dst_idx == DST_COUNTER) {
1092 /* simple overflow catcher. This will fail
1093 * terribly for non 32 or 64 bit counters
1094 * ... are there any others in SNMP land?
1096 if (pdp_new[ds_idx] < (double) 0.0)
1097 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1098 if (pdp_new[ds_idx] < (double) 0.0)
1099 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1101 rate = pdp_new[ds_idx] / interval;
1103 pdp_new[ds_idx] = DNAN;
1107 old_locale = setlocale(LC_NUMERIC, "C");
1109 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1110 setlocale(LC_NUMERIC, old_locale);
1112 rrd_set_error("converting '%s' to float: %s",
1113 updvals[ds_idx + 1], rrd_strerror(errno));
1116 if (endptr[0] != '\0') {
1118 ("conversion of '%s' to float not complete: tail '%s'",
1119 updvals[ds_idx + 1], endptr);
1122 rate = pdp_new[ds_idx] / interval;
1126 old_locale = setlocale(LC_NUMERIC, "C");
1128 strtod(updvals[ds_idx + 1], &endptr) * interval;
1129 setlocale(LC_NUMERIC, old_locale);
1131 rrd_set_error("converting '%s' to float: %s",
1132 updvals[ds_idx + 1], rrd_strerror(errno));
1135 if (endptr[0] != '\0') {
1137 ("conversion of '%s' to float not complete: tail '%s'",
1138 updvals[ds_idx + 1], endptr);
1141 rate = pdp_new[ds_idx] / interval;
1144 rrd_set_error("rrd contains unknown DS type : '%s'",
1145 rrd->ds_def[ds_idx].dst);
1148 /* break out of this for loop if the error string is set */
1149 if (rrd_test_error()) {
1152 /* make sure pdp_temp is neither too large or too small
1153 * if any of these occur it becomes unknown ...
1154 * sorry folks ... */
1156 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1157 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1158 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1159 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1160 pdp_new[ds_idx] = DNAN;
1163 /* no news is news all the same */
1164 pdp_new[ds_idx] = DNAN;
1168 /* make a copy of the command line argument for the next run */
1170 fprintf(stderr, "prep ds[%lu]\t"
1174 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1177 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1179 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1185 * How many PDP steps have elapsed since the last update? Returns the answer,
1186 * and stores the time between the last update and the last PDP in pre_time,
1187 * and the time between the last PDP and the current time in post_int.
1189 static int calculate_elapsed_steps(
1191 unsigned long current_time,
1192 unsigned long current_time_usec,
1196 unsigned long *proc_pdp_cnt)
1198 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1199 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1201 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1202 * when it was last updated */
1203 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1205 /* when was the current pdp started */
1206 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1207 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1209 /* when did the last pdp_st occur */
1210 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1211 occu_pdp_st = current_time - occu_pdp_age;
1213 if (occu_pdp_st > proc_pdp_st) {
1214 /* OK we passed the pdp_st moment */
1215 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1216 * occurred before the latest
1218 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1219 *post_int = occu_pdp_age; /* how much after it */
1220 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1222 *pre_int = interval;
1226 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1229 printf("proc_pdp_age %lu\t"
1231 "occu_pfp_age %lu\t"
1235 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1236 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1239 /* compute the number of elapsed pdp_st moments */
1240 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1244 * Increment the PDP values by the values in pdp_new, or else initialize them.
1246 static void simple_update(
1249 rrd_value_t *pdp_new)
1253 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1254 if (isnan(pdp_new[i])) {
1255 /* this is not really accurate if we use subsecond data arrival time
1256 should have thought of it when going subsecond resolution ...
1257 sorry next format change we will have it! */
1258 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1261 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1262 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1264 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1273 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1274 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1280 * Call process_pdp_st for each DS.
1282 * Returns 0 on success, -1 on error.
1284 static int process_all_pdp_st(
1289 unsigned long elapsed_pdp_st,
1290 rrd_value_t *pdp_new,
1291 rrd_value_t *pdp_temp)
1293 unsigned long ds_idx;
1295 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1296 rate*seconds which occurred up to the last run.
1297 pdp_new[] contains rate*seconds from the latest run.
1298 pdp_temp[] will contain the rate for cdp */
1300 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1301 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1302 elapsed_pdp_st * rrd->stat_head->pdp_step,
1303 pdp_new, pdp_temp) == -1) {
1307 fprintf(stderr, "PDP UPD ds[%lu]\t"
1308 "elapsed_pdp_st %lu\t"
1311 "new_unkn_sec %5lu\n",
1315 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1316 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1323 * Process an update that occurs after one of the PDP moments.
1324 * Increments the PDP value, sets NAN if time greater than the
1325 * heartbeats have elapsed, processes CDEFs.
1327 * Returns 0 on success, -1 on error.
1329 static int process_pdp_st(
1331 unsigned long ds_idx,
1335 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1336 rrd_value_t *pdp_new,
1337 rrd_value_t *pdp_temp)
1341 /* update pdp_prep to the current pdp_st. */
1342 double pre_unknown = 0.0;
1343 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1344 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1346 rpnstack_t rpnstack; /* used for COMPUTE DS */
1348 rpnstack_init(&rpnstack);
1351 if (isnan(pdp_new[ds_idx])) {
1352 /* a final bit of unknown to be added before calculation
1353 we use a temporary variable for this so that we
1354 don't have to turn integer lines before using the value */
1355 pre_unknown = pre_int;
1357 if (isnan(scratch[PDP_val].u_val)) {
1358 scratch[PDP_val].u_val = 0;
1360 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1363 /* if too much of the pdp_prep is unknown we dump it */
1364 /* if the interval is larger than mrhb we get NAN */
1365 if ((interval > mrhb) ||
1366 (rrd->stat_head->pdp_step / 2.0 <
1367 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1368 pdp_temp[ds_idx] = DNAN;
1370 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1371 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1375 /* process CDEF data sources; remember each CDEF DS can
1376 * only reference other DS with a lower index number */
1377 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1381 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1382 /* substitute data values for OP_VARIABLE nodes */
1383 for (i = 0; rpnp[i].op != OP_END; i++) {
1384 if (rpnp[i].op == OP_VARIABLE) {
1385 rpnp[i].op = OP_NUMBER;
1386 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1389 /* run the rpn calculator */
1390 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1392 rpnstack_free(&rpnstack);
1397 /* make pdp_prep ready for the next run */
1398 if (isnan(pdp_new[ds_idx])) {
1399 /* this is not really accurate if we use subsecond data arrival time
1400 should have thought of it when going subsecond resolution ...
1401 sorry next format change we will have it! */
1402 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1403 scratch[PDP_val].u_val = DNAN;
1405 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1406 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1408 rpnstack_free(&rpnstack);
1413 * Iterate over all the RRAs for a given DS and:
1414 * 1. Decide whether to schedule a smooth later
1415 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1418 * Returns 0 on success, -1 on error
1420 static int update_all_cdp_prep(
1422 unsigned long *rra_step_cnt,
1423 unsigned long rra_begin,
1424 rrd_file_t *rrd_file,
1425 unsigned long elapsed_pdp_st,
1426 unsigned long proc_pdp_cnt,
1427 rrd_value_t **last_seasonal_coef,
1428 rrd_value_t **seasonal_coef,
1429 rrd_value_t *pdp_temp,
1430 unsigned long *rra_current,
1431 unsigned long *skip_update,
1432 int *schedule_smooth)
1434 unsigned long rra_idx;
1436 /* index into the CDP scratch array */
1437 enum cf_en current_cf;
1438 unsigned long rra_start;
1440 /* number of rows to be updated in an RRA for a data value. */
1441 unsigned long start_pdp_offset;
1443 rra_start = rra_begin;
1444 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1445 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1447 rrd->rra_def[rra_idx].pdp_cnt -
1448 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1449 skip_update[rra_idx] = 0;
1450 if (start_pdp_offset <= elapsed_pdp_st) {
1451 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1452 rrd->rra_def[rra_idx].pdp_cnt + 1;
1454 rra_step_cnt[rra_idx] = 0;
1457 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1458 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1459 * so that they will be correct for the next observed value; note that for
1460 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1461 * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1462 if (rra_step_cnt[rra_idx] > 1) {
1463 skip_update[rra_idx] = 1;
1464 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1465 elapsed_pdp_st, last_seasonal_coef);
1466 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1467 elapsed_pdp_st + 1, seasonal_coef);
1469 /* periodically run a smoother for seasonal effects */
1470 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1473 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1474 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1475 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1478 *schedule_smooth = 1;
1480 *rra_current = rrd_tell(rrd_file);
1482 if (rrd_test_error())
1486 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1487 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1488 current_cf) == -1) {
1492 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1493 sizeof(rrd_value_t);
1499 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1501 static int do_schedule_smooth(
1503 unsigned long rra_idx,
1504 unsigned long elapsed_pdp_st)
1506 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1507 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1508 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1509 unsigned long seasonal_smooth_idx =
1510 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1511 unsigned long *init_seasonal =
1512 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1514 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1515 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1516 * really an RRA level, not a data source within RRA level parameter, but
1517 * the rra_def is read only for rrd_update (not flushed to disk). */
1518 if (*init_seasonal > BURNIN_CYCLES) {
1519 /* someone has no doubt invented a trick to deal with this wrap around,
1520 * but at least this code is clear. */
1521 if (seasonal_smooth_idx > cur_row) {
1522 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1523 * between PDP and CDP */
1524 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1526 /* can't rely on negative numbers because we are working with
1527 * unsigned values */
1528 return (cur_row + elapsed_pdp_st >= row_cnt
1529 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1531 /* mark off one of the burn-in cycles */
1532 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
/*
 * For a given RRA, iterate over the data sources and call the appropriate
 * consolidation function: update_cdp() when real consolidation is needed,
 * reset_cdp() for 1:1 RRAs that only need bookkeeping refreshed after a
 * bulk (multi-step) update.
 *
 * Returns 0 on success, -1 on error.
 */
1541 static int update_cdp_prep(
1543 unsigned long elapsed_pdp_st,
1544 unsigned long start_pdp_offset,
1545 unsigned long *rra_step_cnt,
1547 rrd_value_t *pdp_temp,
1548 rrd_value_t *last_seasonal_coef,
1549 rrd_value_t *seasonal_coef,
1552 unsigned long ds_idx, cdp_idx;
1554 /* update CDP_PREP areas */
1555 /* loop over data sources within each RRA */
1556 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
        /* flat index of this (RRA, DS) pair into the cdp_prep array */
1558 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
        /* more than one PDP per CDP: real consolidation is required,
         * using this RRA's xff (max fraction of unknown PDPs) */
1560 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1561 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1562 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1563 elapsed_pdp_st, start_pdp_offset,
1564 rrd->rra_def[rra_idx].pdp_cnt,
1565 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1568 /* Nothing to consolidate if there's one PDP per CDP. However, if
1569 * we've missed some PDPs, let's update null counters etc. */
1570 if (elapsed_pdp_st > 2) {
1571 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1572 seasonal_coef, rra_idx, ds_idx, cdp_idx,
        /* bail out as soon as any consolidation step reported an error */
1577 if (rrd_test_error())
1579 } /* endif data sources loop */
/*
 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
 * primary value, secondary value, and # of unknowns.
 *
 * The rra_step_cnt > 0 branch handles a crossed CDP boundary (at least one
 * row will be written); the else branch merely accumulates carry-over state.
 */
1587 static void update_cdp(
1590 rrd_value_t pdp_temp_val,
1591 unsigned long rra_step_cnt,
1592 unsigned long elapsed_pdp_st,
1593 unsigned long start_pdp_offset,
1594 unsigned long pdp_cnt,
1599 /* shorthand variables */
1600 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1601 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1602 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1603 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1606 /* If we are in this block, at least 1 CDP value will be written to
1607 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1608 * to be written, then the "fill in" value is the CDP_secondary_val
1609 * entry. */
1610 if (isnan(pdp_temp_val)) {
        /* unknown reading: PDPs before the CDP boundary count as unknown,
         * and the fill-in value for intermediate rows is unknown too */
1611 *cdp_unkn_pdp_cnt += start_pdp_offset;
1612 *cdp_secondary_val = DNAN;
1614 /* CDP_secondary value is the RRA "fill in" value for intermediary
1615 * CDP data entries. No matter the CF, the value is the same because
1616 * the average, max, min, and last of a list of identical values is
1617 * the same, namely, the value itself. */
1618 *cdp_secondary_val = pdp_temp_val;
        /* too many unknown PDPs in this CDP: the consolidated value
         * itself becomes unknown (xff = max allowed unknown fraction) */
1621 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1622 *cdp_primary_val = DNAN;
1623 if (current_cf == CF_AVERAGE) {
1625 initialize_average_carry_over(pdp_temp_val,
1627 start_pdp_offset, pdp_cnt);
1629 *cdp_val = pdp_temp_val;
1632 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1633 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1634 } /* endif meets xff value requirement for a valid value */
1635 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1636 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1637 if (isnan(pdp_temp_val))
1638 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1640 *cdp_unkn_pdp_cnt = 0;
1641 } else { /* rra_step_cnt[i] == 0 */
        /* no CDP boundary crossed: debug trace, then fold the reading
         * into the carry-over state */
1644 if (isnan(*cdp_val)) {
1645 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1648 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1652 if (isnan(pdp_temp_val)) {
1653 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1656 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1663 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1664 * on the type of consolidation function.
1666 static void initialize_cdp_val(
1669 rrd_value_t pdp_temp_val,
1670 unsigned long elapsed_pdp_st,
1671 unsigned long start_pdp_offset,
1672 unsigned long pdp_cnt)
1674 rrd_value_t cum_val, cur_val;
1676 switch (current_cf) {
1678 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1679 cur_val = IFDNAN(pdp_temp_val, 0.0);
1680 scratch[CDP_primary_val].u_val =
1681 (cum_val + cur_val * start_pdp_offset) /
1682 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1683 scratch[CDP_val].u_val =
1684 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1685 start_pdp_offset, pdp_cnt);
1688 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1689 cur_val = IFDNAN(pdp_temp_val, -DINF);
1692 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1694 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1700 if (cur_val > cum_val)
1701 scratch[CDP_primary_val].u_val = cur_val;
1703 scratch[CDP_primary_val].u_val = cum_val;
1704 /* initialize carry over value */
1705 scratch[CDP_val].u_val = pdp_temp_val;
1708 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1709 cur_val = IFDNAN(pdp_temp_val, DINF);
1712 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1714 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1720 if (cur_val < cum_val)
1721 scratch[CDP_primary_val].u_val = cur_val;
1723 scratch[CDP_primary_val].u_val = cum_val;
1724 /* initialize carry over value */
1725 scratch[CDP_val].u_val = pdp_temp_val;
1729 scratch[CDP_primary_val].u_val = pdp_temp_val;
1730 /* initialize carry over value */
1731 scratch[CDP_val].u_val = pdp_temp_val;
/*
 * Update the consolidation state for Holt-Winters functions as well as
 * other functions that don't actually consolidate multiple PDPs (one PDP
 * per CDP), after a bulk update has skipped over several steps.
 */
1741 static void reset_cdp(
1743 unsigned long elapsed_pdp_st,
1744 rrd_value_t *pdp_temp,
1745 rrd_value_t *last_seasonal_coef,
1746 rrd_value_t *seasonal_coef,
1750 enum cf_en current_cf)
1752 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1754 switch (current_cf) {
        /* plain 1:1 RRAs: both the row written now and the fill-in rows
         * get the current reading */
1757 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1758 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1761 case CF_DEVSEASONAL:
1762 /* need to update cached seasonal values, so they are consistent
1763 * with the bulk update */
1764 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1765 * CDP_last_deviation are the same. */
1766 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1767 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1771 /* need to update the null_count and last_null_count.
1772 * even do this for non-DNAN pdp_temp because the
1773 * algorithm is not learning from batch updates. */
1774 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1775 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
        /* prediction rows covered by the bulk update are unknown */
1778 scratch[CDP_primary_val].u_val = DNAN;
1779 scratch[CDP_secondary_val].u_val = DNAN;
1782 /* do not count missed bulk values as failures */
1783 scratch[CDP_primary_val].u_val = 0;
1784 scratch[CDP_secondary_val].u_val = 0;
1785 /* need to reset violations buffer.
1786 * could do this more carefully, but for now, just
1787 * assume a bulk update wipes away all violations. */
1788 erase_violations(rrd, cdp_idx, rra_idx);
1793 static rrd_value_t initialize_average_carry_over(
1794 rrd_value_t pdp_temp_val,
1795 unsigned long elapsed_pdp_st,
1796 unsigned long start_pdp_offset,
1797 unsigned long pdp_cnt)
1799 /* initialize carry over value */
1800 if (isnan(pdp_temp_val)) {
1803 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1807 * Update or initialize a CDP value based on the consolidation
1810 * Returns the new value.
1812 static rrd_value_t calculate_cdp_val(
1813 rrd_value_t cdp_val,
1814 rrd_value_t pdp_temp_val,
1815 unsigned long elapsed_pdp_st,
1826 if (isnan(cdp_val)) {
1827 if (current_cf == CF_AVERAGE) {
1828 pdp_temp_val *= elapsed_pdp_st;
1831 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1832 i, ii, pdp_temp_val);
1834 return pdp_temp_val;
1836 if (current_cf == CF_AVERAGE)
1837 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1838 if (current_cf == CF_MINIMUM)
1839 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1840 if (current_cf == CF_MAXIMUM)
1841 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1843 return pdp_temp_val;
/*
 * For each RRA, update the seasonal values and then call update_aberrant_CF
 * for each data source.
 *
 * Return 0 on success, -1 on error.
 */
1852 static int update_aberrant_cdps(
1854 rrd_file_t *rrd_file,
1855 unsigned long rra_begin,
1856 unsigned long *rra_current,
1857 unsigned long elapsed_pdp_st,
1858 rrd_value_t *pdp_temp,
1859 rrd_value_t **seasonal_coef)
1861 unsigned long rra_idx, ds_idx, j;
1863 /* number of PDP steps since the last update that
1864 * are assigned to the first CDP to be generated
1865 * since the last update. */
1866 unsigned short scratch_idx;
1867 unsigned long rra_start;
1868 enum cf_en current_cf;
1870 /* this loop is only entered if elapsed_pdp_st < 3 */
1871 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1872 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1873 rra_start = rra_begin;
1874 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        /* only 1:1 (one PDP per CDP) RRAs hold aberrant-detection state */
1875 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1876 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1877 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
        /* fetch the seasonal coefficients for the PDP slot this pass
         * will fill (primary = first row, secondary = following row) */
1878 if (scratch_idx == CDP_primary_val) {
1879 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1880 elapsed_pdp_st + 1, seasonal_coef);
1882 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1883 elapsed_pdp_st + 2, seasonal_coef);
1885 *rra_current = rrd_tell(rrd_file);
1887 if (rrd_test_error())
1889 /* loop over data sources within each RRA */
1890 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1891 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1892 rra_idx * (rrd->stat_head->ds_cnt) +
1893 ds_idx, rra_idx, ds_idx, scratch_idx,
        /* advance byte offset to the start of the next RRA */
1897 rra_start += rrd->rra_def[rra_idx].row_cnt
1898 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
/*
 * Move sequentially through the file, writing one RRA at a time. Note this
 * architecture divorces the computation of CDP values from the flushing of
 * updated RRA rows to disk.
 *
 * Return 0 on success, -1 on error.
 */
1911 static int write_to_rras(
1913 rrd_file_t *rrd_file,
1914 unsigned long *rra_step_cnt,
1915 unsigned long rra_begin,
1916 unsigned long *rra_current,
1917 time_t current_time,
1918 unsigned long *skip_update,
1919 rrd_info_t ** pcdp_summary)
1921 unsigned long rra_idx;
1922 unsigned long rra_start;
1923 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1924 time_t rra_time = 0; /* time of update for a RRA */
1926 /* Ready to write to disk */
1927 rra_start = rra_begin;
1928 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1929 /* skip unless there's something to write */
1930 if (rra_step_cnt[rra_idx]) {
1931 /* write the first row */
1933 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
        /* advance the circular-buffer cursor before writing */
1935 rrd->rra_ptr[rra_idx].cur_row++;
1936 if (rrd->rra_ptr[rra_idx].cur_row >=
1937 rrd->rra_def[rra_idx].row_cnt)
1938 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1939 /* position on the first row */
1940 rra_pos_tmp = rra_start +
1941 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1942 sizeof(rrd_value_t);
        /* only seek when the file pointer is not already there */
1943 if (rra_pos_tmp != *rra_current) {
1944 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1945 rrd_set_error("seek error in rrd");
1948 *rra_current = rra_pos_tmp;
1951 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1953 if (!skip_update[rra_idx]) {
1954 if (*pcdp_summary != NULL) {
        /* timestamp of the first (oldest) row of this bulk update:
         * current_time aligned down to this RRA's consolidation
         * interval, minus the rows still to be written */
1955 rra_time = (current_time - current_time
1956 % (rrd->rra_def[rra_idx].pdp_cnt *
1957 rrd->stat_head->pdp_step))
1959 ((rra_step_cnt[rra_idx] -
1960 1) * rrd->rra_def[rra_idx].pdp_cnt *
1961 rrd->stat_head->pdp_step);
1964 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1965 pcdp_summary, rra_time) == -1)
1969 /* write other rows of the bulk update, if any */
1970 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1971 if (++rrd->rra_ptr[rra_idx].cur_row ==
1972 rrd->rra_def[rra_idx].row_cnt) {
1975 "Wraparound for RRA %s, %lu updates left\n",
1976 rrd->rra_def[rra_idx].cf_nam,
1977 rra_step_cnt[rra_idx] - 1);
1980 rrd->rra_ptr[rra_idx].cur_row = 0;
1981 /* seek back to beginning of current rra */
1982 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1983 rrd_set_error("seek error in rrd");
1987 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1990 *rra_current = rra_start;
1992 if (!skip_update[rra_idx]) {
1993 if (*pcdp_summary != NULL) {
1994 rra_time = (current_time - current_time
1995 % (rrd->rra_def[rra_idx].pdp_cnt *
1996 rrd->stat_head->pdp_step))
1998 ((rra_step_cnt[rra_idx] -
1999 2) * rrd->rra_def[rra_idx].pdp_cnt *
2000 rrd->stat_head->pdp_step);
        /* fill-in rows get the secondary (repeated) CDP value */
2002 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
2003 CDP_secondary_val, pcdp_summary,
        /* advance byte offset to the start of the next RRA */
2009 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
2010 sizeof(rrd_value_t);
/*
 * Write out one row of values (one value per DS) to the archive.
 *
 * Returns 0 on success, -1 on error.
 */
2021 static int write_RRA_row(
2022 rrd_file_t *rrd_file,
2024 unsigned long rra_idx,
2025 unsigned long *rra_current,
2026 unsigned short CDP_scratch_idx,
2027 rrd_info_t ** pcdp_summary,
2030 unsigned long ds_idx, cdp_idx;
2033 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2034 /* compute the cdp index */
2035 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2037 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2038 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2039 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2041 if (*pcdp_summary != NULL) {
2042 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2043 /* append info to the return hash */
        /* NOTE(review): "[%d]" formats rra_time, a time_t; on platforms
         * where time_t is wider than int this relies on a cast on the
         * (not visible here) argument line -- TODO confirm */
2044 *pcdp_summary = rrd_info_push(*pcdp_summary,
2046 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2047 rrd->rra_def[rra_idx].cf_nam,
2048 rrd->rra_def[rra_idx].pdp_cnt,
2049 rrd->ds_def[ds_idx].ds_nam),
        /* flush the consolidated value; errno supplies the detail text */
2052 if (rrd_write(rrd_file,
2053 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2054 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2055 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
        /* track the write position so callers can avoid redundant seeks */
2058 *rra_current += sizeof(rrd_value_t);
2064 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2066 * Returns 0 on success, -1 otherwise
2068 static int smooth_all_rras(
2070 rrd_file_t *rrd_file,
2071 unsigned long rra_begin)
2073 unsigned long rra_start = rra_begin;
2074 unsigned long rra_idx;
2076 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2077 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2078 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2080 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2082 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2083 if (rrd_test_error())
2086 rra_start += rrd->rra_def[rra_idx].row_cnt
2087 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2094 * Flush changes to disk (unless we're using mmap)
2096 * Returns 0 on success, -1 otherwise
2098 static int write_changes_to_disk(
2100 rrd_file_t *rrd_file,
2103 /* we just need to write back the live header portion now */
2104 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2105 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2106 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2108 rrd_set_error("seek rrd for live header writeback");
2112 if (rrd_write(rrd_file, rrd->live_head,
2113 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2114 rrd_set_error("rrd_write live_head to rrd");
2118 if (rrd_write(rrd_file, rrd->legacy_last_up,
2119 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2120 rrd_set_error("rrd_write live_head to rrd");
2126 if (rrd_write(rrd_file, rrd->pdp_prep,
2127 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2128 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2129 rrd_set_error("rrd_write pdp_prep to rrd");
2133 if (rrd_write(rrd_file, rrd->cdp_prep,
2134 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2135 rrd->stat_head->ds_cnt)
2136 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2137 rrd->stat_head->ds_cnt)) {
2139 rrd_set_error("rrd_write cdp_prep to rrd");
2143 if (rrd_write(rrd_file, rrd->rra_ptr,
2144 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2145 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2146 rrd_set_error("rrd_write rra_ptr to rrd");