make mvs stop complaining about uninitialized variables ... -- norman wheeler bigpond.com
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.15  Copyright by Tobi Oetiker, 1997-2006
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
/* Fallback definition used when the toolchain is not MinGW (see the
 * surrounding #ifndef __MINGW32__). Holds a point in time split into
 * whole seconds and a microsecond remainder. */
struct timeval {
        time_t tv_sec;   /* whole seconds */
        long   tv_usec;  /* microsecond part */
};
40 #endif
41
/* Time zone descriptor accepted as the second argument of the local
 * gettimeofday() replacement. */
struct __timezone {
        int tz_minuteswest;  /* minutes west of Greenwich */
        int tz_dsttime;      /* kind of DST correction */
};
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must never be negative. A negative tv_usec is fixed by borrowing one
 * second from tv_sec. Only a single second is borrowed, and non-negative
 * values are left untouched.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0)
                return;

        t->tv_usec += 1000000L;
        t->tv_sec  -= 1;
}
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
92                                         info_t*);
93
/* Evaluates to Y when X is NaN, otherwise to X. X is expanded twice, so
 * it must be free of side effects. The expansion deliberately carries no
 * trailing semicolon: the macro is an expression, and the caller
 * terminates the statement. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
95
96
97 #ifdef STANDALONE
98 int 
99 main(int argc, char **argv){
100         rrd_update(argc,argv);
101         if (rrd_test_error()) {
102                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
103                         "Usage: rrdupdate filename\n"
104                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105                         "\t\t\ttime|N:value[:value...]\n\n"
106                         "\t\t\tat-time@value[:value...]\n\n"
107                         "\t\t\t[ time:value[:value...] ..]\n\n");
108                                    
109                 printf("ERROR: %s\n",rrd_get_error());
110                 rrd_clear_error();                                                            
111                 return 1;
112         }
113         return 0;
114 }
115 #endif
116
117 info_t *rrd_update_v(int argc, char **argv)
118 {
119     char             *tmplt = NULL;          
120         info_t *result = NULL;
121         infoval rc;
122       rc.u_int = -1;
123     optind = 0; opterr = 0;  /* initialize getopt */
124
125     while (1) {
126                 static struct option long_options[] =
127                         {
128                                 {"template",      required_argument, 0, 't'},
129                                 {0,0,0,0}
130                         };
131                 int option_index = 0;
132                 int opt;
133                 opt = getopt_long(argc, argv, "t:", 
134                                                   long_options, &option_index);
135                 
136                 if (opt == EOF)
137                         break;
138                 
139                 switch(opt) {
140                 case 't':
141                         tmplt = optarg;
142                         break;
143                 
144                 case '?':
145                         rrd_set_error("unknown option '%s'",argv[optind-1]);
146                         goto end_tag;
147                 }
148     }
149
150     /* need at least 2 arguments: filename, data. */
151     if (argc-optind < 2) {
152                 rrd_set_error("Not enough arguments");
153                 goto end_tag;
154     }
155     rc.u_int = 0;
156     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157         rc.u_int = _rrd_update(argv[optind], tmplt,
158                       argc - optind - 1, argv + optind + 1, result);
159     result->value.u_int = rc.u_int;
160 end_tag:
161     return result;
162 }
163
/*
 * Command-line style front end for updating an RRD.
 *
 * Accepts an optional --template/-t option followed by the filename and
 * at least one data argument, and forwards the remaining arguments to
 * rrd_update_r(). Returns -1 with the rrd error set when an unknown option
 * is encountered or fewer than two non-option arguments remain; otherwise
 * returns whatever rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template",      required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *tmplt = NULL;
    int   option_index = 0;   /* written by getopt_long, not used here */
    int   opt;

    /* reset getopt state so the function can be called repeatedly */
    optind = 0;
    opterr = 0;

    while ((opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index)) != EOF) {
        if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return(-1);
        }
        if (opt == 't')
            tmplt = optarg;
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1, argv + optind + 1);
}
207
208 int
209 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
210 {
211    return _rrd_update(filename, tmplt, argc, argv, NULL);
212 }
213
214 int
215 _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
216    info_t *pcdp_summary)
217 {
218
219     int              arg_i = 2;
220     short            j;
221     unsigned long    i,ii,iii=1;
222
223     unsigned long    rra_begin;          /* byte pointer to the rra
224                                           * area in the rrd file.  this
225                                           * pointer never changes value */
226     unsigned long    rra_start;          /* byte pointer to the rra
227                                           * area in the rrd file.  this
228                                           * pointer changes as each rrd is
229                                           * processed. */
230     unsigned long    rra_current;        /* byte pointer to the current write
231                                           * spot in the rrd file. */
232     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
233     double           interval,
234                      pre_int,post_int;   /* interval between this and
235                                           * the last run */
236     unsigned long    proc_pdp_st;        /* which pdp_st was the last
237                                           * to be processed */
238     unsigned long    occu_pdp_st;        /* when was the pdp_st
239                                           * before the last update
240                                           * time */
241     unsigned long    proc_pdp_age;       /* how old was the data in
242                                           * the pdp prep area when it
243                                           * was last updated */
244     unsigned long    occu_pdp_age;       /* how long ago was the last
245                                           * pdp_step time */
246     rrd_value_t      *pdp_new;           /* prepare the incoming data
247                                           * to be added the the
248                                           * existing entry */
249     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
250                                           * to be added the the
251                                           * cdp values */
252
253     long             *tmpl_idx;          /* index representing the settings
254                                             transported by the tmplt index */
255     unsigned long    tmpl_cnt = 2;       /* time and data */
256
257     FILE             *rrd_file;
258     rrd_t            rrd;
259     time_t           current_time = 0;
260     time_t           rra_time = 0;       /* time of update for a RRA */
261     unsigned long    current_time_usec=0;/* microseconds part of current time */
262     struct timeval   tmp_time;           /* used for time conversion */
263
264     char             **updvals;
265     int              schedule_smooth = 0;
266         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267                                          /* a vector of future Holt-Winters seasonal coefs */
268     unsigned long    elapsed_pdp_st;
269                                          /* number of elapsed PDP steps since last update */
270     unsigned long    *rra_step_cnt = NULL;
271                                          /* number of rows to be updated in an RRA for a data
272                                           * value. */
273     unsigned long    start_pdp_offset;
274                                          /* number of PDP steps since the last update that
275                                           * are assigned to the first CDP to be generated
276                                           * since the last update. */
277     unsigned short   scratch_idx;
278                                          /* index into the CDP scratch array */
279     enum cf_en       current_cf;
280                                          /* numeric id of the current consolidation function */
281     rpnstack_t       rpnstack; /* used for COMPUTE DS */
282     int              version;  /* rrd version */
283     char             *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285     void             *rrd_mmaped_file;
286     unsigned long    rrd_filesize;
287 #endif
288
289     rpnstack_init(&rpnstack);
290
291     /* need at least 1 arguments: data. */
292     if (argc < 1) {
293         rrd_set_error("Not enough arguments");
294         return -1;
295     }
296     
297     
298
299     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300         return -1;
301     }
302     /* initialize time */
303     version = atoi(rrd.stat_head->version);
304     gettimeofday(&tmp_time, 0);
305     normalize_time(&tmp_time);
306     current_time = tmp_time.tv_sec;
307     if(version >= 3) {
308         current_time_usec = tmp_time.tv_usec;
309     }
310     else {
311         current_time_usec = 0;
312     }
313
314     rra_current = rra_start = rra_begin = ftell(rrd_file);
315     /* This is defined in the ANSI C standard, section 7.9.5.3:
316
317         When a file is opened with udpate mode ('+' as the second
318         or third character in the ... list of mode argument
319         variables), both input and ouptut may be performed on the
320         associated stream.  However, ...  input may not be directly
321         followed by output without an intervening call to a file
322         positioning function, unless the input oepration encounters
323         end-of-file. */
324 #ifdef HAVE_MMAP
325     fseek(rrd_file, 0, SEEK_END);
326     rrd_filesize = ftell(rrd_file);
327     fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329     fseek(rrd_file, 0, SEEK_CUR);
330 #endif
331
332     
333     /* get exclusive lock to whole file.
334      * lock gets removed when we close the file.
335      */
336     if (LockRRD(rrd_file) != 0) {
337       rrd_set_error("could not lock RRD");
338       rrd_free(&rrd);
339       fclose(rrd_file);
340       return(-1);   
341     } 
342
343     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344         rrd_set_error("allocating updvals pointer array");
345         rrd_free(&rrd);
346         fclose(rrd_file);
347         return(-1);
348     }
349
350     if ((pdp_temp = malloc(sizeof(rrd_value_t)
351                            *rrd.stat_head->ds_cnt))==NULL){
352         rrd_set_error("allocating pdp_temp ...");
353         free(updvals);
354         rrd_free(&rrd);
355         fclose(rrd_file);
356         return(-1);
357     }
358
359     if ((tmpl_idx = malloc(sizeof(unsigned long)
360                            *(rrd.stat_head->ds_cnt+1)))==NULL){
361         rrd_set_error("allocating tmpl_idx ...");
362         free(pdp_temp);
363         free(updvals);
364         rrd_free(&rrd);
365         fclose(rrd_file);
366         return(-1);
367     }
368     /* initialize tmplt redirector */
369     /* default config example (assume DS 1 is a CDEF DS)
370        tmpl_idx[0] -> 0; (time)
371        tmpl_idx[1] -> 1; (DS 0)
372        tmpl_idx[2] -> 3; (DS 2)
373        tmpl_idx[3] -> 4; (DS 3) */
374     tmpl_idx[0] = 0; /* time */
375     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
376         {
377            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378               tmpl_idx[ii++]=i;
379         }
380     tmpl_cnt= ii;
381
382     if (tmplt) {
383         /* we should work on a writeable copy here */
384         char *dsname;
385         unsigned int tmpl_len;
386         tmplt = strdup(tmplt);
387         dsname = tmplt;
388         tmpl_cnt = 1; /* the first entry is the time */
389         tmpl_len = strlen(tmplt);
390         for(i=0;i<=tmpl_len ;i++) {
391             if (tmplt[i] == ':' || tmplt[i] == '\0') {
392                 tmplt[i] = '\0';
393                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
394                     rrd_set_error("tmplt contains more DS definitions than RRD");
395                     free(updvals); free(pdp_temp);
396                     free(tmpl_idx); rrd_free(&rrd);
397                     fclose(rrd_file); return(-1);
398                 }
399                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
400                     rrd_set_error("unknown DS name '%s'",dsname);
401                     free(updvals); free(pdp_temp);
402                     free(tmplt);
403                     free(tmpl_idx); rrd_free(&rrd);
404                     fclose(rrd_file); return(-1);
405                 } else {
406                   /* the first element is always the time */
407                   tmpl_idx[tmpl_cnt-1]++; 
408                   /* go to the next entry on the tmplt */
409                   dsname = &tmplt[i+1];
410                   /* fix the damage we did before */
411                   if (i<tmpl_len) {
412                      tmplt[i]=':';
413                   } 
414
415                 }
416             }       
417         }
418         free(tmplt);
419     }
420     if ((pdp_new = malloc(sizeof(rrd_value_t)
421                           *rrd.stat_head->ds_cnt))==NULL){
422         rrd_set_error("allocating pdp_new ...");
423         free(updvals);
424         free(pdp_temp);
425         free(tmpl_idx);
426         rrd_free(&rrd);
427         fclose(rrd_file);
428         return(-1);
429     }
430
431 #ifdef HAVE_MMAP
432     rrd_mmaped_file = mmap(0, 
433                         rrd_filesize, 
434                         PROT_READ | PROT_WRITE, 
435                         MAP_SHARED, 
436                         fileno(rrd_file), 
437                         0);
438     if (rrd_mmaped_file == MAP_FAILED) {
439         rrd_set_error("error mmapping file %s", filename);
440         free(updvals);
441         free(pdp_temp);
442         free(tmpl_idx);
443         rrd_free(&rrd);
444         fclose(rrd_file);
445         return(-1);
446     }
447 #endif
448     /* loop through the arguments. */
449     for(arg_i=0; arg_i<argc;arg_i++) {
450         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
451         char *step_start = stepper;
452         char *p;
453         char *parsetime_error = NULL;
454         enum {atstyle, normal} timesyntax;
455         struct rrd_time_value ds_tv;
456         if (stepper == NULL){
457                 rrd_set_error("failed duplication argv entry");
458                 free(updvals);
459                 free(pdp_temp);  
460                 free(tmpl_idx);
461                 rrd_free(&rrd);
462 #ifdef HAVE_MMAP
463                 munmap(rrd_mmaped_file, rrd_filesize);
464 #endif
465                 fclose(rrd_file);
466                 return(-1);
467          }
468         /* initialize all ds input to unknown except the first one
469            which has always got to be set */
470         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471         strcpy(stepper,argv[arg_i]);
472         updvals[0]=stepper;
473         /* separate all ds elements; first must be examined separately
474            due to alternate time syntax */
475         if ((p=strchr(stepper,'@'))!=NULL) {
476             timesyntax = atstyle;
477             *p = '\0';
478             stepper = p+1;
479         } else if ((p=strchr(stepper,':'))!=NULL) {
480             timesyntax = normal;
481             *p = '\0';
482             stepper = p+1;
483         } else {
484             rrd_set_error("expected timestamp not found in data source from %s:...",
485                           argv[arg_i]);
486             free(step_start);
487             break;
488         }
489         ii=1;
490         updvals[tmpl_idx[ii]] = stepper;
491         while (*stepper) {
492             if (*stepper == ':') {
493                 *stepper = '\0';
494                 ii++;
495                 if (ii<tmpl_cnt){                   
496                     updvals[tmpl_idx[ii]] = stepper+1;
497                 }
498             }
499             stepper++;
500         }
501
502         if (ii != tmpl_cnt-1) {
503             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504                           tmpl_cnt-1, ii, argv[arg_i]);
505             free(step_start);
506             break;
507         }
508         
509         /* get the time from the reading ... handle N */
510         if (timesyntax == atstyle) {
511             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
513                 free(step_start);
514                 break;
515             }
516             if (ds_tv.type == RELATIVE_TO_END_TIME ||
517                 ds_tv.type == RELATIVE_TO_START_TIME) {
518                 rrd_set_error("specifying time relative to the 'start' "
519                               "or 'end' makes no sense here: %s",
520                               updvals[0]);
521                 free(step_start);
522                 break;
523             }
524
525             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
527             
528         } else if (strcmp(updvals[0],"N")==0){
529             gettimeofday(&tmp_time, 0);
530             normalize_time(&tmp_time);
531             current_time = tmp_time.tv_sec;
532             current_time_usec = tmp_time.tv_usec;
533         } else {
534             double tmp;
535             tmp = strtod(updvals[0], 0);
536             current_time = floor(tmp);
537             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
538         }
539         /* dont do any correction for old version RRDs */
540         if(version < 3) 
541             current_time_usec = 0;
542         
543         if(current_time < rrd.live_head->last_up || 
544           (current_time == rrd.live_head->last_up && 
545            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
546             rrd_set_error("illegal attempt to update using time %ld when "
547                           "last update time is %ld (minimum one second step)",
548                           current_time, rrd.live_head->last_up);
549             free(step_start);
550             break;
551         }
552         
553         
554         /* seek to the beginning of the rra's */
555         if (rra_current != rra_begin) {
556 #ifndef HAVE_MMAP
557             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
558                 rrd_set_error("seek error in rrd");
559                 free(step_start);
560                 break;
561             }
562 #endif
563             rra_current = rra_begin;
564         }
565         rra_start = rra_begin;
566
567         /* when was the current pdp started */
568         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
570
571         /* when did the last pdp_st occur */
572         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573         occu_pdp_st = current_time - occu_pdp_age;
574
575         /* interval = current_time - rrd.live_head->last_up; */
576         interval    = (double)(current_time - rrd.live_head->last_up) 
577                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
578
579         if (occu_pdp_st > proc_pdp_st){
580             /* OK we passed the pdp_st moment*/
581             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
582                                                               * occurred before the latest
583                                                               * pdp_st moment*/
584             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
585             post_int = occu_pdp_age;                         /* how much after it */
586             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
587         } else {
588             pre_int = interval;
589             post_int = 0;
590         }
591
592 #ifdef DEBUG
593         printf(
594                "proc_pdp_age %lu\t"
595                "proc_pdp_st %lu\t" 
596                "occu_pfp_age %lu\t" 
597                "occu_pdp_st %lu\t"
598                "int %lf\t"
599                "pre_int %lf\t"
600                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
601                 occu_pdp_age, occu_pdp_st,
602                interval, pre_int, post_int);
603 #endif
604     
605         /* process the data sources and update the pdp_prep 
606          * area accordingly */
607         for(i=0;i<rrd.stat_head->ds_cnt;i++){
608             enum dst_en dst_idx;
609             dst_idx= dst_conv(rrd.ds_def[i].dst);
610
611             /* make sure we do not build diffs with old last_ds values */
612             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
613                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
614                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
615             }
616
617             /* NOTE: DST_CDEF should never enter this if block, because
618              * updvals[i+1][0] is initialized to 'U'; unless the caller
619              * accidently specified a value for the DST_CDEF. To handle 
620               * this case, an extra check is required. */
621
622             if((updvals[i+1][0] != 'U') &&
623                    (dst_idx != DST_CDEF) &&
624                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
625                double rate = DNAN;
626                /* the data source type defines how to process the data */
627                 /* pdp_new contains rate * time ... eg the bytes
628                  * transferred during the interval. Doing it this way saves
629                  * a lot of math operations */
630                 
631
632                 switch(dst_idx){
633                 case DST_COUNTER:
634                 case DST_DERIVE:
635                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
637                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639                                  break;
640                             }
641                        }
642                        if (rrd_test_error()){
643                             break;
644                        }
645                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646                        if(dst_idx == DST_COUNTER) {
647                           /* simple overflow catcher suggested by Andres Kroonmaa */
648                           /* this will fail terribly for non 32 or 64 bit counters ... */
649                           /* are there any others in SNMP land ? */
650                           if (pdp_new[i] < (double)0.0 ) 
651                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
652                           if (pdp_new[i] < (double)0.0 ) 
653                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
654                        }
655                        rate = pdp_new[i] / interval;
656                     }
657                    else {
658                      pdp_new[i]= DNAN;          
659                    }
660                    break;
661                 case DST_ABSOLUTE:
662                     errno = 0;
663                     pdp_new[i] = strtod(updvals[i+1],&endptr);
664                     if (errno > 0){
665                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
666                         break;
667                     };
668                     if (endptr[0] != '\0'){
669                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
670                         break;
671                     }
672                     rate = pdp_new[i] / interval;                 
673                     break;
674                 case DST_GAUGE:
675                     errno = 0;
676                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
677                     if (errno > 0){
678                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
679                         break;
680                     };
681                     if (endptr[0] != '\0'){
682                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
683                         break;
684                     }
685                     rate = pdp_new[i] / interval;                  
686                     break;
687                 default:
688                     rrd_set_error("rrd contains unknown DS type : '%s'",
689                                   rrd.ds_def[i].dst);
690                     break;
691                 }
692                 /* break out of this for loop if the error string is set */
693                 if (rrd_test_error()){
694                     break;
695                 }
696                /* make sure pdp_temp is neither too large or too small
697                 * if any of these occur it becomes unknown ...
698                 * sorry folks ... */
699                if ( ! isnan(rate) && 
700                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
702                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704                   pdp_new[i] = DNAN;
705                }               
706             } else {
707                 /* no news is news all the same */
708                 pdp_new[i] = DNAN;
709             }
710
711             
712             /* make a copy of the command line argument for the next run */
713 #ifdef DEBUG
714             fprintf(stderr,
715                     "prep ds[%lu]\t"
716                     "last_arg '%s'\t"
717                     "this_arg '%s'\t"
718                     "pdp_new %10.2f\n",
719                     i,
720                     rrd.pdp_prep[i].last_ds,
721                     updvals[i+1], pdp_new[i]);
722 #endif
723             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
724                 strncpy(rrd.pdp_prep[i].last_ds,
725                         updvals[i+1],LAST_DS_LEN-1);
726                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
727             }
728         }
729         /* break out of the argument parsing loop if the error_string is set */
730         if (rrd_test_error()){
731             free(step_start);
732             break;
733         }
734         /* has a pdp_st moment occurred since the last run ? */
735
736         if (proc_pdp_st == occu_pdp_st){
737             /* no we have not passed a pdp_st moment. therefore update is simple */
738
739             for(i=0;i<rrd.stat_head->ds_cnt;i++){
740                 if(isnan(pdp_new[i])) {            
741                     /* this is not realy accurate if we use subsecond data arival time
742                        should have thought of it when going subsecond resolution ...
743                        sorry next format change we will have it! */
744                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
745                 } else {
746                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
747                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
748                      } else {
749                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
750                      }
751                 }
752 #ifdef DEBUG
753                 fprintf(stderr,
754                         "NO PDP  ds[%lu]\t"
755                         "value %10.2f\t"
756                         "unkn_sec %5lu\n",
757                         i,
758                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
759                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
760 #endif
761             }   
762         } else {
763             /* an pdp_st has occurred. */
764
765             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
766              * occurred up to the last run.        
767             pdp_new[] contains rate*seconds from the latest run.
768             pdp_temp[] will contain the rate for cdp */
769
770             for(i=0;i<rrd.stat_head->ds_cnt;i++){
771                 /* update pdp_prep to the current pdp_st. */
772                 double pre_unknown = 0.0;               
773                 if(isnan(pdp_new[i]))
774                     /* a final bit of unknown to be added before calculation
775                      * we use a temporary variable for this so that we
776                      * don't have to turn integer lines before using the value */
777                     pre_unknown = pre_int;
778                 else {
779                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
780                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
781                      } else {
782                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
783                      }
784                  }
785                 
786
787                 /* if too much of the pdp_prep is unknown we dump it */
788                 if ( 
789                     /* removed because this does not agree with the definition
790                        a heart beat can be unknown */
791                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
792                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
793                     /* if the interval is larger than mrhb we get NAN */
794                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
795                     (occu_pdp_st-proc_pdp_st <= 
796                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
797                     pdp_temp[i] = DNAN;
798                 } else {
799                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
800                         / ((double)(occu_pdp_st - proc_pdp_st
801                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
802                             -pre_unknown);
803                 }
804
805                 /* process CDEF data sources; remember each CDEF DS can
806                  * only reference other DS with a lower index number */
807             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
808                    rpnp_t *rpnp;
809                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
810                    /* substitute data values for OP_VARIABLE nodes */
811                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
812                    {
813                           if (rpnp[ii].op == OP_VARIABLE) {
814                                  rpnp[ii].op = OP_NUMBER;
815                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
816                           }
817                    }
818                    /* run the rpn calculator */
819                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
820                           free(rpnp);
821                           break; /* exits the data sources pdp_temp loop */
822                    }
823                 }
824         
825                 /* make pdp_prep ready for the next run */
826                 if(isnan(pdp_new[i])){
827                     /* this is not really accurate if we use subsecond data arrival time
828                        should have thought of it when going subsecond resolution ...
829                        sorry next format change we will have it! */
830                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
831                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
832                 } else {
833                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
834                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
835                         pdp_new[i]/interval*post_int;
836                 }
837
838 #ifdef DEBUG
839                 fprintf(stderr,
840                         "PDP UPD ds[%lu]\t"
841                         "pdp_temp %10.2f\t"
842                         "new_prep %10.2f\t"
843                         "new_unkn_sec %5lu\n",
844                         i, pdp_temp[i],
845                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
846                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
847 #endif
848             }
849
850                 /* if there were errors during the last loop, bail out here */
851             if (rrd_test_error()){
852                free(step_start);
853                break;
854             }
855
856                 /* compute the number of elapsed pdp_st moments */
857                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
858 #ifdef DEBUG
859                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
860 #endif
861                 if (rra_step_cnt == NULL)
862                 {
863                    rra_step_cnt = (unsigned long *) 
864                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
865                 }
866
867             for(i = 0, rra_start = rra_begin;
868                 i < rrd.stat_head->rra_cnt;
869             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
870                 i++)
871                 {
872                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
873                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
874                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
875         if (start_pdp_offset <= elapsed_pdp_st) {
876            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
877                       rrd.rra_def[i].pdp_cnt + 1;
878             } else {
879                    rra_step_cnt[i] = 0;
880                 }
881
882                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
883                 {
884                    /* If this is a bulk update, we need to skip ahead in the seasonal
885                         * arrays so that they will be correct for the next observed value;
886                         * note that for the bulk update itself, no update will occur to
887                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
888                         * be set to DNAN. */
889            if (rra_step_cnt[i] > 2) 
890                    {
891                           /* skip update by resetting rra_step_cnt[i],
892                            * note that this is not data source specific; this is due
893                            * to the bulk update, not a DNAN value for the specific data
894                            * source. */
895                           rra_step_cnt[i] = 0;
896               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
897                              &last_seasonal_coef);
898                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
899                              &seasonal_coef);
900                    }
901                 
902                   /* periodically run a smoother for seasonal effects */
903                   /* Need to use first cdp parameter buffer to track
904                    * burnin (burnin requires a specific smoothing schedule).
905                    * The CDP_init_seasonal parameter is really an RRA level,
906                    * not a data source within RRA level parameter, but the rra_def
907                    * is read only for rrd_update (not flushed to disk). */
908                   iii = i*(rrd.stat_head -> ds_cnt);
909                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
910                           <= BURNIN_CYCLES)
911                   {
912                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
913                                  > rrd.rra_def[i].row_cnt - 1) {
914                            /* mark off one of the burnin cycles */
915                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
916                        schedule_smooth = 1;
917                          }  
918                   } else {
919                          /* someone has no doubt invented a trick to deal with this
920                           * wrap around, but at least this code is clear. */
921                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
922                              rrd.rra_ptr[i].cur_row)
923                          {
924                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
925                                   * mapping between PDP and CDP */
926                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
927                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
928                                  {
929 #ifdef DEBUG
930                                         fprintf(stderr,
931                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
932                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
933                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
934 #endif
935                                         schedule_smooth = 1;
936                                  }
937              } else {
938                                  /* can't rely on negative numbers because we are working with
939                                   * unsigned values */
940                                  /* Don't need modulus here. If we've wrapped more than once, only
941                                   * one smooth is executed at the end. */
942                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
943                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
944                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
945                                  {
946 #ifdef DEBUG
947                                         fprintf(stderr,
948                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
949                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
950                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
951 #endif
952                                         schedule_smooth = 1;
953                                  }
954                          }
955                   }
956
957               rra_current = ftell(rrd_file); 
958                 } /* if cf is DEVSEASONAL or SEASONAL */
959
960         if (rrd_test_error()) break;
961
962                     /* update CDP_PREP areas */
963                     /* loop over data sources within each RRA */
964                     for(ii = 0;
965                         ii < rrd.stat_head->ds_cnt;
966                         ii++)
967                         {
968                         
969                         /* iii indexes the CDP prep area for this data source within the RRA */
970                         iii=i*rrd.stat_head->ds_cnt+ii;
971
972                         if (rrd.rra_def[i].pdp_cnt > 1) {
973                           
974                            if (rra_step_cnt[i] > 0) {
975                            /* If we are in this block, at least 1 CDP value will be written to
976                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
977                                 * to be written, then the "fill in" value is the CDP_secondary_val
978                                 * entry. */
979                                   if (isnan(pdp_temp[ii]))
980                   {
981                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
982                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
983                                   } else {
984                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
985                                           * CDP data entries. No matter the CF, the value is the same because
986                                           * the average, max, min, and last of a list of identical values is
987                                           * the same, namely, the value itself. */
988                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
989                                   }
990                      
991                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
992                                       > rrd.rra_def[i].pdp_cnt*
993                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
994                                   {
995                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
996                                          /* initialize carry over */
997                                          if (current_cf == CF_AVERAGE) {
998                                                    if (isnan(pdp_temp[ii])) { 
999                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000                                                    } else {
1001                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003                                                    }
1004                                          } else {
1005                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1006                                          }
1007                                   } else {
1008                                          rrd_value_t cum_val, cur_val; 
1009                                      switch (current_cf) {
1010                                                 case CF_AVERAGE:
1011                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1012                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
1013                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1014                                                (cum_val + cur_val * start_pdp_offset) /
1015                                            (rrd.rra_def[i].pdp_cnt
1016                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1017                                                    /* initialize carry over value */
1018                                                    if (isnan(pdp_temp[ii])) { 
1019                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1020                                                    } else {
1021                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1022                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1023                                                    }
1024                                                    break;
1025                                                 case CF_MAXIMUM:
1026                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1027                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1028 #ifdef DEBUG
1029                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1030                                                           isnan(pdp_temp[ii])) {
1031                                                      fprintf(stderr,
1032                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1033                                                                 i,ii);
1034                                                          exit(-1);
1035                                                   }
1036 #endif
1037                                                   if (cur_val > cum_val)
1038                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1039                                                   else
1040                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1041                                                   /* initialize carry over value */
1042                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1043                                                   break;
1044                                                 case CF_MINIMUM:
1045                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1046                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1047 #ifdef DEBUG
1048                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1049                                                           isnan(pdp_temp[ii])) {
1050                                                      fprintf(stderr,
1051                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1052                                                                 i,ii);
1053                                                          exit(-1);
1054                                                   }
1055 #endif
1056                                                   if (cur_val < cum_val)
1057                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1058                                                   else
1059                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1060                                                   /* initialize carry over value */
1061                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062                                                   break;
1063                                                 case CF_LAST:
1064                                                 default:
1065                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1066                                                    /* initialize carry over value */
1067                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068                                                 break;
1069                                          }
1070                                   } /* endif meets xff value requirement for a valid value */
1071                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1072                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1073                                   if (isnan(pdp_temp[ii]))
1074                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1075                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1076                                   else
1077                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1078                } else  /* rra_step_cnt[i]  == 0 */
1079                            {
1080 #ifdef DEBUG
1081                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1082                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1083                                          i,ii);
1084                                   } else {
1085                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1086                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1087                                   }
1088 #endif
1089                                   if (isnan(pdp_temp[ii])) {
1090                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1091                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1092                                   {
1093                                          if (current_cf == CF_AVERAGE) {
1094                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1095                                                    elapsed_pdp_st;
1096                                          } else {
1097                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098                                          }
1099 #ifdef DEBUG
1100                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1101                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1102 #endif
1103                                   } else {
1104                                          switch (current_cf) {
1105                                          case CF_AVERAGE:
1106                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1107                                                    elapsed_pdp_st;
1108                                                 break;
1109                                          case CF_MINIMUM:
1110                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1111                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1112                                                 break; 
1113                                          case CF_MAXIMUM:
1114                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1115                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1116                                                 break; 
1117                                          case CF_LAST:
1118                                          default:
1119                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1120                                                 break;
1121                                          }
1122                                   }
1123                            }
1124                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1125                            if (elapsed_pdp_st > 2)
1126                            {
1127                                    switch (current_cf) {
1128                                    case CF_AVERAGE:
1129                                    default:
1130                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1131                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1132                                           break;
1133                    case CF_SEASONAL:
1134                                    case CF_DEVSEASONAL:
1135                                           /* need to update cached seasonal values, so they are consistent
1136                                            * with the bulk update */
1137                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1138                                            * CDP_last_deviation are the same. */
1139                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1140                                                  last_seasonal_coef[ii];
1141                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1142                                                  seasonal_coef[ii];
1143                                           break;
1144                    case CF_HWPREDICT:
1145                                           /* need to update the null_count and last_null_count.
1146                                            * even do this for non-DNAN pdp_temp because the
1147                                            * algorithm is not learning from batch updates. */
1148                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1149                                                  elapsed_pdp_st;
1150                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1151                                                  elapsed_pdp_st - 1;
1152                                           /* fall through */
1153                                    case CF_DEVPREDICT:
1154                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1155                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1156                                           break;
1157                    case CF_FAILURES:
1158                                           /* do not count missed bulk values as failures */
1159                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1160                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1161                                           /* need to reset violations buffer.
1162                                            * could do this more carefully, but for now, just
1163                                            * assume a bulk update wipes away all violations. */
1164                       erase_violations(&rrd, iii, i);
1165                                           break;
1166                                    }
1167                            } 
1168                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1169
1170                         if (rrd_test_error()) break;
1171
1172                         } /* endif data sources loop */
1173         } /* end RRA Loop */
1174
1175                 /* this loop is only entered if elapsed_pdp_st < 3 */
1176                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1177                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1178                 {
1179                for(i = 0, rra_start = rra_begin;
1180                    i < rrd.stat_head->rra_cnt;
1181                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1182                    i++)
1183                    {
1184                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1185
1186                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1187                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1188                           {
1189                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1190                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1191                                 &seasonal_coef);
1192                  rra_current = ftell(rrd_file);
1193                           }
1194                           if (rrd_test_error()) break;
1195                       /* loop over data sources within each RRA */
1196                       for(ii = 0;
1197                           ii < rrd.stat_head->ds_cnt;
1198                           ii++)
1199                           {
1200                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1201                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1202                                     scratch_idx, seasonal_coef);
1203                           }
1204            } /* end RRA Loop */
1205                    if (rrd_test_error()) break;
1206             } /* end elapsed_pdp_st loop */
1207
1208                 if (rrd_test_error()) break;
1209
1210                 /* Ready to write to disk */
1211                 /* Move sequentially through the file, writing one RRA at a time.
1212                  * Note this architecture divorces the computation of CDP with
1213                  * flushing updated RRA entries to disk. */
1214             for(i = 0, rra_start = rra_begin;
1215                 i < rrd.stat_head->rra_cnt;
1216             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1217                 i++) {
1218                 /* is there anything to write for this RRA? If not, continue. */
1219         if (rra_step_cnt[i] == 0) continue;
1220
1221                 /* write the first row */
1222 #ifdef DEBUG
1223         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1224 #endif
1225             rrd.rra_ptr[i].cur_row++;
1226             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1227                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1228                 /* position on the first row */
1229                 rra_pos_tmp = rra_start +
1230                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1231                 if(rra_pos_tmp != rra_current) {
1232 #ifndef HAVE_MMAP
1233                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1234                       rrd_set_error("seek error in rrd");
1235                       break;
1236                    }
1237 #endif
1238                    rra_current = rra_pos_tmp;
1239                 }
1240
1241 #ifdef DEBUG
1242             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1243 #endif
1244                 scratch_idx = CDP_primary_val;
1245                 if (pcdp_summary != NULL)
1246                 {
1247                    rra_time = (current_time - current_time 
1248                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1249                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1250                 }
1251 #ifdef HAVE_MMAP
1252                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1253                    pcdp_summary, &rra_time, rrd_mmaped_file);
1254 #else
1255                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1256                    pcdp_summary, &rra_time);
1257 #endif
1258                 if (rrd_test_error()) break;
1259
1260                 /* write other rows of the bulk update, if any */
1261                 scratch_idx = CDP_secondary_val;
1262                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1263                 {
1264                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1265                    {
1266 #ifdef DEBUG
1267               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1268                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1269 #endif
1270                           /* wrap */
1271                           rrd.rra_ptr[i].cur_row = 0;
1272                           /* seek back to beginning of current rra */
1273                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1274                           {
1275                          rrd_set_error("seek error in rrd");
1276                          break;
1277                           }
1278 #ifdef DEBUG
1279                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1280 #endif
1281                           rra_current = rra_start;
1282                    }
1283                    if (pcdp_summary != NULL)
1284                    {
1285                       rra_time = (current_time - current_time 
1286                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1287                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1288                    }
1289 #ifdef HAVE_MMAP
1290                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1291                       pcdp_summary, &rra_time, rrd_mmaped_file);
1292 #else
1293                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1294                       pcdp_summary, &rra_time);
1295 #endif
1296                 }
1297                 
1298                 if (rrd_test_error())
1299                   break;
1300                 } /* RRA LOOP */
1301
1302             /* break out of the argument parsing loop if error_string is set */
1303             if (rrd_test_error()){
1304                    free(step_start);
1305                    break;
1306             } 
1307             
1308         } /* endif a pdp_st has occurred */ 
1309         rrd.live_head->last_up = current_time;
1310         rrd.live_head->last_up_usec = current_time_usec; 
1311         free(step_start);
1312     } /* function argument loop */
1313
1314     if (seasonal_coef != NULL) free(seasonal_coef);
1315     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1316         if (rra_step_cnt != NULL) free(rra_step_cnt);
1317     rpnstack_free(&rpnstack);
1318
1319 #ifdef HAVE_MMAP
1320     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1321             rrd_set_error("error writing(unmapping) file: %s", filename);
1322     }
1323 #endif    
1324     /* if we got here and if there is an error and if the file has not been
1325      * written to, then close things up and return. */
1326     if (rrd_test_error()) {
1327         free(updvals);
1328         free(tmpl_idx);
1329         rrd_free(&rrd);
1330         free(pdp_temp);
1331         free(pdp_new);
1332         fclose(rrd_file);
1333         return(-1);
1334     }
1335
1336     /* aargh ... that was tough ... so many loops ... anyway, its done.
1337      * we just need to write back the live header portion now*/
1338
1339     if (fseek(rrd_file, (sizeof(stat_head_t)
1340                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1341                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1342               SEEK_SET) != 0) {
1343         rrd_set_error("seek rrd for live header writeback");
1344         free(updvals);
1345         free(tmpl_idx);
1346         rrd_free(&rrd);
1347         free(pdp_temp);
1348         free(pdp_new);
1349         fclose(rrd_file);
1350         return(-1);
1351     }
1352
1353     if(version >= 3) {
1354             if(fwrite( rrd.live_head,
1355                        sizeof(live_head_t), 1, rrd_file) != 1){
1356                 rrd_set_error("fwrite live_head to rrd");
1357                 free(updvals);
1358                 rrd_free(&rrd);
1359                 free(tmpl_idx);
1360                 free(pdp_temp);
1361                 free(pdp_new);
1362                 fclose(rrd_file);
1363                 return(-1);
1364             }
1365     }
1366     else {
1367             if(fwrite( &rrd.live_head->last_up,
1368                        sizeof(time_t), 1, rrd_file) != 1){
1369                 rrd_set_error("fwrite live_head to rrd");
1370                 free(updvals);
1371                 rrd_free(&rrd);
1372                 free(tmpl_idx);
1373                 free(pdp_temp);
1374                 free(pdp_new);
1375                 fclose(rrd_file);
1376                 return(-1);
1377             }
1378     }
1379             
1380
1381     if(fwrite( rrd.pdp_prep,
1382                sizeof(pdp_prep_t),
1383                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1384         rrd_set_error("ftwrite pdp_prep to rrd");
1385         free(updvals);
1386         rrd_free(&rrd);
1387         free(tmpl_idx);
1388         free(pdp_temp);
1389         free(pdp_new);
1390         fclose(rrd_file);
1391         return(-1);
1392     }
1393
1394     if(fwrite( rrd.cdp_prep,
1395                sizeof(cdp_prep_t),
1396                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1397        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1398
1399         rrd_set_error("ftwrite cdp_prep to rrd");
1400         free(updvals);
1401         free(tmpl_idx);
1402         rrd_free(&rrd);
1403         free(pdp_temp);
1404         free(pdp_new);
1405         fclose(rrd_file);
1406         return(-1);
1407     }
1408
1409     if(fwrite( rrd.rra_ptr,
1410                sizeof(rra_ptr_t), 
1411                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1412         rrd_set_error("fwrite rra_ptr to rrd");
1413         free(updvals);
1414         free(tmpl_idx);
1415         rrd_free(&rrd);
1416         free(pdp_temp);
1417         free(pdp_new);
1418         fclose(rrd_file);
1419         return(-1);
1420     }
1421
1422     /* OK now close the files and free the memory */
1423     if(fclose(rrd_file) != 0){
1424         rrd_set_error("closing rrd");
1425         free(updvals);
1426         free(tmpl_idx);
1427         rrd_free(&rrd);
1428         free(pdp_temp);
1429         free(pdp_new);
1430         return(-1);
1431     }
1432
1433     /* calling the smoothing code here guarantees at most
1434          * one smoothing operation per rrd_update call. Unfortunately,
1435          * it is possible with bulk updates, or a long-delayed update
1436          * for smoothing to occur off-schedule. This really isn't
1437          * critical except during the burning cycles. */
1438         if (schedule_smooth)
1439         {
1440           rrd_file = fopen(filename,"rb+");
1441           rra_start = rra_begin;
1442           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1443           {
1444             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1445                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1446             {
1447 #ifdef DEBUG
1448               fprintf(stderr,"Running smoother for rra %ld\n",i);
1449 #endif
1450               apply_smoother(&rrd,i,rra_start,rrd_file);
1451               if (rrd_test_error())
1452                 break;
1453             }
1454             rra_start += rrd.rra_def[i].row_cnt
1455               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1456           }
1457           fclose(rrd_file);
1458         }
1459     rrd_free(&rrd);
1460     free(updvals);
1461     free(tmpl_idx);
1462     free(pdp_new);
1463     free(pdp_temp);
1464     return(0);
1465 }
1466
/*
 * Take an exclusive, non-blocking lock on the whole file.
 * The lock gets removed when the file is closed.
 *
 * rrdfile: open stream of the RRD file to lock
 *
 * returns 0 on success; -1 if the stream is unusable, otherwise the
 * (non-zero) status reported by the underlying locking call
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;         /* File descriptor for RRD */
    int rcstat;

    /* fileno(NULL) is undefined, so reject a missing stream up front */
    if (rrdfile == NULL)
        return(-1);

    rrd_fd = fileno(rrdfile);
    if (rrd_fd < 0)
        return(-1);

        {
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* the file size is handed to _locking() as the length to lock */
    if ( _fstat( rrd_fd, &st ) == 0 ) {
            rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
    } else {
            rcstat = -1;
    }
#else
    struct flock        lock;

    /* clear every member (l_pid and any platform-specific extras) so
     * that no uninitialized data is handed to fcntl() */
    memset(&lock, 0, sizeof(lock));
    lock.l_type = F_WRLCK;      /* exclusive write lock */
    lock.l_whence = SEEK_SET;   /* l_start counts from the start of the file */
    lock.l_start = 0;           /* start of file */
    lock.l_len = 0;             /* 0 means "up to end of file": whole file */

    /* F_SETLK fails immediately instead of waiting for the lock */
    rcstat = fcntl(rrd_fd, F_SETLK, &lock);
#endif
        }

    return(rcstat);
}
1503
1504
1505 #ifdef HAVE_MMAP
1506 info_t
1507 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1508                unsigned short CDP_scratch_idx, 
1509 #ifndef DEBUG
1510 FILE UNUSED(*rrd_file),
1511 #else
1512 FILE *rrd_file,
1513 #endif
1514                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1515 #else
1516 info_t
1517 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1518                unsigned short CDP_scratch_idx, FILE *rrd_file,
1519                    info_t *pcdp_summary, time_t *rra_time)
1520 #endif
1521 {
1522    unsigned long ds_idx, cdp_idx;
1523    infoval iv;
1524   
1525    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1526    {
1527       /* compute the cdp index */
1528       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1529 #ifdef DEBUG
1530           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1531              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1532              rrd -> rra_def[rra_idx].cf_nam);
1533 #endif 
1534       if (pcdp_summary != NULL)
1535           {
1536              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1537              /* append info to the return hash */
1538                  pcdp_summary = info_push(pcdp_summary,
1539                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1540                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1541                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1542          RD_I_VAL, iv);
1543           }
1544 #ifdef HAVE_MMAP
1545           memcpy((char *)rrd_mmaped_file + *rra_current,
1546                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1547                           sizeof(rrd_value_t));
1548 #else
1549           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1550                  sizeof(rrd_value_t),1,rrd_file) != 1)
1551           { 
1552              rrd_set_error("writing rrd");
1553              return 0;
1554           }
1555 #endif
1556           *rra_current += sizeof(rrd_value_t);
1557         }
1558         return (pcdp_summary);
1559 }