fixed leak in VDEF_PERCENT handlin -- Perry Stoll <perry_stoll@yahoo.com>
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.6  2002/02/01 20:34:49  oetiker
9  * fixed version number and date/time
10  *
11  * Revision 1.5  2001/05/09 05:31:01  oetiker
12  * Bug fix: when update of multiple PDP/CDP RRAs coincided
13  * with interpolation of multiple PDPs an incorrect value was
14  * stored as the CDP. Especially evident for GAUGE data sources.
15  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
16  *
17  * Revision 1.4  2001/03/10 23:54:41  oetiker
18  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
19  * parser and calculator from rrd_graph and puts then in a new file,
20  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
21  * clean-up of aberrant behavior stuff, including a bug fix.
22  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
23  * -- Jake Brutlag <jakeb@corp.webtv.net>
24  *
25  * Revision 1.3  2001/03/04 13:01:55  oetiker
26  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
27  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
28  * This is backwards compatible! But new files using the Aberrant stuff are not readable
29  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
30  * -- Jake Brutlag <jakeb@corp.webtv.net>
31  *
32  * Revision 1.2  2001/03/04 11:14:25  oetiker
33  * added at-style-time@value:value syntax to rrd_update
34  * --  Dave Bodenstab <imdave@mcs.net>
35  *
36  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
37  * checkin
38  *
39  *****************************************************************************/
40
41 #include "rrd_tool.h"
42 #include <sys/types.h>
43 #include <fcntl.h>
44 #include "rrd_hw.h"
45 #include "rrd_rpncalc.h"
46
47 #ifdef WIN32
48  #include <sys/locking.h>
49  #include <sys/stat.h>
50  #include <io.h>
51 #endif
52
53 /* Local prototypes */
54 int LockRRD(FILE *rrd_file);
55 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
56     unsigned short CDP_scratch_idx, FILE *rrd_file);
57  
58 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
59
60
61 #ifdef STANDALONE
62 int 
63 main(int argc, char **argv){
64         rrd_update(argc,argv);
65         if (rrd_test_error()) {
66                 printf("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
67                         "Usage: rrdupdate filename\n"
68                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
69                         "\t\t\ttime|N:value[:value...]\n\n"
70                         "\t\t\tat-time@value[:value...]\n\n"
71                         "\t\t\t[ time:value[:value...] ..]\n\n");
72                                    
73                 printf("ERROR: %s\n",rrd_get_error());
74                 rrd_clear_error();                                                            
75                 return 1;
76         }
77         return 0;
78 }
79 #endif
80
81 int
82 rrd_update(int argc, char **argv)
83 {
84
85     int              arg_i = 2;
86     short            j;
87     long             i,ii,iii=1;
88
89     unsigned long    rra_begin;          /* byte pointer to the rra
90                                           * area in the rrd file.  this
91                                           * pointer never changes value */
92     unsigned long    rra_start;          /* byte pointer to the rra
93                                           * area in the rrd file.  this
94                                           * pointer changes as each rrd is
95                                           * processed. */
96     unsigned long    rra_current;        /* byte pointer to the current write
97                                           * spot in the rrd file. */
98     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
99     unsigned long    interval,
100         pre_int,post_int;                /* interval between this and
101                                           * the last run */
102     unsigned long    proc_pdp_st;        /* which pdp_st was the last
103                                           * to be processed */
104     unsigned long    occu_pdp_st;        /* when was the pdp_st
105                                           * before the last update
106                                           * time */
107     unsigned long    proc_pdp_age;       /* how old was the data in
108                                           * the pdp prep area when it
109                                           * was last updated */
110     unsigned long    occu_pdp_age;       /* how long ago was the last
111                                           * pdp_step time */
112     rrd_value_t      *pdp_new;           /* prepare the incoming data
113                                           * to be added the the
114                                           * existing entry */
115     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
116                                           * to be added the the
117                                           * cdp values */
118
119     long             *tmpl_idx;          /* index representing the settings
120                                             transported by the template index */
121     long             tmpl_cnt = 2;       /* time and data */
122
123     FILE             *rrd_file;
124     rrd_t            rrd;
125     time_t           current_time = time(NULL);
126     char             **updvals;
127     int              schedule_smooth = 0;
128     char             *template = NULL;   
129         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
130                                          /* a vector of future Holt-Winters seasonal coefs */
131     unsigned long    elapsed_pdp_st;
132                                          /* number of elapsed PDP steps since last update */
133     unsigned long    *rra_step_cnt = NULL;
134                                          /* number of rows to be updated in an RRA for a data
135                                           * value. */
136     unsigned long    start_pdp_offset;
137                                          /* number of PDP steps since the last update that
138                                           * are assigned to the first CDP to be generated
139                                           * since the last update. */
140     unsigned short   scratch_idx;
141                                          /* index into the CDP scratch array */
142     enum cf_en       current_cf;
143                                          /* numeric id of the current consolidation function */
144     rpnstack_t       rpnstack; /* used for COMPUTE DS */
145
146     rpnstack_init(&rpnstack);
147
148     while (1) {
149         static struct option long_options[] =
150         {
151             {"template",      required_argument, 0, 't'},
152             {0,0,0,0}
153         };
154         int option_index = 0;
155         int opt;
156         opt = getopt_long(argc, argv, "t:", 
157                           long_options, &option_index);
158         
159         if (opt == EOF)
160           break;
161         
162         switch(opt) {
163         case 't':
164             template = optarg;
165             break;
166
167         case '?':
168             rrd_set_error("unknown option '%s'",argv[optind-1]);
169             rrd_free(&rrd);            
170             return(-1);
171         }
172     }
173
174     /* need at least 2 arguments: filename, data. */
175     if (argc-optind < 2) {
176         rrd_set_error("Not enough arguments");
177         return -1;
178     }
179
180     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
181         return -1;
182     }
183     rra_current = rra_start = rra_begin = ftell(rrd_file);
184     /* This is defined in the ANSI C standard, section 7.9.5.3:
185
186         When a file is opened with udpate mode ('+' as the second
187         or third character in the ... list of mode argument
188         variables), both input and ouptut may be performed on the
189         associated stream.  However, ...  input may not be directly
190         followed by output without an intervening call to a file
191         positioning function, unless the input oepration encounters
192         end-of-file. */
193     fseek(rrd_file, 0, SEEK_CUR);
194
195     
196     /* get exclusive lock to whole file.
197      * lock gets removed when we close the file.
198      */
199     if (LockRRD(rrd_file) != 0) {
200       rrd_set_error("could not lock RRD");
201       rrd_free(&rrd);
202       fclose(rrd_file);
203       return(-1);   
204     } 
205
206     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
207         rrd_set_error("allocating updvals pointer array");
208         rrd_free(&rrd);
209         fclose(rrd_file);
210         return(-1);
211     }
212
213     if ((pdp_temp = malloc(sizeof(rrd_value_t)
214                            *rrd.stat_head->ds_cnt))==NULL){
215         rrd_set_error("allocating pdp_temp ...");
216         free(updvals);
217         rrd_free(&rrd);
218         fclose(rrd_file);
219         return(-1);
220     }
221
222     if ((tmpl_idx = malloc(sizeof(unsigned long)
223                            *(rrd.stat_head->ds_cnt+1)))==NULL){
224         rrd_set_error("allocating tmpl_idx ...");
225         free(pdp_temp);
226         free(updvals);
227         rrd_free(&rrd);
228         fclose(rrd_file);
229         return(-1);
230     }
231     /* initialize template redirector */
232     /* default config example (assume DS 1 is a CDEF DS)
233        tmpl_idx[0] -> 0; (time)
234        tmpl_idx[1] -> 1; (DS 0)
235        tmpl_idx[2] -> 3; (DS 2)
236        tmpl_idx[3] -> 4; (DS 3) */
237     tmpl_idx[0] = 0; /* time */
238     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
239         {
240            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
241               tmpl_idx[ii++]=i;
242         }
243     tmpl_cnt= ii;
244
245     if (template) {
246         char *dsname;
247         int tmpl_len;
248         dsname = template;
249         tmpl_cnt = 1; /* the first entry is the time */
250         tmpl_len = strlen(template);
251         for(i=0;i<=tmpl_len ;i++) {
252             if (template[i] == ':' || template[i] == '\0') {
253                 template[i] = '\0';
254                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
255                     rrd_set_error("Template contains more DS definitions than RRD");
256                     free(updvals); free(pdp_temp);
257                     free(tmpl_idx); rrd_free(&rrd);
258                     fclose(rrd_file); return(-1);
259                 }
260                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
261                     rrd_set_error("unknown DS name '%s'",dsname);
262                     free(updvals); free(pdp_temp);
263                     free(tmpl_idx); rrd_free(&rrd);
264                     fclose(rrd_file); return(-1);
265                 } else {
266                   /* the first element is always the time */
267                   tmpl_idx[tmpl_cnt-1]++; 
268                   /* go to the next entry on the template */
269                   dsname = &template[i+1];
270                   /* fix the damage we did before */
271                   if (i<tmpl_len) {
272                      template[i]=':';
273                   } 
274
275                 }
276             }       
277         }
278     }
279     if ((pdp_new = malloc(sizeof(rrd_value_t)
280                           *rrd.stat_head->ds_cnt))==NULL){
281         rrd_set_error("allocating pdp_new ...");
282         free(updvals);
283         free(pdp_temp);
284         free(tmpl_idx);
285         rrd_free(&rrd);
286         fclose(rrd_file);
287         return(-1);
288     }
289
290     /* loop through the arguments. */
291     for(arg_i=optind+1; arg_i<argc;arg_i++) {
292         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
293         char *step_start = stepper;
294         char *p;
295         char *parsetime_error = NULL;
296         enum {atstyle, normal} timesyntax;
297         struct time_value ds_tv;
298         if (stepper == NULL){
299                 rrd_set_error("failed duplication argv entry");
300                 free(updvals);
301                 free(pdp_temp);  
302                 free(tmpl_idx);
303                 rrd_free(&rrd);
304                 fclose(rrd_file);
305                 return(-1);
306          }
307         /* initialize all ds input to unknown except the first one
308            which has always got to be set */
309         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
310         strcpy(stepper,argv[arg_i]);
311         updvals[0]=stepper;
312         /* separate all ds elements; first must be examined separately
313            due to alternate time syntax */
314         if ((p=strchr(stepper,'@'))!=NULL) {
315             timesyntax = atstyle;
316             *p = '\0';
317             stepper = p+1;
318         } else if ((p=strchr(stepper,':'))!=NULL) {
319             timesyntax = normal;
320             *p = '\0';
321             stepper = p+1;
322         } else {
323             rrd_set_error("expected timestamp not found in data source from %s:...",
324                           argv[arg_i]);
325             free(step_start);
326             break;
327         }
328         ii=1;
329         updvals[tmpl_idx[ii]] = stepper;
330         while (*stepper) {
331             if (*stepper == ':') {
332                 *stepper = '\0';
333                 ii++;
334                 if (ii<tmpl_cnt){                   
335                     updvals[tmpl_idx[ii]] = stepper+1;
336                 }
337             }
338             stepper++;
339         }
340
341         if (ii != tmpl_cnt-1) {
342             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
343                           tmpl_cnt-1, ii, argv[arg_i]);
344             free(step_start);
345             break;
346         }
347         
348         /* get the time from the reading ... handle N */
349         if (timesyntax == atstyle) {
350             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
351                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
352                 free(step_start);
353                 break;
354             }
355             if (ds_tv.type == RELATIVE_TO_END_TIME ||
356                 ds_tv.type == RELATIVE_TO_START_TIME) {
357                 rrd_set_error("specifying time relative to the 'start' "
358                               "or 'end' makes no sense here: %s",
359                               updvals[0]);
360                 free(step_start);
361                 break;
362             }
363
364             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
365         } else if (strcmp(updvals[0],"N")==0){
366             current_time = time(NULL);
367         } else {
368             current_time = atol(updvals[0]);
369         }
370         
371         if(current_time <= rrd.live_head->last_up){
372             rrd_set_error("illegal attempt to update using time %ld when "
373                           "last update time is %ld (minimum one second step)",
374                           current_time, rrd.live_head->last_up);
375             free(step_start);
376             break;
377         }
378         
379         
380         /* seek to the beginning of the rra's */
381         if (rra_current != rra_begin) {
382             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
383                 rrd_set_error("seek error in rrd");
384                 free(step_start);
385                 break;
386             }
387             rra_current = rra_begin;
388         }
389         rra_start = rra_begin;
390
391         /* when was the current pdp started */
392         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
393         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
394
395         /* when did the last pdp_st occur */
396         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
397         occu_pdp_st = current_time - occu_pdp_age;
398         interval = current_time - rrd.live_head->last_up;
399     
400         if (occu_pdp_st > proc_pdp_st){
401             /* OK we passed the pdp_st moment*/
402             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
403                                                               * occurred before the latest
404                                                               * pdp_st moment*/
405             post_int = occu_pdp_age;                         /* how much after it */
406         } else {
407             pre_int = interval;
408             post_int = 0;
409         }
410
411 #ifdef DEBUG
412         printf(
413                "proc_pdp_age %lu\t"
414                "proc_pdp_st %lu\t" 
415                "occu_pfp_age %lu\t" 
416                "occu_pdp_st %lu\t"
417                "int %lu\t"
418                "pre_int %lu\t"
419                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
420                 occu_pdp_age, occu_pdp_st,
421                interval, pre_int, post_int);
422 #endif
423     
424         /* process the data sources and update the pdp_prep 
425          * area accordingly */
426         for(i=0;i<rrd.stat_head->ds_cnt;i++){
427             enum dst_en dst_idx;
428             dst_idx= dst_conv(rrd.ds_def[i].dst);
429                 /* NOTE: DST_CDEF should never enter this if block, because
430                  * updvals[i+1][0] is initialized to 'U'; unless the caller
431                  * accidently specified a value for the DST_CDEF. To handle 
432                  * this case, an extra check is required. */
433             if((updvals[i+1][0] != 'U') &&
434                    (dst_idx != DST_CDEF) &&
435                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
436                double rate = DNAN;
437                /* the data source type defines how to process the data */
438                 /* pdp_new contains rate * time ... eg the bytes
439                  * transferred during the interval. Doing it this way saves
440                  * a lot of math operations */
441                 
442
443                 switch(dst_idx){
444                 case DST_COUNTER:
445                 case DST_DERIVE:
446                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
447                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
448                        if(dst_idx == DST_COUNTER) {
449                           /* simple overflow catcher sugestet by andres kroonmaa */
450                           /* this will fail terribly for non 32 or 64 bit counters ... */
451                           /* are there any others in SNMP land ? */
452                           if (pdp_new[i] < (double)0.0 ) 
453                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
454                           if (pdp_new[i] < (double)0.0 ) 
455                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
456                        }
457                        rate = pdp_new[i] / interval;
458                     }
459                    else {
460                      pdp_new[i]= DNAN;          
461                    }
462                    break;
463                 case DST_ABSOLUTE:
464                     pdp_new[i]= atof(updvals[i+1]);
465                     rate = pdp_new[i] / interval;                 
466                     break;
467                 case DST_GAUGE:
468                     pdp_new[i] = atof(updvals[i+1]) * interval;
469                     rate = pdp_new[i] / interval;                  
470                     break;
471                 default:
472                     rrd_set_error("rrd contains unknown DS type : '%s'",
473                                   rrd.ds_def[i].dst);
474                     break;
475                 }
476                 /* break out of this for loop if the error string is set */
477                 if (rrd_test_error()){
478                     break;
479                 }
480                /* make sure pdp_temp is neither too large or too small
481                 * if any of these occur it becomes unknown ...
482                 * sorry folks ... */
483                if ( ! isnan(rate) && 
484                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
485                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
486                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
487                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
488                   pdp_new[i] = DNAN;
489                }               
490             } else {
491                 /* no news is news all the same */
492                 pdp_new[i] = DNAN;
493             }
494             
495             /* make a copy of the command line argument for the next run */
496 #ifdef DEBUG
497             fprintf(stderr,
498                     "prep ds[%lu]\t"
499                     "last_arg '%s'\t"
500                     "this_arg '%s'\t"
501                     "pdp_new %10.2f\n",
502                     i,
503                     rrd.pdp_prep[i].last_ds,
504                     updvals[i+1], pdp_new[i]);
505 #endif
506             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
507                 strncpy(rrd.pdp_prep[i].last_ds,
508                         updvals[i+1],LAST_DS_LEN-1);
509                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
510             }
511         }
512         /* break out of the argument parsing loop if the error_string is set */
513         if (rrd_test_error()){
514             free(step_start);
515             break;
516         }
517         /* has a pdp_st moment occurred since the last run ? */
518
519         if (proc_pdp_st == occu_pdp_st){
520             /* no we have not passed a pdp_st moment. therefore update is simple */
521
522             for(i=0;i<rrd.stat_head->ds_cnt;i++){
523                 if(isnan(pdp_new[i]))
524                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
525                 else
526                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
527 #ifdef DEBUG
528                 fprintf(stderr,
529                         "NO PDP  ds[%lu]\t"
530                         "value %10.2f\t"
531                         "unkn_sec %5lu\n",
532                         i,
533                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
534                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
535 #endif
536             }   
537         } else {
538             /* an pdp_st has occurred. */
539
540             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
541              * occurred up to the last run.        
542             pdp_new[] contains rate*seconds from the latest run.
543             pdp_temp[] will contain the rate for cdp */
544
545             for(i=0;i<rrd.stat_head->ds_cnt;i++){
546                 /* update pdp_prep to the current pdp_st */
547                 if(isnan(pdp_new[i]))
548                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
549                 else
550                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
551                         pdp_new[i]/(double)interval*(double)pre_int;
552
553                 /* if too much of the pdp_prep is unknown we dump it */
554                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
555                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
556                     (occu_pdp_st-proc_pdp_st <= 
557                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
558                     pdp_temp[i] = DNAN;
559                 } else {
560                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
561                         / (double)( occu_pdp_st
562                                    - proc_pdp_st
563                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
564                 }
565
566                 /* process CDEF data sources; remember each CDEF DS can
567                  * only reference other DS with a lower index number */
568             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
569                    rpnp_t *rpnp;
570                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
571                    /* substitue data values for OP_VARIABLE nodes */
572                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
573                    {
574                           if (rpnp[ii].op == OP_VARIABLE) {
575                                  rpnp[ii].op = OP_NUMBER;
576                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
577                           }
578                    }
579                    /* run the rpn calculator */
580                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
581                           free(rpnp);
582                           break; /* exits the data sources pdp_temp loop */
583                    }
584                 }
585         
586                 /* make pdp_prep ready for the next run */
587                 if(isnan(pdp_new[i])){
588                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
589                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
590                 } else {
591                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
592                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
593                         pdp_new[i]/(double)interval*(double)post_int;
594                 }
595
596 #ifdef DEBUG
597                 fprintf(stderr,
598                         "PDP UPD ds[%lu]\t"
599                         "pdp_temp %10.2f\t"
600                         "new_prep %10.2f\t"
601                         "new_unkn_sec %5lu\n",
602                         i, pdp_temp[i],
603                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
604                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
605 #endif
606             }
607
608                 /* if there were errors during the last loop, bail out here */
609             if (rrd_test_error()){
610                free(step_start);
611                break;
612             }
613
614                 /* compute the number of elapsed pdp_st moments */
615                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
616 #ifdef DEBUG
617                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
618 #endif
619                 if (rra_step_cnt == NULL)
620                 {
621                    rra_step_cnt = (unsigned long *) 
622                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
623                 }
624
625             for(i = 0, rra_start = rra_begin;
626                 i < rrd.stat_head->rra_cnt;
627             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
628                 i++)
629                 {
630                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
631                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
632                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
633         if (start_pdp_offset <= elapsed_pdp_st) {
634            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
635                       rrd.rra_def[i].pdp_cnt + 1;
636             } else {
637                    rra_step_cnt[i] = 0;
638                 }
639
640                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
641                 {
642                    /* If this is a bulk update, we need to skip ahead in the seasonal
643                         * arrays so that they will be correct for the next observed value;
644                         * note that for the bulk update itself, no update will occur to
645                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
646                         * be set to DNAN. */
647            if (rra_step_cnt[i] > 2) 
648                    {
649                           /* skip update by resetting rra_step_cnt[i],
650                            * note that this is not data source specific; this is due
651                            * to the bulk update, not a DNAN value for the specific data
652                            * source. */
653                           rra_step_cnt[i] = 0;
654               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
655                              &last_seasonal_coef);
656                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
657                              &seasonal_coef);
658                    }
659                 
660                   /* periodically run a smoother for seasonal effects */
661                   /* Need to use first cdp parameter buffer to track
662                    * burnin (burnin requires a specific smoothing schedule).
663                    * The CDP_init_seasonal parameter is really an RRA level,
664                    * not a data source within RRA level parameter, but the rra_def
665                    * is read only for rrd_update (not flushed to disk). */
666                   iii = i*(rrd.stat_head -> ds_cnt);
667                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
668                           <= BURNIN_CYCLES)
669                   {
670                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
671                                  > rrd.rra_def[i].row_cnt - 1) {
672                            /* mark off one of the burnin cycles */
673                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
674                        schedule_smooth = 1;
675                          }  
676                   } else {
677                          /* someone has no doubt invented a trick to deal with this
678                           * wrap around, but at least this code is clear. */
679                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
680                              rrd.rra_ptr[i].cur_row)
681                          {
682                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
683                                   * mapping between PDP and CDP */
684                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
685                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
686                                  {
687 #ifdef DEBUG
688                                         fprintf(stderr,
689                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
690                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
691                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
692 #endif
693                                         schedule_smooth = 1;
694                                  }
695              } else {
696                                  /* can't rely on negative numbers because we are working with
697                                   * unsigned values */
698                                  /* Don't need modulus here. If we've wrapped more than once, only
699                                   * one smooth is executed at the end. */
700                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
701                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
702                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
703                                  {
704 #ifdef DEBUG
705                                         fprintf(stderr,
706                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
707                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
708                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
709 #endif
710                                         schedule_smooth = 1;
711                                  }
712                          }
713                   }
714
715               rra_current = ftell(rrd_file); 
716                 } /* if cf is DEVSEASONAL or SEASONAL */
717
718         if (rrd_test_error()) break;
719
720                     /* update CDP_PREP areas */
721                     /* loop over data soures within each RRA */
722                     for(ii = 0;
723                         ii < rrd.stat_head->ds_cnt;
724                         ii++)
725                         {
726                         
727                         /* iii indexes the CDP prep area for this data source within the RRA */
728                         iii=i*rrd.stat_head->ds_cnt+ii;
729
730                         if (rrd.rra_def[i].pdp_cnt > 1) {
731                           
732                            if (rra_step_cnt[i] > 0) {
733                            /* If we are in this block, as least 1 CDP value will be written to
734                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
735                                 * to be written, then the "fill in" value is the CDP_secondary_val
736                                 * entry. */
737                                   if (isnan(pdp_temp[ii]))
738                   {
739                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
740                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
741                                   } else {
742                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
743                                           * CDP data entries. No matter the CF, the value is the same because
744                                           * the average, max, min, and last of a list of identical values is
745                                           * the same, namely, the value itself. */
746                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
747                                   }
748                      
749                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
750                                       > rrd.rra_def[i].pdp_cnt*
751                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
752                                   {
753                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
754                                          /* initialize carry over */
755                                          if (current_cf == CF_AVERAGE) {
756                                                    if (isnan(pdp_temp[ii])) { 
757                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
758                                                    } else {
759                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
760                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
761                                                    }
762                                          } else {
763                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
764                                          }
765                                   } else {
766                                          rrd_value_t cum_val, cur_val; 
767                                      switch (current_cf) {
768                                                 case CF_AVERAGE:
769                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
770                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
771                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
772                                                (cum_val + cur_val * start_pdp_offset) /
773                                            (rrd.rra_def[i].pdp_cnt
774                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
775                                                    /* initialize carry over value */
776                                                    if (isnan(pdp_temp[ii])) { 
777                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
778                                                    } else {
779                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
780                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
781                                                    }
782                                                    break;
783                                                 case CF_MAXIMUM:
784                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
785                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
786 #ifdef DEBUG
787                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
788                                                           isnan(pdp_temp[ii])) {
789                                                      fprintf(stderr,
790                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
791                                                                 i,ii);
792                                                          exit(-1);
793                                                   }
794 #endif
795                                                   if (cur_val > cum_val)
796                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
797                                                   else
798                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
799                                                   /* initialize carry over value */
800                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
801                                                   break;
802                                                 case CF_MINIMUM:
803                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
804                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
805 #ifdef DEBUG
806                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
807                                                           isnan(pdp_temp[ii])) {
808                                                      fprintf(stderr,
809                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
810                                                                 i,ii);
811                                                          exit(-1);
812                                                   }
813 #endif
814                                                   if (cur_val < cum_val)
815                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
816                                                   else
817                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
818                                                   /* initialize carry over value */
819                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
820                                                   break;
821                                                 case CF_LAST:
822                                                 default:
823                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
824                                                    /* initialize carry over value */
825                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
826                                                 break;
827                                          }
828                                   } /* endif meets xff value requirement for a valid value */
829                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
830                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
831                                   if (isnan(pdp_temp[ii]))
832                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
833                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
834                                   else
835                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
836                } else  /* rra_step_cnt[i]  == 0 */
837                            {
838 #ifdef DEBUG
839                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
840                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
841                                          i,ii);
842                                   } else {
843                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
844                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
845                                   }
846 #endif
847                                   if (isnan(pdp_temp[ii])) {
848                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
849                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
850                                   {
851                                          if (current_cf == CF_AVERAGE) {
852                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
853                                                    elapsed_pdp_st;
854                                          } else {
855                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
856                                          }
857 #ifdef DEBUG
858                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
859                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
860 #endif
861                                   } else {
862                                          switch (current_cf) {
863                                          case CF_AVERAGE:
864                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
865                                                    elapsed_pdp_st;
866                                                 break;
867                                          case CF_MINIMUM:
868                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
869                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
870                                                 break; 
871                                          case CF_MAXIMUM:
872                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
873                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
874                                                 break; 
875                                          case CF_LAST:
876                                          default:
877                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
878                                                 break;
879                                          }
880                                   }
881                            }
882                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
883                            if (elapsed_pdp_st > 2)
884                            {
885                                    switch (current_cf) {
886                                    case CF_AVERAGE:
887                                    default:
888                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
889                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
890                                           break;
891                    case CF_SEASONAL:
892                                    case CF_DEVSEASONAL:
893                                           /* need to update cached seasonal values, so they are consistent
894                                            * with the bulk update */
895                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
896                                            * CDP_last_deviation are the same. */
897                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
898                                                  last_seasonal_coef[ii];
899                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
900                                                  seasonal_coef[ii];
901                                           break;
902                    case CF_HWPREDICT:
903                                           /* need to update the null_count and last_null_count.
904                                            * even do this for non-DNAN pdp_temp because the
905                                            * algorithm is not learning from batch updates. */
906                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
907                                                  elapsed_pdp_st;
908                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
909                                                  elapsed_pdp_st - 1;
910                                           /* fall through */
911                                    case CF_DEVPREDICT:
912                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
913                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
914                                           break;
915                    case CF_FAILURES:
916                                           /* do not count missed bulk values as failures */
917                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
918                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
919                                           /* need to reset violations buffer.
920                                            * could do this more carefully, but for now, just
921                                            * assume a bulk update wipes away all violations. */
922                       erase_violations(&rrd, iii, i);
923                                           break;
924                                    }
925                            } 
926                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
927
928                         if (rrd_test_error()) break;
929
930                         } /* endif data sources loop */
931         } /* end RRA Loop */
932
933                 /* this loop is only entered if elapsed_pdp_st < 3 */
934                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
935                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
936                 {
937                for(i = 0, rra_start = rra_begin;
938                    i < rrd.stat_head->rra_cnt;
939                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
940                    i++)
941                    {
942                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
943
944                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
945                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
946                           {
947                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
948                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
949                                 &seasonal_coef);
950                  rra_current = ftell(rrd_file);
951                           }
952                           if (rrd_test_error()) break;
953                       /* loop over data soures within each RRA */
954                       for(ii = 0;
955                           ii < rrd.stat_head->ds_cnt;
956                           ii++)
957                           {
958                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
959                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
960                                     scratch_idx, seasonal_coef);
961                           }
962            } /* end RRA Loop */
963                    if (rrd_test_error()) break;
964             } /* end elapsed_pdp_st loop */
965
966                 if (rrd_test_error()) break;
967
968                 /* Ready to write to disk */
969                 /* Move sequentially through the file, writing one RRA at a time.
970                  * Note this architecture divorces the computation of CDP with
971                  * flushing updated RRA entries to disk. */
972             for(i = 0, rra_start = rra_begin;
973                 i < rrd.stat_head->rra_cnt;
974             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
975                 i++) {
976                 /* is there anything to write for this RRA? If not, continue. */
977         if (rra_step_cnt[i] == 0) continue;
978
979                 /* write the first row */
980 #ifdef DEBUG
981         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
982 #endif
983             rrd.rra_ptr[i].cur_row++;
984             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
985                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
986                 /* positition on the first row */
987                 rra_pos_tmp = rra_start +
988                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
989                 if(rra_pos_tmp != rra_current) {
990                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
991                       rrd_set_error("seek error in rrd");
992                       break;
993                    }
994                    rra_current = rra_pos_tmp;
995                 }
996
997 #ifdef DEBUG
998             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
999 #endif
1000                 scratch_idx = CDP_primary_val;
1001                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1002                 if (rrd_test_error()) break;
1003
1004                 /* write other rows of the bulk update, if any */
1005                 scratch_idx = CDP_secondary_val;
1006                 for ( ; rra_step_cnt[i] > 1; 
1007                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1008                 {
1009                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1010                    {
1011 #ifdef DEBUG
1012               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1013                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1014 #endif
1015                           /* wrap */
1016                           rrd.rra_ptr[i].cur_row = 0;
1017                           /* seek back to beginning of current rra */
1018                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1019                           {
1020                          rrd_set_error("seek error in rrd");
1021                          break;
1022                           }
1023 #ifdef DEBUG
1024                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1025 #endif
1026                           rra_current = rra_start;
1027                    }
1028                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1029                 }
1030                 
1031                 if (rrd_test_error())
1032                   break;
1033                 } /* RRA LOOP */
1034
1035             /* break out of the argument parsing loop if error_string is set */
1036             if (rrd_test_error()){
1037                    free(step_start);
1038                    break;
1039             } 
1040             
1041         } /* endif a pdp_st has occurred */ 
1042         rrd.live_head->last_up = current_time;
1043         free(step_start);
1044     } /* function argument loop */
1045
1046     if (seasonal_coef != NULL) free(seasonal_coef);
1047     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1048         if (rra_step_cnt != NULL) free(rra_step_cnt);
1049     rpnstack_free(&rpnstack);
1050
1051     /* if we got here and if there is an error and if the file has not been
1052      * written to, then close things up and return. */
1053     if (rrd_test_error()) {
1054         free(updvals);
1055         free(tmpl_idx);
1056         rrd_free(&rrd);
1057         free(pdp_temp);
1058         free(pdp_new);
1059         fclose(rrd_file);
1060         return(-1);
1061     }
1062
1063     /* aargh ... that was tough ... so many loops ... anyway, its done.
1064      * we just need to write back the live header portion now*/
1065
1066     if (fseek(rrd_file, (sizeof(stat_head_t)
1067                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1068                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1069               SEEK_SET) != 0) {
1070         rrd_set_error("seek rrd for live header writeback");
1071         free(updvals);
1072         free(tmpl_idx);
1073         rrd_free(&rrd);
1074         free(pdp_temp);
1075         free(pdp_new);
1076         fclose(rrd_file);
1077         return(-1);
1078     }
1079
1080     if(fwrite( rrd.live_head,
1081                sizeof(live_head_t), 1, rrd_file) != 1){
1082         rrd_set_error("fwrite live_head to rrd");
1083         free(updvals);
1084         rrd_free(&rrd);
1085         free(tmpl_idx);
1086         free(pdp_temp);
1087         free(pdp_new);
1088         fclose(rrd_file);
1089         return(-1);
1090     }
1091
1092     if(fwrite( rrd.pdp_prep,
1093                sizeof(pdp_prep_t),
1094                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1095         rrd_set_error("ftwrite pdp_prep to rrd");
1096         free(updvals);
1097         rrd_free(&rrd);
1098         free(tmpl_idx);
1099         free(pdp_temp);
1100         free(pdp_new);
1101         fclose(rrd_file);
1102         return(-1);
1103     }
1104
1105     if(fwrite( rrd.cdp_prep,
1106                sizeof(cdp_prep_t),
1107                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1108        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1109
1110         rrd_set_error("ftwrite cdp_prep to rrd");
1111         free(updvals);
1112         free(tmpl_idx);
1113         rrd_free(&rrd);
1114         free(pdp_temp);
1115         free(pdp_new);
1116         fclose(rrd_file);
1117         return(-1);
1118     }
1119
1120     if(fwrite( rrd.rra_ptr,
1121                sizeof(rra_ptr_t), 
1122                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1123         rrd_set_error("fwrite rra_ptr to rrd");
1124         free(updvals);
1125         free(tmpl_idx);
1126         rrd_free(&rrd);
1127         free(pdp_temp);
1128         free(pdp_new);
1129         fclose(rrd_file);
1130         return(-1);
1131     }
1132
1133     /* OK now close the files and free the memory */
1134     if(fclose(rrd_file) != 0){
1135         rrd_set_error("closing rrd");
1136         free(updvals);
1137         free(tmpl_idx);
1138         rrd_free(&rrd);
1139         free(pdp_temp);
1140         free(pdp_new);
1141         return(-1);
1142     }
1143
1144     /* calling the smoothing code here guarantees at most
1145          * one smoothing operation per rrd_update call. Unfortunately,
1146          * it is possible with bulk updates, or a long-delayed update
1147          * for smoothing to occur off-schedule. This really isn't
1148          * critical except during the burning cycles. */
1149         if (schedule_smooth)
1150         {
1151 #ifndef WIN32
1152           rrd_file = fopen(argv[optind],"r+");
1153 #else
1154           rrd_file = fopen(argv[optind],"rb+");
1155 #endif
1156           rra_start = rra_begin;
1157           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1158           {
1159             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1160                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1161             {
1162 #ifdef DEBUG
1163               fprintf(stderr,"Running smoother for rra %ld\n",i);
1164 #endif
1165               apply_smoother(&rrd,i,rra_start,rrd_file);
1166               if (rrd_test_error())
1167                 break;
1168             }
1169             rra_start += rrd.rra_def[i].row_cnt
1170               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1171           }
1172           fclose(rrd_file);
1173         }
1174     rrd_free(&rrd);
1175     free(updvals);
1176     free(tmpl_idx);
1177     free(pdp_new);
1178     free(pdp_temp);
1179     return(0);
1180 }
1181
1182 /*
1183  * get exclusive lock to whole file.
1184  * lock gets removed when we close the file
1185  *
1186  * returns 0 on success
1187  */
1188 int
1189 LockRRD(FILE *rrdfile)
1190 {
1191     int rrd_fd;         /* File descriptor for RRD */
1192     int                 stat;
1193
1194     rrd_fd = fileno(rrdfile);
1195
1196         {
1197 #ifndef WIN32    
1198                 struct flock    lock;
1199     lock.l_type = F_WRLCK;    /* exclusive write lock */
1200     lock.l_len = 0;           /* whole file */
1201     lock.l_start = 0;         /* start of file */
1202     lock.l_whence = SEEK_SET;   /* end of file */
1203
1204     stat = fcntl(rrd_fd, F_SETLK, &lock);
1205 #else
1206                 struct _stat st;
1207
1208                 if ( _fstat( rrd_fd, &st ) == 0 ) {
1209                         stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1210                 } else {
1211                         stat = -1;
1212                 }
1213 #endif
1214         }
1215
1216     return(stat);
1217 }
1218
1219
1220 void
1221 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1222    unsigned short CDP_scratch_idx, FILE *rrd_file)
1223 {
1224    unsigned long ds_idx, cdp_idx;
1225
1226    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1227    {
1228       /* compute the cdp index */
1229       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1230 #ifdef DEBUG
1231           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1232              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1233              rrd -> rra_def[rra_idx].cf_nam);
1234 #endif 
1235
1236           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1237                  sizeof(rrd_value_t),1,rrd_file) != 1)
1238           { 
1239              rrd_set_error("writing rrd");
1240                  return;
1241           }
1242           *rra_current += sizeof(rrd_value_t);
1243         }
1244 }