1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
8 * Revision 1.8 2003/03/31 21:22:12 oetiker
9 * enables RRDtool updates with microsecond or in case of windows millisecond
10 * precision. This is needed to reduce time measurement error when archive step
11 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
13 * Revision 1.7 2003/02/13 07:05:27 oetiker
14 * Find attached the patch I promised to send to you. Please note that there
15 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
16 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
17 * library is identical to librrd, but it contains support code for per-thread
18 * global variables currently used for error information only. This is similar
19 * to how errno per-thread variables are implemented. librrd_th must be linked
20 * alongside of libpthread
22 * There is also a new file "THREADS", holding some documentation.
24 * -- Peter Stamfest <peter@stamfest.at>
26 * Revision 1.6 2002/02/01 20:34:49 oetiker
27 * fixed version number and date/time
29 * Revision 1.5 2001/05/09 05:31:01 oetiker
30 * Bug fix: when update of multiple PDP/CDP RRAs coincided
31 * with interpolation of multiple PDPs an incorrect value was
32 * stored as the CDP. Especially evident for GAUGE data sources.
33 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
35 * Revision 1.4 2001/03/10 23:54:41 oetiker
36 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
37 * parser and calculator from rrd_graph and puts them in a new file,
38 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
39 * clean-up of aberrant behavior stuff, including a bug fix.
40 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
41 * -- Jake Brutlag <jakeb@corp.webtv.net>
43 * Revision 1.3 2001/03/04 13:01:55 oetiker
44 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
45 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
46 * This is backwards compatible! But new files using the Aberrant stuff are not readable
47 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
48 * -- Jake Brutlag <jakeb@corp.webtv.net>
50 * Revision 1.2 2001/03/04 11:14:25 oetiker
51 * added at-style-time@value:value syntax to rrd_update
52 * -- Dave Bodenstab <imdave@mcs.net>
54 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
57 *****************************************************************************/
60 #include <sys/types.h>
64 #include <sys/locking.h>
70 #include "rrd_rpncalc.h"
72 #include "rrd_is_thread_safe.h"
76 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
79 #include <sys/timeb.h>
82 time_t tv_sec; /* seconds */
83 long tv_usec; /* microseconds */
87 int tz_minuteswest; /* minutes W of Greenwich */
88 int tz_dsttime; /* type of dst correction */
91 static gettimeofday(struct timeval *t, struct __timezone *tz) {
93 struct timeb current_time;
95 _ftime(&current_time);
97 t->tv_sec = current_time.time;
98 t->tv_usec = current_time.millitm * 1000;
103 * normalize time as returned by gettimeofday. usec part must
106 static void normalize_time(struct timeval *t)
110 t->tv_usec += 1000000L;
114 /* Local prototypes */
/* LockRRD: request an exclusive lock covering the whole RRD file. */
115 int LockRRD(FILE *rrd_file);
/* write_RRA_row: for the RRA rra_idx, write the scratch value selected by
 * CDP_scratch_idx of every data source to rrd_file, advancing *rra_current
 * by sizeof(rrd_value_t) for each value written. */
116 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
117 unsigned long *rra_current,
118 unsigned short CDP_scratch_idx, FILE *rrd_file);
/* rrd_update_r: update worker called by rrd_update(); argc/argv hold only
 * the update strings (options and filename already removed). */
119 int rrd_update_r(char *filename, char *template, int argc, char **argv);
/* IFDNAN(X,Y): evaluates to X, or to Y when X is NaN.
 * The expansion is a plain parenthesized expression with no trailing
 * semicolon, so it can be used inside larger expressions as well as in
 * the existing "v = IFDNAN(a, b);" statements.
 * Note: X is evaluated twice; do not pass arguments with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
126 main(int argc, char **argv){
/* Command-line entry point: hands the unmodified argument vector to
 * rrd_update().  The return value of that call is not used here; failure
 * is detected through rrd_test_error() instead. */
127 rrd_update(argc,argv);
128 if (rrd_test_error()) {
/* on error, print the usage summary first ... */
129 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
130 "Usage: rrdupdate filename\n"
131 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
132 "\t\t\ttime|N:value[:value...]\n\n"
133 "\t\t\tat-time@value[:value...]\n\n"
134 "\t\t\t[ time:value[:value...] ..]\n\n");
/* ... followed by the recorded error message */
136 printf("ERROR: %s\n",rrd_get_error());
145 rrd_update(int argc, char **argv)
/* Front end for an update: parses the template option with getopt_long(),
 * checks that a filename and at least one update string remain, and passes
 * them to rrd_update_r(). */
