prepare for the release of rrdtool-1.2.16
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.16  Copyright by Tobi Oetiker, 1997-2006
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 #ifndef __MINGW32__
36 struct timeval {
37         time_t tv_sec; /* seconds */
38         long tv_usec;  /* microseconds */
39 };
40 #endif
41
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
46
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48
49         struct _timeb current_time;
50
51         _ftime(&current_time);
52
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
55
56         return 0;
57 }
58
59 #endif
60 /*
61  * normilize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static void normalize_time(struct timeval *t)
65 {
66         if(t->tv_usec < 0) {
67                 t->tv_sec--;
68                 t->tv_usec += 1000000L;
69         }
70 }
71
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
92                                         info_t*);
93
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
95
96
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99     char             *tmplt = NULL;          
100         info_t *result = NULL;
101         infoval rc;
102       rc.u_int = -1;
103     optind = 0; opterr = 0;  /* initialize getopt */
104
105     while (1) {
106                 static struct option long_options[] =
107                         {
108                                 {"template",      required_argument, 0, 't'},
109                                 {0,0,0,0}
110                         };
111                 int option_index = 0;
112                 int opt;
113                 opt = getopt_long(argc, argv, "t:", 
114                                                   long_options, &option_index);
115                 
116                 if (opt == EOF)
117                         break;
118                 
119                 switch(opt) {
120                 case 't':
121                         tmplt = optarg;
122                         break;
123                 
124                 case '?':
125                         rrd_set_error("unknown option '%s'",argv[optind-1]);
126                         goto end_tag;
127                 }
128     }
129
130     /* need at least 2 arguments: filename, data. */
131     if (argc-optind < 2) {
132                 rrd_set_error("Not enough arguments");
133                 goto end_tag;
134     }
135     rc.u_int = 0;
136     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137         rc.u_int = _rrd_update(argv[optind], tmplt,
138                       argc - optind - 1, argv + optind + 1, result);
139     result->value.u_int = rc.u_int;
140 end_tag:
141     return result;
142 }
143
144 int
145 rrd_update(int argc, char **argv)
146 {
147     char             *tmplt = NULL;          
148     int rc;
149     optind = 0; opterr = 0;  /* initialize getopt */
150
151     while (1) {
152                 static struct option long_options[] =
153                         {
154                                 {"template",      required_argument, 0, 't'},
155                                 {0,0,0,0}
156                         };
157                 int option_index = 0;
158                 int opt;
159                 opt = getopt_long(argc, argv, "t:", 
160                                                   long_options, &option_index);
161                 
162                 if (opt == EOF)
163                         break;
164                 
165                 switch(opt) {
166                 case 't':
167                         tmplt = optarg;
168                         break;
169                 
170                 case '?':
171                         rrd_set_error("unknown option '%s'",argv[optind-1]);
172                         return(-1);
173                 }
174     }
175
176     /* need at least 2 arguments: filename, data. */
177     if (argc-optind < 2) {
178                 rrd_set_error("Not enough arguments");
179
180                 return -1;
181     }
182  
183         rc = rrd_update_r(argv[optind], tmplt,
184                       argc - optind - 1, argv + optind + 1);
185     return rc;
186 }
187
188 int
189 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
190 {
191    return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
193
194 int
195 _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
196    info_t *pcdp_summary)
197 {
198
199     int              arg_i = 2;
200     short            j;
201     unsigned long    i,ii,iii=1;
202
203     unsigned long    rra_begin;          /* byte pointer to the rra
204                                           * area in the rrd file.  this
205                                           * pointer never changes value */
206     unsigned long    rra_start;          /* byte pointer to the rra
207                                           * area in the rrd file.  this
208                                           * pointer changes as each rrd is
209                                           * processed. */
210     unsigned long    rra_current;        /* byte pointer to the current write
211                                           * spot in the rrd file. */
212     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
213     double           interval,
214                      pre_int,post_int;   /* interval between this and
215                                           * the last run */
216     unsigned long    proc_pdp_st;        /* which pdp_st was the last
217                                           * to be processed */
218     unsigned long    occu_pdp_st;        /* when was the pdp_st
219                                           * before the last update
220                                           * time */
221     unsigned long    proc_pdp_age;       /* how old was the data in
222                                           * the pdp prep area when it
223                                           * was last updated */
224     unsigned long    occu_pdp_age;       /* how long ago was the last
225                                           * pdp_step time */
226     rrd_value_t      *pdp_new;           /* prepare the incoming data
227                                           * to be added the the
228                                           * existing entry */
229     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
230                                           * to be added the the
231                                           * cdp values */
232
233     long             *tmpl_idx;          /* index representing the settings
234                                             transported by the tmplt index */
235     unsigned long    tmpl_cnt = 2;       /* time and data */
236
237     FILE             *rrd_file;
238     rrd_t            rrd;
239     time_t           current_time = 0;
240     time_t           rra_time = 0;       /* time of update for a RRA */
241     unsigned long    current_time_usec=0;/* microseconds part of current time */
242     struct timeval   tmp_time;           /* used for time conversion */
243
244     char             **updvals;
245     int              schedule_smooth = 0;
246         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247                                          /* a vector of future Holt-Winters seasonal coefs */
248     unsigned long    elapsed_pdp_st;
249                                          /* number of elapsed PDP steps since last update */
250     unsigned long    *rra_step_cnt = NULL;
251                                          /* number of rows to be updated in an RRA for a data
252                                           * value. */
253     unsigned long    start_pdp_offset;
254                                          /* number of PDP steps since the last update that
255                                           * are assigned to the first CDP to be generated
256                                           * since the last update. */
257     unsigned short   scratch_idx;
258                                          /* index into the CDP scratch array */
259     enum cf_en       current_cf;
260                                          /* numeric id of the current consolidation function */
261     rpnstack_t       rpnstack; /* used for COMPUTE DS */
262     int              version;  /* rrd version */
263     char             *endptr; /* used in the conversion */
264 #ifdef HAVE_MMAP
265     void             *rrd_mmaped_file;
266     unsigned long    rrd_filesize;
267 #endif
268
269
270     rpnstack_init(&rpnstack);
271
272     /* need at least 1 arguments: data. */
273     if (argc < 1) {
274         rrd_set_error("Not enough arguments");
275         return -1;
276     }
277     
278     
279
280     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
281         return -1;
282     }
283     /* initialize time */
284     version = atoi(rrd.stat_head->version);
285     gettimeofday(&tmp_time, 0);
286     normalize_time(&tmp_time);
287     current_time = tmp_time.tv_sec;
288     if(version >= 3) {
289         current_time_usec = tmp_time.tv_usec;
290     }
291     else {
292         current_time_usec = 0;
293     }
294
295     rra_current = rra_start = rra_begin = ftell(rrd_file);
296     /* This is defined in the ANSI C standard, section 7.9.5.3:
297
298         When a file is opened with udpate mode ('+' as the second
299         or third character in the ... list of mode argument
300         variables), both input and ouptut may be performed on the
301         associated stream.  However, ...  input may not be directly
302         followed by output without an intervening call to a file
303         positioning function, unless the input oepration encounters
304         end-of-file. */
305 #ifdef HAVE_MMAP
306     fseek(rrd_file, 0, SEEK_END);
307     rrd_filesize = ftell(rrd_file);
308     fseek(rrd_file, rra_current, SEEK_SET);
309 #else
310     fseek(rrd_file, 0, SEEK_CUR);
311 #endif
312
313     
314     /* get exclusive lock to whole file.
315      * lock gets removed when we close the file.
316      */
317     if (LockRRD(rrd_file) != 0) {
318       rrd_set_error("could not lock RRD");
319       rrd_free(&rrd);
320       fclose(rrd_file);
321       return(-1);   
322     } 
323
324     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
325         rrd_set_error("allocating updvals pointer array");
326         rrd_free(&rrd);
327         fclose(rrd_file);
328         return(-1);
329     }
330
331     if ((pdp_temp = malloc(sizeof(rrd_value_t)
332                            *rrd.stat_head->ds_cnt))==NULL){
333         rrd_set_error("allocating pdp_temp ...");
334         free(updvals);
335         rrd_free(&rrd);
336         fclose(rrd_file);
337         return(-1);
338     }
339
340     if ((tmpl_idx = malloc(sizeof(unsigned long)
341                            *(rrd.stat_head->ds_cnt+1)))==NULL){
342         rrd_set_error("allocating tmpl_idx ...");
343         free(pdp_temp);
344         free(updvals);
345         rrd_free(&rrd);
346         fclose(rrd_file);
347         return(-1);
348     }
349     /* initialize tmplt redirector */
350     /* default config example (assume DS 1 is a CDEF DS)
351        tmpl_idx[0] -> 0; (time)
352        tmpl_idx[1] -> 1; (DS 0)
353        tmpl_idx[2] -> 3; (DS 2)
354        tmpl_idx[3] -> 4; (DS 3) */
355     tmpl_idx[0] = 0; /* time */
356     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
357         {
358            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
359               tmpl_idx[ii++]=i;
360         }
361     tmpl_cnt= ii;
362
363     if (tmplt) {
364         /* we should work on a writeable copy here */
365         char *dsname;
366         unsigned int tmpl_len;
367         tmplt = strdup(tmplt);
368         dsname = tmplt;
369         tmpl_cnt = 1; /* the first entry is the time */
370         tmpl_len = strlen(tmplt);
371         for(i=0;i<=tmpl_len ;i++) {
372             if (tmplt[i] == ':' || tmplt[i] == '\0') {
373                 tmplt[i] = '\0';
374                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
375                     rrd_set_error("tmplt contains more DS definitions than RRD");
376                     free(updvals); free(pdp_temp);
377                     free(tmpl_idx); rrd_free(&rrd);
378                     fclose(rrd_file); return(-1);
379                 }
380                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
381                     rrd_set_error("unknown DS name '%s'",dsname);
382                     free(updvals); free(pdp_temp);
383                     free(tmplt);
384                     free(tmpl_idx); rrd_free(&rrd);
385                     fclose(rrd_file); return(-1);
386                 } else {
387                   /* the first element is always the time */
388                   tmpl_idx[tmpl_cnt-1]++; 
389                   /* go to the next entry on the tmplt */
390                   dsname = &tmplt[i+1];
391                   /* fix the damage we did before */
392                   if (i<tmpl_len) {
393                      tmplt[i]=':';
394                   } 
395
396                 }
397             }       
398         }
399         free(tmplt);
400     }
401     if ((pdp_new = malloc(sizeof(rrd_value_t)
402                           *rrd.stat_head->ds_cnt))==NULL){
403         rrd_set_error("allocating pdp_new ...");
404         free(updvals);
405         free(pdp_temp);
406         free(tmpl_idx);
407         rrd_free(&rrd);
408         fclose(rrd_file);
409         return(-1);
410     }
411
412 #ifdef HAVE_MMAP
413     rrd_mmaped_file = mmap(0, 
414                         rrd_filesize, 
415                         PROT_READ | PROT_WRITE, 
416                         MAP_SHARED, 
417                         fileno(rrd_file), 
418                         0);
419     if (rrd_mmaped_file == MAP_FAILED) {
420         rrd_set_error("error mmapping file %s", filename);
421         free(updvals);
422         free(pdp_temp);
423         free(tmpl_idx);
424         rrd_free(&rrd);
425         fclose(rrd_file);
426         return(-1);
427     }
428 #endif
429     /* loop through the arguments. */
430     for(arg_i=0; arg_i<argc;arg_i++) {
431         char *stepper = strdup(argv[arg_i]);
432         char *step_start = stepper;
433         char *p;
434         char *parsetime_error = NULL;
435         enum {atstyle, normal} timesyntax;
436         struct rrd_time_value ds_tv;
437         if (stepper == NULL){
438                 rrd_set_error("failed duplication argv entry");
439                 free(step_start);
440                 free(updvals);
441                 free(pdp_temp);  
442                 free(tmpl_idx);
443                 rrd_free(&rrd);
444 #ifdef HAVE_MMAP
445                 munmap(rrd_mmaped_file, rrd_filesize);
446 #endif
447                 fclose(rrd_file);
448                 return(-1);
449          }
450         /* initialize all ds input to unknown except the first one
451            which has always got to be set */
452         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
453         updvals[0]=stepper;
454         /* separate all ds elements; first must be examined separately
455            due to alternate time syntax */
456         if ((p=strchr(stepper,'@'))!=NULL) {
457             timesyntax = atstyle;
458             *p = '\0';
459             stepper = p+1;
460         } else if ((p=strchr(stepper,':'))!=NULL) {
461             timesyntax = normal;
462             *p = '\0';
463             stepper = p+1;
464         } else {
465             rrd_set_error("expected timestamp not found in data source from %s",
466                           argv[arg_i]);
467             free(step_start);
468             break;
469         }
470         ii=1;
471         updvals[tmpl_idx[ii]] = stepper;
472         while (*stepper) {
473             if (*stepper == ':') {
474                 *stepper = '\0';
475                 ii++;
476                 if (ii<tmpl_cnt){                   
477                     updvals[tmpl_idx[ii]] = stepper+1;
478                 }
479             }
480             stepper++;
481         }
482
483         if (ii != tmpl_cnt-1) {
484             rrd_set_error("expected %lu data source readings (got %lu) from %s",
485                           tmpl_cnt-1, ii, argv[arg_i]);
486             free(step_start);
487             break;
488         }
489         
490         /* get the time from the reading ... handle N */
491         if (timesyntax == atstyle) {
492             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
493                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
494                 free(step_start);
495                 break;
496             }
497             if (ds_tv.type == RELATIVE_TO_END_TIME ||
498                 ds_tv.type == RELATIVE_TO_START_TIME) {
499                 rrd_set_error("specifying time relative to the 'start' "
500                               "or 'end' makes no sense here: %s",
501                               updvals[0]);
502                 free(step_start);
503                 break;
504             }
505
506             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
507             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
508             
509         } else if (strcmp(updvals[0],"N")==0){
510             gettimeofday(&tmp_time, 0);
511             normalize_time(&tmp_time);
512             current_time = tmp_time.tv_sec;
513             current_time_usec = tmp_time.tv_usec;
514         } else {
515             double tmp;
516             tmp = strtod(updvals[0], 0);
517             current_time = floor(tmp);
518             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
519         }
520         /* dont do any correction for old version RRDs */
521         if(version < 3) 
522             current_time_usec = 0;
523         
524         if(current_time < rrd.live_head->last_up || 
525           (current_time == rrd.live_head->last_up && 
526            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
527             rrd_set_error("illegal attempt to update using time %ld when "
528                           "last update time is %ld (minimum one second step)",
529                           current_time, rrd.live_head->last_up);
530             free(step_start);
531             break;
532         }
533         
534         
535         /* seek to the beginning of the rra's */
536         if (rra_current != rra_begin) {
537 #ifndef HAVE_MMAP
538             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
539                 rrd_set_error("seek error in rrd");
540                 free(step_start);
541                 break;
542             }
543 #endif
544             rra_current = rra_begin;
545         }
546         rra_start = rra_begin;
547
548         /* when was the current pdp started */
549         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
550         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
551
552         /* when did the last pdp_st occur */
553         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
554         occu_pdp_st = current_time - occu_pdp_age;
555
556         /* interval = current_time - rrd.live_head->last_up; */
557         interval    = (double)(current_time - rrd.live_head->last_up) 
558                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
559
560         if (occu_pdp_st > proc_pdp_st){
561             /* OK we passed the pdp_st moment*/
562             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
563                                                               * occurred before the latest
564                                                               * pdp_st moment*/
565             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
566             post_int = occu_pdp_age;                         /* how much after it */
567             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
568         } else {
569             pre_int = interval;
570             post_int = 0;
571         }
572
573 #ifdef DEBUG
574         printf(
575                "proc_pdp_age %lu\t"
576                "proc_pdp_st %lu\t" 
577                "occu_pfp_age %lu\t" 
578                "occu_pdp_st %lu\t"
579                "int %lf\t"
580                "pre_int %lf\t"
581                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
582                 occu_pdp_age, occu_pdp_st,
583                interval, pre_int, post_int);
584 #endif
585     
586         /* process the data sources and update the pdp_prep 
587          * area accordingly */
588         for(i=0;i<rrd.stat_head->ds_cnt;i++){
589             enum dst_en dst_idx;
590             dst_idx= dst_conv(rrd.ds_def[i].dst);
591
592             /* make sure we do not build diffs with old last_ds values */
593             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
594                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
595                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
596             }
597
598             /* NOTE: DST_CDEF should never enter this if block, because
599              * updvals[i+1][0] is initialized to 'U'; unless the caller
600              * accidently specified a value for the DST_CDEF. To handle 
601               * this case, an extra check is required. */
602
603             if((updvals[i+1][0] != 'U') &&
604                    (dst_idx != DST_CDEF) &&
605                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
606                double rate = DNAN;
607                /* the data source type defines how to process the data */
608                 /* pdp_new contains rate * time ... eg the bytes
609                  * transferred during the interval. Doing it this way saves
610                  * a lot of math operations */
611                 
612
613                 switch(dst_idx){
614                 case DST_COUNTER:
615                 case DST_DERIVE:
616                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
617                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
618                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
619                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
620                                  break;
621                             }
622                        }
623                        if (rrd_test_error()){
624                             break;
625                        }
626                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
627                        if(dst_idx == DST_COUNTER) {
628                           /* simple overflow catcher suggested by Andres Kroonmaa */
629                           /* this will fail terribly for non 32 or 64 bit counters ... */
630                           /* are there any others in SNMP land ? */
631                           if (pdp_new[i] < (double)0.0 ) 
632                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
633                           if (pdp_new[i] < (double)0.0 ) 
634                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
635                        }
636                        rate = pdp_new[i] / interval;
637                     }
638                    else {
639                      pdp_new[i]= DNAN;          
640                    }
641                    break;
642                 case DST_ABSOLUTE:
643                     errno = 0;
644                     pdp_new[i] = strtod(updvals[i+1],&endptr);
645                     if (errno > 0){
646                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
647                         break;
648                     };
649                     if (endptr[0] != '\0'){
650                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
651                         break;
652                     }
653                     rate = pdp_new[i] / interval;                 
654                     break;
655                 case DST_GAUGE:
656                     errno = 0;
657                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
658                     if (errno > 0){
659                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
660                         break;
661                     };
662                     if (endptr[0] != '\0'){
663                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
664                         break;
665                     }
666                     rate = pdp_new[i] / interval;                  
667                     break;
668                 default:
669                     rrd_set_error("rrd contains unknown DS type : '%s'",
670                                   rrd.ds_def[i].dst);
671                     break;
672                 }
673                 /* break out of this for loop if the error string is set */
674                 if (rrd_test_error()){
675                     break;
676                 }
677                /* make sure pdp_temp is neither too large or too small
678                 * if any of these occur it becomes unknown ...
679                 * sorry folks ... */
680                if ( ! isnan(rate) && 
681                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
682                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
683                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
684                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
685                   pdp_new[i] = DNAN;
686                }               
687             } else {
688                 /* no news is news all the same */
689                 pdp_new[i] = DNAN;
690             }
691
692             
693             /* make a copy of the command line argument for the next run */
694 #ifdef DEBUG
695             fprintf(stderr,
696                     "prep ds[%lu]\t"
697                     "last_arg '%s'\t"
698                     "this_arg '%s'\t"
699                     "pdp_new %10.2f\n",
700                     i,
701                     rrd.pdp_prep[i].last_ds,
702                     updvals[i+1], pdp_new[i]);
703 #endif
704             strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
705             rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
706         }
707         /* break out of the argument parsing loop if the error_string is set */
708         if (rrd_test_error()){
709             free(step_start);
710             break;
711         }
712         /* has a pdp_st moment occurred since the last run ? */
713
714         if (proc_pdp_st == occu_pdp_st){
715             /* no we have not passed a pdp_st moment. therefore update is simple */
716
717             for(i=0;i<rrd.stat_head->ds_cnt;i++){
718                 if(isnan(pdp_new[i])) {            
719                     /* this is not realy accurate if we use subsecond data arival time
720                        should have thought of it when going subsecond resolution ...
721                        sorry next format change we will have it! */
722                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
723                 } else {
724                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
725                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
726                      } else {
727                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
728                      }
729                 }
730 #ifdef DEBUG
731                 fprintf(stderr,
732                         "NO PDP  ds[%lu]\t"
733                         "value %10.2f\t"
734                         "unkn_sec %5lu\n",
735                         i,
736                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
737                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
738 #endif
739             }   
740         } else {
741             /* an pdp_st has occurred. */
742
743             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
744              * occurred up to the last run.        
745             pdp_new[] contains rate*seconds from the latest run.
746             pdp_temp[] will contain the rate for cdp */
747
748             for(i=0;i<rrd.stat_head->ds_cnt;i++){
749                 /* update pdp_prep to the current pdp_st. */
750                 double pre_unknown = 0.0;               
751                 if(isnan(pdp_new[i]))
752                     /* a final bit of unkonwn to be added bevore calculation
753                      * we use a tempaorary variable for this so that we 
754                      * don't have to turn integer lines before using the value */                
755                     pre_unknown = pre_int;
756                 else {
757                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
758                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
759                      } else {
760                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
761                      }
762                  }
763                 
764
765                 /* if too much of the pdp_prep is unknown we dump it */
766                 if ( 
767                     /* removed because this does not agree with the definition
768                        a heart beat can be unknown */
769                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
770                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
771                     /* if the interval is larger thatn mrhb we get NAN */
772                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
773                     (occu_pdp_st-proc_pdp_st <= 
774                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
775                     pdp_temp[i] = DNAN;
776                 } else {
777                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
778                         / ((double)(occu_pdp_st - proc_pdp_st
779                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
780                             -pre_unknown);
781                 }
782
783                 /* process CDEF data sources; remember each CDEF DS can
784                  * only reference other DS with a lower index number */
785             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
786                    rpnp_t *rpnp;
787                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
788                    /* substitue data values for OP_VARIABLE nodes */
789                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
790                    {
791                           if (rpnp[ii].op == OP_VARIABLE) {
792                                  rpnp[ii].op = OP_NUMBER;
793                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
794                           }
795                    }
796                    /* run the rpn calculator */
797                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
798                           free(rpnp);
799                           break; /* exits the data sources pdp_temp loop */
800                    }
801                 }
802         
803                 /* make pdp_prep ready for the next run */
804                 if(isnan(pdp_new[i])){
805                     /* this is not realy accurate if we use subsecond data arival time
806                        should have thought of it when going subsecond resolution ...
807                        sorry next format change we will have it! */
808                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
809                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
810                 } else {
811                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
812                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
813                         pdp_new[i]/interval*post_int;
814                 }
815
816 #ifdef DEBUG
817                 fprintf(stderr,
818                         "PDP UPD ds[%lu]\t"
819                         "pdp_temp %10.2f\t"
820                         "new_prep %10.2f\t"
821                         "new_unkn_sec %5lu\n",
822                         i, pdp_temp[i],
823                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
824                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
825 #endif
826             }
827
828                 /* if there were errors during the last loop, bail out here */
829             if (rrd_test_error()){
830                free(step_start);
831                break;
832             }
833
834                 /* compute the number of elapsed pdp_st moments */
835                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
836 #ifdef DEBUG
837                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
838 #endif
839                 if (rra_step_cnt == NULL)
840                 {
841                    rra_step_cnt = (unsigned long *) 
842                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
843                 }
844
845             for(i = 0, rra_start = rra_begin;
846                 i < rrd.stat_head->rra_cnt;
847             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
848                 i++)
849                 {
850                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
851                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
852                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
853         if (start_pdp_offset <= elapsed_pdp_st) {
854            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
855                       rrd.rra_def[i].pdp_cnt + 1;
856             } else {
857                    rra_step_cnt[i] = 0;
858                 }
859
860                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
861                 {
862                    /* If this is a bulk update, we need to skip ahead in the seasonal
863                         * arrays so that they will be correct for the next observed value;
864                         * note that for the bulk update itself, no update will occur to
865                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
866                         * be set to DNAN. */
867            if (rra_step_cnt[i] > 2) 
868                    {
869                           /* skip update by resetting rra_step_cnt[i],
870                            * note that this is not data source specific; this is due
871                            * to the bulk update, not a DNAN value for the specific data
872                            * source. */
873                           rra_step_cnt[i] = 0;
874               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
875                              &last_seasonal_coef);
876                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
877                              &seasonal_coef);
878                    }
879                 
880                   /* periodically run a smoother for seasonal effects */
881                   /* Need to use first cdp parameter buffer to track
882                    * burnin (burnin requires a specific smoothing schedule).
883                    * The CDP_init_seasonal parameter is really an RRA level,
884                    * not a data source within RRA level parameter, but the rra_def
885                    * is read only for rrd_update (not flushed to disk). */
886                   iii = i*(rrd.stat_head -> ds_cnt);
887                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
888                           <= BURNIN_CYCLES)
889                   {
890                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
891                                  > rrd.rra_def[i].row_cnt - 1) {
892                            /* mark off one of the burnin cycles */
893                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
894                        schedule_smooth = 1;
895                          }  
896                   } else {
897                          /* someone has no doubt invented a trick to deal with this
898                           * wrap around, but at least this code is clear. */
899                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
900                              rrd.rra_ptr[i].cur_row)
901                          {
902                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
903                                   * mapping between PDP and CDP */
904                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
905                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
906                                  {
907 #ifdef DEBUG
908                                         fprintf(stderr,
909                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
910                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
911                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
912 #endif
913                                         schedule_smooth = 1;
914                                  }
915              } else {
916                                  /* can't rely on negative numbers because we are working with
917                                   * unsigned values */
918                                  /* Don't need modulus here. If we've wrapped more than once, only
919                                   * one smooth is executed at the end. */
920                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
921                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
922                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
923                                  {
924 #ifdef DEBUG
925                                         fprintf(stderr,
926                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
927                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
928                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
929 #endif
930                                         schedule_smooth = 1;
931                                  }
932                          }
933                   }
934
935               rra_current = ftell(rrd_file); 
936                 } /* if cf is DEVSEASONAL or SEASONAL */
937
938         if (rrd_test_error()) break;
939
940                     /* update CDP_PREP areas */
941                     /* loop over data soures within each RRA */
942                     for(ii = 0;
943                         ii < rrd.stat_head->ds_cnt;
944                         ii++)
945                         {
946                         
947                         /* iii indexes the CDP prep area for this data source within the RRA */
948                         iii=i*rrd.stat_head->ds_cnt+ii;
949
950                         if (rrd.rra_def[i].pdp_cnt > 1) {
951                           
952                            if (rra_step_cnt[i] > 0) {
953                            /* If we are in this block, as least 1 CDP value will be written to
954                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
955                                 * to be written, then the "fill in" value is the CDP_secondary_val
956                                 * entry. */
957                                   if (isnan(pdp_temp[ii]))
958                   {
959                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
960                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
961                                   } else {
962                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
963                                           * CDP data entries. No matter the CF, the value is the same because
964                                           * the average, max, min, and last of a list of identical values is
965                                           * the same, namely, the value itself. */
966                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
967                                   }
968                      
969                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
970                                       > rrd.rra_def[i].pdp_cnt*
971                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
972                                   {
973                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
974                                          /* initialize carry over */
975                                          if (current_cf == CF_AVERAGE) {
976                                                    if (isnan(pdp_temp[ii])) { 
977                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
978                                                    } else {
979                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
980                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
981                                                    }
982                                          } else {
983                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
984                                          }
985                                   } else {
986                                          rrd_value_t cum_val, cur_val; 
987                                      switch (current_cf) {
988                                                 case CF_AVERAGE:
989                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
990                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
991                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
992                                                (cum_val + cur_val * start_pdp_offset) /
993                                            (rrd.rra_def[i].pdp_cnt
994                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
995                                                    /* initialize carry over value */
996                                                    if (isnan(pdp_temp[ii])) { 
997                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
998                                                    } else {
999                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1000                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1001                                                    }
1002                                                    break;
1003                                                 case CF_MAXIMUM:
1004                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1005                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1006 #ifdef DEBUG
1007                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1008                                                           isnan(pdp_temp[ii])) {
1009                                                      fprintf(stderr,
1010                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1011                                                                 i,ii);
1012                                                          exit(-1);
1013                                                   }
1014 #endif
1015                                                   if (cur_val > cum_val)
1016                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1017                                                   else
1018                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1019                                                   /* initialize carry over value */
1020                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1021                                                   break;
1022                                                 case CF_MINIMUM:
1023                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1024                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1025 #ifdef DEBUG
1026                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1027                                                           isnan(pdp_temp[ii])) {
1028                                                      fprintf(stderr,
1029                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1030                                                                 i,ii);
1031                                                          exit(-1);
1032                                                   }
1033 #endif
1034                                                   if (cur_val < cum_val)
1035                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1036                                                   else
1037                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1038                                                   /* initialize carry over value */
1039                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1040                                                   break;
1041                                                 case CF_LAST:
1042                                                 default:
1043                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1044                                                    /* initialize carry over value */
1045                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046                                                 break;
1047                                          }
1048                                   } /* endif meets xff value requirement for a valid value */
1049                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1050                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1051                                   if (isnan(pdp_temp[ii]))
1052                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1053                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1054                                   else
1055                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1056                } else  /* rra_step_cnt[i]  == 0 */
1057                            {
1058 #ifdef DEBUG
1059                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1060                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1061                                          i,ii);
1062                                   } else {
1063                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1064                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1065                                   }
1066 #endif
1067                                   if (isnan(pdp_temp[ii])) {
1068                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1069                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1070                                   {
1071                                          if (current_cf == CF_AVERAGE) {
1072                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1073                                                    elapsed_pdp_st;
1074                                          } else {
1075                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1076                                          }
1077 #ifdef DEBUG
1078                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1079                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1080 #endif
1081                                   } else {
1082                                          switch (current_cf) {
1083                                          case CF_AVERAGE:
1084                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1085                                                    elapsed_pdp_st;
1086                                                 break;
1087                                          case CF_MINIMUM:
1088                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1089                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090                                                 break; 
1091                                          case CF_MAXIMUM:
1092                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1093                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094                                                 break; 
1095                                          case CF_LAST:
1096                                          default:
1097                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098                                                 break;
1099                                          }
1100                                   }
1101                            }
1102                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1103                            if (elapsed_pdp_st > 2)
1104                            {
1105                                    switch (current_cf) {
1106                                    case CF_AVERAGE:
1107                                    default:
1108                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1109                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1110                                           break;
1111                    case CF_SEASONAL:
1112                                    case CF_DEVSEASONAL:
1113                                           /* need to update cached seasonal values, so they are consistent
1114                                            * with the bulk update */
1115                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1116                                            * CDP_last_deviation are the same. */
1117                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1118                                                  last_seasonal_coef[ii];
1119                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1120                                                  seasonal_coef[ii];
1121                                           break;
1122                    case CF_HWPREDICT:
1123                                           /* need to update the null_count and last_null_count.
1124                                            * even do this for non-DNAN pdp_temp because the
1125                                            * algorithm is not learning from batch updates. */
1126                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1127                                                  elapsed_pdp_st;
1128                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1129                                                  elapsed_pdp_st - 1;
1130                                           /* fall through */
1131                                    case CF_DEVPREDICT:
1132                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1133                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1134                                           break;
1135                    case CF_FAILURES:
1136                                           /* do not count missed bulk values as failures */
1137                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1138                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1139                                           /* need to reset violations buffer.
1140                                            * could do this more carefully, but for now, just
1141                                            * assume a bulk update wipes away all violations. */
1142                       erase_violations(&rrd, iii, i);
1143                                           break;
1144                                    }
1145                            } 
1146                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1147
1148                         if (rrd_test_error()) break;
1149
1150                         } /* endif data sources loop */
1151         } /* end RRA Loop */
1152
1153                 /* this loop is only entered if elapsed_pdp_st < 3 */
1154                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1155                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1156                 {
1157                for(i = 0, rra_start = rra_begin;
1158                    i < rrd.stat_head->rra_cnt;
1159                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1160                    i++)
1161                    {
1162                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1163
1164                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1165                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1166                           {
1167                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1168                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1169                                 &seasonal_coef);
1170                  rra_current = ftell(rrd_file);
1171                           }
1172                           if (rrd_test_error()) break;
1173                       /* loop over data soures within each RRA */
1174                       for(ii = 0;
1175                           ii < rrd.stat_head->ds_cnt;
1176                           ii++)
1177                           {
1178                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1179                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1180                                     scratch_idx, seasonal_coef);
1181                           }
1182            } /* end RRA Loop */
1183                    if (rrd_test_error()) break;
1184             } /* end elapsed_pdp_st loop */
1185
1186                 if (rrd_test_error()) break;
1187
1188                 /* Ready to write to disk */
1189                 /* Move sequentially through the file, writing one RRA at a time.
1190                  * Note this architecture divorces the computation of CDP with
1191                  * flushing updated RRA entries to disk. */
1192             for(i = 0, rra_start = rra_begin;
1193                 i < rrd.stat_head->rra_cnt;
1194             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1195                 i++) {
1196                 /* is there anything to write for this RRA? If not, continue. */
1197         if (rra_step_cnt[i] == 0) continue;
1198
1199                 /* write the first row */
1200 #ifdef DEBUG
1201         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1202 #endif
1203             rrd.rra_ptr[i].cur_row++;
1204             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1205                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1206                 /* positition on the first row */
1207                 rra_pos_tmp = rra_start +
1208                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1209                 if(rra_pos_tmp != rra_current) {
1210 #ifndef HAVE_MMAP
1211                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1212                       rrd_set_error("seek error in rrd");
1213                       break;
1214                    }
1215 #endif
1216                    rra_current = rra_pos_tmp;
1217                 }
1218
1219 #ifdef DEBUG
1220             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1221 #endif
1222                 scratch_idx = CDP_primary_val;
1223                 if (pcdp_summary != NULL)
1224                 {
1225                    rra_time = (current_time - current_time 
1226                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1227                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1228                 }
1229 #ifdef HAVE_MMAP
1230                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1231                    pcdp_summary, &rra_time, rrd_mmaped_file);
1232 #else
1233                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1234                    pcdp_summary, &rra_time);
1235 #endif
1236                 if (rrd_test_error()) break;
1237
1238                 /* write other rows of the bulk update, if any */
1239                 scratch_idx = CDP_secondary_val;
1240                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1241                 {
1242                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1243                    {
1244 #ifdef DEBUG
1245               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1246                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1247 #endif
1248                           /* wrap */
1249                           rrd.rra_ptr[i].cur_row = 0;
1250                           /* seek back to beginning of current rra */
1251                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1252                           {
1253                          rrd_set_error("seek error in rrd");
1254                          break;
1255                           }
1256 #ifdef DEBUG
1257                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1258 #endif
1259                           rra_current = rra_start;
1260                    }
1261                    if (pcdp_summary != NULL)
1262                    {
1263                       rra_time = (current_time - current_time 
1264                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1265                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1266                    }
1267 #ifdef HAVE_MMAP
1268                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1269                       pcdp_summary, &rra_time, rrd_mmaped_file);
1270 #else
1271                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1272                       pcdp_summary, &rra_time);
1273 #endif
1274                 }
1275                 
1276                 if (rrd_test_error())
1277                   break;
1278                 } /* RRA LOOP */
1279
1280             /* break out of the argument parsing loop if error_string is set */
1281             if (rrd_test_error()){
1282                    free(step_start);
1283                    break;
1284             } 
1285             
1286         } /* endif a pdp_st has occurred */ 
1287         rrd.live_head->last_up = current_time;
1288         rrd.live_head->last_up_usec = current_time_usec; 
1289         free(step_start);
1290     } /* function argument loop */
1291
1292     if (seasonal_coef != NULL) free(seasonal_coef);
1293     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1294         if (rra_step_cnt != NULL) free(rra_step_cnt);
1295     rpnstack_free(&rpnstack);
1296
1297 #ifdef HAVE_MMAP
1298     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1299             rrd_set_error("error writing(unmapping) file: %s", filename);
1300     }
1301 #endif    
1302     /* if we got here and if there is an error and if the file has not been
1303      * written to, then close things up and return. */
1304     if (rrd_test_error()) {
1305         free(updvals);
1306         free(tmpl_idx);
1307         rrd_free(&rrd);
1308         free(pdp_temp);
1309         free(pdp_new);
1310         fclose(rrd_file);
1311         return(-1);
1312     }
1313
1314     /* aargh ... that was tough ... so many loops ... anyway, its done.
1315      * we just need to write back the live header portion now*/
1316
1317     if (fseek(rrd_file, (sizeof(stat_head_t)
1318                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1319                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1320               SEEK_SET) != 0) {
1321         rrd_set_error("seek rrd for live header writeback");
1322         free(updvals);
1323         free(tmpl_idx);
1324         rrd_free(&rrd);
1325         free(pdp_temp);
1326         free(pdp_new);
1327         fclose(rrd_file);
1328         return(-1);
1329     }
1330
1331     if(version >= 3) {
1332             if(fwrite( rrd.live_head,
1333                        sizeof(live_head_t), 1, rrd_file) != 1){
1334                 rrd_set_error("fwrite live_head to rrd");
1335                 free(updvals);
1336                 rrd_free(&rrd);
1337                 free(tmpl_idx);
1338                 free(pdp_temp);
1339                 free(pdp_new);
1340                 fclose(rrd_file);
1341                 return(-1);
1342             }
1343     }
1344     else {
1345             if(fwrite( &rrd.live_head->last_up,
1346                        sizeof(time_t), 1, rrd_file) != 1){
1347                 rrd_set_error("fwrite live_head to rrd");
1348                 free(updvals);
1349                 rrd_free(&rrd);
1350                 free(tmpl_idx);
1351                 free(pdp_temp);
1352                 free(pdp_new);
1353                 fclose(rrd_file);
1354                 return(-1);
1355             }
1356     }
1357             
1358
1359     if(fwrite( rrd.pdp_prep,
1360                sizeof(pdp_prep_t),
1361                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1362         rrd_set_error("ftwrite pdp_prep to rrd");
1363         free(updvals);
1364         rrd_free(&rrd);
1365         free(tmpl_idx);
1366         free(pdp_temp);
1367         free(pdp_new);
1368         fclose(rrd_file);
1369         return(-1);
1370     }
1371
1372     if(fwrite( rrd.cdp_prep,
1373                sizeof(cdp_prep_t),
1374                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1375        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1376
1377         rrd_set_error("ftwrite cdp_prep to rrd");
1378         free(updvals);
1379         free(tmpl_idx);
1380         rrd_free(&rrd);
1381         free(pdp_temp);
1382         free(pdp_new);
1383         fclose(rrd_file);
1384         return(-1);
1385     }
1386
1387     if(fwrite( rrd.rra_ptr,
1388                sizeof(rra_ptr_t), 
1389                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1390         rrd_set_error("fwrite rra_ptr to rrd");
1391         free(updvals);
1392         free(tmpl_idx);
1393         rrd_free(&rrd);
1394         free(pdp_temp);
1395         free(pdp_new);
1396         fclose(rrd_file);
1397         return(-1);
1398     }
1399
1400     /* OK now close the files and free the memory */
1401     if(fclose(rrd_file) != 0){
1402         rrd_set_error("closing rrd");
1403         free(updvals);
1404         free(tmpl_idx);
1405         rrd_free(&rrd);
1406         free(pdp_temp);
1407         free(pdp_new);
1408         return(-1);
1409     }
1410
1411     /* calling the smoothing code here guarantees at most
1412          * one smoothing operation per rrd_update call. Unfortunately,
1413          * it is possible with bulk updates, or a long-delayed update
1414          * for smoothing to occur off-schedule. This really isn't
1415          * critical except during the burning cycles. */
1416         if (schedule_smooth)
1417         {
1418           rrd_file = fopen(filename,"rb+");
1419           rra_start = rra_begin;
1420           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1421           {
1422             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1423                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1424             {
1425 #ifdef DEBUG
1426               fprintf(stderr,"Running smoother for rra %ld\n",i);
1427 #endif
1428               apply_smoother(&rrd,i,rra_start,rrd_file);
1429               if (rrd_test_error())
1430                 break;
1431             }
1432             rra_start += rrd.rra_def[i].row_cnt
1433               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1434           }
1435           fclose(rrd_file);
1436         }
1437     rrd_free(&rrd);
1438     free(updvals);
1439     free(tmpl_idx);
1440     free(pdp_new);
1441     free(pdp_temp);
1442     return(0);
1443 }
1444
1445 /*
1446  * get exclusive lock to whole file.
1447  * lock gets removed when we close the file
1448  *
1449  * returns 0 on success
1450  */
1451 int
1452 LockRRD(FILE *rrdfile)
1453 {
1454     int rrd_fd;         /* File descriptor for RRD */
1455     int rcstat;
1456
1457     rrd_fd = fileno(rrdfile);
1458
1459         {
1460 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1461     struct _stat st;
1462
1463     if ( _fstat( rrd_fd, &st ) == 0 ) {
1464             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1465     } else {
1466             rcstat = -1;
1467     }
1468 #else
1469     struct flock        lock;
1470     lock.l_type = F_WRLCK;    /* exclusive write lock */
1471     lock.l_len = 0;           /* whole file */
1472     lock.l_start = 0;         /* start of file */
1473     lock.l_whence = SEEK_SET;   /* end of file */
1474
1475     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1476 #endif
1477         }
1478
1479     return(rcstat);
1480 }
1481
1482
1483 #ifdef HAVE_MMAP
1484 info_t
1485 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1486                unsigned short CDP_scratch_idx, 
1487 #ifndef DEBUG
1488 FILE UNUSED(*rrd_file),
1489 #else
1490 FILE *rrd_file,
1491 #endif
1492                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1493 #else
1494 info_t
1495 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1496                unsigned short CDP_scratch_idx, FILE *rrd_file,
1497                    info_t *pcdp_summary, time_t *rra_time)
1498 #endif
1499 {
1500    unsigned long ds_idx, cdp_idx;
1501    infoval iv;
1502   
1503    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1504    {
1505       /* compute the cdp index */
1506       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1507 #ifdef DEBUG
1508           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1509              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1510              rrd -> rra_def[rra_idx].cf_nam);
1511 #endif 
1512       if (pcdp_summary != NULL)
1513           {
1514              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1515              /* append info to the return hash */
1516                  pcdp_summary = info_push(pcdp_summary,
1517                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1518                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1519                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1520          RD_I_VAL, iv);
1521           }
1522 #ifdef HAVE_MMAP
1523           memcpy((char *)rrd_mmaped_file + *rra_current,
1524                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1525                           sizeof(rrd_value_t));
1526 #else
1527           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1528                  sizeof(rrd_value_t),1,rrd_file) != 1)
1529           { 
1530              rrd_set_error("writing rrd");
1531              return 0;
1532           }
1533 #endif
1534           *rra_current += sizeof(rrd_value_t);
1535         }
1536         return (pcdp_summary);
1537 }