Announce of VDEF in rrd_graph
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.0.33  Copyright Tobias Oetiker, 1997 - 2000
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.5  2001/05/09 05:31:01  oetiker
9  * Bug fix: when update of multiple PDP/CDP RRAs coincided
10  * with interpolation of multiple PDPs an incorrect value was
11  * stored as the CDP. Especially evident for GAUGE data sources.
12  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
13  *
14  * Revision 1.4  2001/03/10 23:54:41  oetiker
15  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
16  * parser and calculator from rrd_graph and puts then in a new file,
17  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
18  * clean-up of aberrant behavior stuff, including a bug fix.
19  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
20  * -- Jake Brutlag <jakeb@corp.webtv.net>
21  *
22  * Revision 1.3  2001/03/04 13:01:55  oetiker
23  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
24  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
25  * This is backwards compatible! But new files using the Aberrant stuff are not readable
26  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
27  * -- Jake Brutlag <jakeb@corp.webtv.net>
28  *
29  * Revision 1.2  2001/03/04 11:14:25  oetiker
30  * added at-style-time@value:value syntax to rrd_update
31  * --  Dave Bodenstab <imdave@mcs.net>
32  *
33  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
34  * checkin
35  *
36  *****************************************************************************/
37
38 #include "rrd_tool.h"
39 #include <sys/types.h>
40 #include <fcntl.h>
41 #include "rrd_hw.h"
42 #include "rrd_rpncalc.h"
43
44 #ifdef WIN32
45  #include <sys/locking.h>
46  #include <sys/stat.h>
47  #include <io.h>
48 #endif
49
50 /* Local prototypes */
51 int LockRRD(FILE *rrd_file);
52 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
53     unsigned short CDP_scratch_idx, FILE *rrd_file);
54  
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise yields X.
 * The expansion is one parenthesized expression WITHOUT a trailing
 * semicolon, so the macro can be embedded in larger expressions and in
 * brace-less if/else bodies; the caller terminates the statement.
 * NOTE: X is evaluated twice -- do not pass arguments with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
56
57
58 #ifdef STANDALONE
/* Stand-alone entry point (only built when STANDALONE is defined).
 * Forwards the command line to rrd_update(); when rrd_test_error()
 * reports a pending error, prints the usage banner followed by the
 * error text to stdout, clears the error and returns 1.
 * Returns 0 when no error is pending. */
