new trunk based on current 1.2
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * Native WIN32 has neither gettimeofday() nor struct timeval.
 * What follows is a quick and dirty replacement for both.
 */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
/* Time value split into whole seconds and a microsecond remainder. */
struct timeval {
        time_t tv_sec;   /* whole seconds */
        long   tv_usec;  /* microsecond part */
};
40 #endif
41
/* Time zone record; it only appears as the second parameter type of the
 * gettimeofday() replacement in this file. */
struct __timezone {
        int tz_minuteswest;  /* minutes west of Greenwich */
        int tz_dsttime;      /* kind of DST correction */
};
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must never be negative.  When it is, one second is borrowed from
 * tv_sec and added to tv_usec as 1000000 microseconds.  Only a single
 * borrow is performed; non-negative values are left untouched.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0) {
                return;
        }
        t->tv_usec += 1000000L;
        t->tv_sec  -= 1;
}
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
91 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
92                                         info_t*);
93
/*
 * IFDNAN(X,Y) evaluates to Y when X is NaN and to X otherwise.
 *
 * X appears twice in the expansion, so pass only side-effect-free
 * expressions.  The expansion is a plain parenthesized expression with
 * no trailing semicolon, so it can be used as an operand and inside an
 * unbraced if/else; callers terminate the statement themselves.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
95
96
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99     char             *tmplt = NULL;          
100         info_t *result = NULL;
101         infoval rc;
102       rc.u_int = -1;
103     optind = 0; opterr = 0;  /* initialize getopt */
104
105     while (1) {
106                 static struct option long_options[] =
107                         {
108                                 {"template",      required_argument, 0, 't'},
109                                 {0,0,0,0}
110                         };
111                 int option_index = 0;
112                 int opt;
113                 opt = getopt_long(argc, argv, "t:", 
114                                                   long_options, &option_index);
115                 
116                 if (opt == EOF)
117                         break;
118                 
119                 switch(opt) {
120                 case 't':
121                         tmplt = optarg;
122                         break;
123                 
124                 case '?':
125                         rrd_set_error("unknown option '%s'",argv[optind-1]);
126                         goto end_tag;
127                 }
128     }
129
130     /* need at least 2 arguments: filename, data. */
131     if (argc-optind < 2) {
132                 rrd_set_error("Not enough arguments");
133                 goto end_tag;
134     }
135     rc.u_int = 0;
136     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137         rc.u_int = _rrd_update(argv[optind], tmplt,
138                       argc - optind - 1, (const char **)(argv + optind + 1), result);
139     result->value.u_int = rc.u_int;
140 end_tag:
141     return result;
142 }
143
/*
 * Command-line style update entry point.
 *
 * Accepts the single option --template / -t, followed by the RRD file
 * name and one or more update strings, and hands them to rrd_update_r().
 *
 * Returns -1 (with an rrd error set) on an unknown option or when fewer
 * than two non-option arguments are present; otherwise returns whatever
 * rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *tmplt = NULL;
    int   option_index = 0;
    int   opt;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    while ((opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index)) != EOF) {
        if (opt == 't') {
            tmplt = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return -1;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
187
/*
 * Update entry point taking an explicit file name and template.
 * Forwards every argument unchanged to _rrd_update() and passes NULL as
 * the info list, then returns _rrd_update()'s result.
 */
