prepare for the release of rrdtool-1.2.9
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.9  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 struct timeval {
36         time_t tv_sec; /* seconds */
37         long tv_usec;  /* microseconds */
38 };
39
40 struct __timezone {
41         int  tz_minuteswest; /* minutes W of Greenwich */
42         int  tz_dsttime;     /* type of dst correction */
43 };
44
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46         
47         struct timeb current_time;
48
49         _ftime(&current_time);
50         
51         t->tv_sec  = current_time.time;
52         t->tv_usec = current_time.millitm * 1000;
53 }
54
55 #endif
56 /*
57  * normilize time as returned by gettimeofday. usec part must
58  * be always >= 0
59  */
60 static void normalize_time(struct timeval *t)
61 {
62         if(t->tv_usec < 0) {
63                 t->tv_sec--;
64                 t->tv_usec += 1000000L;
65         }
66 }
67
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
70 #ifdef HAVE_MMAP
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
72                                         unsigned long *rra_current,
73                                         unsigned short CDP_scratch_idx,
74 #ifndef DEBUG
75 FILE UNUSED(*rrd_file),
76 #else
77 FILE *rrd_file,
78 #endif
79                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
80 #else
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
82                                         unsigned long *rra_current,
83                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
84                                         info_t *pcdp_summary, time_t *rra_time);
85 #endif
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv, 
88                                         info_t*);
89
90 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
91
92
93 #ifdef STANDALONE
94 int 
95 main(int argc, char **argv){
96         rrd_update(argc,argv);
97         if (rrd_test_error()) {
98                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
99                         "Usage: rrdupdate filename\n"
100                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101                         "\t\t\ttime|N:value[:value...]\n\n"
102                         "\t\t\tat-time@value[:value...]\n\n"
103                         "\t\t\t[ time:value[:value...] ..]\n\n");
104                                    
105                 printf("ERROR: %s\n",rrd_get_error());
106                 rrd_clear_error();                                                            
107                 return 1;
108         }
109         return 0;
110 }
111 #endif
112
113 info_t *rrd_update_v(int argc, char **argv)
114 {
115     char             *template = NULL;          
116         info_t *result = NULL;
117         infoval rc;
118     optind = 0; opterr = 0;  /* initialize getopt */
119
120     while (1) {
121                 static struct option long_options[] =
122                         {
123                                 {"template",      required_argument, 0, 't'},
124                                 {0,0,0,0}
125                         };
126                 int option_index = 0;
127                 int opt;
128                 opt = getopt_long(argc, argv, "t:", 
129                                                   long_options, &option_index);
130                 
131                 if (opt == EOF)
132                         break;
133                 
134                 switch(opt) {
135                 case 't':
136                         template = optarg;
137                         break;
138                 
139                 case '?':
140                         rrd_set_error("unknown option '%s'",argv[optind-1]);
141             rc.u_int = -1;
142                         goto end_tag;
143                 }
144     }
145
146     /* need at least 2 arguments: filename, data. */
147     if (argc-optind < 2) {
148                 rrd_set_error("Not enough arguments");
149         rc.u_int = -1;
150                 goto end_tag;
151     }
152     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153         rc.u_int = _rrd_update(argv[optind], template,
154                       argc - optind - 1, argv + optind + 1, result);
155     result->value.u_int = rc.u_int;
156 end_tag:
157     return result;
158 }
159
160 int
161 rrd_update(int argc, char **argv)
162 {
163     char             *template = NULL;          
164     int rc;
165     optind = 0; opterr = 0;  /* initialize getopt */
166
167     while (1) {
168                 static struct option long_options[] =
169                         {
170                                 {"template",      required_argument, 0, 't'},
171                                 {0,0,0,0}
172                         };
173                 int option_index = 0;
174                 int opt;
175                 opt = getopt_long(argc, argv, "t:", 
176                                                   long_options, &option_index);
177                 
178                 if (opt == EOF)
179                         break;
180                 
181                 switch(opt) {
182                 case 't':
183                         template = optarg;
184                         break;
185                 
186                 case '?':
187                         rrd_set_error("unknown option '%s'",argv[optind-1]);
188                         return(-1);
189                 }
190     }
191
192     /* need at least 2 arguments: filename, data. */
193     if (argc-optind < 2) {
194                 rrd_set_error("Not enough arguments");
195
196                 return -1;
197     }
198  
199         rc = rrd_update_r(argv[optind], template,
200                       argc - optind - 1, argv + optind + 1);
201     return rc;
202 }
203
204 int
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
206 {
207    return _rrd_update(filename, template, argc, argv, NULL);
208 }
209
210 int
211 _rrd_update(char *filename, char *template, int argc, char **argv, 
212    info_t *pcdp_summary)
213 {
214
215     int              arg_i = 2;
216     short            j;
217     unsigned long    i,ii,iii=1;
218
219     unsigned long    rra_begin;          /* byte pointer to the rra
220                                           * area in the rrd file.  this
221                                           * pointer never changes value */
222     unsigned long    rra_start;          /* byte pointer to the rra
223                                           * area in the rrd file.  this
224                                           * pointer changes as each rrd is
225                                           * processed. */
226     unsigned long    rra_current;        /* byte pointer to the current write
227                                           * spot in the rrd file. */
228     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
229     double           interval,
230         pre_int,post_int;                /* interval between this and
231                                           * the last run */
232     unsigned long    proc_pdp_st;        /* which pdp_st was the last
233                                           * to be processed */
234     unsigned long    occu_pdp_st;        /* when was the pdp_st
235                                           * before the last update
236                                           * time */
237     unsigned long    proc_pdp_age;       /* how old was the data in
238                                           * the pdp prep area when it
239                                           * was last updated */
240     unsigned long    occu_pdp_age;       /* how long ago was the last
241                                           * pdp_step time */
242     rrd_value_t      *pdp_new;           /* prepare the incoming data
243                                           * to be added the the
244                                           * existing entry */
245     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
246                                           * to be added the the
247                                           * cdp values */
248
249     long             *tmpl_idx;          /* index representing the settings
250                                             transported by the template index */
251     unsigned long    tmpl_cnt = 2;       /* time and data */
252
253     FILE             *rrd_file;
254     rrd_t            rrd;
255     time_t           current_time = 0;
256     time_t           rra_time = 0;       /* time of update for a RRA */
257     unsigned long    current_time_usec=0;/* microseconds part of current time */
258     struct timeval   tmp_time;           /* used for time conversion */
259
260     char             **updvals;
261     int              schedule_smooth = 0;
262         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263                                          /* a vector of future Holt-Winters seasonal coefs */
264     unsigned long    elapsed_pdp_st;
265                                          /* number of elapsed PDP steps since last update */
266     unsigned long    *rra_step_cnt = NULL;
267                                          /* number of rows to be updated in an RRA for a data
268                                           * value. */
269     unsigned long    start_pdp_offset;
270                                          /* number of PDP steps since the last update that
271                                           * are assigned to the first CDP to be generated
272                                           * since the last update. */
273     unsigned short   scratch_idx;
274                                          /* index into the CDP scratch array */
275     enum cf_en       current_cf;
276                                          /* numeric id of the current consolidation function */
277     rpnstack_t       rpnstack; /* used for COMPUTE DS */
278     int              version;  /* rrd version */
279     char             *endptr; /* used in the conversion */
280 #ifdef HAVE_MMAP
281     void             *rrd_mmaped_file;
282     unsigned long    rrd_filesize;
283 #endif
284
285     rpnstack_init(&rpnstack);
286
287     /* need at least 1 arguments: data. */
288     if (argc < 1) {
289         rrd_set_error("Not enough arguments");
290         return -1;
291     }
292     
293     
294
295     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
296         return -1;
297     }
298     /* initialize time */
299     version = atoi(rrd.stat_head->version);
300     gettimeofday(&tmp_time, 0);
301     normalize_time(&tmp_time);
302     current_time = tmp_time.tv_sec;
303     if(version >= 3) {
304         current_time_usec = tmp_time.tv_usec;
305     }
306     else {
307         current_time_usec = 0;
308     }
309
310     rra_current = rra_start = rra_begin = ftell(rrd_file);
311     /* This is defined in the ANSI C standard, section 7.9.5.3:
312
313         When a file is opened with udpate mode ('+' as the second
314         or third character in the ... list of mode argument
315         variables), both input and ouptut may be performed on the
316         associated stream.  However, ...  input may not be directly
317         followed by output without an intervening call to a file
318         positioning function, unless the input oepration encounters
319         end-of-file. */
320 #ifdef HAVE_MMAP
321     fseek(rrd_file, 0, SEEK_END);
322     rrd_filesize = ftell(rrd_file);
323     fseek(rrd_file, rra_current, SEEK_SET);
324 #else
325     fseek(rrd_file, 0, SEEK_CUR);
326 #endif
327
328     
329     /* get exclusive lock to whole file.
330      * lock gets removed when we close the file.
331      */
332     if (LockRRD(rrd_file) != 0) {
333       rrd_set_error("could not lock RRD");
334       rrd_free(&rrd);
335       fclose(rrd_file);
336       return(-1);   
337     } 
338
339     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340         rrd_set_error("allocating updvals pointer array");
341         rrd_free(&rrd);
342         fclose(rrd_file);
343         return(-1);
344     }
345
346     if ((pdp_temp = malloc(sizeof(rrd_value_t)
347                            *rrd.stat_head->ds_cnt))==NULL){
348         rrd_set_error("allocating pdp_temp ...");
349         free(updvals);
350         rrd_free(&rrd);
351         fclose(rrd_file);
352         return(-1);
353     }
354
355     if ((tmpl_idx = malloc(sizeof(unsigned long)
356                            *(rrd.stat_head->ds_cnt+1)))==NULL){
357         rrd_set_error("allocating tmpl_idx ...");
358         free(pdp_temp);
359         free(updvals);
360         rrd_free(&rrd);
361         fclose(rrd_file);
362         return(-1);
363     }
364     /* initialize template redirector */
365     /* default config example (assume DS 1 is a CDEF DS)
366        tmpl_idx[0] -> 0; (time)
367        tmpl_idx[1] -> 1; (DS 0)
368        tmpl_idx[2] -> 3; (DS 2)
369        tmpl_idx[3] -> 4; (DS 3) */
370     tmpl_idx[0] = 0; /* time */
371     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
372         {
373            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374               tmpl_idx[ii++]=i;
375         }
376     tmpl_cnt= ii;
377
378     if (template) {
379         char *dsname;
380         unsigned int tmpl_len;
381         dsname = template;
382         tmpl_cnt = 1; /* the first entry is the time */
383         tmpl_len = strlen(template);
384         for(i=0;i<=tmpl_len ;i++) {
385             if (template[i] == ':' || template[i] == '\0') {
386                 template[i] = '\0';
387                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388                     rrd_set_error("Template contains more DS definitions than RRD");
389                     free(updvals); free(pdp_temp);
390                     free(tmpl_idx); rrd_free(&rrd);
391                     fclose(rrd_file); return(-1);
392                 }
393                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394                     rrd_set_error("unknown DS name '%s'",dsname);
395                     free(updvals); free(pdp_temp);
396                     free(tmpl_idx); rrd_free(&rrd);
397                     fclose(rrd_file); return(-1);
398                 } else {
399                   /* the first element is always the time */
400                   tmpl_idx[tmpl_cnt-1]++; 
401                   /* go to the next entry on the template */
402                   dsname = &template[i+1];
403                   /* fix the damage we did before */
404                   if (i<tmpl_len) {
405                      template[i]=':';
406                   } 
407
408                 }
409             }       
410         }
411     }
412     if ((pdp_new = malloc(sizeof(rrd_value_t)
413                           *rrd.stat_head->ds_cnt))==NULL){
414         rrd_set_error("allocating pdp_new ...");
415         free(updvals);
416         free(pdp_temp);
417         free(tmpl_idx);
418         rrd_free(&rrd);
419         fclose(rrd_file);
420         return(-1);
421     }
422
423 #ifdef HAVE_MMAP
424     rrd_mmaped_file = mmap(0, 
425                         rrd_filesize, 
426                         PROT_READ | PROT_WRITE, 
427                         MAP_SHARED, 
428                         fileno(rrd_file), 
429                         0);
430     if (rrd_mmaped_file == MAP_FAILED) {
431         rrd_set_error("error mmapping file %s", filename);
432         free(updvals);
433         free(pdp_temp);
434         free(tmpl_idx);
435         rrd_free(&rrd);
436         fclose(rrd_file);
437         return(-1);
438     }
439 #endif
440     /* loop through the arguments. */
441     for(arg_i=0; arg_i<argc;arg_i++) {
442         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443         char *step_start = stepper;
444         char *p;
445         char *parsetime_error = NULL;
446         enum {atstyle, normal} timesyntax;
447         struct rrd_time_value ds_tv;
448         if (stepper == NULL){
449                 rrd_set_error("failed duplication argv entry");
450                 free(updvals);
451                 free(pdp_temp);  
452                 free(tmpl_idx);
453                 rrd_free(&rrd);
454 #ifdef HAVE_MMAP
455                 munmap(rrd_mmaped_file, rrd_filesize);
456 #endif
457                 fclose(rrd_file);
458                 return(-1);
459          }
460         /* initialize all ds input to unknown except the first one
461            which has always got to be set */
462         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463         strcpy(stepper,argv[arg_i]);
464         updvals[0]=stepper;
465         /* separate all ds elements; first must be examined separately
466            due to alternate time syntax */
467         if ((p=strchr(stepper,'@'))!=NULL) {
468             timesyntax = atstyle;
469             *p = '\0';
470             stepper = p+1;
471         } else if ((p=strchr(stepper,':'))!=NULL) {
472             timesyntax = normal;
473             *p = '\0';
474             stepper = p+1;
475         } else {
476             rrd_set_error("expected timestamp not found in data source from %s:...",
477                           argv[arg_i]);
478             free(step_start);
479             break;
480         }
481         ii=1;
482         updvals[tmpl_idx[ii]] = stepper;
483         while (*stepper) {
484             if (*stepper == ':') {
485                 *stepper = '\0';
486                 ii++;
487                 if (ii<tmpl_cnt){                   
488                     updvals[tmpl_idx[ii]] = stepper+1;
489                 }
490             }
491             stepper++;
492         }
493
494         if (ii != tmpl_cnt-1) {
495             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496                           tmpl_cnt-1, ii, argv[arg_i]);
497             free(step_start);
498             break;
499         }
500         
501         /* get the time from the reading ... handle N */
502         if (timesyntax == atstyle) {
503             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
505                 free(step_start);
506                 break;
507             }
508             if (ds_tv.type == RELATIVE_TO_END_TIME ||
509                 ds_tv.type == RELATIVE_TO_START_TIME) {
510                 rrd_set_error("specifying time relative to the 'start' "
511                               "or 'end' makes no sense here: %s",
512                               updvals[0]);
513                 free(step_start);
514                 break;
515             }
516
517             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
519             
520         } else if (strcmp(updvals[0],"N")==0){
521             gettimeofday(&tmp_time, 0);
522             normalize_time(&tmp_time);
523             current_time = tmp_time.tv_sec;
524             current_time_usec = tmp_time.tv_usec;
525         } else {
526             double tmp;
527             tmp = strtod(updvals[0], 0);
528             current_time = floor(tmp);
529             current_time_usec = (long)((tmp - current_time) * 1000000L);
530         }
531         /* dont do any correction for old version RRDs */
532         if(version < 3) 
533             current_time_usec = 0;
534         
535         if(current_time <= rrd.live_head->last_up){
536             rrd_set_error("illegal attempt to update using time %ld when "
537                           "last update time is %ld (minimum one second step)",
538                           current_time, rrd.live_head->last_up);
539             free(step_start);
540             break;
541         }
542         
543         
544         /* seek to the beginning of the rra's */
545         if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548                 rrd_set_error("seek error in rrd");
549                 free(step_start);
550                 break;
551             }
552 #endif
553             rra_current = rra_begin;
554         }
555         rra_start = rra_begin;
556
557         /* when was the current pdp started */
558         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
560
561         /* when did the last pdp_st occur */
562         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563         occu_pdp_st = current_time - occu_pdp_age;
564         /* interval = current_time - rrd.live_head->last_up; */
565         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
566     
567         if (occu_pdp_st > proc_pdp_st){
568             /* OK we passed the pdp_st moment*/
569             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570                                                               * occurred before the latest
571                                                               * pdp_st moment*/
572             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573             post_int = occu_pdp_age;                         /* how much after it */
574             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
575         } else {
576             pre_int = interval;
577             post_int = 0;
578         }
579
580 #ifdef DEBUG
581         printf(
582                "proc_pdp_age %lu\t"
583                "proc_pdp_st %lu\t" 
584                "occu_pfp_age %lu\t" 
585                "occu_pdp_st %lu\t"
586                "int %lf\t"
587                "pre_int %lf\t"
588                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
589                 occu_pdp_age, occu_pdp_st,
590                interval, pre_int, post_int);
591 #endif
592     
593         /* process the data sources and update the pdp_prep 
594          * area accordingly */
595         for(i=0;i<rrd.stat_head->ds_cnt;i++){
596             enum dst_en dst_idx;
597             dst_idx= dst_conv(rrd.ds_def[i].dst);
598                 /* NOTE: DST_CDEF should never enter this if block, because
599                  * updvals[i+1][0] is initialized to 'U'; unless the caller
600                  * accidently specified a value for the DST_CDEF. To handle 
601                  * this case, an extra check is required. */
602             if((updvals[i+1][0] != 'U') &&
603                    (dst_idx != DST_CDEF) &&
604                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
605                double rate = DNAN;
606                /* the data source type defines how to process the data */
607                 /* pdp_new contains rate * time ... eg the bytes
608                  * transferred during the interval. Doing it this way saves
609                  * a lot of math operations */
610                 
611
612                 switch(dst_idx){
613                 case DST_COUNTER:
614                 case DST_DERIVE:
615                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
616                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
617                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
618                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
619                                  break;
620                             }
621                        }
622                        if (rrd_test_error()){
623                             break;
624                        }
625                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
626                        if(dst_idx == DST_COUNTER) {
627                           /* simple overflow catcher suggested by Andres Kroonmaa */
628                           /* this will fail terribly for non 32 or 64 bit counters ... */
629                           /* are there any others in SNMP land ? */
630                           if (pdp_new[i] < (double)0.0 ) 
631                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
632                           if (pdp_new[i] < (double)0.0 ) 
633                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
634                        }
635                        rate = pdp_new[i] / interval;
636                     }
637                    else {
638                      pdp_new[i]= DNAN;          
639                    }
640                    break;
641                 case DST_ABSOLUTE:
642                     errno = 0;
643                     pdp_new[i] = strtod(updvals[i+1],&endptr);
644                     if (errno > 0){
645                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
646                         break;
647                     };
648                     if (endptr[0] != '\0'){
649                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
650                         break;
651                     }
652                     rate = pdp_new[i] / interval;                 
653                     break;
654                 case DST_GAUGE:
655                     errno = 0;
656                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
657                     if (errno > 0){
658                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
659                         break;
660                     };
661                     if (endptr[0] != '\0'){
662                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
663                         break;
664                     }
665                     rate = pdp_new[i] / interval;                  
666                     break;
667                 default:
668                     rrd_set_error("rrd contains unknown DS type : '%s'",
669                                   rrd.ds_def[i].dst);
670                     break;
671                 }
672                 /* break out of this for loop if the error string is set */
673                 if (rrd_test_error()){
674                     break;
675                 }
676                /* make sure pdp_temp is neither too large or too small
677                 * if any of these occur it becomes unknown ...
678                 * sorry folks ... */
679                if ( ! isnan(rate) && 
680                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
681                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
682                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
683                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
684                   pdp_new[i] = DNAN;
685                }               
686             } else {
687                 /* no news is news all the same */
688                 pdp_new[i] = DNAN;
689             }
690             
691             /* make a copy of the command line argument for the next run */
692 #ifdef DEBUG
693             fprintf(stderr,
694                     "prep ds[%lu]\t"
695                     "last_arg '%s'\t"
696                     "this_arg '%s'\t"
697                     "pdp_new %10.2f\n",
698                     i,
699                     rrd.pdp_prep[i].last_ds,
700                     updvals[i+1], pdp_new[i]);
701 #endif
702             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
703                 strncpy(rrd.pdp_prep[i].last_ds,
704                         updvals[i+1],LAST_DS_LEN-1);
705                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
706             }
707         }
708         /* break out of the argument parsing loop if the error_string is set */
709         if (rrd_test_error()){
710             free(step_start);
711             break;
712         }
713         /* has a pdp_st moment occurred since the last run ? */
714
715         if (proc_pdp_st == occu_pdp_st){
716             /* no we have not passed a pdp_st moment. therefore update is simple */
717
718             for(i=0;i<rrd.stat_head->ds_cnt;i++){
719                 if(isnan(pdp_new[i]))
720                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
721                 else
722                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
723 #ifdef DEBUG
724                 fprintf(stderr,
725                         "NO PDP  ds[%lu]\t"
726                         "value %10.2f\t"
727                         "unkn_sec %5lu\n",
728                         i,
729                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
730                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
731 #endif
732             }   
733         } else {
734             /* an pdp_st has occurred. */
735
736             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
737              * occurred up to the last run.        
738             pdp_new[] contains rate*seconds from the latest run.
739             pdp_temp[] will contain the rate for cdp */
740
741             for(i=0;i<rrd.stat_head->ds_cnt;i++){
742                 /* update pdp_prep to the current pdp_st */
743                 if(isnan(pdp_new[i]))
744                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
745                 else
746                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
747                         pdp_new[i]/(double)interval*(double)pre_int;
748
749                 /* if too much of the pdp_prep is unknown we dump it */
750                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
751                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
752                     (occu_pdp_st-proc_pdp_st <= 
753                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
754                     pdp_temp[i] = DNAN;
755                 } else {
756                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
757                         / (double)( occu_pdp_st
758                                    - proc_pdp_st
759                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
760                 }
761
762                 /* process CDEF data sources; remember each CDEF DS can
763                  * only reference other DS with a lower index number */
764             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
765                    rpnp_t *rpnp;
766                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
767                    /* substitue data values for OP_VARIABLE nodes */
768                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
769                    {
770                           if (rpnp[ii].op == OP_VARIABLE) {
771                                  rpnp[ii].op = OP_NUMBER;
772                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
773                           }
774                    }
775                    /* run the rpn calculator */
776                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
777                           free(rpnp);
778                           break; /* exits the data sources pdp_temp loop */
779                    }
780                 }
781         
782                 /* make pdp_prep ready for the next run */
783                 if(isnan(pdp_new[i])){
784                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
785                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
786                 } else {
787                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
788                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
789                         pdp_new[i]/(double)interval*(double)post_int;
790                 }
791
792 #ifdef DEBUG
793                 fprintf(stderr,
794                         "PDP UPD ds[%lu]\t"
795                         "pdp_temp %10.2f\t"
796                         "new_prep %10.2f\t"
797                         "new_unkn_sec %5lu\n",
798                         i, pdp_temp[i],
799                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
800                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
801 #endif
802             }
803
804                 /* if there were errors during the last loop, bail out here */
805             if (rrd_test_error()){
806                free(step_start);
807                break;
808             }
809
810                 /* compute the number of elapsed pdp_st moments */
811                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
812 #ifdef DEBUG
813                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
814 #endif
815                 if (rra_step_cnt == NULL)
816                 {
817                    rra_step_cnt = (unsigned long *) 
818                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
819                 }
820
821             for(i = 0, rra_start = rra_begin;
822                 i < rrd.stat_head->rra_cnt;
823             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
824                 i++)
825                 {
826                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
827                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
828                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
829         if (start_pdp_offset <= elapsed_pdp_st) {
830            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
831                       rrd.rra_def[i].pdp_cnt + 1;
832             } else {
833                    rra_step_cnt[i] = 0;
834                 }
835
836                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
837                 {
838                    /* If this is a bulk update, we need to skip ahead in the seasonal
839                         * arrays so that they will be correct for the next observed value;
840                         * note that for the bulk update itself, no update will occur to
841                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
842                         * be set to DNAN. */
843            if (rra_step_cnt[i] > 2) 
844                    {
845                           /* skip update by resetting rra_step_cnt[i],
846                            * note that this is not data source specific; this is due
847                            * to the bulk update, not a DNAN value for the specific data
848                            * source. */
849                           rra_step_cnt[i] = 0;
850               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
851                              &last_seasonal_coef);
852                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
853                              &seasonal_coef);
854                    }
855                 
856                   /* periodically run a smoother for seasonal effects */
857                   /* Need to use first cdp parameter buffer to track
858                    * burnin (burnin requires a specific smoothing schedule).
859                    * The CDP_init_seasonal parameter is really an RRA level,
860                    * not a data source within RRA level parameter, but the rra_def
861                    * is read only for rrd_update (not flushed to disk). */
862                   iii = i*(rrd.stat_head -> ds_cnt);
863                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
864                           <= BURNIN_CYCLES)
865                   {
866                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
867                                  > rrd.rra_def[i].row_cnt - 1) {
868                            /* mark off one of the burnin cycles */
869                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
870                        schedule_smooth = 1;
871                          }  
872                   } else {
873                          /* someone has no doubt invented a trick to deal with this
874                           * wrap around, but at least this code is clear. */
875                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
876                              rrd.rra_ptr[i].cur_row)
877                          {
878                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
879                                   * mapping between PDP and CDP */
880                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
881                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
882                                  {
883 #ifdef DEBUG
884                                         fprintf(stderr,
885                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
886                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
887                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
888 #endif
889                                         schedule_smooth = 1;
890                                  }
891              } else {
892                                  /* can't rely on negative numbers because we are working with
893                                   * unsigned values */
894                                  /* Don't need modulus here. If we've wrapped more than once, only
895                                   * one smooth is executed at the end. */
896                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
897                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
898                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
899                                  {
900 #ifdef DEBUG
901                                         fprintf(stderr,
902                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
903                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
904                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
905 #endif
906                                         schedule_smooth = 1;
907                                  }
908                          }
909                   }
910
911               rra_current = ftell(rrd_file); 
912                 } /* if cf is DEVSEASONAL or SEASONAL */
913
914         if (rrd_test_error()) break;
915
916                     /* update CDP_PREP areas */
917                     /* loop over data soures within each RRA */
918                     for(ii = 0;
919                         ii < rrd.stat_head->ds_cnt;
920                         ii++)
921                         {
922                         
923                         /* iii indexes the CDP prep area for this data source within the RRA */
924                         iii=i*rrd.stat_head->ds_cnt+ii;
925
926                         if (rrd.rra_def[i].pdp_cnt > 1) {
927                           
928                            if (rra_step_cnt[i] > 0) {
929                            /* If we are in this block, as least 1 CDP value will be written to
930                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
931                                 * to be written, then the "fill in" value is the CDP_secondary_val
932                                 * entry. */
933                                   if (isnan(pdp_temp[ii]))
934                   {
935                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
936                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
937                                   } else {
938                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
939                                           * CDP data entries. No matter the CF, the value is the same because
940                                           * the average, max, min, and last of a list of identical values is
941                                           * the same, namely, the value itself. */
942                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
943                                   }
944                      
945                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
946                                       > rrd.rra_def[i].pdp_cnt*
947                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
948                                   {
949                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
950                                          /* initialize carry over */
951                                          if (current_cf == CF_AVERAGE) {
952                                                    if (isnan(pdp_temp[ii])) { 
953                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
954                                                    } else {
955                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
956                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
957                                                    }
958                                          } else {
959                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
960                                          }
961                                   } else {
962                                          rrd_value_t cum_val, cur_val; 
963                                      switch (current_cf) {
964                                                 case CF_AVERAGE:
965                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
966                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
967                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
968                                                (cum_val + cur_val * start_pdp_offset) /
969                                            (rrd.rra_def[i].pdp_cnt
970                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
971                                                    /* initialize carry over value */
972                                                    if (isnan(pdp_temp[ii])) { 
973                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
974                                                    } else {
975                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
976                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
977                                                    }
978                                                    break;
979                                                 case CF_MAXIMUM:
980                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
981                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
982 #ifdef DEBUG
983                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
984                                                           isnan(pdp_temp[ii])) {
985                                                      fprintf(stderr,
986                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
987                                                                 i,ii);
988                                                          exit(-1);
989                                                   }
990 #endif
991                                                   if (cur_val > cum_val)
992                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
993                                                   else
994                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
995                                                   /* initialize carry over value */
996                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
997                                                   break;
998                                                 case CF_MINIMUM:
999                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1000                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1001 #ifdef DEBUG
1002                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1003                                                           isnan(pdp_temp[ii])) {
1004                                                      fprintf(stderr,
1005                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1006                                                                 i,ii);
1007                                                          exit(-1);
1008                                                   }
1009 #endif
1010                                                   if (cur_val < cum_val)
1011                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1012                                                   else
1013                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1014                                                   /* initialize carry over value */
1015                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1016                                                   break;
1017                                                 case CF_LAST:
1018                                                 default:
1019                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1020                                                    /* initialize carry over value */
1021                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1022                                                 break;
1023                                          }
1024                                   } /* endif meets xff value requirement for a valid value */
1025                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1026                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1027                                   if (isnan(pdp_temp[ii]))
1028                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1029                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1030                                   else
1031                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1032                } else  /* rra_step_cnt[i]  == 0 */
1033                            {
1034 #ifdef DEBUG
1035                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1036                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1037                                          i,ii);
1038                                   } else {
1039                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1040                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1041                                   }
1042 #endif
1043                                   if (isnan(pdp_temp[ii])) {
1044                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1045                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1046                                   {
1047                                          if (current_cf == CF_AVERAGE) {
1048                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1049                                                    elapsed_pdp_st;
1050                                          } else {
1051                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052                                          }
1053 #ifdef DEBUG
1054                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1055                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1056 #endif
1057                                   } else {
1058                                          switch (current_cf) {
1059                                          case CF_AVERAGE:
1060                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1061                                                    elapsed_pdp_st;
1062                                                 break;
1063                                          case CF_MINIMUM:
1064                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1065                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1066                                                 break; 
1067                                          case CF_MAXIMUM:
1068                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1069                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1070                                                 break; 
1071                                          case CF_LAST:
1072                                          default:
1073                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1074                                                 break;
1075                                          }
1076                                   }
1077                            }
1078                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1079                            if (elapsed_pdp_st > 2)
1080                            {
1081                                    switch (current_cf) {
1082                                    case CF_AVERAGE:
1083                                    default:
1084                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1085                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1086                                           break;
1087                    case CF_SEASONAL:
1088                                    case CF_DEVSEASONAL:
1089                                           /* need to update cached seasonal values, so they are consistent
1090                                            * with the bulk update */
1091                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1092                                            * CDP_last_deviation are the same. */
1093                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1094                                                  last_seasonal_coef[ii];
1095                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1096                                                  seasonal_coef[ii];
1097                                           break;
1098                    case CF_HWPREDICT:
1099                                           /* need to update the null_count and last_null_count.
1100                                            * even do this for non-DNAN pdp_temp because the
1101                                            * algorithm is not learning from batch updates. */
1102                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1103                                                  elapsed_pdp_st;
1104                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1105                                                  elapsed_pdp_st - 1;
1106                                           /* fall through */
1107                                    case CF_DEVPREDICT:
1108                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1109                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1110                                           break;
1111                    case CF_FAILURES:
1112                                           /* do not count missed bulk values as failures */
1113                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1114                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1115                                           /* need to reset violations buffer.
1116                                            * could do this more carefully, but for now, just
1117                                            * assume a bulk update wipes away all violations. */
1118                       erase_violations(&rrd, iii, i);
1119                                           break;
1120                                    }
1121                            } 
1122                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1123
1124                         if (rrd_test_error()) break;
1125
1126                         } /* endif data sources loop */
1127         } /* end RRA Loop */
1128
1129                 /* this loop is only entered if elapsed_pdp_st < 3 */
1130                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1131                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1132                 {
1133                for(i = 0, rra_start = rra_begin;
1134                    i < rrd.stat_head->rra_cnt;
1135                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1136                    i++)
1137                    {
1138                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1139
1140                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1141                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1142                           {
1143                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1144                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1145                                 &seasonal_coef);
1146                  rra_current = ftell(rrd_file);
1147                           }
1148                           if (rrd_test_error()) break;
1149                       /* loop over data soures within each RRA */
1150                       for(ii = 0;
1151                           ii < rrd.stat_head->ds_cnt;
1152                           ii++)
1153                           {
1154                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1155                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1156                                     scratch_idx, seasonal_coef);
1157                           }
1158            } /* end RRA Loop */
1159                    if (rrd_test_error()) break;
1160             } /* end elapsed_pdp_st loop */
1161
1162                 if (rrd_test_error()) break;
1163
1164                 /* Ready to write to disk */
1165                 /* Move sequentially through the file, writing one RRA at a time.
1166                  * Note this architecture divorces the computation of CDP with
1167                  * flushing updated RRA entries to disk. */
1168             for(i = 0, rra_start = rra_begin;
1169                 i < rrd.stat_head->rra_cnt;
1170             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1171                 i++) {
1172                 /* is there anything to write for this RRA? If not, continue. */
1173         if (rra_step_cnt[i] == 0) continue;
1174
1175                 /* write the first row */
1176 #ifdef DEBUG
1177         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1178 #endif
1179             rrd.rra_ptr[i].cur_row++;
1180             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1181                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1182                 /* positition on the first row */
1183                 rra_pos_tmp = rra_start +
1184                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1185                 if(rra_pos_tmp != rra_current) {
1186 #ifndef HAVE_MMAP
1187                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1188                       rrd_set_error("seek error in rrd");
1189                       break;
1190                    }
1191 #endif
1192                    rra_current = rra_pos_tmp;
1193                 }
1194
1195 #ifdef DEBUG
1196             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1197 #endif
1198                 scratch_idx = CDP_primary_val;
1199                 if (pcdp_summary != NULL)
1200                 {
1201                    rra_time = (current_time - current_time 
1202                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1203                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1204                 }
1205 #ifdef HAVE_MMAP
1206                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1207                    pcdp_summary, &rra_time, rrd_mmaped_file);
1208 #else
1209                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1210                    pcdp_summary, &rra_time);
1211 #endif
1212                 if (rrd_test_error()) break;
1213
1214                 /* write other rows of the bulk update, if any */
1215                 scratch_idx = CDP_secondary_val;
1216                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1217                 {
1218                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1219                    {
1220 #ifdef DEBUG
1221               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1222                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1223 #endif
1224                           /* wrap */
1225                           rrd.rra_ptr[i].cur_row = 0;
1226                           /* seek back to beginning of current rra */
1227                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1228                           {
1229                          rrd_set_error("seek error in rrd");
1230                          break;
1231                           }
1232 #ifdef DEBUG
1233                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1234 #endif
1235                           rra_current = rra_start;
1236                    }
1237                    if (pcdp_summary != NULL)
1238                    {
1239                       rra_time = (current_time - current_time 
1240                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1241                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1242                    }
1243 #ifdef HAVE_MMAP
1244                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1245                       pcdp_summary, &rra_time, rrd_mmaped_file);
1246 #else
1247                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1248                       pcdp_summary, &rra_time);
1249 #endif
1250                 }
1251                 
1252                 if (rrd_test_error())
1253                   break;
1254                 } /* RRA LOOP */
1255
1256             /* break out of the argument parsing loop if error_string is set */
1257             if (rrd_test_error()){
1258                    free(step_start);
1259                    break;
1260             } 
1261             
1262         } /* endif a pdp_st has occurred */ 
1263         rrd.live_head->last_up = current_time;
1264         rrd.live_head->last_up_usec = current_time_usec; 
1265         free(step_start);
1266     } /* function argument loop */
1267
1268     if (seasonal_coef != NULL) free(seasonal_coef);
1269     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1270         if (rra_step_cnt != NULL) free(rra_step_cnt);
1271     rpnstack_free(&rpnstack);
1272
1273 #ifdef HAVE_MMAP
1274     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1275             rrd_set_error("error writing(unmapping) file: %s", filename);
1276     }
1277 #endif    
1278     /* if we got here and if there is an error and if the file has not been
1279      * written to, then close things up and return. */
1280     if (rrd_test_error()) {
1281         free(updvals);
1282         free(tmpl_idx);
1283         rrd_free(&rrd);
1284         free(pdp_temp);
1285         free(pdp_new);
1286         fclose(rrd_file);
1287         return(-1);
1288     }
1289
1290     /* aargh ... that was tough ... so many loops ... anyway, its done.
1291      * we just need to write back the live header portion now*/
1292
1293     if (fseek(rrd_file, (sizeof(stat_head_t)
1294                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1295                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1296               SEEK_SET) != 0) {
1297         rrd_set_error("seek rrd for live header writeback");
1298         free(updvals);
1299         free(tmpl_idx);
1300         rrd_free(&rrd);
1301         free(pdp_temp);
1302         free(pdp_new);
1303         fclose(rrd_file);
1304         return(-1);
1305     }
1306
1307     if(version >= 3) {
1308             if(fwrite( rrd.live_head,
1309                        sizeof(live_head_t), 1, rrd_file) != 1){
1310                 rrd_set_error("fwrite live_head to rrd");
1311                 free(updvals);
1312                 rrd_free(&rrd);
1313                 free(tmpl_idx);
1314                 free(pdp_temp);
1315                 free(pdp_new);
1316                 fclose(rrd_file);
1317                 return(-1);
1318             }
1319     }
1320     else {
1321             if(fwrite( &rrd.live_head->last_up,
1322                        sizeof(time_t), 1, rrd_file) != 1){
1323                 rrd_set_error("fwrite live_head to rrd");
1324                 free(updvals);
1325                 rrd_free(&rrd);
1326                 free(tmpl_idx);
1327                 free(pdp_temp);
1328                 free(pdp_new);
1329                 fclose(rrd_file);
1330                 return(-1);
1331             }
1332     }
1333             
1334
1335     if(fwrite( rrd.pdp_prep,
1336                sizeof(pdp_prep_t),
1337                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1338         rrd_set_error("ftwrite pdp_prep to rrd");
1339         free(updvals);
1340         rrd_free(&rrd);
1341         free(tmpl_idx);
1342         free(pdp_temp);
1343         free(pdp_new);
1344         fclose(rrd_file);
1345         return(-1);
1346     }
1347
1348     if(fwrite( rrd.cdp_prep,
1349                sizeof(cdp_prep_t),
1350                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1351        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1352
1353         rrd_set_error("ftwrite cdp_prep to rrd");
1354         free(updvals);
1355         free(tmpl_idx);
1356         rrd_free(&rrd);
1357         free(pdp_temp);
1358         free(pdp_new);
1359         fclose(rrd_file);
1360         return(-1);
1361     }
1362
1363     if(fwrite( rrd.rra_ptr,
1364                sizeof(rra_ptr_t), 
1365                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1366         rrd_set_error("fwrite rra_ptr to rrd");
1367         free(updvals);
1368         free(tmpl_idx);
1369         rrd_free(&rrd);
1370         free(pdp_temp);
1371         free(pdp_new);
1372         fclose(rrd_file);
1373         return(-1);
1374     }
1375
1376     /* OK now close the files and free the memory */
1377     if(fclose(rrd_file) != 0){
1378         rrd_set_error("closing rrd");
1379         free(updvals);
1380         free(tmpl_idx);
1381         rrd_free(&rrd);
1382         free(pdp_temp);
1383         free(pdp_new);
1384         return(-1);
1385     }
1386
1387     /* calling the smoothing code here guarantees at most
1388          * one smoothing operation per rrd_update call. Unfortunately,
1389          * it is possible with bulk updates, or a long-delayed update
1390          * for smoothing to occur off-schedule. This really isn't
1391          * critical except during the burning cycles. */
1392         if (schedule_smooth)
1393         {
1394           rrd_file = fopen(filename,"rb+");
1395           rra_start = rra_begin;
1396           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1397           {
1398             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1399                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1400             {
1401 #ifdef DEBUG
1402               fprintf(stderr,"Running smoother for rra %ld\n",i);
1403 #endif
1404               apply_smoother(&rrd,i,rra_start,rrd_file);
1405               if (rrd_test_error())
1406                 break;
1407             }
1408             rra_start += rrd.rra_def[i].row_cnt
1409               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1410           }
1411           fclose(rrd_file);
1412         }
1413     rrd_free(&rrd);
1414     free(updvals);
1415     free(tmpl_idx);
1416     free(pdp_new);
1417     free(pdp_temp);
1418     return(0);
1419 }
1420
1421 /*
1422  * get exclusive lock to whole file.
1423  * lock gets removed when we close the file
1424  *
1425  * returns 0 on success
1426  */
1427 int
1428 LockRRD(FILE *rrdfile)
1429 {
1430     int rrd_fd;         /* File descriptor for RRD */
1431     int rcstat;
1432
1433     rrd_fd = fileno(rrdfile);
1434
1435         {
1436 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1437     struct _stat st;
1438
1439     if ( _fstat( rrd_fd, &st ) == 0 ) {
1440             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1441     } else {
1442             rcstat = -1;
1443     }
1444 #else
1445     struct flock        lock;
1446     lock.l_type = F_WRLCK;    /* exclusive write lock */
1447     lock.l_len = 0;           /* whole file */
1448     lock.l_start = 0;         /* start of file */
1449     lock.l_whence = SEEK_SET;   /* end of file */
1450
1451     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1452 #endif
1453         }
1454
1455     return(rcstat);
1456 }
1457
1458
1459 #ifdef HAVE_MMAP
1460 info_t
1461 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1462                unsigned short CDP_scratch_idx, 
1463 #ifndef DEBUG
1464 FILE UNUSED(*rrd_file),
1465 #else
1466 FILE *rrd_file,
1467 #endif
1468                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1469 #else
1470 info_t
1471 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1472                unsigned short CDP_scratch_idx, FILE *rrd_file,
1473                    info_t *pcdp_summary, time_t *rra_time)
1474 #endif
1475 {
1476    unsigned long ds_idx, cdp_idx;
1477    infoval iv;
1478   
1479    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1480    {
1481       /* compute the cdp index */
1482       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1483 #ifdef DEBUG
1484           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1485              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1486              rrd -> rra_def[rra_idx].cf_nam);
1487 #endif 
1488       if (pcdp_summary != NULL)
1489           {
1490              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1491              /* append info to the return hash */
1492                  pcdp_summary = info_push(pcdp_summary,
1493                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1494                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1495                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1496          RD_I_VAL, iv);
1497           }
1498 #ifdef HAVE_MMAP
1499           memcpy((char *)rrd_mmaped_file + *rra_current,
1500                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1501                           sizeof(rrd_value_t));
1502 #else
1503           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1504                  sizeof(rrd_value_t),1,rrd_file) != 1)
1505           { 
1506              rrd_set_error("writing rrd");
1507              return 0;
1508           }
1509 #endif
1510           *rra_current += sizeof(rrd_value_t);
1511         }
1512         return (pcdp_summary);
1513 }