1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 #include <sys/timeb.h>
32 time_t tv_sec; /* seconds */
33 long tv_usec; /* microseconds */
38 int tz_minuteswest; /* minutes W of Greenwich */
39 int tz_dsttime; /* type of dst correction */
42 static int gettimeofday(
44 struct __timezone *tz)
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
59 * normalize time as returned by gettimeofday. usec part must
/*
 * Normalize a struct timeval as returned by gettimeofday(): the
 * tv_usec member must lie in [0, 1000000).  If the microseconds part
 * is negative, borrow one second and fold 1000000 usec back in.
 * (The visible fragment had lost the guard and the tv_sec borrow,
 * which would have unconditionally inflated tv_usec.)
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
/*
 * Write one row of consolidated values (one CDP scratch slot per data
 * source) for a single RRA to the file at the current position.
 * NOTE(review): this fragment is incomplete — the parameter list, the
 * write call and the closing brace are partially missing from this
 * view; comments below are limited to what the visible code shows.
 *
 * rra_idx         - index of the RRA being written
 * rra_current     - in/out byte position within the file; advanced by
 *                   sizeof(rrd_value_t) per data source written
 * CDP_scratch_idx - which scratch slot (primary/secondary CDP value)
 *                   to flush to disk
 * Returns the (possibly extended) pcdp_summary info chain.
 */
71 static inline info_t *write_RRA_row(
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
80 unsigned long ds_idx, cdp_idx;
/* one value per data source in this RRA row */
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
/* debug trace of the value about to be written */
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
/* caller asked for a summary (rrd_update_v path): record each
 * written value in the info chain as well */
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
97 rrd->rra_def[rra_idx].
99 rrd->rra_def[rra_idx].
102 ds_nam), RD_I_VAL, iv);
/* flush the scratch value to disk; short write is an error */
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
/* advance the caller's notion of the current file position */
111 *rra_current += sizeof(rrd_value_t);
113 return (pcdp_summary);
117 const char *filename,
122 const char *filename,
/* IFDNAN(X,Y): yield X unless X is NaN, in which case yield Y.
 * Fixed: the previous definition ended in a stray ';', which made the
 * macro unusable inside larger expressions and injected an empty
 * statement at every call site (call sites already supply their own
 * ';', so removing it is backward compatible). */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
/*
 * rrd_update entry point that returns per-update information as an
 * info_t chain (the "verbose" variant used by language bindings).
 * Parses the optional --template/-t option via getopt_long, requires
 * at least <filename> plus one data argument, then delegates the real
 * work to _rrd_update(), passing a summary chain for it to fill.
 * NOTE(review): fragment — the argc/argv parameters, option loop,
 * switch body and error/cleanup paths are missing from this view.
 */