int
main(int argc, char **argv)
{
    static const char usage_text[] =
        "RRDtool 1.0.33  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    /* nothing pending: the update went through */
    if (!rrd_test_error())
        return 0;

    /* failure path: usage banner first, then the specific error */
    fputs(usage_text, stdout);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
76 #endif
77
78 int
79 rrd_update(int argc, char **argv)
80 {
81
82     int              arg_i = 2;
83     short            j;
84     long             i,ii,iii=1;
85
86     unsigned long    rra_begin;          /* byte pointer to the rra
87                                           * area in the rrd file.  this
88                                           * pointer never changes value */
89     unsigned long    rra_start;          /* byte pointer to the rra
90                                           * area in the rrd file.  this
91                                           * pointer changes as each rrd is
92                                           * processed. */
93     unsigned long    rra_current;        /* byte pointer to the current write
94                                           * spot in the rrd file. */
95     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
96     unsigned long    interval,
97         pre_int,post_int;                /* interval between this and
98                                           * the last run */
99     unsigned long    proc_pdp_st;        /* which pdp_st was the last
100                                           * to be processed */
101     unsigned long    occu_pdp_st;        /* when was the pdp_st
102                                           * before the last update
103                                           * time */
104     unsigned long    proc_pdp_age;       /* how old was the data in
105                                           * the pdp prep area when it
106                                           * was last updated */
107     unsigned long    occu_pdp_age;       /* how long ago was the last
108                                           * pdp_step time */
109     rrd_value_t      *pdp_new;           /* prepare the incoming data
110                                           * to be added to the
111                                           * existing entry */
112     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
113                                           * to be added to the
114                                           * cdp values */
115
116     long             *tmpl_idx;          /* index representing the settings
117                                             transported by the template index */
118     long             tmpl_cnt = 2;       /* time and data */
119
120     FILE             *rrd_file;
121     rrd_t            rrd;
122     time_t           current_time = time(NULL);
123     char             **updvals;
124     int              schedule_smooth = 0;
125     char             *template = NULL;   
126         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
127                                          /* a vector of future Holt-Winters seasonal coefs */
128     unsigned long    elapsed_pdp_st;
129                                          /* number of elapsed PDP steps since last update */
130     unsigned long    *rra_step_cnt = NULL;
131                                          /* number of rows to be updated in an RRA for a data
132                                           * value. */
133     unsigned long    start_pdp_offset;
134                                          /* number of PDP steps since the last update that
135                                           * are assigned to the first CDP to be generated
136                                           * since the last update. */
137     unsigned short   scratch_idx;
138                                          /* index into the CDP scratch array */
139     enum cf_en       current_cf;
140                                          /* numeric id of the current consolidation function */
141     rpnstack_t       rpnstack; /* used for COMPUTE DS */
142
143     rpnstack_init(&rpnstack);
144
145     while (1) {
146         static struct option long_options[] =
147         {
148             {"template",      required_argument, 0, 't'},
149             {0,0,0,0}
150         };
151         int option_index = 0;
152         int opt;
153         opt = getopt_long(argc, argv, "t:", 
154                           long_options, &option_index);
155         
156         if (opt == EOF)
157           break;
158         
159         switch(opt) {
160         case 't':
161             template = optarg;
162             break;
163
164         case '?':
165             rrd_set_error("unknown option '%s'",argv[optind-1]);
166             rrd_free(&rrd);            
167             return(-1);
168         }
169     }
170
171     /* need at least 2 arguments: filename, data. */
172     if (argc-optind < 2) {
173         rrd_set_error("Not enough arguments");
174         return -1;
175     }
176
177     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
178         return -1;
179     }
180     rra_current = rra_start = rra_begin = ftell(rrd_file);
181     /* This is defined in the ANSI C standard, section 7.9.5.3:
182
183         When a file is opened with update mode ('+' as the second
184         or third character in the ... list of mode argument
185         variables), both input and output may be performed on the
186         associated stream.  However, ...  input may not be directly
187         followed by output without an intervening call to a file
188         positioning function, unless the input operation encounters
189         end-of-file. */
190     fseek(rrd_file, 0, SEEK_CUR);
191
192     
193     /* get exclusive lock to whole file.
194      * lock gets removed when we close the file.
195      */
196     if (LockRRD(rrd_file) != 0) {
197       rrd_set_error("could not lock RRD");
198       rrd_free(&rrd);
199       fclose(rrd_file);
200       return(-1);   
201     } 
202
203     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
204         rrd_set_error("allocating updvals pointer array");
205         rrd_free(&rrd);
206         fclose(rrd_file);
207         return(-1);
208     }
209
210     if ((pdp_temp = malloc(sizeof(rrd_value_t)
211                            *rrd.stat_head->ds_cnt))==NULL){
212         rrd_set_error("allocating pdp_temp ...");
213         free(updvals);
214         rrd_free(&rrd);
215         fclose(rrd_file);
216         return(-1);
217     }
218
219     if ((tmpl_idx = malloc(sizeof(unsigned long)
220                            *(rrd.stat_head->ds_cnt+1)))==NULL){
221         rrd_set_error("allocating tmpl_idx ...");
222         free(pdp_temp);
223         free(updvals);
224         rrd_free(&rrd);
225         fclose(rrd_file);
226         return(-1);
227     }
228     /* initialize template redirector */
229     /* default config example (assume DS 1 is a CDEF DS)
230        tmpl_idx[0] -> 0; (time)
231        tmpl_idx[1] -> 1; (DS 0)
232        tmpl_idx[2] -> 3; (DS 2)
233        tmpl_idx[3] -> 4; (DS 3) */
234     tmpl_idx[0] = 0; /* time */
235     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
236         {
237            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
238               tmpl_idx[ii++]=i;
239         }
240     tmpl_cnt= ii;
241
242     if (template) {
243         char *dsname;
244         int tmpl_len;
245         dsname = template;
246         tmpl_cnt = 1; /* the first entry is the time */
247         tmpl_len = strlen(template);
248         for(i=0;i<=tmpl_len ;i++) {
249             if (template[i] == ':' || template[i] == '\0') {
250                 template[i] = '\0';
251                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
252                     rrd_set_error("Template contains more DS definitions than RRD");
253                     free(updvals); free(pdp_temp);
254                     free(tmpl_idx); rrd_free(&rrd);
255                     fclose(rrd_file); return(-1);
256                 }
257                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
258                     rrd_set_error("unknown DS name '%s'",dsname);
259                     free(updvals); free(pdp_temp);
260                     free(tmpl_idx); rrd_free(&rrd);
261                     fclose(rrd_file); return(-1);
262                 } else {
263                   /* the first element is always the time */
264                   tmpl_idx[tmpl_cnt-1]++; 
265                   /* go to the next entry on the template */
266                   dsname = &template[i+1];
267                   /* fix the damage we did before */
268                   if (i<tmpl_len) {
269                      template[i]=':';
270                   } 
271
272                 }
273             }       
274         }
275     }
276     if ((pdp_new = malloc(sizeof(rrd_value_t)
277                           *rrd.stat_head->ds_cnt))==NULL){
278         rrd_set_error("allocating pdp_new ...");
279         free(updvals);
280         free(pdp_temp);
281         free(tmpl_idx);
282         rrd_free(&rrd);
283         fclose(rrd_file);
284         return(-1);
285     }
286
287     /* loop through the arguments. */
288     for(arg_i=optind+1; arg_i<argc;arg_i++) {
289         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
290         char *step_start = stepper;
291         char *p;
292         char *parsetime_error = NULL;
293         enum {atstyle, normal} timesyntax;
294         struct time_value ds_tv;
295         if (stepper == NULL){
296                 rrd_set_error("failed duplication argv entry");
297                 free(updvals);
298                 free(pdp_temp);  
299                 free(tmpl_idx);
300                 rrd_free(&rrd);
301                 fclose(rrd_file);
302                 return(-1);
303          }
304         /* initialize all ds input to unknown except the first one
305            which has always got to be set */
306         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
307         strcpy(stepper,argv[arg_i]);
308         updvals[0]=stepper;
309         /* separate all ds elements; first must be examined separately
310            due to alternate time syntax */
311         if ((p=strchr(stepper,'@'))!=NULL) {
312             timesyntax = atstyle;
313             *p = '\0';
314             stepper = p+1;
315         } else if ((p=strchr(stepper,':'))!=NULL) {
316             timesyntax = normal;
317             *p = '\0';
318             stepper = p+1;
319         } else {
320             rrd_set_error("expected timestamp not found in data source from %s:...",
321                           argv[arg_i]);
322             free(step_start);
323             break;
324         }
325         ii=1;
326         updvals[tmpl_idx[ii]] = stepper;
327         while (*stepper) {
328             if (*stepper == ':') {
329                 *stepper = '\0';
330                 ii++;
331                 if (ii<tmpl_cnt){                   
332                     updvals[tmpl_idx[ii]] = stepper+1;
333                 }
334             }
335             stepper++;
336         }
337
338         if (ii != tmpl_cnt-1) {
339             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
340                           tmpl_cnt-1, ii, argv[arg_i]);
341             free(step_start);
342             break;
343         }
344         
345         /* get the time from the reading ... handle N */
346         if (timesyntax == atstyle) {
347             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
348                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
349                 free(step_start);
350                 break;
351             }
352             if (ds_tv.type == RELATIVE_TO_END_TIME ||
353                 ds_tv.type == RELATIVE_TO_START_TIME) {
354                 rrd_set_error("specifying time relative to the 'start' "
355                               "or 'end' makes no sense here: %s",
356                               updvals[0]);
357                 free(step_start);
358                 break;
359             }
360
361             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
362         } else if (strcmp(updvals[0],"N")==0){
363             current_time = time(NULL);
364         } else {
365             current_time = atol(updvals[0]);
366         }
367         
368         if(current_time <= rrd.live_head->last_up){
369             rrd_set_error("illegal attempt to update using time %ld when "
370                           "last update time is %ld (minimum one second step)",
371                           current_time, rrd.live_head->last_up);
372             free(step_start);
373             break;
374         }
375         
376         
377         /* seek to the beginning of the rra's */
378         if (rra_current != rra_begin) {
379             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
380                 rrd_set_error("seek error in rrd");
381                 free(step_start);
382                 break;
383             }
384             rra_current = rra_begin;
385         }
386         rra_start = rra_begin;
387
388         /* when was the current pdp started */
389         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
390         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
391
392         /* when did the last pdp_st occur */
393         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
394         occu_pdp_st = current_time - occu_pdp_age;
395         interval = current_time - rrd.live_head->last_up;
396     
397         if (occu_pdp_st > proc_pdp_st){
398             /* OK we passed the pdp_st moment*/
399             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
400                                                               * occurred before the latest
401                                                               * pdp_st moment*/
402             post_int = occu_pdp_age;                         /* how much after it */
403         } else {
404             pre_int = interval;
405             post_int = 0;
406         }
407
408 #ifdef DEBUG
409         printf(
410                "proc_pdp_age %lu\t"
411                "proc_pdp_st %lu\t" 
412                "occu_pfp_age %lu\t" 
413                "occu_pdp_st %lu\t"
414                "int %lu\t"
415                "pre_int %lu\t"
416                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
417                 occu_pdp_age, occu_pdp_st,
418                interval, pre_int, post_int);
419 #endif
420     
421         /* process the data sources and update the pdp_prep 
422          * area accordingly */
423         for(i=0;i<rrd.stat_head->ds_cnt;i++){
424             enum dst_en dst_idx;
425             dst_idx= dst_conv(rrd.ds_def[i].dst);
426                 /* NOTE: DST_CDEF should never enter this if block, because
427                  * updvals[i+1][0] is initialized to 'U'; unless the caller
428                  * accidently specified a value for the DST_CDEF. To handle 
429                  * this case, an extra check is required. */
430             if((updvals[i+1][0] != 'U') &&
431                    (dst_idx != DST_CDEF) &&
432                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
433                double rate = DNAN;
434                /* the data source type defines how to process the data */
435                 /* pdp_new contains rate * time ... eg the bytes
436                  * transferred during the interval. Doing it this way saves
437                  * a lot of math operations */
438                 
439
440                 switch(dst_idx){
441                 case DST_COUNTER:
442                 case DST_DERIVE:
443                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
444                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
445                        if(dst_idx == DST_COUNTER) {
446                           /* simple overflow catcher sugestet by andres kroonmaa */
447                           /* this will fail terribly for non 32 or 64 bit counters ... */
448                           /* are there any others in SNMP land ? */
449                           if (pdp_new[i] < (double)0.0 ) 
450                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
451                           if (pdp_new[i] < (double)0.0 ) 
452                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
453                        }
454                        rate = pdp_new[i] / interval;
455                     }
456                    else {
457                      pdp_new[i]= DNAN;          
458                    }
459                    break;
460                 case DST_ABSOLUTE:
461                     pdp_new[i]= atof(updvals[i+1]);
462                     rate = pdp_new[i] / interval;                 
463                     break;
464                 case DST_GAUGE:
465                     pdp_new[i] = atof(updvals[i+1]) * interval;
466                     rate = pdp_new[i] / interval;                  
467                     break;
468                 default:
469                     rrd_set_error("rrd contains unknown DS type : '%s'",
470                                   rrd.ds_def[i].dst);
471                     break;
472                 }
473                 /* break out of this for loop if the error string is set */
474                 if (rrd_test_error()){
475                     break;
476                 }
477                /* make sure pdp_temp is neither too large or too small
478                 * if any of these occur it becomes unknown ...
479                 * sorry folks ... */
480                if ( ! isnan(rate) && 
481                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
482                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
483                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
484                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
485                   pdp_new[i] = DNAN;
486                }               
487             } else {
488                 /* no news is news all the same */
489                 pdp_new[i] = DNAN;
490             }
491             
492             /* make a copy of the command line argument for the next run */
493 #ifdef DEBUG
494             fprintf(stderr,
495                     "prep ds[%lu]\t"
496                     "last_arg '%s'\t"
497                     "this_arg '%s'\t"
498                     "pdp_new %10.2f\n",
499                     i,
500                     rrd.pdp_prep[i].last_ds,
501                     updvals[i+1], pdp_new[i]);
502 #endif
503             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
504                 strncpy(rrd.pdp_prep[i].last_ds,
505                         updvals[i+1],LAST_DS_LEN-1);
506                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
507             }
508         }
509         /* break out of the argument parsing loop if the error_string is set */
510         if (rrd_test_error()){
511             free(step_start);
512             break;
513         }
514         /* has a pdp_st moment occurred since the last run ? */
515
516         if (proc_pdp_st == occu_pdp_st){
517             /* no we have not passed a pdp_st moment. therefore update is simple */
518
519             for(i=0;i<rrd.stat_head->ds_cnt;i++){
520                 if(isnan(pdp_new[i]))
521                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
522                 else
523                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
524 #ifdef DEBUG
525                 fprintf(stderr,
526                         "NO PDP  ds[%lu]\t"
527                         "value %10.2f\t"
528                         "unkn_sec %5lu\n",
529                         i,
530                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
531                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
532 #endif
533             }   
534         } else {
535             /* an pdp_st has occurred. */
536
537             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
538              * occurred up to the last run.        
539             pdp_new[] contains rate*seconds from the latest run.
540             pdp_temp[] will contain the rate for cdp */
541
542             for(i=0;i<rrd.stat_head->ds_cnt;i++){
543                 /* update pdp_prep to the current pdp_st */
544                 if(isnan(pdp_new[i]))
545                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
546                 else
547                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
548                         pdp_new[i]/(double)interval*(double)pre_int;
549
550                 /* if too much of the pdp_prep is unknown we dump it */
551                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
552                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
553                     (occu_pdp_st-proc_pdp_st <= 
554                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
555                     pdp_temp[i] = DNAN;
556                 } else {
557                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
558                         / (double)( occu_pdp_st
559                                    - proc_pdp_st
560                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
561                 }
562
563                 /* process CDEF data sources; remember each CDEF DS can
564                  * only reference other DS with a lower index number */
565             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
566                    rpnp_t *rpnp;
567                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
568                    /* substitue data values for OP_VARIABLE nodes */
569                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
570                    {
571                           if (rpnp[ii].op == OP_VARIABLE) {
572                                  rpnp[ii].op = OP_NUMBER;
573                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
574                           }
575                    }
576                    /* run the rpn calculator */
577                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
578                           free(rpnp);
579                           break; /* exits the data sources pdp_temp loop */
580                    }
581                 }
582         
583                 /* make pdp_prep ready for the next run */
584                 if(isnan(pdp_new[i])){
585                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
586                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
587                 } else {
588                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
589                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
590                         pdp_new[i]/(double)interval*(double)post_int;
591                 }
592
593 #ifdef DEBUG
594                 fprintf(stderr,
595                         "PDP UPD ds[%lu]\t"
596                         "pdp_temp %10.2f\t"
597                         "new_prep %10.2f\t"
598                         "new_unkn_sec %5lu\n",
599                         i, pdp_temp[i],
600                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
601                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
602 #endif
603             }
604
605                 /* if there were errors during the last loop, bail out here */
606             if (rrd_test_error()){
607                free(step_start);
608                break;
609             }
610
611                 /* compute the number of elapsed pdp_st moments */
612                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
613 #ifdef DEBUG
614                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
615 #endif
616                 if (rra_step_cnt == NULL)
617                 {
618                    rra_step_cnt = (unsigned long *) 
619                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
620                 }
621
622             for(i = 0, rra_start = rra_begin;
623                 i < rrd.stat_head->rra_cnt;
624             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
625                 i++)
626                 {
627                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
628                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
629                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
630         if (start_pdp_offset <= elapsed_pdp_st) {
631            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
632                       rrd.rra_def[i].pdp_cnt + 1;
633             } else {
634                    rra_step_cnt[i] = 0;
635                 }
636
637                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
638                 {
639                    /* If this is a bulk update, we need to skip ahead in the seasonal
640                         * arrays so that they will be correct for the next observed value;
641                         * note that for the bulk update itself, no update will occur to
642                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
643                         * be set to DNAN. */
644            if (rra_step_cnt[i] > 2) 
645                    {
646                           /* skip update by resetting rra_step_cnt[i],
647                            * note that this is not data source specific; this is due
648                            * to the bulk update, not a DNAN value for the specific data
649                            * source. */
650                           rra_step_cnt[i] = 0;
651               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
652                              &last_seasonal_coef);
653                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
654                              &seasonal_coef);
655                    }
656                 
657                   /* periodically run a smoother for seasonal effects */
658                   /* Need to use first cdp parameter buffer to track
659                    * burnin (burnin requires a specific smoothing schedule).
660                    * The CDP_init_seasonal parameter is really an RRA level,
661                    * not a data source within RRA level parameter, but the rra_def
662                    * is read only for rrd_update (not flushed to disk). */
663                   iii = i*(rrd.stat_head -> ds_cnt);
664                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
665                           <= BURNIN_CYCLES)
666                   {
667                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
668                                  > rrd.rra_def[i].row_cnt - 1) {
669                            /* mark off one of the burnin cycles */
670                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
671                        schedule_smooth = 1;
672                          }  
673                   } else {
674                          /* someone has no doubt invented a trick to deal with this
675                           * wrap around, but at least this code is clear. */
676                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
677                              rrd.rra_ptr[i].cur_row)
678                          {
679                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
680                                   * mapping between PDP and CDP */
681                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
682                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
683                                  {
684 #ifdef DEBUG
685                                         fprintf(stderr,
686                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
687                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
688                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
689 #endif
690                                         schedule_smooth = 1;
691                                  }
692              } else {
693                                  /* can't rely on negative numbers because we are working with
694                                   * unsigned values */
695                                  /* Don't need modulus here. If we've wrapped more than once, only
696                                   * one smooth is executed at the end. */
697                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
698                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
699                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
700                                  {
701 #ifdef DEBUG
702                                         fprintf(stderr,
703                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
704                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
705                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
706 #endif
707                                         schedule_smooth = 1;
708                                  }
709                          }
710                   }
711
712               rra_current = ftell(rrd_file); 
713                 } /* if cf is DEVSEASONAL or SEASONAL */
714
715         if (rrd_test_error()) break;
716
717                     /* update CDP_PREP areas */
718                     /* loop over data soures within each RRA */
719                     for(ii = 0;
720                         ii < rrd.stat_head->ds_cnt;
721                         ii++)
722                         {
723                         
724                         /* iii indexes the CDP prep area for this data source within the RRA */
725                         iii=i*rrd.stat_head->ds_cnt+ii;
726
727                         if (rrd.rra_def[i].pdp_cnt > 1) {
728                           
729                            if (rra_step_cnt[i] > 0) {
730                            /* If we are in this block, as least 1 CDP value will be written to
731                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
732                                 * to be written, then the "fill in" value is the CDP_secondary_val
733                                 * entry. */
734                                   if (isnan(pdp_temp[ii]))
735                   {
736                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
737                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
738                                   } else {
739                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
740                                           * CDP data entries. No matter the CF, the value is the same because
741                                           * the average, max, min, and last of a list of identical values is
742                                           * the same, namely, the value itself. */
743                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
744                                   }
745                      
746                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
747                                       > rrd.rra_def[i].pdp_cnt*
748                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
749                                   {
750                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
751                                          /* initialize carry over */
752                                          if (current_cf == CF_AVERAGE) {
753                                                    if (isnan(pdp_temp[ii])) { 
754                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
755                                                    } else {
756                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
757                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
758                                                    }
759                                          } else {
760                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
761                                          }
762                                   } else {
763                                          rrd_value_t cum_val, cur_val; 
764                                      switch (current_cf) {
765                                                 case CF_AVERAGE:
766                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
767                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
768                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
769                                                (cum_val + cur_val * start_pdp_offset) /
770                                            (rrd.rra_def[i].pdp_cnt
771                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
772                                                    /* initialize carry over value */
773                                                    if (isnan(pdp_temp[ii])) { 
774                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
775                                                    } else {
776                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
777                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
778                                                    }
779                                                    break;
780                                                 case CF_MAXIMUM:
781                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
782                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
783 #ifdef DEBUG
784                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
785                                                           isnan(pdp_temp[ii])) {
786                                                      fprintf(stderr,
787                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
788                                                                 i,ii);
789                                                          exit(-1);
790                                                   }
791 #endif
792                                                   if (cur_val > cum_val)
793                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
794                                                   else
795                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
796                                                   /* initialize carry over value */
797                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
798                                                   break;
799                                                 case CF_MINIMUM:
800                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
801                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
802 #ifdef DEBUG
803                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
804                                                           isnan(pdp_temp[ii])) {
805                                                      fprintf(stderr,
806                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
807                                                                 i,ii);
808                                                          exit(-1);
809                                                   }
810 #endif
811                                                   if (cur_val < cum_val)
812                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
813                                                   else
814                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
815                                                   /* initialize carry over value */
816                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
817                                                   break;
818                                                 case CF_LAST:
819                                                 default:
820                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
821                                                    /* initialize carry over value */
822                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
823                                                 break;
824                                          }
825                                   } /* endif meets xff value requirement for a valid value */
826                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
827                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
828                                   if (isnan(pdp_temp[ii]))
829                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
830                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
831                                   else
832                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
833                } else  /* rra_step_cnt[i]  == 0 */
834                            {
835 #ifdef DEBUG
836                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
837                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
838                                          i,ii);
839                                   } else {
840                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
841                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
842                                   }
843 #endif
844                                   if (isnan(pdp_temp[ii])) {
845                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
846                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
847                                   {
848                                          if (current_cf == CF_AVERAGE) {
849                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
850                                                    elapsed_pdp_st;
851                                          } else {
852                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
853                                          }
854 #ifdef DEBUG
855                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
856                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
857 #endif
858                                   } else {
859                                          switch (current_cf) {
860                                          case CF_AVERAGE:
861                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
862                                                    elapsed_pdp_st;
863                                                 break;
864                                          case CF_MINIMUM:
865                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
866                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
867                                                 break; 
868                                          case CF_MAXIMUM:
869                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
870                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
871                                                 break; 
872                                          case CF_LAST:
873                                          default:
874                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
875                                                 break;
876                                          }
877                                   }
878                            }
879                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
880                            if (elapsed_pdp_st > 2)
881                            {
882                                    switch (current_cf) {
883                                    case CF_AVERAGE:
884                                    default:
885                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
886                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
887                                           break;
888                    case CF_SEASONAL:
889                                    case CF_DEVSEASONAL:
890                                           /* need to update cached seasonal values, so they are consistent
891                                            * with the bulk update */
892                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
893                                            * CDP_last_deviation are the same. */
894                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
895                                                  last_seasonal_coef[ii];
896                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
897                                                  seasonal_coef[ii];
898                                           break;
899                    case CF_HWPREDICT:
900                                           /* need to update the null_count and last_null_count.
901                                            * even do this for non-DNAN pdp_temp because the
902                                            * algorithm is not learning from batch updates. */
903                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
904                                                  elapsed_pdp_st;
905                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
906                                                  elapsed_pdp_st - 1;
907                                           /* fall through */
908                                    case CF_DEVPREDICT:
909                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
910                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
911                                           break;
912                    case CF_FAILURES:
913                                           /* do not count missed bulk values as failures */
914                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
915                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
916                                           /* need to reset violations buffer.
917                                            * could do this more carefully, but for now, just
918                                            * assume a bulk update wipes away all violations. */
919                       erase_violations(&rrd, iii, i);
920                                           break;
921                                    }
922                            } 
923                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
924
925                         if (rrd_test_error()) break;
926
927                         } /* endif data sources loop */
928         } /* end RRA Loop */
929
930                 /* this loop is only entered if elapsed_pdp_st < 3 */
931                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
932                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
933                 {
934                for(i = 0, rra_start = rra_begin;
935                    i < rrd.stat_head->rra_cnt;
936                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
937                    i++)
938                    {
939                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
940
941                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
942                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
943                           {
944                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
945                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
946                                 &seasonal_coef);
947                  rra_current = ftell(rrd_file);
948                           }
949                           if (rrd_test_error()) break;
950                       /* loop over data soures within each RRA */
951                       for(ii = 0;
952                           ii < rrd.stat_head->ds_cnt;
953                           ii++)
954                           {
955                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
956                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
957                                     scratch_idx, seasonal_coef);
958                           }
959            } /* end RRA Loop */
960                    if (rrd_test_error()) break;
961             } /* end elapsed_pdp_st loop */
962
963                 if (rrd_test_error()) break;
964
965                 /* Ready to write to disk */
966                 /* Move sequentially through the file, writing one RRA at a time.
967                  * Note this architecture divorces the computation of CDP with
968                  * flushing updated RRA entries to disk. */
969             for(i = 0, rra_start = rra_begin;
970                 i < rrd.stat_head->rra_cnt;
971             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
972                 i++) {
973                 /* is there anything to write for this RRA? If not, continue. */
974         if (rra_step_cnt[i] == 0) continue;
975
976                 /* write the first row */
977 #ifdef DEBUG
978         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
979 #endif
980             rrd.rra_ptr[i].cur_row++;
981             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
982                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
983                 /* positition on the first row */
984                 rra_pos_tmp = rra_start +
985                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
986                 if(rra_pos_tmp != rra_current) {
987                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
988                       rrd_set_error("seek error in rrd");
989                       break;
990                    }
991                    rra_current = rra_pos_tmp;
992                 }
993
994 #ifdef DEBUG
995             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
996 #endif
997                 scratch_idx = CDP_primary_val;
998                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
999                 if (rrd_test_error()) break;
1000
1001                 /* write other rows of the bulk update, if any */
1002                 scratch_idx = CDP_secondary_val;
1003                 for ( ; rra_step_cnt[i] > 1; 
1004                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1005                 {
1006                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1007                    {
1008 #ifdef DEBUG
1009               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1010                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1011 #endif
1012                           /* wrap */
1013                           rrd.rra_ptr[i].cur_row = 0;
1014                           /* seek back to beginning of current rra */
1015                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1016                           {
1017                          rrd_set_error("seek error in rrd");
1018                          break;
1019                           }
1020 #ifdef DEBUG
1021                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1022 #endif
1023                           rra_current = rra_start;
1024                    }
1025                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1026                 }
1027                 
1028                 if (rrd_test_error())
1029                   break;
1030                 } /* RRA LOOP */
1031
1032             /* break out of the argument parsing loop if error_string is set */
1033             if (rrd_test_error()){
1034                    free(step_start);
1035                    break;
1036             } 
1037             
1038         } /* endif a pdp_st has occurred */ 
1039         rrd.live_head->last_up = current_time;
1040         free(step_start);
1041     } /* function argument loop */
1042
1043     if (seasonal_coef != NULL) free(seasonal_coef);
1044     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1045         if (rra_step_cnt != NULL) free(rra_step_cnt);
1046     rpnstack_free(&rpnstack);
1047
1048     /* if we got here and if there is an error and if the file has not been
1049      * written to, then close things up and return. */
1050     if (rrd_test_error()) {
1051         free(updvals);
1052         free(tmpl_idx);
1053         rrd_free(&rrd);
1054         free(pdp_temp);
1055         free(pdp_new);
1056         fclose(rrd_file);
1057         return(-1);
1058     }
1059
1060     /* aargh ... that was tough ... so many loops ... anyway, its done.
1061      * we just need to write back the live header portion now*/
1062
1063     if (fseek(rrd_file, (sizeof(stat_head_t)
1064                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1065                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1066               SEEK_SET) != 0) {
1067         rrd_set_error("seek rrd for live header writeback");
1068         free(updvals);
1069         free(tmpl_idx);
1070         rrd_free(&rrd);
1071         free(pdp_temp);
1072         free(pdp_new);
1073         fclose(rrd_file);
1074         return(-1);
1075     }
1076
1077     if(fwrite( rrd.live_head,
1078                sizeof(live_head_t), 1, rrd_file) != 1){
1079         rrd_set_error("fwrite live_head to rrd");
1080         free(updvals);
1081         rrd_free(&rrd);
1082         free(tmpl_idx);
1083         free(pdp_temp);
1084         free(pdp_new);
1085         fclose(rrd_file);
1086         return(-1);
1087     }
1088
1089     if(fwrite( rrd.pdp_prep,
1090                sizeof(pdp_prep_t),
1091                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1092         rrd_set_error("ftwrite pdp_prep to rrd");
1093         free(updvals);
1094         rrd_free(&rrd);
1095         free(tmpl_idx);
1096         free(pdp_temp);
1097         free(pdp_new);
1098         fclose(rrd_file);
1099         return(-1);
1100     }
1101
1102     if(fwrite( rrd.cdp_prep,
1103                sizeof(cdp_prep_t),
1104                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1105        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1106
1107         rrd_set_error("ftwrite cdp_prep to rrd");
1108         free(updvals);
1109         free(tmpl_idx);
1110         rrd_free(&rrd);
1111         free(pdp_temp);
1112         free(pdp_new);
1113         fclose(rrd_file);
1114         return(-1);
1115     }
1116
1117     if(fwrite( rrd.rra_ptr,
1118                sizeof(rra_ptr_t), 
1119                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1120         rrd_set_error("fwrite rra_ptr to rrd");
1121         free(updvals);
1122         free(tmpl_idx);
1123         rrd_free(&rrd);
1124         free(pdp_temp);
1125         free(pdp_new);
1126         fclose(rrd_file);
1127         return(-1);
1128     }
1129
1130     /* OK now close the files and free the memory */
1131     if(fclose(rrd_file) != 0){
1132         rrd_set_error("closing rrd");
1133         free(updvals);
1134         free(tmpl_idx);
1135         rrd_free(&rrd);
1136         free(pdp_temp);
1137         free(pdp_new);
1138         return(-1);
1139     }
1140
1141     /* calling the smoothing code here guarantees at most
1142          * one smoothing operation per rrd_update call. Unfortunately,
1143          * it is possible with bulk updates, or a long-delayed update
1144          * for smoothing to occur off-schedule. This really isn't
1145          * critical except during the burning cycles. */
1146         if (schedule_smooth)
1147         {
1148 #ifndef WIN32
1149           rrd_file = fopen(argv[optind],"r+");
1150 #else
1151           rrd_file = fopen(argv[optind],"rb+");
1152 #endif
1153           rra_start = rra_begin;
1154           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1155           {
1156             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1157                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1158             {
1159 #ifdef DEBUG
1160               fprintf(stderr,"Running smoother for rra %ld\n",i);
1161 #endif
1162               apply_smoother(&rrd,i,rra_start,rrd_file);
1163               if (rrd_test_error())
1164                 break;
1165             }
1166             rra_start += rrd.rra_def[i].row_cnt
1167               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1168           }
1169           fclose(rrd_file);
1170         }
1171     rrd_free(&rrd);
1172     free(updvals);
1173     free(tmpl_idx);
1174     free(pdp_new);
1175     free(pdp_temp);
1176     return(0);
1177 }
1178
/*
 * Try to take an exclusive lock covering the whole RRD file.
 * The lock gets removed when the file is closed.
 *
 * rrdfile: open stream of the RRD to lock.
 *
 * Returns the status reported by the platform locking call
 * (0 on success); on WIN32, -1 if the file size cannot be determined.
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */
#ifndef WIN32
    struct flock whole_file;

    whole_file.l_type = F_WRLCK;      /* exclusive write lock */
    whole_file.l_whence = SEEK_SET;   /* l_start is relative to start of file */
    whole_file.l_start = 0;           /* begin at the first byte */
    whole_file.l_len = 0;             /* zero length means "to end of file" */

    /* F_SETLK fails immediately instead of waiting for a competing lock */
    return fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    /* the byte count to lock is taken from the current file size */
    if (_fstat(fd, &info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, info.st_size);
#endif
}
1215
1216
1217 void
1218 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1219    unsigned short CDP_scratch_idx, FILE *rrd_file)
1220 {
1221    unsigned long ds_idx, cdp_idx;
1222
1223    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1224    {
1225       /* compute the cdp index */
1226       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1227 #ifdef DEBUG
1228           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1229              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1230              rrd -> rra_def[rra_idx].cf_nam);
1231 #endif 
1232
1233           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1234                  sizeof(rrd_value_t),1,rrd_file) != 1)
1235           { 
1236              rrd_set_error("writing rrd");
1237                  return;
1238           }
1239           *rra_current += sizeof(rrd_value_t);
1240         }
1241 }