guess the option should still be called template :-)
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.11  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37         time_t tv_sec; /* seconds */
38         long tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
60 /*
61  * normilize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static void normalize_time(struct timeval *t)
65 {
66         if(t->tv_usec < 0) {
67                 t->tv_sec--;
68                 t->tv_usec += 1000000L;
69         }
70 }
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
92                                         info_t*);
93
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
95
96
97 #ifdef STANDALONE
98 int 
99 main(int argc, char **argv){
100         rrd_update(argc,argv);
101         if (rrd_test_error()) {
102                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
103                         "Usage: rrdupdate filename\n"
104                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105                         "\t\t\ttime|N:value[:value...]\n\n"
106                         "\t\t\tat-time@value[:value...]\n\n"
107                         "\t\t\t[ time:value[:value...] ..]\n\n");
108                                    
109                 printf("ERROR: %s\n",rrd_get_error());
110                 rrd_clear_error();                                                            
111                 return 1;
112         }
113         return 0;
114 }
115 #endif
116
117 info_t *rrd_update_v(int argc, char **argv)
118 {
119     char             *tmplt = NULL;          
120         info_t *result = NULL;
121         infoval rc;
122     optind = 0; opterr = 0;  /* initialize getopt */
123
124     while (1) {
125                 static struct option long_options[] =
126                         {
127                                 {"template",      required_argument, 0, 't'},
128                                 {0,0,0,0}
129                         };
130                 int option_index = 0;
131                 int opt;
132                 opt = getopt_long(argc, argv, "t:", 
133                                                   long_options, &option_index);
134                 
135                 if (opt == EOF)
136                         break;
137                 
138                 switch(opt) {
139                 case 't':
140                         tmplt = optarg;
141                         break;
142                 
143                 case '?':
144                         rrd_set_error("unknown option '%s'",argv[optind-1]);
145             rc.u_int = -1;
146                         goto end_tag;
147                 }
148     }
149
150     /* need at least 2 arguments: filename, data. */
151     if (argc-optind < 2) {
152                 rrd_set_error("Not enough arguments");
153         rc.u_int = -1;
154                 goto end_tag;
155     }
156     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157         rc.u_int = _rrd_update(argv[optind], tmplt,
158                       argc - optind - 1, argv + optind + 1, result);
159     result->value.u_int = rc.u_int;
160 end_tag:
161     return result;
162 }
163
164 int
165 rrd_update(int argc, char **argv)
166 {
167     char             *tmplt = NULL;          
168     int rc;
169     optind = 0; opterr = 0;  /* initialize getopt */
170
171     while (1) {
172                 static struct option long_options[] =
173                         {
174                                 {"template",      required_argument, 0, 't'},
175                                 {0,0,0,0}
176                         };
177                 int option_index = 0;
178                 int opt;
179                 opt = getopt_long(argc, argv, "t:", 
180                                                   long_options, &option_index);
181                 
182                 if (opt == EOF)
183                         break;
184                 
185                 switch(opt) {
186                 case 't':
187                         tmplt = optarg;
188                         break;
189                 
190                 case '?':
191                         rrd_set_error("unknown option '%s'",argv[optind-1]);
192                         return(-1);
193                 }
194     }
195
196     /* need at least 2 arguments: filename, data. */
197     if (argc-optind < 2) {
198                 rrd_set_error("Not enough arguments");
199
200                 return -1;
201     }
202  
203         rc = rrd_update_r(argv[optind], tmplt,
204                       argc - optind - 1, argv + optind + 1);
205     return rc;
206 }
207
208 int
209 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
210 {
211    return _rrd_update(filename, tmplt, argc, argv, NULL);
212 }
213
214 int
215 _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
216    info_t *pcdp_summary)
217 {
218
219     int              arg_i = 2;
220     short            j;
221     unsigned long    i,ii,iii=1;
222
223     unsigned long    rra_begin;          /* byte pointer to the rra
224                                           * area in the rrd file.  this
225                                           * pointer never changes value */
226     unsigned long    rra_start;          /* byte pointer to the rra
227                                           * area in the rrd file.  this
228                                           * pointer changes as each rrd is
229                                           * processed. */
230     unsigned long    rra_current;        /* byte pointer to the current write
231                                           * spot in the rrd file. */
232     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
233     double           interval,
234         pre_int,post_int;                /* interval between this and
235                                           * the last run */
236     unsigned long    proc_pdp_st;        /* which pdp_st was the last
237                                           * to be processed */
238     unsigned long    occu_pdp_st;        /* when was the pdp_st
239                                           * before the last update
240                                           * time */
241     unsigned long    proc_pdp_age;       /* how old was the data in
242                                           * the pdp prep area when it
243                                           * was last updated */
244     unsigned long    occu_pdp_age;       /* how long ago was the last
245                                           * pdp_step time */
246     rrd_value_t      *pdp_new;           /* prepare the incoming data
247                                           * to be added the the
248                                           * existing entry */
249     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
250                                           * to be added the the
251                                           * cdp values */
252
253     long             *tmpl_idx;          /* index representing the settings
254                                             transported by the tmplt index */
255     unsigned long    tmpl_cnt = 2;       /* time and data */
256
257     FILE             *rrd_file;
258     rrd_t            rrd;
259     time_t           current_time = 0;
260     time_t           rra_time = 0;       /* time of update for a RRA */
261     unsigned long    current_time_usec=0;/* microseconds part of current time */
262     struct timeval   tmp_time;           /* used for time conversion */
263
264     char             **updvals;
265     int              schedule_smooth = 0;
266         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267                                          /* a vector of future Holt-Winters seasonal coefs */
268     unsigned long    elapsed_pdp_st;
269                                          /* number of elapsed PDP steps since last update */
270     unsigned long    *rra_step_cnt = NULL;
271                                          /* number of rows to be updated in an RRA for a data
272                                           * value. */
273     unsigned long    start_pdp_offset;
274                                          /* number of PDP steps since the last update that
275                                           * are assigned to the first CDP to be generated
276                                           * since the last update. */
277     unsigned short   scratch_idx;
278                                          /* index into the CDP scratch array */
279     enum cf_en       current_cf;
280                                          /* numeric id of the current consolidation function */
281     rpnstack_t       rpnstack; /* used for COMPUTE DS */
282     int              version;  /* rrd version */
283     char             *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285     void             *rrd_mmaped_file;
286     unsigned long    rrd_filesize;
287 #endif
288
289     rpnstack_init(&rpnstack);
290
291     /* need at least 1 arguments: data. */
292     if (argc < 1) {
293         rrd_set_error("Not enough arguments");
294         return -1;
295     }
296     
297     
298
299     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300         return -1;
301     }
302     /* initialize time */
303     version = atoi(rrd.stat_head->version);
304     gettimeofday(&tmp_time, 0);
305     normalize_time(&tmp_time);
306     current_time = tmp_time.tv_sec;
307     if(version >= 3) {
308         current_time_usec = tmp_time.tv_usec;
309     }
310     else {
311         current_time_usec = 0;
312     }
313
314     rra_current = rra_start = rra_begin = ftell(rrd_file);
315     /* This is defined in the ANSI C standard, section 7.9.5.3:
316
317         When a file is opened with udpate mode ('+' as the second
318         or third character in the ... list of mode argument
319         variables), both input and ouptut may be performed on the
320         associated stream.  However, ...  input may not be directly
321         followed by output without an intervening call to a file
322         positioning function, unless the input oepration encounters
323         end-of-file. */
324 #ifdef HAVE_MMAP
325     fseek(rrd_file, 0, SEEK_END);
326     rrd_filesize = ftell(rrd_file);
327     fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329     fseek(rrd_file, 0, SEEK_CUR);
330 #endif
331
332     
333     /* get exclusive lock to whole file.
334      * lock gets removed when we close the file.
335      */
336     if (LockRRD(rrd_file) != 0) {
337       rrd_set_error("could not lock RRD");
338       rrd_free(&rrd);
339       fclose(rrd_file);
340       return(-1);   
341     } 
342
343     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344         rrd_set_error("allocating updvals pointer array");
345         rrd_free(&rrd);
346         fclose(rrd_file);
347         return(-1);
348     }
349
350     if ((pdp_temp = malloc(sizeof(rrd_value_t)
351                            *rrd.stat_head->ds_cnt))==NULL){
352         rrd_set_error("allocating pdp_temp ...");
353         free(updvals);
354         rrd_free(&rrd);
355         fclose(rrd_file);
356         return(-1);
357     }
358
359     if ((tmpl_idx = malloc(sizeof(unsigned long)
360                            *(rrd.stat_head->ds_cnt+1)))==NULL){
361         rrd_set_error("allocating tmpl_idx ...");
362         free(pdp_temp);
363         free(updvals);
364         rrd_free(&rrd);
365         fclose(rrd_file);
366         return(-1);
367     }
368     /* initialize tmplt redirector */
369     /* default config example (assume DS 1 is a CDEF DS)
370        tmpl_idx[0] -> 0; (time)
371        tmpl_idx[1] -> 1; (DS 0)
372        tmpl_idx[2] -> 3; (DS 2)
373        tmpl_idx[3] -> 4; (DS 3) */
374     tmpl_idx[0] = 0; /* time */
375     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
376         {
377            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378               tmpl_idx[ii++]=i;
379         }
380     tmpl_cnt= ii;
381
382     if (tmplt) {
383         /* we should work on a writeable copy here */
384         char *dsname;
385         unsigned int tmpl_len;
386         tmplt = strdup(tmplt);
387         dsname = tmplt;
388         tmpl_cnt = 1; /* the first entry is the time */
389         tmpl_len = strlen(tmplt);
390         for(i=0;i<=tmpl_len ;i++) {
391             if (tmplt[i] == ':' || tmplt[i] == '\0') {
392                 tmplt[i] = '\0';
393                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
394                     rrd_set_error("tmplt contains more DS definitions than RRD");
395                     free(updvals); free(pdp_temp);
396                     free(tmpl_idx); rrd_free(&rrd);
397                     fclose(rrd_file); return(-1);
398                 }
399                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
400                     rrd_set_error("unknown DS name '%s'",dsname);
401                     free(updvals); free(pdp_temp);
402                     free(tmplt);
403                     free(tmpl_idx); rrd_free(&rrd);
404                     fclose(rrd_file); return(-1);
405                 } else {
406                   /* the first element is always the time */
407                   tmpl_idx[tmpl_cnt-1]++; 
408                   /* go to the next entry on the tmplt */
409                   dsname = &tmplt[i+1];
410                   /* fix the damage we did before */
411                   if (i<tmpl_len) {
412                      tmplt[i]=':';
413                   } 
414
415                 }
416             }       
417         }
418         free(tmplt);
419     }
420     if ((pdp_new = malloc(sizeof(rrd_value_t)
421                           *rrd.stat_head->ds_cnt))==NULL){
422         rrd_set_error("allocating pdp_new ...");
423         free(updvals);
424         free(pdp_temp);
425         free(tmpl_idx);
426         rrd_free(&rrd);
427         fclose(rrd_file);
428         return(-1);
429     }
430
431 #ifdef HAVE_MMAP
432     rrd_mmaped_file = mmap(0, 
433                         rrd_filesize, 
434                         PROT_READ | PROT_WRITE, 
435                         MAP_SHARED, 
436                         fileno(rrd_file), 
437                         0);
438     if (rrd_mmaped_file == MAP_FAILED) {
439         rrd_set_error("error mmapping file %s", filename);
440         free(updvals);
441         free(pdp_temp);
442         free(tmpl_idx);
443         rrd_free(&rrd);
444         fclose(rrd_file);
445         return(-1);
446     }
447 #endif
448     /* loop through the arguments. */
449     for(arg_i=0; arg_i<argc;arg_i++) {
450         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
451         char *step_start = stepper;
452         char *p;
453         char *parsetime_error = NULL;
454         enum {atstyle, normal} timesyntax;
455         struct rrd_time_value ds_tv;
456         if (stepper == NULL){
457                 rrd_set_error("failed duplication argv entry");
458                 free(updvals);
459                 free(pdp_temp);  
460                 free(tmpl_idx);
461                 rrd_free(&rrd);
462 #ifdef HAVE_MMAP
463                 munmap(rrd_mmaped_file, rrd_filesize);
464 #endif
465                 fclose(rrd_file);
466                 return(-1);
467          }
468         /* initialize all ds input to unknown except the first one
469            which has always got to be set */
470         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471         strcpy(stepper,argv[arg_i]);
472         updvals[0]=stepper;
473         /* separate all ds elements; first must be examined separately
474            due to alternate time syntax */
475         if ((p=strchr(stepper,'@'))!=NULL) {
476             timesyntax = atstyle;
477             *p = '\0';
478             stepper = p+1;
479         } else if ((p=strchr(stepper,':'))!=NULL) {
480             timesyntax = normal;
481             *p = '\0';
482             stepper = p+1;
483         } else {
484             rrd_set_error("expected timestamp not found in data source from %s:...",
485                           argv[arg_i]);
486             free(step_start);
487             break;
488         }
489         ii=1;
490         updvals[tmpl_idx[ii]] = stepper;
491         while (*stepper) {
492             if (*stepper == ':') {
493                 *stepper = '\0';
494                 ii++;
495                 if (ii<tmpl_cnt){                   
496                     updvals[tmpl_idx[ii]] = stepper+1;
497                 }
498             }
499             stepper++;
500         }
501
502         if (ii != tmpl_cnt-1) {
503             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504                           tmpl_cnt-1, ii, argv[arg_i]);
505             free(step_start);
506             break;
507         }
508         
509         /* get the time from the reading ... handle N */
510         if (timesyntax == atstyle) {
511             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
513                 free(step_start);
514                 break;
515             }
516             if (ds_tv.type == RELATIVE_TO_END_TIME ||
517                 ds_tv.type == RELATIVE_TO_START_TIME) {
518                 rrd_set_error("specifying time relative to the 'start' "
519                               "or 'end' makes no sense here: %s",
520                               updvals[0]);
521                 free(step_start);
522                 break;
523             }
524
525             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
527             
528         } else if (strcmp(updvals[0],"N")==0){
529             gettimeofday(&tmp_time, 0);
530             normalize_time(&tmp_time);
531             current_time = tmp_time.tv_sec;
532             current_time_usec = tmp_time.tv_usec;
533         } else {
534             double tmp;
535             tmp = strtod(updvals[0], 0);
536             current_time = floor(tmp);
537             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
538         }
539         /* dont do any correction for old version RRDs */
540         if(version < 3) 
541             current_time_usec = 0;
542         
543         if(current_time < rrd.live_head->last_up || 
544           (current_time == rrd.live_head->last_up && 
545            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
546             rrd_set_error("illegal attempt to update using time %ld when "
547                           "last update time is %ld (minimum one second step)",
548                           current_time, rrd.live_head->last_up);
549             free(step_start);
550             break;
551         }
552         
553         
554         /* seek to the beginning of the rra's */
555         if (rra_current != rra_begin) {
556 #ifndef HAVE_MMAP
557             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
558                 rrd_set_error("seek error in rrd");
559                 free(step_start);
560                 break;
561             }
562 #endif
563             rra_current = rra_begin;
564         }
565         rra_start = rra_begin;
566
567         /* when was the current pdp started */
568         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
570
571         /* when did the last pdp_st occur */
572         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573         occu_pdp_st = current_time - occu_pdp_age;
574
575         /* interval = current_time - rrd.live_head->last_up; */
576         interval    = (double)(current_time - rrd.live_head->last_up) 
577                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
578
579         if (occu_pdp_st > proc_pdp_st){
580             /* OK we passed the pdp_st moment*/
581             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
582                                                               * occurred before the latest
583                                                               * pdp_st moment*/
584             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
585             post_int = occu_pdp_age;                         /* how much after it */
586             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
587         } else {
588             pre_int = interval;
589             post_int = 0;
590         }
591
592 #ifdef DEBUG
593         printf(
594                "proc_pdp_age %lu\t"
595                "proc_pdp_st %lu\t" 
596                "occu_pfp_age %lu\t" 
597                "occu_pdp_st %lu\t"
598                "int %lf\t"
599                "pre_int %lf\t"
600                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
601                 occu_pdp_age, occu_pdp_st,
602                interval, pre_int, post_int);
603 #endif
604     
605         /* process the data sources and update the pdp_prep 
606          * area accordingly */
607         for(i=0;i<rrd.stat_head->ds_cnt;i++){
608             enum dst_en dst_idx;
609             dst_idx= dst_conv(rrd.ds_def[i].dst);
610
611             /* make sure we do not build diffs with old last_ds values */
612             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
613                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
614                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
615             }
616
617             /* NOTE: DST_CDEF should never enter this if block, because
618              * updvals[i+1][0] is initialized to 'U'; unless the caller
619              * accidently specified a value for the DST_CDEF. To handle 
620               * this case, an extra check is required. */
621
622             if((updvals[i+1][0] != 'U') &&
623                    (dst_idx != DST_CDEF) &&
624                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
625                double rate = DNAN;
626                /* the data source type defines how to process the data */
627                 /* pdp_new contains rate * time ... eg the bytes
628                  * transferred during the interval. Doing it this way saves
629                  * a lot of math operations */
630                 
631
632                 switch(dst_idx){
633                 case DST_COUNTER:
634                 case DST_DERIVE:
635                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
637                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639                                  break;
640                             }
641                        }
642                        if (rrd_test_error()){
643                             break;
644                        }
645                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646                        if(dst_idx == DST_COUNTER) {
647                           /* simple overflow catcher suggested by Andres Kroonmaa */
648                           /* this will fail terribly for non 32 or 64 bit counters ... */
649                           /* are there any others in SNMP land ? */
650                           if (pdp_new[i] < (double)0.0 ) 
651                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
652                           if (pdp_new[i] < (double)0.0 ) 
653                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
654                        }
655                        rate = pdp_new[i] / interval;
656                     }
657                    else {
658                      pdp_new[i]= DNAN;          
659                    }
660                    break;
661                 case DST_ABSOLUTE:
662                     errno = 0;
663                     pdp_new[i] = strtod(updvals[i+1],&endptr);
664                     if (errno > 0){
665                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
666                         break;
667                     };
668                     if (endptr[0] != '\0'){
669                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
670                         break;
671                     }
672                     rate = pdp_new[i] / interval;                 
673                     break;
674                 case DST_GAUGE:
675                     errno = 0;
676                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
677                     if (errno > 0){
678                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
679                         break;
680                     };
681                     if (endptr[0] != '\0'){
682                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
683                         break;
684                     }
685                     rate = pdp_new[i] / interval;                  
686                     break;
687                 default:
688                     rrd_set_error("rrd contains unknown DS type : '%s'",
689                                   rrd.ds_def[i].dst);
690                     break;
691                 }
692                 /* break out of this for loop if the error string is set */
693                 if (rrd_test_error()){
694                     break;
695                 }
696                /* make sure pdp_temp is neither too large or too small
697                 * if any of these occur it becomes unknown ...
698                 * sorry folks ... */
699                if ( ! isnan(rate) && 
700                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
702                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704                   pdp_new[i] = DNAN;
705                }               
706             } else {
707                 /* no news is news all the same */
708                 pdp_new[i] = DNAN;
709             }
710             
711             /* make a copy of the command line argument for the next run */
712 #ifdef DEBUG
713             fprintf(stderr,
714                     "prep ds[%lu]\t"
715                     "last_arg '%s'\t"
716                     "this_arg '%s'\t"
717                     "pdp_new %10.2f\n",
718                     i,
719                     rrd.pdp_prep[i].last_ds,
720                     updvals[i+1], pdp_new[i]);
721 #endif
722             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
723                 strncpy(rrd.pdp_prep[i].last_ds,
724                         updvals[i+1],LAST_DS_LEN-1);
725                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
726             }
727         }
728         /* break out of the argument parsing loop if the error_string is set */
729         if (rrd_test_error()){
730             free(step_start);
731             break;
732         }
733         /* has a pdp_st moment occurred since the last run ? */
734
735         if (proc_pdp_st == occu_pdp_st){
736             /* no we have not passed a pdp_st moment. therefore update is simple */
737
738             for(i=0;i<rrd.stat_head->ds_cnt;i++){
739                 if(isnan(pdp_new[i]))
740                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval+0.5);
741                 else {
742                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
743                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
744                      } else {
745                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
746                      }
747                 }
748 #ifdef DEBUG
749                 fprintf(stderr,
750                         "NO PDP  ds[%lu]\t"
751                         "value %10.2f\t"
752                         "unkn_sec %5lu\n",
753                         i,
754                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
755                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
756 #endif
757             }   
758         } else {
759             /* an pdp_st has occurred. */
760
761             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
762              * occurred up to the last run.        
763             pdp_new[] contains rate*seconds from the latest run.
764             pdp_temp[] will contain the rate for cdp */
765
766             for(i=0;i<rrd.stat_head->ds_cnt;i++){
767                 /* update pdp_prep to the current pdp_st. */
768                 
769                 if(isnan(pdp_new[i]))
770                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
771                 else {
772                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
773                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
774                      } else {
775                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
776                      }
777                  }
778                 
779
780                 /* if too much of the pdp_prep is unknown we dump it */
781                 if ( 
782                     /* removed because this does not agree with the definition
783                        a heart beat can be unknown */
784                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
785                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
786                     (occu_pdp_st-proc_pdp_st <= 
787                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
788                     pdp_temp[i] = DNAN;
789                 } else {
790                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
791                         / (double)( occu_pdp_st
792                                     - proc_pdp_st
793                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
794                 }
795
796                 /* process CDEF data sources; remember each CDEF DS can
797                  * only reference other DS with a lower index number */
798             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
799                    rpnp_t *rpnp;
800                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
801                    /* substitue data values for OP_VARIABLE nodes */
802                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
803                    {
804                           if (rpnp[ii].op == OP_VARIABLE) {
805                                  rpnp[ii].op = OP_NUMBER;
806                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
807                           }
808                    }
809                    /* run the rpn calculator */
810                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
811                           free(rpnp);
812                           break; /* exits the data sources pdp_temp loop */
813                    }
814                 }
815         
816                 /* make pdp_prep ready for the next run */
817                 if(isnan(pdp_new[i])){
818                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
819                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
820                 } else {
821                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
822                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
823                         pdp_new[i]/interval*post_int;
824                 }
825
826 #ifdef DEBUG
827                 fprintf(stderr,
828                         "PDP UPD ds[%lu]\t"
829                         "pdp_temp %10.2f\t"
830                         "new_prep %10.2f\t"
831                         "new_unkn_sec %5lu\n",
832                         i, pdp_temp[i],
833                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
834                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
835 #endif
836             }
837
838                 /* if there were errors during the last loop, bail out here */
839             if (rrd_test_error()){
840                free(step_start);
841                break;
842             }
843
844                 /* compute the number of elapsed pdp_st moments */
845                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
846 #ifdef DEBUG
847                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
848 #endif
849                 if (rra_step_cnt == NULL)
850                 {
851                    rra_step_cnt = (unsigned long *) 
852                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
853                 }
854
855             for(i = 0, rra_start = rra_begin;
856                 i < rrd.stat_head->rra_cnt;
857             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
858                 i++)
859                 {
860                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
861                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
862                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
863         if (start_pdp_offset <= elapsed_pdp_st) {
864            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
865                       rrd.rra_def[i].pdp_cnt + 1;
866             } else {
867                    rra_step_cnt[i] = 0;
868                 }
869
870                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
871                 {
872                    /* If this is a bulk update, we need to skip ahead in the seasonal
873                         * arrays so that they will be correct for the next observed value;
874                         * note that for the bulk update itself, no update will occur to
875                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
876                         * be set to DNAN. */
877            if (rra_step_cnt[i] > 2) 
878                    {
879                           /* skip update by resetting rra_step_cnt[i],
880                            * note that this is not data source specific; this is due
881                            * to the bulk update, not a DNAN value for the specific data
882                            * source. */
883                           rra_step_cnt[i] = 0;
884               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
885                              &last_seasonal_coef);
886                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
887                              &seasonal_coef);
888                    }
889                 
890                   /* periodically run a smoother for seasonal effects */
891                   /* Need to use first cdp parameter buffer to track
892                    * burnin (burnin requires a specific smoothing schedule).
893                    * The CDP_init_seasonal parameter is really an RRA level,
894                    * not a data source within RRA level parameter, but the rra_def
895                    * is read only for rrd_update (not flushed to disk). */
896                   iii = i*(rrd.stat_head -> ds_cnt);
897                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
898                           <= BURNIN_CYCLES)
899                   {
900                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
901                                  > rrd.rra_def[i].row_cnt - 1) {
902                            /* mark off one of the burnin cycles */
903                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
904                        schedule_smooth = 1;
905                          }  
906                   } else {
907                          /* someone has no doubt invented a trick to deal with this
908                           * wrap around, but at least this code is clear. */
909                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
910                              rrd.rra_ptr[i].cur_row)
911                          {
912                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
913                                   * mapping between PDP and CDP */
914                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
915                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
916                                  {
917 #ifdef DEBUG
918                                         fprintf(stderr,
919                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
920                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
921                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
922 #endif
923                                         schedule_smooth = 1;
924                                  }
925              } else {
926                                  /* can't rely on negative numbers because we are working with
927                                   * unsigned values */
928                                  /* Don't need modulus here. If we've wrapped more than once, only
929                                   * one smooth is executed at the end. */
930                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
931                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
932                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
933                                  {
934 #ifdef DEBUG
935                                         fprintf(stderr,
936                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
937                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
938                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
939 #endif
940                                         schedule_smooth = 1;
941                                  }
942                          }
943                   }
944
945               rra_current = ftell(rrd_file); 
946                 } /* if cf is DEVSEASONAL or SEASONAL */
947
948         if (rrd_test_error()) break;
949
950                     /* update CDP_PREP areas */
951                     /* loop over data soures within each RRA */
952                     for(ii = 0;
953                         ii < rrd.stat_head->ds_cnt;
954                         ii++)
955                         {
956                         
957                         /* iii indexes the CDP prep area for this data source within the RRA */
958                         iii=i*rrd.stat_head->ds_cnt+ii;
959
960                         if (rrd.rra_def[i].pdp_cnt > 1) {
961                           
962                            if (rra_step_cnt[i] > 0) {
963                            /* If we are in this block, as least 1 CDP value will be written to
964                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
965                                 * to be written, then the "fill in" value is the CDP_secondary_val
966                                 * entry. */
967                                   if (isnan(pdp_temp[ii]))
968                   {
969                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
970                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
971                                   } else {
972                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
973                                           * CDP data entries. No matter the CF, the value is the same because
974                                           * the average, max, min, and last of a list of identical values is
975                                           * the same, namely, the value itself. */
976                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
977                                   }
978                      
979                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
980                                       > rrd.rra_def[i].pdp_cnt*
981                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
982                                   {
983                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
984                                          /* initialize carry over */
985                                          if (current_cf == CF_AVERAGE) {
986                                                    if (isnan(pdp_temp[ii])) { 
987                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
988                                                    } else {
989                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
990                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
991                                                    }
992                                          } else {
993                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
994                                          }
995                                   } else {
996                                          rrd_value_t cum_val, cur_val; 
997                                      switch (current_cf) {
998                                                 case CF_AVERAGE:
999                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1000                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
1001                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1002                                                (cum_val + cur_val * start_pdp_offset) /
1003                                            (rrd.rra_def[i].pdp_cnt
1004                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1005                                                    /* initialize carry over value */
1006                                                    if (isnan(pdp_temp[ii])) { 
1007                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1008                                                    } else {
1009                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1010                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1011                                                    }
1012                                                    break;
1013                                                 case CF_MAXIMUM:
1014                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1015                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1016 #ifdef DEBUG
1017                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1018                                                           isnan(pdp_temp[ii])) {
1019                                                      fprintf(stderr,
1020                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1021                                                                 i,ii);
1022                                                          exit(-1);
1023                                                   }
1024 #endif
1025                                                   if (cur_val > cum_val)
1026                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1027                                                   else
1028                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1029                                                   /* initialize carry over value */
1030                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1031                                                   break;
1032                                                 case CF_MINIMUM:
1033                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1034                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1035 #ifdef DEBUG
1036                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1037                                                           isnan(pdp_temp[ii])) {
1038                                                      fprintf(stderr,
1039                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1040                                                                 i,ii);
1041                                                          exit(-1);
1042                                                   }
1043 #endif
1044                                                   if (cur_val < cum_val)
1045                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1046                                                   else
1047                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1048                                                   /* initialize carry over value */
1049                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1050                                                   break;
1051                                                 case CF_LAST:
1052                                                 default:
1053                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1054                                                    /* initialize carry over value */
1055                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1056                                                 break;
1057                                          }
1058                                   } /* endif meets xff value requirement for a valid value */
1059                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1060                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1061                                   if (isnan(pdp_temp[ii]))
1062                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1063                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1064                                   else
1065                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1066                } else  /* rra_step_cnt[i]  == 0 */
1067                            {
1068 #ifdef DEBUG
1069                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1070                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1071                                          i,ii);
1072                                   } else {
1073                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1074                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1075                                   }
1076 #endif
1077                                   if (isnan(pdp_temp[ii])) {
1078                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1079                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1080                                   {
1081                                          if (current_cf == CF_AVERAGE) {
1082                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1083                                                    elapsed_pdp_st;
1084                                          } else {
1085                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086                                          }
1087 #ifdef DEBUG
1088                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1089                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1090 #endif
1091                                   } else {
1092                                          switch (current_cf) {
1093                                          case CF_AVERAGE:
1094                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1095                                                    elapsed_pdp_st;
1096                                                 break;
1097                                          case CF_MINIMUM:
1098                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1099                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100                                                 break; 
1101                                          case CF_MAXIMUM:
1102                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1103                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1104                                                 break; 
1105                                          case CF_LAST:
1106                                          default:
1107                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1108                                                 break;
1109                                          }
1110                                   }
1111                            }
1112                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1113                            if (elapsed_pdp_st > 2)
1114                            {
1115                                    switch (current_cf) {
1116                                    case CF_AVERAGE:
1117                                    default:
1118                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1119                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1120                                           break;
1121                    case CF_SEASONAL:
1122                                    case CF_DEVSEASONAL:
1123                                           /* need to update cached seasonal values, so they are consistent
1124                                            * with the bulk update */
1125                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1126                                            * CDP_last_deviation are the same. */
1127                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1128                                                  last_seasonal_coef[ii];
1129                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1130                                                  seasonal_coef[ii];
1131                                           break;
1132                    case CF_HWPREDICT:
1133                                           /* need to update the null_count and last_null_count.
1134                                            * even do this for non-DNAN pdp_temp because the
1135                                            * algorithm is not learning from batch updates. */
1136                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1137                                                  elapsed_pdp_st;
1138                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1139                                                  elapsed_pdp_st - 1;
1140                                           /* fall through */
1141                                    case CF_DEVPREDICT:
1142                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1143                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1144                                           break;
1145                    case CF_FAILURES:
1146                                           /* do not count missed bulk values as failures */
1147                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1148                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1149                                           /* need to reset violations buffer.
1150                                            * could do this more carefully, but for now, just
1151                                            * assume a bulk update wipes away all violations. */
1152                       erase_violations(&rrd, iii, i);
1153                                           break;
1154                                    }
1155                            } 
1156                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1157
1158                         if (rrd_test_error()) break;
1159
1160                         } /* endif data sources loop */
1161         } /* end RRA Loop */
1162
1163                 /* this loop is only entered if elapsed_pdp_st < 3 */
1164                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1165                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1166                 {
1167                for(i = 0, rra_start = rra_begin;
1168                    i < rrd.stat_head->rra_cnt;
1169                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1170                    i++)
1171                    {
1172                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1173
1174                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1175                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1176                           {
1177                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1178                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1179                                 &seasonal_coef);
1180                  rra_current = ftell(rrd_file);
1181                           }
1182                           if (rrd_test_error()) break;
1183                       /* loop over data soures within each RRA */
1184                       for(ii = 0;
1185                           ii < rrd.stat_head->ds_cnt;
1186                           ii++)
1187                           {
1188                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1189                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1190                                     scratch_idx, seasonal_coef);
1191                           }
1192            } /* end RRA Loop */
1193                    if (rrd_test_error()) break;
1194             } /* end elapsed_pdp_st loop */
1195
1196                 if (rrd_test_error()) break;
1197
1198                 /* Ready to write to disk */
1199                 /* Move sequentially through the file, writing one RRA at a time.
1200                  * Note this architecture divorces the computation of CDP with
1201                  * flushing updated RRA entries to disk. */
1202             for(i = 0, rra_start = rra_begin;
1203                 i < rrd.stat_head->rra_cnt;
1204             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1205                 i++) {
1206                 /* is there anything to write for this RRA? If not, continue. */
1207         if (rra_step_cnt[i] == 0) continue;
1208
1209                 /* write the first row */
1210 #ifdef DEBUG
1211         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1212 #endif
1213             rrd.rra_ptr[i].cur_row++;
1214             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1215                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1216                 /* positition on the first row */
1217                 rra_pos_tmp = rra_start +
1218                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1219                 if(rra_pos_tmp != rra_current) {
1220 #ifndef HAVE_MMAP
1221                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1222                       rrd_set_error("seek error in rrd");
1223                       break;
1224                    }
1225 #endif
1226                    rra_current = rra_pos_tmp;
1227                 }
1228
1229 #ifdef DEBUG
1230             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1231 #endif
1232                 scratch_idx = CDP_primary_val;
1233                 if (pcdp_summary != NULL)
1234                 {
1235                    rra_time = (current_time - current_time 
1236                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1237                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1238                 }
1239 #ifdef HAVE_MMAP
1240                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1241                    pcdp_summary, &rra_time, rrd_mmaped_file);
1242 #else
1243                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1244                    pcdp_summary, &rra_time);
1245 #endif
1246                 if (rrd_test_error()) break;
1247
1248                 /* write other rows of the bulk update, if any */
1249                 scratch_idx = CDP_secondary_val;
1250                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1251                 {
1252                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1253                    {
1254 #ifdef DEBUG
1255               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1256                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1257 #endif
1258                           /* wrap */
1259                           rrd.rra_ptr[i].cur_row = 0;
1260                           /* seek back to beginning of current rra */
1261                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1262                           {
1263                          rrd_set_error("seek error in rrd");
1264                          break;
1265                           }
1266 #ifdef DEBUG
1267                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1268 #endif
1269                           rra_current = rra_start;
1270                    }
1271                    if (pcdp_summary != NULL)
1272                    {
1273                       rra_time = (current_time - current_time 
1274                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1275                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1276                    }
1277 #ifdef HAVE_MMAP
1278                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1279                       pcdp_summary, &rra_time, rrd_mmaped_file);
1280 #else
1281                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1282                       pcdp_summary, &rra_time);
1283 #endif
1284                 }
1285                 
1286                 if (rrd_test_error())
1287                   break;
1288                 } /* RRA LOOP */
1289
1290             /* break out of the argument parsing loop if error_string is set */
1291             if (rrd_test_error()){
1292                    free(step_start);
1293                    break;
1294             } 
1295             
1296         } /* endif a pdp_st has occurred */ 
1297         rrd.live_head->last_up = current_time;
1298         rrd.live_head->last_up_usec = current_time_usec; 
1299         free(step_start);
1300     } /* function argument loop */
1301
1302     if (seasonal_coef != NULL) free(seasonal_coef);
1303     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1304         if (rra_step_cnt != NULL) free(rra_step_cnt);
1305     rpnstack_free(&rpnstack);
1306
1307 #ifdef HAVE_MMAP
1308     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1309             rrd_set_error("error writing(unmapping) file: %s", filename);
1310     }
1311 #endif    
1312     /* if we got here and if there is an error and if the file has not been
1313      * written to, then close things up and return. */
1314     if (rrd_test_error()) {
1315         free(updvals);
1316         free(tmpl_idx);
1317         rrd_free(&rrd);
1318         free(pdp_temp);
1319         free(pdp_new);
1320         fclose(rrd_file);
1321         return(-1);
1322     }
1323
1324     /* aargh ... that was tough ... so many loops ... anyway, its done.
1325      * we just need to write back the live header portion now*/
1326
1327     if (fseek(rrd_file, (sizeof(stat_head_t)
1328                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1329                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1330               SEEK_SET) != 0) {
1331         rrd_set_error("seek rrd for live header writeback");
1332         free(updvals);
1333         free(tmpl_idx);
1334         rrd_free(&rrd);
1335         free(pdp_temp);
1336         free(pdp_new);
1337         fclose(rrd_file);
1338         return(-1);
1339     }
1340
1341     if(version >= 3) {
1342             if(fwrite( rrd.live_head,
1343                        sizeof(live_head_t), 1, rrd_file) != 1){
1344                 rrd_set_error("fwrite live_head to rrd");
1345                 free(updvals);
1346                 rrd_free(&rrd);
1347                 free(tmpl_idx);
1348                 free(pdp_temp);
1349                 free(pdp_new);
1350                 fclose(rrd_file);
1351                 return(-1);
1352             }
1353     }
1354     else {
1355             if(fwrite( &rrd.live_head->last_up,
1356                        sizeof(time_t), 1, rrd_file) != 1){
1357                 rrd_set_error("fwrite live_head to rrd");
1358                 free(updvals);
1359                 rrd_free(&rrd);
1360                 free(tmpl_idx);
1361                 free(pdp_temp);
1362                 free(pdp_new);
1363                 fclose(rrd_file);
1364                 return(-1);
1365             }
1366     }
1367             
1368
1369     if(fwrite( rrd.pdp_prep,
1370                sizeof(pdp_prep_t),
1371                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1372         rrd_set_error("ftwrite pdp_prep to rrd");
1373         free(updvals);
1374         rrd_free(&rrd);
1375         free(tmpl_idx);
1376         free(pdp_temp);
1377         free(pdp_new);
1378         fclose(rrd_file);
1379         return(-1);
1380     }
1381
1382     if(fwrite( rrd.cdp_prep,
1383                sizeof(cdp_prep_t),
1384                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1385        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1386
1387         rrd_set_error("ftwrite cdp_prep to rrd");
1388         free(updvals);
1389         free(tmpl_idx);
1390         rrd_free(&rrd);
1391         free(pdp_temp);
1392         free(pdp_new);
1393         fclose(rrd_file);
1394         return(-1);
1395     }
1396
1397     if(fwrite( rrd.rra_ptr,
1398                sizeof(rra_ptr_t), 
1399                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1400         rrd_set_error("fwrite rra_ptr to rrd");
1401         free(updvals);
1402         free(tmpl_idx);
1403         rrd_free(&rrd);
1404         free(pdp_temp);
1405         free(pdp_new);
1406         fclose(rrd_file);
1407         return(-1);
1408     }
1409
1410     /* OK now close the files and free the memory */
1411     if(fclose(rrd_file) != 0){
1412         rrd_set_error("closing rrd");
1413         free(updvals);
1414         free(tmpl_idx);
1415         rrd_free(&rrd);
1416         free(pdp_temp);
1417         free(pdp_new);
1418         return(-1);
1419     }
1420
1421     /* calling the smoothing code here guarantees at most
1422          * one smoothing operation per rrd_update call. Unfortunately,
1423          * it is possible with bulk updates, or a long-delayed update
1424          * for smoothing to occur off-schedule. This really isn't
1425          * critical except during the burning cycles. */
1426         if (schedule_smooth)
1427         {
1428           rrd_file = fopen(filename,"rb+");
1429           rra_start = rra_begin;
1430           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1431           {
1432             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1433                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1434             {
1435 #ifdef DEBUG
1436               fprintf(stderr,"Running smoother for rra %ld\n",i);
1437 #endif
1438               apply_smoother(&rrd,i,rra_start,rrd_file);
1439               if (rrd_test_error())
1440                 break;
1441             }
1442             rra_start += rrd.rra_def[i].row_cnt
1443               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1444           }
1445           fclose(rrd_file);
1446         }
1447     rrd_free(&rrd);
1448     free(updvals);
1449     free(tmpl_idx);
1450     free(pdp_new);
1451     free(pdp_temp);
1452     return(0);
1453 }
1454
1455 /*
1456  * get exclusive lock to whole file.
1457  * lock gets removed when we close the file
1458  *
1459  * returns 0 on success
1460  */
1461 int
1462 LockRRD(FILE *rrdfile)
1463 {
1464     int rrd_fd;         /* File descriptor for RRD */
1465     int rcstat;
1466
1467     rrd_fd = fileno(rrdfile);
1468
1469         {
1470 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1471     struct _stat st;
1472
1473     if ( _fstat( rrd_fd, &st ) == 0 ) {
1474             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1475     } else {
1476             rcstat = -1;
1477     }
1478 #else
1479     struct flock        lock;
1480     lock.l_type = F_WRLCK;    /* exclusive write lock */
1481     lock.l_len = 0;           /* whole file */
1482     lock.l_start = 0;         /* start of file */
1483     lock.l_whence = SEEK_SET;   /* end of file */
1484
1485     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1486 #endif
1487         }
1488
1489     return(rcstat);
1490 }
1491
1492
1493 #ifdef HAVE_MMAP
1494 info_t
1495 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1496                unsigned short CDP_scratch_idx, 
1497 #ifndef DEBUG
1498 FILE UNUSED(*rrd_file),
1499 #else
1500 FILE *rrd_file,
1501 #endif
1502                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1503 #else
1504 info_t
1505 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1506                unsigned short CDP_scratch_idx, FILE *rrd_file,
1507                    info_t *pcdp_summary, time_t *rra_time)
1508 #endif
1509 {
1510    unsigned long ds_idx, cdp_idx;
1511    infoval iv;
1512   
1513    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1514    {
1515       /* compute the cdp index */
1516       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1517 #ifdef DEBUG
1518           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1519              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1520              rrd -> rra_def[rra_idx].cf_nam);
1521 #endif 
1522       if (pcdp_summary != NULL)
1523           {
1524              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1525              /* append info to the return hash */
1526                  pcdp_summary = info_push(pcdp_summary,
1527                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1528                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1529                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1530          RD_I_VAL, iv);
1531           }
1532 #ifdef HAVE_MMAP
1533           memcpy((char *)rrd_mmaped_file + *rra_current,
1534                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1535                           sizeof(rrd_value_t));
1536 #else
1537           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1538                  sizeof(rrd_value_t),1,rrd_file) != 1)
1539           { 
1540              rrd_set_error("writing rrd");
1541              return 0;
1542           }
1543 #endif
1544           *rra_current += sizeof(rrd_value_t);
1545         }
1546         return (pcdp_summary);
1547 }