started updating for 1.2 release
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2rc6  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28 /*
29  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
30  * replacement.
31  */
32 #include <sys/timeb.h>
33
/* Stand-in for the POSIX struct timeval on WIN32 builds. */
struct timeval {
        time_t tv_sec; /* seconds */
        long tv_usec;  /* microseconds */
};

/* Stand-in for the POSIX timezone argument; this file never fills it in. */
struct __timezone {
        int  tz_minuteswest; /* minutes W of Greenwich */
        int  tz_dsttime;     /* type of dst correction */
};

/*
 * Replacement for gettimeofday() built on _ftime().
 *
 * t  - receives the current time; tv_usec is derived from the millisecond
 *      field of struct timeb, so it only has millisecond resolution.
 * tz - accepted for signature compatibility and never touched; the callers
 *      in this file pass 0.
 *
 * Always returns 0.  The original definition had no return type (implicit
 * int) and no return statement, which left the result indeterminate.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz)
{
        struct timeb current_time;

        (void)tz; /* unused, see above */

        _ftime(&current_time);

        t->tv_sec  = current_time.time;
        t->tv_usec = current_time.millitm * 1000;

        return 0;
}
53
54 #endif
/*
 * Normalize a time value as returned by gettimeofday: when the
 * microsecond part is negative, borrow one second so that tv_usec
 * ends up >= 0.  A single borrow is performed.
 */
static void normalize_time(struct timeval *t)
{
        if (t->tv_usec >= 0)
                return;         /* already normalized */

        t->tv_usec += 1000000L; /* add one second worth of microseconds ... */
        t->tv_sec  -= 1;        /* ... and take it from the seconds part */
}
66
67 /* Local prototypes */
68 int LockRRD(FILE *rrd_file);
69 #ifdef HAVE_MMAP
70 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
71                                         unsigned long *rra_current,
72                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
73                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
74 #else
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
78                                         info_t *pcdp_summary, time_t *rra_time);
79 #endif
80 int rrd_update_r(char *filename, char *template, int argc, char **argv);
81 int _rrd_update(char *filename, char *template, int argc, char **argv, 
82                                         info_t*);
83
/* Yields Y when X is NaN, otherwise X.  Note that X is evaluated twice.
 * The expansion carries no trailing semicolon so the macro can be used
 * inside larger expressions; callers terminate the statement themselves. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
85
86
87 #ifdef STANDALONE
/*
 * Entry point of the standalone rrdupdate binary: hand the command line to
 * rrd_update() and, if that left an error set, print the usage text followed
 * by the error message.  Exit status is 0 on success and 1 on error.
 */
int
main(int argc, char **argv)
{
        rrd_update(argc, argv);

        if (!rrd_test_error())
                return 0;

        printf("RRDtool 1.2rc6  Copyright by Tobi Oetiker, 1997-2005\n\n"
               "Usage: rrdupdate filename\n"
               "\t\t\t[--template|-t ds-name:ds-name:...]\n"
               "\t\t\ttime|N:value[:value...]\n\n"
               "\t\t\tat-time@value[:value...]\n\n"
               "\t\t\t[ time:value[:value...] ..]\n\n");

        printf("ERROR: %s\n", rrd_get_error());
        rrd_clear_error();

        return 1;
}
105 #endif
106
107 info_t *rrd_update_v(int argc, char **argv)
108 {
109     char             *template = NULL;          
110         info_t *result = NULL;
111         infoval rc;
112
113     while (1) {
114                 static struct option long_options[] =
115                         {
116                                 {"template",      required_argument, 0, 't'},
117                                 {0,0,0,0}
118                         };
119                 int option_index = 0;
120                 int opt;
121                 opt = getopt_long(argc, argv, "t:", 
122                                                   long_options, &option_index);
123                 
124                 if (opt == EOF)
125                         break;
126                 
127                 switch(opt) {
128                 case 't':
129                         template = optarg;
130                         break;
131                 
132                 case '?':
133                         rrd_set_error("unknown option '%s'",argv[optind-1]);
134             rc.u_int = -1;
135                         goto end_tag;
136                 }
137     }
138
139     /* need at least 2 arguments: filename, data. */
140     if (argc-optind < 2) {
141                 rrd_set_error("Not enough arguments");
142         rc.u_int = -1;
143                 goto end_tag;
144     }
145     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
146         rc.u_int = _rrd_update(argv[optind], template,
147                       argc - optind - 1, argv + optind + 1, result);
148     result->value.u_int = rc.u_int;
149 end_tag:
150     return result;
151 }
152
/*
 * Command-line front end for updating an RRD.
 *
 * Accepts an optional --template/-t option followed by the file name and
 * at least one data argument, then forwards everything after the file name
 * to rrd_update_r().
 *
 * Returns the result of rrd_update_r(), or -1 with the rrd error set when
 * an unknown option is seen or too few arguments remain.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
        {
            {"template",      required_argument, 0, 't'},
            {0,0,0,0}
        };
    char *template = NULL;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index);

        if (opt == EOF)
            break;

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return -1;
        }
    }

    /* what remains must be the file name plus at least one data argument */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