147 char *template = NULL;
151 static struct option long_options[] =
153 {"template", required_argument, 0, 't'},
156 int option_index = 0;
/* "t:" -- the short option -t takes an argument, like --template above */
158 opt = getopt_long(argc, argv, "t:",
159 long_options, &option_index);
170 rrd_set_error("unknown option '%s'",argv[optind-1]);
175 /* need at least 2 arguments: filename, data. */
176 if (argc-optind < 2) {
177 rrd_set_error("Not enough arguments");
/* argv[optind] is the RRD filename; the arguments after it are passed on
 * as the list of update strings */
182 rc = rrd_update_r(argv[optind], template,
183 argc - optind - 1, argv + optind + 1);
188 rrd_update_r(char *filename, char *template, int argc, char **argv)
193 unsigned long i,ii,iii=1;
195 unsigned long rra_begin; /* byte pointer to the rra
196 * area in the rrd file. this
197 * pointer never changes value */
198 unsigned long rra_start; /* byte pointer to the rra
199 * area in the rrd file. this
200 * pointer changes as each rrd is
202 unsigned long rra_current; /* byte pointer to the current write
203 * spot in the rrd file. */
204 unsigned long rra_pos_tmp; /* temporary byte pointer. */
206 pre_int,post_int; /* interval between this and
208 unsigned long proc_pdp_st; /* which pdp_st was the last
210 unsigned long occu_pdp_st; /* when was the pdp_st
211 * before the last update
213 unsigned long proc_pdp_age; /* how old was the data in
214 * the pdp prep area when it
215 * was last updated */
216 unsigned long occu_pdp_age; /* how long ago was the last
218 rrd_value_t *pdp_new; /* prepare the incoming data
219 * to be added to the
221 rrd_value_t *pdp_temp; /* prepare the pdp values
222 * to be added to the
225 long *tmpl_idx; /* index representing the settings
226 transported by the template index */
227 unsigned long tmpl_cnt = 2; /* time and data */
232 unsigned long current_time_usec; /* microseconds part of current time */
233 struct timeval tmp_time; /* used for time conversion */
236 int schedule_smooth = 0;
237 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
238 /* a vector of future Holt-Winters seasonal coefs */
239 unsigned long elapsed_pdp_st;
240 /* number of elapsed PDP steps since last update */
241 unsigned long *rra_step_cnt = NULL;
242 /* number of rows to be updated in an RRA for a data
244 unsigned long start_pdp_offset;
245 /* number of PDP steps since the last update that
246 * are assigned to the first CDP to be generated
247 * since the last update. */
248 unsigned short scratch_idx;
249 /* index into the CDP scratch array */
250 enum cf_en current_cf;
251 /* numeric id of the current consolidation function */
252 rpnstack_t rpnstack; /* used for COMPUTE DS */
253 int version; /* rrd version */
255 rpnstack_init(&rpnstack);
257 /* need at least 1 argument: data. */
259 rrd_set_error("Not enough arguments");
265 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
268 /* initialize time */
269 version = atoi(rrd.stat_head->version);
270 gettimeofday(&tmp_time, 0);
271 normalize_time(&tmp_time);
272 current_time = tmp_time.tv_sec;
274 current_time_usec = tmp_time.tv_usec;
277 current_time_usec = 0;
280 rra_current = rra_start = rra_begin = ftell(rrd_file);
281 /* This is defined in the ANSI C standard, section 7.9.5.3:
283 When a file is opened with update mode ('+' as the second
284 or third character in the ... list of mode argument
285 variables), both input and output may be performed on the
286 associated stream. However, ... input may not be directly
287 followed by output without an intervening call to a file
288 positioning function, unless the input operation encounters
290 fseek(rrd_file, 0, SEEK_CUR);
293 /* get exclusive lock to whole file.
294 * lock gets removed when we close the file.
296 if (LockRRD(rrd_file) != 0) {
297 rrd_set_error("could not lock RRD");
303 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
304 rrd_set_error("allocating updvals pointer array");
310 if ((pdp_temp = malloc(sizeof(rrd_value_t)
311 *rrd.stat_head->ds_cnt))==NULL){
312 rrd_set_error("allocating pdp_temp ...");
319 if ((tmpl_idx = malloc(sizeof(unsigned long)
320 *(rrd.stat_head->ds_cnt+1)))==NULL){
321 rrd_set_error("allocating tmpl_idx ...");
328 /* initialize template redirector */
329 /* default config example (assume DS 1 is a CDEF DS)
330 tmpl_idx[0] -> 0; (time)
331 tmpl_idx[1] -> 1; (DS 0)
332 tmpl_idx[2] -> 3; (DS 2)
333 tmpl_idx[3] -> 4; (DS 3) */
334 tmpl_idx[0] = 0; /* time */
335 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
337 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
344 unsigned int tmpl_len;
346 tmpl_cnt = 1; /* the first entry is the time */
347 tmpl_len = strlen(template);
348 for(i=0;i<=tmpl_len ;i++) {
349 if (template[i] == ':' || template[i] == '\0') {
351 if (tmpl_cnt>rrd.stat_head->ds_cnt){
352 rrd_set_error("Template contains more DS definitions than RRD");
353 free(updvals); free(pdp_temp);
354 free(tmpl_idx); rrd_free(&rrd);
355 fclose(rrd_file); return(-1);
357 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
358 rrd_set_error("unknown DS name '%s'",dsname);
359 free(updvals); free(pdp_temp);
360 free(tmpl_idx); rrd_free(&rrd);
361 fclose(rrd_file); return(-1);
363 /* the first element is always the time */
364 tmpl_idx[tmpl_cnt-1]++;
365 /* go to the next entry on the template */
366 dsname = &template[i+1];
367 /* fix the damage we did before */
376 if ((pdp_new = malloc(sizeof(rrd_value_t)
377 *rrd.stat_head->ds_cnt))==NULL){
378 rrd_set_error("allocating pdp_new ...");
387 /* loop through the arguments. */
388 for(arg_i=0; arg_i<argc;arg_i++) {
389 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
390 char *step_start = stepper;
392 char *parsetime_error = NULL;
393 enum {atstyle, normal} timesyntax;
394 struct time_value ds_tv;
395 if (stepper == NULL){
396 rrd_set_error("failed duplication argv entry");
404 /* initialize all ds input to unknown except the first one
405 which has always got to be set */
406 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
407 strcpy(stepper,argv[arg_i]);
409 /* separate all ds elements; first must be examined separately
410 due to alternate time syntax */
411 if ((p=strchr(stepper,'@'))!=NULL) {
412 timesyntax = atstyle;
415 } else if ((p=strchr(stepper,':'))!=NULL) {
420 rrd_set_error("expected timestamp not found in data source from %s:...",
426 updvals[tmpl_idx[ii]] = stepper;
428 if (*stepper == ':') {
432 updvals[tmpl_idx[ii]] = stepper+1;
438 if (ii != tmpl_cnt-1) {
439 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
440 tmpl_cnt-1, ii, argv[arg_i]);
445 /* get the time from the reading ... handle N */
446 if (timesyntax == atstyle) {
447 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
448 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
452 if (ds_tv.type == RELATIVE_TO_END_TIME ||
453 ds_tv.type == RELATIVE_TO_START_TIME) {
454 rrd_set_error("specifying time relative to the 'start' "
455 "or 'end' makes no sense here: %s",
461 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
462 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
464 } else if (strcmp(updvals[0],"N")==0){
465 gettimeofday(&tmp_time, 0);
466 normalize_time(&tmp_time);
467 current_time = tmp_time.tv_sec;
468 current_time_usec = tmp_time.tv_usec;
471 tmp = strtod(updvals[0], 0);
472 current_time = floor(tmp);
473 current_time_usec = (long)((tmp - current_time) * 1000000L);
475 /* don't do any correction for old version RRDs */
477 current_time_usec = 0;
479 if(current_time <= rrd.live_head->last_up){
480 rrd_set_error("illegal attempt to update using time %ld when "
481 "last update time is %ld (minimum one second step)",
482 current_time, rrd.live_head->last_up);
488 /* seek to the beginning of the rra's */
489 if (rra_current != rra_begin) {
490 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
491 rrd_set_error("seek error in rrd");
495 rra_current = rra_begin;
497 rra_start = rra_begin;
499 /* when was the current pdp started */
500 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
501 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
503 /* when did the last pdp_st occur */
504 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
505 occu_pdp_st = current_time - occu_pdp_age;
506 /* interval = current_time - rrd.live_head->last_up; */
507 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
509 if (occu_pdp_st > proc_pdp_st){
510 /* OK we passed the pdp_st moment*/
511 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
512 * occurred before the latest
514 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
515 post_int = occu_pdp_age; /* how much after it */
516 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
530 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
531 occu_pdp_age, occu_pdp_st,
532 interval, pre_int, post_int);
535 /* process the data sources and update the pdp_prep
536 * area accordingly */
537 for(i=0;i<rrd.stat_head->ds_cnt;i++){
539 dst_idx= dst_conv(rrd.ds_def[i].dst);
540 /* NOTE: DST_CDEF should never enter this if block, because
541 * updvals[i+1][0] is initialized to 'U'; unless the caller
542 * accidentally specified a value for the DST_CDEF. To handle
543 * this case, an extra check is required. */
544 if((updvals[i+1][0] != 'U') &&
545 (dst_idx != DST_CDEF) &&
546 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
548 /* the data source type defines how to process the data */
549 /* pdp_new contains rate * time ... eg the bytes
550 * transferred during the interval. Doing it this way saves
551 * a lot of math operations */
557 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
558 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
559 if(dst_idx == DST_COUNTER) {
560 /* simple overflow catcher suggested by Andres Kroonmaa */
561 /* this will fail terribly for non 32 or 64 bit counters ... */
562 /* are there any others in SNMP land ? */
563 if (pdp_new[i] < (double)0.0 )
564 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
565 if (pdp_new[i] < (double)0.0 )
566 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
568 rate = pdp_new[i] / interval;
575 pdp_new[i]= atof(updvals[i+1]);
576 rate = pdp_new[i] / interval;
579 pdp_new[i] = atof(updvals[i+1]) * interval;
580 rate = pdp_new[i] / interval;
583 rrd_set_error("rrd contains unknown DS type : '%s'",
587 /* break out of this for loop if the error string is set */
588 if (rrd_test_error()){
591 /* make sure pdp_temp is neither too large nor too small
592 * if any of these occur it becomes unknown ...
594 if ( ! isnan(rate) &&
595 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
596 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
597 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
598 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
602 /* no news is news all the same */
606 /* make a copy of the command line argument for the next run */
614 rrd.pdp_prep[i].last_ds,
615 updvals[i+1], pdp_new[i]);
617 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
618 strncpy(rrd.pdp_prep[i].last_ds,
619 updvals[i+1],LAST_DS_LEN-1);
620 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
623 /* break out of the argument parsing loop if the error_string is set */
624 if (rrd_test_error()){
628 /* has a pdp_st moment occurred since the last run ? */
630 if (proc_pdp_st == occu_pdp_st){
631 /* no we have not passed a pdp_st moment. therefore update is simple */
633 for(i=0;i<rrd.stat_head->ds_cnt;i++){
634 if(isnan(pdp_new[i]))
635 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
637 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
644 rrd.pdp_prep[i].scratch[PDP_val].u_val,
645 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
649 /* a pdp_st has occurred. */
651 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
652 * occurred up to the last run.
653 pdp_new[] contains rate*seconds from the latest run.
654 pdp_temp[] will contain the rate for cdp */
656 for(i=0;i<rrd.stat_head->ds_cnt;i++){
657 /* update pdp_prep to the current pdp_st */
658 if(isnan(pdp_new[i]))
659 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
661 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
662 pdp_new[i]/(double)interval*(double)pre_int;
664 /* if too much of the pdp_prep is unknown we dump it */
665 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
666 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
667 (occu_pdp_st-proc_pdp_st <=
668 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
671 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
672 / (double)( occu_pdp_st
674 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
677 /* process CDEF data sources; remember each CDEF DS can
678 * only reference other DS with a lower index number */
679 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
681 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
682 /* substitute data values for OP_VARIABLE nodes */
683 for (ii = 0; rpnp[ii].op != OP_END; ii++)
685 if (rpnp[ii].op == OP_VARIABLE) {
686 rpnp[ii].op = OP_NUMBER;
687 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
690 /* run the rpn calculator */
691 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
693 break; /* exits the data sources pdp_temp loop */
697 /* make pdp_prep ready for the next run */
698 if(isnan(pdp_new[i])){
699 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
700 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
702 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
703 rrd.pdp_prep[i].scratch[PDP_val].u_val =
704 pdp_new[i]/(double)interval*(double)post_int;
712 "new_unkn_sec %5lu\n",
714 rrd.pdp_prep[i].scratch[PDP_val].u_val,
715 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
719 /* if there were errors during the last loop, bail out here */
720 if (rrd_test_error()){
725 /* compute the number of elapsed pdp_st moments */
726 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
728 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
730 if (rra_step_cnt == NULL)
732 rra_step_cnt = (unsigned long *)
733 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
736 for(i = 0, rra_start = rra_begin;
737 i < rrd.stat_head->rra_cnt;
738 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
741 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
742 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
743 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
744 if (start_pdp_offset <= elapsed_pdp_st) {
745 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
746 rrd.rra_def[i].pdp_cnt + 1;
751 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
753 /* If this is a bulk update, we need to skip ahead in the seasonal
754 * arrays so that they will be correct for the next observed value;
755 * note that for the bulk update itself, no update will occur to
756 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
758 if (rra_step_cnt[i] > 2)
760 /* skip update by resetting rra_step_cnt[i],
761 * note that this is not data source specific; this is due
762 * to the bulk update, not a DNAN value for the specific data
765 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
766 &last_seasonal_coef);
767 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
771 /* periodically run a smoother for seasonal effects */
772 /* Need to use first cdp parameter buffer to track
773 * burnin (burnin requires a specific smoothing schedule).
774 * The CDP_init_seasonal parameter is really an RRA level,
775 * not a data source within RRA level parameter, but the rra_def
776 * is read only for rrd_update (not flushed to disk). */
777 iii = i*(rrd.stat_head -> ds_cnt);
778 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
781 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
782 > rrd.rra_def[i].row_cnt - 1) {
783 /* mark off one of the burnin cycles */
784 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
788 /* someone has no doubt invented a trick to deal with this
789 * wrap around, but at least this code is clear. */
790 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
791 rrd.rra_ptr[i].cur_row)
793 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
794 * mapping between PDP and CDP */
795 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
796 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
800 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
801 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
802 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
807 /* can't rely on negative numbers because we are working with
809 /* Don't need modulus here. If we've wrapped more than once, only
810 * one smooth is executed at the end. */
811 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
812 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
813 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
817 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
818 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
819 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
826 rra_current = ftell(rrd_file);
827 } /* if cf is DEVSEASONAL or SEASONAL */
829 if (rrd_test_error()) break;
831 /* update CDP_PREP areas */
832 /* loop over data sources within each RRA */
834 ii < rrd.stat_head->ds_cnt;
838 /* iii indexes the CDP prep area for this data source within the RRA */
839 iii=i*rrd.stat_head->ds_cnt+ii;
841 if (rrd.rra_def[i].pdp_cnt > 1) {
843 if (rra_step_cnt[i] > 0) {
844 /* If we are in this block, at least 1 CDP value will be written to
845 * disk, this is the CDP_primary_val entry. If more than 1 value needs
846 * to be written, then the "fill in" value is the CDP_secondary_val
848 if (isnan(pdp_temp[ii]))
850 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
851 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
853 /* CDP_secondary value is the RRA "fill in" value for intermediary
854 * CDP data entries. No matter the CF, the value is the same because
855 * the average, max, min, and last of a list of identical values is
856 * the same, namely, the value itself. */
857 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
860 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
861 > rrd.rra_def[i].pdp_cnt*
862 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
864 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
865 /* initialize carry over */
866 if (current_cf == CF_AVERAGE) {
867 if (isnan(pdp_temp[ii])) {
868 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
870 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
871 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
874 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
877 rrd_value_t cum_val, cur_val;
878 switch (current_cf) {
880 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
881 cur_val = IFDNAN(pdp_temp[ii],0.0);
882 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
883 (cum_val + cur_val * start_pdp_offset) /
884 (rrd.rra_def[i].pdp_cnt
885 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
886 /* initialize carry over value */
887 if (isnan(pdp_temp[ii])) {
888 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
890 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
891 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
895 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
896 cur_val = IFDNAN(pdp_temp[ii],-DINF);
898 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
899 isnan(pdp_temp[ii])) {
901 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
906 if (cur_val > cum_val)
907 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
909 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
910 /* initialize carry over value */
911 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
914 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
915 cur_val = IFDNAN(pdp_temp[ii],DINF);
917 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
918 isnan(pdp_temp[ii])) {
920 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
925 if (cur_val < cum_val)
926 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
928 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
929 /* initialize carry over value */
930 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
934 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
935 /* initialize carry over value */
936 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
939 } /* endif meets xff value requirement for a valid value */
940 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
941 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
942 if (isnan(pdp_temp[ii]))
943 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
944 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
946 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
947 } else /* rra_step_cnt[i] == 0 */
950 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
951 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
954 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
955 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
958 if (isnan(pdp_temp[ii])) {
959 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
960 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
962 if (current_cf == CF_AVERAGE) {
963 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
966 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
969 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
970 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
973 switch (current_cf) {
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
979 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
980 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
983 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
984 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
988 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
993 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
994 if (elapsed_pdp_st > 2)
996 switch (current_cf) {
999 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1000 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1003 case CF_DEVSEASONAL:
1004 /* need to update cached seasonal values, so they are consistent
1005 * with the bulk update */
1006 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1007 * CDP_last_deviation are the same. */
1008 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1009 last_seasonal_coef[ii];
1010 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1014 /* need to update the null_count and last_null_count.
1015 * even do this for non-DNAN pdp_temp because the
1016 * algorithm is not learning from batch updates. */
1017 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1019 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1023 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1024 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1027 /* do not count missed bulk values as failures */
1028 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1029 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1030 /* need to reset violations buffer.
1031 * could do this more carefully, but for now, just
1032 * assume a bulk update wipes away all violations. */
1033 erase_violations(&rrd, iii, i);
1037 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1039 if (rrd_test_error()) break;
1041 } /* endif data sources loop */
1042 } /* end RRA Loop */
1044 /* this loop is only entered if elapsed_pdp_st < 3 */
1045 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1046 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1048 for(i = 0, rra_start = rra_begin;
1049 i < rrd.stat_head->rra_cnt;
1050 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1053 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1055 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1056 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1058 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1059 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1061 rra_current = ftell(rrd_file);
1063 if (rrd_test_error()) break;
1064 /* loop over data sources within each RRA */
1066 ii < rrd.stat_head->ds_cnt;
1069 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1070 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1071 scratch_idx, seasonal_coef);
1073 } /* end RRA Loop */
1074 if (rrd_test_error()) break;
1075 } /* end elapsed_pdp_st loop */
1077 if (rrd_test_error()) break;
1079 /* Ready to write to disk */
1080 /* Move sequentially through the file, writing one RRA at a time.
1081 * Note this architecture divorces the computation of CDP with
1082 * flushing updated RRA entries to disk. */
1083 for(i = 0, rra_start = rra_begin;
1084 i < rrd.stat_head->rra_cnt;
1085 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1087 /* is there anything to write for this RRA? If not, continue. */
1088 if (rra_step_cnt[i] == 0) continue;
1090 /* write the first row */
1092 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1094 rrd.rra_ptr[i].cur_row++;
1095 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1096 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1097 /* position on the first row */
1098 rra_pos_tmp = rra_start +
1099 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1100 if(rra_pos_tmp != rra_current) {
1101 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1102 rrd_set_error("seek error in rrd");
1105 rra_current = rra_pos_tmp;
1109 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1111 scratch_idx = CDP_primary_val;
1112 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1113 if (rrd_test_error()) break;
1115 /* write other rows of the bulk update, if any */
1116 scratch_idx = CDP_secondary_val;
1117 for ( ; rra_step_cnt[i] > 1;
1118 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1120 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1123 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1124 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1127 rrd.rra_ptr[i].cur_row = 0;
1128 /* seek back to beginning of current rra */
1129 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1131 rrd_set_error("seek error in rrd");
1135 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1137 rra_current = rra_start;
1139 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1142 if (rrd_test_error())
1146 /* break out of the argument parsing loop if error_string is set */
1147 if (rrd_test_error()){
1152 } /* endif a pdp_st has occurred */
1153 rrd.live_head->last_up = current_time;
1154 rrd.live_head->last_up_usec = current_time_usec;
1156 } /* function argument loop */
1158 if (seasonal_coef != NULL) free(seasonal_coef);
1159 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1160 if (rra_step_cnt != NULL) free(rra_step_cnt);
1161 rpnstack_free(&rpnstack);
1163 /* if we got here and if there is an error and if the file has not been
1164 * written to, then close things up and return. */
1165 if (rrd_test_error()) {
1175 /* aargh ... that was tough ... so many loops ... anyway, its done.
1176 * we just need to write back the live header portion now*/
1178 if (fseek(rrd_file, (sizeof(stat_head_t)
1179 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1180 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1182 rrd_set_error("seek rrd for live header writeback");
1193 if(fwrite( rrd.live_head,
1194 sizeof(live_head_t), 1, rrd_file) != 1){
1195 rrd_set_error("fwrite live_head to rrd");
1206 if(fwrite( &rrd.live_head->last_up,
1207 sizeof(time_t), 1, rrd_file) != 1){
1208 rrd_set_error("fwrite live_head to rrd");
1220 if(fwrite( rrd.pdp_prep,
1222 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1223 rrd_set_error("ftwrite pdp_prep to rrd");
1233 if(fwrite( rrd.cdp_prep,
1235 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1236 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1238 rrd_set_error("ftwrite cdp_prep to rrd");
1248 if(fwrite( rrd.rra_ptr,
1250 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1251 rrd_set_error("fwrite rra_ptr to rrd");
1261 /* OK now close the files and free the memory */
1262 if(fclose(rrd_file) != 0){
1263 rrd_set_error("closing rrd");
1272 /* calling the smoothing code here guarantees at most
1273 * one smoothing operation per rrd_update call. Unfortunately,
1274 * it is possible with bulk updates, or a long-delayed update
1275 * for smoothing to occur off-schedule. This really isn't
1276 * critical except during the burn-in cycles. */
1277 if (schedule_smooth)
1280 rrd_file = fopen(filename,"r+");
1282 rrd_file = fopen(filename,"rb+");
1284 rra_start = rra_begin;
1285 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1287 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1288 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1291 fprintf(stderr,"Running smoother for rra %ld\n",i);
1293 apply_smoother(&rrd,i,rra_start,rrd_file);
1294 if (rrd_test_error())
1297 rra_start += rrd.rra_def[i].row_cnt
1298 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1311 * get exclusive lock to whole file.
1312 * lock gets removed when we close the file
1314 * returns 0 on success
1317 LockRRD(FILE *rrdfile)
/* Requests an exclusive lock on the file behind rrdfile.  Two locking calls
 * are visible below: fcntl() with a struct flock, and _fstat()/_locking().
 * NOTE(review): presumably a platform #if (not visible here) selects
 * between them -- confirm. */
1319 int rrd_fd; /* File descriptor for RRD */
1322 rrd_fd = fileno(rrdfile);
1327 lock.l_type = F_WRLCK; /* exclusive write lock */
1328 lock.l_len = 0; /* whole file */
1329 lock.l_start = 0; /* start of file */
1330 lock.l_whence = SEEK_SET; /* l_start is relative to the start of the file */
/* F_SETLK does not wait: if the lock is held elsewhere the call fails
 * instead of blocking */
1332 stat = fcntl(rrd_fd, F_SETLK, &lock);
/* the number of bytes to lock is taken from the current file size */
1336 if ( _fstat( rrd_fd, &st ) == 0 ) {
1337 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1349 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1350 unsigned short CDP_scratch_idx, FILE *rrd_file)
1352 unsigned long ds_idx, cdp_idx;
1354 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1356 /* compute the cdp index */
1357 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1359 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1360 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1361 rrd -> rra_def[rra_idx].cf_nam);
1364 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1365 sizeof(rrd_value_t),1,rrd_file) != 1)
1367 rrd_set_error("writing rrd");
1370 *rra_current += sizeof(rrd_value_t);