strerror should not turn us recursive here ... -- Peter Stamfest <peter@stamfest.at>
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2002
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.7  2003/02/13 07:05:27  oetiker
9  * Find attached the patch I promised to send to you. Please note that there
10  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
11  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
12  * library is identical to librrd, but it contains support code for per-thread
13  * global variables currently used for error information only. This is similar
14  * to how errno per-thread variables are implemented.  librrd_th must be linked
15  * alongside of libpthread
16  *
17  * There is also a new file "THREADS", holding some documentation.
18  *
19  * -- Peter Stamfest <peter@stamfest.at>
20  *
21  * Revision 1.6  2002/02/01 20:34:49  oetiker
22  * fixed version number and date/time
23  *
24  * Revision 1.5  2001/05/09 05:31:01  oetiker
25  * Bug fix: when update of multiple PDP/CDP RRAs coincided
26  * with interpolation of multiple PDPs an incorrect value was
27  * stored as the CDP. Especially evident for GAUGE data sources.
28  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
29  *
30  * Revision 1.4  2001/03/10 23:54:41  oetiker
31  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
32  * parser and calculator from rrd_graph and puts then in a new file,
33  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
34  * clean-up of aberrant behavior stuff, including a bug fix.
35  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
36  * -- Jake Brutlag <jakeb@corp.webtv.net>
37  *
38  * Revision 1.3  2001/03/04 13:01:55  oetiker
39  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
40  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
41  * This is backwards compatible! But new files using the Aberrant stuff are not readable
42  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
43  * -- Jake Brutlag <jakeb@corp.webtv.net>
44  *
45  * Revision 1.2  2001/03/04 11:14:25  oetiker
46  * added at-style-time@value:value syntax to rrd_update
47  * --  Dave Bodenstab <imdave@mcs.net>
48  *
49  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
50  * checkin
51  *
52  *****************************************************************************/
53
54 #include "rrd_tool.h"
55 #include <sys/types.h>
56 #include <fcntl.h>
57
58 #ifdef WIN32
59  #include <sys/locking.h>
60  #include <sys/stat.h>
61  #include <io.h>
62 #endif
63
64 #include "rrd_hw.h"
65 #include "rrd_rpncalc.h"
66
67 #include "rrd_is_thread_safe.h"
68
69 /* Local prototypes */
70 int LockRRD(FILE *rrd_file);
71 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
72                                         unsigned long *rra_current,
73                                         unsigned short CDP_scratch_idx, FILE *rrd_file);
74 int rrd_update_r(char *filename, char *template, int argc, char **argv);
75
/* IFDNAN(X,Y): evaluates to Y when X is NaN, otherwise to X.
 * X is expanded twice, so do not pass an expression with side effects.
 * The expansion deliberately carries no trailing semicolon so the macro
 * can be used inside larger expressions as well as in plain assignments. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
77
78
#ifdef STANDALONE
/* Stand-alone rrdupdate front end: hands the command line to rrd_update()
 * and, if an error was recorded, prints the usage text followed by the
 * error message on stdout.  Returns 1 on error, 0 otherwise. */
