Support for COMPUTE data sources (CDEF data sources). Removes the RPN
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.0.33  Copyright Tobias Oetiker, 1997 - 2000
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.4  2001/03/10 23:54:41  oetiker
9  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
10  * parser and calculator from rrd_graph and puts then in a new file,
11  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
12  * clean-up of aberrant behavior stuff, including a bug fix.
13  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
14  * -- Jake Brutlag <jakeb@corp.webtv.net>
15  *
16  * Revision 1.3  2001/03/04 13:01:55  oetiker
17  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
18  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
19  * This is backwards compatible! But new files using the Aberrant stuff are not readable
20  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
21  * -- Jake Brutlag <jakeb@corp.webtv.net>
22  *
23  * Revision 1.2  2001/03/04 11:14:25  oetiker
24  * added at-style-time@value:value syntax to rrd_update
25  * --  Dave Bodenstab <imdave@mcs.net>
26  *
27  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
28  * checkin
29  *
30  *****************************************************************************/
31
32 #include "rrd_tool.h"
33 #include <sys/types.h>
34 #include <fcntl.h>
35 #include "rrd_hw.h"
36 #include "rrd_rpncalc.h"
37
38 #ifdef WIN32
39  #include <sys/locking.h>
40  #include <sys/stat.h>
41  #include <io.h>
42 #endif
43
44 /* Local prototypes */
45 int LockRRD(FILE *rrd_file);
46 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
47     unsigned short CDP_scratch_idx, FILE *rrd_file);
48  
49 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
50
51
52 #ifdef STANDALONE
53 int 
54 main(int argc, char **argv){
55         rrd_update(argc,argv);
56         if (rrd_test_error()) {
57                 printf("RRDtool 1.0.33  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
58                         "Usage: rrdupdate filename\n"
59                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
60                         "\t\t\ttime|N:value[:value...]\n\n"
61                         "\t\t\tat-time@value[:value...]\n\n"
62                         "\t\t\t[ time:value[:value...] ..]\n\n");
63                                    
64                 printf("ERROR: %s\n",rrd_get_error());
65                 rrd_clear_error();                                                            
66                 return 1;
67         }
68         return 0;
69 }
70 #endif
71
72 int
73 rrd_update(int argc, char **argv)
74 {
75
76     int              arg_i = 2;
77     short            j;
78     long             i,ii,iii=1;
79
80     unsigned long    rra_begin;          /* byte pointer to the rra
81                                           * area in the rrd file.  this
82                                           * pointer never changes value */
83     unsigned long    rra_start;          /* byte pointer to the rra
84                                           * area in the rrd file.  this
85                                           * pointer changes as each rrd is
86                                           * processed. */
87     unsigned long    rra_current;        /* byte pointer to the current write
88                                           * spot in the rrd file. */
89     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
90     unsigned long    interval,
91         pre_int,post_int;                /* interval between this and
92                                           * the last run */
93     unsigned long    proc_pdp_st;        /* which pdp_st was the last
94                                           * to be processed */
95     unsigned long    occu_pdp_st;        /* when was the pdp_st
96                                           * before the last update
97                                           * time */
98     unsigned long    proc_pdp_age;       /* how old was the data in
99                                           * the pdp prep area when it
100                                           * was last updated */
101     unsigned long    occu_pdp_age;       /* how long ago was the last
102                                           * pdp_step time */
103     rrd_value_t      *pdp_new;           /* prepare the incoming data
104                                           * to be added the the
105                                           * existing entry */
106     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
107                                           * to be added the the
108                                           * cdp values */
109
110     long             *tmpl_idx;          /* index representing the settings
111                                             transported by the template index */
112     long             tmpl_cnt = 2;       /* time and data */
113
114     FILE             *rrd_file;
115     rrd_t            rrd;
116     time_t           current_time = time(NULL);
117     char             **updvals;
118     int              schedule_smooth = 0;
119     char             *template = NULL;   
120         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
121                                          /* a vector of future Holt-Winters seasonal coefs */
122     unsigned long    elapsed_pdp_st;
123                                          /* number of elapsed PDP steps since last update */
124     unsigned long    *rra_step_cnt = NULL;
125                                          /* number of rows to be updated in an RRA for a data
126                                           * value. */
127     unsigned long    start_pdp_offset;
128                                          /* number of PDP steps since the last update that
129                                           * are assigned to the first CDP to be generated
130                                           * since the last update. */
131     unsigned short   scratch_idx;
132                                          /* index into the CDP scratch array */
133     enum cf_en       current_cf;
134                                          /* numeric id of the current consolidation function */
135     rpnstack_t       rpnstack; /* used for COMPUTE DS */
136
137     rpnstack_init(&rpnstack);
138
139     while (1) {
140         static struct option long_options[] =
141         {
142             {"template",      required_argument, 0, 't'},
143             {0,0,0,0}
144         };
145         int option_index = 0;
146         int opt;
147         opt = getopt_long(argc, argv, "t:", 
148                           long_options, &option_index);
149         
150         if (opt == EOF)
151           break;
152         
153         switch(opt) {
154         case 't':
155             template = optarg;
156             break;
157
158         case '?':
159             rrd_set_error("unknown option '%s'",argv[optind-1]);
160             rrd_free(&rrd);            
161             return(-1);
162         }
163     }
164
165     /* need at least 2 arguments: filename, data. */
166     if (argc-optind < 2) {
167         rrd_set_error("Not enough arguments");
168         return -1;
169     }
170
171     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
172         return -1;
173     }
174     rra_current = rra_start = rra_begin = ftell(rrd_file);
175     /* This is defined in the ANSI C standard, section 7.9.5.3:
176
177         When a file is opened with udpate mode ('+' as the second
178         or third character in the ... list of mode argument
179         variables), both input and ouptut may be performed on the
180         associated stream.  However, ...  input may not be directly
181         followed by output without an intervening call to a file
182         positioning function, unless the input oepration encounters
183         end-of-file. */
184     fseek(rrd_file, 0, SEEK_CUR);
185
186     
187     /* get exclusive lock to whole file.
188      * lock gets removed when we close the file.
189      */
190     if (LockRRD(rrd_file) != 0) {
191       rrd_set_error("could not lock RRD");
192       rrd_free(&rrd);
193       fclose(rrd_file);
194       return(-1);   
195     } 
196
197     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
198         rrd_set_error("allocating updvals pointer array");
199         rrd_free(&rrd);
200         fclose(rrd_file);
201         return(-1);
202     }
203
204     if ((pdp_temp = malloc(sizeof(rrd_value_t)
205                            *rrd.stat_head->ds_cnt))==NULL){
206         rrd_set_error("allocating pdp_temp ...");
207         free(updvals);
208         rrd_free(&rrd);
209         fclose(rrd_file);
210         return(-1);
211     }
212
213     if ((tmpl_idx = malloc(sizeof(unsigned long)
214                            *(rrd.stat_head->ds_cnt+1)))==NULL){
215         rrd_set_error("allocating tmpl_idx ...");
216         free(pdp_temp);
217         free(updvals);
218         rrd_free(&rrd);
219         fclose(rrd_file);
220         return(-1);
221     }
222     /* initialize template redirector */
223     /* default config example (assume DS 1 is a CDEF DS)
224        tmpl_idx[0] -> 0; (time)
225        tmpl_idx[1] -> 1; (DS 0)
226        tmpl_idx[2] -> 3; (DS 2)
227        tmpl_idx[3] -> 4; (DS 3) */
228     tmpl_idx[0] = 0; /* time */
229     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
230         {
231            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
232               tmpl_idx[ii++]=i;
233         }
234     tmpl_cnt= ii;
235
236     if (template) {
237         char *dsname;
238         int tmpl_len;
239         dsname = template;
240         tmpl_cnt = 1; /* the first entry is the time */
241         tmpl_len = strlen(template);
242         for(i=0;i<=tmpl_len ;i++) {
243             if (template[i] == ':' || template[i] == '\0') {
244                 template[i] = '\0';
245                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
246                     rrd_set_error("Template contains more DS definitions than RRD");
247                     free(updvals); free(pdp_temp);
248                     free(tmpl_idx); rrd_free(&rrd);
249                     fclose(rrd_file); return(-1);
250                 }
251                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
252                     rrd_set_error("unknown DS name '%s'",dsname);
253                     free(updvals); free(pdp_temp);
254                     free(tmpl_idx); rrd_free(&rrd);
255                     fclose(rrd_file); return(-1);
256                 } else {
257                   /* the first element is always the time */
258                   tmpl_idx[tmpl_cnt-1]++; 
259                   /* go to the next entry on the template */
260                   dsname = &template[i+1];
261                   /* fix the damage we did before */
262                   if (i<tmpl_len) {
263                      template[i]=':';
264                   } 
265
266                 }
267             }       
268         }
269     }
270     if ((pdp_new = malloc(sizeof(rrd_value_t)
271                           *rrd.stat_head->ds_cnt))==NULL){
272         rrd_set_error("allocating pdp_new ...");
273         free(updvals);
274         free(pdp_temp);
275         free(tmpl_idx);
276         rrd_free(&rrd);
277         fclose(rrd_file);
278         return(-1);
279     }
280
281     /* loop through the arguments. */
282     for(arg_i=optind+1; arg_i<argc;arg_i++) {
283         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
284         char *step_start = stepper;
285         char *p;
286         char *parsetime_error = NULL;
287         enum {atstyle, normal} timesyntax;
288         struct time_value ds_tv;
289         if (stepper == NULL){
290                 rrd_set_error("failed duplication argv entry");
291                 free(updvals);
292                 free(pdp_temp);  
293                 free(tmpl_idx);
294                 rrd_free(&rrd);
295                 fclose(rrd_file);
296                 return(-1);
297          }
298         /* initialize all ds input to unknown except the first one
299            which has always got to be set */
300         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
301         strcpy(stepper,argv[arg_i]);
302         updvals[0]=stepper;
303         /* separate all ds elements; first must be examined separately
304            due to alternate time syntax */
305         if ((p=strchr(stepper,'@'))!=NULL) {
306             timesyntax = atstyle;
307             *p = '\0';
308             stepper = p+1;
309         } else if ((p=strchr(stepper,':'))!=NULL) {
310             timesyntax = normal;
311             *p = '\0';
312             stepper = p+1;
313         } else {
314             rrd_set_error("expected timestamp not found in data source from %s:...",
315                           argv[arg_i]);
316             free(step_start);
317             break;
318         }
319         ii=1;
320         updvals[tmpl_idx[ii]] = stepper;
321         while (*stepper) {
322             if (*stepper == ':') {
323                 *stepper = '\0';
324                 ii++;
325                 if (ii<tmpl_cnt){                   
326                     updvals[tmpl_idx[ii]] = stepper+1;
327                 }
328             }
329             stepper++;
330         }
331
332         if (ii != tmpl_cnt-1) {
333             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
334                           tmpl_cnt-1, ii, argv[arg_i]);
335             free(step_start);
336             break;
337         }
338         
339         /* get the time from the reading ... handle N */
340         if (timesyntax == atstyle) {
341             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
342                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
343                 free(step_start);
344                 break;
345             }
346             if (ds_tv.type == RELATIVE_TO_END_TIME ||
347                 ds_tv.type == RELATIVE_TO_START_TIME) {
348                 rrd_set_error("specifying time relative to the 'start' "
349                               "or 'end' makes no sense here: %s",
350                               updvals[0]);
351                 free(step_start);
352                 break;
353             }
354
355             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
356         } else if (strcmp(updvals[0],"N")==0){
357             current_time = time(NULL);
358         } else {
359             current_time = atol(updvals[0]);
360         }
361         
362         if(current_time <= rrd.live_head->last_up){
363             rrd_set_error("illegal attempt to update using time %ld when "
364                           "last update time is %ld (minimum one second step)",
365                           current_time, rrd.live_head->last_up);
366             free(step_start);
367             break;
368         }
369         
370         
371         /* seek to the beginning of the rra's */
372         if (rra_current != rra_begin) {
373             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
374                 rrd_set_error("seek error in rrd");
375                 free(step_start);
376                 break;
377             }
378             rra_current = rra_begin;
379         }
380         rra_start = rra_begin;
381
382         /* when was the current pdp started */
383         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
384         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
385
386         /* when did the last pdp_st occur */
387         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
388         occu_pdp_st = current_time - occu_pdp_age;
389         interval = current_time - rrd.live_head->last_up;
390     
391         if (occu_pdp_st > proc_pdp_st){
392             /* OK we passed the pdp_st moment*/
393             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
394                                                               * occurred before the latest
395                                                               * pdp_st moment*/
396             post_int = occu_pdp_age;                         /* how much after it */
397         } else {
398             pre_int = interval;
399             post_int = 0;
400         }
401
402 #ifdef DEBUG
403         printf(
404                "proc_pdp_age %lu\t"
405                "proc_pdp_st %lu\t" 
406                "occu_pfp_age %lu\t" 
407                "occu_pdp_st %lu\t"
408                "int %lu\t"
409                "pre_int %lu\t"
410                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
411                 occu_pdp_age, occu_pdp_st,
412                interval, pre_int, post_int);
413 #endif
414     
415         /* process the data sources and update the pdp_prep 
416          * area accordingly */
417         for(i=0;i<rrd.stat_head->ds_cnt;i++){
418             enum dst_en dst_idx;
419             dst_idx= dst_conv(rrd.ds_def[i].dst);
420                 /* NOTE: DST_CDEF should never enter this if block, because
421                  * updvals[i+1][0] is initialized to 'U'; unless the caller
422                  * accidently specified a value for the DST_CDEF. To handle 
423                  * this case, an extra check is required. */
424             if((updvals[i+1][0] != 'U') &&
425                    (dst_idx != DST_CDEF) &&
426                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
427                double rate = DNAN;
428                /* the data source type defines how to process the data */
429                 /* pdp_temp contains rate * time ... eg the bytes
430                  * transferred during the interval. Doing it this way saves
431                  * a lot of math operations */
432                 
433
434                 switch(dst_idx){
435                 case DST_COUNTER:
436                 case DST_DERIVE:
437                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
438                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
439                        if(dst_idx == DST_COUNTER) {
440                           /* simple overflow catcher sugestet by andres kroonmaa */
441                           /* this will fail terribly for non 32 or 64 bit counters ... */
442                           /* are there any others in SNMP land ? */
443                           if (pdp_new[i] < (double)0.0 ) 
444                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
445                           if (pdp_new[i] < (double)0.0 ) 
446                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
447                        }
448                        rate = pdp_new[i] / interval;
449                     }
450                    else {
451                      pdp_new[i]= DNAN;          
452                    }
453                    break;
454                 case DST_ABSOLUTE:
455                     pdp_new[i]= atof(updvals[i+1]);
456                     rate = pdp_new[i] / interval;                 
457                     break;
458                 case DST_GAUGE:
459                     pdp_new[i] = atof(updvals[i+1]) * interval;
460                     rate = pdp_new[i] / interval;                  
461                     break;
462                 default:
463                     rrd_set_error("rrd contains unknown DS type : '%s'",
464                                   rrd.ds_def[i].dst);
465                     break;
466                 }
467                 /* break out of this for loop if the error string is set */
468                 if (rrd_test_error()){
469                     break;
470                 }
471                /* make sure pdp_temp is neither too large or too small
472                 * if any of these occur it becomes unknown ...
473                 * sorry folks ... */
474                if ( ! isnan(rate) && 
475                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
476                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
477                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
478                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
479                   pdp_new[i] = DNAN;
480                }               
481             } else {
482                 /* no news is news all the same */
483                 pdp_new[i] = DNAN;
484             }
485             
486             /* make a copy of the command line argument for the next run */
487 #ifdef DEBUG
488             fprintf(stderr,
489                     "prep ds[%lu]\t"
490                     "last_arg '%s'\t"
491                     "this_arg '%s'\t"
492                     "pdp_new %10.2f\n",
493                     i,
494                     rrd.pdp_prep[i].last_ds,
495                     updvals[i+1], pdp_new[i]);
496 #endif
497             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
498                 strncpy(rrd.pdp_prep[i].last_ds,
499                         updvals[i+1],LAST_DS_LEN-1);
500                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
501             }
502         }
503         /* break out of the argument parsing loop if the error_string is set */
504         if (rrd_test_error()){
505             free(step_start);
506             break;
507         }
508         /* has a pdp_st moment occurred since the last run ? */
509
510         if (proc_pdp_st == occu_pdp_st){
511             /* no we have not passed a pdp_st moment. therefore update is simple */
512
513             for(i=0;i<rrd.stat_head->ds_cnt;i++){
514                 if(isnan(pdp_new[i]))
515                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
516                 else
517                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
518 #ifdef DEBUG
519                 fprintf(stderr,
520                         "NO PDP  ds[%lu]\t"
521                         "value %10.2f\t"
522                         "unkn_sec %5lu\n",
523                         i,
524                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
525                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
526 #endif
527             }   
528         } else {
529             /* an pdp_st has occurred. */
530
531             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
532              * occurred up to the last run.        
533             pdp_new[] contains rate*seconds from the latest run.
534             pdp_temp[] will contain the rate for cdp */
535
536             for(i=0;i<rrd.stat_head->ds_cnt;i++){
537                 /* update pdp_prep to the current pdp_st */
538                 if(isnan(pdp_new[i]))
539                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
540                 else
541                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
542                         pdp_new[i]/(double)interval*(double)pre_int;
543
544                 /* if too much of the pdp_prep is unknown we dump it */
545                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
546                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
547                     (occu_pdp_st-proc_pdp_st <= 
548                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
549                     pdp_temp[i] = DNAN;
550                 } else {
551                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
552                         / (double)( occu_pdp_st
553                                    - proc_pdp_st
554                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
555                 }
556
557                 /* process CDEF data sources; remember each CDEF DS can
558                  * only reference other DS with a lower index number */
559             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
560                    rpnp_t *rpnp;
561                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
562                    /* substitue data values for OP_VARIABLE nodes */
563                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
564                    {
565                           if (rpnp[ii].op == OP_VARIABLE) {
566                                  rpnp[ii].op = OP_NUMBER;
567                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
568                           }
569                    }
570                    /* run the rpn calculator */
571                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
572                           free(rpnp);
573                           break; /* exits the data sources pdp_temp loop */
574                    }
575                 }
576         
577                 /* make pdp_prep ready for the next run */
578                 if(isnan(pdp_new[i])){
579                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
580                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
581                 } else {
582                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
583                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
584                         pdp_new[i]/(double)interval*(double)post_int;
585                 }
586
587 #ifdef DEBUG
588                 fprintf(stderr,
589                         "PDP UPD ds[%lu]\t"
590                         "pdp_temp %10.2f\t"
591                         "new_prep %10.2f\t"
592                         "new_unkn_sec %5lu\n",
593                         i, pdp_temp[i],
594                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
595                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
596 #endif
597             }
598
599                 /* if there were errors during the last loop, bail out here */
600             if (rrd_test_error()){
601                free(step_start);
602                break;
603             }
604
605                 /* compute the number of elapsed pdp_st moments */
606                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
607 #ifdef DEBUG
608                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
609 #endif
610                 if (rra_step_cnt == NULL)
611                 {
612                    rra_step_cnt = (unsigned long *) 
613                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
614                 }
615
616             for(i = 0, rra_start = rra_begin;
617                 i < rrd.stat_head->rra_cnt;
618             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
619                 i++)
620                 {
621                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
622                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
623                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
624         if (start_pdp_offset <= elapsed_pdp_st) {
625            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
626                       rrd.rra_def[i].pdp_cnt + 1;
627             } else {
628                    rra_step_cnt[i] = 0;
629                 }
630
631                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
632                 {
633                    /* If this is a bulk update, we need to skip ahead in the seasonal
634                         * arrays so that they will be correct for the next observed value;
635                         * note that for the bulk update itself, no update will occur to
636                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
637                         * be set to DNAN. */
638            if (rra_step_cnt[i] > 2) 
639                    {
640                           /* skip update by resetting rra_step_cnt[i],
641                            * note that this is not data source specific; this is due
642                            * to the bulk update, not a DNAN value for the specific data
643                            * source. */
644                           rra_step_cnt[i] = 0;
645               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
646                              &last_seasonal_coef);
647                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
648                              &seasonal_coef);
649                    }
650                 
651                   /* periodically run a smoother for seasonal effects */
652                   /* Need to use first cdp parameter buffer to track
653                    * burnin (burnin requires a specific smoothing schedule).
654                    * The CDP_init_seasonal parameter is really an RRA level,
655                    * not a data source within RRA level parameter, but the rra_def
656                    * is read only for rrd_update (not flushed to disk). */
657                   iii = i*(rrd.stat_head -> ds_cnt);
658                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
659                           <= BURNIN_CYCLES)
660                   {
661                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
662                                  > rrd.rra_def[i].row_cnt - 1) {
663                            /* mark off one of the burnin cycles */
664                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
665                        schedule_smooth = 1;
666                          }  
667                   } else {
668                          /* someone has no doubt invented a trick to deal with this
669                           * wrap around, but at least this code is clear. */
670                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
671                              rrd.rra_ptr[i].cur_row)
672                          {
673                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
674                                   * mapping between PDP and CDP */
675                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
676                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
677                                  {
678 #ifdef DEBUG
679                                         fprintf(stderr,
680                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
681                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
682                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
683 #endif
684                                         schedule_smooth = 1;
685                                  }
686              } else {
687                                  /* can't rely on negative numbers because we are working with
688                                   * unsigned values */
689                                  /* Don't need modulus here. If we've wrapped more than once, only
690                                   * one smooth is executed at the end. */
691                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
692                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
693                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
694                                  {
695 #ifdef DEBUG
696                                         fprintf(stderr,
697                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
698                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
699                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
700 #endif
701                                         schedule_smooth = 1;
702                                  }
703                          }
704                   }
705
706               rra_current = ftell(rrd_file); 
707                 } /* if cf is DEVSEASONAL or SEASONAL */
708
709         if (rrd_test_error()) break;
710
711                     /* update CDP_PREP areas */
712                     /* loop over data soures within each RRA */
713                     for(ii = 0;
714                         ii < rrd.stat_head->ds_cnt;
715                         ii++)
716                         {
717                         
718                         /* iii indexes the CDP prep area for this data source within the RRA */
719                         iii=i*rrd.stat_head->ds_cnt+ii;
720
721                         if (rrd.rra_def[i].pdp_cnt > 1) {
722                           
723                            if (rra_step_cnt[i] > 0) {
724                            /* If we are in this block, as least 1 CDP value will be written to
725                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
726                                 * to be written, then the "fill in" value is the CDP_secondary_val
727                                 * entry. */
728                                   if (isnan(pdp_temp[ii]))
729                   {
730                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
731                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
732                                   } else {
733                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
734                                           * CDP data entries. No matter the CF, the value is the same because
735                                           * the average, max, min, and last of a list of identical values is
736                                           * the same, namely, the value itself. */
737                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
738                                   }
739                      
740                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
741                                       > rrd.rra_def[i].pdp_cnt*
742                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
743                                   {
744                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
745                                          /* initialize carry over */
746                                          if (current_cf == CF_AVERAGE) {
747                                                    if (isnan(pdp_temp[ii])) { 
748                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
749                                                    } else {
750                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
751                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
752                                                    }
753                                          } else {
754                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
755                                          }
756                                   } else {
757                                          rrd_value_t cum_val, cur_val; 
758                                      switch (current_cf) {
759                                                 case CF_AVERAGE:
760                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
761                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
762                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
763                                                (cum_val + cur_val) /
764                                            (rrd.rra_def[i].pdp_cnt
765                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
766                                                    /* initialize carry over value */
767                                                    if (isnan(pdp_temp[ii])) { 
768                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
769                                                    } else {
770                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
771                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
772                                                    }
773                                                    break;
774                                                 case CF_MAXIMUM:
775                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
776                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
777 #ifdef DEBUG
778                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
779                                                           isnan(pdp_temp[ii])) {
780                                                      fprintf(stderr,
781                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
782                                                                 i,ii);
783                                                          exit(-1);
784                                                   }
785 #endif
786                                                   if (cur_val > cum_val)
787                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
788                                                   else
789                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
790                                                   /* initialize carry over value */
791                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
792                                                   break;
793                                                 case CF_MINIMUM:
794                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
795                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
796 #ifdef DEBUG
797                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
798                                                           isnan(pdp_temp[ii])) {
799                                                      fprintf(stderr,
800                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
801                                                                 i,ii);
802                                                          exit(-1);
803                                                   }
804 #endif
805                                                   if (cur_val < cum_val)
806                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
807                                                   else
808                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
809                                                   /* initialize carry over value */
810                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
811                                                   break;
812                                                 case CF_LAST:
813                                                 default:
814                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
815                                                    /* initialize carry over value */
816                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
817                                                 break;
818                                          }
819                                   } /* endif meets xff value requirement for a valid value */
820                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
821                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
822                                   if (isnan(pdp_temp[ii]))
823                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
824                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
825                                   else
826                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
827                } else  /* rra_step_cnt[i]  == 0 */
828                            {
829 #ifdef DEBUG
830                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
831                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
832                                          i,ii);
833                                   } else {
834                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
835                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
836                                   }
837 #endif
838                                   if (isnan(pdp_temp[ii])) {
839                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
840                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
841                                   {
842                                          if (current_cf == CF_AVERAGE) {
843                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
844                                                    elapsed_pdp_st;
845                                          } else {
846                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
847                                          }
848 #ifdef DEBUG
849                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
850                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
851 #endif
852                                   } else {
853                                          switch (current_cf) {
854                                          case CF_AVERAGE:
855                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
856                                                    elapsed_pdp_st;
857                                                 break;
858                                          case CF_MINIMUM:
859                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
860                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
861                                                 break; 
862                                          case CF_MAXIMUM:
863                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
864                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
865                                                 break; 
866                                          case CF_LAST:
867                                          default:
868                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
869                                                 break;
870                                          }
871                                   }
872                            }
873                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
874                            if (elapsed_pdp_st > 2)
875                            {
876                                    switch (current_cf) {
877                                    case CF_AVERAGE:
878                                    default:
879                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
880                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
881                                           break;
882                    case CF_SEASONAL:
883                                    case CF_DEVSEASONAL:
884                                           /* need to update cached seasonal values, so they are consistent
885                                            * with the bulk update */
886                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
887                                            * CDP_last_deviation are the same. */
888                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
889                                                  last_seasonal_coef[ii];
890                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
891                                                  seasonal_coef[ii];
892                                           break;
893                    case CF_HWPREDICT:
894                                           /* need to update the null_count and last_null_count.
895                                            * even do this for non-DNAN pdp_temp because the
896                                            * algorithm is not learning from batch updates. */
897                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
898                                                  elapsed_pdp_st;
899                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
900                                                  elapsed_pdp_st - 1;
901                                           /* fall through */
902                                    case CF_DEVPREDICT:
903                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
904                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
905                                           break;
906                    case CF_FAILURES:
907                                           /* do not count missed bulk values as failures */
908                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
909                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
910                                           /* need to reset violations buffer.
911                                            * could do this more carefully, but for now, just
912                                            * assume a bulk update wipes away all violations. */
913                       erase_violations(&rrd, iii, i);
914                                           break;
915                                    }
916                            } 
917                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
918
919                         if (rrd_test_error()) break;
920
921                         } /* endif data sources loop */
922         } /* end RRA Loop */
923
924                 /* this loop is only entered if elapsed_pdp_st < 3 */
925                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
926                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
927                 {
928                for(i = 0, rra_start = rra_begin;
929                    i < rrd.stat_head->rra_cnt;
930                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
931                    i++)
932                    {
933                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
934
935                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
936                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
937                           {
938                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
939                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
940                                 &seasonal_coef);
941                  rra_current = ftell(rrd_file);
942                           }
943                           if (rrd_test_error()) break;
944                       /* loop over data soures within each RRA */
945                       for(ii = 0;
946                           ii < rrd.stat_head->ds_cnt;
947                           ii++)
948                           {
949                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
950                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
951                                     scratch_idx, seasonal_coef);
952                           }
953            } /* end RRA Loop */
954                    if (rrd_test_error()) break;
955             } /* end elapsed_pdp_st loop */
956
957                 if (rrd_test_error()) break;
958
959                 /* Ready to write to disk */
960                 /* Move sequentially through the file, writing one RRA at a time.
961                  * Note this architecture divorces the computation of CDP with
962                  * flushing updated RRA entries to disk. */
963             for(i = 0, rra_start = rra_begin;
964                 i < rrd.stat_head->rra_cnt;
965             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
966                 i++) {
967                 /* is there anything to write for this RRA? If not, continue. */
968         if (rra_step_cnt[i] == 0) continue;
969
970                 /* write the first row */
971 #ifdef DEBUG
972         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
973 #endif
974             rrd.rra_ptr[i].cur_row++;
975             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
976                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
977                 /* positition on the first row */
978                 rra_pos_tmp = rra_start +
979                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
980                 if(rra_pos_tmp != rra_current) {
981                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
982                       rrd_set_error("seek error in rrd");
983                       break;
984                    }
985                    rra_current = rra_pos_tmp;
986                 }
987
988 #ifdef DEBUG
989             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
990 #endif
991                 scratch_idx = CDP_primary_val;
992                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
993                 if (rrd_test_error()) break;
994
995                 /* write other rows of the bulk update, if any */
996                 scratch_idx = CDP_secondary_val;
997                 for ( ; rra_step_cnt[i] > 1; 
998                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
999                 {
1000                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1001                    {
1002 #ifdef DEBUG
1003               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1004                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1005 #endif
1006                           /* wrap */
1007                           rrd.rra_ptr[i].cur_row = 0;
1008                           /* seek back to beginning of current rra */
1009                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1010                           {
1011                          rrd_set_error("seek error in rrd");
1012                          break;
1013                           }
1014 #ifdef DEBUG
1015                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1016 #endif
1017                           rra_current = rra_start;
1018                    }
1019                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1020                 }
1021                 
1022                 if (rrd_test_error())
1023                   break;
1024                 } /* RRA LOOP */
1025
1026             /* break out of the argument parsing loop if error_string is set */
1027             if (rrd_test_error()){
1028                    free(step_start);
1029                    break;
1030             } 
1031             
1032         } /* endif a pdp_st has occurred */ 
1033         rrd.live_head->last_up = current_time;
1034         free(step_start);
1035     } /* function argument loop */
1036
1037     if (seasonal_coef != NULL) free(seasonal_coef);
1038     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1039         if (rra_step_cnt != NULL) free(rra_step_cnt);
1040     rpnstack_free(&rpnstack);
1041
1042     /* if we got here and if there is an error and if the file has not been
1043      * written to, then close things up and return. */
1044     if (rrd_test_error()) {
1045         free(updvals);
1046         free(tmpl_idx);
1047         rrd_free(&rrd);
1048         free(pdp_temp);
1049         free(pdp_new);
1050         fclose(rrd_file);
1051         return(-1);
1052     }
1053
1054     /* aargh ... that was tough ... so many loops ... anyway, its done.
1055      * we just need to write back the live header portion now*/
1056
1057     if (fseek(rrd_file, (sizeof(stat_head_t)
1058                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1059                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1060               SEEK_SET) != 0) {
1061         rrd_set_error("seek rrd for live header writeback");
1062         free(updvals);
1063         free(tmpl_idx);
1064         rrd_free(&rrd);
1065         free(pdp_temp);
1066         free(pdp_new);
1067         fclose(rrd_file);
1068         return(-1);
1069     }
1070
1071     if(fwrite( rrd.live_head,
1072                sizeof(live_head_t), 1, rrd_file) != 1){
1073         rrd_set_error("fwrite live_head to rrd");
1074         free(updvals);
1075         rrd_free(&rrd);
1076         free(tmpl_idx);
1077         free(pdp_temp);
1078         free(pdp_new);
1079         fclose(rrd_file);
1080         return(-1);
1081     }
1082
1083     if(fwrite( rrd.pdp_prep,
1084                sizeof(pdp_prep_t),
1085                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1086         rrd_set_error("ftwrite pdp_prep to rrd");
1087         free(updvals);
1088         rrd_free(&rrd);
1089         free(tmpl_idx);
1090         free(pdp_temp);
1091         free(pdp_new);
1092         fclose(rrd_file);
1093         return(-1);
1094     }
1095
1096     if(fwrite( rrd.cdp_prep,
1097                sizeof(cdp_prep_t),
1098                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1099        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1100
1101         rrd_set_error("ftwrite cdp_prep to rrd");
1102         free(updvals);
1103         free(tmpl_idx);
1104         rrd_free(&rrd);
1105         free(pdp_temp);
1106         free(pdp_new);
1107         fclose(rrd_file);
1108         return(-1);
1109     }
1110
1111     if(fwrite( rrd.rra_ptr,
1112                sizeof(rra_ptr_t), 
1113                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1114         rrd_set_error("fwrite rra_ptr to rrd");
1115         free(updvals);
1116         free(tmpl_idx);
1117         rrd_free(&rrd);
1118         free(pdp_temp);
1119         free(pdp_new);
1120         fclose(rrd_file);
1121         return(-1);
1122     }
1123
1124     /* OK now close the files and free the memory */
1125     if(fclose(rrd_file) != 0){
1126         rrd_set_error("closing rrd");
1127         free(updvals);
1128         free(tmpl_idx);
1129         rrd_free(&rrd);
1130         free(pdp_temp);
1131         free(pdp_new);
1132         return(-1);
1133     }
1134
1135     /* calling the smoothing code here guarantees at most
1136          * one smoothing operation per rrd_update call. Unfortunately,
1137          * it is possible with bulk updates, or a long-delayed update
1138          * for smoothing to occur off-schedule. This really isn't
1139          * critical except during the burning cycles. */
1140         if (schedule_smooth)
1141         {
1142 #ifndef WIN32
1143           rrd_file = fopen(argv[optind],"r+");
1144 #else
1145           rrd_file = fopen(argv[optind],"rb+");
1146 #endif
1147           rra_start = rra_begin;
1148           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1149           {
1150             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1151                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1152             {
1153 #ifdef DEBUG
1154               fprintf(stderr,"Running smoother for rra %ld\n",i);
1155 #endif
1156               apply_smoother(&rrd,i,rra_start,rrd_file);
1157               if (rrd_test_error())
1158                 break;
1159             }
1160             rra_start += rrd.rra_def[i].row_cnt
1161               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1162           }
1163           fclose(rrd_file);
1164         }
1165     rrd_free(&rrd);
1166     free(updvals);
1167     free(tmpl_idx);
1168     free(pdp_new);
1169     free(pdp_temp);
1170     return(0);
1171 }
1172
1173 /*
1174  * get exclusive lock to whole file.
1175  * lock gets removed when we close the file
1176  *
1177  * returns 0 on success
1178  */
1179 int
1180 LockRRD(FILE *rrdfile)
1181 {
1182     int rrd_fd;         /* File descriptor for RRD */
1183     int                 stat;
1184
1185     rrd_fd = fileno(rrdfile);
1186
1187         {
1188 #ifndef WIN32    
1189                 struct flock    lock;
1190     lock.l_type = F_WRLCK;    /* exclusive write lock */
1191     lock.l_len = 0;           /* whole file */
1192     lock.l_start = 0;         /* start of file */
1193     lock.l_whence = SEEK_SET;   /* end of file */
1194
1195     stat = fcntl(rrd_fd, F_SETLK, &lock);
1196 #else
1197                 struct _stat st;
1198
1199                 if ( _fstat( rrd_fd, &st ) == 0 ) {
1200                         stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1201                 } else {
1202                         stat = -1;
1203                 }
1204 #endif
1205         }
1206
1207     return(stat);
1208 }
1209
1210
1211 void
1212 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1213    unsigned short CDP_scratch_idx, FILE *rrd_file)
1214 {
1215    unsigned long ds_idx, cdp_idx;
1216
1217    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1218    {
1219       /* compute the cdp index */
1220       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1221 #ifdef DEBUG
1222           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1223              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1224              rrd -> rra_def[rra_idx].cf_nam);
1225 #endif 
1226
1227           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1228                  sizeof(rrd_value_t),1,rrd_file) != 1)
1229           { 
1230              rrd_set_error("writing rrd");
1231                  return;
1232           }
1233           *rra_current += sizeof(rrd_value_t);
1234         }
1235 }