added posix_fadvise support (untested) ... this should help performance by
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37         time_t tv_sec; /* seconds */
38         long tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
60 /*
61  * normilize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static void normalize_time(struct timeval *t)
65 {
66         if(t->tv_usec < 0) {
67                 t->tv_sec--;
68                 t->tv_usec += 1000000L;
69         }
70 }
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
91 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
92                                         info_t*);
93
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
95
96
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99     char             *tmplt = NULL;          
100         info_t *result = NULL;
101         infoval rc;
102       rc.u_int = -1;
103     optind = 0; opterr = 0;  /* initialize getopt */
104
105     while (1) {
106                 static struct option long_options[] =
107                         {
108                                 {"template",      required_argument, 0, 't'},
109                                 {0,0,0,0}
110                         };
111                 int option_index = 0;
112                 int opt;
113                 opt = getopt_long(argc, argv, "t:", 
114                                                   long_options, &option_index);
115                 
116                 if (opt == EOF)
117                         break;
118                 
119                 switch(opt) {
120                 case 't':
121                         tmplt = optarg;
122                         break;
123                 
124                 case '?':
125                         rrd_set_error("unknown option '%s'",argv[optind-1]);
126                         goto end_tag;
127                 }
128     }
129
130     /* need at least 2 arguments: filename, data. */
131     if (argc-optind < 2) {
132                 rrd_set_error("Not enough arguments");
133                 goto end_tag;
134     }
135     rc.u_int = 0;
136     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137         rc.u_int = _rrd_update(argv[optind], tmplt,
138                       argc - optind - 1, (const char **)(argv + optind + 1), result);
139     result->value.u_int = rc.u_int;
140 end_tag:
141     return result;
142 }
143
144 int
145 rrd_update(int argc, char **argv)
146 {
147     char             *tmplt = NULL;          
148     int rc;
149     optind = 0; opterr = 0;  /* initialize getopt */
150
151     while (1) {
152                 static struct option long_options[] =
153                         {
154                                 {"template",      required_argument, 0, 't'},
155                                 {0,0,0,0}
156                         };
157                 int option_index = 0;
158                 int opt;
159                 opt = getopt_long(argc, argv, "t:", 
160                                                   long_options, &option_index);
161                 
162                 if (opt == EOF)
163                         break;
164                 
165                 switch(opt) {
166                 case 't':
167                         tmplt = optarg;
168                         break;
169                 
170                 case '?':
171                         rrd_set_error("unknown option '%s'",argv[optind-1]);
172                         return(-1);
173                 }
174     }
175
176     /* need at least 2 arguments: filename, data. */
177     if (argc-optind < 2) {
178                 rrd_set_error("Not enough arguments");
179
180                 return -1;
181     }
182  
183         rc = rrd_update_r(argv[optind], tmplt,
184                       argc - optind - 1, (const char **)(argv + optind + 1));
185     return rc;
186 }
187
188 int
189 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
190 {
191    return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
193
194 int
195 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
196    info_t *pcdp_summary)
197 {
198
199     int              arg_i = 2;
200     short            j;
201     unsigned long    i,ii,iii=1;
202
203     unsigned long    rra_begin;          /* byte pointer to the rra
204                                           * area in the rrd file.  this
205                                           * pointer never changes value */
206     unsigned long    rra_start;          /* byte pointer to the rra
207                                           * area in the rrd file.  this
208                                           * pointer changes as each rrd is
209                                           * processed. */
210     unsigned long    rra_current;        /* byte pointer to the current write
211                                           * spot in the rrd file. */
212     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
213     double           interval,
214                      pre_int,post_int;   /* interval between this and
215                                           * the last run */
216     unsigned long    proc_pdp_st;        /* which pdp_st was the last
217                                           * to be processed */
218     unsigned long    occu_pdp_st;        /* when was the pdp_st
219                                           * before the last update
220                                           * time */
221     unsigned long    proc_pdp_age;       /* how old was the data in
222                                           * the pdp prep area when it
223                                           * was last updated */
224     unsigned long    occu_pdp_age;       /* how long ago was the last
225                                           * pdp_step time */
226     rrd_value_t      *pdp_new;           /* prepare the incoming data
227                                           * to be added the the
228                                           * existing entry */
229     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
230                                           * to be added the the
231                                           * cdp values */
232
233     long             *tmpl_idx;          /* index representing the settings
234                                             transported by the tmplt index */
235     unsigned long    tmpl_cnt = 2;       /* time and data */
236
237     FILE             *rrd_file;
238     rrd_t            rrd;
239     time_t           current_time = 0;
240     time_t           rra_time = 0;       /* time of update for a RRA */
241     unsigned long    current_time_usec=0;/* microseconds part of current time */
242     struct timeval   tmp_time;           /* used for time conversion */
243
244     char             **updvals;
245     int              schedule_smooth = 0;
246         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247                                          /* a vector of future Holt-Winters seasonal coefs */
248     unsigned long    elapsed_pdp_st;
249                                          /* number of elapsed PDP steps since last update */
250     unsigned long    *rra_step_cnt = NULL;
251                                          /* number of rows to be updated in an RRA for a data
252                                           * value. */
253     unsigned long    start_pdp_offset;
254                                          /* number of PDP steps since the last update that
255                                           * are assigned to the first CDP to be generated
256                                           * since the last update. */
257     unsigned short   scratch_idx;
258                                          /* index into the CDP scratch array */
259     enum cf_en       current_cf;
260                                          /* numeric id of the current consolidation function */
261     rpnstack_t       rpnstack; /* used for COMPUTE DS */
262     int              version;  /* rrd version */
263     char             *endptr; /* used in the conversion */
264
265 #ifdef HAVE_MMAP
266     void             *rrd_mmaped_file;
267     unsigned long    rrd_filesize;
268 #endif
269
270
271     rpnstack_init(&rpnstack);
272
273     /* need at least 1 arguments: data. */
274     if (argc < 1) {
275         rrd_set_error("Not enough arguments");
276         return -1;
277     }
278     
279     
280
281     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
282         return -1;
283     }
284
285     /* initialize time */
286     version = atoi(rrd.stat_head->version);
287     gettimeofday(&tmp_time, 0);
288     normalize_time(&tmp_time);
289     current_time = tmp_time.tv_sec;
290     if(version >= 3) {
291         current_time_usec = tmp_time.tv_usec;
292     }
293     else {
294         current_time_usec = 0;
295     }
296
297     rra_current = rra_start = rra_begin = ftell(rrd_file);
298     /* This is defined in the ANSI C standard, section 7.9.5.3:
299
300         When a file is opened with udpate mode ('+' as the second
301         or third character in the ... list of mode argument
302         variables), both input and ouptut may be performed on the
303         associated stream.  However, ...  input may not be directly
304         followed by output without an intervening call to a file
305         positioning function, unless the input oepration encounters
306         end-of-file. */
307 #ifdef HAVE_MMAP
308     fseek(rrd_file, 0, SEEK_END);
309     rrd_filesize = ftell(rrd_file);
310     fseek(rrd_file, rra_current, SEEK_SET);
311 #else
312     fseek(rrd_file, 0, SEEK_CUR);
313 #endif
314
315     
316     /* get exclusive lock to whole file.
317      * lock gets removed when we close the file.
318      */
319     if (LockRRD(rrd_file) != 0) {
320       rrd_set_error("could not lock RRD");
321       rrd_free(&rrd);
322       fclose(rrd_file);
323       return(-1);   
324     } 
325
326     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
327         rrd_set_error("allocating updvals pointer array");
328         rrd_free(&rrd);
329         fclose(rrd_file);
330         return(-1);
331     }
332
333     if ((pdp_temp = malloc(sizeof(rrd_value_t)
334                            *rrd.stat_head->ds_cnt))==NULL){
335         rrd_set_error("allocating pdp_temp ...");
336         free(updvals);
337         rrd_free(&rrd);
338         fclose(rrd_file);
339         return(-1);
340     }
341
342     if ((tmpl_idx = malloc(sizeof(unsigned long)
343                            *(rrd.stat_head->ds_cnt+1)))==NULL){
344         rrd_set_error("allocating tmpl_idx ...");
345         free(pdp_temp);
346         free(updvals);
347         rrd_free(&rrd);
348         fclose(rrd_file);
349         return(-1);
350     }
351     /* initialize tmplt redirector */
352     /* default config example (assume DS 1 is a CDEF DS)
353        tmpl_idx[0] -> 0; (time)
354        tmpl_idx[1] -> 1; (DS 0)
355        tmpl_idx[2] -> 3; (DS 2)
356        tmpl_idx[3] -> 4; (DS 3) */
357     tmpl_idx[0] = 0; /* time */
358     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
359         {
360            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
361               tmpl_idx[ii++]=i;
362         }
363     tmpl_cnt= ii;
364
365     if (tmplt) {
366         /* we should work on a writeable copy here */
367         char *dsname;
368         unsigned int tmpl_len;
369         char *tmplt_copy = strdup(tmplt);
370         dsname = tmplt_copy;
371         tmpl_cnt = 1; /* the first entry is the time */
372         tmpl_len = strlen(tmplt_copy);
373         for(i=0;i<=tmpl_len ;i++) {
374             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
375                 tmplt_copy[i] = '\0';
376                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
377                     rrd_set_error("tmplt contains more DS definitions than RRD");
378                     free(updvals); free(pdp_temp);
379                     free(tmpl_idx); rrd_free(&rrd);
380                     fclose(rrd_file); return(-1);
381                 }
382                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
383                     rrd_set_error("unknown DS name '%s'",dsname);
384                     free(updvals); free(pdp_temp);
385                     free(tmplt_copy);
386                     free(tmpl_idx); rrd_free(&rrd);
387                     fclose(rrd_file); return(-1);
388                 } else {
389                   /* the first element is always the time */
390                   tmpl_idx[tmpl_cnt-1]++; 
391                   /* go to the next entry on the tmplt_copy */
392                   dsname = &tmplt_copy[i+1];
393                   /* fix the damage we did before */
394                   if (i<tmpl_len) {
395                      tmplt_copy[i]=':';
396                   } 
397
398                 }
399             }       
400         }
401         free(tmplt_copy);
402     }
403     if ((pdp_new = malloc(sizeof(rrd_value_t)
404                           *rrd.stat_head->ds_cnt))==NULL){
405         rrd_set_error("allocating pdp_new ...");
406         free(updvals);
407         free(pdp_temp);
408         free(tmpl_idx);
409         rrd_free(&rrd);
410         fclose(rrd_file);
411         return(-1);
412     }
413
414 #ifdef HAVE_MMAP
415     rrd_mmaped_file = mmap(0, 
416                         rrd_filesize, 
417                         PROT_READ | PROT_WRITE, 
418                         MAP_SHARED, 
419                         fileno(rrd_file), 
420                         0);
421     if (rrd_mmaped_file == MAP_FAILED) {
422         rrd_set_error("error mmapping file %s", filename);
423         free(updvals);
424         free(pdp_temp);
425         free(tmpl_idx);
426         rrd_free(&rrd);
427         fclose(rrd_file);
428         return(-1);
429     }
430 #endif
431     /* loop through the arguments. */
432     for(arg_i=0; arg_i<argc;arg_i++) {
433         char *stepper = strdup(argv[arg_i]);
434         char *step_start = stepper;
435         char *p;
436         char *parsetime_error = NULL;
437         enum {atstyle, normal} timesyntax;
438         struct rrd_time_value ds_tv;
439         if (stepper == NULL){
440                 rrd_set_error("failed duplication argv entry");
441                 free(step_start);
442                 free(updvals);
443                 free(pdp_temp);  
444                 free(tmpl_idx);
445                 rrd_free(&rrd);
446 #ifdef HAVE_MMAP
447                 munmap(rrd_mmaped_file, rrd_filesize);
448 #endif
449                 fclose(rrd_file);
450                 return(-1);
451          }
452         /* initialize all ds input to unknown except the first one
453            which has always got to be set */
454         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
455         updvals[0]=stepper;
456         /* separate all ds elements; first must be examined separately
457            due to alternate time syntax */
458         if ((p=strchr(stepper,'@'))!=NULL) {
459             timesyntax = atstyle;
460             *p = '\0';
461             stepper = p+1;
462         } else if ((p=strchr(stepper,':'))!=NULL) {
463             timesyntax = normal;
464             *p = '\0';
465             stepper = p+1;
466         } else {
467             rrd_set_error("expected timestamp not found in data source from %s",
468                           argv[arg_i]);
469             free(step_start);
470             break;
471         }
472         ii=1;
473         updvals[tmpl_idx[ii]] = stepper;
474         while (*stepper) {
475             if (*stepper == ':') {
476                 *stepper = '\0';
477                 ii++;
478                 if (ii<tmpl_cnt){                   
479                     updvals[tmpl_idx[ii]] = stepper+1;
480                 }
481             }
482             stepper++;
483         }
484
485         if (ii != tmpl_cnt-1) {
486             rrd_set_error("expected %lu data source readings (got %lu) from %s",
487                           tmpl_cnt-1, ii, argv[arg_i]);
488             free(step_start);
489             break;
490         }
491         
492         /* get the time from the reading ... handle N */
493         if (timesyntax == atstyle) {
494             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
495                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
496                 free(step_start);
497                 break;
498             }
499             if (ds_tv.type == RELATIVE_TO_END_TIME ||
500                 ds_tv.type == RELATIVE_TO_START_TIME) {
501                 rrd_set_error("specifying time relative to the 'start' "
502                               "or 'end' makes no sense here: %s",
503                               updvals[0]);
504                 free(step_start);
505                 break;
506             }
507
508             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
509             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
510             
511         } else if (strcmp(updvals[0],"N")==0){
512             gettimeofday(&tmp_time, 0);
513             normalize_time(&tmp_time);
514             current_time = tmp_time.tv_sec;
515             current_time_usec = tmp_time.tv_usec;
516         } else {
517             double tmp;
518             tmp = strtod(updvals[0], 0);
519             current_time = floor(tmp);
520             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
521         }
522         /* dont do any correction for old version RRDs */
523         if(version < 3) 
524             current_time_usec = 0;
525         
526         if(current_time < rrd.live_head->last_up || 
527           (current_time == rrd.live_head->last_up && 
528            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
529             rrd_set_error("illegal attempt to update using time %ld when "
530                           "last update time is %ld (minimum one second step)",
531                           current_time, rrd.live_head->last_up);
532             free(step_start);
533             break;
534         }
535         
536         
537         /* seek to the beginning of the rra's */
538         if (rra_current != rra_begin) {
539 #ifndef HAVE_MMAP
540             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
541                 rrd_set_error("seek error in rrd");
542                 free(step_start);
543                 break;
544             }
545 #endif
546             rra_current = rra_begin;
547         }
548         rra_start = rra_begin;
549
550         /* when was the current pdp started */
551         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
552         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
553
554         /* when did the last pdp_st occur */
555         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
556         occu_pdp_st = current_time - occu_pdp_age;
557
558         /* interval = current_time - rrd.live_head->last_up; */
559         interval    = (double)(current_time - rrd.live_head->last_up) 
560                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
561
562         if (occu_pdp_st > proc_pdp_st){
563             /* OK we passed the pdp_st moment*/
564             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
565                                                               * occurred before the latest
566                                                               * pdp_st moment*/
567             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
568             post_int = occu_pdp_age;                         /* how much after it */
569             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
570         } else {
571             pre_int = interval;
572             post_int = 0;
573         }
574
575 #ifdef DEBUG
576         printf(
577                "proc_pdp_age %lu\t"
578                "proc_pdp_st %lu\t" 
579                "occu_pfp_age %lu\t" 
580                "occu_pdp_st %lu\t"
581                "int %lf\t"
582                "pre_int %lf\t"
583                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
584                 occu_pdp_age, occu_pdp_st,
585                interval, pre_int, post_int);
586 #endif
587     
588         /* process the data sources and update the pdp_prep 
589          * area accordingly */
590         for(i=0;i<rrd.stat_head->ds_cnt;i++){
591             enum dst_en dst_idx;
592             dst_idx= dst_conv(rrd.ds_def[i].dst);
593
594             /* make sure we do not build diffs with old last_ds values */
595             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
596                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
597                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
598             }
599
600             /* NOTE: DST_CDEF should never enter this if block, because
601              * updvals[i+1][0] is initialized to 'U'; unless the caller
602              * accidently specified a value for the DST_CDEF. To handle 
603               * this case, an extra check is required. */
604
605             if((updvals[i+1][0] != 'U') &&
606                    (dst_idx != DST_CDEF) &&
607                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
608                double rate = DNAN;
609                /* the data source type defines how to process the data */
610                 /* pdp_new contains rate * time ... eg the bytes
611                  * transferred during the interval. Doing it this way saves
612                  * a lot of math operations */
613                 
614
615                 switch(dst_idx){
616                 case DST_COUNTER:
617                 case DST_DERIVE:
618                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
619                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
620                             if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
621                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
622                                  break;
623                             }
624                        }
625                        if (rrd_test_error()){
626                             break;
627                        }
628                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
629                        if(dst_idx == DST_COUNTER) {
630                           /* simple overflow catcher suggested by Andres Kroonmaa */
631                           /* this will fail terribly for non 32 or 64 bit counters ... */
632                           /* are there any others in SNMP land ? */
633                           if (pdp_new[i] < (double)0.0 ) 
634                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
635                           if (pdp_new[i] < (double)0.0 ) 
636                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
637                        }
638                        rate = pdp_new[i] / interval;
639                     }
640                    else {
641                      pdp_new[i]= DNAN;          
642                    }
643                    break;
644                 case DST_ABSOLUTE:
645                     errno = 0;
646                     pdp_new[i] = strtod(updvals[i+1],&endptr);
647                     if (errno > 0){
648                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
649                         break;
650                     };
651                     if (endptr[0] != '\0'){
652                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
653                         break;
654                     }
655                     rate = pdp_new[i] / interval;                 
656                     break;
657                 case DST_GAUGE:
658                     errno = 0;
659                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
660                     if (errno > 0){
661                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
662                         break;
663                     };
664                     if (endptr[0] != '\0'){
665                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
666                         break;
667                     }
668                     rate = pdp_new[i] / interval;                  
669                     break;
670                 default:
671                     rrd_set_error("rrd contains unknown DS type : '%s'",
672                                   rrd.ds_def[i].dst);
673                     break;
674                 }
675                 /* break out of this for loop if the error string is set */
676                 if (rrd_test_error()){
677                     break;
678                 }
679                /* make sure pdp_temp is neither too large or too small
680                 * if any of these occur it becomes unknown ...
681                 * sorry folks ... */
682                if ( ! isnan(rate) && 
683                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
685                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
687                   pdp_new[i] = DNAN;
688                }               
689             } else {
690                 /* no news is news all the same */
691                 pdp_new[i] = DNAN;
692             }
693
694             
695             /* make a copy of the command line argument for the next run */
696 #ifdef DEBUG
697             fprintf(stderr,
698                     "prep ds[%lu]\t"
699                     "last_arg '%s'\t"
700                     "this_arg '%s'\t"
701                     "pdp_new %10.2f\n",
702                     i,
703                     rrd.pdp_prep[i].last_ds,
704                     updvals[i+1], pdp_new[i]);
705 #endif
706             strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
707             rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
708         }
709         /* break out of the argument parsing loop if the error_string is set */
710         if (rrd_test_error()){
711             free(step_start);
712             break;
713         }
714         /* has a pdp_st moment occurred since the last run ? */
715
716         if (proc_pdp_st == occu_pdp_st){
717             /* no we have not passed a pdp_st moment. therefore update is simple */
718
719             for(i=0;i<rrd.stat_head->ds_cnt;i++){
720                 if(isnan(pdp_new[i])) {            
721                     /* this is not realy accurate if we use subsecond data arival time
722                        should have thought of it when going subsecond resolution ...
723                        sorry next format change we will have it! */
724                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
725                 } else {
726                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
727                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
728                      } else {
729                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
730                      }
731                 }
732 #ifdef DEBUG
733                 fprintf(stderr,
734                         "NO PDP  ds[%lu]\t"
735                         "value %10.2f\t"
736                         "unkn_sec %5lu\n",
737                         i,
738                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
739                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
740 #endif
741             }   
742         } else {
743             /* an pdp_st has occurred. */
744
745             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
746              * occurred up to the last run.        
747             pdp_new[] contains rate*seconds from the latest run.
748             pdp_temp[] will contain the rate for cdp */
749
750             for(i=0;i<rrd.stat_head->ds_cnt;i++){
751                 /* update pdp_prep to the current pdp_st. */
752                 double pre_unknown = 0.0;               
753                 if(isnan(pdp_new[i]))
754                     /* a final bit of unkonwn to be added bevore calculation
755                      * we use a tempaorary variable for this so that we 
756                      * don't have to turn integer lines before using the value */                
757                     pre_unknown = pre_int;
758                 else {
759                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
760                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
761                      } else {
762                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
763                      }
764                  }
765                 
766
767                 /* if too much of the pdp_prep is unknown we dump it */
768                 if ( 
769                     /* removed because this does not agree with the definition
770                        a heart beat can be unknown */
771                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
772                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
773                     /* if the interval is larger thatn mrhb we get NAN */
774                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
775                     (occu_pdp_st-proc_pdp_st <= 
776                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
777                     pdp_temp[i] = DNAN;
778                 } else {
779                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
780                         / ((double)(occu_pdp_st - proc_pdp_st
781                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
782                             -pre_unknown);
783                 }
784
785                 /* process CDEF data sources; remember each CDEF DS can
786                  * only reference other DS with a lower index number */
787             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
788                    rpnp_t *rpnp;
789                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
790                    /* substitue data values for OP_VARIABLE nodes */
791                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
792                    {
793                           if (rpnp[ii].op == OP_VARIABLE) {
794                                  rpnp[ii].op = OP_NUMBER;
795                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
796                           }
797                    }
798                    /* run the rpn calculator */
799                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
800                           free(rpnp);
801                           break; /* exits the data sources pdp_temp loop */
802                    }
803                 }
804         
805                 /* make pdp_prep ready for the next run */
806                 if(isnan(pdp_new[i])){
807                     /* this is not realy accurate if we use subsecond data arival time
808                        should have thought of it when going subsecond resolution ...
809                        sorry next format change we will have it! */
810                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
811                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
812                 } else {
813                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
814                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
815                         pdp_new[i]/interval*post_int;
816                 }
817
818 #ifdef DEBUG
819                 fprintf(stderr,
820                         "PDP UPD ds[%lu]\t"
821                         "pdp_temp %10.2f\t"
822                         "new_prep %10.2f\t"
823                         "new_unkn_sec %5lu\n",
824                         i, pdp_temp[i],
825                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
826                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
827 #endif
828             }
829
830                 /* if there were errors during the last loop, bail out here */
831             if (rrd_test_error()){
832                free(step_start);
833                break;
834             }
835
836                 /* compute the number of elapsed pdp_st moments */
837                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
838 #ifdef DEBUG
839                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
840 #endif
841                 if (rra_step_cnt == NULL)
842                 {
843                    rra_step_cnt = (unsigned long *) 
844                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
845                 }
846
847             for(i = 0, rra_start = rra_begin;
848                 i < rrd.stat_head->rra_cnt;
849             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
850                 i++)
851                 {
852                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
853                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
854                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
855         if (start_pdp_offset <= elapsed_pdp_st) {
856            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
857                       rrd.rra_def[i].pdp_cnt + 1;
858             } else {
859                    rra_step_cnt[i] = 0;
860                 }
861
862                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
863                 {
864                    /* If this is a bulk update, we need to skip ahead in the seasonal
865                         * arrays so that they will be correct for the next observed value;
866                         * note that for the bulk update itself, no update will occur to
867                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
868                         * be set to DNAN. */
869            if (rra_step_cnt[i] > 2) 
870                    {
871                           /* skip update by resetting rra_step_cnt[i],
872                            * note that this is not data source specific; this is due
873                            * to the bulk update, not a DNAN value for the specific data
874                            * source. */
875                           rra_step_cnt[i] = 0;
876               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
877                              &last_seasonal_coef);
878                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
879                              &seasonal_coef);
880                    }
881                 
882                   /* periodically run a smoother for seasonal effects */
883                   /* Need to use first cdp parameter buffer to track
884                    * burnin (burnin requires a specific smoothing schedule).
885                    * The CDP_init_seasonal parameter is really an RRA level,
886                    * not a data source within RRA level parameter, but the rra_def
887                    * is read only for rrd_update (not flushed to disk). */
888                   iii = i*(rrd.stat_head -> ds_cnt);
889                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
890                           <= BURNIN_CYCLES)
891                   {
892                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
893                                  > rrd.rra_def[i].row_cnt - 1) {
894                            /* mark off one of the burnin cycles */
895                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
896                        schedule_smooth = 1;
897                          }  
898                   } else {
899                          /* someone has no doubt invented a trick to deal with this
900                           * wrap around, but at least this code is clear. */
901                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
902                              rrd.rra_ptr[i].cur_row)
903                          {
904                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
905                                   * mapping between PDP and CDP */
906                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
908                                  {
909 #ifdef DEBUG
910                                         fprintf(stderr,
911                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
912                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
913                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
914 #endif
915                                         schedule_smooth = 1;
916                                  }
917              } else {
918                                  /* can't rely on negative numbers because we are working with
919                                   * unsigned values */
920                                  /* Don't need modulus here. If we've wrapped more than once, only
921                                   * one smooth is executed at the end. */
922                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
923                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
924                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
925                                  {
926 #ifdef DEBUG
927                                         fprintf(stderr,
928                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
929                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
930                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932                                         schedule_smooth = 1;
933                                  }
934                          }
935                   }
936
937               rra_current = ftell(rrd_file); 
938                 } /* if cf is DEVSEASONAL or SEASONAL */
939
940         if (rrd_test_error()) break;
941
942                     /* update CDP_PREP areas */
943                     /* loop over data soures within each RRA */
944                     for(ii = 0;
945                         ii < rrd.stat_head->ds_cnt;
946                         ii++)
947                         {
948                         
949                         /* iii indexes the CDP prep area for this data source within the RRA */
950                         iii=i*rrd.stat_head->ds_cnt+ii;
951
952                         if (rrd.rra_def[i].pdp_cnt > 1) {
953                           
954                            if (rra_step_cnt[i] > 0) {
955                            /* If we are in this block, as least 1 CDP value will be written to
956                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
957                                 * to be written, then the "fill in" value is the CDP_secondary_val
958                                 * entry. */
959                                   if (isnan(pdp_temp[ii]))
960                   {
961                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
962                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
963                                   } else {
964                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
965                                           * CDP data entries. No matter the CF, the value is the same because
966                                           * the average, max, min, and last of a list of identical values is
967                                           * the same, namely, the value itself. */
968                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
969                                   }
970                      
971                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
972                                       > rrd.rra_def[i].pdp_cnt*
973                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
974                                   {
975                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
976                                          /* initialize carry over */
977                                          if (current_cf == CF_AVERAGE) {
978                                                    if (isnan(pdp_temp[ii])) { 
979                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
980                                                    } else {
981                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
982                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
983                                                    }
984                                          } else {
985                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
986                                          }
987                                   } else {
988                                          rrd_value_t cum_val, cur_val; 
989                                      switch (current_cf) {
990                                                 case CF_AVERAGE:
991                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
992                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
993                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
994                                                (cum_val + cur_val * start_pdp_offset) /
995                                            (rrd.rra_def[i].pdp_cnt
996                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
997                                                    /* initialize carry over value */
998                                                    if (isnan(pdp_temp[ii])) { 
999                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000                                                    } else {
1001                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003                                                    }
1004                                                    break;
1005                                                 case CF_MAXIMUM:
1006                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1007                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1008 #ifdef DEBUG
1009                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1010                                                           isnan(pdp_temp[ii])) {
1011                                                      fprintf(stderr,
1012                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1013                                                                 i,ii);
1014                                                          exit(-1);
1015                                                   }
1016 #endif
1017                                                   if (cur_val > cum_val)
1018                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1019                                                   else
1020                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1021                                                   /* initialize carry over value */
1022                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1023                                                   break;
1024                                                 case CF_MINIMUM:
1025                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1026                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1027 #ifdef DEBUG
1028                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1029                                                           isnan(pdp_temp[ii])) {
1030                                                      fprintf(stderr,
1031                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1032                                                                 i,ii);
1033                                                          exit(-1);
1034                                                   }
1035 #endif
1036                                                   if (cur_val < cum_val)
1037                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1038                                                   else
1039                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1040                                                   /* initialize carry over value */
1041                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042                                                   break;
1043                                                 case CF_LAST:
1044                                                 default:
1045                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1046                                                    /* initialize carry over value */
1047                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048                                                 break;
1049                                          }
1050                                   } /* endif meets xff value requirement for a valid value */
1051                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1052                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1053                                   if (isnan(pdp_temp[ii]))
1054                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1055                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1056                                   else
1057                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1058                } else  /* rra_step_cnt[i]  == 0 */
1059                            {
1060 #ifdef DEBUG
1061                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1062                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1063                                          i,ii);
1064                                   } else {
1065                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1066                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1067                                   }
1068 #endif
1069                                   if (isnan(pdp_temp[ii])) {
1070                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1071                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1072                                   {
1073                                          if (current_cf == CF_AVERAGE) {
1074                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1075                                                    elapsed_pdp_st;
1076                                          } else {
1077                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078                                          }
1079 #ifdef DEBUG
1080                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1081                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1082 #endif
1083                                   } else {
1084                                          switch (current_cf) {
1085                                          case CF_AVERAGE:
1086                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1087                                                    elapsed_pdp_st;
1088                                                 break;
1089                                          case CF_MINIMUM:
1090                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1091                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1092                                                 break; 
1093                                          case CF_MAXIMUM:
1094                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096                                                 break; 
1097                                          case CF_LAST:
1098                                          default:
1099                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100                                                 break;
1101                                          }
1102                                   }
1103                            }
1104                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1105                            if (elapsed_pdp_st > 2)
1106                            {
1107                                    switch (current_cf) {
1108                                    case CF_AVERAGE:
1109                                    default:
1110                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1111                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1112                                           break;
1113                    case CF_SEASONAL:
1114                                    case CF_DEVSEASONAL:
1115                                           /* need to update cached seasonal values, so they are consistent
1116                                            * with the bulk update */
1117                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1118                                            * CDP_last_deviation are the same. */
1119                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1120                                                  last_seasonal_coef[ii];
1121                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1122                                                  seasonal_coef[ii];
1123                                           break;
1124                    case CF_HWPREDICT:
1125                                           /* need to update the null_count and last_null_count.
1126                                            * even do this for non-DNAN pdp_temp because the
1127                                            * algorithm is not learning from batch updates. */
1128                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1129                                                  elapsed_pdp_st;
1130                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1131                                                  elapsed_pdp_st - 1;
1132                                           /* fall through */
1133                                    case CF_DEVPREDICT:
1134                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1135                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1136                                           break;
1137                    case CF_FAILURES:
1138                                           /* do not count missed bulk values as failures */
1139                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1140                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1141                                           /* need to reset violations buffer.
1142                                            * could do this more carefully, but for now, just
1143                                            * assume a bulk update wipes away all violations. */
1144                       erase_violations(&rrd, iii, i);
1145                                           break;
1146                                    }
1147                            } 
1148                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1149
1150                         if (rrd_test_error()) break;
1151
1152                         } /* endif data sources loop */
1153         } /* end RRA Loop */
1154
1155                 /* this loop is only entered if elapsed_pdp_st < 3 */
1156                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1157                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1158                 {
1159                for(i = 0, rra_start = rra_begin;
1160                    i < rrd.stat_head->rra_cnt;
1161                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1162                    i++)
1163                    {
1164                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1165
1166                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1167                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1168                           {
1169                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1170                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1171                                 &seasonal_coef);
1172                  rra_current = ftell(rrd_file);
1173                           }
1174                           if (rrd_test_error()) break;
1175                       /* loop over data soures within each RRA */
1176                       for(ii = 0;
1177                           ii < rrd.stat_head->ds_cnt;
1178                           ii++)
1179                           {
1180                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1181                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1182                                     scratch_idx, seasonal_coef);
1183                           }
1184            } /* end RRA Loop */
1185                    if (rrd_test_error()) break;
1186             } /* end elapsed_pdp_st loop */
1187
1188                 if (rrd_test_error()) break;
1189
1190                 /* Ready to write to disk */
1191                 /* Move sequentially through the file, writing one RRA at a time.
1192                  * Note this architecture divorces the computation of CDP with
1193                  * flushing updated RRA entries to disk. */
1194             for(i = 0, rra_start = rra_begin;
1195                 i < rrd.stat_head->rra_cnt;
1196             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1197                 i++) {
1198                 /* is there anything to write for this RRA? If not, continue. */
1199         if (rra_step_cnt[i] == 0) continue;
1200
1201                 /* write the first row */
1202 #ifdef DEBUG
1203         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1204 #endif
1205             rrd.rra_ptr[i].cur_row++;
1206             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1207                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1208                 /* positition on the first row */
1209                 rra_pos_tmp = rra_start +
1210                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1211                 if(rra_pos_tmp != rra_current) {
1212 #ifndef HAVE_MMAP
1213                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1214                       rrd_set_error("seek error in rrd");
1215                       break;
1216                    }
1217 #endif
1218                    rra_current = rra_pos_tmp;
1219                 }
1220
1221 #ifdef DEBUG
1222             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1223 #endif
1224                 scratch_idx = CDP_primary_val;
1225                 if (pcdp_summary != NULL)
1226                 {
1227                    rra_time = (current_time - current_time 
1228                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1229                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1230                 }
1231 #ifdef HAVE_MMAP
1232                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1233                    pcdp_summary, &rra_time, rrd_mmaped_file);
1234 #else
1235                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1236                    pcdp_summary, &rra_time);
1237 #endif
1238                 if (rrd_test_error()) break;
1239
1240                 /* write other rows of the bulk update, if any */
1241                 scratch_idx = CDP_secondary_val;
1242                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1243                 {
1244                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1245                    {
1246 #ifdef DEBUG
1247               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1248                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1249 #endif
1250                           /* wrap */
1251                           rrd.rra_ptr[i].cur_row = 0;
1252                           /* seek back to beginning of current rra */
1253                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1254                           {
1255                          rrd_set_error("seek error in rrd");
1256                          break;
1257                           }
1258 #ifdef DEBUG
1259                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1260 #endif
1261                           rra_current = rra_start;
1262                    }
1263                    if (pcdp_summary != NULL)
1264                    {
1265                       rra_time = (current_time - current_time 
1266                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1267                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1268                    }
1269 #ifdef HAVE_MMAP
1270                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1271                       pcdp_summary, &rra_time, rrd_mmaped_file);
1272 #else
1273                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1274                       pcdp_summary, &rra_time);
1275 #endif
1276                 }
1277                 
1278                 if (rrd_test_error())
1279                   break;
1280                 } /* RRA LOOP */
1281
1282             /* break out of the argument parsing loop if error_string is set */
1283             if (rrd_test_error()){
1284                    free(step_start);
1285                    break;
1286             } 
1287             
1288         } /* endif a pdp_st has occurred */ 
1289         rrd.live_head->last_up = current_time;
1290         rrd.live_head->last_up_usec = current_time_usec; 
1291         free(step_start);
1292     } /* function argument loop */
1293
1294     if (seasonal_coef != NULL) free(seasonal_coef);
1295     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1296         if (rra_step_cnt != NULL) free(rra_step_cnt);
1297     rpnstack_free(&rpnstack);
1298
1299 #ifdef HAVE_MMAP
1300     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1301             rrd_set_error("error writing(unmapping) file: %s", filename);
1302     }
1303 #endif    
1304     /* if we got here and if there is an error and if the file has not been
1305      * written to, then close things up and return. */
1306     if (rrd_test_error()) {
1307         free(updvals);
1308         free(tmpl_idx);
1309         rrd_free(&rrd);
1310         free(pdp_temp);
1311         free(pdp_new);
1312         fclose(rrd_file);
1313         return(-1);
1314     }
1315
1316     /* aargh ... that was tough ... so many loops ... anyway, its done.
1317      * we just need to write back the live header portion now*/
1318
1319     if (fseek(rrd_file, (sizeof(stat_head_t)
1320                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1321                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1322               SEEK_SET) != 0) {
1323         rrd_set_error("seek rrd for live header writeback");
1324         free(updvals);
1325         free(tmpl_idx);
1326         rrd_free(&rrd);
1327         free(pdp_temp);
1328         free(pdp_new);
1329         fclose(rrd_file);
1330         return(-1);
1331     }
1332
1333     if(version >= 3) {
1334             if(fwrite( rrd.live_head,
1335                        sizeof(live_head_t), 1, rrd_file) != 1){
1336                 rrd_set_error("fwrite live_head to rrd");
1337                 free(updvals);
1338                 rrd_free(&rrd);
1339                 free(tmpl_idx);
1340                 free(pdp_temp);
1341                 free(pdp_new);
1342                 fclose(rrd_file);
1343                 return(-1);
1344             }
1345     }
1346     else {
1347             if(fwrite( &rrd.live_head->last_up,
1348                        sizeof(time_t), 1, rrd_file) != 1){
1349                 rrd_set_error("fwrite live_head to rrd");
1350                 free(updvals);
1351                 rrd_free(&rrd);
1352                 free(tmpl_idx);
1353                 free(pdp_temp);
1354                 free(pdp_new);
1355                 fclose(rrd_file);
1356                 return(-1);
1357             }
1358     }
1359             
1360
1361     if(fwrite( rrd.pdp_prep,
1362                sizeof(pdp_prep_t),
1363                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1364         rrd_set_error("ftwrite pdp_prep to rrd");
1365         free(updvals);
1366         rrd_free(&rrd);
1367         free(tmpl_idx);
1368         free(pdp_temp);
1369         free(pdp_new);
1370         fclose(rrd_file);
1371         return(-1);
1372     }
1373
1374     if(fwrite( rrd.cdp_prep,
1375                sizeof(cdp_prep_t),
1376                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1377        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1378
1379         rrd_set_error("ftwrite cdp_prep to rrd");
1380         free(updvals);
1381         free(tmpl_idx);
1382         rrd_free(&rrd);
1383         free(pdp_temp);
1384         free(pdp_new);
1385         fclose(rrd_file);
1386         return(-1);
1387     }
1388
1389     if(fwrite( rrd.rra_ptr,
1390                sizeof(rra_ptr_t), 
1391                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1392         rrd_set_error("fwrite rra_ptr to rrd");
1393         free(updvals);
1394         free(tmpl_idx);
1395         rrd_free(&rrd);
1396         free(pdp_temp);
1397         free(pdp_new);
1398         fclose(rrd_file);
1399         return(-1);
1400     }
1401 #ifdef POSIX_FADVISE
1402
1403     /* with update we have write ops, so they will probably not be done by now, this means
1404        the buffers will not get freed. But calling this for the whole file - header
1405        will let the data off the hook as soon as it is written when if it is from a previous
1406        update cycle. Calling fdsync to force things is much too hard here. */
1407
1408     if (0 != posix_fadvise(fileno(in_file), rra_begin, 0, POSIX_FADV_DONTNEED)) {
1409          rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",file_name, rrd_strerror(errno));
1410          fclose(in_file);
1411          return(-1);
1412     } 
1413 #endif
1414
1415     /* OK now close the files and free the memory */
1416     if(fclose(rrd_file) != 0){
1417         rrd_set_error("closing rrd");
1418         free(updvals);
1419         free(tmpl_idx);
1420         rrd_free(&rrd);
1421         free(pdp_temp);
1422         free(pdp_new);
1423         return(-1);
1424     }
1425
1426     /* calling the smoothing code here guarantees at most
1427          * one smoothing operation per rrd_update call. Unfortunately,
1428          * it is possible with bulk updates, or a long-delayed update
1429          * for smoothing to occur off-schedule. This really isn't
1430          * critical except during the burning cycles. */
1431         if (schedule_smooth)
1432         {
1433           rrd_file = fopen(filename,"rb+");
1434           
1435
1436           rra_start = rra_begin;
1437           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1438           {
1439             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1440                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1441             {
1442 #ifdef DEBUG
1443               fprintf(stderr,"Running smoother for rra %ld\n",i);
1444 #endif
1445               apply_smoother(&rrd,i,rra_start,rrd_file);
1446               if (rrd_test_error())
1447                 break;
1448             }
1449             rra_start += rrd.rra_def[i].row_cnt
1450               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1451           }
1452 #ifdef POSIX_FADVISE
1453           /* same procedure as above ... */
1454           if (0 != posix_fadvise(fileno(in_file), rrd_head_size, 0, POSIX_FADV_DONTNEED)) {
1455              rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",file_name, rrd_strerror(errno));
1456              fclose(in_file);
1457              return(-1);
1458           } 
1459 #endif
1460           fclose(rrd_file);
1461         }
1462     rrd_free(&rrd);
1463     free(updvals);
1464     free(tmpl_idx);
1465     free(pdp_new);
1466     free(pdp_temp);
1467     return(0);
1468 }
1469
1470 /*
1471  * get exclusive lock to whole file.
1472  * lock gets removed when we close the file
1473  *
1474  * returns 0 on success
1475  */
1476 int
1477 LockRRD(FILE *rrdfile)
1478 {
1479     int rrd_fd;         /* File descriptor for RRD */
1480     int rcstat;
1481
1482     rrd_fd = fileno(rrdfile);
1483
1484         {
1485 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1486     struct _stat st;
1487
1488     if ( _fstat( rrd_fd, &st ) == 0 ) {
1489             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1490     } else {
1491             rcstat = -1;
1492     }
1493 #else
1494     struct flock        lock;
1495     lock.l_type = F_WRLCK;    /* exclusive write lock */
1496     lock.l_len = 0;           /* whole file */
1497     lock.l_start = 0;         /* start of file */
1498     lock.l_whence = SEEK_SET;   /* end of file */
1499
1500     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1501 #endif
1502         }
1503
1504     return(rcstat);
1505 }
1506
1507
1508 #ifdef HAVE_MMAP
1509 info_t
1510 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1511                unsigned short CDP_scratch_idx, 
1512 #ifndef DEBUG
1513 FILE UNUSED(*rrd_file),
1514 #else
1515 FILE *rrd_file,
1516 #endif
1517                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1518 #else
1519 info_t
1520 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1521                unsigned short CDP_scratch_idx, FILE *rrd_file,
1522                    info_t *pcdp_summary, time_t *rra_time)
1523 #endif
1524 {
1525    unsigned long ds_idx, cdp_idx;
1526    infoval iv;
1527   
1528    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1529    {
1530       /* compute the cdp index */
1531       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1532 #ifdef DEBUG
1533           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1534              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1535              rrd -> rra_def[rra_idx].cf_nam);
1536 #endif 
1537       if (pcdp_summary != NULL)
1538           {
1539              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1540              /* append info to the return hash */
1541                  pcdp_summary = info_push(pcdp_summary,
1542                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1543                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1544                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1545          RD_I_VAL, iv);
1546           }
1547 #ifdef HAVE_MMAP
1548           memcpy((char *)rrd_mmaped_file + *rra_current,
1549                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1550                           sizeof(rrd_value_t));
1551 #else
1552           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1553                  sizeof(rrd_value_t),1,rrd_file) != 1)
1554           { 
1555              rrd_set_error("writing rrd");
1556              return 0;
1557           }
1558 #endif
1559           *rra_current += sizeof(rrd_value_t);
1560         }
1561         return (pcdp_summary);
1562 }