195
196 int
197 rrd_update_r(char *filename, char *template, int argc, char **argv)
198 {
199    return _rrd_update(filename, template, argc, argv, NULL);
200 }
201
202 int
203 _rrd_update(char *filename, char *template, int argc, char **argv, 
204    info_t *pcdp_summary)
205 {
206
207     int              arg_i = 2;
208     short            j;
209     unsigned long    i,ii,iii=1;
210
211     unsigned long    rra_begin;          /* byte pointer to the rra
212                                           * area in the rrd file.  this
213                                           * pointer never changes value */
214     unsigned long    rra_start;          /* byte pointer to the rra
215                                           * area in the rrd file.  this
216                                           * pointer changes as each rrd is
217                                           * processed. */
218     unsigned long    rra_current;        /* byte pointer to the current write
219                                           * spot in the rrd file. */
220     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
221     double           interval,
222         pre_int,post_int;                /* interval between this and
223                                           * the last run */
224     unsigned long    proc_pdp_st;        /* which pdp_st was the last
225                                           * to be processed */
226     unsigned long    occu_pdp_st;        /* when was the pdp_st
227                                           * before the last update
228                                           * time */
229     unsigned long    proc_pdp_age;       /* how old was the data in
230                                           * the pdp prep area when it
231                                           * was last updated */
232     unsigned long    occu_pdp_age;       /* how long ago was the last
233                                           * pdp_step time */
234     rrd_value_t      *pdp_new;           /* prepare the incoming data
235                                           * to be added the the
236                                           * existing entry */
237     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
238                                           * to be added the the
239                                           * cdp values */
240
241     long             *tmpl_idx;          /* index representing the settings
242                                             transported by the template index */
243     unsigned long    tmpl_cnt = 2;       /* time and data */
244
245     FILE             *rrd_file;
246     rrd_t            rrd;
247     time_t           current_time;
248         time_t           rra_time; /* time of update for a RRA */
249     unsigned long    current_time_usec;  /* microseconds part of current time */
250     struct timeval   tmp_time;           /* used for time conversion */
251
252     char             **updvals;
253     int              schedule_smooth = 0;
254         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
255                                          /* a vector of future Holt-Winters seasonal coefs */
256     unsigned long    elapsed_pdp_st;
257                                          /* number of elapsed PDP steps since last update */
258     unsigned long    *rra_step_cnt = NULL;
259                                          /* number of rows to be updated in an RRA for a data
260                                           * value. */
261     unsigned long    start_pdp_offset;
262                                          /* number of PDP steps since the last update that
263                                           * are assigned to the first CDP to be generated
264                                           * since the last update. */
265     unsigned short   scratch_idx;
266                                          /* index into the CDP scratch array */
267     enum cf_en       current_cf;
268                                          /* numeric id of the current consolidation function */
269     rpnstack_t       rpnstack; /* used for COMPUTE DS */
270     int              version;  /* rrd version */
271     char             *endptr; /* used in the conversion */
272 #ifdef HAVE_MMAP
273     void             *rrd_mmaped_file;
274     unsigned long    rrd_filesize;
275 #endif
276
277     rpnstack_init(&rpnstack);
278
279     /* need at least 1 arguments: data. */
280     if (argc < 1) {
281         rrd_set_error("Not enough arguments");
282         return -1;
283     }
284     
285     
286
287     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
288         return -1;
289     }
290     /* initialize time */
291     version = atoi(rrd.stat_head->version);
292     gettimeofday(&tmp_time, 0);
293     normalize_time(&tmp_time);
294     current_time = tmp_time.tv_sec;
295     if(version >= 3) {
296         current_time_usec = tmp_time.tv_usec;
297     }
298     else {
299         current_time_usec = 0;
300     }
301
302     rra_current = rra_start = rra_begin = ftell(rrd_file);
303     /* This is defined in the ANSI C standard, section 7.9.5.3:
304
305         When a file is opened with udpate mode ('+' as the second
306         or third character in the ... list of mode argument
307         variables), both input and ouptut may be performed on the
308         associated stream.  However, ...  input may not be directly
309         followed by output without an intervening call to a file
310         positioning function, unless the input oepration encounters
311         end-of-file. */
312 #ifdef HAVE_MMAP
313     fseek(rrd_file, 0, SEEK_END);
314     rrd_filesize = ftell(rrd_file);
315     fseek(rrd_file, rra_current, SEEK_SET);
316 #else
317     fseek(rrd_file, 0, SEEK_CUR);
318 #endif
319
320     
321     /* get exclusive lock to whole file.
322      * lock gets removed when we close the file.
323      */
324     if (LockRRD(rrd_file) != 0) {
325       rrd_set_error("could not lock RRD");
326       rrd_free(&rrd);
327       fclose(rrd_file);
328       return(-1);   
329     } 
330
331     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
332         rrd_set_error("allocating updvals pointer array");
333         rrd_free(&rrd);
334         fclose(rrd_file);
335         return(-1);
336     }
337
338     if ((pdp_temp = malloc(sizeof(rrd_value_t)
339                            *rrd.stat_head->ds_cnt))==NULL){
340         rrd_set_error("allocating pdp_temp ...");
341         free(updvals);
342         rrd_free(&rrd);
343         fclose(rrd_file);
344         return(-1);
345     }
346
347     if ((tmpl_idx = malloc(sizeof(unsigned long)
348                            *(rrd.stat_head->ds_cnt+1)))==NULL){
349         rrd_set_error("allocating tmpl_idx ...");
350         free(pdp_temp);
351         free(updvals);
352         rrd_free(&rrd);
353         fclose(rrd_file);
354         return(-1);
355     }
356     /* initialize template redirector */
357     /* default config example (assume DS 1 is a CDEF DS)
358        tmpl_idx[0] -> 0; (time)
359        tmpl_idx[1] -> 1; (DS 0)
360        tmpl_idx[2] -> 3; (DS 2)
361        tmpl_idx[3] -> 4; (DS 3) */
362     tmpl_idx[0] = 0; /* time */
363     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
364         {
365            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
366               tmpl_idx[ii++]=i;
367         }
368     tmpl_cnt= ii;
369
370     if (template) {
371         char *dsname;
372         unsigned int tmpl_len;
373         dsname = template;
374         tmpl_cnt = 1; /* the first entry is the time */
375         tmpl_len = strlen(template);
376         for(i=0;i<=tmpl_len ;i++) {
377             if (template[i] == ':' || template[i] == '\0') {
378                 template[i] = '\0';
379                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
380                     rrd_set_error("Template contains more DS definitions than RRD");
381                     free(updvals); free(pdp_temp);
382                     free(tmpl_idx); rrd_free(&rrd);
383                     fclose(rrd_file); return(-1);
384                 }
385                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
386                     rrd_set_error("unknown DS name '%s'",dsname);
387                     free(updvals); free(pdp_temp);
388                     free(tmpl_idx); rrd_free(&rrd);
389                     fclose(rrd_file); return(-1);
390                 } else {
391                   /* the first element is always the time */
392                   tmpl_idx[tmpl_cnt-1]++; 
393                   /* go to the next entry on the template */
394                   dsname = &template[i+1];
395                   /* fix the damage we did before */
396                   if (i<tmpl_len) {
397                      template[i]=':';
398                   } 
399
400                 }
401             }       
402         }
403     }
404     if ((pdp_new = malloc(sizeof(rrd_value_t)
405                           *rrd.stat_head->ds_cnt))==NULL){
406         rrd_set_error("allocating pdp_new ...");
407         free(updvals);
408         free(pdp_temp);
409         free(tmpl_idx);
410         rrd_free(&rrd);
411         fclose(rrd_file);
412         return(-1);
413     }
414
415 #ifdef HAVE_MMAP
416     rrd_mmaped_file = mmap(0, 
417                         rrd_filesize, 
418                         PROT_READ | PROT_WRITE, 
419                         MAP_SHARED, 
420                         fileno(rrd_file), 
421                         0);
422     if (rrd_mmaped_file == MAP_FAILED) {
423         rrd_set_error("error mmapping file %s", filename);
424         free(updvals);
425         free(pdp_temp);
426         free(tmpl_idx);
427         rrd_free(&rrd);
428         fclose(rrd_file);
429         return(-1);
430     }
431 #endif
432     /* loop through the arguments. */
433     for(arg_i=0; arg_i<argc;arg_i++) {
434         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
435         char *step_start = stepper;
436         char *p;
437         char *parsetime_error = NULL;
438         enum {atstyle, normal} timesyntax;
439         struct rrd_time_value ds_tv;
440         if (stepper == NULL){
441                 rrd_set_error("failed duplication argv entry");
442                 free(updvals);
443                 free(pdp_temp);  
444                 free(tmpl_idx);
445                 rrd_free(&rrd);
446 #ifdef HAVE_MMAP
447                 munmap(rrd_mmaped_file, rrd_filesize);
448 #endif
449                 fclose(rrd_file);
450                 return(-1);
451          }
452         /* initialize all ds input to unknown except the first one
453            which has always got to be set */
454         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
455         strcpy(stepper,argv[arg_i]);
456         updvals[0]=stepper;
457         /* separate all ds elements; first must be examined separately
458            due to alternate time syntax */
459         if ((p=strchr(stepper,'@'))!=NULL) {
460             timesyntax = atstyle;
461             *p = '\0';
462             stepper = p+1;
463         } else if ((p=strchr(stepper,':'))!=NULL) {
464             timesyntax = normal;
465             *p = '\0';
466             stepper = p+1;
467         } else {
468             rrd_set_error("expected timestamp not found in data source from %s:...",
469                           argv[arg_i]);
470             free(step_start);
471             break;
472         }
473         ii=1;
474         updvals[tmpl_idx[ii]] = stepper;
475         while (*stepper) {
476             if (*stepper == ':') {
477                 *stepper = '\0';
478                 ii++;
479                 if (ii<tmpl_cnt){                   
480                     updvals[tmpl_idx[ii]] = stepper+1;
481                 }
482             }
483             stepper++;
484         }
485
486         if (ii != tmpl_cnt-1) {
487             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
488                           tmpl_cnt-1, ii, argv[arg_i]);
489             free(step_start);
490             break;
491         }
492         
493         /* get the time from the reading ... handle N */
494         if (timesyntax == atstyle) {
495             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
496                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
497                 free(step_start);
498                 break;
499             }
500             if (ds_tv.type == RELATIVE_TO_END_TIME ||
501                 ds_tv.type == RELATIVE_TO_START_TIME) {
502                 rrd_set_error("specifying time relative to the 'start' "
503                               "or 'end' makes no sense here: %s",
504                               updvals[0]);
505                 free(step_start);
506                 break;
507             }
508
509             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
510             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
511             
512         } else if (strcmp(updvals[0],"N")==0){
513             gettimeofday(&tmp_time, 0);
514             normalize_time(&tmp_time);
515             current_time = tmp_time.tv_sec;
516             current_time_usec = tmp_time.tv_usec;
517         } else {
518             double tmp;
519             tmp = strtod(updvals[0], 0);
520             current_time = floor(tmp);
521             current_time_usec = (long)((tmp - current_time) * 1000000L);
522         }
523         /* dont do any correction for old version RRDs */
524         if(version < 3) 
525             current_time_usec = 0;
526         
527         if(current_time <= rrd.live_head->last_up){
528             rrd_set_error("illegal attempt to update using time %ld when "
529                           "last update time is %ld (minimum one second step)",
530                           current_time, rrd.live_head->last_up);
531             free(step_start);
532             break;
533         }
534         
535         
536         /* seek to the beginning of the rra's */
537         if (rra_current != rra_begin) {
538 #ifndef HAVE_MMAP
539             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
540                 rrd_set_error("seek error in rrd");
541                 free(step_start);
542                 break;
543             }
544 #endif
545             rra_current = rra_begin;
546         }
547         rra_start = rra_begin;
548
549         /* when was the current pdp started */
550         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
551         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
552
553         /* when did the last pdp_st occur */
554         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
555         occu_pdp_st = current_time - occu_pdp_age;
556         /* interval = current_time - rrd.live_head->last_up; */
557         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
558     
559         if (occu_pdp_st > proc_pdp_st){
560             /* OK we passed the pdp_st moment*/
561             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
562                                                               * occurred before the latest
563                                                               * pdp_st moment*/
564             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
565             post_int = occu_pdp_age;                         /* how much after it */
566             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
567         } else {
568             pre_int = interval;
569             post_int = 0;
570         }
571
572 #ifdef DEBUG
573         printf(
574                "proc_pdp_age %lu\t"
575                "proc_pdp_st %lu\t" 
576                "occu_pfp_age %lu\t" 
577                "occu_pdp_st %lu\t"
578                "int %lf\t"
579                "pre_int %lf\t"
580                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
581                 occu_pdp_age, occu_pdp_st,
582                interval, pre_int, post_int);
583 #endif
584     
585         /* process the data sources and update the pdp_prep 
586          * area accordingly */
587         for(i=0;i<rrd.stat_head->ds_cnt;i++){
588             enum dst_en dst_idx;
589             dst_idx= dst_conv(rrd.ds_def[i].dst);
590                 /* NOTE: DST_CDEF should never enter this if block, because
591                  * updvals[i+1][0] is initialized to 'U'; unless the caller
592                  * accidently specified a value for the DST_CDEF. To handle 
593                  * this case, an extra check is required. */
594             if((updvals[i+1][0] != 'U') &&
595                    (dst_idx != DST_CDEF) &&
596                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
597                double rate = DNAN;
598                /* the data source type defines how to process the data */
599                 /* pdp_new contains rate * time ... eg the bytes
600                  * transferred during the interval. Doing it this way saves
601                  * a lot of math operations */
602                 
603
604                 switch(dst_idx){
605                 case DST_COUNTER:
606                 case DST_DERIVE:
607                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
608                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
609                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
610                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
611                                  break;
612                             }
613                        }
614                        if (rrd_test_error()){
615                             break;
616                        }
617                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
618                        if(dst_idx == DST_COUNTER) {
619                           /* simple overflow catcher suggested by Andres Kroonmaa */
620                           /* this will fail terribly for non 32 or 64 bit counters ... */
621                           /* are there any others in SNMP land ? */
622                           if (pdp_new[i] < (double)0.0 ) 
623                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
624                           if (pdp_new[i] < (double)0.0 ) 
625                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
626                        }
627                        rate = pdp_new[i] / interval;
628                     }
629                    else {
630                      pdp_new[i]= DNAN;          
631                    }
632                    break;
633                 case DST_ABSOLUTE:
634                     errno = 0;
635                     pdp_new[i] = strtod(updvals[i+1],&endptr);
636                     if (errno > 0){
637                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
638                         break;
639                     };
640                     if (endptr[0] != '\0'){
641                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
642                         break;
643                     }
644                     rate = pdp_new[i] / interval;                 
645                     break;
646                 case DST_GAUGE:
647                     errno = 0;
648                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
649                     if (errno > 0){
650                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
651                         break;
652                     };
653                     if (endptr[0] != '\0'){
654                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
655                         break;
656                     }
657                     rate = pdp_new[i] / interval;                  
658                     break;
659                 default:
660                     rrd_set_error("rrd contains unknown DS type : '%s'",
661                                   rrd.ds_def[i].dst);
662                     break;
663                 }
664                 /* break out of this for loop if the error string is set */
665                 if (rrd_test_error()){
666                     break;
667                 }
668                /* make sure pdp_temp is neither too large or too small
669                 * if any of these occur it becomes unknown ...
670                 * sorry folks ... */
671                if ( ! isnan(rate) && 
672                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
673                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
674                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
675                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
676                   pdp_new[i] = DNAN;
677                }               
678             } else {
679                 /* no news is news all the same */
680                 pdp_new[i] = DNAN;
681             }
682             
683             /* make a copy of the command line argument for the next run */
684 #ifdef DEBUG
685             fprintf(stderr,
686                     "prep ds[%lu]\t"
687                     "last_arg '%s'\t"
688                     "this_arg '%s'\t"
689                     "pdp_new %10.2f\n",
690                     i,
691                     rrd.pdp_prep[i].last_ds,
692                     updvals[i+1], pdp_new[i]);
693 #endif
694             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
695                 strncpy(rrd.pdp_prep[i].last_ds,
696                         updvals[i+1],LAST_DS_LEN-1);
697                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
698             }
699         }
700         /* break out of the argument parsing loop if the error_string is set */
701         if (rrd_test_error()){
702             free(step_start);
703             break;
704         }
705         /* has a pdp_st moment occurred since the last run ? */
706
707         if (proc_pdp_st == occu_pdp_st){
708             /* no we have not passed a pdp_st moment. therefore update is simple */
709
710             for(i=0;i<rrd.stat_head->ds_cnt;i++){
711                 if(isnan(pdp_new[i]))
712                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
713                 else
714                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
715 #ifdef DEBUG
716                 fprintf(stderr,
717                         "NO PDP  ds[%lu]\t"
718                         "value %10.2f\t"
719                         "unkn_sec %5lu\n",
720                         i,
721                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
722                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
723 #endif
724             }   
725         } else {
726             /* an pdp_st has occurred. */
727
728             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
729              * occurred up to the last run.        
730             pdp_new[] contains rate*seconds from the latest run.
731             pdp_temp[] will contain the rate for cdp */
732
733             for(i=0;i<rrd.stat_head->ds_cnt;i++){
734                 /* update pdp_prep to the current pdp_st */
735                 if(isnan(pdp_new[i]))
736                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
737                 else
738                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
739                         pdp_new[i]/(double)interval*(double)pre_int;
740
741                 /* if too much of the pdp_prep is unknown we dump it */
742                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
743                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
744                     (occu_pdp_st-proc_pdp_st <= 
745                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
746                     pdp_temp[i] = DNAN;
747                 } else {
748                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
749                         / (double)( occu_pdp_st
750                                    - proc_pdp_st
751                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
752                 }
753
754                 /* process CDEF data sources; remember each CDEF DS can
755                  * only reference other DS with a lower index number */
756             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
757                    rpnp_t *rpnp;
758                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
759                    /* substitute data values for OP_VARIABLE nodes */
760                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
761                    {
762                           if (rpnp[ii].op == OP_VARIABLE) {
763                                  rpnp[ii].op = OP_NUMBER;
764                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
765                           }
766                    }
767                    /* run the rpn calculator */
768                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
769                           free(rpnp);
770                           break; /* exits the data sources pdp_temp loop */
771                    }
772                 }
773         
774                 /* make pdp_prep ready for the next run */
775                 if(isnan(pdp_new[i])){
776                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
777                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
778                 } else {
779                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
780                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
781                         pdp_new[i]/(double)interval*(double)post_int;
782                 }
783
784 #ifdef DEBUG
785                 fprintf(stderr,
786                         "PDP UPD ds[%lu]\t"
787                         "pdp_temp %10.2f\t"
788                         "new_prep %10.2f\t"
789                         "new_unkn_sec %5lu\n",
790                         i, pdp_temp[i],
791                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
792                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
793 #endif
794             }
795
796                 /* if there were errors during the last loop, bail out here */
797             if (rrd_test_error()){
798                free(step_start);
799                break;
800             }
801
802                 /* compute the number of elapsed pdp_st moments */
803                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
804 #ifdef DEBUG
805                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
806 #endif
807                 if (rra_step_cnt == NULL)
808                 {
809                    rra_step_cnt = (unsigned long *) 
810                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
811                 }
812
813             for(i = 0, rra_start = rra_begin;
814                 i < rrd.stat_head->rra_cnt;
815             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
816                 i++)
817                 {
818                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
819                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
820                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
821         if (start_pdp_offset <= elapsed_pdp_st) {
822            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
823                       rrd.rra_def[i].pdp_cnt + 1;
824             } else {
825                    rra_step_cnt[i] = 0;
826                 }
827
828                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
829                 {
830                    /* If this is a bulk update, we need to skip ahead in the seasonal
831                         * arrays so that they will be correct for the next observed value;
832                         * note that for the bulk update itself, no update will occur to
833                         * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
834                         * be set to DNAN. */
835            if (rra_step_cnt[i] > 2) 
836                    {
837                           /* skip update by resetting rra_step_cnt[i],
838                            * note that this is not data source specific; this is due
839                            * to the bulk update, not a DNAN value for the specific data
840                            * source. */
841                           rra_step_cnt[i] = 0;
842               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
843                              &last_seasonal_coef);
844                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
845                              &seasonal_coef);
846                    }
847                 
848                   /* periodically run a smoother for seasonal effects */
849                   /* Need to use first cdp parameter buffer to track
850                    * burnin (burnin requires a specific smoothing schedule).
851                    * The CDP_init_seasonal parameter is really an RRA level,
852                    * not a data source within RRA level parameter, but the rra_def
853                    * is read only for rrd_update (not flushed to disk). */
854                   iii = i*(rrd.stat_head -> ds_cnt);
855                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
856                           <= BURNIN_CYCLES)
857                   {
858                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
859                                  > rrd.rra_def[i].row_cnt - 1) {
860                            /* mark off one of the burnin cycles */
861                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
862                        schedule_smooth = 1;
863                          }  
864                   } else {
865                          /* someone has no doubt invented a trick to deal with this
866                           * wrap around, but at least this code is clear. */
867                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
868                              rrd.rra_ptr[i].cur_row)
869                          {
870                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
871                                   * mapping between PDP and CDP */
872                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
873                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
874                                  {
875 #ifdef DEBUG
876                                         fprintf(stderr,
877                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
878                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
879                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
880 #endif
881                                         schedule_smooth = 1;
882                                  }
883              } else {
884                                  /* can't rely on negative numbers because we are working with
885                                   * unsigned values */
886                                  /* Don't need modulus here. If we've wrapped more than once, only
887                                   * one smooth is executed at the end. */
888                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
889                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
890                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
891                                  {
892 #ifdef DEBUG
893                                         fprintf(stderr,
894                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
895                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
896                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
897 #endif
898                                         schedule_smooth = 1;
899                                  }
900                          }
901                   }
902
903               rra_current = ftell(rrd_file); 
904                 } /* if cf is DEVSEASONAL or SEASONAL */
905
906         if (rrd_test_error()) break;
907
908                     /* update CDP_PREP areas */
909                     /* loop over data sources within each RRA */
910                     for(ii = 0;
911                         ii < rrd.stat_head->ds_cnt;
912                         ii++)
913                         {
914                         
915                         /* iii indexes the CDP prep area for this data source within the RRA */
916                         iii=i*rrd.stat_head->ds_cnt+ii;
917
918                         if (rrd.rra_def[i].pdp_cnt > 1) {
919                           
920                            if (rra_step_cnt[i] > 0) {
921                            /* If we are in this block, at least 1 CDP value will be written to
922                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
923                                 * to be written, then the "fill in" value is the CDP_secondary_val
924                                 * entry. */
925                                   if (isnan(pdp_temp[ii]))
926                   {
927                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
928                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
929                                   } else {
930                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
931                                           * CDP data entries. No matter the CF, the value is the same because
932                                           * the average, max, min, and last of a list of identical values is
933                                           * the same, namely, the value itself. */
934                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
935                                   }
936                      
937                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
938                                       > rrd.rra_def[i].pdp_cnt*
939                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
940                                   {
941                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
942                                          /* initialize carry over */
943                                          if (current_cf == CF_AVERAGE) {
944                                                    if (isnan(pdp_temp[ii])) { 
945                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
946                                                    } else {
947                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
948                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
949                                                    }
950                                          } else {
951                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
952                                          }
953                                   } else {
954                                          rrd_value_t cum_val, cur_val; 
955                                      switch (current_cf) {
956                                                 case CF_AVERAGE:
957                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
958                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
959                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
960                                                (cum_val + cur_val * start_pdp_offset) /
961                                            (rrd.rra_def[i].pdp_cnt
962                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
963                                                    /* initialize carry over value */
964                                                    if (isnan(pdp_temp[ii])) { 
965                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
966                                                    } else {
967                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
968                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
969                                                    }
970                                                    break;
971                                                 case CF_MAXIMUM:
972                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
973                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
974 #ifdef DEBUG
975                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
976                                                           isnan(pdp_temp[ii])) {
977                                                      fprintf(stderr,
978                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
979                                                                 i,ii);
980                                                          exit(-1);
981                                                   }
982 #endif
983                                                   if (cur_val > cum_val)
984                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
985                                                   else
986                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
987                                                   /* initialize carry over value */
988                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
989                                                   break;
990                                                 case CF_MINIMUM:
991                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
992                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
993 #ifdef DEBUG
994                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
995                                                           isnan(pdp_temp[ii])) {
996                                                      fprintf(stderr,
997                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
998                                                                 i,ii);
999                                                          exit(-1);
1000                                                   }
1001 #endif
1002                                                   if (cur_val < cum_val)
1003                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1004                                                   else
1005                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1006                                                   /* initialize carry over value */
1007                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1008                                                   break;
1009                                                 case CF_LAST:
1010                                                 default:
1011                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1012                                                    /* initialize carry over value */
1013                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1014                                                 break;
1015                                          }
1016                                   } /* endif meets xff value requirement for a valid value */
1017                                   /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1018                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1019                                   if (isnan(pdp_temp[ii]))
1020                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1021                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1022                                   else
1023                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1024                } else  /* rra_step_cnt[i]  == 0 */
1025                            {
1026 #ifdef DEBUG
1027                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1028                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1029                                          i,ii);
1030                                   } else {
1031                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1032                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1033                                   }
1034 #endif
1035                                   if (isnan(pdp_temp[ii])) {
1036                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1037                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1038                                   {
1039                                          if (current_cf == CF_AVERAGE) {
1040                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1041                                                    elapsed_pdp_st;
1042                                          } else {
1043                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1044                                          }
1045 #ifdef DEBUG
1046                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1047                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1048 #endif
1049                                   } else {
1050                                          switch (current_cf) {
1051                                          case CF_AVERAGE:
1052                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1053                                                    elapsed_pdp_st;
1054                                                 break;
1055                                          case CF_MINIMUM:
1056                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1057                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1058                                                 break; 
1059                                          case CF_MAXIMUM:
1060                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1061                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062                                                 break; 
1063                                          case CF_LAST:
1064                                          default:
1065                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1066                                                 break;
1067                                          }
1068                                   }
1069                            }
1070                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1071                            if (elapsed_pdp_st > 2)
1072                            {
1073                                    switch (current_cf) {
1074                                    case CF_AVERAGE:
1075                                    default:
1076                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1077                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1078                                           break;
1079                    case CF_SEASONAL:
1080                                    case CF_DEVSEASONAL:
1081                                           /* need to update cached seasonal values, so they are consistent
1082                                            * with the bulk update */
1083                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1084                                            * CDP_last_deviation are the same. */
1085                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1086                                                  last_seasonal_coef[ii];
1087                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1088                                                  seasonal_coef[ii];
1089                                           break;
1090                    case CF_HWPREDICT:
1091                                           /* need to update the null_count and last_null_count.
1092                                            * even do this for non-DNAN pdp_temp because the
1093                                            * algorithm is not learning from batch updates. */
1094                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1095                                                  elapsed_pdp_st;
1096                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1097                                                  elapsed_pdp_st - 1;
1098                                           /* fall through */
1099                                    case CF_DEVPREDICT:
1100                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1101                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1102                                           break;
1103                    case CF_FAILURES:
1104                                           /* do not count missed bulk values as failures */
1105                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1106                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1107                                           /* need to reset violations buffer.
1108                                            * could do this more carefully, but for now, just
1109                                            * assume a bulk update wipes away all violations. */
1110                       erase_violations(&rrd, iii, i);
1111                                           break;
1112                                    }
1113                            } 
1114                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1115
1116                         if (rrd_test_error()) break;
1117
1118                         } /* endif data sources loop */
1119         } /* end RRA Loop */
1120
1121                 /* this loop is only entered if elapsed_pdp_st < 3 */
1122                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1123                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1124                 {
1125                for(i = 0, rra_start = rra_begin;
1126                    i < rrd.stat_head->rra_cnt;
1127                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1128                    i++)
1129                    {
1130                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1131
1132                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1133                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1134                           {
1135                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1136                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1137                                 &seasonal_coef);
1138                  rra_current = ftell(rrd_file);
1139                           }
1140                           if (rrd_test_error()) break;
1141                       /* loop over data sources within each RRA */
1142                       for(ii = 0;
1143                           ii < rrd.stat_head->ds_cnt;
1144                           ii++)
1145                           {
1146                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1147                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1148                                     scratch_idx, seasonal_coef);
1149                           }
1150            } /* end RRA Loop */
1151                    if (rrd_test_error()) break;
1152             } /* end elapsed_pdp_st loop */
1153
1154                 if (rrd_test_error()) break;
1155
1156                 /* Ready to write to disk */
1157                 /* Move sequentially through the file, writing one RRA at a time.
1158                  * Note this architecture divorces the computation of CDP with
1159                  * flushing updated RRA entries to disk. */
1160             for(i = 0, rra_start = rra_begin;
1161                 i < rrd.stat_head->rra_cnt;
1162             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1163                 i++) {
1164                 /* is there anything to write for this RRA? If not, continue. */
1165         if (rra_step_cnt[i] == 0) continue;
1166
1167                 /* write the first row */
1168 #ifdef DEBUG
1169         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1170 #endif
1171             rrd.rra_ptr[i].cur_row++;
1172             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1173                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1174                 /* position on the first row */
1175                 rra_pos_tmp = rra_start +
1176                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1177                 if(rra_pos_tmp != rra_current) {
1178 #ifndef HAVE_MMAP
1179                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1180                       rrd_set_error("seek error in rrd");
1181                       break;
1182                    }
1183 #endif
1184                    rra_current = rra_pos_tmp;
1185                 }
1186
1187 #ifdef DEBUG
1188             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1189 #endif
1190                 scratch_idx = CDP_primary_val;
1191                 if (pcdp_summary != NULL)
1192                 {
1193                    rra_time = (current_time - current_time 
1194                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1195                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1196                 }
1197 #ifdef HAVE_MMAP
1198                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1199                    pcdp_summary, &rra_time, rrd_mmaped_file);
1200 #else
1201                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1202                    pcdp_summary, &rra_time);
1203 #endif
1204                 if (rrd_test_error()) break;
1205
1206                 /* write other rows of the bulk update, if any */
1207                 scratch_idx = CDP_secondary_val;
1208                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1209                 {
1210                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1211                    {
1212 #ifdef DEBUG
1213               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1214                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1215 #endif
1216                           /* wrap */
1217                           rrd.rra_ptr[i].cur_row = 0;
1218                           /* seek back to beginning of current rra */
1219                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1220                           {
1221                          rrd_set_error("seek error in rrd");
1222                          break;
1223                           }
1224 #ifdef DEBUG
1225                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1226 #endif
1227                           rra_current = rra_start;
1228                    }
1229                    if (pcdp_summary != NULL)
1230                    {
1231                       rra_time = (current_time - current_time 
1232                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1233                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1234                    }
1235 #ifdef HAVE_MMAP
1236                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1237                       pcdp_summary, &rra_time, rrd_mmaped_file);
1238 #else
1239                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240                       pcdp_summary, &rra_time);
1241 #endif
1242                 }
1243                 
1244                 if (rrd_test_error())
1245                   break;
1246                 } /* RRA LOOP */
1247
1248             /* break out of the argument parsing loop if error_string is set */
1249             if (rrd_test_error()){
1250                    free(step_start);
1251                    break;
1252             } 
1253             
1254         } /* endif a pdp_st has occurred */ 
1255         rrd.live_head->last_up = current_time;
1256         rrd.live_head->last_up_usec = current_time_usec; 
1257         free(step_start);
1258     } /* function argument loop */
1259
1260     if (seasonal_coef != NULL) free(seasonal_coef);
1261     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1262         if (rra_step_cnt != NULL) free(rra_step_cnt);
1263     rpnstack_free(&rpnstack);
1264
1265 #ifdef HAVE_MMAP
1266     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1267             rrd_set_error("error writing(unmapping) file: %s", filename);
1268     }
1269 #endif    
1270     /* if we got here and if there is an error and if the file has not been
1271      * written to, then close things up and return. */
1272     if (rrd_test_error()) {
1273         free(updvals);
1274         free(tmpl_idx);
1275         rrd_free(&rrd);
1276         free(pdp_temp);
1277         free(pdp_new);
1278         fclose(rrd_file);
1279         return(-1);
1280     }
1281
1282     /* aargh ... that was tough ... so many loops ... anyway, its done.
1283      * we just need to write back the live header portion now*/
1284
1285     if (fseek(rrd_file, (sizeof(stat_head_t)
1286                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1287                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1288               SEEK_SET) != 0) {
1289         rrd_set_error("seek rrd for live header writeback");
1290         free(updvals);
1291         free(tmpl_idx);
1292         rrd_free(&rrd);
1293         free(pdp_temp);
1294         free(pdp_new);
1295         fclose(rrd_file);
1296         return(-1);
1297     }
1298
1299     if(version >= 3) {
1300             if(fwrite( rrd.live_head,
1301                        sizeof(live_head_t), 1, rrd_file) != 1){
1302                 rrd_set_error("fwrite live_head to rrd");
1303                 free(updvals);
1304                 rrd_free(&rrd);
1305                 free(tmpl_idx);
1306                 free(pdp_temp);
1307                 free(pdp_new);
1308                 fclose(rrd_file);
1309                 return(-1);
1310             }
1311     }
1312     else {
1313             if(fwrite( &rrd.live_head->last_up,
1314                        sizeof(time_t), 1, rrd_file) != 1){
1315                 rrd_set_error("fwrite live_head to rrd");
1316                 free(updvals);
1317                 rrd_free(&rrd);
1318                 free(tmpl_idx);
1319                 free(pdp_temp);
1320                 free(pdp_new);
1321                 fclose(rrd_file);
1322                 return(-1);
1323             }
1324     }
1325             
1326
1327     if(fwrite( rrd.pdp_prep,
1328                sizeof(pdp_prep_t),
1329                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1330         rrd_set_error("ftwrite pdp_prep to rrd");
1331         free(updvals);
1332         rrd_free(&rrd);
1333         free(tmpl_idx);
1334         free(pdp_temp);
1335         free(pdp_new);
1336         fclose(rrd_file);
1337         return(-1);
1338     }
1339
1340     if(fwrite( rrd.cdp_prep,
1341                sizeof(cdp_prep_t),
1342                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1343        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1344
1345         rrd_set_error("ftwrite cdp_prep to rrd");
1346         free(updvals);
1347         free(tmpl_idx);
1348         rrd_free(&rrd);
1349         free(pdp_temp);
1350         free(pdp_new);
1351         fclose(rrd_file);
1352         return(-1);
1353     }
1354
1355     if(fwrite( rrd.rra_ptr,
1356                sizeof(rra_ptr_t), 
1357                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1358         rrd_set_error("fwrite rra_ptr to rrd");
1359         free(updvals);
1360         free(tmpl_idx);
1361         rrd_free(&rrd);
1362         free(pdp_temp);
1363         free(pdp_new);
1364         fclose(rrd_file);
1365         return(-1);
1366     }
1367
1368     /* OK now close the files and free the memory */
1369     if(fclose(rrd_file) != 0){
1370         rrd_set_error("closing rrd");
1371         free(updvals);
1372         free(tmpl_idx);
1373         rrd_free(&rrd);
1374         free(pdp_temp);
1375         free(pdp_new);
1376         return(-1);
1377     }
1378
1379     /* calling the smoothing code here guarantees at most
1380          * one smoothing operation per rrd_update call. Unfortunately,
1381          * it is possible with bulk updates, or a long-delayed update
1382          * for smoothing to occur off-schedule. This really isn't
1383          * critical except during the burning cycles. */
1384         if (schedule_smooth)
1385         {
1386 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1387           rrd_file = fopen(filename,"rb+");
1388 #else
1389           rrd_file = fopen(filename,"r+");
1390 #endif
1391           rra_start = rra_begin;
1392           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1393           {
1394             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1395                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1396             {
1397 #ifdef DEBUG
1398               fprintf(stderr,"Running smoother for rra %ld\n",i);
1399 #endif
1400               apply_smoother(&rrd,i,rra_start,rrd_file);
1401               if (rrd_test_error())
1402                 break;
1403             }
1404             rra_start += rrd.rra_def[i].row_cnt
1405               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1406           }
1407           fclose(rrd_file);
1408         }
1409     rrd_free(&rrd);
1410     free(updvals);
1411     free(tmpl_idx);
1412     free(pdp_new);
1413     free(pdp_temp);
1414     return(0);
1415 }
1416
/*
 * Try to take an exclusive lock on an open RRD file without waiting.
 * The lock goes away when the file is closed.
 *
 * rrdfile - open stream of the RRD to be locked
 *
 * On Win32 the lock request covers st_size bytes (as reported by
 * _fstat) and is made through _locking(_LK_NBLCK); everywhere else an
 * fcntl(F_SETLK) write lock is requested for the whole file.
 *
 * returns 0 on success; otherwise the status of the locking call
 * (-1 if the file size could not be determined on Win32)
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);   /* descriptor behind the stream */
#if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat info;

    /* the size is needed to know how many bytes to lock */
    if (_fstat(fd, &info) != 0)
        return(-1);

    return(_locking(fd, _LK_NBLCK, info.st_size));
#else
    struct flock request;

    request.l_type   = F_WRLCK;   /* exclusive write lock */
    request.l_whence = SEEK_SET;  /* l_start counts from the start of the file */
    request.l_start  = 0;         /* begin at the first byte */
    request.l_len    = 0;         /* zero length: whole file */

    return(fcntl(fd, F_SETLK, &request));
#endif
}
1453
1454
1455 #ifdef HAVE_MMAP
1456 info_t
1457 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1458                unsigned short CDP_scratch_idx, FILE *rrd_file,
1459                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1460 #else
1461 info_t
1462 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1463                unsigned short CDP_scratch_idx, FILE *rrd_file,
1464                    info_t *pcdp_summary, time_t *rra_time)
1465 #endif
1466 {
1467    unsigned long ds_idx, cdp_idx;
1468    infoval iv;
1469   
1470    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1471    {
1472       /* compute the cdp index */
1473       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1474 #ifdef DEBUG
1475           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1476              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1477              rrd -> rra_def[rra_idx].cf_nam);
1478 #endif 
1479       if (pcdp_summary != NULL)
1480           {
1481              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1482              /* append info to the return hash */
1483                  pcdp_summary = info_push(pcdp_summary,
1484                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1485                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1486                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1487          RD_I_VAL, iv);
1488           }
1489 #ifdef HAVE_MMAP
1490           memcpy((char *)rrd_mmaped_file + *rra_current,
1491                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1492                           sizeof(rrd_value_t));
1493 #else
1494           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1495                  sizeof(rrd_value_t),1,rrd_file) != 1)
1496           { 
1497              rrd_set_error("writing rrd");
1498              return 0;
1499           }
1500 #endif
1501           *rra_current += sizeof(rrd_value_t);
1502         }
1503         return (pcdp_summary);
1504 }