08ecb493fd28936614c2952a07fef9fd5484d7ad
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.0.33  Copyright Tobias Oetiker, 1997 - 2000
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.3  2001/03/04 13:01:55  oetiker
9  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
10  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
11  * This is backwards compatible! But new files using the Aberrant stuff are not readable
12  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
13  * -- Jake Brutlag <jakeb@corp.webtv.net>
14  *
15  * Revision 1.2  2001/03/04 11:14:25  oetiker
16  * added at-style-time@value:value syntax to rrd_update
17  * --  Dave Bodenstab <imdave@mcs.net>
18  *
19  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
20  * checkin
21  *
22  *****************************************************************************/
23
24 #include "rrd_tool.h"
25 #include <sys/types.h>
26 #include <fcntl.h>
27
28 #ifdef WIN32
29  #include <sys/locking.h>
30  #include <sys/stat.h>
31  #include <io.h>
32 #endif
33
34 /* Prototypes */
35 int LockRRD(FILE *rrd_file);
36 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
37     unsigned short CDP_scratch_idx, FILE *rrd_file);
38  
39 /*#define DEBUG */
40
/* IFDNAN(X,Y): evaluates to Y when X is NaN, otherwise to X.
 *
 * X may be evaluated twice (once by isnan() and once as the result), so do
 * not pass an argument with side effects.
 *
 * The expansion is a single parenthesized expression with no trailing
 * semicolon, so the macro is usable inside larger expressions, as a
 * function argument, or in an unbraced if/else; callers terminate the
 * statement themselves. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
42
43
#ifdef STANDALONE
/*
 * Entry point of the standalone rrdupdate program (only built when
 * STANDALONE is defined).
 *
 * The command line is passed to rrd_update() untouched.  Success or failure
 * is judged from the RRD error state rather than from rrd_update()'s return
 * value: if an error is pending, the usage text and the error message are
 * printed to stdout, the error state is cleared and the exit status is 1;
 * otherwise the exit status is 0.
 */
