1 /*****************************************************************************
2 * RRDtool 1.2.15 Copyright by Tobi Oetiker, 1997-2006
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
7 *****************************************************************************/
10 #include <sys/types.h>
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 #include <sys/timeb.h>
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
61 * normilize time as returned by gettimeofday. usec part must
64 static void normalize_time(struct timeval *t)
68 t->tv_usec += 1000000L;
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
79 FILE UNUSED(*rrd_file),
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv,
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
97 info_t *rrd_update_v(int argc, char **argv)
100 info_t *result = NULL;
103 optind = 0; opterr = 0; /* initialize getopt */
106 static struct option long_options[] =
108 {"template", required_argument, 0, 't'},
111 int option_index = 0;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
130 /* need at least 2 arguments: filename, data. */
131 if (argc-optind < 2) {
132 rrd_set_error("Not enough arguments");
136 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137 rc.u_int = _rrd_update(argv[optind], tmplt,
138 argc - optind - 1, argv + optind + 1, result);
139 result->value.u_int = rc.u_int;
145 rrd_update(int argc, char **argv)
149 optind = 0; opterr = 0; /* initialize getopt */
152 static struct option long_options[] =
154 {"template", required_argument, 0, 't'},
157 int option_index = 0;
159 opt = getopt_long(argc, argv, "t:",
160 long_options, &option_index);
171 rrd_set_error("unknown option '%s'",argv[optind-1]);
176 /* need at least 2 arguments: filename, data. */
177 if (argc-optind < 2) {
178 rrd_set_error("Not enough arguments");
183 rc = rrd_update_r(argv[optind], tmplt,
184 argc - optind - 1, argv + optind + 1);
189 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
191 return _rrd_update(filename, tmplt, argc, argv, NULL);
195 _rrd_update(char *filename, char *tmplt, int argc, char **argv,
196 info_t *pcdp_summary)
201 unsigned long i,ii,iii=1;
203 unsigned long rra_begin; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer never changes value */
206 unsigned long rra_start; /* byte pointer to the rra
207 * area in the rrd file. this
208 * pointer changes as each rrd is
210 unsigned long rra_current; /* byte pointer to the current write
211 * spot in the rrd file. */
212 unsigned long rra_pos_tmp; /* temporary byte pointer. */
214 pre_int,post_int; /* interval between this and
216 unsigned long proc_pdp_st; /* which pdp_st was the last
218 unsigned long occu_pdp_st; /* when was the pdp_st
219 * before the last update
221 unsigned long proc_pdp_age; /* how old was the data in
222 * the pdp prep area when it
223 * was last updated */
224 unsigned long occu_pdp_age; /* how long ago was the last
226 rrd_value_t *pdp_new; /* prepare the incoming data
227 * to be added the the
229 rrd_value_t *pdp_temp; /* prepare the pdp values
230 * to be added the the
233 long *tmpl_idx; /* index representing the settings
234 transported by the tmplt index */
235 unsigned long tmpl_cnt = 2; /* time and data */
239 time_t current_time = 0;
240 time_t rra_time = 0; /* time of update for a RRA */
241 unsigned long current_time_usec=0;/* microseconds part of current time */
242 struct timeval tmp_time; /* used for time conversion */
245 int schedule_smooth = 0;
246 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247 /* a vector of future Holt-Winters seasonal coefs */
248 unsigned long elapsed_pdp_st;
249 /* number of elapsed PDP steps since last update */
250 unsigned long *rra_step_cnt = NULL;
251 /* number of rows to be updated in an RRA for a data
253 unsigned long start_pdp_offset;
254 /* number of PDP steps since the last update that
255 * are assigned to the first CDP to be generated
256 * since the last update. */
257 unsigned short scratch_idx;
258 /* index into the CDP scratch array */
259 enum cf_en current_cf;
260 /* numeric id of the current consolidation function */
261 rpnstack_t rpnstack; /* used for COMPUTE DS */
262 int version; /* rrd version */
263 char *endptr; /* used in the conversion */
265 void *rrd_mmaped_file;
266 unsigned long rrd_filesize;
270 rpnstack_init(&rpnstack);
272 /* need at least 1 arguments: data. */
274 rrd_set_error("Not enough arguments");
280 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
283 /* initialize time */
284 version = atoi(rrd.stat_head->version);
285 gettimeofday(&tmp_time, 0);
286 normalize_time(&tmp_time);
287 current_time = tmp_time.tv_sec;
289 current_time_usec = tmp_time.tv_usec;
292 current_time_usec = 0;
295 rra_current = rra_start = rra_begin = ftell(rrd_file);
296 /* This is defined in the ANSI C standard, section 7.9.5.3:
298 When a file is opened with udpate mode ('+' as the second
299 or third character in the ... list of mode argument
300 variables), both input and ouptut may be performed on the
301 associated stream. However, ... input may not be directly
302 followed by output without an intervening call to a file
303 positioning function, unless the input oepration encounters
306 fseek(rrd_file, 0, SEEK_END);
307 rrd_filesize = ftell(rrd_file);
308 fseek(rrd_file, rra_current, SEEK_SET);
310 fseek(rrd_file, 0, SEEK_CUR);
314 /* get exclusive lock to whole file.
315 * lock gets removed when we close the file.
317 if (LockRRD(rrd_file) != 0) {
318 rrd_set_error("could not lock RRD");
324 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
325 rrd_set_error("allocating updvals pointer array");
331 if ((pdp_temp = malloc(sizeof(rrd_value_t)
332 *rrd.stat_head->ds_cnt))==NULL){
333 rrd_set_error("allocating pdp_temp ...");
340 if ((tmpl_idx = malloc(sizeof(unsigned long)
341 *(rrd.stat_head->ds_cnt+1)))==NULL){
342 rrd_set_error("allocating tmpl_idx ...");
349 /* initialize tmplt redirector */
350 /* default config example (assume DS 1 is a CDEF DS)
351 tmpl_idx[0] -> 0; (time)
352 tmpl_idx[1] -> 1; (DS 0)
353 tmpl_idx[2] -> 3; (DS 2)
354 tmpl_idx[3] -> 4; (DS 3) */
355 tmpl_idx[0] = 0; /* time */
356 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
358 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
364 /* we should work on a writeable copy here */
366 unsigned int tmpl_len;
367 tmplt = strdup(tmplt);
369 tmpl_cnt = 1; /* the first entry is the time */
370 tmpl_len = strlen(tmplt);
371 for(i=0;i<=tmpl_len ;i++) {
372 if (tmplt[i] == ':' || tmplt[i] == '\0') {
374 if (tmpl_cnt>rrd.stat_head->ds_cnt){
375 rrd_set_error("tmplt contains more DS definitions than RRD");
376 free(updvals); free(pdp_temp);
377 free(tmpl_idx); rrd_free(&rrd);
378 fclose(rrd_file); return(-1);
380 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
381 rrd_set_error("unknown DS name '%s'",dsname);
382 free(updvals); free(pdp_temp);
384 free(tmpl_idx); rrd_free(&rrd);
385 fclose(rrd_file); return(-1);
387 /* the first element is always the time */
388 tmpl_idx[tmpl_cnt-1]++;
389 /* go to the next entry on the tmplt */
390 dsname = &tmplt[i+1];
391 /* fix the damage we did before */
401 if ((pdp_new = malloc(sizeof(rrd_value_t)
402 *rrd.stat_head->ds_cnt))==NULL){
403 rrd_set_error("allocating pdp_new ...");
413 rrd_mmaped_file = mmap(0,
415 PROT_READ | PROT_WRITE,
419 if (rrd_mmaped_file == MAP_FAILED) {
420 rrd_set_error("error mmapping file %s", filename);
429 /* loop through the arguments. */
430 for(arg_i=0; arg_i<argc;arg_i++) {
431 char *stepper = strdup(argv[arg_i]);
432 char *step_start = stepper;
434 char *parsetime_error = NULL;
435 enum {atstyle, normal} timesyntax;
436 struct rrd_time_value ds_tv;
437 if (stepper == NULL){
438 rrd_set_error("failed duplication argv entry");
445 munmap(rrd_mmaped_file, rrd_filesize);
450 /* initialize all ds input to unknown except the first one
451 which has always got to be set */
452 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
454 /* separate all ds elements; first must be examined separately
455 due to alternate time syntax */
456 if ((p=strchr(stepper,'@'))!=NULL) {
457 timesyntax = atstyle;
460 } else if ((p=strchr(stepper,':'))!=NULL) {
465 rrd_set_error("expected timestamp not found in data source from %s",
471 updvals[tmpl_idx[ii]] = stepper;
473 if (*stepper == ':') {
477 updvals[tmpl_idx[ii]] = stepper+1;
483 if (ii != tmpl_cnt-1) {
484 rrd_set_error("expected %lu data source readings (got %lu) from %s",
485 tmpl_cnt-1, ii, argv[arg_i]);
490 /* get the time from the reading ... handle N */
491 if (timesyntax == atstyle) {
492 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
493 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
497 if (ds_tv.type == RELATIVE_TO_END_TIME ||
498 ds_tv.type == RELATIVE_TO_START_TIME) {
499 rrd_set_error("specifying time relative to the 'start' "
500 "or 'end' makes no sense here: %s",
506 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
507 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
509 } else if (strcmp(updvals[0],"N")==0){
510 gettimeofday(&tmp_time, 0);
511 normalize_time(&tmp_time);
512 current_time = tmp_time.tv_sec;
513 current_time_usec = tmp_time.tv_usec;
516 tmp = strtod(updvals[0], 0);
517 current_time = floor(tmp);
518 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
520 /* dont do any correction for old version RRDs */
522 current_time_usec = 0;
524 if(current_time < rrd.live_head->last_up ||
525 (current_time == rrd.live_head->last_up &&
526 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
527 rrd_set_error("illegal attempt to update using time %ld when "
528 "last update time is %ld (minimum one second step)",
529 current_time, rrd.live_head->last_up);
535 /* seek to the beginning of the rra's */
536 if (rra_current != rra_begin) {
538 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
539 rrd_set_error("seek error in rrd");
544 rra_current = rra_begin;
546 rra_start = rra_begin;
548 /* when was the current pdp started */
549 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
550 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
552 /* when did the last pdp_st occur */
553 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
554 occu_pdp_st = current_time - occu_pdp_age;
556 /* interval = current_time - rrd.live_head->last_up; */
557 interval = (double)(current_time - rrd.live_head->last_up)
558 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
560 if (occu_pdp_st > proc_pdp_st){
561 /* OK we passed the pdp_st moment*/
562 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
563 * occurred before the latest
565 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
566 post_int = occu_pdp_age; /* how much after it */
567 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
581 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
582 occu_pdp_age, occu_pdp_st,
583 interval, pre_int, post_int);
586 /* process the data sources and update the pdp_prep
587 * area accordingly */
588 for(i=0;i<rrd.stat_head->ds_cnt;i++){
590 dst_idx= dst_conv(rrd.ds_def[i].dst);
592 /* make sure we do not build diffs with old last_ds values */
593 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
594 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
595 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
596 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
599 /* NOTE: DST_CDEF should never enter this if block, because
600 * updvals[i+1][0] is initialized to 'U'; unless the caller
601 * accidently specified a value for the DST_CDEF. To handle
602 * this case, an extra check is required. */
604 if((updvals[i+1][0] != 'U') &&
605 (dst_idx != DST_CDEF) &&
606 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
608 /* the data source type defines how to process the data */
609 /* pdp_new contains rate * time ... eg the bytes
610 * transferred during the interval. Doing it this way saves
611 * a lot of math operations */
617 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618 for(ii=0;updvals[i+1][ii] != '\0';ii++){
619 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
620 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
624 if (rrd_test_error()){
627 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
628 if(dst_idx == DST_COUNTER) {
629 /* simple overflow catcher suggested by Andres Kroonmaa */
630 /* this will fail terribly for non 32 or 64 bit counters ... */
631 /* are there any others in SNMP land ? */
632 if (pdp_new[i] < (double)0.0 )
633 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
634 if (pdp_new[i] < (double)0.0 )
635 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
637 rate = pdp_new[i] / interval;
645 pdp_new[i] = strtod(updvals[i+1],&endptr);
647 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
650 if (endptr[0] != '\0'){
651 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
654 rate = pdp_new[i] / interval;
658 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
660 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
663 if (endptr[0] != '\0'){
664 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
667 rate = pdp_new[i] / interval;
670 rrd_set_error("rrd contains unknown DS type : '%s'",
674 /* break out of this for loop if the error string is set */
675 if (rrd_test_error()){
678 /* make sure pdp_temp is neither too large or too small
679 * if any of these occur it becomes unknown ...
681 if ( ! isnan(rate) &&
682 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
683 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
684 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
685 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
689 /* no news is news all the same */
694 /* make a copy of the command line argument for the next run */
702 rrd.pdp_prep[i].last_ds,
703 updvals[i+1], pdp_new[i]);
705 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
706 strncpy(rrd.pdp_prep[i].last_ds,
707 updvals[i+1],LAST_DS_LEN-1);
708 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
711 /* break out of the argument parsing loop if the error_string is set */
712 if (rrd_test_error()){
716 /* has a pdp_st moment occurred since the last run ? */
718 if (proc_pdp_st == occu_pdp_st){
719 /* no we have not passed a pdp_st moment. therefore update is simple */
721 for(i=0;i<rrd.stat_head->ds_cnt;i++){
722 if(isnan(pdp_new[i])) {
723 /* this is not realy accurate if we use subsecond data arival time
724 should have thought of it when going subsecond resolution ...
725 sorry next format change we will have it! */
726 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
728 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
729 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
731 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
740 rrd.pdp_prep[i].scratch[PDP_val].u_val,
741 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
745 /* an pdp_st has occurred. */
747 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
748 * occurred up to the last run.
749 pdp_new[] contains rate*seconds from the latest run.
750 pdp_temp[] will contain the rate for cdp */
752 for(i=0;i<rrd.stat_head->ds_cnt;i++){
753 /* update pdp_prep to the current pdp_st. */
754 double pre_unknown = 0.0;
755 if(isnan(pdp_new[i]))
756 /* a final bit of unkonwn to be added bevore calculation
757 * we use a tempaorary variable for this so that we
758 * don't have to turn integer lines before using the value */
759 pre_unknown = pre_int;
761 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
762 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
764 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
769 /* if too much of the pdp_prep is unknown we dump it */
771 /* removed because this does not agree with the definition
772 a heart beat can be unknown */
773 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775 /* if the interval is larger thatn mrhb we get NAN */
776 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777 (occu_pdp_st-proc_pdp_st <=
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
781 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782 / ((double)(occu_pdp_st - proc_pdp_st
783 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
787 /* process CDEF data sources; remember each CDEF DS can
788 * only reference other DS with a lower index number */
789 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
791 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
792 /* substitue data values for OP_VARIABLE nodes */
793 for (ii = 0; rpnp[ii].op != OP_END; ii++)
795 if (rpnp[ii].op == OP_VARIABLE) {
796 rpnp[ii].op = OP_NUMBER;
797 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
800 /* run the rpn calculator */
801 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
803 break; /* exits the data sources pdp_temp loop */
807 /* make pdp_prep ready for the next run */
808 if(isnan(pdp_new[i])){
809 /* this is not realy accurate if we use subsecond data arival time
810 should have thought of it when going subsecond resolution ...
811 sorry next format change we will have it! */
812 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
813 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
815 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
816 rrd.pdp_prep[i].scratch[PDP_val].u_val =
817 pdp_new[i]/interval*post_int;
825 "new_unkn_sec %5lu\n",
827 rrd.pdp_prep[i].scratch[PDP_val].u_val,
828 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
832 /* if there were errors during the last loop, bail out here */
833 if (rrd_test_error()){
838 /* compute the number of elapsed pdp_st moments */
839 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
841 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
843 if (rra_step_cnt == NULL)
845 rra_step_cnt = (unsigned long *)
846 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
849 for(i = 0, rra_start = rra_begin;
850 i < rrd.stat_head->rra_cnt;
851 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
854 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
855 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
856 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
857 if (start_pdp_offset <= elapsed_pdp_st) {
858 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
859 rrd.rra_def[i].pdp_cnt + 1;
864 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
866 /* If this is a bulk update, we need to skip ahead in the seasonal
867 * arrays so that they will be correct for the next observed value;
868 * note that for the bulk update itself, no update will occur to
869 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
871 if (rra_step_cnt[i] > 2)
873 /* skip update by resetting rra_step_cnt[i],
874 * note that this is not data source specific; this is due
875 * to the bulk update, not a DNAN value for the specific data
878 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
879 &last_seasonal_coef);
880 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
884 /* periodically run a smoother for seasonal effects */
885 /* Need to use first cdp parameter buffer to track
886 * burnin (burnin requires a specific smoothing schedule).
887 * The CDP_init_seasonal parameter is really an RRA level,
888 * not a data source within RRA level parameter, but the rra_def
889 * is read only for rrd_update (not flushed to disk). */
890 iii = i*(rrd.stat_head -> ds_cnt);
891 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
894 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
895 > rrd.rra_def[i].row_cnt - 1) {
896 /* mark off one of the burnin cycles */
897 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
901 /* someone has no doubt invented a trick to deal with this
902 * wrap around, but at least this code is clear. */
903 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
904 rrd.rra_ptr[i].cur_row)
906 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
907 * mapping between PDP and CDP */
908 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
909 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
913 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
914 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
915 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
920 /* can't rely on negative numbers because we are working with
922 /* Don't need modulus here. If we've wrapped more than once, only
923 * one smooth is executed at the end. */
924 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
925 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
926 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
930 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
931 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
932 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
939 rra_current = ftell(rrd_file);
940 } /* if cf is DEVSEASONAL or SEASONAL */
942 if (rrd_test_error()) break;
944 /* update CDP_PREP areas */
945 /* loop over data soures within each RRA */
947 ii < rrd.stat_head->ds_cnt;
951 /* iii indexes the CDP prep area for this data source within the RRA */
952 iii=i*rrd.stat_head->ds_cnt+ii;
954 if (rrd.rra_def[i].pdp_cnt > 1) {
956 if (rra_step_cnt[i] > 0) {
957 /* If we are in this block, as least 1 CDP value will be written to
958 * disk, this is the CDP_primary_val entry. If more than 1 value needs
959 * to be written, then the "fill in" value is the CDP_secondary_val
961 if (isnan(pdp_temp[ii]))
963 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
964 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
966 /* CDP_secondary value is the RRA "fill in" value for intermediary
967 * CDP data entries. No matter the CF, the value is the same because
968 * the average, max, min, and last of a list of identical values is
969 * the same, namely, the value itself. */
970 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
973 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
974 > rrd.rra_def[i].pdp_cnt*
975 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
977 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
978 /* initialize carry over */
979 if (current_cf == CF_AVERAGE) {
980 if (isnan(pdp_temp[ii])) {
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
984 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
987 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
990 rrd_value_t cum_val, cur_val;
991 switch (current_cf) {
993 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
994 cur_val = IFDNAN(pdp_temp[ii],0.0);
995 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
996 (cum_val + cur_val * start_pdp_offset) /
997 (rrd.rra_def[i].pdp_cnt
998 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
999 /* initialize carry over value */
1000 if (isnan(pdp_temp[ii])) {
1001 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1003 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1004 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1008 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1009 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1011 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1012 isnan(pdp_temp[ii])) {
1014 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1019 if (cur_val > cum_val)
1020 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1022 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1023 /* initialize carry over value */
1024 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1027 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1028 cur_val = IFDNAN(pdp_temp[ii],DINF);
1030 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1031 isnan(pdp_temp[ii])) {
1033 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1038 if (cur_val < cum_val)
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1041 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1042 /* initialize carry over value */
1043 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1048 /* initialize carry over value */
1049 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052 } /* endif meets xff value requirement for a valid value */
1053 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1054 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1055 if (isnan(pdp_temp[ii]))
1056 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1057 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1059 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1060 } else /* rra_step_cnt[i] == 0 */
1063 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1064 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1067 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1068 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1071 if (isnan(pdp_temp[ii])) {
1072 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1073 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1075 if (current_cf == CF_AVERAGE) {
1076 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1083 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1086 switch (current_cf) {
1088 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1092 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1093 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1097 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1101 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1106 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1107 if (elapsed_pdp_st > 2)
1109 switch (current_cf) {
1112 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1113 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1116 case CF_DEVSEASONAL:
1117 /* need to update cached seasonal values, so they are consistent
1118 * with the bulk update */
1119 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1120 * CDP_last_deviation are the same. */
1121 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1122 last_seasonal_coef[ii];
1123 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1127 /* need to update the null_count and last_null_count.
1128 * even do this for non-DNAN pdp_temp because the
1129 * algorithm is not learning from batch updates. */
1130 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1132 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1136 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1137 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1140 /* do not count missed bulk values as failures */
1141 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1142 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1143 /* need to reset violations buffer.
1144 * could do this more carefully, but for now, just
1145 * assume a bulk update wipes away all violations. */
1146 erase_violations(&rrd, iii, i);
1150 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1152 if (rrd_test_error()) break;
1154 } /* endif data sources loop */
1155 } /* end RRA Loop */
1157 /* this loop is only entered if elapsed_pdp_st < 3 */
1158 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1159 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1161 for(i = 0, rra_start = rra_begin;
1162 i < rrd.stat_head->rra_cnt;
1163 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1166 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1168 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1169 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1171 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1172 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1174 rra_current = ftell(rrd_file);
1176 if (rrd_test_error()) break;
1177 /* loop over data soures within each RRA */
1179 ii < rrd.stat_head->ds_cnt;
1182 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1183 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1184 scratch_idx, seasonal_coef);
1186 } /* end RRA Loop */
1187 if (rrd_test_error()) break;
1188 } /* end elapsed_pdp_st loop */
1190 if (rrd_test_error()) break;
1192 /* Ready to write to disk */
1193 /* Move sequentially through the file, writing one RRA at a time.
1194 * Note this architecture divorces the computation of CDP with
1195 * flushing updated RRA entries to disk. */
1196 for(i = 0, rra_start = rra_begin;
1197 i < rrd.stat_head->rra_cnt;
1198 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1200 /* is there anything to write for this RRA? If not, continue. */
1201 if (rra_step_cnt[i] == 0) continue;
1203 /* write the first row */
1205 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1207 rrd.rra_ptr[i].cur_row++;
1208 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1209 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1210 /* positition on the first row */
1211 rra_pos_tmp = rra_start +
1212 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1213 if(rra_pos_tmp != rra_current) {
1215 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1216 rrd_set_error("seek error in rrd");
1220 rra_current = rra_pos_tmp;
1224 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1226 scratch_idx = CDP_primary_val;
1227 if (pcdp_summary != NULL)
1229 rra_time = (current_time - current_time
1230 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1231 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1234 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1235 pcdp_summary, &rra_time, rrd_mmaped_file);
1237 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1238 pcdp_summary, &rra_time);
1240 if (rrd_test_error()) break;
1242 /* write other rows of the bulk update, if any */
1243 scratch_idx = CDP_secondary_val;
1244 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1246 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1249 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1250 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1253 rrd.rra_ptr[i].cur_row = 0;
1254 /* seek back to beginning of current rra */
1255 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1257 rrd_set_error("seek error in rrd");
1261 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1263 rra_current = rra_start;
1265 if (pcdp_summary != NULL)
1267 rra_time = (current_time - current_time
1268 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1269 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1272 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1273 pcdp_summary, &rra_time, rrd_mmaped_file);
1275 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1276 pcdp_summary, &rra_time);
1280 if (rrd_test_error())
1284 /* break out of the argument parsing loop if error_string is set */
1285 if (rrd_test_error()){
1290 } /* endif a pdp_st has occurred */
1291 rrd.live_head->last_up = current_time;
1292 rrd.live_head->last_up_usec = current_time_usec;
1294 } /* function argument loop */
1296 if (seasonal_coef != NULL) free(seasonal_coef);
1297 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1298 if (rra_step_cnt != NULL) free(rra_step_cnt);
1299 rpnstack_free(&rpnstack);
1302 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1303 rrd_set_error("error writing(unmapping) file: %s", filename);
1306 /* if we got here and if there is an error and if the file has not been
1307 * written to, then close things up and return. */
1308 if (rrd_test_error()) {
1318 /* aargh ... that was tough ... so many loops ... anyway, its done.
1319 * we just need to write back the live header portion now*/
1321 if (fseek(rrd_file, (sizeof(stat_head_t)
1322 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1323 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1325 rrd_set_error("seek rrd for live header writeback");
1336 if(fwrite( rrd.live_head,
1337 sizeof(live_head_t), 1, rrd_file) != 1){
1338 rrd_set_error("fwrite live_head to rrd");
1349 if(fwrite( &rrd.live_head->last_up,
1350 sizeof(time_t), 1, rrd_file) != 1){
1351 rrd_set_error("fwrite live_head to rrd");
1363 if(fwrite( rrd.pdp_prep,
1365 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1366 rrd_set_error("ftwrite pdp_prep to rrd");
1376 if(fwrite( rrd.cdp_prep,
1378 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1379 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1381 rrd_set_error("ftwrite cdp_prep to rrd");
1391 if(fwrite( rrd.rra_ptr,
1393 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1394 rrd_set_error("fwrite rra_ptr to rrd");
1404 /* OK now close the files and free the memory */
1405 if(fclose(rrd_file) != 0){
1406 rrd_set_error("closing rrd");
1415 /* calling the smoothing code here guarantees at most
1416 * one smoothing operation per rrd_update call. Unfortunately,
1417 * it is possible with bulk updates, or a long-delayed update
1418 * for smoothing to occur off-schedule. This really isn't
1419 * critical except during the burning cycles. */
1420 if (schedule_smooth)
1422 rrd_file = fopen(filename,"rb+");
1423 rra_start = rra_begin;
1424 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1426 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1427 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1430 fprintf(stderr,"Running smoother for rra %ld\n",i);
1432 apply_smoother(&rrd,i,rra_start,rrd_file);
1433 if (rrd_test_error())
1436 rra_start += rrd.rra_def[i].row_cnt
1437 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1450 * get exclusive lock to whole file.
1451 * lock gets removed when we close the file
1453 * returns 0 on success
1456 LockRRD(FILE *rrdfile)
1458 int rrd_fd; /* File descriptor for RRD */
1461 rrd_fd = fileno(rrdfile);
1464 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1467 if ( _fstat( rrd_fd, &st ) == 0 ) {
1468 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1474 lock.l_type = F_WRLCK; /* exclusive write lock */
1475 lock.l_len = 0; /* whole file */
1476 lock.l_start = 0; /* start of file */
1477 lock.l_whence = SEEK_SET; /* end of file */
1479 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1489 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1490 unsigned short CDP_scratch_idx,
1492 FILE UNUSED(*rrd_file),
1496 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1499 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1500 unsigned short CDP_scratch_idx, FILE *rrd_file,
1501 info_t *pcdp_summary, time_t *rra_time)
1504 unsigned long ds_idx, cdp_idx;
1507 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1509 /* compute the cdp index */
1510 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1512 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1513 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1514 rrd -> rra_def[rra_idx].cf_nam);
1516 if (pcdp_summary != NULL)
1518 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1519 /* append info to the return hash */
1520 pcdp_summary = info_push(pcdp_summary,
1521 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1522 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1523 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1527 memcpy((char *)rrd_mmaped_file + *rra_current,
1528 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1529 sizeof(rrd_value_t));
1531 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1532 sizeof(rrd_value_t),1,rrd_file) != 1)
1534 rrd_set_error("writing rrd");
1538 *rra_current += sizeof(rrd_value_t);
1540 return (pcdp_summary);