int
main(int argc, char **argv)
{
    rrd_update(argc, argv);

    /* nothing recorded -> the update went through */
    if (!rrd_test_error())
        return 0;

    fputs("RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
          "Usage: rrdupdate filename\n"
          "\t\t\t[--template|-t ds-name:ds-name:...]\n"
          "\t\t\ttime|N:value[:value...]\n\n"
          "\t\t\tat-time@value[:value...]\n\n"
          "\t\t\t[ time:value[:value...] ..]\n\n",
          stdout);

    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
98
/*
 * rrd_update: argv-style front end for rrd_update_r().
 *
 * Recognised options:
 *     -t, --template ds-name:ds-name:...
 *
 * After the options at least two arguments must remain: the RRD file name
 * followed by one or more data arguments.
 *
 * Returns -1 (with the message set through rrd_set_error()) on an unknown
 * option or when too few arguments remain; otherwise returns whatever
 * rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *template = NULL;

    /* getopt keeps its scan position in the global optind.  Reset it so a
     * second call in the same process starts from the beginning of argv
     * instead of resuming where the previous scan stopped. */
    optind = 0;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index);

        /* getopt_long() reports the end of the options with -1 */
        if (opt == -1)
            break;

        switch (opt) {
        case 't':
            template = optarg;
            break;

        case '?':
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;

        default:
            /* no other option codes are defined; ignore, as before */
            break;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    /* argv[optind] is the file name, everything after it is data */
    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
142
143 int
144 rrd_update_r(char *filename, char *template, int argc, char **argv)
145 {
146
147     int              arg_i = 2;
148     short            j;
149     unsigned long    i,ii,iii=1;
150
151     unsigned long    rra_begin;          /* byte pointer to the rra
152                                           * area in the rrd file.  this
153                                           * pointer never changes value */
154     unsigned long    rra_start;          /* byte pointer to the rra
155                                           * area in the rrd file.  this
156                                           * pointer changes as each rrd is
157                                           * processed. */
158     unsigned long    rra_current;        /* byte pointer to the current write
159                                           * spot in the rrd file. */
160     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
161     unsigned long    interval,
162         pre_int,post_int;                /* interval between this and
163                                           * the last run */
164     unsigned long    proc_pdp_st;        /* which pdp_st was the last
165                                           * to be processed */
166     unsigned long    occu_pdp_st;        /* when was the pdp_st
167                                           * before the last update
168                                           * time */
169     unsigned long    proc_pdp_age;       /* how old was the data in
170                                           * the pdp prep area when it
171                                           * was last updated */
172     unsigned long    occu_pdp_age;       /* how long ago was the last
173                                           * pdp_step time */
174     rrd_value_t      *pdp_new;           /* prepare the incoming data
175                                           * to be added to the
176                                           * existing entry */
177     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
178                                           * to be added to the
179                                           * cdp values */
180
181     long             *tmpl_idx;          /* index representing the settings
182                                             transported by the template index */
183     unsigned long    tmpl_cnt = 2;       /* time and data */
184
185     FILE             *rrd_file;
186     rrd_t            rrd;
187     time_t           current_time = time(NULL);
188     char             **updvals;
189     int              schedule_smooth = 0;
190         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
191                                          /* a vector of future Holt-Winters seasonal coefs */
192     unsigned long    elapsed_pdp_st;
193                                          /* number of elapsed PDP steps since last update */
194     unsigned long    *rra_step_cnt = NULL;
195                                          /* number of rows to be updated in an RRA for a data
196                                           * value. */
197     unsigned long    start_pdp_offset;
198                                          /* number of PDP steps since the last update that
199                                           * are assigned to the first CDP to be generated
200                                           * since the last update. */
201     unsigned short   scratch_idx;
202                                          /* index into the CDP scratch array */
203     enum cf_en       current_cf;
204                                          /* numeric id of the current consolidation function */
205     rpnstack_t       rpnstack; /* used for COMPUTE DS */
206
207     rpnstack_init(&rpnstack);
208
209     /* need at least 1 arguments: data. */
210     if (argc < 1) {
211         rrd_set_error("Not enough arguments");
212         return -1;
213     }
214
215     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
216         return -1;
217     }
218     rra_current = rra_start = rra_begin = ftell(rrd_file);
219     /* This is defined in the ANSI C standard, section 7.9.5.3:
220
221         When a file is opened with update mode ('+' as the second
222         or third character in the ... list of mode argument
223         variables), both input and output may be performed on the
224         associated stream.  However, ...  input may not be directly
225         followed by output without an intervening call to a file
226         positioning function, unless the input operation encounters
227         end-of-file. */
228     fseek(rrd_file, 0, SEEK_CUR);
229
230     
231     /* get exclusive lock to whole file.
232      * lock gets removed when we close the file.
233      */
234     if (LockRRD(rrd_file) != 0) {
235       rrd_set_error("could not lock RRD");
236       rrd_free(&rrd);
237       fclose(rrd_file);
238       return(-1);   
239     } 
240
241     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
242         rrd_set_error("allocating updvals pointer array");
243         rrd_free(&rrd);
244         fclose(rrd_file);
245         return(-1);
246     }
247
248     if ((pdp_temp = malloc(sizeof(rrd_value_t)
249                            *rrd.stat_head->ds_cnt))==NULL){
250         rrd_set_error("allocating pdp_temp ...");
251         free(updvals);
252         rrd_free(&rrd);
253         fclose(rrd_file);
254         return(-1);
255     }
256
257     if ((tmpl_idx = malloc(sizeof(unsigned long)
258                            *(rrd.stat_head->ds_cnt+1)))==NULL){
259         rrd_set_error("allocating tmpl_idx ...");
260         free(pdp_temp);
261         free(updvals);
262         rrd_free(&rrd);
263         fclose(rrd_file);
264         return(-1);
265     }
266     /* initialize template redirector */
267     /* default config example (assume DS 1 is a CDEF DS)
268        tmpl_idx[0] -> 0; (time)
269        tmpl_idx[1] -> 1; (DS 0)
270        tmpl_idx[2] -> 3; (DS 2)
271        tmpl_idx[3] -> 4; (DS 3) */
272     tmpl_idx[0] = 0; /* time */
273     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
274         {
275            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
276               tmpl_idx[ii++]=i;
277         }
278     tmpl_cnt= ii;
279
280     if (template) {
281         char *dsname;
282         unsigned int tmpl_len;
283         dsname = template;
284         tmpl_cnt = 1; /* the first entry is the time */
285         tmpl_len = strlen(template);
286         for(i=0;i<=tmpl_len ;i++) {
287             if (template[i] == ':' || template[i] == '\0') {
288                 template[i] = '\0';
289                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
290                     rrd_set_error("Template contains more DS definitions than RRD");
291                     free(updvals); free(pdp_temp);
292                     free(tmpl_idx); rrd_free(&rrd);
293                     fclose(rrd_file); return(-1);
294                 }
295                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
296                     rrd_set_error("unknown DS name '%s'",dsname);
297                     free(updvals); free(pdp_temp);
298                     free(tmpl_idx); rrd_free(&rrd);
299                     fclose(rrd_file); return(-1);
300                 } else {
301                   /* the first element is always the time */
302                   tmpl_idx[tmpl_cnt-1]++; 
303                   /* go to the next entry on the template */
304                   dsname = &template[i+1];
305                   /* fix the damage we did before */
306                   if (i<tmpl_len) {
307                      template[i]=':';
308                   } 
309
310                 }
311             }       
312         }
313     }
314     if ((pdp_new = malloc(sizeof(rrd_value_t)
315                           *rrd.stat_head->ds_cnt))==NULL){
316         rrd_set_error("allocating pdp_new ...");
317         free(updvals);
318         free(pdp_temp);
319         free(tmpl_idx);
320         rrd_free(&rrd);
321         fclose(rrd_file);
322         return(-1);
323     }
324
325     /* loop through the arguments. */
326     for(arg_i=0; arg_i<argc;arg_i++) {
327         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
328         char *step_start = stepper;
329         char *p;
330         char *parsetime_error = NULL;
331         enum {atstyle, normal} timesyntax;
332         struct time_value ds_tv;
333         if (stepper == NULL){
334                 rrd_set_error("failed duplication argv entry");
335                 free(updvals);
336                 free(pdp_temp);  
337                 free(tmpl_idx);
338                 rrd_free(&rrd);
339                 fclose(rrd_file);
340                 return(-1);
341          }
342         /* initialize all ds input to unknown except the first one
343            which has always got to be set */
344         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
345         strcpy(stepper,argv[arg_i]);
346         updvals[0]=stepper;
347         /* separate all ds elements; first must be examined separately
348            due to alternate time syntax */
349         if ((p=strchr(stepper,'@'))!=NULL) {
350             timesyntax = atstyle;
351             *p = '\0';
352             stepper = p+1;
353         } else if ((p=strchr(stepper,':'))!=NULL) {
354             timesyntax = normal;
355             *p = '\0';
356             stepper = p+1;
357         } else {
358             rrd_set_error("expected timestamp not found in data source from %s:...",
359                           argv[arg_i]);
360             free(step_start);
361             break;
362         }
363         ii=1;
364         updvals[tmpl_idx[ii]] = stepper;
365         while (*stepper) {
366             if (*stepper == ':') {
367                 *stepper = '\0';
368                 ii++;
369                 if (ii<tmpl_cnt){                   
370                     updvals[tmpl_idx[ii]] = stepper+1;
371                 }
372             }
373             stepper++;
374         }
375
376         if (ii != tmpl_cnt-1) {
377             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
378                           tmpl_cnt-1, ii, argv[arg_i]);
379             free(step_start);
380             break;
381         }
382         
383         /* get the time from the reading ... handle N */
384         if (timesyntax == atstyle) {
385             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
386                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
387                 free(step_start);
388                 break;
389             }
390             if (ds_tv.type == RELATIVE_TO_END_TIME ||
391                 ds_tv.type == RELATIVE_TO_START_TIME) {
392                 rrd_set_error("specifying time relative to the 'start' "
393                               "or 'end' makes no sense here: %s",
394                               updvals[0]);
395                 free(step_start);
396                 break;
397             }
398
399             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
400         } else if (strcmp(updvals[0],"N")==0){
401             current_time = time(NULL);
402         } else {
403             current_time = atol(updvals[0]);
404         }
405         
406         if(current_time <= rrd.live_head->last_up){
407             rrd_set_error("illegal attempt to update using time %ld when "
408                           "last update time is %ld (minimum one second step)",
409                           current_time, rrd.live_head->last_up);
410             free(step_start);
411             break;
412         }
413         
414         
415         /* seek to the beginning of the rra's */
416         if (rra_current != rra_begin) {
417             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
418                 rrd_set_error("seek error in rrd");
419                 free(step_start);
420                 break;
421             }
422             rra_current = rra_begin;
423         }
424         rra_start = rra_begin;
425
426         /* when was the current pdp started */
427         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
428         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
429
430         /* when did the last pdp_st occur */
431         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
432         occu_pdp_st = current_time - occu_pdp_age;
433         interval = current_time - rrd.live_head->last_up;
434     
435         if (occu_pdp_st > proc_pdp_st){
436             /* OK we passed the pdp_st moment*/
437             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
438                                                               * occurred before the latest
439                                                               * pdp_st moment*/
440             post_int = occu_pdp_age;                         /* how much after it */
441         } else {
442             pre_int = interval;
443             post_int = 0;
444         }
445
446 #ifdef DEBUG
447         printf(
448                "proc_pdp_age %lu\t"
449                "proc_pdp_st %lu\t" 
450                "occu_pfp_age %lu\t" 
451                "occu_pdp_st %lu\t"
452                "int %lu\t"
453                "pre_int %lu\t"
454                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
455                 occu_pdp_age, occu_pdp_st,
456                interval, pre_int, post_int);
457 #endif
458     
459         /* process the data sources and update the pdp_prep 
460          * area accordingly */
461         for(i=0;i<rrd.stat_head->ds_cnt;i++){
462             enum dst_en dst_idx;
463             dst_idx= dst_conv(rrd.ds_def[i].dst);
464                 /* NOTE: DST_CDEF should never enter this if block, because
465                  * updvals[i+1][0] is initialized to 'U'; unless the caller
466                  * accidently specified a value for the DST_CDEF. To handle 
467                  * this case, an extra check is required. */
468             if((updvals[i+1][0] != 'U') &&
469                    (dst_idx != DST_CDEF) &&
470                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
471                double rate = DNAN;
472                /* the data source type defines how to process the data */
473                 /* pdp_new contains rate * time ... eg the bytes
474                  * transferred during the interval. Doing it this way saves
475                  * a lot of math operations */
476                 
477
478                 switch(dst_idx){
479                 case DST_COUNTER:
480                 case DST_DERIVE:
481                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
482                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
483                        if(dst_idx == DST_COUNTER) {
484                           /* simple overflow catcher suggested by andres kroonmaa */
485                           /* this will fail terribly for non 32 or 64 bit counters ... */
486                           /* are there any others in SNMP land ? */
487                           if (pdp_new[i] < (double)0.0 ) 
488                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
489                           if (pdp_new[i] < (double)0.0 ) 
490                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
491                        }
492                        rate = pdp_new[i] / interval;
493                     }
494                    else {
495                      pdp_new[i]= DNAN;          
496                    }
497                    break;
498                 case DST_ABSOLUTE:
499                     pdp_new[i]= atof(updvals[i+1]);
500                     rate = pdp_new[i] / interval;                 
501                     break;
502                 case DST_GAUGE:
503                     pdp_new[i] = atof(updvals[i+1]) * interval;
504                     rate = pdp_new[i] / interval;                  
505                     break;
506                 default:
507                     rrd_set_error("rrd contains unknown DS type : '%s'",
508                                   rrd.ds_def[i].dst);
509                     break;
510                 }
511                 /* break out of this for loop if the error string is set */
512                 if (rrd_test_error()){
513                     break;
514                 }
515                /* make sure pdp_temp is neither too large or too small
516                 * if any of these occur it becomes unknown ...
517                 * sorry folks ... */
518                if ( ! isnan(rate) && 
519                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
520                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
521                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
522                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
523                   pdp_new[i] = DNAN;
524                }               
525             } else {
526                 /* no news is news all the same */
527                 pdp_new[i] = DNAN;
528             }
529             
530             /* make a copy of the command line argument for the next run */
531 #ifdef DEBUG
532             fprintf(stderr,
533                     "prep ds[%lu]\t"
534                     "last_arg '%s'\t"
535                     "this_arg '%s'\t"
536                     "pdp_new %10.2f\n",
537                     i,
538                     rrd.pdp_prep[i].last_ds,
539                     updvals[i+1], pdp_new[i]);
540 #endif
541             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
542                 strncpy(rrd.pdp_prep[i].last_ds,
543                         updvals[i+1],LAST_DS_LEN-1);
544                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
545             }
546         }
547         /* break out of the argument parsing loop if the error_string is set */
548         if (rrd_test_error()){
549             free(step_start);
550             break;
551         }
552         /* has a pdp_st moment occurred since the last run ? */
553
554         if (proc_pdp_st == occu_pdp_st){
555             /* no we have not passed a pdp_st moment. therefore update is simple */
556
557             for(i=0;i<rrd.stat_head->ds_cnt;i++){
558                 if(isnan(pdp_new[i]))
559                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
560                 else
561                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
562 #ifdef DEBUG
563                 fprintf(stderr,
564                         "NO PDP  ds[%lu]\t"
565                         "value %10.2f\t"
566                         "unkn_sec %5lu\n",
567                         i,
568                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
569                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
570 #endif
571             }   
572         } else {
573             /* an pdp_st has occurred. */
574
575             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
576              * occurred up to the last run.        
577             pdp_new[] contains rate*seconds from the latest run.
578             pdp_temp[] will contain the rate for cdp */
579
580             for(i=0;i<rrd.stat_head->ds_cnt;i++){
581                 /* update pdp_prep to the current pdp_st */
582                 if(isnan(pdp_new[i]))
583                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
584                 else
585                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
586                         pdp_new[i]/(double)interval*(double)pre_int;
587
588                 /* if too much of the pdp_prep is unknown we dump it */
589                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
590                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
591                     (occu_pdp_st-proc_pdp_st <= 
592                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
593                     pdp_temp[i] = DNAN;
594                 } else {
595                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
596                         / (double)( occu_pdp_st
597                                    - proc_pdp_st
598                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
599                 }
600
601                 /* process CDEF data sources; remember each CDEF DS can
602                  * only reference other DS with a lower index number */
603             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
604                    rpnp_t *rpnp;
605                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
606                    /* substitue data values for OP_VARIABLE nodes */
607                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
608                    {
609                           if (rpnp[ii].op == OP_VARIABLE) {
610                                  rpnp[ii].op = OP_NUMBER;
611                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
612                           }
613                    }
614                    /* run the rpn calculator */
615                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
616                           free(rpnp);
617                           break; /* exits the data sources pdp_temp loop */
618                    }
619                 }
620         
621                 /* make pdp_prep ready for the next run */
622                 if(isnan(pdp_new[i])){
623                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
624                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
625                 } else {
626                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
627                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
628                         pdp_new[i]/(double)interval*(double)post_int;
629                 }
630
631 #ifdef DEBUG
632                 fprintf(stderr,
633                         "PDP UPD ds[%lu]\t"
634                         "pdp_temp %10.2f\t"
635                         "new_prep %10.2f\t"
636                         "new_unkn_sec %5lu\n",
637                         i, pdp_temp[i],
638                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
639                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
640 #endif
641             }
642
643                 /* if there were errors during the last loop, bail out here */
644             if (rrd_test_error()){
645                free(step_start);
646                break;
647             }
648
649                 /* compute the number of elapsed pdp_st moments */
650                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
651 #ifdef DEBUG
652                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
653 #endif
654                 if (rra_step_cnt == NULL)
655                 {
656                    rra_step_cnt = (unsigned long *) 
657                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
658                 }
659
660             for(i = 0, rra_start = rra_begin;
661                 i < rrd.stat_head->rra_cnt;
662             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
663                 i++)
664                 {
665                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
666                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
667                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
668         if (start_pdp_offset <= elapsed_pdp_st) {
669            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
670                       rrd.rra_def[i].pdp_cnt + 1;
671             } else {
672                    rra_step_cnt[i] = 0;
673                 }
674
675                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
676                 {
677                    /* If this is a bulk update, we need to skip ahead in the seasonal
678                         * arrays so that they will be correct for the next observed value;
679                         * note that for the bulk update itself, no update will occur to
680                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
681                         * be set to DNAN. */
682            if (rra_step_cnt[i] > 2) 
683                    {
684                           /* skip update by resetting rra_step_cnt[i],
685                            * note that this is not data source specific; this is due
686                            * to the bulk update, not a DNAN value for the specific data
687                            * source. */
688                           rra_step_cnt[i] = 0;
689               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
690                              &last_seasonal_coef);
691                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
692                              &seasonal_coef);
693                    }
694                 
695                   /* periodically run a smoother for seasonal effects */
696                   /* Need to use first cdp parameter buffer to track
697                    * burnin (burnin requires a specific smoothing schedule).
698                    * The CDP_init_seasonal parameter is really an RRA level,
699                    * not a data source within RRA level parameter, but the rra_def
700                    * is read only for rrd_update (not flushed to disk). */
701                   iii = i*(rrd.stat_head -> ds_cnt);
702                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
703                           <= BURNIN_CYCLES)
704                   {
705                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
706                                  > rrd.rra_def[i].row_cnt - 1) {
707                            /* mark off one of the burnin cycles */
708                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
709                        schedule_smooth = 1;
710                          }  
711                   } else {
712                          /* someone has no doubt invented a trick to deal with this
713                           * wrap around, but at least this code is clear. */
714                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
715                              rrd.rra_ptr[i].cur_row)
716                          {
717                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
718                                   * mapping between PDP and CDP */
719                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
720                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
721                                  {
722 #ifdef DEBUG
723                                         fprintf(stderr,
724                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
725                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
726                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
727 #endif
728                                         schedule_smooth = 1;
729                                  }
730              } else {
731                                  /* can't rely on negative numbers because we are working with
732                                   * unsigned values */
733                                  /* Don't need modulus here. If we've wrapped more than once, only
734                                   * one smooth is executed at the end. */
735                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
736                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
737                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
738                                  {
739 #ifdef DEBUG
740                                         fprintf(stderr,
741                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
742                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
743                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
744 #endif
745                                         schedule_smooth = 1;
746                                  }
747                          }
748                   }
749
750               rra_current = ftell(rrd_file); 
751                 } /* if cf is DEVSEASONAL or SEASONAL */
752
753         if (rrd_test_error()) break;
754
755                     /* update CDP_PREP areas */
756                     /* loop over data soures within each RRA */
757                     for(ii = 0;
758                         ii < rrd.stat_head->ds_cnt;
759                         ii++)
760                         {
761                         
762                         /* iii indexes the CDP prep area for this data source within the RRA */
763                         iii=i*rrd.stat_head->ds_cnt+ii;
764
765                         if (rrd.rra_def[i].pdp_cnt > 1) {
766                           
767                            if (rra_step_cnt[i] > 0) {
768                            /* If we are in this block, as least 1 CDP value will be written to
769                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
770                                 * to be written, then the "fill in" value is the CDP_secondary_val
771                                 * entry. */
772                                   if (isnan(pdp_temp[ii]))
773                   {
774                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
775                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
776                                   } else {
777                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
778                                           * CDP data entries. No matter the CF, the value is the same because
779                                           * the average, max, min, and last of a list of identical values is
780                                           * the same, namely, the value itself. */
781                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
782                                   }
783                      
784                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
785                                       > rrd.rra_def[i].pdp_cnt*
786                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
787                                   {
788                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
789                                          /* initialize carry over */
790                                          if (current_cf == CF_AVERAGE) {
791                                                    if (isnan(pdp_temp[ii])) { 
792                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
793                                                    } else {
794                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
795                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
796                                                    }
797                                          } else {
798                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
799                                          }
800                                   } else {
801                                          rrd_value_t cum_val, cur_val; 
802                                      switch (current_cf) {
803                                                 case CF_AVERAGE:
804                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
805                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
806                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
807                                                (cum_val + cur_val * start_pdp_offset) /
808                                            (rrd.rra_def[i].pdp_cnt
809                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
810                                                    /* initialize carry over value */
811                                                    if (isnan(pdp_temp[ii])) { 
812                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
813                                                    } else {
814                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
815                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
816                                                    }
817                                                    break;
818                                                 case CF_MAXIMUM:
819                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
820                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
821 #ifdef DEBUG
822                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
823                                                           isnan(pdp_temp[ii])) {
824                                                      fprintf(stderr,
825                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
826                                                                 i,ii);
827                                                          exit(-1);
828                                                   }
829 #endif
830                                                   if (cur_val > cum_val)
831                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
832                                                   else
833                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
834                                                   /* initialize carry over value */
835                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
836                                                   break;
837                                                 case CF_MINIMUM:
838                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
839                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
840 #ifdef DEBUG
841                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
842                                                           isnan(pdp_temp[ii])) {
843                                                      fprintf(stderr,
844                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
845                                                                 i,ii);
846                                                          exit(-1);
847                                                   }
848 #endif
849                                                   if (cur_val < cum_val)
850                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
851                                                   else
852                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
853                                                   /* initialize carry over value */
854                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
855                                                   break;
856                                                 case CF_LAST:
857                                                 default:
858                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
859                                                    /* initialize carry over value */
860                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
861                                                 break;
862                                          }
863                                   } /* endif meets xff value requirement for a valid value */
864                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
865                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
866                                   if (isnan(pdp_temp[ii]))
867                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
868                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
869                                   else
870                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
871                } else  /* rra_step_cnt[i]  == 0 */
872                            {
873 #ifdef DEBUG
874                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
875                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
876                                          i,ii);
877                                   } else {
878                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
879                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
880                                   }
881 #endif
882                                   if (isnan(pdp_temp[ii])) {
883                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
884                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
885                                   {
886                                          if (current_cf == CF_AVERAGE) {
887                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
888                                                    elapsed_pdp_st;
889                                          } else {
890                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
891                                          }
892 #ifdef DEBUG
893                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
894                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
895 #endif
896                                   } else {
897                                          switch (current_cf) {
898                                          case CF_AVERAGE:
899                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
900                                                    elapsed_pdp_st;
901                                                 break;
902                                          case CF_MINIMUM:
903                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
904                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
905                                                 break; 
906                                          case CF_MAXIMUM:
907                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
908                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
909                                                 break; 
910                                          case CF_LAST:
911                                          default:
912                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
913                                                 break;
914                                          }
915                                   }
916                            }
917                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
918                            if (elapsed_pdp_st > 2)
919                            {
920                                    switch (current_cf) {
921                                    case CF_AVERAGE:
922                                    default:
923                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
924                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
925                                           break;
926                    case CF_SEASONAL:
927                                    case CF_DEVSEASONAL:
928                                           /* need to update cached seasonal values, so they are consistent
929                                            * with the bulk update */
930                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
931                                            * CDP_last_deviation are the same. */
932                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
933                                                  last_seasonal_coef[ii];
934                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
935                                                  seasonal_coef[ii];
936                                           break;
937                    case CF_HWPREDICT:
938                                           /* need to update the null_count and last_null_count.
939                                            * even do this for non-DNAN pdp_temp because the
940                                            * algorithm is not learning from batch updates. */
941                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
942                                                  elapsed_pdp_st;
943                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
944                                                  elapsed_pdp_st - 1;
945                                           /* fall through */
946                                    case CF_DEVPREDICT:
947                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
948                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
949                                           break;
950                    case CF_FAILURES:
951                                           /* do not count missed bulk values as failures */
952                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
953                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
954                                           /* need to reset violations buffer.
955                                            * could do this more carefully, but for now, just
956                                            * assume a bulk update wipes away all violations. */
957                       erase_violations(&rrd, iii, i);
958                                           break;
959                                    }
960                            } 
961                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
962
963                         if (rrd_test_error()) break;
964
965                         } /* endif data sources loop */
966         } /* end RRA Loop */
967
968                 /* this loop is only entered if elapsed_pdp_st < 3 */
969                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
970                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
971                 {
972                for(i = 0, rra_start = rra_begin;
973                    i < rrd.stat_head->rra_cnt;
974                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
975                    i++)
976                    {
977                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
978
979                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
980                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
981                           {
982                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
983                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
984                                 &seasonal_coef);
985                  rra_current = ftell(rrd_file);
986                           }
987                           if (rrd_test_error()) break;
988                       /* loop over data soures within each RRA */
989                       for(ii = 0;
990                           ii < rrd.stat_head->ds_cnt;
991                           ii++)
992                           {
993                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
994                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
995                                     scratch_idx, seasonal_coef);
996                           }
997            } /* end RRA Loop */
998                    if (rrd_test_error()) break;
999             } /* end elapsed_pdp_st loop */
1000
1001                 if (rrd_test_error()) break;
1002
1003                 /* Ready to write to disk */
1004                 /* Move sequentially through the file, writing one RRA at a time.
1005                  * Note this architecture divorces the computation of CDP with
1006                  * flushing updated RRA entries to disk. */
1007             for(i = 0, rra_start = rra_begin;
1008                 i < rrd.stat_head->rra_cnt;
1009             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1010                 i++) {
1011                 /* is there anything to write for this RRA? If not, continue. */
1012         if (rra_step_cnt[i] == 0) continue;
1013
1014                 /* write the first row */
1015 #ifdef DEBUG
1016         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1017 #endif
1018             rrd.rra_ptr[i].cur_row++;
1019             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1020                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1021                 /* positition on the first row */
1022                 rra_pos_tmp = rra_start +
1023                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1024                 if(rra_pos_tmp != rra_current) {
1025                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1026                       rrd_set_error("seek error in rrd");
1027                       break;
1028                    }
1029                    rra_current = rra_pos_tmp;
1030                 }
1031
1032 #ifdef DEBUG
1033             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1034 #endif
1035                 scratch_idx = CDP_primary_val;
1036                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1037                 if (rrd_test_error()) break;
1038
1039                 /* write other rows of the bulk update, if any */
1040                 scratch_idx = CDP_secondary_val;
1041                 for ( ; rra_step_cnt[i] > 1; 
1042                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1043                 {
1044                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1045                    {
1046 #ifdef DEBUG
1047               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1048                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1049 #endif
1050                           /* wrap */
1051                           rrd.rra_ptr[i].cur_row = 0;
1052                           /* seek back to beginning of current rra */
1053                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1054                           {
1055                          rrd_set_error("seek error in rrd");
1056                          break;
1057                           }
1058 #ifdef DEBUG
1059                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1060 #endif
1061                           rra_current = rra_start;
1062                    }
1063                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1064                 }
1065                 
1066                 if (rrd_test_error())
1067                   break;
1068                 } /* RRA LOOP */
1069
1070             /* break out of the argument parsing loop if error_string is set */
1071             if (rrd_test_error()){
1072                    free(step_start);
1073                    break;
1074             } 
1075             
1076         } /* endif a pdp_st has occurred */ 
1077         rrd.live_head->last_up = current_time;
1078         free(step_start);
1079     } /* function argument loop */
1080
1081     if (seasonal_coef != NULL) free(seasonal_coef);
1082     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1083         if (rra_step_cnt != NULL) free(rra_step_cnt);
1084     rpnstack_free(&rpnstack);
1085
1086     /* if we got here and if there is an error and if the file has not been
1087      * written to, then close things up and return. */
1088     if (rrd_test_error()) {
1089         free(updvals);
1090         free(tmpl_idx);
1091         rrd_free(&rrd);
1092         free(pdp_temp);
1093         free(pdp_new);
1094         fclose(rrd_file);
1095         return(-1);
1096     }
1097
1098     /* aargh ... that was tough ... so many loops ... anyway, its done.
1099      * we just need to write back the live header portion now*/
1100
1101     if (fseek(rrd_file, (sizeof(stat_head_t)
1102                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1103                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1104               SEEK_SET) != 0) {
1105         rrd_set_error("seek rrd for live header writeback");
1106         free(updvals);
1107         free(tmpl_idx);
1108         rrd_free(&rrd);
1109         free(pdp_temp);
1110         free(pdp_new);
1111         fclose(rrd_file);
1112         return(-1);
1113     }
1114
1115     if(fwrite( rrd.live_head,
1116                sizeof(live_head_t), 1, rrd_file) != 1){
1117         rrd_set_error("fwrite live_head to rrd");
1118         free(updvals);
1119         rrd_free(&rrd);
1120         free(tmpl_idx);
1121         free(pdp_temp);
1122         free(pdp_new);
1123         fclose(rrd_file);
1124         return(-1);
1125     }
1126
1127     if(fwrite( rrd.pdp_prep,
1128                sizeof(pdp_prep_t),
1129                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1130         rrd_set_error("ftwrite pdp_prep to rrd");
1131         free(updvals);
1132         rrd_free(&rrd);
1133         free(tmpl_idx);
1134         free(pdp_temp);
1135         free(pdp_new);
1136         fclose(rrd_file);
1137         return(-1);
1138     }
1139
1140     if(fwrite( rrd.cdp_prep,
1141                sizeof(cdp_prep_t),
1142                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1143        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1144
1145         rrd_set_error("ftwrite cdp_prep to rrd");
1146         free(updvals);
1147         free(tmpl_idx);
1148         rrd_free(&rrd);
1149         free(pdp_temp);
1150         free(pdp_new);
1151         fclose(rrd_file);
1152         return(-1);
1153     }
1154
1155     if(fwrite( rrd.rra_ptr,
1156                sizeof(rra_ptr_t), 
1157                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1158         rrd_set_error("fwrite rra_ptr to rrd");
1159         free(updvals);
1160         free(tmpl_idx);
1161         rrd_free(&rrd);
1162         free(pdp_temp);
1163         free(pdp_new);
1164         fclose(rrd_file);
1165         return(-1);
1166     }
1167
1168     /* OK now close the files and free the memory */
1169     if(fclose(rrd_file) != 0){
1170         rrd_set_error("closing rrd");
1171         free(updvals);
1172         free(tmpl_idx);
1173         rrd_free(&rrd);
1174         free(pdp_temp);
1175         free(pdp_new);
1176         return(-1);
1177     }
1178
1179     /* calling the smoothing code here guarantees at most
1180          * one smoothing operation per rrd_update call. Unfortunately,
1181          * it is possible with bulk updates, or a long-delayed update
1182          * for smoothing to occur off-schedule. This really isn't
1183          * critical except during the burning cycles. */
1184         if (schedule_smooth)
1185         {
1186 #ifndef WIN32
1187           rrd_file = fopen(filename,"r+");
1188 #else
1189           rrd_file = fopen(filename,"rb+");
1190 #endif
1191           rra_start = rra_begin;
1192           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1193           {
1194             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1195                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1196             {
1197 #ifdef DEBUG
1198               fprintf(stderr,"Running smoother for rra %ld\n",i);
1199 #endif
1200               apply_smoother(&rrd,i,rra_start,rrd_file);
1201               if (rrd_test_error())
1202                 break;
1203             }
1204             rra_start += rrd.rra_def[i].row_cnt
1205               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1206           }
1207           fclose(rrd_file);
1208         }
1209     rrd_free(&rrd);
1210     free(updvals);
1211     free(tmpl_idx);
1212     free(pdp_new);
1213     free(pdp_temp);
1214     return(0);
1215 }
1216
/*
 * Try to take an exclusive lock on the whole RRD file without blocking.
 *
 * rrdfile: open stdio stream of the RRD file to lock.
 *
 * Returns 0 on success. Returns -1 when rrdfile is NULL, when no file
 * descriptor can be obtained from it, or when the lock request is refused
 * (for instance because another process already holds a lock); otherwise
 * the result of the underlying locking call is passed through unchanged.
 *
 * The lock gets removed when we close the file.
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;         /* File descriptor for RRD */
    int lock_status;    /* result of the platform locking call */

    /* fileno() must not be handed a NULL stream */
    if (rrdfile == NULL)
        return(-1);

    rrd_fd = fileno(rrdfile);
    if (rrd_fd < 0)
        return(-1);

    {
#ifndef WIN32
        /* zero every member first: only four of them are assigned below and
         * the structure may carry further members on some platforms */
        struct flock lock = {0};

        lock.l_type = F_WRLCK;      /* exclusive write lock */
        lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */
        lock.l_start = 0;           /* begin at the first byte */
        lock.l_len = 0;             /* zero length means the whole file */

        /* F_SETLK fails immediately instead of waiting for the lock */
        lock_status = fcntl(rrd_fd, F_SETLK, &lock);
#else
        struct _stat st;

        /* _locking() needs a byte count, so lock st_size bytes */
        if ( _fstat( rrd_fd, &st ) == 0 ) {
            lock_status = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
        } else {
            lock_status = -1;
        }
#endif
    }

    return(lock_status);
}
1253
1254
1255 void
1256 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1257                unsigned short CDP_scratch_idx, FILE *rrd_file)
1258 {
1259    unsigned long ds_idx, cdp_idx;
1260
1261    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1262    {
1263       /* compute the cdp index */
1264       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1265 #ifdef DEBUG
1266           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1267              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1268              rrd -> rra_def[rra_idx].cf_nam);
1269 #endif 
1270
1271           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1272                  sizeof(rrd_value_t),1,rrd_file) != 1)
1273           { 
1274              rrd_set_error("writing rrd");
1275                  return;
1276           }
1277           *rra_current += sizeof(rrd_value_t);
1278         }
1279 }