Initial revision
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.0.33  Copyright Tobias Oetiker, 1997 - 2000
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.1  2001/02/25 22:25:06  oetiker
9  * Initial revision
10  *
11  *****************************************************************************/
12
13 #include "rrd_tool.h"
14 #include <sys/types.h>
15 #include <fcntl.h>
16
17 #ifdef WIN32
18  #include <sys/locking.h>
19  #include <sys/stat.h>
20  #include <io.h>
21 #endif
22
23
24 /* Prototypes */
25 int LockRRD(FILE *rrd_file);
26
27 /*#define DEBUG */
28
29
30 #ifdef STANDALONE
/*
 * Stand-alone entry point (built only when STANDALONE is defined).
 * Hands the command line to rrd_update() and, when the RRD error flag
 * is set afterwards, prints the usage text followed by the error
 * message.  Exit status is 0 on success and 1 on failure.
 */
int
main(int argc, char **argv)
{
        rrd_update(argc, argv);

        /* nothing went wrong: no output at all */
        if (!rrd_test_error()) {
                return 0;
        }

        printf("RRDtool 1.0.33  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
               "Usage: rrdupdate filename\n"
               "\t\t\t[--template|-t ds-name:ds-name:...]\n"
               "\t\t\ttime|N:value[:value...]\n\n"
               "\t\t\t[ time:value[:value...] ..]\n\n");

        printf("ERROR: %s\n", rrd_get_error());
        rrd_clear_error();
        return 1;
}
47 #endif
48
49 int
50 rrd_update(int argc, char **argv)
51 {
52
53     int              arg_i = 2;
54     long             i,ii,iii;
55
56     unsigned long    rra_begin;          /* byte pointer to the rra
57                                           * area in the rrd file.  this
58                                           * pointer never changes value */
59     unsigned long    rra_start;          /* byte pointer to the rra
60                                           * area in the rrd file.  this
61                                           * pointer changes as each rrd is
62                                           * processed. */
63     unsigned long    rra_current;        /* byte pointer to the current write
64                                           * spot in the rrd file. */
65     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
66     unsigned long    interval,
67         pre_int,post_int;                /* interval between this and
68                                           * the last run */
69     unsigned long    proc_pdp_st;        /* which pdp_st was the last
70                                           * to be processed */
71     unsigned long    occu_pdp_st;        /* when was the pdp_st
72                                           * before the last update
73                                           * time */
74     unsigned long    proc_pdp_age;       /* how old was the data in
75                                           * the pdp prep area when it
76                                           * was last updated */
77     unsigned long    occu_pdp_age;       /* how long ago was the last
78                                           * pdp_step time */
79     unsigned long    pdp_st;             /* helper for cdp_prep 
80                                           * processing */
81     rrd_value_t      *pdp_new;           /* prepare the incoming data
82                                           * to be added the the
83                                           * existing entry */
84     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
85                                           * to be added the the
86                                           * cdp values */
87
88     long             *tmpl_idx;          /* index representing the settings
89                                             transported by the template index */
90     long             tmpl_cnt = 2;       /* time and data */
91
92     FILE             *rrd_file;
93     rrd_t            rrd;
94     time_t           current_time = time(NULL);
95     char             **updvals;
96     int              wrote_to_file = 0;
97     char             *template = NULL;          
98
99
100     while (1) {
101         static struct option long_options[] =
102         {
103             {"template",      required_argument, 0, 't'},
104             {0,0,0,0}
105         };
106         int option_index = 0;
107         int opt;
108         opt = getopt_long(argc, argv, "t:", 
109                           long_options, &option_index);
110         
111         if (opt == EOF)
112           break;
113         
114         switch(opt) {
115         case 't':
116             template = optarg;
117             break;
118
119         case '?':
120             rrd_set_error("unknown option '%s'",argv[optind-1]);
121             rrd_free(&rrd);
122             return(-1);
123         }
124     }
125
126     /* need at least 2 arguments: filename, data. */
127     if (argc-optind < 2) {
128         rrd_set_error("Not enough arguments");
129         return -1;
130     }
131
132     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
133         return -1;
134     }
135     rra_current = rra_start = rra_begin = ftell(rrd_file);
136     /* This is defined in the ANSI C standard, section 7.9.5.3:
137
138         When a file is opened with udpate mode ('+' as the second
139         or third character in the ... list of mode argument
140         variables), both input and ouptut may be performed on the
141         associated stream.  However, ...  input may not be directly
142         followed by output without an intervening call to a file
143         positioning function, unless the input oepration encounters
144         end-of-file. */
145     fseek(rrd_file, 0, SEEK_CUR);
146
147     
148     /* get exclusive lock to whole file.
149      * lock gets removed when we close the file.
150      */
151     if (LockRRD(rrd_file) != 0) {
152       rrd_set_error("could not lock RRD");
153       rrd_free(&rrd);
154       fclose(rrd_file);
155       return(-1);   
156     }
157
158     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
159         rrd_set_error("allocating updvals pointer array");
160         rrd_free(&rrd);
161         fclose(rrd_file);
162         return(-1);
163     }
164
165     if ((pdp_temp = malloc(sizeof(rrd_value_t)
166                            *rrd.stat_head->ds_cnt))==NULL){
167         rrd_set_error("allocating pdp_temp ...");
168         free(updvals);
169         rrd_free(&rrd);
170         fclose(rrd_file);
171         return(-1);
172     }
173
174     if ((tmpl_idx = malloc(sizeof(unsigned long)
175                            *(rrd.stat_head->ds_cnt+1)))==NULL){
176         rrd_set_error("allocating tmpl_idx ...");
177         free(pdp_temp);
178         free(updvals);
179         rrd_free(&rrd);
180         fclose(rrd_file);
181         return(-1);
182     }
183     /* initialize template redirector */
184     /* default config
185        tmpl_idx[0] -> 0; (time)
186        tmpl_idx[1] -> 1; (DS 0)
187        tmpl_idx[2] -> 2; (DS 1)
188        tmpl_idx[3] -> 3; (DS 2)
189        ... */
190     for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
191     tmpl_cnt=rrd.stat_head->ds_cnt+1;
192     if (template) {
193         char *dsname;
194         int tmpl_len;
195         dsname = template;
196         tmpl_cnt = 1; /* the first entry is the time */
197         tmpl_len = strlen(template);
198         for(i=0;i<=tmpl_len ;i++) {
199             if (template[i] == ':' || template[i] == '\0') {
200                 template[i] = '\0';
201                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
202                     rrd_set_error("Template contains more DS definitions than RRD");
203                     free(updvals); free(pdp_temp);
204                     free(tmpl_idx); rrd_free(&rrd);
205                     fclose(rrd_file); return(-1);
206                 }
207                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
208                     rrd_set_error("unknown DS name '%s'",dsname);
209                     free(updvals); free(pdp_temp);
210                     free(tmpl_idx); rrd_free(&rrd);
211                     fclose(rrd_file); return(-1);
212                 } else {
213                   /* the first element is always the time */
214                   tmpl_idx[tmpl_cnt-1]++; 
215                   /* go to the next entry on the template */
216                   dsname = &template[i+1];
217                   /* fix the damage we did before */
218                   if (i<tmpl_len) {
219                      template[i]=':';
220                   } 
221
222                 }
223             }       
224         }
225     }
226     if ((pdp_new = malloc(sizeof(rrd_value_t)
227                           *rrd.stat_head->ds_cnt))==NULL){
228         rrd_set_error("allocating pdp_new ...");
229         free(updvals);
230         free(pdp_temp);
231         free(tmpl_idx);
232         rrd_free(&rrd);
233         fclose(rrd_file);
234         return(-1);
235     }
236
237     /* loop through the arguments. */
238     for(arg_i=optind+1; arg_i<argc;arg_i++) {
239         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
240         char *step_start = stepper;
241         if (stepper == NULL){
242                 rrd_set_error("faild duplication argv entry");
243                 free(updvals);
244                 free(pdp_temp);  
245                 free(tmpl_idx);
246                 rrd_free(&rrd);
247                 fclose(rrd_file);
248                 return(-1);
249          }
250         /* initialize all ds input to unknown except the first one
251            which has always got to be set */
252         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
253         ii=0;
254         strcpy(stepper,argv[arg_i]);
255         updvals[0]=stepper;
256         while (*stepper) {
257             if (*stepper == ':') {
258                 *stepper = '\0';
259                 ii++;
260                 if (ii<tmpl_cnt){                   
261                     updvals[tmpl_idx[ii]] = stepper+1;
262                 }
263             }
264             stepper++;
265         }
266
267         if (ii != tmpl_cnt-1) {
268             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
269                           tmpl_cnt-1, ii, argv[arg_i]);
270             free(step_start);
271             break;
272         }
273         
274         /* get the time from the reading ... handle N */
275         if (strcmp(updvals[0],"N")==0){
276             current_time = time(NULL);
277         } else {
278             current_time = atol(updvals[0]);
279         }
280         
281         if(current_time <= rrd.live_head->last_up){
282             rrd_set_error("illegal attempt to update using time %ld when "
283                           "last update time is %ld (minimum one second step)",
284                           current_time, rrd.live_head->last_up);
285             free(step_start);
286             break;
287         }
288         
289         
290         /* seek to the beginning of the rrd's */
291         if (rra_current != rra_begin) {
292             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
293                 rrd_set_error("seek error in rrd");
294                 free(step_start);
295                 break;
296             }
297             rra_current = rra_begin;
298         }
299         rra_start = rra_begin;
300
301         /* when was the current pdp started */
302         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
303         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
304
305         /* when did the last pdp_st occur */
306         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
307         occu_pdp_st = current_time - occu_pdp_age;
308         interval = current_time - rrd.live_head->last_up;
309     
310         if (occu_pdp_st > proc_pdp_st){
311             /* OK we passed the pdp_st moment*/
312             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
313                                                               * occurred before the latest
314                                                               * pdp_st moment*/
315             post_int = occu_pdp_age;                         /* how much after it */
316         } else {
317             pre_int = interval;
318             post_int = 0;
319         }
320
321 #ifdef DEBUG
322         printf(
323                "proc_pdp_age %lu\t"
324                "proc_pdp_st %lu\t" 
325                "occu_pfp_age %lu\t" 
326                "occu_pdp_st %lu\t"
327                "int %lu\t"
328                "pre_int %lu\t"
329                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
330                 occu_pdp_age, occu_pdp_st,
331                interval, pre_int, post_int);
332 #endif
333     
334         /* process the data sources and update the pdp_prep 
335          * area accordingly */
336         for(i=0;i<rrd.stat_head->ds_cnt;i++){
337             enum dst_en dst_idx;
338             dst_idx= dst_conv(rrd.ds_def[i].dst);
339             if((updvals[i+1][0] != 'U') &&
340                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
341                double rate = DNAN;
342                /* the data source type defines how to process the data */
343                 /* pdp_temp contains rate * time ... eg the bytes
344                  * transferred during the interval. Doing it this way saves
345                  * a lot of math operations */
346                 
347
348                 switch(dst_idx){
349                 case DST_COUNTER:
350                 case DST_DERIVE:
351                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
352                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
353                        if(dst_idx == DST_COUNTER) {
354                           /* simple overflow catcher sugestet by andres kroonmaa */
355                           /* this will fail terribly for non 32 or 64 bit counters ... */
356                           /* are there any others in SNMP land ? */
357                           if (pdp_new[i] < (double)0.0 ) 
358                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
359                           if (pdp_new[i] < (double)0.0 ) 
360                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
361                        }
362                        rate = pdp_new[i] / interval;
363                     }
364                    else {
365                      pdp_new[i]= DNAN;          
366                    }
367                    break;
368                 case DST_ABSOLUTE:
369                     pdp_new[i]= atof(updvals[i+1]);
370                     rate = pdp_new[i] / interval;                 
371                     break;
372                 case DST_GAUGE:
373                     pdp_new[i] = atof(updvals[i+1]) * interval;
374                     rate = pdp_new[i] / interval;                  
375                     break;
376                 default:
377                     rrd_set_error("rrd contains unknown DS type : '%s'",
378                                   rrd.ds_def[i].dst);
379                     break;
380                 }
381                 /* break out of this for loop if the error string is set */
382                 if (rrd_test_error()){
383                     break;
384                 }
385                /* make sure pdp_temp is neither too large or too small
386                 * if any of these occur it becomes unknown ...
387                 * sorry folks ... */
388                if ( ! isnan(rate) && 
389                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
390                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
391                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
392                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
393                   pdp_new[i] = DNAN;
394                }               
395             } else {
396                 /* no news is news all the same */
397                 pdp_new[i] = DNAN;
398             }
399             
400             /* make a copy of the command line argument for the next run */
401 #ifdef DEBUG
402             fprintf(stderr,
403                     "prep ds[%lu]\t"
404                     "last_arg '%s'\t"
405                     "this_arg '%s'\t"
406                     "pdp_new %10.2f\n",
407                     i,
408                     rrd.pdp_prep[i].last_ds,
409                     updvals[i+1], pdp_new[i]);
410 #endif
411             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
412                 strncpy(rrd.pdp_prep[i].last_ds,
413                         updvals[i+1],LAST_DS_LEN-1);
414                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
415             }
416         }
417         /* break out of the argument parsing loop if the error_string is set */
418         if (rrd_test_error()){
419             free(step_start);
420             break;
421         }
422         /* has a pdp_st moment occurred since the last run ? */
423
424         if (proc_pdp_st == occu_pdp_st){
425             /* no we have not passed a pdp_st moment. therefore update is simple */
426
427             for(i=0;i<rrd.stat_head->ds_cnt;i++){
428                 if(isnan(pdp_new[i]))
429                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
430                 else
431                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
432 #ifdef DEBUG
433                 fprintf(stderr,
434                         "NO PDP  ds[%lu]\t"
435                         "value %10.2f\t"
436                         "unkn_sec %5lu\n",
437                         i,
438                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
439                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
440 #endif
441             }   
442         } else {
443             /* an pdp_st has occurred. */
444
445             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
446              * occurred up to the last run.        
447             pdp_new[] contains rate*seconds from the latest run.
448             pdp_temp[] will contain the rate for cdp */
449
450
451             for(i=0;i<rrd.stat_head->ds_cnt;i++){
452                 /* update pdp_prep to the current pdp_st */
453                 if(isnan(pdp_new[i]))
454                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
455                 else
456                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
457                         pdp_new[i]/(double)interval*(double)pre_int;
458
459                 /* if too much of the pdp_prep is unknown we dump it */
460                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
461                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
462                     (occu_pdp_st-proc_pdp_st <= 
463                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
464                     pdp_temp[i] = DNAN;
465                 } else {
466                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
467                         / (double)( occu_pdp_st
468                                    - proc_pdp_st
469                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
470                 }
471                 /* make pdp_prep ready for the next run */
472                 if(isnan(pdp_new[i])){
473                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
474                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
475                 } else {
476                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
477                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
478                         pdp_new[i]/(double)interval*(double)post_int;
479                 }
480
481 #ifdef DEBUG
482                 fprintf(stderr,
483                         "PDP UPD ds[%lu]\t"
484                         "pdp_temp %10.2f\t"
485                         "new_prep %10.2f\t"
486                         "new_unkn_sec %5lu\n",
487                         i, pdp_temp[i],
488                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
489                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
490 #endif
491             }
492
493
494             /* now we have to integrate this data into the cdp_prep areas */
495             /* going through the round robin archives */
496             for(i = 0;
497                 i < rrd.stat_head->rra_cnt;
498                 i++){
499                 enum cf_en current_cf = cf_conv(rrd.rra_def[i].cf_nam);
500                 /* going through all pdp_st moments which have occurred 
501                  * since the last run */
502                 for(pdp_st  = proc_pdp_st+rrd.stat_head->pdp_step; 
503                     pdp_st <= occu_pdp_st; 
504                     pdp_st += rrd.stat_head->pdp_step){
505
506 #ifdef DEBUG
507                     fprintf(stderr,"RRA %lu STEP %lu\n",i,pdp_st);
508 #endif
509
510                     if((pdp_st %
511                         (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
512
513                         /* later on the cdp_prep values will be transferred to
514                          * the rra.  we want to be in the right place. */
515                         rrd.rra_ptr[i].cur_row++;
516                         if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
517                             /* oops ... we have to wrap the beast ... */
518                             rrd.rra_ptr[i].cur_row=0;                   
519 #ifdef DEBUG
520                         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
521 #endif
522                         /* determine if a seek is even needed. */
523                         rra_pos_tmp = rra_start +
524                                 rrd.stat_head->ds_cnt*rrd.rra_ptr[i].cur_row*sizeof(rrd_value_t);
525                         if(rra_pos_tmp != rra_current) {
526                             if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
527                                 rrd_set_error("seek error in rrd");
528                                 break;
529                             }
530                             rra_current = rra_pos_tmp;
531                         }
532 #ifdef DEBUG
533                         fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
534 #endif
535                     }
536
537                     for(ii = 0;
538                         ii < rrd.stat_head->ds_cnt;
539                         ii++){
540                         iii=i*rrd.stat_head->ds_cnt+ii;
541                     
542                         /* the contents of cdp_prep[].scratch[CDP_val].u_val depends
543                          * on the consolidation function ! */
544                     
545                         if (isnan(pdp_temp[ii])){    /* pdp is unknown */
546                             rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt++;
547 #ifdef DEBUG
548                             fprintf(stderr,"  ** UNKNOWN ADD %lu\n",
549                                     rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
550 #endif
551                         } else {
552                             if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)){
553                                 /* cdp_prep is unknown when it does not
554                                  * yet contain data. It can not be zero for
555                                  * things like mim and max consolidation
556                                  * functions */
557 #ifdef DEBUG
558                                 fprintf(stderr,"  ** INIT CDP %e\n", pdp_temp[ii]);
559 #endif
560                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
561                             }
562                             else {
563                                 switch (current_cf){
564                                     case CF_AVERAGE:                            
565                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val+=pdp_temp[ii];
566 #ifdef DEBUG
567                                         fprintf(stderr,"  ** AVERAGE %e\n", 
568                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
569 #endif
570                                         break;    
571                                     case CF_MINIMUM:
572                                         if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
573                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
574 #ifdef DEBUG
575                                         fprintf(stderr,"  ** MINIMUM %e\n", 
576                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
577 #endif
578                                         break;
579                                     case CF_MAXIMUM:
580                                         if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
581                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
582 #ifdef DEBUG
583                                         fprintf(stderr,"  ** MAXIMUM %e\n", 
584                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
585 #endif
586                                         break;
587                                     case CF_LAST:
588                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val=pdp_temp[ii];
589 #ifdef DEBUG
590                                         fprintf(stderr,"  ** LAST %e\n", 
591                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
592 #endif
593                                         break;    
594                                     default:
595                                         rrd_set_error("Unknown cf %s",
596                                                       rrd.rra_def[i].cf_nam);
597                                         break;
598                                 }
599                             }
600                         }
601
602
603                         /* is the data in the cdp_prep ready to go into
604                          * its rra ? */
605                         if((pdp_st % 
606                             (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
607
608                             /* prepare cdp_pref for its transition to the rra. */
609                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt 
610                                 > rrd.rra_def[i].pdp_cnt*
611                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
612                                 /* to much of the cdp_prep is unknown ... */
613                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
614                             else if (current_cf == CF_AVERAGE){
615                                 /* for a real average we have to divide
616                                  * the sum we built earlier on. While ignoring
617                                  * the unknown pdps */
618                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val 
619                                         /= (rrd.rra_def[i].pdp_cnt
620                                             -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
621                             }
622                             /* we can write straight away, because we are
623                              * already in the right place ... */
624
625 #ifdef DEBUG
626                             fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld\n",
627                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val,ftell(rrd_file));
628 #endif
629
630                             if(fwrite(&(rrd.cdp_prep[iii].scratch[CDP_val].u_val),
631                                       sizeof(rrd_value_t),1,rrd_file) != 1){
632                                 rrd_set_error("writing rrd");
633                                 break;
634                             }
635                             rra_current += sizeof(rrd_value_t);
636                             wrote_to_file = 1;
637
638 #ifdef DEBUG
639                             fprintf(stderr,"  -- RRA WROTE new at %ld\n",ftell(rrd_file));
640 #endif
641
642                             /* make cdp_prep ready for the next run */
643                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
644                             rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
645                         }
646                     }
647                     /* break out of this loop if error_string has been set */
648                     if (rrd_test_error())
649                         break;
650                 }
651                 /* break out of this loop if error_string has been set */
652                 if (rrd_test_error())
653                     break;
654                 /* to be able to position correctly in the next rra w move
655                  * the rra_start pointer on to the next rra */
656                 rra_start += rrd.rra_def[i].row_cnt
657                         *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
658
659             }
660             /* break out of the argument parsing loop if error_string is set */
661             if (rrd_test_error()){
662                 free(step_start);
663                 break;
664             }
665         }
666         rrd.live_head->last_up = current_time;
667         free(step_start);
668     }
669
670
671     /* if we got here and if there is an error and if the file has not been
672      * written to, then close things up and return. */
673     if (rrd_test_error()) {
674         free(updvals);
675         free(tmpl_idx);
676         rrd_free(&rrd);
677         free(pdp_temp);
678         free(pdp_new);
679         fclose(rrd_file);
680         return(-1);
681     }
682
683     /* aargh ... that was tough ... so many loops ... anyway, its done.
684      * we just need to write back the live header portion now*/
685
686     if (fseek(rrd_file, (sizeof(stat_head_t)
687                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
688                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
689               SEEK_SET) != 0) {
690         rrd_set_error("seek rrd for live header writeback");
691         free(updvals);
692         free(tmpl_idx);
693         rrd_free(&rrd);
694         free(pdp_temp);
695         free(pdp_new);
696         fclose(rrd_file);
697         return(-1);
698     }
699
700     if(fwrite( rrd.live_head,
701                sizeof(live_head_t), 1, rrd_file) != 1){
702         rrd_set_error("fwrite live_head to rrd");
703         free(updvals);
704         rrd_free(&rrd);
705         free(tmpl_idx);
706         free(pdp_temp);
707         free(pdp_new);
708         fclose(rrd_file);
709         return(-1);
710     }
711
712     if(fwrite( rrd.pdp_prep,
713                sizeof(pdp_prep_t),
714                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
715         rrd_set_error("ftwrite pdp_prep to rrd");
716         free(updvals);
717         rrd_free(&rrd);
718         free(tmpl_idx);
719         free(pdp_temp);
720         free(pdp_new);
721         fclose(rrd_file);
722         return(-1);
723     }
724
725     if(fwrite( rrd.cdp_prep,
726                sizeof(cdp_prep_t),
727                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
728        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
729
730         rrd_set_error("ftwrite cdp_prep to rrd");
731         free(updvals);
732         free(tmpl_idx);
733         rrd_free(&rrd);
734         free(pdp_temp);
735         free(pdp_new);
736         fclose(rrd_file);
737         return(-1);
738     }
739
740     if(fwrite( rrd.rra_ptr,
741                sizeof(rra_ptr_t), 
742                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
743         rrd_set_error("fwrite rra_ptr to rrd");
744         free(updvals);
745         free(tmpl_idx);
746         rrd_free(&rrd);
747         free(pdp_temp);
748         free(pdp_new);
749         fclose(rrd_file);
750         return(-1);
751     }
752
753     /* OK now close the files and free the memory */
754     if(fclose(rrd_file) != 0){
755         rrd_set_error("closing rrd");
756         free(updvals);
757         free(tmpl_idx);
758         rrd_free(&rrd);
759         free(pdp_temp);
760         free(pdp_new);
761         return(-1);
762     }
763
764     rrd_free(&rrd);
765     free(updvals);
766     free(tmpl_idx);
767     free(pdp_new);
768     free(pdp_temp);
769     return(0);
770 }
771
772 /*
773  * get exclusive lock to whole file.
774  * lock gets removed when we close the file
775  *
776  * returns 0 on success
777  */
int
LockRRD(FILE *rrdfile)
{
    int result;                     /* outcome of the locking call */
    int fd = fileno(rrdfile);       /* descriptor behind the stream */

#ifdef WIN32
    /* _locking() needs a byte count, so ask for the current file size */
    struct _stat info;

    if (_fstat(fd, &info) != 0) {
        result = -1;
    } else {
        result = _locking(fd, _LK_NBLCK, info.st_size);
    }
#else
    /* describe a write lock starting at offset 0 of the file with
     * length 0, i.e. covering the whole file */
    struct flock request;

    request.l_whence = SEEK_SET;    /* l_start counts from the start of the file */
    request.l_start  = 0;
    request.l_len    = 0;           /* whole file */
    request.l_type   = F_WRLCK;     /* exclusive write lock */

    /* F_SETLK does not wait: it fails when the lock cannot be had */
    result = fcntl(fd, F_SETLK, &request);
#endif

    return(result);
}