int
main(int argc, char **argv)
{
    static const char usage_text[] =
        "RRDtool 1.0.33  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    /* nothing pending: the update went through */
    if (!rrd_test_error()) {
        return 0;
    }

    /* usage first, then the specific reason for the failure */
    fputs(usage_text, stdout);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
63
64 int
65 rrd_update(int argc, char **argv)
66 {
67
68     int              arg_i = 2;
69     short            j;
70     long             i,ii,iii=1;
71
72     unsigned long    rra_begin;          /* byte pointer to the rra
73                                           * area in the rrd file.  this
74                                           * pointer never changes value */
75     unsigned long    rra_start;          /* byte pointer to the rra
76                                           * area in the rrd file.  this
77                                           * pointer changes as each rrd is
78                                           * processed. */
79     unsigned long    rra_current;        /* byte pointer to the current write
80                                           * spot in the rrd file. */
81     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
82     unsigned long    interval,
83         pre_int,post_int;                /* interval between this and
84                                           * the last run */
85     unsigned long    proc_pdp_st;        /* which pdp_st was the last
86                                           * to be processed */
87     unsigned long    occu_pdp_st;        /* when was the pdp_st
88                                           * before the last update
89                                           * time */
90     unsigned long    proc_pdp_age;       /* how old was the data in
91                                           * the pdp prep area when it
92                                           * was last updated */
93     unsigned long    occu_pdp_age;       /* how long ago was the last
94                                           * pdp_step time */
95     rrd_value_t      *pdp_new;           /* prepare the incoming data
96                                           * to be added the the
97                                           * existing entry */
98     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
99                                           * to be added the the
100                                           * cdp values */
101
102     long             *tmpl_idx;          /* index representing the settings
103                                             transported by the template index */
104     long             tmpl_cnt = 2;       /* time and data */
105
106     FILE             *rrd_file;
107     rrd_t            rrd;
108     time_t           current_time = time(NULL);
109     char             **updvals;
110     int              schedule_smooth = 0;
111     char             *template = NULL;   
112         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
113                                          /* a vector of future Holt-Winters seasonal coefs */
114     unsigned long    elapsed_pdp_st;
115                                          /* number of elapsed PDP steps since last update */
116     unsigned long    *rra_step_cnt = NULL;
117                                          /* number of rows to be updated in an RRA for a data
118                                           * value. */
119     unsigned long    start_pdp_offset;
120                                          /* number of PDP steps since the last update that
121                                           * are assigned to the first CDP to be generated
122                                           * since the last update. */
123     unsigned short   scratch_idx;
124                                          /* index into the CDP scratch array */
125     enum cf_en       current_cf;
126                                          /* numeric id of the current consolidation function */
127
128     while (1) {
129         static struct option long_options[] =
130         {
131             {"template",      required_argument, 0, 't'},
132             {0,0,0,0}
133         };
134         int option_index = 0;
135         int opt;
136         opt = getopt_long(argc, argv, "t:", 
137                           long_options, &option_index);
138         
139         if (opt == EOF)
140           break;
141         
142         switch(opt) {
143         case 't':
144             template = optarg;
145             break;
146
147         case '?':
148             rrd_set_error("unknown option '%s'",argv[optind-1]);
149             rrd_free(&rrd);            
150             return(-1);
151         }
152     }
153
154     /* need at least 2 arguments: filename, data. */
155     if (argc-optind < 2) {
156         rrd_set_error("Not enough arguments");
157         return -1;
158     }
159
160     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
161         return -1;
162     }
163     rra_current = rra_start = rra_begin = ftell(rrd_file);
164     /* This is defined in the ANSI C standard, section 7.9.5.3:
165
166         When a file is opened with udpate mode ('+' as the second
167         or third character in the ... list of mode argument
168         variables), both input and ouptut may be performed on the
169         associated stream.  However, ...  input may not be directly
170         followed by output without an intervening call to a file
171         positioning function, unless the input oepration encounters
172         end-of-file. */
173     fseek(rrd_file, 0, SEEK_CUR);
174
175     
176     /* get exclusive lock to whole file.
177      * lock gets removed when we close the file.
178      */
179     if (LockRRD(rrd_file) != 0) {
180       rrd_set_error("could not lock RRD");
181       rrd_free(&rrd);
182       fclose(rrd_file);
183       return(-1);   
184     } 
185
186     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
187         rrd_set_error("allocating updvals pointer array");
188         rrd_free(&rrd);
189         fclose(rrd_file);
190         return(-1);
191     }
192
193     if ((pdp_temp = malloc(sizeof(rrd_value_t)
194                            *rrd.stat_head->ds_cnt))==NULL){
195         rrd_set_error("allocating pdp_temp ...");
196         free(updvals);
197         rrd_free(&rrd);
198         fclose(rrd_file);
199         return(-1);
200     }
201
202     if ((tmpl_idx = malloc(sizeof(unsigned long)
203                            *(rrd.stat_head->ds_cnt+1)))==NULL){
204         rrd_set_error("allocating tmpl_idx ...");
205         free(pdp_temp);
206         free(updvals);
207         rrd_free(&rrd);
208         fclose(rrd_file);
209         return(-1);
210     }
211     /* initialize template redirector */
212     /* default config
213        tmpl_idx[0] -> 0; (time)
214        tmpl_idx[1] -> 1; (DS 0)
215        tmpl_idx[2] -> 2; (DS 1)
216        tmpl_idx[3] -> 3; (DS 2)
217        ... */
218     for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
219     tmpl_cnt=rrd.stat_head->ds_cnt+1;
220     if (template) {
221         char *dsname;
222         int tmpl_len;
223         dsname = template;
224         tmpl_cnt = 1; /* the first entry is the time */
225         tmpl_len = strlen(template);
226         for(i=0;i<=tmpl_len ;i++) {
227             if (template[i] == ':' || template[i] == '\0') {
228                 template[i] = '\0';
229                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
230                     rrd_set_error("Template contains more DS definitions than RRD");
231                     free(updvals); free(pdp_temp);
232                     free(tmpl_idx); rrd_free(&rrd);
233                     fclose(rrd_file); return(-1);
234                 }
235                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
236                     rrd_set_error("unknown DS name '%s'",dsname);
237                     free(updvals); free(pdp_temp);
238                     free(tmpl_idx); rrd_free(&rrd);
239                     fclose(rrd_file); return(-1);
240                 } else {
241                   /* the first element is always the time */
242                   tmpl_idx[tmpl_cnt-1]++; 
243                   /* go to the next entry on the template */
244                   dsname = &template[i+1];
245                   /* fix the damage we did before */
246                   if (i<tmpl_len) {
247                      template[i]=':';
248                   } 
249
250                 }
251             }       
252         }
253     }
254     if ((pdp_new = malloc(sizeof(rrd_value_t)
255                           *rrd.stat_head->ds_cnt))==NULL){
256         rrd_set_error("allocating pdp_new ...");
257         free(updvals);
258         free(pdp_temp);
259         free(tmpl_idx);
260         rrd_free(&rrd);
261         fclose(rrd_file);
262         return(-1);
263     }
264
265     /* loop through the arguments. */
266     for(arg_i=optind+1; arg_i<argc;arg_i++) {
267         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
268         char *step_start = stepper;
269         char *p;
270         char *parsetime_error = NULL;
271         enum {atstyle, normal} timesyntax;
272         struct time_value ds_tv;
273         if (stepper == NULL){
274                 rrd_set_error("failed duplication argv entry");
275                 free(updvals);
276                 free(pdp_temp);  
277                 free(tmpl_idx);
278                 rrd_free(&rrd);
279                 fclose(rrd_file);
280                 return(-1);
281          }
282         /* initialize all ds input to unknown except the first one
283            which has always got to be set */
284         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
285         strcpy(stepper,argv[arg_i]);
286         updvals[0]=stepper;
287         /* separate all ds elements; first must be examined separately
288            due to alternate time syntax */
289         if ((p=strchr(stepper,'@'))!=NULL) {
290             timesyntax = atstyle;
291             *p = '\0';
292             stepper = p+1;
293         } else if ((p=strchr(stepper,':'))!=NULL) {
294             timesyntax = normal;
295             *p = '\0';
296             stepper = p+1;
297         } else {
298             rrd_set_error("expected timestamp not found in data source from %s:...",
299                           argv[arg_i]);
300             free(step_start);
301             break;
302         }
303         ii=1;
304         updvals[tmpl_idx[ii]] = stepper;
305         while (*stepper) {
306             if (*stepper == ':') {
307                 *stepper = '\0';
308                 ii++;
309                 if (ii<tmpl_cnt){                   
310                     updvals[tmpl_idx[ii]] = stepper+1;
311                 }
312             }
313             stepper++;
314         }
315
316         if (ii != tmpl_cnt-1) {
317             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
318                           tmpl_cnt-1, ii, argv[arg_i]);
319             free(step_start);
320             break;
321         }
322         
323         /* get the time from the reading ... handle N */
324         if (timesyntax == atstyle) {
325             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
326                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
327                 free(step_start);
328                 break;
329             }
330             if (ds_tv.type == RELATIVE_TO_END_TIME ||
331                 ds_tv.type == RELATIVE_TO_START_TIME) {
332                 rrd_set_error("specifying time relative to the 'start' "
333                               "or 'end' makes no sense here: %s",
334                               updvals[0]);
335                 free(step_start);
336                 break;
337             }
338
339             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
340         } else if (strcmp(updvals[0],"N")==0){
341             current_time = time(NULL);
342         } else {
343             current_time = atol(updvals[0]);
344         }
345         
346         if(current_time <= rrd.live_head->last_up){
347             rrd_set_error("illegal attempt to update using time %ld when "
348                           "last update time is %ld (minimum one second step)",
349                           current_time, rrd.live_head->last_up);
350             free(step_start);
351             break;
352         }
353         
354         
355         /* seek to the beginning of the rra's */
356         if (rra_current != rra_begin) {
357             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
358                 rrd_set_error("seek error in rrd");
359                 free(step_start);
360                 break;
361             }
362             rra_current = rra_begin;
363         }
364         rra_start = rra_begin;
365
366         /* when was the current pdp started */
367         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
368         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
369
370         /* when did the last pdp_st occur */
371         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
372         occu_pdp_st = current_time - occu_pdp_age;
373         interval = current_time - rrd.live_head->last_up;
374     
375         if (occu_pdp_st > proc_pdp_st){
376             /* OK we passed the pdp_st moment*/
377             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
378                                                               * occurred before the latest
379                                                               * pdp_st moment*/
380             post_int = occu_pdp_age;                         /* how much after it */
381         } else {
382             pre_int = interval;
383             post_int = 0;
384         }
385
386 #ifdef DEBUG
387         printf(
388                "proc_pdp_age %lu\t"
389                "proc_pdp_st %lu\t" 
390                "occu_pfp_age %lu\t" 
391                "occu_pdp_st %lu\t"
392                "int %lu\t"
393                "pre_int %lu\t"
394                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
395                 occu_pdp_age, occu_pdp_st,
396                interval, pre_int, post_int);
397 #endif
398     
399         /* process the data sources and update the pdp_prep 
400          * area accordingly */
401         for(i=0;i<rrd.stat_head->ds_cnt;i++){
402             enum dst_en dst_idx;
403             dst_idx= dst_conv(rrd.ds_def[i].dst);
404             if((updvals[i+1][0] != 'U') &&
405                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
406                double rate = DNAN;
407                /* the data source type defines how to process the data */
408                 /* pdp_temp contains rate * time ... eg the bytes
409                  * transferred during the interval. Doing it this way saves
410                  * a lot of math operations */
411                 
412
413                 switch(dst_idx){
414                 case DST_COUNTER:
415                 case DST_DERIVE:
416                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
417                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
418                        if(dst_idx == DST_COUNTER) {
419                           /* simple overflow catcher sugestet by andres kroonmaa */
420                           /* this will fail terribly for non 32 or 64 bit counters ... */
421                           /* are there any others in SNMP land ? */
422                           if (pdp_new[i] < (double)0.0 ) 
423                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
424                           if (pdp_new[i] < (double)0.0 ) 
425                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
426                        }
427                        rate = pdp_new[i] / interval;
428                     }
429                    else {
430                      pdp_new[i]= DNAN;          
431                    }
432                    break;
433                 case DST_ABSOLUTE:
434                     pdp_new[i]= atof(updvals[i+1]);
435                     rate = pdp_new[i] / interval;                 
436                     break;
437                 case DST_GAUGE:
438                     pdp_new[i] = atof(updvals[i+1]) * interval;
439                     rate = pdp_new[i] / interval;                  
440                     break;
441                 default:
442                     rrd_set_error("rrd contains unknown DS type : '%s'",
443                                   rrd.ds_def[i].dst);
444                     break;
445                 }
446                 /* break out of this for loop if the error string is set */
447                 if (rrd_test_error()){
448                     break;
449                 }
450                /* make sure pdp_temp is neither too large or too small
451                 * if any of these occur it becomes unknown ...
452                 * sorry folks ... */
453                if ( ! isnan(rate) && 
454                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
455                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
456                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
457                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
458                   pdp_new[i] = DNAN;
459                }               
460             } else {
461                 /* no news is news all the same */
462                 pdp_new[i] = DNAN;
463             }
464             
465             /* make a copy of the command line argument for the next run */
466 #ifdef DEBUG
467             fprintf(stderr,
468                     "prep ds[%lu]\t"
469                     "last_arg '%s'\t"
470                     "this_arg '%s'\t"
471                     "pdp_new %10.2f\n",
472                     i,
473                     rrd.pdp_prep[i].last_ds,
474                     updvals[i+1], pdp_new[i]);
475 #endif
476             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
477                 strncpy(rrd.pdp_prep[i].last_ds,
478                         updvals[i+1],LAST_DS_LEN-1);
479                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
480             }
481         }
482         /* break out of the argument parsing loop if the error_string is set */
483         if (rrd_test_error()){
484             free(step_start);
485             break;
486         }
487         /* has a pdp_st moment occurred since the last run ? */
488
489         if (proc_pdp_st == occu_pdp_st){
490             /* no we have not passed a pdp_st moment. therefore update is simple */
491
492             for(i=0;i<rrd.stat_head->ds_cnt;i++){
493                 if(isnan(pdp_new[i]))
494                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
495                 else
496                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
497 #ifdef DEBUG
498                 fprintf(stderr,
499                         "NO PDP  ds[%lu]\t"
500                         "value %10.2f\t"
501                         "unkn_sec %5lu\n",
502                         i,
503                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
504                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
505 #endif
506             }   
507         } else {
508             /* an pdp_st has occurred. */
509
510             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
511              * occurred up to the last run.        
512             pdp_new[] contains rate*seconds from the latest run.
513             pdp_temp[] will contain the rate for cdp */
514
515
516             for(i=0;i<rrd.stat_head->ds_cnt;i++){
517                 /* update pdp_prep to the current pdp_st */
518                 if(isnan(pdp_new[i]))
519                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
520                 else
521                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
522                         pdp_new[i]/(double)interval*(double)pre_int;
523
524                 /* if too much of the pdp_prep is unknown we dump it */
525                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
526                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
527                     (occu_pdp_st-proc_pdp_st <= 
528                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
529                     pdp_temp[i] = DNAN;
530                 } else {
531                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
532                         / (double)( occu_pdp_st
533                                    - proc_pdp_st
534                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
535                 }
536                 /* make pdp_prep ready for the next run */
537                 if(isnan(pdp_new[i])){
538                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
539                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
540                 } else {
541                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
542                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
543                         pdp_new[i]/(double)interval*(double)post_int;
544                 }
545
546 #ifdef DEBUG
547                 fprintf(stderr,
548                         "PDP UPD ds[%lu]\t"
549                         "pdp_temp %10.2f\t"
550                         "new_prep %10.2f\t"
551                         "new_unkn_sec %5lu\n",
552                         i, pdp_temp[i],
553                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
554                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
555 #endif
556             }
557
558                 /* compute the number of elapsed pdp_st moments */
559                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
560 #ifdef DEBUG
561                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
562 #endif
563                 if (rra_step_cnt == NULL)
564                 {
565                    rra_step_cnt = (unsigned long *) 
566                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
567                 }
568
569             for(i = 0, rra_start = rra_begin;
570                 i < rrd.stat_head->rra_cnt;
571             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
572                 i++)
573                 {
574                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
575                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
576                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
577         if (start_pdp_offset <= elapsed_pdp_st) {
578            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
579                       rrd.rra_def[i].pdp_cnt + 1;
580             } else {
581                    rra_step_cnt[i] = 0;
582                 }
583
584                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
585                 {
586                    /* If this is a bulk update, we need to skip ahead in the seasonal
587                         * arrays so that they will be correct for the next observed value;
588                         * note that for the bulk update itself, no update will occur to
589                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
590                         * be set to DNAN. */
591            if (rra_step_cnt[i] > 2) 
592                    {
593                           /* skip update by resetting rra_step_cnt[i],
594                            * note that this is not data source specific; this is due
595                            * to the bulk update, not a DNAN value for the specific data
596                            * source. */
597                           rra_step_cnt[i] = 0;
598               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
599                              &last_seasonal_coef);
600                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
601                              &seasonal_coef);
602                    }
603                 
604                   /* periodically run a smoother for seasonal effects */
605                   /* Need to use first cdp parameter buffer to track
606                    * burnin (burnin requires a specific smoothing schedule).
607                    * The CDP_init_seasonal parameter is really an RRA level,
608                    * not a data source within RRA level parameter, but the rra_def
609                    * is read only for rrd_update (not flushed to disk). */
610                   iii = i*(rrd.stat_head -> ds_cnt);
611                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
612                           <= BURNIN_CYCLES)
613                   {
614                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
615                                  > rrd.rra_def[i].row_cnt - 1) {
616                            /* mark off one of the burnin cycles */
617                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
618                        schedule_smooth = 1;
619                          }  
620                   } else {
621                          /* someone has no doubt invented a trick to deal with this
622                           * wrap around, but at least this code is clear. */
623                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
624                              rrd.rra_ptr[i].cur_row)
625                          {
626                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
627                                   * mapping between PDP and CDP */
628                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
629                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
630                                  {
631 #ifdef DEBUG
632                                         fprintf(stderr,
633                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
634                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
635                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
636 #endif
637                                         schedule_smooth = 1;
638                                  }
639              } else {
640                                  /* can't rely on negative numbers because we are working with
641                                   * unsigned values */
642                                  /* Don't need modulus here. If we've wrapped more than once, only
643                                   * one smooth is executed at the end. */
644                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
645                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
646                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
647                                  {
648 #ifdef DEBUG
649                                         fprintf(stderr,
650                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
651                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
652                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
653 #endif
654                                         schedule_smooth = 1;
655                                  }
656                          }
657                   }
658
659               rra_current = ftell(rrd_file); 
660                 } /* if cf is DEVSEASONAL or SEASONAL */
661
662         if (rrd_test_error()) break;
663
664                     /* update CDP_PREP areas */
665                     /* loop over data soures within each RRA */
666                     for(ii = 0;
667                         ii < rrd.stat_head->ds_cnt;
668                         ii++)
669                         {
670                         
671                         /* iii indexes the CDP prep area for this data source within the RRA */
672                         iii=i*rrd.stat_head->ds_cnt+ii;
673
674                         if (rrd.rra_def[i].pdp_cnt > 1) {
675                           
676                            if (rra_step_cnt[i] > 0) {
677                            /* If we are in this block, as least 1 CDP value will be written to
678                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
679                                 * to be written, then the "fill in" value is the CDP_secondary_val
680                                 * entry. */
681                                   if (isnan(pdp_temp[ii]))
682                   {
683                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
684                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
685                                   } else {
686                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
687                                           * CDP data entries. No matter the CF, the value is the same because
688                                           * the average, max, min, and last of a list of identical values is
689                                           * the same, namely, the value itself. */
690                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
691                                   }
692                      
693                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
694                                       > rrd.rra_def[i].pdp_cnt*
695                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
696                                   {
697                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
698                                          /* initialize carry over */
699                                          if (current_cf == CF_AVERAGE) {
700                                                    if (isnan(pdp_temp[ii])) { 
701                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
702                                                    } else {
703                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
704                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
705                                                    }
706                                          } else {
707                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
708                                          }
709                                   } else {
710                                          rrd_value_t cum_val, cur_val; 
711                                      switch (current_cf) {
712                                                 case CF_AVERAGE:
713                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
714                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
715                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
716                                                (cum_val + cur_val) /
717                                            (rrd.rra_def[i].pdp_cnt
718                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
719                                                    /* initialize carry over value */
720                                                    if (isnan(pdp_temp[ii])) { 
721                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
722                                                    } else {
723                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
724                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
725                                                    }
726                                                    break;
727                                                 case CF_MAXIMUM:
728                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
729                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
730 #ifdef DEBUG
731                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
732                                                           isnan(pdp_temp[ii])) {
733                                                      fprintf(stderr,
734                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
735                                                                 i,ii);
736                                                          exit(-1);
737                                                   }
738 #endif
739                                                   if (cur_val > cum_val)
740                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
741                                                   else
742                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
743                                                   /* initialize carry over value */
744                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
745                                                   break;
746                                                 case CF_MINIMUM:
747                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
748                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
749 #ifdef DEBUG
750                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
751                                                           isnan(pdp_temp[ii])) {
752                                                      fprintf(stderr,
753                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
754                                                                 i,ii);
755                                                          exit(-1);
756                                                   }
757 #endif
758                                                   if (cur_val < cum_val)
759                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
760                                                   else
761                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
762                                                   /* initialize carry over value */
763                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
764                                                   break;
765                                                 case CF_LAST:
766                                                 default:
767                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
768                                                    /* initialize carry over value */
769                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
770                                                 break;
771                                          }
772                                   } /* endif meets xff value requirement for a valid value */
773                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
774                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
775                                   if (isnan(pdp_temp[ii]))
776                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
777                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
778                                   else
779                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
780                } else  /* rra_step_cnt[i]  == 0 */
781                            {
782 #ifdef DEBUG
783                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
784                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
785                                          i,ii);
786                                   } else {
787                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
788                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
789                                   }
790 #endif
791                                   if (isnan(pdp_temp[ii])) {
792                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
793                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
794                                   {
795                                          if (current_cf == CF_AVERAGE) {
796                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
797                                                    elapsed_pdp_st;
798                                          } else {
799                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
800                                          }
801 #ifdef DEBUG
802                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
803                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
804 #endif
805                                   } else {
806                                          switch (current_cf) {
807                                          case CF_AVERAGE:
808                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
809                                                    elapsed_pdp_st;
810                                                 break;
811                                          case CF_MINIMUM:
812                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
813                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
814                                                 break; 
815                                          case CF_MAXIMUM:
816                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
817                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
818                                                 break; 
819                                          case CF_LAST:
820                                          default:
821                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
822                                                 break;
823                                          }
824                                   }
825                            }
826                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
827                            if (elapsed_pdp_st > 2)
828                            {
829                                    switch (current_cf) {
830                                    case CF_AVERAGE:
831                                    default:
832                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
833                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
834                                           break;
835                    case CF_SEASONAL:
836                                    case CF_DEVSEASONAL:
837                                           /* need to update cached seasonal values, so they are consistent
838                                            * with the bulk update */
839                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
840                                            * CDP_last_deviation are the same. */
841                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
842                                                  last_seasonal_coef[ii];
843                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
844                                                  seasonal_coef[ii];
845                                           break;
846                    case CF_HWPREDICT:
847                                           /* need to update the null_count and last_null_count.
848                                            * even do this for non-DNAN pdp_temp because the
849                                            * algorithm is not learning from batch updates. */
850                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
851                                                  elapsed_pdp_st;
852                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
853                                                  elapsed_pdp_st - 1;
854                                           /* fall through */
855                                    case CF_DEVPREDICT:
856                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
857                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
858                                           break;
859                    case CF_FAILURES:
860                                           /* do not count missed bulk values as failures */
861                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
862                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
863                                           /* need to reset violations buffer.
864                                            * could do this more carefully, but for now, just
865                                            * assume a bulk update wipes away all violations. */
866                       erase_violations(&rrd, iii, i);
867                                           break;
868                                    }
869                            } 
870                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
871
872                         if (rrd_test_error()) break;
873
874                         } /* endif data sources loop */
875         } /* end RRA Loop */
876
877                 /* this loop is only entered if elapsed_pdp_st < 3 */
878                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
879                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
880                 {
881                for(i = 0, rra_start = rra_begin;
882                    i < rrd.stat_head->rra_cnt;
883                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
884                    i++)
885                    {
886                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
887
888                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
889                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
890                           {
891                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
892                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
893                                 &seasonal_coef);
894                           }
895                           if (rrd_test_error()) break;
896                       /* loop over data sources within each RRA */
897                       for(ii = 0;
898                           ii < rrd.stat_head->ds_cnt;
899                           ii++)
900                           {
901                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
902                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
903                                     scratch_idx, seasonal_coef);
904                           }
905            } /* end RRA Loop */
906                    if (rrd_test_error()) break;
907             } /* end elapsed_pdp_st loop */
908
909                 if (rrd_test_error()) break;
910
911                 /* Ready to write to disk */
912                 /* Move sequentially through the file, writing one RRA at a time.
913                  * Note this architecture divorces the computation of CDP with
914                  * flushing updated RRA entries to disk. */
915             for(i = 0, rra_start = rra_begin;
916                 i < rrd.stat_head->rra_cnt;
917             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
918                 i++) {
919                 /* is there anything to write for this RRA? If not, continue. */
920         if (rra_step_cnt[i] == 0) continue;
921
922                 /* write the first row */
923 #ifdef DEBUG
924         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
925 #endif
926             rrd.rra_ptr[i].cur_row++;
927             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
928                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
929                 /* position on the first row */
930                 rra_pos_tmp = rra_start +
931                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
932                 if(rra_pos_tmp != rra_current) {
933                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
934                       rrd_set_error("seek error in rrd");
935                       break;
936                    }
937                    rra_current = rra_pos_tmp;
938                 }
939 #ifdef DEBUG
940             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
941 #endif
942                 scratch_idx = CDP_primary_val;
943                 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
944                 if (rrd_test_error()) break;
945
946                 /* write other rows of the bulk update, if any */
947                 scratch_idx = CDP_secondary_val;
948                 for ( ; rra_step_cnt[i] > 1; 
949                      rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
950                 {
951                    if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
952                    {
953 #ifdef DEBUG
954               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
955                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
956 #endif
957                           /* wrap */
958                           rrd.rra_ptr[i].cur_row = 0;
959                           /* seek back to beginning of current rra */
960                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
961                           {
962                          rrd_set_error("seek error in rrd");
963                          break;
964                           }
965 #ifdef DEBUG
966                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
967 #endif
968                           rra_current = rra_start;
969                    }
970                    write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
971                 }
972                 
973                 if (rrd_test_error())
974                   break;
975                 } /* RRA LOOP */
976
977             /* break out of the argument parsing loop if error_string is set */
978             if (rrd_test_error()){
979                    free(step_start);
980                    break;
981             } 
982             
983         } /* endif a pdp_st has occurred */ 
984         rrd.live_head->last_up = current_time;
985         free(step_start);
986     } /* function argument loop */
987
988     if (seasonal_coef != NULL) free(seasonal_coef);
989     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
990         if (rra_step_cnt != NULL) free(rra_step_cnt);
991
992     /* if we got here and if there is an error and if the file has not been
993      * written to, then close things up and return. */
994     if (rrd_test_error()) {
995         free(updvals);
996         free(tmpl_idx);
997         rrd_free(&rrd);
998         free(pdp_temp);
999         free(pdp_new);
1000         fclose(rrd_file);
1001         return(-1);
1002     }
1003
1004     /* aargh ... that was tough ... so many loops ... anyway, its done.
1005      * we just need to write back the live header portion now*/
1006
1007     if (fseek(rrd_file, (sizeof(stat_head_t)
1008                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1009                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1010               SEEK_SET) != 0) {
1011         rrd_set_error("seek rrd for live header writeback");
1012         free(updvals);
1013         free(tmpl_idx);
1014         rrd_free(&rrd);
1015         free(pdp_temp);
1016         free(pdp_new);
1017         fclose(rrd_file);
1018         return(-1);
1019     }
1020
1021     if(fwrite( rrd.live_head,
1022                sizeof(live_head_t), 1, rrd_file) != 1){
1023         rrd_set_error("fwrite live_head to rrd");
1024         free(updvals);
1025         rrd_free(&rrd);
1026         free(tmpl_idx);
1027         free(pdp_temp);
1028         free(pdp_new);
1029         fclose(rrd_file);
1030         return(-1);
1031     }
1032
1033     if(fwrite( rrd.pdp_prep,
1034                sizeof(pdp_prep_t),
1035                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1036         rrd_set_error("ftwrite pdp_prep to rrd");
1037         free(updvals);
1038         rrd_free(&rrd);
1039         free(tmpl_idx);
1040         free(pdp_temp);
1041         free(pdp_new);
1042         fclose(rrd_file);
1043         return(-1);
1044     }
1045
1046     if(fwrite( rrd.cdp_prep,
1047                sizeof(cdp_prep_t),
1048                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1049        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1050
1051         rrd_set_error("ftwrite cdp_prep to rrd");
1052         free(updvals);
1053         free(tmpl_idx);
1054         rrd_free(&rrd);
1055         free(pdp_temp);
1056         free(pdp_new);
1057         fclose(rrd_file);
1058         return(-1);
1059     }
1060
1061     if(fwrite( rrd.rra_ptr,
1062                sizeof(rra_ptr_t), 
1063                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1064         rrd_set_error("fwrite rra_ptr to rrd");
1065         free(updvals);
1066         free(tmpl_idx);
1067         rrd_free(&rrd);
1068         free(pdp_temp);
1069         free(pdp_new);
1070         fclose(rrd_file);
1071         return(-1);
1072     }
1073
1074     /* OK now close the files and free the memory */
1075     if(fclose(rrd_file) != 0){
1076         rrd_set_error("closing rrd");
1077         free(updvals);
1078         free(tmpl_idx);
1079         rrd_free(&rrd);
1080         free(pdp_temp);
1081         free(pdp_new);
1082         return(-1);
1083     }
1084
1085     /* calling the smoothing code here guarantees at most
1086          * one smoothing operation per rrd_update call. Unfortunately,
1087          * it is possible with bulk updates, or a long-delayed update
1088          * for smoothing to occur off-schedule. This really isn't
1089          * critical except during the burning cycles. */
1090         if (schedule_smooth)
1091         {
1092 #ifndef WIN32
1093           rrd_file = fopen(argv[optind],"r+");
1094 #else
1095           rrd_file = fopen(argv[optind],"rb+");
1096 #endif
1097           rra_start = rra_begin;
1098           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1099           {
1100             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1101                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1102             {
1103 #ifdef DEBUG
1104               fprintf(stderr,"Running smoother for rra %ld\n",i);
1105 #endif
1106               apply_smoother(&rrd,i,rra_start,rrd_file);
1107               if (rrd_test_error())
1108                 break;
1109             }
1110             rra_start += rrd.rra_def[i].row_cnt
1111               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1112           }
1113           fclose(rrd_file);
1114         }
1115     rrd_free(&rrd);
1116     free(updvals);
1117     free(tmpl_idx);
1118     free(pdp_new);
1119     free(pdp_temp);
1120     return(0);
1121 }
1122
/*
 * Request an exclusive lock on the whole RRD file.
 * The lock is released when the file is closed.
 *
 * rrdfile: open stream of the RRD file to lock
 *
 * Returns the status reported by the platform locking call
 * (fcntl F_SETLK, or _locking with _LK_NBLCK on WIN32): 0 on success.
 * On WIN32, -1 is returned when the file size cannot be obtained
 * through _fstat.
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the RRD stream */
    int status;

#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start counts from the start of the file */
    whole_file.l_start = 0;           /* begin at the first byte */
    whole_file.l_len = 0;             /* whole file */
    whole_file.l_type = F_WRLCK;      /* exclusive write lock */

    status = fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    /* the number of bytes to lock is taken from the current file size */
    status = (_fstat(fd, &info) == 0)
        ? _locking(fd, _LK_NBLCK, info.st_size)
        : -1;
#endif

    return(status);
}
1159
1160
1161 void
1162 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1163    unsigned short CDP_scratch_idx, FILE *rrd_file)
1164 {
1165    unsigned long ds_idx, cdp_idx;
1166
1167    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1168    {
1169       /* compute the cdp index */
1170       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1171 #ifdef DEBUG
1172           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1173              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1174              rrd -> rra_def[rra_idx].cf_nam);
1175 #endif
1176
1177           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1178                  sizeof(rrd_value_t),1,rrd_file) != 1)
1179           { 
1180              rrd_set_error("writing rrd");
1181                  return;
1182           }
1183           *rra_current += sizeof(rrd_value_t);
1184         }
1185 }