started updating for 1.2 release
[rrdtool.git] / src / rrdupdate.c
1 /*****************************************************************************
2  * RRDtool 1.2rc6  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.3  2001/03/04 13:01:56  oetiker
9  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
10  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
11  * This is backwards compatible! But new files using the Aberrant stuff are not readable
12  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
13  * -- Jake Brutlag <jakeb@corp.webtv.net>
14  *
15  * Revision 1.2  2001/03/04 11:14:25  oetiker
16  * added at-style-time@value:value syntax to rrd_update
17  * --  Dave Bodenstab <imdave@mcs.net>
18  * Revision 1.1  2001/02/25 22:25:06  oetiker
19  * Initial revision
20  *
21  *****************************************************************************/
22
23 #include "rrd_tool.h"
24 #include <sys/types.h>
25 #include <fcntl.h>
26
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28  #include <sys/locking.h>
29  #include <sys/stat.h>
30  #include <io.h>
31 #endif
32
33 /* Prototypes */
34 int LockRRD(FILE *rrd_file);
35 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
36     unsigned short CDP_scratch_idx, FILE *rrd_file);
37  
38 /*#define DEBUG */
39
/* IFDNAN(X,Y): evaluates to Y when X is NaN, otherwise to X.
 * The expansion is a plain parenthesized expression (no trailing ';'),
 * so it can be used inside larger expressions and as a function argument.
 * Note: X is evaluated twice (once by isnan, once as the result), so do
 * not pass an argument with side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
41
42
43 #ifdef STANDALONE
/* Stand-alone entry point (only built when STANDALONE is defined).
 * Passes the command line straight to rrd_update(); if an error is set
 * afterwards, prints the usage text followed by the error message,
 * clears the error and returns 1.  Returns 0 when no error is set. */