int
rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
{
    int status;

    status = _rrd_update(filename, tmplt, argc, argv, NULL);
    return status;
}
193
194 int
195 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
196    info_t *pcdp_summary)
197 {
198
199     int              arg_i = 2;
200     short            j;
201     unsigned long    i,ii,iii=1;
202
203     unsigned long    rra_begin;          /* byte pointer to the rra
204                                           * area in the rrd file.  this
205                                           * pointer never changes value */
206     unsigned long    rra_start;          /* byte pointer to the rra
207                                           * area in the rrd file.  this
208                                           * pointer changes as each rrd is
209                                           * processed. */
210     unsigned long    rra_current;        /* byte pointer to the current write
211                                           * spot in the rrd file. */
212     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
213     double           interval,
214                      pre_int,post_int;   /* interval between this and
215                                           * the last run */
216     unsigned long    proc_pdp_st;        /* which pdp_st was the last
217                                           * to be processed */
218     unsigned long    occu_pdp_st;        /* when was the pdp_st
219                                           * before the last update
220                                           * time */
221     unsigned long    proc_pdp_age;       /* how old was the data in
222                                           * the pdp prep area when it
223                                           * was last updated */
224     unsigned long    occu_pdp_age;       /* how long ago was the last
225                                           * pdp_step time */
226     rrd_value_t      *pdp_new;           /* prepare the incoming data
227                                           * to be added the the
228                                           * existing entry */
229     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
230                                           * to be added the the
231                                           * cdp values */
232
233     long             *tmpl_idx;          /* index representing the settings
234                                             transported by the tmplt index */
235     unsigned long    tmpl_cnt = 2;       /* time and data */
236
237     FILE             *rrd_file;
238     rrd_t            rrd;
239     time_t           current_time = 0;
240     time_t           rra_time = 0;       /* time of update for a RRA */
241     unsigned long    current_time_usec=0;/* microseconds part of current time */
242     struct timeval   tmp_time;           /* used for time conversion */
243
244     char             **updvals;
245     int              schedule_smooth = 0;
246         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247                                          /* a vector of future Holt-Winters seasonal coefs */
248     unsigned long    elapsed_pdp_st;
249                                          /* number of elapsed PDP steps since last update */
250     unsigned long    *rra_step_cnt = NULL;
251                                          /* number of rows to be updated in an RRA for a data
252                                           * value. */
253     unsigned long    start_pdp_offset;
254                                          /* number of PDP steps since the last update that
255                                           * are assigned to the first CDP to be generated
256                                           * since the last update. */
257     unsigned short   scratch_idx;
258                                          /* index into the CDP scratch array */
259     enum cf_en       current_cf;
260                                          /* numeric id of the current consolidation function */
261     rpnstack_t       rpnstack; /* used for COMPUTE DS */
262     int              version;  /* rrd version */
263     char             *endptr; /* used in the conversion */
264
265 #ifdef HAVE_MMAP
266     void             *rrd_mmaped_file;
267     unsigned long    rrd_filesize;
268 #endif
269
270
271     rpnstack_init(&rpnstack);
272
273     /* need at least 1 arguments: data. */
274     if (argc < 1) {
275         rrd_set_error("Not enough arguments");
276         return -1;
277     }
278     
279     
280
281     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
282         return -1;
283     }
284
285     /* initialize time */
286     version = atoi(rrd.stat_head->version);
287     gettimeofday(&tmp_time, 0);
288     normalize_time(&tmp_time);
289     current_time = tmp_time.tv_sec;
290     if(version >= 3) {
291         current_time_usec = tmp_time.tv_usec;
292     }
293     else {
294         current_time_usec = 0;
295     }
296
297     rra_current = rra_start = rra_begin = ftell(rrd_file);
298     /* This is defined in the ANSI C standard, section 7.9.5.3:
299
300         When a file is opened with udpate mode ('+' as the second
301         or third character in the ... list of mode argument
302         variables), both input and ouptut may be performed on the
303         associated stream.  However, ...  input may not be directly
304         followed by output without an intervening call to a file
305         positioning function, unless the input oepration encounters
306         end-of-file. */
307 #ifdef HAVE_MMAP
308     fseek(rrd_file, 0, SEEK_END);
309     rrd_filesize = ftell(rrd_file);
310     fseek(rrd_file, rra_current, SEEK_SET);
311 #else
312     fseek(rrd_file, 0, SEEK_CUR);
313 #endif
314
315     
316     /* get exclusive lock to whole file.
317      * lock gets removed when we close the file.
318      */
319     if (LockRRD(rrd_file) != 0) {
320       rrd_set_error("could not lock RRD");
321       rrd_free(&rrd);
322       fclose(rrd_file);
323       return(-1);   
324     } 
325
326     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
327         rrd_set_error("allocating updvals pointer array");
328         rrd_free(&rrd);
329         fclose(rrd_file);
330         return(-1);
331     }
332
333     if ((pdp_temp = malloc(sizeof(rrd_value_t)
334                            *rrd.stat_head->ds_cnt))==NULL){
335         rrd_set_error("allocating pdp_temp ...");
336         free(updvals);
337         rrd_free(&rrd);
338         fclose(rrd_file);
339         return(-1);
340     }
341
342     if ((tmpl_idx = malloc(sizeof(unsigned long)
343                            *(rrd.stat_head->ds_cnt+1)))==NULL){
344         rrd_set_error("allocating tmpl_idx ...");
345         free(pdp_temp);
346         free(updvals);
347         rrd_free(&rrd);
348         fclose(rrd_file);
349         return(-1);
350     }
351     /* initialize tmplt redirector */
352     /* default config example (assume DS 1 is a CDEF DS)
353        tmpl_idx[0] -> 0; (time)
354        tmpl_idx[1] -> 1; (DS 0)
355        tmpl_idx[2] -> 3; (DS 2)
356        tmpl_idx[3] -> 4; (DS 3) */
357     tmpl_idx[0] = 0; /* time */
358     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
359         {
360            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
361               tmpl_idx[ii++]=i;
362         }
363     tmpl_cnt= ii;
364
365     if (tmplt) {
366         /* we should work on a writeable copy here */
367         char *dsname;
368         unsigned int tmpl_len;
369         char *tmplt_copy = strdup(tmplt);
370         dsname = tmplt_copy;
371         tmpl_cnt = 1; /* the first entry is the time */
372         tmpl_len = strlen(tmplt_copy);
373         for(i=0;i<=tmpl_len ;i++) {
374             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
375                 tmplt_copy[i] = '\0';
376                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
377                     rrd_set_error("tmplt contains more DS definitions than RRD");
378                     free(updvals); free(pdp_temp);
379                     free(tmpl_idx); rrd_free(&rrd);
380                     fclose(rrd_file); return(-1);
381                 }
382                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
383                     rrd_set_error("unknown DS name '%s'",dsname);
384                     free(updvals); free(pdp_temp);
385                     free(tmplt_copy);
386                     free(tmpl_idx); rrd_free(&rrd);
387                     fclose(rrd_file); return(-1);
388                 } else {
389                   /* the first element is always the time */
390                   tmpl_idx[tmpl_cnt-1]++; 
391                   /* go to the next entry on the tmplt_copy */
392                   dsname = &tmplt_copy[i+1];
393                   /* fix the damage we did before */
394                   if (i<tmpl_len) {
395                      tmplt_copy[i]=':';
396                   } 
397
398                 }
399             }       
400         }
401         free(tmplt_copy);
402     }
403     if ((pdp_new = malloc(sizeof(rrd_value_t)
404                           *rrd.stat_head->ds_cnt))==NULL){
405         rrd_set_error("allocating pdp_new ...");
406         free(updvals);
407         free(pdp_temp);
408         free(tmpl_idx);
409         rrd_free(&rrd);
410         fclose(rrd_file);
411         return(-1);
412     }
413
414 #ifdef HAVE_MMAP
415     rrd_mmaped_file = mmap(0, 
416                         rrd_filesize, 
417                         PROT_READ | PROT_WRITE, 
418                         MAP_SHARED, 
419                         fileno(rrd_file), 
420                         0);
421     if (rrd_mmaped_file == MAP_FAILED) {
422         rrd_set_error("error mmapping file %s", filename);
423         free(updvals);
424         free(pdp_temp);
425         free(tmpl_idx);
426         rrd_free(&rrd);
427         fclose(rrd_file);
428         return(-1);
429     }
430 #ifdef HAVE_MADVISE
431     /* when we use mmaping we tell the kernel the mmap equivalent
432        of POSIX_FADV_RANDOM */
433     madvise(rrd_mmaped_file,rrd_filesize,POSIX_MADV_RANDOM);
434 #endif
435 #endif
436     /* loop through the arguments. */
437     for(arg_i=0; arg_i<argc;arg_i++) {
438         char *stepper = strdup(argv[arg_i]);
439         char *step_start = stepper;
440         char *p;
441         char *parsetime_error = NULL;
442         enum {atstyle, normal} timesyntax;
443         struct rrd_time_value ds_tv;
444         if (stepper == NULL){
445                 rrd_set_error("failed duplication argv entry");
446                 free(step_start);
447                 free(updvals);
448                 free(pdp_temp);  
449                 free(tmpl_idx);
450                 rrd_free(&rrd);
451 #ifdef HAVE_MMAP
452                 munmap(rrd_mmaped_file, rrd_filesize);
453 #endif
454                 fclose(rrd_file);
455                 return(-1);
456          }
457         /* initialize all ds input to unknown except the first one
458            which has always got to be set */
459         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
460         updvals[0]=stepper;
461         /* separate all ds elements; first must be examined separately
462            due to alternate time syntax */
463         if ((p=strchr(stepper,'@'))!=NULL) {
464             timesyntax = atstyle;
465             *p = '\0';
466             stepper = p+1;
467         } else if ((p=strchr(stepper,':'))!=NULL) {
468             timesyntax = normal;
469             *p = '\0';
470             stepper = p+1;
471         } else {
472             rrd_set_error("expected timestamp not found in data source from %s",
473                           argv[arg_i]);
474             free(step_start);
475             break;
476         }
477         ii=1;
478         updvals[tmpl_idx[ii]] = stepper;
479         while (*stepper) {
480             if (*stepper == ':') {
481                 *stepper = '\0';
482                 ii++;
483                 if (ii<tmpl_cnt){                   
484                     updvals[tmpl_idx[ii]] = stepper+1;
485                 }
486             }
487             stepper++;
488         }
489
490         if (ii != tmpl_cnt-1) {
491             rrd_set_error("expected %lu data source readings (got %lu) from %s",
492                           tmpl_cnt-1, ii, argv[arg_i]);
493             free(step_start);
494             break;
495         }
496         
497         /* get the time from the reading ... handle N */
498         if (timesyntax == atstyle) {
499             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
500                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
501                 free(step_start);
502                 break;
503             }
504             if (ds_tv.type == RELATIVE_TO_END_TIME ||
505                 ds_tv.type == RELATIVE_TO_START_TIME) {
506                 rrd_set_error("specifying time relative to the 'start' "
507                               "or 'end' makes no sense here: %s",
508                               updvals[0]);
509                 free(step_start);
510                 break;
511             }
512
513             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
514             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
515             
516         } else if (strcmp(updvals[0],"N")==0){
517             gettimeofday(&tmp_time, 0);
518             normalize_time(&tmp_time);
519             current_time = tmp_time.tv_sec;
520             current_time_usec = tmp_time.tv_usec;
521         } else {
522             double tmp;
523             tmp = strtod(updvals[0], 0);
524             current_time = floor(tmp);
525             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
526         }
527         /* dont do any correction for old version RRDs */
528         if(version < 3) 
529             current_time_usec = 0;
530         
531         if(current_time < rrd.live_head->last_up || 
532           (current_time == rrd.live_head->last_up && 
533            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
534             rrd_set_error("illegal attempt to update using time %ld when "
535                           "last update time is %ld (minimum one second step)",
536                           current_time, rrd.live_head->last_up);
537             free(step_start);
538             break;
539         }
540         
541         
542         /* seek to the beginning of the rra's */
543         if (rra_current != rra_begin) {
544 #ifndef HAVE_MMAP
545             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
546                 rrd_set_error("seek error in rrd");
547                 free(step_start);
548                 break;
549             }
550 #endif
551             rra_current = rra_begin;
552         }
553         rra_start = rra_begin;
554
555         /* when was the current pdp started */
556         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
557         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
558
559         /* when did the last pdp_st occur */
560         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
561         occu_pdp_st = current_time - occu_pdp_age;
562
563         /* interval = current_time - rrd.live_head->last_up; */
564         interval    = (double)(current_time - rrd.live_head->last_up) 
565                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
566
567         if (occu_pdp_st > proc_pdp_st){
568             /* OK we passed the pdp_st moment*/
569             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570                                                               * occurred before the latest
571                                                               * pdp_st moment*/
572             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573             post_int = occu_pdp_age;                         /* how much after it */
574             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
575         } else {
576             pre_int = interval;
577             post_int = 0;
578         }
579
580 #ifdef DEBUG
581         printf(
582                "proc_pdp_age %lu\t"
583                "proc_pdp_st %lu\t" 
584                "occu_pfp_age %lu\t" 
585                "occu_pdp_st %lu\t"
586                "int %lf\t"
587                "pre_int %lf\t"
588                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
589                 occu_pdp_age, occu_pdp_st,
590                interval, pre_int, post_int);
591 #endif
592     
593         /* process the data sources and update the pdp_prep 
594          * area accordingly */
595         for(i=0;i<rrd.stat_head->ds_cnt;i++){
596             enum dst_en dst_idx;
597             dst_idx= dst_conv(rrd.ds_def[i].dst);
598
599             /* make sure we do not build diffs with old last_ds values */
600             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
601                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
602                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
603             }
604
605             /* NOTE: DST_CDEF should never enter this if block, because
606              * updvals[i+1][0] is initialized to 'U'; unless the caller
607              * accidently specified a value for the DST_CDEF. To handle 
608               * this case, an extra check is required. */
609
610             if((updvals[i+1][0] != 'U') &&
611                    (dst_idx != DST_CDEF) &&
612                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
613                double rate = DNAN;
614                /* the data source type defines how to process the data */
615                 /* pdp_new contains rate * time ... eg the bytes
616                  * transferred during the interval. Doing it this way saves
617                  * a lot of math operations */
618                 
619
620                 switch(dst_idx){
621                 case DST_COUNTER:
622                 case DST_DERIVE:
623                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
624                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
625                             if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
626                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
627                                  break;
628                             }
629                        }
630                        if (rrd_test_error()){
631                             break;
632                        }
633                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
634                        if(dst_idx == DST_COUNTER) {
635                           /* simple overflow catcher suggested by Andres Kroonmaa */
636                           /* this will fail terribly for non 32 or 64 bit counters ... */
637                           /* are there any others in SNMP land ? */
638                           if (pdp_new[i] < (double)0.0 ) 
639                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
640                           if (pdp_new[i] < (double)0.0 ) 
641                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
642                        }
643                        rate = pdp_new[i] / interval;
644                     }
645                    else {
646                      pdp_new[i]= DNAN;          
647                    }
648                    break;
649                 case DST_ABSOLUTE:
650                     errno = 0;
651                     pdp_new[i] = strtod(updvals[i+1],&endptr);
652                     if (errno > 0){
653                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
654                         break;
655                     };
656                     if (endptr[0] != '\0'){
657                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
658                         break;
659                     }
660                     rate = pdp_new[i] / interval;                 
661                     break;
662                 case DST_GAUGE:
663                     errno = 0;
664                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
665                     if (errno > 0){
666                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
667                         break;
668                     };
669                     if (endptr[0] != '\0'){
670                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
671                         break;
672                     }
673                     rate = pdp_new[i] / interval;                  
674                     break;
675                 default:
676                     rrd_set_error("rrd contains unknown DS type : '%s'",
677                                   rrd.ds_def[i].dst);
678                     break;
679                 }
680                 /* break out of this for loop if the error string is set */
681                 if (rrd_test_error()){
682                     break;
683                 }
684                /* make sure pdp_temp is neither too large or too small
685                 * if any of these occur it becomes unknown ...
686                 * sorry folks ... */
687                if ( ! isnan(rate) && 
688                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
689                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
690                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
691                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
692                   pdp_new[i] = DNAN;
693                }               
694             } else {
695                 /* no news is news all the same */
696                 pdp_new[i] = DNAN;
697             }
698
699             
700             /* make a copy of the command line argument for the next run */
701 #ifdef DEBUG
702             fprintf(stderr,
703                     "prep ds[%lu]\t"
704                     "last_arg '%s'\t"
705                     "this_arg '%s'\t"
706                     "pdp_new %10.2f\n",
707                     i,
708                     rrd.pdp_prep[i].last_ds,
709                     updvals[i+1], pdp_new[i]);
710 #endif
711             strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
712             rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
713         }
714         /* break out of the argument parsing loop if the error_string is set */
715         if (rrd_test_error()){
716             free(step_start);
717             break;
718         }
719         /* has a pdp_st moment occurred since the last run ? */
720
721         if (proc_pdp_st == occu_pdp_st){
722             /* no we have not passed a pdp_st moment. therefore update is simple */
723
724             for(i=0;i<rrd.stat_head->ds_cnt;i++){
725                 if(isnan(pdp_new[i])) {            
726                     /* this is not realy accurate if we use subsecond data arival time
727                        should have thought of it when going subsecond resolution ...
728                        sorry next format change we will have it! */
729                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
730                 } else {
731                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
732                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
733                      } else {
734                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
735                      }
736                 }
737 #ifdef DEBUG
738                 fprintf(stderr,
739                         "NO PDP  ds[%lu]\t"
740                         "value %10.2f\t"
741                         "unkn_sec %5lu\n",
742                         i,
743                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
744                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
745 #endif
746             }   
747         } else {
748             /* an pdp_st has occurred. */
749
750             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
751              * occurred up to the last run.        
752             pdp_new[] contains rate*seconds from the latest run.
753             pdp_temp[] will contain the rate for cdp */
754
755             for(i=0;i<rrd.stat_head->ds_cnt;i++){
756                 /* update pdp_prep to the current pdp_st. */
757                 double pre_unknown = 0.0;               
758                 if(isnan(pdp_new[i]))
759                     /* a final bit of unkonwn to be added bevore calculation
760                      * we use a tempaorary variable for this so that we 
761                      * don't have to turn integer lines before using the value */                
762                     pre_unknown = pre_int;
763                 else {
764                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
765                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
766                      } else {
767                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
768                      }
769                  }
770                 
771
772                 /* if too much of the pdp_prep is unknown we dump it */
773                 if ( 
774                     /* removed because this does not agree with the definition
775                        a heart beat can be unknown */
776                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
777                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
778                     /* if the interval is larger than mrhb we get NAN */
779                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
780                     (occu_pdp_st-proc_pdp_st <= 
781                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
782                     pdp_temp[i] = DNAN;
783                 } else {
784                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
785                         / ((double)(occu_pdp_st - proc_pdp_st
786                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
787                             -pre_unknown);
788                 }
789
790                 /* process CDEF data sources; remember each CDEF DS can
791                  * only reference other DS with a lower index number */
792             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
793                    rpnp_t *rpnp;
794                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
795                    /* substitute data values for OP_VARIABLE nodes */
796                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
797                    {
798                           if (rpnp[ii].op == OP_VARIABLE) {
799                                  rpnp[ii].op = OP_NUMBER;
800                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
801                           }
802                    }
803                    /* run the rpn calculator */
804                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
805                           free(rpnp);
806                           break; /* exits the data sources pdp_temp loop */
807                    }
808                 }
809         
810                 /* make pdp_prep ready for the next run */
811                 if(isnan(pdp_new[i])){
812                     /* this is not really accurate if we use subsecond data arrival time
813                        should have thought of it when going subsecond resolution ...
814                        sorry next format change we will have it! */
815                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
816                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
817                 } else {
818                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
819                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
820                         pdp_new[i]/interval*post_int;
821                 }
822
823 #ifdef DEBUG
824                 fprintf(stderr,
825                         "PDP UPD ds[%lu]\t"
826                         "pdp_temp %10.2f\t"
827                         "new_prep %10.2f\t"
828                         "new_unkn_sec %5lu\n",
829                         i, pdp_temp[i],
830                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
831                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
832 #endif
833             }
834
835                 /* if there were errors during the last loop, bail out here */
836             if (rrd_test_error()){
837                free(step_start);
838                break;
839             }
840
841                 /* compute the number of elapsed pdp_st moments */
842                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
843 #ifdef DEBUG
844                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
845 #endif
846                 if (rra_step_cnt == NULL)
847                 {
848                    rra_step_cnt = (unsigned long *) 
849                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
850                 }
851
852             for(i = 0, rra_start = rra_begin;
853                 i < rrd.stat_head->rra_cnt;
854             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
855                 i++)
856                 {
857                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
858                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
859                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
860         if (start_pdp_offset <= elapsed_pdp_st) {
861            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
862                       rrd.rra_def[i].pdp_cnt + 1;
863             } else {
864                    rra_step_cnt[i] = 0;
865                 }
866
867                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
868                 {
869                    /* If this is a bulk update, we need to skip ahead in the seasonal
870                         * arrays so that they will be correct for the next observed value;
871                         * note that for the bulk update itself, no update will occur to
872                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
873                         * be set to DNAN. */
874            if (rra_step_cnt[i] > 2) 
875                    {
876                           /* skip update by resetting rra_step_cnt[i],
877                            * note that this is not data source specific; this is due
878                            * to the bulk update, not a DNAN value for the specific data
879                            * source. */
880                           rra_step_cnt[i] = 0;
881               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
882                              &last_seasonal_coef);
883                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
884                              &seasonal_coef);
885                    }
886                 
887                   /* periodically run a smoother for seasonal effects */
888                   /* Need to use first cdp parameter buffer to track
889                    * burnin (burnin requires a specific smoothing schedule).
890                    * The CDP_init_seasonal parameter is really an RRA level,
891                    * not a data source within RRA level parameter, but the rra_def
892                    * is read only for rrd_update (not flushed to disk). */
893                   iii = i*(rrd.stat_head -> ds_cnt);
894                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
895                           <= BURNIN_CYCLES)
896                   {
897                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
898                                  > rrd.rra_def[i].row_cnt - 1) {
899                            /* mark off one of the burnin cycles */
900                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
901                        schedule_smooth = 1;
902                          }  
903                   } else {
904                          /* someone has no doubt invented a trick to deal with this
905                           * wrap around, but at least this code is clear. */
906                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
907                              rrd.rra_ptr[i].cur_row)
908                          {
909                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
910                                   * mapping between PDP and CDP */
911                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
912                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
913                                  {
914 #ifdef DEBUG
915                                         fprintf(stderr,
916                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
917                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
918                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
919 #endif
920                                         schedule_smooth = 1;
921                                  }
922              } else {
923                                  /* can't rely on negative numbers because we are working with
924                                   * unsigned values */
925                                  /* Don't need modulus here. If we've wrapped more than once, only
926                                   * one smooth is executed at the end. */
927                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
928                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
929                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
930                                  {
931 #ifdef DEBUG
932                                         fprintf(stderr,
933                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
934                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
935                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
936 #endif
937                                         schedule_smooth = 1;
938                                  }
939                          }
940                   }
941
942               rra_current = ftell(rrd_file); 
943                 } /* if cf is DEVSEASONAL or SEASONAL */
944
945         if (rrd_test_error()) break;
946
947                     /* update CDP_PREP areas */
948                     /* loop over data sources within each RRA */
949                     for(ii = 0;
950                         ii < rrd.stat_head->ds_cnt;
951                         ii++)
952                         {
953                         
954                         /* iii indexes the CDP prep area for this data source within the RRA */
955                         iii=i*rrd.stat_head->ds_cnt+ii;
956
957                         if (rrd.rra_def[i].pdp_cnt > 1) {
958                           
959                            if (rra_step_cnt[i] > 0) {
960                            /* If we are in this block, at least 1 CDP value will be written to
961                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
962                                 * to be written, then the "fill in" value is the CDP_secondary_val
963                                 * entry. */
964                                   if (isnan(pdp_temp[ii]))
965                   {
966                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
967                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
968                                   } else {
969                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
970                                           * CDP data entries. No matter the CF, the value is the same because
971                                           * the average, max, min, and last of a list of identical values is
972                                           * the same, namely, the value itself. */
973                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
974                                   }
975                      
976                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
977                                       > rrd.rra_def[i].pdp_cnt*
978                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
979                                   {
980                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
981                                          /* initialize carry over */
982                                          if (current_cf == CF_AVERAGE) {
983                                                    if (isnan(pdp_temp[ii])) { 
984                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
985                                                    } else {
986                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
987                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
988                                                    }
989                                          } else {
990                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991                                          }
992                                   } else {
993                                          rrd_value_t cum_val, cur_val; 
994                                      switch (current_cf) {
995                                                 case CF_AVERAGE:
996                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
997                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
998                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
999                                                (cum_val + cur_val * start_pdp_offset) /
1000                                            (rrd.rra_def[i].pdp_cnt
1001                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1002                                                    /* initialize carry over value */
1003                                                    if (isnan(pdp_temp[ii])) { 
1004                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1005                                                    } else {
1006                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1007                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1008                                                    }
1009                                                    break;
1010                                                 case CF_MAXIMUM:
1011                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1012                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1013 #ifdef DEBUG
1014                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1015                                                           isnan(pdp_temp[ii])) {
1016                                                      fprintf(stderr,
1017                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1018                                                                 i,ii);
1019                                                          exit(-1);
1020                                                   }
1021 #endif
1022                                                   if (cur_val > cum_val)
1023                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1024                                                   else
1025                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1026                                                   /* initialize carry over value */
1027                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1028                                                   break;
1029                                                 case CF_MINIMUM:
1030                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1031                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1032 #ifdef DEBUG
1033                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1034                                                           isnan(pdp_temp[ii])) {
1035                                                      fprintf(stderr,
1036                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1037                                                                 i,ii);
1038                                                          exit(-1);
1039                                                   }
1040 #endif
1041                                                   if (cur_val < cum_val)
1042                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1043                                                   else
1044                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1045                                                   /* initialize carry over value */
1046                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047                                                   break;
1048                                                 case CF_LAST:
1049                                                 default:
1050                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1051                                                    /* initialize carry over value */
1052                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1053                                                 break;
1054                                          }
1055                                   } /* endif meets xff value requirement for a valid value */
1056                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1057                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1058                                   if (isnan(pdp_temp[ii]))
1059                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1060                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1061                                   else
1062                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1063                } else  /* rra_step_cnt[i]  == 0 */
1064                            {
1065 #ifdef DEBUG
1066                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1067                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1068                                          i,ii);
1069                                   } else {
1070                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1071                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1072                                   }
1073 #endif
1074                                   if (isnan(pdp_temp[ii])) {
1075                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1076                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1077                                   {
1078                                          if (current_cf == CF_AVERAGE) {
1079                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1080                                                    elapsed_pdp_st;
1081                                          } else {
1082                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1083                                          }
1084 #ifdef DEBUG
1085                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1086                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1087 #endif
1088                                   } else {
1089                                          switch (current_cf) {
1090                                          case CF_AVERAGE:
1091                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1092                                                    elapsed_pdp_st;
1093                                                 break;
1094                                          case CF_MINIMUM:
1095                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1096                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1097                                                 break; 
1098                                          case CF_MAXIMUM:
1099                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1100                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1101                                                 break; 
1102                                          case CF_LAST:
1103                                          default:
1104                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1105                                                 break;
1106                                          }
1107                                   }
1108                            }
1109                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1110                            if (elapsed_pdp_st > 2)
1111                            {
1112                                    switch (current_cf) {
1113                                    case CF_AVERAGE:
1114                                    default:
1115                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1116                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1117                                           break;
1118                    case CF_SEASONAL:
1119                                    case CF_DEVSEASONAL:
1120                                           /* need to update cached seasonal values, so they are consistent
1121                                            * with the bulk update */
1122                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1123                                            * CDP_last_deviation are the same. */
1124                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1125                                                  last_seasonal_coef[ii];
1126                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1127                                                  seasonal_coef[ii];
1128                                           break;
1129                    case CF_HWPREDICT:
1130                                           /* need to update the null_count and last_null_count.
1131                                            * even do this for non-DNAN pdp_temp because the
1132                                            * algorithm is not learning from batch updates. */
1133                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1134                                                  elapsed_pdp_st;
1135                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1136                                                  elapsed_pdp_st - 1;
1137                                           /* fall through */
1138                                    case CF_DEVPREDICT:
1139                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1140                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1141                                           break;
1142                    case CF_FAILURES:
1143                                           /* do not count missed bulk values as failures */
1144                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1145                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1146                                           /* need to reset violations buffer.
1147                                            * could do this more carefully, but for now, just
1148                                            * assume a bulk update wipes away all violations. */
1149                       erase_violations(&rrd, iii, i);
1150                                           break;
1151                                    }
1152                            } 
1153                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1154
1155                         if (rrd_test_error()) break;
1156
1157                         } /* endif data sources loop */
1158         } /* end RRA Loop */
1159
1160                 /* this loop is only entered if elapsed_pdp_st < 3 */
1161                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1162                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1163                 {
1164                for(i = 0, rra_start = rra_begin;
1165                    i < rrd.stat_head->rra_cnt;
1166                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1167                    i++)
1168                    {
1169                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1170
1171                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1172                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1173                           {
1174                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1175                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1176                                 &seasonal_coef);
1177                  rra_current = ftell(rrd_file);
1178                           }
1179                           if (rrd_test_error()) break;
1180                       /* loop over data sources within each RRA */
1181                       for(ii = 0;
1182                           ii < rrd.stat_head->ds_cnt;
1183                           ii++)
1184                           {
1185                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1186                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1187                                     scratch_idx, seasonal_coef);
1188                           }
1189            } /* end RRA Loop */
1190                    if (rrd_test_error()) break;
1191             } /* end elapsed_pdp_st loop */
1192
1193                 if (rrd_test_error()) break;
1194
1195                 /* Ready to write to disk */
1196                 /* Move sequentially through the file, writing one RRA at a time.
1197                  * Note this architecture divorces the computation of CDP with
1198                  * flushing updated RRA entries to disk. */
1199             for(i = 0, rra_start = rra_begin;
1200                 i < rrd.stat_head->rra_cnt;
1201             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1202                 i++) {
1203                 /* is there anything to write for this RRA? If not, continue. */
1204         if (rra_step_cnt[i] == 0) continue;
1205
1206                 /* write the first row */
1207 #ifdef DEBUG
1208         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1209 #endif
1210             rrd.rra_ptr[i].cur_row++;
1211             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1212                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1213                 /* position on the first row */
1214                 rra_pos_tmp = rra_start +
1215                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1216                 if(rra_pos_tmp != rra_current) {
1217 #ifndef HAVE_MMAP
1218                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1219                       rrd_set_error("seek error in rrd");
1220                       break;
1221                    }
1222 #endif
1223                    rra_current = rra_pos_tmp;
1224                 }
1225
1226 #ifdef DEBUG
1227             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1228 #endif
1229                 scratch_idx = CDP_primary_val;
1230                 if (pcdp_summary != NULL)
1231                 {
1232                    rra_time = (current_time - current_time 
1233                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1234                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1235                 }
1236 #ifdef HAVE_MMAP
1237                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1238                    pcdp_summary, &rra_time, rrd_mmaped_file);
1239 #else
1240                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1241                    pcdp_summary, &rra_time);
1242 #endif
1243                 if (rrd_test_error()) break;
1244
1245                 /* write other rows of the bulk update, if any */
1246                 scratch_idx = CDP_secondary_val;
1247                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1248                 {
1249                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1250                    {
1251 #ifdef DEBUG
1252               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1253                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1254 #endif
1255                           /* wrap */
1256                           rrd.rra_ptr[i].cur_row = 0;
1257                           /* seek back to beginning of current rra */
1258                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1259                           {
1260                          rrd_set_error("seek error in rrd");
1261                          break;
1262                           }
1263 #ifdef DEBUG
1264                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1265 #endif
1266                           rra_current = rra_start;
1267                    }
1268                    if (pcdp_summary != NULL)
1269                    {
1270                       rra_time = (current_time - current_time 
1271                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1272                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1273                    }
1274 #ifdef HAVE_MMAP
1275                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1276                       pcdp_summary, &rra_time, rrd_mmaped_file);
1277 #else
1278                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1279                       pcdp_summary, &rra_time);
1280 #endif
1281                 }
1282                 
1283                 if (rrd_test_error())
1284                   break;
1285                 } /* RRA LOOP */
1286
1287             /* break out of the argument parsing loop if error_string is set */
1288             if (rrd_test_error()){
1289                    free(step_start);
1290                    break;
1291             } 
1292             
1293         } /* endif a pdp_st has occurred */ 
1294         rrd.live_head->last_up = current_time;
1295         rrd.live_head->last_up_usec = current_time_usec; 
1296         free(step_start);
1297     } /* function argument loop */
1298
1299     if (seasonal_coef != NULL) free(seasonal_coef);
1300     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1301         if (rra_step_cnt != NULL) free(rra_step_cnt);
1302     rpnstack_free(&rpnstack);
1303
1304 #ifdef HAVE_MMAP
1305     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1306             rrd_set_error("error writing(unmapping) file: %s", filename);
1307     }
1308 #endif    
1309     /* if we got here and if there is an error and if the file has not been
1310      * written to, then close things up and return. */
1311     if (rrd_test_error()) {
1312         free(updvals);
1313         free(tmpl_idx);
1314         rrd_free(&rrd);
1315         free(pdp_temp);
1316         free(pdp_new);
1317         fclose(rrd_file);
1318         return(-1);
1319     }
1320
1321     /* aargh ... that was tough ... so many loops ... anyway, its done.
1322      * we just need to write back the live header portion now*/
1323
1324     if (fseek(rrd_file, (sizeof(stat_head_t)
1325                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1326                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1327               SEEK_SET) != 0) {
1328         rrd_set_error("seek rrd for live header writeback");
1329         free(updvals);
1330         free(tmpl_idx);
1331         rrd_free(&rrd);
1332         free(pdp_temp);
1333         free(pdp_new);
1334         fclose(rrd_file);
1335         return(-1);
1336     }
1337
1338     if(version >= 3) {
1339             if(fwrite( rrd.live_head,
1340                        sizeof(live_head_t), 1, rrd_file) != 1){
1341                 rrd_set_error("fwrite live_head to rrd");
1342                 free(updvals);
1343                 rrd_free(&rrd);
1344                 free(tmpl_idx);
1345                 free(pdp_temp);
1346                 free(pdp_new);
1347                 fclose(rrd_file);
1348                 return(-1);
1349             }
1350     }
1351     else {
1352             if(fwrite( &rrd.live_head->last_up,
1353                        sizeof(time_t), 1, rrd_file) != 1){
1354                 rrd_set_error("fwrite live_head to rrd");
1355                 free(updvals);
1356                 rrd_free(&rrd);
1357                 free(tmpl_idx);
1358                 free(pdp_temp);
1359                 free(pdp_new);
1360                 fclose(rrd_file);
1361                 return(-1);
1362             }
1363     }
1364             
1365
1366     if(fwrite( rrd.pdp_prep,
1367                sizeof(pdp_prep_t),
1368                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1369         rrd_set_error("ftwrite pdp_prep to rrd");
1370         free(updvals);
1371         rrd_free(&rrd);
1372         free(tmpl_idx);
1373         free(pdp_temp);
1374         free(pdp_new);
1375         fclose(rrd_file);
1376         return(-1);
1377     }
1378
1379     if(fwrite( rrd.cdp_prep,
1380                sizeof(cdp_prep_t),
1381                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1382        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1383
1384         rrd_set_error("ftwrite cdp_prep to rrd");
1385         free(updvals);
1386         free(tmpl_idx);
1387         rrd_free(&rrd);
1388         free(pdp_temp);
1389         free(pdp_new);
1390         fclose(rrd_file);
1391         return(-1);
1392     }
1393
1394     if(fwrite( rrd.rra_ptr,
1395                sizeof(rra_ptr_t), 
1396                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1397         rrd_set_error("fwrite rra_ptr to rrd");
1398         free(updvals);
1399         free(tmpl_idx);
1400         rrd_free(&rrd);
1401         free(pdp_temp);
1402         free(pdp_new);
1403         fclose(rrd_file);
1404         return(-1);
1405     }
1406     
1407 #ifdef HAVE_POSIX_FADVISExxx
1408
1409     /* with update we have write ops, so they will probably not be done by now, this means
1410        the buffers will not get freed. But calling this for the whole file - header
1411        will let the data off the hook as soon as it is written when if it is from a previous
1412        update cycle. Calling fdsync to force things is much too hard here. */
1413
1414     if (0 != posix_fadvise(fileno(rrd_file), rra_begin, 0, POSIX_FADV_DONTNEED)) {
1415          rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1416          fclose(rrd_file);
1417          return(-1);
1418     } 
1419 #endif
1420
1421     /* OK now close the files and free the memory */
1422     if(fclose(rrd_file) != 0){
1423         rrd_set_error("closing rrd");
1424         free(updvals);
1425         free(tmpl_idx);
1426         rrd_free(&rrd);
1427         free(pdp_temp);
1428         free(pdp_new);
1429         return(-1);
1430     }
1431
1432     /* calling the smoothing code here guarantees at most
1433          * one smoothing operation per rrd_update call. Unfortunately,
1434          * it is possible with bulk updates, or a long-delayed update
1435          * for smoothing to occur off-schedule. This really isn't
1436          * critical except during the burning cycles. */
1437         if (schedule_smooth)
1438         {
1439           rrd_file = fopen(filename,"rb+");
1440           
1441
1442           rra_start = rra_begin;
1443           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1444           {
1445             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1446                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1447             {
1448 #ifdef DEBUG
1449               fprintf(stderr,"Running smoother for rra %ld\n",i);
1450 #endif
1451               apply_smoother(&rrd,i,rra_start,rrd_file);
1452               if (rrd_test_error())
1453                 break;
1454             }
1455             rra_start += rrd.rra_def[i].row_cnt
1456               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1457           }
1458 #ifdef HAVE_POSIX_FADVISExxx
1459           /* same procedure as above ... */
1460           if (0 != posix_fadvise(fileno(rrd_file), rra_begin, 0, POSIX_FADV_DONTNEED)) {
1461              rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1462              fclose(rrd_file);
1463              return(-1);
1464           } 
1465 #endif
1466           fclose(rrd_file);
1467         }
1468     rrd_free(&rrd);
1469     free(updvals);
1470     free(tmpl_idx);
1471     free(pdp_new);
1472     free(pdp_temp);
1473     return(0);
1474 }
1475
/*
 * LockRRD - try to take an exclusive lock covering the whole RRD file.
 *
 * rrdfile: open stdio stream for the RRD; its underlying descriptor is
 *          the one that gets locked.
 *
 * The lock is requested without waiting (F_SETLK on POSIX, _LK_NBLCK on
 * native Windows), so the call fails instead of sleeping when somebody
 * else already holds a conflicting lock.  The lock goes away when the
 * file is closed.
 *
 * Returns 0 on success; otherwise the failure status of the underlying
 * locking call (-1 on Windows if the file size cannot be determined).
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);     /* descriptor behind the stream */

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() wants a byte count, so the file size is looked up first */
    if (_fstat(fd, &file_info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, file_info.st_size);
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;     /* l_start is relative to the file start */
    whole_file.l_start  = 0;            /* begin at offset 0 ... */
    whole_file.l_len    = 0;            /* ... and cover everything up to EOF */
    whole_file.l_type   = F_WRLCK;      /* exclusive write lock */

    return fcntl(fd, F_SETLK, &whole_file);
#endif
}
1512
1513
1514 #ifdef HAVE_MMAP
1515 info_t
1516 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1517                unsigned short CDP_scratch_idx, 
1518 #ifndef DEBUG
1519 FILE UNUSED(*rrd_file),
1520 #else
1521 FILE *rrd_file,
1522 #endif
1523                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1524 #else
1525 info_t
1526 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1527                unsigned short CDP_scratch_idx, FILE *rrd_file,
1528                    info_t *pcdp_summary, time_t *rra_time)
1529 #endif
1530 {
1531    unsigned long ds_idx, cdp_idx;
1532    infoval iv;
1533   
1534    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1535    {
1536       /* compute the cdp index */
1537       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1538 #ifdef DEBUG
1539           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1540              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1541              rrd -> rra_def[rra_idx].cf_nam);
1542 #endif 
1543       if (pcdp_summary != NULL)
1544           {
1545              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1546              /* append info to the return hash */
1547                  pcdp_summary = info_push(pcdp_summary,
1548                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1549                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1550                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1551          RD_I_VAL, iv);
1552           }
1553 #ifdef HAVE_MMAP
1554           memcpy((char *)rrd_mmaped_file + *rra_current,
1555                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1556                           sizeof(rrd_value_t));
1557 #else
1558           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1559                  sizeof(rrd_value_t),1,rrd_file) != 1)
1560           { 
1561              rrd_set_error("writing rrd");
1562              return 0;
1563           }
1564 #endif
1565           *rra_current += sizeof(rrd_value_t);
1566         }
1567         return (pcdp_summary);
1568 }