1e5a83e07c694c283ddd552ab3b53bd9e247e44b
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.0.33  Copyright Tobias Oetiker, 1997 - 2000
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.2  2001/03/04 11:14:25  oetiker
9  * added at-style-time@value:value syntax to rrd_update
10  * --  Dave Bodenstab <imdave@mcs.net>
11  *
12  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
13  * checkin
14  *
15  *****************************************************************************/
16
17 #include "rrd_tool.h"
18 #include <sys/types.h>
19 #include <fcntl.h>
20
21 #ifdef WIN32
22  #include <sys/locking.h>
23  #include <sys/stat.h>
24  #include <io.h>
25 #endif
26
27
28 /* Prototypes */
29 int LockRRD(FILE *rrd_file);
30
31 /*#define DEBUG */
32
33
#ifdef STANDALONE
/*
 * Command-line entry point; only compiled when STANDALONE is defined.
 *
 * Hands argc/argv unchanged to rrd_update() and then asks
 * rrd_test_error() whether an error is pending.
 *
 * Returns 0 when no error is pending.  Otherwise the usage summary and
 * the message obtained from rrd_get_error() are written to standard
 * output, the error is reset with rrd_clear_error(), and 1 is returned.
 */
int
main(int argc, char **argv){
        /* usage summary; contains no conversion specifiers, so it is
         * emitted verbatim */
        static const char usage_text[] =
                "RRDtool 1.0.33  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
                "Usage: rrdupdate filename\n"
                "\t\t\t[--template|-t ds-name:ds-name:...]\n"
                "\t\t\ttime|N:value[:value...]\n\n"
                "\t\t\tat-time@value[:value...]\n\n"
                "\t\t\t[ time:value[:value...] ..]\n\n";

        rrd_update(argc,argv);

        /* nothing went wrong: we are done */
        if (!rrd_test_error()) {
                return 0;
        }

        /* failure path: usage first, then the specific error text */
        fputs(usage_text, stdout);
        printf("ERROR: %s\n",rrd_get_error());
        rrd_clear_error();
        return 1;
}
#endif
53
54 int
55 rrd_update(int argc, char **argv)
56 {
57
58     int              arg_i = 2;
59     long             i,ii,iii;
60
61     unsigned long    rra_begin;          /* byte pointer to the rra
62                                           * area in the rrd file.  this
63                                           * pointer never changes value */
64     unsigned long    rra_start;          /* byte pointer to the rra
65                                           * area in the rrd file.  this
66                                           * pointer changes as each rrd is
67                                           * processed. */
68     unsigned long    rra_current;        /* byte pointer to the current write
69                                           * spot in the rrd file. */
70     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
71     unsigned long    interval,
72         pre_int,post_int;                /* interval between this and
73                                           * the last run */
74     unsigned long    proc_pdp_st;        /* which pdp_st was the last
75                                           * to be processed */
76     unsigned long    occu_pdp_st;        /* when was the pdp_st
77                                           * before the last update
78                                           * time */
79     unsigned long    proc_pdp_age;       /* how old was the data in
80                                           * the pdp prep area when it
81                                           * was last updated */
82     unsigned long    occu_pdp_age;       /* how long ago was the last
83                                           * pdp_step time */
84     unsigned long    pdp_st;             /* helper for cdp_prep 
85                                           * processing */
86     rrd_value_t      *pdp_new;           /* prepare the incoming data
87                                           * to be added to the
88                                           * existing entry */
89     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
90                                           * to be added to the
91                                           * cdp values */
92
93     long             *tmpl_idx;          /* index representing the settings
94                                             transported by the template index */
95     long             tmpl_cnt = 2;       /* time and data */
96
97     FILE             *rrd_file;
98     rrd_t            rrd;
99     time_t           current_time = time(NULL);
100     char             **updvals;
101     int              wrote_to_file = 0;
102     char             *template = NULL;          
103
104
105     while (1) {
106         static struct option long_options[] =
107         {
108             {"template",      required_argument, 0, 't'},
109             {0,0,0,0}
110         };
111         int option_index = 0;
112         int opt;
113         opt = getopt_long(argc, argv, "t:", 
114                           long_options, &option_index);
115         
116         if (opt == EOF)
117           break;
118         
119         switch(opt) {
120         case 't':
121             template = optarg;
122             break;
123
124         case '?':
125             rrd_set_error("unknown option '%s'",argv[optind-1]);
126             rrd_free(&rrd);
127             return(-1);
128         }
129     }
130
131     /* need at least 2 arguments: filename, data. */
132     if (argc-optind < 2) {
133         rrd_set_error("Not enough arguments");
134         return -1;
135     }
136
137     if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
138         return -1;
139     }
140     rra_current = rra_start = rra_begin = ftell(rrd_file);
141     /* This is defined in the ANSI C standard, section 7.9.5.3:
142
143         When a file is opened with update mode ('+' as the second
144         or third character in the ... list of mode argument
145         variables), both input and output may be performed on the
146         associated stream.  However, ...  input may not be directly
147         followed by output without an intervening call to a file
148         positioning function, unless the input operation encounters
149         end-of-file. */
150     fseek(rrd_file, 0, SEEK_CUR);
151
152     
153     /* get exclusive lock to whole file.
154      * lock gets removed when we close the file.
155      */
156     if (LockRRD(rrd_file) != 0) {
157       rrd_set_error("could not lock RRD");
158       rrd_free(&rrd);
159       fclose(rrd_file);
160       return(-1);   
161     }
162
163     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
164         rrd_set_error("allocating updvals pointer array");
165         rrd_free(&rrd);
166         fclose(rrd_file);
167         return(-1);
168     }
169
170     if ((pdp_temp = malloc(sizeof(rrd_value_t)
171                            *rrd.stat_head->ds_cnt))==NULL){
172         rrd_set_error("allocating pdp_temp ...");
173         free(updvals);
174         rrd_free(&rrd);
175         fclose(rrd_file);
176         return(-1);
177     }
178
179     if ((tmpl_idx = malloc(sizeof(unsigned long)
180                            *(rrd.stat_head->ds_cnt+1)))==NULL){
181         rrd_set_error("allocating tmpl_idx ...");
182         free(pdp_temp);
183         free(updvals);
184         rrd_free(&rrd);
185         fclose(rrd_file);
186         return(-1);
187     }
188     /* initialize template redirector */
189     /* default config
190        tmpl_idx[0] -> 0; (time)
191        tmpl_idx[1] -> 1; (DS 0)
192        tmpl_idx[2] -> 2; (DS 1)
193        tmpl_idx[3] -> 3; (DS 2)
194        ... */
195     for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
196     tmpl_cnt=rrd.stat_head->ds_cnt+1;
197     if (template) {
198         char *dsname;
199         int tmpl_len;
200         dsname = template;
201         tmpl_cnt = 1; /* the first entry is the time */
202         tmpl_len = strlen(template);
203         for(i=0;i<=tmpl_len ;i++) {
204             if (template[i] == ':' || template[i] == '\0') {
205                 template[i] = '\0';
206                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
207                     rrd_set_error("Template contains more DS definitions than RRD");
208                     free(updvals); free(pdp_temp);
209                     free(tmpl_idx); rrd_free(&rrd);
210                     fclose(rrd_file); return(-1);
211                 }
212                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
213                     rrd_set_error("unknown DS name '%s'",dsname);
214                     free(updvals); free(pdp_temp);
215                     free(tmpl_idx); rrd_free(&rrd);
216                     fclose(rrd_file); return(-1);
217                 } else {
218                   /* the first element is always the time */
219                   tmpl_idx[tmpl_cnt-1]++; 
220                   /* go to the next entry on the template */
221                   dsname = &template[i+1];
222                   /* fix the damage we did before */
223                   if (i<tmpl_len) {
224                      template[i]=':';
225                   } 
226
227                 }
228             }       
229         }
230     }
231     if ((pdp_new = malloc(sizeof(rrd_value_t)
232                           *rrd.stat_head->ds_cnt))==NULL){
233         rrd_set_error("allocating pdp_new ...");
234         free(updvals);
235         free(pdp_temp);
236         free(tmpl_idx);
237         rrd_free(&rrd);
238         fclose(rrd_file);
239         return(-1);
240     }
241
242     /* loop through the arguments. */
243     for(arg_i=optind+1; arg_i<argc;arg_i++) {
244         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
245         char *step_start = stepper;
246         char *p;
247         char *parsetime_error = NULL;
248         enum {atstyle, normal} timesyntax;
249         struct time_value ds_tv;
250         if (stepper == NULL){
251                 rrd_set_error("faild duplication argv entry");
252                 free(updvals);
253                 free(pdp_temp);  
254                 free(tmpl_idx);
255                 rrd_free(&rrd);
256                 fclose(rrd_file);
257                 return(-1);
258          }
259         /* initialize all ds input to unknown except the first one
260            which has always got to be set */
261         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
262         strcpy(stepper,argv[arg_i]);
263         updvals[0]=stepper;
264         /* separate all ds elements; first must be examined separately
265            due to alternate time syntax */
266         if ((p=strchr(stepper,'@'))!=NULL) {
267             timesyntax = atstyle;
268             *p = '\0';
269             stepper = p+1;
270         } else if ((p=strchr(stepper,':'))!=NULL) {
271             timesyntax = normal;
272             *p = '\0';
273             stepper = p+1;
274         } else {
275             rrd_set_error("expected timestamp not found in data source from %s:...",
276                           argv[arg_i]);
277             free(step_start);
278             break;
279         }
280         ii=1;
281         updvals[tmpl_idx[ii]] = stepper;
282         while (*stepper) {
283             if (*stepper == ':') {
284                 *stepper = '\0';
285                 ii++;
286                 if (ii<tmpl_cnt){                   
287                     updvals[tmpl_idx[ii]] = stepper+1;
288                 }
289             }
290             stepper++;
291         }
292
293         if (ii != tmpl_cnt-1) {
294             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
295                           tmpl_cnt-1, ii, argv[arg_i]);
296             free(step_start);
297             break;
298         }
299         
300         /* get the time from the reading ... handle N */
301         if (timesyntax == atstyle) {
302             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
303                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
304                 free(step_start);
305                 break;
306             }
307             if (ds_tv.type == RELATIVE_TO_END_TIME ||
308                 ds_tv.type == RELATIVE_TO_START_TIME) {
309                 rrd_set_error("specifying time relative to the 'start' "
310                               "or 'end' makes no sense here: %s",
311                               updvals[0]);
312                 free(step_start);
313                 break;
314             }
315
316             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
317         } else if (strcmp(updvals[0],"N")==0){
318             current_time = time(NULL);
319         } else {
320             current_time = atol(updvals[0]);
321         }
322         
323         if(current_time <= rrd.live_head->last_up){
324             rrd_set_error("illegal attempt to update using time %ld when "
325                           "last update time is %ld (minimum one second step)",
326                           current_time, rrd.live_head->last_up);
327             free(step_start);
328             break;
329         }
330         
331         
332         /* seek to the beginning of the rrd's */
333         if (rra_current != rra_begin) {
334             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
335                 rrd_set_error("seek error in rrd");
336                 free(step_start);
337                 break;
338             }
339             rra_current = rra_begin;
340         }
341         rra_start = rra_begin;
342
343         /* when was the current pdp started */
344         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
345         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
346
347         /* when did the last pdp_st occur */
348         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
349         occu_pdp_st = current_time - occu_pdp_age;
350         interval = current_time - rrd.live_head->last_up;
351     
352         if (occu_pdp_st > proc_pdp_st){
353             /* OK we passed the pdp_st moment*/
354             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
355                                                               * occurred before the latest
356                                                               * pdp_st moment*/
357             post_int = occu_pdp_age;                         /* how much after it */
358         } else {
359             pre_int = interval;
360             post_int = 0;
361         }
362
363 #ifdef DEBUG
364         printf(
365                "proc_pdp_age %lu\t"
366                "proc_pdp_st %lu\t" 
367                "occu_pfp_age %lu\t" 
368                "occu_pdp_st %lu\t"
369                "int %lu\t"
370                "pre_int %lu\t"
371                "post_int %lu\n", proc_pdp_age, proc_pdp_st, 
372                 occu_pdp_age, occu_pdp_st,
373                interval, pre_int, post_int);
374 #endif
375     
376         /* process the data sources and update the pdp_prep 
377          * area accordingly */
378         for(i=0;i<rrd.stat_head->ds_cnt;i++){
379             enum dst_en dst_idx;
380             dst_idx= dst_conv(rrd.ds_def[i].dst);
381             if((updvals[i+1][0] != 'U') &&
382                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
383                double rate = DNAN;
384                /* the data source type defines how to process the data */
385                 /* pdp_temp contains rate * time ... eg the bytes
386                  * transferred during the interval. Doing it this way saves
387                  * a lot of math operations */
388                 
389
390                 switch(dst_idx){
391                 case DST_COUNTER:
392                 case DST_DERIVE:
393                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
394                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
395                        if(dst_idx == DST_COUNTER) {
396                           /* simple overflow catcher suggested by andres kroonmaa */
397                           /* this will fail terribly for non 32 or 64 bit counters ... */
398                           /* are there any others in SNMP land ? */
399                           if (pdp_new[i] < (double)0.0 ) 
400                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
401                           if (pdp_new[i] < (double)0.0 ) 
402                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
403                        }
404                        rate = pdp_new[i] / interval;
405                     }
406                    else {
407                      pdp_new[i]= DNAN;          
408                    }
409                    break;
410                 case DST_ABSOLUTE:
411                     pdp_new[i]= atof(updvals[i+1]);
412                     rate = pdp_new[i] / interval;                 
413                     break;
414                 case DST_GAUGE:
415                     pdp_new[i] = atof(updvals[i+1]) * interval;
416                     rate = pdp_new[i] / interval;                  
417                     break;
418                 default:
419                     rrd_set_error("rrd contains unknown DS type : '%s'",
420                                   rrd.ds_def[i].dst);
421                     break;
422                 }
423                 /* break out of this for loop if the error string is set */
424                 if (rrd_test_error()){
425                     break;
426                 }
427                /* make sure pdp_temp is neither too large or too small
428                 * if any of these occur it becomes unknown ...
429                 * sorry folks ... */
430                if ( ! isnan(rate) && 
431                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
432                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
433                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
434                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
435                   pdp_new[i] = DNAN;
436                }               
437             } else {
438                 /* no news is news all the same */
439                 pdp_new[i] = DNAN;
440             }
441             
442             /* make a copy of the command line argument for the next run */
443 #ifdef DEBUG
444             fprintf(stderr,
445                     "prep ds[%lu]\t"
446                     "last_arg '%s'\t"
447                     "this_arg '%s'\t"
448                     "pdp_new %10.2f\n",
449                     i,
450                     rrd.pdp_prep[i].last_ds,
451                     updvals[i+1], pdp_new[i]);
452 #endif
453             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
454                 strncpy(rrd.pdp_prep[i].last_ds,
455                         updvals[i+1],LAST_DS_LEN-1);
456                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
457             }
458         }
459         /* break out of the argument parsing loop if the error_string is set */
460         if (rrd_test_error()){
461             free(step_start);
462             break;
463         }
464         /* has a pdp_st moment occurred since the last run ? */
465
466         if (proc_pdp_st == occu_pdp_st){
467             /* no we have not passed a pdp_st moment. therefore update is simple */
468
469             for(i=0;i<rrd.stat_head->ds_cnt;i++){
470                 if(isnan(pdp_new[i]))
471                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
472                 else
473                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
474 #ifdef DEBUG
475                 fprintf(stderr,
476                         "NO PDP  ds[%lu]\t"
477                         "value %10.2f\t"
478                         "unkn_sec %5lu\n",
479                         i,
480                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
481                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
482 #endif
483             }   
484         } else {
485             /* an pdp_st has occurred. */
486
487             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
488              * occurred up to the last run.        
489             pdp_new[] contains rate*seconds from the latest run.
490             pdp_temp[] will contain the rate for cdp */
491
492
493             for(i=0;i<rrd.stat_head->ds_cnt;i++){
494                 /* update pdp_prep to the current pdp_st */
495                 if(isnan(pdp_new[i]))
496                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
497                 else
498                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
499                         pdp_new[i]/(double)interval*(double)pre_int;
500
501                 /* if too much of the pdp_prep is unknown we dump it */
502                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
503                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
504                     (occu_pdp_st-proc_pdp_st <= 
505                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
506                     pdp_temp[i] = DNAN;
507                 } else {
508                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
509                         / (double)( occu_pdp_st
510                                    - proc_pdp_st
511                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
512                 }
513                 /* make pdp_prep ready for the next run */
514                 if(isnan(pdp_new[i])){
515                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
516                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
517                 } else {
518                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
519                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
520                         pdp_new[i]/(double)interval*(double)post_int;
521                 }
522
523 #ifdef DEBUG
524                 fprintf(stderr,
525                         "PDP UPD ds[%lu]\t"
526                         "pdp_temp %10.2f\t"
527                         "new_prep %10.2f\t"
528                         "new_unkn_sec %5lu\n",
529                         i, pdp_temp[i],
530                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
531                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
532 #endif
533             }
534
535
536             /* now we have to integrate this data into the cdp_prep areas */
537             /* going through the round robin archives */
538             for(i = 0;
539                 i < rrd.stat_head->rra_cnt;
540                 i++){
541                 enum cf_en current_cf = cf_conv(rrd.rra_def[i].cf_nam);
542                 /* going through all pdp_st moments which have occurred 
543                  * since the last run */
544                 for(pdp_st  = proc_pdp_st+rrd.stat_head->pdp_step; 
545                     pdp_st <= occu_pdp_st; 
546                     pdp_st += rrd.stat_head->pdp_step){
547
548 #ifdef DEBUG
549                     fprintf(stderr,"RRA %lu STEP %lu\n",i,pdp_st);
550 #endif
551
552                     if((pdp_st %
553                         (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
554
555                         /* later on the cdp_prep values will be transferred to
556                          * the rra.  we want to be in the right place. */
557                         rrd.rra_ptr[i].cur_row++;
558                         if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
559                             /* oops ... we have to wrap the beast ... */
560                             rrd.rra_ptr[i].cur_row=0;                   
561 #ifdef DEBUG
562                         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
563 #endif
564                         /* determine if a seek is even needed. */
565                         rra_pos_tmp = rra_start +
566                                 rrd.stat_head->ds_cnt*rrd.rra_ptr[i].cur_row*sizeof(rrd_value_t);
567                         if(rra_pos_tmp != rra_current) {
568                             if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
569                                 rrd_set_error("seek error in rrd");
570                                 break;
571                             }
572                             rra_current = rra_pos_tmp;
573                         }
574 #ifdef DEBUG
575                         fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
576 #endif
577                     }
578
579                     for(ii = 0;
580                         ii < rrd.stat_head->ds_cnt;
581                         ii++){
582                         iii=i*rrd.stat_head->ds_cnt+ii;
583                     
584                         /* the contents of cdp_prep[].scratch[CDP_val].u_val depends
585                          * on the consolidation function ! */
586                     
587                         if (isnan(pdp_temp[ii])){    /* pdp is unknown */
588                             rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt++;
589 #ifdef DEBUG
590                             fprintf(stderr,"  ** UNKNOWN ADD %lu\n",
591                                     rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
592 #endif
593                         } else {
594                             if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)){
595                                 /* cdp_prep is unknown when it does not
596                                  * yet contain data. It can not be zero for
597                                  * things like min and max consolidation
598                                  * functions */
599 #ifdef DEBUG
600                                 fprintf(stderr,"  ** INIT CDP %e\n", pdp_temp[ii]);
601 #endif
602                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
603                             }
604                             else {
605                                 switch (current_cf){
606                                     case CF_AVERAGE:                            
607                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val+=pdp_temp[ii];
608 #ifdef DEBUG
609                                         fprintf(stderr,"  ** AVERAGE %e\n", 
610                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
611 #endif
612                                         break;    
613                                     case CF_MINIMUM:
614                                         if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
615                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
616 #ifdef DEBUG
617                                         fprintf(stderr,"  ** MINIMUM %e\n", 
618                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
619 #endif
620                                         break;
621                                     case CF_MAXIMUM:
622                                         if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
623                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
624 #ifdef DEBUG
625                                         fprintf(stderr,"  ** MAXIMUM %e\n", 
626                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
627 #endif
628                                         break;
629                                     case CF_LAST:
630                                         rrd.cdp_prep[iii].scratch[CDP_val].u_val=pdp_temp[ii];
631 #ifdef DEBUG
632                                         fprintf(stderr,"  ** LAST %e\n", 
633                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val);
634 #endif
635                                         break;    
636                                     default:
637                                         rrd_set_error("Unknown cf %s",
638                                                       rrd.rra_def[i].cf_nam);
639                                         break;
640                                 }
641                             }
642                         }
643
644
645                         /* is the data in the cdp_prep ready to go into
646                          * its rra ? */
647                         if((pdp_st % 
648                             (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step)) == 0){
649
650                             /* prepare cdp_pref for its transition to the rra. */
651                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt 
652                                 > rrd.rra_def[i].pdp_cnt*
653                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
654                                 /* to much of the cdp_prep is unknown ... */
655                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
656                             else if (current_cf == CF_AVERAGE){
657                                 /* for a real average we have to divide
658                                  * the sum we built earlier on. While ignoring
659                                  * the unknown pdps */
660                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val 
661                                         /= (rrd.rra_def[i].pdp_cnt
662                                             -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
663                             }
664                             /* we can write straight away, because we are
665                              * already in the right place ... */
666
667 #ifdef DEBUG
668                             fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld\n",
669                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val,ftell(rrd_file));
670 #endif
671
672                             if(fwrite(&(rrd.cdp_prep[iii].scratch[CDP_val].u_val),
673                                       sizeof(rrd_value_t),1,rrd_file) != 1){
674                                 rrd_set_error("writing rrd");
675                                 break;
676                             }
677                             rra_current += sizeof(rrd_value_t);
678                             wrote_to_file = 1;
679
680 #ifdef DEBUG
681                             fprintf(stderr,"  -- RRA WROTE new at %ld\n",ftell(rrd_file));
682 #endif
683
684                             /* make cdp_prep ready for the next run */
685                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
686                             rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
687                         }
688                     }
689                     /* break out of this loop if error_string has been set */
690                     if (rrd_test_error())
691                         break;
692                 }
693                 /* break out of this loop if error_string has been set */
694                 if (rrd_test_error())
695                     break;
696                 /* to be able to position correctly in the next rra we move
697                  * the rra_start pointer on to the next rra */
698                 rra_start += rrd.rra_def[i].row_cnt
699                         *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
700
701             }
702             /* break out of the argument parsing loop if error_string is set */
703             if (rrd_test_error()){
704                 free(step_start);
705                 break;
706             }
707         }
708         rrd.live_head->last_up = current_time;
709         free(step_start);
710     }
711
712
713     /* if we got here and if there is an error and if the file has not been
714      * written to, then close things up and return. */
715     if (rrd_test_error()) {
716         free(updvals);
717         free(tmpl_idx);
718         rrd_free(&rrd);
719         free(pdp_temp);
720         free(pdp_new);
721         fclose(rrd_file);
722         return(-1);
723     }
724
725     /* aargh ... that was tough ... so many loops ... anyway, it's done.
726      * we just need to write back the live header portion now */
727
728     if (fseek(rrd_file, (sizeof(stat_head_t)
729                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
730                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
731               SEEK_SET) != 0) {
732         rrd_set_error("seek rrd for live header writeback");
733         free(updvals);
734         free(tmpl_idx);
735         rrd_free(&rrd);
736         free(pdp_temp);
737         free(pdp_new);
738         fclose(rrd_file);
739         return(-1);
740     }
741
742     if(fwrite( rrd.live_head,
743                sizeof(live_head_t), 1, rrd_file) != 1){
744         rrd_set_error("fwrite live_head to rrd");
745         free(updvals);
746         rrd_free(&rrd);
747         free(tmpl_idx);
748         free(pdp_temp);
749         free(pdp_new);
750         fclose(rrd_file);
751         return(-1);
752     }
753
754     if(fwrite( rrd.pdp_prep,
755                sizeof(pdp_prep_t),
756                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
757         rrd_set_error("ftwrite pdp_prep to rrd");
758         free(updvals);
759         rrd_free(&rrd);
760         free(tmpl_idx);
761         free(pdp_temp);
762         free(pdp_new);
763         fclose(rrd_file);
764         return(-1);
765     }
766
767     if(fwrite( rrd.cdp_prep,
768                sizeof(cdp_prep_t),
769                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
770        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
771
772         rrd_set_error("ftwrite cdp_prep to rrd");
773         free(updvals);
774         free(tmpl_idx);
775         rrd_free(&rrd);
776         free(pdp_temp);
777         free(pdp_new);
778         fclose(rrd_file);
779         return(-1);
780     }
781
782     if(fwrite( rrd.rra_ptr,
783                sizeof(rra_ptr_t), 
784                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
785         rrd_set_error("fwrite rra_ptr to rrd");
786         free(updvals);
787         free(tmpl_idx);
788         rrd_free(&rrd);
789         free(pdp_temp);
790         free(pdp_new);
791         fclose(rrd_file);
792         return(-1);
793     }
794
795     /* OK now close the files and free the memory */
796     if(fclose(rrd_file) != 0){
797         rrd_set_error("closing rrd");
798         free(updvals);
799         free(tmpl_idx);
800         rrd_free(&rrd);
801         free(pdp_temp);
802         free(pdp_new);
803         return(-1);
804     }
805
806     rrd_free(&rrd);
807     free(updvals);
808     free(tmpl_idx);
809     free(pdp_new);
810     free(pdp_temp);
811     return(0);
812 }
813
/*
 * LockRRD - request an exclusive lock on an already opened RRD file.
 *
 * The request does not wait: F_SETLK (resp. _LK_NBLCK on WIN32) is used,
 * so the call reports failure instead of blocking when the lock cannot
 * be granted.  Nothing in here releases the lock again; it is meant to
 * go away when the file gets closed.
 *
 * rrdfile  stdio stream of the RRD file to lock
 *
 * returns 0 on success, otherwise the failure value of the underlying
 * locking call (-1 on WIN32 if the file size cannot be determined)
 */
int
LockRRD(FILE *rrdfile)
{
    const int       fd = fileno(rrdfile);   /* descriptor behind the stream */

#ifdef WIN32
    struct _stat    info;

    /* the size of the file tells us how many bytes to lock */
    if (_fstat(fd, &info) != 0)
        return -1;

    /* NOTE(review): _locking is documented to start at the current file
     * position, so this covers the "whole file" only if the caller's
     * position is 0 - confirm against the callers. */
    return _locking(fd, _LK_NBLCK, info.st_size);
#else
    struct flock    request;

    request.l_whence = SEEK_SET;    /* l_start is relative to start of file */
    request.l_start  = 0;           /* begin at the first byte ... */
    request.l_len    = 0;           /* ... and cover the whole file */
    request.l_type   = F_WRLCK;     /* exclusive write lock */

    return fcntl(fd, F_SETLK, &request);
#endif
}