int
main(int argc, char **argv)
{
        rrd_update(argc, argv);

        /* nothing went wrong: we are done */
        if (!rrd_test_error()) {
                return 0;
        }

        printf("RRDtool 1.2rc6  Copyright 1997-2005 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
               "Usage: rrdupdate filename\n"
               "\t\t\t[--template|-t ds-name:ds-name:...]\n"
               "\t\t\ttime|N:value[:value...]\n\n"
               "\t\t\tat-time@value[:value...]\n\n"
               "\t\t\t[ time:value[:value...] ..]\n\n");

        printf("ERROR: %s\n", rrd_get_error());
        rrd_clear_error();
        return 1;
}
61 #endif
62
63 int
64 rrd_update(int argc, char **argv)
65 {
66
67     int              arg_i = 2;
68     short            j;
69     long             i,ii,iii=1;
70
71     unsigned long    rra_begin;          /* byte pointer to the rra
72                                           * area in the rrd file.  this
73                                           * pointer never changes value */
74     unsigned long    rra_start;          /* byte pointer to the rra
75                                           * area in the rrd file.  this
76                                           * pointer changes as each rrd is
77                                           * processed. */
78     unsigned long    rra_current;        /* byte pointer to the current write
79                                           * spot in the rrd file. */
80     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
81     unsigned long    interval,
82         pre_int,post_int;                /* interval between this and
83                                           * the last run */
84     unsigned long    proc_pdp_st;        /* which pdp_st was the last
85                                           * to be processed */
86     unsigned long    occu_pdp_st;        /* when was the pdp_st
87                                           * before the last update
88                                           * time */
89     unsigned long    proc_pdp_age;       /* how old was the data in
90                                           * the pdp prep area when it
91                                           * was last updated */
92     unsigned long    occu_pdp_age;       /* how long ago was the last
93                                           * pdp_step time */
94     rrd_value_t      *pdp_new;           /* prepare the incoming data
95                                           * to be added to the
96                                           * existing entry */
97     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
98                                           * to be added to the
99                                           * cdp values */
100
101     long             *tmpl_idx;          /* index representing the settings
102                                             transported by the template index */
103     long             tmpl_cnt = 2;       /* time and data */
104
105     FILE             *rrd_file;
106     rrd_t            rrd;
107     time_t           current_time = time(NULL);
108     char             **updvals;
109     int              schedule_smooth = 0;
110     char             *template = NULL;   
111         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
112                                          /* a vector of future Holt-Winters seasonal coefs */
113     unsigned long    elapsed_pdp_st;
114                                          /* number of elapsed PDP steps since last update */
115     unsigned long    *rra_step_cnt = NULL;
116                                          /* number of rows to be updated in an RRA for a data
117                                           * value. */
118     unsigned long    start_pdp_offset;
119                                          /* number of PDP steps since the last update that
120                                           * are assigned to the first CDP to be generated
121                                           * since the last update. */
122     unsigned short   scratch_idx;
123                                          /* index into the CDP scratch array */
124     enum cf_en       current_cf;
125                                          /* numeric id of the current consolidation function */
126
127     while (1) {
128         static struct option long_options[] =
129         {
130             {"template",      required_argument, 0, 't'},
131             {0,0,0,0}
132         };
133         int option_index = 0;
134         int opt;
135         opt = getopt_long(argc, argv, "t:", 
136                           long_options, &option_index);
137         
138         if (opt == EOF)
139           break;
140         
141         switch(opt) {
142         case 't':
143             template = optarg;
144             break;
145
146         case '?':
147             rrd_set_error("unknown option '%s'",argv[optind-1]);
148             rrd_free(&rrd);            
149             return(-1);
150         }
151     }
152
153     /* need at least 2 arguments: filename, data. */
154     if (argc-optind < 2) {
155         rrd_set_error("Not enough arguments");
156         return -1;
157     }
158
159     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
160         return -1;
161     }
162     rra_current = rra_start = rra_begin = ftell(rrd_file);
163     /* This is defined in the ANSI C standard, section 7.9.5.3:
164
165         When a file is opened with update mode ('+' as the second
166         or third character in the ... list of mode argument
167         variables), both input and output may be performed on the
168         associated stream.  However, ...  input may not be directly
169         followed by output without an intervening call to a file
170         positioning function, unless the input operation encounters
171         end-of-file. */
172     fseek(rrd_file, 0, SEEK_CUR);
173
174     
175     /* get exclusive lock to whole file.
176      * lock gets removed when we close the file.
177      */
178     if (LockRRD(rrd_file) != 0) {
179       rrd_set_error("could not lock RRD");
180       rrd_free(&rrd);
181       fclose(rrd_file);
182       return(-1);   
183     } 
184
185     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
186         rrd_set_error("allocating updvals pointer array");
187         rrd_free(&rrd);
188         fclose(rrd_file);
189         return(-1);
190     }
191
192     if ((pdp_temp = malloc(sizeof(rrd_value_t)
193                            *rrd.stat_head->ds_cnt))==NULL){
194         rrd_set_error("allocating pdp_temp ...");
195         free(updvals);
196         rrd_free(&rrd);
197         fclose(rrd_file);
198         return(-1);
199     }
200
201     if ((tmpl_idx = malloc(sizeof(unsigned long)
202                            *(rrd.stat_head->ds_cnt+1)))==NULL){
203         rrd_set_error("allocating tmpl_idx ...");
204         free(pdp_temp);
205         free(updvals);
206         rrd_free(&rrd);
207         fclose(rrd_file);
208         return(-1);
209     }
210     /* initialize template redirector */
211     /* default config
212        tmpl_idx[0] -> 0; (time)
213        tmpl_idx[1] -> 1; (DS 0)
214        tmpl_idx[2] -> 2; (DS 1)
215        tmpl_idx[3] -> 3; (DS 2)
216        ... */
217     for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
218     tmpl_cnt=rrd.stat_head->ds_cnt+1;
219     if (template) {
220         char *dsname;
221         int tmpl_len;
222         dsname = template;
223         tmpl_cnt = 1; /* the first entry is the time */
224         tmpl_len = strlen(template);
225         for(i=0;i<=tmpl_len ;i++) {
226             if (template[i] == ':' || template[i] == '\0') {
227                 template[i] = '\0';
228                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
229                     rrd_set_error("Template contains more DS definitions than RRD");
230                     free(updvals); free(pdp_temp);
231                     free(tmpl_idx); rrd_free(&rrd);
232                     fclose(rrd_file); return(-1);
233                 }
234                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
235                     rrd_set_error("unknown DS name '%s'",dsname);
236                     free(updvals); free(pdp_temp);
237                     free(tmpl_idx); rrd_free(&rrd);
238                     fclose(rrd_file); return(-1);
239                 } else {
240                   /* the first element is always the time */
241                   tmpl_idx[tmpl_cnt-1]++; 
242                   /* go to the next entry on the template */
243                   dsname = &template[i+1];
244                   /* fix the damage we did before */
245                   if (i<tmpl_len) {
246                      template[i]=':';
247                   } 
248
249                 }
250             }       
251         }
252     }
253     if ((pdp_new = malloc(sizeof(rrd_value_t)
254                           *rrd.stat_head->ds_cnt))==NULL){
255         rrd_set_error("allocating pdp_new ...");
256         free(updvals);
257         free(pdp_temp);
258         free(tmpl_idx);
259         rrd_free(&rrd);
260         fclose(rrd_file);
261         return(-1);
262     }
263
264     /* loop through the arguments. */
265     for(arg_i=optind+1; arg_i<argc;arg_i++) {
266         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
267         char *step_start = stepper;
268         char *p;
269         char *parsetime_error = NULL;
270         enum {atstyle, normal} timesyntax;
271         struct time_value ds_tv;
272         if (stepper == NULL){
273                 rrd_set_error("failed duplication argv entry");
274                 free(updvals);
275                 free(pdp_temp);  
276                 free(tmpl_idx);
277                 rrd_free(&rrd);
278                 fclose(rrd_file);
279                 return(-1);
280          }
281         /* initialize all ds input to unknown except the first one
282            which has always got to be set */
283         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
284         strcpy(stepper,argv[arg_i]);
285         updvals[0]=stepper;
286         /* separate all ds elements; first must be examined separately
287            due to alternate time syntax */
288         if ((p=strchr(stepper,'@'))!=NULL) {
289             timesyntax = atstyle;
290             *p = '\0';
291             stepper = p+1;
292         } else if ((p=strchr(stepper,':'))!=NULL) {
293             timesyntax = normal;
294             *p = '\0';
295             stepper = p+1;
296         } else {
297             rrd_set_error("expected timestamp not found in data source from %s:...",
298                           argv[arg_i]);
299             free(step_start);
300             break;
301         }
302         ii=1;
303         updvals[tmpl_idx[ii]] = stepper;
304         while (*stepper) {
305             if (*stepper == ':') {
306                 *stepper = '\0';
307                 ii++;
308                 if (ii<tmpl_cnt){                   
309                     updvals[tmpl_idx[ii]] = stepper+1;
310                 }
311             }
312             stepper++;
313         }
314
315         if (ii != tmpl_cnt-1) {
316             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
317                           tmpl_cnt-1, ii, argv[arg_i]);
318             free(step_start);
319             break;
320         }
321         
322         /* get the time from the reading ... handle N */
323         if (timesyntax == atstyle) {
324             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
325                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
326                 free(step_start);
327                 break;
328             }
329             if (ds_tv.type == RELATIVE_TO_END_TIME ||
330                 ds_tv.type == RELATIVE_TO_START_TIME) {
331                 rrd_set_error("specifying time relative to the 'start' "
332                               "or 'end' makes no sense here: %s",
333                               updvals[0]);
334                 free(step_start);
335                 break;
336             }
337
338             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
339         } else if (strcmp(updvals[0],"N")==0){
340             current_time = time(NULL);
341         } else {
342             current_time = atol(updvals[0]);
343         }
344         
345         if(current_time <= rrd.live_head->last_up){
346             rrd_set_error("illegal attempt to update using time %ld when "
347                           "last update time is %ld (minimum one second step)",
348                           current_time, rrd.live_head->last_up);
349             free(step_start);
350             break;
351         }
352         
353         
354         /* seek to the beginning of the rra's */
355         if (rra_current != rra_begin) {
356             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
357                 rrd_set_error("seek error in rrd");
358                 free(step_start);
359                 break;
360             }
361             rra_current = rra_begin;
362         }
363         rra_start = rra_begin;
364
365         /* when was the current pdp started */
366         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
367         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
368
369         /* when did the last pdp_st occur */
370         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
371         occu_pdp_st = current_time - occu_pdp_age;
372         interval = current_time - rrd.live_head->last_up;
373     
374         if (occu_pdp_st > proc_pdp_st){
375             /* OK we passed the pdp_st moment*/
376             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
377                                                               * occurred before the latest
378                                                               * pdp_st moment*/
379             post_int = occu_pdp_age;                         /* how much after it */
380         } else {
381             pre_int = interval;
382             post_int = 0;
383         }
384
385 #ifdef DEBUG
386         printf(
387                "proc_pdp_age %lu\t"
388                "proc_pdp_st %lu\t" 
389                "occu_pfp_age %lu\t" 
390                "occu_pdp_st %lu\t"
391                "int %lu\t"
392                "pre_int %lu\t"
393                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
394                 occu_pdp_age, occu_pdp_st,
395                interval, pre_int, post_int);
396 #endif
397     
398         /* process the data sources and update the pdp_prep 
399          * area accordingly */
400         for(i=0;i<rrd.stat_head->ds_cnt;i++){
401             enum dst_en dst_idx;
402             dst_idx= dst_conv(rrd.ds_def[i].dst);
403             if((updvals[i+1][0] != 'U') &&
404                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
405                double rate = DNAN;
406                /* the data source type defines how to process the data */
407                 /* pdp_temp contains rate * time ... eg the bytes
408                  * transferred during the interval. Doing it this way saves
409                  * a lot of math operations */
410                 
411
412                 switch(dst_idx){
413                 case DST_COUNTER:
414                 case DST_DERIVE:
415                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
416                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
417                        if(dst_idx == DST_COUNTER) {
418                           /* simple overflow catcher suggested by Andres Kroonmaa */
419                           /* this will fail terribly for non 32 or 64 bit counters ... */
420                           /* are there any others in SNMP land ? */
421                           if (pdp_new[i] < (double)0.0 ) 
422                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
423                           if (pdp_new[i] < (double)0.0 ) 
424                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
425                        }
426                        rate = pdp_new[i] / interval;
427                     }
428                    else {
429                      pdp_new[i]= DNAN;          
430                    }
431                    break;
432                 case DST_ABSOLUTE:
433                     pdp_new[i]= atof(updvals[i+1]);
434                     rate = pdp_new[i] / interval;                 
435                     break;
436                 case DST_GAUGE:
437                     pdp_new[i] = atof(updvals[i+1]) * interval;
438                     rate = pdp_new[i] / interval;                  
439                     break;
440                 default:
441                     rrd_set_error("rrd contains unknown DS type : '%s'",
442                                   rrd.ds_def[i].dst);
443                     break;
444                 }
445                 /* break out of this for loop if the error string is set */
446                 if (rrd_test_error()){
447                     break;
448                 }
449                /* make sure pdp_temp is neither too large or too small
450                 * if any of these occur it becomes unknown ...
451                 * sorry folks ... */
452                if ( ! isnan(rate) && 
453                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
454                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
455                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
456                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
457                   pdp_new[i] = DNAN;
458                }               
459             } else {
460                 /* no news is news all the same */
461                 pdp_new[i] = DNAN;
462             }
463             
464             /* make a copy of the command line argument for the next run */
465 #ifdef DEBUG
466             fprintf(stderr,
467                     "prep ds[%lu]\t"
468                     "last_arg '%s'\t"
469                     "this_arg '%s'\t"
470                     "pdp_new %10.2f\n",
471                     i,
472                     rrd.pdp_prep[i].last_ds,
473                     updvals[i+1], pdp_new[i]);
474 #endif
475             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
476                 strncpy(rrd.pdp_prep[i].last_ds,
477                         updvals[i+1],LAST_DS_LEN-1);
478                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
479             }
480         }
481         /* break out of the argument parsing loop if the error_string is set */
482         if (rrd_test_error()){
483             free(step_start);
484             break;
485         }
486         /* has a pdp_st moment occurred since the last run ? */
487
488         if (proc_pdp_st == occu_pdp_st){
489             /* no we have not passed a pdp_st moment. therefore update is simple */
490
491             for(i=0;i<rrd.stat_head->ds_cnt;i++){
492                 if(isnan(pdp_new[i]))
493                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
494                 else
495                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
496 #ifdef DEBUG
497                 fprintf(stderr,
498                         "NO PDP  ds[%lu]\t"
499                         "value %10.2f\t"
500                         "unkn_sec %5lu\n",
501                         i,
502                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
503                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
504 #endif
505             }   
506         } else {
507             /* an pdp_st has occurred. */
508
509             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
510              * occurred up to the last run.        
511             pdp_new[] contains rate*seconds from the latest run.
512             pdp_temp[] will contain the rate for cdp */
513
514
515             for(i=0;i<rrd.stat_head->ds_cnt;i++){
516                 /* update pdp_prep to the current pdp_st */
517                 if(isnan(pdp_new[i]))
518                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
519                 else
520                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
521                         pdp_new[i]/(double)interval*(double)pre_int;
522
523                 /* if too much of the pdp_prep is unknown we dump it */
524                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
525                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
526                     (occu_pdp_st-proc_pdp_st <= 
527                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
528                     pdp_temp[i] = DNAN;
529                 } else {
530                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
531                         / (double)( occu_pdp_st
532                                    - proc_pdp_st
533                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
534                 }
535                 /* make pdp_prep ready for the next run */
536                 if(isnan(pdp_new[i])){
537                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
538                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
539                 } else {
540                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
541                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
542                         pdp_new[i]/(double)interval*(double)post_int;
543                 }
544
545 #ifdef DEBUG
546                 fprintf(stderr,
547                         "PDP UPD ds[%lu]\t"
548                         "pdp_temp %10.2f\t"
549                         "new_prep %10.2f\t"
550                         "new_unkn_sec %5lu\n",
551                         i, pdp_temp[i],
552                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
553                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
554 #endif
555             }
556
557                 /* compute the number of elapsed pdp_st moments */
558                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
559 #ifdef DEBUG
560                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
561 #endif
562                 if (rra_step_cnt == NULL)
563                 {
564                    rra_step_cnt = (unsigned long *) 
565                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
566                 }
567
568             for(i = 0, rra_start = rra_begin;
569                 i < rrd.stat_head->rra_cnt;
570             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
571                 i++)
572                 {
573                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
574                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
575                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
576         if (start_pdp_offset <= elapsed_pdp_st) {
577            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
578                       rrd.rra_def[i].pdp_cnt + 1;
579             } else {
580                    rra_step_cnt[i] = 0;
581                 }
582
583                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
584                 {
585                    /* If this is a bulk update, we need to skip ahead in the seasonal
586                         * arrays so that they will be correct for the next observed value;
587                         * note that for the bulk update itself, no update will occur to
588                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
589                         * be set to DNAN. */
590            if (rra_step_cnt[i] > 2) 
591                    {
592                           /* skip update by resetting rra_step_cnt[i],
593                            * note that this is not data source specific; this is due
594                            * to the bulk update, not a DNAN value for the specific data
595                            * source. */
596                           rra_step_cnt[i] = 0;
597               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
598                              &last_seasonal_coef);
599                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
600                              &seasonal_coef);
601                    }
602                 
603                   /* periodically run a smoother for seasonal effects */
604                   /* Need to use first cdp parameter buffer to track
605                    * burnin (burnin requires a specific smoothing schedule).
606                    * The CDP_init_seasonal parameter is really an RRA level,
607                    * not a data source within RRA level parameter, but the rra_def
608                    * is read only for rrd_update (not flushed to disk). */
609                   iii = i*(rrd.stat_head -> ds_cnt);
610                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
611                           <= BURNIN_CYCLES)
612                   {
613                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
614                                  > rrd.rra_def[i].row_cnt - 1) {
615                            /* mark off one of the burnin cycles */
616                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
617                        schedule_smooth = 1;
618                          }  
619                   } else {
620                          /* someone has no doubt invented a trick to deal with this
621                           * wrap around, but at least this code is clear. */
622                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
623                              rrd.rra_ptr[i].cur_row)
624                          {
625                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
626                                   * mapping between PDP and CDP */
627                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
628                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
629                                  {
630 #ifdef DEBUG
631                                         fprintf(stderr,
632                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
633                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
634                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
635 #endif
636                                         schedule_smooth = 1;
637                                  }
638              } else {
639                                  /* can't rely on negative numbers because we are working with
640                                   * unsigned values */
641                                  /* Don't need modulus here. If we've wrapped more than once, only
642                                   * one smooth is executed at the end. */
643                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
644                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
645                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
646                                  {
647 #ifdef DEBUG
648                                         fprintf(stderr,
649                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
650                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
651                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
652 #endif
653                                         schedule_smooth = 1;
654                                  }
655                          }
656                   }
657
658               rra_current = ftell(rrd_file); 
659                 } /* if cf is DEVSEASONAL or SEASONAL */
660
661         if (rrd_test_error()) break;
662
663                     /* update CDP_PREP areas */
664                     /* loop over data sources within each RRA */
665                     for(ii = 0;
666                         ii < rrd.stat_head->ds_cnt;
667                         ii++)
668                         {
669                         
670                         /* iii indexes the CDP prep area for this data source within the RRA */
671                         iii=i*rrd.stat_head->ds_cnt+ii;
672
673                         if (rrd.rra_def[i].pdp_cnt > 1) {
674                           
675                            if (rra_step_cnt[i] > 0) {
676                            /* If we are in this block, at least 1 CDP value will be written to
677                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
678                                 * to be written, then the "fill in" value is the CDP_secondary_val
679                                 * entry. */
680                                   if (isnan(pdp_temp[ii]))
681                   {
682                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
683                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
684                                   } else {
685                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
686                                           * CDP data entries. No matter the CF, the value is the same because
687                                           * the average, max, min, and last of a list of identical values is
688                                           * the same, namely, the value itself. */
689                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
690                                   }
691                      
692                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
693                                       > rrd.rra_def[i].pdp_cnt*
694                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
695                                   {
696                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
697                                          /* initialize carry over */
698                                          if (current_cf == CF_AVERAGE) {
699                                                    if (isnan(pdp_temp[ii])) { 
700                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
701                                                    } else {
702                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
703                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
704                                                    }
705                                          } else {
706                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
707                                          }
708                                   } else {
709                                          rrd_value_t cum_val, cur_val; 
710                                      switch (current_cf) {
711                                                 case CF_AVERAGE:
712                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
713                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
714                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
715                                                (cum_val + cur_val) /
716                                            (rrd.rra_def[i].pdp_cnt
717                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
718                                                    /* initialize carry over value */
719                                                    if (isnan(pdp_temp[ii])) { 
720                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
721                                                    } else {
722                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
723                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
724                                                    }
725                                                    break;
726                                                 case CF_MAXIMUM:
727                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
728                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
729 #ifdef DEBUG
730                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
731                                                           isnan(pdp_temp[ii])) {
732                                                      fprintf(stderr,
733                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
734                                                                 i,ii);
735                                                          exit(-1);
736                                                   }
737 #endif
738                                                   if (cur_val > cum_val)
739                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
740                                                   else
741                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
742                                                   /* initialize carry over value */
743                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
744                                                   break;
745                                                 case CF_MINIMUM:
746                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
747                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
748 #ifdef DEBUG
749                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
750                                                           isnan(pdp_temp[ii])) {
751                                                      fprintf(stderr,
752                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
753                                                                 i,ii);
754                                                          exit(-1);
755                                                   }
756 #endif
757                                                   if (cur_val < cum_val)
758                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
759                                                   else
760                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
761                                                   /* initialize carry over value */
762                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
763                                                   break;
764                                                 case CF_LAST:
765                                                 default:
766                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
767                                                    /* initialize carry over value */
768                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
769                                                 break;
770                                          }
771                                   } /* endif meets xff value requirement for a valid value */
772                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
773                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
774                                   if (isnan(pdp_temp[ii]))
775                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
776                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
777                                   else
778                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
779                } else  /* rra_step_cnt[i]  == 0 */
780                            {
781 #ifdef DEBUG
782                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
783                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
784                                          i,ii);
785                                   } else {
786                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
787                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
788                                   }
789 #endif
790                                   if (isnan(pdp_temp[ii])) {
791                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
792                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
793                                   {
794                                          if (current_cf == CF_AVERAGE) {
795                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
796                                                    elapsed_pdp_st;
797                                          } else {
798                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
799                                          }
800 #ifdef DEBUG
801                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
802                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
803 #endif
804                                   } else {
805                                          switch (current_cf) {
806                                          case CF_AVERAGE:
807                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
808                                                    elapsed_pdp_st;
809                                                 break;
810                                          case CF_MINIMUM:
811                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
812                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
813                                                 break; 
814                                          case CF_MAXIMUM:
815                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
816                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
817                                                 break; 
818                                          case CF_LAST:
819                                          default:
820                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
821                                                 break;
822                                          }
823                                   }
824                            }
825                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
826                            if (elapsed_pdp_st > 2)
827                            {
828                                    switch (current_cf) {
829                                    case CF_AVERAGE:
830                                    default:
831                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
832                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
833                                           break;
834                    case CF_SEASONAL:
835                                    case CF_DEVSEASONAL:
836                                           /* need to update cached seasonal values, so they are consistent
837                                            * with the bulk update */
838                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
839                                            * CDP_last_deviation are the same. */
840                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
841                                                  last_seasonal_coef[ii];
842                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
843                                                  seasonal_coef[ii];
844                                           break;
845                    case CF_HWPREDICT:
846                                           /* need to update the null_count and last_null_count.
847                                            * even do this for non-DNAN pdp_temp because the
848                                            * algorithm is not learning from batch updates. */
849                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
850                                                  elapsed_pdp_st;
851                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
852                                                  elapsed_pdp_st - 1;
853                                           /* fall through */
854                                    case CF_DEVPREDICT:
855                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
856                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
857                                           break;
858                    case CF_FAILURES:
859                                           /* do not count missed bulk values as failures */
860                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
861                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
862                                           /* need to reset violations buffer.
863                                            * could do this more carefully, but for now, just
864                                            * assume a bulk update wipes away all violations. */
865                       erase_violations(&rrd, iii, i);
866                                           break;
867                                    }
868                            } 
869                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
870
871                         if (rrd_test_error()) break;
872
873                         } /* endif data sources loop */
874         } /* end RRA Loop */
875
876                 /* this loop is only entered if elapsed_pdp_st < 3 */
877                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
878                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
879                 {
880                for(i = 0, rra_start = rra_begin;
881                    i < rrd.stat_head->rra_cnt;
882                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
883                    i++)
884                    {
885                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
886
887                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
888                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
889                           {
890                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
891                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
892                                 &seasonal_coef);
893                           }
894                           if (rrd_test_error()) break;
895                       /* loop over data soures within each RRA */
896                       for(ii = 0;
897                           ii < rrd.stat_head->ds_cnt;
898                           ii++)
899                           {
900                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
901                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
902                                     scratch_idx, seasonal_coef);
903                           }
904            } /* end RRA Loop */
905                    if (rrd_test_error()) break;
906             } /* end elapsed_pdp_st loop */
907
908                 if (rrd_test_error()) break;
909
910                 /* Ready to write to disk */
911                 /* Move sequentially through the file, writing one RRA at a time.
912                  * Note this architecture divorces the computation of CDP with
913                  * flushing updated RRA entries to disk. */
914             for(i = 0, rra_start = rra_begin;
915                 i < rrd.stat_head->rra_cnt;
916             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
917                 i++) {
918                 /* is there anything to write for this RRA? If not, continue. */
919         if (rra_step_cnt[i] == 0) continue;
920
921                 /* write the first row */
922 #ifdef DEBUG
923         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
924 #endif
925             rrd.rra_ptr[i].cur_row++;
926             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
927                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
928                 /* positition on the first row */
929                 rra_pos_tmp = rra_start +
930                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
931                 if(rra_pos_tmp != rra_current) {
932                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
933                       rrd_set_error("seek error in rrd");
934                       break;
935                    }
936                    rra_current = rra_pos_tmp;
937                 }
938 #ifdef DEBUG
939             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
940 #endif
941                 scratch_idx = CDP_primary_val;
942                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
943                 if (rrd_test_error()) break;
944
945                 /* write other rows of the bulk update, if any */
946                 scratch_idx = CDP_secondary_val;
947                 for ( ; rra_step_cnt[i] > 1; 
948                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
949                 {
950                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
951                    {
952 #ifdef DEBUG
953               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
954                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
955 #endif
956                           /* wrap */
957                           rrd.rra_ptr[i].cur_row = 0;
958                           /* seek back to beginning of current rra */
959                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
960                           {
961                          rrd_set_error("seek error in rrd");
962                          break;
963                           }
964 #ifdef DEBUG
965                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
966 #endif
967                           rra_current = rra_start;
968                    }
969                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
970                 }
971                 
972                 if (rrd_test_error())
973                   break;
974                 } /* RRA LOOP */
975
976             /* break out of the argument parsing loop if error_string is set */
977             if (rrd_test_error()){
978                    free(step_start);
979                    break;
980             } 
981             
982         } /* endif a pdp_st has occurred */ 
983         rrd.live_head->last_up = current_time;
984         free(step_start);
985     } /* function argument loop */
986
987     if (seasonal_coef != NULL) free(seasonal_coef);
988     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
989         if (rra_step_cnt != NULL) free(rra_step_cnt);
990
991     /* if we got here and if there is an error and if the file has not been
992      * written to, then close things up and return. */
993     if (rrd_test_error()) {
994         free(updvals);
995         free(tmpl_idx);
996         rrd_free(&rrd);
997         free(pdp_temp);
998         free(pdp_new);
999         fclose(rrd_file);
1000         return(-1);
1001     }
1002
1003     /* aargh ... that was tough ... so many loops ... anyway, its done.
1004      * we just need to write back the live header portion now*/
1005
1006     if (fseek(rrd_file, (sizeof(stat_head_t)
1007                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1008                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1009               SEEK_SET) != 0) {
1010         rrd_set_error("seek rrd for live header writeback");
1011         free(updvals);
1012         free(tmpl_idx);
1013         rrd_free(&rrd);
1014         free(pdp_temp);
1015         free(pdp_new);
1016         fclose(rrd_file);
1017         return(-1);
1018     }
1019
1020     if(fwrite( rrd.live_head,
1021                sizeof(live_head_t), 1, rrd_file) != 1){
1022         rrd_set_error("fwrite live_head to rrd");
1023         free(updvals);
1024         rrd_free(&rrd);
1025         free(tmpl_idx);
1026         free(pdp_temp);
1027         free(pdp_new);
1028         fclose(rrd_file);
1029         return(-1);
1030     }
1031
1032     if(fwrite( rrd.pdp_prep,
1033                sizeof(pdp_prep_t),
1034                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1035         rrd_set_error("ftwrite pdp_prep to rrd");
1036         free(updvals);
1037         rrd_free(&rrd);
1038         free(tmpl_idx);
1039         free(pdp_temp);
1040         free(pdp_new);
1041         fclose(rrd_file);
1042         return(-1);
1043     }
1044
1045     if(fwrite( rrd.cdp_prep,
1046                sizeof(cdp_prep_t),
1047                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1048        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1049
1050         rrd_set_error("ftwrite cdp_prep to rrd");
1051         free(updvals);
1052         free(tmpl_idx);
1053         rrd_free(&rrd);
1054         free(pdp_temp);
1055         free(pdp_new);
1056         fclose(rrd_file);
1057         return(-1);
1058     }
1059
1060     if(fwrite( rrd.rra_ptr,
1061                sizeof(rra_ptr_t), 
1062                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1063         rrd_set_error("fwrite rra_ptr to rrd");
1064         free(updvals);
1065         free(tmpl_idx);
1066         rrd_free(&rrd);
1067         free(pdp_temp);
1068         free(pdp_new);
1069         fclose(rrd_file);
1070         return(-1);
1071     }
1072
1073     /* OK now close the files and free the memory */
1074     if(fclose(rrd_file) != 0){
1075         rrd_set_error("closing rrd");
1076         free(updvals);
1077         free(tmpl_idx);
1078         rrd_free(&rrd);
1079         free(pdp_temp);
1080         free(pdp_new);
1081         return(-1);
1082     }
1083
1084     /* calling the smoothing code here guarantees at most
1085          * one smoothing operation per rrd_update call. Unfortunately,
1086          * it is possible with bulk updates, or a long-delayed update
1087          * for smoothing to occur off-schedule. This really isn't
1088          * critical except during the burning cycles. */
1089         if (schedule_smooth)
1090         {
1091 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1092           rrd_file = fopen(argv[optind],"rb+");
1093 #else
1094           rrd_file = fopen(argv[optind],"r+");
1095 #endif
1096           rra_start = rra_begin;
1097           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1098           {
1099             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1100                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1101             {
1102 #ifdef DEBUG
1103               fprintf(stderr,"Running smoother for rra %ld\n",i);
1104 #endif
1105               apply_smoother(&rrd,i,rra_start,rrd_file);
1106               if (rrd_test_error())
1107                 break;
1108             }
1109             rra_start += rrd.rra_def[i].row_cnt
1110               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1111           }
1112           fclose(rrd_file);
1113         }
1114     rrd_free(&rrd);
1115     free(updvals);
1116     free(tmpl_idx);
1117     free(pdp_new);
1118     free(pdp_temp);
1119     return(0);
1120 }
1121
/*
 * Try to take an exclusive lock covering the whole RRD file.
 *
 * The request is non-blocking on both platforms (F_SETLK on POSIX,
 * _LK_NBLCK on native Windows): if the lock cannot be granted the call
 * fails instead of waiting.  The lock goes away when the file is closed.
 *
 * rrdfile: open stdio stream of the RRD file to lock.
 *
 * Returns the status of the underlying locking call unchanged, i.e.
 * 0 on success.  On native Windows -1 is returned as well when the
 * file size cannot be determined with _fstat().
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */

#if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat info;

    /* _locking() wants a byte count, so the file size is needed first */
    if (_fstat(fd, &info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, info.st_size);
#else
    struct flock whole_file;

    whole_file.l_type   = F_WRLCK;   /* exclusive write lock */
    whole_file.l_whence = SEEK_SET;  /* l_start is relative to the start of the file */
    whole_file.l_start  = 0;         /* begin at offset 0 ... */
    whole_file.l_len    = 0;         /* ... and cover the whole file */

    return fcntl(fd, F_SETLK, &whole_file);
#endif
}
1158
1159
1160 void
1161 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1162    unsigned short CDP_scratch_idx, FILE *rrd_file)
1163 {
1164    unsigned long ds_idx, cdp_idx;
1165
1166    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1167    {
1168       /* compute the cdp index */
1169       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1170 #ifdef DEBUG
1171           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1172              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1173              rrd -> rra_def[rra_idx].cf_nam);
1174 #endif
1175
1176           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1177                  sizeof(rrd_value_t),1,rrd_file) != 1)
1178           { 
1179              rrd_set_error("writing rrd");
1180                  return;
1181           }
1182           *rra_current += sizeof(rrd_value_t);
1183         }
1184 }