prepare for the release of rrdtool-1.2.11
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.11  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
27
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
34
35 struct timeval {
36         time_t tv_sec; /* seconds */
37         long tv_usec;  /* microseconds */
38 };
39
40 struct __timezone {
41         int  tz_minuteswest; /* minutes W of Greenwich */
42         int  tz_dsttime;     /* type of dst correction */
43 };
44
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46         
47         struct timeb current_time;
48
49         _ftime(&current_time);
50         
51         t->tv_sec  = current_time.time;
52         t->tv_usec = current_time.millitm * 1000;
53 }
54
55 #endif
56 /*
57  * normilize time as returned by gettimeofday. usec part must
58  * be always >= 0
59  */
60 static void normalize_time(struct timeval *t)
61 {
62         if(t->tv_usec < 0) {
63                 t->tv_sec--;
64                 t->tv_usec += 1000000L;
65         }
66 }
67
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
70 #ifdef HAVE_MMAP
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
72                                         unsigned long *rra_current,
73                                         unsigned short CDP_scratch_idx,
74 #ifndef DEBUG
75 FILE UNUSED(*rrd_file),
76 #else
77 FILE *rrd_file,
78 #endif
79                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
80 #else
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
82                                         unsigned long *rra_current,
83                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
84                                         info_t *pcdp_summary, time_t *rra_time);
85 #endif
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv, 
88                                         info_t*);
89
90 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
91
92
93 #ifdef STANDALONE
94 int 
95 main(int argc, char **argv){
96         rrd_update(argc,argv);
97         if (rrd_test_error()) {
98                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
99                         "Usage: rrdupdate filename\n"
100                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101                         "\t\t\ttime|N:value[:value...]\n\n"
102                         "\t\t\tat-time@value[:value...]\n\n"
103                         "\t\t\t[ time:value[:value...] ..]\n\n");
104                                    
105                 printf("ERROR: %s\n",rrd_get_error());
106                 rrd_clear_error();                                                            
107                 return 1;
108         }
109         return 0;
110 }
111 #endif
112
113 info_t *rrd_update_v(int argc, char **argv)
114 {
115     char             *template = NULL;          
116         info_t *result = NULL;
117         infoval rc;
118     optind = 0; opterr = 0;  /* initialize getopt */
119
120     while (1) {
121                 static struct option long_options[] =
122                         {
123                                 {"template",      required_argument, 0, 't'},
124                                 {0,0,0,0}
125                         };
126                 int option_index = 0;
127                 int opt;
128                 opt = getopt_long(argc, argv, "t:", 
129                                                   long_options, &option_index);
130                 
131                 if (opt == EOF)
132                         break;
133                 
134                 switch(opt) {
135                 case 't':
136                         template = optarg;
137                         break;
138                 
139                 case '?':
140                         rrd_set_error("unknown option '%s'",argv[optind-1]);
141             rc.u_int = -1;
142                         goto end_tag;
143                 }
144     }
145
146     /* need at least 2 arguments: filename, data. */
147     if (argc-optind < 2) {
148                 rrd_set_error("Not enough arguments");
149         rc.u_int = -1;
150                 goto end_tag;
151     }
152     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153         rc.u_int = _rrd_update(argv[optind], template,
154                       argc - optind - 1, argv + optind + 1, result);
155     result->value.u_int = rc.u_int;
156 end_tag:
157     return result;
158 }
159
160 int
161 rrd_update(int argc, char **argv)
162 {
163     char             *template = NULL;          
164     int rc;
165     optind = 0; opterr = 0;  /* initialize getopt */
166
167     while (1) {
168                 static struct option long_options[] =
169                         {
170                                 {"template",      required_argument, 0, 't'},
171                                 {0,0,0,0}
172                         };
173                 int option_index = 0;
174                 int opt;
175                 opt = getopt_long(argc, argv, "t:", 
176                                                   long_options, &option_index);
177                 
178                 if (opt == EOF)
179                         break;
180                 
181                 switch(opt) {
182                 case 't':
183                         template = optarg;
184                         break;
185                 
186                 case '?':
187                         rrd_set_error("unknown option '%s'",argv[optind-1]);
188                         return(-1);
189                 }
190     }
191
192     /* need at least 2 arguments: filename, data. */
193     if (argc-optind < 2) {
194                 rrd_set_error("Not enough arguments");
195
196                 return -1;
197     }
198  
199         rc = rrd_update_r(argv[optind], template,
200                       argc - optind - 1, argv + optind + 1);
201     return rc;
202 }
203
204 int
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
206 {
207    return _rrd_update(filename, template, argc, argv, NULL);
208 }
209
210 int
211 _rrd_update(char *filename, char *template, int argc, char **argv, 
212    info_t *pcdp_summary)
213 {
214
215     int              arg_i = 2;
216     short            j;
217     unsigned long    i,ii,iii=1;
218
219     unsigned long    rra_begin;          /* byte pointer to the rra
220                                           * area in the rrd file.  this
221                                           * pointer never changes value */
222     unsigned long    rra_start;          /* byte pointer to the rra
223                                           * area in the rrd file.  this
224                                           * pointer changes as each rrd is
225                                           * processed. */
226     unsigned long    rra_current;        /* byte pointer to the current write
227                                           * spot in the rrd file. */
228     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
229     double           interval,
230         pre_int,post_int;                /* interval between this and
231                                           * the last run */
232     unsigned long    proc_pdp_st;        /* which pdp_st was the last
233                                           * to be processed */
234     unsigned long    occu_pdp_st;        /* when was the pdp_st
235                                           * before the last update
236                                           * time */
237     unsigned long    proc_pdp_age;       /* how old was the data in
238                                           * the pdp prep area when it
239                                           * was last updated */
240     unsigned long    occu_pdp_age;       /* how long ago was the last
241                                           * pdp_step time */
242     rrd_value_t      *pdp_new;           /* prepare the incoming data
243                                           * to be added the the
244                                           * existing entry */
245     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
246                                           * to be added the the
247                                           * cdp values */
248
249     long             *tmpl_idx;          /* index representing the settings
250                                             transported by the template index */
251     unsigned long    tmpl_cnt = 2;       /* time and data */
252
253     FILE             *rrd_file;
254     rrd_t            rrd;
255     time_t           current_time = 0;
256     time_t           rra_time = 0;       /* time of update for a RRA */
257     unsigned long    current_time_usec=0;/* microseconds part of current time */
258     struct timeval   tmp_time;           /* used for time conversion */
259
260     char             **updvals;
261     int              schedule_smooth = 0;
262         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263                                          /* a vector of future Holt-Winters seasonal coefs */
264     unsigned long    elapsed_pdp_st;
265                                          /* number of elapsed PDP steps since last update */
266     unsigned long    *rra_step_cnt = NULL;
267                                          /* number of rows to be updated in an RRA for a data
268                                           * value. */
269     unsigned long    start_pdp_offset;
270                                          /* number of PDP steps since the last update that
271                                           * are assigned to the first CDP to be generated
272                                           * since the last update. */
273     unsigned short   scratch_idx;
274                                          /* index into the CDP scratch array */
275     enum cf_en       current_cf;
276                                          /* numeric id of the current consolidation function */
277     rpnstack_t       rpnstack; /* used for COMPUTE DS */
278     int              version;  /* rrd version */
279     char             *endptr; /* used in the conversion */
280 #ifdef HAVE_MMAP
281     void             *rrd_mmaped_file;
282     unsigned long    rrd_filesize;
283 #endif
284
285     rpnstack_init(&rpnstack);
286
287     /* need at least 1 arguments: data. */
288     if (argc < 1) {
289         rrd_set_error("Not enough arguments");
290         return -1;
291     }
292     
293     
294
295     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
296         return -1;
297     }
298     /* initialize time */
299     version = atoi(rrd.stat_head->version);
300     gettimeofday(&tmp_time, 0);
301     normalize_time(&tmp_time);
302     current_time = tmp_time.tv_sec;
303     if(version >= 3) {
304         current_time_usec = tmp_time.tv_usec;
305     }
306     else {
307         current_time_usec = 0;
308     }
309
310     rra_current = rra_start = rra_begin = ftell(rrd_file);
311     /* This is defined in the ANSI C standard, section 7.9.5.3:
312
313         When a file is opened with udpate mode ('+' as the second
314         or third character in the ... list of mode argument
315         variables), both input and ouptut may be performed on the
316         associated stream.  However, ...  input may not be directly
317         followed by output without an intervening call to a file
318         positioning function, unless the input oepration encounters
319         end-of-file. */
320 #ifdef HAVE_MMAP
321     fseek(rrd_file, 0, SEEK_END);
322     rrd_filesize = ftell(rrd_file);
323     fseek(rrd_file, rra_current, SEEK_SET);
324 #else
325     fseek(rrd_file, 0, SEEK_CUR);
326 #endif
327
328     
329     /* get exclusive lock to whole file.
330      * lock gets removed when we close the file.
331      */
332     if (LockRRD(rrd_file) != 0) {
333       rrd_set_error("could not lock RRD");
334       rrd_free(&rrd);
335       fclose(rrd_file);
336       return(-1);   
337     } 
338
339     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340         rrd_set_error("allocating updvals pointer array");
341         rrd_free(&rrd);
342         fclose(rrd_file);
343         return(-1);
344     }
345
346     if ((pdp_temp = malloc(sizeof(rrd_value_t)
347                            *rrd.stat_head->ds_cnt))==NULL){
348         rrd_set_error("allocating pdp_temp ...");
349         free(updvals);
350         rrd_free(&rrd);
351         fclose(rrd_file);
352         return(-1);
353     }
354
355     if ((tmpl_idx = malloc(sizeof(unsigned long)
356                            *(rrd.stat_head->ds_cnt+1)))==NULL){
357         rrd_set_error("allocating tmpl_idx ...");
358         free(pdp_temp);
359         free(updvals);
360         rrd_free(&rrd);
361         fclose(rrd_file);
362         return(-1);
363     }
364     /* initialize template redirector */
365     /* default config example (assume DS 1 is a CDEF DS)
366        tmpl_idx[0] -> 0; (time)
367        tmpl_idx[1] -> 1; (DS 0)
368        tmpl_idx[2] -> 3; (DS 2)
369        tmpl_idx[3] -> 4; (DS 3) */
370     tmpl_idx[0] = 0; /* time */
371     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
372         {
373            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374               tmpl_idx[ii++]=i;
375         }
376     tmpl_cnt= ii;
377
378     if (template) {
379         char *dsname;
380         unsigned int tmpl_len;
381         dsname = template;
382         tmpl_cnt = 1; /* the first entry is the time */
383         tmpl_len = strlen(template);
384         for(i=0;i<=tmpl_len ;i++) {
385             if (template[i] == ':' || template[i] == '\0') {
386                 template[i] = '\0';
387                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388                     rrd_set_error("Template contains more DS definitions than RRD");
389                     free(updvals); free(pdp_temp);
390                     free(tmpl_idx); rrd_free(&rrd);
391                     fclose(rrd_file); return(-1);
392                 }
393                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394                     rrd_set_error("unknown DS name '%s'",dsname);
395                     free(updvals); free(pdp_temp);
396                     free(tmpl_idx); rrd_free(&rrd);
397                     fclose(rrd_file); return(-1);
398                 } else {
399                   /* the first element is always the time */
400                   tmpl_idx[tmpl_cnt-1]++; 
401                   /* go to the next entry on the template */
402                   dsname = &template[i+1];
403                   /* fix the damage we did before */
404                   if (i<tmpl_len) {
405                      template[i]=':';
406                   } 
407
408                 }
409             }       
410         }
411     }
412     if ((pdp_new = malloc(sizeof(rrd_value_t)
413                           *rrd.stat_head->ds_cnt))==NULL){
414         rrd_set_error("allocating pdp_new ...");
415         free(updvals);
416         free(pdp_temp);
417         free(tmpl_idx);
418         rrd_free(&rrd);
419         fclose(rrd_file);
420         return(-1);
421     }
422
423 #ifdef HAVE_MMAP
424     rrd_mmaped_file = mmap(0, 
425                         rrd_filesize, 
426                         PROT_READ | PROT_WRITE, 
427                         MAP_SHARED, 
428                         fileno(rrd_file), 
429                         0);
430     if (rrd_mmaped_file == MAP_FAILED) {
431         rrd_set_error("error mmapping file %s", filename);
432         free(updvals);
433         free(pdp_temp);
434         free(tmpl_idx);
435         rrd_free(&rrd);
436         fclose(rrd_file);
437         return(-1);
438     }
439 #endif
440     /* loop through the arguments. */
441     for(arg_i=0; arg_i<argc;arg_i++) {
442         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443         char *step_start = stepper;
444         char *p;
445         char *parsetime_error = NULL;
446         enum {atstyle, normal} timesyntax;
447         struct rrd_time_value ds_tv;
448         if (stepper == NULL){
449                 rrd_set_error("failed duplication argv entry");
450                 free(updvals);
451                 free(pdp_temp);  
452                 free(tmpl_idx);
453                 rrd_free(&rrd);
454 #ifdef HAVE_MMAP
455                 munmap(rrd_mmaped_file, rrd_filesize);
456 #endif
457                 fclose(rrd_file);
458                 return(-1);
459          }
460         /* initialize all ds input to unknown except the first one
461            which has always got to be set */
462         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463         strcpy(stepper,argv[arg_i]);
464         updvals[0]=stepper;
465         /* separate all ds elements; first must be examined separately
466            due to alternate time syntax */
467         if ((p=strchr(stepper,'@'))!=NULL) {
468             timesyntax = atstyle;
469             *p = '\0';
470             stepper = p+1;
471         } else if ((p=strchr(stepper,':'))!=NULL) {
472             timesyntax = normal;
473             *p = '\0';
474             stepper = p+1;
475         } else {
476             rrd_set_error("expected timestamp not found in data source from %s:...",
477                           argv[arg_i]);
478             free(step_start);
479             break;
480         }
481         ii=1;
482         updvals[tmpl_idx[ii]] = stepper;
483         while (*stepper) {
484             if (*stepper == ':') {
485                 *stepper = '\0';
486                 ii++;
487                 if (ii<tmpl_cnt){                   
488                     updvals[tmpl_idx[ii]] = stepper+1;
489                 }
490             }
491             stepper++;
492         }
493
494         if (ii != tmpl_cnt-1) {
495             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496                           tmpl_cnt-1, ii, argv[arg_i]);
497             free(step_start);
498             break;
499         }
500         
501         /* get the time from the reading ... handle N */
502         if (timesyntax == atstyle) {
503             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
505                 free(step_start);
506                 break;
507             }
508             if (ds_tv.type == RELATIVE_TO_END_TIME ||
509                 ds_tv.type == RELATIVE_TO_START_TIME) {
510                 rrd_set_error("specifying time relative to the 'start' "
511                               "or 'end' makes no sense here: %s",
512                               updvals[0]);
513                 free(step_start);
514                 break;
515             }
516
517             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
519             
520         } else if (strcmp(updvals[0],"N")==0){
521             gettimeofday(&tmp_time, 0);
522             normalize_time(&tmp_time);
523             current_time = tmp_time.tv_sec;
524             current_time_usec = tmp_time.tv_usec;
525         } else {
526             double tmp;
527             tmp = strtod(updvals[0], 0);
528             current_time = floor(tmp);
529             current_time_usec = (long)((tmp - current_time) * 1000000L);
530         }
531         /* dont do any correction for old version RRDs */
532         if(version < 3) 
533             current_time_usec = 0;
534         
535         if(current_time <= rrd.live_head->last_up){
536             rrd_set_error("illegal attempt to update using time %ld when "
537                           "last update time is %ld (minimum one second step)",
538                           current_time, rrd.live_head->last_up);
539             free(step_start);
540             break;
541         }
542         
543         
544         /* seek to the beginning of the rra's */
545         if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548                 rrd_set_error("seek error in rrd");
549                 free(step_start);
550                 break;
551             }
552 #endif
553             rra_current = rra_begin;
554         }
555         rra_start = rra_begin;
556
557         /* when was the current pdp started */
558         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
560
561         /* when did the last pdp_st occur */
562         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563         occu_pdp_st = current_time - occu_pdp_age;
564         /* interval = current_time - rrd.live_head->last_up; */
565         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
566     
567         if (occu_pdp_st > proc_pdp_st){
568             /* OK we passed the pdp_st moment*/
569             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570                                                               * occurred before the latest
571                                                               * pdp_st moment*/
572             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573             post_int = occu_pdp_age;                         /* how much after it */
574             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
575         } else {
576             pre_int = interval;
577             post_int = 0;
578         }
579
580 #ifdef DEBUG
581         printf(
582                "proc_pdp_age %lu\t"
583                "proc_pdp_st %lu\t" 
584                "occu_pfp_age %lu\t" 
585                "occu_pdp_st %lu\t"
586                "int %lf\t"
587                "pre_int %lf\t"
588                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
589                 occu_pdp_age, occu_pdp_st,
590                interval, pre_int, post_int);
591 #endif
592     
593         /* process the data sources and update the pdp_prep 
594          * area accordingly */
595         for(i=0;i<rrd.stat_head->ds_cnt;i++){
596             enum dst_en dst_idx;
597             dst_idx= dst_conv(rrd.ds_def[i].dst);
598
599             /* make sure we do not build diffs with old last_ds values */
600             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
601                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
602                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
603             }
604
605             /* NOTE: DST_CDEF should never enter this if block, because
606              * updvals[i+1][0] is initialized to 'U'; unless the caller
607              * accidently specified a value for the DST_CDEF. To handle 
608               * this case, an extra check is required. */
609
610             if((updvals[i+1][0] != 'U') &&
611                    (dst_idx != DST_CDEF) &&
612                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
613                double rate = DNAN;
614                /* the data source type defines how to process the data */
615                 /* pdp_new contains rate * time ... eg the bytes
616                  * transferred during the interval. Doing it this way saves
617                  * a lot of math operations */
618                 
619
620                 switch(dst_idx){
621                 case DST_COUNTER:
622                 case DST_DERIVE:
623                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
624                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
625                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
626                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
627                                  break;
628                             }
629                        }
630                        if (rrd_test_error()){
631                             break;
632                        }
633                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
634                        if(dst_idx == DST_COUNTER) {
635                           /* simple overflow catcher suggested by Andres Kroonmaa */
636                           /* this will fail terribly for non 32 or 64 bit counters ... */
637                           /* are there any others in SNMP land ? */
638                           if (pdp_new[i] < (double)0.0 ) 
639                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
640                           if (pdp_new[i] < (double)0.0 ) 
641                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
642                        }
643                        rate = pdp_new[i] / interval;
644                     }
645                    else {
646                      pdp_new[i]= DNAN;          
647                    }
648                    break;
649                 case DST_ABSOLUTE:
650                     errno = 0;
651                     pdp_new[i] = strtod(updvals[i+1],&endptr);
652                     if (errno > 0){
653                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
654                         break;
655                     };
656                     if (endptr[0] != '\0'){
657                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
658                         break;
659                     }
660                     rate = pdp_new[i] / interval;                 
661                     break;
662                 case DST_GAUGE:
663                     errno = 0;
664                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
665                     if (errno > 0){
666                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
667                         break;
668                     };
669                     if (endptr[0] != '\0'){
670                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
671                         break;
672                     }
673                     rate = pdp_new[i] / interval;                  
674                     break;
675                 default:
676                     rrd_set_error("rrd contains unknown DS type : '%s'",
677                                   rrd.ds_def[i].dst);
678                     break;
679                 }
680                 /* break out of this for loop if the error string is set */
681                 if (rrd_test_error()){
682                     break;
683                 }
684                /* make sure pdp_temp is neither too large or too small
685                 * if any of these occur it becomes unknown ...
686                 * sorry folks ... */
687                if ( ! isnan(rate) && 
688                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
689                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
690                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
691                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
692                   pdp_new[i] = DNAN;
693                }               
694             } else {
695                 /* no news is news all the same */
696                 pdp_new[i] = DNAN;
697             }
698             
699             /* make a copy of the command line argument for the next run */
700 #ifdef DEBUG
701             fprintf(stderr,
702                     "prep ds[%lu]\t"
703                     "last_arg '%s'\t"
704                     "this_arg '%s'\t"
705                     "pdp_new %10.2f\n",
706                     i,
707                     rrd.pdp_prep[i].last_ds,
708                     updvals[i+1], pdp_new[i]);
709 #endif
710             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
711                 strncpy(rrd.pdp_prep[i].last_ds,
712                         updvals[i+1],LAST_DS_LEN-1);
713                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
714             }
715         }
716         /* break out of the argument parsing loop if the error_string is set */
717         if (rrd_test_error()){
718             free(step_start);
719             break;
720         }
721         /* has a pdp_st moment occurred since the last run ? */
722
723         if (proc_pdp_st == occu_pdp_st){
724             /* no we have not passed a pdp_st moment. therefore update is simple */
725
726             for(i=0;i<rrd.stat_head->ds_cnt;i++){
727                 if(isnan(pdp_new[i]))
728                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
729                 else
730                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
731 #ifdef DEBUG
732                 fprintf(stderr,
733                         "NO PDP  ds[%lu]\t"
734                         "value %10.2f\t"
735                         "unkn_sec %5lu\n",
736                         i,
737                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
738                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740             }   
741         } else {
742             /* an pdp_st has occurred. */
743
744             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
745              * occurred up to the last run.        
746             pdp_new[] contains rate*seconds from the latest run.
747             pdp_temp[] will contain the rate for cdp */
748
749             for(i=0;i<rrd.stat_head->ds_cnt;i++){
750                 /* update pdp_prep to the current pdp_st */
751                 if(isnan(pdp_new[i]))
752                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
753                 else
754                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
755                         pdp_new[i]/(double)interval*(double)pre_int;
756
757                 /* if too much of the pdp_prep is unknown we dump it */
758                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
759                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
760                     (occu_pdp_st-proc_pdp_st <= 
761                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
762                     pdp_temp[i] = DNAN;
763                 } else {
764                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
765                         / (double)( occu_pdp_st
766                                    - proc_pdp_st
767                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
768                 }
769
770                 /* process CDEF data sources; remember each CDEF DS can
771                  * only reference other DS with a lower index number */
772             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
773                    rpnp_t *rpnp;
774                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
775                    /* substitue data values for OP_VARIABLE nodes */
776                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
777                    {
778                           if (rpnp[ii].op == OP_VARIABLE) {
779                                  rpnp[ii].op = OP_NUMBER;
780                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
781                           }
782                    }
783                    /* run the rpn calculator */
784                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
785                           free(rpnp);
786                           break; /* exits the data sources pdp_temp loop */
787                    }
788                 }
789         
790                 /* make pdp_prep ready for the next run */
791                 if(isnan(pdp_new[i])){
792                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
793                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
794                 } else {
795                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
796                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
797                         pdp_new[i]/(double)interval*(double)post_int;
798                 }
799
800 #ifdef DEBUG
801                 fprintf(stderr,
802                         "PDP UPD ds[%lu]\t"
803                         "pdp_temp %10.2f\t"
804                         "new_prep %10.2f\t"
805                         "new_unkn_sec %5lu\n",
806                         i, pdp_temp[i],
807                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
808                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
809 #endif
810             }
811
812                 /* if there were errors during the last loop, bail out here */
813             if (rrd_test_error()){
814                free(step_start);
815                break;
816             }
817
818                 /* compute the number of elapsed pdp_st moments */
819                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
820 #ifdef DEBUG
821                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
822 #endif
823                 if (rra_step_cnt == NULL)
824                 {
825                    rra_step_cnt = (unsigned long *) 
826                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
827                 }
828
829             for(i = 0, rra_start = rra_begin;
830                 i < rrd.stat_head->rra_cnt;
831             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
832                 i++)
833                 {
834                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
835                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
836                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
837         if (start_pdp_offset <= elapsed_pdp_st) {
838            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
839                       rrd.rra_def[i].pdp_cnt + 1;
840             } else {
841                    rra_step_cnt[i] = 0;
842                 }
843
844                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
845                 {
846                    /* If this is a bulk update, we need to skip ahead in the seasonal
847                         * arrays so that they will be correct for the next observed value;
848                         * note that for the bulk update itself, no update will occur to
849                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
850                         * be set to DNAN. */
851            if (rra_step_cnt[i] > 2) 
852                    {
853                           /* skip update by resetting rra_step_cnt[i],
854                            * note that this is not data source specific; this is due
855                            * to the bulk update, not a DNAN value for the specific data
856                            * source. */
857                           rra_step_cnt[i] = 0;
858               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
859                              &last_seasonal_coef);
860                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
861                              &seasonal_coef);
862                    }
863                 
864                   /* periodically run a smoother for seasonal effects */
865                   /* Need to use first cdp parameter buffer to track
866                    * burnin (burnin requires a specific smoothing schedule).
867                    * The CDP_init_seasonal parameter is really an RRA level,
868                    * not a data source within RRA level parameter, but the rra_def
869                    * is read only for rrd_update (not flushed to disk). */
870                   iii = i*(rrd.stat_head -> ds_cnt);
871                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
872                           <= BURNIN_CYCLES)
873                   {
874                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
875                                  > rrd.rra_def[i].row_cnt - 1) {
876                            /* mark off one of the burnin cycles */
877                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
878                        schedule_smooth = 1;
879                          }  
880                   } else {
881                          /* someone has no doubt invented a trick to deal with this
882                           * wrap around, but at least this code is clear. */
883                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
884                              rrd.rra_ptr[i].cur_row)
885                          {
886                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
887                                   * mapping between PDP and CDP */
888                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
889                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
890                                  {
891 #ifdef DEBUG
892                                         fprintf(stderr,
893                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
894                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
895                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
896 #endif
897                                         schedule_smooth = 1;
898                                  }
899              } else {
900                                  /* can't rely on negative numbers because we are working with
901                                   * unsigned values */
902                                  /* Don't need modulus here. If we've wrapped more than once, only
903                                   * one smooth is executed at the end. */
904                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
905                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
906                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
907                                  {
908 #ifdef DEBUG
909                                         fprintf(stderr,
910                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
911                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
912                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
913 #endif
914                                         schedule_smooth = 1;
915                                  }
916                          }
917                   }
918
919               rra_current = ftell(rrd_file); 
920                 } /* if cf is DEVSEASONAL or SEASONAL */
921
922         if (rrd_test_error()) break;
923
924                     /* update CDP_PREP areas */
925                     /* loop over data soures within each RRA */
926                     for(ii = 0;
927                         ii < rrd.stat_head->ds_cnt;
928                         ii++)
929                         {
930                         
931                         /* iii indexes the CDP prep area for this data source within the RRA */
932                         iii=i*rrd.stat_head->ds_cnt+ii;
933
934                         if (rrd.rra_def[i].pdp_cnt > 1) {
935                           
936                            if (rra_step_cnt[i] > 0) {
937                            /* If we are in this block, as least 1 CDP value will be written to
938                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
939                                 * to be written, then the "fill in" value is the CDP_secondary_val
940                                 * entry. */
941                                   if (isnan(pdp_temp[ii]))
942                   {
943                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
944                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
945                                   } else {
946                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
947                                           * CDP data entries. No matter the CF, the value is the same because
948                                           * the average, max, min, and last of a list of identical values is
949                                           * the same, namely, the value itself. */
950                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
951                                   }
952                      
953                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
954                                       > rrd.rra_def[i].pdp_cnt*
955                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
956                                   {
957                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
958                                          /* initialize carry over */
959                                          if (current_cf == CF_AVERAGE) {
960                                                    if (isnan(pdp_temp[ii])) { 
961                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
962                                                    } else {
963                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
964                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
965                                                    }
966                                          } else {
967                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
968                                          }
969                                   } else {
970                                          rrd_value_t cum_val, cur_val; 
971                                      switch (current_cf) {
972                                                 case CF_AVERAGE:
973                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
974                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
975                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
976                                                (cum_val + cur_val * start_pdp_offset) /
977                                            (rrd.rra_def[i].pdp_cnt
978                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
979                                                    /* initialize carry over value */
980                                                    if (isnan(pdp_temp[ii])) { 
981                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
982                                                    } else {
983                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
984                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
985                                                    }
986                                                    break;
987                                                 case CF_MAXIMUM:
988                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
989                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
990 #ifdef DEBUG
991                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
992                                                           isnan(pdp_temp[ii])) {
993                                                      fprintf(stderr,
994                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
995                                                                 i,ii);
996                                                          exit(-1);
997                                                   }
998 #endif
999                                                   if (cur_val > cum_val)
1000                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1001                                                   else
1002                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1003                                                   /* initialize carry over value */
1004                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1005                                                   break;
1006                                                 case CF_MINIMUM:
1007                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1008                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1009 #ifdef DEBUG
1010                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1011                                                           isnan(pdp_temp[ii])) {
1012                                                      fprintf(stderr,
1013                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1014                                                                 i,ii);
1015                                                          exit(-1);
1016                                                   }
1017 #endif
1018                                                   if (cur_val < cum_val)
1019                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1020                                                   else
1021                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1022                                                   /* initialize carry over value */
1023                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1024                                                   break;
1025                                                 case CF_LAST:
1026                                                 default:
1027                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1028                                                    /* initialize carry over value */
1029                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1030                                                 break;
1031                                          }
1032                                   } /* endif meets xff value requirement for a valid value */
1033                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1034                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1035                                   if (isnan(pdp_temp[ii]))
1036                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1037                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1038                                   else
1039                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1040                } else  /* rra_step_cnt[i]  == 0 */
1041                            {
1042 #ifdef DEBUG
1043                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1044                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1045                                          i,ii);
1046                                   } else {
1047                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1048                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1049                                   }
1050 #endif
1051                                   if (isnan(pdp_temp[ii])) {
1052                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1053                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1054                                   {
1055                                          if (current_cf == CF_AVERAGE) {
1056                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1057                                                    elapsed_pdp_st;
1058                                          } else {
1059                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1060                                          }
1061 #ifdef DEBUG
1062                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1063                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1064 #endif
1065                                   } else {
1066                                          switch (current_cf) {
1067                                          case CF_AVERAGE:
1068                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1069                                                    elapsed_pdp_st;
1070                                                 break;
1071                                          case CF_MINIMUM:
1072                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1073                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1074                                                 break; 
1075                                          case CF_MAXIMUM:
1076                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1077                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078                                                 break; 
1079                                          case CF_LAST:
1080                                          default:
1081                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082                                                 break;
1083                                          }
1084                                   }
1085                            }
1086                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1087                            if (elapsed_pdp_st > 2)
1088                            {
1089                                    switch (current_cf) {
1090                                    case CF_AVERAGE:
1091                                    default:
1092                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1093                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1094                                           break;
1095                    case CF_SEASONAL:
1096                                    case CF_DEVSEASONAL:
1097                                           /* need to update cached seasonal values, so they are consistent
1098                                            * with the bulk update */
1099                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1100                                            * CDP_last_deviation are the same. */
1101                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1102                                                  last_seasonal_coef[ii];
1103                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1104                                                  seasonal_coef[ii];
1105                                           break;
1106                    case CF_HWPREDICT:
1107                                           /* need to update the null_count and last_null_count.
1108                                            * even do this for non-DNAN pdp_temp because the
1109                                            * algorithm is not learning from batch updates. */
1110                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1111                                                  elapsed_pdp_st;
1112                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1113                                                  elapsed_pdp_st - 1;
1114                                           /* fall through */
1115                                    case CF_DEVPREDICT:
1116                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1117                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1118                                           break;
1119                    case CF_FAILURES:
1120                                           /* do not count missed bulk values as failures */
1121                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1122                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1123                                           /* need to reset violations buffer.
1124                                            * could do this more carefully, but for now, just
1125                                            * assume a bulk update wipes away all violations. */
1126                       erase_violations(&rrd, iii, i);
1127                                           break;
1128                                    }
1129                            } 
1130                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1131
1132                         if (rrd_test_error()) break;
1133
1134                         } /* endif data sources loop */
1135         } /* end RRA Loop */
1136
1137                 /* this loop is only entered if elapsed_pdp_st < 3 */
1138                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1139                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1140                 {
1141                for(i = 0, rra_start = rra_begin;
1142                    i < rrd.stat_head->rra_cnt;
1143                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1144                    i++)
1145                    {
1146                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1147
1148                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1149                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1150                           {
1151                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1152                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1153                                 &seasonal_coef);
1154                  rra_current = ftell(rrd_file);
1155                           }
1156                           if (rrd_test_error()) break;
1157                       /* loop over data soures within each RRA */
1158                       for(ii = 0;
1159                           ii < rrd.stat_head->ds_cnt;
1160                           ii++)
1161                           {
1162                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1163                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1164                                     scratch_idx, seasonal_coef);
1165                           }
1166            } /* end RRA Loop */
1167                    if (rrd_test_error()) break;
1168             } /* end elapsed_pdp_st loop */
1169
1170                 if (rrd_test_error()) break;
1171
1172                 /* Ready to write to disk */
1173                 /* Move sequentially through the file, writing one RRA at a time.
1174                  * Note this architecture divorces the computation of CDP with
1175                  * flushing updated RRA entries to disk. */
1176             for(i = 0, rra_start = rra_begin;
1177                 i < rrd.stat_head->rra_cnt;
1178             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1179                 i++) {
1180                 /* is there anything to write for this RRA? If not, continue. */
1181         if (rra_step_cnt[i] == 0) continue;
1182
1183                 /* write the first row */
1184 #ifdef DEBUG
1185         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1186 #endif
1187             rrd.rra_ptr[i].cur_row++;
1188             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1189                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1190                 /* positition on the first row */
1191                 rra_pos_tmp = rra_start +
1192                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1193                 if(rra_pos_tmp != rra_current) {
1194 #ifndef HAVE_MMAP
1195                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1196                       rrd_set_error("seek error in rrd");
1197                       break;
1198                    }
1199 #endif
1200                    rra_current = rra_pos_tmp;
1201                 }
1202
1203 #ifdef DEBUG
1204             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1205 #endif
1206                 scratch_idx = CDP_primary_val;
1207                 if (pcdp_summary != NULL)
1208                 {
1209                    rra_time = (current_time - current_time 
1210                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1211                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1212                 }
1213 #ifdef HAVE_MMAP
1214                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1215                    pcdp_summary, &rra_time, rrd_mmaped_file);
1216 #else
1217                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1218                    pcdp_summary, &rra_time);
1219 #endif
1220                 if (rrd_test_error()) break;
1221
1222                 /* write other rows of the bulk update, if any */
1223                 scratch_idx = CDP_secondary_val;
1224                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1225                 {
1226                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1227                    {
1228 #ifdef DEBUG
1229               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1230                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1231 #endif
1232                           /* wrap */
1233                           rrd.rra_ptr[i].cur_row = 0;
1234                           /* seek back to beginning of current rra */
1235                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1236                           {
1237                          rrd_set_error("seek error in rrd");
1238                          break;
1239                           }
1240 #ifdef DEBUG
1241                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1242 #endif
1243                           rra_current = rra_start;
1244                    }
1245                    if (pcdp_summary != NULL)
1246                    {
1247                       rra_time = (current_time - current_time 
1248                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1249                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1250                    }
1251 #ifdef HAVE_MMAP
1252                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1253                       pcdp_summary, &rra_time, rrd_mmaped_file);
1254 #else
1255                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1256                       pcdp_summary, &rra_time);
1257 #endif
1258                 }
1259                 
1260                 if (rrd_test_error())
1261                   break;
1262                 } /* RRA LOOP */
1263
1264             /* break out of the argument parsing loop if error_string is set */
1265             if (rrd_test_error()){
1266                    free(step_start);
1267                    break;
1268             } 
1269             
1270         } /* endif a pdp_st has occurred */ 
1271         rrd.live_head->last_up = current_time;
1272         rrd.live_head->last_up_usec = current_time_usec; 
1273         free(step_start);
1274     } /* function argument loop */
1275
1276     if (seasonal_coef != NULL) free(seasonal_coef);
1277     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1278         if (rra_step_cnt != NULL) free(rra_step_cnt);
1279     rpnstack_free(&rpnstack);
1280
1281 #ifdef HAVE_MMAP
1282     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1283             rrd_set_error("error writing(unmapping) file: %s", filename);
1284     }
1285 #endif    
1286     /* if we got here and if there is an error and if the file has not been
1287      * written to, then close things up and return. */
1288     if (rrd_test_error()) {
1289         free(updvals);
1290         free(tmpl_idx);
1291         rrd_free(&rrd);
1292         free(pdp_temp);
1293         free(pdp_new);
1294         fclose(rrd_file);
1295         return(-1);
1296     }
1297
1298     /* aargh ... that was tough ... so many loops ... anyway, its done.
1299      * we just need to write back the live header portion now*/
1300
1301     if (fseek(rrd_file, (sizeof(stat_head_t)
1302                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1303                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1304               SEEK_SET) != 0) {
1305         rrd_set_error("seek rrd for live header writeback");
1306         free(updvals);
1307         free(tmpl_idx);
1308         rrd_free(&rrd);
1309         free(pdp_temp);
1310         free(pdp_new);
1311         fclose(rrd_file);
1312         return(-1);
1313     }
1314
1315     if(version >= 3) {
1316             if(fwrite( rrd.live_head,
1317                        sizeof(live_head_t), 1, rrd_file) != 1){
1318                 rrd_set_error("fwrite live_head to rrd");
1319                 free(updvals);
1320                 rrd_free(&rrd);
1321                 free(tmpl_idx);
1322                 free(pdp_temp);
1323                 free(pdp_new);
1324                 fclose(rrd_file);
1325                 return(-1);
1326             }
1327     }
1328     else {
1329             if(fwrite( &rrd.live_head->last_up,
1330                        sizeof(time_t), 1, rrd_file) != 1){
1331                 rrd_set_error("fwrite live_head to rrd");
1332                 free(updvals);
1333                 rrd_free(&rrd);
1334                 free(tmpl_idx);
1335                 free(pdp_temp);
1336                 free(pdp_new);
1337                 fclose(rrd_file);
1338                 return(-1);
1339             }
1340     }
1341             
1342
1343     if(fwrite( rrd.pdp_prep,
1344                sizeof(pdp_prep_t),
1345                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1346         rrd_set_error("ftwrite pdp_prep to rrd");
1347         free(updvals);
1348         rrd_free(&rrd);
1349         free(tmpl_idx);
1350         free(pdp_temp);
1351         free(pdp_new);
1352         fclose(rrd_file);
1353         return(-1);
1354     }
1355
1356     if(fwrite( rrd.cdp_prep,
1357                sizeof(cdp_prep_t),
1358                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1359        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1360
1361         rrd_set_error("ftwrite cdp_prep to rrd");
1362         free(updvals);
1363         free(tmpl_idx);
1364         rrd_free(&rrd);
1365         free(pdp_temp);
1366         free(pdp_new);
1367         fclose(rrd_file);
1368         return(-1);
1369     }
1370
1371     if(fwrite( rrd.rra_ptr,
1372                sizeof(rra_ptr_t), 
1373                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1374         rrd_set_error("fwrite rra_ptr to rrd");
1375         free(updvals);
1376         free(tmpl_idx);
1377         rrd_free(&rrd);
1378         free(pdp_temp);
1379         free(pdp_new);
1380         fclose(rrd_file);
1381         return(-1);
1382     }
1383
1384     /* OK now close the files and free the memory */
1385     if(fclose(rrd_file) != 0){
1386         rrd_set_error("closing rrd");
1387         free(updvals);
1388         free(tmpl_idx);
1389         rrd_free(&rrd);
1390         free(pdp_temp);
1391         free(pdp_new);
1392         return(-1);
1393     }
1394
1395     /* calling the smoothing code here guarantees at most
1396          * one smoothing operation per rrd_update call. Unfortunately,
1397          * it is possible with bulk updates, or a long-delayed update
1398          * for smoothing to occur off-schedule. This really isn't
1399          * critical except during the burning cycles. */
1400         if (schedule_smooth)
1401         {
1402           rrd_file = fopen(filename,"rb+");
1403           rra_start = rra_begin;
1404           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1405           {
1406             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1407                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1408             {
1409 #ifdef DEBUG
1410               fprintf(stderr,"Running smoother for rra %ld\n",i);
1411 #endif
1412               apply_smoother(&rrd,i,rra_start,rrd_file);
1413               if (rrd_test_error())
1414                 break;
1415             }
1416             rra_start += rrd.rra_def[i].row_cnt
1417               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1418           }
1419           fclose(rrd_file);
1420         }
1421     rrd_free(&rrd);
1422     free(updvals);
1423     free(tmpl_idx);
1424     free(pdp_new);
1425     free(pdp_temp);
1426     return(0);
1427 }
1428
1429 /*
1430  * get exclusive lock to whole file.
1431  * lock gets removed when we close the file
1432  *
1433  * returns 0 on success
1434  */
1435 int
1436 LockRRD(FILE *rrdfile)
1437 {
1438     int rrd_fd;         /* File descriptor for RRD */
1439     int rcstat;
1440
1441     rrd_fd = fileno(rrdfile);
1442
1443         {
1444 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1445     struct _stat st;
1446
1447     if ( _fstat( rrd_fd, &st ) == 0 ) {
1448             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1449     } else {
1450             rcstat = -1;
1451     }
1452 #else
1453     struct flock        lock;
1454     lock.l_type = F_WRLCK;    /* exclusive write lock */
1455     lock.l_len = 0;           /* whole file */
1456     lock.l_start = 0;         /* start of file */
1457     lock.l_whence = SEEK_SET;   /* end of file */
1458
1459     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1460 #endif
1461         }
1462
1463     return(rcstat);
1464 }
1465
1466
1467 #ifdef HAVE_MMAP
1468 info_t
1469 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1470                unsigned short CDP_scratch_idx, 
1471 #ifndef DEBUG
1472 FILE UNUSED(*rrd_file),
1473 #else
1474 FILE *rrd_file,
1475 #endif
1476                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1477 #else
1478 info_t
1479 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1480                unsigned short CDP_scratch_idx, FILE *rrd_file,
1481                    info_t *pcdp_summary, time_t *rra_time)
1482 #endif
1483 {
1484    unsigned long ds_idx, cdp_idx;
1485    infoval iv;
1486   
1487    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1488    {
1489       /* compute the cdp index */
1490       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1491 #ifdef DEBUG
1492           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1493              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1494              rrd -> rra_def[rra_idx].cf_nam);
1495 #endif 
1496       if (pcdp_summary != NULL)
1497           {
1498              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1499              /* append info to the return hash */
1500                  pcdp_summary = info_push(pcdp_summary,
1501                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1502                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1503                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1504          RD_I_VAL, iv);
1505           }
1506 #ifdef HAVE_MMAP
1507           memcpy((char *)rrd_mmaped_file + *rra_current,
1508                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1509                           sizeof(rrd_value_t));
1510 #else
1511           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1512                  sizeof(rrd_value_t),1,rrd_file) != 1)
1513           { 
1514              rrd_set_error("writing rrd");
1515              return 0;
1516           }
1517 #endif
1518           *rra_current += sizeof(rrd_value_t);
1519         }
1520         return (pcdp_summary);
1521 }