1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
13 # include <sys/mman.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(
49 struct __timezone *tz)
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
64 * normalize time as returned by gettimeofday. usec part must
67 static void normalize_time(
72 t->tv_usec += 1000000L;
76 /* Local prototypes */
81 info_t *write_RRA_row(
83 unsigned long rra_idx,
84 unsigned long *rra_current,
85 unsigned short CDP_scratch_idx,
93 void *rrd_mmaped_file);
95 info_t *write_RRA_row(
97 unsigned long rra_idx,
98 unsigned long *rra_current,
99 unsigned short CDP_scratch_idx,
101 info_t *pcdp_summary,
105 const char *filename,
110 const char *filename,
/*
 * IFDNAN(X, Y): evaluate to X unless X is NaN, in which case Y.
 * The original definition carried a stray trailing semicolon, which made
 * every expansion a statement-plus-empty-statement; that silently worked
 * in "lhs = IFDNAN(...);" statement context but broke any use in
 * expression context (function argument, sub-expression). Dropped here.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
119 info_t *rrd_update_v(
124 info_t *result = NULL;
129 opterr = 0; /* initialize getopt */
132 static struct option long_options[] = {
133 {"template", required_argument, 0, 't'},
136 int option_index = 0;
139 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
150 rrd_set_error("unknown option '%s'", argv[optind - 1]);
155 /* need at least 2 arguments: filename, data. */
156 if (argc - optind < 2) {
157 rrd_set_error("Not enough arguments");
161 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
162 rc.u_int = _rrd_update(argv[optind], tmplt,
164 (const char **) (argv + optind + 1), result);
165 result->value.u_int = rc.u_int;
178 opterr = 0; /* initialize getopt */
181 static struct option long_options[] = {
182 {"template", required_argument, 0, 't'},
185 int option_index = 0;
188 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
199 rrd_set_error("unknown option '%s'", argv[optind - 1]);
204 /* need at least 2 arguments: filename, data. */
205 if (argc - optind < 2) {
206 rrd_set_error("Not enough arguments");
211 rc = rrd_update_r(argv[optind], tmplt,
212 argc - optind - 1, (const char **) (argv + optind + 1));
217 const char *filename,
222 return _rrd_update(filename, tmplt, argc, argv, NULL);
226 const char *filename,
230 info_t *pcdp_summary)
235 unsigned long i, ii, iii = 1;
237 unsigned long rra_begin; /* byte pointer to the rra
238 * area in the rrd file. this
239 * pointer never changes value */
240 unsigned long rra_start; /* byte pointer to the rra
241 * area in the rrd file. this
242 * pointer changes as each rrd is
244 unsigned long rra_current; /* byte pointer to the current write
245 * spot in the rrd file. */
246 unsigned long rra_pos_tmp; /* temporary byte pointer. */
247 double interval, pre_int, post_int; /* interval between this and
249 unsigned long proc_pdp_st; /* which pdp_st was the last
251 unsigned long occu_pdp_st; /* when was the pdp_st
252 * before the last update
254 unsigned long proc_pdp_age; /* how old was the data in
255 * the pdp prep area when it
256 * was last updated */
257 unsigned long occu_pdp_age; /* how long ago was the last
259 rrd_value_t *pdp_new; /* prepare the incoming data
260 * to be added to the
262 rrd_value_t *pdp_temp; /* prepare the pdp values
263 * to be added to the
266 long *tmpl_idx; /* index representing the settings
267 transported by the tmplt index */
268 unsigned long tmpl_cnt = 2; /* time and data */
271 time_t current_time = 0;
272 time_t rra_time = 0; /* time of update for a RRA */
273 unsigned long current_time_usec = 0; /* microseconds part of current time */
274 struct timeval tmp_time; /* used for time conversion */
277 int schedule_smooth = 0;
278 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
280 /* a vector of future Holt-Winters seasonal coefs */
281 unsigned long elapsed_pdp_st;
283 /* number of elapsed PDP steps since last update */
284 unsigned long *rra_step_cnt = NULL;
286 /* number of rows to be updated in an RRA for a data
288 unsigned long start_pdp_offset;
290 /* number of PDP steps since the last update that
291 * are assigned to the first CDP to be generated
292 * since the last update. */
293 unsigned short scratch_idx;
295 /* index into the CDP scratch array */
296 enum cf_en current_cf;
298 /* numeric id of the current consolidation function */
299 rpnstack_t rpnstack; /* used for COMPUTE DS */
300 int version; /* rrd version */
301 char *endptr; /* used in the conversion */
302 rrd_file_t *rrd_file;
304 rpnstack_init(&rpnstack);
306 /* need at least 1 arguments: data. */
308 rrd_set_error("Not enough arguments");
312 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
313 if (rrd_file == NULL) {
317 /* initialize time */
318 version = atoi(rrd.stat_head->version);
319 gettimeofday(&tmp_time, 0);
320 normalize_time(&tmp_time);
321 current_time = tmp_time.tv_sec;
323 current_time_usec = tmp_time.tv_usec;
325 current_time_usec = 0;
328 rra_current = rra_start = rra_begin = rrd_file->header_len;
329 /* This is defined in the ANSI C standard, section 7.9.5.3:
331 When a file is opened with update mode ('+' as the second
332 or third character in the ... list of mode argument
333 variables), both input and output may be performed on the
334 associated stream. However, ... input may not be directly
335 followed by output without an intervening call to a file
336 positioning function, unless the input operation encounters
338 #if 0 //def HAVE_MMAP
339 rrd_filesize = rrd_file->file_size;
340 fseek(rrd_file->fd, 0, SEEK_END);
341 rrd_filesize = ftell(rrd_file->fd);
342 fseek(rrd_file->fd, rra_current, SEEK_SET);
344 // fseek(rrd_file->fd, 0, SEEK_CUR);
348 /* get exclusive lock to whole file.
349 * lock gets removed when we close the file.
351 if (LockRRD(rrd_file->fd) != 0) {
352 rrd_set_error("could not lock RRD");
359 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
360 rrd_set_error("allocating updvals pointer array");
366 if ((pdp_temp = malloc(sizeof(rrd_value_t)
367 * rrd.stat_head->ds_cnt)) == NULL) {
368 rrd_set_error("allocating pdp_temp ...");
375 if ((tmpl_idx = malloc(sizeof(unsigned long)
376 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
377 rrd_set_error("allocating tmpl_idx ...");
384 /* initialize tmplt redirector */
385 /* default config example (assume DS 1 is a CDEF DS)
386 tmpl_idx[0] -> 0; (time)
387 tmpl_idx[1] -> 1; (DS 0)
388 tmpl_idx[2] -> 3; (DS 2)
389 tmpl_idx[3] -> 4; (DS 3) */
390 tmpl_idx[0] = 0; /* time */
391 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
392 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
398 /* we should work on a writeable copy here */
400 unsigned int tmpl_len;
401 char *tmplt_copy = strdup(tmplt);
404 tmpl_cnt = 1; /* the first entry is the time */
405 tmpl_len = strlen(tmplt_copy);
406 for (i = 0; i <= tmpl_len; i++) {
407 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
408 tmplt_copy[i] = '\0';
409 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
411 ("tmplt contains more DS definitions than RRD");
419 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
420 rrd_set_error("unknown DS name '%s'", dsname);
429 /* the first element is always the time */
430 tmpl_idx[tmpl_cnt - 1]++;
431 /* go to the next entry on the tmplt_copy */
432 dsname = &tmplt_copy[i + 1];
433 /* fix the damage we did before */
443 if ((pdp_new = malloc(sizeof(rrd_value_t)
444 * rrd.stat_head->ds_cnt)) == NULL) {
445 rrd_set_error("allocating pdp_new ...");
453 #if 0 //def HAVE_MMAP
454 rrd_mmaped_file = mmap(0,
456 PROT_READ | PROT_WRITE,
457 MAP_SHARED, fileno(in_file), 0);
458 if (rrd_mmaped_file == MAP_FAILED) {
459 rrd_set_error("error mmapping file %s", filename);
468 /* when we use mmaping we tell the kernel the mmap equivalent
469 of POSIX_FADV_RANDOM */
470 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
473 /* loop through the arguments. */
474 for (arg_i = 0; arg_i < argc; arg_i++) {
475 char *stepper = strdup(argv[arg_i]);
476 char *step_start = stepper;
478 char *parsetime_error = NULL;
479 enum { atstyle, normal } timesyntax;
480 struct rrd_time_value ds_tv;
482 if (stepper == NULL) {
483 rrd_set_error("failed duplication argv entry");
495 /* initialize all ds input to unknown except the first one
496 which has always got to be set */
497 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
499 updvals[0] = stepper;
500 /* separate all ds elements; first must be examined separately
501 due to alternate time syntax */
502 if ((p = strchr(stepper, '@')) != NULL) {
503 timesyntax = atstyle;
506 } else if ((p = strchr(stepper, ':')) != NULL) {
512 ("expected timestamp not found in data source from %s",
518 updvals[tmpl_idx[ii]] = stepper;
520 if (*stepper == ':') {
524 updvals[tmpl_idx[ii]] = stepper + 1;
530 if (ii != tmpl_cnt - 1) {
532 ("expected %lu data source readings (got %lu) from %s",
533 tmpl_cnt - 1, ii, argv[arg_i]);
538 /* get the time from the reading ... handle N */
539 if (timesyntax == atstyle) {
540 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
541 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
545 if (ds_tv.type == RELATIVE_TO_END_TIME ||
546 ds_tv.type == RELATIVE_TO_START_TIME) {
547 rrd_set_error("specifying time relative to the 'start' "
548 "or 'end' makes no sense here: %s", updvals[0]);
553 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
554 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
556 } else if (strcmp(updvals[0], "N") == 0) {
557 gettimeofday(&tmp_time, 0);
558 normalize_time(&tmp_time);
559 current_time = tmp_time.tv_sec;
560 current_time_usec = tmp_time.tv_usec;
564 tmp = strtod(updvals[0], 0);
565 current_time = floor(tmp);
567 (long) ((tmp - (double) current_time) * 1000000.0);
569 /* dont do any correction for old version RRDs */
571 current_time_usec = 0;
573 if (current_time < rrd.live_head->last_up ||
574 (current_time == rrd.live_head->last_up &&
575 (long) current_time_usec <=
576 (long) rrd.live_head->last_up_usec)) {
577 rrd_set_error("illegal attempt to update using time %ld when "
578 "last update time is %ld (minimum one second step)",
579 current_time, rrd.live_head->last_up);
585 /* seek to the beginning of the rra's */
586 if (rra_current != rra_begin) {
588 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
589 rrd_set_error("seek error in rrd");
594 rra_current = rra_begin;
596 rra_start = rra_begin;
598 /* when was the current pdp started */
599 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
600 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
602 /* when did the last pdp_st occur */
603 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
604 occu_pdp_st = current_time - occu_pdp_age;
606 /* interval = current_time - rrd.live_head->last_up; */
607 interval = (double) (current_time - rrd.live_head->last_up)
608 + (double) ((long) current_time_usec -
609 (long) rrd.live_head->last_up_usec) / 1000000.0;
611 if (occu_pdp_st > proc_pdp_st) {
612 /* OK we passed the pdp_st moment */
613 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
614 * occurred before the latest
616 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
617 post_int = occu_pdp_age; /* how much after it */
618 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
625 printf("proc_pdp_age %lu\t"
631 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
632 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
635 /* process the data sources and update the pdp_prep
636 * area accordingly */
637 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
640 dst_idx = dst_conv(rrd.ds_def[i].dst);
642 /* make sure we do not build diffs with old last_ds values */
643 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
644 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
645 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
648 /* NOTE: DST_CDEF should never enter this if block, because
649 * updvals[i+1][0] is initialized to 'U'; unless the caller
650 * accidently specified a value for the DST_CDEF. To handle
651 * this case, an extra check is required. */
653 if ((updvals[i + 1][0] != 'U') &&
654 (dst_idx != DST_CDEF) &&
655 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
658 /* the data source type defines how to process the data */
659 /* pdp_new contains rate * time ... eg the bytes
660 * transferred during the interval. Doing it this way saves
661 * a lot of math operations */
667 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
668 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
669 if ((updvals[i + 1][ii] < '0'
670 || updvals[i + 1][ii] > '9') && (ii != 0
676 rrd_set_error("not a simple integer: '%s'",
681 if (rrd_test_error()) {
685 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
686 if (dst_idx == DST_COUNTER) {
687 /* simple overflow catcher suggested by Andres Kroonmaa */
688 /* this will fail terribly for non 32 or 64 bit counters ... */
689 /* are there any others in SNMP land ? */
690 if (pdp_new[i] < (double) 0.0)
691 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
692 if (pdp_new[i] < (double) 0.0)
693 pdp_new[i] += (double) 18446744069414584320.0;
696 rate = pdp_new[i] / interval;
703 pdp_new[i] = strtod(updvals[i + 1], &endptr);
705 rrd_set_error("converting '%s' to float: %s",
706 updvals[i + 1], rrd_strerror(errno));
709 if (endptr[0] != '\0') {
711 ("conversion of '%s' to float not complete: tail '%s'",
712 updvals[i + 1], endptr);
715 rate = pdp_new[i] / interval;
719 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
721 rrd_set_error("converting '%s' to float: %s",
722 updvals[i + 1], rrd_strerror(errno));
725 if (endptr[0] != '\0') {
727 ("conversion of '%s' to float not complete: tail '%s'",
728 updvals[i + 1], endptr);
731 rate = pdp_new[i] / interval;
734 rrd_set_error("rrd contains unknown DS type : '%s'",
738 /* break out of this for loop if the error string is set */
739 if (rrd_test_error()) {
742 /* make sure pdp_temp is neither too large or too small
743 * if any of these occur it becomes unknown ...
746 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
747 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
748 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
749 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
753 /* no news is news all the same */
758 /* make a copy of the command line argument for the next run */
765 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
767 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
768 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
770 /* break out of the argument parsing loop if the error_string is set */
771 if (rrd_test_error()) {
775 /* has a pdp_st moment occurred since the last run ? */
777 if (proc_pdp_st == occu_pdp_st) {
778 /* no we have not passed a pdp_st moment. therefore update is simple */
780 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
781 if (isnan(pdp_new[i])) {
782 /* this is not really accurate if we use subsecond data arrival time
783 should have thought of it when going subsecond resolution ...
784 sorry next format change we will have it! */
785 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
788 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
789 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
791 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
800 rrd.pdp_prep[i].scratch[PDP_val].u_val,
801 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
805 /* an pdp_st has occurred. */
807 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
808 * occurred up to the last run.
809 pdp_new[] contains rate*seconds from the latest run.
810 pdp_temp[] will contain the rate for cdp */
812 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
813 /* update pdp_prep to the current pdp_st. */
814 double pre_unknown = 0.0;
816 if (isnan(pdp_new[i]))
817 /* a final bit of unknown to be added before calculation
818 * we use a temporary variable for this so that we
819 * don't have to turn integer lines before using the value */
820 pre_unknown = pre_int;
822 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
823 rrd.pdp_prep[i].scratch[PDP_val].u_val =
824 pdp_new[i] / interval * pre_int;
826 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
827 pdp_new[i] / interval * pre_int;
832 /* if too much of the pdp_prep is unknown we dump it */
834 /* removed because this does not agree with the definition
835 a heart beat can be unknown */
836 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
837 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
838 /* if the interval is larger thatn mrhb we get NAN */
839 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
840 (occu_pdp_st - proc_pdp_st <=
841 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
844 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
845 / ((double) (occu_pdp_st - proc_pdp_st
848 scratch[PDP_unkn_sec_cnt].u_cnt)
852 /* process CDEF data sources; remember each CDEF DS can
853 * only reference other DS with a lower index number */
854 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
858 rpn_expand((rpn_cdefds_t *) &
859 (rrd.ds_def[i].par[DS_cdef]));
860 /* substitue data values for OP_VARIABLE nodes */
861 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
862 if (rpnp[ii].op == OP_VARIABLE) {
863 rpnp[ii].op = OP_NUMBER;
864 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
867 /* run the rpn calculator */
868 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
870 break; /* exits the data sources pdp_temp loop */
874 /* make pdp_prep ready for the next run */
875 if (isnan(pdp_new[i])) {
876 /* this is not really accurate if we use subsecond data arrival time
877 should have thought of it when going subsecond resolution ...
878 sorry next format change we will have it! */
879 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
881 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
883 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
884 rrd.pdp_prep[i].scratch[PDP_val].u_val =
885 pdp_new[i] / interval * post_int;
893 "new_unkn_sec %5lu\n",
895 rrd.pdp_prep[i].scratch[PDP_val].u_val,
896 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
900 /* if there were errors during the last loop, bail out here */
901 if (rrd_test_error()) {
906 /* compute the number of elapsed pdp_st moments */
908 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
910 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
912 if (rra_step_cnt == NULL) {
913 rra_step_cnt = (unsigned long *)
914 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
917 for (i = 0, rra_start = rra_begin;
918 i < rrd.stat_head->rra_cnt;
920 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
921 sizeof(rrd_value_t), i++) {
922 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
923 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
924 (proc_pdp_st / rrd.stat_head->pdp_step) %
925 rrd.rra_def[i].pdp_cnt;
926 if (start_pdp_offset <= elapsed_pdp_st) {
927 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
928 rrd.rra_def[i].pdp_cnt + 1;
933 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
934 /* If this is a bulk update, we need to skip ahead in the seasonal
935 * arrays so that they will be correct for the next observed value;
936 * note that for the bulk update itself, no update will occur to
937 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
939 if (rra_step_cnt[i] > 2) {
940 /* skip update by resetting rra_step_cnt[i],
941 * note that this is not data source specific; this is due
942 * to the bulk update, not a DNAN value for the specific data
945 lookup_seasonal(&rrd, i, rra_start, rrd_file,
946 elapsed_pdp_st, &last_seasonal_coef);
947 lookup_seasonal(&rrd, i, rra_start, rrd_file,
948 elapsed_pdp_st + 1, &seasonal_coef);
951 /* periodically run a smoother for seasonal effects */
952 /* Need to use first cdp parameter buffer to track
953 * burnin (burnin requires a specific smoothing schedule).
954 * The CDP_init_seasonal parameter is really an RRA level,
955 * not a data source within RRA level parameter, but the rra_def
956 * is read only for rrd_update (not flushed to disk). */
957 iii = i * (rrd.stat_head->ds_cnt);
958 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
960 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
961 > rrd.rra_def[i].row_cnt - 1) {
962 /* mark off one of the burnin cycles */
963 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
968 /* someone has no doubt invented a trick to deal with this
969 * wrap around, but at least this code is clear. */
970 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
971 u_cnt > rrd.rra_ptr[i].cur_row) {
972 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
973 * mapping between PDP and CDP */
974 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
976 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
980 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
981 rrd.rra_ptr[i].cur_row,
984 par[RRA_seasonal_smooth_idx].u_cnt);
989 /* can't rely on negative numbers because we are working with
991 /* Don't need modulus here. If we've wrapped more than once, only
992 * one smooth is executed at the end. */
993 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
994 rrd.rra_def[i].row_cnt
995 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
996 rrd.rra_def[i].row_cnt >=
997 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
1001 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1002 rrd.rra_ptr[i].cur_row,
1005 par[RRA_seasonal_smooth_idx].u_cnt);
1007 schedule_smooth = 1;
1012 rra_current = rrd_tell(rrd_file);
1014 /* if cf is DEVSEASONAL or SEASONAL */
1015 if (rrd_test_error())
1018 /* update CDP_PREP areas */
1020 /* loop over data sources within each RRA */
1020 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1022 /* iii indexes the CDP prep area for this data source within the RRA */
1023 iii = i * rrd.stat_head->ds_cnt + ii;
1025 if (rrd.rra_def[i].pdp_cnt > 1) {
1027 if (rra_step_cnt[i] > 0) {
1028 /* If we are in this block, as least 1 CDP value will be written to
1029 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1030 * to be written, then the "fill in" value is the CDP_secondary_val
1032 if (isnan(pdp_temp[ii])) {
1033 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1034 u_cnt += start_pdp_offset;
1035 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1038 /* CDP_secondary value is the RRA "fill in" value for intermediary
1039 * CDP data entries. No matter the CF, the value is the same because
1040 * the average, max, min, and last of a list of identical values is
1041 * the same, namely, the value itself. */
1042 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1043 u_val = pdp_temp[ii];
1046 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1048 rrd.rra_def[i].pdp_cnt *
1049 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1050 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1052 /* initialize carry over */
1053 if (current_cf == CF_AVERAGE) {
1054 if (isnan(pdp_temp[ii])) {
1055 rrd.cdp_prep[iii].scratch[CDP_val].
1058 rrd.cdp_prep[iii].scratch[CDP_val].
1063 rrd.rra_def[i].pdp_cnt);
1066 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1070 rrd_value_t cum_val, cur_val;
1072 switch (current_cf) {
1075 IFDNAN(rrd.cdp_prep[iii].
1076 scratch[CDP_val].u_val, 0.0);
1077 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1079 scratch[CDP_primary_val].u_val =
1081 cur_val * start_pdp_offset) /
1082 (rrd.rra_def[i].pdp_cnt -
1084 scratch[CDP_unkn_pdp_cnt].u_cnt);
1085 /* initialize carry over value */
1086 if (isnan(pdp_temp[ii])) {
1087 rrd.cdp_prep[iii].scratch[CDP_val].
1090 rrd.cdp_prep[iii].scratch[CDP_val].
1095 rrd.rra_def[i].pdp_cnt);
1100 IFDNAN(rrd.cdp_prep[iii].
1101 scratch[CDP_val].u_val, -DINF);
1102 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1105 (rrd.cdp_prep[iii].scratch[CDP_val].
1106 u_val) && isnan(pdp_temp[ii])) {
1108 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1113 if (cur_val > cum_val)
1115 scratch[CDP_primary_val].u_val =
1119 scratch[CDP_primary_val].u_val =
1121 /* initialize carry over value */
1122 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1127 IFDNAN(rrd.cdp_prep[iii].
1128 scratch[CDP_val].u_val, DINF);
1129 cur_val = IFDNAN(pdp_temp[ii], DINF);
1132 (rrd.cdp_prep[iii].scratch[CDP_val].
1133 u_val) && isnan(pdp_temp[ii])) {
1135 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1140 if (cur_val < cum_val)
1142 scratch[CDP_primary_val].u_val =
1146 scratch[CDP_primary_val].u_val =
1148 /* initialize carry over value */
1149 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1155 scratch[CDP_primary_val].u_val =
1157 /* initialize carry over value */
1158 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1162 } /* endif meets xff value requirement for a valid value */
1163 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1164 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1165 if (isnan(pdp_temp[ii]))
1166 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1170 rrd.rra_def[i].pdp_cnt;
1172 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1174 } else { /* rra_step_cnt[i] == 0 */
1178 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1180 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1184 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1186 rrd.cdp_prep[iii].scratch[CDP_val].
1190 if (isnan(pdp_temp[ii])) {
1191 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1192 u_cnt += elapsed_pdp_st;
1195 (rrd.cdp_prep[iii].scratch[CDP_val].
1197 if (current_cf == CF_AVERAGE) {
1198 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1199 pdp_temp[ii] * elapsed_pdp_st;
1201 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1206 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1208 rrd.cdp_prep[iii].scratch[CDP_val].
1212 switch (current_cf) {
1214 rrd.cdp_prep[iii].scratch[CDP_val].
1216 pdp_temp[ii] * elapsed_pdp_st;
1220 rrd.cdp_prep[iii].scratch[CDP_val].
1222 rrd.cdp_prep[iii].scratch[CDP_val].
1223 u_val = pdp_temp[ii];
1227 rrd.cdp_prep[iii].scratch[CDP_val].
1229 rrd.cdp_prep[iii].scratch[CDP_val].
1230 u_val = pdp_temp[ii];
1234 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1240 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1241 if (elapsed_pdp_st > 2) {
1242 switch (current_cf) {
1245 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1246 u_val = pdp_temp[ii];
1247 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1248 u_val = pdp_temp[ii];
1251 case CF_DEVSEASONAL:
1252 /* need to update cached seasonal values, so they are consistent
1253 * with the bulk update */
1254 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1255 * CDP_last_deviation are the same. */
1257 scratch[CDP_hw_last_seasonal].u_val =
1258 last_seasonal_coef[ii];
1259 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1260 u_val = seasonal_coef[ii];
1263 /* need to update the null_count and last_null_count.
1264 * even do this for non-DNAN pdp_temp because the
1265 * algorithm is not learning from batch updates. */
1266 rrd.cdp_prep[iii].scratch[CDP_null_count].
1267 u_cnt += elapsed_pdp_st;
1269 scratch[CDP_last_null_count].u_cnt +=
1273 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1275 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1279 /* do not count missed bulk values as failures */
1280 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1282 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1284 /* need to reset violations buffer.
1285 * could do this more carefully, but for now, just
1286 * assume a bulk update wipes away all violations. */
1287 erase_violations(&rrd, iii, i);
1291 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1293 if (rrd_test_error())
1296 } /* endif data sources loop */
1297 } /* end RRA Loop */
1299 /* this loop is only entered if elapsed_pdp_st < 3 */
1300 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1301 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1302 for (i = 0, rra_start = rra_begin;
1303 i < rrd.stat_head->rra_cnt;
1305 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1306 sizeof(rrd_value_t), i++) {
1307 if (rrd.rra_def[i].pdp_cnt > 1)
1310 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1311 if (current_cf == CF_SEASONAL
1312 || current_cf == CF_DEVSEASONAL) {
1313 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1314 elapsed_pdp_st + (scratch_idx ==
1318 rra_current = rrd_tell(rrd_file);
1320 if (rrd_test_error())
1322 /* loop over data sources within each RRA */
1323 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1324 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1325 i * (rrd.stat_head->ds_cnt) + ii,
1326 i, ii, scratch_idx, seasonal_coef);
1328 } /* end RRA Loop */
1329 if (rrd_test_error())
1331 } /* end elapsed_pdp_st loop */
1333 if (rrd_test_error())
1336 /* Ready to write to disk */
1337 /* Move sequentially through the file, writing one RRA at a time.
1338 * Note this architecture divorces the computation of CDP with
1339 * flushing updated RRA entries to disk. */
1340 for (i = 0, rra_start = rra_begin;
1341 i < rrd.stat_head->rra_cnt;
1343 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1344 sizeof(rrd_value_t), i++) {
1345 /* is there anything to write for this RRA? If not, continue. */
1346 if (rra_step_cnt[i] == 0)
1349 /* write the first row */
1351 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1353 rrd.rra_ptr[i].cur_row++;
1354 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1355 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1356 /* position on the first row */
1357 rra_pos_tmp = rra_start +
1358 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1359 sizeof(rrd_value_t);
1360 if (rra_pos_tmp != rra_current) {
1362 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1363 rrd_set_error("seek error in rrd");
1367 rra_current = rra_pos_tmp;
1370 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1372 scratch_idx = CDP_primary_val;
1373 if (pcdp_summary != NULL) {
1374 rra_time = (current_time - current_time
1375 % (rrd.rra_def[i].pdp_cnt *
1376 rrd.stat_head->pdp_step))
1379 1) * rrd.rra_def[i].pdp_cnt *
1380 rrd.stat_head->pdp_step);
1384 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1385 rrd_file->fd, pcdp_summary, &rra_time,
1386 rrd_file->file_start);
1389 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1390 rrd_file->fd, pcdp_summary, &rra_time);
1392 if (rrd_test_error())
1395 /* write other rows of the bulk update, if any */
1396 scratch_idx = CDP_secondary_val;
1397 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1398 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1401 "Wraparound for RRA %s, %lu updates left\n",
1402 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1405 rrd.rra_ptr[i].cur_row = 0;
1406 /* seek back to beginning of current rra */
1407 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1408 rrd_set_error("seek error in rrd");
1412 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1415 rra_current = rra_start;
1417 if (pcdp_summary != NULL) {
1418 rra_time = (current_time - current_time
1419 % (rrd.rra_def[i].pdp_cnt *
1420 rrd.stat_head->pdp_step))
1423 2) * rrd.rra_def[i].pdp_cnt *
1424 rrd.stat_head->pdp_step);
1428 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1429 rrd_file->fd, pcdp_summary, &rra_time,
1430 rrd_file->file_start);
1433 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1434 rrd_file->fd, pcdp_summary, &rra_time);
1438 if (rrd_test_error())
1442 /* break out of the argument parsing loop if error_string is set */
1443 if (rrd_test_error()) {
1448 } /* endif a pdp_st has occurred */
1449 rrd.live_head->last_up = current_time;
1450 rrd.live_head->last_up_usec = current_time_usec;
1452 } /* function argument loop */
1454 if (seasonal_coef != NULL)
1455 free(seasonal_coef);
1456 if (last_seasonal_coef != NULL)
1457 free(last_seasonal_coef);
1458 if (rra_step_cnt != NULL)
1460 rpnstack_free(&rpnstack);
1463 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1464 rrd_set_error("error writing(unmapping) file: %s", filename);
1467 /* if we got here and if there is an error and if the file has not been
1468 * written to, then close things up and return. */
1469 if (rrd_test_error()) {
1475 close(rrd_file->fd);
1479 /* aargh ... that was tough ... so many loops ... anyway, it's done.
1480  * we just need to write back the live header portion now */
1482 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1483 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1484 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1486 rrd_set_error("seek rrd for live header writeback");
1492 close(rrd_file->fd);
1497 if (rrd_write(rrd_file, rrd.live_head,
1498 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1499 rrd_set_error("rrd_write live_head to rrd");
1505 close(rrd_file->fd);
1509 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1510 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1511 rrd_set_error("rrd_write live_head to rrd");
1517 close(rrd_file->fd);
1523 if (rrd_write(rrd_file, rrd.pdp_prep,
1524 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1525 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1526 rrd_set_error("rrd_write pdp_prep to rrd");
1532 close(rrd_file->fd);
1536 if (rrd_write(rrd_file, rrd.cdp_prep,
1537 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1538 rrd.stat_head->ds_cnt)
1539 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1540 rrd.stat_head->ds_cnt)) {
1542 rrd_set_error("rrd_write cdp_prep to rrd");
1548 close(rrd_file->fd);
1552 if (rrd_write(rrd_file, rrd.rra_ptr,
1553 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1554 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1555 rrd_set_error("rrd_write rra_ptr to rrd");
1561 close(rrd_file->fd);
1564 #ifdef HAVE_POSIX_FADVISExxx
1566 /* With an update we have pending write ops, so they will probably not be done
1567    by now, meaning the buffers will not get freed. But calling this for the whole
1568    file minus the header lets the data go as soon as it is written, even if it is
1569    from a previous update cycle. Calling fdatasync to force things is much too hard here. */
1571 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1572 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1573 rrd_strerror(errno));
1574 close(rrd_file->fd);
1578 /*XXX: ? */ rrd_flush(rrd_file);
1580 /* calling the smoothing code here guarantees at most
1581  * one smoothing operation per rrd_update call. Unfortunately,
1582  * it is possible, with bulk updates or a long-delayed update,
1583  * for smoothing to occur off-schedule. This really isn't
1584  * critical except during the burn-in cycles. */
1585 if (schedule_smooth) {
1586 // in_file = fopen(filename,"rb+");
1589 rra_start = rra_begin;
1590 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1591 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1592 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1594 fprintf(stderr, "Running smoother for rra %ld\n", i);
1596 apply_smoother(&rrd, i, rra_start, rrd_file);
1597 if (rrd_test_error())
1600 rra_start += rrd.rra_def[i].row_cnt
1601 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1603 #ifdef HAVE_POSIX_FADVISExxx
1604 /* same procedure as above ... */
1606 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1607 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1608 rrd_strerror(errno));
1609 close(rrd_file->fd);
1613 close(rrd_file->fd);
1616 /* OK now close the files and free the memory */
1617 if (close(rrd_file->fd) != 0) {
1618 rrd_set_error("closing rrd");
1636 * Acquire an exclusive lock on the whole file.
1637 * The lock is removed automatically when the file is closed.
1639 * Returns 0 on success.
1647 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1650 if (_fstat(in_file, &st) == 0) {
1651 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1658 lock.l_type = F_WRLCK; /* exclusive write lock */
1659 lock.l_len = 0; /* whole file */
1660 lock.l_start = 0; /* start of file */
1661 lock.l_whence = SEEK_SET; /* offsets relative to start of file */
1663 rcstat = fcntl(in_file, F_SETLK, &lock);
1683 unsigned long rra_idx,
1684 unsigned long *rra_current,
1685 unsigned short CDP_scratch_idx,
1687 int UNUSED(in_file),
1691 info_t *pcdp_summary,
1693 void *rrd_mmaped_file)
1706 unsigned long rra_idx,
1707 unsigned long *rra_current,
1708 unsigned short CDP_scratch_idx,
1710 info_t *pcdp_summary,
1714 unsigned long ds_idx, cdp_idx;
1717 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1718 /* compute the cdp index */
1719 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1721 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1722 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1723 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1725 if (pcdp_summary != NULL) {
1726 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1727 /* append info to the return hash */
1728 pcdp_summary = info_push(pcdp_summary,
1729 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1731 rrd->rra_def[rra_idx].
1733 rrd->rra_def[rra_idx].
1735 rrd->ds_def[ds_idx].
1736 ds_nam), RD_I_VAL, iv);
1739 memcpy((char *) rrd_mmaped_file + *rra_current,
1740 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1741 sizeof(rrd_value_t));
1745 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1746 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
1747 rrd_set_error("writing rrd");
1751 *rra_current += sizeof(rrd_value_t);
1753 return (pcdp_summary);