131 info_t *rrd_update_v(
136 info_t *result = NULL;
141 opterr = 0; /* initialize getopt */
/* only one long option: --template, mapped onto 't' */
144 static struct option long_options[] = {
145 {"template", required_argument, 0, 't'},
148 int option_index = 0;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
/* anything getopt does not recognize is a hard error */
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
/* head of the result chain carries _rrd_update's return code;
 * presumably rc is an info value union — TODO confirm */
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
176 (const char **) (argv + optind + 1), result);
/* patch the real return code into the already-pushed head node */
177 result->value.u_int = rc.u_int;
186 struct option long_options[] = {
187 {"template", required_argument, 0, 't'},
190 int option_index = 0;
196 opterr = 0; /* initialize getopt */
199 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
206 tmplt = strdup(optarg);
210 rrd_set_error("unknown option '%s'", argv[optind - 1]);
215 /* need at least 2 arguments: filename, data. */
216 if (argc - optind < 2) {
217 rrd_set_error("Not enough arguments");
222 rc = rrd_update_r(argv[optind], tmplt,
223 argc - optind - 1, (const char **) (argv + optind + 1));
229 const char *filename,
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
238 const char *filename,
242 info_t *pcdp_summary)
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
261 unsigned long proc_pdp_st; /* which pdp_st was the last
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
320 rrd_set_error("Not enough arguments");
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
328 /* We are now at the beginning of the rra's */
329 rra_current = rra_start = rra_begin = rrd_file->header_len;
331 /* initialize time */
332 version = atoi(rrd.stat_head->version);
333 gettimeofday(&tmp_time, 0);
334 normalize_time(&tmp_time);
335 current_time = tmp_time.tv_sec;
337 current_time_usec = tmp_time.tv_usec;
339 current_time_usec = 0;
342 /* get exclusive lock to whole file.
343 * lock gets removed when we close the file.
345 if (LockRRD(rrd_file->fd) != 0) {
346 rrd_set_error("could not lock RRD");
351 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352 rrd_set_error("allocating updvals pointer array");
356 if ((pdp_temp = malloc(sizeof(rrd_value_t)
357 * rrd.stat_head->ds_cnt)) == NULL) {
358 rrd_set_error("allocating pdp_temp ...");
359 goto err_free_updvals;
362 if ((tmpl_idx = malloc(sizeof(unsigned long)
363 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364 rrd_set_error("allocating tmpl_idx ...");
365 goto err_free_pdp_temp;
367 /* initialize tmplt redirector */
368 /* default config example (assume DS 1 is a CDEF DS)
369 tmpl_idx[0] -> 0; (time)
370 tmpl_idx[1] -> 1; (DS 0)
371 tmpl_idx[2] -> 3; (DS 2)
372 tmpl_idx[3] -> 4; (DS 3) */
373 tmpl_idx[0] = 0; /* time */
374 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
381 /* we should work on a writeable copy here */
383 unsigned int tmpl_len;
384 char *tmplt_copy = strdup(tmplt);
387 tmpl_cnt = 1; /* the first entry is the time */
388 tmpl_len = strlen(tmplt_copy);
389 for (i = 0; i <= tmpl_len; i++) {
390 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391 tmplt_copy[i] = '\0';
392 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
394 ("tmplt contains more DS definitions than RRD");
395 goto err_free_tmpl_idx;
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398 rrd_set_error("unknown DS name '%s'", dsname);
399 goto err_free_tmpl_idx;
401 /* the first element is always the time */
402 tmpl_idx[tmpl_cnt - 1]++;
403 /* go to the next entry on the tmplt_copy */
404 dsname = &tmplt_copy[i + 1];
405 /* fix the damage we did before */
415 if ((pdp_new = malloc(sizeof(rrd_value_t)
416 * rrd.stat_head->ds_cnt)) == NULL) {
417 rrd_set_error("allocating pdp_new ...");
418 goto err_free_tmpl_idx;
420 /* loop through the arguments. */
421 for (arg_i = 0; arg_i < argc; arg_i++) {
422 char *stepper = strdup(argv[arg_i]);
423 char *step_start = stepper;
425 char *parsetime_error = NULL;
426 enum { atstyle, normal } timesyntax;
427 struct rrd_time_value ds_tv;
429 if (stepper == NULL) {
430 rrd_set_error("failed duplication argv entry");
432 goto err_free_pdp_new;
434 /* initialize all ds input to unknown except the first one
435 which has always got to be set */
436 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
438 updvals[0] = stepper;
439 /* separate all ds elements; first must be examined separately
440 due to alternate time syntax */
441 if ((p = strchr(stepper, '@')) != NULL) {
442 timesyntax = atstyle;
445 } else if ((p = strchr(stepper, ':')) != NULL) {
451 ("expected timestamp not found in data source from %s",
457 updvals[tmpl_idx[ii]] = stepper;
459 if (*stepper == ':') {
463 updvals[tmpl_idx[ii]] = stepper + 1;
469 if (ii != tmpl_cnt - 1) {
471 ("expected %lu data source readings (got %lu) from %s",
472 tmpl_cnt - 1, ii, argv[arg_i]);
477 /* get the time from the reading ... handle N */
478 if (timesyntax == atstyle) {
479 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
480 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
484 if (ds_tv.type == RELATIVE_TO_END_TIME ||
485 ds_tv.type == RELATIVE_TO_START_TIME) {
486 rrd_set_error("specifying time relative to the 'start' "
487 "or 'end' makes no sense here: %s", updvals[0]);
492 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
494 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
496 } else if (strcmp(updvals[0], "N") == 0) {
497 gettimeofday(&tmp_time, 0);
498 normalize_time(&tmp_time);
499 current_time = tmp_time.tv_sec;
500 current_time_usec = tmp_time.tv_usec;
504 tmp = strtod(updvals[0], 0);
505 current_time = floor(tmp);
507 (long) ((tmp - (double) current_time) * 1000000.0);
509 /* dont do any correction for old version RRDs */
511 current_time_usec = 0;
513 if (current_time < rrd.live_head->last_up ||
514 (current_time == rrd.live_head->last_up &&
515 (long) current_time_usec <=
516 (long) rrd.live_head->last_up_usec)) {
517 rrd_set_error("illegal attempt to update using time %ld when "
518 "last update time is %ld (minimum one second step)",
519 current_time, rrd.live_head->last_up);
524 /* seek to the beginning of the rra's */
525 if (rra_current != rra_begin) {
527 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
528 rrd_set_error("seek error in rrd");
533 rra_current = rra_begin;
535 rra_start = rra_begin;
537 /* when was the current pdp started */
538 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
539 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
541 /* when did the last pdp_st occur */
542 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
543 occu_pdp_st = current_time - occu_pdp_age;
545 /* interval = current_time - rrd.live_head->last_up; */
546 interval = (double) (current_time - rrd.live_head->last_up)
547 + (double) ((long) current_time_usec -
548 (long) rrd.live_head->last_up_usec) / 1000000.0;
550 if (occu_pdp_st > proc_pdp_st) {
551 /* OK we passed the pdp_st moment */
552 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
553 * occurred before the latest
555 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
556 post_int = occu_pdp_age; /* how much after it */
557 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
564 printf("proc_pdp_age %lu\t"
570 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
571 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
574 /* process the data sources and update the pdp_prep
575 * area accordingly */
576 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
579 dst_idx = dst_conv(rrd.ds_def[i].dst);
581 /* make sure we do not build diffs with old last_ds values */
582 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
583 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
584 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
587 /* NOTE: DST_CDEF should never enter this if block, because
588 * updvals[i+1][0] is initialized to 'U'; unless the caller
589 * accidently specified a value for the DST_CDEF. To handle
590 * this case, an extra check is required. */
592 if ((updvals[i + 1][0] != 'U') &&
593 (dst_idx != DST_CDEF) &&
594 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
597 /* the data source type defines how to process the data */
598 /* pdp_new contains rate * time ... eg the bytes
599 * transferred during the interval. Doing it this way saves
600 * a lot of math operations */
604 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
605 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
606 if ((updvals[i + 1][ii] < '0'
607 || updvals[i + 1][ii] > '9') && (ii != 0
613 rrd_set_error("not a simple integer: '%s'",
618 if (rrd_test_error()) {
622 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
623 if (dst_idx == DST_COUNTER) {
624 /* simple overflow catcher suggested by Andres Kroonmaa */
625 /* this will fail terribly for non 32 or 64 bit counters ... */
626 /* are there any others in SNMP land ? */
627 if (pdp_new[i] < (double) 0.0)
628 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
629 if (pdp_new[i] < (double) 0.0)
630 pdp_new[i] += (double) 18446744069414584320.0;
633 rate = pdp_new[i] / interval;
640 pdp_new[i] = strtod(updvals[i + 1], &endptr);
642 rrd_set_error("converting '%s' to float: %s",
643 updvals[i + 1], rrd_strerror(errno));
646 if (endptr[0] != '\0') {
648 ("conversion of '%s' to float not complete: tail '%s'",
649 updvals[i + 1], endptr);
652 rate = pdp_new[i] / interval;
656 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
658 rrd_set_error("converting '%s' to float: %s",
659 updvals[i + 1], rrd_strerror(errno));
662 if (endptr[0] != '\0') {
664 ("conversion of '%s' to float not complete: tail '%s'",
665 updvals[i + 1], endptr);
668 rate = pdp_new[i] / interval;
671 rrd_set_error("rrd contains unknown DS type : '%s'",
675 /* break out of this for loop if the error string is set */
676 if (rrd_test_error()) {
679 /* make sure pdp_temp is neither too large or too small
680 * if any of these occur it becomes unknown ...
683 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
685 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
690 /* no news is news all the same */
695 /* make a copy of the command line argument for the next run */
702 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
704 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
705 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
707 /* break out of the argument parsing loop if the error_string is set */
708 if (rrd_test_error()) {
712 /* has a pdp_st moment occurred since the last run ? */
714 if (proc_pdp_st == occu_pdp_st) {
715 /* no we have not passed a pdp_st moment. therefore update is simple */
717 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
718 if (isnan(pdp_new[i])) {
719 /* this is not realy accurate if we use subsecond data arival time
720 should have thought of it when going subsecond resolution ...
721 sorry next format change we will have it! */
722 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
725 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
726 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
728 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
737 rrd.pdp_prep[i].scratch[PDP_val].u_val,
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
742 /* an pdp_st has occurred. */
744 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
745 rate*seconds which occurred up to the last run.
746 pdp_new[] contains rate*seconds from the latest run.
747 pdp_temp[] will contain the rate for cdp */
749 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
750 /* update pdp_prep to the current pdp_st. */
751 double pre_unknown = 0.0;
753 if (isnan(pdp_new[i])) {
754 /* a final bit of unkonwn to be added bevore calculation
755 we use a temporary variable for this so that we
756 don't have to turn integer lines before using the value */
757 pre_unknown = pre_int;
759 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
760 rrd.pdp_prep[i].scratch[PDP_val].u_val =
761 pdp_new[i] / interval * pre_int;
763 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764 pdp_new[i] / interval * pre_int;
769 /* if too much of the pdp_prep is unknown we dump it */
771 /* removed because this does not agree with the
772 definition that a heartbeat can be unknown */
773 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775 /* if the interval is larger thatn mrhb we get NAN */
776 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777 (occu_pdp_st - proc_pdp_st <=
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
781 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782 / ((double) (occu_pdp_st - proc_pdp_st
785 scratch[PDP_unkn_sec_cnt].u_cnt)
789 /* process CDEF data sources; remember each CDEF DS can
790 * only reference other DS with a lower index number */
791 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795 rpn_expand((rpn_cdefds_t *) &
796 (rrd.ds_def[i].par[DS_cdef]));
797 /* substitue data values for OP_VARIABLE nodes */
798 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
799 if (rpnp[ii].op == OP_VARIABLE) {
800 rpnp[ii].op = OP_NUMBER;
801 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
804 /* run the rpn calculator */
805 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
807 break; /* exits the data sources pdp_temp loop */
811 /* make pdp_prep ready for the next run */
812 if (isnan(pdp_new[i])) {
813 /* this is not realy accurate if we use subsecond data arival time
814 should have thought of it when going subsecond resolution ...
815 sorry next format change we will have it! */
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
818 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821 rrd.pdp_prep[i].scratch[PDP_val].u_val =
822 pdp_new[i] / interval * post_int;
830 "new_unkn_sec %5lu\n",
832 rrd.pdp_prep[i].scratch[PDP_val].u_val,
833 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
837 /* if there were errors during the last loop, bail out here */
838 if (rrd_test_error()) {
843 /* compute the number of elapsed pdp_st moments */
845 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
847 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
849 if (rra_step_cnt == NULL) {
850 rra_step_cnt = (unsigned long *)
851 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
854 for (i = 0, rra_start = rra_begin;
855 i < rrd.stat_head->rra_cnt;
857 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
858 sizeof(rrd_value_t), i++) {
859 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861 (proc_pdp_st / rrd.stat_head->pdp_step) %
862 rrd.rra_def[i].pdp_cnt;
863 if (start_pdp_offset <= elapsed_pdp_st) {
864 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
865 rrd.rra_def[i].pdp_cnt + 1;
870 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
871 /* If this is a bulk update, we need to skip ahead in
872 the seasonal arrays so that they will be correct for
873 the next observed value;
874 note that for the bulk update itself, no update will
875 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
876 and DEVPREDICT will be set to DNAN. */
877 if (rra_step_cnt[i] > 2) {
878 /* skip update by resetting rra_step_cnt[i],
879 note that this is not data source specific; this is
880 due to the bulk update, not a DNAN value for the
881 specific data source. */
883 lookup_seasonal(&rrd, i, rra_start, rrd_file,
884 elapsed_pdp_st, &last_seasonal_coef);
885 lookup_seasonal(&rrd, i, rra_start, rrd_file,
886 elapsed_pdp_st + 1, &seasonal_coef);
889 /* periodically run a smoother for seasonal effects */
890 /* Need to use first cdp parameter buffer to track
891 * burnin (burnin requires a specific smoothing schedule).
892 * The CDP_init_seasonal parameter is really an RRA level,
893 * not a data source within RRA level parameter, but the rra_def
894 * is read only for rrd_update (not flushed to disk). */
895 iii = i * (rrd.stat_head->ds_cnt);
896 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
898 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
899 > rrd.rra_def[i].row_cnt - 1) {
900 /* mark off one of the burnin cycles */
901 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
906 /* someone has no doubt invented a trick to deal with this
907 * wrap around, but at least this code is clear. */
908 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
909 u_cnt > rrd.rra_ptr[i].cur_row) {
910 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
911 * mapping between PDP and CDP */
912 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
914 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
918 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919 rrd.rra_ptr[i].cur_row,
922 par[RRA_seasonal_smooth_idx].u_cnt);
927 /* can't rely on negative numbers because we are working with
929 /* Don't need modulus here. If we've wrapped more than once, only
930 * one smooth is executed at the end. */
931 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
932 rrd.rra_def[i].row_cnt
933 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
934 rrd.rra_def[i].row_cnt >=
935 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
939 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
940 rrd.rra_ptr[i].cur_row,
943 par[RRA_seasonal_smooth_idx].u_cnt);
950 rra_current = rrd_tell(rrd_file);
952 /* if cf is DEVSEASONAL or SEASONAL */
953 if (rrd_test_error())
956 /* update CDP_PREP areas */
957 /* loop over data soures within each RRA */
958 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
960 /* iii indexes the CDP prep area for this data source within the RRA */
961 iii = i * rrd.stat_head->ds_cnt + ii;
963 if (rrd.rra_def[i].pdp_cnt > 1) {
965 if (rra_step_cnt[i] > 0) {
966 /* If we are in this block, as least 1 CDP value will be written to
967 * disk, this is the CDP_primary_val entry. If more than 1 value needs
968 * to be written, then the "fill in" value is the CDP_secondary_val
970 if (isnan(pdp_temp[ii])) {
971 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
972 u_cnt += start_pdp_offset;
973 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
976 /* CDP_secondary value is the RRA "fill in" value for intermediary
977 * CDP data entries. No matter the CF, the value is the same because
978 * the average, max, min, and last of a list of identical values is
979 * the same, namely, the value itself. */
980 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
981 u_val = pdp_temp[ii];
984 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
986 rrd.rra_def[i].pdp_cnt *
987 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].
990 /* initialize carry over */
991 if (current_cf == CF_AVERAGE) {
992 if (isnan(pdp_temp[ii])) {
993 rrd.cdp_prep[iii].scratch[CDP_val].
996 rrd.cdp_prep[iii].scratch[CDP_val].
1001 rrd.rra_def[i].pdp_cnt);
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1008 rrd_value_t cum_val, cur_val;
1010 switch (current_cf) {
1013 IFDNAN(rrd.cdp_prep[iii].
1014 scratch[CDP_val].u_val, 0.0);
1015 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1017 scratch[CDP_primary_val].u_val =
1019 cur_val * start_pdp_offset) /
1020 (rrd.rra_def[i].pdp_cnt -
1022 scratch[CDP_unkn_pdp_cnt].u_cnt);
1023 /* initialize carry over value */
1024 if (isnan(pdp_temp[ii])) {
1025 rrd.cdp_prep[iii].scratch[CDP_val].
1028 rrd.cdp_prep[iii].scratch[CDP_val].
1033 rrd.rra_def[i].pdp_cnt);
1038 IFDNAN(rrd.cdp_prep[iii].
1039 scratch[CDP_val].u_val, -DINF);
1040 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1043 (rrd.cdp_prep[iii].scratch[CDP_val].
1044 u_val) && isnan(pdp_temp[ii])) {
1046 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1051 if (cur_val > cum_val)
1053 scratch[CDP_primary_val].u_val =
1057 scratch[CDP_primary_val].u_val =
1059 /* initialize carry over value */
1060 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1065 IFDNAN(rrd.cdp_prep[iii].
1066 scratch[CDP_val].u_val, DINF);
1067 cur_val = IFDNAN(pdp_temp[ii], DINF);
1070 (rrd.cdp_prep[iii].scratch[CDP_val].
1071 u_val) && isnan(pdp_temp[ii])) {
1073 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1078 if (cur_val < cum_val)
1080 scratch[CDP_primary_val].u_val =
1084 scratch[CDP_primary_val].u_val =
1086 /* initialize carry over value */
1087 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1093 scratch[CDP_primary_val].u_val =
1095 /* initialize carry over value */
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1100 } /* endif meets xff value requirement for a valid value */
1101 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1102 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1103 if (isnan(pdp_temp[ii]))
1104 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1108 rrd.rra_def[i].pdp_cnt;
1110 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1112 } else { /* rra_step_cnt[i] == 0 */
1116 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1118 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1122 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1124 rrd.cdp_prep[iii].scratch[CDP_val].
1128 if (isnan(pdp_temp[ii])) {
1129 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1130 u_cnt += elapsed_pdp_st;
1133 (rrd.cdp_prep[iii].scratch[CDP_val].
1135 if (current_cf == CF_AVERAGE) {
1136 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1137 pdp_temp[ii] * elapsed_pdp_st;
1139 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1144 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1146 rrd.cdp_prep[iii].scratch[CDP_val].
1150 switch (current_cf) {
1152 rrd.cdp_prep[iii].scratch[CDP_val].
1154 pdp_temp[ii] * elapsed_pdp_st;
1158 rrd.cdp_prep[iii].scratch[CDP_val].
1160 rrd.cdp_prep[iii].scratch[CDP_val].
1161 u_val = pdp_temp[ii];
1165 rrd.cdp_prep[iii].scratch[CDP_val].
1167 rrd.cdp_prep[iii].scratch[CDP_val].
1168 u_val = pdp_temp[ii];
1172 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1178 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1179 if (elapsed_pdp_st > 2) {
1180 switch (current_cf) {
1183 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1184 u_val = pdp_temp[ii];
1185 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1186 u_val = pdp_temp[ii];
1189 case CF_DEVSEASONAL:
1190 /* need to update cached seasonal values, so they are consistent
1191 * with the bulk update */
1192 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1193 * CDP_last_deviation are the same. */
1195 scratch[CDP_hw_last_seasonal].u_val =
1196 last_seasonal_coef[ii];
1197 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1198 u_val = seasonal_coef[ii];
1201 /* need to update the null_count and last_null_count.
1202 * even do this for non-DNAN pdp_temp because the
1203 * algorithm is not learning from batch updates. */
1204 rrd.cdp_prep[iii].scratch[CDP_null_count].
1205 u_cnt += elapsed_pdp_st;
1207 scratch[CDP_last_null_count].u_cnt +=
1211 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1213 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1217 /* do not count missed bulk values as failures */
1218 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1220 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1222 /* need to reset violations buffer.
1223 * could do this more carefully, but for now, just
1224 * assume a bulk update wipes away all violations. */
1225 erase_violations(&rrd, iii, i);
1229 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1231 if (rrd_test_error())
1234 } /* endif data sources loop */
1235 } /* end RRA Loop */
1237 /* this loop is only entered if elapsed_pdp_st < 3 */
1238 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1239 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1240 for (i = 0, rra_start = rra_begin;
1241 i < rrd.stat_head->rra_cnt;
1243 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1244 sizeof(rrd_value_t), i++) {
1245 if (rrd.rra_def[i].pdp_cnt > 1)
1248 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1249 if (current_cf == CF_SEASONAL
1250 || current_cf == CF_DEVSEASONAL) {
1251 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1252 elapsed_pdp_st + (scratch_idx ==
1256 rra_current = rrd_tell(rrd_file);
1258 if (rrd_test_error())
1260 /* loop over data soures within each RRA */
1261 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1262 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1263 i * (rrd.stat_head->ds_cnt) + ii,
1264 i, ii, scratch_idx, seasonal_coef);
1266 } /* end RRA Loop */
1267 if (rrd_test_error())
1269 } /* end elapsed_pdp_st loop */
1271 if (rrd_test_error())
1274 /* Ready to write to disk */
1275 /* Move sequentially through the file, writing one RRA at a time.
1276 * Note this architecture divorces the computation of CDP with
1277 * flushing updated RRA entries to disk. */
1278 for (i = 0, rra_start = rra_begin;
1279 i < rrd.stat_head->rra_cnt;
1281 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1282 sizeof(rrd_value_t), i++) {
1283 /* is there anything to write for this RRA? If not, continue. */
1284 if (rra_step_cnt[i] == 0)
1287 /* write the first row */
1289 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1291 rrd.rra_ptr[i].cur_row++;
1292 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1293 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1294 /* positition on the first row */
1295 rra_pos_tmp = rra_start +
1296 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1297 sizeof(rrd_value_t);
1298 if (rra_pos_tmp != rra_current) {
1299 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1300 rrd_set_error("seek error in rrd");
1303 rra_current = rra_pos_tmp;
1306 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1308 scratch_idx = CDP_primary_val;
1309 if (pcdp_summary != NULL) {
1310 rra_time = (current_time - current_time
1311 % (rrd.rra_def[i].pdp_cnt *
1312 rrd.stat_head->pdp_step))
1315 1) * rrd.rra_def[i].pdp_cnt *
1316 rrd.stat_head->pdp_step);
1319 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1320 scratch_idx, pcdp_summary, &rra_time);
1321 if (rrd_test_error())
1324 /* write other rows of the bulk update, if any */
1325 scratch_idx = CDP_secondary_val;
1326 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1327 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1330 "Wraparound for RRA %s, %lu updates left\n",
1331 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1334 rrd.rra_ptr[i].cur_row = 0;
1335 /* seek back to beginning of current rra */
1336 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1337 rrd_set_error("seek error in rrd");
1341 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1344 rra_current = rra_start;
1346 if (pcdp_summary != NULL) {
1347 rra_time = (current_time - current_time
1348 % (rrd.rra_def[i].pdp_cnt *
1349 rrd.stat_head->pdp_step))
1352 2) * rrd.rra_def[i].pdp_cnt *
1353 rrd.stat_head->pdp_step);
1356 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1357 scratch_idx, pcdp_summary, &rra_time);
1360 if (rrd_test_error())
1364 /* break out of the argument parsing loop if error_string is set */
1365 if (rrd_test_error()) {
1370 } /* endif a pdp_st has occurred */
1371 rrd.live_head->last_up = current_time;
1372 rrd.live_head->last_up_usec = current_time_usec;
1374 } /* function argument loop */
1376 if (seasonal_coef != NULL)
1377 free(seasonal_coef);
1378 if (last_seasonal_coef != NULL)
1379 free(last_seasonal_coef);
1380 if (rra_step_cnt != NULL)
1382 rpnstack_free(&rpnstack);
1385 //rrd_flush(rrd_file); //XXX: really needed?
1387 /* if we got here and if there is an error and if the file has not been
1388 * written to, then close things up and return. */
1389 if (rrd_test_error()) {
1390 goto err_free_pdp_new;
1393 /* aargh ... that was tough ... so many loops ... anyway, its done.
1394 * we just need to write back the live header portion now*/
1396 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1397 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1398 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1400 rrd_set_error("seek rrd for live header writeback");
1401 goto err_free_pdp_new;
1403 /* for mmap, we did already write to the underlying mapping, so we do
1404 not need to write again. */
1407 if (rrd_write(rrd_file, rrd.live_head,
1408 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1409 rrd_set_error("rrd_write live_head to rrd");
1410 goto err_free_pdp_new;
1413 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1414 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1415 rrd_set_error("rrd_write live_head to rrd");
1416 goto err_free_pdp_new;
1421 if (rrd_write(rrd_file, rrd.pdp_prep,
1422 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1423 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1424 rrd_set_error("rrd_write pdp_prep to rrd");
1425 goto err_free_pdp_new;
1428 if (rrd_write(rrd_file, rrd.cdp_prep,
1429 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1430 rrd.stat_head->ds_cnt)
1431 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1432 rrd.stat_head->ds_cnt)) {
1434 rrd_set_error("rrd_write cdp_prep to rrd");
1435 goto err_free_pdp_new;
1438 if (rrd_write(rrd_file, rrd.rra_ptr,
1439 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1440 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1441 rrd_set_error("rrd_write rra_ptr to rrd");
1442 goto err_free_pdp_new;
1445 #ifdef HAVE_POSIX_FADVISExxx
1447 /* with update we have write ops, so they will probably not be done by now, this means
1448 the buffers will not get freed. But calling this for the whole file - header
1449 will let the data off the hook as soon as it is written, if it is from a previous
1450 update cycle. Calling fdsync to force things is much too hard here. */
1452 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1453 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1454 rrd_strerror(errno));
1455 goto err_free_pdp_new;
1458 /* rrd_flush(rrd_file); */
1460 /* calling the smoothing code here guarantees at most
1461 * one smoothing operation per rrd_update call. Unfortunately,
1462 * it is possible with bulk updates, or a long-delayed update
1463 * for smoothing to occur off-schedule. This really isn't
1464 * critical except during the burning cycles. */
1465 if (schedule_smooth) {
1467 rra_start = rra_begin;
1468 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1469 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1470 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1472 fprintf(stderr, "Running smoother for rra %ld\n", i);
1474 apply_smoother(&rrd, i, rra_start, rrd_file);
1475 if (rrd_test_error())
1478 rra_start += rrd.rra_def[i].row_cnt
1479 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1481 #ifdef HAVE_POSIX_FADVISExxx
1482 /* same procedure as above ... */
1484 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1485 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1486 rrd_strerror(errno));
1487 goto err_free_pdp_new;
1493 rrd_close(rrd_file);
1510 rrd_close(rrd_file);
1518 * get exclusive lock to whole file.
1519 * lock gets removed when we close the file
1521 * returns 0 on success
1529 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1532 if (_fstat(in_file, &st) == 0) {
1533 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1540 lock.l_type = F_WRLCK; /* exclusive write lock */
1541 lock.l_len = 0; /* whole file */
1542 lock.l_start = 0; /* start of file */
1543 lock.l_whence = SEEK_SET; /* offsets relative to start of file */
1545 rcstat = fcntl(in_file, F_SETLK, &lock);