increasse revision of shared library number since the library source code has changed
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.0  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
15
16 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
21
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
24
25 #include "rrd_is_thread_safe.h"
26
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28 /*
29  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
30  * replacement.
31  */
32 #include <sys/timeb.h>
33
34 struct timeval {
35         time_t tv_sec; /* seconds */
36         long tv_usec;  /* microseconds */
37 };
38
39 struct __timezone {
40         int  tz_minuteswest; /* minutes W of Greenwich */
41         int  tz_dsttime;     /* type of dst correction */
42 };
43
44 static gettimeofday(struct timeval *t, struct __timezone *tz) {
45         
46         struct timeb current_time;
47
48         _ftime(&current_time);
49         
50         t->tv_sec  = current_time.time;
51         t->tv_usec = current_time.millitm * 1000;
52 }
53
54 #endif
55 /*
56  * normilize time as returned by gettimeofday. usec part must
57  * be always >= 0
58  */
59 static void normalize_time(struct timeval *t)
60 {
61         if(t->tv_usec < 0) {
62                 t->tv_sec--;
63                 t->tv_usec += 1000000L;
64         }
65 }
66
67 /* Local prototypes */
68 int LockRRD(FILE *rrd_file);
69 #ifdef HAVE_MMAP
70 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
71                                         unsigned long *rra_current,
72                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
73                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
74 #else
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
78                                         info_t *pcdp_summary, time_t *rra_time);
79 #endif
80 int rrd_update_r(char *filename, char *template, int argc, char **argv);
81 int _rrd_update(char *filename, char *template, int argc, char **argv, 
82                                         info_t*);
83
84 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
85
86
87 #ifdef STANDALONE
88 int 
89 main(int argc, char **argv){
90         rrd_update(argc,argv);
91         if (rrd_test_error()) {
92                 printf("RRDtool 1.2.0  Copyright by Tobi Oetiker, 1997-2005\n\n"
93                         "Usage: rrdupdate filename\n"
94                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
95                         "\t\t\ttime|N:value[:value...]\n\n"
96                         "\t\t\tat-time@value[:value...]\n\n"
97                         "\t\t\t[ time:value[:value...] ..]\n\n");
98                                    
99                 printf("ERROR: %s\n",rrd_get_error());
100                 rrd_clear_error();                                                            
101                 return 1;
102         }
103         return 0;
104 }
105 #endif
106
107 info_t *rrd_update_v(int argc, char **argv)
108 {
109     char             *template = NULL;          
110         info_t *result = NULL;
111         infoval rc;
112     optind = 0; opterr = 0;  /* initialize getopt */
113
114     while (1) {
115                 static struct option long_options[] =
116                         {
117                                 {"template",      required_argument, 0, 't'},
118                                 {0,0,0,0}
119                         };
120                 int option_index = 0;
121                 int opt;
122                 opt = getopt_long(argc, argv, "t:", 
123                                                   long_options, &option_index);
124                 
125                 if (opt == EOF)
126                         break;
127                 
128                 switch(opt) {
129                 case 't':
130                         template = optarg;
131                         break;
132                 
133                 case '?':
134                         rrd_set_error("unknown option '%s'",argv[optind-1]);
135             rc.u_int = -1;
136                         goto end_tag;
137                 }
138     }
139
140     /* need at least 2 arguments: filename, data. */
141     if (argc-optind < 2) {
142                 rrd_set_error("Not enough arguments");
143         rc.u_int = -1;
144                 goto end_tag;
145     }
146     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
147         rc.u_int = _rrd_update(argv[optind], template,
148                       argc - optind - 1, argv + optind + 1, result);
149     result->value.u_int = rc.u_int;
150 end_tag:
151     return result;
152 }
153
154 int
155 rrd_update(int argc, char **argv)
156 {
157     char             *template = NULL;          
158     int rc;
159     optind = 0; opterr = 0;  /* initialize getopt */
160
161     while (1) {
162                 static struct option long_options[] =
163                         {
164                                 {"template",      required_argument, 0, 't'},
165                                 {0,0,0,0}
166                         };
167                 int option_index = 0;
168                 int opt;
169                 opt = getopt_long(argc, argv, "t:", 
170                                                   long_options, &option_index);
171                 
172                 if (opt == EOF)
173                         break;
174                 
175                 switch(opt) {
176                 case 't':
177                         template = optarg;
178                         break;
179                 
180                 case '?':
181                         rrd_set_error("unknown option '%s'",argv[optind-1]);
182                         return(-1);
183                 }
184     }
185
186     /* need at least 2 arguments: filename, data. */
187     if (argc-optind < 2) {
188                 rrd_set_error("Not enough arguments");
189
190                 return -1;
191     }
192  
193         rc = rrd_update_r(argv[optind], template,
194                       argc - optind - 1, argv + optind + 1);
195     return rc;
196 }
197
198 int
199 rrd_update_r(char *filename, char *template, int argc, char **argv)
200 {
201    return _rrd_update(filename, template, argc, argv, NULL);
202 }
203
204 int
205 _rrd_update(char *filename, char *template, int argc, char **argv, 
206    info_t *pcdp_summary)
207 {
208
209     int              arg_i = 2;
210     short            j;
211     unsigned long    i,ii,iii=1;
212
213     unsigned long    rra_begin;          /* byte pointer to the rra
214                                           * area in the rrd file.  this
215                                           * pointer never changes value */
216     unsigned long    rra_start;          /* byte pointer to the rra
217                                           * area in the rrd file.  this
218                                           * pointer changes as each rrd is
219                                           * processed. */
220     unsigned long    rra_current;        /* byte pointer to the current write
221                                           * spot in the rrd file. */
222     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
223     double           interval,
224         pre_int,post_int;                /* interval between this and
225                                           * the last run */
226     unsigned long    proc_pdp_st;        /* which pdp_st was the last
227                                           * to be processed */
228     unsigned long    occu_pdp_st;        /* when was the pdp_st
229                                           * before the last update
230                                           * time */
231     unsigned long    proc_pdp_age;       /* how old was the data in
232                                           * the pdp prep area when it
233                                           * was last updated */
234     unsigned long    occu_pdp_age;       /* how long ago was the last
235                                           * pdp_step time */
236     rrd_value_t      *pdp_new;           /* prepare the incoming data
237                                           * to be added the the
238                                           * existing entry */
239     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
240                                           * to be added the the
241                                           * cdp values */
242
243     long             *tmpl_idx;          /* index representing the settings
244                                             transported by the template index */
245     unsigned long    tmpl_cnt = 2;       /* time and data */
246
247     FILE             *rrd_file;
248     rrd_t            rrd;
249     time_t           current_time;
250         time_t           rra_time; /* time of update for a RRA */
251     unsigned long    current_time_usec;  /* microseconds part of current time */
252     struct timeval   tmp_time;           /* used for time conversion */
253
254     char             **updvals;
255     int              schedule_smooth = 0;
256         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
257                                          /* a vector of future Holt-Winters seasonal coefs */
258     unsigned long    elapsed_pdp_st;
259                                          /* number of elapsed PDP steps since last update */
260     unsigned long    *rra_step_cnt = NULL;
261                                          /* number of rows to be updated in an RRA for a data
262                                           * value. */
263     unsigned long    start_pdp_offset;
264                                          /* number of PDP steps since the last update that
265                                           * are assigned to the first CDP to be generated
266                                           * since the last update. */
267     unsigned short   scratch_idx;
268                                          /* index into the CDP scratch array */
269     enum cf_en       current_cf;
270                                          /* numeric id of the current consolidation function */
271     rpnstack_t       rpnstack; /* used for COMPUTE DS */
272     int              version;  /* rrd version */
273     char             *endptr; /* used in the conversion */
274 #ifdef HAVE_MMAP
275     void             *rrd_mmaped_file;
276     unsigned long    rrd_filesize;
277 #endif
278
279     rpnstack_init(&rpnstack);
280
281     /* need at least 1 arguments: data. */
282     if (argc < 1) {
283         rrd_set_error("Not enough arguments");
284         return -1;
285     }
286     
287     
288
289     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
290         return -1;
291     }
292     /* initialize time */
293     version = atoi(rrd.stat_head->version);
294     gettimeofday(&tmp_time, 0);
295     normalize_time(&tmp_time);
296     current_time = tmp_time.tv_sec;
297     if(version >= 3) {
298         current_time_usec = tmp_time.tv_usec;
299     }
300     else {
301         current_time_usec = 0;
302     }
303
304     rra_current = rra_start = rra_begin = ftell(rrd_file);
305     /* This is defined in the ANSI C standard, section 7.9.5.3:
306
307         When a file is opened with udpate mode ('+' as the second
308         or third character in the ... list of mode argument
309         variables), both input and ouptut may be performed on the
310         associated stream.  However, ...  input may not be directly
311         followed by output without an intervening call to a file
312         positioning function, unless the input oepration encounters
313         end-of-file. */
314 #ifdef HAVE_MMAP
315     fseek(rrd_file, 0, SEEK_END);
316     rrd_filesize = ftell(rrd_file);
317     fseek(rrd_file, rra_current, SEEK_SET);
318 #else
319     fseek(rrd_file, 0, SEEK_CUR);
320 #endif
321
322     
323     /* get exclusive lock to whole file.
324      * lock gets removed when we close the file.
325      */
326     if (LockRRD(rrd_file) != 0) {
327       rrd_set_error("could not lock RRD");
328       rrd_free(&rrd);
329       fclose(rrd_file);
330       return(-1);   
331     } 
332
333     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
334         rrd_set_error("allocating updvals pointer array");
335         rrd_free(&rrd);
336         fclose(rrd_file);
337         return(-1);
338     }
339
340     if ((pdp_temp = malloc(sizeof(rrd_value_t)
341                            *rrd.stat_head->ds_cnt))==NULL){
342         rrd_set_error("allocating pdp_temp ...");
343         free(updvals);
344         rrd_free(&rrd);
345         fclose(rrd_file);
346         return(-1);
347     }
348
349     if ((tmpl_idx = malloc(sizeof(unsigned long)
350                            *(rrd.stat_head->ds_cnt+1)))==NULL){
351         rrd_set_error("allocating tmpl_idx ...");
352         free(pdp_temp);
353         free(updvals);
354         rrd_free(&rrd);
355         fclose(rrd_file);
356         return(-1);
357     }
358     /* initialize template redirector */
359     /* default config example (assume DS 1 is a CDEF DS)
360        tmpl_idx[0] -> 0; (time)
361        tmpl_idx[1] -> 1; (DS 0)
362        tmpl_idx[2] -> 3; (DS 2)
363        tmpl_idx[3] -> 4; (DS 3) */
364     tmpl_idx[0] = 0; /* time */
365     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
366         {
367            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
368               tmpl_idx[ii++]=i;
369         }
370     tmpl_cnt= ii;
371
372     if (template) {
373         char *dsname;
374         unsigned int tmpl_len;
375         dsname = template;
376         tmpl_cnt = 1; /* the first entry is the time */
377         tmpl_len = strlen(template);
378         for(i=0;i<=tmpl_len ;i++) {
379             if (template[i] == ':' || template[i] == '\0') {
380                 template[i] = '\0';
381                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
382                     rrd_set_error("Template contains more DS definitions than RRD");
383                     free(updvals); free(pdp_temp);
384                     free(tmpl_idx); rrd_free(&rrd);
385                     fclose(rrd_file); return(-1);
386                 }
387                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
388                     rrd_set_error("unknown DS name '%s'",dsname);
389                     free(updvals); free(pdp_temp);
390                     free(tmpl_idx); rrd_free(&rrd);
391                     fclose(rrd_file); return(-1);
392                 } else {
393                   /* the first element is always the time */
394                   tmpl_idx[tmpl_cnt-1]++; 
395                   /* go to the next entry on the template */
396                   dsname = &template[i+1];
397                   /* fix the damage we did before */
398                   if (i<tmpl_len) {
399                      template[i]=':';
400                   } 
401
402                 }
403             }       
404         }
405     }
406     if ((pdp_new = malloc(sizeof(rrd_value_t)
407                           *rrd.stat_head->ds_cnt))==NULL){
408         rrd_set_error("allocating pdp_new ...");
409         free(updvals);
410         free(pdp_temp);
411         free(tmpl_idx);
412         rrd_free(&rrd);
413         fclose(rrd_file);
414         return(-1);
415     }
416
417 #ifdef HAVE_MMAP
418     rrd_mmaped_file = mmap(0, 
419                         rrd_filesize, 
420                         PROT_READ | PROT_WRITE, 
421                         MAP_SHARED, 
422                         fileno(rrd_file), 
423                         0);
424     if (rrd_mmaped_file == MAP_FAILED) {
425         rrd_set_error("error mmapping file %s", filename);
426         free(updvals);
427         free(pdp_temp);
428         free(tmpl_idx);
429         rrd_free(&rrd);
430         fclose(rrd_file);
431         return(-1);
432     }
433 #endif
434     /* loop through the arguments. */
435     for(arg_i=0; arg_i<argc;arg_i++) {
436         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
437         char *step_start = stepper;
438         char *p;
439         char *parsetime_error = NULL;
440         enum {atstyle, normal} timesyntax;
441         struct rrd_time_value ds_tv;
442         if (stepper == NULL){
443                 rrd_set_error("failed duplication argv entry");
444                 free(updvals);
445                 free(pdp_temp);  
446                 free(tmpl_idx);
447                 rrd_free(&rrd);
448 #ifdef HAVE_MMAP
449                 munmap(rrd_mmaped_file, rrd_filesize);
450 #endif
451                 fclose(rrd_file);
452                 return(-1);
453          }
454         /* initialize all ds input to unknown except the first one
455            which has always got to be set */
456         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
457         strcpy(stepper,argv[arg_i]);
458         updvals[0]=stepper;
459         /* separate all ds elements; first must be examined separately
460            due to alternate time syntax */
461         if ((p=strchr(stepper,'@'))!=NULL) {
462             timesyntax = atstyle;
463             *p = '\0';
464             stepper = p+1;
465         } else if ((p=strchr(stepper,':'))!=NULL) {
466             timesyntax = normal;
467             *p = '\0';
468             stepper = p+1;
469         } else {
470             rrd_set_error("expected timestamp not found in data source from %s:...",
471                           argv[arg_i]);
472             free(step_start);
473             break;
474         }
475         ii=1;
476         updvals[tmpl_idx[ii]] = stepper;
477         while (*stepper) {
478             if (*stepper == ':') {
479                 *stepper = '\0';
480                 ii++;
481                 if (ii<tmpl_cnt){                   
482                     updvals[tmpl_idx[ii]] = stepper+1;
483                 }
484             }
485             stepper++;
486         }
487
488         if (ii != tmpl_cnt-1) {
489             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
490                           tmpl_cnt-1, ii, argv[arg_i]);
491             free(step_start);
492             break;
493         }
494         
495         /* get the time from the reading ... handle N */
496         if (timesyntax == atstyle) {
497             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
498                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
499                 free(step_start);
500                 break;
501             }
502             if (ds_tv.type == RELATIVE_TO_END_TIME ||
503                 ds_tv.type == RELATIVE_TO_START_TIME) {
504                 rrd_set_error("specifying time relative to the 'start' "
505                               "or 'end' makes no sense here: %s",
506                               updvals[0]);
507                 free(step_start);
508                 break;
509             }
510
511             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
512             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
513             
514         } else if (strcmp(updvals[0],"N")==0){
515             gettimeofday(&tmp_time, 0);
516             normalize_time(&tmp_time);
517             current_time = tmp_time.tv_sec;
518             current_time_usec = tmp_time.tv_usec;
519         } else {
520             double tmp;
521             tmp = strtod(updvals[0], 0);
522             current_time = floor(tmp);
523             current_time_usec = (long)((tmp - current_time) * 1000000L);
524         }
525         /* dont do any correction for old version RRDs */
526         if(version < 3) 
527             current_time_usec = 0;
528         
529         if(current_time <= rrd.live_head->last_up){
530             rrd_set_error("illegal attempt to update using time %ld when "
531                           "last update time is %ld (minimum one second step)",
532                           current_time, rrd.live_head->last_up);
533             free(step_start);
534             break;
535         }
536         
537         
538         /* seek to the beginning of the rra's */
539         if (rra_current != rra_begin) {
540 #ifndef HAVE_MMAP
541             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
542                 rrd_set_error("seek error in rrd");
543                 free(step_start);
544                 break;
545             }
546 #endif
547             rra_current = rra_begin;
548         }
549         rra_start = rra_begin;
550
551         /* when was the current pdp started */
552         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
553         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
554
555         /* when did the last pdp_st occur */
556         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
557         occu_pdp_st = current_time - occu_pdp_age;
558         /* interval = current_time - rrd.live_head->last_up; */
559         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
560     
561         if (occu_pdp_st > proc_pdp_st){
562             /* OK we passed the pdp_st moment*/
563             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
564                                                               * occurred before the latest
565                                                               * pdp_st moment*/
566             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
567             post_int = occu_pdp_age;                         /* how much after it */
568             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
569         } else {
570             pre_int = interval;
571             post_int = 0;
572         }
573
574 #ifdef DEBUG
575         printf(
576                "proc_pdp_age %lu\t"
577                "proc_pdp_st %lu\t" 
578                "occu_pfp_age %lu\t" 
579                "occu_pdp_st %lu\t"
580                "int %lf\t"
581                "pre_int %lf\t"
582                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
583                 occu_pdp_age, occu_pdp_st,
584                interval, pre_int, post_int);
585 #endif
586     
587         /* process the data sources and update the pdp_prep 
588          * area accordingly */
589         for(i=0;i<rrd.stat_head->ds_cnt;i++){
590             enum dst_en dst_idx;
591             dst_idx= dst_conv(rrd.ds_def[i].dst);
592                 /* NOTE: DST_CDEF should never enter this if block, because
593                  * updvals[i+1][0] is initialized to 'U'; unless the caller
594                  * accidently specified a value for the DST_CDEF. To handle 
595                  * this case, an extra check is required. */
596             if((updvals[i+1][0] != 'U') &&
597                    (dst_idx != DST_CDEF) &&
598                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
599                double rate = DNAN;
600                /* the data source type defines how to process the data */
601                 /* pdp_new contains rate * time ... eg the bytes
602                  * transferred during the interval. Doing it this way saves
603                  * a lot of math operations */
604                 
605
606                 switch(dst_idx){
607                 case DST_COUNTER:
608                 case DST_DERIVE:
609                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
610                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
611                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
612                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
613                                  break;
614                             }
615                        }
616                        if (rrd_test_error()){
617                             break;
618                        }
619                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
620                        if(dst_idx == DST_COUNTER) {
621                           /* simple overflow catcher suggested by Andres Kroonmaa */
622                           /* this will fail terribly for non 32 or 64 bit counters ... */
623                           /* are there any others in SNMP land ? */
624                           if (pdp_new[i] < (double)0.0 ) 
625                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
626                           if (pdp_new[i] < (double)0.0 ) 
627                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
628                        }
629                        rate = pdp_new[i] / interval;
630                     }
631                    else {
632                      pdp_new[i]= DNAN;          
633                    }
634                    break;
635                 case DST_ABSOLUTE:
636                     errno = 0;
637                     pdp_new[i] = strtod(updvals[i+1],&endptr);
638                     if (errno > 0){
639                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
640                         break;
641                     };
642                     if (endptr[0] != '\0'){
643                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
644                         break;
645                     }
646                     rate = pdp_new[i] / interval;                 
647                     break;
648                 case DST_GAUGE:
649                     errno = 0;
650                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
651                     if (errno > 0){
652                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
653                         break;
654                     };
655                     if (endptr[0] != '\0'){
656                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
657                         break;
658                     }
659                     rate = pdp_new[i] / interval;                  
660                     break;
661                 default:
662                     rrd_set_error("rrd contains unknown DS type : '%s'",
663                                   rrd.ds_def[i].dst);
664                     break;
665                 }
666                 /* break out of this for loop if the error string is set */
667                 if (rrd_test_error()){
668                     break;
669                 }
670                /* make sure pdp_temp is neither too large or too small
671                 * if any of these occur it becomes unknown ...
672                 * sorry folks ... */
673                if ( ! isnan(rate) && 
674                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
675                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
676                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
677                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
678                   pdp_new[i] = DNAN;
679                }               
680             } else {
681                 /* no news is news all the same */
682                 pdp_new[i] = DNAN;
683             }
684             
685             /* make a copy of the command line argument for the next run */
686 #ifdef DEBUG
687             fprintf(stderr,
688                     "prep ds[%lu]\t"
689                     "last_arg '%s'\t"
690                     "this_arg '%s'\t"
691                     "pdp_new %10.2f\n",
692                     i,
693                     rrd.pdp_prep[i].last_ds,
694                     updvals[i+1], pdp_new[i]);
695 #endif
696             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
697                 strncpy(rrd.pdp_prep[i].last_ds,
698                         updvals[i+1],LAST_DS_LEN-1);
699                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
700             }
701         }
702         /* break out of the argument parsing loop if the error_string is set */
703         if (rrd_test_error()){
704             free(step_start);
705             break;
706         }
707         /* has a pdp_st moment occurred since the last run ? */
708
709         if (proc_pdp_st == occu_pdp_st){
710             /* no we have not passed a pdp_st moment. therefore update is simple */
711
712             for(i=0;i<rrd.stat_head->ds_cnt;i++){
713                 if(isnan(pdp_new[i]))
714                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
715                 else
716                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
717 #ifdef DEBUG
718                 fprintf(stderr,
719                         "NO PDP  ds[%lu]\t"
720                         "value %10.2f\t"
721                         "unkn_sec %5lu\n",
722                         i,
723                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
724                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
725 #endif
726             }   
727         } else {
728             /* an pdp_st has occurred. */
729
730             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
731              * occurred up to the last run.        
732             pdp_new[] contains rate*seconds from the latest run.
733             pdp_temp[] will contain the rate for cdp */
734
735             for(i=0;i<rrd.stat_head->ds_cnt;i++){
736                 /* update pdp_prep to the current pdp_st */
737                 if(isnan(pdp_new[i]))
738                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
739                 else
740                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
741                         pdp_new[i]/(double)interval*(double)pre_int;
742
743                 /* if too much of the pdp_prep is unknown we dump it */
744                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
745                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
746                     (occu_pdp_st-proc_pdp_st <= 
747                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
748                     pdp_temp[i] = DNAN;
749                 } else {
750                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
751                         / (double)( occu_pdp_st
752                                    - proc_pdp_st
753                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
754                 }
755
756                 /* process CDEF data sources; remember each CDEF DS can
757                  * only reference other DS with a lower index number */
758             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
759                    rpnp_t *rpnp;
760                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
761                    /* substitue data values for OP_VARIABLE nodes */
762                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
763                    {
764                           if (rpnp[ii].op == OP_VARIABLE) {
765                                  rpnp[ii].op = OP_NUMBER;
766                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
767                           }
768                    }
769                    /* run the rpn calculator */
770                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
771                           free(rpnp);
772                           break; /* exits the data sources pdp_temp loop */
773                    }
774                 }
775         
776                 /* make pdp_prep ready for the next run */
777                 if(isnan(pdp_new[i])){
778                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
779                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
780                 } else {
781                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
782                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
783                         pdp_new[i]/(double)interval*(double)post_int;
784                 }
785
786 #ifdef DEBUG
787                 fprintf(stderr,
788                         "PDP UPD ds[%lu]\t"
789                         "pdp_temp %10.2f\t"
790                         "new_prep %10.2f\t"
791                         "new_unkn_sec %5lu\n",
792                         i, pdp_temp[i],
793                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
794                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
795 #endif
796             }
797
798                 /* if there were errors during the last loop, bail out here */
799             if (rrd_test_error()){
800                free(step_start);
801                break;
802             }
803
804                 /* compute the number of elapsed pdp_st moments */
805                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
806 #ifdef DEBUG
807                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
808 #endif
809                 if (rra_step_cnt == NULL)
810                 {
811                    rra_step_cnt = (unsigned long *) 
812                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
813                 }
814
815             for(i = 0, rra_start = rra_begin;
816                 i < rrd.stat_head->rra_cnt;
817             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
818                 i++)
819                 {
820                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
821                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
822                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
823         if (start_pdp_offset <= elapsed_pdp_st) {
824            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
825                       rrd.rra_def[i].pdp_cnt + 1;
826             } else {
827                    rra_step_cnt[i] = 0;
828                 }
829
830                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
831                 {
832                    /* If this is a bulk update, we need to skip ahead in the seasonal
833                         * arrays so that they will be correct for the next observed value;
834                         * note that for the bulk update itself, no update will occur to
835                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
836                         * be set to DNAN. */
837            if (rra_step_cnt[i] > 2) 
838                    {
839                           /* skip update by resetting rra_step_cnt[i],
840                            * note that this is not data source specific; this is due
841                            * to the bulk update, not a DNAN value for the specific data
842                            * source. */
843                           rra_step_cnt[i] = 0;
844               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
845                              &last_seasonal_coef);
846                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
847                              &seasonal_coef);
848                    }
849                 
850                   /* periodically run a smoother for seasonal effects */
851                   /* Need to use first cdp parameter buffer to track
852                    * burnin (burnin requires a specific smoothing schedule).
853                    * The CDP_init_seasonal parameter is really an RRA level,
854                    * not a data source within RRA level parameter, but the rra_def
855                    * is read only for rrd_update (not flushed to disk). */
856                   iii = i*(rrd.stat_head -> ds_cnt);
857                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
858                           <= BURNIN_CYCLES)
859                   {
860                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
861                                  > rrd.rra_def[i].row_cnt - 1) {
862                            /* mark off one of the burnin cycles */
863                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
864                        schedule_smooth = 1;
865                          }  
866                   } else {
867                          /* someone has no doubt invented a trick to deal with this
868                           * wrap around, but at least this code is clear. */
869                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
870                              rrd.rra_ptr[i].cur_row)
871                          {
872                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
873                                   * mapping between PDP and CDP */
874                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
875                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
876                                  {
877 #ifdef DEBUG
878                                         fprintf(stderr,
879                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
880                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
881                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
882 #endif
883                                         schedule_smooth = 1;
884                                  }
885              } else {
886                                  /* can't rely on negative numbers because we are working with
887                                   * unsigned values */
888                                  /* Don't need modulus here. If we've wrapped more than once, only
889                                   * one smooth is executed at the end. */
890                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
891                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
892                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
893                                  {
894 #ifdef DEBUG
895                                         fprintf(stderr,
896                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
897                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
898                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
899 #endif
900                                         schedule_smooth = 1;
901                                  }
902                          }
903                   }
904
905               rra_current = ftell(rrd_file); 
906                 } /* if cf is DEVSEASONAL or SEASONAL */
907
908         if (rrd_test_error()) break;
909
910                     /* update CDP_PREP areas */
911                     /* loop over data soures within each RRA */
912                     for(ii = 0;
913                         ii < rrd.stat_head->ds_cnt;
914                         ii++)
915                         {
916                         
917                         /* iii indexes the CDP prep area for this data source within the RRA */
918                         iii=i*rrd.stat_head->ds_cnt+ii;
919
920                         if (rrd.rra_def[i].pdp_cnt > 1) {
921                           
922                            if (rra_step_cnt[i] > 0) {
923                            /* If we are in this block, as least 1 CDP value will be written to
924                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
925                                 * to be written, then the "fill in" value is the CDP_secondary_val
926                                 * entry. */
927                                   if (isnan(pdp_temp[ii]))
928                   {
929                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
930                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
931                                   } else {
932                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
933                                           * CDP data entries. No matter the CF, the value is the same because
934                                           * the average, max, min, and last of a list of identical values is
935                                           * the same, namely, the value itself. */
936                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
937                                   }
938                      
939                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
940                                       > rrd.rra_def[i].pdp_cnt*
941                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
942                                   {
943                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
944                                          /* initialize carry over */
945                                          if (current_cf == CF_AVERAGE) {
946                                                    if (isnan(pdp_temp[ii])) { 
947                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
948                                                    } else {
949                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
950                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
951                                                    }
952                                          } else {
953                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
954                                          }
955                                   } else {
956                                          rrd_value_t cum_val, cur_val; 
957                                      switch (current_cf) {
958                                                 case CF_AVERAGE:
959                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
960                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
961                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
962                                                (cum_val + cur_val * start_pdp_offset) /
963                                            (rrd.rra_def[i].pdp_cnt
964                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
965                                                    /* initialize carry over value */
966                                                    if (isnan(pdp_temp[ii])) { 
967                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
968                                                    } else {
969                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
970                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
971                                                    }
972                                                    break;
973                                                 case CF_MAXIMUM:
974                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
975                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
976 #ifdef DEBUG
977                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
978                                                           isnan(pdp_temp[ii])) {
979                                                      fprintf(stderr,
980                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
981                                                                 i,ii);
982                                                          exit(-1);
983                                                   }
984 #endif
985                                                   if (cur_val > cum_val)
986                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
987                                                   else
988                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
989                                                   /* initialize carry over value */
990                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991                                                   break;
992                                                 case CF_MINIMUM:
993                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
994                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
995 #ifdef DEBUG
996                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
997                                                           isnan(pdp_temp[ii])) {
998                                                      fprintf(stderr,
999                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1000                                                                 i,ii);
1001                                                          exit(-1);
1002                                                   }
1003 #endif
1004                                                   if (cur_val < cum_val)
1005                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1006                                                   else
1007                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1008                                                   /* initialize carry over value */
1009                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1010                                                   break;
1011                                                 case CF_LAST:
1012                                                 default:
1013                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1014                                                    /* initialize carry over value */
1015                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1016                                                 break;
1017                                          }
1018                                   } /* endif meets xff value requirement for a valid value */
1019                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1020                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1021                                   if (isnan(pdp_temp[ii]))
1022                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1023                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1024                                   else
1025                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1026                } else  /* rra_step_cnt[i]  == 0 */
1027                            {
1028 #ifdef DEBUG
1029                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1030                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1031                                          i,ii);
1032                                   } else {
1033                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1034                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1035                                   }
1036 #endif
1037                                   if (isnan(pdp_temp[ii])) {
1038                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1039                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1040                                   {
1041                                          if (current_cf == CF_AVERAGE) {
1042                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1043                                                    elapsed_pdp_st;
1044                                          } else {
1045                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046                                          }
1047 #ifdef DEBUG
1048                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1049                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1050 #endif
1051                                   } else {
1052                                          switch (current_cf) {
1053                                          case CF_AVERAGE:
1054                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1055                                                    elapsed_pdp_st;
1056                                                 break;
1057                                          case CF_MINIMUM:
1058                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1059                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1060                                                 break; 
1061                                          case CF_MAXIMUM:
1062                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1063                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1064                                                 break; 
1065                                          case CF_LAST:
1066                                          default:
1067                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068                                                 break;
1069                                          }
1070                                   }
1071                            }
1072                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1073                            if (elapsed_pdp_st > 2)
1074                            {
1075                                    switch (current_cf) {
1076                                    case CF_AVERAGE:
1077                                    default:
1078                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1079                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1080                                           break;
1081                    case CF_SEASONAL:
1082                                    case CF_DEVSEASONAL:
1083                                           /* need to update cached seasonal values, so they are consistent
1084                                            * with the bulk update */
1085                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1086                                            * CDP_last_deviation are the same. */
1087                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1088                                                  last_seasonal_coef[ii];
1089                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1090                                                  seasonal_coef[ii];
1091                                           break;
1092                    case CF_HWPREDICT:
1093                                           /* need to update the null_count and last_null_count.
1094                                            * even do this for non-DNAN pdp_temp because the
1095                                            * algorithm is not learning from batch updates. */
1096                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1097                                                  elapsed_pdp_st;
1098                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1099                                                  elapsed_pdp_st - 1;
1100                                           /* fall through */
1101                                    case CF_DEVPREDICT:
1102                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1103                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1104                                           break;
1105                    case CF_FAILURES:
1106                                           /* do not count missed bulk values as failures */
1107                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1108                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1109                                           /* need to reset violations buffer.
1110                                            * could do this more carefully, but for now, just
1111                                            * assume a bulk update wipes away all violations. */
1112                       erase_violations(&rrd, iii, i);
1113                                           break;
1114                                    }
1115                            } 
1116                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1117
1118                         if (rrd_test_error()) break;
1119
1120                         } /* endif data sources loop */
1121         } /* end RRA Loop */
1122
1123                 /* this loop is only entered if elapsed_pdp_st < 3 */
1124                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1125                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1126                 {
1127                for(i = 0, rra_start = rra_begin;
1128                    i < rrd.stat_head->rra_cnt;
1129                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1130                    i++)
1131                    {
1132                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1133
1134                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1135                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1136                           {
1137                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1138                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1139                                 &seasonal_coef);
1140                  rra_current = ftell(rrd_file);
1141                           }
1142                           if (rrd_test_error()) break;
1143                       /* loop over data soures within each RRA */
1144                       for(ii = 0;
1145                           ii < rrd.stat_head->ds_cnt;
1146                           ii++)
1147                           {
1148                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1149                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1150                                     scratch_idx, seasonal_coef);
1151                           }
1152            } /* end RRA Loop */
1153                    if (rrd_test_error()) break;
1154             } /* end elapsed_pdp_st loop */
1155
1156                 if (rrd_test_error()) break;
1157
1158                 /* Ready to write to disk */
1159                 /* Move sequentially through the file, writing one RRA at a time.
1160                  * Note this architecture divorces the computation of CDP with
1161                  * flushing updated RRA entries to disk. */
1162             for(i = 0, rra_start = rra_begin;
1163                 i < rrd.stat_head->rra_cnt;
1164             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1165                 i++) {
1166                 /* is there anything to write for this RRA? If not, continue. */
1167         if (rra_step_cnt[i] == 0) continue;
1168
1169                 /* write the first row */
1170 #ifdef DEBUG
1171         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1172 #endif
1173             rrd.rra_ptr[i].cur_row++;
1174             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1175                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1176                 /* positition on the first row */
1177                 rra_pos_tmp = rra_start +
1178                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1179                 if(rra_pos_tmp != rra_current) {
1180 #ifndef HAVE_MMAP
1181                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1182                       rrd_set_error("seek error in rrd");
1183                       break;
1184                    }
1185 #endif
1186                    rra_current = rra_pos_tmp;
1187                 }
1188
1189 #ifdef DEBUG
1190             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1191 #endif
1192                 scratch_idx = CDP_primary_val;
1193                 if (pcdp_summary != NULL)
1194                 {
1195                    rra_time = (current_time - current_time 
1196                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1197                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1198                 }
1199 #ifdef HAVE_MMAP
1200                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1201                    pcdp_summary, &rra_time, rrd_mmaped_file);
1202 #else
1203                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1204                    pcdp_summary, &rra_time);
1205 #endif
1206                 if (rrd_test_error()) break;
1207
1208                 /* write other rows of the bulk update, if any */
1209                 scratch_idx = CDP_secondary_val;
1210                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1211                 {
1212                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1213                    {
1214 #ifdef DEBUG
1215               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1216                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1217 #endif
1218                           /* wrap */
1219                           rrd.rra_ptr[i].cur_row = 0;
1220                           /* seek back to beginning of current rra */
1221                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1222                           {
1223                          rrd_set_error("seek error in rrd");
1224                          break;
1225                           }
1226 #ifdef DEBUG
1227                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1228 #endif
1229                           rra_current = rra_start;
1230                    }
1231                    if (pcdp_summary != NULL)
1232                    {
1233                       rra_time = (current_time - current_time 
1234                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1235                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1236                    }
1237 #ifdef HAVE_MMAP
1238                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1239                       pcdp_summary, &rra_time, rrd_mmaped_file);
1240 #else
1241                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1242                       pcdp_summary, &rra_time);
1243 #endif
1244                 }
1245                 
1246                 if (rrd_test_error())
1247                   break;
1248                 } /* RRA LOOP */
1249
1250             /* break out of the argument parsing loop if error_string is set */
1251             if (rrd_test_error()){
1252                    free(step_start);
1253                    break;
1254             } 
1255             
1256         } /* endif a pdp_st has occurred */ 
1257         rrd.live_head->last_up = current_time;
1258         rrd.live_head->last_up_usec = current_time_usec; 
1259         free(step_start);
1260     } /* function argument loop */
1261
1262     if (seasonal_coef != NULL) free(seasonal_coef);
1263     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1264         if (rra_step_cnt != NULL) free(rra_step_cnt);
1265     rpnstack_free(&rpnstack);
1266
1267 #ifdef HAVE_MMAP
1268     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1269             rrd_set_error("error writing(unmapping) file: %s", filename);
1270     }
1271 #endif    
1272     /* if we got here and if there is an error and if the file has not been
1273      * written to, then close things up and return. */
1274     if (rrd_test_error()) {
1275         free(updvals);
1276         free(tmpl_idx);
1277         rrd_free(&rrd);
1278         free(pdp_temp);
1279         free(pdp_new);
1280         fclose(rrd_file);
1281         return(-1);
1282     }
1283
1284     /* aargh ... that was tough ... so many loops ... anyway, its done.
1285      * we just need to write back the live header portion now*/
1286
1287     if (fseek(rrd_file, (sizeof(stat_head_t)
1288                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1289                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1290               SEEK_SET) != 0) {
1291         rrd_set_error("seek rrd for live header writeback");
1292         free(updvals);
1293         free(tmpl_idx);
1294         rrd_free(&rrd);
1295         free(pdp_temp);
1296         free(pdp_new);
1297         fclose(rrd_file);
1298         return(-1);
1299     }
1300
1301     if(version >= 3) {
1302             if(fwrite( rrd.live_head,
1303                        sizeof(live_head_t), 1, rrd_file) != 1){
1304                 rrd_set_error("fwrite live_head to rrd");
1305                 free(updvals);
1306                 rrd_free(&rrd);
1307                 free(tmpl_idx);
1308                 free(pdp_temp);
1309                 free(pdp_new);
1310                 fclose(rrd_file);
1311                 return(-1);
1312             }
1313     }
1314     else {
1315             if(fwrite( &rrd.live_head->last_up,
1316                        sizeof(time_t), 1, rrd_file) != 1){
1317                 rrd_set_error("fwrite live_head to rrd");
1318                 free(updvals);
1319                 rrd_free(&rrd);
1320                 free(tmpl_idx);
1321                 free(pdp_temp);
1322                 free(pdp_new);
1323                 fclose(rrd_file);
1324                 return(-1);
1325             }
1326     }
1327             
1328
1329     if(fwrite( rrd.pdp_prep,
1330                sizeof(pdp_prep_t),
1331                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1332         rrd_set_error("ftwrite pdp_prep to rrd");
1333         free(updvals);
1334         rrd_free(&rrd);
1335         free(tmpl_idx);
1336         free(pdp_temp);
1337         free(pdp_new);
1338         fclose(rrd_file);
1339         return(-1);
1340     }
1341
1342     if(fwrite( rrd.cdp_prep,
1343                sizeof(cdp_prep_t),
1344                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1345        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1346
1347         rrd_set_error("ftwrite cdp_prep to rrd");
1348         free(updvals);
1349         free(tmpl_idx);
1350         rrd_free(&rrd);
1351         free(pdp_temp);
1352         free(pdp_new);
1353         fclose(rrd_file);
1354         return(-1);
1355     }
1356
1357     if(fwrite( rrd.rra_ptr,
1358                sizeof(rra_ptr_t), 
1359                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1360         rrd_set_error("fwrite rra_ptr to rrd");
1361         free(updvals);
1362         free(tmpl_idx);
1363         rrd_free(&rrd);
1364         free(pdp_temp);
1365         free(pdp_new);
1366         fclose(rrd_file);
1367         return(-1);
1368     }
1369
1370     /* OK now close the files and free the memory */
1371     if(fclose(rrd_file) != 0){
1372         rrd_set_error("closing rrd");
1373         free(updvals);
1374         free(tmpl_idx);
1375         rrd_free(&rrd);
1376         free(pdp_temp);
1377         free(pdp_new);
1378         return(-1);
1379     }
1380
1381     /* calling the smoothing code here guarantees at most
1382          * one smoothing operation per rrd_update call. Unfortunately,
1383          * it is possible with bulk updates, or a long-delayed update
1384          * for smoothing to occur off-schedule. This really isn't
1385          * critical except during the burning cycles. */
1386         if (schedule_smooth)
1387         {
1388           rrd_file = fopen(filename,"rb+");
1389           rra_start = rra_begin;
1390           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1391           {
1392             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1393                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1394             {
1395 #ifdef DEBUG
1396               fprintf(stderr,"Running smoother for rra %ld\n",i);
1397 #endif
1398               apply_smoother(&rrd,i,rra_start,rrd_file);
1399               if (rrd_test_error())
1400                 break;
1401             }
1402             rra_start += rrd.rra_def[i].row_cnt
1403               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1404           }
1405           fclose(rrd_file);
1406         }
1407     rrd_free(&rrd);
1408     free(updvals);
1409     free(tmpl_idx);
1410     free(pdp_new);
1411     free(pdp_temp);
1412     return(0);
1413 }
1414
1415 /*
1416  * get exclusive lock to whole file.
1417  * lock gets removed when we close the file
1418  *
1419  * returns 0 on success
1420  */
1421 int
1422 LockRRD(FILE *rrdfile)
1423 {
1424     int rrd_fd;         /* File descriptor for RRD */
1425     int rcstat;
1426
1427     rrd_fd = fileno(rrdfile);
1428
1429         {
1430 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1431     struct _stat st;
1432
1433     if ( _fstat( rrd_fd, &st ) == 0 ) {
1434             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1435     } else {
1436             rcstat = -1;
1437     }
1438 #else
1439     struct flock        lock;
1440     lock.l_type = F_WRLCK;    /* exclusive write lock */
1441     lock.l_len = 0;           /* whole file */
1442     lock.l_start = 0;         /* start of file */
1443     lock.l_whence = SEEK_SET;   /* end of file */
1444
1445     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1446 #endif
1447         }
1448
1449     return(rcstat);
1450 }
1451
1452
1453 #ifdef HAVE_MMAP
1454 info_t
1455 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1456                unsigned short CDP_scratch_idx, FILE *rrd_file,
1457                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1458 #else
1459 info_t
1460 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1461                unsigned short CDP_scratch_idx, FILE *rrd_file,
1462                    info_t *pcdp_summary, time_t *rra_time)
1463 #endif
1464 {
1465    unsigned long ds_idx, cdp_idx;
1466    infoval iv;
1467   
1468    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1469    {
1470       /* compute the cdp index */
1471       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1472 #ifdef DEBUG
1473           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1474              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1475              rrd -> rra_def[rra_idx].cf_nam);
1476 #endif 
1477       if (pcdp_summary != NULL)
1478           {
1479              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1480              /* append info to the return hash */
1481                  pcdp_summary = info_push(pcdp_summary,
1482                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1483                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1484                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1485          RD_I_VAL, iv);
1486           }
1487 #ifdef HAVE_MMAP
1488           memcpy((char *)rrd_mmaped_file + *rra_current,
1489                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1490                           sizeof(rrd_value_t));
1491 #else
1492           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1493                  sizeof(rrd_value_t),1,rrd_file) != 1)
1494           { 
1495              rrd_set_error("writing rrd");
1496              return 0;
1497           }
1498 #endif
1499           *rra_current += sizeof(rrd_value_t);
1500         }
1501         return (pcdp_summary);
1502 }